python进程池pool_python多任务--进程池Pool
進程池Pool
在利用Python進行系統管理的時候,特別是同時操作多個文件目錄,或者遠程控制多臺主機,并行操作可以節約大量的時間。當被操作對象數目不大時,可以直接利用multiprocessing中的Process動態成生多個進程,但如果是上百個,上千個目標,手動的去限制進程數量卻又太過繁瑣,此時就可以用到multiprocessing模塊提供的Pool方法。
初始化Pool時,可以指定一個最大進程數,當有新的請求提交到pool中時,如果池還沒有滿,那么就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到規定最大值,那么該請求就會等待,直到池中有進程結束,才會創建新的進程來它。
一:使用進程池
例1:非阻塞
from multiprocessing import Pool
import os, time, random
def worker(name):
t_start = time.time()
print("%s開始執行,進程號為%d" % (name, os.getpid()))
# random.random()隨機生成0~1之間的浮點數
time.sleep(random.random() * 2)
t_stop = time.time()
print(name, "執行完畢,耗時%0.2f" % (t_stop - t_start))
def main():
po = Pool(5) # 定義一個進程池,最大進程數5
# 往進程池中添加任務
for i in range(10):
# Pool.apply_async(要調用的目標,(傳遞給目標的參數元祖,))
# 每次循環將會用空閑出來的子進程去調用目標
po.apply_async(worker, (f'liang{i}',))
print("----start----")
po.close() # 關閉進程池,關閉后po不再接收新的請求
po.join() # 等待po中所有子進程執行完成,必須放在close語句之后
print("----all_done----")
if __name__ == '__main__':
main()
執行結果
----start----
liang0開始執行,進程號為10404
liang1開始執行,進程號為9920
liang2開始執行,進程號為13136
liang3開始執行,進程號為10180
liang4開始執行,進程號為7708
liang4 執行完畢,耗時0.57
liang5開始執行,進程號為7708
liang2 執行完畢,耗時1.20
liang6開始執行,進程號為13136
liang1 執行完畢,耗時1.33
liang7開始執行,進程號為9920
liang0 執行完畢,耗時1.34
liang8開始執行,進程號為10404
liang3 執行完畢,耗時1.96
liang9開始執行,進程號為10180
liang5 執行完畢,耗時1.73
liang9 執行完畢,耗時0.54
liang8 執行完畢,耗時1.28
liang7 執行完畢,耗時1.37
liang6 執行完畢,耗時1.88
----all_done----
函數解釋:
apply_async(func[, args[, kwds]]) :使用非阻塞方式調用func(并行執行,堵塞方式必須等待上一個進程退出才能執行下一個進程),args為傳遞給func的參數列表, kwds為傳遞給func的關鍵字參數列表;
apply(func[, args[, kwds]]):使用阻塞方式調用func
close():關閉Pool,使其不再接受新的任務;
terminate():不管任務是否完成,立即終止;
join():主進程阻塞,等待子進程的退出, 必須在close或terminate之后使用;
執行說明:創建一個進程池pool,并設定進程的數量為5,range(10)會相繼產生10個對象,10個對象被提交到pool中,因pool指定進程數為5,所以0、1、2、3、4會直接送到進程中執行,當其中一個執行完后才空出一個進程處理對象,繼續去執行新的對象,所以會出現輸出“liang5開始執行,進程號為7708”出現在"liang4 執行完畢,耗時0.57"后。因為為非阻塞,主函數會自己執行自個的,不搭理進程的執行,所以運行完for循環后直接輸出“----start----”,主程序在pool.join()處等待各個進程的結束。
例2:阻塞
from multiprocessing import Pool
import os, time, random
def worker(name):
t_start = time.time()
print("%s開始執行,進程號為%d" % (name, os.getpid()))
# random.random()隨機生成0~1之間的浮點數
time.sleep(random.random() * 2)
t_stop = time.time()
print(name, "執行完畢,耗時%0.2f" % (t_stop - t_start))
def main():
po = Pool(3) # 定義一個進程池,最大進程數3
# 往進程池中添加任務
for i in range(0, 5):
# Pool.apply_async(要調用的目標,(傳遞給目標的參數元祖,))
# 每次循環將會用空閑出來的子進程去調用目標
po.apply(worker, (f'liang{i}',))
print("----start----")
po.close() # 關閉進程池,關閉后po不再接收新的請求
po.join() # 等待po中所有子進程執行完成,必須放在close語句之后
print("----all_done----")
if __name__ == '__main__':
main()
輸出
liang0開始執行,進程號為1976
liang0 執行完畢,耗時1.75
liang1開始執行,進程號為12624
liang1 執行完畢,耗時0.57
liang2開始執行,進程號為12444
liang2 執行完畢,耗時0.52
liang3開始執行,進程號為1976
liang3 執行完畢,耗時1.23
liang4開始執行,進程號為12624
liang4 執行完畢,耗時0.85
----start----
----all_done----
因為是阻塞,主函數會等待進程的執行,執行完之后才會繼續往下,所以運行完所有進程后才輸出“----start----”
例3、使用進程池,并返回結果
from multiprocessing import Pool
import os, time, random
def worker(name):
print("%s開始執行,進程號為%d" % (name, os.getpid()))
# random.random()隨機生成0~1之間的浮點數
time.sleep(random.random() * 2)
return name,os.getpid()
def main():
po = Pool(3) # 定義一個進程池,最大進程數3
res=[]
# 往進程池中添加任務
for i in range(0, 5):
# Pool.apply_async(要調用的目標,(傳遞給目標的參數元祖,))
# 每次循環將會用空閑出來的子進程去調用目標
res.append(po.apply_async(worker, (f'liang{i}',)))
print("----start----")
po.close() # 關閉進程池,關閉后po不再接收新的請求
po.join() # 等待po中所有子進程執行完成,必須放在close語句之后
for result in res:
print(result.get()) #get()函數得出每個返回結果的值
print("----all_done----")
if __name__ == '__main__':
main()
輸出結果:
----start----
liang0開始執行,進程號為14012
liang1開始執行,進程號為13000
liang2開始執行,進程號為14120
liang3開始執行,進程號為14012
liang4開始執行,進程號為14012
('liang0', 14012)
('liang1', 13000)
('liang2', 14120)
('liang3', 14012)
('liang4', 14012)
----all_done----
例4、多進程執行多個任務
from multiprocessing import Pool
import os, time, random
def worker1(name):
print("%s開始執行work1,進程號為%d" % (name, os.getpid()))
# random.random()隨機生成0~1之間的浮點數
time.sleep(random.random() * 2)
def worker2(name):
print("%s開始執行work2,進程號為%d" % (name, os.getpid()))
# random.random()隨機生成0~1之間的浮點數
time.sleep(random.random() * 2)
def worker3(name):
print("%s開始執行work3,進程號為%d" % (name, os.getpid()))
# random.random()隨機生成0~1之間的浮點數
time.sleep(random.random() * 2)
def main():
po = Pool(4) # 定義一個進程池,最大進程數3
work_list=[worker1,worker2,worker3]
# 往進程池中添加任務
for work in work_list:
for i in range(3):
po.apply_async(work, (f'liang{i}',))
print("----start----")
po.close() # 關閉進程池,關閉后po不再接收新的請求
po.join() # 等待po中所有子進程執行完成,必須放在close語句之后
print("----all_done----")
if __name__ == '__main__':
main()
線程池4個線程執行3個任務,每個任務執行3次。
輸出:
----start----
liang0開始執行work1,進程號為13088
liang1開始執行work1,進程號為4908
liang2開始執行work1,進程號為4200
liang0開始執行work2,進程號為8124
liang1開始執行work2,進程號為4908
liang2開始執行work2,進程號為13088
liang0開始執行work3,進程號為8124
liang1開始執行work3,進程號為4200
liang2開始執行work3,進程號為4908
----all_done----
二、進程池進程之間的通訊
進程池中進程的通訊隊列
from multiprocessing import Pool, Manager
q = Manager().Queue()
import os
import time
from multiprocessing import Pool, Manager
def work(name, q):
time.sleep(1)
print(f"{name}:---{os.getpid()}---{q.get()}")
def main():
# 創建一個用于進程池通信的隊列
q = Manager().Queue()
for i in range(1000):
q.put(f'data-{i}')
# 創建一個擁有五個進程的進程池
po = Pool(5)
# 往進程池中添加20個任務
for i in range(20):
po.apply_async(work, (f'liang{i}', q))
# close:關閉進程池(進程池停止接收任務)
po.close()
# 主進程等待進程池中的任務結束再往下執行
po.join()
if __name__ == '__main__':
main()
總結
以上是生活随笔為你收集整理的python进程池pool_python多任务--进程池Pool的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 8 9区别 endnote7_大家都了解
- 下一篇: websocket python爬虫_p