日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

Python 异步 IO 、协程、asyncio、async/await、aiohttp

發布時間:2024/7/23 python 38 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Python 异步 IO 、协程、asyncio、async/await、aiohttp 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.


From :廖雪峰?異步IO :https://www.liaoxuefeng.com/wiki/1016959663602400/1017959540289152

Python Async/Await入門指南 :https://zhuanlan.zhihu.com/p/27258289

Python 生成器 和 yield 關鍵字:https://blog.csdn.net/freeking101/article/details/51126293

協程與任務?官網文檔https://docs.python.org/zh-cn/3/library/asyncio-task.html

Python中異步協程的使用方法介紹https://blog.csdn.net/freeking101/article/details/88119858

python 協程詳解及I/O多路復用,I/O異步:https://blog.csdn.net/u014028063/article/details/81408395

Python協程深入理解:https://www.cnblogs.com/zhaof/p/7631851.html

asyncio 進階:Python黑魔法 --- 異步IO( asyncio) 協程:https://www.cnblogs.com/dhcn/p/9033628.html

談談Python協程技術的演進:https://www.freebuf.com/company-information/153421.html

最后推薦一下《流暢的Python》,這本書中 第16章 協程的部分介紹的非常詳細
《流暢的Python》pdf 下載地址:https://download.csdn.net/download/freeking101/10993120

gevent 是 python 的一個并發框架,以微線程 greenlet 為核心,使用了 epoll 事件監聽機制以及諸多其他優化而變得高效。

aiohttp 使用代理 ip 訪問 https 網站報錯的問題https://blog.csdn.net/qq_43210211/article/details/108379917

Python:使用 Future、asyncio 處理并發

:https://blog.csdn.net/sinat_38682860/article/details/105419842

異步? IO

在 IO 編程(?廖雪峰 Python IO 編程 :https://www.liaoxuefeng.com/wiki/1016959663602400/1017606916795776)?一節中,我們已經知道,CPU的速度遠遠快于磁盤、網絡等IO。在一個線程中,CPU執行代碼的速度極快,然而,一旦遇到IO操作,如讀寫文件、發送網絡數據時,就需要等待IO操作完成,才能繼續進行下一步操作。這種情況稱為同步IO。

在IO操作的過程中,當前線程被掛起,而其他需要CPU執行的代碼就無法被當前線程執行了。

因為一個 IO 操作就阻塞了當前線程,導致其他代碼無法執行,所以我們必須使用多線程或者多進程來并發執行代碼,為多個用戶服務。每個用戶都會分配一個線程,如果遇到IO導致線程被掛起,其他用戶的線程不受影響。

多線程和多進程的模型雖然解決了并發問題,但是系統不能無上限地增加線程。由于系統切換線程的開銷也很大,所以,一旦線程數量過多,CPU的時間就花在線程切換上了,真正運行代碼的時間就少了,結果導致性能嚴重下降。

由于我們要解決的問題是CPU高速執行能力和IO設備的龜速嚴重不匹配,多線程和多進程只是解決這一問題的一種方法。

另一種解決IO問題的方法是異步IO。當代碼需要執行一個耗時的IO操作時,它只發出IO指令,并不等待IO結果,然后就去執行其他代碼了。一段時間后,當IO返回結果時,再通知CPU進行處理。

消息模型 其實早在應用在桌面應用程序中了。一個 GUI 程序的主線程就負責不停地讀取消息并處理消息。所有的鍵盤、鼠標等消息都被發送到GUI程序的消息隊列中,然后由GUI程序的主線程處理。

由于GUI 線程處理鍵盤、鼠標等消息的速度非常快,所以用戶感覺不到延遲。某些時候,GUI線程在一個消息處理的過程中遇到問題導致一次消息處理時間過長,此時,用戶會感覺到整個GUI程序停止響應了,敲鍵盤、點鼠標都沒有反應。這種情況說明在消息模型中,處理一個消息必須非常迅速,否則,主線程將無法及時處理消息隊列中的其他消息,導致程序看上去停止響應。

消息模型 是 如何解決 同步IO 必須等待IO操作這一問題的呢 ?

在消息處理過程中,當遇到 IO 操作時,代碼只負責發出IO請求,不等待IO結果,然后直接結束本輪消息處理,進入下一輪消息處理過程。當IO操作完成后,將收到一條“IO完成”的消息,處理該消息時就可以直接獲取IO操作結果。

在 “發出IO請求” 到收到 “IO完成” 的這段時間里,同步IO模型下,主線程只能掛起,但異步IO模型下,主線程并沒有休息,而是在消息循環中繼續處理其他消息。這樣,在異步IO模型下,一個線程就可以同時處理多個IO請求,并且沒有切換線程的操作。對于大多數IO密集型的應用程序,使用異步IO將大大提升系統的多任務處理能力。

協程 (Coroutines)

在學習異步IO模型前,我們先來了解協程,協程 又稱 微線程,纖程,英文名 Coroutine

子程序(?又叫?函數 ) 協程

  • 子程序?在 所有語言中都是層級調用。比如: A 調用 B,B 在執行過程中又調用了 C,C 執行完畢返回,B 執行完畢返回,最后是 A 執行完畢。所以 子程序 即 函數 的調用是通過棧實現的,一個線程就是執行一個子程序。子程序調用總是一個入口,一次返回,調用順序是明確的。
  • 協程的調用 和 子程序 不同。協程 看上去也是 子程序,但執行過程中,在子程序內部可中斷,然后轉而執行別的子程序,在適當的時候再返回來接著執行。

注意,在一個 子程序中中斷,去執行其他子程序,不是函數調用,有點類似CPU的中斷。比如:子程序 A 和 B :

def A():print('1')print('2')print('3')def B():print('x')print('y')print('z')

假設由協程執行,在執行 A 的過程中,可以隨時中斷,去執行 B,B 也可能在執行過程中中斷再去執行 A,結果可能是:

1 2 x y 3 z

但是在 A 中是沒有調用 B 的,所以 協程的調用 比 函數調用 理解起來要難一些。

看起來 A、B 的執行有點像多線程,但 協程 的特點在于是一個線程執行。

協程 和 多線程比,協程有何優勢?

  • 1. 最大的優勢就是協程極高的執行效率。因為 子程序 切換不是線程切換,而是由程序自身控制,因此,沒有線程切換的開銷,和多線程比,線程數量越多,協程的性能優勢就越明顯。
  • 2. 第二大優勢就是不需要多線程的鎖機制,因為只有一個線程,也不存在同時寫變量沖突,在協程中控制共享資源不加鎖,只需要判斷狀態就好了,所以執行效率比多線程高很多。

因為協程是一個線程執行,那怎么利用多核CPU呢?

最簡單的方法是 多進程 + 協程,既充分利用多核,又充分發揮協程的高效率,可獲得極高的性能。

Python 對 協程 的支持 是通過 generator (生成器)實現的

在 generator 中,我們不但可以通過 for 循環來迭代,還可以不斷調用 next() 函數獲取由 yield 語句返回的下一個值。

但是 Python 的 yield 不但可以返回一個值,它還可以接收調用者發出的參數。

來看例子:

傳統的 生產者-消費者 模型是一個線程寫消息,一個線程取消息,通過鎖機制控制隊列和等待,但一不小心就可能死鎖。如果改用協程,生產者生產消息后,直接通過 yield 跳轉到消費者開始執行,待消費者執行完畢后,切換回生產者繼續生產,效率極高:

#!/usr/bin/python3 # -*- coding: utf-8 -*- # @Author : # @File : text.py # @Software : PyCharm # @description : XXXdef consumer():r = ''while True:n = yield rif not n:returnprint('[CONSUMER] Consuming %s...' % n)r = '200 OK'def produce(c):c.send(None)n = 0while n < 5:n = n + 1print('[PRODUCER] Producing %s...' % n)r = c.send(n)print('[PRODUCER] Consumer return: %s' % r)c.close()c = consumer() produce(c)

執行結果:

[PRODUCER] Producing 1... [CONSUMER] Consuming 1... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 2... [CONSUMER] Consuming 2... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 3... [CONSUMER] Consuming 3... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 4... [CONSUMER] Consuming 4... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 5... [CONSUMER] Consuming 5... [PRODUCER] Consumer return: 200 OK

注意到 consumer函數 是一個 generator,把一個 consumer 傳入 produce 后:

  • 首先調用 c.send(None) 啟動生成器;
  • 然后,一旦生產了東西,通過 c.send(n) 切換到 consumer 執行;
  • consumer 通過 yield拿到消息,處理,又通過yield把結果傳回;

  • produce 拿到 consumer 處理的結果,繼續生產下一條消息;

  • produce 決定不生產了,通過 c.close() 關閉 consumer,整個過程結束。

  • 整個流程無鎖,由一個線程執行,produce?和?consumer?協作完成任務,所以稱為 “協程”,而非線程的搶占式多任務。

    最后套用 Donald Knuth 的一句話總結協程的特點:“子程序就是協程的一種特例。”

    參考源碼:https://github.com/michaelliao/learn-python3/blob/master/samples/async/coroutine.py

    在 Python 中,異步函數? 通常 被稱作? 協程

    創建一個協程僅僅只需使用 async 關鍵字,或者使用 @asyncio.coroutine 裝飾器。下面的任一代碼,都可以作為協程工作,形式上也是等同的:

    import asyncio# 方式 1 async def ping_server(ip):pass# 方式 2 @asyncio.coroutine def load_file(path):pass

    上面這兩個?特殊的函數,在調用時會返回協程對象。熟悉 JavaScript 中 Promise 的同學,可以把這個返回對象當作跟 Promise 差不多。調用他們中的任意一個,實際上并未立即運行,而是返回一個協程對象,然后將其傳遞到 Eventloop 中,之后再執行。

    • 如何判斷一個?函數是不是協程??? ?asyncio 提供了 asyncio.iscoroutinefunction(func) 方法。
    • 如何判斷一個 函數返回的是不是協程對象 ?? 可以使用 asyncio.iscoroutine(obj) 。

    用 asyncio 提供的 @asyncio.coroutine 可以把一個 generator 標記為 coroutine 類型,然后在 coroutine 內部用 yield from 調用另一個 coroutine 實現異步操作。

    Python 3.5 開始引入了新的語法 async await

    為了簡化并更好地標識異步 IO,從 Python 3.5 開始引入了新的語法 async await,可以讓 coroutine 的代碼更簡潔易讀。

    ?async?/ await 是 python3.5 的新語法,需使用 Python3.5 版本 或 以上才能正確運行。

    注意:async?和?await?是針對 coroutine 的新語法,要使用新的語法,只需要做兩步簡單的替換:

    • 把?@asyncio.coroutine?替換為?async?
    • 把?yield from?替換為?await

    ?Python 3.5 以前 版本原來老的語法使用 協程

    import asyncio@asyncio.coroutine def hello():print("Hello world!")r = yield from asyncio.sleep(1)print("Hello again!")

    Python 3.5 以后 用新語法重新編寫如下:

    import asyncioasync def hello():print("Hello world!")r = await asyncio.sleep(1)print("Hello again!")

    在過去幾年內,異步編程由于某些好的原因得到了充分的重視。雖然它比線性編程難一點,但是效率相對來說也是更高。

    比如,利用 Python 的 異步協程 (async coroutine) ,在提交 HTTP 請求后,就沒必要等待請求完成再進一步操作,而是可以一邊等著請求完成,一邊做著其他工作。這可能在邏輯上需要多些思考來保證程序正確運行,但是好處是可以利用更少的資源做更多的事。

    即便邏輯上需要多些思考,但實際上在 Python 語言中,異步編程的語法和執行并不難。跟 Javascript 不一樣,現在 Python 的異步協程已經執行得相當好了。

    對于服務端編程,異步性似乎是 Node.js 流行的一大原因。我們寫的很多代碼,特別是那些諸如網站之類的高 I/O 應用,都依賴于外部資源。這可以是任何資源,包括從遠程數據庫調用到 POST 一個 REST 請求。一旦你請求這些資源的任一一個,你的代碼在等待資源響應時便無事可做 (譯者注:如果沒有異步編程的話)。

    有了異步編程,在等待這些資源響應的過程中,你的代碼便可以去處理其他的任務。

    Python async / await 手冊

    Python?部落:Python async/await 手冊:https://python.freelycode.com/contribution/detail/57

    知乎:從 0 到 1,Python 異步編程的演進之路(?通過爬蟲演示進化之路?)https://zhuanlan.zhihu.com/p/25228075

    async / await 的使用

    async 用來聲明一個函數是協程然后使用 await 調用這個協程, await 必須在函數內部,這個函數通常也被聲明為另一個協程await 的目的是等待協程控制流的返回yield 的目的 是 暫停并掛起函數的操作。

    正常的函數在執行時是不會中斷的,所以你要寫一個能夠中斷的函數,就需要添加 async 關鍵。

    • async 用來聲明一個函數為異步函數異步函數的特點是能在函數執行過程中掛起,去執行其他異步函數,等到掛起條件(假設掛起條件是sleep(5))消失后,也就是5秒到了再回來執行。
    • await 可以將耗時等待的操作掛起,讓出控制權(?await 語法來掛起自身的協程比如:異步程序執行到某一步時需要等待的時間很長,就將此掛起,去執行其他的異步程序。await 后面只能跟 異步程序 或 有 __await__ 屬性 的 對象因為異步程序與一般程序不同

    假設有兩個異步函數 async a,async b,a 中的某一步有 await,當程序碰到關鍵字 await b() 后,異步程序掛起后去執行另一個異步b程序,就是從函數內部跳出去執行其他函數,當掛起條件消失后,不管b是否執行完,要馬上從b程序中跳出來,回到原程序執行原來的操作。如果 await 后面跟的 b 函數不是異步函數,那么操作就只能等 b 執行完再返回,無法在 b 執行的過程中返回。如果要在 b 執行完才返回,也就不需要用 await 關鍵字了,直接調用 b 函數就行。所以這就需要 await 后面跟的是 異步函數了。在一個異步函數中,可以不止一次掛起,也就是可以用多個 await 。

    看下 Python 中常見的幾種函數形式:

    # 1. 普通函數 def function():return 1# 2. 生成器函數 def generator():yield 1# 在3.5過后,我們可以使用async修飾將普通函數和生成器函數包裝成異步函數和異步生成器。# 3. 異步函數(協程) async def async_function():return 1# 4. 異步生成器 async def async_generator():yield 1

    通過類型判斷可以驗證函數的類型

    import types# 1. 普通函數 def function():return 1# 2. 生成器函數 def generator():yield 1# 在3.5過后,我們可以使用async修飾將普通函數和生成器函數包裝成異步函數和異步生成器。# 3. 異步函數(協程) async def async_function():return 1# 4. 異步生成器 async def async_generator():yield 1print(type(function) is types.FunctionType) print(type(generator()) is types.GeneratorType) print(type(async_function()) is types.CoroutineType) print(type(async_generator()) is types.AsyncGeneratorType)

    直接調用異步函數不會返回結果,而是返回一個coroutine對象:

    print(async_function()) # <coroutine object async_function at 0x102ff67d8>

    協程 需要通過其他方式來驅動,因此可以使用這個協程對象的 send 方法給協程發送一個值:

    print(async_function().send(None))

    不幸的是,如果通過上面的調用會拋出一個異常:StopIteration: 1

    因為 生成器 / 協程 在正常返回退出時會拋出一個 StopIteration 異常,而原來的返回值會存放在 StopIteration 對象的 value 屬性中,通過以下捕獲可以獲取協程真正的返回值:?

    try:async_function().send(None) except StopIteration as e:print(e.value) # 1

    通過上面的方式來新建一個 run 函數來驅動協程函數,在協程函數中,可以通過 await 語法來掛起自身的協程,并等待另一個 協程 完成直到返回結果:

    def run(coroutine):try:coroutine.send(None)except StopIteration as e:return 'run() : return {0}'.format(e.value)async def async_function():return 1async def await_coroutine():result = await async_function()print('await_coroutine() : print {0} '.format(result))ret_val = run(await_coroutine()) print(ret_val)

    要注意的是,await 語法只能出現在通過 async 修飾的函數中,否則會報 SyntaxError 錯誤。

    而且 await 后面的對象需要是一個 Awaitable,或者實現了相關的協議。

    查看 Awaitable 抽象類的代碼,表明了只要一個類實現了__await__方法,那么通過它構造出來的實例就是一個 Awaitable:

    class Awaitable(metaclass=ABCMeta):__slots__ = ()@abstractmethoddef __await__(self):yield@classmethoddef __subclasshook__(cls, C):if cls is Awaitable:return _check_methods(C, "__await__")return NotImplemented

    而且可以看到,Coroutine類 也繼承了 Awaitable,而且實現了 send,throw 和 close 方法。所以 await 一個調用異步函數返回的協程對象是合法的。

    class Coroutine(Awaitable):__slots__ = ()@abstractmethoddef send(self, value):...@abstractmethoddef throw(self, typ, val=None, tb=None):...def close(self):...@classmethoddef __subclasshook__(cls, C):if cls is Coroutine:return _check_methods(C, '__await__', 'send', 'throw', 'close')return NotImplemented

    接下來是異步生成器,來看一個例子:

    假如我要到一家超市去購買土豆,而超市貨架上的土豆數量是有限的:

    class Potato:@classmethoddef make(cls, num, *args, **kws):potatos = []for i in range(num):potatos.append(cls.__new__(cls, *args, **kws))return potatosall_potatos = Potato.make(5)

    現在我想要買50個土豆,每次從貨架上拿走一個土豆放到籃子:

    def take_potatos(num):count = 0while True:if len(all_potatos) == 0:sleep(.1)else:potato = all_potatos.pop()yield potatocount += 1if count == num:breakdef buy_potatos():bucket = []for p in take_potatos(50):bucket.append(p)

    對應到代碼中,就是迭代一個生成器的模型,顯然,當貨架上的土豆不夠的時候,這時只能夠死等,而且在上面例子中等多長時間都不會有結果(因為一切都是同步的),也許可以用多進程和多線程解決,而在現實生活中,更應該像是這樣的:

    import asyncio import randomclass Potato:@classmethoddef make(cls, num, *args, **kws):potatos = []for i in range(num):potatos.append(cls.__new__(cls, *args, **kws))return potatosall_potatos = Potato.make(5)async def take_potatos(num):count = 0while True:if len(all_potatos) == 0:await ask_for_potato()potato = all_potatos.pop()yield potatocount += 1if count == num:breakasync def ask_for_potato():await asyncio.sleep(random.random())all_potatos.extend(Potato.make(random.randint(1, 10)))async def buy_potatos():bucket = []async for p in take_potatos(50):bucket.append(p)print(f'Got potato {id(p)}...')def main():loop = asyncio.get_event_loop()res = loop.run_until_complete(buy_potatos())loop.close()if __name__ == '__main__':main()

    當貨架上的土豆沒有了之后,可以詢問超市請求需要更多的土豆,這時候需要等待一段時間直到生產者完成生產的過程。

    當生產者完成和返回之后,這是便能從 await 掛起的地方繼續往下跑,完成消費的過程。而這整一個過程,就是一個異步生成器迭代的流程。

    用 asyncio 運行這段代碼,結果是這樣的:

    Got potato 4338641384... Got potato 4338641160... Got potato 4338614736... Got potato 4338614680... Got potato 4338614568... Got potato 4344861864... Got potato 4344843456... Got potato 4344843400... Got potato 4338641384... Got potato 4338641160... ...

    既然是異步的,在請求之后不一定要死等,而是可以做其他事情。比如除了土豆,我還想買番茄,這時只需要在事件循環中再添加一個過程:

    def main():import asyncioloop = asyncio.get_event_loop()res = loop.run_until_complete(asyncio.wait([buy_potatos(), buy_tomatos()]))loop.close()

    再來運行這段代碼:

    Got potato 4423119312... Got tomato 4423119368... Got potato 4429291024... Got potato 4421640768... Got tomato 4429331704... Got tomato 4429331760... Got tomato 4423119368... Got potato 4429331760... Got potato 4429331704... Got potato 4429346688... Got potato 4429346072... Got tomato 4429347360... ...

    看下 AsyncGenerator 的定義,它需要實現 __aiter__ 和 __anext__ 兩個核心方法,以及 asend,athrow,aclose 方法。

    class AsyncGenerator(AsyncIterator):__slots__ = ()async def __anext__(self):...@abstractmethodasync def asend(self, value):...@abstractmethodasync def athrow(self, typ, val=None, tb=None):...async def aclose(self):...@classmethoddef __subclasshook__(cls, C):if cls is AsyncGenerator:return _check_methods(C, '__aiter__', '__anext__','asend', 'athrow', 'aclose')return NotImplemented

    異步生成器是在 3.6 之后才有的特性,同樣的還有異步推導表達式,因此在上面的例子中,也可以寫成這樣:

    bucket = [p async for p in take_potatos(50)]

    類似的,還有 await 表達式:

    result = [await fun() for fun in funcs if await condition()]

    除了函數之外,類實例的普通方法也能用 async 語法修飾:

    class ThreeTwoOne:async def begin(self):print(3)await asyncio.sleep(1)print(2)await asyncio.sleep(1)print(1) await asyncio.sleep(1)returnasync def game():t = ThreeTwoOne()await t.begin()print('start')

    實例方法的調用同樣是返回一個 coroutine:

    function = ThreeTwoOne.begin method = function.__get__(ThreeTwoOne, ThreeTwoOne()) import inspect assert inspect.isfunction(function) assert inspect.ismethod(method) assert inspect.iscoroutine(method())

    同理 還有類方法:

    class ThreeTwoOne:@classmethodasync def begin(cls):print(3)await asyncio.sleep(1)print(2)await asyncio.sleep(1)print(1) await asyncio.sleep(1)returnasync def game():await ThreeTwoOne.begin()print('start')

    根據PEP 492中,async 也可以應用到 上下文管理器中,__aenter__ 和 __aexit__ 需要返回一個 Awaitable:

    class GameContext:async def __aenter__(self):print('game loading...')await asyncio.sleep(1)async def __aexit__(self, exc_type, exc, tb):print('game exit...')await asyncio.sleep(1)async def game():async with GameContext():print('game start...')await asyncio.sleep(2)

    在3.7版本,contextlib 中會新增一個 asynccontextmanager 裝飾器來包裝一個實現異步協議的上下文管理器:

    from contextlib import asynccontextmanager@asynccontextmanager async def get_connection():conn = await acquire_db_connection()try:yieldfinally:await release_db_connection(conn)

    async 修飾符也能用在 __call__ 方法上:

    class GameContext:async def __aenter__(self):self._started = time()print('game loading...')await asyncio.sleep(1)return selfasync def __aexit__(self, exc_type, exc, tb):print('game exit...')await asyncio.sleep(1)async def __call__(self, *args, **kws):if args[0] == 'time':return time() - self._startedasync def game():async with GameContext() as ctx:print('game start...')await asyncio.sleep(2)print('game time: ', await ctx('time'))

    asyncio

    asyncio?是 Python 3.4 版本引入的標準庫,直接內置了對 異步 IO 的支持。

    asyncio 官方只實現了比較底層的協議,比如TCP,UDP。所以諸如 HTTP 協議之類都需要借助第三方庫,比如 aiohttp

    雖然異步編程的生態不夠同步編程的生態那么強大,但是如果有高并發的需求不妨試試,下面說一下比較成熟的異步庫

    aiohttp:異步 http client/server框架。github地址: https://github.com/aio-libs/aiohttp
    sanic:速度更快的類 flask web框架。github地址:https://github.com/channelcat/sanic
    uvloop 快速,內嵌于 asyncio 事件循環的庫,使用 cython 基于 libuv 實現。github地址: https://github.com/MagicStack/uvloop

    asyncio?的編程模型就是一個 消息循環我們從?asyncio?模塊中直接獲取一個?EventLoop?的引用,然后把需要執行的協程扔到?EventLoop?中執行,就實現了 異步IO

    python 用asyncio?模塊實現異步編程,該模塊最大特點就是,只存在一個線程

    由于只有一個線程,就不可能多個任務同時運行。asyncio 是 "多任務合作" 模式(cooperative multitasking),允許異步任務交出執行權給其他任務,等到其他任務完成,再收回執行權繼續往下執行

    asyncio 模塊在單線程上啟動一個事件循環(event loop),時刻監聽新進入循環的事件,加以處理,并不斷重復這個過程,直到異步任務結束。

    什么是事件循環?

    單線程就意味著所有的任務需要在單線程上排隊執行,也就是前一個任務沒有執行完成,后一個任務就沒有辦法執行。在CPU密集型的任務之中,這樣其實還行,但是如果我們的任務都是IO密集型的呢?也就是我們大部分的任務都是在等待網絡的數據返回,等待磁盤文件的數據,這就會造成CPU一直在等待這些任務的完成再去執行下一個任務。

    有沒有什么辦法能夠讓單線程的任務執行不這么笨呢?其實我們可以將這些需要等待IO設備的任務掛在一邊嘛!這時候,如果我們的任務都是需要等待的任務,那么單線程在執行時遇到一個就把它掛起來,這里可以通過一個數據結構(例如隊列)將這些處于執行等待狀態的任務放進去,為什么是執行等待狀態呢?因為它們正在執行但是又不得不等待例如網絡數據的返回等等。直到將所有的任務都放進去之后,單線程就可以開始它的接連不斷的表演了:有沒有任務完成的小伙伴呀!快來我這里執行!

    此時如果有某個任務完成了,它會得到結果,于是發出一個信號:我完成了。那邊還在循環追問的單線程終于得到了答復,就會去看看這個任務有沒有綁定什么回調函數呀?如果綁定了回調函數就進去把回調函數給執行了,如果沒有,就將它所在的任務恢復執行,并將結果返回。

    asyncio 就是一個 協程庫

    • (1)事件循環 (event loop)。事件循環需要實現兩個功能,一是順序執行協程代碼;二是完成協程的調度,即一個協程“暫停”時,決定接下來執行哪個協程。
    • (2)協程上下文的切換。基本上Python 生成器的 yeild 已經能完成切換,Python3中還有特定語法支持協程切換。

    Python 的異步IO:API

    官方文檔:https://docs.python.org/zh-cn/3/library/asyncio.html

    Python 的 asyncio 是使用 async/await 語法編寫并發代碼的標準庫。Python3.7 這個版本,asyncio又做了比較大的調整,把這個庫的 API 分為了 高層級API低層級API,并引入asyncio.run() 這樣的高級方法,讓編寫異步程序更加簡潔。

    這里先從全局認識 Python 這個異步IO庫。

    asyncio 的 高層級 API 主要提高如下幾個方面:

    • 并發地運行Python協程并完全控制其執行過程;
    • 執行網絡IO和IPC;
    • 控制子進程;
    • 通過隊列實現分布式任務;
    • 同步并發代碼。

    asyncio 的 低層級API 用以支持開發異步庫和框架:

    • 創建和管理事件循環(event loop),提供異步的API用于網絡,運行子進程,處理操作系統信號等;
    • 通過 transports 實現高效率協議;
    • 通過 async/await? 語法橋架基于回調的庫和代碼。

    asyncio 高級 API

    高層級API讓我們更方便的編寫基于asyncio的應用程序。這些API包括:

    (1)協程和任務

    協程通過 async/await 語法進行聲明,是編寫異步應用的推薦方式。歷史的?@asyncio.coroutine?和?yield from?已經被棄用,并計劃在Python 3.10中移除。協程可以通過?asyncio.run(coro, *, debug=False)?函數運行,該函數負責管理事件循環并完結異步生成器。它應該被用作asyncio程序的主入口點,相當于main函數,應該只被調用一次。

    任務被用于并發調度協程,可用于網絡爬蟲的并發。使用?asyncio.create_task()?就可以把一個協程打包為一個任務,該協程會自動安排為很快運行。

    協程,任務和Future都是可等待對象。其中,Future是低層級的可等待對象,表示一個異步操作的最終結果。

    (2)流

    流是用于網絡連接的高層級的使用 async/await的原語。流允許在不使用回調或低層級協議和傳輸的情況下發送和接收數據。異步讀寫TCP有客戶端函數?asyncio.open_connection()?和 服務端函數?asyncio.start_server()?。它還支持 Unix Sockets:?asyncio.open_unix_connection()?和?asyncio.start_unix_server()。

    (3)同步原語

    asyncio同步原語的設計類似于threading模塊的原語,有兩個重要的注意事項:
    asyncio原語不是線程安全的,因此它們不應該用于OS線程同步(而是用threading)
    這些同步原語的方法不接受超時參數; 使用asyncio.wait_for()函數執行超時操作。
    asyncio具有以下基本同步原語:

    • Lock
    • Event
    • Condition
    • Semaphore
    • BoundedSemaphore

    (4)子進程

    asyncio提供了通過 async/await 創建和管理子進程的API。不同于Python標準庫的subprocess,asyncio的子進程函數都是異步的,并且提供了多種工具來處理這些函數,這就很容易并行執行和監視多個子進程。創建子進程的方法主要有兩個:

    coroutine asyncio.create_subprocess_exec()
    coroutine asyncio.create_subprocess_shell()

    (5)隊列

    asyncio 隊列的設計類似于標準模塊queue的類。雖然asyncio隊列不是線程安全的,但它們被設計為專門用于 async/await 代碼。需要注意的是,asyncio隊列的方法沒有超時參數,使用?asyncio.wait_for()函數進行超時的隊列操作。
    因為和標注模塊queue的類設計相似,使用起來跟queue無太多差異,只需要在對應的函數前面加 await 即可。asyncio 隊列提供了三種不同的隊列:

    • class asyncio.Queue 先進先出隊列
    • class asyncio.PriorityQueue 優先隊列
    • class asyncio.LifoQueue 后進先出隊列

    (6)異常

    asyncio提供了幾種異常,它們是:

    • TimeoutError,
    • CancelledError,
    • InvalidStateError,
    • SendfileNotAvailableError
    • IncompleteReadError
    • LimitOverrunError

    asyncio低級API

    低層級API為編寫基于asyncio的庫和框架提供支持,有意編寫異步庫和框架的大牛們需要熟悉這些低層級API。主要包括:

    (1)事件循環

    事件循環是每個asyncio應用程序的核心。 事件循環運行異步任務和回調,執行網絡IO操作以及運行子進程。

    應用程序開發人員通常應該使用高級asyncio函數,例如asyncio.run(),并且很少需要引用循環對象或調用其方法。

    Python 3.7 新增了?asyncio.get_running_loop()函數。

    (2)Futures

    Future對象用于將基于低層級回調的代碼與高層級的 async/await 代碼進行橋接。
    Future表示異步操作的最終結果。 不是線程安全的。
    Future是一個可等待對象。 協程可以等待Future對象,直到它們有結果或異常集,或者直到它們被取消。
    通常,Futures用于啟用基于低層級回調的代碼(例如,在使用asyncio傳輸實現的協議中)以與高層級 async/await 代碼進行互操作。

    (3)傳輸和協議(Transports和Protocols)

    Transport 和 Protocol由低層級事件循環使用,比如函數loop.create_connection()。它們使用基于回調的編程風格,并支持網絡或IPC協議(如HTTP)的高性能實現。

    在最高級別,傳輸涉及字節的傳輸方式,而協議確定要傳輸哪些字節(在某種程度上何時傳輸)。

    換種方式說就是:傳輸是套接字(或類似的I/O端點)的抽象,而協議是從傳輸的角度來看的應用程序的抽象。

    另一種觀點是傳輸和協議接口共同定義了一個使用網絡I/O和進程間I/O的抽象接口。

    傳輸和協議對象之間始終存在1:1的關系:協議調用傳輸方法來發送數據,而傳輸調用協議方法來傳遞已接收的數據。

    大多數面向連接的事件循環方法(例如loop.create_connection())通常接受protocol_factory參數,該參數用于為接受的連接創建Protocol對象,由Transport對象表示。 這些方法通常返回(傳輸,協議)元組。

    (4)策略(Policy)

    事件循環策略是一個全局的按進程劃分的對象,用于控制事件循環的管理。 每個事件循環都有一個默認策略,可以使用策略API對其進行更改和自定義。

    策略定義了上下文的概念,并根據上下文管理單獨的事件循環。 默認策略將上下文定義為當前線程。

    通過使用自定義事件循環策略,可以自定義get_event_loop(),set_event_loop()和new_event_loop()函數的行為。

    (5)平臺支持

    asyncio模塊設計為可移植的,但由于平臺的底層架構和功能,某些平臺存在細微的差異和限制。在Windows平臺,有些是不支持的,比如?loop.create_unix_connection()?and?loop.create_unix_server()。而Linux和比較新的macOS全部支持。

    總結

    Python 3.7 通過對 asyncio 分組使得它的架構更加清晰,普通寫異步IO的應用程序只需熟悉高層級API,需要寫異步IO的庫和框架時才需要理解低層級的API。

    生產者 --- 消費者

    Python 分布與并行 asyncio實現生產者消費者模型:https://blog.csdn.net/weixin_43594279/article/details/111243453

    示例 1:

    # coding=utf-8import asyncioasync def consumer(n, q):print('consumer {}: starting'.format(n))while True:print('consumer {}: waiting for item'.format(n))item = await q.get()print('consumer {}: has item {}'.format(n, item))if item is None:# None is the signal to stop.q.task_done()breakelse:await asyncio.sleep(0.01 * item)q.task_done()print('consumer {}: ending'.format(n))async def producer(q, num_workers):print('producer: starting')# Add some numbers to the queue to simulate jobsfor i in range(num_workers * 3):await q.put(i)print('producer: added task {} to the queue'.format(i))# Add None entries in the queue# to signal the consumers to exitprint('producer: adding stop signals to the queue')for i in range(num_workers):await q.put(None)print('producer: waiting for queue to empty')await q.join()print('producer: ending')async def main(num_consumers=1):q = asyncio.Queue(maxsize=num_consumers)consumer_list = [# asyncio.create_task(consumer(i, q)) for i in range(num_consumers)asyncio.ensure_future(consumer(i, q)) for i in range(num_consumers)]# produce_list = [asyncio.create_task(producer(q, num_consumers))]produce_list = [asyncio.ensure_future(producer(q, num_consumers))]task_list = consumer_list + produce_listfor item in task_list:await itemif __name__ == '__main__':asyncio.run(main(num_consumers=3))pass

    示例 2:

    Python 的異步IO編程例子

    以 Python 3.7 上的 asyncio 為例講解如何使用 Python 的異步 IO。

    創建第一個協程

    Python 3.7 推薦使用 async/await 語法來聲明協程,來編寫異步應用程序。我們來創建第一個協程函數:首先打印一行“你好”,等待1秒鐘后再打印 "大家同好"。

    import asyncioasync def say_hi():print('你好')await asyncio.sleep(1)print('大家同好')asyncio.run(say_hi())""" 你好 大家同好 """

    say_hi() 函數通過 async 聲明為協程函數,較之前的修飾器聲明更簡潔明了。

    在實踐過程中,什么功能的函數要用 async 聲明為協程函數呢?就是那些能發揮異步IO性能的函數,比如讀寫文件、讀寫網絡、讀寫數據庫,這些都是浪費時間的IO操作,把它們協程化、異步化從而提高程序的整體效率(速度)。

    say_hi() 函數是通過?asyncio.run()來運行的,而不是直接調用這個函數(協程)。因為,直接調用并不會把它加入調度日程,而只是簡單的返回一個協程對象:

    print(say_hi()) # <coroutine object say_hi at 0x000001264DB3FCC0>

    真正運行一個協程

    那么,如何真正運行一個協程呢?

    asyncio 提供了三種機制:

    • (1)asyncio.run() 函數,這是異步程序的主入口,相當于C語言中的main函數。
    • (2)用 await 等待協程,比如上例中的?await asyncio.sleep(1)?。

    再看下面的例子,我們定義了協程?say_delay()?,在 main() 協程中調用兩次,第一次延遲1秒后打印“你好”,第二次延遲2秒后打印 "大家同好"。這樣我們通過 await 運行了兩個協程。

    import asyncio import datetimeasync def say_delay(msg=None, delay=None):await asyncio.sleep(delay)print(msg)async def main():print(f'begin at {datetime.datetime.now().replace(microsecond=0)}')await say_delay('你好', 2)await say_delay('大家同好', 1)print(f'end at {datetime.datetime.now().replace(microsecond=0)}')asyncio.run(main())''' begin at 2020-12-19 00:55:01 你好 大家同好 end at 2020-12-19 00:55:04 '''

    從起止時間可以看出,兩個協程是順序執行的,總共耗時1+2=3秒。

    • (3)通過?asyncio.create_task()?函數并發運行作為 asyncio 任務(Task) 的多個協程。下面,我們用 create_task() 來修改上面的 main() 協程,從而讓兩個 say_delay() 協程并發運行:
    import asyncio import datetimeasync def say_delay(msg=None, delay=None):await asyncio.sleep(delay)print(msg)async def main():task_1 = asyncio.create_task(say_delay('你好', 2))task_2 = asyncio.create_task(say_delay('大家同好', 1))print(f'begin at {datetime.datetime.now().replace(microsecond=0)}')await task_1await task_2print(f'end at {datetime.datetime.now().replace(microsecond=0)}')asyncio.run(main())''' begin at 2020-12-19 00:58:20 大家同好 你好 end at 2020-12-19 00:58:22 '''

    從運行結果的起止時間可以看出,兩個協程是并發執行的了,總耗時等于最大耗時2秒。

    asyncio.create_task()?是一個很有用的函數,在爬蟲中它可以幫助我們實現大量并發去下載網頁。在 Python 3.6中與它對應的是?ensure_future()

    生產者 --- 消費者

    示例 代碼:

    # coding=utf-8import asyncioasync def consumer(n, q):print('consumer {}: starting'.format(n))while True:print('consumer {}: waiting for item'.format(n))item = await q.get()print('consumer {}: has item {}'.format(n, item))if item is None:# None is the signal to stop.q.task_done()breakelse:await asyncio.sleep(0.01 * item)q.task_done()print('consumer {}: ending'.format(n))async def producer(q, num_workers):print('producer: starting')# Add some numbers to the queue to simulate jobsfor i in range(num_workers * 3):await q.put(i)print('producer: added task {} to the queue'.format(i))# Add None entries in the queue# to signal the consumers to exitprint('producer: adding stop signals to the queue')for i in range(num_workers):await q.put(None)print('producer: waiting for queue to empty')await q.join()print('producer: ending')async def main(num_consumers=1):q = asyncio.Queue(maxsize=num_consumers)consumer_list = [asyncio.create_task(consumer(i, q)) for i in range(num_consumers)]produce_list = [asyncio.create_task(producer(q, num_consumers))]task_list = consumer_list + produce_listfor item in task_list:await itemif __name__ == '__main__':asyncio.run(main(num_consumers=3))pass

    可等待對象(awaitables)

    可等待對象,就是可以在 await 表達式中使用的對象,前面我們已經接觸了兩種可等待對象的類型:協程任務,還有一個是低層級的 Future

    asyncio 模塊的許多 API 都需要傳入可等待對象,比如 run(), create_task() 等等。

    (1)協程

    協程是可等待對象,可以在其它協程中被等待。協程兩個緊密相關的概念是:

    • 協程函數:通過 async def 定義的函數;
    • 協程對象:調用協程函數返回的對象。

    運行上面這段程序,結果為:

    co is now is 1548512708.2026224 now is 1548512708.202648

    可以看到,直接運行協程函數 whattime()得到的co是一個協程對象,因為協程對象是可等待的,所以通過 await 得到真正的當前時間。now2是直接await 協程函數,也得到了當前時間的返回值。

    (2)任務

    前面我們講到,任務是用來調度協程的,以便并發執行協程。當一個協程通過?asyncio.create_task()?被打包為一個 任務,該協程將自動加入調度隊列中,但是還未執行

    create_task() 的基本使用前面例子已經講過。它返回的 task 通過 await 來等待其運行完。如果,我們不等待,會發生什么?“準備立即運行”又該如何理解呢?先看看下面這個例子:

    運行這段代碼的情況是這樣的:首先,1秒鐘后打印一行,這是第13,14行代碼運行的結果:

    calling:0, now is 09:15:15

    接著,停頓1秒后,連續打印4行:

    calling:1, now is 09:15:16 calling:2, now is 09:15:16 calling:3, now is 09:15:16 calling:4, now is 09:15:16

    從這個結果看,asyncio.create_task()產生的4個任務,我們并沒有?await,它們也執行了。關鍵在于第18行的?await,如果把這一行去掉或是 sleep 的時間小于1秒(比whattime()里面的sleep時間少即可),就會只看到第一行的輸出結果而看不到后面四行的輸出。這是因為,main() 不 sleep 或 sleep 少于1秒鐘,main() 就在 whattime() 還未來得及打印結果(因為,它要sleep 1秒)就退出了,從而整個程序也退出了,就沒有 whattime() 的輸出結果。

    再來理解一下?“準備立即執行”?這個說法。它的意思就是,create_task() 只是打包了協程并加入調度隊列還未執行,并準備立即執行,什么時候執行呢?在 “主協程”(調用create_task()的協程)掛起的時候,這里的“掛起”有兩個方式:

    • 一是,通過 await task 來執行這個任務;
    • 另一個是,主協程通過 await sleep 掛起,事件循環就去執行task了。

    我們知道,asyncio 是通過事件循環實現異步的。在主協程 main()里面,沒有遇到 await 時,事件就是執行 main() 函數,遇到 await 時,事件循環就去執行別的協程,即 create_task() 生成的 whattime()的4個任務,這些任務一開始就是 await sleep 1秒。這時候,主協程和4個任務協程都掛起了,CPU空閑,事件循環等待協程的消息。

    如果 main() 協程只 sleep了 0.1秒,它就先醒了,給事件循環發消息,事件循環就來繼續執行 main() 協程,而 main() 后面已經沒有代碼,就退出該協程,退出它也就意味著整個程序退出,4個任務就沒機會打印結果;

    如果 main()協程sleep時間多余1秒,那么4個任務先喚醒,就會得到全部的打印結果;

    如果main()的18行sleep等于1秒時,和4個任務的sleep時間相同,也會得到全部打印結果。這是為什么呢?

    我猜想是這樣的:4個任務生成在前,第18行的sleep在后,事件循環的消息響應可能有個先進先出的順序。后面深入asyncio的代碼專門研究一下這個猜想正確與否。

    示例:

    # -*- coding: utf-8 -*-""" @File : aio_test.py @Author : XXX @Time : 2020/12/25 23:54 """import asyncio import datetimeasync def hi(msg=None, sec=None):print(f'enter hi(), {msg} @{datetime.datetime.now().replace(microsecond=0)}')await asyncio.sleep(sec)print(f'leave hi(), {msg} @{datetime.datetime.now().replace(microsecond=0)}')return secasync def main_1():print(f'main() begin at {datetime.datetime.now().replace(microsecond=0)}')tasks = []for i in range(1, 5):tsk = asyncio.create_task(hi(i, i))tasks.append(tsk)for tsk in tasks:ret_val = await tskprint(f'ret_val:{ret_val}')print(f'main() end at {datetime.datetime.now().replace(microsecond=0)}')async def main_2():# ***** 注意:main_2 中睡眠了2秒,導致睡眠時間大于2秒的協程沒有執行完成 *****print(f'main() begin at {datetime.datetime.now().replace(microsecond=0)}')tasks = []for i in range(1, 5):tsk = asyncio.create_task(hi(i, i))tasks.append(tsk)await asyncio.sleep(2)print(f'main() end at {datetime.datetime.now().replace(microsecond=0)}')async def main_3():# ***** 注意:main_3方法并沒有實現并發執行,只是順序執行 *****print(f'main() begin at {datetime.datetime.now().replace(microsecond=0)}')tasks = []for i in range(1, 5):tsk = asyncio.create_task(hi(i, i))await tskprint(f'main() end at {datetime.datetime.now().replace(microsecond=0)}')print('*' * 50) asyncio.run(main_1()) print('*' * 50) asyncio.run(main_2()) print('*' * 50) asyncio.run(main_3()) print('*' * 50)

    (3)Future

    它是一個低層級的可等待對象,表示一個異步操作的最終結果。目前,我們寫應用程序還用不到它,暫不學習。

    asyncio異步IO協程總結

    協程就是我們異步操作的片段。通常,寫程序都會把全部功能分成很多不同功能的函數,目的是為了結構清晰;進一步,把那些涉及耗費時間的IO操作(讀寫文件、數據庫、網絡)的函數通過 async def 異步化,就是異步編程。

    那些異步函數(協程函數)都是通過消息機制被事件循環管理調度著,整個程序的執行是單線程的,但是某個協程A進行IO時,事件循環就去執行其它協程非IO的代碼。當事件循環收到協程A結束IO的消息時,就又回來執行協程A,這樣事件循環不斷在協程之間轉換,充分利用了IO的閑置時間,從而并發的進行多個IO操作,這就是異步IO。

    寫異步IO程序時記住一個準則:需要IO的地方異步。其它地方即使用了協程函數也是沒用的。

    不 使用 asyncio 的 消息循環 讓協程運行

    先看下 不使用? asyncio 的消息循環 怎么 調用 協程,讓協程 運行:

    async def func_1():print("func_1 start")print("func_1 end")async def func_2():print("func_2 start")print("func_2 a")print("func_2 b")print("func_2 c")print("func_2 end")f_1 = func_1() print(f_1)f_2 = func_2() print(f_2)try:print('f_1.send')f_1.send(None) except StopIteration as e:# 這里也是需要去捕獲StopIteration方法passtry:print('f_2.send')f_2.send(None) except StopIteration as e:pass

    運行結果:

    <coroutine object func_1 at 0x0000020121A07C40> <coroutine object func_2 at 0x0000020121B703C0> f_1.send func_1 start func_1 end f_2.send func_2 start func_2 a func_2 b func_2 c func_2 end

    示例代碼2:

    async def test(x):return x * 2print(test(100))try:# 既然是協程,我們像之前yield協程那樣test(100).send(None) except BaseException as e:print(type(e))ret_val = e.valueprint(ret_val)

    示例代碼3:

    def simple_coroutine():print('-> start')x = yieldprint('-> recived', x)sc = simple_coroutine()next(sc)try:sc.send('zhexiao') except BaseException as e:print(e)

    對上述例子的分析:yield 的右邊沒有表達式,所以這里默認產出的值是None。剛開始先調用了next(...)是因為這個時候生成器還沒有啟動,沒有停在yield那里,這個時候也是無法通過send發送數據。所以當我們通過 next(...)激活協程后 ,程序就會運行到x = yield,這里有個問題我們需要注意, x = yield這個表達式的計算過程是先計算等號右邊的內容,然后在進行賦值,所以當激活生成器后,程序會停在yield這里,但并沒有給x賦值。當我們調用 send 方法后 yield 會收到這個值并賦值給 x,而當程序運行到協程定義體的末尾時和用生成器的時候一樣會拋出StopIteration異常

    如果協程沒有通過 next(...) 激活(同樣我們可以通過send(None)的方式激活),但是我們直接send,會提示如下錯誤:

    最先調用 next(sc) 函數這一步通常稱為“預激”(prime)協程 (即,讓協程執行到第一個 yield 表達式,準備好作為活躍的協程使用)。

    協程在運行過程中有四個狀態:

  • GEN_CREATE: 等待開始執行

  • GEN_RUNNING: 解釋器正在執行,這個狀態一般看不到

  • GEN_SUSPENDED: 在yield表達式處暫停

  • GEN_CLOSED: 執行結束

  • 通過下面例子來查看協程的狀態:

    示例代碼4:(使用協程計算移動平均值)

    def averager():total = 0.0count = 0avg = Nonewhile True:num = yield avgtotal += numcount += 1avg = total / count# run ag = averager() # 預激協程 print(next(ag)) # Noneprint(ag.send(10)) # 10 print(ag.send(20)) # 15

    這里是一個死循環,只要不停 send 值 給 協程,可以一直計算下去。

    解釋:

    • 1. 調用 next(ag) 函數后,協程會向前執行到 yield 表達式,產出 average 變量的初始值 None。
    • 2. 此時,協程在 yield 表達式處暫停。
    • 3. 使用 send() 激活協程,把發送的值賦給 num,并計算出 avg 的值。
    • 4. 使用 print 打印出 yield 返回的數據。

    單步 調試 上面程序。

    使用 asyncio 的 消息循環 讓協程運行

    使用 asyncio 異步 IO 調用 協程

    示例代碼 1:

    import asyncioasync def func_1():print("func_1 start")print("func_1 end")# await asyncio.sleep(1)async def func_2():print("func_2 start")print("func_2 a")print("func_2 b")print("func_2 c")print("func_2 end")# await asyncio.sleep(1)f_1 = func_1() print(f_1)f_2 = func_2() print(f_2)# 獲取 EventLoop: loop = asyncio.get_event_loop() tasks = [func_1(), func_2()]# 執行 coroutine loop.run_until_complete(asyncio.wait(tasks)) loop.close()

    示例代碼 2:

    import asyncio import timestart = time.time()def tic():return 'at %1.1f seconds' % (time.time() - start)async def gr1():# Busy waits for a second, but we don't want to stick around...print('gr1 started work: {}'.format(tic()))# 暫停兩秒,但不阻塞時間循環,下同await asyncio.sleep(2)print('gr1 ended work: {}'.format(tic()))async def gr2():# Busy waits for a second, but we don't want to stick around...print('gr2 started work: {}'.format(tic()))await asyncio.sleep(2)print('gr2 Ended work: {}'.format(tic()))async def gr3():print("Let's do some stuff while the coroutines are blocked, {}".format(tic()))await asyncio.sleep(1)print("Done!")# 事件循環 ioloop = asyncio.get_event_loop()# tasks中也可以使用 asyncio.ensure_future(gr1()).. tasks = [ioloop.create_task(gr1()),ioloop.create_task(gr2()),ioloop.create_task(gr3()) ] ioloop.run_until_complete(asyncio.wait(tasks)) ioloop.close()""" 結果: gr1 started work: at 0.0 seconds gr2 started work: at 0.0 seconds Let's do some stuff while the coroutines are blocked, at 0.0 seconds Done! gr2 Ended work: at 2.0 seconds gr1 ended work: at 2.0 seconds """

    多個?coroutine?可以封裝成一組 Task 然后并發執行。

    • asyncio.wait(...) 協程的參數是一個由 future 或 協程 構成的可迭代對象;wait 會分別
      把各個協程包裝進一個 Task 對象。最終的結果是,wait 處理的所有對象都通過某種方式變成 Future 類的實例。wait 是協程函數,因此返回的是一個協程或生成器對象。

    • ioloop.run_until_complete 方法的參數是一個 future 或 協程。如果是協程,run_until_complete方法與 wait 函數一樣,把協程包裝進一個 Task 對象中。

    • 在 asyncio 包中,future和協程關系緊密,因為可以使用 yield from 從 asyncio.Future 對象中產出結果。這意味著,如果 foo 是協程函數(調用后返回協程對象),抑或是返回Future 或 Task 實例的普通函數,那么可以這樣寫:res = yield from foo()。這是 asyncio 包的 API 中很多地方可以互換協程與期物的原因之一。 例如上面的例子中 tasks 可以改寫成協程列表:tasks = [gr1(), gr(2), gr(3)]

    詳細的各個類說明,類方法,傳參,以及方法返回的是什么類型都可以在官方文檔上仔細研讀,多讀幾遍,方有體會。

    示例代碼 3:

    import asyncio import time import aiohttp import async_timeoutmsg = "http://www.nationalgeographic.com.cn/photography/photo_of_the_day/{}.html" headers = {'User-Agent': 'Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US; rv:1.9.1.6) Gecko/20091201 Firefox/3.5.6' }urls = [msg.format(i) for i in range(5400, 5500)]async def fetch(session, url):with async_timeout.timeout(10):async with session.get(url) as response:return response.statusasync def main(url):async with aiohttp.ClientSession() as session:status = await fetch(session, url)return statusif __name__ == '__main__':start = time.time()loop = asyncio.get_event_loop()tasks = [main(url) for url in urls]# 返回一個列表,內容為各個tasks的返回值status_list = loop.run_until_complete(asyncio.gather(*tasks))print(len([status for status in status_list if status == 200]))end = time.time()print("cost time:", end - start)

    示例代碼 4:

    用?asyncio?實現??Hello world?代碼如下:

    import asyncio@asyncio.coroutine def hello():print("Hello world!")# 異步調用 asyncio.sleep(1):r = yield from asyncio.sleep(1)print("Hello again!")# 獲取 EventLoop: loop = asyncio.get_event_loop()# 執行 coroutine loop.run_until_complete(hello()) loop.close()

    或者直接使用新語法?asyncawait

    import asyncioasync def hello():print("Hello world!")# 異步調用 asyncio.sleep(1):r = await asyncio.sleep(1)print("Hello again!")# 獲取 EventLoop: loop = asyncio.get_event_loop()# 執行 coroutine loop.run_until_complete(hello()) loop.close()

    @asyncio.coroutine?把一個 generator 標記為 coroutine類型,然后,我們就把這個?coroutine?扔到?EventLoop?中執行。

    hello()?會首先打印出?Hello world!,然后,yield from?語法可以讓我們方便地調用另一個?generator。由于asyncio.sleep()?也是一個?coroutine,所以線程不會等待?asyncio.sleep(),而是直接中斷并執行下一個消息循環。當asyncio.sleep()?返回時,線程就可以從?yield from?拿到返回值(此處是None),然后接著執行下一行語句。

    把?asyncio.sleep(1)看成是一個耗時1秒的IO操作,在此期間,主線程并未等待,而是去執行?EventLoop?中其他可以執行的coroutine了,因此可以實現并發執行。

    我們用 Task 封裝兩個?coroutine?試試:

    import threading import asyncioasync def hello():print('1 : Hello world! (%s)' % threading.currentThread())await asyncio.sleep(5)print('2 : Hello again! (%s)' % threading.currentThread())loop = asyncio.get_event_loop() tasks = [hello(), hello()] loop.run_until_complete(asyncio.wait(tasks)) loop.close()

    觀察執行過程:

    1 : Hello world! (<_MainThread(MainThread, started 12200)>) 1 : Hello world! (<_MainThread(MainThread, started 12200)>) ( 暫停約 5 秒 ) 2 : Hello again! (<_MainThread(MainThread, started 12200)>) 2 : Hello again! (<_MainThread(MainThread, started 12200)>)

    由打印的當前線程名稱可以看出,兩個?coroutine?是由同一個線程并發執行的。

    如果把?asyncio.sleep()?換成真正的IO操作,則多個?coroutine?就可以由一個線程并發執行。

    我們用?asyncio?的異步網絡連接來獲取 sina、sohu 和 163 的網站首頁:

    import asyncioasync def wget(host):print('wget %s...' % host)connect = asyncio.open_connection(host, 80)reader, writer = await connectheader = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % hostwriter.write(header.encode('utf-8'))await writer.drain()while True:line = await reader.readline()if line == b'\r\n':breakprint('%s header > %s' % (host, line.decode('utf-8').rstrip()))# Ignore the body, close the socketwriter.close()loop = asyncio.get_event_loop() tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']] loop.run_until_complete(asyncio.wait(tasks)) loop.close()

    執行結果如下:

    wget www.sohu.com... wget www.sina.com.cn... wget www.163.com... (等待一段時間) (打印出sohu的header) www.sohu.com header > HTTP/1.1 200 OK www.sohu.com header > Content-Type: text/html ... (打印出sina的header) www.sina.com.cn header > HTTP/1.1 200 OK www.sina.com.cn header > Date: Wed, 20 May 2015 04:56:33 GMT ... (打印出163的header) www.163.com header > HTTP/1.0 302 Moved Temporarily www.163.com header > Server: Cdn Cache Server V2.0

    可見 3 個連接 由一個線程通過?coroutine?并發完成。

    參考源碼:

    async_hello.py:https://github.com/michaelliao/learn-python3/blob/master/samples/async/async_hello.py
    async_wget.py:https://github.com/michaelliao/learn-python3/blob/master/samples/async/async_wget.py

    示例代碼 5: ( 協程 的 返回值

    一個協程里可以啟動另外一個協程,并等待它完成返回結果,采用 await 關鍵字

    import asyncioasync def outer():print('in outer')print('waiting for result1')result1 = await phase1()print('waiting for result2')result2 = await phase2(result1)return (result1, result2)async def phase1():print('in phase1')return 'result1'async def phase2(arg):print('in phase2')return 'result2 derived from {}'.format(arg)event_loop = asyncio.get_event_loop() try:return_value = event_loop.run_until_complete(outer())print('return value: {!r}'.format(return_value)) finally:event_loop.close()

    運行結果:

    in outer waiting for result1 in phase1 waiting for result2 in phase2 return value: ('result1', 'result2 derived from result1')

    前面都是關于 asyncio 的例子,那么除了asyncio,就沒有其他協程庫了嗎?asyncio 作為 python 的標準庫,自然受到很多青睞,但它有時候還是顯得太重量了,尤其是提供了許多復雜的輪子和協議,不便于使用。

    你可以理解為,asyncio 是使用 async/await 語法開發的 協程庫,而不是有 asyncio 才能用 async/await,
    除了 asyncio 之外,curio 和 trio 是更加輕量級的替代物,而且也更容易使用。

    curio 的作者是 David Beazley,下面是使用 curio 創建 tcp server 的例子,據說這是 dabeaz 理想中的一個異步服務器的樣子:

    from curio import run, spawn from curio.socket import *async def echo_server(address):sock = socket(AF_INET, SOCK_STREAM)sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)sock.bind(address)sock.listen(5)print('Server listening at', address)async with sock:while True:client, addr = await sock.accept()await spawn(echo_client, client, addr)async def echo_client(client, addr):print('Connection from', addr)async with client:while True:data = await client.recv(100000)if not data:breakawait client.sendall(data)print('Connection closed')if __name__ == '__main__':run(echo_server, ('',25000))

    無論是 asyncio 還是 curio,或者是其他異步協程庫,在背后往往都會借助于 IO的事件循環來實現異步,下面用幾十行代碼來展示一個簡陋的基于事件驅動的echo服務器:

    from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR from selectors import DefaultSelector, EVENT_READselector = DefaultSelector() pool = {}def request(client_socket, addr):client_socket, addr = client_socket, addrdef handle_request(key, mask):data = client_socket.recv(100000)if not data:client_socket.close()selector.unregister(client_socket)del pool[addr]else:client_socket.sendall(data)return handle_requestdef recv_client(key, mask):sock = key.fileobjclient_socket, addr = sock.accept()req = request(client_socket, addr)pool[addr] = reqselector.register(client_socket, EVENT_READ, req)def echo_server(address):sock = socket(AF_INET, SOCK_STREAM)sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)sock.bind(address)sock.listen(5)selector.register(sock, EVENT_READ, recv_client)try:while True:events = selector.select()for key, mask in events:callback = key.datacallback(key, mask)except KeyboardInterrupt:sock.close()if __name__ == '__main__':echo_server(('',25000))

    驗證一下:

    # terminal 1 $ nc localhost 25000 hello world hello world# terminal 2 $ nc localhost 25000 hello world hello world

    現在知道:

    • 完成 異步的代碼 不一定要用 async/await ,使用了 async/await 的代碼也不一定能做到異步,
    • async/await 是協程的語法糖,使協程之間的調用變得更加清晰,
    • 使用 async 修飾的函數調用時會返回一個協程對象,
    • await 只能放在 async 修飾的函數里面使用,await 后面必須要跟著一個 協程對象Awaitable
    • await 的目的是等待協程控制流的返回而實現暫停并掛起函數的操作是yield。

    async/await 以及 協程 是Python未來實現異步編程的趨勢,我們將會在更多的地方看到他們的身影,例如協程庫的 curio 和 trio,web 框架的 sanic,數據庫驅動的 asyncpg 等等。在Python 3主導的今天,作為開發者,應該及時擁抱和適應新的變化,而基于async/await的協程憑借良好的可讀性和易用性日漸登上舞臺,看到這里,你還不趕緊上車?

    Python 模塊 asyncio – 協程之間的同步

    Python 模塊 asyncio – 協程之間的同步:https://www.quxihuan.com/posts/python-module-asyncio-synchronization/

    await yield from

    Python3.3 的 yield from 語法可以把生成器的操作委托給另一個生成器,生成器的調用方可以直接與子生成器進行通信:

    def sub_gen():yield 1yield 2yield 3def gen():return (yield from sub_gen())def main():for val in gen():print(val) # 1 # 2 # 3

    利用這一特性,使用 yield from 能夠編寫出類似協程效果的函數調用,在3.5之前,asyncio 正是使用@asyncio.coroutine 和 yield from 語法來創建協程:https://docs.python.org/3.4/library/asyncio-task.html

    @asyncio.coroutine def compute(x, y):print("Compute %s + %s ..." % (x, y))yield from asyncio.sleep(1.0)return x + y@asyncio.coroutine def print_sum(x, y):result = yield from compute(x, y)print("%s + %s = %s" % (x, y, result))loop = asyncio.get_event_loop() loop.run_until_complete(print_sum(1, 2)) loop.close()

    然而,用 yield from 容易在表示協程和生成器中混淆,沒有良好的語義性,所以在 Python 3.5 推出了更新的 async/await 表達式來作為協程的語法。

    因此類似以下的調用是等價的:

    async with lock:...with (yield from lock):... ###################### def main():return (yield from coro())def main():return (await coro())

    那么,怎么把生成器包裝為一個協程對象呢?這時候可以用到 types 包中的 coroutine 裝飾器(如果使用asyncio做驅動的話,那么也可以使用 asyncio 的 coroutine 裝飾器),@types.coroutine裝飾器會將一個生成器函數包裝為協程對象:

    import asyncio import types@types.coroutine def compute(x, y):print("Compute %s + %s ..." % (x, y))yield from asyncio.sleep(1.0)return x + yasync def print_sum(x, y):result = await compute(x, y)print("%s + %s = %s" % (x, y, result))loop = asyncio.get_event_loop() loop.run_until_complete(print_sum(1, 2)) loop.close()

    盡管兩個函數分別使用了新舊語法,但他們都是協程對象,也分別稱作?native coroutine?以及?generator-based coroutine,因此不用擔心語法問題。

    下面觀察一個 asyncio 中 Future 的例子:

    import asynciofuture = asyncio.Future()async def test_1():await asyncio.sleep(1)future.set_result('data')async def test_2():print(await future)loop = asyncio.get_event_loop() tasks_list = [test_1(), test_2()] loop.run_until_complete(asyncio.wait(tasks_list)) loop.close()

    兩個協程在事件循環中,協程?test_1 在執行第一句后掛起自身切到 asyncio.sleep,而協程 test_2 一直等待 future 的結果,讓出事件循環,計時器結束后 test_1 執行第二句并設置了 future 的值,被掛起的 test_2 恢復執行,打印出 future 的結果 'data' 。

    future 可以被 await 證明了 future 對象是一個 Awaitable,進入 Future 類的源碼可以看到有一段代碼顯示了 future 實現了__await__ 協議:

    class Future:...def __iter__(self):if not self.done():self._asyncio_future_blocking = Trueyield self # This tells Task to wait for completion.assert self.done(), "yield from wasn't used with future"return self.result() # May raise too.if compat.PY35:__await__ = __iter__ # make compatible with 'await' expression

    當執行?await future?這行代碼時,future中的這段代碼就會被執行,首先future檢查它自身是否已經完成,如果沒有完成,掛起自身,告知當前的 Task(任務)等待 future 完成。

    當 future 執行 set_result 方法時,會觸發以下的代碼,設置結果,標記 future 已經完成:

    def set_result(self, result):...if self._state != _PENDING:raise InvalidStateError('{}: {!r}'.format(self._state, self))self._result = resultself._state = _FINISHEDself._schedule_callbacks()

    最后 future 會調度自身的回調函數,觸發 Task._step() 告知 Task 驅動 future 從之前掛起的點恢復執行,不難看出,future 會執行下面的代碼:

    class Future:...def __iter__(self):...assert self.done(), "yield from wasn't used with future"return self.result() # May raise too.

    最終返回結果給調用方。

    Yield from

    調用協程 的方式有有很多,yield from 就是其中的一種。這種方式在 Python3.3 中被引入,在 Python3.5 中以 async/await 的形式進行了優化。yield from 表達式的使用方式如下:

    import asyncio@asyncio.coroutine def get_json(client, url): file_content = yield from load_file('/Users/scott/data.txt')

    正如所看到的,yield from 被使用在用 @asyncio.coroutine 裝飾的函數內,如果想把 yield from 在這個函數外使用,將會拋出如下語法錯誤:

    File "main.py", line 1file_content = yield from load_file('/Users/scott/data.txt')^ SyntaxError: 'yield' outside function

    為了避免語法錯誤,yield from 必須在調用函數的內部使用(這個調用函數通常被裝飾為協程)。

    Async / await

    較新的語法是使用 async/await 關鍵字。 async 從 Python3.5 開始被引進,跟 @asyncio.coroutine 裝飾器一樣,用來聲明一個函數是一個協程。只要把它放在函數定義之前,就可以應用到函數上,使用方式如下:

    async def ping_server(ip):# ping code here...

    實際調用這個函數時,使用 await 而不用 yield from ,當然,使用方式依然差不多:

    async def ping_local(ip):return await ping_server('192.168.1.1')

    再強調一遍,跟 yield from 一樣,不能在函數外部使用 await ,否則會拋出語法錯誤。 (譯者注: async 用來聲明一個函數是協程,然后使用 await調用這個協程, await 必須在函數內部,這個函數通常也被聲明為另一個協程)

    Python3.5 對這兩種調用協程的方法都提供了支持,但是推薦 async/await 作為首選。

    Event Loop

    如果你還不知道如何開始和操作一個 Eventloop ,那么上面有關協程所說的都起不了多大作用。 Eventloop 在執行異步函數時非常重要,重要到只要執行協程,基本上就得利用 Eventloop 。

    Eventloop 提供了相當多的功能:

    • 注冊,執行 和 取消 延遲調用(異步函數)
    • 創建 客戶端 與 服務端 傳輸用于通信
    • 創建 子程序 和 通道 跟 其他的程序 進行通信
    • 指定 函數 的 調用 到 線程池

    Eventloop 有相當多的配置和類型可供使用,但大部分程序只需要如下方式預定函數即可:

    import asyncioasync def speak_async(): print('OMG asynchronicity!')loop = asyncio.get_event_loop() loop.run_until_complete(speak_async()) loop.close()

    有意思的是代碼中的最后三行,首先獲取默認的 Eventloop ( asyncio.get_event_loop() ),然后預定和運行異步任務,并在完成后結束循環。

    loop.run_until_complete() 函數實際上是阻塞性的,也就是在所有異步方法完成之前,它是不會返回的。但因為我們只在一個線程中運行這段代碼,它沒法再進一步擴展,即使循環仍在運行。

    可能你現在還沒覺得這有多大的用處,因為我們通過調用其他 IO 來結束 Eventloop 中的阻塞(譯者注:也就是在阻塞時進行其他 IO ),但是想象一下,如果在網頁服務器上,把整個程序都封裝在異步函數內,到時就可以同時運行多個異步請求了。

    也可以將 Eventloop 的線程中斷,利用它去處理所有耗時較長的 IO 請求,而主線程處理程序邏輯或者用戶界面。

    一個案例

    讓我們實際操作一個稍大的案例。下面這段代碼就是一個非常漂亮的異步程序,它先從 Reddit 抓取 JSON 數據,解析它,然后打印出當天來自 /r/python,/r/programming 和 /r/compsci 的置頂帖。

    所示的第一個方法 get_json() ,由 get_reddit_top() 調用,然后只創建一個 GET 請求到適當的網址。當這個方法和 await 一起調用后, Eventloop 便能夠繼續為其他的協程服務,同時等待 HTTP 響應達到。一旦響應完成, JSON 數據就返回到 get_reddit_top() ,得到解析并打印出來。

    import signal import sys import asyncio import aiohttp import jsonloop = asyncio.get_event_loop() client = aiohttp.ClientSession(loop=loop)async def get_json(client, url):async with client.get(url) as response:assert response.status == 200return await response.read()async def get_reddit_top(subreddit, client):data1 = await get_json(client, 'https://www.reddit.com/r/' + subreddit + '/top.json?sort=top&t=day&limit=5')j = json.loads(data1.decode('utf-8'))for i in j['data']['children']:score = i['data']['score']title = i['data']['title']link = i['data']['url']print(str(score) + ': ' + title + ' (' + link + ')')print('DONE:', subreddit + '\n')def signal_handler(signal, frame):loop.stop()client.close()sys.exit(0)signal.signal(signal.SIGINT, signal_handler)asyncio.ensure_future(get_reddit_top('python', client)) asyncio.ensure_future(get_reddit_top('programming', client)) asyncio.ensure_future(get_reddit_top('compsci', client)) loop.run_forever()

    注意,如果多次運行這段代碼,打印出來的 subreddit 數據在順序上會有些許變化。這是因為每當我們調用一次代碼都會釋放對線程的控制,容許線程去處理另一個 HTTP 調用。這將導致誰先獲得響應,誰就先打印出來。

    結論

    即使 Python 內置的異步操作沒有 Javascript 那么順暢,但這并不意味著就不能用它來把應用變得更有趣、更有效率。只要花半個小時的時間去了解它的來龍去脈,你就會感覺把異步操作應用到你的程序中將會是多美好的一件事。

    aiohttp

    asyncio?可以實現單線程并發IO操作。如果僅用在客戶端,發揮的威力不大。如果把asyncio用在服務器端,例如Web服務器,由于HTTP連接就是IO操作,因此可以用 單線程 +?coroutine?實現多用戶的高并發支持。

    asyncio?實現了TCP、UDP、SSL等協議aiohttp?則是基于?asyncio?實現的 HTTP 框架。

    我們先安裝?aiohttp:pip install aiohttp

    然后編寫一個HTTP服務器,分別處理以下URL:

    • / - 首頁返回b'

      Index

      ';
    • /hello/{name} - 根據URL參數返回文本hello, %s!。

    代碼如下:

    import asynciofrom aiohttp import webasync def index(request):await asyncio.sleep(0.5)return web.Response(body=b'<h1>Index</h1>')async def hello(request):await asyncio.sleep(0.5)text = '<h1>hello, %s!</h1>' % request.match_info['name']return web.Response(body=text.encode('utf-8'))async def init(loop):app = web.Application(loop=loop)app.router.add_route('GET', '/', index)app.router.add_route('GET', '/hello/{name}', hello)srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000)print('Server started at http://127.0.0.1:8000...')return srvloop = asyncio.get_event_loop() loop.run_until_complete(init(loop)) loop.run_forever()

    注意?aiohttp的初始化函數init()也是一個coroutine,loop.create_server()則利用asyncio創建TCP服務。

    參考源碼:aio_web.py :https://github.com/michaelliao/learn-python3/blob/master/samples/async/aio_web.py

    ?一切從爬蟲開始

    【續篇】Python 協程之從放棄到死亡再到重生:https://www.secpulse.com/archives/64912.html

    從一個簡單的爬蟲開始,這個爬蟲很簡單,訪問指定的URL,并且獲取內容并計算長度,這里我們給定5個URL。第一版的代碼十分簡單,順序獲取每個URL的內容,當第一個請求完成、計算完長度后,再開始第二個請求。

    spider_normal.py

    # filename: spider_normal.py import time import requeststargets = ["https://lightless.me/archives/python-coroutine-from-start-to-boom.html","https://github.com/aio-libs","https://www.python.org/dev/peps/pep-0380/","https://www.baidu.com/","https://www.zhihu.com/", ]def spider():results = {}for url in targets:r = requests.get(url)length = len(r.content)results[url] = lengthreturn resultsdef show_results(results):for url, length in results.items():print("Length: {:^7d} URL: {}".format(length, url))def main():start_time = time.time()results = spider()print("Use time: {:.2f}s".format(time.time() - start_time))show_results(results)if __name__ == '__main__':main()

    我們多運行幾次看看結果。

    大約需要花費14-16秒不等,這段代碼并沒有什么好看的,我們把關注點放在后面的代碼上。現在我們使用多線程來改寫這段代碼。

    # filename: spider_thread.py import time import threading import requestsfinal_results = {}targets = ["https://lightless.me/archives/python-coroutine-from-start-to-boom.html","https://github.com/aio-libs","https://www.python.org/dev/peps/pep-0380/","https://www.baidu.com/","https://www.zhihu.com/", ]def show_results(results):for url, length in results.items():print("Length: {:^7d} URL: {}".format(length, url))def spider(url):r = requests.get(url)length = len(r.content)final_results[url] = lengthdef main():ts = []start_time = time.time()for url in targets:t = threading.Thread(target=spider, args=(url,))ts.append(t)t.start()for t in ts:t.join()print("Use time: {:.2f}s".format(time.time() - start_time))show_results(final_results)if __name__ == '__main__':main()

    我們多運行幾次看看結果。

    從這兩段代碼中,已經可以看出并發對于處理任務的好處了,但是使用原生的threading模塊還是略顯麻煩,Python已經給我們內置了一個處理并發任務的庫concurrent,我們借用這個庫修改一下我們的代碼,之所以修改成這個庫的原因還有一個,那就是引出我們后面會談到的Future。

    # filename: spider_thread.py import time from concurrent import futures import requestsfinal_results = {}targets = ["https://lightless.me/archives/python-coroutine-from-start-to-boom.html","https://github.com/aio-libs","https://www.python.org/dev/peps/pep-0380/","https://www.baidu.com/","https://www.zhihu.com/", ]def show_results(results):for url, length in results.items():print("Length: {:^7d} URL: {}".format(length, url))def spider(url):r = requests.get(url)length = len(r.content)final_results[url] = lengthreturn Truedef main():start_time = time.time()with futures.ThreadPoolExecutor(10) as executor:res = executor.map(spider, targets)print("Use time: {:.2f}s".format(time.time() - start_time))show_results(final_results)if __name__ == '__main__':main()

    執行一下,會發現耗時與上一個版本一樣,穩定在10s左右。

    可以看到我們調用了concurrent庫中的futures,那么到底什么是futures?簡單的講,這個對象代表一種異步的操作,可以表示為一個需要延時進行的操作,當然這個操作的狀態可能已經完成,也有可能尚未完成,如果你寫JS的話,可以理解為是類似Promise的對象。在Python中,標準庫中其實有兩個Future類,一個是concurrent.futures.Future,另外一個是asyncio.Future,這兩個類很類似,不完全相同,這些實現差異以及API的差異我們先按下暫且不談,有興趣的同學可以參考下相關的文檔。Future是我們后面討論的asyncio異步編程的基礎,因此這里多說兩句。

    Future代表的是一個未來的某一個時刻一定會執行的操作(可能已經執行完成了,但是無論如何他一定有一個確切的運行時間),一般情況下用戶無需手動從零開始創建一個Future,而是應當借助框架中的API生成。比如調用concurrent.futures.Executor.submit()時,框架會為"異步操作"進行一個排期,來決定何時運行這個操作,這時候就會生成一個Future對象。

    現在,我們來看看如何使用asyncio進行異步編程,與多線程編程不同的是,多個協程總是運行在同一個線程中的,一旦其中的一個協程發生阻塞行為,那么整個線程都被阻塞,進而所有的協程都無法繼續運行。asyncio.Future和asyncio.Task都可以看做是一個異步操作,后者是前者的子類,BaseEventLoop.create_task()會接收一個協程作為參數,并且對這個任務的運行時間進行排期,返回一個asyncio.Task類的實例,這個對象也是對于協程的一層包裝。如果想獲取asyncio.Future的執行結果,應當使用yield from來獲取,這樣控制權會被自動交還給EventLoop,我們無需處理"等待Future或Task運行完成"這個操作。于是就有了一個很愉悅的編程方式,如果一個函數A是協程、或返回Task或Future的實例的函數,就可以通過result = yield from A()來獲取返回值。下面我們就使用asyncio和aiohttp來改寫我們的爬蟲。

    import asyncio import timeimport aiohttpfinal_results = {}targets = ["https://lightless.me/archives/python-coroutine-from-start-to-boom.html","https://github.com/aio-libs","https://www.python.org/dev/peps/pep-0380/","https://www.baidu.com/","https://www.zhihu.com/", ]def show_results(results):for url, length in results.items():print("Length: {:^7d} URL: {}".format(length, url))async def get_content(url):async with aiohttp.ClientSession() as session:async with session.get(url) as resp:content = await resp.read()return len(content)async def spider(url):length = await get_content(url)final_results[url] = lengthreturn Truedef main():loop = asyncio.get_event_loop()cor = [spider(url) for url in targets]start_time = time.time()result = loop.run_until_complete(asyncio.gather(*cor))print("Use time: {:.2f}s".format(time.time() - start_time))show_results(final_results)print("loop result: ", result)if __name__ == '__main__':main()

    結果非常驚人

    這里可能有同學會問為什么沒看到yield from以及@asyncio.coroutine,那是因為在Python3.5以后,增加了async def和awiat語法,等效于@asyncio.coroutine和yield from,詳情可以參考上一篇文章。在main()函數中,我們先獲取一個可用的事件循環,緊接著將生成好的協程任務添加到這個循環中,并且等待執行完成。在每個spider()中,執行到await的時候,會交出控制權(如果不明白請向前看一下委托生成器的部分),并且切到其他的協程繼續運行,等到get_content()執行完成返回后,那么會恢復spider()協程的執行。get_content()函數中只是通過async with調用aiohttp庫的最基本方法獲取頁面內容,并且返回了長度,僅此而已。

    在修改為協程版本后,爬蟲性能有了巨大的提升,從最初了15s,到10s,再到現在的2s左右,簡直是質的飛躍。這只是一個簡單的爬蟲程序,相比多線程,性能提高了近5倍,如果是其他更加復雜的大型程序,也許性能提升會更多。asyncio這套異步編程框架,通過簡單的事件循環以及協程機制,在需要等待的情況下主動交出控制權,切換到其他協程進行運行。到這里就會有人問,為什么要將requests替換為aiohttp,能不能用requests?答案是不能,還是我們前面提到過的,在協程中,一切操作都要避免阻塞,禁止所有的阻塞型調用,因為所有的協程都是運行在同一個線程中的!requests庫是阻塞型的調用,當在等待I/O時,并不能將控制權轉交給其他協程,甚至還會將當前線程阻塞,其他的協程也無法運行。如果你在異步編程的時候需要用到一些其他的異步組件,可以到https://github.com/aio-libs/這里找找,也許就有你需要的異步庫。

    關于asyncio的異步編程資料目前來說還不算很多,官方文檔應該算是相當不錯的參考文獻了,其中非常推薦的兩部分是:Develop with asyncio和Tasks and coroutines,各位同學有興趣的話可以自行閱讀。asyncio這個異步框架中包含了非常多的內容,甚至還有TCP Server/Client的相關內容,如果想要掌握asyncio這個異步編程框架,還需要多加練習。順帶一提,asyncio非常容易與其他的框架整合,例如tornado已經有實現了asyncio.AbstractEventLoop的接口的類AsyncIOMainLoop,還有人將asyncio集成到QT的事件循環中了,可以說是非常的靈活了。

    Python 協程總結

    Python 之所以能夠處理網絡 IO 高并發,是因為借助了高效的IO模型,能夠最大限度的調度IO,然后事件循環使用協程處理IO,協程遇到IO操作就將控制權拋出,那么在IO準備好之前的這段事件,事件循環就可以使用其他的協程處理其他事情,然后協程在用戶空間,并且是單線程的,所以不會像多線程,多進程那樣頻繁的上下文切換,因而能夠節省大量的不必要性能損失。

    注: 不要再協程里面使用time.sleep之類的同步操作,因為協程再單線程里面,所以會使得整個線程停下來等待,也就沒有協程的優勢了

    理解

    協程,又稱為微線程,看上去像是子程序,但是它和子程序又不太一樣,它在執行的過程中,可以在中斷當前的子程序后去執行別的子程序,再返回來執行之前的子程序,但是它的相關信息還是之前的。

    優點:

  • 極高的執行效率,因為子程序切換而不是線程切換,沒有了線程切換的開銷;
  • 不需要多線程的鎖機制,因為只有一個線程在執行;
  • 如果要充分利用CPU多核,可以通過使用多進程+協程的方式

    使用

    打開 asyncio 的源代碼,可以發現asyncio中的需要用到的文件如下:

    下面的則是接下來要總結的文件

    文件解釋
    base_events基礎的事件,提供了BaseEventLoop事件
    coroutines提供了封裝成協程的類
    events提供了事件的抽象類,比如BaseEventLoop繼承了AbstractEventLoop
    futures提供了Future類
    tasks提供了Task類和相關的方法

    coroutines

    函數解釋
    coroutine(func)為函數加上裝飾器
    iscoroutinefunction(func)判斷函數是否使用了裝飾器
    iscoroutine(obj)判斷該對象是否是裝飾器

    如果在函數使用了coroutine裝飾器,就可以通過yield from去調用async def聲明的函數,如果已經使用async def聲明,就沒有必要再使用裝飾器了,這兩個功能是一樣的。

    import asyncio@asyncio.coroutine def hello_world():print("Hello World!")async def hello_world2():print("Hello World2!")print('------hello_world------') print(asyncio.iscoroutinefunction(hello_world))print('------hello_world2------') print(asyncio.iscoroutinefunction(hello_world2))print('------event loop------') loop = asyncio.get_event_loop()# 一直阻塞該函數調用到函數返回 loop.run_until_complete(hello_world()) loop.run_until_complete(hello_world2()) loop.close()

    上面的代碼分別使用到了coroutine裝飾器和async def,其運行結果如下:

    ------hello_world------ True ------hello_world2------ True ------event loop------ Hello World! Hello World2!

    注意:不可以直接調用協程,需要一個event loop去調用。

    如果想要在一個函數中去得到另外一個函數的結果,可以使用yield from或者await,例子如下:

    import asyncioasync def compute(x, y):print("Compute %s + %s ..." % (x, y))await asyncio.sleep(1.0)return x + yasync def print_sum(x, y):result = await compute(x, y)print("%s + %s = %s" % (x, y, result))loop = asyncio.get_event_loop() loop.run_until_complete(print_sum(1, 2)) loop.close()

    函數 print_sum 會一直等到函數 compute 返回結果,執行過程如下:

    base_events

    這個文件里面漏出來的只有BaseEventLoop一個類,它的相關方法如下:

    函數解釋
    create_future()創建一個future對象并且綁定到事件上
    create_task()創建一個任務
    run_forever()除非調用stop,否則事件會一直運行下去
    run_until_complete(future)直到 future 對象執行完畢,事件才停止
    stop()停止事件
    close()關閉事件
    is_closed()判斷事件是否關閉
    time()返回事件運行時的時間
    call_later(delay, callback, *args)設置一個回調函數,并且可以設置延遲的時間
    call_at(when, callback, *args)同上,但是設置的是絕對時間
    call_soon(callback, *args)馬上調用

    events

    函數解釋
    get_event_loop()返回一個異步的事件
    ......

    返回的就是BaseEventLoop的對象。

    future

    Future類的相關方法如下:

    方法解釋
    cancel()取消掉future對象
    cancelled()返回是否已經取消掉
    done()如果future已經完成則返回true
    result()返回future執行的結果
    exception()返回在future中設置了的exception
    add_done_callback(fn)當future執行時執行回調函數
    remove_done_callback(fn)刪除future的所有回調函數
    set_result(result)設置future的結果
    set_exception(exception)設置future的異常

    設置 future 的例子如下:

    import asyncioasync def slow_operation(future):await asyncio.sleep(1) # 睡眠future.set_result('Future is done!') # future設置結果loop = asyncio.get_event_loop() future = asyncio.Future() # 創建future對象 asyncio.ensure_future(slow_operation(future)) # 創建任務 loop.run_until_complete(future) # 阻塞直到future執行完才停止事件 print(future.result()) loop.close()

    run_until_complete方法在內部通過調用了future的add_done_callback,當執行future完畢的時候,就會通知事件。

    下面這個例子則是通過使用future的add_done_callback方法實現和上面例子一樣的效果:

    import asyncioasync def slow_operation(future):await asyncio.sleep(1)future.set_result('Future is done!')def got_result(future):print(future.result())loop.stop() # 關閉事件loop = asyncio.get_event_loop() future = asyncio.Future() asyncio.ensure_future(slow_operation(future)) future.add_done_callback(got_result) # future執行完畢就執行該回調 try:loop.run_forever() finally:loop.close()

    一旦slow_operation函數執行完畢的時候,就會去執行got_result函數,里面則調用了關閉事件,所以不用擔心事件會一直執行。

    task

    Task類是Future的一個子類,也就是Future中的方法,task都可以使用,類方法如下:

    方法解釋
    current_task(loop=None)返回指定事件中的任務,如果沒有指定,則默認當前事件
    all_tasks(loop=None)返回指定事件中的所有任務
    cancel()取消任務

    并行執行三個任務的例子:

    import asyncioasync def factorial(name, number):f = 1for i in range(2, number + 1):print("Task %s: Compute factorial(%s)..." % (name, i))await asyncio.sleep(1)f *= iprint("Task %s: factorial(%s) = %s" % (name, number, f))loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.gather(factorial("A", 2),factorial("B", 3),factorial("C", 4), )) loop.close()

    執行結果為

    Task A: Compute factorial(2)...Task B: Compute factorial(2)...Task C: Compute factorial(2)...Task A: factorial(2) = 2Task B: Compute factorial(3)...Task C: Compute factorial(3)...Task B: factorial(3) = 6Task C: Compute factorial(4)...Task C: factorial(4) = 24

    可以發現,ABC同時執行,直到future執行完畢才退出。

    下面一些方法是和task相關的方法

    方法解釋
    as_completed(fs, *, loop=None, timeout=None)返回是協程的迭代器
    ensure_future(coro_or_future, *, loop=None)調度執行一個 coroutine object:并且它封裝成future。返回任務對象
    async(coro_or_future, *, loop=None)丟棄的方法,推薦使用ensure_future
    wrap_future(future, *, loop=None)Wrap a concurrent.futures.Future object in a Future object.
    gather(*coros_or_futures, loop=None, return_exceptions=False)從給定的協程或者future對象數組中返回future匯總的結果
    sleep(delay, result=None, *, loop=None)創建一個在給定時間(以秒為單位)后完成的協程
    shield(arg, *, loop=None)等待future,屏蔽future被取消
    wait(futures, *, loop=None, timeout=None, return_when=ALL_COMPLETED)等待由序列futures給出的Futures和協程對象完成。協程將被包裹在任務中。返回含兩個集合的Future:(done,pending)
    wait_for(fut, timeout, *, loop=None)等待單個Future或coroutine object完成超時。如果超時為None,則阻止直到future完成

    總結

    以上是生活随笔為你收集整理的Python 异步 IO 、协程、asyncio、async/await、aiohttp的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。