python asyncio理解_深入理解asyncio(二)
Asyncio.gather vs asyncio.wait
在上篇文章已經(jīng)看到多次用asyncio.gather了,還有另外一個(gè)用法是asyncio.wait,他們都可以讓多個(gè)協(xié)程并發(fā)執(zhí)行。那為什么提供2個(gè)方法呢?他們有什么區(qū)別,適用場景是怎么樣的呢?其實(shí)我之前也是有點(diǎn)困惑,直到我讀了asyncio的源碼。我們先看2個(gè)協(xié)程的例子:
async def a():
print('Suspending a')
await asyncio.sleep(3)
print('Resuming a')
return 'A'
async def b():
print('Suspending b')
await asyncio.sleep(1)
print('Resuming b')
return 'B'
在IPython里面用gather執(zhí)行一下:
In : return_value_a, return_value_b = await asyncio.gather(a(), b())
Suspending a
Suspending b
Resuming b
Resuming a
In : return_value_a, return_value_b
Out: ('A', 'B')
Ok,asyncio.gather方法的名字說明了它的用途,gather的意思是「搜集」,也就是能夠收集協(xié)程的結(jié)果,而且要注意,它會(huì)按輸入?yún)f(xié)程的順序保存的對應(yīng)協(xié)程的執(zhí)行結(jié)果。
接著我們說asyncio.await,先執(zhí)行一下:
In : done, pending = await asyncio.wait([a(), b()])
Suspending b
Suspending a
Resuming b
Resuming a
In : done
Out:
{:1> result='A'>,
:8> result='B'>}
In : pending
Out: set()
In : task = list(done)[0]
In : task
Out: :8> result='B'>
In : task.result()
Out: 'B'
asyncio.wait的返回值有2項(xiàng),第一項(xiàng)表示完成的任務(wù)列表(done),第二項(xiàng)表示等待(Future)完成的任務(wù)列表(pending),每個(gè)任務(wù)都是一個(gè)Task實(shí)例,由于這2個(gè)任務(wù)都已經(jīng)完成,所以可以執(zhí)行task.result()獲得協(xié)程返回值。
Ok, 說到這里,我總結(jié)下它倆的區(qū)別的第一層區(qū)別:asyncio.gather封裝的Task全程黑盒,只告訴你協(xié)程結(jié)果。
asyncio.wait會(huì)返回封裝的Task(包含已完成和掛起的任務(wù)),如果你關(guān)注協(xié)程執(zhí)行結(jié)果你需要從對應(yīng)Task實(shí)例里面用result方法自己拿。
為什么說「第一層區(qū)別」,asyncio.wait看名字可以理解為「等待」,所以返回值的第二項(xiàng)是pending列表,但是看上面的例子,pending是空集合,那么在什么情況下,pending里面不為空呢?這就是第二層區(qū)別:asyncio.wait支持選擇返回的時(shí)機(jī)。
asyncio.wait支持一個(gè)接收參數(shù)return_when,在默認(rèn)情況下,asyncio.wait會(huì)等待全部任務(wù)完成(return_when='ALL_COMPLETED'),它還支持FIRST_COMPLETED(第一個(gè)協(xié)程完成就返回)和FIRST_EXCEPTION(出現(xiàn)第一個(gè)異常就返回):
In : done, pending = await asyncio.wait([a(), b()], return_when=asyncio.tasks.FIRST_COMPLETED)
Suspending a
Suspending b
Resuming b
In : done
Out: {:8> result='B'>}
In : pending
Out: {:3> wait_for=()]>>}
看到了吧,這次只有協(xié)程b完成了,協(xié)程a還是pending狀態(tài)。
在大部分情況下,用asyncio.gather是足夠的,如果你有特殊需求,可以選擇asyncio.wait,舉2個(gè)例子:需要拿到封裝好的Task,以便取消或者添加成功回調(diào)等
業(yè)務(wù)上需要FIRST_COMPLETED/FIRST_EXCEPTION即返回的
asyncio.create_task vs loop.create_task vs asyncio.ensure_future
創(chuàng)建一個(gè)Task一共有3種方法,如這小節(jié)的標(biāo)題。在上篇文章我說過,從Python 3.7開始可以統(tǒng)一的使用更高階的asyncio.create_task。其實(shí)asyncio.create_task就是用的loop.create_task:
def create_task(coro):
loop = events.get_running_loop()
return loop.create_task(coro)
loop.create_task接受的參數(shù)需要是一個(gè)協(xié)程,但是asyncio.ensure_future除了接受協(xié)程,還可以是Future對象或者awaitable對象:如果參數(shù)是協(xié)程,其實(shí)底層還是用的loop.create_task,返回Task對象
如果是Future對象會(huì)直接返回
如果是一個(gè)awaitable對象會(huì)await這個(gè)對象的__await__方法,再執(zhí)行一次ensure_future,最后返回Task或者Future
所以就像ensure_future名字說的,確保這個(gè)是一個(gè)Future對象:Task是Future 子類,前面說過一般情況下開發(fā)者不需要自己創(chuàng)建Future
其實(shí)前面說的asyncio.wait和asyncio.gather里面都用了asyncio.ensure_future。對于絕大多數(shù)場景要并發(fā)執(zhí)行的是協(xié)程,所以直接用asyncio.create_task就足夠了~
shield
接著說asyncio.shield,用它可以屏蔽取消操作。一直到這里,我們還沒有見識(shí)過Task的取消。看一個(gè)例子:
In : loop = asyncio.get_event_loop()
In : task1 = loop.create_task(a())
In : task2 = loop.create_task(b())
In : task1.cancel()
Out: True
In : await asyncio.gather(task1, task2)
Suspending a
Suspending b
---------------------------------------------------------------------------
CancelledError Traceback (most recent call last)
cell_name in async-def-wrapper()
CancelledError:
在上面的例子中,task1被取消了后再用asyncio.gather收集結(jié)果,直接拋CancelledError錯(cuò)誤了。這里有個(gè)細(xì)節(jié),gather支持return_exceptions參數(shù):
In : await asyncio.gather(task1, task2, return_exceptions=True)
Out: [concurrent.futures._base.CancelledError(), 'B']
可以看到,task2依然會(huì)執(zhí)行完成,但是task1的返回值是一個(gè)CancelledError錯(cuò)誤,也就是任務(wù)被取消了。如果一個(gè)創(chuàng)建后就不希望被任何情況取消,可以使用asyncio.shield保護(hù)任務(wù)能順利完成。不過要注意一個(gè)陷阱,先看錯(cuò)誤的寫法:
In : task1 = asyncio.shield(a())
In : task2 = loop.create_task(b())
In : task1.cancel()
Out: True
In : await asyncio.gather(task1, task2, return_exceptions=True)
Suspending a
Suspending b
Resuming b
Out: [concurrent.futures._base.CancelledError(), 'B']
可以看到依然是CancelledError錯(cuò)誤,且協(xié)程a未執(zhí)行完成,正確的用法是這樣的:
In : task1 = asyncio.shield(a())
In : task2 = loop.create_task(b())
In : ts = asyncio.gather(task1, task2, return_exceptions=True)
In : task1.cancel()
Out: True
In : await ts
Suspending a
Suspending b
Resuming a
Resuming b
Out: [concurrent.futures._base.CancelledError(), 'B']
可以看到雖然結(jié)果是一個(gè)CancelledError錯(cuò)誤,但是看輸出能確認(rèn)協(xié)程實(shí)際上是執(zhí)行了的。所以正確步驟是:先創(chuàng)建 GatheringFuture 對象 ts
取消任務(wù)
await ts
asynccontextmanager
如果你了解Python,之前可能聽過或者用過contextmanager ,一個(gè)上下文管理器。通過一個(gè)計(jì)時(shí)的例子就理解它的作用:
from contextlib import contextmanager
async def a():
await asyncio.sleep(3)
return 'A'
async def b():
await asyncio.sleep(1)
return 'B'
async def s1():
return await asyncio.gather(a(), b())
@contextmanager
def timed(func):
start = time.perf_counter()
yield asyncio.run(func())
print(f'Cost: {time.perf_counter() - start}')
timed函數(shù)用了contextmanager裝飾器,把協(xié)程的運(yùn)行結(jié)果yield出來,執(zhí)行結(jié)束后還計(jì)算了耗時(shí):
In : from contextmanager import *
In : with timed(s1) as rv:
...: print(f'Result: {rv}')
...:
Result: ['A', 'B']
Cost: 3.0052654459999992
大家先體會(huì)一下。在Python 3.7添加了asynccontextmanager,也就是異步版本的contextmanager,適合異步函數(shù)的執(zhí)行,上例可以這么改:
@asynccontextmanager
async def async_timed(func):
start = time.perf_counter()
yield await func()
print(f'Cost: {time.perf_counter() - start}')
async def main():
async with async_timed(s1) as rv:
print(f'Result: {rv}')
In : asyncio.run(main())
Result: ['A', 'B']
Cost: 3.00414147500004
async版本的with要用async with,另外要注意yield await func()這句,相當(dāng)于yield + await func()
PS: contextmanager 和 asynccontextmanager 最好的理解方法是去看源碼注釋,可以看延伸閱讀鏈接2,另外延伸閱讀鏈接3包含的PR中相關(guān)的測試代碼部分也能幫助你理解
代碼目錄
本文代碼可以在 mp項(xiàng)目 找到
延伸閱讀
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎(jiǎng)勵(lì)來咯,堅(jiān)持創(chuàng)作打卡瓜分現(xiàn)金大獎(jiǎng)總結(jié)
以上是生活随笔為你收集整理的python asyncio理解_深入理解asyncio(二)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: mysql统计数据的代码_MySQL按时
- 下一篇: python单双三引号区别_python