生活随笔
收集整理的這篇文章主要介紹了
Python实现心跳保活TCP长连接
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
之前參與了一個橫向項目,對方要求和他們的服務端對接時,我們開發的客戶端必須一直保持連接,即維護一個長連接,這樣服務端可以隨時對我們下發控制命令。
簡介
本文主要介紹如何實現TCP的長連接維護,主要通過Python的socket模塊來實現,采用的實現方式為心跳保活策略,即定期發送約定好的心跳包以維持連接不斷開。
原理簡介
短連接指的是開啟一個socket連接,收發完數據后,立刻關閉連接。我們通常使用的TCP就是這種連接方式,其示意圖和工作流程如下(Client表示客戶端,Server表示服務端)。
Client對Server發起連接請求;Server收到請求,雙方建立連接;Client向Server發送數據;Server回應Client;一次讀寫完成,此時雙方任何一個都可以發起關閉連接操作;另一方收到關閉連接后,斷開本次連接。
長連接指的是開啟一個socket連接,多次收發數據包直到需要的時候再關閉連接。因此,需要定期發送一個不占用數據傳輸的心跳包來告知服務端自己的狀態以維持連接。其示意圖和工作流程如下(Client表示客戶端,Server表示服務端)。
Client向Server發起連接請求;Server收到請求,雙方建立連接;Client多次向Server發送數據(包括心跳包);Server對每個收到的數據進行回應;長時間操作之后Client發起關閉請求;Server斷開和Client的連接。
代碼實現
我們約定傳輸的數據采用JSON格式,且報文結尾都緊跟一個換行符以方便服務端進行解析,下面時具體的代碼,為這里去掉了一些具體的業務線程,只保留了簡單的心跳發送和接受服務端消息的線程,以方便理解。
- 登錄設備,若連接建立則登錄成功
- 每隔固定時間向服務端發送一次心跳以保活連接
- 每隔固定時間接受一次服務端發送過來的數據,按照約定的間隔符解析出報文以方便業務處理
import os
import socket
import time
import threading
import json
from loguru
import logger
class TCPSocket(object):def __init__(self
, size
, ip
, port
):"""@param size: 報文上限大小@param ip: ip地址@param port: 端口"""self
.sk
= Noneself
.size
= sizeself
.format = "utf8"self
.ip_port
= (ip
, port
)self
.logger
= loggerself
.msg_type
= ['LOGIN', 'HEART']self
.login_dict
= {"code": "LOGIN",}self
.heart_interval
= 5self
.status_interval
= 5self
.adapt_time
= Falsedef connect(self
):self
.sk
= socket
.socket
(socket
.AF_INET
, socket
.SOCK_STREAM
)try:self
.sk
.connect
(self
.ip_port
)except Exception
as e
:self
.logger
.error
("connect to server failed,prepare to reconnect", e
)self
.reconnect
()def reconnect(self
):self
.logger
.info
("try to reconnect")while True:try:self
.sk
= socket
.socket
(socket
.AF_INET
, socket
.SOCK_STREAM
)self
.sk
.connect
(self
.ip_port
)self
.login_send
()self
.logger
.info
('client start connect to host/port:{}'.format(self
.ip_port
))breakexcept ConnectionRefusedError
:self
.logger
.error
('socket server refused or not started, reconnect to server in 5s .... host/port:{}'.format(self
.ip_port
))time
.sleep
(5)except Exception
as e
:self
.logger
.error
('do connect error:{}'.format(str(e
)))time
.sleep
(5)self
.logger
.info
("reconnect successfully!!!")def login_send(self
):try:login_msg
= self
.build_request_json
("LOGIN", **self
.login_dict
)self
.sk
.send
(login_msg
.encode
(self
.format))self
.logger
.info
("[tcp client] send logon message:{}".format(login_msg
.replace
(os
.linesep
, "")))except socket
.error
:self
.logger
.info
('socket error,do reconnect')time
.sleep
(5)except Exception
as e
:self
.logger
.error
(e
)time
.sleep
(5)def rec(self
):while True:try:message
= self
.sk
.recv
(self
.size
)messages
= self
.parse_response_json
(message
)if messages
:for msg
in messages
:if msg
['code'] == "LOGIN":self
.logger
.info
("[tcp client] receive logon response: {}".format(msg
))elif msg
['code'] == "HEART":self
.logger
.info
("[tcp client] receive heart response: {}".format(msg
))else:self
.logger
.warning
("message queue is not supported!!!")else:self
.logger
.info
("no message from server or messages are not valid:{}".format(messages
))except socket
.error
as e
:self
.logger
.error
(e
)time
.sleep
(5)self
.reconnect
()except Exception
as e
:self
.logger
.error
(e
)time
.sleep
(5)def heartbeats(self
):while True:try:msg
= self
.build_request_json
("HEART")self
.sk
.send
(msg
.encode
(self
.format))self
.logger
.info
("[tcp client] send heart message:{}".format(msg
.replace
(os
.linesep
, "")))except socket
.error
:self
.logger
.error
('socket error,do reconnect')self
.reconnect
()except Exception
as e
:self
.logger
.error
('other error occur', e
)time
.sleep
(5)self
.reconnect
()time
.sleep
(self
.heart_interval
)@staticmethoddef build_request_json(method
: str, **args
) -> str:""":param method: 該請求的方法類型:return: 構建好的用于和服務端通信的Json數據"""if method
== "LOGIN":json_data
= {"code": "LOGIN",}elif method
== "HEART":json_data
= {"code": "HEART",}else:print("this method {} is not supported now!!!".format(method
))json_data
= Nonereturn json
.dumps
(json_data
) + os
.linesep
if json_data
else Nonedef parse_response_json(self
, data
: bytes):msgs
= []try:data_list
= data
.decode
(self
.format).split
(os
.linesep
)data_list
= list(filter(lambda x
: x
.strip
().startswith
("{"), data_list
))for msg
in data_list
:msg
= json
.loads
(msg
)if msg
['code'] in self
.msg_type
:msgs
.append
(msg
)return msgs
except Exception
as e
:self
.logger
.error
(e
)return Noneif __name__
== '__main__':socket1
= TCPSocket
(1024, "127.0.0.1", 5433)socket1
.connect
()socket1
.login_send
()t1
= threading
.Thread
(target
=socket1
.rec
)t2
= threading
.Thread
(target
=socket1
.heartbeats
)t1
.start
()t2
.start
()
服務端的代碼正常實現即可,這里就不貼了。
總結
本文簡單介紹了如何使用Python實現基于心跳保活的TCP長連接。
總結
以上是生活随笔為你收集整理的Python实现心跳保活TCP长连接的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。