协程asyncio_Asyncio深入浅出
Asyncio是一個異步編程的框架,可以解決異步編程,協(xié)程調度問題,線程問題,是整個異步IO的解決方案。
在學習asyncio之前,我們先來理清楚同步/異步的概念:
同步是指完成事務的邏輯,先執(zhí)行第一個事務,如果阻塞了,會一直等待,直到這個事務完成,再執(zhí)行第二個事務,順序執(zhí)行 一直等待
異步是和同步相對的,異步是指在處理調用這個事務的之后,不會等待這個事務的處理結果,直接處理第二個事務去了,通過狀態(tài)、通知、回調來通知調用者處理結果。
異步IO采用消息循環(huán)的模式,重復“讀取消息—處理消息”的過程,也就是說異步IO模型”需要一個消息循環(huán),在消息循環(huán)中,主線程不斷地重復“讀取消息-處理消息”這一過程。
- event_loop 事件循環(huán):程序開啟一個無限的循環(huán),程序員會把一些函數注冊到事件循環(huán)上。當滿足事件發(fā)生的時候,調用相應的協(xié)程函數。
- coroutine 協(xié)程:協(xié)程對象,指一個使用async關鍵字定義的函數,它的調用不會立即執(zhí)行函數,而是會返回一個協(xié)程對象。協(xié)程對象需要注冊到事件循環(huán),由事件循環(huán)調用。
- task 任務:一個協(xié)程對象就是一個原生可以掛起的函數,任務則是對協(xié)程進一步封裝,其中包含任務的各種狀態(tài)。
- async/await 關鍵字: 用于定義協(xié)程的關鍵字,async定義一個協(xié)程,await用于掛起阻塞的異步調用接口。(async和await這兩個關鍵詞是在python3.5開始正式提出定義,asyncio是python解決異步io編程的一個完整框架。關于定義和原理請參考官方文檔: 協(xié)程與任務, 如何理解await)
其中協(xié)程編程離不開的三大要點:
- 事件循環(huán)
- 回調
- epoll/select(IO多路復用)
以下內容有刪減的摘自 :
Asyncio并發(fā)編程?www.langzi.fun事件循環(huán)
簡單案例(訪問一個網站)
async def get_url_title(url): # 使用關鍵詞async定義一個協(xié)程print('開始訪問網站:{}'.format(url))await asyncio.sleep(2)# 這一步至關重要# asyncio.sleep(2) 功能:異步非阻塞等待2s,作用是模擬訪問網站消耗的時間# await 的作用類似 yield,即這個時候把線程資源控制權交出去,監(jiān)聽這個描述符直到這個任務完成# await 后面只能接三種類型'''1. 協(xié)程:Python 協(xié)程屬于 可等待 對象,因此可以在其他協(xié)程中被等待:2. 任務:任務 被用來設置日程以便 并發(fā) 執(zhí)行協(xié)程。(當一個協(xié)程通過 asyncio.create_task() 等函數被打包為一個 任務,該協(xié)程將自動排入日程準備立即運行)3. Future 對象:Future 是一種特殊的 低層級 可等待對象,表示一個異步操作的 最終結果。(當一個 Future 對象 被等待,這意味著協(xié)程將保持等待直到該 Future 對象在其他地方操作完畢。)如果await time.sleep(2) 是會報錯的'''print('網站訪問成功')if __name__ == '__main__':start_time = time.time()loop = asyncio.get_event_loop()# 一行代碼創(chuàng)造事件循環(huán)loop.run_until_complete(get_url_title('http://www.langzi.fun'))# 這是一個阻塞的方法,可以理解成多線程中的join方法# 直到get_url_title('http://www.langzi.fun')完成后,才會繼續(xù)執(zhí)行下面的代碼end_time = time.time()print('消耗時間:{}'.format(end_time-start_time))返回結果:
開始訪問網站:http://www.langzi.fun 網站訪問成功 消耗時間:2.0018768310546875簡單案例(訪問多個網站)
協(xié)程的優(yōu)勢是多任務協(xié)作,單任務訪問網站沒法發(fā)揮出他的功能,一次性訪問多個網站或者一次性等待多個IO響應時間才能發(fā)揮它的優(yōu)勢。
# -*- coding:utf-8 -*- import asyncio import timeasync def get_url_title(url):print('開始訪問網站:{}'.format(url))await asyncio.sleep(2)print('網站訪問成功')if __name__ == '__main__':start_time = time.time()loop = asyncio.get_event_loop()# 創(chuàng)造一個事件循環(huán)tasks = [get_url_title('http://www.langzi.fun')for i in range(10)]# 這個列表代表總任務量,即執(zhí)行10次get_url_title()函數loop.run_until_complete(asyncio.wait(tasks))# asyncio.wait后面接上非空可迭代對象,一般來說是功能函數列表# 功能是一次性提交多個任務,等待完成# loop.run_until_complete(asyncio.gather(*tasks))# 和上面代碼功能一致,但是gather更加高級,如果是列表就需要加上*# 這里會等到全部的任務執(zhí)行完后才會執(zhí)行后面的代碼end_time = time.time()print('消耗時間:{}'.format(end_time-start_time))對一個網站發(fā)起10次請求,返回結果:
開始訪問網站:http://www.langzi.fun 開始訪問網站:http://www.langzi.fun 開始訪問網站:http://www.langzi.fun 開始訪問網站:http://www.langzi.fun 開始訪問網站:http://www.langzi.fun 開始訪問網站:http://www.langzi.fun 開始訪問網站:http://www.langzi.fun 開始訪問網站:http://www.langzi.fun 開始訪問網站:http://www.langzi.fun 開始訪問網站:http://www.langzi.fun 網站訪問成功 網站訪問成功 網站訪問成功 網站訪問成功 網站訪問成功 網站訪問成功 網站訪問成功 網站訪問成功 網站訪問成功 網站訪問成功 消耗時間:2.0015649795532227gather與wait的區(qū)別:
- gather更擅長于將函數聚合在一起
- wait更擅長篩選運行狀況
即gather更加高級,他可以將任務分組,也可以取消任務
import asyncioasync def get_url_title(url):print('開始訪問網站:{}'.format(url))await asyncio.sleep(2)print('網站訪問成功')return 'success'if __name__ == '__main__':loop = asyncio.get_event_loop()# 使用wait方法# tasks = [get_url_title('http://www.langzi.fun')for i in range(10)]# loop.run_until_complete(asyncio.wait(tasks))# 使用gather方法實現分組導入(方法1)group1 = [get_url_title('http://www.langzi.fun')for i in range(3)]group2 = [get_url_title('http://www.baidu.com')for i in range(5)]loop.run_until_complete(asyncio.gather(*group1,*group2))# 這種方法會把兩個全部一次性導入# 使用gather方法實現分組導入(方法2)group1 = [get_url_title('http://www.langzi.fun')for i in range(3)]group2 = [get_url_title('http://www.baidu.com')for i in range(5)]group1 = asyncio.gather(*group1)group2 = asyncio.gather(*group2)#group2.cancel() 取消group2任務loop.run_until_complete(asyncio.gather(group1,group2))# 這種方法會先把group1導入,然后導入group2返回結果:
開始訪問網站:http://www.baidu.com 開始訪問網站:http://www.baidu.com 開始訪問網站:http://www.langzi.fun 開始訪問網站:http://www.baidu.com 開始訪問網站:http://www.langzi.fun 開始訪問網站:http://www.langzi.fun 開始訪問網站:http://www.baidu.com 開始訪問網站:http://www.baidu.com 網站訪問成功 網站訪問成功 網站訪問成功 網站訪問成功 網站訪問成功 網站訪問成功 網站訪問成功 網站訪問成功 開始訪問網站:http://www.langzi.fun 開始訪問網站:http://www.langzi.fun 開始訪問網站:http://www.langzi.fun 開始訪問網站:http://www.baidu.com 開始訪問網站:http://www.baidu.com 開始訪問網站:http://www.baidu.com 開始訪問網站:http://www.baidu.com 開始訪問網站:http://www.baidu.com 網站訪問成功 網站訪問成功 網站訪問成功 網站訪問成功 網站訪問成功 網站訪問成功 網站訪問成功 網站訪問成功另外一種使用gather獲取返回結果:
import asyncioasync def get_url_title(url):print('開始訪問網站:{}'.format(url))await asyncio.sleep(2)print('網站訪問成功')return 'success'if __name__ == '__main__':loop = asyncio.get_event_loop()# 使用gather方法傳遞任務獲取結果group1 = asyncio.ensure_future(get_url_title('http://www.langzi.fun'))loop.run_until_complete(asyncio.gather(group1))# 如果不是列表就不需要加*print(group1.result())返回結果:
開始訪問網站:http://www.langzi.fun 網站訪問成功 success還有一些復雜的區(qū)別轉移到python 異步協(xié)程中查看
協(xié)程的調用和組合十分靈活,尤其是對于結果的處理,如何返回,如何掛起,需要逐漸積累經驗和前瞻的設計。
簡單案例(獲取返回值)
# -*- coding:utf-8 -*- import asyncio import timeasync def get_url_title(url):print('開始訪問網站:{}'.format(url))await asyncio.sleep(2)print('網站訪問成功')return 'success'if __name__ == '__main__':start_time = time.time()loop = asyncio.get_event_loop()# 創(chuàng)建一個事件循環(huán)get_future = loop.create_task(get_url_title('http://www.langzi.fun'))#get_future = asyncio.ensure_future(get_url_title('http://www.langzi.fun'))# 這兩行代碼功能用法一模一樣loop.run_until_complete(get_future)print('獲取結果:{}'.format(get_future.result()))# 獲取結果end_time = time.time()print('消耗時間:{}'.format(end_time-start_time))返回結果:
開始訪問網站:http://www.langzi.fun 網站訪問成功 獲取結果:success 消耗時間:2.0019724369049072如果是多個網址傳入,訪問多個網址的返回值呢?只需要把前面的知識點匯總一起即可使用:
if __name__ == '__main__':start_time = time.time()loop = asyncio.get_event_loop()# 創(chuàng)建一個事件循環(huán)tasks = [loop.create_task(get_url_title('http://www.langzi.fun')) for i in range(10)]# 把所有要返回的函數加載到一個列表loop.run_until_complete(asyncio.wait(tasks))# 這里和上面用法一樣print('獲取結果:{}'.format([x.result() for x in tasks]))# 因為結果都在一個列表,在列表中取值即可end_time = time.time()print('消耗時間:{}'.format(end_time-start_time))返回結果:
開始訪問網站:http://www.langzi.fun 開始訪問網站:http://www.langzi.fun 開始訪問網站:http://www.langzi.fun 開始訪問網站:http://www.langzi.fun 開始訪問網站:http://www.langzi.fun 開始訪問網站:http://www.langzi.fun 開始訪問網站:http://www.langzi.fun 開始訪問網站:http://www.langzi.fun 開始訪問網站:http://www.langzi.fun 開始訪問網站:http://www.langzi.fun 網站訪問成功 網站訪問成功 網站訪問成功 網站訪問成功 網站訪問成功 網站訪問成功 網站訪問成功 網站訪問成功 網站訪問成功 網站訪問成功 獲取結果:['success', 'success', 'success', 'success', 'success', 'success', 'success', 'success', 'success', 'success'] 消耗時間:2.0016491413116455簡單案例(回調函數)
上面的例子是一個協(xié)程函數,當這個協(xié)程函數的await xxx執(zhí)行完畢后,想要執(zhí)行另一個函數后,然后再返回這個協(xié)程函數的返回結果該這么做:
# -*- coding:utf-8 -*- import asyncio from functools import partial # partial的功能是 固定函數參數,返回一個新的函數。你可以這么理解: ''' from functools import partialdef go(x,y):return x+yg = partial(go,y=2)print(g(1)) 返回結果:3g = partial(go,x=5,y=2)print(g()) 返回結果:7''' async def get_url_title(url):print('開始訪問網站:{}'.format(url))await asyncio.sleep(2)print('網站訪問成功')# 當這個協(xié)程函數快要結束返回值的時候,會調用下面的call_back函數# 等待call_back函數執(zhí)行完畢后,才返回這個協(xié)程函數的值return 'success'def call_back(future,url):# 注意這里必須要傳遞future參數,因為這里的future即代表下面的get_future對象print('檢測網址:{}狀態(tài)正常'.format(url))if __name__ == '__main__':loop = asyncio.get_event_loop()# 創(chuàng)建一個事件循環(huán)get_future = loop.create_task(get_url_title('http://www.langzi.fun'))# 將一個任務注冊到loop事件循環(huán)中get_future.add_done_callback(partial(call_back,url = 'http://www.langzi.fun'))# 這里是設置,當上面的任務完成要返回結果的時候,執(zhí)行call_back函數# 注意call_back函數不能加上(),也就意味著你只能依靠partial方法進行傳遞參數loop.run_until_complete(get_future)# 等待任務完成print('獲取結果:{}'.format(get_future.result()))# 獲取結果返回結果:
開始訪問網站:http://www.langzi.fun 網站訪問成功 檢測網址:http://www.langzi.fun狀態(tài)正常 獲取結果:success梳理
取消協(xié)程任務
存在多個任務協(xié)程,想使用ctrl c退出協(xié)程,使用例子講解:
import asyncio async def get_time_sleep(t):print('開始運行,等待:{}s'.format(t))await asyncio.sleep(t)print('運行結束')if __name__ == '__main__':loop = asyncio.get_event_loop()# 創(chuàng)建一個事件循環(huán)task_1 = get_time_sleep(1)task_2 = get_time_sleep(2)task_3 = get_time_sleep(3)tasks = [task_1,task_2,task_3]# 三個協(xié)程任務加載到一個列表try:loop.run_until_complete(asyncio.wait(tasks))except KeyboardInterrupt:# 當檢測到鍵盤輸入 ctrl c的時候all_tasks = asyncio.Task.all_tasks()# 獲取注冊到loop下的所有taskfor task in all_tasks:print('開始取消協(xié)程')task.cancel()# 取消該協(xié)程,如果取消成功則返回Trueloop.stop()# 停止循環(huán)loop.run_forever()# loop事件循環(huán)一直運行# 這兩步必須要做finally:loop.close()# 關閉事件循環(huán)run_forever 會一直運行,直到 stop 被調用,但是你不能像下面這樣調 stop
loop.run_forever() loop.stop()run_forever 不返回,stop 永遠也不會被調用。所以,只能在協(xié)程中調 stop:
async def do_some_work(loop, x):print('Waiting ' + str(x))await asyncio.sleep(x)print('Done')loop.stop()這樣并非沒有問題,假如有多個協(xié)程在 loop 里運行:
asyncio.ensure_future(do_some_work(loop, 1)) asyncio.ensure_future(do_some_work(loop, 3))loop.run_forever()第二個協(xié)程沒結束,loop 就停止了——被先結束的那個協(xié)程給停掉的。
要解決這個問題,可以用 gather 把多個協(xié)程合并成一個 future,并添加回調,然后在回調里再去停止 loop。
其實這基本上就是 run_until_complete 的實現了,run_until_complete 在內部也是調用 run_forever。
關于loop.close(),簡單來說,loop 只要不關閉,就還可以再運行。
loop.run_until_complete(do_some_work(loop, 1)) loop.run_until_complete(do_some_work(loop, 3)) loop.close()但是如果關閉了,就不能再運行了:
loop.run_until_complete(do_some_work(loop, 1)) loop.close() loop.run_until_complete(do_some_work(loop, 3)) # 此處異常梳理
協(xié)程相互嵌套
import asyncio async def sum_tion(x,y):print('開始執(zhí)行傳入參數相加:{} + {}'.format(x,y))await asyncio.sleep(1)# 模擬等待1Sreturn (x+y)async def print_sum(x,y):result = await sum_tion(x,y)print(result)if __name__ == '__main__':loop = asyncio.get_event_loop()loop.run_until_complete(print_sum(1000,2000))loop.close()返回結果:
開始執(zhí)行傳入參數相加:1000 + 2000 3000執(zhí)行流程:
如果想要獲取協(xié)程嵌套函數返回的值,就必須使用回調:
import asyncio async def sum_tion(x,y)->int:print('開始執(zhí)行傳入參數相加:{} + {}'.format(x,y))await asyncio.sleep(1)# 模擬等待1Sreturn (x+y)async def print_sum(x,y):result = await sum_tion(x,y)return resultdef callback(future):return future.result()if __name__ == '__main__':loop = asyncio.get_event_loop()future = loop.create_task(print_sum(100,200))# 如果想要獲取嵌套協(xié)程返回的值,就必須使用回調future.add_done_callback(callback)loop.run_until_complete(future)print(future.result())loop.close()返回結果:
開始執(zhí)行傳入參數相加:100 + 200 300定時啟動任務
asyncio提供定時啟動協(xié)程任務,通過call_soon,call_later,call_at實現,他們的區(qū)別如下:
call_soon
call_soon是立即執(zhí)行
def callback(sleep_times):print("預計消耗時間 {} s".format(sleep_times)) def stoploop(loop):print('時間消耗完畢')loop.stop()if __name__ == "__main__":start_time = time.time()loop = asyncio.get_event_loop()# 創(chuàng)建一個事件循環(huán)loop.call_soon(callback,5)# 立即啟動callback函數loop.call_soon(stoploop,loop)# 上面執(zhí)行完畢后,立即啟動執(zhí)行stoploop函數loop.run_forever()#要用這個run_forever運行,因為沒有傳入協(xié)程print('總共耗時:{}'.format(time.time()-start_time))返回結果:
預計消耗時間 5 s 時間消耗完畢 總共耗時:0.0010013580322265625call_later
call_later是設置一定時間啟動執(zhí)行
def callback(sleep_times):print("預計消耗時間 {} s".format(sleep_times)) def stoploop(loop):print('時間消耗完畢')loop.stop()if __name__ == "__main__":start_time = time.time()loop = asyncio.get_event_loop()loop.call_later(1,callback,1.0)# 等待1秒后執(zhí)行callback函數,傳入參數是1.0loop.call_later(5,stoploop,loop)# 等待5秒后執(zhí)行stoploop函數,傳入參數是looploop.run_forever()print('總共耗時:{}'.format(time.time()-start_time))返回結果:
預計消耗時間 1.0 s 時間消耗完畢 總共耗時:5.002613544464111call_at
call_at類似與call_later,但是他指定的時間不再是傳統(tǒng)意義上的時間,而是loop的內部時鐘時間,效果和call_later一樣, call_later內部其實調用了call_later
import time import asynciodef callback(loop):print("傳入loop.time()時間為: {} s".format(loop.time())) def stoploop(loop):print('時間消耗完畢')loop.stop()if __name__ == "__main__":start_time = time.time()loop = asyncio.get_event_loop()now = loop.time()# loop內部的時鐘時間loop.call_at(now+1,callback,loop)# 等待loop內部時鐘時間加上1s后,執(zhí)行callba函數,傳入參數為looploop.call_at(now+3,callback,loop)# 等待loop內部時鐘時間加上3s后,執(zhí)行callba函數,傳入參數為looploop.call_at(now+5,stoploop,loop)# 等待loop內部時鐘時間加上1s后,執(zhí)行stoploop函數,傳入參數為loop返回結果:
傳入loop.time()時間為: 3989.39 s 傳入loop.time()時間為: 3991.39 s 時間消耗完畢 總共耗時:5.002060174942017call_soon_threadsafe 線程安全的call_soon
call_soon_threadsafe用法和call_soon一致。但在涉及多線程時, 會使用它.
梳理
結合線程池
Asyncio是異步IO編程的解決方案,異步IO是包括多線程,多進程,和協(xié)程的。所以asyncio是可以完成多線程多進程和協(xié)程的,在開頭說到,協(xié)程是單線程的,如果遇到阻塞的話,會阻塞所有的代碼任務,所以是不能加入阻塞IO的,但是比如requests庫是阻塞的,socket如果不設置setblocking(false)的話,也是阻塞的,這個時候可以放到一個線程中去做也是可以解決的,即在協(xié)程中集成阻塞IO,就加入多線程一起解決問題。
用requests完成異步編程(使用線程池)
from concurrent.futures import ThreadPoolExecutor import requests import asyncio import time import redef get_url_title(url):# 功能是獲取網址的標題r = requests.get(url)try:title = re.search('<title>(.*?)</title>',r.content.decode(),re.S|re.I).group(1)except Exception as e:title = eprint(title)if __name__ == '__main__':start_time = time.time()loop = asyncio.get_event_loop()# 創(chuàng)建一個事件循環(huán)p = ThreadPoolExecutor(5)# 創(chuàng)建一個線程池,開啟5個線程tasks = [loop.run_in_executor(p,get_url_title,'http://www.langzi.fun')for i in range(10)]# 這一步很重要,使用loop.run_in_executor()函數:內部接受的是阻塞的線程池,執(zhí)行的函數,傳入的參數# 即對網站訪問10次,使用線程池訪問loop.run_until_complete(asyncio.wait(tasks))# 等待所有的任務完成print(time.time()-start_time)返回結果:
Langzi - Never Setter 永不將就 - 致力于Python開發(fā)網絡安全工具,分享Python底層與進階知識,漏洞掃描器開發(fā)與爬蟲開發(fā) Langzi - Never Setter 永不將就 - 致力于Python開發(fā)網絡安全工具,分享Python底層與進階知識,漏洞掃描器開發(fā)與爬蟲開發(fā) Langzi - Never Setter 永不將就 - 致力于Python開發(fā)網絡安全工具,分享Python底層與進階知識,漏洞掃描器開發(fā)與爬蟲開發(fā) Langzi - Never Setter 永不將就 - 致力于Python開發(fā)網絡安全工具,分享Python底層與進階知識,漏洞掃描器開發(fā)與爬蟲開發(fā) Langzi - Never Setter 永不將就 - 致力于Python開發(fā)網絡安全工具,分享Python底層與進階知識,漏洞掃描器開發(fā)與爬蟲開發(fā) Langzi - Never Setter 永不將就 - 致力于Python開發(fā)網絡安全工具,分享Python底層與進階知識,漏洞掃描器開發(fā)與爬蟲開發(fā) Langzi - Never Setter 永不將就 - 致力于Python開發(fā)網絡安全工具,分享Python底層與進階知識,漏洞掃描器開發(fā)與爬蟲開發(fā) Langzi - Never Setter 永不將就 - 致力于Python開發(fā)網絡安全工具,分享Python底層與進階知識,漏洞掃描器開發(fā)與爬蟲開發(fā) Langzi - Never Setter 永不將就 - 致力于Python開發(fā)網絡安全工具,分享Python底層與進階知識,漏洞掃描器開發(fā)與爬蟲開發(fā) Langzi - Never Setter 永不將就 - 致力于Python開發(fā)網絡安全工具,分享Python底層與進階知識,漏洞掃描器開發(fā)與爬蟲開發(fā) 5.589502334594727訪問10次消耗時間為5.5s,嘗試將 p = ThreadPoolExecutor(10),線程數量設置成10個線程,消耗時間為4.6s,改用從進程池p = ProcessPoolExecutor(10),也是一樣可以運行的,不過10個進程消耗時間也是5.5s,并且消耗更多的CPU資源。
### 用socket完成異步編程(使用線程池)
import asyncio from concurrent.futures import ThreadPoolExecutor import socket from urllib.parse import urlparse import time import redef get_url(url):# 通過socket請求htmlurl = urlparse(url)host = url.netlocpath = url.pathif path == "":path = '/'# 建立socket連接client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)client.connect((host, 80))client.send("GET {} HTTP/1.1rnHost:{}rnConnection:closernrn".format(path, host).encode('utf8'))data = b""while True:d = client.recv(1024)if d:data += delse:breakdata = data.decode('utf8')html_data = data.split('rnrn')[1]# 把請求頭信息去掉, 只要網頁內容title = re.search('<title>(.*?)</title>',html_data,re.S|re.I).group(1)print(title)client.close()if __name__ == '__main__':start_time = time.time()loop = asyncio.get_event_loop()p = ThreadPoolExecutor(3) # 線程池 放3個線程tasks = [loop.run_in_executor(p,get_url,'http://www.langzi.fun') for i in range(10)]loop.run_until_complete(asyncio.wait(tasks))print('last time:{}'.format(time.time() - start_time))返回結果:
Langzi - Never Setter 永不將就 - 致力于Python開發(fā)網絡安全工具,分享Python底層與進階知識,漏洞掃描器開發(fā)與爬蟲開發(fā) Langzi - Never Setter 永不將就 - 致力于Python開發(fā)網絡安全工具,分享Python底層與進階知識,漏洞掃描器開發(fā)與爬蟲開發(fā) Langzi - Never Setter 永不將就 - 致力于Python開發(fā)網絡安全工具,分享Python底層與進階知識,漏洞掃描器開發(fā)與爬蟲開發(fā) Langzi - Never Setter 永不將就 - 致力于Python開發(fā)網絡安全工具,分享Python底層與進階知識,漏洞掃描器開發(fā)與爬蟲開發(fā) Langzi - Never Setter 永不將就 - 致力于Python開發(fā)網絡安全工具,分享Python底層與進階知識,漏洞掃描器開發(fā)與爬蟲開發(fā) Langzi - Never Setter 永不將就 - 致力于Python開發(fā)網絡安全工具,分享Python底層與進階知識,漏洞掃描器開發(fā)與爬蟲開發(fā) Langzi - Never Setter 永不將就 - 致力于Python開發(fā)網絡安全工具,分享Python底層與進階知識,漏洞掃描器開發(fā)與爬蟲開發(fā) Langzi - Never Setter 永不將就 - 致力于Python開發(fā)網絡安全工具,分享Python底層與進階知識,漏洞掃描器開發(fā)與爬蟲開發(fā) Langzi - Never Setter 永不將就 - 致力于Python開發(fā)網絡安全工具,分享Python底層與進階知識,漏洞掃描器開發(fā)與爬蟲開發(fā) Langzi - Never Setter 永不將就 - 致力于Python開發(fā)網絡安全工具,分享Python底層與進階知識,漏洞掃描器開發(fā)與爬蟲開發(fā) last time:5.132313966751099使用socket完成http請求(未使用線程池)
import asyncio from urllib.parse import urlparse import timeasync def get_url(url):# 通過socket請求htmlurl = urlparse(url)host = url.netlocpath = url.pathif path == "":path = '/'# 建立socket連接reader, writer = await asyncio.open_connection(host, 80) # 協(xié)程 與服務端建立連接writer.write("GET {} HTTP/1.1rnHost:{}rnConnection:closernrn".format(path, host).encode('utf8'))all_lines = []async for raw_line in reader: # __aiter__ __anext__魔法方法line = raw_line.decode('utf8')all_lines.append(line)html = 'n'.join(all_lines)return htmlasync def main():tasks = []tasks = [asyncio.ensure_future(get_url('http://www.langzi.fun')) for i in range(10)]for task in asyncio.as_completed(tasks): # 完成一個 print一個result = await taskprint(result)if __name__ == '__main__':start_time = time.time()loop = asyncio.get_event_loop()loop.run_until_complete(main())print('last time:{}'.format(time.time() - start_time))asyncio協(xié)程和之前講解的select事件循環(huán)原理是一樣的
梳理
與多進程的結合
既然異步協(xié)程和多進程對網絡請求都有提升,那么為什么不把二者結合起來呢?在最新的 PyCon 2018 上,來自 Facebook 的 John Reese 介紹了 asyncio 和 multiprocessing 各自的特點,并開發(fā)了一個新的庫,叫做 aiomultiprocess
這個庫的安裝方式是:
pip3 install aiomultiprocess需要 Python 3.6 及更高版本才可使用。
使用這個庫,我們可以將上面的例子改寫如下:
import asyncio import aiohttp import time from aiomultiprocess import Poolstart = time.time()async def get(url):session = aiohttp.ClientSession()response = await session.get(url)result = await response.text()session.close()return resultasync def request():url = 'http://127.0.0.1:5000'urls = [url for _ in range(100)]async with Pool() as pool:result = await pool.map(get, urls)return resultcoroutine = request() task = asyncio.ensure_future(coroutine) loop = asyncio.get_event_loop() loop.run_until_complete(task)end = time.time() print('Cost time:', end - start)這樣就會同時使用多進程和異步協(xié)程進行請求,但在真實情況下,我們在做爬取的時候遇到的情況千變萬化,一方面我們使用異步協(xié)程來防止阻塞,另一方面我們使用 multiprocessing 來利用多核成倍加速,節(jié)省時間其實還是非常可觀的。
同步與通信
和多線程多進程任務一樣,協(xié)程也可以實現和需要進行同步與通信。
簡單例子(順序啟動多任務)
協(xié)程是單線程的,他的執(zhí)行依賴于事件循環(huán)中最后的loop.run_until_complate()
import asyncionum = 0async def add():global numfor i in range(10):await asyncio.sleep(0.1)num += i async def desc():global numfor i in range(10):await asyncio.sleep(0.2)num -= iif __name__ == '__main__':loop = asyncio.get_event_loop()tasks = [add(),desc()]loop.run_until_complete(asyncio.wait(tasks))# 這里執(zhí)行順序是先執(zhí)行add函數,然后執(zhí)行desc函數# 所以最后的結果是0loop.close()print(num)返回結果:
0這里使用一個共有變量,協(xié)程下不需要加鎖。
簡單例子(Lock(鎖))
# -*- coding:utf-8 -*- import asyncio import functoolsdef unlock(lock):print('線程鎖釋放成功')lock.release()async def test(locker, lock):print(f'{locker} 等待線程鎖釋放')# ---------------------------------# with await lock:# print(f'{locker} 線程鎖上鎖')# ---------------------------------# 上面這兩行代碼等同于:# ---------------------------------# await lock.acquire()# print(f'{locker} 線程鎖上鎖')# lock.release()# ---------------------------------await lock.acquire()print(f'{locker} 線程鎖上鎖')lock.release()print(f'{locker} 線程鎖釋放')async def main(loop):lock = asyncio.Lock()await lock.acquire()loop.call_later(0.5, functools.partial(unlock, lock))# call_later() 表達推遲一段時間的回調, 第一個參數是以秒為單位的延遲, 第二個參數是回調函數await asyncio.wait([test('任務 1 ', lock), test('任務 2', lock)])if __name__ == '__main__':loop = asyncio.get_event_loop()loop.run_until_complete(main(loop))loop.close()返回結果:
任務 1 等待線程鎖釋放 任務 2 等待線程鎖釋放 線程鎖釋放成功 任務 1 線程鎖上鎖 任務 1 線程鎖釋放 任務 2 線程鎖上鎖 任務 2 線程鎖釋放簡單例子(Semaphore(信號量))
可以使用 Semaphore(信號量) 來控制并發(fā)訪問的數量:
import asyncio from aiohttp import ClientSessionasync def fetch(sem,url):async with sem:# 最大訪問數async with ClientSession() as session:async with session.get(url) as response:status = response.statusres = await response.text()print("{}:{} ".format(response.url, status))return resif __name__ == '__main__':loop = asyncio.get_event_loop()url = "http://www.langzi.fun"sem = asyncio.Semaphore(1000)# 設置最大并發(fā)數為1000tasks = [loop.create_task(fetch(sem,url))for i in range(100)]# 對網站訪問100次loop.run_until_complete(asyncio.wait(tasks))簡單例子(Condition(條件))
import asyncioasync def consumer(cond, name, second):# 消費者函數await asyncio.sleep(second)# 等待延遲with await cond:await cond.wait()print('{}: 得到響應'.format(name))async def producer(cond):await asyncio.sleep(2)for n in range(1, 3):with await cond:print('生產者 {} 號'.format(n))cond.notify(n=n) # 挨個通知單個消費者await asyncio.sleep(0.1)async def producer2(cond):await asyncio.sleep(2)with await cond:print('釋放信號量,通知所有消費者')cond.notify_all()# 一次性通知全部的消費者async def main(loop):condition = asyncio.Condition()# 設置信號量task = loop.create_task(producer(condition))# producer 和 producer2 是兩個協(xié)程, 不能使用 call_later(), 需要用到 create_task() 把它們創(chuàng)建成一個 taskconsumers = [consumer(condition, name, index) for index, name in enumerate(('c1', 'c2'))]await asyncio.wait(consumers)task.cancel()print('---分割線---')task = loop.create_task(producer2(condition))consumers = [consumer(condition, name, index) for index, name in enumerate(('c1', 'c2'))]await asyncio.wait(consumers)task.cancel()# 取消任務if __name__ == '__main__':loop = asyncio.get_event_loop()loop.run_until_complete(main(loop))loop.close()返回結果:
生產者 1 號 c1: 得到響應 生產者 2 號 c2: 得到響應 ---分割線--- 釋放信號量,通知所有消費者 c1: 得到響應 c2: 得到響應簡單例子(Event(事件))
與 Lock(鎖) 不同的是, 事件被觸發(fā)的時候, 兩個消費者不用獲取鎖, 就要盡快地執(zhí)行下去了
import asyncio import functoolsdef set_event(event):print('開始設置事件')event.set()async def test(name, event):print('{} 的事件未設置'.format(name))await event.wait()print('{} 的事件已設置'.format(name))async def main(loop):event = asyncio.Event()# 聲明事件print('事件是否設置: {}'.format(event.is_set()))loop.call_later(0.1, functools.partial(set_event, event))# 在0.1s后執(zhí)行set_event()函數,對事件進行設置await asyncio.wait([test('e1', event), test('e2', event)])print('最終事件狀態(tài): {}'.format(event.is_set()))if __name__ == '__main__':loop = asyncio.get_event_loop()loop.run_until_complete(main(loop))loop.close()返回結果:
事件是否設置: False e1 的事件未設置 e2 的事件未設置 開始設置事件 e1 的事件已設置 e2 的事件已設置 最終事件狀態(tài): True簡單例子(協(xié)程間通信)
協(xié)程是單線程,因此使用list、dict就可以實現通信,而不會有線程安全問題,當然可以使用asyncio.Queue
from asyncio import Queue queue = Queue(maxsize=3) # queue的put和get需要用await舉個例子:
import asyncio from asyncio import Queue import random import string q = Queue(maxsize=100)async def add():while 1:await q.put(random.choice(string.ascii_letters))async def desc():while 1:res = await q.get()print(res)await asyncio.sleep(1)if __name__ == '__main__':loop = asyncio.get_event_loop()loop.run_until_complete(asyncio.wait([add(),desc()]))loop.run_forever()返回結果:
D b S x ...加速asyncio
uvloop,這個使用庫可以有效的加速asyncio,本庫基于libuv,也就是nodejs用的那個庫。使用它也非常方便,不過目前不支持windows
import asyncio import uvloop asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())沒錯就是2行代碼,就可以提速asyncio。
tokio同樣可以做異步資源循環(huán)
import tokio asyncio.set_event_loop_policy(tokio.EventLoopPolicy())參考:
python異步編程之asyncio(百萬并發(fā)) - 三只松鼠 - 博客園?www.cnblogs.comAsyncio并發(fā)編程?www.langzi.fun總結
以上是生活随笔為你收集整理的协程asyncio_Asyncio深入浅出的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: LeetCode 658. 找到 K 个
- 下一篇: LeetCode 669. 修剪二叉搜索