日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

python 使用 asyncio 包处理并发

發布時間:2024/7/5 python 50 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python 使用 asyncio 包处理并发 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

文章目錄

    • 1. 線程與協程對比
    • 2. 使用 asyncio 和 aiohttp 下載
    • 3. 避免阻塞型調用
    • 4. 使用 asyncio.as_completed
    • 5. 使用Executor對象,防止阻塞事件循環
    • 6. 從回調到期物和協程

learn from 《流暢的python》

1. 線程與協程對比

threading

import threading import itertools import time import sysclass Signal:go = Truedef spin(msg, signal):write, flush = sys.stdout.write, sys.stdout.flushfor char in itertools.cycle("|/-\\"): # 無限循環status = char + ' ' + msgwrite(status)flush()write("\x08" * len(status)) # \x08 退格鍵,光標移動回去time.sleep(0.1)if not signal.go:breakwrite(' ' * len(status) + "\x08" * len(status))# 使用空格清除狀態消息,把光標移回開頭def slow_function(): # 假設是一個耗時的計算過程time.sleep(10) # sleep 會阻塞主線程,釋放GIL,創建從屬線程return 42def supervisor(): # 該函數,設置從屬線程,顯示線程對象,運行耗時的計算,最后殺死線程signal = Signal()spinner = threading.Thread(target=spin, args=("thinking!", signal))print("spinner object:", spinner) # 顯示從屬線程對象spinner.start() # 啟動從屬線程result = slow_function() # 運行計算程序,阻塞主線程,從屬線程動畫顯示旋轉指針signal.go = False # 改變signal 狀態,終止 spin 中的for循環spinner.join() # 等待spinner線程結束return resultdef main():result = supervisor() # 運行 supervisorprint("Answer:", result)if __name__ == '__main__':main()

適合 asyncio 的協程要由調用方驅動,并由調用方通過 yield from 調用(語法過時了,新版的用 async / await )
或者把協程傳給 asyncio 包中的某個函數

一篇博文參考:https://www.cnblogs.com/dhcn/p/9032461.html

import asyncio import itertools import sys# https://docs.python.org/3.8/library/asyncio.html async def spin(msg): # py3.5以后的新語法 async / await,協程函數write, flush = sys.stdout.write, sys.stdout.flushfor char in itertools.cycle("|/-\\"): # 無限循環status = char + ' ' + msgwrite(status)flush()write("\x08" * len(status)) # \x08 退格鍵,光標移動回去try:await asyncio.sleep(0.1)except asyncio.CancelledError: # 遇到取消異常,退出循環print("cancel")breakwrite(' ' * len(status) + "\x08" * len(status))print("end spin")async def slow_function(): # 協程函數print("start IO")await asyncio.sleep(3) # 假裝進行 IO 操作print("end IO ")return 42async def supervisor(): # 協程函數spinner = asyncio.ensure_future(spin("thinking!")) # spinner 排定任務print("spinner object:", spinner) # 顯示從屬線程對象# spinner object: <Task pending coro=<spin() running at D:\ >print("start slow")result = await slow_function()print("end slow")spinner.cancel() # task對象可以取消,拋出CancelledError異常return resultdef main():loop = asyncio.get_event_loop() # 獲取事件循環的引用result = loop.run_until_complete(supervisor()) # 驅動 supervisor 協程,讓它運行完畢loop.close()print("answer:", result)if __name__ == '__main__':main()

輸出:

spinner object: <Task pending coro=<spin() running at D:\gitcode > start slow start IO end IO ng!(期間thinking!在輸出,后來被覆蓋) end slow cancel end spin answer: 42請按任意鍵繼續. . .

2. 使用 asyncio 和 aiohttp 下載

import time import sys import os import asyncio import aiohttpPOP20_CC = ('CN IN US ID BR PK NG BD RU JP ' 'MX PH VN ET EG DE IR TR CD FR').split() BASE_URL = 'http://flupy.org/data/flags' DEST_DIR = './'def save_flag(img, filename): # 保存圖像path = os.path.join(DEST_DIR, filename)with open(path, 'wb') as fp:fp.write(img)def show(text): # 打印信息print(text, end=' ')sys.stdout.flush()async def get_flag(cc): # 獲取圖像url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())async with aiohttp.request("GET", url) as resp:image = await resp.read()return imageasync def download_one(cc):image = await get_flag(cc)show(cc)save_flag(image, cc.lower() + '.gif')return ccdef download_many_(cc_list):loop = asyncio.get_event_loop()todo = [download_one(cc) for cc in sorted(cc_list)] # 協程對象wait_coro = asyncio.wait(todo) # 包裝成 task,wait是協程函數,返回協程或者生成器對象res, _ = loop.run_until_complete(wait_coro)# 驅動協程,返回 第一個元素是一系列結束的期物,第二個元素是一系列未結束的期物# loop.close(),好像不需要這句 上面 with 處可能自動關閉了return len(res)def main(download_many):t0 = time.time()count = download_many(POP20_CC)elapsed = time.time() - t0msg = '\n{} flags downloaded in {:.2f}s'print(msg.format(count, elapsed)) # 計時信息if __name__ == '__main__':main(download_many_)# US RU ID ET BR FR CN PH BD NG DE JP EG TR MX IN PK IR CD VN # 20 flags downloaded in 3.88s

3. 避免阻塞型調用

執行硬盤或網絡 I/O 操作的函數定義為 阻塞型函數

有兩種方法能 避免阻塞型調用 中止整個應用程序 的進程:

  • 單獨的線程中運行各個阻塞型操作
  • 把每個阻塞型操作 轉換成非阻塞的異步調用 使用

4. 使用 asyncio.as_completed

import collections import time import sys import os import asyncio from http import HTTPStatusimport aiohttp from aiohttp import web import tqdmPOP20_CC = ('CN IN US ID BR PK NG BD RU JP ' 'MX PH VN ET EG DE IR TR CD FR').split() BASE_URL = 'http://flupy.org/data/flags' DEST_DIR = './' DEFAULT_CONCUR_REQ = 5 MAX_CONCUR_REQ = 1000class FetchError(Exception):def __init__(self, country_code):self.country_code = country_codedef save_flag(img, filename): # 保存圖像path = os.path.join(DEST_DIR, filename)with open(path, 'wb') as fp:fp.write(img)def show(text): # 打印信息print(text, end=' ')sys.stdout.flush()async def get_flag(cc): # 獲取圖像url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())async with aiohttp.request("GET", url) as resp:if resp.status == 200:image = await resp.read()return imageelif resp.status == 404:raise web.HTTPNotFound()else:raise aiohttp.WebSocketError(code=resp.status, message=resp.reason)async def download_one(cc, semaphore, verbose):try:async with semaphore:image = await get_flag(cc)except web.HTTPNotFound:status = HTTPStatus.NOT_FOUNDmsg = "not found"except Exception as exc:raise FetchError(cc) from excelse:save_flag(image, cc.lower() + '.gif')status = HTTPStatus.OKmsg = "OK"if verbose and msg:print(cc, msg)return (status, cc)async def downloader_coro(cc_list, verbose, concur_req): # 協程函數counter = collections.Counter()semaphore = asyncio.Semaphore(value=concur_req) # 最多可以使用這個計數器的協程個數todo = [download_one(cc, semaphore, verbose=True) for cc in sorted(cc_list)] # 協程對象列表todo_iter = asyncio.as_completed(todo) # 獲取迭代器,會在期物運行結束后返回期物if not verbose:todo_iter = tqdm.tqdm(todo_iter, total=len(cc_list)) # 迭代器傳給tqdm,顯示進度條for future in todo_iter: # 迭代器運行結束的期物try:res = await future # 獲取期物對象的結果except FetchError as exc:country_code = exc.country_codetry:error_msg = exc.__cause__.args[0]except IndexError:error_msg = exc.__cause__.__class__.__name__if verbose and error_msg:msg = '*** Error for {}: {}'print(msg.format(country_code, error_msg))status = HTTPStatus.errorelse:status = res[0]counter[status] += 1 # 記錄結果return counter # 返回計數器def download_many_(cc_list, verbose, concur_req):loop = asyncio.get_event_loop()coro = downloader_coro(cc_list, verbose=verbose, concur_req=concur_req)# 實例化 downloader_coro協程,然后通過 run_until_complete 方法把它傳給事件循環counts = loop.run_until_complete(coro)# loop.close() # 好像不需要這句 上面 with 處可能自動關閉了return countsdef main(download_many):t0 = time.time()count = download_many(POP20_CC, True, MAX_CONCUR_REQ)elapsed = time.time() - t0msg = '\n{} flags downloaded in {:.2f}s'print(msg.format(count, elapsed)) # 計時信息if __name__ == '__main__':main(download_many_)

5. 使用Executor對象,防止阻塞事件循環

  • loop.run_in_executor 方法把阻塞的作業(例如保存文件)委托給線程池做
async def download_one(cc, semaphore, verbose):try:async with semaphore:image = await get_flag(cc)except web.HTTPNotFound:status = HTTPStatus.NOT_FOUNDmsg = "not found"except Exception as exc:raise FetchError(cc) from excelse:# 因此保存文件時,整個應用程序都會凍結,為了避免,使用下面方法loop = asyncio.get_event_loop() # 獲取事件循環對象的引用loop.run_in_executor(None, # 方法的第一個參數是 Executor 實例;# 如果設為 None,使用事件循環的默認 ThreadPoolExecutor 實例save_flag, image, cc.lower() + ".gif")# 余下的參數是可調用的對象,以及可調用對象的位置參數status = HTTPStatus.OKmsg = "OK"if verbose and msg:print(cc, msg)return (status, cc)

6. 從回調到期物和協程

  • 如果一個操作需要依賴之前操作的結果,那就得嵌套回調
def stage1(response1):request2 = step1(response1)api_call2(request2, stage2)def stage2(response2):request3 = step2(response2)api_call3(request3, stage3)def stage3(response3):tep3(response3)api_call1(request1, stage1)

好的寫法:

async def three_stages(request1): response1 = await api_call1(request1) # 第一步 request2 = step1(response1) response2 = await api_call2(request2) # 第二步 request3 = step2(response2) response3 = await api_call3(request3)# 第三步 step3(response3) loop.create_task(three_stages(request1)) # 必須顯式調度執行

協程 必須使用 事件循環 顯式排定 協程的執行時間

異步系統避免用戶級線程的開銷,這是它能比多線程系統管理更多并發連接的主要原因

總結

以上是生活随笔為你收集整理的python 使用 asyncio 包处理并发的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。