日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

python异步编程之asyncio低阶API

發布時間:2024/1/8 python 39 coder
生活随笔 收集整理的這篇文章主要介紹了 python异步编程之asyncio低阶API 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

低階API介紹

asyncio中低階API的種類很多,涉及到開發的5個方面。包括:

  1. 獲取事件循環
  2. 事件循環方法集
  3. 傳輸
  4. 協議
  5. 事件循環策略

本篇中只講解asyncio常見常用的函數,很多底層函數如網絡、IPC、套接字、信號等不在本篇范圍。

獲取事件循環

事件循環是異步中重要的概念之一,用于驅動任務的執行。包含的低階API如下:

函數 功能
asyncio.get_running_loop() 獲取當前運行的事件循環首選函數。
asyncio.get_event_loop() 獲得一個事件循環實例
asyncio.set_event_loop() 將策略設置到事件循環
asyncio.new_event_loop() 創建一個新的事件循環

在asyncio初識這篇中提到過事件循環,可以把事件循環當做是一個while循環,在周期性的運行并執行一些任務。這個說法比較抽象,事件循環本質上其實是能調用操作系統IO模型的模塊。以Linux系統為例,IO模型有阻塞,非阻塞,IO多路復用等。asyncio 常用的是IO多路復用模型的epoolkqueue。事件循環原理涉及到異步編程的操作系統原理,后續更新一系列相關文章。

get_event_loop()
創建一個事件循環,用于驅動協程的執行

import asyncio

async def demo(i):
    print(f"hello {i}")

def main():
    loop = asyncio.get_event_loop()
    print(loop._selector)
    task = loop.create_task(demo(1))
    loop.run_until_complete(task)

main()

結果:

<selectors.KqueueSelector object at 0x104eabe20>
hello 1

可以通過loop._selector屬性獲取到當前事件循環使用的是kqueue模型

獲取循環

import asyncio

async def demo(i):
    res = asyncio.get_running_loop()
    print(res)
    print(f"hello {i}")


def main():
    loop = asyncio.get_event_loop()
    task = loop.create_task(demo(1))
    loop.run_until_complete(task)
main()

結果:

<_UnixSelectorEventLoop running=True closed=False debug=False>
hello 1

推薦使用asyncio.run 創建事件循環,底層API主要用于庫的編寫。

生命周期

生命周期是用于管理任務的啟停的函數,如下:

函數 功能
loop.run_until_complete() 運行一個期程/任務/可等待對象直到完成。
loop.run_forever() 一直運行事件循環,直到被顯示停止
loop.stop() 停止事件循環
loop.close() 關閉事件循環
loop.is_running() 返回 True , 如果事件循環正在運行
loop.is_closed() 返回 True ,如果事件循環已經被關閉
await loop.shutdown_asyncgens() 關閉異步生成器

run_until_complete
運行一個期程/任務/可等待對象直到完成。run_until_complete的參數是一個futrue對象。當傳入一個協程,其內部會自動封裝成task。run_until_complete()是會自動關閉事件循環的函數,區別于run_forever()是需要手動關閉事件循環的函數。

import asyncio 


async def demo(i):
    print(f"hello {i}")


def main():
    loop = asyncio.get_event_loop()
    
    task = loop.create_task(demo(1))

    # 傳入的是一個任務
    loop.run_until_complete(task)

    # 傳入的是一個協程也可以
    loop.run_until_complete(demo(20))


main()

結果:

hello 1
hello 20

調試

函數 功能
loop.set_debug() 開啟或禁用調試模式
loop.get_debug() 獲取當前測試模式

調度回調函數

在異步編程中回調函數是一種很常見的方法,想要在事件循環中增加一些回調函數,可以有如下方法:

函數 功能
loop.call_soon() 盡快調用回調。
loop.call_soon_threadsafe() loop.call_soon() 方法線程安全的變體。
loop.call_later() 在給定時間之后調用回調函數。
loop.call_at() 在指定的時間調用回調函數。

這些回調函數既可以回調普通函數也可以回調協程函數。
call_soon
函數原型:

loop.call_soon(callback, *args, context=None)

示例:

import asyncio

async def my_coroutine():
    print("協程被執行")

async def other_coro():
    print("非call_soon調用")

def callback_function():
    print("回調函數被執行")


# 創建一個事件循環
loop = asyncio.get_event_loop()

# 使用create_task包裝協程函數,并調度執行
loop.call_soon(loop.create_task, my_coroutine())

# 調度一個常規函數以盡快執行
loop.call_soon(callback_function)

# 啟動一個事件循環
task = loop.create_task(other_coro())
loop.run_until_complete(task)

結果:

回調函數被執行
非call_soon調用
協程被執行

結果分析:
call_soon調用普通函數直接傳入函數名作為參數,調用協程函數需要講協程通過loop.create_task封裝成task。

線程/進程池

函數 功能
await loop.run_in_executor() 多線程中運行一個阻塞的函數
loop.set_default_executor() 設置 loop.run_in_executor() 默認執行器

asyncio.run_in_executor 用于在異步事件循環中執行一個阻塞的函數或方法。它將阻塞的調用委托給一個線程池或進程池,以確保不阻塞主事件循環。可以用于在協程中調用一些不支持異步編程的方法,不支持異步編程的模塊。

run_in_executor

import asyncio
import concurrent.futures

def blocking_function():
    # 模擬一個阻塞的操作
    import time
    time.sleep(2)
    return "阻塞函數返回"

async def async_function2():
    print("async_function2 start")
    await asyncio.sleep(1)
    print("async_function2 end")

async def async_function():
    print("異步函數開始執行。。。")

    print("調用同步阻塞函數")
    # 使用run_in_executor調度執行阻塞函數
    result = await loop.run_in_executor(None, blocking_function)

    print(f"獲取同步函數的結果: {result}")

# 創建一個事件循環
loop = asyncio.get_event_loop()

# 運行異步函數
loop.run_until_complete(asyncio.gather(async_function(), async_function2()))

結果:

異步函數開始執行。。。
調用同步阻塞函數
async_function2 start
async_function2 end
獲取同步函數的結果: 阻塞函數返回

結果分析:
通過事件循環執行任務async_function,在async_function中通過loop.run_in_executor調用同步阻塞函數blocking_function,該阻塞函數沒有影響事件循環中另一個任務async_function2的執行。
await loop.run_in_executor(None, blocking_function)中None代表使用的是默認線程池,也可以替換成其他線程池。

使用自定義線程池和進程池

import asyncio
import concurrent.futures

def blocking_function():
    # 模擬一個阻塞的操作
    import time
    time.sleep(2)
    return "阻塞函數返回"

async def async_function():
    print("異步函數開始執行。。。")

    print("調用同步阻塞函數")

    # 線程池
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, blocking_function)
        print('線程池調用返回結果:', result)

    # 進程池
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, blocking_function)
        print('進程池調用返回結果:', result)

if __name__ == '__main__':
    # 創建一個事件循環
    loop = asyncio.get_event_loop()

    # 運行異步函數
    loop.run_until_complete(async_function())


結果:

異步函數開始執行。。。
調用同步阻塞函數
線程池調用返回結果: 阻塞函數返回
進程池調用返回結果: 阻塞函數返回

結果分析:
通過線程池concurrent.futures.ThreadPoolExecutor()和進程池concurrent.futures.ProcessPoolExecutor()執行阻塞函數。

任務與期程

函數 功能
loop.create_future() 創建一個 Future 對象。
loop.create_task() 將協程當作 Task 一樣調度。
loop.set_task_factory() 設置 loop.create_task() 使用的工廠,它將用來創建 Tasks 。
loop.get_task_factory() 獲取 loop.create_task() 使用的工廠,它用來創建 Tasks 。

create_future
create_future 的功能是創建一個future對象。future對象通常不需要手動創建,因為task會自動管理任務結果。相當于task是全自動,創建future是半自動。創建的future就需要手動的講future狀態設置成完成,才能表示task的狀態為完成。

import asyncio


def foo(future, result):
    print(f"此時future的狀態:{future}")
    future.set_result(result)
    print(f"此時future的狀態:{future}")


if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    # 手動創建future對象
    all_done = loop.create_future()

    # 設置一個回調函數用于修改設置future的結果
    loop.call_soon(foo, all_done, "Future is done!")

    result = loop.run_until_complete(all_done)

    print("返回結果", result)
    print("獲取future的結果", all_done.result())

結果:

此時future的狀態:<Future pending cb=[_run_until_complete_cb() at /Users/lib/python3.10/asyncio/base_events.py:184]>
此時future的狀態:<Future finished result='Future is done!'>
返回結果 Future is done!
獲取future的結果 Future is done!

結果分析:
future設置結果之后之后,future對象的狀態就從pending變成finished狀態。如果一個future沒有手動設置結果,那么事件循環就不會停止。

create_task
將協程封裝成一個task對象,事件循環主要操作的是task對象。協程沒有狀態,而task是有狀態的。

import asyncio 


async def demo(i):
    print(f"hello {i}")
    await asyncio.sleep(1)

def main():
    loop = asyncio.get_event_loop()

    # 將攜程封裝成task,給事件使用
    task = loop.create_task(demo(1))

    loop.run_until_complete(task)

main()
>>> 
hello 1

asyncio.create_task 和 loop.create_task的區別:
兩者實現的功能都是一樣的,將協程封裝成一個task,讓協程擁有了生命周期。區別僅僅在于使用的方法。asyncio.create_task 是高階API,不需要創建事件循環,而loop.create_task需要先創建事件循環再使用該方法。

小結

以上是asyncio低階API的使用介紹,前一篇是高階API的使用介紹,用兩篇介紹了asyncio常見的函數,以后遇到asyncio相關的代碼就不會感到陌生。雖然asyncio是比較復雜的編程思想,但是有了這些函數的使用基礎,能夠更高效的掌握。

連載一系列關于python異步編程的文章。包括同異步框架性能對比、異步事情驅動原理等。歡迎關注微信公眾號第一時間接收推送的文章。

總結

以上是生活随笔為你收集整理的python异步编程之asyncio低阶API的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。