日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

Python+Twisted+Autobahn实现Websocket(附完整demo)

發布時間:2023/12/29 python 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Python+Twisted+Autobahn实现Websocket(附完整demo) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

在游戲行業游戲客戶端和服務端需要大量的,快速的通訊,這里面就會用到websocket 。
Autobahn 是一個高性能的websocket 它采用了兩種實現方案 。
1 基于Twisted
2 基于asyncio

開發語言: python
PS:twisted 的 reactor.callLater回調方法本身有bug,不穩定,過一段時間會假死,可能是因為我沒深入了解的原因。所以我用tornado替代了twisted實現了websocket,會在下一篇詳解,tornado做websocket是真NB,穩定高效簡介,可參考我這篇博客【https://editor.csdn.net/md/?articleId=106819559】,而且更簡單。

Autobahn 有如下的特點:

framework for WebSocket and WAMP clients (Websocket框架和WAMP框架) compatible with Python 2.7 and 3.3+ (兼容python2.7和python3.3以上的版本) runs on CPython, PyPy and Jython (可以運行在Cpython, PyPy 和Jython 上面) runs under Twisted and asyncio (可以選擇Twisted 或者是asyncio的方式來運行) implements WebSocket RFC6455 (and draft versions Hybi-10+) (實現WebSocket RFC6455(以及草案版本Hybi-10 +) implements WebSocket compression (實現WebSocket壓縮) implements WAMP, the Web Application Messaging Protocol (實現Web應用程序消息傳遞協議) supports TLS (secure WebSocket) and proxies (支持TLS(安全WebSocket)和代理) Open-source (MIT license) (開源)

Autobahn 可以用來做什么

Autobahn 非常適用于交易系統,多人游戲, 實時聊天等應用程序的開發
一個簡單的webSocket服務端程序

from autobahn.twisted.websocket import WebSocketServerProtocol #or: from autobahn.asyncio.websocket import WebSocketServerProtocol class MyServerProtocol(WebSocketServerProtocol):def onConnect(self, request):print("Client connecting: {}".format(request.peer))def onOpen(self):print("WebSocket connection open.")def onMessage(self, payload, isBinary):if isBinary:print("Binary message received: {} bytes".format(len(payload)))else:print("Text message received: {}".format(payload.decode('utf8')))## echo back message verbatimself.sendMessage(payload, isBinary)def onClose(self, wasClean, code, reason):print("WebSocket connection closed: {}".format(reason))

幾個重要的方法:

onConnect: 當有新的客戶端連接進來的時候調用該方法
onOpen: 當連接成功時調用該方法
onMessage:該方法是接收來自客戶端的消息
onClose: 當關閉時調用該方法

安裝Autobahn

Autobahn 的運行依賴于 Twisted 和 asyncio 因此安裝Autobahn 需要確認你的python支持了
Twisted 和 asyncio

各個python 版本對 Twisted 和 asyncio 支持如下
使用pip 安裝
pip install autobahn[twisted]
pip install autobahn[asyncio]

驗證是否安裝成功
from autobahn import version
print(version)
0.9.1

啟動webScoketServer
twisted 版本:

import sysfrom twisted.python import logfrom twisted.internet import reactorlog.startLogging(sys.stdout)from autobahn.twisted.websocket import WebSocketServerFactoryfactory = WebSocketServerFactory()factory.protocol = MyServerProtocolreactor.listenTCP(9000, factory)reactor.run()

asyncio 版本:

try:import asyncioexcept ImportError:## Trollius >= 0.3 was renamedimport trollius as asynciofrom autobahn.asyncio.websocket import WebSocketServerFactoryfactory = WebSocketServerFactory()factory.protocol = MyServerProtocolloop = asyncio.get_event_loop()coro = loop.create_server(factory, '127.0.0.1', 9000)server = loop.run_until_complete(coro)try:loop.run_forever()except KeyboardInterrupt:passfinally:server.close()loop.close()

創建WebSocketClient

class MyClientProtocol(WebSocketClientProtocol):def onOpen(self):self.sendMessage(u"Hello, world!".encode('utf8'))def onMessage(self, payload, isBinary):if isBinary:print("Binary message received: {0} bytes".format(len(payload)))else:print("Text message received: {0}".format(payload.decode('utf8')))

WebSocketClientProtocol 可以使用:

autobahn.twisted.websocket.WebSocketClientProtocol
autobahn.asyncio.websocket.WebSocketClientProtocol

使用Client

Twisted版本

import sysfrom twisted.python import logfrom twisted.internet import reactorlog.startLogging(sys.stdout)from autobahn.twisted.websocket import WebSocketClientFactoryfactory = WebSocketClientFactory()factory.protocol = MyClientProtocolreactor.connectTCP("127.0.0.1", 9000, factory)reactor.run()

asyncio版本

try:import asyncioexcept ImportError:## Trollius >= 0.3 was renamedimport trollius as asynciofrom autobahn.asyncio.websocket import WebSocketClientFactoryfactory = WebSocketClientFactory()factory.protocol = MyClientProtocolloop = asyncio.get_event_loop()coro = loop.create_connection(factory, '127.0.0.1', 9000)loop.run_until_complete(coro)loop.run_forever()loop.close()

發送消息
當我們的server或者client 繼承了autobahn之后就可以使用 sendMessage 方法來發送消息

接收消息
當我們的server或者client 實現了onMessage(self, payload, isBinary): 定義了該方法之后,就可以接收到消息

更多詳細介紹請查看: https://autobahn.readthedocs.io/en/latest/index.html

Twisted demo,本人寫的測試腳本:

以下的爬取數據方法,以及組裝數據結構到客戶端組方法,需要你自己重寫下,大致流程就是下面這樣,親測可用。
PS:twisted 的 reactor.callLater回調方法本身有bug,不穩定,過一段時間會假死。所以我用tornado替代了twisted實現了websocket,會在下一篇詳解,tornado做websocket是真NB,不解釋。

# -*- coding: utf-8 -*- """ -------------------------------------------------File Name :viewsDescription :Author : cjhdate :2020-02-20 ------------------------------------------------- """ import sys import json import time from utils.dbapi import DBPool from twisted.python import log from twisted.internet import reactor from autobahn.twisted.websocket import WebSocketServerFactory, WebSocketServerProtocol, listenWSlog.startLogging(sys.stdout)class BroadcastServerProtocol(WebSocketServerProtocol):""" webSocket 服務端 """def onOpen(self):"""打開連接:return:"""self.factory.register(self)print("WebSocket connection open.")def onMessage(self, payload, isBinary):"""接收消息:param payload::param isBinary: Boolean值,判斷payload是否為二進制數據:return:"""if isBinary:print('WebSocket Data Type Error')else:obj = eval(payload.decode('utf-8'))unit_id = obj.get('unit_id') # 工程idhandle = obj.get('handle') # 操作類型db_code = obj.get('db_code') # 數據庫 編碼config = eval(obj.get('config')) # 數據庫 鏈接信息self.factory.client_to_group(self, db_code, handle, unit_id, config)def onConnect(self, request):"""打開鏈接:param request::return:"""print("Client connecting: {}".format(request.peer))def onClose(self, wasClean, code, reason):"""關閉連接:param wasClean::param code::param reason::return:"""print("WebSocket connection closed: {}".format(reason))self.factory.un_register(self)def send_msg(self, payload, is_binary):"""重寫 sendMessage:param payload: 務必編碼 .encode('utf-8'):param is_binary::return:"""self.sendMessage(payload=payload, isBinary=is_binary)class BroadcastServerFactory(WebSocketServerFactory, DBPool):""" 工廠 創建 協議類的實例"""def __init__(self, url):WebSocketServerFactory.__init__(self, url)self.data = [] # 待發送客戶端的數據self.send_list = [] # 待發送客戶端 列表self.client_list = [] # 客戶端 列表self.client_group = {} # 請求參數,組裝后字典self.polling() # 回調函數,定時主動鏈接客戶端,廣播數據def polling(self):"""回調函數:return:"""self.item_client_group()reactor.callLater(5, self.polling) # 異步遞歸調用,5秒def register(self, client):"""注冊客戶端,加入客戶端 列表:param client::return:"""if client not in self.client_list:print("registered client {}".format(client.peer))self.client_list.append(client)print self.client_listprint self.client_groupdef un_register(self, client):"""刪除客戶端:param client::return:"""if client in self.client_list:print("unregistered client {}".format(client.peer))self.client_list.remove(client)print self.client_listprint self.client_groupdef send_msg(self):"""輪循服務列表,逐個發消息(會一直被回調函數觸發):param data::return:"""self.data = json.dumps(self.data).encode('utf8')for client in self.send_list:client.sendMessage(self.data)print("broadcasting message to {}".format(client.peer))def client_to_group(self, client, db_code, handle, unit_id, config):"""組裝請求參數,掛載到客戶端:param client::param db_code::param unit_id::param config::param handle::return:"""if db_code in self.client_group:if handle in self.client_group[db_code]:if unit_id in self.client_group[db_code][handle]:self.client_group[db_code][handle][unit_id]['client'].append(client)else:self.client_group[db_code][handle].update({unit_id: {'client': [client], 'config': config}})else:self.client_group[db_code].update({handle: {unit_id: {'client': [client], 'config': config}}})else:self.client_group.update({db_code: {handle: {unit_id: {'client': [client], 'config': config}}}})def item_client_group(self):"""迭代客戶端及請求參數:return:"""for db_code, v_1 in self.client_group.items():for handle, v_2 in v_1.items():for unit_id, v_3 in v_2.items():self.get_close_client(v_3['client'])v_3['client'] = self.send_listself.get_data(handle, unit_id, v_3['config']) # 實際業務操作,爬取數據self.send_msg() # 發送數據到客戶端def get_close_client(self, client_send):"""主動剔除已關閉 客戶端:param client_send::return:"""self.send_list = list(set(self.client_list) & set(client_send))def get_data(self, handle, unit_id, config):"""爬取數據:param db_code::param handle::param unit_id::param config::return:"""print 'start'print time.time()self.data = [] # 初始化if handle == 'real' and unit_id and config:config = dict(host=config.get('HOST'), port=eval(config.get('PORT')), db=config.get('NAME'),user=config.get('USER'), passwd=config.get('PASSWORD'))DBPool.__init__(self, config)sql_1 = "select `code`,recode,`name`,`rename`,show_id,gateway_address address,gateway_id,lot_type " \"address_type from archive_device where status='a' and unit_id={}".format(unit_id)sql_data = "select * from device_data_real where equip_address='{}' limit 1"self.data = self.query(sql_1)for i in self.data:data = self.query(sql_data.format(i.get('address')))if data:value = data[0].get(i.get('code'))i['value'] = valueelse:i['value'] = ''print time.time()print '_________________________________________'def run():factory = BroadcastServerFactoryfactory = factory('ws://0.0.0.0:4053') # 實例化 WebSocketServerFactory 類factory.protocol = BroadcastServerProtocol # 運行服務listenWS(factory) # webSocket監聽reactor.run() # 監聽if __name__ == '__main__':run()

總結

以上是生活随笔為你收集整理的Python+Twisted+Autobahn实现Websocket(附完整demo)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。