2021-12-19 老杨博客推荐\TCP像串口的多程编写的一个弱鸡版本类MQTT的TCP实现\字典值查键\微PYTHON与PYTHON的JSON区别\以及一个ESP32领导多个ESP8266组网模式
生活随笔
收集整理的這篇文章主要介紹了
2021-12-19 老杨博客推荐\TCP像串口的多程编写的一个弱鸡版本类MQTT的TCP实现\字典值查键\微PYTHON与PYTHON的JSON区别\以及一个ESP32领导多个ESP8266组网模式
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
都說程序員最拿手的好戲是復制粘貼,對于這件事我也是深以為然,遇到問題先看看別人,有么有寫好的,沒有寫好的看看類似的,實在不行繼續百度群里問問,最后再自己操刀上吧,microPython作為小眾的控制器編程模式熱度逐步提高,但是普遍水平不高,大部分都是二把刀,三把刀的樣子很是娛樂性質,那我先推一推老楊的博客,他算是整的比較系統的了,雖然GIT里邊沒啥注釋,代碼也寫的比較生硬,好在內容很雜總能翻找一些資料,我個人比較喜歡寫博客類似寫日志的記錄一些內容。因為工作多年了,都是PLC WEB 嵌入式 還有政工寫材料 這么個工作來回交替,所以我就是為了講給新學的甚至是幾年后的自己聽的,下面是老楊的博客跳轉頁面推薦~
老楊的推廣連接
連接推廣完畢了,下面先來個ESP32以及PC通用的像串口一樣的TCP方法庫,因為服務器與客戶端都使用了線程模塊所以8266是不能用這個庫的。庫里有我的一些修改,以及思路擴展。
先研究研究像串口一樣使用TCP這個庫:并寫下自己的修改和注釋
import socket import _thread#使用demo # from mytcp import * # ip='127.0.0.1' # port = 8081 # server = mytcp_server(ip,port) # # client = mytcp_client(ip,port) # # ############# # client.write(b'\x01\x02') # server.read(0) # # server.write(0,b'\x04\x06') # client.read() # # server.name_addr #獲取已經連接ip與ID對應列表,當服務器主動斷開時這個列表會更新,客戶端斷開掉線則這個列表不會更新,默認是不掉線的 # 如果是服務器只拿數據的情況比如溫度測量,我們不用關心誰連接過來,只要解析報文內容既可以知道。 # 但如果服務器需要通信就必須知道對方是誰,尤其是客戶端之間要互相中專通信時尤其如此, # 設計這個列表,為了把ID號與IP號進行一次對應。其實這只完成了一半,另一半需要在客戶端發過 # 來內容時送來{ip:設備名}再進行一次字典對應,因為局域網連接中IP是相對可靠的所以都是以IP為鍵再建立一個字典實現綁定 # 這里,因涉及到業務邏輯,所以需要留到具體的業務時再實現,無法事先弄好。 # 另外,因為是線程式的工作模式,需要主動輪詢ID號來讀取內容,這一點也需要確定哪些號有設備接入,所以這個玩意還真得有個 # #業務模式: ##以下功能是業務擴展思路,并不是函數功能,因涉及到具體業務支持所以只作為提醒記錄 #1、設備名》服務器: 服務器記錄下設備名字,就可以了 【發出設備名,服務器名,信息】 #2、設備名》設備名: 服務器根據接收信息: 【發出設備名 , 接收設備名, 信息】,執行:解析目標設備名,查找Id號再把初始信息原封傳過去 #3、服務器》設備名: 服務器根據設備名查找IP,再根據IP 找ID發送 【服務器名,接收設備名,信息】 # # # client.close() # server.close(0)class mytcp_core():def __init__(self,conn,uid):self.uid= int(uid)-1self.read_buf=b''self.conn = connself.thread_flag = 1_thread.start_new_thread(self.read_thread,())def read(self):temp = self.read_bufself.read_buf=b''return tempdef read_thread(self):while 1:if self.thread_flag==0:breaktry:ret = self.conn.recv(1024)# print(ret)self.read_buf += retexcept:print("closed",self.uid)self.conn.close()break #退出def write(self,context): #bytesself.conn.send(context)print(len(context))def close(self):self.thread_flag =0self.conn.close()class mytcp_client():def __init__(self,ip,port):self.conn = socket.socket()self.flag = 0 #運行狀態try:self.conn.connect((ip,port))print("connected")self.tcp_core = mytcp_core(self.conn)self.flag = 1except:print("cannot connect")def read(self):if self.flag == 1:return self.tcp_core.read()else:print("cannot connected")returndef write(self,context):if self.flag == 1:return self.tcp_core.write(context)else:print("cannot connected")returndef close(self):self.tcp_core.close()return "OK"class mytcp_server():def __init__(self,ip,port,listen_num=5):self.listenSocket = socket.socket()self.name_addr=dict()self.socks = [] # 放每個客戶端的socketself.socks_tcp_core = []self.socks_addr = [] # addr listtry:self.listenSocket.bind((ip,port))self.listenSocket.listen(listen_num)_thread.start_new_thread(self.listen_add,())except:print("cannot connect")# 添加連接def listen_add(self):print("listening...")while True:try:cli, addr = self.listenSocket.accept()print("get "+(str)(len(self.socks))+":"+str(addr[0]))##################################################當連接傳入時:存入字典,以IP為主鍵,ID為值self.name_addr[str(addr[0])]=str(len(self.socks))##################################################self.socks.append(cli)self.socks_addr.append(addr)self.socks_tcp_core.append(mytcp_core(cli,str(len(self.socks))))except:passdef read(self,id):try:return self.socks_tcp_core[id].read()except:returndef write(self,id,context):try:return self.socks_tcp_core[id].write(context)except:returndef close(self,id):try:self.socks_tcp_core[id].close()self.socks[id].close()for k,v in self.name_addr.items():# 展開字典if v == str(id): #值等于要刪除值時del self.name_addr[k] #刪除這個字典鍵值對return "OK"except:return s=mytcp_server('192.168.3.190',9999)下面是一個根據字典值查找鍵的內容,這里值必須是字典唯一的,不能多建一值,畢竟設備號和IP號在同一時間下都應該是唯一的。
for k,v in self.字典.items():# 展開字典if v == 要找的值: #值等于要查找的值=del self.name_addr[k] #執行刪除這個字典鍵值對上面說了不支持8266 ,那8266怎么辦,其實8266就更簡單了,因為沒有線程所以就是普通的TCP客戶端拉手就行了。只能做客戶端,但是作為8266其實就是適合當客戶端,非要當服務端,那就老老實實的普通TCP建立吧。
import socket,time s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.connect(('192.168.3.190',9999)) #看清楚是兩層括號 里邊是IP ,端口 s.send(b'12312312') ccc=s.recv(5)ESP32 領一群8266,其實這個方案也是不錯的,ESP32勉強可以勝任,帶它10來個8266傳感器繼電器也能做一些功能了。下面就簡單寫一個設備到服務器,所有設備都提交數據到服務器的星型結構的例子,原則是能跑就行的。注釋好不容易寫的都留著。
下面是服務端了,這個服務器支持 [本機設備號,收取設備號,本機消息] 這么個格式,連接上以后按照這個格式發json就可以用了。
服務器端ESP32:寫在前面,ESP32絕對能完成這個功能,但是性能在那擺著呢,當主控服務器穩定性肯定不怎么地,一般來說可以掛載至多5個線程連接,再多了可靠性下降可能也要無響應了,網絡建立也可能會失敗這是概率性事件。連接服務器的客戶端可能會掉線也是概率性事件,這需要在客戶端位置寫好重連,客戶端多次連續取出b’None’這個值就可以認為要求數據的設備離線了,寫好判斷即可,總之,功能可測試娛樂使用,日常生產環境如果開發人員持續在場可以使用,否則慎用。非得要用更推薦PC加路由器建立熱點然后,傳感器去連接這個熱點的PC服務器,起碼來說資源充足,連接相對穩定覆蓋距離也大。以下ESP32代碼是我多次測試修改的,所以功能呢相對PC版本要多一些,同時也測試出了不足,PC版本服務器參照ESP32這個邏輯改改就行。下面的PC服務器直接用的話需要補充兩點:1、超時斷開 2、設備定時清空數據篩出離線設備。這兩個都在ESP32端寫好了,還是建議修改使用,就修改JSON部分即可。
import socket import _thread import json#使用demo # from mytcp import * # ip='127.0.0.1' # port = 8081 # server = mytcp_server(ip,port) # # client = mytcp_client(ip,port) # # ############# # client.write(b'\x01\x02') # server.read(0) # # server.write(0,b'\x04\x06') # client.read() # # server.name_addr #獲取已經連接ip與ID對應列表,當服務器主動斷開時這個列表會更新,客戶端斷開掉線則這個列表不會更新,默認是不掉線的 # 如果是服務器只拿數據的情況比如溫度測量,我們不用關心誰連接過來,只要解析報文內容既可以知道。 # 但如果服務器需要通信就必須知道對方是誰,尤其是客戶端之間要互相中專通信時尤其如此, # 設計這個列表,為了把ID號與IP號進行一次對應。其實這只完成了一半,另一半需要在客戶端發過 # 來內容時送來{ip:設備名}再進行一次字典對應,因為局域網連接中IP是相對可靠的所以都是以IP為鍵再建立一個字典實現綁定 # 這里,因涉及到業務邏輯,所以需要留到具體的業務時再實現,無法事先弄好。 # 另外,因為是線程式的工作模式,需要主動輪詢ID號來讀取內容,這一點也需要確定哪些號有設備接入,所以這個玩意還真得有個 # #業務模式: ##以下功能是業務擴展思路,并不是函數功能,因涉及到具體業務支持所以只作為提醒記錄 #1、設備名》服務器: 服務器記錄下設備名字,就可以了 【發出設備名,服務器名,信息】 #2、設備名》設備名: 服務器根據接收信息: 【發出設備名 , 接收設備名, 信息】,執行:解析目標設備名,查找Id號再把初始信息原封傳過去 #3、服務器》設備名: 服務器根據設備名查找IP,再根據IP 找ID發送 【服務器名,接收設備名,信息】 # # # client.close() # server.close(0)class mytcp_core():def __init__(self,conn):self.read_buf=b''self.conn = connself.thread_flag = 1_thread.start_new_thread(self.read_thread,())def read(self):temp = self.read_bufself.read_buf=b''return tempdef read_thread(self):while 1:if self.thread_flag==0:breaktry:time.sleep(0.5) ####這個延時防止CPU耗盡######self.conn.settimeout(60) #這個超時可以斷開死連接,節省資源ret = self.conn.recv(1024)self.read_buf += ret except:print("closed")self.conn.close()break #退出def write(self,context): #bytesself.conn.send(context)print(len(context))def close(self):self.thread_flag =0self.conn.close()class mytcp_client():def __init__(self,ip,port):self.conn = socket.socket()self.flag = 0 #運行狀態try:self.conn.connect((ip,port))print("connected")self.tcp_core = mytcp_core(self.conn)self.flag = 1except:print("cannot connect")def read(self):if self.flag == 1:return self.tcp_core.read()else:print("cannot connected")returndef write(self,context):if self.flag == 1:return self.tcp_core.write(context)else:print("cannot connected")returndef close(self):self.tcp_core.close()return "OK"class mytcp_server():def __init__(self,ip,port,listen_num=5):self.listenSocket = socket.socket()self.name_addr=dict()self.socks = [] # 放每個客戶端的socketself.socks_tcp_core = []self.socks_addr = [] # addr listtry:self.listenSocket.bind((ip,port))self.listenSocket.listen(listen_num)_thread.start_new_thread(self.listen_add,())except:print("cannot connect")# 添加連接def listen_add(self):print("listening...")while True:try:cli, addr = self.listenSocket.accept()print("get "+(str)(len(self.socks))+":"+str(addr[0]))##################################################當連接傳入時:存入字典,以IP為主鍵,ID為值self.name_addr[str(addr[0])]=str(len(self.socks))##################################################self.socks.append(cli)self.socks_addr.append(addr)self.socks_tcp_core.append(mytcp_core(cli))except:passdef read(self,id):try:return self.socks_tcp_core[id].read()except:returndef write(self,id,context):try:return self.socks_tcp_core[id].write(context)except:returndef close(self,id):try:self.socks_tcp_core[id].close()self.socks[id].close()for k,v in self.name_addr.items():# 展開字典if v == str(id): #值等于要刪除值時del self.name_addr[k] #刪除這個字典鍵值對return "OK"except:returnfrom time import sleep import time import network,machine from machine import Pin time.sleep(5)#################### # wifi類 #################### class Sta():wlan = Nonedef __init__(self, wifi_ssid, wifi_pwd):self.wifi_ssid = wifi_ssidself.wifi_pwd = wifi_pwddef connect(self):network.WLAN(network.AP_IF).active(False) # disable access pointself.wlan = network.WLAN(network.STA_IF)self.wlan.active(True)self.wlan.disconnect()if not self.wlan.isconnected(): self.wlan.connect(self.wifi_ssid, self.wifi_pwd)def status(self):if self.wlan.isconnected():return self.wlan.ifconfig()else:return ()def wait(self):cnt = 40while cnt > 0:print("Waiting ..." )# con(self.wifi_ssid, self.wifi_pwd) # Connect to an WIFI_SSIDif self.wlan.isconnected():print("Connected to %s" % self.wifi_ssid)print('network config:', self.wlan.ifconfig())cnt = 0else:sleep(5)cnt -= 5returndef scan(self):return self.wlan.scan() # Scan for available access pointsdef CreatNetwork(self):cnt = 40ap_if = network.WLAN(network.AP_IF)#AP 模式ap_if.active(True)ap_if.config(essid=self.wifi_ssid,password=self.wifi_pwd,authmode=2)while cnt > 0:print("Waiting ..." )# con(self.wifi_ssid, self.wifi_pwd) # Connect to an WIFI_SSIDif ap_if.ifconfig()[0]!='0.0.0.0':print("start %s" % self.wifi_ssid)print('network config:',ap_if.ifconfig()[0])cnt = 0return ap_if.ifconfig()[0]else:sleep(5)cnt -= 5 if __name__=='__main__': #服務器邏輯入口#####聯網調用#熱點模式a=Sta('201king','13704677369')ip=a.CreatNetwork()port = 9999 #端口號#####設定服務器端口和IP,這個服務器沒有自建熱點,接入WIFI好測試,使用時可以自建熱點更可靠。port = 9999 #端口號try:s=mytcp_server(ip,9999) #里邊填寫IP 和端口except:machine.reset()#用來放接收的內容addr_rec=dict()add=100while 1:add=add-1time.sleep(0.5) #延時,這個實際可以很小if add<=0:add=100 #設備掉線檢測延時計數器for k,v in addr_rec.items():addr_rec[k]=b'None' # 把所有信息更改成b'None'值,連續幾次取出None,則可以認為改數據提供設備離線。for k,v in s.name_addr.items(): # 讀取連接數據,主要用來確定哪些ID被使用了,好去那個ID口執行讀取操作try:## 因為使用了JSON ,如果讀個空二進制會報錯,這里TRY一下rec_unpack=json.loads(s.read(int(v)))# 獲取數據應該是一個列表類似【1,1,‘26.545’】 設備號,讀取號,消息內容addr_rec[rec_unpack[0]]=rec_unpack[2]#獲取到的數據存入字典,鍵是設備號,值是消息內容。 # 注意這里:s_data=addr_rec.get(rec_unpack[1])if s_data:passelse:s_data=b'None's.write(int(v),s_data)#這里發送設備號所屬的消息,接收的時候列表0號元素是設備號,1號元素是他要的數據給他就行,調試的時候注意字符串和二進制,不行就上個JSON,同時最好加個空值判斷,返回個內容啥的,,平時更新這倆字典就可以設備間通信了。#print(k,v) #測試用print(addr_rec[rec_unpack[1]])#測試用#print(int(v),addr_rec[rec_unpack[1]].encode()) #測試用except:pass # client.write(b'\x01\x02') # server.read(0) # server.write(0,b'\x04\x06') # client.read() # client.close() # server.close(0)8266客戶端
import time ,json from time import sleep import time import network,machine from machine import Pin time.sleep(5)#################### # wifi類 #################### class Sta():WIFI_SSID = "201king" WIFI_PWD = "13704677369"wlan = Nonedef __init__(self, wifi_ssid='', wifi_pwd=''):network.WLAN(network.AP_IF).active(False) # disable access pointself.wlan = network.WLAN(network.STA_IF)self.wlan.active(True)if wifi_ssid == '':self.wifi_ssid = Sta.WIFI_SSIDself.wifi_pwd = Sta.WIFI_PWD else:self.wifi_ssid = wifi_ssidself.wifi_pwd = wifi_pwddef connect(self, wifi_ssid='', wifi_pwd=''):if wifi_ssid != '':self.wifi_ssid = wifi_ssidself.wifi_pwd = wifi_pwdif not self.wlan.isconnected(): self.wlan.connect(self.wifi_ssid, self.wifi_pwd)def status(self):if self.wlan.isconnected():return self.wlan.ifconfig()else:return ()def wait(self):cnt = 30while cnt > 0:print("Waiting ..." )# con(self.wifi_ssid, self.wifi_pwd) # Connect to an WIFI_SSIDif self.wlan.isconnected():print("Connected to %s" % self.wifi_ssid)print('network config:', self.wlan.ifconfig())cnt = 0else:sleep(5)cnt -= 5returndef scan(self):return self.wlan.scan() # Scan for available access pointsa=Sta() a.connect() a.wait()import random import socket,time,json s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.connect(('192.168.4.1',9999)) s.settimeout(8) #設計一個8秒超時,這地方有文章,留著用 def k(x):time.sleep(2)#s.send(json.dumps([1,1,'haha'])) #發送數據,JSON格式s.send(json.dumps(x))ccc=s.recv(20)return ccc lost=0 while 1 :ccc=k([-1,1,'60%d'%random.getrandbits(3)])if ccc == b'None':lost+=1passelse:lost=0print(ccc)if lost>8:print('讀取設備離線')pc客戶端:PC的JSON和MICROPTYON的JSON 略微不同,PC轉完了是字符串形式需要進行一次編碼。
import socket,time,json s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.connect(('192.168.3.38',9999)) s.settimeout(8) #如果沒這個,去請求無效數據小心卡死哦,有這個超時就報錯了 s.send(json.dumps([1,2,'123']).encode()) ccc=s.recv(100)服務器,PC端的寫法。(這里有注釋,很大段,用來理解消息結構以及服務器邏輯)
import socket import _thread import json,time#使用demo # from mytcp import * # ip='127.0.0.1' # port = 8081 # server = mytcp_server(ip,port) # # client = mytcp_client(ip,port) # # ############# # client.write(b'\x01\x02') # server.read(0) # # server.write(0,b'\x04\x06') # client.read() # # server.name_addr #獲取已經連接ip與ID對應列表,當服務器主動斷開時這個列表會更新,客戶端斷開掉線則這個列表不會更新,默認是不掉線的 # 如果是服務器只拿數據的情況比如溫度測量,我們不用關心誰連接過來,只要解析報文內容既可以知道。 # 但如果服務器需要通信就必須知道對方是誰,尤其是客戶端之間要互相中專通信時尤其如此, # 設計這個列表,為了把ID號與IP號進行一次對應。其實這只完成了一半,另一半需要在客戶端發過 # 來內容時送來{ip:設備名}再進行一次字典對應,因為局域網連接中IP是相對可靠的所以都是以IP為鍵再建立一個字典實現綁定 # 這里,因涉及到業務邏輯,所以需要留到具體的業務時再實現,無法事先弄好。 # 另外,因為是線程式的工作模式,需要主動輪詢ID號來讀取內容,這一點也需要確定哪些號有設備接入,所以這個玩意還真得有個 # #業務模式: ##以下功能是業務擴展思路,并不是函數功能,因涉及到具體業務支持所以只作為提醒記錄 #1、設備名》服務器: 服務器記錄下設備名字,就可以了 【發出設備名,服務器名,信息】 #2、設備名》設備名: 服務器根據接收信息: 【發出設備名 , 接收設備名, 信息】,執行:解析目標設備名,查找Id號再把初始信息原封傳過去 #3、服務器》設備名: 服務器根據設備名查找IP,再根據IP 找ID發送 【服務器名,接收設備名,信息】 # # # client.close() # server.close(0)class mytcp_core():def __init__(self,conn):self.read_buf=b''self.conn = connself.thread_flag = 1_thread.start_new_thread(self.read_thread,())def read(self):temp = self.read_bufself.read_buf=b''return tempdef read_thread(self):while 1:if self.thread_flag==0:breaktry:time.sleep(0.1) #這個延時防止云服務器CPU耗盡ret = self.conn.recv(1024)# print(ret)self.read_buf += retexcept:print("closed")self.conn.close()break #退出def write(self,context): #bytesself.conn.send(context)print(len(context))def close(self):self.thread_flag =0self.conn.close()class mytcp_client():def __init__(self,ip,port):self.conn = socket.socket()self.flag = 0 #運行狀態try:self.conn.connect((ip,port))print("connected")self.tcp_core = mytcp_core(self.conn)self.flag = 1except:print("cannot connect")def read(self):if self.flag == 1:return self.tcp_core.read()else:print("cannot connected")returndef write(self,context):if self.flag == 1:return self.tcp_core.write(context)else:print("cannot connected")returndef close(self):self.tcp_core.close()return "OK"class mytcp_server():def __init__(self,ip,port,listen_num=5):self.listenSocket = socket.socket()self.name_addr=dict()self.socks = [] # 放每個客戶端的socketself.socks_tcp_core = []self.socks_addr = [] # addr listtry:self.listenSocket.bind((ip,port))self.listenSocket.listen(listen_num)_thread.start_new_thread(self.listen_add,())except:print("cannot connect")# 添加連接def listen_add(self):print("listening...")while True:try:cli, addr = self.listenSocket.accept()print("get "+(str)(len(self.socks))+":"+str(addr[0]))##################################################當連接傳入時:存入字典,以IP為主鍵,ID為值self.name_addr[str(addr[0])]=str(len(self.socks))##################################################self.socks.append(cli)self.socks_addr.append(addr)self.socks_tcp_core.append(mytcp_core(cli))except:passdef read(self,id):try:return self.socks_tcp_core[id].read()except:returndef write(self,id,context):try:return self.socks_tcp_core[id].write(context)except:returndef close(self,id):try:self.socks_tcp_core[id].close()self.socks[id].close()for k,v in self.name_addr.items():# 展開字典if v == str(id): #值等于要刪除值時del self.name_addr[k] #刪除這個字典鍵值對return "OK"except:returnif __name__=='__main__': #服務器邏輯入口#####設定服務器端口和IP,這個服務器沒有自建熱點,接入WIFI好測試,使用時可以自建熱點更可靠。port = 9999 #端口號s=mytcp_server('10.168.1.107',9999) #里邊填寫IP 和端口#字典用來放接收的內容addr_rec=dict()while 1:time.sleep(2) #延時,這個實際可以很小for k,v in s.name_addr.items(): # 讀取連接數據,主要用來確定哪些ID被使用了,好去那個ID口執行讀取操作try: ## 因為使用了JSON ,如果讀個空二進制會報錯,這里TRY一下rec_unpack=json.loads(s.read(int(v)))# 獲取數據應該是一個列表類似【1,1,‘26.545’】 本機設備號,需求數據設備號,本機發來消息內容addr_rec[rec_unpack[0]]=rec_unpack[2]#獲取到的數據存入字典,鍵是設備號,值是消息內容。################################################ 這里發送設備號所屬的消息,接收的時候列表0號元素是設備號,這個號是設備自己傳過來的是受信帶來的消息傳入0號元素的字典鍵值對中,1號元素是需求的信息,#,那么發回數據的時候就發送 addr_rec[rec_unpack[1]] 這個字典1號鍵的值,平時更新這倆字典就可以設備間通信了。# 白話邏輯:一個人說:【我是1,我要1的數據,我的數據是:你好 】,那么這個動作就是把 你好存到1中 并返回1號這個你好信息# 那么,另一個人說 :[我是2,我要1的數據,我的數據是 空] ,那么這個動作就是把空存到2中,并返回1號存好的你好這個信息# 所以,這個服務器只判斷報文內容來確定身份和功能,這樣其實就比較像MQTT那樣的隨來隨用,要誰有誰的特點。# 下面是回傳,因為這個是在PC上跑的所以,JSON格式需要編碼才能傳送。###############################################s.write(int(v),json.dumps(addr_rec[rec_unpack[1]]).encode())print(addr_rec,addr_rec[rec_unpack[1]].encode()) #測試用print(k,v) #測試用print(int(v),addr_rec[rec_unpack[1]].encode()) #測試用except:pass # client.write(b'\x01\x02') # server.read(0) # server.write(0,b'\x04\x06') # client.read() # client.close() # server.close(0)寫了這么多其實這個帖子寫了2天左右,邊寫邊改導致前后文主旨不一致,現在確實要寫完了,那就先做個檢討吧,我有罪不該邊寫博客邊調試。應該寫完了直接出個成品再寫博客,雖然開發過程的思路也是博客的一部分,但是對閱讀者會造成困擾。下面說點正事,這個TCP下的弱雞版本MQTT是根據老楊的像串口一樣的TCP庫這個東西,我突然冒出來的靈感改編而來,一個是修改了注冊隊列的機制,提高了多設備接入時設備輪詢的效率,另外是,直接拋棄了IP ID 綁定的思路(其實我是寫了的,后來覺得很累贅直接棄掉了),使用設備消息處理認證的方式構建通信邏輯,這樣就接近MQTT的訂閱形式了,這就像打電話,每個設備都有自己的電話號碼,你發來信息就記錄下你的號碼和你的消息,你要哪個號碼就把哪個號碼上記錄的消息發給你,都是時間片內單一的工作完成的。ESP32服務器我測試了下 ,可運行,今天部署到阿里云上一份,也當MQTT用幾天,檢驗下這個玩意。雖然沒有人家MQTT穩定可靠,但是這個輕啊,ESP32自己就能跑啊,自己開熱點就能覆蓋一個小區域,還能帶著8266啥的干活,這不是也挺好嘛,起碼是個多設備無線通信方式,遇到特定不能上物聯網的環境,拿著個出來頂一頂其實也不是不錯的~
補充一個內容:阿里云服務器在部署TCP時IP地址不能填寫公網ip而是填寫‘0.0.0.0’這個地址即可綁定到公網IP上
s=mytcp_server('0.0.0.0',9999) #里邊填寫IP 和端口另外,云服務器是有工作基線的一般是20%以下,所以如果部署云要在線程里邊弄個延時,防止CPU被耗盡,當然如果是ESP32這種單片機既然開了線程。。。這線程還不咋可靠,那就也給安排個延時比較穩妥。
def read_thread(self):while 1:if self.thread_flag==0:breaktry:time.sleep(0.1) ####這個延時防止云服務器CPU耗盡######ret = self.conn.recv(1024)# print(ret)self.read_buf += retexcept:print("closed")self.conn.close()break #退出總結
以上是生活随笔為你收集整理的2021-12-19 老杨博客推荐\TCP像串口的多程编写的一个弱鸡版本类MQTT的TCP实现\字典值查键\微PYTHON与PYTHON的JSON区别\以及一个ESP32领导多个ESP8266组网模式的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 即时通讯系统
- 下一篇: java网上拍卖管理系统设计与实现(SS