在游戲行業游戲客戶端和服務端需要大量的,快速的通訊,這里面就會用到websocket 。
Autobahn 是一個高性能的websocket 它采用了兩種實現方案 。
1 基于Twisted
2 基于asyncio
開發語言: python
PS:twisted 的 reactor.callLater回調方法本身有bug,不穩定,過一段時間會假死,可能是因為我沒深入了解的原因。所以我用tornado替代了twisted實現了websocket,會在下一篇詳解,tornado做websocket是真NB,穩定高效簡介,可參考我這篇博客【https://editor.csdn.net/md/?articleId=106819559】,而且更簡單。
Autobahn 有如下的特點:
framework
for WebSocket
and WAMP clients
(Websocket框架和WAMP框架
)
compatible
with Python
2.7 and 3.3+ (兼容python2
.7和python3
.3以上的版本
)
runs on CPython
, PyPy
and Jython
(可以運行在Cpython
, PyPy 和Jython 上面
)
runs under Twisted
and asyncio
(可以選擇Twisted 或者是asyncio的方式來運行
)
implements WebSocket RFC6455
(and draft versions Hybi
-10+) (實現WebSocket RFC6455(以及草案版本Hybi
-10 +)
)
implements WebSocket compression
(實現WebSocket壓縮
)
implements WAMP
, the Web Application Messaging Protocol
(實現Web應用程序消息傳遞協議
)
supports TLS
(secure WebSocket
) and proxies
(支持TLS(安全WebSocket)和代理
)
Open
-source
(MIT license
) (開源
)
Autobahn 可以用來做什么
Autobahn 非常適用于交易系統,多人游戲, 實時聊天等應用程序的開發
一個簡單的webSocket服務端程序
from autobahn
.twisted
.websocket
import WebSocketServerProtocol
class MyServerProtocol(WebSocketServerProtocol
):def onConnect(self
, request
):print("Client connecting: {}".format(request
.peer
))def onOpen(self
):print("WebSocket connection open.")def onMessage(self
, payload
, isBinary
):if isBinary
:print("Binary message received: {} bytes".format(len(payload
)))else:print("Text message received: {}".format(payload
.decode
('utf8')))self
.sendMessage
(payload
, isBinary
)def onClose(self
, wasClean
, code
, reason
):print("WebSocket connection closed: {}".format(reason
))
幾個重要的方法:
onConnect: 當有新的客戶端連接進來的時候調用該方法
onOpen: 當連接成功時調用該方法
onMessage:該方法是接收來自客戶端的消息
onClose: 當關閉時調用該方法
安裝Autobahn
Autobahn 的運行依賴于 Twisted 和 asyncio 因此安裝Autobahn 需要確認你的python支持了
Twisted 和 asyncio
各個python 版本對 Twisted 和 asyncio 支持如下
使用pip 安裝
pip install autobahn[twisted]
pip install autobahn[asyncio]
驗證是否安裝成功
from autobahn import version
print(version)
0.9.1
啟動webScoketServer
twisted 版本:
import sys
from twisted
.python
import log
from twisted
.internet
import reactorlog
.startLogging
(sys
.stdout
)from autobahn
.twisted
.websocket
import WebSocketServerFactoryfactory
= WebSocketServerFactory
()factory
.protocol
= MyServerProtocolreactor
.listenTCP
(9000, factory
)reactor
.run
()
asyncio 版本:
try:import asyncio
except ImportError
:import trollius
as asyncio
from autobahn
.asyncio
.websocket
import WebSocketServerFactoryfactory
= WebSocketServerFactory
()factory
.protocol
= MyServerProtocolloop
= asyncio
.get_event_loop
()coro
= loop
.create_server
(factory
, '127.0.0.1', 9000)server
= loop
.run_until_complete
(coro
)try:loop
.run_forever
()except KeyboardInterrupt
:passfinally:server
.close
()loop
.close
()
創建WebSocketClient
class MyClientProtocol(WebSocketClientProtocol
):def onOpen(self
):self
.sendMessage
(u
"Hello, world!".encode
('utf8'))def onMessage(self
, payload
, isBinary
):if isBinary
:print("Binary message received: {0} bytes".format(len(payload
)))else:print("Text message received: {0}".format(payload
.decode
('utf8')))
WebSocketClientProtocol 可以使用:
autobahn.twisted.websocket.WebSocketClientProtocol
autobahn.asyncio.websocket.WebSocketClientProtocol
使用Client
Twisted版本
import sys
from twisted
.python
import log
from twisted
.internet
import reactorlog
.startLogging
(sys
.stdout
)from autobahn
.twisted
.websocket
import WebSocketClientFactoryfactory
= WebSocketClientFactory
()factory
.protocol
= MyClientProtocolreactor
.connectTCP
("127.0.0.1", 9000, factory
)reactor
.run
()
asyncio版本
try:import asyncio
except ImportError
:import trollius
as asyncio
from autobahn
.asyncio
.websocket
import WebSocketClientFactoryfactory
= WebSocketClientFactory
()factory
.protocol
= MyClientProtocolloop
= asyncio
.get_event_loop
()coro
= loop
.create_connection
(factory
, '127.0.0.1', 9000)loop
.run_until_complete
(coro
)loop
.run_forever
()loop
.close
()
發送消息
當我們的server或者client 繼承了autobahn之后就可以使用 sendMessage 方法來發送消息
接收消息
當我們的server或者client 實現了onMessage(self, payload, isBinary): 定義了該方法之后,就可以接收到消息
更多詳細介紹請查看: https://autobahn.readthedocs.io/en/latest/index.html
Twisted demo,本人寫的測試腳本:
以下的爬取數據方法,以及組裝數據結構到客戶端組方法,需要你自己重寫下,大致流程就是下面這樣,親測可用。
PS:twisted 的 reactor.callLater回調方法本身有bug,不穩定,過一段時間會假死。所以我用tornado替代了twisted實現了websocket,會在下一篇詳解,tornado做websocket是真NB,不解釋。
"""
-------------------------------------------------File Name :viewsDescription :Author : cjhdate :2020-02-20
-------------------------------------------------
"""
import sys
import json
import time
from utils
.dbapi
import DBPool
from twisted
.python
import log
from twisted
.internet
import reactor
from autobahn
.twisted
.websocket
import WebSocketServerFactory
, WebSocketServerProtocol
, listenWSlog
.startLogging
(sys
.stdout
)class BroadcastServerProtocol(WebSocketServerProtocol
):""" webSocket 服務端 """def onOpen(self
):"""打開連接:return:"""self
.factory
.register
(self
)print("WebSocket connection open.")def onMessage(self
, payload
, isBinary
):"""接收消息:param payload::param isBinary: Boolean值,判斷payload是否為二進制數據:return:"""if isBinary
:print('WebSocket Data Type Error')else:obj
= eval(payload
.decode
('utf-8'))unit_id
= obj
.get
('unit_id') handle
= obj
.get
('handle') db_code
= obj
.get
('db_code') config
= eval(obj
.get
('config')) self
.factory
.client_to_group
(self
, db_code
, handle
, unit_id
, config
)def onConnect(self
, request
):"""打開鏈接:param request::return:"""print("Client connecting: {}".format(request
.peer
))def onClose(self
, wasClean
, code
, reason
):"""關閉連接:param wasClean::param code::param reason::return:"""print("WebSocket connection closed: {}".format(reason
))self
.factory
.un_register
(self
)def send_msg(self
, payload
, is_binary
):"""重寫 sendMessage:param payload: 務必編碼 .encode('utf-8'):param is_binary::return:"""self
.sendMessage
(payload
=payload
, isBinary
=is_binary
)class BroadcastServerFactory(WebSocketServerFactory
, DBPool
):""" 工廠 創建 協議類的實例"""def __init__(self
, url
):WebSocketServerFactory
.__init__
(self
, url
)self
.data
= [] self
.send_list
= [] self
.client_list
= [] self
.client_group
= {} self
.polling
() def polling(self
):"""回調函數:return:"""self
.item_client_group
()reactor
.callLater
(5, self
.polling
) def register(self
, client
):"""注冊客戶端,加入客戶端 列表:param client::return:"""if client
not in self
.client_list
:print("registered client {}".format(client
.peer
))self
.client_list
.append
(client
)print self
.client_list
print self
.client_group
def un_register(self
, client
):"""刪除客戶端:param client::return:"""if client
in self
.client_list
:print("unregistered client {}".format(client
.peer
))self
.client_list
.remove
(client
)print self
.client_list
print self
.client_group
def send_msg(self
):"""輪循服務列表,逐個發消息(會一直被回調函數觸發):param data::return:"""self
.data
= json
.dumps
(self
.data
).encode
('utf8')for client
in self
.send_list
:client
.sendMessage
(self
.data
)print("broadcasting message to {}".format(client
.peer
))def client_to_group(self
, client
, db_code
, handle
, unit_id
, config
):"""組裝請求參數,掛載到客戶端:param client::param db_code::param unit_id::param config::param handle::return:"""if db_code
in self
.client_group
:if handle
in self
.client_group
[db_code
]:if unit_id
in self
.client_group
[db_code
][handle
]:self
.client_group
[db_code
][handle
][unit_id
]['client'].append
(client
)else:self
.client_group
[db_code
][handle
].update
({unit_id
: {'client': [client
], 'config': config
}})else:self
.client_group
[db_code
].update
({handle
: {unit_id
: {'client': [client
], 'config': config
}}})else:self
.client_group
.update
({db_code
: {handle
: {unit_id
: {'client': [client
], 'config': config
}}}})def item_client_group(self
):"""迭代客戶端及請求參數:return:"""for db_code
, v_1
in self
.client_group
.items
():for handle
, v_2
in v_1
.items
():for unit_id
, v_3
in v_2
.items
():self
.get_close_client
(v_3
['client'])v_3
['client'] = self
.send_listself
.get_data
(handle
, unit_id
, v_3
['config']) self
.send_msg
() def get_close_client(self
, client_send
):"""主動剔除已關閉 客戶端:param client_send::return:"""self
.send_list
= list(set(self
.client_list
) & set(client_send
))def get_data(self
, handle
, unit_id
, config
):"""爬取數據:param db_code::param handle::param unit_id::param config::return:"""print 'start'print time
.time
()self
.data
= [] if handle
== 'real' and unit_id
and config
:config
= dict(host
=config
.get
('HOST'), port
=eval(config
.get
('PORT')), db
=config
.get
('NAME'),user
=config
.get
('USER'), passwd
=config
.get
('PASSWORD'))DBPool
.__init__
(self
, config
)sql_1
= "select `code`,recode,`name`,`rename`,show_id,gateway_address address,gateway_id,lot_type " \
"address_type from archive_device where status='a' and unit_id={}".format(unit_id
)sql_data
= "select * from device_data_real where equip_address='{}' limit 1"self
.data
= self
.query
(sql_1
)for i
in self
.data
:data
= self
.query
(sql_data
.format(i
.get
('address')))if data
:value
= data
[0].get
(i
.get
('code'))i
['value'] = value
else:i
['value'] = ''print time
.time
()print '_________________________________________'def run():factory
= BroadcastServerFactoryfactory
= factory
('ws://0.0.0.0:4053') factory
.protocol
= BroadcastServerProtocol listenWS
(factory
) reactor
.run
() if __name__
== '__main__':run
()
總結
以上是生活随笔為你收集整理的Python+Twisted+Autobahn实现Websocket(附完整demo)的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。