日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > python >内容正文

python

Python 异步 IO 、协程、asyncio、async/await、aiohttp

發(fā)布時間:2024/7/23 python 41 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Python 异步 IO 、协程、asyncio、async/await、aiohttp 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.


From :廖雪峰?異步IO :https://www.liaoxuefeng.com/wiki/1016959663602400/1017959540289152

Python Async/Await入門指南 :https://zhuanlan.zhihu.com/p/27258289

Python 生成器 和 yield 關(guān)鍵字:https://blog.csdn.net/freeking101/article/details/51126293

協(xié)程與任務(wù)?官網(wǎng)文檔https://docs.python.org/zh-cn/3/library/asyncio-task.html

Python中異步協(xié)程的使用方法介紹https://blog.csdn.net/freeking101/article/details/88119858

python 協(xié)程詳解及I/O多路復(fù)用,I/O異步:https://blog.csdn.net/u014028063/article/details/81408395

Python協(xié)程深入理解:https://www.cnblogs.com/zhaof/p/7631851.html

asyncio 進階:Python黑魔法 --- 異步IO( asyncio) 協(xié)程:https://www.cnblogs.com/dhcn/p/9033628.html

談?wù)凱ython協(xié)程技術(shù)的演進:https://www.freebuf.com/company-information/153421.html

最后推薦一下《流暢的Python》,這本書中 第16章 協(xié)程的部分介紹的非常詳細
《流暢的Python》pdf 下載地址:https://download.csdn.net/download/freeking101/10993120

gevent 是 python 的一個并發(fā)框架,以微線程 greenlet 為核心,使用了 epoll 事件監(jiān)聽機制以及諸多其他優(yōu)化而變得高效。

aiohttp 使用代理 ip 訪問 https 網(wǎng)站報錯的問題https://blog.csdn.net/qq_43210211/article/details/108379917

Python:使用 Future、asyncio 處理并發(fā)

:https://blog.csdn.net/sinat_38682860/article/details/105419842

異步? IO

在 IO 編程(?廖雪峰 Python IO 編程 :https://www.liaoxuefeng.com/wiki/1016959663602400/1017606916795776)?一節(jié)中,我們已經(jīng)知道,CPU的速度遠遠快于磁盤、網(wǎng)絡(luò)等IO。在一個線程中,CPU執(zhí)行代碼的速度極快,然而,一旦遇到IO操作,如讀寫文件、發(fā)送網(wǎng)絡(luò)數(shù)據(jù)時,就需要等待IO操作完成,才能繼續(xù)進行下一步操作。這種情況稱為同步IO。

在IO操作的過程中,當(dāng)前線程被掛起,而其他需要CPU執(zhí)行的代碼就無法被當(dāng)前線程執(zhí)行了。

因為一個 IO 操作就阻塞了當(dāng)前線程,導(dǎo)致其他代碼無法執(zhí)行,所以我們必須使用多線程或者多進程來并發(fā)執(zhí)行代碼,為多個用戶服務(wù)。每個用戶都會分配一個線程,如果遇到IO導(dǎo)致線程被掛起,其他用戶的線程不受影響。

多線程和多進程的模型雖然解決了并發(fā)問題,但是系統(tǒng)不能無上限地增加線程。由于系統(tǒng)切換線程的開銷也很大,所以,一旦線程數(shù)量過多,CPU的時間就花在線程切換上了,真正運行代碼的時間就少了,結(jié)果導(dǎo)致性能嚴重下降。

由于我們要解決的問題是CPU高速執(zhí)行能力和IO設(shè)備的龜速嚴重不匹配,多線程和多進程只是解決這一問題的一種方法。

另一種解決IO問題的方法是異步IO。當(dāng)代碼需要執(zhí)行一個耗時的IO操作時,它只發(fā)出IO指令,并不等待IO結(jié)果,然后就去執(zhí)行其他代碼了。一段時間后,當(dāng)IO返回結(jié)果時,再通知CPU進行處理。

消息模型 其實早在應(yīng)用在桌面應(yīng)用程序中了。一個 GUI 程序的主線程就負責(zé)不停地讀取消息并處理消息。所有的鍵盤、鼠標(biāo)等消息都被發(fā)送到GUI程序的消息隊列中,然后由GUI程序的主線程處理。

由于GUI 線程處理鍵盤、鼠標(biāo)等消息的速度非常快,所以用戶感覺不到延遲。某些時候,GUI線程在一個消息處理的過程中遇到問題導(dǎo)致一次消息處理時間過長,此時,用戶會感覺到整個GUI程序停止響應(yīng)了,敲鍵盤、點鼠標(biāo)都沒有反應(yīng)。這種情況說明在消息模型中,處理一個消息必須非常迅速,否則,主線程將無法及時處理消息隊列中的其他消息,導(dǎo)致程序看上去停止響應(yīng)。

消息模型 是 如何解決 同步IO 必須等待IO操作這一問題的呢 ?

在消息處理過程中,當(dāng)遇到 IO 操作時,代碼只負責(zé)發(fā)出IO請求,不等待IO結(jié)果,然后直接結(jié)束本輪消息處理,進入下一輪消息處理過程。當(dāng)IO操作完成后,將收到一條“IO完成”的消息,處理該消息時就可以直接獲取IO操作結(jié)果。

在 “發(fā)出IO請求” 到收到 “IO完成” 的這段時間里,同步IO模型下,主線程只能掛起,但異步IO模型下,主線程并沒有休息,而是在消息循環(huán)中繼續(xù)處理其他消息。這樣,在異步IO模型下,一個線程就可以同時處理多個IO請求,并且沒有切換線程的操作。對于大多數(shù)IO密集型的應(yīng)用程序,使用異步IO將大大提升系統(tǒng)的多任務(wù)處理能力。

協(xié)程 (Coroutines)

在學(xué)習(xí)異步IO模型前,我們先來了解協(xié)程,協(xié)程 又稱 微線程,纖程,英文名 Coroutine

子程序(?又叫?函數(shù) ) 協(xié)程

  • 子程序?在 所有語言中都是層級調(diào)用。比如: A 調(diào)用 B,B 在執(zhí)行過程中又調(diào)用了 C,C 執(zhí)行完畢返回,B 執(zhí)行完畢返回,最后是 A 執(zhí)行完畢。所以 子程序 即 函數(shù) 的調(diào)用是通過棧實現(xiàn)的,一個線程就是執(zhí)行一個子程序。子程序調(diào)用總是一個入口,一次返回,調(diào)用順序是明確的。
  • 協(xié)程的調(diào)用 和 子程序 不同。協(xié)程 看上去也是 子程序,但執(zhí)行過程中,在子程序內(nèi)部可中斷,然后轉(zhuǎn)而執(zhí)行別的子程序,在適當(dāng)?shù)臅r候再返回來接著執(zhí)行。

注意,在一個 子程序中中斷,去執(zhí)行其他子程序,不是函數(shù)調(diào)用,有點類似CPU的中斷。比如:子程序 A 和 B :

def A():print('1')print('2')print('3')def B():print('x')print('y')print('z')

假設(shè)由協(xié)程執(zhí)行,在執(zhí)行 A 的過程中,可以隨時中斷,去執(zhí)行 B,B 也可能在執(zhí)行過程中中斷再去執(zhí)行 A,結(jié)果可能是:

1 2 x y 3 z

但是在 A 中是沒有調(diào)用 B 的,所以 協(xié)程的調(diào)用 比 函數(shù)調(diào)用 理解起來要難一些。

看起來 A、B 的執(zhí)行有點像多線程,但 協(xié)程 的特點在于是一個線程執(zhí)行。

協(xié)程 和 多線程比,協(xié)程有何優(yōu)勢?

  • 1. 最大的優(yōu)勢就是協(xié)程極高的執(zhí)行效率。因為 子程序 切換不是線程切換,而是由程序自身控制,因此,沒有線程切換的開銷,和多線程比,線程數(shù)量越多,協(xié)程的性能優(yōu)勢就越明顯。
  • 2. 第二大優(yōu)勢就是不需要多線程的鎖機制,因為只有一個線程,也不存在同時寫變量沖突,在協(xié)程中控制共享資源不加鎖,只需要判斷狀態(tài)就好了,所以執(zhí)行效率比多線程高很多。

因為協(xié)程是一個線程執(zhí)行,那怎么利用多核CPU呢?

最簡單的方法是 多進程 + 協(xié)程,既充分利用多核,又充分發(fā)揮協(xié)程的高效率,可獲得極高的性能。

Python 對 協(xié)程 的支持 是通過 generator (生成器)實現(xiàn)的

在 generator 中,我們不但可以通過 for 循環(huán)來迭代,還可以不斷調(diào)用 next() 函數(shù)獲取由 yield 語句返回的下一個值。

但是 Python 的 yield 不但可以返回一個值,它還可以接收調(diào)用者發(fā)出的參數(shù)。

來看例子:

傳統(tǒng)的 生產(chǎn)者-消費者 模型是一個線程寫消息,一個線程取消息,通過鎖機制控制隊列和等待,但一不小心就可能死鎖。如果改用協(xié)程,生產(chǎn)者生產(chǎn)消息后,直接通過 yield 跳轉(zhuǎn)到消費者開始執(zhí)行,待消費者執(zhí)行完畢后,切換回生產(chǎn)者繼續(xù)生產(chǎn),效率極高:

#!/usr/bin/python3 # -*- coding: utf-8 -*- # @Author : # @File : text.py # @Software : PyCharm # @description : XXXdef consumer():r = ''while True:n = yield rif not n:returnprint('[CONSUMER] Consuming %s...' % n)r = '200 OK'def produce(c):c.send(None)n = 0while n < 5:n = n + 1print('[PRODUCER] Producing %s...' % n)r = c.send(n)print('[PRODUCER] Consumer return: %s' % r)c.close()c = consumer() produce(c)

執(zhí)行結(jié)果:

[PRODUCER] Producing 1... [CONSUMER] Consuming 1... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 2... [CONSUMER] Consuming 2... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 3... [CONSUMER] Consuming 3... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 4... [CONSUMER] Consuming 4... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 5... [CONSUMER] Consuming 5... [PRODUCER] Consumer return: 200 OK

注意到 consumer函數(shù) 是一個 generator,把一個 consumer 傳入 produce 后:

  • 首先調(diào)用 c.send(None) 啟動生成器;
  • 然后,一旦生產(chǎn)了東西,通過 c.send(n) 切換到 consumer 執(zhí)行;
  • consumer 通過 yield拿到消息,處理,又通過yield把結(jié)果傳回;

  • produce 拿到 consumer 處理的結(jié)果,繼續(xù)生產(chǎn)下一條消息;

  • produce 決定不生產(chǎn)了,通過 c.close() 關(guān)閉 consumer,整個過程結(jié)束。

  • 整個流程無鎖,由一個線程執(zhí)行,produce?和?consumer?協(xié)作完成任務(wù),所以稱為 “協(xié)程”,而非線程的搶占式多任務(wù)。

    最后套用 Donald Knuth 的一句話總結(jié)協(xié)程的特點:“子程序就是協(xié)程的一種特例。”

    參考源碼:https://github.com/michaelliao/learn-python3/blob/master/samples/async/coroutine.py

    在 Python 中,異步函數(shù)? 通常 被稱作? 協(xié)程

    創(chuàng)建一個協(xié)程僅僅只需使用 async 關(guān)鍵字,或者使用 @asyncio.coroutine 裝飾器。下面的任一代碼,都可以作為協(xié)程工作,形式上也是等同的:

    import asyncio# 方式 1 async def ping_server(ip):pass# 方式 2 @asyncio.coroutine def load_file(path):pass

    上面這兩個?特殊的函數(shù),在調(diào)用時會返回協(xié)程對象。熟悉 JavaScript 中 Promise 的同學(xué),可以把這個返回對象當(dāng)作跟 Promise 差不多。調(diào)用他們中的任意一個,實際上并未立即運行,而是返回一個協(xié)程對象,然后將其傳遞到 Eventloop 中,之后再執(zhí)行。

    • 如何判斷一個?函數(shù)是不是協(xié)程??? ?asyncio 提供了 asyncio.iscoroutinefunction(func) 方法。
    • 如何判斷一個 函數(shù)返回的是不是協(xié)程對象 ?? 可以使用 asyncio.iscoroutine(obj) 。

    用 asyncio 提供的 @asyncio.coroutine 可以把一個 generator 標(biāo)記為 coroutine 類型,然后在 coroutine 內(nèi)部用 yield from 調(diào)用另一個 coroutine 實現(xiàn)異步操作。

    Python 3.5 開始引入了新的語法 async await

    為了簡化并更好地標(biāo)識異步 IO,從 Python 3.5 開始引入了新的語法 async await,可以讓 coroutine 的代碼更簡潔易讀。

    ?async?/ await 是 python3.5 的新語法,需使用 Python3.5 版本 或 以上才能正確運行。

    注意:async?和?await?是針對 coroutine 的新語法,要使用新的語法,只需要做兩步簡單的替換:

    • 把?@asyncio.coroutine?替換為?async?
    • 把?yield from?替換為?await

    ?Python 3.5 以前 版本原來老的語法使用 協(xié)程

    import asyncio@asyncio.coroutine def hello():print("Hello world!")r = yield from asyncio.sleep(1)print("Hello again!")

    Python 3.5 以后 用新語法重新編寫如下:

    import asyncioasync def hello():print("Hello world!")r = await asyncio.sleep(1)print("Hello again!")

    在過去幾年內(nèi),異步編程由于某些好的原因得到了充分的重視。雖然它比線性編程難一點,但是效率相對來說也是更高。

    比如,利用 Python 的 異步協(xié)程 (async coroutine) ,在提交 HTTP 請求后,就沒必要等待請求完成再進一步操作,而是可以一邊等著請求完成,一邊做著其他工作。這可能在邏輯上需要多些思考來保證程序正確運行,但是好處是可以利用更少的資源做更多的事。

    即便邏輯上需要多些思考,但實際上在 Python 語言中,異步編程的語法和執(zhí)行并不難。跟 Javascript 不一樣,現(xiàn)在 Python 的異步協(xié)程已經(jīng)執(zhí)行得相當(dāng)好了。

    對于服務(wù)端編程,異步性似乎是 Node.js 流行的一大原因。我們寫的很多代碼,特別是那些諸如網(wǎng)站之類的高 I/O 應(yīng)用,都依賴于外部資源。這可以是任何資源,包括從遠程數(shù)據(jù)庫調(diào)用到 POST 一個 REST 請求。一旦你請求這些資源的任一一個,你的代碼在等待資源響應(yīng)時便無事可做 (譯者注:如果沒有異步編程的話)。

    有了異步編程,在等待這些資源響應(yīng)的過程中,你的代碼便可以去處理其他的任務(wù)。

    Python async / await 手冊

    Python?部落:Python async/await 手冊:https://python.freelycode.com/contribution/detail/57

    知乎:從 0 到 1,Python 異步編程的演進之路(?通過爬蟲演示進化之路?)https://zhuanlan.zhihu.com/p/25228075

    async / await 的使用

    async 用來聲明一個函數(shù)是協(xié)程然后使用 await 調(diào)用這個協(xié)程, await 必須在函數(shù)內(nèi)部,這個函數(shù)通常也被聲明為另一個協(xié)程await 的目的是等待協(xié)程控制流的返回yield 的目的 是 暫停并掛起函數(shù)的操作。

    正常的函數(shù)在執(zhí)行時是不會中斷的,所以你要寫一個能夠中斷的函數(shù),就需要添加 async 關(guān)鍵。

    • async 用來聲明一個函數(shù)為異步函數(shù)異步函數(shù)的特點是能在函數(shù)執(zhí)行過程中掛起,去執(zhí)行其他異步函數(shù),等到掛起條件(假設(shè)掛起條件是sleep(5))消失后,也就是5秒到了再回來執(zhí)行。
    • await 可以將耗時等待的操作掛起,讓出控制權(quán)(?await 語法來掛起自身的協(xié)程比如:異步程序執(zhí)行到某一步時需要等待的時間很長,就將此掛起,去執(zhí)行其他的異步程序。await 后面只能跟 異步程序 或 有 __await__ 屬性 的 對象因為異步程序與一般程序不同

    假設(shè)有兩個異步函數(shù) async a,async b,a 中的某一步有 await,當(dāng)程序碰到關(guān)鍵字 await b() 后,異步程序掛起后去執(zhí)行另一個異步b程序,就是從函數(shù)內(nèi)部跳出去執(zhí)行其他函數(shù),當(dāng)掛起條件消失后,不管b是否執(zhí)行完,要馬上從b程序中跳出來,回到原程序執(zhí)行原來的操作。如果 await 后面跟的 b 函數(shù)不是異步函數(shù),那么操作就只能等 b 執(zhí)行完再返回,無法在 b 執(zhí)行的過程中返回。如果要在 b 執(zhí)行完才返回,也就不需要用 await 關(guān)鍵字了,直接調(diào)用 b 函數(shù)就行。所以這就需要 await 后面跟的是 異步函數(shù)了。在一個異步函數(shù)中,可以不止一次掛起,也就是可以用多個 await 。

    看下 Python 中常見的幾種函數(shù)形式:

    # 1. 普通函數(shù) def function():return 1# 2. 生成器函數(shù) def generator():yield 1# 在3.5過后,我們可以使用async修飾將普通函數(shù)和生成器函數(shù)包裝成異步函數(shù)和異步生成器。# 3. 異步函數(shù)(協(xié)程) async def async_function():return 1# 4. 異步生成器 async def async_generator():yield 1

    通過類型判斷可以驗證函數(shù)的類型

    import types# 1. 普通函數(shù) def function():return 1# 2. 生成器函數(shù) def generator():yield 1# 在3.5過后,我們可以使用async修飾將普通函數(shù)和生成器函數(shù)包裝成異步函數(shù)和異步生成器。# 3. 異步函數(shù)(協(xié)程) async def async_function():return 1# 4. 異步生成器 async def async_generator():yield 1print(type(function) is types.FunctionType) print(type(generator()) is types.GeneratorType) print(type(async_function()) is types.CoroutineType) print(type(async_generator()) is types.AsyncGeneratorType)

    直接調(diào)用異步函數(shù)不會返回結(jié)果,而是返回一個coroutine對象:

    print(async_function()) # <coroutine object async_function at 0x102ff67d8>

    協(xié)程 需要通過其他方式來驅(qū)動,因此可以使用這個協(xié)程對象的 send 方法給協(xié)程發(fā)送一個值:

    print(async_function().send(None))

    不幸的是,如果通過上面的調(diào)用會拋出一個異常:StopIteration: 1

    因為 生成器 / 協(xié)程 在正常返回退出時會拋出一個 StopIteration 異常,而原來的返回值會存放在 StopIteration 對象的 value 屬性中,通過以下捕獲可以獲取協(xié)程真正的返回值:?

    try:async_function().send(None) except StopIteration as e:print(e.value) # 1

    通過上面的方式來新建一個 run 函數(shù)來驅(qū)動協(xié)程函數(shù),在協(xié)程函數(shù)中,可以通過 await 語法來掛起自身的協(xié)程,并等待另一個 協(xié)程 完成直到返回結(jié)果:

    def run(coroutine):try:coroutine.send(None)except StopIteration as e:return 'run() : return {0}'.format(e.value)async def async_function():return 1async def await_coroutine():result = await async_function()print('await_coroutine() : print {0} '.format(result))ret_val = run(await_coroutine()) print(ret_val)

    要注意的是,await 語法只能出現(xiàn)在通過 async 修飾的函數(shù)中,否則會報 SyntaxError 錯誤。

    而且 await 后面的對象需要是一個 Awaitable,或者實現(xiàn)了相關(guān)的協(xié)議。

    查看 Awaitable 抽象類的代碼,表明了只要一個類實現(xiàn)了__await__方法,那么通過它構(gòu)造出來的實例就是一個 Awaitable:

    class Awaitable(metaclass=ABCMeta):__slots__ = ()@abstractmethoddef __await__(self):yield@classmethoddef __subclasshook__(cls, C):if cls is Awaitable:return _check_methods(C, "__await__")return NotImplemented

    而且可以看到,Coroutine類 也繼承了 Awaitable,而且實現(xiàn)了 send,throw 和 close 方法。所以 await 一個調(diào)用異步函數(shù)返回的協(xié)程對象是合法的。

    class Coroutine(Awaitable):__slots__ = ()@abstractmethoddef send(self, value):...@abstractmethoddef throw(self, typ, val=None, tb=None):...def close(self):...@classmethoddef __subclasshook__(cls, C):if cls is Coroutine:return _check_methods(C, '__await__', 'send', 'throw', 'close')return NotImplemented

    接下來是異步生成器,來看一個例子:

    假如我要到一家超市去購買土豆,而超市貨架上的土豆數(shù)量是有限的:

    class Potato:@classmethoddef make(cls, num, *args, **kws):potatos = []for i in range(num):potatos.append(cls.__new__(cls, *args, **kws))return potatosall_potatos = Potato.make(5)

    現(xiàn)在我想要買50個土豆,每次從貨架上拿走一個土豆放到籃子:

    def take_potatos(num):count = 0while True:if len(all_potatos) == 0:sleep(.1)else:potato = all_potatos.pop()yield potatocount += 1if count == num:breakdef buy_potatos():bucket = []for p in take_potatos(50):bucket.append(p)

    對應(yīng)到代碼中,就是迭代一個生成器的模型,顯然,當(dāng)貨架上的土豆不夠的時候,這時只能夠死等,而且在上面例子中等多長時間都不會有結(jié)果(因為一切都是同步的),也許可以用多進程和多線程解決,而在現(xiàn)實生活中,更應(yīng)該像是這樣的:

    import asyncio import randomclass Potato:@classmethoddef make(cls, num, *args, **kws):potatos = []for i in range(num):potatos.append(cls.__new__(cls, *args, **kws))return potatosall_potatos = Potato.make(5)async def take_potatos(num):count = 0while True:if len(all_potatos) == 0:await ask_for_potato()potato = all_potatos.pop()yield potatocount += 1if count == num:breakasync def ask_for_potato():await asyncio.sleep(random.random())all_potatos.extend(Potato.make(random.randint(1, 10)))async def buy_potatos():bucket = []async for p in take_potatos(50):bucket.append(p)print(f'Got potato {id(p)}...')def main():loop = asyncio.get_event_loop()res = loop.run_until_complete(buy_potatos())loop.close()if __name__ == '__main__':main()

    當(dāng)貨架上的土豆沒有了之后,可以詢問超市請求需要更多的土豆,這時候需要等待一段時間直到生產(chǎn)者完成生產(chǎn)的過程。

    當(dāng)生產(chǎn)者完成和返回之后,這是便能從 await 掛起的地方繼續(xù)往下跑,完成消費的過程。而這整一個過程,就是一個異步生成器迭代的流程。

    用 asyncio 運行這段代碼,結(jié)果是這樣的:

    Got potato 4338641384... Got potato 4338641160... Got potato 4338614736... Got potato 4338614680... Got potato 4338614568... Got potato 4344861864... Got potato 4344843456... Got potato 4344843400... Got potato 4338641384... Got potato 4338641160... ...

    既然是異步的,在請求之后不一定要死等,而是可以做其他事情。比如除了土豆,我還想買番茄,這時只需要在事件循環(huán)中再添加一個過程:

    def main():import asyncioloop = asyncio.get_event_loop()res = loop.run_until_complete(asyncio.wait([buy_potatos(), buy_tomatos()]))loop.close()

    再來運行這段代碼:

    Got potato 4423119312... Got tomato 4423119368... Got potato 4429291024... Got potato 4421640768... Got tomato 4429331704... Got tomato 4429331760... Got tomato 4423119368... Got potato 4429331760... Got potato 4429331704... Got potato 4429346688... Got potato 4429346072... Got tomato 4429347360... ...

    看下 AsyncGenerator 的定義,它需要實現(xiàn) __aiter__ 和 __anext__ 兩個核心方法,以及 asend,athrow,aclose 方法。

    class AsyncGenerator(AsyncIterator):__slots__ = ()async def __anext__(self):...@abstractmethodasync def asend(self, value):...@abstractmethodasync def athrow(self, typ, val=None, tb=None):...async def aclose(self):...@classmethoddef __subclasshook__(cls, C):if cls is AsyncGenerator:return _check_methods(C, '__aiter__', '__anext__','asend', 'athrow', 'aclose')return NotImplemented

    異步生成器是在 3.6 之后才有的特性,同樣的還有異步推導(dǎo)表達式,因此在上面的例子中,也可以寫成這樣:

    bucket = [p async for p in take_potatos(50)]

    類似的,還有 await 表達式:

    result = [await fun() for fun in funcs if await condition()]

    除了函數(shù)之外,類實例的普通方法也能用 async 語法修飾:

    class ThreeTwoOne:async def begin(self):print(3)await asyncio.sleep(1)print(2)await asyncio.sleep(1)print(1) await asyncio.sleep(1)returnasync def game():t = ThreeTwoOne()await t.begin()print('start')

    實例方法的調(diào)用同樣是返回一個 coroutine:

    function = ThreeTwoOne.begin method = function.__get__(ThreeTwoOne, ThreeTwoOne()) import inspect assert inspect.isfunction(function) assert inspect.ismethod(method) assert inspect.iscoroutine(method())

    同理 還有類方法:

    class ThreeTwoOne:@classmethodasync def begin(cls):print(3)await asyncio.sleep(1)print(2)await asyncio.sleep(1)print(1) await asyncio.sleep(1)returnasync def game():await ThreeTwoOne.begin()print('start')

    根據(jù)PEP 492中,async 也可以應(yīng)用到 上下文管理器中,__aenter__ 和 __aexit__ 需要返回一個 Awaitable:

    class GameContext:async def __aenter__(self):print('game loading...')await asyncio.sleep(1)async def __aexit__(self, exc_type, exc, tb):print('game exit...')await asyncio.sleep(1)async def game():async with GameContext():print('game start...')await asyncio.sleep(2)

    在3.7版本,contextlib 中會新增一個 asynccontextmanager 裝飾器來包裝一個實現(xiàn)異步協(xié)議的上下文管理器:

    from contextlib import asynccontextmanager@asynccontextmanager async def get_connection():conn = await acquire_db_connection()try:yieldfinally:await release_db_connection(conn)

    async 修飾符也能用在 __call__ 方法上:

    class GameContext:async def __aenter__(self):self._started = time()print('game loading...')await asyncio.sleep(1)return selfasync def __aexit__(self, exc_type, exc, tb):print('game exit...')await asyncio.sleep(1)async def __call__(self, *args, **kws):if args[0] == 'time':return time() - self._startedasync def game():async with GameContext() as ctx:print('game start...')await asyncio.sleep(2)print('game time: ', await ctx('time'))

    asyncio

    asyncio?是 Python 3.4 版本引入的標(biāo)準庫,直接內(nèi)置了對 異步 IO 的支持。

    asyncio 官方只實現(xiàn)了比較底層的協(xié)議,比如TCP,UDP。所以諸如 HTTP 協(xié)議之類都需要借助第三方庫,比如 aiohttp

    雖然異步編程的生態(tài)不夠同步編程的生態(tài)那么強大,但是如果有高并發(fā)的需求不妨試試,下面說一下比較成熟的異步庫

    aiohttp:異步 http client/server框架。github地址: https://github.com/aio-libs/aiohttp
    sanic:速度更快的類 flask web框架。github地址:https://github.com/channelcat/sanic
    uvloop 快速,內(nèi)嵌于 asyncio 事件循環(huán)的庫,使用 cython 基于 libuv 實現(xiàn)。github地址: https://github.com/MagicStack/uvloop

    asyncio?的編程模型就是一個 消息循環(huán)我們從?asyncio?模塊中直接獲取一個?EventLoop?的引用,然后把需要執(zhí)行的協(xié)程扔到?EventLoop?中執(zhí)行,就實現(xiàn)了 異步IO

    python 用asyncio?模塊實現(xiàn)異步編程,該模塊最大特點就是,只存在一個線程

    由于只有一個線程,就不可能多個任務(wù)同時運行。asyncio 是 "多任務(wù)合作" 模式(cooperative multitasking),允許異步任務(wù)交出執(zhí)行權(quán)給其他任務(wù),等到其他任務(wù)完成,再收回執(zhí)行權(quán)繼續(xù)往下執(zhí)行

    asyncio 模塊在單線程上啟動一個事件循環(huán)(event loop),時刻監(jiān)聽新進入循環(huán)的事件,加以處理,并不斷重復(fù)這個過程,直到異步任務(wù)結(jié)束。

    什么是事件循環(huán)?

    單線程就意味著所有的任務(wù)需要在單線程上排隊執(zhí)行,也就是前一個任務(wù)沒有執(zhí)行完成,后一個任務(wù)就沒有辦法執(zhí)行。在CPU密集型的任務(wù)之中,這樣其實還行,但是如果我們的任務(wù)都是IO密集型的呢?也就是我們大部分的任務(wù)都是在等待網(wǎng)絡(luò)的數(shù)據(jù)返回,等待磁盤文件的數(shù)據(jù),這就會造成CPU一直在等待這些任務(wù)的完成再去執(zhí)行下一個任務(wù)。

    有沒有什么辦法能夠讓單線程的任務(wù)執(zhí)行不這么笨呢?其實我們可以將這些需要等待IO設(shè)備的任務(wù)掛在一邊嘛!這時候,如果我們的任務(wù)都是需要等待的任務(wù),那么單線程在執(zhí)行時遇到一個就把它掛起來,這里可以通過一個數(shù)據(jù)結(jié)構(gòu)(例如隊列)將這些處于執(zhí)行等待狀態(tài)的任務(wù)放進去,為什么是執(zhí)行等待狀態(tài)呢?因為它們正在執(zhí)行但是又不得不等待例如網(wǎng)絡(luò)數(shù)據(jù)的返回等等。直到將所有的任務(wù)都放進去之后,單線程就可以開始它的接連不斷的表演了:有沒有任務(wù)完成的小伙伴呀!快來我這里執(zhí)行!

    此時如果有某個任務(wù)完成了,它會得到結(jié)果,于是發(fā)出一個信號:我完成了。那邊還在循環(huán)追問的單線程終于得到了答復(fù),就會去看看這個任務(wù)有沒有綁定什么回調(diào)函數(shù)呀?如果綁定了回調(diào)函數(shù)就進去把回調(diào)函數(shù)給執(zhí)行了,如果沒有,就將它所在的任務(wù)恢復(fù)執(zhí)行,并將結(jié)果返回。

    asyncio 就是一個 協(xié)程庫

    • (1)事件循環(huán) (event loop)。事件循環(huán)需要實現(xiàn)兩個功能,一是順序執(zhí)行協(xié)程代碼;二是完成協(xié)程的調(diào)度,即一個協(xié)程“暫停”時,決定接下來執(zhí)行哪個協(xié)程。
    • (2)協(xié)程上下文的切換。基本上Python 生成器的 yeild 已經(jīng)能完成切換,Python3中還有特定語法支持協(xié)程切換。

    Python 的異步IO:API

    官方文檔:https://docs.python.org/zh-cn/3/library/asyncio.html

    Python 的 asyncio 是使用 async/await 語法編寫并發(fā)代碼的標(biāo)準庫。Python3.7 這個版本,asyncio又做了比較大的調(diào)整,把這個庫的 API 分為了 高層級API低層級API,并引入asyncio.run() 這樣的高級方法,讓編寫異步程序更加簡潔。

    這里先從全局認識 Python 這個異步IO庫。

    asyncio 的 高層級 API 主要提高如下幾個方面:

    • 并發(fā)地運行Python協(xié)程并完全控制其執(zhí)行過程;
    • 執(zhí)行網(wǎng)絡(luò)IO和IPC;
    • 控制子進程;
    • 通過隊列實現(xiàn)分布式任務(wù);
    • 同步并發(fā)代碼。

    asyncio 的 低層級API 用以支持開發(fā)異步庫和框架:

    • 創(chuàng)建和管理事件循環(huán)(event loop),提供異步的API用于網(wǎng)絡(luò),運行子進程,處理操作系統(tǒng)信號等;
    • 通過 transports 實現(xiàn)高效率協(xié)議;
    • 通過 async/await? 語法橋架基于回調(diào)的庫和代碼。

    asyncio 高級 API

    高層級API讓我們更方便的編寫基于asyncio的應(yīng)用程序。這些API包括:

    (1)協(xié)程和任務(wù)

    協(xié)程通過 async/await 語法進行聲明,是編寫異步應(yīng)用的推薦方式。歷史的?@asyncio.coroutine?和?yield from?已經(jīng)被棄用,并計劃在Python 3.10中移除。協(xié)程可以通過?asyncio.run(coro, *, debug=False)?函數(shù)運行,該函數(shù)負責(zé)管理事件循環(huán)并完結(jié)異步生成器。它應(yīng)該被用作asyncio程序的主入口點,相當(dāng)于main函數(shù),應(yīng)該只被調(diào)用一次。

    任務(wù)被用于并發(fā)調(diào)度協(xié)程,可用于網(wǎng)絡(luò)爬蟲的并發(fā)。使用?asyncio.create_task()?就可以把一個協(xié)程打包為一個任務(wù),該協(xié)程會自動安排為很快運行。

    協(xié)程,任務(wù)和Future都是可等待對象。其中,Future是低層級的可等待對象,表示一個異步操作的最終結(jié)果。

    (2)流

    流是用于網(wǎng)絡(luò)連接的高層級的使用 async/await的原語。流允許在不使用回調(diào)或低層級協(xié)議和傳輸?shù)那闆r下發(fā)送和接收數(shù)據(jù)。異步讀寫TCP有客戶端函數(shù)?asyncio.open_connection()?和 服務(wù)端函數(shù)?asyncio.start_server()?。它還支持 Unix Sockets:?asyncio.open_unix_connection()?和?asyncio.start_unix_server()。

    (3)同步原語

    asyncio同步原語的設(shè)計類似于threading模塊的原語,有兩個重要的注意事項:
    asyncio原語不是線程安全的,因此它們不應(yīng)該用于OS線程同步(而是用threading)
    這些同步原語的方法不接受超時參數(shù); 使用asyncio.wait_for()函數(shù)執(zhí)行超時操作。
    asyncio具有以下基本同步原語:

    • Lock
    • Event
    • Condition
    • Semaphore
    • BoundedSemaphore

    (4)子進程

    asyncio提供了通過 async/await 創(chuàng)建和管理子進程的API。不同于Python標(biāo)準庫的subprocess,asyncio的子進程函數(shù)都是異步的,并且提供了多種工具來處理這些函數(shù),這就很容易并行執(zhí)行和監(jiān)視多個子進程。創(chuàng)建子進程的方法主要有兩個:

    coroutine asyncio.create_subprocess_exec()
    coroutine asyncio.create_subprocess_shell()

    (5)隊列

    asyncio 隊列的設(shè)計類似于標(biāo)準模塊queue的類。雖然asyncio隊列不是線程安全的,但它們被設(shè)計為專門用于 async/await 代碼。需要注意的是,asyncio隊列的方法沒有超時參數(shù),使用?asyncio.wait_for()函數(shù)進行超時的隊列操作。
    因為和標(biāo)注模塊queue的類設(shè)計相似,使用起來跟queue無太多差異,只需要在對應(yīng)的函數(shù)前面加 await 即可。asyncio 隊列提供了三種不同的隊列:

    • class asyncio.Queue 先進先出隊列
    • class asyncio.PriorityQueue 優(yōu)先隊列
    • class asyncio.LifoQueue 后進先出隊列

    (6)異常

    asyncio提供了幾種異常,它們是:

    • TimeoutError,
    • CancelledError,
    • InvalidStateError,
    • SendfileNotAvailableError
    • IncompleteReadError
    • LimitOverrunError

    asyncio低級API

    低層級API為編寫基于asyncio的庫和框架提供支持,有意編寫異步庫和框架的大牛們需要熟悉這些低層級API。主要包括:

    (1)事件循環(huán)

    事件循環(huán)是每個asyncio應(yīng)用程序的核心。 事件循環(huán)運行異步任務(wù)和回調(diào),執(zhí)行網(wǎng)絡(luò)IO操作以及運行子進程。

    應(yīng)用程序開發(fā)人員通常應(yīng)該使用高級asyncio函數(shù),例如asyncio.run(),并且很少需要引用循環(huán)對象或調(diào)用其方法。

    Python 3.7 新增了?asyncio.get_running_loop()函數(shù)。

    (2)Futures

    Future對象用于將基于低層級回調(diào)的代碼與高層級的 async/await 代碼進行橋接。
    Future表示異步操作的最終結(jié)果。 不是線程安全的。
    Future是一個可等待對象。 協(xié)程可以等待Future對象,直到它們有結(jié)果或異常集,或者直到它們被取消。
    通常,Futures用于啟用基于低層級回調(diào)的代碼(例如,在使用asyncio傳輸實現(xiàn)的協(xié)議中)以與高層級 async/await 代碼進行互操作。

    (3)傳輸和協(xié)議(Transports和Protocols)

    Transport 和 Protocol由低層級事件循環(huán)使用,比如函數(shù)loop.create_connection()。它們使用基于回調(diào)的編程風(fēng)格,并支持網(wǎng)絡(luò)或IPC協(xié)議(如HTTP)的高性能實現(xiàn)。

    在最高級別,傳輸涉及字節(jié)的傳輸方式,而協(xié)議確定要傳輸哪些字節(jié)(在某種程度上何時傳輸)。

    換種方式說就是:傳輸是套接字(或類似的I/O端點)的抽象,而協(xié)議是從傳輸?shù)慕嵌葋砜吹膽?yīng)用程序的抽象。

    另一種觀點是傳輸和協(xié)議接口共同定義了一個使用網(wǎng)絡(luò)I/O和進程間I/O的抽象接口。

    傳輸和協(xié)議對象之間始終存在1:1的關(guān)系:協(xié)議調(diào)用傳輸方法來發(fā)送數(shù)據(jù),而傳輸調(diào)用協(xié)議方法來傳遞已接收的數(shù)據(jù)。

    大多數(shù)面向連接的事件循環(huán)方法(例如loop.create_connection())通常接受protocol_factory參數(shù),該參數(shù)用于為接受的連接創(chuàng)建Protocol對象,由Transport對象表示。 這些方法通常返回(傳輸,協(xié)議)元組。

    (4)策略(Policy)

    事件循環(huán)策略是一個全局的按進程劃分的對象,用于控制事件循環(huán)的管理。 每個事件循環(huán)都有一個默認策略,可以使用策略API對其進行更改和自定義。

    策略定義了上下文的概念,并根據(jù)上下文管理單獨的事件循環(huán)。 默認策略將上下文定義為當(dāng)前線程。

    通過使用自定義事件循環(huán)策略,可以自定義get_event_loop(),set_event_loop()和new_event_loop()函數(shù)的行為。

    (5)平臺支持

    asyncio模塊設(shè)計為可移植的,但由于平臺的底層架構(gòu)和功能,某些平臺存在細微的差異和限制。在Windows平臺,有些是不支持的,比如?loop.create_unix_connection()?and?loop.create_unix_server()。而Linux和比較新的macOS全部支持。

    總結(jié)

    Python 3.7 通過對 asyncio 分組使得它的架構(gòu)更加清晰,普通寫異步IO的應(yīng)用程序只需熟悉高層級API,需要寫異步IO的庫和框架時才需要理解低層級的API。

    生產(chǎn)者 --- 消費者

    Python 分布與并行 asyncio實現(xiàn)生產(chǎn)者消費者模型:https://blog.csdn.net/weixin_43594279/article/details/111243453

    示例 1:

    # coding=utf-8import asyncioasync def consumer(n, q):print('consumer {}: starting'.format(n))while True:print('consumer {}: waiting for item'.format(n))item = await q.get()print('consumer {}: has item {}'.format(n, item))if item is None:# None is the signal to stop.q.task_done()breakelse:await asyncio.sleep(0.01 * item)q.task_done()print('consumer {}: ending'.format(n))async def producer(q, num_workers):print('producer: starting')# Add some numbers to the queue to simulate jobsfor i in range(num_workers * 3):await q.put(i)print('producer: added task {} to the queue'.format(i))# Add None entries in the queue# to signal the consumers to exitprint('producer: adding stop signals to the queue')for i in range(num_workers):await q.put(None)print('producer: waiting for queue to empty')await q.join()print('producer: ending')async def main(num_consumers=1):q = asyncio.Queue(maxsize=num_consumers)consumer_list = [# asyncio.create_task(consumer(i, q)) for i in range(num_consumers)asyncio.ensure_future(consumer(i, q)) for i in range(num_consumers)]# produce_list = [asyncio.create_task(producer(q, num_consumers))]produce_list = [asyncio.ensure_future(producer(q, num_consumers))]task_list = consumer_list + produce_listfor item in task_list:await itemif __name__ == '__main__':asyncio.run(main(num_consumers=3))pass

    示例 2:

    Python 的異步IO編程例子

    以 Python 3.7 上的 asyncio 為例講解如何使用 Python 的異步 IO。

    創(chuàng)建第一個協(xié)程

    Python 3.7 推薦使用 async/await 語法來聲明協(xié)程,來編寫異步應(yīng)用程序。我們來創(chuàng)建第一個協(xié)程函數(shù):首先打印一行“你好”,等待1秒鐘后再打印 "大家同好"。

    import asyncioasync def say_hi():print('你好')await asyncio.sleep(1)print('大家同好')asyncio.run(say_hi())""" 你好 大家同好 """

    say_hi() 函數(shù)通過 async 聲明為協(xié)程函數(shù),較之前的修飾器聲明更簡潔明了。

    在實踐過程中,什么功能的函數(shù)要用 async 聲明為協(xié)程函數(shù)呢?就是那些能發(fā)揮異步IO性能的函數(shù),比如讀寫文件、讀寫網(wǎng)絡(luò)、讀寫數(shù)據(jù)庫,這些都是浪費時間的IO操作,把它們協(xié)程化、異步化從而提高程序的整體效率(速度)。

    say_hi() 函數(shù)是通過?asyncio.run()來運行的,而不是直接調(diào)用這個函數(shù)(協(xié)程)。因為,直接調(diào)用并不會把它加入調(diào)度日程,而只是簡單的返回一個協(xié)程對象:

    print(say_hi()) # <coroutine object say_hi at 0x000001264DB3FCC0>

    真正運行一個協(xié)程

    那么,如何真正運行一個協(xié)程呢?

    asyncio 提供了三種機制:

    • (1)asyncio.run() 函數(shù),這是異步程序的主入口,相當(dāng)于C語言中的main函數(shù)。
    • (2)用 await 等待協(xié)程,比如上例中的?await asyncio.sleep(1)?。

    再看下面的例子,我們定義了協(xié)程?say_delay()?,在 main() 協(xié)程中調(diào)用兩次,第一次延遲1秒后打印“你好”,第二次延遲2秒后打印 "大家同好"。這樣我們通過 await 運行了兩個協(xié)程。

    import asyncio import datetimeasync def say_delay(msg=None, delay=None):await asyncio.sleep(delay)print(msg)async def main():print(f'begin at {datetime.datetime.now().replace(microsecond=0)}')await say_delay('你好', 2)await say_delay('大家同好', 1)print(f'end at {datetime.datetime.now().replace(microsecond=0)}')asyncio.run(main())''' begin at 2020-12-19 00:55:01 你好 大家同好 end at 2020-12-19 00:55:04 '''

    從起止時間可以看出,兩個協(xié)程是順序執(zhí)行的,總共耗時1+2=3秒。

    • (3)通過?asyncio.create_task()?函數(shù)并發(fā)運行作為 asyncio 任務(wù)(Task) 的多個協(xié)程。下面,我們用 create_task() 來修改上面的 main() 協(xié)程,從而讓兩個 say_delay() 協(xié)程并發(fā)運行:
    import asyncio import datetimeasync def say_delay(msg=None, delay=None):await asyncio.sleep(delay)print(msg)async def main():task_1 = asyncio.create_task(say_delay('你好', 2))task_2 = asyncio.create_task(say_delay('大家同好', 1))print(f'begin at {datetime.datetime.now().replace(microsecond=0)}')await task_1await task_2print(f'end at {datetime.datetime.now().replace(microsecond=0)}')asyncio.run(main())''' begin at 2020-12-19 00:58:20 大家同好 你好 end at 2020-12-19 00:58:22 '''

    從運行結(jié)果的起止時間可以看出,兩個協(xié)程是并發(fā)執(zhí)行的了,總耗時等于最大耗時2秒。

    asyncio.create_task()?是一個很有用的函數(shù),在爬蟲中它可以幫助我們實現(xiàn)大量并發(fā)去下載網(wǎng)頁。在 Python 3.6中與它對應(yīng)的是?ensure_future()

    生產(chǎn)者 --- 消費者

    示例 代碼:

    # coding=utf-8import asyncioasync def consumer(n, q):print('consumer {}: starting'.format(n))while True:print('consumer {}: waiting for item'.format(n))item = await q.get()print('consumer {}: has item {}'.format(n, item))if item is None:# None is the signal to stop.q.task_done()breakelse:await asyncio.sleep(0.01 * item)q.task_done()print('consumer {}: ending'.format(n))async def producer(q, num_workers):print('producer: starting')# Add some numbers to the queue to simulate jobsfor i in range(num_workers * 3):await q.put(i)print('producer: added task {} to the queue'.format(i))# Add None entries in the queue# to signal the consumers to exitprint('producer: adding stop signals to the queue')for i in range(num_workers):await q.put(None)print('producer: waiting for queue to empty')await q.join()print('producer: ending')async def main(num_consumers=1):q = asyncio.Queue(maxsize=num_consumers)consumer_list = [asyncio.create_task(consumer(i, q)) for i in range(num_consumers)]produce_list = [asyncio.create_task(producer(q, num_consumers))]task_list = consumer_list + produce_listfor item in task_list:await itemif __name__ == '__main__':asyncio.run(main(num_consumers=3))pass

    可等待對象(awaitables)

    可等待對象,就是可以在 await 表達式中使用的對象,前面我們已經(jīng)接觸了兩種可等待對象的類型:協(xié)程任務(wù),還有一個是低層級的 Future

    asyncio 模塊的許多 API 都需要傳入可等待對象,比如 run(), create_task() 等等。

    (1)協(xié)程

    協(xié)程是可等待對象,可以在其它協(xié)程中被等待。協(xié)程兩個緊密相關(guān)的概念是:

    • 協(xié)程函數(shù):通過 async def 定義的函數(shù);
    • 協(xié)程對象:調(diào)用協(xié)程函數(shù)返回的對象。

    運行上面這段程序,結(jié)果為:

    co is now is 1548512708.2026224 now is 1548512708.202648

    可以看到,直接運行協(xié)程函數(shù) whattime()得到的co是一個協(xié)程對象,因為協(xié)程對象是可等待的,所以通過 await 得到真正的當(dāng)前時間。now2是直接await 協(xié)程函數(shù),也得到了當(dāng)前時間的返回值。

    (2)任務(wù)

    前面我們講到,任務(wù)是用來調(diào)度協(xié)程的,以便并發(fā)執(zhí)行協(xié)程。當(dāng)一個協(xié)程通過?asyncio.create_task()?被打包為一個 任務(wù),該協(xié)程將自動加入調(diào)度隊列中,但是還未執(zhí)行

    create_task() 的基本使用前面例子已經(jīng)講過。它返回的 task 通過 await 來等待其運行完。如果,我們不等待,會發(fā)生什么?“準備立即運行”又該如何理解呢?先看看下面這個例子:

    運行這段代碼的情況是這樣的:首先,1秒鐘后打印一行,這是第13,14行代碼運行的結(jié)果:

    calling:0, now is 09:15:15

    接著,停頓1秒后,連續(xù)打印4行:

    calling:1, now is 09:15:16 calling:2, now is 09:15:16 calling:3, now is 09:15:16 calling:4, now is 09:15:16

    從這個結(jié)果看,asyncio.create_task()產(chǎn)生的4個任務(wù),我們并沒有?await,它們也執(zhí)行了。關(guān)鍵在于第18行的?await,如果把這一行去掉或是 sleep 的時間小于1秒(比whattime()里面的sleep時間少即可),就會只看到第一行的輸出結(jié)果而看不到后面四行的輸出。這是因為,main() 不 sleep 或 sleep 少于1秒鐘,main() 就在 whattime() 還未來得及打印結(jié)果(因為,它要sleep 1秒)就退出了,從而整個程序也退出了,就沒有 whattime() 的輸出結(jié)果。

    再來理解一下?“準備立即執(zhí)行”?這個說法。它的意思就是,create_task() 只是打包了協(xié)程并加入調(diào)度隊列還未執(zhí)行,并準備立即執(zhí)行,什么時候執(zhí)行呢?在 “主協(xié)程”(調(diào)用create_task()的協(xié)程)掛起的時候,這里的“掛起”有兩個方式:

    • 一是,通過 await task 來執(zhí)行這個任務(wù);
    • 另一個是,主協(xié)程通過 await sleep 掛起,事件循環(huán)就去執(zhí)行task了。

    我們知道,asyncio 是通過事件循環(huán)實現(xiàn)異步的。在主協(xié)程 main()里面,沒有遇到 await 時,事件就是執(zhí)行 main() 函數(shù),遇到 await 時,事件循環(huán)就去執(zhí)行別的協(xié)程,即 create_task() 生成的 whattime()的4個任務(wù),這些任務(wù)一開始就是 await sleep 1秒。這時候,主協(xié)程和4個任務(wù)協(xié)程都掛起了,CPU空閑,事件循環(huán)等待協(xié)程的消息。

    如果 main() 協(xié)程只 sleep了 0.1秒,它就先醒了,給事件循環(huán)發(fā)消息,事件循環(huán)就來繼續(xù)執(zhí)行 main() 協(xié)程,而 main() 后面已經(jīng)沒有代碼,就退出該協(xié)程,退出它也就意味著整個程序退出,4個任務(wù)就沒機會打印結(jié)果;

    如果 main()協(xié)程sleep時間多余1秒,那么4個任務(wù)先喚醒,就會得到全部的打印結(jié)果;

    如果main()的18行sleep等于1秒時,和4個任務(wù)的sleep時間相同,也會得到全部打印結(jié)果。這是為什么呢?

    我猜想是這樣的:4個任務(wù)生成在前,第18行的sleep在后,事件循環(huán)的消息響應(yīng)可能有個先進先出的順序。后面深入asyncio的代碼專門研究一下這個猜想正確與否。

    示例:

    # -*- coding: utf-8 -*-""" @File : aio_test.py @Author : XXX @Time : 2020/12/25 23:54 """import asyncio import datetimeasync def hi(msg=None, sec=None):print(f'enter hi(), {msg} @{datetime.datetime.now().replace(microsecond=0)}')await asyncio.sleep(sec)print(f'leave hi(), {msg} @{datetime.datetime.now().replace(microsecond=0)}')return secasync def main_1():print(f'main() begin at {datetime.datetime.now().replace(microsecond=0)}')tasks = []for i in range(1, 5):tsk = asyncio.create_task(hi(i, i))tasks.append(tsk)for tsk in tasks:ret_val = await tskprint(f'ret_val:{ret_val}')print(f'main() end at {datetime.datetime.now().replace(microsecond=0)}')async def main_2():# ***** 注意:main_2 中睡眠了2秒,導(dǎo)致睡眠時間大于2秒的協(xié)程沒有執(zhí)行完成 *****print(f'main() begin at {datetime.datetime.now().replace(microsecond=0)}')tasks = []for i in range(1, 5):tsk = asyncio.create_task(hi(i, i))tasks.append(tsk)await asyncio.sleep(2)print(f'main() end at {datetime.datetime.now().replace(microsecond=0)}')async def main_3():# ***** 注意:main_3方法并沒有實現(xiàn)并發(fā)執(zhí)行,只是順序執(zhí)行 *****print(f'main() begin at {datetime.datetime.now().replace(microsecond=0)}')tasks = []for i in range(1, 5):tsk = asyncio.create_task(hi(i, i))await tskprint(f'main() end at {datetime.datetime.now().replace(microsecond=0)}')print('*' * 50) asyncio.run(main_1()) print('*' * 50) asyncio.run(main_2()) print('*' * 50) asyncio.run(main_3()) print('*' * 50)

    (3)Future

    它是一個低層級的可等待對象,表示一個異步操作的最終結(jié)果。目前,我們寫應(yīng)用程序還用不到它,暫不學(xué)習(xí)。

    asyncio異步IO協(xié)程總結(jié)

    協(xié)程就是我們異步操作的片段。通常,寫程序都會把全部功能分成很多不同功能的函數(shù),目的是為了結(jié)構(gòu)清晰;進一步,把那些涉及耗費時間的IO操作(讀寫文件、數(shù)據(jù)庫、網(wǎng)絡(luò))的函數(shù)通過 async def 異步化,就是異步編程。

    那些異步函數(shù)(協(xié)程函數(shù))都是通過消息機制被事件循環(huán)管理調(diào)度著,整個程序的執(zhí)行是單線程的,但是某個協(xié)程A進行IO時,事件循環(huán)就去執(zhí)行其它協(xié)程非IO的代碼。當(dāng)事件循環(huán)收到協(xié)程A結(jié)束IO的消息時,就又回來執(zhí)行協(xié)程A,這樣事件循環(huán)不斷在協(xié)程之間轉(zhuǎn)換,充分利用了IO的閑置時間,從而并發(fā)的進行多個IO操作,這就是異步IO。

    寫異步IO程序時記住一個準則:需要IO的地方異步。其它地方即使用了協(xié)程函數(shù)也是沒用的。

    不 使用 asyncio 的 消息循環(huán) 讓協(xié)程運行

    先看下 不使用? asyncio 的消息循環(huán) 怎么 調(diào)用 協(xié)程,讓協(xié)程 運行:

    async def func_1():print("func_1 start")print("func_1 end")async def func_2():print("func_2 start")print("func_2 a")print("func_2 b")print("func_2 c")print("func_2 end")f_1 = func_1() print(f_1)f_2 = func_2() print(f_2)try:print('f_1.send')f_1.send(None) except StopIteration as e:# 這里也是需要去捕獲StopIteration方法passtry:print('f_2.send')f_2.send(None) except StopIteration as e:pass

    運行結(jié)果:

    <coroutine object func_1 at 0x0000020121A07C40> <coroutine object func_2 at 0x0000020121B703C0> f_1.send func_1 start func_1 end f_2.send func_2 start func_2 a func_2 b func_2 c func_2 end

    示例代碼2:

    async def test(x):return x * 2print(test(100))try:# 既然是協(xié)程,我們像之前yield協(xié)程那樣test(100).send(None) except BaseException as e:print(type(e))ret_val = e.valueprint(ret_val)

    示例代碼3:

    def simple_coroutine():print('-> start')x = yieldprint('-> recived', x)sc = simple_coroutine()next(sc)try:sc.send('zhexiao') except BaseException as e:print(e)

    對上述例子的分析:yield 的右邊沒有表達式,所以這里默認產(chǎn)出的值是None。剛開始先調(diào)用了next(...)是因為這個時候生成器還沒有啟動,沒有停在yield那里,這個時候也是無法通過send發(fā)送數(shù)據(jù)。所以當(dāng)我們通過 next(...)激活協(xié)程后 ,程序就會運行到x = yield,這里有個問題我們需要注意, x = yield這個表達式的計算過程是先計算等號右邊的內(nèi)容,然后在進行賦值,所以當(dāng)激活生成器后,程序會停在yield這里,但并沒有給x賦值。當(dāng)我們調(diào)用 send 方法后 yield 會收到這個值并賦值給 x,而當(dāng)程序運行到協(xié)程定義體的末尾時和用生成器的時候一樣會拋出StopIteration異常

    如果協(xié)程沒有通過 next(...) 激活(同樣我們可以通過send(None)的方式激活),但是我們直接send,會提示如下錯誤:

    最先調(diào)用 next(sc) 函數(shù)這一步通常稱為“預(yù)激”(prime)協(xié)程 (即,讓協(xié)程執(zhí)行到第一個 yield 表達式,準備好作為活躍的協(xié)程使用)。

    協(xié)程在運行過程中有四個狀態(tài):

  • GEN_CREATE: 等待開始執(zhí)行

  • GEN_RUNNING: 解釋器正在執(zhí)行,這個狀態(tài)一般看不到

  • GEN_SUSPENDED: 在yield表達式處暫停

  • GEN_CLOSED: 執(zhí)行結(jié)束

  • 通過下面例子來查看協(xié)程的狀態(tài):

    示例代碼4:(使用協(xié)程計算移動平均值)

    def averager():total = 0.0count = 0avg = Nonewhile True:num = yield avgtotal += numcount += 1avg = total / count# run ag = averager() # 預(yù)激協(xié)程 print(next(ag)) # Noneprint(ag.send(10)) # 10 print(ag.send(20)) # 15

    這里是一個死循環(huán),只要不停 send 值 給 協(xié)程,可以一直計算下去。

    解釋:

    • 1. 調(diào)用 next(ag) 函數(shù)后,協(xié)程會向前執(zhí)行到 yield 表達式,產(chǎn)出 average 變量的初始值 None。
    • 2. 此時,協(xié)程在 yield 表達式處暫停。
    • 3. 使用 send() 激活協(xié)程,把發(fā)送的值賦給 num,并計算出 avg 的值。
    • 4. 使用 print 打印出 yield 返回的數(shù)據(jù)。

    單步 調(diào)試 上面程序。

    使用 asyncio 的 消息循環(huán) 讓協(xié)程運行

    使用 asyncio 異步 IO 調(diào)用 協(xié)程

    示例代碼 1:

    import asyncioasync def func_1():print("func_1 start")print("func_1 end")# await asyncio.sleep(1)async def func_2():print("func_2 start")print("func_2 a")print("func_2 b")print("func_2 c")print("func_2 end")# await asyncio.sleep(1)f_1 = func_1() print(f_1)f_2 = func_2() print(f_2)# 獲取 EventLoop: loop = asyncio.get_event_loop() tasks = [func_1(), func_2()]# 執(zhí)行 coroutine loop.run_until_complete(asyncio.wait(tasks)) loop.close()

    示例代碼 2:

    import asyncio import timestart = time.time()def tic():return 'at %1.1f seconds' % (time.time() - start)async def gr1():# Busy waits for a second, but we don't want to stick around...print('gr1 started work: {}'.format(tic()))# 暫停兩秒,但不阻塞時間循環(huán),下同await asyncio.sleep(2)print('gr1 ended work: {}'.format(tic()))async def gr2():# Busy waits for a second, but we don't want to stick around...print('gr2 started work: {}'.format(tic()))await asyncio.sleep(2)print('gr2 Ended work: {}'.format(tic()))async def gr3():print("Let's do some stuff while the coroutines are blocked, {}".format(tic()))await asyncio.sleep(1)print("Done!")# 事件循環(huán) ioloop = asyncio.get_event_loop()# tasks中也可以使用 asyncio.ensure_future(gr1()).. tasks = [ioloop.create_task(gr1()),ioloop.create_task(gr2()),ioloop.create_task(gr3()) ] ioloop.run_until_complete(asyncio.wait(tasks)) ioloop.close()""" 結(jié)果: gr1 started work: at 0.0 seconds gr2 started work: at 0.0 seconds Let's do some stuff while the coroutines are blocked, at 0.0 seconds Done! gr2 Ended work: at 2.0 seconds gr1 ended work: at 2.0 seconds """

    多個?coroutine?可以封裝成一組 Task 然后并發(fā)執(zhí)行。

    • asyncio.wait(...) 協(xié)程的參數(shù)是一個由 future 或 協(xié)程 構(gòu)成的可迭代對象;wait 會分別
      把各個協(xié)程包裝進一個 Task 對象。最終的結(jié)果是,wait 處理的所有對象都通過某種方式變成 Future 類的實例。wait 是協(xié)程函數(shù),因此返回的是一個協(xié)程或生成器對象。

    • ioloop.run_until_complete 方法的參數(shù)是一個 future 或 協(xié)程。如果是協(xié)程,run_until_complete方法與 wait 函數(shù)一樣,把協(xié)程包裝進一個 Task 對象中。

    • 在 asyncio 包中,future和協(xié)程關(guān)系緊密,因為可以使用 yield from 從 asyncio.Future 對象中產(chǎn)出結(jié)果。這意味著,如果 foo 是協(xié)程函數(shù)(調(diào)用后返回協(xié)程對象),抑或是返回Future 或 Task 實例的普通函數(shù),那么可以這樣寫:res = yield from foo()。這是 asyncio 包的 API 中很多地方可以互換協(xié)程與期物的原因之一。 例如上面的例子中 tasks 可以改寫成協(xié)程列表:tasks = [gr1(), gr(2), gr(3)]

    詳細的各個類說明,類方法,傳參,以及方法返回的是什么類型都可以在官方文檔上仔細研讀,多讀幾遍,方有體會。

    示例代碼 3:

    import asyncio import time import aiohttp import async_timeoutmsg = "http://www.nationalgeographic.com.cn/photography/photo_of_the_day/{}.html" headers = {'User-Agent': 'Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US; rv:1.9.1.6) Gecko/20091201 Firefox/3.5.6' }urls = [msg.format(i) for i in range(5400, 5500)]async def fetch(session, url):with async_timeout.timeout(10):async with session.get(url) as response:return response.statusasync def main(url):async with aiohttp.ClientSession() as session:status = await fetch(session, url)return statusif __name__ == '__main__':start = time.time()loop = asyncio.get_event_loop()tasks = [main(url) for url in urls]# 返回一個列表,內(nèi)容為各個tasks的返回值status_list = loop.run_until_complete(asyncio.gather(*tasks))print(len([status for status in status_list if status == 200]))end = time.time()print("cost time:", end - start)

    示例代碼 4:

    用?asyncio?實現(xiàn)??Hello world?代碼如下:

    import asyncio@asyncio.coroutine def hello():print("Hello world!")# 異步調(diào)用 asyncio.sleep(1):r = yield from asyncio.sleep(1)print("Hello again!")# 獲取 EventLoop: loop = asyncio.get_event_loop()# 執(zhí)行 coroutine loop.run_until_complete(hello()) loop.close()

    或者直接使用新語法?asyncawait

    import asyncioasync def hello():print("Hello world!")# 異步調(diào)用 asyncio.sleep(1):r = await asyncio.sleep(1)print("Hello again!")# 獲取 EventLoop: loop = asyncio.get_event_loop()# 執(zhí)行 coroutine loop.run_until_complete(hello()) loop.close()

    @asyncio.coroutine?把一個 generator 標(biāo)記為 coroutine類型,然后,我們就把這個?coroutine?扔到?EventLoop?中執(zhí)行。

    hello()?會首先打印出?Hello world!,然后,yield from?語法可以讓我們方便地調(diào)用另一個?generator。由于asyncio.sleep()?也是一個?coroutine,所以線程不會等待?asyncio.sleep(),而是直接中斷并執(zhí)行下一個消息循環(huán)。當(dāng)asyncio.sleep()?返回時,線程就可以從?yield from?拿到返回值(此處是None),然后接著執(zhí)行下一行語句。

    把?asyncio.sleep(1)看成是一個耗時1秒的IO操作,在此期間,主線程并未等待,而是去執(zhí)行?EventLoop?中其他可以執(zhí)行的coroutine了,因此可以實現(xiàn)并發(fā)執(zhí)行。

    我們用 Task 封裝兩個?coroutine?試試:

    import threading import asyncioasync def hello():print('1 : Hello world! (%s)' % threading.currentThread())await asyncio.sleep(5)print('2 : Hello again! (%s)' % threading.currentThread())loop = asyncio.get_event_loop() tasks = [hello(), hello()] loop.run_until_complete(asyncio.wait(tasks)) loop.close()

    觀察執(zhí)行過程:

    1 : Hello world! (<_MainThread(MainThread, started 12200)>) 1 : Hello world! (<_MainThread(MainThread, started 12200)>) ( 暫停約 5 秒 ) 2 : Hello again! (<_MainThread(MainThread, started 12200)>) 2 : Hello again! (<_MainThread(MainThread, started 12200)>)

    由打印的當(dāng)前線程名稱可以看出,兩個?coroutine?是由同一個線程并發(fā)執(zhí)行的。

    如果把?asyncio.sleep()?換成真正的IO操作,則多個?coroutine?就可以由一個線程并發(fā)執(zhí)行。

    我們用?asyncio?的異步網(wǎng)絡(luò)連接來獲取 sina、sohu 和 163 的網(wǎng)站首頁:

    import asyncioasync def wget(host):print('wget %s...' % host)connect = asyncio.open_connection(host, 80)reader, writer = await connectheader = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % hostwriter.write(header.encode('utf-8'))await writer.drain()while True:line = await reader.readline()if line == b'\r\n':breakprint('%s header > %s' % (host, line.decode('utf-8').rstrip()))# Ignore the body, close the socketwriter.close()loop = asyncio.get_event_loop() tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']] loop.run_until_complete(asyncio.wait(tasks)) loop.close()

    執(zhí)行結(jié)果如下:

    wget www.sohu.com... wget www.sina.com.cn... wget www.163.com... (等待一段時間) (打印出sohu的header) www.sohu.com header > HTTP/1.1 200 OK www.sohu.com header > Content-Type: text/html ... (打印出sina的header) www.sina.com.cn header > HTTP/1.1 200 OK www.sina.com.cn header > Date: Wed, 20 May 2015 04:56:33 GMT ... (打印出163的header) www.163.com header > HTTP/1.0 302 Moved Temporarily www.163.com header > Server: Cdn Cache Server V2.0

    可見 3 個連接 由一個線程通過?coroutine?并發(fā)完成。

    參考源碼:

    async_hello.py:https://github.com/michaelliao/learn-python3/blob/master/samples/async/async_hello.py
    async_wget.py:https://github.com/michaelliao/learn-python3/blob/master/samples/async/async_wget.py

    示例代碼 5: ( 協(xié)程 的 返回值

    一個協(xié)程里可以啟動另外一個協(xié)程,并等待它完成返回結(jié)果,采用 await 關(guān)鍵字

    import asyncioasync def outer():print('in outer')print('waiting for result1')result1 = await phase1()print('waiting for result2')result2 = await phase2(result1)return (result1, result2)async def phase1():print('in phase1')return 'result1'async def phase2(arg):print('in phase2')return 'result2 derived from {}'.format(arg)event_loop = asyncio.get_event_loop() try:return_value = event_loop.run_until_complete(outer())print('return value: {!r}'.format(return_value)) finally:event_loop.close()

    運行結(jié)果:

    in outer waiting for result1 in phase1 waiting for result2 in phase2 return value: ('result1', 'result2 derived from result1')

    前面都是關(guān)于 asyncio 的例子,那么除了asyncio,就沒有其他協(xié)程庫了嗎?asyncio 作為 python 的標(biāo)準庫,自然受到很多青睞,但它有時候還是顯得太重量了,尤其是提供了許多復(fù)雜的輪子和協(xié)議,不便于使用。

    你可以理解為,asyncio 是使用 async/await 語法開發(fā)的 協(xié)程庫,而不是有 asyncio 才能用 async/await,
    除了 asyncio 之外,curio 和 trio 是更加輕量級的替代物,而且也更容易使用。

    curio 的作者是 David Beazley,下面是使用 curio 創(chuàng)建 tcp server 的例子,據(jù)說這是 dabeaz 理想中的一個異步服務(wù)器的樣子:

    from curio import run, spawn from curio.socket import *async def echo_server(address):sock = socket(AF_INET, SOCK_STREAM)sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)sock.bind(address)sock.listen(5)print('Server listening at', address)async with sock:while True:client, addr = await sock.accept()await spawn(echo_client, client, addr)async def echo_client(client, addr):print('Connection from', addr)async with client:while True:data = await client.recv(100000)if not data:breakawait client.sendall(data)print('Connection closed')if __name__ == '__main__':run(echo_server, ('',25000))

    無論是 asyncio 還是 curio,或者是其他異步協(xié)程庫,在背后往往都會借助于 IO的事件循環(huán)來實現(xiàn)異步,下面用幾十行代碼來展示一個簡陋的基于事件驅(qū)動的echo服務(wù)器:

    from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR from selectors import DefaultSelector, EVENT_READselector = DefaultSelector() pool = {}def request(client_socket, addr):client_socket, addr = client_socket, addrdef handle_request(key, mask):data = client_socket.recv(100000)if not data:client_socket.close()selector.unregister(client_socket)del pool[addr]else:client_socket.sendall(data)return handle_requestdef recv_client(key, mask):sock = key.fileobjclient_socket, addr = sock.accept()req = request(client_socket, addr)pool[addr] = reqselector.register(client_socket, EVENT_READ, req)def echo_server(address):sock = socket(AF_INET, SOCK_STREAM)sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)sock.bind(address)sock.listen(5)selector.register(sock, EVENT_READ, recv_client)try:while True:events = selector.select()for key, mask in events:callback = key.datacallback(key, mask)except KeyboardInterrupt:sock.close()if __name__ == '__main__':echo_server(('',25000))

    驗證一下:

    # terminal 1 $ nc localhost 25000 hello world hello world# terminal 2 $ nc localhost 25000 hello world hello world

    現(xiàn)在知道:

    • 完成 異步的代碼 不一定要用 async/await ,使用了 async/await 的代碼也不一定能做到異步,
    • async/await 是協(xié)程的語法糖,使協(xié)程之間的調(diào)用變得更加清晰,
    • 使用 async 修飾的函數(shù)調(diào)用時會返回一個協(xié)程對象,
    • await 只能放在 async 修飾的函數(shù)里面使用,await 后面必須要跟著一個 協(xié)程對象Awaitable
    • await 的目的是等待協(xié)程控制流的返回而實現(xiàn)暫停并掛起函數(shù)的操作是yield。

    async/await 以及 協(xié)程 是Python未來實現(xiàn)異步編程的趨勢,我們將會在更多的地方看到他們的身影,例如協(xié)程庫的 curio 和 trio,web 框架的 sanic,數(shù)據(jù)庫驅(qū)動的 asyncpg 等等。在Python 3主導(dǎo)的今天,作為開發(fā)者,應(yīng)該及時擁抱和適應(yīng)新的變化,而基于async/await的協(xié)程憑借良好的可讀性和易用性日漸登上舞臺,看到這里,你還不趕緊上車?

    Python 模塊 asyncio – 協(xié)程之間的同步

    Python 模塊 asyncio – 協(xié)程之間的同步:https://www.quxihuan.com/posts/python-module-asyncio-synchronization/

    await yield from

    Python3.3 的 yield from 語法可以把生成器的操作委托給另一個生成器,生成器的調(diào)用方可以直接與子生成器進行通信:

    def sub_gen():yield 1yield 2yield 3def gen():return (yield from sub_gen())def main():for val in gen():print(val) # 1 # 2 # 3

    利用這一特性,使用 yield from 能夠編寫出類似協(xié)程效果的函數(shù)調(diào)用,在3.5之前,asyncio 正是使用@asyncio.coroutine 和 yield from 語法來創(chuàng)建協(xié)程:https://docs.python.org/3.4/library/asyncio-task.html

    @asyncio.coroutine def compute(x, y):print("Compute %s + %s ..." % (x, y))yield from asyncio.sleep(1.0)return x + y@asyncio.coroutine def print_sum(x, y):result = yield from compute(x, y)print("%s + %s = %s" % (x, y, result))loop = asyncio.get_event_loop() loop.run_until_complete(print_sum(1, 2)) loop.close()

    然而,用 yield from 容易在表示協(xié)程和生成器中混淆,沒有良好的語義性,所以在 Python 3.5 推出了更新的 async/await 表達式來作為協(xié)程的語法。

    因此類似以下的調(diào)用是等價的:

    async with lock:...with (yield from lock):... ###################### def main():return (yield from coro())def main():return (await coro())

    那么,怎么把生成器包裝為一個協(xié)程對象呢?這時候可以用到 types 包中的 coroutine 裝飾器(如果使用asyncio做驅(qū)動的話,那么也可以使用 asyncio 的 coroutine 裝飾器),@types.coroutine裝飾器會將一個生成器函數(shù)包裝為協(xié)程對象:

    import asyncio import types@types.coroutine def compute(x, y):print("Compute %s + %s ..." % (x, y))yield from asyncio.sleep(1.0)return x + yasync def print_sum(x, y):result = await compute(x, y)print("%s + %s = %s" % (x, y, result))loop = asyncio.get_event_loop() loop.run_until_complete(print_sum(1, 2)) loop.close()

    盡管兩個函數(shù)分別使用了新舊語法,但他們都是協(xié)程對象,也分別稱作?native coroutine?以及?generator-based coroutine,因此不用擔(dān)心語法問題。

    下面觀察一個 asyncio 中 Future 的例子:

    import asynciofuture = asyncio.Future()async def test_1():await asyncio.sleep(1)future.set_result('data')async def test_2():print(await future)loop = asyncio.get_event_loop() tasks_list = [test_1(), test_2()] loop.run_until_complete(asyncio.wait(tasks_list)) loop.close()

    兩個協(xié)程在事件循環(huán)中,協(xié)程?test_1 在執(zhí)行第一句后掛起自身切到 asyncio.sleep,而協(xié)程 test_2 一直等待 future 的結(jié)果,讓出事件循環(huán),計時器結(jié)束后 test_1 執(zhí)行第二句并設(shè)置了 future 的值,被掛起的 test_2 恢復(fù)執(zhí)行,打印出 future 的結(jié)果 'data' 。

    future 可以被 await 證明了 future 對象是一個 Awaitable,進入 Future 類的源碼可以看到有一段代碼顯示了 future 實現(xiàn)了__await__ 協(xié)議:

    class Future:...def __iter__(self):if not self.done():self._asyncio_future_blocking = Trueyield self # This tells Task to wait for completion.assert self.done(), "yield from wasn't used with future"return self.result() # May raise too.if compat.PY35:__await__ = __iter__ # make compatible with 'await' expression

    當(dāng)執(zhí)行?await future?這行代碼時,future中的這段代碼就會被執(zhí)行,首先future檢查它自身是否已經(jīng)完成,如果沒有完成,掛起自身,告知當(dāng)前的 Task(任務(wù))等待 future 完成。

    當(dāng) future 執(zhí)行 set_result 方法時,會觸發(fā)以下的代碼,設(shè)置結(jié)果,標(biāo)記 future 已經(jīng)完成:

    def set_result(self, result):...if self._state != _PENDING:raise InvalidStateError('{}: {!r}'.format(self._state, self))self._result = resultself._state = _FINISHEDself._schedule_callbacks()

    最后 future 會調(diào)度自身的回調(diào)函數(shù),觸發(fā) Task._step() 告知 Task 驅(qū)動 future 從之前掛起的點恢復(fù)執(zhí)行,不難看出,future 會執(zhí)行下面的代碼:

    class Future:...def __iter__(self):...assert self.done(), "yield from wasn't used with future"return self.result() # May raise too.

    最終返回結(jié)果給調(diào)用方。

    Yield from

    調(diào)用協(xié)程 的方式有有很多,yield from 就是其中的一種。這種方式在 Python3.3 中被引入,在 Python3.5 中以 async/await 的形式進行了優(yōu)化。yield from 表達式的使用方式如下:

    import asyncio@asyncio.coroutine def get_json(client, url): file_content = yield from load_file('/Users/scott/data.txt')

    正如所看到的,yield from 被使用在用 @asyncio.coroutine 裝飾的函數(shù)內(nèi),如果想把 yield from 在這個函數(shù)外使用,將會拋出如下語法錯誤:

    File "main.py", line 1file_content = yield from load_file('/Users/scott/data.txt')^ SyntaxError: 'yield' outside function

    為了避免語法錯誤,yield from 必須在調(diào)用函數(shù)的內(nèi)部使用(這個調(diào)用函數(shù)通常被裝飾為協(xié)程)。

    Async / await

    較新的語法是使用 async/await 關(guān)鍵字。 async 從 Python3.5 開始被引進,跟 @asyncio.coroutine 裝飾器一樣,用來聲明一個函數(shù)是一個協(xié)程。只要把它放在函數(shù)定義之前,就可以應(yīng)用到函數(shù)上,使用方式如下:

    async def ping_server(ip):# ping code here...

    實際調(diào)用這個函數(shù)時,使用 await 而不用 yield from ,當(dāng)然,使用方式依然差不多:

    async def ping_local(ip):return await ping_server('192.168.1.1')

    再強調(diào)一遍,跟 yield from 一樣,不能在函數(shù)外部使用 await ,否則會拋出語法錯誤。 (譯者注: async 用來聲明一個函數(shù)是協(xié)程,然后使用 await調(diào)用這個協(xié)程, await 必須在函數(shù)內(nèi)部,這個函數(shù)通常也被聲明為另一個協(xié)程)

    Python3.5 對這兩種調(diào)用協(xié)程的方法都提供了支持,但是推薦 async/await 作為首選。

    Event Loop

    如果你還不知道如何開始和操作一個 Eventloop ,那么上面有關(guān)協(xié)程所說的都起不了多大作用。 Eventloop 在執(zhí)行異步函數(shù)時非常重要,重要到只要執(zhí)行協(xié)程,基本上就得利用 Eventloop 。

    Eventloop 提供了相當(dāng)多的功能:

    • 注冊,執(zhí)行 和 取消 延遲調(diào)用(異步函數(shù))
    • 創(chuàng)建 客戶端 與 服務(wù)端 傳輸用于通信
    • 創(chuàng)建 子程序 和 通道 跟 其他的程序 進行通信
    • 指定 函數(shù) 的 調(diào)用 到 線程池

    Eventloop 有相當(dāng)多的配置和類型可供使用,但大部分程序只需要如下方式預(yù)定函數(shù)即可:

    import asyncioasync def speak_async(): print('OMG asynchronicity!')loop = asyncio.get_event_loop() loop.run_until_complete(speak_async()) loop.close()

    有意思的是代碼中的最后三行,首先獲取默認的 Eventloop ( asyncio.get_event_loop() ),然后預(yù)定和運行異步任務(wù),并在完成后結(jié)束循環(huán)。

    loop.run_until_complete() 函數(shù)實際上是阻塞性的,也就是在所有異步方法完成之前,它是不會返回的。但因為我們只在一個線程中運行這段代碼,它沒法再進一步擴展,即使循環(huán)仍在運行。

    可能你現(xiàn)在還沒覺得這有多大的用處,因為我們通過調(diào)用其他 IO 來結(jié)束 Eventloop 中的阻塞(譯者注:也就是在阻塞時進行其他 IO ),但是想象一下,如果在網(wǎng)頁服務(wù)器上,把整個程序都封裝在異步函數(shù)內(nèi),到時就可以同時運行多個異步請求了。

    也可以將 Eventloop 的線程中斷,利用它去處理所有耗時較長的 IO 請求,而主線程處理程序邏輯或者用戶界面。

    一個案例

    讓我們實際操作一個稍大的案例。下面這段代碼就是一個非常漂亮的異步程序,它先從 Reddit 抓取 JSON 數(shù)據(jù),解析它,然后打印出當(dāng)天來自 /r/python,/r/programming 和 /r/compsci 的置頂帖。

    所示的第一個方法 get_json() ,由 get_reddit_top() 調(diào)用,然后只創(chuàng)建一個 GET 請求到適當(dāng)?shù)木W(wǎng)址。當(dāng)這個方法和 await 一起調(diào)用后, Eventloop 便能夠繼續(xù)為其他的協(xié)程服務(wù),同時等待 HTTP 響應(yīng)達到。一旦響應(yīng)完成, JSON 數(shù)據(jù)就返回到 get_reddit_top() ,得到解析并打印出來。

    import signal import sys import asyncio import aiohttp import jsonloop = asyncio.get_event_loop() client = aiohttp.ClientSession(loop=loop)async def get_json(client, url):async with client.get(url) as response:assert response.status == 200return await response.read()async def get_reddit_top(subreddit, client):data1 = await get_json(client, 'https://www.reddit.com/r/' + subreddit + '/top.json?sort=top&t=day&limit=5')j = json.loads(data1.decode('utf-8'))for i in j['data']['children']:score = i['data']['score']title = i['data']['title']link = i['data']['url']print(str(score) + ': ' + title + ' (' + link + ')')print('DONE:', subreddit + '\n')def signal_handler(signal, frame):loop.stop()client.close()sys.exit(0)signal.signal(signal.SIGINT, signal_handler)asyncio.ensure_future(get_reddit_top('python', client)) asyncio.ensure_future(get_reddit_top('programming', client)) asyncio.ensure_future(get_reddit_top('compsci', client)) loop.run_forever()

    注意,如果多次運行這段代碼,打印出來的 subreddit 數(shù)據(jù)在順序上會有些許變化。這是因為每當(dāng)我們調(diào)用一次代碼都會釋放對線程的控制,容許線程去處理另一個 HTTP 調(diào)用。這將導(dǎo)致誰先獲得響應(yīng),誰就先打印出來。

    結(jié)論

    即使 Python 內(nèi)置的異步操作沒有 Javascript 那么順暢,但這并不意味著就不能用它來把應(yīng)用變得更有趣、更有效率。只要花半個小時的時間去了解它的來龍去脈,你就會感覺把異步操作應(yīng)用到你的程序中將會是多美好的一件事。

    aiohttp

    asyncio?可以實現(xiàn)單線程并發(fā)IO操作。如果僅用在客戶端,發(fā)揮的威力不大。如果把asyncio用在服務(wù)器端,例如Web服務(wù)器,由于HTTP連接就是IO操作,因此可以用 單線程 +?coroutine?實現(xiàn)多用戶的高并發(fā)支持。

    asyncio?實現(xiàn)了TCP、UDP、SSL等協(xié)議aiohttp?則是基于?asyncio?實現(xiàn)的 HTTP 框架。

    我們先安裝?aiohttp:pip install aiohttp

    然后編寫一個HTTP服務(wù)器,分別處理以下URL:

    • / - 首頁返回b'

      Index

      ';
    • /hello/{name} - 根據(jù)URL參數(shù)返回文本hello, %s!。

    代碼如下:

    import asynciofrom aiohttp import webasync def index(request):await asyncio.sleep(0.5)return web.Response(body=b'<h1>Index</h1>')async def hello(request):await asyncio.sleep(0.5)text = '<h1>hello, %s!</h1>' % request.match_info['name']return web.Response(body=text.encode('utf-8'))async def init(loop):app = web.Application(loop=loop)app.router.add_route('GET', '/', index)app.router.add_route('GET', '/hello/{name}', hello)srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000)print('Server started at http://127.0.0.1:8000...')return srvloop = asyncio.get_event_loop() loop.run_until_complete(init(loop)) loop.run_forever()

    注意?aiohttp的初始化函數(shù)init()也是一個coroutine,loop.create_server()則利用asyncio創(chuàng)建TCP服務(wù)。

    參考源碼:aio_web.py :https://github.com/michaelliao/learn-python3/blob/master/samples/async/aio_web.py

    ?一切從爬蟲開始

    【續(xù)篇】Python 協(xié)程之從放棄到死亡再到重生:https://www.secpulse.com/archives/64912.html

    從一個簡單的爬蟲開始,這個爬蟲很簡單,訪問指定的URL,并且獲取內(nèi)容并計算長度,這里我們給定5個URL。第一版的代碼十分簡單,順序獲取每個URL的內(nèi)容,當(dāng)?shù)谝粋€請求完成、計算完長度后,再開始第二個請求。

    spider_normal.py

    # filename: spider_normal.py import time import requeststargets = ["https://lightless.me/archives/python-coroutine-from-start-to-boom.html","https://github.com/aio-libs","https://www.python.org/dev/peps/pep-0380/","https://www.baidu.com/","https://www.zhihu.com/", ]def spider():results = {}for url in targets:r = requests.get(url)length = len(r.content)results[url] = lengthreturn resultsdef show_results(results):for url, length in results.items():print("Length: {:^7d} URL: {}".format(length, url))def main():start_time = time.time()results = spider()print("Use time: {:.2f}s".format(time.time() - start_time))show_results(results)if __name__ == '__main__':main()

    我們多運行幾次看看結(jié)果。

    大約需要花費14-16秒不等,這段代碼并沒有什么好看的,我們把關(guān)注點放在后面的代碼上。現(xiàn)在我們使用多線程來改寫這段代碼。

    # filename: spider_thread.py import time import threading import requestsfinal_results = {}targets = ["https://lightless.me/archives/python-coroutine-from-start-to-boom.html","https://github.com/aio-libs","https://www.python.org/dev/peps/pep-0380/","https://www.baidu.com/","https://www.zhihu.com/", ]def show_results(results):for url, length in results.items():print("Length: {:^7d} URL: {}".format(length, url))def spider(url):r = requests.get(url)length = len(r.content)final_results[url] = lengthdef main():ts = []start_time = time.time()for url in targets:t = threading.Thread(target=spider, args=(url,))ts.append(t)t.start()for t in ts:t.join()print("Use time: {:.2f}s".format(time.time() - start_time))show_results(final_results)if __name__ == '__main__':main()

    我們多運行幾次看看結(jié)果。

    從這兩段代碼中,已經(jīng)可以看出并發(fā)對于處理任務(wù)的好處了,但是使用原生的threading模塊還是略顯麻煩,Python已經(jīng)給我們內(nèi)置了一個處理并發(fā)任務(wù)的庫concurrent,我們借用這個庫修改一下我們的代碼,之所以修改成這個庫的原因還有一個,那就是引出我們后面會談到的Future。

    # filename: spider_thread.py import time from concurrent import futures import requestsfinal_results = {}targets = ["https://lightless.me/archives/python-coroutine-from-start-to-boom.html","https://github.com/aio-libs","https://www.python.org/dev/peps/pep-0380/","https://www.baidu.com/","https://www.zhihu.com/", ]def show_results(results):for url, length in results.items():print("Length: {:^7d} URL: {}".format(length, url))def spider(url):r = requests.get(url)length = len(r.content)final_results[url] = lengthreturn Truedef main():start_time = time.time()with futures.ThreadPoolExecutor(10) as executor:res = executor.map(spider, targets)print("Use time: {:.2f}s".format(time.time() - start_time))show_results(final_results)if __name__ == '__main__':main()

    執(zhí)行一下,會發(fā)現(xiàn)耗時與上一個版本一樣,穩(wěn)定在10s左右。

    可以看到我們調(diào)用了concurrent庫中的futures,那么到底什么是futures?簡單的講,這個對象代表一種異步的操作,可以表示為一個需要延時進行的操作,當(dāng)然這個操作的狀態(tài)可能已經(jīng)完成,也有可能尚未完成,如果你寫JS的話,可以理解為是類似Promise的對象。在Python中,標(biāo)準庫中其實有兩個Future類,一個是concurrent.futures.Future,另外一個是asyncio.Future,這兩個類很類似,不完全相同,這些實現(xiàn)差異以及API的差異我們先按下暫且不談,有興趣的同學(xué)可以參考下相關(guān)的文檔。Future是我們后面討論的asyncio異步編程的基礎(chǔ),因此這里多說兩句。

    Future代表的是一個未來的某一個時刻一定會執(zhí)行的操作(可能已經(jīng)執(zhí)行完成了,但是無論如何他一定有一個確切的運行時間),一般情況下用戶無需手動從零開始創(chuàng)建一個Future,而是應(yīng)當(dāng)借助框架中的API生成。比如調(diào)用concurrent.futures.Executor.submit()時,框架會為"異步操作"進行一個排期,來決定何時運行這個操作,這時候就會生成一個Future對象。

    現(xiàn)在,我們來看看如何使用asyncio進行異步編程,與多線程編程不同的是,多個協(xié)程總是運行在同一個線程中的,一旦其中的一個協(xié)程發(fā)生阻塞行為,那么整個線程都被阻塞,進而所有的協(xié)程都無法繼續(xù)運行。asyncio.Future和asyncio.Task都可以看做是一個異步操作,后者是前者的子類,BaseEventLoop.create_task()會接收一個協(xié)程作為參數(shù),并且對這個任務(wù)的運行時間進行排期,返回一個asyncio.Task類的實例,這個對象也是對于協(xié)程的一層包裝。如果想獲取asyncio.Future的執(zhí)行結(jié)果,應(yīng)當(dāng)使用yield from來獲取,這樣控制權(quán)會被自動交還給EventLoop,我們無需處理"等待Future或Task運行完成"這個操作。于是就有了一個很愉悅的編程方式,如果一個函數(shù)A是協(xié)程、或返回Task或Future的實例的函數(shù),就可以通過result = yield from A()來獲取返回值。下面我們就使用asyncio和aiohttp來改寫我們的爬蟲。

    import asyncio import timeimport aiohttpfinal_results = {}targets = ["https://lightless.me/archives/python-coroutine-from-start-to-boom.html","https://github.com/aio-libs","https://www.python.org/dev/peps/pep-0380/","https://www.baidu.com/","https://www.zhihu.com/", ]def show_results(results):for url, length in results.items():print("Length: {:^7d} URL: {}".format(length, url))async def get_content(url):async with aiohttp.ClientSession() as session:async with session.get(url) as resp:content = await resp.read()return len(content)async def spider(url):length = await get_content(url)final_results[url] = lengthreturn Truedef main():loop = asyncio.get_event_loop()cor = [spider(url) for url in targets]start_time = time.time()result = loop.run_until_complete(asyncio.gather(*cor))print("Use time: {:.2f}s".format(time.time() - start_time))show_results(final_results)print("loop result: ", result)if __name__ == '__main__':main()

    結(jié)果非常驚人

    這里可能有同學(xué)會問為什么沒看到y(tǒng)ield from以及@asyncio.coroutine,那是因為在Python3.5以后,增加了async def和awiat語法,等效于@asyncio.coroutine和yield from,詳情可以參考上一篇文章。在main()函數(shù)中,我們先獲取一個可用的事件循環(huán),緊接著將生成好的協(xié)程任務(wù)添加到這個循環(huán)中,并且等待執(zhí)行完成。在每個spider()中,執(zhí)行到await的時候,會交出控制權(quán)(如果不明白請向前看一下委托生成器的部分),并且切到其他的協(xié)程繼續(xù)運行,等到get_content()執(zhí)行完成返回后,那么會恢復(fù)spider()協(xié)程的執(zhí)行。get_content()函數(shù)中只是通過async with調(diào)用aiohttp庫的最基本方法獲取頁面內(nèi)容,并且返回了長度,僅此而已。

    在修改為協(xié)程版本后,爬蟲性能有了巨大的提升,從最初了15s,到10s,再到現(xiàn)在的2s左右,簡直是質(zhì)的飛躍。這只是一個簡單的爬蟲程序,相比多線程,性能提高了近5倍,如果是其他更加復(fù)雜的大型程序,也許性能提升會更多。asyncio這套異步編程框架,通過簡單的事件循環(huán)以及協(xié)程機制,在需要等待的情況下主動交出控制權(quán),切換到其他協(xié)程進行運行。到這里就會有人問,為什么要將requests替換為aiohttp,能不能用requests?答案是不能,還是我們前面提到過的,在協(xié)程中,一切操作都要避免阻塞,禁止所有的阻塞型調(diào)用,因為所有的協(xié)程都是運行在同一個線程中的!requests庫是阻塞型的調(diào)用,當(dāng)在等待I/O時,并不能將控制權(quán)轉(zhuǎn)交給其他協(xié)程,甚至還會將當(dāng)前線程阻塞,其他的協(xié)程也無法運行。如果你在異步編程的時候需要用到一些其他的異步組件,可以到https://github.com/aio-libs/這里找找,也許就有你需要的異步庫。

    關(guān)于asyncio的異步編程資料目前來說還不算很多,官方文檔應(yīng)該算是相當(dāng)不錯的參考文獻了,其中非常推薦的兩部分是:Develop with asyncio和Tasks and coroutines,各位同學(xué)有興趣的話可以自行閱讀。asyncio這個異步框架中包含了非常多的內(nèi)容,甚至還有TCP Server/Client的相關(guān)內(nèi)容,如果想要掌握asyncio這個異步編程框架,還需要多加練習(xí)。順帶一提,asyncio非常容易與其他的框架整合,例如tornado已經(jīng)有實現(xiàn)了asyncio.AbstractEventLoop的接口的類AsyncIOMainLoop,還有人將asyncio集成到QT的事件循環(huán)中了,可以說是非常的靈活了。

    Python 協(xié)程總結(jié)

    Python 之所以能夠處理網(wǎng)絡(luò) IO 高并發(fā),是因為借助了高效的IO模型,能夠最大限度的調(diào)度IO,然后事件循環(huán)使用協(xié)程處理IO,協(xié)程遇到IO操作就將控制權(quán)拋出,那么在IO準備好之前的這段事件,事件循環(huán)就可以使用其他的協(xié)程處理其他事情,然后協(xié)程在用戶空間,并且是單線程的,所以不會像多線程,多進程那樣頻繁的上下文切換,因而能夠節(jié)省大量的不必要性能損失。

    注: 不要再協(xié)程里面使用time.sleep之類的同步操作,因為協(xié)程再單線程里面,所以會使得整個線程停下來等待,也就沒有協(xié)程的優(yōu)勢了

    理解

    協(xié)程,又稱為微線程,看上去像是子程序,但是它和子程序又不太一樣,它在執(zhí)行的過程中,可以在中斷當(dāng)前的子程序后去執(zhí)行別的子程序,再返回來執(zhí)行之前的子程序,但是它的相關(guān)信息還是之前的。

    優(yōu)點:

  • 極高的執(zhí)行效率,因為子程序切換而不是線程切換,沒有了線程切換的開銷;
  • 不需要多線程的鎖機制,因為只有一個線程在執(zhí)行;
  • 如果要充分利用CPU多核,可以通過使用多進程+協(xié)程的方式

    使用

    打開 asyncio 的源代碼,可以發(fā)現(xiàn)asyncio中的需要用到的文件如下:

    下面的則是接下來要總結(jié)的文件

    文件解釋
    base_events基礎(chǔ)的事件,提供了BaseEventLoop事件
    coroutines提供了封裝成協(xié)程的類
    events提供了事件的抽象類,比如BaseEventLoop繼承了AbstractEventLoop
    futures提供了Future類
    tasks提供了Task類和相關(guān)的方法

    coroutines

    函數(shù)解釋
    coroutine(func)為函數(shù)加上裝飾器
    iscoroutinefunction(func)判斷函數(shù)是否使用了裝飾器
    iscoroutine(obj)判斷該對象是否是裝飾器

    如果在函數(shù)使用了coroutine裝飾器,就可以通過yield from去調(diào)用async def聲明的函數(shù),如果已經(jīng)使用async def聲明,就沒有必要再使用裝飾器了,這兩個功能是一樣的。

    import asyncio@asyncio.coroutine def hello_world():print("Hello World!")async def hello_world2():print("Hello World2!")print('------hello_world------') print(asyncio.iscoroutinefunction(hello_world))print('------hello_world2------') print(asyncio.iscoroutinefunction(hello_world2))print('------event loop------') loop = asyncio.get_event_loop()# 一直阻塞該函數(shù)調(diào)用到函數(shù)返回 loop.run_until_complete(hello_world()) loop.run_until_complete(hello_world2()) loop.close()

    上面的代碼分別使用到了coroutine裝飾器和async def,其運行結(jié)果如下:

    ------hello_world------ True ------hello_world2------ True ------event loop------ Hello World! Hello World2!

    注意:不可以直接調(diào)用協(xié)程,需要一個event loop去調(diào)用。

    如果想要在一個函數(shù)中去得到另外一個函數(shù)的結(jié)果,可以使用yield from或者await,例子如下:

    import asyncioasync def compute(x, y):print("Compute %s + %s ..." % (x, y))await asyncio.sleep(1.0)return x + yasync def print_sum(x, y):result = await compute(x, y)print("%s + %s = %s" % (x, y, result))loop = asyncio.get_event_loop() loop.run_until_complete(print_sum(1, 2)) loop.close()

    函數(shù) print_sum 會一直等到函數(shù) compute 返回結(jié)果,執(zhí)行過程如下:

    base_events

    這個文件里面漏出來的只有BaseEventLoop一個類,它的相關(guān)方法如下:

    函數(shù)解釋
    create_future()創(chuàng)建一個future對象并且綁定到事件上
    create_task()創(chuàng)建一個任務(wù)
    run_forever()除非調(diào)用stop,否則事件會一直運行下去
    run_until_complete(future)直到 future 對象執(zhí)行完畢,事件才停止
    stop()停止事件
    close()關(guān)閉事件
    is_closed()判斷事件是否關(guān)閉
    time()返回事件運行時的時間
    call_later(delay, callback, *args)設(shè)置一個回調(diào)函數(shù),并且可以設(shè)置延遲的時間
    call_at(when, callback, *args)同上,但是設(shè)置的是絕對時間
    call_soon(callback, *args)馬上調(diào)用

    events

    函數(shù)解釋
    get_event_loop()返回一個異步的事件
    ......

    返回的就是BaseEventLoop的對象。

    future

    Future類的相關(guān)方法如下:

    方法解釋
    cancel()取消掉future對象
    cancelled()返回是否已經(jīng)取消掉
    done()如果future已經(jīng)完成則返回true
    result()返回future執(zhí)行的結(jié)果
    exception()返回在future中設(shè)置了的exception
    add_done_callback(fn)當(dāng)future執(zhí)行時執(zhí)行回調(diào)函數(shù)
    remove_done_callback(fn)刪除future的所有回調(diào)函數(shù)
    set_result(result)設(shè)置future的結(jié)果
    set_exception(exception)設(shè)置future的異常

    設(shè)置 future 的例子如下:

    import asyncioasync def slow_operation(future):await asyncio.sleep(1) # 睡眠future.set_result('Future is done!') # future設(shè)置結(jié)果loop = asyncio.get_event_loop() future = asyncio.Future() # 創(chuàng)建future對象 asyncio.ensure_future(slow_operation(future)) # 創(chuàng)建任務(wù) loop.run_until_complete(future) # 阻塞直到future執(zhí)行完才停止事件 print(future.result()) loop.close()

    run_until_complete方法在內(nèi)部通過調(diào)用了future的add_done_callback,當(dāng)執(zhí)行future完畢的時候,就會通知事件。

    下面這個例子則是通過使用future的add_done_callback方法實現(xiàn)和上面例子一樣的效果:

    import asyncioasync def slow_operation(future):await asyncio.sleep(1)future.set_result('Future is done!')def got_result(future):print(future.result())loop.stop() # 關(guān)閉事件loop = asyncio.get_event_loop() future = asyncio.Future() asyncio.ensure_future(slow_operation(future)) future.add_done_callback(got_result) # future執(zhí)行完畢就執(zhí)行該回調(diào) try:loop.run_forever() finally:loop.close()

    一旦slow_operation函數(shù)執(zhí)行完畢的時候,就會去執(zhí)行g(shù)ot_result函數(shù),里面則調(diào)用了關(guān)閉事件,所以不用擔(dān)心事件會一直執(zhí)行。

    task

    Task類是Future的一個子類,也就是Future中的方法,task都可以使用,類方法如下:

    方法解釋
    current_task(loop=None)返回指定事件中的任務(wù),如果沒有指定,則默認當(dāng)前事件
    all_tasks(loop=None)返回指定事件中的所有任務(wù)
    cancel()取消任務(wù)

    并行執(zhí)行三個任務(wù)的例子:

    import asyncioasync def factorial(name, number):f = 1for i in range(2, number + 1):print("Task %s: Compute factorial(%s)..." % (name, i))await asyncio.sleep(1)f *= iprint("Task %s: factorial(%s) = %s" % (name, number, f))loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.gather(factorial("A", 2),factorial("B", 3),factorial("C", 4), )) loop.close()

    執(zhí)行結(jié)果為

    Task A: Compute factorial(2)...Task B: Compute factorial(2)...Task C: Compute factorial(2)...Task A: factorial(2) = 2Task B: Compute factorial(3)...Task C: Compute factorial(3)...Task B: factorial(3) = 6Task C: Compute factorial(4)...Task C: factorial(4) = 24

    可以發(fā)現(xiàn),ABC同時執(zhí)行,直到future執(zhí)行完畢才退出。

    下面一些方法是和task相關(guān)的方法

    方法解釋
    as_completed(fs, *, loop=None, timeout=None)返回是協(xié)程的迭代器
    ensure_future(coro_or_future, *, loop=None)調(diào)度執(zhí)行一個 coroutine object:并且它封裝成future。返回任務(wù)對象
    async(coro_or_future, *, loop=None)丟棄的方法,推薦使用ensure_future
    wrap_future(future, *, loop=None)Wrap a concurrent.futures.Future object in a Future object.
    gather(*coros_or_futures, loop=None, return_exceptions=False)從給定的協(xié)程或者future對象數(shù)組中返回future匯總的結(jié)果
    sleep(delay, result=None, *, loop=None)創(chuàng)建一個在給定時間(以秒為單位)后完成的協(xié)程
    shield(arg, *, loop=None)等待future,屏蔽future被取消
    wait(futures, *, loop=None, timeout=None, return_when=ALL_COMPLETED)等待由序列futures給出的Futures和協(xié)程對象完成。協(xié)程將被包裹在任務(wù)中。返回含兩個集合的Future:(done,pending)
    wait_for(fut, timeout, *, loop=None)等待單個Future或coroutine object完成超時。如果超時為None,則阻止直到future完成

    總結(jié)

    以上是生活随笔為你收集整理的Python 异步 IO 、协程、asyncio、async/await、aiohttp的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。

    91久久影院 | 成人免费共享视频 | 欧美日韩一区三区 | 免费亚洲成人 | 69视频网站 | 国产三级久久久 | 欧美成a人片在线观看久 | 奇米网在线观看 | 黄色成人影院 | 久久国产热视频 | 天天插天天色 | 五月香婷| 国产精品免费观看久久 | 黄网站www | 二区三区视频 | 亚洲视频综合 | 99久久久国产精品免费观看 | 久草观看视频 | 精品视频999| 激情在线网站 | 日日夜夜91 | 国产a精品 | 奇米网8888| 欧美黄色成人 | 日韩网站免费观看 | 在线观看视频免费大全 | 亚洲一级二级三级 | 永久免费的啪啪网站免费观看浪潮 | 91在线中文| 久久久91精品国产 | 国产精品久久久久久久久婷婷 | 97精品国产97久久久久久春色 | 日韩精品一区二区三区在线播放 | 九九精品久久久 | 国产在线看 | 精品国产一区二区三区四区vr | 色悠悠久久综合 | 99热在线看 | 久久久精品国产免费观看同学 | 久久免费高清视频 | 毛片激情永久免费 | 成人小视频在线 | 久久99精品国产 | 久久精品综合网 | 高清av免费看 | 久久久午夜精品理论片中文字幕 | 国产成人免费精品 | 欧美日韩精品综合 | 麻豆久久久久久久 | 久久欧美精品 | 国偷自产中文字幕亚洲手机在线 | 亚洲v欧美v国产v在线观看 | 欧美日韩视频观看 | 日韩精品一区二区三区在线播放 | 超碰在线94 | 日韩免费视频播放 | 91av视频导航 | 久久视频这里有精品 | 久草精品视频 | 婷婷网址 | 超碰人人乐| 国产 亚洲 欧美 在线 | 韩国av一区 | 久久激情视频免费观看 | 亚洲 欧美 变态 国产 另类 | 亚洲精品男人天堂 | 中文在线a∨在线 | 国产精品免费久久久 | 亚洲综合小说 | 我要看黄色一级片 | 国外av在线 | 欧美精品乱码久久久久 | 日本在线h | 欧美在线a视频 | 国产va饥渴难耐女保洁员在线观看 | 国产精品一区二区三区在线免费观看 | 久久久亚洲电影 | 日韩在线观看网站 | 18av在线视频 | 四虎永久国产精品 | 久久手机免费视频 | 久久网页 | 韩国av一区二区三区在线观看 | 久久精品男人的天堂 | 美女视频黄,久久 | 国产精品久久久久久久久免费看 | www.五月天婷婷 | 香蕉免费| 婷婷六月久久 | 精品国产成人在线 | 日韩在线网| 在线观看 亚洲 | 久久久久久久看片 | 天天在线操 | 日韩一级黄色av | 国产精品久久久久永久免费观看 | 欧美日本三级 | 国产一级片免费观看 | 18pao国产成视频永久免费 | 99精品免费久久久久久久久 | 色播99 | www视频在线免费观看 | 亚洲在线激情 | 黄色电影在线免费观看 | 亚洲草视频 | 中文字幕国内精品 | 午夜av剧场| www.五月天色 | 欧美日比视频 | 免费看成年人 | 午夜av在线电影 | 黄色免费在线视频 | 国产特级毛片aaaaaaa高清 | 91av综合 | 免费网址在线播放 | 国产涩涩在线观看 | 久久少妇| 亚洲乱码中文字幕综合 | 国产在线观看午夜 | 成人黄色中文字幕 | av一本久道久久波多野结衣 | 日韩av电影免费观看 | 欧美精品一区二区免费 | 97在线免费视频观看 | 成人一级片视频 | 成人av网址大全 | 国产一区福利 | 久久久久国产免费免费 | 免费在线a | 精品国产电影一区 | 99r在线播放 | 午夜资源站 | 五月婷婷狠狠 | 国产午夜精品久久久久久久久久 | 最近中文字幕大全中文字幕免费 | 91视频黄色| 午夜精品一区二区三区免费视频 | www.狠狠操 | 黄色1级毛片 | 99久久婷婷国产综合亚洲 | 国产福利一区二区在线 | 天天操夜夜看 | 手机在线小视频 | 麻豆国产在线播放 | 色狠狠综合天天综合综合 | 五月婷婷综合在线视频 | 黄色免费电影网站 | 中文在线免费一区三区 | 日韩手机在线观看 | 九九九九热精品免费视频点播观看 | 人人超在线公开视频 | 夜夜操天天摸 | 亚洲免费色 | 国产精品乱码久久久久久1区2区 | 国内精品中文字幕 | 国产第一福利 | 黄色网www| 欧美淫视频| 一本色道久久综合亚洲二区三区 | 日日夜夜爱 | 99r精品视频在线观看 | 免费网站黄 | 国产一区二区精品久久 | 免费高清影视 | 国产日韩精品在线 | 国产偷v国产偷∨精品视频 在线草 | 亚洲精品乱码久久久久久久久久 | 国产欧美在线一区 | 国产精品久久久久永久免费看 | 日韩视频一| 超碰免费97 | 日本久久片 | 天天做日日做天天爽视频免费 | 99热99re6国产在线播放 | 日韩精品在线视频免费观看 | 香蕉97视频观看在线观看 | 久久99精品一区二区三区三区 | 天天操天天综合网 | 欧美精品一区二区在线观看 | 国产精品99视频 | 欧美久久久久久久久久久 | 久热久草在线 | 免费激情在线电影 | 国产在线美女 | 久久99电影 | 亚洲另类视频在线 | 伊人久久av | 激情久久五月 | 九九视频热 | 精品国产aⅴ麻豆 | 精品国产伦一区二区三区观看说明 | 婷婷av资源| 欧美性色xo影院 | 99欧美 | 人人干在线观看 | 日本中文字幕在线视频 | 美女视频久久 | 亚洲精品中文在线观看 | 国产精品一区二区在线 | 亚洲永久字幕 | 久久视频免费看 | 成 人 黄 色 视频免费播放 | 久久久久北条麻妃免费看 | 成人av电影免费在线播放 | 婷婷丁香在线视频 | 中国一区二区视频 | 少妇搡bbbb搡bbb搡aa | 亚洲视频在线观看网站 | 51久久成人国产精品麻豆 | 欧美婷婷色| 天天艹天天干天天 | 97碰碰精品嫩模在线播放 | 一级黄色片在线免费观看 | 视频在线观看国产 | 麻豆国产精品一区二区三区 | 久久久久久久久久久福利 | 中文字幕国语官网在线视频 | 国产国语在线 | 国产精品一区二区麻豆 | 久久99网 | 六月丁香激情综合 | 天天曰夜夜操 | 久久最新网址 | 久久九九影视网 | 国产午夜麻豆影院在线观看 | 免费在线播放av电影 | 91mv.cool在线观看| 日本中文字幕在线播放 | 天堂网在线视频 | 免费在线观看av网站 | 波多野结衣理论片 | 亚洲精品国久久99热 | 永久中文字幕 | 国产午夜三级 | 精品免费一区 | 天天爽天天碰狠狠添 | 久久久久久久久久久久久9999 | 国产精品不卡在线观看 | 欧美亚洲一级片 | 成人av资源| av在线免费在线 | 久久噜噜少妇网站 | 在线观看中文字幕一区二区 | 黄色免费看片网站 | 久久精品视频国产 | 99精品在线视频观看 | 91在线91 | 国产午夜精品一区二区三区 | 久久国产美女 | 亚洲日本中文字幕在线观看 | 99re8这里有精品热视频免费 | 亚洲一区二区视频在线播放 | 日韩成人免费在线观看 | 国产精品久久婷婷六月丁香 | 天天色天天综合 | 在线国产99 | 日本激情中文字幕 | 欧美激情视频一区二区三区 | 91久久丝袜国产露脸动漫 | 久久精品999 | 91探花国产综合在线精品 | 在线观看中文字幕2021 | 成人国产电影在线观看 | 国产午夜在线观看视频 | 中文字幕在线播放日韩 | 在线a视频免费观看 | 亚洲国产精品久久久 | 深爱激情av | 国产日韩中文字幕 | 国产精品国产自产拍高清av | 97人人澡人人添人人爽超碰 | 久久精品久久精品久久39 | 亚洲第一区精品 | 国产夫妻性生活自拍 | 亚洲精品www | a爱爱视频 | 91成人免费在线视频 | 丁香色婷婷 | 国产精品一区二区三区久久久 | 一区二区三区免费在线观看视频 | 日韩在线观看精品 | 国产午夜一级毛片 | 欧美巨乳网 | 美女网站免费福利视频 | 91黄色在线观看 | 国产毛片在线 | 色婷婷骚婷婷 | 久草免费看 | 日韩成人精品在线观看 | 亚洲国产精品人久久电影 | 精品国产一二区 | 中文字幕在线免费看线人 | 婷婷久月 | 久久人人爽爽 | 高清久久久 | 免费在线国产 | 国产91精品看黄网站 | 中文字幕在线人 | 99久久婷婷国产 | 成年人视频免费在线播放 | 99热国产精品 | 国产精品久久久久国产精品日日 | 国产一级黄色av | 五月天丁香综合 | 美女免费网站 | 日日干天天 | 婷婷在线免费视频 | 天天视频亚洲 | 国产高清av在线播放 | 久久电影国产免费久久电影 | 91热这里只有精品 | 手机av看片 | 精品久久久免费视频 | 日韩欧美电影 | 精品国产精品久久 | 中文字幕在线免费播放 | 伊人超碰在线 | 激情视频综合网 | 色亚洲激情 | 亚洲欧美日本一区二区三区 | 久久 国产一区 | av一级二级 | 精品国产诱惑 | 狠狠色丁香婷婷综合欧美 | 亚洲,国产成人av | 中文字幕在线观看视频网站 | 久久尤物电影视频在线观看 | 久久久在线免费观看 | 国产精品一区二区在线观看免费 | 久久精品一区 | 日韩久久精品一区 | 成人免费一级片 | 欧美一区二区三区在线观看 | 99综合电影在线视频 | 久久久久国产视频 | 日韩一区二区免费视频 | 欧美a√在线 | 久草| 久久乐九色婷婷综合色狠狠182 | 国产午夜精品一区二区三区嫩草 | 欧美精品亚洲二区 | 人人草在线视频 | 亚州av网站大全 | 99精品在线播放 | 黄色在线观看污 | 国产护士av | 日本乱码在线 | 久久久影片 | 日韩色爱 | 麻豆视频免费在线播放 | av中文字幕电影 | 波多野结衣电影一区二区 | 欧美日韩成人 | 日韩在线观看 | 亚洲dvd | 久久久精品电影 | 天天爽天天爽 | 1024久久| 黄色毛片视频免费 | 国产一级做a | 亚洲人人av | 超碰激情在线 | 美女禁18| 免费日韩一区二区 | 免费在线观看av的网站 | 精品国产伦一区二区三区观看方式 | 91成人午夜 | 欧美日韩视频在线观看一区二区 | 久久黄色片 | 91九色免费视频 | www视频在线观看 | 五月婷婷播播 | 国模精品在线 | 探花视频网站 | 国产精品毛片久久久久久久久久99999999 | 美女禁18| www.狠狠操.com| 国产视频观看 | 午夜黄色 | 99视频在线观看一区三区 | 天天射天天操天天色 | 天堂av免费 | 免费看污网站 | 99精品免费 | 久久专区| 国产精品成人一区二区三区 | 国产成人精品午夜在线播放 | 免费毛片aaaaaa | 久久久精品视频网站 | 国产婷婷一区二区 | 欧美一级电影在线观看 | 精品久久国产精品 | 欧美日韩视频在线播放 | 高潮久久久久久久久 | 日本特黄一级片 | 91香蕉视频在线下载 | 婷婷六月天在线 | 一区二精品 | 精品在线视频一区 | 中文字幕在线专区 | 美女精品网站 | 久久久91精品国产 | 日本三级不卡视频 | 天天爱天天 | 国产精品v欧美精品 | 亚洲日韩欧美一区二区在线 | 日日干夜夜骑 | 久久一区91| 999久久久精品视频 日韩高清www | 99精品久久99久久久久 | 日本精品中文字幕在线观看 | 91香蕉视频好色先生 | 亚洲精品成人网 | 精品国产乱码一区二区三区在线 | 欧美 国产 视频 | 亚洲在线视频免费 | 插婷婷 | 九九热免费在线观看 | 天天爱天天色 | 亚洲小视频在线观看 | 久久96国产精品久久99漫画 | 国产色视频一区 | 亚洲日本va中文字幕 | 欧美性生活免费看 | 日日夜夜免费精品 | 亚洲理论在线 | 欧美性生交大片免网 | 久久久国产精品电影 | 狠狠躁夜夜a产精品视频 | 97超碰在线免费观看 | 久久久精品免费观看 | 亚洲欧美日本一区二区三区 | 日韩中文在线观看 | 久久久久综合精品福利啪啪 | 婷婷亚洲五月色综合 | 五月天久久久久 | 中文字幕丝袜一区二区 | 亚洲国内精品在线 | 亚洲区另类春色综合小说校园片 | 色多多污污 | 亚洲自拍偷拍色图 | 黄色大片视频网站 | 天天干,天天射,天天操,天天摸 | 久久在现| 在线91色| 国产无遮挡猛进猛出免费软件 | 国产精品四虎 | 欧美性精品 | 亚洲清纯国产 | 手机成人在线 | 国内丰满少妇猛烈精品播 | 精品久操 | 欧美一区二区视频97 | 国产不卡一| 国产一区二区三区在线免费观看 | 免费在线观看的av网站 | 亚洲国产美女精品久久久久∴ | 国产麻豆果冻传媒在线观看 | 日韩一级黄色av | 九九色网 | 久久精品a | 久久国产经典 | 精品一区二区影视 | 免费在线看成人av | 亚洲日本成人网 | 亚洲乱码中文字幕综合 | 97高清视频 | 久草9视频| av在线播放国产 | 99久久国产免费免费 | 国产成人久久av | 中文字幕电影在线 | av网站播放 | 日p视频| 欧美精品二区 | 黄色av免费在线 | 97国产超碰在线 | 国产色秀视频 | 伊人天堂久久 | 黄色片视频免费 | 久久精品视频网 | 日日爱影视 | 日韩成人一级大片 | 国产精品美女久久久久久久网站 | 爱av在线网 | 成年免费在线视频 | 99精品国产99久久久久久福利 | 亚洲 欧美 精品 | 日韩精品一区电影 | 中文字幕乱码电影 | 亚洲精品高清在线 | 天天色欧美 | 亚洲一区免费在线 | 高清免费在线视频 | 久久亚洲免费 | 麻豆视频免费入口 | 97视频在线观看视频免费视频 | 成人av亚洲 | 91日韩在线视频 | 在线精品视频在线观看高清 | 日韩专区中文字幕 | 久久夜色精品国产欧美乱 | 97福利在线 | 亚洲免费在线看 | 精品国产一区二区三区久久久蜜月 | 亚洲国内精品在线 | 色无五月 | 日韩二区三区在线观看 | 亚洲欧美日韩不卡 | 国产精品 日韩 | 人人舔人人舔 | 国产精品1区2区3区 久久免费视频7 | 久久精品99精品国产香蕉 | 日韩激情小视频 | 91九色porny蝌蚪主页 | 久久国产精品99国产 | 国产视频在线观看免费 | 国产精品久久久久久一区二区 | 精品国产免费看 | 九色自拍视频 | 国产96av | 国产探花视频在线播放 | 国产精品一区二区av影院萌芽 | 亚洲在线网址 | 在线观看午夜 | 亚洲天堂网视频 | 综合影视| www.狠狠色 | 天堂素人在线 | 欧美热久久 | 日韩欧在线 | 免费午夜视频在线观看 | 三级av中文字幕 | 欧美日韩不卡一区二区 | 成人欧美一区二区三区黑人麻豆 | 日p在线观看| 天天干视频在线 | 国产免费xvideos视频入口 | www色 | 久久精彩视频 | 国产一区免费 | 又黄又刺激视频 | 久久精选视频 | 日韩中文字幕网站 | 蜜桃av久久久亚洲精品 | 欧美日韩在线视频观看 | 在线观看日本高清mv视频 | 精品91在线 | 午夜久草| 五月婷婷天堂 | 99久久夜色精品国产亚洲96 | 亚洲精品一区二区精华 | 九色精品 | 97人人视频| 国产成人一区二区三区久久精品 | 国产精品第一页在线观看 | 天天操天天操天天操天天操 | 国产一区二区精品 | 国产一级性生活视频 | 国产一级二级在线播放 | 亚洲专区在线视频 | 91精品国产综合久久福利不卡 | a v在线观看| av福利网址导航大全 | 国产不卡在线播放 | 亚洲成人动漫在线观看 | av中文字幕在线免费观看 | 97在线观看免费视频 | 玖玖爱免费视频 | 色欧美日韩 | 精品亚洲视频在线观看 | 日韩精品aaa | 99r在线观看| 久久99精品久久只有精品 | 免费网站在线观看成人 | 最新中文字幕视频 | 国产99久久久久 | 国产精品一区二区av | 免费看wwwwwwwwwww的视频 久久久久久99精品 91中文字幕视频 | 免费影视大全推荐 | 国产精品女人网站 | 久久精品爱视频 | 日韩久久影院 | 国产一区二区不卡视频 | 久久免费视频5 | 亚洲国产综合在线 | 亚洲成a人片在线www | 探花视频免费观看 | 国产色在线观看 | 在线黄色免费av | 欧美一级免费黄色片 | 69久久久 | 处女av在线 | 久久综合五月 | 国产精品99蜜臀久久不卡二区 | 国产护士hd高朝护士1 | 999久久a精品合区久久久 | 久久艹艹| 欧美日韩18 | 色婷婷综合五月 | 五月婷香| 人人看黄色 | 色全色在线资源网 | 日本黄色黄网站 | 国产精品色在线 | 久久婷婷国产色一区二区三区 | 免费观看9x视频网站在线观看 | 色婷婷激婷婷情综天天 | 探花视频免费在线观看 | 国产成人一区二区啪在线观看 | 久久精品9| 西西444www大胆高清视频 | 免费av大全 | 久久久影院一区二区三区 | 成人动漫一区二区三区 | 日本黄色片一区二区 | 色狠狠操 | 日日干激情五月 | 免费观看一级特黄欧美大片 | 精品国产人成亚洲区 | 婷婷av资源| 亚洲欧美国产精品久久久久 | www.xxxx变态.com | 欧美日韩在线视频一区二区 | 欧美日韩中文在线观看 | 国产精品黄色av | 欧美一区二区三区特黄 | 97电影网站 | 亚洲另类视频在线观看 | 日本在线成人 | 久久视频这里有久久精品视频11 | 中文字幕电影在线 | av日韩中文 | 在线日韩av | 国产一级二级三级视频 | 久久久久免费电影 | 日韩在线播放av | 69热国产视频 | 欧美日韩性| 日韩一区二区在线免费观看 | 天天操伊人| 日韩城人在线 | 四虎影视成人精品 | 国产成人久久 | 国产精品美女www爽爽爽视频 | 久久伊人精品天天 | 天天插天天干天天操 | 91九色在线视频观看 | 国产999在线观看 | 久久久在线| 欧美亚洲精品一区 | 久久激情日本aⅴ | 色搞搞 | 99久久这里有精品 | 久久精品电影 | 国产麻豆精品一区二区 | 欧美日韩国产亚洲乱码字幕 | 国产麻豆精品久久一二三 | 日韩在线观看第一页 | 国产免费三级在线观看 | 国产在线p | 日韩精品欧美一区 | 手机av电影在线观看 | 欧美日韩高清国产 | 免费色视频 | 在线观看日韩视频 | 国产91av视频在线观看 | 亚洲免费在线观看视频 | 国产在线看| 91丨九色丨国产丨porny精品 | 特级黄色视频毛片 | 国产高清视频在线播放 | 韩日电影在线观看 | 亚洲国产精品va在线 | 91人人干 | 啪啪精品 | 91天天操 | 日韩精品一区二区三区免费观看视频 | 久久艹国产 | 狠狠操狠狠插 | 中文资源在线观看 | 亚洲理论电影网 | 久操视频在线 | 亚洲精品中文在线 | 国产区精品在线观看 | 国产剧情一区在线 | 在线亚洲人成电影网站色www | 国产精品黄色在线观看 | 9999在线视频 | 综合久久影院 | 2022中文字幕在线观看 | 精品国产一区二区三区久久久蜜月 | 天天插天天干天天操 | 天天操天天摸天天爽 | 日韩av在线一区二区 | 在线观看视频一区二区三区 | 国产视频在线一区二区 | av网站有哪些 | 免费a现在观看 | 精品免费在线视频 | 日日爱视频 | 久久成人综合 | 曰韩精品 | 国产精品2019 | 国产精品av免费 | 超碰免费久久 | 久久手机看片 | 日本久久免费电影 | 干干日日 | 黄色三级免费片 | 日韩欧美一区二区在线观看 | 欧美成年网站 | 天天操天天操 | 国产精品色婷婷视频 | 麻花天美星空视频 | 国产va饥渴难耐女保洁员在线观看 | 国产福利91精品张津瑜 | 国产一区在线精品 | 亚洲一区尤物 | 黄色成人av网址 | 亚洲第一区在线观看 | 欧美怡红院| 精品国产一区二区三区在线 | 69国产精品视频免费观看 | 成人免费看电影 | 超碰大片 | 国产精品免费久久久 | 国产一区二区在线看 | 超碰日韩 | 视频高清 | 午夜色场 | 最近更新的中文字幕 | 国产精国产精品 | 国产特级毛片aaaaaa高清 | 国产精品中文在线 | 日韩av看片| 黄色免费看片网站 | 色在线免费观看 | 伊人中文网 | 欧美大片大全 | 91传媒在线看 | 五月婷婷一级片 | 日韩欧美在线高清 | 精品国产不卡 | 国产手机免费视频 | 激情久久久久久久久久久久久久久久 | 久久国产香蕉视频 | www.天天干.com | 国产免费激情久久 | 国内免费的中文字幕 | 国内精品中文字幕 | 久久国产午夜精品理论片最新版本 | 激情综合六月 | 亚洲精品久久久久久中文传媒 | sesese图片 | 国产偷国产偷亚洲清高 | 视频三区| 日韩有色 | 99热精品视 | 日韩三区在线观看 | 国产精品麻豆三级一区视频 | 久久久国产视频 | 在线观看中文字幕视频 | 99色婷婷 | 91麻豆精品国产91久久久更新时间 | 人人草在线观看 | 中文字幕有码在线播放 | 亚洲在线成人精品 | 免费情缘 | 欧美亚洲国产精品久久高清浪潮 | 久热免费在线观看 | 蜜桃av久久久亚洲精品 | 精品国产一区二 | 91精品系列 | 四虎影视成人永久免费观看视频 | 亚洲人人精品 | 色综合激情网 | 91福利视频在线 | 久久久久国产精品厨房 | 国产精品久久久久aaaa九色 | 日韩中文免费视频 | 久青草视频 | 国产精品久久久久久久久久妇女 | a在线播放| 超碰97人人爱 | 91自拍视频在线 | 人人干人人超 | 狠狠色狠狠色综合日日92 | 亚洲一区二区高潮无套美女 | 国产精品九九九 | 国产91精品在线播放 | 欧美精品成人在线 | 久久精品人人做人人综合老师 | 一级免费片 | 91在线播放国产 | 日韩精品一二三 | 午夜婷婷在线观看 | 一区二区三区在线不卡 | 精品视频在线免费 | av丝袜在线| 五月天亚洲综合 | 日韩av影片在线观看 | 亚洲精品大片www | 99精品在线免费视频 | 99热 精品在线 | 亚洲精品国产精品国自产 | 日韩中文字幕网站 | 丝袜足交在线 | 久久久久久久99精品免费观看 | 97偷拍视频 | 欧产日产国产69 | 高清一区二区三区av | 久久男人免费视频 | 日日爱网址 | 91最新在线视频 | 日韩精品一区在线播放 | 国产福利一区二区在线 | 特黄特色特刺激视频免费播放 | 中文在线免费一区三区 | 国产三级精品在线 | 亚洲一区二区观看 | 亚洲狠狠 | 在线观看亚洲电影 | 香蕉影院在线播放 | 国产九九九九九 | 在线观看亚洲专区 | 丰满少妇高潮在线观看 | 中文字幕精品在线 | 久久不卡免费视频 | 国产精品va | 中文字幕精品三级久久久 | 激情视频免费在线 | 欧美日韩亚洲第一页 | 亚洲综合狠狠干 | 成人aⅴ视频| www亚洲一区 | 最近中文字幕免费视频 | 久久久综合香蕉尹人综合网 | 日韩一区二区三区高清免费看看 | av解说在线观看 | 激情xxxx| 福利一区在线 | 美女黄频| 日本久久久亚洲精品 | wwwwww色| 久久婷婷激情 | 黄色网在线免费观看 | 黄在线免费观看 | 91在线看免费 | 天天干 夜夜操 | 久久精品视频播放 | 人人cao| 91正在播放 | 天堂av影院| 日日久视频 | 99国产情侣在线播放 | 色婷婷免费 | 国产精品99久久久 | 国产成人久久av977小说 | 天天爽网站 | 毛片基地黄久久久久久天堂 | 亚洲自拍偷拍色图 | 免费电影播放 | 亚洲精选视频免费看 | 久久综合五月天婷婷伊人 | 开心丁香婷婷深爱五月 | 久久久久久久久免费 | 精品国产午夜 | 五月婷婷狠狠 | 福利一区二区 | 国产美女主播精品一区二区三区 | 欧美日韩一区二区在线观看 | 久久久久久久久久亚洲精品 | 精品久久久成人 | 伊人一级 | 国产一区二区三区黄 | 男女全黄一级一级高潮免费看 | 久久精品91视频 | 久久国产热 | 国产精品美女久久久久久久网站 | 欧美色精品天天在线观看视频 | 中文字幕黄色 | 青青五月天 | 日韩精品中文字幕在线不卡尤物 | 日韩在线免费视频观看 | 一本一本久久a久久精品综合 | a黄色影院 | av在线播放中文字幕 | 888av | 久久av免费电影 | 色婷婷激情综合 | 久久精品精品电影网 | 国产精品美女免费视频 | 三级av中文字幕 | 国产精品99久久久精品免费观看 | av丁香花 | 夜夜摸夜夜爽 | 国产一区免费观看 | 欧美福利久久 | 天天爱天天操天天爽 | 欧美大荫蒂xxx | 亚洲国产高清在线观看视频 | 精品一区二区三区四区在线 | 日日夜夜天天久久 | 成年人免费观看在线视频 | 夜夜骑日日操 | 久久综合狠狠 | 狠狠夜夜 | 97色在线观看免费视频 | 一区二区三区在线看 | 日本中文字幕久久 | 久久国产亚洲 | 色婷婷伊人 | 亚洲精品久久久蜜桃 | 国产成人精品一二三区 | 国产一区二区精 | 亚洲精品国偷拍自产在线观看蜜桃 | 天天干天天草天天爽 | 色噜噜噜噜 | 国产精品综合久久久久久 | 三级黄色在线观看 | 国产精品国内免费一区二区三区 | 综合色中文 | 日韩免费观看一区二区三区 | 亚洲精品男人的天堂 | 操一草| 狠狠躁日日躁 | 91最新中文字幕 | 国产视频在线播放 | 久久色网站 | 一色屋精品视频在线观看 | 亚洲男男gaygay无套同网址 | 久草国产视频 | 一区在线观看 | 婷婷丁香综合 | 国产一二三区在线观看 | 亚洲一区免费在线 | av一级一片 | 亚洲最大成人免费网站 | 久久综合色影院 | 色之综合网 | 成人av网站在线观看 | 91在线看视频免费 | 免费三级大片 | 亚洲永久精品一区 | 在线观看视频免费播放 | 伊人伊成久久人综合网小说 | av电影在线播放 | 成年人免费av网站 | 中文字幕精品三级久久久 | 伊人婷婷综合 | 亚洲第一av在线播放 | 久久成人亚洲欧美电影 | 日本久久高清视频 | 欧美激情视频一区二区三区免费 | 91精品久久久久久久99蜜桃 | 国产69久久久 | 精品国产一区二区三区日日嗨 | 中文字幕av在线播放 | www免费网站在线观看 | 久久国产成人午夜av影院潦草 | 狠狠狠狠干 | 亚洲无吗视频在线 | 欧美色综合久久 | 国产成人精品999 | 中文亚洲欧美日韩 | 在线国产视频 | 婷婷中文字幕在线观看 | 久久精品看 | 91久久久久久久一区二区 | 精品视频在线观看 | 日韩在线不卡视频 | 国产九九九精品视频 | 国产精品亚洲片夜色在线 | 中文字幕精品一区二区精品 | 99国产一区二区三精品乱码 | 久草视频免费观 | 国产精品一区二区三区在线看 | 在线观看岛国片 | 国产在线精品一区二区 | 精品久久一区二区 | 欧美va日韩va| 亚洲一级片在线看 | av理论电影| 亚洲成人av电影在线 | 久久久亚洲网站 | 久久r精品 | 久久国产精品久久国产精品 | 很污的网站| 亚洲成人黄色av | 成人黄色大片 | 日韩成人在线一区二区 | 亚洲天天做 | a视频在线 | 91片黄在线观 | 国产在线观看你懂得 | 2019国产精品 | 国产精品黄色影片导航在线观看 |