Python asyncio concurrent programming

Published: 2025/3/11 · python · 豆豆

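I. Event loop

1. Notes:

asyncio is Python's complete toolkit for asynchronous programming. It combines three pieces: an event loop, callbacks (which drive generators, i.e. coroutines), and epoll (IO multiplexing).

Frameworks built on the same model include Tornado, gevent and Twisted (which underlies Scrapy and Django Channels). Tornado ships its own web server and can be deployed directly, although real deployments still put nginx in front of it. Django and Flask are instead deployed behind uWSGI or gunicorn plus nginx.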

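    import asyncio
    import time


    async def get_html(url):
        print('start get url')
        # Do not call time.sleep here: it blocks the whole loop, so ten
        # concurrent calls would take ten times 2 seconds
        await asyncio.sleep(2)
        print('end get url')


    if __name__ == '__main__':
        start_time = time.time()
        # new_event_loop() replaces the older get_event_loop(), whose implicit
        # loop creation is deprecated in recent Python versions
        loop = asyncio.new_event_loop()
        # Since Python 3.11, wait() needs tasks or futures, not bare coroutines
        tasks = [loop.create_task(get_html('www.baidu.com')) for i in range(10)]
        loop.run_until_complete(asyncio.wait(tasks))
        print(time.time() - start_time)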

2. Getting a coroutine's return value (similar to a thread pool):

    import asyncio
    from functools import partial


    async def get_html(url):
        print('start get url')
        await asyncio.sleep(2)
        print('end get url')
        return "HAHA"


    # A done-callback always receives the task as its last argument. To pass
    # extra arguments, bind them with partial; the bound ones come first.
    def callback(url, future):
        print(url + ' success')
        print('send email')


    if __name__ == '__main__':
        loop = asyncio.new_event_loop()
        task = loop.create_task(get_html('www.baidu.com'))
        # asyncio.ensure_future(get_html('www.baidu.com'), loop=loop) also works:
        # it looks up the event loop and calls create_task on it. A thread has
        # only one current loop.
        # run_until_complete accepts a future, a task (a subclass of future)
        # or a coroutine
        task.add_done_callback(partial(callback, 'www.baidu.com'))
        loop.run_until_complete(task)
        print(task.result())

3. The difference between wait and gather:

3.1 Basic use of wait:

    import asyncio


    async def get_html(url):
        print('start get url')
        await asyncio.sleep(2)
        print('end get url')


    if __name__ == '__main__':
        loop = asyncio.new_event_loop()
        # Since Python 3.11, wait() needs tasks or futures, not bare coroutines
        tasks = [loop.create_task(get_html('www.baidu.com')) for i in range(10)]
        # wait resembles the wait used with threads
        loop.run_until_complete(asyncio.wait(tasks))

The coroutine wait resembles the thread wait (concurrent.futures.wait) and likewise takes timeout and return_when (when to return) parameters.
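As a small illustration of the return_when parameter, the sketch below returns as soon as the first task finishes and then cancels the rest. The coroutine work and its delays are made up for the example; timeout=... can be passed in the same way.

```python
import asyncio


async def work(delay):
    # Hypothetical stand-in for a real I/O call
    await asyncio.sleep(delay)
    return delay


async def main():
    tasks = [asyncio.create_task(work(d)) for d in (0.05, 0.2, 0.4)]
    # Return as soon as the first task finishes
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    first = [t.result() for t in done]
    # wait() never cancels anything itself, so clean up the leftovers
    for t in pending:
        t.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return first, len(pending)


if __name__ == '__main__':
    print(asyncio.run(main()))
```

wait hands back two sets (done and pending) rather than results, which is what makes it the lower-level, more customisable of the two calls.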

3.2 Basic use of gather:

    import asyncio


    async def get_html(url):
        print('start get url')
        await asyncio.sleep(2)
        print('end get url')


    if __name__ == '__main__':
        loop = asyncio.new_event_loop()
        tasks = [loop.create_task(get_html('www.baidu.com')) for i in range(10)]
        # gather takes the awaitables as positional arguments, hence the *
        loop.run_until_complete(asyncio.gather(*tasks))

3.3 Differences between gather and wait (prefer gather unless you need fine-grained control):

gather is higher level: it can group tasks, and it can cancel a whole group at once

    import asyncio


    async def get_html(url):
        print('start get url')
        await asyncio.sleep(2)
        print('end get url')


    if __name__ == '__main__':
        loop = asyncio.new_event_loop()
        groups1 = [loop.create_task(get_html('www.baidu.com')) for i in range(10)]
        groups2 = [loop.create_task(get_html('www.baidu.com')) for i in range(10)]
        # Unpack with * so that each task becomes a positional argument
        loop.run_until_complete(asyncio.gather(*groups1, *groups2))
        # Alternatively, gather each group first; a whole group can then be
        # cancelled in one call:
        # groups1 = asyncio.gather(*groups1)
        # groups2 = asyncio.gather(*groups2)
        # groups2.cancel()
        # loop.run_until_complete(asyncio.gather(groups1, groups2, return_exceptions=True))
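Two properties of gather are easy to check in a short, self-contained sketch: results come back in argument order (not completion order), and cancelling a gathered group cancels every task in it. The delays and the one-letter URLs below are placeholders for illustration.

```python
import asyncio


async def get_html(url, delay):
    await asyncio.sleep(delay)
    return url


async def main():
    group1 = asyncio.gather(get_html('a', 0.05), get_html('b', 0.01))
    group2 = asyncio.gather(get_html('c', 0.05), get_html('d', 0.05))
    # Cancelling the gather future cancels every child that has not finished
    group2.cancel()
    # return_exceptions=True reports the cancellation as a value instead of raising
    return await asyncio.gather(group1, group2, return_exceptions=True)


if __name__ == '__main__':
    print(asyncio.run(main()))
```

Here 'b' finishes before 'a', yet the first group's result list still follows the order in which the coroutines were passed in.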

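II. Nested coroutines

1. run_until_complete() source: it differs little from run_forever(). The only difference is that it stops the loop once the given coroutine has finished, whereas run_forever() never stops on its own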
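To make the comparison concrete, here is a rough sketch (not the actual CPython source) of how run_until_complete can be expressed on top of run_forever: wrap the coroutine in a task, attach a done-callback that stops the loop, then run. The names run_until_complete_sketch and answer are invented for this illustration.

```python
import asyncio


def run_until_complete_sketch(loop, coro):
    # Wrap the coroutine in a Task so the loop can drive it
    task = loop.create_task(coro)
    # When the task finishes, ask the loop to stop
    task.add_done_callback(lambda fut: loop.stop())
    loop.run_forever()
    return task.result()


async def answer():
    await asyncio.sleep(0.01)
    return 42


if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    try:
        print(run_until_complete_sketch(loop, answer()))
    finally:
        loop.close()
```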

2. The loop is stored in the future, and the future is in turn registered with the loop

3. Cancelling a future (task):
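A minimal sketch of cancellation, assuming nothing beyond the standard library: tasks still waiting are cancelled, while cancel() has no effect on a task that has already finished. The delays are arbitrary.

```python
import asyncio


async def get_html(delay):
    await asyncio.sleep(delay)
    return 'done after {}s'.format(delay)


async def main():
    tasks = [asyncio.create_task(get_html(d)) for d in (0.01, 5, 5)]
    await asyncio.sleep(0.05)      # let the short task finish
    for task in tasks:
        task.cancel()              # no effect on tasks that are already done
    results = await asyncio.gather(*tasks, return_exceptions=True)
    # True where the task was cancelled, False where it completed normally
    return [isinstance(r, asyncio.CancelledError) for r in results]


if __name__ == '__main__':
    print(asyncio.run(main()))
```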

3.1 How calling a sub-coroutine works:

Example from the official documentation (print_sum and compute):

Explanation: await is equivalent to yield from. The loop runs the coroutine print_sum(), which in turn calls another coroutine, compute(). run_until_complete registers print_sum with the loop.

1. The event loop creates a Task for print_sum and drives print_sum through it (the Task starts in the pending state);

2. print_sum immediately delegates to the sub-coroutine: execution moves into compute, and print_sum becomes suspended;

3. compute prints first, then calls asyncio.sleep and becomes suspended itself. What it yields goes straight to the Task without passing through print_sum: as with yield from, the caller (the Task) and the sub-generator (compute) talk directly over a channel set up by the delegating coroutine, print_sum;

4. The Task tells the event loop to pause. After waiting one second, the event loop wakes compute through the Task, again bypassing print_sum via that channel;

5. compute resumes and finishes (state done), raising StopIteration. The await expression in print_sum catches it and extracts the value 1 + 2 = 3, which reactivates print_sum. When print_sum finishes it is also marked done, and the StopIteration it raises is received by the Task
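The two coroutines traced in the steps above can be written as follows. This is a reconstruction in async/await syntax modelled on the chained-coroutine example from the older asyncio documentation; the return value of print_sum is an addition made here so the result can be checked.

```python
import asyncio


async def compute(x, y):
    print('Compute {} + {} ...'.format(x, y))
    await asyncio.sleep(1.0)       # compute is suspended here (steps 3 and 4)
    return x + y                   # surfaces as StopIteration(3) (step 5)


async def print_sum(x, y):
    result = await compute(x, y)   # print_sum is suspended while compute runs (step 2)
    print('{} + {} = {}'.format(x, y, result))
    return result


if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(print_sum(1, 2))   # the loop wraps print_sum in a Task (step 1)
    finally:
        loop.close()
```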

III. call_soon, call_later, call_at, call_soon_threadsafe

1. call_soon: takes a plain function directly; no coroutine is needed

    import asyncio


    # A plain function, not a coroutine
    def callback(sleep_time):
        print('sleep {} success'.format(sleep_time))


    # Stops the loop
    def stoploop(loop):
        loop.stop()


    if __name__ == '__main__':
        loop = asyncio.new_event_loop()
        # A plain function can be passed directly; call_soon queues it to run
        # on the next iteration of the loop
        loop.call_soon(callback, 2)
        loop.call_soon(stoploop, loop)
        # run_until_complete does not apply (these are not coroutines);
        # run_forever keeps picking up the scheduled callbacks until stop()
        loop.run_forever()

2. call_later: runs the callback after a given delay (internally it calls call_at; the clock is the loop's internal time, not wall-clock time)

    import asyncio


    def callback(sleep_time):
        print('sleep {} success'.format(sleep_time))


    # Stops the loop
    def stoploop(loop):
        loop.stop()


    if __name__ == '__main__':
        loop = asyncio.new_event_loop()
        loop.call_later(3, callback, 1)
        loop.call_later(1, callback, 2)
        loop.call_later(1, callback, 2)
        loop.call_later(1, callback, 2)
        loop.call_soon(callback, 4)
        # Stop after the last timer. call_soon(stoploop, loop) would stop the
        # loop before any of the delayed callbacks had fired.
        loop.call_later(4, stoploop, loop)
        # run_until_complete does not apply (these are not coroutines)
        loop.run_forever()

3. call_at: runs the callback at a specific loop time

    import asyncio


    def callback(sleep_time):
        print('sleep {} success'.format(sleep_time))


    # Stops the loop
    def stoploop(loop):
        loop.stop()


    if __name__ == '__main__':
        loop = asyncio.new_event_loop()
        now = loop.time()   # the loop's own clock, not wall-clock time
        print(now)
        loop.call_at(now + 3, callback, 1)
        loop.call_at(now + 1, callback, 0.5)
        loop.call_at(now + 1, callback, 2)
        loop.call_at(now + 1, callback, 2)
        # Stop after the last timer has fired
        loop.call_at(now + 4, stoploop, loop)
        # run_until_complete does not apply (these are not coroutines)
        loop.run_forever()

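4. call_soon_threadsafe:

The thread-safe counterpart of call_soon, for scheduling a callback from a thread other than the one running the loop. The implementation is almost identical to call_soon, with an added call to self._write_to_self() that wakes the loop; usage is the same as call_soon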
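A small sketch of the intended use, with a worker thread handing a callback to a loop that runs in the main thread. The function names and the message string are invented for the example.

```python
import asyncio
import threading


def main():
    loop = asyncio.new_event_loop()
    received = []

    def callback(value):
        received.append(value)
        loop.stop()

    def worker():
        # Runs in another thread. Plain call_soon is not safe here: the loop
        # may be asleep in its selector and would not notice the new callback.
        loop.call_soon_threadsafe(callback, 'from worker thread')

    thread = threading.Thread(target=worker)
    thread.start()
    loop.run_forever()      # returns once callback() calls loop.stop()
    thread.join()
    loop.close()
    return received


if __name__ == '__main__':
    print(main())
```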

IV. ThreadPoolExecutor + asyncio (combining a thread pool with coroutines)

1. Using run_in_executor: it runs blocking code in a thread pool. Performance is not especially high, about the same as plain multithreading

    # Use threads to integrate blocking IO into coroutines
    import asyncio
    import socket
    import time
    from concurrent.futures import ThreadPoolExecutor
    from urllib.parse import urlparse


    def get_url(url):
        # Fetch HTML over a raw socket
        url = urlparse(url)
        host = url.netloc
        path = url.path
        if path == "":
            path = "/"
        # Open the socket connection
        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        client.connect((host, 80))
        # Send the request to the server
        client.sendall("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))
        # Read until the server closes the connection
        data = b""
        while True:
            d = client.recv(1024)
            if d:
                data += d
            else:
                break
        # The response includes the headers; print only the body
        data = data.decode('utf8')
        print(data.split('\r\n\r\n')[1])
        client.close()


    if __name__ == '__main__':
        start_time = time.time()
        loop = asyncio.new_event_loop()
        executor = ThreadPoolExecutor()
        tasks = []
        for i in range(20):
            task = loop.run_in_executor(executor, get_url, 'http://www.baidu.com')
            tasks.append(task)
        loop.run_until_complete(asyncio.wait(tasks))
        print(time.time() - start_time)
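The same pattern can be tried without a network connection. In this sketch time.sleep stands in for the blocking socket calls, and blocking_io is a hypothetical function; the futures returned by run_in_executor can be awaited like any other.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_io(n):
    time.sleep(0.1)      # stands in for a blocking socket or file call
    return n * 2


async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=5) as executor:
        # Each call runs in a pool thread; the loop itself stays responsive
        futures = [loop.run_in_executor(executor, blocking_io, n) for n in range(5)]
        return await asyncio.gather(*futures)


if __name__ == '__main__':
    print(asyncio.run(main()))
```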

V. Simulating HTTP requests with asyncio

Note: asyncio currently provides no HTTP-level interface

    # asyncio currently provides no HTTP-level interface
    import asyncio
    import time
    from urllib.parse import urlparse


    async def get_url(url):
        # Fetch HTML over a raw connection
        url = urlparse(url)
        host = url.netloc
        path = url.path
        if path == "":
            path = "/"
        # Opening the connection is the slow part. open_connection takes care
        # of the non-blocking socket and its registration for us.
        reader, writer = await asyncio.open_connection(host, 80)
        # Send the request; register and unregister are handled internally too
        writer.write("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))
        # Read the response. The reader supports async for through the
        # __anext__ magic method (a coroutine); its source is fairly involved.
        all_lines = []
        async for line in reader:
            data = line.decode('utf8')
            all_lines.append(data)
        writer.close()
        # Each line already ends with its own newline
        html = ''.join(all_lines)
        return html


    async def main():
        tasks = []
        for i in range(20):
            url = "http://www.baidu.com/"
            tasks.append(asyncio.ensure_future(get_url(url)))
        # Print each result as soon as it completes
        for task in asyncio.as_completed(tasks):
            result = await task
            print(result)


    if __name__ == '__main__':
        start_time = time.time()
        loop = asyncio.new_event_loop()
        # To collect the results from outside instead, keep the tasks:
        # tasks = [loop.create_task(get_url('http://www.baidu.com')) for i in range(10)]
        # loop.run_until_complete(asyncio.wait(tasks))
        # for task in tasks:
        #     print(task.result())
        loop.run_until_complete(main())
        print(time.time() - start_time)

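VI. future and task

1. future: a future in asyncio resembles a future in a thread pool

Its methods mirror those of the thread-pool future

set_result method

Unlike the thread pool, set_result does not run the callbacks on the spot: everything is single-threaded, so it schedules them through call_soon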
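The deferred callback is easy to observe. In this sketch (names and strings are illustrative) the line after set_result runs before the done-callback, which only fires once control returns to the loop.

```python
import asyncio


async def main():
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    order = []
    future.add_done_callback(lambda fut: order.append('callback: ' + fut.result()))

    future.set_result('HAHA')
    order.append('after set_result')   # the callback has not run yet

    await asyncio.sleep(0)             # yield to the loop once so it can run the callback
    return order


if __name__ == '__main__':
    print(asyncio.run(main()))
```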

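2. task: a subclass of future, and the bridge between futures and coroutines

It begins by scheduling its _step method

That method starts the coroutine and turns its return value (the value carried by StopIteration) into the task's result, which reconciles the differences between coroutines and threads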
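What the step method does can be imitated by hand. The sketch below is a deliberate simplification: a real Task reschedules itself through the loop and waits on futures rather than spinning in a while loop. The names pause, add and drive are invented here.

```python
import types


@types.coroutine
def pause():
    # A bare yield hands control back to whoever is driving the coroutine
    yield


async def add(x, y):
    await pause()
    return x + y


def drive(coro):
    """Step a coroutine to completion, roughly as a Task's step method does."""
    steps = 0
    while True:
        try:
            coro.send(None)            # run until the next suspension point
            steps += 1
        except StopIteration as exc:
            # The coroutine's return value rides on StopIteration
            return exc.value, steps


if __name__ == '__main__':
    print(drive(add(1, 2)))
```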

VII. Synchronisation and communication in asyncio

1. Single-threaded coroutines need no lock:

    import asyncio

    total = 0


    async def add():
        global total
        for i in range(1000000):
            total += 1


    async def decs():
        global total
        for i in range(1000000):
            total -= 1


    if __name__ == '__main__':
        loop = asyncio.new_event_loop()
        tasks = [loop.create_task(add()), loop.create_task(decs())]
        loop.run_until_complete(asyncio.wait(tasks))
        # Prints 0: with no await inside the loops, each coroutine runs uninterrupted
        print(total)

2. Cases where a lock is needed:

Locks in asyncio (synchronisation primitives)

    import asyncio

    import aiohttp

    # asyncio's Lock does not use an operating-system lock; it is a simple
    # implementation of its own and never blocks the thread. Queue is
    # non-blocking too. Neither needs a Condition, since everything runs in
    # a single thread.
    # A Queue can also throttle; if you only need to share data, a global
    # variable is enough.
    cache = {}
    lock = None  # created in main(), once the loop is running


    async def get_stuff(url):
        # Older code wrote `with await lock:`. The more explicit syntax is
        # `async with lock:`, backed by __aenter__ and __aexit__.
        # The manual equivalent is `await lock.acquire()` (a coroutine, so it
        # needs await) followed by `lock.release()` (a plain function).
        async with lock:
            if url in cache:
                return cache[url]
            # Fetch asynchronously
            async with aiohttp.request('GET', url) as resp:
                stuff = await resp.text()
            cache[url] = stuff
            return stuff


    async def parse_stuff():
        stuff = await get_stuff('http://www.baidu.com')


    async def use_stuff():
        stuff = await get_stuff('http://www.baidu.com')


    async def main():
        global lock
        lock = asyncio.Lock()
        # Without the lock, the two coroutines would each send the request
        await asyncio.gather(parse_stuff(), use_stuff())


    if __name__ == '__main__':
        asyncio.run(main())
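The effect of the lock can be checked without aiohttp. In this sketch fake_request is a hypothetical stand-in for the HTTP call and counts how often it runs; with the lock in place, two concurrent callers trigger a single fetch.

```python
import asyncio

cache = {}
fetch_count = 0


async def fake_request(url):
    # Hypothetical stand-in for an HTTP call
    global fetch_count
    fetch_count += 1
    await asyncio.sleep(0.05)
    return 'body of ' + url


async def get_stuff(url, lock):
    async with lock:
        if url in cache:
            return cache[url]
        cache[url] = await fake_request(url)
        return cache[url]


async def main():
    global fetch_count
    cache.clear()
    fetch_count = 0
    lock = asyncio.Lock()     # created inside the running loop
    url = 'http://example.com/'
    results = await asyncio.gather(get_stuff(url, lock), get_stuff(url, lock))
    return results, fetch_count


if __name__ == '__main__':
    print(asyncio.run(main()))
```

Remove the `async with lock:` line and both callers miss the cache, because the first one is still suspended inside fake_request when the second one checks.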

VIII. A high-concurrency crawler with aiohttp

    # Crawl with asyncio, de-duplicate URLs, and store results through the
    # asynchronous driver aiomysql
    import asyncio
    import re

    import aiohttp
    import aiomysql
    from pyquery import PyQuery

    start_url = 'http://www.jobbole.com/'
    waiting_urls = []
    seen_urls = set()
    stopping = False
    # Limits the number of concurrent requests; created in main()
    sem = None


    async def fetch(url, session):
        async with sem:
            await asyncio.sleep(1)
            try:
                async with session.get(url) as resp:
                    print('url_status:{}'.format(resp.status))
                    if resp.status in [200, 201]:
                        data = await resp.text()
                        return data
            except Exception as e:
                print(e)


    def extract_urls(html):
        '''
        Parsing involves no IO, so this is a plain function
        '''
        urls = []
        if not html:  # fetch returns None when the request failed
            return urls
        pq = PyQuery(html)
        for link in pq.items('a'):
            url = link.attr('href')
            if url and url.startswith('http') and url not in urls:
                urls.append(url)
                waiting_urls.append(url)
        return urls


    async def init_urls(url, session):
        html = await fetch(url, session)
        seen_urls.add(url)
        extract_urls(html)


    async def handle_article(url, session, pool):
        '''
        Process an article page
        '''
        html = await fetch(url, session)
        seen_urls.add(url)
        if not html:
            return
        extract_urls(html)
        pq = PyQuery(html)
        title = pq('title').text()
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                # Let the driver escape the value instead of formatting it into the SQL
                await cur.execute("insert into Test(title) values(%s)", (title,))


    async def consumer(pool):
        async with aiohttp.ClientSession() as session:
            while not stopping:
                if len(waiting_urls) == 0:
                    await asyncio.sleep(0.5)
                    continue
                url = waiting_urls.pop()
                print('start url:' + url)
                if re.match(r'http://.*?jobbole\.com/\d+/', url):
                    if url not in seen_urls:
                        asyncio.ensure_future(handle_article(url, session, pool))
                        await asyncio.sleep(30)
                else:
                    if url not in seen_urls:
                        asyncio.ensure_future(init_urls(url, session))


    async def main():
        global sem
        sem = asyncio.Semaphore(3)
        # Wait until the MySQL connection pool is ready
        pool = await aiomysql.create_pool(host='localhost', port=3306, user='root',
                                          password='112358', db='my_aio',
                                          charset='utf8', autocommit=True)
        async with aiohttp.ClientSession() as session:
            html = await fetch(start_url, session)
            seen_urls.add(start_url)
            extract_urls(html)
        asyncio.ensure_future(consumer(pool))


    if __name__ == '__main__':
        loop = asyncio.new_event_loop()
        loop.create_task(main())
        loop.run_forever()
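The concurrency limit used in fetch can be verified in isolation. The sketch below, with an invented state dictionary standing in for real requests, records how many coroutines are inside the semaphore at once.

```python
import asyncio


async def fetch(sem, state):
    async with sem:
        state['running'] += 1
        state['peak'] = max(state['peak'], state['running'])
        await asyncio.sleep(0.01)      # stands in for the HTTP request
        state['running'] -= 1


async def main(limit=3, jobs=10):
    sem = asyncio.Semaphore(limit)     # created inside the running loop
    state = {'running': 0, 'peak': 0}
    await asyncio.gather(*(fetch(sem, state) for _ in range(jobs)))
    return state['peak']


if __name__ == '__main__':
    print(asyncio.run(main()))
```

Ten jobs are started, but the recorded peak never exceeds the limit passed to Semaphore.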
