python异步编程之asyncio高阶API
asyncio 高階API列表
asyncio中函數(shù)可以分為高階函數(shù)和低階函數(shù)。低階函數(shù)用于調(diào)用事件循環(huán)、linux 套接字、信號(hào)等更底層的功能,高階函數(shù)是屏蔽了更多底層細(xì)節(jié)的任務(wù)并發(fā),任務(wù)執(zhí)行函數(shù)。通常開(kāi)發(fā)中使用更多的是高階函數(shù)。本篇主要介紹asyncio中常用的高階函數(shù)。
由于asyncio在不同的版本中有差異,本文以及本系列都以python3.10為準(zhǔn)。
| 函數(shù) | 功能 |
|---|---|
| run() | 創(chuàng)建事件循環(huán),運(yùn)行一個(gè)協(xié)程,關(guān)閉事件循環(huán)。 |
| create_task() | 創(chuàng)建一個(gè)asyncio的Task對(duì)象 |
| await sleep() | 休眠幾秒 |
| await gather() | 并發(fā)執(zhí)行所有事件的調(diào)度和等待 |
| await wait_for() | 有超時(shí)控制的運(yùn)行 |
| await shield() | 屏蔽取消操作 |
| await wait() | 完成情況的監(jiān)控器 |
| current_task() | 返回當(dāng)前Task對(duì)象 |
| all_tasks() | 返回事件循環(huán)中所有的task對(duì)象 |
| Task | Task對(duì)象 |
| to_thread() | 在不同的 OS 線(xiàn)程中異步地運(yùn)行一個(gè)函數(shù) |
| run_coroutine_threadsafe() | 從其他OS線(xiàn)程中調(diào)度一個(gè)協(xié)程 |
| for in as_completed() | 用 for 循環(huán)監(jiān)控完成情況 |
run
函數(shù)原型:
asyncio.run(coro, *, debug=False)
功能:創(chuàng)建事件循環(huán),運(yùn)行傳入的協(xié)程。該函數(shù)總是會(huì)創(chuàng)建一個(gè)新的事件循環(huán)并在結(jié)束時(shí)關(guān)閉它,應(yīng)該被當(dāng)做asyncio程序的主入口點(diǎn)。run() 函數(shù)是用來(lái)創(chuàng)建事件,將task加入事件,運(yùn)行事件的函數(shù)。
async def main():
await asyncio.sleep(1)
print('hello')
asyncio.run(main())
run() 從功能上等價(jià)于以下低階API。獲取一個(gè)事件循環(huán),創(chuàng)建一個(gè)task,加入事件循環(huán)。
loop = asyncio.get_event_loop()
task = loop.create_task(main())
loop.run_until_complete(task)
create_task
函數(shù)原型:
asyncio.create_task(coro, *, name=None)
功能:將協(xié)程函數(shù)封裝成一個(gè)Task。協(xié)程函數(shù)沒(méi)有生命周期,但是Task有生命周期。
將協(xié)程打包為一個(gè) Task 并自動(dòng)尋找事件循環(huán)加入。返回 Task 對(duì)象。該任務(wù)會(huì)在 get_running_loop() 返回的循環(huán)中執(zhí)行,如果當(dāng)前線(xiàn)程沒(méi)有在運(yùn)行的循環(huán)則會(huì)引發(fā) RuntimeError。
async def coro():
await asyncio.sleep(1)
print("i am coro")
async def main():
task = asyncio.create_task(coro())
print(f"task狀態(tài):{task._state}")
await asyncio.sleep(2)
print(f"task狀態(tài):{task._state}")
print("i am main")
asyncio.run(main())
結(jié)果:
task狀態(tài):PENDING
i am coro
task狀態(tài):FINISHED
i am main
結(jié)果分析:
可以看到task運(yùn)行中的狀態(tài)和結(jié)束的生命周期狀態(tài)
gather
函數(shù)原型:
asyncio.gather(*aws, return_exceptions=False)
功能:
并發(fā)執(zhí)行所有可等待對(duì)象,收集任務(wù)結(jié)果,返回所有已經(jīng)完成的task的結(jié)果。結(jié)果將是一個(gè)由所有返回值組成的列表。結(jié)果值的順序與傳入的task的順序一致。可等待對(duì)象可以是協(xié)程和task。
如果序列中是協(xié)程而不是task,那么會(huì)將其自動(dòng)封裝成task加入事件循環(huán)。
import asyncio
async def coro(value):
print(f"hello coro{value}")
return f"coro{value}"
async def main():
tasks = [coro(i) for i in range(5)]
res = await asyncio.gather(*tasks)
for i in res:
print(i)
asyncio.run(main())
結(jié)果:
hello coro0
hello coro1
hello coro2
hello coro3
hello coro4
coro0
coro1
coro2
coro3
coro4
結(jié)果分析:
獲取了所有協(xié)程的返回值,并且返回的順序和任務(wù)的順序一致。
wait
函數(shù)原型:
asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)
功能:
并發(fā)地運(yùn)行序列中的可等待對(duì)象,并進(jìn)入阻塞狀態(tài)直到滿(mǎn)足 return_when 所指定的條件。將task任務(wù)結(jié)果收集起來(lái),返回兩個(gè) Task/Future 集合: (done, pending)。done是已經(jīng)完成的任務(wù),pending是未完成的任務(wù),未完成的原因可能是超時(shí)或return_when策略。
aws:
aws中保存的是task而不是協(xié)程,從3.8起不建議傳入?yún)f(xié)程,3.11將不再支持傳入?yún)f(xié)程。
timeout:
如指定 timeout (float 或 int 類(lèi)型) 則它將被用于控制返回之前等待的最長(zhǎng)秒數(shù)。
請(qǐng)注意此函數(shù)不會(huì)引發(fā) asyncio.TimeoutError。當(dāng)超時(shí)發(fā)生時(shí),未完成的 Future 或 Task 將不會(huì)繼續(xù)執(zhí)行,不會(huì)返回結(jié)果。
return_when:
return_when 指定此函數(shù)應(yīng)在何時(shí)返回。它必須為以下參數(shù)之一:
| 參數(shù) | 描述 |
|---|---|
| FIRST_COMPLETED | 函數(shù)將在任意可等待對(duì)象結(jié)束或取消時(shí)返回。 |
| FIRST_EXCEPTION | 函數(shù)將在任意可等待對(duì)象因引發(fā)異常而結(jié)束時(shí)返回。當(dāng)沒(méi)有引發(fā)任何異常時(shí)它就相當(dāng)于 ALL_COMPLETED。 |
| ALL_COMPLETED | 函數(shù)將在所有可等待對(duì)象結(jié)束或取消時(shí)返回。 |
基礎(chǔ)使用示例:
import asyncio
async def coro(value):
print(f"hello coro{value}")
return f"coro{value}"
async def main():
tasks = [asyncio.create_task(coro(i)) for i in range(5)]
done, pending = await asyncio.wait(tasks)
for i in done:
print(i.result())
asyncio.run(main())
結(jié)果:
hello coro0
hello coro1
hello coro2
hello coro3
hello coro4
coro1
coro2
coro0
coro3
coro4
結(jié)果分析:
返回結(jié)果和執(zhí)行順序并不是一致的
指定超時(shí)時(shí)間:
import asyncio
from asyncio import FIRST_COMPLETED
async def coro(value):
print(f"hello coro{value}")
await asyncio.sleep(value)
return f"coro{value}"
async def main():
tasks = [asyncio.create_task(coro(i)) for i in range(5)]
done, pending = await asyncio.wait(tasks, timeout=3)
print("---------finish----------")
for i in done:
print(i.result())
print("---------pending----------")
for i in pending:
print(i)
asyncio.run(main())
hello coro0
hello coro1
hello coro2
hello coro3
hello coro4
---------finish----------
coro1
coro2
coro0
---------pending----------
<Task pending name='Task-5' coro=<coro() running at /Users/ljk/Documents/code/daily_dev/async_demo/wait_demo.py:6> wait_for=<Future finished result=None>>
<Task pending name='Task-6' coro=<coro() running at /Users/ljk/Documents/code/daily_dev/async_demo/wait_demo.py:6> wait_for=<Future pending cb=[Task.task_wakeup()]>>
結(jié)果分析:
超時(shí)未完成的task會(huì)保存在pending中,未完成的task在超時(shí)之后不會(huì)繼續(xù)執(zhí)行,沒(méi)有返回結(jié)果。
return_when配置任意任務(wù)完成就返回:
import asyncio
from asyncio import FIRST_COMPLETED
async def coro(value):
print(f"hello coro{value}")
await asyncio.sleep(value)
return f"coro{value}"
async def main():
tasks = [asyncio.create_task(coro(i)) for i in range(5)]
done, pending = await asyncio.wait(tasks, return_when=FIRST_COMPLETED)
print("---------finish----------")
for i in done:
print(i.result())
print("---------pending----------")
for i in pending:
print(i)
asyncio.run(main())
結(jié)果:
hello coro0
hello coro1
hello coro2
hello coro3
hello coro4
---------finish----------
coro0
---------pending----------
<Task pending name='Task-5' coro=<coro() running at /Users/ljk/Documents/code/daily_dev/async_demo/wait_demo.py:6> wait_for=<Future pending cb=[Task.task_wakeup()]>>
<Task pending name='Task-3' coro=<coro() running at /Users/ljk/Documents/code/daily_dev/async_demo/wait_demo.py:6> wait_for=<Future pending cb=[Task.task_wakeup()]>>
<Task pending name='Task-4' coro=<coro() running at /Users/ljk/Documents/code/daily_dev/async_demo/wait_demo.py:6> wait_for=<Future pending cb=[Task.task_wakeup()]>>
<Task pending name='Task-6' coro=<coro() running at /Users/ljk/Documents/code/daily_dev/async_demo/wait_demo.py:6> wait_for=<Future pending cb=[Task.task_wakeup()]>>
結(jié)果分析:
獲取到任意結(jié)果就返回,未完成的task保存在pending中。未完成的task在超時(shí)之后不會(huì)繼續(xù)執(zhí)行。
as_completed
函數(shù)原型:
asyncio.as_completed(aws, *, timeout=None)
說(shuō)明:并發(fā)執(zhí)行aws中保存的可等待對(duì)象,返回一個(gè)協(xié)程的迭代器。可以從迭代器中取出最先執(zhí)行完成的task的結(jié)果。返回結(jié)果和執(zhí)行順序不一致。aws中可以是task或協(xié)程序列。
import asyncio
async def coro(value):
print(f"hello coro{value}")
return f"coro{value}"
async def main():
tasks = [coro(i) for i in range(5)]
for item in asyncio.as_completed(tasks):
res = await item
print(res)
asyncio.run(main())
結(jié)果:
hello coro2
hello coro3
hello coro4
hello coro1
hello coro0
coro2
coro3
coro4
coro1
coro0
結(jié)果分析:
所有任務(wù)都會(huì)執(zhí)行完成,沒(méi)有超時(shí)配置。返回順序和執(zhí)行順序無(wú)關(guān)。
gather、wait、as_completed 異同點(diǎn)小結(jié)
asyncio協(xié)程體系中可以實(shí)現(xiàn)創(chuàng)建多個(gè)任務(wù)并發(fā)執(zhí)行的函數(shù)有以下三個(gè):
- asyncio.gather
- asyncio.wait
- asyncio.as_completed
不同之處比較:
| 特性/函數(shù) | gather | wait | as_completed |
|---|---|---|---|
| 入?yún)?/td> | 同時(shí)支持task和協(xié)程序列 | 只支持task序列 | 同時(shí)支持task和協(xié)程序列 |
| 獲取結(jié)果順序 | 有序,和并發(fā)序列順序相同 | 無(wú)序,和并發(fā)序列無(wú)關(guān) | 無(wú)序,和并發(fā)序列無(wú)關(guān) |
| 返回 | 返回結(jié)果列表,保存的是函數(shù)返回值。 | 返回元組done、pending。元組中保存的是task,而非task 的函數(shù)返回值 | 返回一個(gè)迭代器,從中可迭代出函數(shù)返回值。 |
wait for
函數(shù)原型:
asyncio.wait_for(aw, timeout)
功能:執(zhí)行單個(gè)可等待對(duì)象,指定 timeout 秒數(shù)后超時(shí)
等待可等待對(duì)象完成,指定timeout秒數(shù)后超時(shí)。和gather類(lèi)似,可以自動(dòng)將協(xié)程轉(zhuǎn)化成任務(wù)加入循環(huán)。
timeout 可以為 None,也可以為 float 或 int 型數(shù)值表示的等待秒數(shù)。如果 timeout 為 None,則等待直到完成。
如果發(fā)生超時(shí),任務(wù)將取消并引發(fā) asyncio.TimeoutError。
async def coro():
# 睡眠5s
await asyncio.sleep(3600)
print('finish!')
async def main():
# Wait for at most 1 second
try:
await asyncio.wait_for(coro(), timeout=1.0)
except asyncio.TimeoutError:
print('timeout!')
asyncio.run(main())
結(jié)果:
timeout!
高階API中常用的函數(shù)基本就是這些,下一篇分析低階函數(shù)。
連載一系列關(guān)于python異步編程的文章。包括同異步框架性能對(duì)比、異步事情驅(qū)動(dòng)原理等。歡迎關(guān)注微信公眾號(hào)第一時(shí)間接收推送的文章。
總結(jié)
以上是生活随笔為你收集整理的python异步编程之asyncio高阶API的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 聊聊 从源码来看ChatGLM-6B的模
- 下一篇: python异步编程之asyncio低阶