Twisted——基于事件驱动的Python网络框架
對于追求服務(wù)器程序性能的應(yīng)用有什么適用的Python框架嗎?那就是今天和大家分享的Twisted框架,它支持許多常見的傳輸及應(yīng)用層協(xié)議,包括TCP、UDP、SSL/TLS、HTTP、FTP等,這也意味著能為客戶端和服務(wù)器端提供自定義開發(fā)工具。那為什么就說它能保證高效能通信呢?
Twisted在不同的操作系統(tǒng)平臺上利用了不同的底層技術(shù):在Windows中,基于IO完成端口技術(shù)保證了底層高效地將I/O事件通知給框架及應(yīng)用程序;在Linux中采用epoll技術(shù),它能顯著提高在大量并發(fā)連接中只有少量活躍的情況下CPU利用率。Twisted框架采用Reactor設(shè)計模式,它的核心是Reactor的事件循環(huán),監(jiān)聽網(wǎng)絡(luò)、文件系統(tǒng)以及定時器等事件,并提供統(tǒng)一處理接口,使得事件能被快速響應(yīng)。
在上一篇事件驅(qū)動中介紹過:對于不需要同步處理的多任務(wù),我們可以使用事件驅(qū)動。那么在Twisted中使得程序設(shè)計可以采用事件驅(qū)動機制得益于Deferred(延遲)對象,它是一個管理回調(diào)函數(shù)的對象,我們可以向該對象添加需要回調(diào)的函數(shù),同時可以指定該組回調(diào)函數(shù)何時被調(diào)用。
from twisted.internet import deferfrom twisted.python import failureimport sys?d = defer.Deferred() # 定義defer實例def printSquare(d): # 正常處理函數(shù) print("Square of %d is %d" % (d, d * d))?def processError(f): # 錯誤處理函數(shù) print("Error with process ")d.addCallback(printSquare) # 添加正常處理的回調(diào)函數(shù)d.addErrback(processError) # 添加錯誤處理回調(diào)函數(shù)# 開始調(diào)用deferif len(sys.argv) > 1 and sys.argv[1] == 'call_error': f = failure.Failure(Exception('my Exception')) d.errback(f) # 調(diào)用錯誤處理函數(shù)processErrorelse: d.callback(4) # 調(diào)用正常處理函數(shù)printSquare(4)代碼圍繞twisted.internet.defer.Deffered對象展開。
Defer中可以管理兩種回調(diào)函數(shù):Deffered.addCallback()正常處理函數(shù)和Deffered.addErrback錯誤處理函數(shù)。兩種回調(diào)函數(shù)可以通過Deffered.callback()和Deffered.errback()進(jìn)行調(diào)用。
另外可以給一個Deffer對象賦予多個正常或錯誤處理的回調(diào)函數(shù),這樣在Defer對象內(nèi)部形成正常處理函數(shù)鏈和錯誤處理函數(shù)鏈,示例代碼如下。
from twisted.internet import defer?d?=?defer.Deferred()?def printSquare(d): print("Square of %d is %d", (d, d * d)) return d?def processError(f): print("error with process")?def printTwice(d): print("Twice of %d is %d", (2 * d)) return d?d.addCallback(printSquare)d.addErrback(processError)d.addCallback(printTwice)?d.callback(5)# Square of %d is %d (5, 25)#?Twice?of?%d?is?%d?10Deffered的主要成員函數(shù)包括:
| addCallback(self, callback, *args, **kwargs) | 給Defer對象添加正常處理回調(diào)函數(shù),需要至少有一個輸入?yún)?shù) |
| addErrback(self,?errback, *args, **kwargs) | 給Defer對象添加錯誤處理回調(diào)函數(shù),errback為錯誤處理函數(shù)名,需要至少有一個輸入?yún)?shù) |
| addBoth(self, callback, *args, **kwargs) | 回調(diào)函數(shù)同時作為正常和錯誤處理回調(diào)函數(shù)添加到Defer對象中 |
| chainDeffered(self, d) | 將另一個Defer對象的正常和錯誤處理回調(diào)函數(shù)添加到本Defer對象中。本函數(shù)是單向的 |
| callback(self, result) | 調(diào)用正常處理函數(shù)鏈,result是傳遞給第一個正常處理回調(diào)函數(shù)的參數(shù) |
| errback(self, fail=None) | 調(diào)用錯誤處理函數(shù)鏈,result是傳遞給第一個錯誤處理回調(diào)函數(shù)的參數(shù)。 |
| pause(self)和unpause(self) | pause(self)和unpause(self)?用來暫停和繼續(xù)調(diào)用鏈 |
Defer為什么要分別管理兩條回調(diào)函數(shù)調(diào)用鏈?因為調(diào)用鏈函數(shù)之間除了簡單的順序調(diào)用關(guān)系,還存在交叉調(diào)用關(guān)系,兩條為了對回調(diào)過程提供更好的可控性,調(diào)用流程圖如下:
其中實線為回調(diào)函數(shù)正常返回時的繼續(xù)調(diào)用路徑,虛線為處理函數(shù)中產(chǎn)生異常時的后續(xù)調(diào)用路徑。
我們再將Deffer對象和reactor的延時調(diào)用機制結(jié)合在一起,來實現(xiàn)異步調(diào)用函數(shù)。
makeDefer函數(shù)內(nèi)定義了調(diào)用鏈執(zhí)行的邏輯關(guān)系,其中 reactor.callLater(2, d.callback, 5)表示在reactor.run()運行后的2后,twisted框架才去調(diào)用callback對應(yīng)的兩個函數(shù)(printSquare,printTwice)。
callLater()函數(shù)原型如下
def?callLater(delay,?callable,?*args,?**kw):??passdelay定義延時調(diào)用秒數(shù),如果為0則是立即調(diào)用;callable為被調(diào)用的函數(shù)名及其參數(shù)。
通過reactor.callLater(4, reactor.stop)定義4秒后調(diào)用函數(shù)reactor.stop(),還可以實現(xiàn)定時退出Twisted消息循環(huán)。
下面我們通過一個實時通信的廣播系統(tǒng)模型介紹下用Twisted框架開發(fā)基于TCP的網(wǎng)絡(luò)應(yīng)用的方法:
首先Twisted提供了基本的通信編程封裝,這里先介紹下Transports。它代表網(wǎng)絡(luò)中兩個通信結(jié)點之間的連接。Transports負(fù)責(zé)描述連接的細(xì)節(jié),比如連接是面向流式的還是面向數(shù)據(jù)報的,流控以及可靠性,比如TCP、UDP和Unix套接字。對應(yīng)方法如下:
| write | 以非阻塞的方式按順序依次將數(shù)據(jù)寫到物理連接上 |
| writeSequence | 將一個字符串列表寫到物理連接上 |
| loseConnection | 將所有掛起的數(shù)據(jù)寫入,然后關(guān)閉連接 |
| getPeer | 取得連接中對端的地址信息 |
| getHost | 取得連接中本端的地址信息 |
Protocols描述了如何以異步的方式處理網(wǎng)絡(luò)中的事件。HTTP、DNS以及IMAP是應(yīng)用層協(xié)議中的例子。Protocols實現(xiàn)了IProtocol接口,它包含如下的方法:
| makeConnection | 在transport對象和服務(wù)器之間建立一條連接 |
| connectionMade | ?連接建立起來后調(diào)用 |
| dataReceived | 接收數(shù)據(jù)時調(diào)用 |
| connectionLost | 關(guān)閉連接時調(diào)用 |
廣播系統(tǒng)服務(wù)器
針對Twisted的Protocol、Factory等類進(jìn)行編程,定義它們的子類并重寫connectionMade和dataReceived進(jìn)行事件化處理。
from twisted.internet.protocol import Protocolfrom twisted.internet.protocol import Factoryfrom twisted.internet.endpoints import TCP4ServerEndpointfrom twisted.internet import reactor?clients = [] # 保存所有客戶端連接# Protocol的子類class Spreader(Protocol): def __init__(self, factory): self.factory = factory? def connectionMade(self): self.factory.numProtocols = self.factory.numProtocols + 1 # 對連接的客戶端進(jìn)行計數(shù) self.transport.write( (u"歡迎來到Spread Site,您是第%d個客戶端用戶!\n" % (self.factory.numProtocols,).encode('utf8'))) print(f"new connect {self.factory.numProtocols}") clients.append(self) # 將self保存到clients列表中? def connectionLost(self, reason): clients.remove() print(f"lost connect: {self.connect_id}")? def dataReceived(self, data): if data == "close": self.transport.loseConnection() print(f"{self.connect_id} closed") else: print(f"spreading message from {self.connect_id, self.data}") for client in clients: if client != self: # 將收到的數(shù)據(jù)通過Protocol.transport.write()函數(shù)分發(fā)給除自己以外的所有客戶端 client.transport.write(data)# Factory的子類class SpreadFactory(Factory): def __init__(self): self.numProtocols = 0 # 將客戶端計數(shù)器置0? def buildProtocol(self, addr):????????return?Spreader(self)??#?建立Protocol子類的實例?endpoint = TCP4ServerEndpoint(reactor, 8007) # 定義服務(wù)器監(jiān)聽端口endpoint.listen(SpreadFactory()) # 指定子類實例reactor.run() # 掛起運行廣播客戶端
Twisted同樣提供了基于Protocol類的TCP客戶端編程方法。
from twisted.internet.protocol import Protocol, ClientFactoryfrom twisted.internet import reactorimport sysimport datetime?class Echo(Protocol): def connectionMade(self): print("connect to server!")? def dataReceived(self, data): print("got message: ", data.decode('utf8')) reactor.callLater(5, self.say_hello)? def connectionLost(self, reason): print("Disconnected from server!")? def say_hello(self): if self.transport.connected: self.transport.write( (u"hello, I'm %s %s" % (sys.argv[1], datetime.datetime.now())).encode('utf-8'))?class EchoClientFactory(ClientFactory): def __init__(self): self.protocol = None? def startedConnecting(self, connector): print("Start to connect.")? def buildProtocol(self, addr): self.protocol = Echo() return self.protocol? def clientConnectionLost(self, connector, reason): print("Lost Connection. Reason:", reason)? def clientConnectionFailed(self, connector, reason): print("Connection failed. Reason:", reason)?host = "127.0.0.1"port =8007factory = EchoClientFactory()reactor.connectTCP(host, port, factory)reactor.run()執(zhí)行順序如下:
-
建立連接
ClientFactory.startedConnecting()
Protocol.connectionMade()
-
已連接
用Protocol.dataReceived()接受消息
用Protocol.transport.write()發(fā)送消息
-
連接斷開
Protocol.connectionLost()
ClientFactory.?clientConnectionLost()
即建立連接時先執(zhí)行ClientFactory中回調(diào),然后執(zhí)行Protocol中回調(diào),連接斷開時正好相反。?? ???
歡迎你來我的公眾號“才淺的每日python”和我一起討論,也歡迎你來催更扯淡。日拱一卒,下一篇文不見不散。
總結(jié)
以上是生活随笔為你收集整理的 Twisted——基于事件驱动的Python网络框架的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 中国电信IT研发中心 2019校园招聘
- 下一篇: Microsoft Office Vis