python多线程执行其他模块的文件_python并发编程--进程线程--其他模块-从菜鸟到老鸟(三)...
concurrent模塊
1、concurrent模塊的介紹
concurrent.futures模塊提供了高度封裝的異步調(diào)用接口
ThreadPoolExecutor:線程池,提供異步調(diào)用
ProcessPoolExecutor:進(jìn)程池,提供異步調(diào)用
ProcessPoolExecutor?和?ThreadPoolExecutor:兩者都實(shí)現(xiàn)相同的接口,該接口由抽象Executor類(lèi)定義。
2、基本方法
使用_base.Executor
concurrent.futures.thread.ThreadPoolExecutor #線程池
concurrent.futures.process.ProcessPoolExecutor #進(jìn)程池#構(gòu)造函數(shù)
def __init__(self, max_workers=None, mp_context=None,
initializer=None, initargs=()):
submit(fn, *args, **kwargs)?:異步提交任務(wù)
使用submit函數(shù)來(lái)提交線程需要執(zhí)行任務(wù)(函數(shù)名和參數(shù))到線程池中,并返回該任務(wù)的句柄(類(lèi)似于文件、畫(huà)圖),注意submit()不是阻塞的,而是立即返回。
map(func, *iterables, timeout=None, chunksize=1)
取代for循環(huán)submit的操作
shutdown(wait=True)?:相當(dāng)于進(jìn)程池的pool.close()+pool.join()操作
wait=True,等待池內(nèi)所有任務(wù)執(zhí)行完畢回收完資源后才繼續(xù)
wait=False,立即返回,并不會(huì)等待池內(nèi)的任務(wù)執(zhí)行完畢
但不管wait參數(shù)為何值,整個(gè)程序都會(huì)等到所有任務(wù)執(zhí)行完畢
note:submit和map必須在shutdown之前
pool.submit()返回的對(duì)象是
concurrent.futures._base.Future類(lèi)
add_done_callback(self,fn)
cancel(self)
cancelled(self)
done(self)
exception(self,timeout=None)
result(self,timeout=None)
running(self)
set_exception(self,exception)
set_result(self,result)
set_running_or_notify_cancel(self)
result(timeout=None)?:取得結(jié)果,通過(guò)submit函數(shù)返回的任務(wù)句柄,使用result()方法可以獲取任務(wù)的返回值,查看內(nèi)部代碼,發(fā)現(xiàn)這個(gè)方法是阻塞的
done()方法判斷該任務(wù)是否結(jié)束
add_done_callback(fn)?:回調(diào)函數(shù)
3、進(jìn)程池和線程池
池的功能:限制進(jìn)程數(shù)或線程數(shù).
什么時(shí)候限制: 當(dāng)并發(fā)的任務(wù)數(shù)量遠(yuǎn)遠(yuǎn)大于計(jì)算機(jī)所能承受的范圍,即無(wú)法一次性開(kāi)啟過(guò)多的任務(wù)數(shù)量 我就應(yīng)該考慮去限制我進(jìn)程數(shù)或線程數(shù),從保證服務(wù)器不崩.
3.1 進(jìn)程池
from concurrent.futures importProcessPoolExecutorimportosimporttimedeftask(i):print("第"+str(i)+"個(gè)在執(zhí)行任務(wù)id:"+str(os.getpid()))
time.sleep(1)if __name__ == '__main__':
start=time.time()
pool= ProcessPoolExecutor(4) #進(jìn)程池里又4個(gè)進(jìn)程
for i in range(5): #5個(gè)任務(wù)
pool.submit(task,i)#進(jìn)程池里當(dāng)前執(zhí)行的任務(wù)i,池子里的4個(gè)進(jìn)程一次一次執(zhí)行任務(wù)
pool.shutdown()print("耗時(shí):",time.time()-start)
3.2 線程池
from concurrent.futures importThreadPoolExecutorfrom threading importcurrentThreadimporttimedeftask(i):print("第"+str(i)+"個(gè)在執(zhí)行任務(wù)id:"+currentThread().name)
time.sleep(1)if __name__ == '__main__':
start=time.time()
pool= ThreadPoolExecutor(4) #進(jìn)程池里又4個(gè)線程
for i in range(5): #5個(gè)任務(wù)
pool.submit(task,i)#線程池里當(dāng)前執(zhí)行的任務(wù)i,池子里的4個(gè)線程一次一次執(zhí)行任務(wù)
pool.shutdown()print("耗時(shí):",time.time()-start)
其他:done() 、 result()
通過(guò)submit函數(shù)返回的任務(wù)句柄,能夠使用done()方法判斷該任務(wù)是否結(jié)束
使用result()方法可以獲取任務(wù)的返回值,查看內(nèi)部代碼,發(fā)現(xiàn)這個(gè)方法是阻塞的
3.4列表+as_compelete模擬先進(jìn)先出
對(duì)于線程,這樣可以模擬執(zhí)行與結(jié)果的先進(jìn)先出。
但是對(duì)于進(jìn)程會(huì)報(bào)錯(cuò)。
importtimefrom concurrent.futures importProcessPoolExecutor,as_completed,ThreadPoolExecutordefget_html(i):
times=1time.sleep(times)print("第 NO.{i} get page {times} finished".format(i=i,times=times))return "第 NO.{i}".format(i=i)
start=time.time()
executor= ThreadPoolExecutor(max_workers=2)#executor = ProcessPoolExecutor(max_workers=2) #進(jìn)程池會(huì)導(dǎo)致后面的all_task報(bào)錯(cuò)
all_task= [executor.submit(get_html,(i)) for i in range(5)]for future inas_completed(all_task):
data=future.result()print("in main:get page {} success".format(data))print('主進(jìn)程結(jié)束--耗時(shí)',time.time()-start)
結(jié)果:
第 NO.0 get page 1finished
第 NO.1 get page 1finishedinmain:get page 第 NO.0 successin main:get page 第 NO.1success
第 NO.2 get page 1finishedin main:get page 第 NO.2success
第 NO.3 get page 1finishedin main:get page 第 NO.3success
第 NO.4 get page 1finishedin main:get page 第 NO.4success
主進(jìn)程結(jié)束--耗時(shí) 3.0034666061401367
結(jié)果:
3.4 Map的用法
可以將多個(gè)任務(wù)一次性的提交給進(jìn)程、線程池。---備注進(jìn)程是也不行的,也會(huì)報(bào)錯(cuò)。
使用map方法,不需提前使用submit方法,map方法與python標(biāo)準(zhǔn)庫(kù)中的map含義相同,都是將序列中的每個(gè)元素都執(zhí)行同一個(gè)函數(shù)。
from concurrent.futures importThreadPoolExecutor,ProcessPoolExecutorimportos,time,randomdeftask(i):print("第"+str(i)+"個(gè)在執(zhí)行任務(wù)id:"+str(os.getpid()))
time.sleep(1)if __name__ == '__main__':
start=time.time()
pool=ProcessPoolExecutor(max_workers=3) #也可以換成ThreadPoolExecutor
pool.map(task,range(1,5)) #map取代了for+submit
pool.shutdown()print("耗時(shí):",time.time()-start)
考慮到結(jié)果返回值:
importtimefrom random importrandomfrom concurrent.futures importProcessPoolExecutor,as_completed,ThreadPoolExecutordefget_html(i):
times=1+random()/100time.sleep(times)print("第 NO.{i} get page {times}s finished".format(i=i,times=times))return "第 NO.{i}".format(i=i)
start=time.time()
executor= ThreadPoolExecutor(max_workers=2)#executor = ProcessPoolExecutor(max_workers=2) #進(jìn)程池會(huì)導(dǎo)致后面的executor.map報(bào)錯(cuò)
res=executor.map(get_html, range(5))#for future in res: #直接返回結(jié)果,不需要get
print("in main:get page {} success".format(future))print('主進(jìn)程結(jié)束--耗時(shí)',time.time()-start)
3.5 同步調(diào)用,順序返回
因?yàn)槲覀冊(cè)谘h(huán)中每次循環(huán)都要調(diào)用或這說(shuō)提交任務(wù),并等待結(jié)果。所以其實(shí)進(jìn)程之間是串行的。所以是同步的方式。
from concurrent.futures importProcessPoolExecutorfrom multiprocessing importcurrent_processimporttime
n= 1
deftask(i):globaln
time.sleep(1)print(f'{current_process().name} 在執(zhí)行任務(wù){(diào)i}')
n+=ireturn f'得到 {current_process().name} 任務(wù){(diào)i} 的結(jié)果'
if __name__ == '__main__':
start=time.time()
pool= ProcessPoolExecutor(2) #進(jìn)程池里又4個(gè)線程
pool_lis =[]for i in range(5): #20個(gè)任務(wù)
future = pool.submit(task,i)#進(jìn)程池里當(dāng)前執(zhí)行的任務(wù)i,池子里的4個(gè)線程一次一次執(zhí)行任務(wù)
pool_lis.append(future.result())#等待我執(zhí)行任務(wù)得到的結(jié)果,如果一直沒(méi)有結(jié)果,則阻塞。這里會(huì)導(dǎo)致我們所有任務(wù)編程了串行
#在這里就引出了下面的pool.shutdown()方法
pool.shutdown(wait=True) #關(guān)閉了池的入口,不允許在往里面添加任務(wù)了,會(huì)等帶所有的任務(wù)執(zhí)行完,結(jié)束阻塞
for res inpool_lis:print(res)print(n)#這里肯定是拿到0的
print("主進(jìn)程---耗時(shí)",time.time()-start)#可以用join去解決,等待每一個(gè)進(jìn)程結(jié)束后,拿到他的結(jié)果
結(jié)果:
SpawnProcess-2在執(zhí)行任務(wù)0
SpawnProcess-1在執(zhí)行任務(wù)1
SpawnProcess-2在執(zhí)行任務(wù)2
SpawnProcess-1在執(zhí)行任務(wù)3
SpawnProcess-2在執(zhí)行任務(wù)4
得到 SpawnProcess-2任務(wù)0 的結(jié)果
得到 SpawnProcess-1任務(wù)1 的結(jié)果
得到 SpawnProcess-2任務(wù)2 的結(jié)果
得到 SpawnProcess-1任務(wù)3 的結(jié)果
得到 SpawnProcess-2任務(wù)4 的結(jié)果1主進(jìn)程---耗時(shí) 5.575225830078125
同步--所以是串行的。耗時(shí)與單進(jìn)程差不多
3.5 異步調(diào)用,順序返回
from concurrent.futures importProcessPoolExecutorfrom multiprocessing importcurrent_processimporttime
n= 1
deftask(i):globaln
time.sleep(1)print(f'{current_process().name} 在執(zhí)行任務(wù){(diào)i}')
n+=ireturn f'得到 {current_process().name} 任務(wù){(diào)i} 的結(jié)果'
if __name__ == '__main__':
start=time.time()
pool= ProcessPoolExecutor(2) #進(jìn)程池里又4個(gè)線程
pool_lis =[]for i in range(5): #20個(gè)任務(wù)
future = pool.submit(task,i)#進(jìn)程池里當(dāng)前執(zhí)行的任務(wù)i,池子里的4個(gè)線程一次一次執(zhí)行任務(wù)
#print(future.result()) # 這是在等待我執(zhí)行任務(wù)得到的結(jié)果,如果一直沒(méi)有結(jié)果,這里會(huì)導(dǎo)致我們所有任務(wù)編程了串行
#在這里就引出了下面的pool.shutdown()方法
pool_lis.append(future)
pool.shutdown(wait=True) #關(guān)閉了池的入口,不允許在往里面添加任務(wù)了,會(huì)等帶所有的任務(wù)執(zhí)行完,結(jié)束阻塞
for p inpool_lis:print(p.result())print(n)#這里肯定是拿到0的
print("主進(jìn)程---耗時(shí)",time.time()-start)#可以用join去解決,等待每一個(gè)進(jìn)程結(jié)束后,拿到他的結(jié)果
結(jié)果:
SpawnProcess-1在執(zhí)行任務(wù)0
SpawnProcess-2在執(zhí)行任務(wù)1
SpawnProcess-1在執(zhí)行任務(wù)2
SpawnProcess-2在執(zhí)行任務(wù)3
SpawnProcess-1在執(zhí)行任務(wù)4
得到 SpawnProcess-1任務(wù)0 的結(jié)果
得到 SpawnProcess-2任務(wù)1 的結(jié)果
得到 SpawnProcess-1任務(wù)2 的結(jié)果
得到 SpawnProcess-2任務(wù)3 的結(jié)果
得到 SpawnProcess-1任務(wù)4 的結(jié)果1主進(jìn)程---耗時(shí) 3.2690603733062744
異步結(jié)果,有序返回相應(yīng)結(jié)果
3.5 回調(diào)函數(shù):
add_done_callback
from multiprocessing importcurrent_processimporttimefrom random importrandomfrom concurrent.futures importProcessPoolExecutordeftask(i):print(f'{current_process().name} 在執(zhí)行{i}')
time.sleep(1+random())returni#parse 就是一個(gè)回調(diào)函數(shù)
defparse(future):#處理拿到的結(jié)果
print(f'{current_process().name} 拿到結(jié)果{future.result()} 結(jié)束了當(dāng)前任務(wù)')if __name__ == '__main__':
start=time.time()
pool= ProcessPoolExecutor(2)for i in range(5):
future=pool.submit(task,i)'''給當(dāng)前執(zhí)行的任務(wù)綁定了一個(gè)函數(shù),在當(dāng)前任務(wù)結(jié)束的時(shí)候就會(huì)觸發(fā)這個(gè)函數(shù)(稱(chēng)之為回調(diào)函數(shù))
會(huì)把future對(duì)象作為參數(shù)傳給函數(shù)
注:這個(gè)稱(chēng)為回調(diào)函數(shù),當(dāng)前任務(wù)處理結(jié)束了,就回來(lái)調(diào)parse這個(gè)函數(shù)'''future.add_done_callback(parse)#add_done_callback (parse) parse是一個(gè)回調(diào)函數(shù)
#add_done_callback () 是對(duì)象的一個(gè)綁定方法,他的參數(shù)就是一個(gè)函數(shù)
pool.shutdown()print('主線程耗時(shí):',time.time()-start)
結(jié)果:
SpawnProcess-1在執(zhí)行0
SpawnProcess-2在執(zhí)行1
SpawnProcess-2在執(zhí)行2
MainProcess 拿到結(jié)果1 結(jié)束了當(dāng)前任務(wù)
SpawnProcess-1在執(zhí)行3
MainProcess 拿到結(jié)果0 結(jié)束了當(dāng)前任務(wù)
SpawnProcess-1在執(zhí)行4
MainProcess 拿到結(jié)果3 結(jié)束了當(dāng)前任務(wù)
MainProcess 拿到結(jié)果2 結(jié)束了當(dāng)前任務(wù)
MainProcess 拿到結(jié)果4 結(jié)束了當(dāng)前任務(wù)
主線程耗時(shí):4.721129417419434
回調(diào)是主進(jìn)程的,結(jié)果是無(wú)序的
3.6wait
wait方法可以讓主線程阻塞,直到滿足設(shè)定的要求。wait方法接收3個(gè)參數(shù),等待的任務(wù)序列、超時(shí)時(shí)間以及等待條件。
等待條件return_when默認(rèn)為ALL_COMPLETED,表明要等待所有的任務(wù)都借宿。
可以看到運(yùn)行結(jié)果中,確實(shí)是所有任務(wù)都完成了,主線程才打印出main,等待條件還可以設(shè)置為FIRST_COMPLETED,表示第一個(gè)任務(wù)完成就停止等待
from concurrent.futures importThreadPoolExecutor,wait,ALL_COMPLETED,FIRST_COMPLETEDimporttime#參數(shù)times用來(lái)模擬網(wǎng)絡(luò)請(qǐng)求時(shí)間
from random importrandomdefget_html(i):
times=1+random()*10time.sleep(times)print("第 NO.{i} get page {times}s finished".format(i=i,times=times))return "第 NO.{i}".format(i=i)
executor= ThreadPoolExecutor(max_workers=2)
urls= range(5)
all_task= [executor.submit(get_html,(url)) for url inurls]
wait(all_task,return_when=ALL_COMPLETED)print("main")
joblib模塊
總結(jié)
以上是生活随笔為你收集整理的python多线程执行其他模块的文件_python并发编程--进程线程--其他模块-从菜鸟到老鸟(三)...的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 微信零钱提现要手续费吗 免费额度需要合理
- 下一篇: python中哈希是什么意思_在pyth