日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

进程池Pool

發布時間:2023/12/20 编程问答 22 豆豆
生活随笔 收集整理的這篇文章主要介紹了 进程池Pool 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Pool

在利用Python進行系統管理的時候,特別是同時操作多個文件目錄,或者遠程控制多臺主機,并行操作可以節約大量的時間。當被操作對象數目不大時,可以直接利用multiprocessing中的Process動態成生多個進程,十幾個還好,但如果是上百個,上千個目標,手動的去限制進程數量卻又太過繁瑣,此時可以發揮進程池的功效。
Pool可以提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,如果池還沒有滿,那么就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到規定最大值,那么該請求就會等待,直到池中有進程結束,才會創建新的進程來它。

?

例7.1:使用進程池(非阻塞)

#coding: utf-8 import multiprocessing import timedef func(msg):print "msg:", msgtime.sleep(3)print "end"if __name__ == "__main__":pool = multiprocessing.Pool(processes = 3)for i in xrange(4):msg = "hello %d" %(i)pool.apply_async(func, (msg, )) #維持執行的進程總數為processes,當一個進程執行完畢后會添加新的進程進去print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"pool.close()pool.join() #調用join之前,先調用close函數,否則會出錯。執行完close后不會有新的進程加入到pool,join函數等待所有子進程結束print "Sub-process(es) done."

一次執行結果

1

2

3

4

5

6

7

8

9

10

mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~ello?0

?

msg: hello?1

msg: hello?2

end

msg: hello?3

end

end

end

Sub-process(es) done.

函數解釋:

apply_async(func[, args[, kwds[, callback]]])?
它是非阻塞,
apply(func[, args[, kwds]])是阻塞的
close()? ? 關閉pool,使其不在接受新的任務。
terminate()? ? 結束工作進程,不在處理未完成的任務。
join()? ? 主進程阻塞,等待子進程的退出,?
join方法要在close或terminate之后使用。
執行說明:創建一個進程池pool,并設定進程的數量為3,xrange(4)會相繼產生四個對象[0, 1, 2, 4],四個對象被提交到pool中,因pool指定進程數為3,所以0、1、2會直接送到進程中執行,當其中一個執行完事后才空出一個進程處理對象3,所以會出現輸出“msg: hello 3”出現在"end"后。因為為非阻塞,主函數會自己執行自個的,不搭理進程的執行,所以運行完for循環后直接輸出“mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~”,主程序在pool.join()處等待各個進程的結束。



執行說明:創建一個進程池pool,并設定進程的數量為3,xrange(4)會相繼產生四個對象[0, 1, 2, 4],
四個對象被提交到pool中,因pool指定進程數為3,
所以0、1、2會直接送到進程中執行,當其中一個執行完事后才空出一個進程處理對象3.


所以會出現輸出“msg: hello 3”出現在"end"后。
因為為非阻塞,主函數會自己執行自個的,
不搭理進程的執行,所以運行完for循環后直接輸出“mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~”,(也就是說進程都沒執行完,主進程就開始做自己的事情了)
主程序在pool.join()處等待各個進程的結束。

?

例7.2:使用進程池(阻塞)

#coding: utf-8 import multiprocessing import timedef func(msg):print "msg:", msgtime.sleep(3)print "end"if __name__ == "__main__":pool = multiprocessing.Pool(processes = 3)for i in xrange(4):msg = "hello %d" %(i)pool.apply(func, (msg, )) #維持執行的進程總數為processes,當一個進程執行完畢后會添加新的進程進去print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"pool.close()pool.join() #調用join之前,先調用close函數,否則會出錯。執行完close后不會有新的進程加入到pool,join函數等待所有子進程結束print "Sub-process(es) done."

一次執行的結果

1

2

3

4

5

6

7

8

9

10

msg: hello?0

end

msg: hello?1

end

msg: hello?2

end

msg: hello?3

end

Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~

Sub-process(es) done.

  

例7.3:使用進程池,并關注結果

import multiprocessing import timedef func(msg):print "msg:", msgtime.sleep(3)print "end"return "done" + msgif __name__ == "__main__":pool = multiprocessing.Pool(processes=4)result = []for i in xrange(3):msg = "hello %d" %(i)result.append(pool.apply_async(func, (msg, )))pool.close()pool.join()for res in result:print ":::", res.get()print "Sub-process(es) done."

一次執行結果

1

2

3

4

5

6

7

8

9

10

msg: hello?0

msg: hello?1

msg: hello?2

end

end

end

::: donehello?0

::: donehello?1

::: donehello?2

Sub-process(es) done.

?

例7.4:使用多個進程池

#coding: utf-8 import multiprocessing import os, time, randomdef Lee():print "\nRun task Lee-%s" %(os.getpid()) #os.getpid()獲取當前的進程的IDstart = time.time()time.sleep(random.random() * 10) #random.random()隨機生成0-1之間的小數end = time.time()print 'Task Lee, runs %0.2f seconds.' %(end - start)def Marlon():print "\nRun task Marlon-%s" %(os.getpid())start = time.time()time.sleep(random.random() * 40)end=time.time()print 'Task Marlon runs %0.2f seconds.' %(end - start)def Allen():print "\nRun task Allen-%s" %(os.getpid())start = time.time()time.sleep(random.random() * 30)end = time.time()print 'Task Allen runs %0.2f seconds.' %(end - start)def Frank():print "\nRun task Frank-%s" %(os.getpid())start = time.time()time.sleep(random.random() * 20)end = time.time()print 'Task Frank runs %0.2f seconds.' %(end - start)if __name__=='__main__':function_list= [Lee, Marlon, Allen, Frank] print "parent process %s" %(os.getpid())pool=multiprocessing.Pool(4)for func in function_list:pool.apply_async(func) #Pool執行函數,apply執行函數,當有一個進程執行完畢后,會添加一個新的進程到pool中print 'Waiting for all subprocesses done...'pool.close()pool.join() #調用join之前,一定要先調用close() 函數,否則會出錯, close()執行后不會有新的進程加入到pool,join函數等待素有子進程結束print 'All subprocesses done.'

一次執行結果

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

parent process?7704

?

Waiting for?all?subprocesses done...

Run task Lee-6948

?

Run task Marlon-2896

?

Run task Allen-7304

?

Run task Frank-3052

Task Lee, runs?1.59?seconds.

Task Marlon runs?8.48?seconds.

Task Frank runs?15.68?seconds.

Task Allen runs?18.08?seconds.

All subprocesses done.

?

?

?

?python自2.6開始提供了多進程模塊multiprocessing,這里主要是介紹multiprocessing下的Pool的幾個函數
一 apply(func[, args[, kwds]])
apply用于傳遞不定參數,同python中的apply函數一致(不過內置的apply函數從2.3以后就不建議使用了),主進程會阻塞于函數。
for x in gen_list(l):
result = pool.apply(pool_test, (x,))
print 'main process'
這個時候主進程的執行流程同單進程一致
二 apply_async(func[, args[, kwds[, callback]]])
與apply用法一致,但它是非阻塞的且支持結果返回后進行回調。
for x in gen_list(l):
result = pool.apply_async(pool_test, (x,))
print 'main process'
這個時候主進程循環運行過程中不等待apply_async的返回結果,在主進程結束后,即使子進程還未返回整個程序也會就退出。雖然 apply_async是非阻塞的,但其返回結果的get方法卻是阻塞的,在本例中result.get()會阻塞主進程。因此可以這樣來處理返回結果:
[x.get() for x in [pool.apply_async(pool_test, (x,)) for x in gen_list(l)]]
如果我們對返回結果不感興趣, 那么可以在主進程中使用pool.close與pool.join來防止主進程退出。注意join方法一定要在close或terminate之后調用。
for x in gen_list(l):
pool.apply_async(pool_test, (x, ))
print 'main_process'
pool.close()
pool.join()
三 map(func, iterable[, chunksize])
map方法與內置的map函數行為基本一致,在它會使進程阻塞與此直到結果返回。
但需注意的是其第二個參數雖然描述的為iterable, 但在實際使用中發現只有在整個隊列全部就緒后,程序才會運行子進程。
四 map_async(func, iterable[, chunksize[, callback]])
與map用法一致,但是它是非阻塞的。其有關事項見apply_async。
五 imap(func, iterable[, chunksize])
與map不同的是, imap的返回結果為iter,需要在主進程中主動使用next來驅動子進程的調用。即使子進程沒有返回結果,主進程對于gen_list(l)的 iter還是會繼續進行, 另外根據python2.6文檔的描述,對于大數據量的iterable而言,將chunksize設置大一些比默認的1要好。
for x in pool.imap(pool_test, gen_list(l)):
pass
六 imap_unordered(func, iterable[, chunksize])
同imap一致,只不過其并不保證返回結果與迭代傳入的順序一致。
七 close()
關閉pool,使其不在接受新的任務。
八 terminate()
結束工作進程,不在處理未處理的任務。
九 join()
主進程阻塞等待子進程的退出, join方法要在close或terminate之后使用。
l = range(10)
def gen_list(l):
for x in l:
print 'yield', x
yield x
def pool_test(x):
print 'f2', x
time.sleep(1)

?

總結

以上是生活随笔為你收集整理的进程池Pool的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。