當前位置:
首頁 >
autobahn-python的使用——sendMessage()和断线自动重连
發布時間:2023/12/29
42
豆豆
生活随笔
收集整理的這篇文章主要介紹了
autobahn-python的使用——sendMessage()和断线自动重连
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
客戶端代碼:
# -*-coding:utf8-*-
import threading
import time
from autobahn.asyncio.websocket import WebSocketClientProtocol
from autobahn.asyncio.websocket import WebSocketClientFactory
import asyncio


class BaseSipClientProtocol(WebSocketClientProtocol):
    """WebSocket client protocol with a ping-based keepalive watchdog.

    The time of the last received ping is recorded; a timer fires every
    ``KEEPALIVE_INTERVAL`` seconds and drops the connection when no ping
    has been seen for more than twice that interval.  When the transport
    is lost the event loop is stopped so that the ``__main__`` loop below
    can reconnect.
    """

    # Seconds between two keepalive checks.
    KEEPALIVE_INTERVAL = 5

    # Handle of the pending keepalive timer; None until onOpen() schedules
    # one, so connection_lost() is safe even if the handshake never finished.
    keepalive_fut = None

    def check_keepalive(self):
        """Drop the connection if pings stopped, else re-arm the timer."""
        last_interval = time.time() - self.last_ping_time
        if last_interval > 2 * self.KEEPALIVE_INTERVAL:
            # No ping for too long: abort the connection.
            self.dropConnection(abort=True)
        else:
            # Still alive: schedule the next check.
            self.schedule_keepalive()

    def schedule_keepalive(self):
        """Schedule the next keepalive check and keep its handle for cancel."""
        loop = asyncio.get_event_loop()
        self.keepalive_fut = loop.call_later(self.KEEPALIVE_INTERVAL,
                                             self.check_keepalive)

    def onConnect(self, response):
        """Register this connection on the factory and notify the hook."""
        print("Server connected: {0}".format(response.peer))
        # Save the connection-to-server handle on the factory.
        self.factory.saveConnectionToServer(self)
        self.onConnectToServer(response.peer)

    def onOpen(self):
        """Start scheduling the keepalive check."""
        self.last_ping_time = time.time()
        self.schedule_keepalive()

    def onPing(self, payload):
        """Record the ping time and answer with a pong."""
        self.last_ping_time = time.time()
        self.sendPong(payload)
        print('Ping == ', payload)

    def connection_lost(self, exc):
        """Cancel the keepalive timer, run base-class cleanup, stop the loop.

        The base implementation must be called: it is what lets autobahn
        finish its own teardown and invoke ``onClose``, which in turn
        removes the stale handle from the factory.
        """
        if self.keepalive_fut is not None:
            self.keepalive_fut.cancel()
            self.keepalive_fut = None
        try:
            super().connection_lost(exc)
        finally:
            # Stop run_forever() so the caller can attempt a reconnect.
            asyncio.get_event_loop().stop()

    def onMessage(self, payload, isBinary):
        """Forward an incoming message, together with the peer, to the hook."""
        self.onMsgReceived(self.peer, payload, isBinary)

    def onClose(self, wasClean, code, reason):
        """Unregister from the factory and notify the disconnect hook."""
        print("WebSocket connection closed: {0}".format(reason))
        # Remove the connection-to-server handle.
        self.factory.delConnectionToServer()
        self.onDisConnectFromServer(wasClean, code, reason)

    def onConnectToServer(self, peer):
        """Hook called when the client has connected to the server."""
        pass

    def onDisConnectFromServer(self, wasClean, code, reason):
        """Hook called when the client has been disconnected from the server."""
        pass

    def onMsgReceived(self, peer, data, isBinary):
        """Hook called for every received message; prints it by default."""
        print('received {0} from {1}'.format(data, peer))


class BaseSipClientFactory(WebSocketClientFactory):
    """Client factory that keeps a handle to the current server connection."""

    # Protocol instance of the live connection, or None when disconnected.
    _connectionToServer = None

    def saveConnectionToServer(self, connectedHandle):
        """Remember ``connectedHandle`` as the current server connection."""
        self._connectionToServer = connectedHandle

    def delConnectionToServer(self):
        """Forget the current server connection."""
        self._connectionToServer = None

    def getConnectionToServer(self):
        """Return the current server connection, or None."""
        return self._connectionToServer

    def sendMsg(self, data):
        """Send ``data`` to the server.

        ``bytes`` are sent as a binary message; anything else is expected
        to be a ``str`` and is sent UTF-8 encoded as a text message.

        Raises:
            Exception: if there is currently no connection to the server.
        """
        connection = self._connectionToServer
        if connection is None:
            raise Exception('與server的連接不存在')
        if isinstance(data, bytes):
            connection.sendMessage(data, True)
        else:
            connection.sendMessage(data.encode('utf-8'))


if __name__ == '__main__':
    host, port = '192.168.88.3', 9009
    factory = BaseSipClientFactory(u"ws://{0}:{1}".format(host, port))
    factory.protocol = BaseSipClientProtocol
    loop = asyncio.get_event_loop()
    try:
        while True:
            fut = loop.create_connection(factory, host, port)
            try:
                transport, protocol = loop.run_until_complete(
                    asyncio.wait_for(fut, 5))
                # Returns once connection_lost() stops the loop.
                loop.run_forever()
            except asyncio.TimeoutError:
                print('TimeoutError')
                continue
            except OSError as err:
                print('OSError == ' + str(err))
            # A little pause before trying again.
            loop.run_until_complete(asyncio.sleep(5))
    except KeyboardInterrupt:
        pass
    finally:
        # Reached on Ctrl-C; previously this call was unreachable.
        loop.close()

# Server code:
# -*-coding:utf8-*-
from autobahn.asyncio.websocket import WebSocketServerFactory
from autobahn.asyncio.websocket import WebSocketServerProtocol


class BaseSipServerProtocol(WebSocketServerProtocol):
    """Server-side protocol that registers each connection on its factory."""

    def onConnect(self, request):
        """Register the connecting client on the factory and notify the hook."""
        print("Client connecting: {0}".format(request.peer))
        # Save the connection in the factory's peer -> protocol map.
        self.factory.addConnection(request.peer, self)
        self.onClientConnected(request.peer)

    def onOpen(self):
        """Log that the WebSocket handshake completed."""
        print("WebSocket connection open.")

    def onMessage(self, payload, isBinary):
        """Forward an incoming message, together with the peer, to the hook."""
        self.onMsgReceived(self.peer, payload, isBinary)

    def onClose(self, wasClean, code, reason):
        """Unregister this connection from the factory and notify the hook."""
        print("WebSocket connection closed: {0}".format(reason))
        # Remove the connection from the factory's peer -> protocol map.
        self.factory.removeConnection(self)
        self.onClientLostConnected(wasClean, code, reason)

    def onClientConnected(self, peer):
        """Hook called when a client has connected."""
        pass

    def onClientLostConnected(self, wasClean, code, reason):
        """Hook called when a client has disconnected."""
        pass

    def onMsgReceived(self, peer, data, isBinary):
        """Hook called for every received message; prints it by default."""
        print('received {0} from {1}'.format(data, peer))


class BaseSipServerFactory(WebSocketServerFactory):
    """Server factory that tracks live connections keyed by peer string."""

    def __init__(self, *args, **kwargs):
        """Forward all arguments to the base factory and create the map.

        The map is created per instance; a class-level dict would be
        shared by every factory in the process.
        """
        super().__init__(*args, **kwargs)
        self._connectionSets = {}

    def addConnection(self, peer, connectedHandle):
        """Register ``connectedHandle`` under ``peer``.

        An existing entry for the same peer is replaced, so a stale handle
        can never shadow a fresh connection.
        """
        self._connectionSets[peer] = connectedHandle

    def removeConnection(self, connectedHandle):
        """Remove the entry whose protocol is ``connectedHandle``, if any."""
        for peer, handle in self._connectionSets.items():
            if handle is connectedHandle:
                # Safe to delete here: iteration stops immediately.
                del self._connectionSets[peer]
                break

    def getConnectionByPeer(self, peer):
        """Return the protocol registered for ``peer``, or None."""
        return self._connectionSets.get(peer)

    def getConnections(self):
        """Return the peer -> protocol map itself (not a copy)."""
        return self._connectionSets

    def sendMsg(self, peer, data):
        """Send ``data`` to the client identified by ``peer``.

        ``bytes`` are sent as a binary message; anything else is expected
        to be a ``str`` and is sent UTF-8 encoded as a text message.

        Raises:
            Exception: if no connection is registered for ``peer``.
        """
        connectedHandle = self.getConnectionByPeer(peer)
        if connectedHandle is None:
            raise Exception('peer的連接不存在')
        if isinstance(data, bytes):
            connectedHandle.sendMessage(data, True)
        else:
            connectedHandle.sendMessage(data.encode('utf-8'))

# Starting the server:
# This snippet uses asyncio and threading; imported here so it runs on
# its own.
import asyncio
import threading


class CommunicationTool:
    """Small wrapper that configures and starts a WebSocket server.

    Typical call order: ``setFlag`` -> ``setServerProtocol`` /
    ``setServerFactory`` -> ``createServer`` -> ``startListen``.
    """

    Server = 1
    Client = 0

    def __init__(self):
        self.isAutoReconnect = False
        # Initialise every setting to None so the "not set" checks below
        # raise their intended messages instead of AttributeError.
        self.flag = None
        self.addr = None
        self.port = None
        self.serverProtocol = None
        self.serverFactory = None

    def setFlag(self, flag):
        """Select the role: ``CommunicationTool.Server`` or ``.Client``."""
        self.flag = flag

    def createServer(self, addr, port):
        """Store the listen address and port.

        Raises:
            Exception: if the role is not Server (or not set), or if
                ``addr`` is not a ``str`` / ``port`` is not an ``int``.
        """
        if not self._isServer():
            raise Exception('不支持client類型執行此方法')
        if not (isinstance(addr, str) and isinstance(port, int)):
            raise Exception('createServer的參數類型有誤')
        self.addr = addr
        self.port = port

    def startListen(self):
        """Bind the listening socket and run the event loop in a thread.

        Raises:
            Exception: if the role is not Server, or if the protocol,
                factory or address/port have not been configured.
        """
        # Validate that every required setting has been provided.
        if not self._isServer():
            raise Exception('未設置flag或不支持此方法')
        if self.serverProtocol is None:
            raise Exception('未設置protocol')
        if self.serverFactory is None:
            raise Exception('未設置factory')
        if self.addr is None or self.port is None:
            raise Exception('未設置createServer')

        factory = self.serverFactory
        factory.protocol = self.serverProtocol
        if self.isAutoReconnect:
            # Enable autobahn's automatic ping with the given interval and
            # timeout (values are passed through to setProtocolOptions).
            factory.setProtocolOptions(autoPingInterval=5, autoPingTimeout=2)
        loop = asyncio.get_event_loop()
        coro = loop.create_server(factory, self.addr, self.port)
        server = loop.run_until_complete(coro)
        # The loop is run in a separate thread so this call returns.
        serverThread = threading.Thread(target=self._run,
                                        args=(loop, server),
                                        name='serverThread')
        serverThread.start()

    def _run(self, loop, server):
        """Run ``loop`` until it is stopped, then close server and loop."""
        try:
            loop.run_forever()
        except KeyboardInterrupt:
            pass
        finally:
            server.close()
            loop.close()

    def setAutoReconnect(self, isAutoReconnect):
        """Enable or disable the auto-ping options applied in startListen.

        Raises:
            Exception: if ``isAutoReconnect`` is not a ``bool``.
        """
        if not isinstance(isAutoReconnect, bool):
            raise Exception('參數類型錯誤')
        self.isAutoReconnect = isAutoReconnect

    def setServerProtocol(self, protocol):
        """Set the protocol class used by the server factory.

        Raises:
            Exception: if the role is Client or not set.
        """
        if not self._isServer():
            raise Exception('Client類型不能調用此方法')
        self.serverProtocol = protocol

    def setServerFactory(self, factory):
        """Set the factory instance used to create the server.

        Raises:
            Exception: if the role is Client or not set.
        """
        if not self._isServer():
            raise Exception('Client類型不能調用此方法')
        self.serverFactory = factory

    def _isServer(self):
        """Return True for the Server role, False for the Client role.

        Raises:
            Exception: if the flag is unset or holds any other value.
        """
        if self.flag == self.Server:
            return True
        if self.flag == self.Client:
            return False
        raise Exception('未設置flag')


if __name__ == '__main__':
    # NOTE(review): BaseSipServerProtocol and BaseSipServerFactory are not
    # defined in this snippet; they must be in scope (defined above or
    # imported) before this runs.
    cTool = CommunicationTool()
    cTool.setFlag(CommunicationTool.Server)  # choose the role
    cTool.setServerProtocol(BaseSipServerProtocol)
    # The factory takes the WebSocket URL.
    factory = BaseSipServerFactory(u"ws://127.0.0.1:9009")
    cTool.setServerFactory(factory)
    factory.setProtocolOptions(autoPingInterval=5)
    cTool.setAutoReconnect(True)  # enable the auto-ping options
    cTool.createServer('0.0.0.0', 9009)  # store listen address and port
    cTool.startListen()  # start listening

# Summary
以上是生活随笔為你收集整理的autobahn-python的使用——sendMessage()和断线自动重连的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 我们的游戏世界(背包【仓库】,交易,任务
- 下一篇: RIO与泛洪填充——(OpenCV+Py