python 使用 asyncio 包处理并发
生活随笔收集整理的這篇文章主要介紹了 python 使用 asyncio 包处理并发,小編覺得挺不錯的,現在分享給大家,幫大家做個參考。
文章目錄
- 1. 線程與協程對比
- 2. 使用 asyncio 和 aiohttp 下載
- 3. 避免阻塞型調用
- 4. 使用 asyncio.as_completed
- 5. 使用Executor對象,防止阻塞事件循環
- 6. 從回調到期物和協程
learn from 《流暢的python》
1. 線程與協程對比
threading
import itertools
import sys
import threading
import time


class Signal:
    """Mutable flag shared between the main thread and the spinner thread."""

    go = True  # the spinner keeps animating while this is True


def spin(msg, signal):
    """Animate a spinner followed by *msg* until ``signal.go`` becomes False."""
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle("|/-\\"):  # infinite loop
        status = char + ' ' + msg
        write(status)
        flush()
        write("\x08" * len(status))  # \x08 is backspace: move the cursor back
        time.sleep(0.1)
        if not signal.go:
            break
    # Blank out the status message with spaces, then move the cursor back.
    write(' ' * len(status) + "\x08" * len(status))


def slow_function():
    """Pretend to be a long-running computation; return 42."""
    # time.sleep blocks the main thread but releases the GIL, so the
    # spinner thread keeps running in the meantime.
    time.sleep(10)
    return 42


def supervisor():
    """Start the spinner thread, run slow_function, then stop the spinner.

    Returns:
        The result of slow_function().
    """
    signal = Signal()
    spinner = threading.Thread(target=spin, args=("thinking!", signal))
    print("spinner object:", spinner)  # show the spinner thread object
    spinner.start()
    try:
        # Blocks the main thread while the spinner thread animates.
        result = slow_function()
    finally:
        # Always stop the spinner, even if slow_function raises; otherwise the
        # non-daemon spinner thread would keep the process alive forever.
        signal.go = False
        spinner.join()
    return result


def main():
    result = supervisor()
    print("Answer:", result)


if __name__ == '__main__':
    main()

# A coroutine suitable for asyncio must be driven by its caller, which invokes
# it with ``yield from`` (outdated syntax; newer Python uses async / await).
適合 asyncio 的協程要由調用方驅動:除了由調用方用 await(舊語法為 yield from)調用之外,也可以把協程傳給 asyncio 包中的某個函數(如 asyncio.run 或 asyncio.create_task)來驅動。
一篇博文參考:https://www.cnblogs.com/dhcn/p/9032461.html
import asyncio
import itertools
import sys

# https://docs.python.org/3.8/library/asyncio.html


async def spin(msg):
    """Animate a spinner followed by *msg* until this task is cancelled."""
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle("|/-\\"):  # infinite loop
        status = char + ' ' + msg
        write(status)
        flush()
        write("\x08" * len(status))  # \x08 is backspace: move the cursor back
        try:
            await asyncio.sleep(0.1)
        except asyncio.CancelledError:  # cancellation requested: leave the loop
            print("cancel")
            break
    write(' ' * len(status) + "\x08" * len(status))
    print("end spin")


async def slow_function():
    """Pretend to do 3 seconds of I/O without blocking the loop; return 42."""
    print("start IO")
    await asyncio.sleep(3)  # simulated I/O; hands control back to the event loop
    print("end IO ")
    return 42


async def supervisor():
    """Run slow_function while a spinner task animates; return its result."""
    spinner = asyncio.create_task(spin("thinking!"))  # schedule the spinner task
    # e.g. spinner object: <Task pending ... coro=<spin() running at ...>>
    print("spinner object:", spinner)
    print("start slow")
    result = await slow_function()
    print("end slow")
    spinner.cancel()  # raises CancelledError inside spin at its current await
    # Wait for the spinner to finish its cleanup instead of relying on the
    # event loop happening to run it before shutdown. spin() swallows the
    # CancelledError, so this await returns normally.
    await spinner
    return result


def main():
    # asyncio.run creates the event loop, drives supervisor to completion and
    # closes the loop (replaces the deprecated get_event_loop pattern).
    result = asyncio.run(supervisor())
    print("answer:", result)


if __name__ == '__main__':
    main()

# Output:
spinner object: <Task pending coro=<spin() running at D:\gitcode >
start slow
start IO
end IO
ng!(期間 thinking! 一直在輸出,后來被覆蓋)
end slow
cancel
end spin
answer: 42

2. 使用 asyncio 和 aiohttp 下載
import asyncio
import os
import sys
import time

import aiohttp

POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()
BASE_URL = 'http://flupy.org/data/flags'
DEST_DIR = './'


def save_flag(img, filename):
    """Write the image bytes *img* to DEST_DIR/filename."""
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)


def show(text):
    """Print *text* followed by a space and flush immediately."""
    print(text, end=' ')
    sys.stdout.flush()


async def get_flag(cc):
    """Download and return the GIF bytes of the flag for country code *cc*."""
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    async with aiohttp.request("GET", url) as resp:
        # Fail on 4xx/5xx instead of saving an HTTP error page as a .gif.
        resp.raise_for_status()
        return await resp.read()


async def download_one(cc):
    """Download, announce and save the flag for *cc*; return *cc*."""
    image = await get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + '.gif')
    return cc


async def _download_all(cc_list):
    """Download all flags concurrently; return how many succeeded."""
    ccs = sorted(cc_list)
    # gather accepts coroutines directly (asyncio.wait rejects them since
    # Python 3.11) and, with return_exceptions=True, returns failures as
    # values so they are reported here instead of "never retrieved".
    results = await asyncio.gather(*(download_one(cc) for cc in ccs),
                                   return_exceptions=True)
    ok = 0
    for cc, res in zip(ccs, results):
        if isinstance(res, BaseException):
            print('\n*** Error for {}: {!r}'.format(cc, res), file=sys.stderr)
        else:
            ok += 1
    return ok


def download_many_(cc_list):
    """Download the flags in *cc_list*; return the number downloaded."""
    # asyncio.run creates and closes the event loop. The ``async with`` in
    # get_flag only releases the HTTP response, not the loop.
    return asyncio.run(_download_all(cc_list))


def main(download_many):
    """Time *download_many* over POP20_CC and print a summary."""
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))  # timing information


if __name__ == '__main__':
    main(download_many_)

# Sample output:
# US RU ID ET BR FR CN PH BD NG DE JP EG TR MX IN PK IR CD VN
# 20 flags downloaded in 3.88s

# --- 3. Avoiding blocking calls ---
執行硬盤或網絡 I/O 操作的函數定義為 阻塞型函數
有兩種方法能避免阻塞型調用中止整個應用程序的進程:
- 在單獨的線程中運行各個阻塞型操作
- 把每個阻塞型操作轉換成非阻塞的異步調用(例如使用 asyncio)
4. 使用 asyncio.as_completed
import asyncio
import collections
import os
import sys
import time
from enum import Enum
from http import HTTPStatus

import aiohttp
import tqdm

POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()
BASE_URL = 'http://flupy.org/data/flags'
DEST_DIR = './'
DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000

# Outcome of one download. http.HTTPStatus has no "error" member, so a
# dedicated enum is used as the Counter key.
Status = Enum('Status', 'ok not_found error')


class FetchError(Exception):
    """Wrap any failure while fetching the flag for *country_code*."""

    def __init__(self, country_code):
        super().__init__(country_code)
        self.country_code = country_code


def save_flag(img, filename):
    """Write the image bytes *img* to DEST_DIR/filename."""
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)


def show(text):
    """Print *text* followed by a space and flush immediately."""
    print(text, end=' ')
    sys.stdout.flush()


async def get_flag(cc):
    """Return the GIF bytes for *cc*.

    Raises:
        aiohttp.ClientResponseError: on an HTTP 4xx/5xx status (e.g. 404).
    """
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    async with aiohttp.request("GET", url) as resp:
        resp.raise_for_status()
        return await resp.read()


async def download_one(cc, semaphore, verbose):
    """Download and save one flag, limited by *semaphore*.

    Returns:
        A ``(Status, cc)`` tuple; a 404 is reported as Status.not_found.

    Raises:
        FetchError: for any other failure, chained to the original error.
    """
    try:
        async with semaphore:  # at most concur_req downloads run at once
            image = await get_flag(cc)
    except aiohttp.ClientResponseError as exc:
        if exc.status != HTTPStatus.NOT_FOUND:
            raise FetchError(cc) from exc
        status, msg = Status.not_found, "not found"
    except Exception as exc:
        raise FetchError(cc) from exc
    else:
        save_flag(image, cc.lower() + '.gif')
        status, msg = Status.ok, "OK"
    if verbose and msg:
        print(cc, msg)
    return (status, cc)


async def downloader_coro(cc_list, verbose, concur_req):
    """Download all flags with at most *concur_req* concurrent requests.

    Returns:
        A collections.Counter mapping each Status to its number of flags.
    """
    counter = collections.Counter()
    semaphore = asyncio.Semaphore(value=concur_req)
    todo = [download_one(cc, semaphore, verbose) for cc in sorted(cc_list)]
    # as_completed yields awaitables in completion order.
    todo_iter = asyncio.as_completed(todo)
    if not verbose:
        # Wrap the iterator with tqdm to show a progress bar.
        todo_iter = tqdm.tqdm(todo_iter, total=len(cc_list))
    for future in todo_iter:
        try:
            res = await future
        except FetchError as exc:
            cause = exc.__cause__
            # str() of an HTTP error carries status and URL; fall back to
            # the exception class name when the message is empty.
            error_msg = str(cause) or type(cause).__name__
            if verbose:
                msg = '*** Error for {}: {}'
                print(msg.format(exc.country_code, error_msg))
            status = Status.error
        else:
            status = res[0]
        counter[status] += 1
    return counter


def download_many_(cc_list, verbose, concur_req):
    """Run downloader_coro on a fresh event loop; return its Counter."""
    coro = downloader_coro(cc_list, verbose=verbose, concur_req=concur_req)
    # asyncio.run creates and closes the event loop itself.
    return asyncio.run(coro)


def main(download_many):
    """Time *download_many* over POP20_CC and print a summary."""
    t0 = time.time()
    # The semaphore only throttles when the limit is below the number of
    # flags, so use the default limit, capped by MAX_CONCUR_REQ.
    concur_req = min(DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
    counter = download_many(POP20_CC, True, concur_req)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(counter[Status.ok], elapsed))
    for status in (Status.not_found, Status.error):
        if counter[status]:
            print('{} {}'.format(counter[status], status.name.replace('_', ' ')))


if __name__ == '__main__':
    main(download_many_)

# --- 5. Use an Executor object to avoid blocking the event loop ---
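- 使用 loop.run_in_executor 方法把阻塞的作業(例如保存文件)委托給線程池執行,例如 `await loop.run_in_executor(None, save_flag, image, cc.lower() + '.gif')`(第一個參數為 None 時使用默認的 ThreadPoolExecutor)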
6. 從回調到期物和協程
- 如果一個操作需要依賴之前操作的結果,那就得嵌套回調
好的寫法(用協程代替嵌套回調,每一步都用 await 等待上一步的結果):
async def three_stages(request1):
    """Run three dependent API calls in sequence without nested callbacks.

    api_call1/2/3 and step1/2/3 are placeholders for the reader's own
    coroutines and helpers; each stage feeds its result into the next.
    """
    response1 = await api_call1(request1)  # stage 1
    request2 = step1(response1)
    response2 = await api_call2(request2)  # stage 2
    request3 = step2(response2)
    response3 = await api_call3(request3)  # stage 3
    step3(response3)


# The coroutine must be scheduled explicitly; creating it does not run it.
# Use the event loop to schedule its execution, e.g.:
#     loop.create_task(three_stages(request1))
異步系統能避免用戶級線程的開銷,這是它能比多線程系統管理更多并發連接的主要原因。
總結
以上是生活随笔為你收集整理的python 使用 asyncio 包处理并发的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 数据仓库 Hive(内含大数据镜像下载)
- 下一篇: python web开发 HTML基础