python多进程详解
目錄
- python多進程
- 序.multiprocessing
- 一、Process
- process介紹
- 例1.1:創建函數并將其作為單個進程
- 例1.2:創建函數并將其作為多個進程
- 例1.3:將進程定義為類
- 例1.4:daemon程序對比結果
- 二、Lock
- 三、Semaphore
- 四、Event
- 五、Queue
- 六、Pipe
- 七、Pool
- 例7.1:使用進程池(非阻塞)
- 例7.2:使用進程池(阻塞)
- 例7.3:使用進程池,并關注結果
- 例7.4:使用多個進程池
python多進程
序.multiprocessing
python中的多線程其實并不是真正的多線程,如果想要充分地使用多核CPU的資源,在python中大部分情況需要使用多進程。Python提供了非常好用的多進程包multiprocessing,只需要定義一個函數,Python會完成其他所有事情。借助這個包,可以輕松完成從單進程到并發執行的轉換。multiprocessing支持子進程、通信和共享數據、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。
一、Process
process介紹
創建進程的類:Process([group [, target [, name [, args [, kwargs]]]]]),target表示調用對象,args表示調用對象的位置參數元組。kwargs表示調用對象的字典。name為別名。group實質上不使用。
方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,Process以start()啟動某個進程。
屬性:authkey、daemon(要通過start()設置)、exitcode(進程在運行時為None、如果為–N,表示被信號N結束)、name、pid。其中daemon是父進程終止后自動終止,且自己不能產生新進程,必須在start()之前設置。
例1.1:創建函數并將其作為單個進程
import multiprocessing
import timedef worker(interval):n = 5while n > 0:print("The time is {0}".format(time.ctime()))time.sleep(interval)n -= 1if __name__ == "__main__":p = multiprocessing.Process(target = worker, args = (3,))p.start()print("p.pid:", p.pid)print("p.name:", p.name)print("p.is_alive:", p.is_alive())------------------------------------------------>>> p.pid: 1004
>>> p.name: Process-1
>>> p.is_alive: True
>>> The time is Mon Jul 29 21:31:11 2019
>>> The time is Mon Jul 29 21:31:14 2019
>>> The time is Mon Jul 29 21:31:17 2019
>>> The time is Mon Jul 29 21:31:20 2019
>>> The time is Mon Jul 29 21:31:23 2019 例1.2:創建函數并將其作為多個進程
import multiprocessing
import timedef worker_1(interval):print("worker_1")time.sleep(interval)print("end worker_1")def worker_2(interval):print("worker_2")time.sleep(interval)print("end worker_2")def worker_3(interval):print("worker_3")time.sleep(interval)print("end worker_3")if __name__ == "__main__":p1 = multiprocessing.Process(target = worker_1, args = (2,))p2 = multiprocessing.Process(target = worker_2, args = (3,))p3 = multiprocessing.Process(target = worker_3, args = (4,))p1.start()p2.start()p3.start()print("The number of CPU is:" + str(multiprocessing.cpu_count()))for p in multiprocessing.active_children():print("child p.name:" + p.name + "\tp.id" + str(p.pid))print("END")------------------------------------------------>>> The number of CPU is:8
>>> child p.name:Process-3 p.id18208
>>> child p.name:Process-2 p.id1404
>>> child p.name:Process-1 p.id11684
>>> END
>>> worker_1
>>> worker_2
>>> worker_3
>>> end worker_1
>>> end worker_2
>>> end worker_3 例1.3:將進程定義為類
import multiprocessing
import timeclass ClockProcess(multiprocessing.Process):def __init__(self, interval):multiprocessing.Process.__init__(self)self.interval = intervaldef run(self):n = 5while n > 0:print("the time is {0}".format(time.ctime()))time.sleep(self.interval)n -= 1if __name__ == '__main__':p = ClockProcess(3)p.start() ------------------------------------------------>>> the time is Mon Jul 29 21:43:07 2019
>>> the time is Mon Jul 29 21:43:10 2019
>>> the time is Mon Jul 29 21:43:13 2019
>>> the time is Mon Jul 29 21:43:16 2019
>>> the time is Mon Jul 29 21:43:19 2019 注:進程p調用start()時,自動調用run()
例1.4:daemon程序對比結果
1.4-1 不加daemon屬性
import multiprocessing
import timedef worker(interval):print("work start:{0}".format(time.ctime()));time.sleep(interval)print("work end:{0}".format(time.ctime()));if __name__ == "__main__":p = multiprocessing.Process(target = worker, args = (3,))p.start()print("end!")------------------------------------------------>>> end!
>>> work start:Tue Jul 29 21:29:10 2019
>>> work end:Tue Jul 29 21:29:13 2019 1.4-2 加上daemon屬性
import multiprocessing
import timedef worker(interval):print("work start:{0}".format(time.ctime()));time.sleep(interval)print("work end:{0}".format(time.ctime()));if __name__ == "__main__":p = multiprocessing.Process(target = worker, args = (3,))p.daemon = Truep.start()print("end!")------------------------------------------------>>> end! 注:因子進程設置了daemon屬性,主進程結束,它們就隨著結束了。
1.4-3 設置daemon執行完結束的方法
import multiprocessing
import timedef worker(interval):print("work start:{0}".format(time.ctime()));time.sleep(interval)print("work end:{0}".format(time.ctime()));if __name__ == "__main__":p = multiprocessing.Process(target = worker, args = (3,))p.daemon = Truep.start()p.join()print("end!")------------------------------------------------>>> work start:Tue Jul 29 22:16:32 2019
>>> work end:Tue Jul 29 22:16:35 2019
>>> end! 二、Lock
當多個進程需要訪問共享資源的時候,Lock可以用來避免訪問的沖突。
import multiprocessing
import sysdef worker_with(lock, f):with lock:fs = open(f, 'a+')n = 10while n > 1:fs.write("Lockd acquired via with\n")n -= 1fs.close()def worker_no_with(lock, f):lock.acquire()try:fs = open(f, 'a+')n = 10while n > 1:fs.write("Lock acquired directly\n")n -= 1fs.close()finally:lock.release()if __name__ == "__main__":lock = multiprocessing.Lock()f = "file.txt"w = multiprocessing.Process(target = worker_with, args=(lock, f))nw = multiprocessing.Process(target = worker_no_with, args=(lock, f))w.start()nw.start()print("end")------------------------------------------------>>> Lockd acquired via with
>>> Lockd acquired via with
>>> Lockd acquired via with
>>> Lockd acquired via with
>>> Lockd acquired via with
>>> Lockd acquired via with
>>> Lockd acquired via with
>>> Lockd acquired via with
>>> Lockd acquired via with
>>> Lock acquired directly
>>> Lock acquired directly
>>> Lock acquired directly
>>> Lock acquired directly
>>> Lock acquired directly
>>> Lock acquired directly
>>> Lock acquired directly
>>> Lock acquired directly
>>> Lock acquired directly 三、Semaphore
Semaphore用來控制對共享資源的訪問數量,例如池的最大連接數。
import multiprocessing
import timedef worker(s, i):s.acquire()print(multiprocessing.current_process().name + "acquire");time.sleep(i)print(multiprocessing.current_process().name + "release\n");s.release()if __name__ == "__main__":s = multiprocessing.Semaphore(2)for i in range(5):p = multiprocessing.Process(target = worker, args=(s, i*2))p.start()------------------------------------------------>>> Process-1acquire
>>> Process-1release
>>>
>>> Process-2acquire
>>> Process-3acquire
>>> Process-2release
>>>
>>> Process-5acquire
>>> Process-3release
>>>
>>> Process-4acquire
>>> Process-5release
>>>
>>> Process-4release 四、Event
Event用來實現進程間同步通信。
import multiprocessing
import timedef wait_for_event(e):print("wait_for_event: starting")e.wait()print("wairt_for_event: e.is_set()->" + str(e.is_set()))def wait_for_event_timeout(e, t):print("wait_for_event_timeout:starting")e.wait(t)print("wait_for_event_timeout:e.is_set->" + str(e.is_set()))if __name__ == "__main__":e = multiprocessing.Event()w1 = multiprocessing.Process(name = "block",target = wait_for_event,args = (e,))w2 = multiprocessing.Process(name = "non-block",target = wait_for_event_timeout,args = (e, 2))w1.start()w2.start()time.sleep(3)e.set()print("main: event is set")------------------------------------------------>>> wait_for_event: starting
>>> wait_for_event_timeout:starting
>>> wait_for_event_timeout:e.is_set->False
>>> main: event is set
>>> wairt_for_event: e.is_set()->True 五、Queue
Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。如果blocked為True(默認值),并且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩余的空間。如果超時,會拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會立即拋出Queue.Full異常。
get方法可以從隊列讀取并且刪除一個元素。同樣,get方法有兩個可選參數:blocked和timeout。如果blocked為True(默認值),并且timeout為正值,那么在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果隊列為空,則立即拋出Queue.Empty異常。Queue的一段示例代碼:
import multiprocessingdef writer_proc(q): try: q.put(1, block = False) except: pass def reader_proc(q): try: print(q.get(block = False))except: passif __name__ == "__main__":q = multiprocessing.Queue()writer = multiprocessing.Process(target=writer_proc, args=(q,)) writer.start() reader = multiprocessing.Process(target=reader_proc, args=(q,)) reader.start() reader.join() writer.join()------------------------------------------------>>> 1 六、Pipe
Pipe方法返回(conn1, conn2)代表一個管道的兩個端。Pipe方法有duplex參數,如果duplex參數為True(默認值),那么這個管道是全雙工模式,也就是說conn1和conn2均可收發。duplex為False,conn1只負責接受消息,conn2只負責發送消息。
send和recv方法分別是發送和接受消息的方法。例如,在全雙工模式下,可以調用conn1.send發送消息,conn1.recv接收消息。如果沒有消息可接收,recv方法會一直阻塞。如果管道已經被關閉,那么recv方法會拋出EOFError。
import multiprocessing
import timedef proc1(pipe):while True:for i in range(10000):print("send: %s" %(i))pipe.send(i)time.sleep(1)def proc2(pipe):while True:print("proc2 rev:", pipe.recv())time.sleep(1)def proc3(pipe):while True:print("PROC3 rev:", pipe.recv())time.sleep(1)if __name__ == "__main__":pipe = multiprocessing.Pipe()p1 = multiprocessing.Process(target=proc1, args=(pipe[0],))p2 = multiprocessing.Process(target=proc2, args=(pipe[1],))# p3 = multiprocessing.Process(target=proc3, args=(pipe[1],))p1.start()p2.start()# p3.start()p1.join()p2.join()# p3.join()------------------------------------------------>>> send: 0
>>> roc2 rev: 0
>>> send: 1
>>> proc2 rev: 1
>>> send: 2
>>> proc2 rev: 2
>>> send: 3
>>> proc2 rev: 3
>>> send: 4
>>> proc2 rev: 4
>>> send: 5
>>> proc2 rev: 5
>>> send: 6
>>> proc2 rev: 6
>>> send: 7
>>> proc2 rev: 7
>>> send: 8
>>> proc2 rev: 8...... 七、Pool
在利用Python進行系統管理的時候,特別是同時操作多個文件目錄,或者遠程控制多臺主機,并行操作可以節約大量的時間。當被操作對象數目不大時,可以直接利用multiprocessing中的Process動態成生多個進程,十幾個還好,但如果是上百個,上千個目標,手動的去限制進程數量卻又太過繁瑣,此時可以發揮進程池的功效。
Pool可以提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,如果池還沒有滿,那么就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到規定最大值,那么該請求就會等待,直到池中有進程結束,才會創建新的進程來它。
例7.1:使用進程池(非阻塞)
import multiprocessing
import timedef func(msg):print("msg:", msg)time.sleep(3)print("end")if __name__ == "__main__":pool = multiprocessing.Pool(processes = 3)for i in range(4):msg = "hello %d" %(i)pool.apply_async(func, (msg, )) #維持執行的進程總數為processes,當一個進程執行完畢后會添加新的進程進去print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")pool.close()pool.join() #調用join之前,先調用close函數,否則會出錯。執行完close后不會有新的進程加入到pool,join函數等待所有子進程結束print("Sub-process(es) done.")------------------------------------------------>>> Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
>>> msg: hello 0
>>> msg: hello 1
>>> msg: hello 2
>>> end
>>> msg: hello 3
>>> end
>>> end
>>> end
>>> Sub-process(es) done. 函數解釋:
- apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的(理解區別,看例1例2結果區別)
- close() 關閉pool,使其不在接受新的任務。
- terminate() 結束工作進程,不在處理未完成的任務。
- join() 主進程阻塞,等待子進程的退出, join方法要在close或terminate之后使用。
執行說明:創建一個進程池pool,并設定進程的數量為3,xrange(4)會相繼產生四個對象[0, 1, 2, 4],四個對象被提交到pool中,因pool指定進程數為3,所以0、1、2會直接送到進程中執行,當其中一個執行完事后才空出一個進程處理對象3,所以會出現輸出“msg: hello 3”出現在"end"后。因為為非阻塞,主函數會自己執行自個的,不搭理進程的執行,所以運行完for循環后直接輸出“mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~”,主程序在pool.join()處等待各個進程的結束。
例7.2:使用進程池(阻塞)
import multiprocessing
import timedef func(msg):print("msg:", msg)time.sleep(3)print("end")if __name__ == "__main__":pool = multiprocessing.Pool(processes = 3)for i in range(4):msg = "hello %d" %(i)pool.apply(func, (msg, )) #維持執行的進程總數為processes,當一個進程執行完畢后會添加新的進程進去print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")pool.close()pool.join() #調用join之前,先調用close函數,否則會出錯。執行完close后不會有新的進程加入到pool,join函數等待所有子進程結束print("Sub-process(es) done.")------------------------------------------------>>> msg: hello 0
>>> end
>>> msg: hello 1
>>> end
>>> msg: hello 2
>>> end
>>> msg: hello 3
>>> end
>>> Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
>>> Sub-process(es) done. 例7.3:使用進程池,并關注結果
import multiprocessing
import timedef func(msg):print("msg:", msg)time.sleep(3)print("end")return "done" + msgif __name__ == "__main__":pool = multiprocessing.Pool(processes=4)result = []for i in range(3):msg = "hello %d" %(i)result.append(pool.apply_async(func, (msg, )))pool.close()pool.join()for res in result:print(":::", res.get())print("Sub-process(es) done.")------------------------------------------------>>> msg: hello 0
>>> msg: hello 1
>>> msg: hello 2
>>> end
>>> end
>>> end
>>> ::: donehello 0
>>> ::: donehello 1
>>> ::: donehello 2
>>> Sub-process(es) done. 例7.4:使用多個進程池
import multiprocessing
import os, time, randomdef Lee():print("\nRun task Lee-%s" % (os.getpid())) # os.getpid()獲取當前的進程的IDstart = time.time()time.sleep(random.random() * 10) # random.random()隨機生成0-1之間的小數end = time.time()print('Task Lee, runs %0.2f seconds.' % (end - start))def Marlon():print("\nRun task Marlon-%s" % (os.getpid()))start = time.time()time.sleep(random.random() * 40)end = time.time()print('Task Marlon runs %0.2f seconds.' % (end - start))def Allen():print("\nRun task Allen-%s" % (os.getpid()))start = time.time()time.sleep(random.random() * 30)end = time.time()print('Task Allen runs %0.2f seconds.' % (end - start))def Frank():print("\nRun task Frank-%s" % (os.getpid()))start = time.time()time.sleep(random.random() * 20)end = time.time()print('Task Frank runs %0.2f seconds.' % (end - start))if __name__ == '__main__':function_list = [Lee, Marlon, Allen, Frank]print("parent process %s" % (os.getpid()))pool = multiprocessing.Pool(4)for func in function_list:pool.apply_async(func) # Pool執行函數,apply執行函數,當有一個進程執行完畢后,會添加一個新的進程到pool中print('Waiting for all subprocesses done...')pool.close()pool.join() # 調用join之前,一定要先調用close() 函數,否則會出錯, close()執行后不會有新的進程加入到pool,join函數等待素有子進程結束print('All subprocesses done.')------------------------------------------------>>> parent process 9828
>>> Waiting for all subprocesses done...
>>>
>>> Run task Lee-12948
>>>
>>> Run task Marlon-8948
>>>
>>> Run task Allen-18124
>>>
>>> Run task Frank-17404
>>> Task Frank runs 3.42 seconds.
>>> Task Lee, runs 6.69 seconds.
>>> Task Allen runs 8.38 seconds.
>>> Task Marlon runs 13.37 seconds.
>>> All subprocesses done.
轉載于:https://www.cnblogs.com/luyuze95/p/11266951.html
總結
以上是生活随笔為你收集整理的python多进程详解的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: TS - 处理故障的一些通用方法
- 下一篇: Python - 在CentOS7.5系