Python 异步 IO 、协程、asyncio、async/await、aiohttp
From :廖雪峰?異步IO :https://www.liaoxuefeng.com/wiki/1016959663602400/1017959540289152
Python Async/Await入門指南 :https://zhuanlan.zhihu.com/p/27258289
Python 生成器 和 yield 關(guān)鍵字:https://blog.csdn.net/freeking101/article/details/51126293
協(xié)程與任務(wù)?官網(wǎng)文檔:https://docs.python.org/zh-cn/3/library/asyncio-task.html
Python中異步協(xié)程的使用方法介紹:https://blog.csdn.net/freeking101/article/details/88119858
python 協(xié)程詳解及I/O多路復(fù)用,I/O異步:https://blog.csdn.net/u014028063/article/details/81408395
Python協(xié)程深入理解:https://www.cnblogs.com/zhaof/p/7631851.html
asyncio 進階:Python黑魔法 --- 異步IO( asyncio) 協(xié)程:https://www.cnblogs.com/dhcn/p/9033628.html
談?wù)凱ython協(xié)程技術(shù)的演進:https://www.freebuf.com/company-information/153421.html
最后推薦一下《流暢的Python》,這本書中 第16章 協(xié)程的部分介紹的非常詳細
《流暢的Python》pdf 下載地址:https://download.csdn.net/download/freeking101/10993120
gevent 是 python 的一個并發(fā)框架,以微線程 greenlet 為核心,使用了 epoll 事件監(jiān)聽機制以及諸多其他優(yōu)化而變得高效。
aiohttp 使用代理 ip 訪問 https 網(wǎng)站報錯的問題:https://blog.csdn.net/qq_43210211/article/details/108379917
Python:使用 Future、asyncio 處理并發(fā)
:https://blog.csdn.net/sinat_38682860/article/details/105419842
異步? IO
在 IO 編程(?廖雪峰 Python IO 編程 :https://www.liaoxuefeng.com/wiki/1016959663602400/1017606916795776)?一節(jié)中,我們已經(jīng)知道,CPU的速度遠遠快于磁盤、網(wǎng)絡(luò)等IO。在一個線程中,CPU執(zhí)行代碼的速度極快,然而,一旦遇到IO操作,如讀寫文件、發(fā)送網(wǎng)絡(luò)數(shù)據(jù)時,就需要等待IO操作完成,才能繼續(xù)進行下一步操作。這種情況稱為同步IO。
在IO操作的過程中,當(dāng)前線程被掛起,而其他需要CPU執(zhí)行的代碼就無法被當(dāng)前線程執(zhí)行了。
因為一個 IO 操作就阻塞了當(dāng)前線程,導(dǎo)致其他代碼無法執(zhí)行,所以我們必須使用多線程或者多進程來并發(fā)執(zhí)行代碼,為多個用戶服務(wù)。每個用戶都會分配一個線程,如果遇到IO導(dǎo)致線程被掛起,其他用戶的線程不受影響。
多線程和多進程的模型雖然解決了并發(fā)問題,但是系統(tǒng)不能無上限地增加線程。由于系統(tǒng)切換線程的開銷也很大,所以,一旦線程數(shù)量過多,CPU的時間就花在線程切換上了,真正運行代碼的時間就少了,結(jié)果導(dǎo)致性能嚴重下降。
由于我們要解決的問題是CPU高速執(zhí)行能力和IO設(shè)備的龜速嚴重不匹配,多線程和多進程只是解決這一問題的一種方法。
另一種解決IO問題的方法是異步IO。當(dāng)代碼需要執(zhí)行一個耗時的IO操作時,它只發(fā)出IO指令,并不等待IO結(jié)果,然后就去執(zhí)行其他代碼了。一段時間后,當(dāng)IO返回結(jié)果時,再通知CPU進行處理。
消息模型 其實早在應(yīng)用在桌面應(yīng)用程序中了。一個 GUI 程序的主線程就負責(zé)不停地讀取消息并處理消息。所有的鍵盤、鼠標(biāo)等消息都被發(fā)送到GUI程序的消息隊列中,然后由GUI程序的主線程處理。
由于GUI 線程處理鍵盤、鼠標(biāo)等消息的速度非常快,所以用戶感覺不到延遲。某些時候,GUI線程在一個消息處理的過程中遇到問題導(dǎo)致一次消息處理時間過長,此時,用戶會感覺到整個GUI程序停止響應(yīng)了,敲鍵盤、點鼠標(biāo)都沒有反應(yīng)。這種情況說明在消息模型中,處理一個消息必須非常迅速,否則,主線程將無法及時處理消息隊列中的其他消息,導(dǎo)致程序看上去停止響應(yīng)。
消息模型 是 如何解決 同步IO 必須等待IO操作這一問題的呢 ?
在消息處理過程中,當(dāng)遇到 IO 操作時,代碼只負責(zé)發(fā)出IO請求,不等待IO結(jié)果,然后直接結(jié)束本輪消息處理,進入下一輪消息處理過程。當(dāng)IO操作完成后,將收到一條“IO完成”的消息,處理該消息時就可以直接獲取IO操作結(jié)果。
在 “發(fā)出IO請求” 到收到 “IO完成” 的這段時間里,同步IO模型下,主線程只能掛起,但異步IO模型下,主線程并沒有休息,而是在消息循環(huán)中繼續(xù)處理其他消息。這樣,在異步IO模型下,一個線程就可以同時處理多個IO請求,并且沒有切換線程的操作。對于大多數(shù)IO密集型的應(yīng)用程序,使用異步IO將大大提升系統(tǒng)的多任務(wù)處理能力。
協(xié)程 (Coroutines)
在學(xué)習(xí)異步IO模型前,我們先來了解協(xié)程,協(xié)程 又稱 微線程,纖程,英文名 Coroutine。
子程序(?又叫?函數(shù) ) 和 協(xié)程
- 子程序?在 所有語言中都是層級調(diào)用。比如: A 調(diào)用 B,B 在執(zhí)行過程中又調(diào)用了 C,C 執(zhí)行完畢返回,B 執(zhí)行完畢返回,最后是 A 執(zhí)行完畢。所以 子程序 即 函數(shù) 的調(diào)用是通過棧實現(xiàn)的,一個線程就是執(zhí)行一個子程序。子程序調(diào)用總是一個入口,一次返回,調(diào)用順序是明確的。
- 協(xié)程的調(diào)用 和 子程序 不同。協(xié)程 看上去也是 子程序,但執(zhí)行過程中,在子程序內(nèi)部可中斷,然后轉(zhuǎn)而執(zhí)行別的子程序,在適當(dāng)?shù)臅r候再返回來接著執(zhí)行。
注意,在一個 子程序中中斷,去執(zhí)行其他子程序,不是函數(shù)調(diào)用,有點類似CPU的中斷。比如:子程序 A 和 B :
def A():print('1')print('2')print('3')def B():print('x')print('y')print('z')假設(shè)由協(xié)程執(zhí)行,在執(zhí)行 A 的過程中,可以隨時中斷,去執(zhí)行 B,B 也可能在執(zhí)行過程中中斷再去執(zhí)行 A,結(jié)果可能是:
1 2 x y 3 z但是在 A 中是沒有調(diào)用 B 的,所以 協(xié)程的調(diào)用 比 函數(shù)調(diào)用 理解起來要難一些。
看起來 A、B 的執(zhí)行有點像多線程,但 協(xié)程 的特點在于是一個線程執(zhí)行。
協(xié)程 和 多線程比,協(xié)程有何優(yōu)勢?
- 1. 最大的優(yōu)勢就是協(xié)程極高的執(zhí)行效率。因為 子程序 切換不是線程切換,而是由程序自身控制,因此,沒有線程切換的開銷,和多線程比,線程數(shù)量越多,協(xié)程的性能優(yōu)勢就越明顯。
- 2. 第二大優(yōu)勢就是不需要多線程的鎖機制,因為只有一個線程,也不存在同時寫變量沖突,在協(xié)程中控制共享資源不加鎖,只需要判斷狀態(tài)就好了,所以執(zhí)行效率比多線程高很多。
因為協(xié)程是一個線程執(zhí)行,那怎么利用多核CPU呢?
最簡單的方法是 多進程 + 協(xié)程,既充分利用多核,又充分發(fā)揮協(xié)程的高效率,可獲得極高的性能。
Python 對 協(xié)程 的支持 是通過 generator (生成器)實現(xiàn)的
在 generator 中,我們不但可以通過 for 循環(huán)來迭代,還可以不斷調(diào)用 next() 函數(shù)獲取由 yield 語句返回的下一個值。
但是 Python 的 yield 不但可以返回一個值,它還可以接收調(diào)用者發(fā)出的參數(shù)。
來看例子:
傳統(tǒng)的 生產(chǎn)者-消費者 模型是一個線程寫消息,一個線程取消息,通過鎖機制控制隊列和等待,但一不小心就可能死鎖。如果改用協(xié)程,生產(chǎn)者生產(chǎn)消息后,直接通過 yield 跳轉(zhuǎn)到消費者開始執(zhí)行,待消費者執(zhí)行完畢后,切換回生產(chǎn)者繼續(xù)生產(chǎn),效率極高:
#!/usr/bin/python3 # -*- coding: utf-8 -*- # @Author : # @File : text.py # @Software : PyCharm # @description : XXXdef consumer():r = ''while True:n = yield rif not n:returnprint('[CONSUMER] Consuming %s...' % n)r = '200 OK'def produce(c):c.send(None)n = 0while n < 5:n = n + 1print('[PRODUCER] Producing %s...' % n)r = c.send(n)print('[PRODUCER] Consumer return: %s' % r)c.close()c = consumer() produce(c)執(zhí)行結(jié)果:
[PRODUCER] Producing 1... [CONSUMER] Consuming 1... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 2... [CONSUMER] Consuming 2... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 3... [CONSUMER] Consuming 3... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 4... [CONSUMER] Consuming 4... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 5... [CONSUMER] Consuming 5... [PRODUCER] Consumer return: 200 OK注意到 consumer函數(shù) 是一個 generator,把一個 consumer 傳入 produce 后:
consumer 通過 yield拿到消息,處理,又通過yield把結(jié)果傳回;
produce 拿到 consumer 處理的結(jié)果,繼續(xù)生產(chǎn)下一條消息;
produce 決定不生產(chǎn)了,通過 c.close() 關(guān)閉 consumer,整個過程結(jié)束。
整個流程無鎖,由一個線程執(zhí)行,produce?和?consumer?協(xié)作完成任務(wù),所以稱為 “協(xié)程”,而非線程的搶占式多任務(wù)。
最后套用 Donald Knuth 的一句話總結(jié)協(xié)程的特點:“子程序就是協(xié)程的一種特例。”
參考源碼:https://github.com/michaelliao/learn-python3/blob/master/samples/async/coroutine.py
在 Python 中,異步函數(shù)? 通常 被稱作? 協(xié)程
創(chuàng)建一個協(xié)程僅僅只需使用 async 關(guān)鍵字,或者使用 @asyncio.coroutine 裝飾器。下面的任一代碼,都可以作為協(xié)程工作,形式上也是等同的:
import asyncio# 方式 1 async def ping_server(ip):pass# 方式 2 @asyncio.coroutine def load_file(path):pass上面這兩個?特殊的函數(shù),在調(diào)用時會返回協(xié)程對象。熟悉 JavaScript 中 Promise 的同學(xué),可以把這個返回對象當(dāng)作跟 Promise 差不多。調(diào)用他們中的任意一個,實際上并未立即運行,而是返回一個協(xié)程對象,然后將其傳遞到 Eventloop 中,之后再執(zhí)行。
- 如何判斷一個?函數(shù)是不是協(xié)程??? ?asyncio 提供了 asyncio.iscoroutinefunction(func) 方法。
- 如何判斷一個 函數(shù)返回的是不是協(xié)程對象 ?? 可以使用 asyncio.iscoroutine(obj) 。
用 asyncio 提供的 @asyncio.coroutine 可以把一個 generator 標(biāo)記為 coroutine 類型,然后在 coroutine 內(nèi)部用 yield from 調(diào)用另一個 coroutine 實現(xiàn)異步操作。
Python 3.5 開始引入了新的語法 async 和 await
為了簡化并更好地標(biāo)識異步 IO,從 Python 3.5 開始引入了新的語法 async 和 await,可以讓 coroutine 的代碼更簡潔易讀。
?async?/ await 是 python3.5 的新語法,需使用 Python3.5 版本 或 以上才能正確運行。
注意:async?和?await?是針對 coroutine 的新語法,要使用新的語法,只需要做兩步簡單的替換:
- 把?@asyncio.coroutine?替換為?async?
- 把?yield from?替換為?await
?Python 3.5 以前 版本原來老的語法使用 協(xié)程
import asyncio@asyncio.coroutine def hello():print("Hello world!")r = yield from asyncio.sleep(1)print("Hello again!")Python 3.5 以后 用新語法重新編寫如下:
import asyncioasync def hello():print("Hello world!")r = await asyncio.sleep(1)print("Hello again!")在過去幾年內(nèi),異步編程由于某些好的原因得到了充分的重視。雖然它比線性編程難一點,但是效率相對來說也是更高。
比如,利用 Python 的 異步協(xié)程 (async coroutine) ,在提交 HTTP 請求后,就沒必要等待請求完成再進一步操作,而是可以一邊等著請求完成,一邊做著其他工作。這可能在邏輯上需要多些思考來保證程序正確運行,但是好處是可以利用更少的資源做更多的事。
即便邏輯上需要多些思考,但實際上在 Python 語言中,異步編程的語法和執(zhí)行并不難。跟 Javascript 不一樣,現(xiàn)在 Python 的異步協(xié)程已經(jīng)執(zhí)行得相當(dāng)好了。
對于服務(wù)端編程,異步性似乎是 Node.js 流行的一大原因。我們寫的很多代碼,特別是那些諸如網(wǎng)站之類的高 I/O 應(yīng)用,都依賴于外部資源。這可以是任何資源,包括從遠程數(shù)據(jù)庫調(diào)用到 POST 一個 REST 請求。一旦你請求這些資源的任一一個,你的代碼在等待資源響應(yīng)時便無事可做 (譯者注:如果沒有異步編程的話)。
有了異步編程,在等待這些資源響應(yīng)的過程中,你的代碼便可以去處理其他的任務(wù)。
Python async / await 手冊
Python?部落:Python async/await 手冊:https://python.freelycode.com/contribution/detail/57
知乎:從 0 到 1,Python 異步編程的演進之路(?通過爬蟲演示進化之路?):https://zhuanlan.zhihu.com/p/25228075
async / await 的使用
async 用來聲明一個函數(shù)是協(xié)程,然后使用 await 調(diào)用這個協(xié)程, await 必須在函數(shù)內(nèi)部,這個函數(shù)通常也被聲明為另一個協(xié)程。await 的目的是等待協(xié)程控制流的返回。yield 的目的 是 暫停并掛起函數(shù)的操作。
正常的函數(shù)在執(zhí)行時是不會中斷的,所以你要寫一個能夠中斷的函數(shù),就需要添加 async 關(guān)鍵。
- async 用來聲明一個函數(shù)為異步函數(shù),異步函數(shù)的特點是能在函數(shù)執(zhí)行過程中掛起,去執(zhí)行其他異步函數(shù),等到掛起條件(假設(shè)掛起條件是sleep(5))消失后,也就是5秒到了再回來執(zhí)行。
- await 可以將耗時等待的操作掛起,讓出控制權(quán)(?await 語法來掛起自身的協(xié)程 )。比如:異步程序執(zhí)行到某一步時需要等待的時間很長,就將此掛起,去執(zhí)行其他的異步程序。await 后面只能跟 異步程序 或 有 __await__ 屬性 的 對象,因為異步程序與一般程序不同。
假設(shè)有兩個異步函數(shù) async a,async b,a 中的某一步有 await,當(dāng)程序碰到關(guān)鍵字 await b() 后,異步程序掛起后去執(zhí)行另一個異步b程序,就是從函數(shù)內(nèi)部跳出去執(zhí)行其他函數(shù),當(dāng)掛起條件消失后,不管b是否執(zhí)行完,要馬上從b程序中跳出來,回到原程序執(zhí)行原來的操作。如果 await 后面跟的 b 函數(shù)不是異步函數(shù),那么操作就只能等 b 執(zhí)行完再返回,無法在 b 執(zhí)行的過程中返回。如果要在 b 執(zhí)行完才返回,也就不需要用 await 關(guān)鍵字了,直接調(diào)用 b 函數(shù)就行。所以這就需要 await 后面跟的是 異步函數(shù)了。在一個異步函數(shù)中,可以不止一次掛起,也就是可以用多個 await 。
看下 Python 中常見的幾種函數(shù)形式:
# 1. 普通函數(shù) def function():return 1# 2. 生成器函數(shù) def generator():yield 1# 在3.5過后,我們可以使用async修飾將普通函數(shù)和生成器函數(shù)包裝成異步函數(shù)和異步生成器。# 3. 異步函數(shù)(協(xié)程) async def async_function():return 1# 4. 異步生成器 async def async_generator():yield 1通過類型判斷可以驗證函數(shù)的類型
import types# 1. 普通函數(shù) def function():return 1# 2. 生成器函數(shù) def generator():yield 1# 在3.5過后,我們可以使用async修飾將普通函數(shù)和生成器函數(shù)包裝成異步函數(shù)和異步生成器。# 3. 異步函數(shù)(協(xié)程) async def async_function():return 1# 4. 異步生成器 async def async_generator():yield 1print(type(function) is types.FunctionType) print(type(generator()) is types.GeneratorType) print(type(async_function()) is types.CoroutineType) print(type(async_generator()) is types.AsyncGeneratorType)直接調(diào)用異步函數(shù)不會返回結(jié)果,而是返回一個coroutine對象:
print(async_function()) # <coroutine object async_function at 0x102ff67d8>協(xié)程 需要通過其他方式來驅(qū)動,因此可以使用這個協(xié)程對象的 send 方法給協(xié)程發(fā)送一個值:
print(async_function().send(None))不幸的是,如果通過上面的調(diào)用會拋出一個異常:StopIteration: 1
因為 生成器 / 協(xié)程 在正常返回退出時會拋出一個 StopIteration 異常,而原來的返回值會存放在 StopIteration 對象的 value 屬性中,通過以下捕獲可以獲取協(xié)程真正的返回值:?
try:async_function().send(None) except StopIteration as e:print(e.value) # 1通過上面的方式來新建一個 run 函數(shù)來驅(qū)動協(xié)程函數(shù),在協(xié)程函數(shù)中,可以通過 await 語法來掛起自身的協(xié)程,并等待另一個 協(xié)程 完成直到返回結(jié)果:
def run(coroutine):try:coroutine.send(None)except StopIteration as e:return 'run() : return {0}'.format(e.value)async def async_function():return 1async def await_coroutine():result = await async_function()print('await_coroutine() : print {0} '.format(result))ret_val = run(await_coroutine()) print(ret_val)要注意的是,await 語法只能出現(xiàn)在通過 async 修飾的函數(shù)中,否則會報 SyntaxError 錯誤。
而且 await 后面的對象需要是一個 Awaitable,或者實現(xiàn)了相關(guān)的協(xié)議。
查看 Awaitable 抽象類的代碼,表明了只要一個類實現(xiàn)了__await__方法,那么通過它構(gòu)造出來的實例就是一個 Awaitable:
class Awaitable(metaclass=ABCMeta):__slots__ = ()@abstractmethoddef __await__(self):yield@classmethoddef __subclasshook__(cls, C):if cls is Awaitable:return _check_methods(C, "__await__")return NotImplemented而且可以看到,Coroutine類 也繼承了 Awaitable,而且實現(xiàn)了 send,throw 和 close 方法。所以 await 一個調(diào)用異步函數(shù)返回的協(xié)程對象是合法的。
class Coroutine(Awaitable):__slots__ = ()@abstractmethoddef send(self, value):...@abstractmethoddef throw(self, typ, val=None, tb=None):...def close(self):...@classmethoddef __subclasshook__(cls, C):if cls is Coroutine:return _check_methods(C, '__await__', 'send', 'throw', 'close')return NotImplemented接下來是異步生成器,來看一個例子:
假如我要到一家超市去購買土豆,而超市貨架上的土豆數(shù)量是有限的:
class Potato:@classmethoddef make(cls, num, *args, **kws):potatos = []for i in range(num):potatos.append(cls.__new__(cls, *args, **kws))return potatosall_potatos = Potato.make(5)現(xiàn)在我想要買50個土豆,每次從貨架上拿走一個土豆放到籃子:
def take_potatos(num):count = 0while True:if len(all_potatos) == 0:sleep(.1)else:potato = all_potatos.pop()yield potatocount += 1if count == num:breakdef buy_potatos():bucket = []for p in take_potatos(50):bucket.append(p)對應(yīng)到代碼中,就是迭代一個生成器的模型,顯然,當(dāng)貨架上的土豆不夠的時候,這時只能夠死等,而且在上面例子中等多長時間都不會有結(jié)果(因為一切都是同步的),也許可以用多進程和多線程解決,而在現(xiàn)實生活中,更應(yīng)該像是這樣的:
import asyncio import randomclass Potato:@classmethoddef make(cls, num, *args, **kws):potatos = []for i in range(num):potatos.append(cls.__new__(cls, *args, **kws))return potatosall_potatos = Potato.make(5)async def take_potatos(num):count = 0while True:if len(all_potatos) == 0:await ask_for_potato()potato = all_potatos.pop()yield potatocount += 1if count == num:breakasync def ask_for_potato():await asyncio.sleep(random.random())all_potatos.extend(Potato.make(random.randint(1, 10)))async def buy_potatos():bucket = []async for p in take_potatos(50):bucket.append(p)print(f'Got potato {id(p)}...')def main():loop = asyncio.get_event_loop()res = loop.run_until_complete(buy_potatos())loop.close()if __name__ == '__main__':main()當(dāng)貨架上的土豆沒有了之后,可以詢問超市請求需要更多的土豆,這時候需要等待一段時間直到生產(chǎn)者完成生產(chǎn)的過程。
當(dāng)生產(chǎn)者完成和返回之后,這是便能從 await 掛起的地方繼續(xù)往下跑,完成消費的過程。而這整一個過程,就是一個異步生成器迭代的流程。
用 asyncio 運行這段代碼,結(jié)果是這樣的:
Got potato 4338641384... Got potato 4338641160... Got potato 4338614736... Got potato 4338614680... Got potato 4338614568... Got potato 4344861864... Got potato 4344843456... Got potato 4344843400... Got potato 4338641384... Got potato 4338641160... ...既然是異步的,在請求之后不一定要死等,而是可以做其他事情。比如除了土豆,我還想買番茄,這時只需要在事件循環(huán)中再添加一個過程:
def main():import asyncioloop = asyncio.get_event_loop()res = loop.run_until_complete(asyncio.wait([buy_potatos(), buy_tomatos()]))loop.close()再來運行這段代碼:
Got potato 4423119312... Got tomato 4423119368... Got potato 4429291024... Got potato 4421640768... Got tomato 4429331704... Got tomato 4429331760... Got tomato 4423119368... Got potato 4429331760... Got potato 4429331704... Got potato 4429346688... Got potato 4429346072... Got tomato 4429347360... ...看下 AsyncGenerator 的定義,它需要實現(xiàn) __aiter__ 和 __anext__ 兩個核心方法,以及 asend,athrow,aclose 方法。
class AsyncGenerator(AsyncIterator):__slots__ = ()async def __anext__(self):...@abstractmethodasync def asend(self, value):...@abstractmethodasync def athrow(self, typ, val=None, tb=None):...async def aclose(self):...@classmethoddef __subclasshook__(cls, C):if cls is AsyncGenerator:return _check_methods(C, '__aiter__', '__anext__','asend', 'athrow', 'aclose')return NotImplemented異步生成器是在 3.6 之后才有的特性,同樣的還有異步推導(dǎo)表達式,因此在上面的例子中,也可以寫成這樣:
bucket = [p async for p in take_potatos(50)]類似的,還有 await 表達式:
result = [await fun() for fun in funcs if await condition()]除了函數(shù)之外,類實例的普通方法也能用 async 語法修飾:
class ThreeTwoOne:async def begin(self):print(3)await asyncio.sleep(1)print(2)await asyncio.sleep(1)print(1) await asyncio.sleep(1)returnasync def game():t = ThreeTwoOne()await t.begin()print('start')實例方法的調(diào)用同樣是返回一個 coroutine:
function = ThreeTwoOne.begin method = function.__get__(ThreeTwoOne, ThreeTwoOne()) import inspect assert inspect.isfunction(function) assert inspect.ismethod(method) assert inspect.iscoroutine(method())同理 還有類方法:
class ThreeTwoOne:@classmethodasync def begin(cls):print(3)await asyncio.sleep(1)print(2)await asyncio.sleep(1)print(1) await asyncio.sleep(1)returnasync def game():await ThreeTwoOne.begin()print('start')根據(jù)PEP 492中,async 也可以應(yīng)用到 上下文管理器中,__aenter__ 和 __aexit__ 需要返回一個 Awaitable:
class GameContext:async def __aenter__(self):print('game loading...')await asyncio.sleep(1)async def __aexit__(self, exc_type, exc, tb):print('game exit...')await asyncio.sleep(1)async def game():async with GameContext():print('game start...')await asyncio.sleep(2)在3.7版本,contextlib 中會新增一個 asynccontextmanager 裝飾器來包裝一個實現(xiàn)異步協(xié)議的上下文管理器:
from contextlib import asynccontextmanager@asynccontextmanager async def get_connection():conn = await acquire_db_connection()try:yieldfinally:await release_db_connection(conn)async 修飾符也能用在 __call__ 方法上:
class GameContext:async def __aenter__(self):self._started = time()print('game loading...')await asyncio.sleep(1)return selfasync def __aexit__(self, exc_type, exc, tb):print('game exit...')await asyncio.sleep(1)async def __call__(self, *args, **kws):if args[0] == 'time':return time() - self._startedasync def game():async with GameContext() as ctx:print('game start...')await asyncio.sleep(2)print('game time: ', await ctx('time'))asyncio
asyncio?是 Python 3.4 版本引入的標(biāo)準庫,直接內(nèi)置了對 異步 IO 的支持。
asyncio 官方只實現(xiàn)了比較底層的協(xié)議,比如TCP,UDP。所以諸如 HTTP 協(xié)議之類都需要借助第三方庫,比如 aiohttp 。
雖然異步編程的生態(tài)不夠同步編程的生態(tài)那么強大,但是如果有高并發(fā)的需求不妨試試,下面說一下比較成熟的異步庫
aiohttp:異步 http client/server框架。github地址: https://github.com/aio-libs/aiohttp
sanic:速度更快的類 flask web框架。github地址:https://github.com/channelcat/sanic
uvloop 快速,內(nèi)嵌于 asyncio 事件循環(huán)的庫,使用 cython 基于 libuv 實現(xiàn)。github地址: https://github.com/MagicStack/uvloop
asyncio?的編程模型就是一個 消息循環(huán)。我們從?asyncio?模塊中直接獲取一個?EventLoop?的引用,然后把需要執(zhí)行的協(xié)程扔到?EventLoop?中執(zhí)行,就實現(xiàn)了 異步IO。
python 用asyncio?模塊實現(xiàn)異步編程,該模塊最大特點就是,只存在一個線程
由于只有一個線程,就不可能多個任務(wù)同時運行。asyncio 是 "多任務(wù)合作" 模式(cooperative multitasking),允許異步任務(wù)交出執(zhí)行權(quán)給其他任務(wù),等到其他任務(wù)完成,再收回執(zhí)行權(quán)繼續(xù)往下執(zhí)行
asyncio 模塊在單線程上啟動一個事件循環(huán)(event loop),時刻監(jiān)聽新進入循環(huán)的事件,加以處理,并不斷重復(fù)這個過程,直到異步任務(wù)結(jié)束。
什么是事件循環(huán)?
單線程就意味著所有的任務(wù)需要在單線程上排隊執(zhí)行,也就是前一個任務(wù)沒有執(zhí)行完成,后一個任務(wù)就沒有辦法執(zhí)行。在CPU密集型的任務(wù)之中,這樣其實還行,但是如果我們的任務(wù)都是IO密集型的呢?也就是我們大部分的任務(wù)都是在等待網(wǎng)絡(luò)的數(shù)據(jù)返回,等待磁盤文件的數(shù)據(jù),這就會造成CPU一直在等待這些任務(wù)的完成再去執(zhí)行下一個任務(wù)。
有沒有什么辦法能夠讓單線程的任務(wù)執(zhí)行不這么笨呢?其實我們可以將這些需要等待IO設(shè)備的任務(wù)掛在一邊嘛!這時候,如果我們的任務(wù)都是需要等待的任務(wù),那么單線程在執(zhí)行時遇到一個就把它掛起來,這里可以通過一個數(shù)據(jù)結(jié)構(gòu)(例如隊列)將這些處于執(zhí)行等待狀態(tài)的任務(wù)放進去,為什么是執(zhí)行等待狀態(tài)呢?因為它們正在執(zhí)行但是又不得不等待例如網(wǎng)絡(luò)數(shù)據(jù)的返回等等。直到將所有的任務(wù)都放進去之后,單線程就可以開始它的接連不斷的表演了:有沒有任務(wù)完成的小伙伴呀!快來我這里執(zhí)行!
此時如果有某個任務(wù)完成了,它會得到結(jié)果,于是發(fā)出一個信號:我完成了。那邊還在循環(huán)追問的單線程終于得到了答復(fù),就會去看看這個任務(wù)有沒有綁定什么回調(diào)函數(shù)呀?如果綁定了回調(diào)函數(shù)就進去把回調(diào)函數(shù)給執(zhí)行了,如果沒有,就將它所在的任務(wù)恢復(fù)執(zhí)行,并將結(jié)果返回。
asyncio 就是一個 協(xié)程庫
- (1)事件循環(huán) (event loop)。事件循環(huán)需要實現(xiàn)兩個功能,一是順序執(zhí)行協(xié)程代碼;二是完成協(xié)程的調(diào)度,即一個協(xié)程“暫停”時,決定接下來執(zhí)行哪個協(xié)程。
- (2)協(xié)程上下文的切換。基本上Python 生成器的 yeild 已經(jīng)能完成切換,Python3中還有特定語法支持協(xié)程切換。
Python 的異步IO:API
官方文檔:https://docs.python.org/zh-cn/3/library/asyncio.html
Python 的 asyncio 是使用 async/await 語法編寫并發(fā)代碼的標(biāo)準庫。Python3.7 這個版本,asyncio又做了比較大的調(diào)整,把這個庫的 API 分為了 高層級API 和 低層級API,并引入asyncio.run() 這樣的高級方法,讓編寫異步程序更加簡潔。
這里先從全局認識 Python 這個異步IO庫。
asyncio 的 高層級 API 主要提高如下幾個方面:
- 并發(fā)地運行Python協(xié)程并完全控制其執(zhí)行過程;
- 執(zhí)行網(wǎng)絡(luò)IO和IPC;
- 控制子進程;
- 通過隊列實現(xiàn)分布式任務(wù);
- 同步并發(fā)代碼。
asyncio 的 低層級API 用以支持開發(fā)異步庫和框架:
- 創(chuàng)建和管理事件循環(huán)(event loop),提供異步的API用于網(wǎng)絡(luò),運行子進程,處理操作系統(tǒng)信號等;
- 通過 transports 實現(xiàn)高效率協(xié)議;
- 通過 async/await? 語法橋架基于回調(diào)的庫和代碼。
asyncio 高級 API
高層級API讓我們更方便的編寫基于asyncio的應(yīng)用程序。這些API包括:
(1)協(xié)程和任務(wù)
協(xié)程通過 async/await 語法進行聲明,是編寫異步應(yīng)用的推薦方式。歷史的?@asyncio.coroutine?和?yield from?已經(jīng)被棄用,并計劃在Python 3.10中移除。協(xié)程可以通過?asyncio.run(coro, *, debug=False)?函數(shù)運行,該函數(shù)負責(zé)管理事件循環(huán)并完結(jié)異步生成器。它應(yīng)該被用作asyncio程序的主入口點,相當(dāng)于main函數(shù),應(yīng)該只被調(diào)用一次。
任務(wù)被用于并發(fā)調(diào)度協(xié)程,可用于網(wǎng)絡(luò)爬蟲的并發(fā)。使用?asyncio.create_task()?就可以把一個協(xié)程打包為一個任務(wù),該協(xié)程會自動安排為很快運行。
協(xié)程,任務(wù)和Future都是可等待對象。其中,Future是低層級的可等待對象,表示一個異步操作的最終結(jié)果。
(2)流
流是用于網(wǎng)絡(luò)連接的高層級的使用 async/await的原語。流允許在不使用回調(diào)或低層級協(xié)議和傳輸?shù)那闆r下發(fā)送和接收數(shù)據(jù)。異步讀寫TCP有客戶端函數(shù)?asyncio.open_connection()?和 服務(wù)端函數(shù)?asyncio.start_server()?。它還支持 Unix Sockets:?asyncio.open_unix_connection()?和?asyncio.start_unix_server()。
(3)同步原語
asyncio同步原語的設(shè)計類似于threading模塊的原語,有兩個重要的注意事項:
asyncio原語不是線程安全的,因此它們不應(yīng)該用于OS線程同步(而是用threading)
這些同步原語的方法不接受超時參數(shù); 使用asyncio.wait_for()函數(shù)執(zhí)行超時操作。
asyncio具有以下基本同步原語:
- Lock
- Event
- Condition
- Semaphore
- BoundedSemaphore
(4)子進程
asyncio提供了通過 async/await 創(chuàng)建和管理子進程的API。不同于Python標(biāo)準庫的subprocess,asyncio的子進程函數(shù)都是異步的,并且提供了多種工具來處理這些函數(shù),這就很容易并行執(zhí)行和監(jiān)視多個子進程。創(chuàng)建子進程的方法主要有兩個:
coroutine asyncio.create_subprocess_exec()
coroutine asyncio.create_subprocess_shell()
(5)隊列
asyncio 隊列的設(shè)計類似于標(biāo)準模塊queue的類。雖然asyncio隊列不是線程安全的,但它們被設(shè)計為專門用于 async/await 代碼。需要注意的是,asyncio隊列的方法沒有超時參數(shù),使用?asyncio.wait_for()函數(shù)進行超時的隊列操作。
因為和標(biāo)注模塊queue的類設(shè)計相似,使用起來跟queue無太多差異,只需要在對應(yīng)的函數(shù)前面加 await 即可。asyncio 隊列提供了三種不同的隊列:
- class asyncio.Queue 先進先出隊列
- class asyncio.PriorityQueue 優(yōu)先隊列
- class asyncio.LifoQueue 后進先出隊列
(6)異常
asyncio提供了幾種異常,它們是:
- TimeoutError,
- CancelledError,
- InvalidStateError,
- SendfileNotAvailableError
- IncompleteReadError
- LimitOverrunError
asyncio低級API
低層級API為編寫基于asyncio的庫和框架提供支持,有意編寫異步庫和框架的大牛們需要熟悉這些低層級API。主要包括:
(1)事件循環(huán)
事件循環(huán)是每個asyncio應(yīng)用程序的核心。 事件循環(huán)運行異步任務(wù)和回調(diào),執(zhí)行網(wǎng)絡(luò)IO操作以及運行子進程。
應(yīng)用程序開發(fā)人員通常應(yīng)該使用高級asyncio函數(shù),例如asyncio.run(),并且很少需要引用循環(huán)對象或調(diào)用其方法。
Python 3.7 新增了?asyncio.get_running_loop()函數(shù)。
(2)Futures
Future對象用于將基于低層級回調(diào)的代碼與高層級的 async/await 代碼進行橋接。
Future表示異步操作的最終結(jié)果。 不是線程安全的。
Future是一個可等待對象。 協(xié)程可以等待Future對象,直到它們有結(jié)果或異常集,或者直到它們被取消。
通常,Futures用于啟用基于低層級回調(diào)的代碼(例如,在使用asyncio傳輸實現(xiàn)的協(xié)議中)以與高層級 async/await 代碼進行互操作。
(3)傳輸和協(xié)議(Transports和Protocols)
Transport 和 Protocol由低層級事件循環(huán)使用,比如函數(shù)loop.create_connection()。它們使用基于回調(diào)的編程風(fēng)格,并支持網(wǎng)絡(luò)或IPC協(xié)議(如HTTP)的高性能實現(xiàn)。
在最高級別,傳輸涉及字節(jié)的傳輸方式,而協(xié)議確定要傳輸哪些字節(jié)(在某種程度上何時傳輸)。
換種方式說就是:傳輸是套接字(或類似的I/O端點)的抽象,而協(xié)議是從傳輸?shù)慕嵌葋砜吹膽?yīng)用程序的抽象。
另一種觀點是傳輸和協(xié)議接口共同定義了一個使用網(wǎng)絡(luò)I/O和進程間I/O的抽象接口。
傳輸和協(xié)議對象之間始終存在1:1的關(guān)系:協(xié)議調(diào)用傳輸方法來發(fā)送數(shù)據(jù),而傳輸調(diào)用協(xié)議方法來傳遞已接收的數(shù)據(jù)。
大多數(shù)面向連接的事件循環(huán)方法(例如loop.create_connection())通常接受protocol_factory參數(shù),該參數(shù)用于為接受的連接創(chuàng)建Protocol對象,由Transport對象表示。 這些方法通常返回(傳輸,協(xié)議)元組。
(4)策略(Policy)
事件循環(huán)策略是一個全局的按進程劃分的對象,用于控制事件循環(huán)的管理。 每個事件循環(huán)都有一個默認策略,可以使用策略API對其進行更改和自定義。
策略定義了上下文的概念,并根據(jù)上下文管理單獨的事件循環(huán)。 默認策略將上下文定義為當(dāng)前線程。
通過使用自定義事件循環(huán)策略,可以自定義get_event_loop(),set_event_loop()和new_event_loop()函數(shù)的行為。
(5)平臺支持
asyncio模塊設(shè)計為可移植的,但由于平臺的底層架構(gòu)和功能,某些平臺存在細微的差異和限制。在Windows平臺,有些是不支持的,比如?loop.create_unix_connection()?and?loop.create_unix_server()。而Linux和比較新的macOS全部支持。
總結(jié)
Python 3.7 通過對 asyncio 分組使得它的架構(gòu)更加清晰,普通寫異步IO的應(yīng)用程序只需熟悉高層級API,需要寫異步IO的庫和框架時才需要理解低層級的API。
生產(chǎn)者 --- 消費者
Python 分布與并行 asyncio實現(xiàn)生產(chǎn)者消費者模型:https://blog.csdn.net/weixin_43594279/article/details/111243453
示例 1:
# coding=utf-8import asyncioasync def consumer(n, q):print('consumer {}: starting'.format(n))while True:print('consumer {}: waiting for item'.format(n))item = await q.get()print('consumer {}: has item {}'.format(n, item))if item is None:# None is the signal to stop.q.task_done()breakelse:await asyncio.sleep(0.01 * item)q.task_done()print('consumer {}: ending'.format(n))async def producer(q, num_workers):print('producer: starting')# Add some numbers to the queue to simulate jobsfor i in range(num_workers * 3):await q.put(i)print('producer: added task {} to the queue'.format(i))# Add None entries in the queue# to signal the consumers to exitprint('producer: adding stop signals to the queue')for i in range(num_workers):await q.put(None)print('producer: waiting for queue to empty')await q.join()print('producer: ending')async def main(num_consumers=1):q = asyncio.Queue(maxsize=num_consumers)consumer_list = [# asyncio.create_task(consumer(i, q)) for i in range(num_consumers)asyncio.ensure_future(consumer(i, q)) for i in range(num_consumers)]# produce_list = [asyncio.create_task(producer(q, num_consumers))]produce_list = [asyncio.ensure_future(producer(q, num_consumers))]task_list = consumer_list + produce_listfor item in task_list:await itemif __name__ == '__main__':asyncio.run(main(num_consumers=3))pass示例 2:
Python 的異步IO編程例子
以 Python 3.7 上的 asyncio 為例講解如何使用 Python 的異步 IO。
創(chuàng)建第一個協(xié)程
Python 3.7 推薦使用 async/await 語法來聲明協(xié)程,來編寫異步應(yīng)用程序。我們來創(chuàng)建第一個協(xié)程函數(shù):首先打印一行“你好”,等待1秒鐘后再打印 "大家同好"。
import asyncioasync def say_hi():print('你好')await asyncio.sleep(1)print('大家同好')asyncio.run(say_hi())""" 你好 大家同好 """say_hi() 函數(shù)通過 async 聲明為協(xié)程函數(shù),較之前的修飾器聲明更簡潔明了。
在實踐過程中,什么功能的函數(shù)要用 async 聲明為協(xié)程函數(shù)呢?就是那些能發(fā)揮異步IO性能的函數(shù),比如讀寫文件、讀寫網(wǎng)絡(luò)、讀寫數(shù)據(jù)庫,這些都是浪費時間的IO操作,把它們協(xié)程化、異步化從而提高程序的整體效率(速度)。
say_hi() 函數(shù)是通過?asyncio.run()來運行的,而不是直接調(diào)用這個函數(shù)(協(xié)程)。因為,直接調(diào)用并不會把它加入調(diào)度日程,而只是簡單的返回一個協(xié)程對象:
print(say_hi()) # <coroutine object say_hi at 0x000001264DB3FCC0>真正運行一個協(xié)程
那么,如何真正運行一個協(xié)程呢?
asyncio 提供了三種機制:
- (1)asyncio.run() 函數(shù),這是異步程序的主入口,相當(dāng)于C語言中的main函數(shù)。
- (2)用 await 等待協(xié)程,比如上例中的?await asyncio.sleep(1)?。
再看下面的例子,我們定義了協(xié)程?say_delay()?,在 main() 協(xié)程中調(diào)用兩次,第一次延遲1秒后打印“你好”,第二次延遲2秒后打印 "大家同好"。這樣我們通過 await 運行了兩個協(xié)程。
import asyncio import datetimeasync def say_delay(msg=None, delay=None):await asyncio.sleep(delay)print(msg)async def main():print(f'begin at {datetime.datetime.now().replace(microsecond=0)}')await say_delay('你好', 2)await say_delay('大家同好', 1)print(f'end at {datetime.datetime.now().replace(microsecond=0)}')asyncio.run(main())''' begin at 2020-12-19 00:55:01 你好 大家同好 end at 2020-12-19 00:55:04 '''從起止時間可以看出,兩個協(xié)程是順序執(zhí)行的,總共耗時1+2=3秒。
- (3)通過?asyncio.create_task()?函數(shù)并發(fā)運行作為 asyncio 任務(wù)(Task) 的多個協(xié)程。下面,我們用 create_task() 來修改上面的 main() 協(xié)程,從而讓兩個 say_delay() 協(xié)程并發(fā)運行:
從運行結(jié)果的起止時間可以看出,兩個協(xié)程是并發(fā)執(zhí)行的了,總耗時等于最大耗時2秒。
asyncio.create_task()?是一個很有用的函數(shù),在爬蟲中它可以幫助我們實現(xiàn)大量并發(fā)去下載網(wǎng)頁。在 Python 3.6中與它對應(yīng)的是?ensure_future()。
生產(chǎn)者 --- 消費者
示例 代碼:
# coding=utf-8import asyncioasync def consumer(n, q):print('consumer {}: starting'.format(n))while True:print('consumer {}: waiting for item'.format(n))item = await q.get()print('consumer {}: has item {}'.format(n, item))if item is None:# None is the signal to stop.q.task_done()breakelse:await asyncio.sleep(0.01 * item)q.task_done()print('consumer {}: ending'.format(n))async def producer(q, num_workers):print('producer: starting')# Add some numbers to the queue to simulate jobsfor i in range(num_workers * 3):await q.put(i)print('producer: added task {} to the queue'.format(i))# Add None entries in the queue# to signal the consumers to exitprint('producer: adding stop signals to the queue')for i in range(num_workers):await q.put(None)print('producer: waiting for queue to empty')await q.join()print('producer: ending')async def main(num_consumers=1):q = asyncio.Queue(maxsize=num_consumers)consumer_list = [asyncio.create_task(consumer(i, q)) for i in range(num_consumers)]produce_list = [asyncio.create_task(producer(q, num_consumers))]task_list = consumer_list + produce_listfor item in task_list:await itemif __name__ == '__main__':asyncio.run(main(num_consumers=3))pass可等待對象(awaitables)
可等待對象,就是可以在 await 表達式中使用的對象,前面我們已經(jīng)接觸了兩種可等待對象的類型:協(xié)程 和 任務(wù),還有一個是低層級的 Future。
asyncio 模塊的許多 API 都需要傳入可等待對象,比如 run(), create_task() 等等。
(1)協(xié)程
協(xié)程是可等待對象,可以在其它協(xié)程中被等待。協(xié)程兩個緊密相關(guān)的概念是:
- 協(xié)程函數(shù):通過 async def 定義的函數(shù);
- 協(xié)程對象:調(diào)用協(xié)程函數(shù)返回的對象。
運行上面這段程序,結(jié)果為:
co is now is 1548512708.2026224 now is 1548512708.202648可以看到,直接運行協(xié)程函數(shù) whattime()得到的co是一個協(xié)程對象,因為協(xié)程對象是可等待的,所以通過 await 得到真正的當(dāng)前時間。now2是直接await 協(xié)程函數(shù),也得到了當(dāng)前時間的返回值。
(2)任務(wù)
前面我們講到,任務(wù)是用來調(diào)度協(xié)程的,以便并發(fā)執(zhí)行協(xié)程。當(dāng)一個協(xié)程通過?asyncio.create_task()?被打包為一個 任務(wù),該協(xié)程將自動加入調(diào)度隊列中,但是還未執(zhí)行。
create_task() 的基本使用前面例子已經(jīng)講過。它返回的 task 通過 await 來等待其運行完。如果,我們不等待,會發(fā)生什么?“準備立即運行”又該如何理解呢?先看看下面這個例子:
運行這段代碼的情況是這樣的:首先,1秒鐘后打印一行,這是第13,14行代碼運行的結(jié)果:
calling:0, now is 09:15:15接著,停頓1秒后,連續(xù)打印4行:
calling:1, now is 09:15:16 calling:2, now is 09:15:16 calling:3, now is 09:15:16 calling:4, now is 09:15:16從這個結(jié)果看,asyncio.create_task()產(chǎn)生的4個任務(wù),我們并沒有?await,它們也執(zhí)行了。關(guān)鍵在于第18行的?await,如果把這一行去掉或是 sleep 的時間小于1秒(比whattime()里面的sleep時間少即可),就會只看到第一行的輸出結(jié)果而看不到后面四行的輸出。這是因為,main() 不 sleep 或 sleep 少于1秒鐘,main() 就在 whattime() 還未來得及打印結(jié)果(因為,它要sleep 1秒)就退出了,從而整個程序也退出了,就沒有 whattime() 的輸出結(jié)果。
再來理解一下?“準備立即執(zhí)行”?這個說法。它的意思就是,create_task() 只是打包了協(xié)程并加入調(diào)度隊列還未執(zhí)行,并準備立即執(zhí)行,什么時候執(zhí)行呢?在 “主協(xié)程”(調(diào)用create_task()的協(xié)程)掛起的時候,這里的“掛起”有兩個方式:
- 一是,通過 await task 來執(zhí)行這個任務(wù);
- 另一個是,主協(xié)程通過 await sleep 掛起,事件循環(huán)就去執(zhí)行task了。
我們知道,asyncio 是通過事件循環(huán)實現(xiàn)異步的。在主協(xié)程 main()里面,沒有遇到 await 時,事件就是執(zhí)行 main() 函數(shù),遇到 await 時,事件循環(huán)就去執(zhí)行別的協(xié)程,即 create_task() 生成的 whattime()的4個任務(wù),這些任務(wù)一開始就是 await sleep 1秒。這時候,主協(xié)程和4個任務(wù)協(xié)程都掛起了,CPU空閑,事件循環(huán)等待協(xié)程的消息。
如果 main() 協(xié)程只 sleep了 0.1秒,它就先醒了,給事件循環(huán)發(fā)消息,事件循環(huán)就來繼續(xù)執(zhí)行 main() 協(xié)程,而 main() 后面已經(jīng)沒有代碼,就退出該協(xié)程,退出它也就意味著整個程序退出,4個任務(wù)就沒機會打印結(jié)果;
如果 main()協(xié)程sleep時間多余1秒,那么4個任務(wù)先喚醒,就會得到全部的打印結(jié)果;
如果main()的18行sleep等于1秒時,和4個任務(wù)的sleep時間相同,也會得到全部打印結(jié)果。這是為什么呢?
我猜想是這樣的:4個任務(wù)生成在前,第18行的sleep在后,事件循環(huán)的消息響應(yīng)可能有個先進先出的順序。后面深入asyncio的代碼專門研究一下這個猜想正確與否。
示例:
# -*- coding: utf-8 -*-""" @File : aio_test.py @Author : XXX @Time : 2020/12/25 23:54 """import asyncio import datetimeasync def hi(msg=None, sec=None):print(f'enter hi(), {msg} @{datetime.datetime.now().replace(microsecond=0)}')await asyncio.sleep(sec)print(f'leave hi(), {msg} @{datetime.datetime.now().replace(microsecond=0)}')return secasync def main_1():print(f'main() begin at {datetime.datetime.now().replace(microsecond=0)}')tasks = []for i in range(1, 5):tsk = asyncio.create_task(hi(i, i))tasks.append(tsk)for tsk in tasks:ret_val = await tskprint(f'ret_val:{ret_val}')print(f'main() end at {datetime.datetime.now().replace(microsecond=0)}')async def main_2():# ***** 注意:main_2 中睡眠了2秒,導(dǎo)致睡眠時間大于2秒的協(xié)程沒有執(zhí)行完成 *****print(f'main() begin at {datetime.datetime.now().replace(microsecond=0)}')tasks = []for i in range(1, 5):tsk = asyncio.create_task(hi(i, i))tasks.append(tsk)await asyncio.sleep(2)print(f'main() end at {datetime.datetime.now().replace(microsecond=0)}')async def main_3():# ***** 注意:main_3方法并沒有實現(xiàn)并發(fā)執(zhí)行,只是順序執(zhí)行 *****print(f'main() begin at {datetime.datetime.now().replace(microsecond=0)}')tasks = []for i in range(1, 5):tsk = asyncio.create_task(hi(i, i))await tskprint(f'main() end at {datetime.datetime.now().replace(microsecond=0)}')print('*' * 50) asyncio.run(main_1()) print('*' * 50) asyncio.run(main_2()) print('*' * 50) asyncio.run(main_3()) print('*' * 50)(3)Future
它是一個低層級的可等待對象,表示一個異步操作的最終結(jié)果。目前,我們寫應(yīng)用程序還用不到它,暫不學(xué)習(xí)。
asyncio異步IO協(xié)程總結(jié)
協(xié)程就是我們異步操作的片段。通常,寫程序都會把全部功能分成很多不同功能的函數(shù),目的是為了結(jié)構(gòu)清晰;進一步,把那些涉及耗費時間的IO操作(讀寫文件、數(shù)據(jù)庫、網(wǎng)絡(luò))的函數(shù)通過 async def 異步化,就是異步編程。
那些異步函數(shù)(協(xié)程函數(shù))都是通過消息機制被事件循環(huán)管理調(diào)度著,整個程序的執(zhí)行是單線程的,但是某個協(xié)程A進行IO時,事件循環(huán)就去執(zhí)行其它協(xié)程非IO的代碼。當(dāng)事件循環(huán)收到協(xié)程A結(jié)束IO的消息時,就又回來執(zhí)行協(xié)程A,這樣事件循環(huán)不斷在協(xié)程之間轉(zhuǎn)換,充分利用了IO的閑置時間,從而并發(fā)的進行多個IO操作,這就是異步IO。
寫異步IO程序時記住一個準則:需要IO的地方異步。其它地方即使用了協(xié)程函數(shù)也是沒用的。
不 使用 asyncio 的 消息循環(huán) 讓協(xié)程運行
先看下 不使用? asyncio 的消息循環(huán) 怎么 調(diào)用 協(xié)程,讓協(xié)程 運行:
async def func_1():print("func_1 start")print("func_1 end")async def func_2():print("func_2 start")print("func_2 a")print("func_2 b")print("func_2 c")print("func_2 end")f_1 = func_1() print(f_1)f_2 = func_2() print(f_2)try:print('f_1.send')f_1.send(None) except StopIteration as e:# 這里也是需要去捕獲StopIteration方法passtry:print('f_2.send')f_2.send(None) except StopIteration as e:pass運行結(jié)果:
<coroutine object func_1 at 0x0000020121A07C40> <coroutine object func_2 at 0x0000020121B703C0> f_1.send func_1 start func_1 end f_2.send func_2 start func_2 a func_2 b func_2 c func_2 end示例代碼2:
async def test(x):return x * 2print(test(100))try:# 既然是協(xié)程,我們像之前yield協(xié)程那樣test(100).send(None) except BaseException as e:print(type(e))ret_val = e.valueprint(ret_val)示例代碼3:
def simple_coroutine():print('-> start')x = yieldprint('-> recived', x)sc = simple_coroutine()next(sc)try:sc.send('zhexiao') except BaseException as e:print(e)對上述例子的分析:yield 的右邊沒有表達式,所以這里默認產(chǎn)出的值是None。剛開始先調(diào)用了next(...)是因為這個時候生成器還沒有啟動,沒有停在yield那里,這個時候也是無法通過send發(fā)送數(shù)據(jù)。所以當(dāng)我們通過 next(...)激活協(xié)程后 ,程序就會運行到x = yield,這里有個問題我們需要注意, x = yield這個表達式的計算過程是先計算等號右邊的內(nèi)容,然后在進行賦值,所以當(dāng)激活生成器后,程序會停在yield這里,但并沒有給x賦值。當(dāng)我們調(diào)用 send 方法后 yield 會收到這個值并賦值給 x,而當(dāng)程序運行到協(xié)程定義體的末尾時和用生成器的時候一樣會拋出StopIteration異常
如果協(xié)程沒有通過 next(...) 激活(同樣我們可以通過send(None)的方式激活),但是我們直接send,會提示如下錯誤:
最先調(diào)用 next(sc) 函數(shù)這一步通常稱為“預(yù)激”(prime)協(xié)程 (即,讓協(xié)程執(zhí)行到第一個 yield 表達式,準備好作為活躍的協(xié)程使用)。
協(xié)程在運行過程中有四個狀態(tài):
GEN_CREATE: 等待開始執(zhí)行
GEN_RUNNING: 解釋器正在執(zhí)行,這個狀態(tài)一般看不到
GEN_SUSPENDED: 在yield表達式處暫停
GEN_CLOSED: 執(zhí)行結(jié)束
通過下面例子來查看協(xié)程的狀態(tài):
示例代碼4:(使用協(xié)程計算移動平均值)
def averager():total = 0.0count = 0avg = Nonewhile True:num = yield avgtotal += numcount += 1avg = total / count# run ag = averager() # 預(yù)激協(xié)程 print(next(ag)) # Noneprint(ag.send(10)) # 10 print(ag.send(20)) # 15這里是一個死循環(huán),只要不停 send 值 給 協(xié)程,可以一直計算下去。
解釋:
- 1. 調(diào)用 next(ag) 函數(shù)后,協(xié)程會向前執(zhí)行到 yield 表達式,產(chǎn)出 average 變量的初始值 None。
- 2. 此時,協(xié)程在 yield 表達式處暫停。
- 3. 使用 send() 激活協(xié)程,把發(fā)送的值賦給 num,并計算出 avg 的值。
- 4. 使用 print 打印出 yield 返回的數(shù)據(jù)。
單步 調(diào)試 上面程序。
使用 asyncio 的 消息循環(huán) 讓協(xié)程運行
使用 asyncio 異步 IO 調(diào)用 協(xié)程
示例代碼 1:
import asyncioasync def func_1():print("func_1 start")print("func_1 end")# await asyncio.sleep(1)async def func_2():print("func_2 start")print("func_2 a")print("func_2 b")print("func_2 c")print("func_2 end")# await asyncio.sleep(1)f_1 = func_1() print(f_1)f_2 = func_2() print(f_2)# 獲取 EventLoop: loop = asyncio.get_event_loop() tasks = [func_1(), func_2()]# 執(zhí)行 coroutine loop.run_until_complete(asyncio.wait(tasks)) loop.close()示例代碼 2:
import asyncio import timestart = time.time()def tic():return 'at %1.1f seconds' % (time.time() - start)async def gr1():# Busy waits for a second, but we don't want to stick around...print('gr1 started work: {}'.format(tic()))# 暫停兩秒,但不阻塞時間循環(huán),下同await asyncio.sleep(2)print('gr1 ended work: {}'.format(tic()))async def gr2():# Busy waits for a second, but we don't want to stick around...print('gr2 started work: {}'.format(tic()))await asyncio.sleep(2)print('gr2 Ended work: {}'.format(tic()))async def gr3():print("Let's do some stuff while the coroutines are blocked, {}".format(tic()))await asyncio.sleep(1)print("Done!")# 事件循環(huán) ioloop = asyncio.get_event_loop()# tasks中也可以使用 asyncio.ensure_future(gr1()).. tasks = [ioloop.create_task(gr1()),ioloop.create_task(gr2()),ioloop.create_task(gr3()) ] ioloop.run_until_complete(asyncio.wait(tasks)) ioloop.close()""" 結(jié)果: gr1 started work: at 0.0 seconds gr2 started work: at 0.0 seconds Let's do some stuff while the coroutines are blocked, at 0.0 seconds Done! gr2 Ended work: at 2.0 seconds gr1 ended work: at 2.0 seconds """多個?coroutine?可以封裝成一組 Task 然后并發(fā)執(zhí)行。
-
asyncio.wait(...) 協(xié)程的參數(shù)是一個由 future 或 協(xié)程 構(gòu)成的可迭代對象;wait 會分別
把各個協(xié)程包裝進一個 Task 對象。最終的結(jié)果是,wait 處理的所有對象都通過某種方式變成 Future 類的實例。wait 是協(xié)程函數(shù),因此返回的是一個協(xié)程或生成器對象。 -
ioloop.run_until_complete 方法的參數(shù)是一個 future 或 協(xié)程。如果是協(xié)程,run_until_complete方法與 wait 函數(shù)一樣,把協(xié)程包裝進一個 Task 對象中。
-
在 asyncio 包中,future和協(xié)程關(guān)系緊密,因為可以使用 yield from 從 asyncio.Future 對象中產(chǎn)出結(jié)果。這意味著,如果 foo 是協(xié)程函數(shù)(調(diào)用后返回協(xié)程對象),抑或是返回Future 或 Task 實例的普通函數(shù),那么可以這樣寫:res = yield from foo()。這是 asyncio 包的 API 中很多地方可以互換協(xié)程與期物的原因之一。 例如上面的例子中 tasks 可以改寫成協(xié)程列表:tasks = [gr1(), gr(2), gr(3)]
詳細的各個類說明,類方法,傳參,以及方法返回的是什么類型都可以在官方文檔上仔細研讀,多讀幾遍,方有體會。
示例代碼 3:
import asyncio import time import aiohttp import async_timeoutmsg = "http://www.nationalgeographic.com.cn/photography/photo_of_the_day/{}.html" headers = {'User-Agent': 'Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US; rv:1.9.1.6) Gecko/20091201 Firefox/3.5.6' }urls = [msg.format(i) for i in range(5400, 5500)]async def fetch(session, url):with async_timeout.timeout(10):async with session.get(url) as response:return response.statusasync def main(url):async with aiohttp.ClientSession() as session:status = await fetch(session, url)return statusif __name__ == '__main__':start = time.time()loop = asyncio.get_event_loop()tasks = [main(url) for url in urls]# 返回一個列表,內(nèi)容為各個tasks的返回值status_list = loop.run_until_complete(asyncio.gather(*tasks))print(len([status for status in status_list if status == 200]))end = time.time()print("cost time:", end - start)示例代碼 4:
用?asyncio?實現(xiàn)??Hello world?代碼如下:
import asyncio@asyncio.coroutine def hello():print("Hello world!")# 異步調(diào)用 asyncio.sleep(1):r = yield from asyncio.sleep(1)print("Hello again!")# 獲取 EventLoop: loop = asyncio.get_event_loop()# 執(zhí)行 coroutine loop.run_until_complete(hello()) loop.close()或者直接使用新語法?async 和 await
import asyncioasync def hello():print("Hello world!")# 異步調(diào)用 asyncio.sleep(1):r = await asyncio.sleep(1)print("Hello again!")# 獲取 EventLoop: loop = asyncio.get_event_loop()# 執(zhí)行 coroutine loop.run_until_complete(hello()) loop.close()@asyncio.coroutine?把一個 generator 標(biāo)記為 coroutine類型,然后,我們就把這個?coroutine?扔到?EventLoop?中執(zhí)行。
hello()?會首先打印出?Hello world!,然后,yield from?語法可以讓我們方便地調(diào)用另一個?generator。由于asyncio.sleep()?也是一個?coroutine,所以線程不會等待?asyncio.sleep(),而是直接中斷并執(zhí)行下一個消息循環(huán)。當(dāng)asyncio.sleep()?返回時,線程就可以從?yield from?拿到返回值(此處是None),然后接著執(zhí)行下一行語句。
把?asyncio.sleep(1)看成是一個耗時1秒的IO操作,在此期間,主線程并未等待,而是去執(zhí)行?EventLoop?中其他可以執(zhí)行的coroutine了,因此可以實現(xiàn)并發(fā)執(zhí)行。
我們用 Task 封裝兩個?coroutine?試試:
import threading import asyncioasync def hello():print('1 : Hello world! (%s)' % threading.currentThread())await asyncio.sleep(5)print('2 : Hello again! (%s)' % threading.currentThread())loop = asyncio.get_event_loop() tasks = [hello(), hello()] loop.run_until_complete(asyncio.wait(tasks)) loop.close()觀察執(zhí)行過程:
1 : Hello world! (<_MainThread(MainThread, started 12200)>) 1 : Hello world! (<_MainThread(MainThread, started 12200)>) ( 暫停約 5 秒 ) 2 : Hello again! (<_MainThread(MainThread, started 12200)>) 2 : Hello again! (<_MainThread(MainThread, started 12200)>)由打印的當(dāng)前線程名稱可以看出,兩個?coroutine?是由同一個線程并發(fā)執(zhí)行的。
如果把?asyncio.sleep()?換成真正的IO操作,則多個?coroutine?就可以由一個線程并發(fā)執(zhí)行。
我們用?asyncio?的異步網(wǎng)絡(luò)連接來獲取 sina、sohu 和 163 的網(wǎng)站首頁:
import asyncioasync def wget(host):print('wget %s...' % host)connect = asyncio.open_connection(host, 80)reader, writer = await connectheader = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % hostwriter.write(header.encode('utf-8'))await writer.drain()while True:line = await reader.readline()if line == b'\r\n':breakprint('%s header > %s' % (host, line.decode('utf-8').rstrip()))# Ignore the body, close the socketwriter.close()loop = asyncio.get_event_loop() tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']] loop.run_until_complete(asyncio.wait(tasks)) loop.close()執(zhí)行結(jié)果如下:
wget www.sohu.com... wget www.sina.com.cn... wget www.163.com... (等待一段時間) (打印出sohu的header) www.sohu.com header > HTTP/1.1 200 OK www.sohu.com header > Content-Type: text/html ... (打印出sina的header) www.sina.com.cn header > HTTP/1.1 200 OK www.sina.com.cn header > Date: Wed, 20 May 2015 04:56:33 GMT ... (打印出163的header) www.163.com header > HTTP/1.0 302 Moved Temporarily www.163.com header > Server: Cdn Cache Server V2.0可見 3 個連接 由一個線程通過?coroutine?并發(fā)完成。
參考源碼:
async_hello.py:https://github.com/michaelliao/learn-python3/blob/master/samples/async/async_hello.py
async_wget.py:https://github.com/michaelliao/learn-python3/blob/master/samples/async/async_wget.py
示例代碼 5: ( 協(xié)程 的 返回值 )
一個協(xié)程里可以啟動另外一個協(xié)程,并等待它完成返回結(jié)果,采用 await 關(guān)鍵字
import asyncioasync def outer():print('in outer')print('waiting for result1')result1 = await phase1()print('waiting for result2')result2 = await phase2(result1)return (result1, result2)async def phase1():print('in phase1')return 'result1'async def phase2(arg):print('in phase2')return 'result2 derived from {}'.format(arg)event_loop = asyncio.get_event_loop() try:return_value = event_loop.run_until_complete(outer())print('return value: {!r}'.format(return_value)) finally:event_loop.close()運行結(jié)果:
in outer waiting for result1 in phase1 waiting for result2 in phase2 return value: ('result1', 'result2 derived from result1')前面都是關(guān)于 asyncio 的例子,那么除了asyncio,就沒有其他協(xié)程庫了嗎?asyncio 作為 python 的標(biāo)準庫,自然受到很多青睞,但它有時候還是顯得太重量了,尤其是提供了許多復(fù)雜的輪子和協(xié)議,不便于使用。
你可以理解為,asyncio 是使用 async/await 語法開發(fā)的 協(xié)程庫,而不是有 asyncio 才能用 async/await,
除了 asyncio 之外,curio 和 trio 是更加輕量級的替代物,而且也更容易使用。
curio 的作者是 David Beazley,下面是使用 curio 創(chuàng)建 tcp server 的例子,據(jù)說這是 dabeaz 理想中的一個異步服務(wù)器的樣子:
from curio import run, spawn from curio.socket import *async def echo_server(address):sock = socket(AF_INET, SOCK_STREAM)sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)sock.bind(address)sock.listen(5)print('Server listening at', address)async with sock:while True:client, addr = await sock.accept()await spawn(echo_client, client, addr)async def echo_client(client, addr):print('Connection from', addr)async with client:while True:data = await client.recv(100000)if not data:breakawait client.sendall(data)print('Connection closed')if __name__ == '__main__':run(echo_server, ('',25000))無論是 asyncio 還是 curio,或者是其他異步協(xié)程庫,在背后往往都會借助于 IO的事件循環(huán)來實現(xiàn)異步,下面用幾十行代碼來展示一個簡陋的基于事件驅(qū)動的echo服務(wù)器:
from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR from selectors import DefaultSelector, EVENT_READselector = DefaultSelector() pool = {}def request(client_socket, addr):client_socket, addr = client_socket, addrdef handle_request(key, mask):data = client_socket.recv(100000)if not data:client_socket.close()selector.unregister(client_socket)del pool[addr]else:client_socket.sendall(data)return handle_requestdef recv_client(key, mask):sock = key.fileobjclient_socket, addr = sock.accept()req = request(client_socket, addr)pool[addr] = reqselector.register(client_socket, EVENT_READ, req)def echo_server(address):sock = socket(AF_INET, SOCK_STREAM)sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)sock.bind(address)sock.listen(5)selector.register(sock, EVENT_READ, recv_client)try:while True:events = selector.select()for key, mask in events:callback = key.datacallback(key, mask)except KeyboardInterrupt:sock.close()if __name__ == '__main__':echo_server(('',25000))驗證一下:
# terminal 1 $ nc localhost 25000 hello world hello world# terminal 2 $ nc localhost 25000 hello world hello world現(xiàn)在知道:
- 完成 異步的代碼 不一定要用 async/await ,使用了 async/await 的代碼也不一定能做到異步,
- async/await 是協(xié)程的語法糖,使協(xié)程之間的調(diào)用變得更加清晰,
- 使用 async 修飾的函數(shù)調(diào)用時會返回一個協(xié)程對象,
- await 只能放在 async 修飾的函數(shù)里面使用,await 后面必須要跟著一個 協(xié)程對象 或 Awaitable,
- await 的目的是等待協(xié)程控制流的返回,而實現(xiàn)暫停并掛起函數(shù)的操作是yield。
async/await 以及 協(xié)程 是Python未來實現(xiàn)異步編程的趨勢,我們將會在更多的地方看到他們的身影,例如協(xié)程庫的 curio 和 trio,web 框架的 sanic,數(shù)據(jù)庫驅(qū)動的 asyncpg 等等。在Python 3主導(dǎo)的今天,作為開發(fā)者,應(yīng)該及時擁抱和適應(yīng)新的變化,而基于async/await的協(xié)程憑借良好的可讀性和易用性日漸登上舞臺,看到這里,你還不趕緊上車?
Python 模塊 asyncio – 協(xié)程之間的同步
Python 模塊 asyncio – 協(xié)程之間的同步:https://www.quxihuan.com/posts/python-module-asyncio-synchronization/
await 和 yield from
Python3.3 的 yield from 語法可以把生成器的操作委托給另一個生成器,生成器的調(diào)用方可以直接與子生成器進行通信:
def sub_gen():yield 1yield 2yield 3def gen():return (yield from sub_gen())def main():for val in gen():print(val) # 1 # 2 # 3利用這一特性,使用 yield from 能夠編寫出類似協(xié)程效果的函數(shù)調(diào)用,在3.5之前,asyncio 正是使用@asyncio.coroutine 和 yield from 語法來創(chuàng)建協(xié)程:https://docs.python.org/3.4/library/asyncio-task.html
@asyncio.coroutine def compute(x, y):print("Compute %s + %s ..." % (x, y))yield from asyncio.sleep(1.0)return x + y@asyncio.coroutine def print_sum(x, y):result = yield from compute(x, y)print("%s + %s = %s" % (x, y, result))loop = asyncio.get_event_loop() loop.run_until_complete(print_sum(1, 2)) loop.close()然而,用 yield from 容易在表示協(xié)程和生成器中混淆,沒有良好的語義性,所以在 Python 3.5 推出了更新的 async/await 表達式來作為協(xié)程的語法。
因此類似以下的調(diào)用是等價的:
async with lock:...with (yield from lock):... ###################### def main():return (yield from coro())def main():return (await coro())那么,怎么把生成器包裝為一個協(xié)程對象呢?這時候可以用到 types 包中的 coroutine 裝飾器(如果使用asyncio做驅(qū)動的話,那么也可以使用 asyncio 的 coroutine 裝飾器),@types.coroutine裝飾器會將一個生成器函數(shù)包裝為協(xié)程對象:
import asyncio import types@types.coroutine def compute(x, y):print("Compute %s + %s ..." % (x, y))yield from asyncio.sleep(1.0)return x + yasync def print_sum(x, y):result = await compute(x, y)print("%s + %s = %s" % (x, y, result))loop = asyncio.get_event_loop() loop.run_until_complete(print_sum(1, 2)) loop.close()盡管兩個函數(shù)分別使用了新舊語法,但他們都是協(xié)程對象,也分別稱作?native coroutine?以及?generator-based coroutine,因此不用擔(dān)心語法問題。
下面觀察一個 asyncio 中 Future 的例子:
import asynciofuture = asyncio.Future()async def test_1():await asyncio.sleep(1)future.set_result('data')async def test_2():print(await future)loop = asyncio.get_event_loop() tasks_list = [test_1(), test_2()] loop.run_until_complete(asyncio.wait(tasks_list)) loop.close()兩個協(xié)程在事件循環(huán)中,協(xié)程?test_1 在執(zhí)行第一句后掛起自身切到 asyncio.sleep,而協(xié)程 test_2 一直等待 future 的結(jié)果,讓出事件循環(huán),計時器結(jié)束后 test_1 執(zhí)行第二句并設(shè)置了 future 的值,被掛起的 test_2 恢復(fù)執(zhí)行,打印出 future 的結(jié)果 'data' 。
future 可以被 await 證明了 future 對象是一個 Awaitable,進入 Future 類的源碼可以看到有一段代碼顯示了 future 實現(xiàn)了__await__ 協(xié)議:
class Future:...def __iter__(self):if not self.done():self._asyncio_future_blocking = Trueyield self # This tells Task to wait for completion.assert self.done(), "yield from wasn't used with future"return self.result() # May raise too.if compat.PY35:__await__ = __iter__ # make compatible with 'await' expression當(dāng)執(zhí)行?await future?這行代碼時,future中的這段代碼就會被執(zhí)行,首先future檢查它自身是否已經(jīng)完成,如果沒有完成,掛起自身,告知當(dāng)前的 Task(任務(wù))等待 future 完成。
當(dāng) future 執(zhí)行 set_result 方法時,會觸發(fā)以下的代碼,設(shè)置結(jié)果,標(biāo)記 future 已經(jīng)完成:
def set_result(self, result):...if self._state != _PENDING:raise InvalidStateError('{}: {!r}'.format(self._state, self))self._result = resultself._state = _FINISHEDself._schedule_callbacks()最后 future 會調(diào)度自身的回調(diào)函數(shù),觸發(fā) Task._step() 告知 Task 驅(qū)動 future 從之前掛起的點恢復(fù)執(zhí)行,不難看出,future 會執(zhí)行下面的代碼:
class Future:...def __iter__(self):...assert self.done(), "yield from wasn't used with future"return self.result() # May raise too.最終返回結(jié)果給調(diào)用方。
Yield from
調(diào)用協(xié)程 的方式有有很多,yield from 就是其中的一種。這種方式在 Python3.3 中被引入,在 Python3.5 中以 async/await 的形式進行了優(yōu)化。yield from 表達式的使用方式如下:
import asyncio@asyncio.coroutine def get_json(client, url): file_content = yield from load_file('/Users/scott/data.txt')正如所看到的,yield from 被使用在用 @asyncio.coroutine 裝飾的函數(shù)內(nèi),如果想把 yield from 在這個函數(shù)外使用,將會拋出如下語法錯誤:
File "main.py", line 1file_content = yield from load_file('/Users/scott/data.txt')^ SyntaxError: 'yield' outside function為了避免語法錯誤,yield from 必須在調(diào)用函數(shù)的內(nèi)部使用(這個調(diào)用函數(shù)通常被裝飾為協(xié)程)。
Async / await
較新的語法是使用 async/await 關(guān)鍵字。 async 從 Python3.5 開始被引進,跟 @asyncio.coroutine 裝飾器一樣,用來聲明一個函數(shù)是一個協(xié)程。只要把它放在函數(shù)定義之前,就可以應(yīng)用到函數(shù)上,使用方式如下:
async def ping_server(ip):# ping code here...實際調(diào)用這個函數(shù)時,使用 await 而不用 yield from ,當(dāng)然,使用方式依然差不多:
async def ping_local(ip):return await ping_server('192.168.1.1')再強調(diào)一遍,跟 yield from 一樣,不能在函數(shù)外部使用 await ,否則會拋出語法錯誤。 (譯者注: async 用來聲明一個函數(shù)是協(xié)程,然后使用 await調(diào)用這個協(xié)程, await 必須在函數(shù)內(nèi)部,這個函數(shù)通常也被聲明為另一個協(xié)程)
Python3.5 對這兩種調(diào)用協(xié)程的方法都提供了支持,但是推薦 async/await 作為首選。
Event Loop
如果你還不知道如何開始和操作一個 Eventloop ,那么上面有關(guān)協(xié)程所說的都起不了多大作用。 Eventloop 在執(zhí)行異步函數(shù)時非常重要,重要到只要執(zhí)行協(xié)程,基本上就得利用 Eventloop 。
Eventloop 提供了相當(dāng)多的功能:
- 注冊,執(zhí)行 和 取消 延遲調(diào)用(異步函數(shù))
- 創(chuàng)建 客戶端 與 服務(wù)端 傳輸用于通信
- 創(chuàng)建 子程序 和 通道 跟 其他的程序 進行通信
- 指定 函數(shù) 的 調(diào)用 到 線程池
Eventloop 有相當(dāng)多的配置和類型可供使用,但大部分程序只需要如下方式預(yù)定函數(shù)即可:
import asyncioasync def speak_async(): print('OMG asynchronicity!')loop = asyncio.get_event_loop() loop.run_until_complete(speak_async()) loop.close()有意思的是代碼中的最后三行,首先獲取默認的 Eventloop ( asyncio.get_event_loop() ),然后預(yù)定和運行異步任務(wù),并在完成后結(jié)束循環(huán)。
loop.run_until_complete() 函數(shù)實際上是阻塞性的,也就是在所有異步方法完成之前,它是不會返回的。但因為我們只在一個線程中運行這段代碼,它沒法再進一步擴展,即使循環(huán)仍在運行。
可能你現(xiàn)在還沒覺得這有多大的用處,因為我們通過調(diào)用其他 IO 來結(jié)束 Eventloop 中的阻塞(譯者注:也就是在阻塞時進行其他 IO ),但是想象一下,如果在網(wǎng)頁服務(wù)器上,把整個程序都封裝在異步函數(shù)內(nèi),到時就可以同時運行多個異步請求了。
也可以將 Eventloop 的線程中斷,利用它去處理所有耗時較長的 IO 請求,而主線程處理程序邏輯或者用戶界面。
一個案例
讓我們實際操作一個稍大的案例。下面這段代碼就是一個非常漂亮的異步程序,它先從 Reddit 抓取 JSON 數(shù)據(jù),解析它,然后打印出當(dāng)天來自 /r/python,/r/programming 和 /r/compsci 的置頂帖。
所示的第一個方法 get_json() ,由 get_reddit_top() 調(diào)用,然后只創(chuàng)建一個 GET 請求到適當(dāng)?shù)木W(wǎng)址。當(dāng)這個方法和 await 一起調(diào)用后, Eventloop 便能夠繼續(xù)為其他的協(xié)程服務(wù),同時等待 HTTP 響應(yīng)達到。一旦響應(yīng)完成, JSON 數(shù)據(jù)就返回到 get_reddit_top() ,得到解析并打印出來。
import signal import sys import asyncio import aiohttp import jsonloop = asyncio.get_event_loop() client = aiohttp.ClientSession(loop=loop)async def get_json(client, url):async with client.get(url) as response:assert response.status == 200return await response.read()async def get_reddit_top(subreddit, client):data1 = await get_json(client, 'https://www.reddit.com/r/' + subreddit + '/top.json?sort=top&t=day&limit=5')j = json.loads(data1.decode('utf-8'))for i in j['data']['children']:score = i['data']['score']title = i['data']['title']link = i['data']['url']print(str(score) + ': ' + title + ' (' + link + ')')print('DONE:', subreddit + '\n')def signal_handler(signal, frame):loop.stop()client.close()sys.exit(0)signal.signal(signal.SIGINT, signal_handler)asyncio.ensure_future(get_reddit_top('python', client)) asyncio.ensure_future(get_reddit_top('programming', client)) asyncio.ensure_future(get_reddit_top('compsci', client)) loop.run_forever()注意,如果多次運行這段代碼,打印出來的 subreddit 數(shù)據(jù)在順序上會有些許變化。這是因為每當(dāng)我們調(diào)用一次代碼都會釋放對線程的控制,容許線程去處理另一個 HTTP 調(diào)用。這將導(dǎo)致誰先獲得響應(yīng),誰就先打印出來。
結(jié)論
即使 Python 內(nèi)置的異步操作沒有 Javascript 那么順暢,但這并不意味著就不能用它來把應(yīng)用變得更有趣、更有效率。只要花半個小時的時間去了解它的來龍去脈,你就會感覺把異步操作應(yīng)用到你的程序中將會是多美好的一件事。
aiohttp
asyncio?可以實現(xiàn)單線程并發(fā)IO操作。如果僅用在客戶端,發(fā)揮的威力不大。如果把asyncio用在服務(wù)器端,例如Web服務(wù)器,由于HTTP連接就是IO操作,因此可以用 單線程 +?coroutine?實現(xiàn)多用戶的高并發(fā)支持。
asyncio?實現(xiàn)了TCP、UDP、SSL等協(xié)議,aiohttp?則是基于?asyncio?實現(xiàn)的 HTTP 框架。
我們先安裝?aiohttp:pip install aiohttp
然后編寫一個HTTP服務(wù)器,分別處理以下URL:
- / - 首頁返回b'
Index
'; - /hello/{name} - 根據(jù)URL參數(shù)返回文本hello, %s!。
代碼如下:
import asynciofrom aiohttp import webasync def index(request):await asyncio.sleep(0.5)return web.Response(body=b'<h1>Index</h1>')async def hello(request):await asyncio.sleep(0.5)text = '<h1>hello, %s!</h1>' % request.match_info['name']return web.Response(body=text.encode('utf-8'))async def init(loop):app = web.Application(loop=loop)app.router.add_route('GET', '/', index)app.router.add_route('GET', '/hello/{name}', hello)srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000)print('Server started at http://127.0.0.1:8000...')return srvloop = asyncio.get_event_loop() loop.run_until_complete(init(loop)) loop.run_forever()注意?aiohttp的初始化函數(shù)init()也是一個coroutine,loop.create_server()則利用asyncio創(chuàng)建TCP服務(wù)。
參考源碼:aio_web.py :https://github.com/michaelliao/learn-python3/blob/master/samples/async/aio_web.py
?一切從爬蟲開始
【續(xù)篇】Python 協(xié)程之從放棄到死亡再到重生:https://www.secpulse.com/archives/64912.html
從一個簡單的爬蟲開始,這個爬蟲很簡單,訪問指定的URL,并且獲取內(nèi)容并計算長度,這里我們給定5個URL。第一版的代碼十分簡單,順序獲取每個URL的內(nèi)容,當(dāng)?shù)谝粋€請求完成、計算完長度后,再開始第二個請求。
spider_normal.py
# filename: spider_normal.py import time import requeststargets = ["https://lightless.me/archives/python-coroutine-from-start-to-boom.html","https://github.com/aio-libs","https://www.python.org/dev/peps/pep-0380/","https://www.baidu.com/","https://www.zhihu.com/", ]def spider():results = {}for url in targets:r = requests.get(url)length = len(r.content)results[url] = lengthreturn resultsdef show_results(results):for url, length in results.items():print("Length: {:^7d} URL: {}".format(length, url))def main():start_time = time.time()results = spider()print("Use time: {:.2f}s".format(time.time() - start_time))show_results(results)if __name__ == '__main__':main()我們多運行幾次看看結(jié)果。
大約需要花費14-16秒不等,這段代碼并沒有什么好看的,我們把關(guān)注點放在后面的代碼上。現(xiàn)在我們使用多線程來改寫這段代碼。
# filename: spider_thread.py import time import threading import requestsfinal_results = {}targets = ["https://lightless.me/archives/python-coroutine-from-start-to-boom.html","https://github.com/aio-libs","https://www.python.org/dev/peps/pep-0380/","https://www.baidu.com/","https://www.zhihu.com/", ]def show_results(results):for url, length in results.items():print("Length: {:^7d} URL: {}".format(length, url))def spider(url):r = requests.get(url)length = len(r.content)final_results[url] = lengthdef main():ts = []start_time = time.time()for url in targets:t = threading.Thread(target=spider, args=(url,))ts.append(t)t.start()for t in ts:t.join()print("Use time: {:.2f}s".format(time.time() - start_time))show_results(final_results)if __name__ == '__main__':main()我們多運行幾次看看結(jié)果。
從這兩段代碼中,已經(jīng)可以看出并發(fā)對于處理任務(wù)的好處了,但是使用原生的threading模塊還是略顯麻煩,Python已經(jīng)給我們內(nèi)置了一個處理并發(fā)任務(wù)的庫concurrent,我們借用這個庫修改一下我們的代碼,之所以修改成這個庫的原因還有一個,那就是引出我們后面會談到的Future。
# filename: spider_thread.py import time from concurrent import futures import requestsfinal_results = {}targets = ["https://lightless.me/archives/python-coroutine-from-start-to-boom.html","https://github.com/aio-libs","https://www.python.org/dev/peps/pep-0380/","https://www.baidu.com/","https://www.zhihu.com/", ]def show_results(results):for url, length in results.items():print("Length: {:^7d} URL: {}".format(length, url))def spider(url):r = requests.get(url)length = len(r.content)final_results[url] = lengthreturn Truedef main():start_time = time.time()with futures.ThreadPoolExecutor(10) as executor:res = executor.map(spider, targets)print("Use time: {:.2f}s".format(time.time() - start_time))show_results(final_results)if __name__ == '__main__':main()執(zhí)行一下,會發(fā)現(xiàn)耗時與上一個版本一樣,穩(wěn)定在10s左右。
可以看到我們調(diào)用了concurrent庫中的futures,那么到底什么是futures?簡單的講,這個對象代表一種異步的操作,可以表示為一個需要延時進行的操作,當(dāng)然這個操作的狀態(tài)可能已經(jīng)完成,也有可能尚未完成,如果你寫JS的話,可以理解為是類似Promise的對象。在Python中,標(biāo)準庫中其實有兩個Future類,一個是concurrent.futures.Future,另外一個是asyncio.Future,這兩個類很類似,不完全相同,這些實現(xiàn)差異以及API的差異我們先按下暫且不談,有興趣的同學(xué)可以參考下相關(guān)的文檔。Future是我們后面討論的asyncio異步編程的基礎(chǔ),因此這里多說兩句。
Future代表的是一個未來的某一個時刻一定會執(zhí)行的操作(可能已經(jīng)執(zhí)行完成了,但是無論如何他一定有一個確切的運行時間),一般情況下用戶無需手動從零開始創(chuàng)建一個Future,而是應(yīng)當(dāng)借助框架中的API生成。比如調(diào)用concurrent.futures.Executor.submit()時,框架會為"異步操作"進行一個排期,來決定何時運行這個操作,這時候就會生成一個Future對象。
現(xiàn)在,我們來看看如何使用asyncio進行異步編程,與多線程編程不同的是,多個協(xié)程總是運行在同一個線程中的,一旦其中的一個協(xié)程發(fā)生阻塞行為,那么整個線程都被阻塞,進而所有的協(xié)程都無法繼續(xù)運行。asyncio.Future和asyncio.Task都可以看做是一個異步操作,后者是前者的子類,BaseEventLoop.create_task()會接收一個協(xié)程作為參數(shù),并且對這個任務(wù)的運行時間進行排期,返回一個asyncio.Task類的實例,這個對象也是對于協(xié)程的一層包裝。如果想獲取asyncio.Future的執(zhí)行結(jié)果,應(yīng)當(dāng)使用yield from來獲取,這樣控制權(quán)會被自動交還給EventLoop,我們無需處理"等待Future或Task運行完成"這個操作。于是就有了一個很愉悅的編程方式,如果一個函數(shù)A是協(xié)程、或返回Task或Future的實例的函數(shù),就可以通過result = yield from A()來獲取返回值。下面我們就使用asyncio和aiohttp來改寫我們的爬蟲。
import asyncio import timeimport aiohttpfinal_results = {}targets = ["https://lightless.me/archives/python-coroutine-from-start-to-boom.html","https://github.com/aio-libs","https://www.python.org/dev/peps/pep-0380/","https://www.baidu.com/","https://www.zhihu.com/", ]def show_results(results):for url, length in results.items():print("Length: {:^7d} URL: {}".format(length, url))async def get_content(url):async with aiohttp.ClientSession() as session:async with session.get(url) as resp:content = await resp.read()return len(content)async def spider(url):length = await get_content(url)final_results[url] = lengthreturn Truedef main():loop = asyncio.get_event_loop()cor = [spider(url) for url in targets]start_time = time.time()result = loop.run_until_complete(asyncio.gather(*cor))print("Use time: {:.2f}s".format(time.time() - start_time))show_results(final_results)print("loop result: ", result)if __name__ == '__main__':main()結(jié)果非常驚人
這里可能有同學(xué)會問為什么沒看到y(tǒng)ield from以及@asyncio.coroutine,那是因為在Python3.5以后,增加了async def和awiat語法,等效于@asyncio.coroutine和yield from,詳情可以參考上一篇文章。在main()函數(shù)中,我們先獲取一個可用的事件循環(huán),緊接著將生成好的協(xié)程任務(wù)添加到這個循環(huán)中,并且等待執(zhí)行完成。在每個spider()中,執(zhí)行到await的時候,會交出控制權(quán)(如果不明白請向前看一下委托生成器的部分),并且切到其他的協(xié)程繼續(xù)運行,等到get_content()執(zhí)行完成返回后,那么會恢復(fù)spider()協(xié)程的執(zhí)行。get_content()函數(shù)中只是通過async with調(diào)用aiohttp庫的最基本方法獲取頁面內(nèi)容,并且返回了長度,僅此而已。
在修改為協(xié)程版本后,爬蟲性能有了巨大的提升,從最初了15s,到10s,再到現(xiàn)在的2s左右,簡直是質(zhì)的飛躍。這只是一個簡單的爬蟲程序,相比多線程,性能提高了近5倍,如果是其他更加復(fù)雜的大型程序,也許性能提升會更多。asyncio這套異步編程框架,通過簡單的事件循環(huán)以及協(xié)程機制,在需要等待的情況下主動交出控制權(quán),切換到其他協(xié)程進行運行。到這里就會有人問,為什么要將requests替換為aiohttp,能不能用requests?答案是不能,還是我們前面提到過的,在協(xié)程中,一切操作都要避免阻塞,禁止所有的阻塞型調(diào)用,因為所有的協(xié)程都是運行在同一個線程中的!requests庫是阻塞型的調(diào)用,當(dāng)在等待I/O時,并不能將控制權(quán)轉(zhuǎn)交給其他協(xié)程,甚至還會將當(dāng)前線程阻塞,其他的協(xié)程也無法運行。如果你在異步編程的時候需要用到一些其他的異步組件,可以到https://github.com/aio-libs/這里找找,也許就有你需要的異步庫。
關(guān)于asyncio的異步編程資料目前來說還不算很多,官方文檔應(yīng)該算是相當(dāng)不錯的參考文獻了,其中非常推薦的兩部分是:Develop with asyncio和Tasks and coroutines,各位同學(xué)有興趣的話可以自行閱讀。asyncio這個異步框架中包含了非常多的內(nèi)容,甚至還有TCP Server/Client的相關(guān)內(nèi)容,如果想要掌握asyncio這個異步編程框架,還需要多加練習(xí)。順帶一提,asyncio非常容易與其他的框架整合,例如tornado已經(jīng)有實現(xiàn)了asyncio.AbstractEventLoop的接口的類AsyncIOMainLoop,還有人將asyncio集成到QT的事件循環(huán)中了,可以說是非常的靈活了。
Python 協(xié)程總結(jié)
Python 之所以能夠處理網(wǎng)絡(luò) IO 高并發(fā),是因為借助了高效的IO模型,能夠最大限度的調(diào)度IO,然后事件循環(huán)使用協(xié)程處理IO,協(xié)程遇到IO操作就將控制權(quán)拋出,那么在IO準備好之前的這段事件,事件循環(huán)就可以使用其他的協(xié)程處理其他事情,然后協(xié)程在用戶空間,并且是單線程的,所以不會像多線程,多進程那樣頻繁的上下文切換,因而能夠節(jié)省大量的不必要性能損失。
注: 不要再協(xié)程里面使用time.sleep之類的同步操作,因為協(xié)程再單線程里面,所以會使得整個線程停下來等待,也就沒有協(xié)程的優(yōu)勢了
理解
協(xié)程,又稱為微線程,看上去像是子程序,但是它和子程序又不太一樣,它在執(zhí)行的過程中,可以在中斷當(dāng)前的子程序后去執(zhí)行別的子程序,再返回來執(zhí)行之前的子程序,但是它的相關(guān)信息還是之前的。
優(yōu)點:
如果要充分利用CPU多核,可以通過使用多進程+協(xié)程的方式
使用
打開 asyncio 的源代碼,可以發(fā)現(xiàn)asyncio中的需要用到的文件如下:
下面的則是接下來要總結(jié)的文件
| base_events | 基礎(chǔ)的事件,提供了BaseEventLoop事件 |
| coroutines | 提供了封裝成協(xié)程的類 |
| events | 提供了事件的抽象類,比如BaseEventLoop繼承了AbstractEventLoop |
| futures | 提供了Future類 |
| tasks | 提供了Task類和相關(guān)的方法 |
coroutines
| coroutine(func) | 為函數(shù)加上裝飾器 |
| iscoroutinefunction(func) | 判斷函數(shù)是否使用了裝飾器 |
| iscoroutine(obj) | 判斷該對象是否是裝飾器 |
如果在函數(shù)使用了coroutine裝飾器,就可以通過yield from去調(diào)用async def聲明的函數(shù),如果已經(jīng)使用async def聲明,就沒有必要再使用裝飾器了,這兩個功能是一樣的。
import asyncio@asyncio.coroutine def hello_world():print("Hello World!")async def hello_world2():print("Hello World2!")print('------hello_world------') print(asyncio.iscoroutinefunction(hello_world))print('------hello_world2------') print(asyncio.iscoroutinefunction(hello_world2))print('------event loop------') loop = asyncio.get_event_loop()# 一直阻塞該函數(shù)調(diào)用到函數(shù)返回 loop.run_until_complete(hello_world()) loop.run_until_complete(hello_world2()) loop.close()上面的代碼分別使用到了coroutine裝飾器和async def,其運行結(jié)果如下:
------hello_world------ True ------hello_world2------ True ------event loop------ Hello World! Hello World2!注意:不可以直接調(diào)用協(xié)程,需要一個event loop去調(diào)用。
如果想要在一個函數(shù)中去得到另外一個函數(shù)的結(jié)果,可以使用yield from或者await,例子如下:
import asyncioasync def compute(x, y):print("Compute %s + %s ..." % (x, y))await asyncio.sleep(1.0)return x + yasync def print_sum(x, y):result = await compute(x, y)print("%s + %s = %s" % (x, y, result))loop = asyncio.get_event_loop() loop.run_until_complete(print_sum(1, 2)) loop.close()函數(shù) print_sum 會一直等到函數(shù) compute 返回結(jié)果,執(zhí)行過程如下:
base_events
這個文件里面漏出來的只有BaseEventLoop一個類,它的相關(guān)方法如下:
| create_future() | 創(chuàng)建一個future對象并且綁定到事件上 |
| create_task() | 創(chuàng)建一個任務(wù) |
| run_forever() | 除非調(diào)用stop,否則事件會一直運行下去 |
| run_until_complete(future) | 直到 future 對象執(zhí)行完畢,事件才停止 |
| stop() | 停止事件 |
| close() | 關(guān)閉事件 |
| is_closed() | 判斷事件是否關(guān)閉 |
| time() | 返回事件運行時的時間 |
| call_later(delay, callback, *args) | 設(shè)置一個回調(diào)函數(shù),并且可以設(shè)置延遲的時間 |
| call_at(when, callback, *args) | 同上,但是設(shè)置的是絕對時間 |
| call_soon(callback, *args) | 馬上調(diào)用 |
events
| get_event_loop() | 返回一個異步的事件 |
| ... | ... |
返回的就是BaseEventLoop的對象。
future
Future類的相關(guān)方法如下:
| cancel() | 取消掉future對象 |
| cancelled() | 返回是否已經(jīng)取消掉 |
| done() | 如果future已經(jīng)完成則返回true |
| result() | 返回future執(zhí)行的結(jié)果 |
| exception() | 返回在future中設(shè)置了的exception |
| add_done_callback(fn) | 當(dāng)future執(zhí)行時執(zhí)行回調(diào)函數(shù) |
| remove_done_callback(fn) | 刪除future的所有回調(diào)函數(shù) |
| set_result(result) | 設(shè)置future的結(jié)果 |
| set_exception(exception) | 設(shè)置future的異常 |
設(shè)置 future 的例子如下:
import asyncioasync def slow_operation(future):await asyncio.sleep(1) # 睡眠future.set_result('Future is done!') # future設(shè)置結(jié)果loop = asyncio.get_event_loop() future = asyncio.Future() # 創(chuàng)建future對象 asyncio.ensure_future(slow_operation(future)) # 創(chuàng)建任務(wù) loop.run_until_complete(future) # 阻塞直到future執(zhí)行完才停止事件 print(future.result()) loop.close()run_until_complete方法在內(nèi)部通過調(diào)用了future的add_done_callback,當(dāng)執(zhí)行future完畢的時候,就會通知事件。
下面這個例子則是通過使用future的add_done_callback方法實現(xiàn)和上面例子一樣的效果:
import asyncioasync def slow_operation(future):await asyncio.sleep(1)future.set_result('Future is done!')def got_result(future):print(future.result())loop.stop() # 關(guān)閉事件loop = asyncio.get_event_loop() future = asyncio.Future() asyncio.ensure_future(slow_operation(future)) future.add_done_callback(got_result) # future執(zhí)行完畢就執(zhí)行該回調(diào) try:loop.run_forever() finally:loop.close()一旦slow_operation函數(shù)執(zhí)行完畢的時候,就會去執(zhí)行g(shù)ot_result函數(shù),里面則調(diào)用了關(guān)閉事件,所以不用擔(dān)心事件會一直執(zhí)行。
task
Task類是Future的一個子類,也就是Future中的方法,task都可以使用,類方法如下:
| current_task(loop=None) | 返回指定事件中的任務(wù),如果沒有指定,則默認當(dāng)前事件 |
| all_tasks(loop=None) | 返回指定事件中的所有任務(wù) |
| cancel() | 取消任務(wù) |
并行執(zhí)行三個任務(wù)的例子:
import asyncioasync def factorial(name, number):f = 1for i in range(2, number + 1):print("Task %s: Compute factorial(%s)..." % (name, i))await asyncio.sleep(1)f *= iprint("Task %s: factorial(%s) = %s" % (name, number, f))loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.gather(factorial("A", 2),factorial("B", 3),factorial("C", 4), )) loop.close()執(zhí)行結(jié)果為
Task A: Compute factorial(2)...Task B: Compute factorial(2)...Task C: Compute factorial(2)...Task A: factorial(2) = 2Task B: Compute factorial(3)...Task C: Compute factorial(3)...Task B: factorial(3) = 6Task C: Compute factorial(4)...Task C: factorial(4) = 24
可以發(fā)現(xiàn),ABC同時執(zhí)行,直到future執(zhí)行完畢才退出。
下面一些方法是和task相關(guān)的方法
| as_completed(fs, *, loop=None, timeout=None) | 返回是協(xié)程的迭代器 |
| ensure_future(coro_or_future, *, loop=None) | 調(diào)度執(zhí)行一個 coroutine object:并且它封裝成future。返回任務(wù)對象 |
| async(coro_or_future, *, loop=None) | 丟棄的方法,推薦使用ensure_future |
| wrap_future(future, *, loop=None) | Wrap a concurrent.futures.Future object in a Future object. |
| gather(*coros_or_futures, loop=None, return_exceptions=False) | 從給定的協(xié)程或者future對象數(shù)組中返回future匯總的結(jié)果 |
| sleep(delay, result=None, *, loop=None) | 創(chuàng)建一個在給定時間(以秒為單位)后完成的協(xié)程 |
| shield(arg, *, loop=None) | 等待future,屏蔽future被取消 |
| wait(futures, *, loop=None, timeout=None, return_when=ALL_COMPLETED) | 等待由序列futures給出的Futures和協(xié)程對象完成。協(xié)程將被包裹在任務(wù)中。返回含兩個集合的Future:(done,pending) |
| wait_for(fut, timeout, *, loop=None) | 等待單個Future或coroutine object完成超時。如果超時為None,則阻止直到future完成 |
總結(jié)
以上是生活随笔為你收集整理的Python 异步 IO 、协程、asyncio、async/await、aiohttp的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 小甲鱼 OllyDbg 教程系列 (十三
- 下一篇: 开源 Python网络爬虫框架 Scra