twisted系列教程十八–异步操作的并行运行
Introduction
在上一部分我們學(xué)習(xí)了一種新的用生成器來組織一系列異步callbacks 的方法.加上deferred,我們已經(jīng)有兩種組織異步操作的方法了.
有時候,我們想讓一組異步操作并行的運行.因為twisted 是單線程的,它不會真正的并行的運行,但是我們想要異步的I/O 在一組任務(wù)上運行的盡可能的快.比如我們的poetry client,從多個server上同時下載詩,而不是一個接一個的.這就是我們?yōu)槭裁从胻wisted.
我們的opetry client 不得不解決這個問題:你怎么知道你所有的異步操作什么時候能結(jié)束?目前為止我們是用把所有的返回結(jié)果放進(jìn)一個list里面,并檢查這個list 的長度.我們必須在收集結(jié)果的時候非常注意,因為一個錯誤的結(jié)果可能讓我們的程序永久的運行下去.
就如你想象的那樣,twisted 包含了一個解決這個問題的抽象,我們今天就會學(xué)習(xí)一下它的用法.
The DeferredList
DeferredList類允許我們把一個deferred 對象的列表當(dāng)成一個deferred來對待.這樣的話我們就可以開啟多個異步的操作并在它們?nèi)繄?zhí)行完的時候得到通知.讓我們看一些例子.
在deferred-list/deferred-list-1.py,你會發(fā)現(xiàn)這些代碼:
from twisted.internet import defer
def got_results(res):
????print 'We got:', res
print 'Empty List.'
d = defer.DeferredList([])
print 'Adding Callback.'
d.addCallback(got_results)
如果你運行它,你會得到如下的輸出:
Empty List.
Adding Callback.
We got: []
需要注意的一些事情:
????DeferredList 是從python 的list 創(chuàng)建而來.在這種情況下這個list 是空的,但是我們會看到這個list 里面的對象必須都是Deferred 對象
????DeferredList 也是一個deferred 對象,它繼承至Deferred.這就意味著你可以向它加入callback 和errback,就像它是一個普通的deferred一樣
????在上面的例子中,我們的callback在被我們加入之后立即觸發(fā),所以這個DeferredList一定已經(jīng)立馬觸發(fā)了.我們過一會會繼續(xù)討論這個
????deferred list 的返回結(jié)果是一個空的list
?
現(xiàn)在讓我們看一下 deferred-list/deferred-list-2.py:
from twisted.internet import defer
def got_results(res):
????print 'We got:', res
print 'One Deferred.'
d1 = defer.Deferred()
d = defer.DeferredList([d1])
print 'Adding Callback.'
d.addCallback(got_results)
print 'Firing d1.'
d1.callback('d1 result')
現(xiàn)在我們創(chuàng)建了一個包含一個deferred 對象的DeferredList,下面是我們得到的信息:
One Deferred.
Adding Callback.
Firing d1.
We got: [(True, 'd1 result')]
一些注意的事情:
????這一次DeferredList沒有觸發(fā)它的callback直到我們觸發(fā)了list中的deferred
????這個結(jié)果仍舊是一個list,不過這一次有了一個元素
????這個元素是一個tuple,它的第二個值是它對應(yīng)的deferred 的結(jié)果
?
讓我們向list中添加兩個deferred,在deferred-list/deferred-list-3.py:
from twisted.internet import defer
def got_results(res):
????print 'We got:', res
print 'Two Deferreds.'
d1 = defer.Deferred()
d2 = defer.Deferred()
d = defer.DeferredList([d1, d2])
print 'Adding Callback.'
d.addCallback(got_results)
print 'Firing d1.'
d1.callback('d1 result')
print 'Firing d2.'
d2.callback('d2 result')
下面是輸出:
Two Deferreds.
Adding Callback.
Firing d1.
Firing d2.
We got: [(True, 'd1 result'), (True, 'd2 result')]
DeferredList 的結(jié)果是一個數(shù)量和DeferredList 中deferred的數(shù)量的相同的list.結(jié)果的中的每一個元素包含了和它相對應(yīng)的deferred 的返回結(jié)果,前提是這個deferred運行成功.這就意味著DeferredList不會觸發(fā)直到list 中的所有中的deferred 都已經(jīng)觸發(fā).一個包含空列表的DeferredList 會立即觸發(fā).
DeferredList 中deferred 運行的順序是怎樣的呢? 看一下 deferred-list/deferred-list-4.py:
from twisted.internet import defer
def got_results(res):
????print 'We got:', res
print 'Two Deferreds.'
d1 = defer.Deferred()
d2 = defer.Deferred()
d = defer.DeferredList([d1, d2])
print 'Adding Callback.'
d.addCallback(got_results)
print 'Firing d2.'
d2.callback('d2 result')
print 'Firing d1.'
d1.callback('d1 result')
現(xiàn)在我們先觸發(fā)d2然后觸發(fā)d1.下面是輸出:
Two Deferreds.
Adding Callback.
Firing d2.
Firing d1.
We got: [(True, 'd1 result'), (True, 'd2 result')]
輸出列表有著和原來的list 的一樣的順序,而不是被觸發(fā)的順序.這樣非常好,因為我們可以很好的把輸出結(jié)果和deferred很好的關(guān)聯(lián)起來.
好的,如果DeferredList 中的deferred 有一個失敗了會發(fā)生什么?輸出中的那些True 是做什么用的?讓我們看 deferred-list/deferred-list-5.py:
from twisted.internet import defer
def got_results(res):
????print 'We got:', res
d1 = defer.Deferred()
d2 = defer.Deferred()
d = defer.DeferredList([d1, d2], consumeErrors=True)
d.addCallback(got_results)
print 'Firing d1.'
d1.callback('d1 result')
print 'Firing d2 with errback.'
d2.errback(Exception('d2 failure'))
現(xiàn)在我們用一個正常的結(jié)果觸發(fā)d1,用一個error來觸發(fā)d2.咱們先暫時忽略掉 consumeErrors 選項,下面是輸出:
Firing d1.
Firing d2 with errback.
We got: [(True, 'd1 result'), (False, >)]
現(xiàn)在和d2 對應(yīng)的返回結(jié)果出現(xiàn)一個錯誤.到現(xiàn)在我們應(yīng)該清楚DeferredList 是怎樣工作的:
????DeferredList是被一個deferred 的列表組成的
????DeferredList本身也是一個deferred,它的返回結(jié)果是一個長度和DeferredList本身長度的列表
????DeferredList在列表中所有deferred都觸發(fā)之后才被觸發(fā)
????返回結(jié)果列表中的每一個元素對應(yīng)著DeferredList中的每一個deferred.加入那個deferred成功了 這個元素是(True,result),假如這個deferred失敗了,這個元素是(False,failure)
????一個DeferredList不會失敗,因為每一個deferred 無論成功與否它的結(jié)果都會被搜集到返回的列表中
下面讓我們看一下consumeErrors 選項,假如我們不設(shè)置consumeErrors選項(deferred-list/deferred-list-6.py),我們會得到如下的輸出:
Firing d1.
Firing d2 with errback.
We got: [(True, 'd1 result'), (False, >twisted.python.failure.Failure >type 'exceptions.Exception'<<)]
Unhandled error in Deferred:
Traceback (most recent call last):
Failure: exceptions.Exception: d2 failure
如果你回想一下,在deferred 中未處理的錯誤信息會在deferred 被垃圾回收的時候被拋出來.這個信息告訴我們我們沒有全部的捕捉我們異步程序中的錯誤.這個錯誤信息從哪里來的呢?它明顯的不是從DeferredList來的,所以這個錯誤一定來自d2.
DeferredList需要知道它下面的deferreds都在什么時候觸發(fā).DeferredList 通過增加一個callback和errback到每一個deferred,這樣就可以監(jiān)測了.默認(rèn)的,這個callback(errback) 返回正常的結(jié)果(錯誤),因為返回錯誤會觸發(fā)下一個errback,這樣d2 在觸發(fā)之后會保持失敗狀態(tài).
但是假如我們設(shè)置consumeErrors 為True,向每一個deferred加入的errback 會返回None.我們也可以向d2加入自己的errback,例子在deferred-list/deferred-list-7.py.
Client 8.0
我們的poetry client 的8.0版本使用DeferredList去監(jiān)測什么時候所有的poetry全部下載完.你可以在twisted-client-8/get-poetry.py看到代碼.唯一的變化是poetry_main.讓我們看一下主要的變化:
...
ds = []
for (host, port) in addresses:
????d = get_transformed_poem(host, port)
????d.addCallbacks(got_poem)
????ds.append(d)
dlist = defer.DeferredList(ds, consumeErrors=True)
dlist.addCallback(lambda res : reactor.stop())
在client 8.0 中,我們不需要poem_done callback 或者 results list.相反的,我們把從get_transformed_poem 獲得的deferred 全部放入一個list,并創(chuàng)建一個DeferredList.因為DeferredList 會直到所有的deferred觸發(fā)之后才會被觸發(fā).我們向DeferredList增加一個callback來關(guān)閉reactor.在這種情況下,我們沒有使用DeferredList的返回結(jié)果,我們只需要知道什么時候所有的事情能結(jié)束.
Discussion
我們可以用圖片形象話一個DeferredList 是怎樣工作的,圖片三十七:
圖片三十七
真的很簡單,仍舊有幾個DeferredList 的參數(shù)我們沒有覆蓋掉.這些參數(shù)會改變DeferredList的默認(rèn)行為,感興趣的可以自己看.
在下一部分我們還會講deferred 的一個特色,一個剛被twisted 10.1.0 加進(jìn)去的.
總結(jié)
以上是生活随笔為你收集整理的twisted系列教程十八–异步操作的并行运行的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python的接口实现zope.inte
- 下一篇: Lockdoor Framework:一