日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > python >内容正文

python

python线程池模块_python并发编程之进程池,线程池,协程(Python标准模块--concurrent.futures(并发未来))...

發(fā)布時間:2024/7/5 python 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python线程池模块_python并发编程之进程池,线程池,协程(Python标准模块--concurrent.futures(并发未来))... 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

需要注意一下

不能無限的開進程,不能無限的開線程

最常用的就是開進程池,開線程池。其中回調(diào)函數(shù)非常重要

回調(diào)函數(shù)其實可以作為一種編程思想,誰好了誰就去掉

只要你用并發(fā),就會有鎖的問題,但是你不能一直去自己加鎖吧

那么我們就用QUEUE,這樣還解決了自動加鎖的問題

由Queue延伸出的一個點也非常重要的概念。以后寫程序也會用到

這個思想。就是生產(chǎn)者與消費者問題

一、Python標(biāo)準(zhǔn)模塊--concurrent.futures(并發(fā)未來)

concurent.future模塊需要了解的

1.concurent.future模塊是用來創(chuàng)建并行的任務(wù),提供了更高級別的接口,

為了異步執(zhí)行調(diào)用

2.concurent.future這個模塊用起來非常方便,它的接口也封裝的非常簡單

3.concurent.future模塊既可以實現(xiàn)進程池,也可以實現(xiàn)線程池

4.模塊導(dǎo)入進程池和線程池

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor

還可以導(dǎo)入一個Executor,但是你別這樣導(dǎo),這個類是一個抽象類

抽象類的目的是規(guī)范他的子類必須有某種方法(并且抽象類的方法必須實現(xiàn)),但是抽象類不能被實例化

5.

p = ProcessPoolExecutor(max_works)對于進程池如果不寫max_works:默認(rèn)的是cpu的數(shù)目,默認(rèn)是4個

p = ThreadPoolExecutor(max_works)對于線程池如果不寫max_works:默認(rèn)的是cpu的數(shù)目*5

6.如果是進程池,得到的結(jié)果如果是一個對象。我們得用一個.get()方法得到結(jié)果

但是現(xiàn)在用了concurent.future模塊,我們可以用obj.result方法

p.submit(task,i) #相當(dāng)于apply_async異步方法

p.shutdown() #默認(rèn)有個參數(shù)wite=True (相當(dāng)于close和join)

那么什么是線程池呢?我們來了解一下

二、線程池

進程池:就是在一個進程內(nèi)控制一定個數(shù)的線程

基于concurent.future模塊的進程池和線程池 (他們的同步執(zhí)行和異步執(zhí)行是一樣的)

1 #1.同步執(zhí)行--------------

2 from concurrent.futures importProcessPoolExecutor,ThreadPoolExecutor3 importos,time,random4 deftask(n):5 print('[%s] is running'%os.getpid())6 time.sleep(random.randint(1,3)) #I/O密集型的,,一般用線程,用了進程耗時長

7 return n**2

8 if __name__ == '__main__':9 start =time.time()10 p =ProcessPoolExecutor()11 for i in range(10): #現(xiàn)在是開了10個任務(wù), 那么如果是上百個任務(wù)呢,就不能無線的開進程,那么就得考慮控制

12 #線程數(shù)了,那么就得考慮到池了

13 obj = p.submit(task,i).result() #相當(dāng)于apply同步方法

14 p.shutdown() #相當(dāng)于close和join方法

15 print('='*30)16 print(time.time() - start) #17.36499309539795

17

18

19 #2.異步執(zhí)行-----------

20 #from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor

21 #import os,time,random

22 #def task(n):

23 #print('[%s] is running'%os.getpid())

24 #time.sleep(random.randint(1,3)) #I/O密集型的,,一般用線程,用了進程耗時長

25 #return n**2

26 #if __name__ == '__main__':

27 #start = time.time()

28 #p = ProcessPoolExecutor()

29 #l = []

30 #for i in range(10): #現(xiàn)在是開了10個任務(wù), 那么如果是上百個任務(wù)呢,就不能無線的開進程,那么就得考慮控制

31 ## 線程數(shù)了,那么就得考慮到池了

32 #obj = p.submit(task,i) #相當(dāng)于apply_async()異步方法

33 #l.append(obj)

34 #p.shutdown() #相當(dāng)于close和join方法

35 #print('='*30)

36 #print([obj.result() for obj in l])

37 #print(time.time() - start) #5.362306594848633

基于concurrent.futures模塊的進程池

1 from concurrent.futures importProcessPoolExecutor,ThreadPoolExecutor2 from threading importcurrentThread3 importos,time,random4 deftask(n):5 print('%s:%s is running'%(currentThread().getName(),os.getpid())) #看到的pid都是一樣的,因為線程是共享了一個進程

6 time.sleep(random.randint(1,3)) #I/O密集型的,,一般用線程,用了進程耗時長

7 return n**2

8 if __name__ == '__main__':9 start =time.time()10 p = ThreadPoolExecutor() #線程池 #如果不給定值,默認(rèn)cup*5

11 l =[]12 for i in range(10): #10個任務(wù) # 線程池效率高了

13 obj = p.submit(task,i) #相當(dāng)于apply_async異步方法

14l.append(obj)15 p.shutdown() #默認(rèn)有個參數(shù)wite=True (相當(dāng)于close和join)

16 print('='*30)17 print([obj.result() for obj inl])18 print(time.time() - start) #3.001171827316284

基于concurrent.futures模塊的線程池

應(yīng)用線程池(下載網(wǎng)頁并解析)

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import requests

import time,os

def get_page(url):

print(' is getting [%s]'%(os.getpid(),url))

response = requests.get(url)

if response.status_code==200: #200代表狀態(tài):下載成功了

return {'url':url,'text':response.text}

def parse_page(res):

res = res.result()

print(' is getting [%s]'%(os.getpid(),res['url']))

with open('db.txt','a') as f:

parse_res = 'url:%s size:%s\n'%(res['url'],len(res['text']))

f.write(parse_res)

if __name__ == '__main__':

# p = ThreadPoolExecutor()

p = ProcessPoolExecutor()

l = [

'http://www.baidu.com',

'http://www.baidu.com',

'http://www.baidu.com',

'http://www.baidu.com',

]

for url in l:

res = p.submit(get_page,url).add_done_callback(parse_page) #這里的回調(diào)函數(shù)拿到的是一個對象。得

# 先把返回的res得到一個結(jié)果。即在前面加上一個res.result() #誰好了誰去掉回調(diào)函數(shù)

# 回調(diào)函數(shù)也是一種編程思想。不僅開線程池用,開線程池也用

p.shutdown() #相當(dāng)于進程池里的close和join

print('主',os.getpid())

map函數(shù)的應(yīng)用

# map函數(shù)舉例

obj= map(lambda x:x**2 ,range(10))

print(list(obj))

#運行結(jié)果[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

可以和上面的開進程池/線程池的對比著看,就能發(fā)現(xiàn)map函數(shù)的強大了

map函數(shù)的應(yīng)用

三、協(xié)程介紹

協(xié)程:單線程下實現(xiàn)并發(fā)(提高效率)

說到協(xié)成,我們先說一下協(xié)程聯(lián)想到的知識點

yield復(fù)習(xí)

1 3.yield功能2(可以吧函數(shù)暫停住,保存原來的狀態(tài))--------------

2 deff1():3 print('first')4 yield 1

5 print('second')6 yield 2

7 print('third')8 yield 3

9 #print(f1()) #加了yield返回的是一個生成器

10 g =f1()11 print(next(g)) #當(dāng)遇見了yield的時候就返回一個值,而且保存原來的狀態(tài)

12 print(next(g)) #當(dāng)遇見了yield的時候就返回一個值

13 print(next(g)) #當(dāng)遇見了yield的時候就返回一個值

yield功能示例1

1 #3.yield表達式(對于表達式的yield)--------------------

2 importtime3 defwrapper(func):4 def inner(*args,**kwargs):5 ret =func(*args,**kwargs)6next(ret)7 returnret8 returninner9@wrapper10 defconsumer():11 whileTrue:12 x= yield

13 print(x)14

15 defproducter(target):16 '''生產(chǎn)者造值'''

17 #next(g) #相當(dāng)于g.send(None)

18 for i in range(10):19 time.sleep(0.5)20 target.send(i)#要用send就得用兩個yield

21 producter(consumer())

yield功能示例2

引子

本節(jié)主題是實現(xiàn)單線程下的并發(fā),即只在一個主線程,并且很明顯的是,可利用的cpu只有一個情況下實現(xiàn)并發(fā),

為此我們需要先回顧下并發(fā)的本質(zhì):切換+保存狀態(tài)

cpu正在運行一個任務(wù),會在兩種情況下切走去執(zhí)行其他的任務(wù)(切換由操作系統(tǒng)強制控制),

一種情況是該任務(wù)發(fā)生了阻塞,另外一種情況是該任務(wù)計算的時間過長

其中第二種情況并不能提升效率,只是為了讓cpu能夠雨露均沾,實現(xiàn)看起來大家都被執(zhí)行的效果,如果多個程序都是純計算任務(wù),這種切換反而會降低效率。為此我們可以基于yield來驗證。yield本身就是一種在單線程下可以保存任務(wù)運行狀態(tài)的方法,我們來簡單復(fù)習(xí)一下:

單純的切反而會影響效率

1 #串行執(zhí)行

2 importtime3 defconsumer(res):4 '''任務(wù)1:接收數(shù)據(jù),處理數(shù)據(jù)'''

5 pass

6

7 defproducer():8 '''任務(wù)2:生產(chǎn)數(shù)據(jù)'''

9 res=[]10 for i in range(10000000):11res.append(i)12 returnres13

14 start=time.time()15 #串行執(zhí)行

16 res=producer()17consumer(res)18 stop=time.time()19 print(stop-start) #1.5536692142486572

串行執(zhí)行

1 importtime2 defwrapper(func):3 def inner(*args,**kwargs):4 ret =func(*args,**kwargs)5next(ret)6 returnret7 returninner8@wrapper9 defconsumer():10 whileTrue:11 x= yield

12 print(x)13

14 defproducter(target):15 '''生產(chǎn)者造值'''

16 #next(g) #相當(dāng)于g.send(None)

17 for i in range(10):18 time.sleep(0.5)19 target.send(i)#要用send就得用兩個yield

20 producter(consumer())

基于yield并發(fā)執(zhí)行

對于單線程下,我們不可避免程序中出現(xiàn)io操作,但如果我們能在自己的程序中(即用戶程序級別,而非操作系統(tǒng)級別)控制單線程下多個任務(wù)能遇到io就切換,這樣就保證了該線程能夠最大限度地處于就緒態(tài),即隨時都可以被cpu執(zhí)行的狀態(tài),相當(dāng)于我們在用戶程序級別將自己的io操作最大限度地隱藏起來,對于操作系統(tǒng)來說:這哥們(該線程)好像是一直處于計算過程的,io比較少。

協(xié)程的本質(zhì)就是在單線程下,由用戶自己控制一個任務(wù)遇到io阻塞了就切換另外一個任務(wù)去執(zhí)行,以此來提升效率。

因此我們需要找尋一種可以同時滿足以下條件的解決方案:

1. 可以控制多個任務(wù)之間的切換,切換之前將任務(wù)的狀態(tài)保存下來(重新運行時,可以基于暫停的位置繼續(xù))

2. 作為1的補充:可以檢測io操作,在遇到io操作的情況下才發(fā)生切換

四、Greenlet

Greenlet模塊和yield沒有什么區(qū)別,就只是單純的切,跟效率無關(guān)。

只不過比yield更好一點,切的時候方便一點。但是仍然沒有解決效率

Greenlet可以讓你在多個任務(wù)之間來回的切

舉例:

1 from greenlet importgreenlet2 importtime3 defeat(name):4 print('%s eat 1' %name)5 time.sleep(10) #當(dāng)遇到IO的時候它也沒有切,這就得用gevent了

6 g2.switch('egon')7 print('%s eat 2' %name)8g2.switch()9 defplay(name):10 print('%s play 1' %name)11g1.switch()12 print('%s play 2' %name)13

14 g1=greenlet(eat)15 g2=greenlet(play)16

17 g1.switch('egon')#可以在第一次switch時傳入?yún)?shù),以后都不需要

greenlet

所以上面的方法都不可行,那么這就用到了Gevert ,也就是協(xié)程。就解決了單線程實現(xiàn)并發(fā)的問題,還提升了效率

五、Gevent介紹

Gevent 是一個第三方庫,可以輕松通過gevent實現(xiàn)并發(fā)同步或異步編程,在gevent中用到的主要模式是Greenlet,

它是以C擴展模塊形式接入Python的輕量級協(xié)程。 Greenlet全部運行在主程序操作系統(tǒng)進程的內(nèi)部,但它們被協(xié)作式地調(diào)度。

#用法

g1=gevent.spawn(func,1,,2,3,x=4,y=5)創(chuàng)建一個協(xié)程對象g1,spawn括號內(nèi)第一個參數(shù)是函數(shù)名,如eat,后面可以有多個參數(shù),可以是位置實參或關(guān)鍵字實參,都是傳給函數(shù)eat的

g2=gevent.spawn(func2)

g1.join() #等待g1結(jié)束

g2.join() #等待g2結(jié)束

#或者上述兩步合作一步:gevent.joinall([g1,g2])

g1.value#拿到func1的返回值

舉例;

1 from gevent importmonkey;monkey.patch_all()2 importgevent3 importtime4 defeat(name):5 print('%s eat 1' %name)6 time.sleep(2) #我們用等待的時間模擬IO阻塞

7 '''在gevent模塊里面要用gevent.sleep(2)表示等待的時間

8 然而我們經(jīng)常用time.sleep()用習(xí)慣了,那么有些人就想著

9 可以用time.sleep(),那么也不是不可以。要想用,就得在

10 最上面導(dǎo)入from gevent import monkey;monkey.patch_all()這句話

11 如果不導(dǎo)入直接用time.sleep(),就實現(xiàn)不了單線程并發(fā)的效果了

12'''

13 #gevent.sleep(2)

14 print('%s eat 2' %name)15 return 'eat'

16 defplay(name):17 print('%s play 1' %name)18 time.sleep(3)19 #gevent.sleep(3)

20 print('%s play 2' %name)21 return 'paly' #當(dāng)有返回值的時候,gevent模塊也提供了返回結(jié)果的操作

22

23 start =time.time()24 g1 = gevent.spawn(eat,'egon') #執(zhí)行任務(wù)

25 g2 = gevent.spawn(play,'egon') #g1和g2的參數(shù)可以不一樣

26 #g1.join() #等待g1

27 #g2.join() #等待g2

28 #上面等待的兩句也可以這樣寫

29gevent.joinall([g1,g2])30 print('主',time.time()-start) #3.001171588897705

31

32 print(g1.value)33 print(g2.value)

gevent的一些方法(重要)

需要說明的是:

gevent.sleep(2)模擬的是gevent可以識別的io阻塞,

而time.sleep(2)或其他的阻塞,gevent是不能直接識別的需要用下面一行代碼,打補丁,就可以識別了

from gevent import monkey;monkey.patch_all()必須放到被打補丁者的前面,如time,socket模塊之前

或者我們干脆記憶成:要用gevent,需要將from gevent import monkey;monkey.patch_all()放到文件的開頭

六、Gevent之同步于異步

1 from gevent importspawn,joinall,monkey;monkey.patch_all()2

3 importtime4 deftask(pid):5 """6 Some non-deterministic task

7"""

8 time.sleep(0.5)9 print('Task %s done' %pid)10

11

12 defsynchronous():13 for i in range(10):14task(i)15

16 defasynchronous():17 g_l=[spawn(task,i) for i in range(10)]18joinall(g_l)19

20 if __name__ == '__main__':21 print('Synchronous:')22synchronous()23

24 print('Asynchronous:')25asynchronous()26 #上面程序的重要部分是將task函數(shù)封裝到Greenlet內(nèi)部線程的gevent.spawn。 初始化的greenlet列表存放在數(shù)組threads中,此數(shù)組被傳給gevent.joinall 函數(shù),后者阻塞當(dāng)前流程,并執(zhí)行所有給定的greenlet。執(zhí)行流程只會在 所有g(shù)reenlet執(zhí)行完后才會繼續(xù)向下走。

View Code

七、Gevent之應(yīng)用舉例一

1 from gevent import monkey;monkey.patch_all() #打補丁

2 importgevent3 importrequests4 importtime5 defget_page(url):6 print('get :%s'%url)7 response =requests.get(url)8 if response.status_code==200: #下載成功的狀態(tài)

9 print('%d bytes received from:%s'%(len(response.text),url))10 start=time.time()11gevent.joinall([12 gevent.spawn(get_page,'http://www.baidu.com'),13 gevent.spawn(get_page, 'https://www.yahoo.com/'),14 gevent.spawn(get_page, 'https://github.com/'),15])16 stop =time.time()17 print('run time is %s' %(stop-start))

協(xié)程應(yīng)用爬蟲

from gevent importjoinall,spawn,monkey;monkey.patch_all()importrequestsfrom threading importcurrent_threaddefparse_page(res):print('%s PARSE %s' %(current_thread().getName(),len(res)))def get_page(url,callback=parse_page):print('%s GET %s' %(current_thread().getName(),url))

response=requests.get(url)if response.status_code == 200:

callback(response.text)if __name__ == '__main__':

urls=['https://www.baidu.com','https://www.taobao.com','https://www.openstack.org',

]

tasks=[]for url inurls:

tasks.append(spawn(get_page,url))

joinall(tasks)

協(xié)程應(yīng)用爬蟲(回調(diào)函數(shù))

八、Gevent之應(yīng)用舉例二

也可以利用協(xié)程實現(xiàn)并發(fā)

1 #!usr/bin/env python

2 #-*- coding:utf-8 -*-

3 from gevent importmonkey;monkey.patch_all()4 importgevent5 from socket import *

6 print('start running...')7 deftalk(conn,addr):8 whileTrue:9 data = conn.recv(1024)10 print('%s:%s %s'%(addr[0],addr[1],data))11conn.send(data.upper())12conn.close()13 defserver(ip,duankou):14 server =socket(AF_INET, SOCK_STREAM)15 server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)16server.bind((ip,duankou))17 server.listen(5)18 whileTrue:19 conn,addr = server.accept() #等待鏈接

20 gevent.spawn(talk,conn,addr) #異步執(zhí)行 (p =Process(target=talk,args=(coon,addr))

21 #p.start())相當(dāng)于開進程里的這兩句

22server.close()23 if __name__ == '__main__':24 server('127.0.0.1',8081)

服務(wù)端利用協(xié)程

1 #!usr/bin/env python

2 #-*- coding:utf-8 -*-

3 from multiprocessing importProcess4 from gevent importmonkey;monkey.patch_all()5 from socket import *

6 defclient(ip,duankou):7 client =socket(AF_INET, SOCK_STREAM)8client.connect((ip,duankou))9 whileTrue:10 client.send('hello'.encode('utf-8'))11 data = client.recv(1024)12 print(data.decode('utf-8'))13 if __name__ == '__main__':14 for i in range(100):15 p = Process(target=client,args=(('127.0.0.1',8081)))16 p.start()

客戶端開了100個進程

總結(jié)

以上是生活随笔為你收集整理的python线程池模块_python并发编程之进程池,线程池,协程(Python标准模块--concurrent.futures(并发未来))...的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。