python asyncio 并发编程_asyncio并发编程
一. 事件循環
1.注:
實現搭配:事件循環+回調(驅動生成器【協程】)+epoll(IO多路復用),asyncio是Python用于解決異步編程的一整套解決方案;
基于asynico:tornado,gevent,twisted(Scrapy,django channels),tornado(實現了web服務器,可以直接部署,真正部署還是要加nginx),django,flask(uwsgi,gunicorn+nginx部署)
1 importasyncio2 importtime3 async defget_html(url):4 print('start get url')5 #不能直接使用time.sleep,這是阻塞的函數,如果使用time在并發的情況有多少個就有多少個2秒
6 await asyncio.sleep(2)7 print('end get url')8 if __name__=='__main__':9 start_time=time.time()10 loop=asyncio.get_event_loop()11 task=[get_html('www.baidu.com') for i in range(10)]12 loop.run_until_complete(asyncio.wait(task))13 print(time.time()-start_time)
View Code
2.如何獲取協程的返回值(和線程池類似):
1 importasyncio2 importtime3 from functools importpartial4 async defget_html(url):5 print('start get url')6 await asyncio.sleep(2)7 print('end get url')8 return "HAHA"
9 #需要接收task,如果要接收其他的參數就需要用到partial(偏函數),參數需要放到前面
10 defcallback(url,future):11 print(url+'success')12 print('send email')13 if __name__=='__main__':14 loop=asyncio.get_event_loop()15 task=loop.create_task(get_html('www.baidu.com'))16 #原理還是獲取event_loop,然后調用create_task方法,一個線程只有一個loop
17 #get_future=asyncio.ensure_future(get_html('www.baidu.com'))也可以
18 #loop.run_until_complete(get_future)
19 #run_until_complete可以接收future類型,task類型(是future類型的一個子類),也可以接收可迭代類型
20 task.add_done_callback(partial(callback,'www.baidu.com'))21 loop.run_until_complete(task)22 print(task.result())
View Code
3.wait和gather的區別:
3.1wait簡單使用:
1 importasyncio2 importtime3 from functools importpartial4 async defget_html(url):5 print('start get url')6 await asyncio.sleep(2)7 print('end get url')8
9 if __name__=='__main__':10 loop=asyncio.get_event_loop()11 tasks=[get_html('www.baidu.com') for i in range(10)]12 #wait和線程的wait相似
13 loop.run_until_complete(asyncio.wait(tasks))
View Code
協程的wait和線程的wait相似,也有timeout,return_when(什么時候返回)等參數
3.2gather簡單使用:
1 importasyncio2 importtime3 from functools importpartial4 async defget_html(url):5 print('start get url')6 await asyncio.sleep(2)7 print('end get url')8
9 if __name__=='__main__':10 loop=asyncio.get_event_loop()11 tasks=[get_html('www.baidu.com') for i in range(10)]12 #gather注意加*,這樣就會變成參數
13 loop.run_until_complete(asyncio.gather(*tasks))
View Code
3.3gather和wait的區別:(定制性不強時可以優先考慮gather)
gather更加高層,可以將tasks分組;還可以成批的取消任務
1 importasyncio2 importtime3 from functools importpartial4 async defget_html(url):5 print('start get url')6 await asyncio.sleep(2)7 print('end get url')8
9 if __name__=='__main__':10 loop=asyncio.get_event_loop()11 groups1=[get_html('www.baidu.com') for i in range(10)]12 groups2=[get_html('www.baidu.com') for i in range(10)]13 #gather注意加*,這樣就會變成參數
14 loop.run_until_complete(asyncio.gather(*groups1,*groups2))15 #這種方式也可以
16 #groups1 = [get_html('www.baidu.com') for i in range(10)]
17 #groups2 = [get_html('www.baidu.com') for i in range(10)]
18 #groups1=asyncio.gather(*groups1)
19 #groups2=asyncio.gather(*groups2)
20 #取消任務
21 #groups2.cancel()
22 #loop.run_until_complete(asyncio.gather(groups1,groups2))
View Code
二. 協程嵌套
1.run_util_complete()源碼:和run_forever()區別并不大,只是可以在運行完指定的協程后可以把loop停止掉,而run_forever()不會停止
2.loop會被放在future里面,future又會放在loop中
3.取消future(task):
3.1子協程調用原理:
官網例子:
解釋:?await相當于yield from,loop運行協程print_sum(),print_sum又會去調用另一個協程compute,run_util_complete會把協程print_sum注冊到loop中。
1.event_loop會為print_sum創建一個task,通過驅動task執行print_sum(task首先會進入pending【等待】的狀態);
2.print_sum直接進入字協程的調度,這個時候轉向執行另一個協程(compute,所以print_sum變為suspended【暫停】狀態);
3.compute這個協程首先打印,然后去調用asyncio的sleep(此時compute進入suspende的狀態【暫停】),直接把返回值返回給Task(沒有經過print_sum,相當于yield from,直接在調用方和子生成器通信,是由委托方print_sum建立的通道);
4.Task會告訴Event_loop暫停,Event_loop等待一秒后,通過Task喚醒(越過print_sum和compute建立一個通道);
5.compute繼續執行,變為狀態done【執行完成】,然后拋一個StopIteration的異常,會被await語句捕捉到,然后提取出1+2=3的值,進入print_sum,print_sum也被激活(因為拋出了StopIteration的異常被print_sum捕捉),print_sum執行完也會被標記為done的狀態,同時拋出StopIteration會被Task接收
三. call_soon、call_later、call_at、call_soon_threadsafe
1.call_soon:可以直接接收函數,而不用協程
1 importasyncio2 #函數
3 defcallback(sleep_time):4 print('sleep {} success'.format(sleep_time))5 #通過該函數暫停
6 defstoploop(loop):7 loop.stop()8 if __name__=='__main__':9 loop=asyncio.get_event_loop()10 #可以直接傳遞函數,而不用協程,call_soon其實就是調用的call_later,時間為0秒
11 loop.call_soon(callback,2)12 loop.call_soon(stoploop,loop)13 #不能用run_util_complete(因為不是協程),run_forever找到call_soon一直運行
14 loop.run_forever()
View Code
2.call_later:可以指定多長時間后啟動(實際調用call_at,時間不是傳統的時間,而是loop內部的時間)
1 importasyncio2 #函數
3 defcallback(sleep_time):4 print('sleep {} success'.format(sleep_time))5 #通過該函數暫停
6 defstoploop(loop):7 loop.stop()8 if __name__=='__main__':9 loop=asyncio.get_event_loop()10 loop.call_later(3,callback,1)11 loop.call_later(1, callback, 2)12 loop.call_later(1, callback, 2)13 loop.call_later(1, callback, 2)14 loop.call_soon(callback,4)15 #loop.call_soon(stoploop,loop)
16 #不能用run_util_complete(因為不是協程),run_forever找到call_soon一直運行
17 loop.run_forever()
View Code
3.call_at:指定某個時間執行
1 importasyncio2 #函數
3 defcallback(sleep_time):4 print('sleep {} success'.format(sleep_time))5 #通過該函數暫停
6 defstoploop(loop):7 loop.stop()8 if __name__=='__main__':9 loop=asyncio.get_event_loop()10 now=loop.time()11 print(now)12 loop.call_at(now+3,callback,1)13 loop.call_at(now+1, callback, 0.5)14 loop.call_at(now+1, callback, 2)15 loop.call_at(now+1, callback, 2)16 #loop.call_soon(stoploop,loop)
17 #不能用run_util_complete(因為不是協程),run_forever找到call_soon一直運行
18 loop.run_forever()
View Code
4.call_soon_threadsafe:
線程安全的方法,不僅能解決協程,也能解決線程,進程,和call_soon幾乎一致,多了self._write_to_self(),和call_soon用法一致
四. ThreadPoolExecutor+asyncio(線程池和協程結合)
1.使用run_in_executor:就是把阻塞的代碼放進線程池運行,性能并不是特別高,和多線程差不多
1 #使用多線程,在協程中集成阻塞io
2 importasyncio3 importsocket4 from urllib.parse importurlparse5 from concurrent.futures importThreadPoolExecutor6 importtime7 defget_url(url):8 #通過socket請求html
9 url=urlparse(url)10 host=url.netloc11 path=url.path12 if path=="":13 path="/"
14 #建立socket連接
15 client=socket.socket(socket.AF_INET,socket.SOCK_STREAM)16 client.connect((host,80))17 #向服務器發送數據
18 client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))19 #將數據讀取完
20 data=b""
21 whileTrue:22 d=client.recv(1024)23 ifd:24 data+=d25 else:26 break
27 #會將header信息作為返回字符串
28 data=data.decode('utf8')29 print(data.split('\r\n\r\n')[1])30 client.close()31
32 if __name__=='__main__':33 start_time=time.time()34 loop=asyncio.get_event_loop()35 excutor=ThreadPoolExecutor()36 tasks=[]37 for i in range(20):38 task=loop.run_in_executor(excutor,get_url,'http://www.baidu.com')39 tasks.append(task)40 loop.run_until_complete(asyncio.wait(tasks))41 print(time.time()-start_time)
View Code
五. asyncio模擬http請求
注:asyncio目前沒有提供http協議的接口
1 #asyncio目前沒有提供http協議的接口
2 importasyncio3 from urllib.parse importurlparse4 importtime5
6
7 async defget_url(url):8 #通過socket請求html
9 url =urlparse(url)10 host =url.netloc11 path =url.path12 if path == "":13 path = "/"
14 #建立socket連接(比較耗時),非阻塞需要注冊,都在open_connection中實現了
15 reader, writer = await asyncio.open_connection(host, 80)16 #向服務器發送數據,unregister和register都實現了
17 writer.write("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))18 #讀取數據
19 all_lines =[]20 #源碼實現較復雜,有__anext__的魔法函數(協程)
21 async for line inreader:22 data = line.decode('utf8')23 all_lines.append(data)24 html = '\n'.join(all_lines)25 returnhtml26
27
28 async defmain():29 tasks =[]30 for i in range(20):31 url = "http://www.baidu.com/"
32 tasks.append(asyncio.ensure_future(get_url(url)))33 for task inasyncio.as_completed(tasks):34 result =await task35 print(result)36
37
38 if __name__ == '__main__':39 start_time =time.time()40 loop =asyncio.get_event_loop()41 #tasks=[get_url('http://www.baidu.com') for i in range(10)]
42 #在外部獲取結果,保存為future對象
43 #tasks = [asyncio.ensure_future(get_url('http://www.baidu.com')) for i in range(10)]
44 #loop.run_until_complete(asyncio.wait(tasks))
45 #for task in tasks:
46 #print(task.result())
47 #執行完一個打印一個
48 loop.run_until_complete(main())49 print(time.time() - start_time)
View Code
六. future和task
1.future:協程中的future和線程池中的future相似
future中的方法,都和線程池中的相似
set_result方法
不像線程池中運行完直接運行代碼(這是單線程的,會調用call_soon方法)
2.task:是future的子類,是future和協程之間的橋梁
會首先啟動_step方法
該方法會首先啟動協程,把返回值(StopIteration的值)做處理,用于解決協程和線程不一致的地方
七. asyncio同步和通信
1.單線程協程不需要鎖:
1 importasyncio2 total=03 async defadd():4 globaltotal5 for i in range(1000000):6 total+=1
7
8
9 async defdecs():10 globaltotal11 for i in range(1000000):12 total-=1
13 if __name__=='__main__':14 loop=asyncio.get_event_loop()15 tasks=[add(),decs()]16 loop.run_until_complete(asyncio.wait(tasks))17 print(total)
View Code
2.某種情況需要鎖:
asyncio中的鎖(同步機制)
1 importasyncio,aiohttp2 #這是并沒有調用系統的鎖,只是簡單的自己實現(注意是非阻塞的),Queue也是非阻塞的,都用了yield from,不用用到condition【單線程】】
3 #Queue還可以限流,如果只需要通信還可以直接使用全局變量否則可以
4 from asyncio importLock,Queue5 catche={}6 lock=Lock()7 async defget_stuff():8 #實現了__enter__和__exit__兩個魔法函數,可以用with
9 #with await lock:
10 #更明確的語法__aenter__和__await__
11 async with lock:12 #注意加await,是一個協程
13 #await lock.acquire()
14 for url incatche:15 returncatche[url]16 #異步的接收
17 stauff=aiohttp.request('Get',url)18 catche[url]=stauff19 returnstauff20 #release是一個簡單的函數
21 #lock.release()
22
23 async defparse_stuff():24 stuff=await get_stuff()25
26 async defuse_stuff():27 stuff=await get_stuff()28 #如果沒有同步機制,就會發起兩次請求(這里就可以加一個同步機制)
29 tasks=[parse_stuff(),use_stuff()]30 loop=asyncio.get_event_loop()31 loop.run_until_complete(asyncio.wait(tasks))
View Code
八. aiohttp實現高并發爬蟲
1 #asyncio去重url,入庫(異步的驅動aiomysql)
2 importaiohttp3 importasyncio4 importre5 importaiomysql6 from pyquery importpyquery7
8 start_url = 'http://www.jobbole.com/'
9 waiting_urls =[]10 seen_urls =[]11 stopping =False12 #限制并發數
13 sem=asyncio.Semaphore(3)14
15
16 async deffetch(url, session):17 async with sem:18 await asyncio.sleep(1)19 try:20 async with session.get(url) as resp:21 print('url_status:{}'.format(resp.status))22 if resp.status in [200, 201]:23 data =await resp.text()24 returndata25 exceptException as e:26 print(e)27
28
29 defextract_urls(html):30 '''
31 解析無io操作32 '''
33 urls =[]34 pq =pyquery(html)35 for link in pq.items('a'):36 url = link.attr('href')37 if url and url.startwith('http') and url not inurls:38 urls.append(url)39 waiting_urls.append(url)40 returnurls41
42
43 async definit_urls(url, session):44 html =await fetch(url, session)45 seen_urls.add(url)46 extract_urls(html)47
48
49 async defhandle_article(url, session, pool):50 '''
51 處理文章52 '''
53 html =await fetch(url, session)54 seen_urls.append(url)55 extract_urls(html)56 pq =pyquery(html)57 title = pq('title').text()58 async with pool.acquire() as conn:59 async with conn.cursor() as cur:60 insert_sql = "insert into Test(title) values('{}')".format(title)61 await cur.execute(insert_sql)62
63
64 async defconsumer(pool):65 with aiohttp.CLientSession() as session:66 while notstopping:67 if len(waiting_urls) ==0:68 await asyncio.sleep(0.5)69 continue
70 url =waiting_urls.pop()71 print('start url:' + 'url')72 if re.match('http://.*?jobble.com/\d+/', url):73 if url not inseen_urls:74 asyncio.ensure_future(handle_article(url, session, pool))75 await asyncio.sleep(30)76 else:77 if url not inseen_urls:78 asyncio.ensure_future(init_urls(url, session))79
80
81 async defmain():82 #等待mysql連接好
83 pool = aiomysql.connect(host='localhost', port=3306, user='root',84 password='112358', db='my_aio', loop=loop, charset='utf8', autocommit=True)85 async with aiohttp.CLientSession() as session:86 html =await fetch(start_url, session)87 seen_urls.add(start_url)88 extract_urls(html)89 asyncio.ensure_future(consumer(pool))90
91 if __name__ == '__main__':92 loop =asyncio.get_event_loop()93 asyncio.ensure_future(loop)94 loop.run_forever(main(loop))
View Code
總結
以上是生活随笔為你收集整理的python asyncio 并发编程_asyncio并发编程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: oracle数据库物理结构包含,Orac
- 下一篇: pythonturtle画彩虹蟒蛇_py