Python 中 异步协程 的 使用方法介绍
?
靜覓 崔慶才的個(gè)人博客:Python中異步協(xié)程的使用方法介紹:https://cuiqingcai.com/6160.html
Python 異步 IO 、協(xié)程、asyncio、async/await、aiohttp:https://blog.csdn.net/freeking101/article/details/85286199
?
?
?
1. 前言
?
??????? 在執(zhí)行一些 IO 密集型任務(wù)的時(shí)候,程序常常會(huì)因?yàn)榈却?IO 而阻塞。比如在網(wǎng)絡(luò)爬蟲中,如果我們使用 requests 庫(kù)來進(jìn)行請(qǐng)求的話,如果網(wǎng)站響應(yīng)速度過慢,程序一直在等待網(wǎng)站響應(yīng),最后導(dǎo)致其爬取效率是非常非常低的。
??????? 為了解決這類問題,本文就來探討一下 Python 中異步協(xié)程來加速的方法,此種方法對(duì)于 IO 密集型任務(wù)非常有效。如將其應(yīng)用到網(wǎng)絡(luò)爬蟲中,爬取效率甚至可以成百倍地提升。
??????? 注:本文協(xié)程使用 async/await 來實(shí)現(xiàn),需要 Python 3.5 及以上版本。
?
?
2. 基本了解
?
在了解異步協(xié)程之前,我們首先得了解一些基礎(chǔ)概念,如 阻塞和非阻塞、同步和異步、多進(jìn)程和協(xié)程。
?
2.1 阻塞
??????? 阻塞狀態(tài)指程序未得到所需計(jì)算資源時(shí)被掛起的狀態(tài)。程序在等待某個(gè)操作完成期間,自身無法繼續(xù)干別的事情,則稱該程序在該操作上是阻塞的。
??????? 常見的阻塞形式有:網(wǎng)絡(luò) I/O 阻塞、磁盤 I/O 阻塞、用戶輸入阻塞等。阻塞是無處不在的,包括 CPU 切換上下文時(shí),所有的進(jìn)程都無法真正干事情,它們也會(huì)被阻塞。如果是多核 CPU 則正在執(zhí)行上下文切換操作的核不可被利用。
?
2.2 非阻塞
程序在等待某操作過程中,自身不被阻塞,可以繼續(xù)運(yùn)行干別的事情,則稱該程序在該操作上是非阻塞的。
非阻塞并不是在任何程序級(jí)別、任何情況下都可以存在的。
僅當(dāng)程序封裝的級(jí)別可以囊括獨(dú)立的子程序單元時(shí),它才可能存在非阻塞狀態(tài)。
非阻塞的存在是因?yàn)樽枞嬖?#xff0c;正因?yàn)槟硞€(gè)操作阻塞導(dǎo)致的耗時(shí)與效率低下,我們才要把它變成非阻塞的。
?
2.3 同步 (?同步?意味著?有序 )
不同程序單元為了完成某個(gè)任務(wù),在執(zhí)行過程中需靠某種通信方式以協(xié)調(diào)一致,稱這些程序單元是同步執(zhí)行的。
例如購(gòu)物系統(tǒng)中更新商品庫(kù)存,需要用 "鎖"?作為通信信號(hào),讓不同的更新請(qǐng)求強(qiáng)制排隊(duì)順序執(zhí)行,更新庫(kù)存的操作是同步的。
簡(jiǎn)言之,同步意味著有序。
?
2.4 異步 (?異步 意味著 無序 )
為完成某個(gè)任務(wù),不同程序單元之間過程中無需通信協(xié)調(diào),也能完成任務(wù)的方式,不相關(guān)的程序單元之間可以是異步的。
例如:爬蟲下載網(wǎng)頁(yè)。調(diào)度程序調(diào)用下載程序后,即可調(diào)度其他任務(wù),而無需與該下載任務(wù)保持通信以協(xié)調(diào)行為。不同網(wǎng)頁(yè)的下載、保存等操作都是無關(guān)的,也無需相互通知協(xié)調(diào)。這些異步操作的完成時(shí)刻并不確定。
簡(jiǎn)言之,異步意味著無序。
?
2.5 多進(jìn)程
多進(jìn)程就是利用 CPU 的多核優(yōu)勢(shì),在同一時(shí)間并行地執(zhí)行多個(gè)任務(wù),可以大大提高執(zhí)行效率。
?
2.6 協(xié)程
協(xié)程,英文叫做 Coroutine,又稱 微線程,纖程,協(xié)程是一種用戶態(tài)的輕量級(jí)線程。
協(xié)程擁有自己的寄存器上下文和棧。協(xié)程調(diào)度切換時(shí),將寄存器上下文和棧保存到其他地方,在切回來的時(shí)候,恢復(fù)先前保存的寄存器上下文和棧。因此協(xié)程能保留上一次調(diào)用時(shí)的狀態(tài),即所有局部狀態(tài)的一個(gè)特定組合,每次過程重入時(shí),就相當(dāng)于進(jìn)入上一次調(diào)用的狀態(tài)。
協(xié)程本質(zhì)上是個(gè)單進(jìn)程,協(xié)程相對(duì)于多進(jìn)程來說,無需線程上下文切換的開銷,無需原子操作鎖定及同步的開銷,編程模型也非常簡(jiǎn)單。
我們可以使用協(xié)程來實(shí)現(xiàn)異步操作,比如在網(wǎng)絡(luò)爬蟲場(chǎng)景下,我們發(fā)出一個(gè)請(qǐng)求之后,需要等待一定的時(shí)間才能得到響應(yīng),但其實(shí)在這個(gè)等待過程中,程序可以干許多其他的事情,等到響應(yīng)得到之后才切換回來繼續(xù)處理,這樣可以充分利用 CPU 和其他資源,這就是異步協(xié)程的優(yōu)勢(shì)。
?
?
3. 異步協(xié)程用法
?
接下來讓我們來了解下協(xié)程的實(shí)現(xiàn),從 Python 3.4 開始,Python 中加入了協(xié)程的概念,但這個(gè)版本的協(xié)程還是以生成器對(duì)象為基礎(chǔ)的,在 Python 3.5 則增加了 async / await,使得協(xié)程的實(shí)現(xiàn)更加方便。
Python 中使用協(xié)程最常用的庫(kù)莫過于 asyncio,所以本文會(huì)以 asyncio 為基礎(chǔ)來介紹協(xié)程的使用。
首先我們需要了解下面幾個(gè)概念:
- event_loop:事件循環(huán),相當(dāng)于一個(gè)無限循環(huán),我們可以把一些函數(shù)注冊(cè)到這個(gè)事件循環(huán)上,當(dāng)滿足條件發(fā)生的時(shí)候,就會(huì)調(diào)用對(duì)應(yīng)的處理方法。
- coroutine:中文翻譯叫協(xié)程,在 Python 中常指代為協(xié)程對(duì)象類型,我們可以將協(xié)程對(duì)象注冊(cè)到事件循環(huán)中,它會(huì)被事件循環(huán)調(diào)用。我們可以使用 async 關(guān)鍵字來定義一個(gè)方法,這個(gè)方法在調(diào)用時(shí)不會(huì)立即被執(zhí)行,而是返回一個(gè)協(xié)程對(duì)象。
- task:任務(wù),它是對(duì)協(xié)程對(duì)象的進(jìn)一步封裝,包含了任務(wù)的各個(gè)狀態(tài)。
- future:代表將來執(zhí)行或沒有執(zhí)行的任務(wù)的結(jié)果,實(shí)際上和 task 沒有本質(zhì)區(qū)別。
另外我們還需要了解 async / await 關(guān)鍵字,它是從 Python 3.5 才出現(xiàn)的,專門用于定義協(xié)程。
其中,async 定義一個(gè)協(xié)程,await 用來掛起阻塞方法的執(zhí)行。
?
3.1 定義協(xié)程
首先我們來定義一個(gè)協(xié)程,體驗(yàn)一下它和普通進(jìn)程在實(shí)現(xiàn)上的不同之處,代碼如下:
import asyncioasync def execute(x):print(f'Number:{x}')coroutine = execute(1) print(coroutine) print('After calling execute')loop = asyncio.get_event_loop() loop.run_until_complete(coroutine) print('After calling loop')運(yùn)行結(jié)果:
<coroutine object execute at 0x00000247201D3740> After calling execute Number:1 After calling loop說明:
- 首先引入?asyncio 包,這樣才可以使用 async 和 await。然后使用 async 定義了一個(gè) execute() 方法
- 調(diào)用execute方法(?coroutine = execute(1) ),然而這個(gè)方法并沒有執(zhí)行,而是返回了一個(gè) coroutine 協(xié)程對(duì)象。
- 使用 get_event_loop() 方法創(chuàng)建了一個(gè)事件循環(huán) loop,并調(diào)用了 loop 對(duì)象的 run_until_complete() 方法將協(xié)程注冊(cè)到事件循環(huán) loop 中,然后啟動(dòng)。最后我們才看到了 execute() 方法打印了輸出結(jié)果。
可見 async 定義的方法就會(huì)變成一個(gè)無法直接執(zhí)行的 coroutine 對(duì)象,必須將其注冊(cè)到事件循環(huán)中才可以執(zhí)行。
在上面還提到了 task,它是對(duì) coroutine 對(duì)象的進(jìn)一步封裝,它里面相比 coroutine 對(duì)象多了運(yùn)行狀態(tài),比如 running、finished 等,我們可以用這些狀態(tài)來獲取協(xié)程對(duì)象的執(zhí)行情況。
上面的例子中,我們將 coroutine 對(duì)象傳遞給 run_until_complete() 方法的時(shí)候,實(shí)際上它進(jìn)行了一個(gè)操作就是將 coroutine 封裝成了 task 對(duì)象,我們也可以顯式地進(jìn)行聲明,如下所示:
import asyncioasync def execute(x):print(f'Number:{x}')return xcoroutine = execute(1) print(f'Coroutine: {coroutine}') print('After calling execute')loop = asyncio.get_event_loop() task = loop.create_task(coroutine) print(f'Task: {task}') loop.run_until_complete(task) print(f'Task: {task}') print('After calling loop')運(yùn)行結(jié)果:
Coroutine:?<coroutine?object?execute?at?0x10e0f7830> After?calling?execute Task:?<Task?pending?coro=<execute()?running?at?demo.py:4>> Number:?1 Task:?<Task?finished?coro=<execute()?done,?defined?at?demo.py:4>?result=1> After?calling?loop? ? ? ? 這里我們定義了 loop 對(duì)象之后,接著調(diào)用了它的 create_task() 方法將 coroutine 對(duì)象轉(zhuǎn)化為了 task 對(duì)象,隨后我們打印輸出一下,發(fā)現(xiàn)它是 pending 狀態(tài)。接著我們將 task 對(duì)象添加到事件循環(huán)中得到執(zhí)行,隨后我們?cè)俅蛴≥敵鲆幌?task 對(duì)象,發(fā)現(xiàn)它的狀態(tài)就變成了 finished,同時(shí)還可以看到其 result 變成了 1,也就是我們定義的 execute() 方法的返回結(jié)果。
? ? ? ? 另外定義 task 對(duì)象還有一種方式,就是直接通過 asyncio 的 ensure_future() 方法,返回結(jié)果也是 task 對(duì)象,這樣的話我們就可以不借助于 loop 來定義,即使我們還沒有聲明 loop 也可以提前定義好 task 對(duì)象,寫法如下:
import asyncioasync def execute(x):print(f'Number: {x}')return xcoroutine = execute(1) print(f'Coroutine: {coroutine}') print('After calling execute')task = asyncio.ensure_future(coroutine) print(f'Task: {task}') loop = asyncio.get_event_loop() loop.run_until_complete(task) print(f'Task: {task}') print('After calling loop')運(yùn)行結(jié)果:
Coroutine:?<coroutine?object?execute?at?0x10aa33830> After?calling?execute Task:?<Task?pending?coro=<execute()?running?at?demo.py:4>> Number:?1 Task:?<Task?finished?coro=<execute()?done,?defined?at?demo.py:4>?result=1> After?calling?loop發(fā)現(xiàn)其效果都是一樣的。
?
3.2 綁定回調(diào)
另外我們也可以為某個(gè) task 綁定一個(gè)回調(diào)方法,來看下面的例子:
import asyncio import requestsasync def request():url = 'https://www.baidu.com'response = requests.get(url)return response.status_codedef callback(t_task):print('status_code:', t_task.result())coroutine = request() task = asyncio.ensure_future(coroutine) task.add_done_callback(callback) print('Task:', task)loop = asyncio.get_event_loop() loop.run_until_complete(task) print('Task:', task)說明:
定義一個(gè) request() 方法,請(qǐng)求百度,返回狀態(tài)碼,同時(shí)這個(gè)方法里面我們沒有任何 print() 語句。
然后定義一個(gè) callback() 方法,這個(gè)方法接收一個(gè)參數(shù),是 task 對(duì)象,然后調(diào)用 print() 方法打印了 task 對(duì)象的結(jié)果。這樣我們就定義好了一個(gè) coroutine對(duì)象 和 一個(gè)回調(diào)方法,
我們現(xiàn)在希望的效果是,當(dāng) coroutine 對(duì)象執(zhí)行完畢之后,就去執(zhí)行聲明的 callback() 方法。那么它們二者怎樣關(guān)聯(lián)起來呢?
很簡(jiǎn)單,只需要調(diào)用 add_done_callback() 方法即可,我們將 callback() 方法傳遞給了封裝好的 task 對(duì)象,這樣當(dāng) task 執(zhí)行完畢之后就可以調(diào)用 callback() 方法了,同時(shí) task 對(duì)象還會(huì)作為參數(shù)傳遞給 callback() 方法,調(diào)用 task 對(duì)象的 result() 方法就可以獲取返回結(jié)果了。
運(yùn)行結(jié)果:
Task:?<Task?pending?coro=<request()?running?at?demo.py:5>?cb=[callback()?at?demo.py:11]> status_code: 200 Task:?<Task?finished?coro=<request()?done,?defined?at?demo.py:5>?result=200>實(shí)際上不用回調(diào)方法,直接在 task 運(yùn)行完畢之后也可以直接調(diào)用 result() 方法獲取結(jié)果,如下所示:
import asyncio import requestsasync def request():url = 'https://www.baidu.com'status = requests.get(url)return statuscoroutine = request() task = asyncio.ensure_future(coroutine) print(f'Task: {task}')loop = asyncio.get_event_loop() loop.run_until_complete(task) print(f'Task: {task}') print(f'Task Result: {task.result()}')運(yùn)行結(jié)果是一樣的:
Task:?<Task?pending?coro=<request()?running?at?demo.py:4>> Task:?<Task?finished?coro=<request()?done,?defined?at?demo.py:4>?result=<Response?[200]>> Task?Result:?<Response?[200]>?
3.3 多任務(wù) 的 協(xié)程
上面的例子我們只執(zhí)行了一次請(qǐng)求,如果我們想執(zhí)行多次請(qǐng)求應(yīng)該怎么辦呢?我們可以定義一個(gè) task 列表,然后使用 asyncio 的 wait() 方法即可執(zhí)行,看下面的例子:
import asyncio import requestsasync def request():url = 'https://www.baidu.com'status = requests.get(url)return statustasks = [asyncio.ensure_future(request()) for _ in range(5)]# print(f'Tasks: {tasks}') list(map(lambda x: print(x), tasks))loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))for task in tasks:print('Task Result:', task.result())這里我們使用一個(gè) for 循環(huán)創(chuàng)建了五個(gè) task,組成了一個(gè)列表,然后把這個(gè)列表首先傳遞給了 asyncio 的 wait() 方法,然后再將其注冊(cè)到時(shí)間循環(huán)中,就可以發(fā)起五個(gè)任務(wù)了。最后我們?cè)賹⑷蝿?wù)的運(yùn)行結(jié)果輸出出來,運(yùn)行結(jié)果如下:
<Task pending name='Task-1' coro=<request() running at demo.py:5>> <Task pending name='Task-2' coro=<request() running at demo.py:5>> <Task pending name='Task-3' coro=<request() running at demo.py:5>> <Task pending name='Task-4' coro=<request() running at demo.py:5>> <Task pending name='Task-5' coro=<request() running at demo.py:5>> Task Result: <Response [200]> Task Result: <Response [200]> Task Result: <Response [200]> Task Result: <Response [200]> Task Result: <Response [200]>可以看到五個(gè)任務(wù)被順次執(zhí)行了,并得到了運(yùn)行結(jié)果。
?
3.4 協(xié)程實(shí)現(xiàn)
? ? ? ? 前面說了這么一通,又是 async,又是 coroutine,又是 task,又是 callback,但似乎并沒有看出協(xié)程的優(yōu)勢(shì)啊?反而寫法上更加奇怪和麻煩了,別急,上面的案例只是鋪墊,接下來我們正式來看下協(xié)程在解決 IO 密集型任務(wù)上有怎樣的優(yōu)勢(shì)吧!
? ? ? ? 上面的代碼中,我們用一個(gè)網(wǎng)絡(luò)請(qǐng)求作為示例,這就是一個(gè)耗時(shí)等待的操作,因?yàn)槲覀冋?qǐng)求網(wǎng)頁(yè)之后需要等待頁(yè)面響應(yīng)并返回結(jié)果。耗時(shí)等待的操作一般都是 IO 操作,比如文件讀取、網(wǎng)絡(luò)請(qǐng)求等等。協(xié)程對(duì)于處理這種操作是有很大優(yōu)勢(shì)的,當(dāng)遇到需要等待的情況的時(shí)候,程序可以暫時(shí)掛起,轉(zhuǎn)而去執(zhí)行其他的操作,從而避免一直等待一個(gè)程序而耗費(fèi)過多的時(shí)間,充分利用資源。
? ? ? ? 為了表現(xiàn)出協(xié)程的優(yōu)勢(shì),我們需要先創(chuàng)建一個(gè)合適的實(shí)驗(yàn)環(huán)境,最好的方法就是模擬一個(gè)需要等待一定時(shí)間才可以獲取返回結(jié)果的網(wǎng)頁(yè),上面的代碼中使用了百度,但百度的響應(yīng)太快了,而且響應(yīng)速度也會(huì)受本機(jī)網(wǎng)速影響,所以最好的方式是自己在本地模擬一個(gè)慢速服務(wù)器,這里我們選用 Flask。
如果沒有安裝 Flask 的話可以執(zhí)行如下命令安裝:
pip3 install flask然后編寫服務(wù)器代碼如下:
from flask import Flask import timeapp = Flask(__name__)@app.route('/') def index():time.sleep(3)return 'Hello!'if __name__ == '__main__':app.run(threaded=True)這里我們定義了一個(gè) Flask 服務(wù),主入口是 index() 方法,方法里面先調(diào)用了 sleep() 方法休眠 3 秒,然后接著再返回結(jié)果,也就是說,每次請(qǐng)求這個(gè)接口至少要耗時(shí) 3 秒,這樣我們就模擬了一個(gè)慢速的服務(wù)接口。
注意這里服務(wù)啟動(dòng)的時(shí)候,run() 方法加了一個(gè)參數(shù) threaded,這表明 Flask 啟動(dòng)了多線程模式,不然默認(rèn)是只有一個(gè)線程的。如果不開啟多線程模式,同一時(shí)刻遇到多個(gè)請(qǐng)求的時(shí)候,只能順次處理,這樣即使我們使用協(xié)程異步請(qǐng)求了這個(gè)服務(wù),也只能一個(gè)一個(gè)排隊(duì)等待,瓶頸就會(huì)出現(xiàn)在服務(wù)端。所以,多線程模式是有必要打開的。
啟動(dòng)之后,Flask 應(yīng)該默認(rèn)會(huì)在 127.0.0.1:5000 上運(yùn)行,運(yùn)行之后控制臺(tái)輸出結(jié)果如下:
* Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)接下來我們?cè)僦匦率褂蒙厦娴姆椒ㄕ?qǐng)求一遍:
import asyncio import requests import timestart = time.time()async def request():url = 'http://127.0.0.1:5000'print(f'Waiting for {url}')response = requests.get(url)print(f'Get response from {url}, Result: {response.text}')tasks = [asyncio.ensure_future(request()) for _ in range(5)] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))end = time.time() print(f'Cost time: {end - start}')在這里我們還是創(chuàng)建了五個(gè) task,然后將 task 列表傳給 wait() 方法并注冊(cè)到時(shí)間循環(huán)中執(zhí)行。
運(yùn)行結(jié)果如下:
Waiting for http://127.0.0.1:5000 Get response from http://127.0.0.1:5000, Result: Hello! Waiting for http://127.0.0.1:5000 Get response from http://127.0.0.1:5000, Result: Hello! Waiting for http://127.0.0.1:5000 Get response from http://127.0.0.1:5000, Result: Hello! Waiting for http://127.0.0.1:5000 Get response from http://127.0.0.1:5000, Result: Hello! Waiting for http://127.0.0.1:5000 Get response from http://127.0.0.1:5000, Result: Hello! Cost time: 15.090105056762695可以發(fā)現(xiàn)和正常的請(qǐng)求并沒有什么兩樣,依然還是順次執(zhí)行的,耗時(shí) 15 秒,平均一個(gè)請(qǐng)求耗時(shí) 3 秒,說好的異步處理呢?
其實(shí),要實(shí)現(xiàn)異步處理,我們得先要有掛起的操作,當(dāng)一個(gè)任務(wù)需要等待 IO 結(jié)果的時(shí)候,可以掛起當(dāng)前任務(wù),轉(zhuǎn)而去執(zhí)行其他任務(wù),這樣我們才能充分利用好資源,上面方法都是一本正經(jīng)的串行走下來,連個(gè)掛起都沒有,怎么可能實(shí)現(xiàn)異步?想太多了。
要實(shí)現(xiàn)異步,接下來我們?cè)倭私庖幌?await 的用法,使用 await 可以將耗時(shí)等待的操作掛起,讓出控制權(quán)。當(dāng)協(xié)程執(zhí)行的時(shí)候遇到 await,事件循環(huán)?就會(huì)將 本協(xié)程掛起,轉(zhuǎn)而去執(zhí)行別的協(xié)程,直到其他的協(xié)程掛起或執(zhí)行完畢。
所以,我們可能會(huì)將代碼中的 request() 方法改成如下的樣子:
async def request():url = 'http://127.0.0.1:5000'print('Waiting for', url)response = await requests.get(url)print('Get response from', url, 'Result:', response.text)僅僅是在 requests 前面加了一個(gè) await,然而執(zhí)行以下代碼,會(huì)得到如下報(bào)錯(cuò):
Waiting?for?http://127.0.0.1:5000 Waiting?for?http://127.0.0.1:5000 Waiting?for?http://127.0.0.1:5000 Waiting?for?http://127.0.0.1:5000 Waiting?for?http://127.0.0.1:5000 Cost?time:?15.048935890197754 Task?exception?was?never?retrieved future:?<Task?finished?coro=<request()?done,?defined?at?demo.py:7>?exception=TypeError("object?Response?can't? be?used?in?'await'?expression",)> Traceback?(most?recent?call?last):File?"demo.py",?line?10,?in?requeststatus?=?await?requests.get(url) TypeError:?object?Response?can't?be?used?in?'await'?expression這次它遇到 await 方法確實(shí)掛起了,也等待了,但是最后卻報(bào)了這么個(gè)錯(cuò),這個(gè)錯(cuò)誤的意思是 requests 返回的 Response 對(duì)象不能和 await 一起使用,為什么呢?因?yàn)楦鶕?jù)官方文檔說明,await 后面的對(duì)象必須是如下格式之一:
- A native coroutine object returned from a native coroutine function,一個(gè)原生 coroutine 對(duì)象。
- A generator-based coroutine object returned from a function decorated with types.coroutine(),一個(gè)由 types.coroutine() 修飾的生成器,這個(gè)生成器可以返回 coroutine 對(duì)象。
- An object with an await__ method returning an iterator,一個(gè)包含 __await 方法的對(duì)象返回的一個(gè)迭代器。
可以參見:https://www.python.org/dev/peps/pep-0492/#await-expression。
reqeusts 返回的 Response 不符合上面任一條件,因此就會(huì)報(bào)上面的錯(cuò)誤了。
那么有的小伙伴就發(fā)現(xiàn)了,既然 await 后面可以跟一個(gè) coroutine 對(duì)象,那么我用 async 把請(qǐng)求的方法改成 coroutine 對(duì)象不就可以了嗎?所以就改寫成如下的樣子:
import asyncio import requests import timestart = time.time()async def get(url):return requests.get(url)async def request():url = 'http://127.0.0.1:5000'print('Waiting for', url)response = await get(url)print('Get response from', url, 'Result:', response.text)tasks = [asyncio.ensure_future(request()) for _ in range(5)] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))end = time.time() print('Cost time:', end - start)這里我們將請(qǐng)求頁(yè)面的方法獨(dú)立出來,并用 async 修飾,這樣就得到了一個(gè) coroutine 對(duì)象,我們運(yùn)行一下看看:
Waiting?for?http://127.0.0.1:5000 Get?response?from?http://127.0.0.1:5000?Result:?Hello! Waiting?for?http://127.0.0.1:5000 Get?response?from?http://127.0.0.1:5000?Result:?Hello! Waiting?for?http://127.0.0.1:5000 Get?response?from?http://127.0.0.1:5000?Result:?Hello! Waiting?for?http://127.0.0.1:5000 Get?response?from?http://127.0.0.1:5000?Result:?Hello! Waiting?for?http://127.0.0.1:5000 Get?response?from?http://127.0.0.1:5000?Result:?Hello! Cost?time:?15.134317874908447還是不行,它還不是異步執(zhí)行,也就是說我們僅僅將涉及 IO 操作的代碼封裝到 async 修飾的方法里面是不可行的!我們必須要使用支持異步操作的請(qǐng)求方式才可以實(shí)現(xiàn)真正的異步,所以這里就需要 aiohttp 派上用場(chǎng)了。
?
3.5 使用 aiohttp
aiohttp 是一個(gè)支持異步請(qǐng)求的庫(kù),利用它和 asyncio 配合我們可以非常方便地實(shí)現(xiàn)異步請(qǐng)求操作。
安裝方式如下:
pip3?install?aiohttp官方文檔鏈接為:https://aiohttp.readthedocs.io/,它分為兩部分,一部分是 Client,一部分是 Server,詳細(xì)的內(nèi)容可以參考官方文檔。
下面我們將 aiohttp 用上來,將代碼改成如下樣子:
import asyncio import aiohttp import timestart = time.time()async def get(url):session = aiohttp.ClientSession()response = await session.get(url)result = await response.text()session.close()return resultasync def request():url = 'http://127.0.0.1:5000'print('Waiting for', url)# #############################################'''注意 加 await 和 不加 await 區(qū)別,1. 加 await 時(shí),可以掛起當(dāng)前函數(shù),讓出控制權(quán)2. 不加 await 時(shí),不會(huì)掛起當(dāng)前函數(shù),即函數(shù)順序執(zhí)行完返回。可以 對(duì)比輸出結(jié)果理解'''# result = await get(url)result = get(url)# #############################################print('Get response from', url, 'Result:', result)tasks = [asyncio.ensure_future(request()) for _ in range(5)] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))end = time.time() print('Cost time:', end - start)在這里我們將請(qǐng)求庫(kù)由 requests 改成了 aiohttp,通過 aiohttp 的 ClientSession 類的 get() 方法進(jìn)行請(qǐng)求,結(jié)果如下:
Waiting?for?http://127.0.0.1:5000 Waiting?for?http://127.0.0.1:5000 Waiting?for?http://127.0.0.1:5000 Waiting?for?http://127.0.0.1:5000 Waiting?for?http://127.0.0.1:5000 Get?response?from?http://127.0.0.1:5000?Result:?Hello! Get?response?from?http://127.0.0.1:5000?Result:?Hello! Get?response?from?http://127.0.0.1:5000?Result:?Hello! Get?response?from?http://127.0.0.1:5000?Result:?Hello! Get?response?from?http://127.0.0.1:5000?Result:?Hello! Cost?time:?3.0199508666992188成功了!我們發(fā)現(xiàn)這次請(qǐng)求的耗時(shí)由 15 秒變成了 3 秒,耗時(shí)直接變成了原來的 1/5。
代碼里面我們使用了 await,后面跟了 get() 方法,在執(zhí)行這五個(gè)協(xié)程的時(shí)候,如果遇到了 await,那么就會(huì)將當(dāng)前協(xié)程掛起,轉(zhuǎn)而去執(zhí)行其他的協(xié)程,直到其他的協(xié)程也掛起或執(zhí)行完畢,再進(jìn)行下一個(gè)協(xié)程的執(zhí)行。
開始運(yùn)行時(shí),事件循環(huán)會(huì)運(yùn)行第一個(gè) task,針對(duì)第一個(gè) task 來說,當(dāng)執(zhí)行到第一個(gè) await 跟著的 get() 方法時(shí),它被掛起,但這個(gè) get() 方法第一步的執(zhí)行是非阻塞的,掛起之后立馬被喚醒,所以立即又進(jìn)入執(zhí)行,創(chuàng)建了 ClientSession 對(duì)象,接著遇到了第二個(gè) await,調(diào)用了 session.get() 請(qǐng)求方法,然后就被掛起了,由于請(qǐng)求需要耗時(shí)很久,所以一直沒有被喚醒,好第一個(gè) task 被掛起了,那接下來該怎么辦呢?事件循環(huán)會(huì)尋找當(dāng)前未被掛起的協(xié)程繼續(xù)執(zhí)行,于是就轉(zhuǎn)而執(zhí)行第二個(gè) task 了,也是一樣的流程操作,直到執(zhí)行了第五個(gè) task 的 session.get() 方法之后,全部的 task 都被掛起了。所有 task 都已經(jīng)處于掛起狀態(tài),那咋辦?只好等待了。3 秒之后,幾個(gè)請(qǐng)求幾乎同時(shí)都有了響應(yīng),然后幾個(gè) task 也被喚醒接著執(zhí)行,輸出請(qǐng)求結(jié)果,最后耗時(shí),3 秒!
怎么樣?這就是異步操作的便捷之處,當(dāng)遇到阻塞式操作時(shí),任務(wù)被掛起,程序接著去執(zhí)行其他的任務(wù),而不是傻傻地等著,這樣可以充分利用 CPU 時(shí)間,而不必把時(shí)間浪費(fèi)在等待 IO 上。
有人就會(huì)說了,既然這樣的話,在上面的例子中,在發(fā)出網(wǎng)絡(luò)請(qǐng)求后,既然接下來的 3 秒都是在等待的,在 3 秒之內(nèi),CPU 可以處理的 task 數(shù)量遠(yuǎn)不止這些,那么豈不是我們放 10 個(gè)、20 個(gè)、50 個(gè)、100 個(gè)、1000 個(gè) task 一起執(zhí)行,最后得到所有結(jié)果的耗時(shí)不都是 3 秒左右嗎?因?yàn)檫@幾個(gè)任務(wù)被掛起后都是一起等待的。
理論來說確實(shí)是這樣的,不過有個(gè)前提,那就是服務(wù)器在同一時(shí)刻接受無限次請(qǐng)求都能保證正常返回結(jié)果,也就是服務(wù)器無限抗壓,另外還要忽略 IO 傳輸時(shí)延,確實(shí)可以做到無限 task 一起執(zhí)行且在預(yù)想時(shí)間內(nèi)得到結(jié)果。
我們這里將 task 數(shù)量設(shè)置成 100,再試一下:
tasks?=?[asyncio.ensure_future(request())?for?_?in?range(100)]耗時(shí)結(jié)果如下:
Cost?time:?3.106252670288086最后運(yùn)行時(shí)間也是在 3 秒左右,當(dāng)然多出來的時(shí)間就是 IO 時(shí)延了。
可見,使用了異步協(xié)程之后,我們幾乎可以在相同的時(shí)間內(nèi)實(shí)現(xiàn)成百上千倍次的網(wǎng)絡(luò)請(qǐng)求,把這個(gè)運(yùn)用在爬蟲中,速度提升可謂是非常可觀了。
關(guān)于 await 補(bǔ)充說明:假設(shè)有兩個(gè)異步函數(shù) async a,async b,a 中的某一步有 await,當(dāng)程序碰到關(guān)鍵字 await b() 后,異步程序掛起后去執(zhí)行另一個(gè)異步b程序,就是從函數(shù)內(nèi)部跳出去執(zhí)行其他函數(shù),當(dāng)掛起條件消失后,不管b是否執(zhí)行完,要馬上從b程序中跳出來,回到原程序執(zhí)行原來的操作。如果 await 后面跟的 b 函數(shù)不是異步函數(shù),那么操作就只能等 b 執(zhí)行完再返回,無法在 b 執(zhí)行的過程中返回。如果要在 b 執(zhí)行完才返回,也就不需要用 await 關(guān)鍵字了,直接調(diào)用 b 函數(shù)就行。所以這就需要 await 后面跟的是 異步函數(shù)了。在一個(gè)異步函數(shù)中,可以不止一次掛起,也就是可以用多個(gè) await 。
?
示例 2:
import asyncio import aiohttptemplate = 'http://exercise.kingname.info/exercise_middleware_ip/{page}'async def get(session, queue):while True:try:page = queue.get_nowait()except asyncio.QueueEmpty:returnurl = template.format(page=page)resp = await session.get(url)print(await resp.text(encoding='utf-8'))async def main():async with aiohttp.ClientSession() as session:queue = asyncio.Queue()for page in range(1000):queue.put_nowait(page)tasks = []for _ in range(1000):task = get(session, queue)tasks.append(task)await asyncio.wait(tasks)loop = asyncio.get_event_loop() loop.run_until_complete(main())讓這個(gè)爬蟲爬1000頁(yè)的內(nèi)容,我們來看看下面這個(gè)視頻。
可以看到,目前這個(gè)速度已經(jīng)可以跟 Scrapy 比一比了。并且大家需要知道,這個(gè)爬蟲只有1個(gè)進(jìn)程1個(gè)線程,它是通過異步的方式達(dá)到這個(gè)速度的。為什么速度能快那么多呢?
關(guān)鍵的代碼,就在:
tasks = [] for _ in range(100):task = get(session, queue)tasks.append(task) await asyncio.wait(tasks)asyncio.wait?會(huì)在所有協(xié)程全部結(jié)束的時(shí)候才返回。
但是我們把1000個(gè) URL 放在asyncio.Queue生成的一個(gè)異步隊(duì)列里面,每一個(gè)協(xié)程都通過 while True 不停從這個(gè)異步隊(duì)列里面取 URL 并進(jìn)行訪問,直到異步隊(duì)列為空,退出。程序運(yùn)行時(shí),Python 會(huì)自動(dòng)調(diào)度這100個(gè)協(xié)程,當(dāng)一個(gè)協(xié)程在等待網(wǎng)絡(luò) IO 返回時(shí),切換到第二個(gè)協(xié)程并發(fā)起請(qǐng)求,在這個(gè)協(xié)程等待返回時(shí),繼續(xù)切換到第三個(gè)協(xié)程并發(fā)起請(qǐng)求……。程序充分利用了網(wǎng)絡(luò) IO 的等待時(shí)間,從而大大提高了運(yùn)行速度。
?
?
3.6 與單進(jìn)程、多進(jìn)程對(duì)比
可能有的小伙伴非常想知道上面的例子中,如果 100 次請(qǐng)求,不是用異步協(xié)程的話,使用單進(jìn)程和多進(jìn)程會(huì)耗費(fèi)多少時(shí)間,我們來測(cè)試一下:
首先來測(cè)試一下單進(jìn)程的時(shí)間:
import requests import timestart = time.time()def request():url = 'http://127.0.0.1:5000'print('Waiting for', url)result = requests.get(url).textprint('Get response from', url, 'Result:', result)for _ in range(100):request()end = time.time() print('Cost time:', end - start)最后耗時(shí):
Cost?time:?305.16639709472656接下來我們使用多進(jìn)程來測(cè)試下,使用 multiprocessing 庫(kù):
import requests import time import multiprocessingstart = time.time()def request(_):url = 'http://127.0.0.1:5000'print('Waiting for', url)result = requests.get(url).textprint('Get response from', url, 'Result:', result)cpu_count = multiprocessing.cpu_count() print('Cpu count:', cpu_count) pool = multiprocessing.Pool(cpu_count) pool.map(request, range(100))end = time.time() print('Cost time:', end - start)這里我使用了 multiprocessing 里面的 Pool 類,即進(jìn)程池。我的電腦的 CPU 個(gè)數(shù)是 8 個(gè),這里的進(jìn)程池的大小就是 8。
運(yùn)行時(shí)間:
Cost?time:?48.17306900024414可見 multiprocessing 相比單線程來說,還是可以大大提高效率的。
?
3.7 與多進(jìn)程的結(jié)合
既然異步協(xié)程和多進(jìn)程對(duì)網(wǎng)絡(luò)請(qǐng)求都有提升,那么為什么不把二者結(jié)合起來呢?在最新的 PyCon 2018 上,來自 Facebook 的 John Reese 介紹了 asyncio 和 multiprocessing 各自的特點(diǎn),并開發(fā)了一個(gè)新的庫(kù),叫做 aiomultiprocess,感興趣的可以了解下:https://www.youtube.com/watch?v=0kXaLh8Fz3k。
這個(gè)庫(kù)的安裝方式是:
pip3?install?aiomultiprocess需要 Python 3.6 及更高版本才可使用。
使用這個(gè)庫(kù),我們可以將上面的例子改寫如下:
import asyncio import aiohttp import time from aiomultiprocess import Poolstart = time.time()async def get(url):session = aiohttp.ClientSession()response = await session.get(url)result = await response.text()session.close()return resultasync def request():url = 'http://127.0.0.1:5000'urls = [url for _ in range(100)]async with Pool() as pool:result = await pool.map(get, urls)return resultcoroutine = request() task = asyncio.ensure_future(coroutine) loop = asyncio.get_event_loop() loop.run_until_complete(task)end = time.time() print('Cost time:', end - start)這樣就會(huì)同時(shí)使用多進(jìn)程和異步協(xié)程進(jìn)行請(qǐng)求,當(dāng)然最后的結(jié)果其實(shí)和異步是差不多的:
Cost?time:?3.1156570434570312因?yàn)槲业臏y(cè)試接口的原因,最快的響應(yīng)也是 3 秒,所以這部分多余的時(shí)間基本都是 IO 傳輸時(shí)延。但在真實(shí)情況下,我們?cè)谧雠廊〉臅r(shí)候遇到的情況千變?nèi)f化,一方面我們使用異步協(xié)程來防止阻塞,另一方面我們使用 multiprocessing 來利用多核成倍加速,節(jié)省時(shí)間其實(shí)還是非常可觀的。
以上便是 Python 中協(xié)程的基本用法,希望對(duì)大家有幫助。
?
?
4. 參考來源
- http://python.jobbole.com/87310/
- https://www.cnblogs.com/xybaby/p/6406191.html
- http://python.jobbole.com/88291/
- http://lotabout.me/2017/understand-python-asyncio/
- https://segmentfault.com/a/1190000008814676
- https://www.cnblogs.com/animalize/p/4738941.html
轉(zhuǎn)載請(qǐng)注明:靜覓???Python中異步協(xié)程的使用方法介紹
?
?
?
總結(jié)
以上是生活随笔為你收集整理的Python 中 异步协程 的 使用方法介绍的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 安卓逆向_15( 一 ) --- JNI
- 下一篇: python 模块 chardet下载方