【Python】浅谈 multiprocessing
生活随笔
收集整理的這篇文章主要介紹了
【Python】浅谈 multiprocessing
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
一前言?
? ?使用python進行并發處理多臺機器/多個實例的時候,我們可以使用threading ,但是由于著名的GIL存在,實際上threading 并未提供真正有效的并發處理,要充分利用到多核CPU,我們需要使用多進程。Python提供了非常好用的多進程包--multiprocessing。multiprocessing 可以利用multiprocessing.Process對象來創建一個進程,該Process對象與Threading對象的用法基本相同,具有相同的方法(官方原話:"The multiprocessing package mostly replicates the API of the threading module.") 比如:start(),run(),join()的方法。multiprocessing包中也有Lock/Event/Semaphore/Condition/Pipe/Queue類用于進程之間的通信。話不多說 show me the code!
二使用
2.1 初識異同
下面的程序顯示threading和multiprocessing的在使用方面的異同,相近的函數join(),start(),append() 等,并做同一件事情打印自己的進程pid
#!/usr/bin/env python
# encoding: utf-8
import os
import threading
import multiprocessing
def printer(msg):
????print(msg, os.getpid())
print('Main begin:', os.getpid())
# threading
record = []
for i in range(5):
????thread = threading.Thread(target=printer, args=('threading',))
????thread.start()
????record.append(thread)
for thread in record:
????thread.join()
# multi-process
record = []
for i in range(5):
????process = multiprocessing.Process(target=printer, args=('multiprocessing',))
????process.start()
????record.append(process)
for process in record:
????process.join()
print('Main end:', os.getpid()) 輸出結果
Main begin: 9524
threading 9524
threading 9524
threading 9524
threading 9524
threading 9524
multiprocessing 9539
multiprocessing 9540
multiprocessing 9541
multiprocessing 9542
multiprocessing 9543
Main end: 9524 從例子的結果可以看出多線程threading的進程id和主進程(父進程)pid一樣 ,同為9524; 多進程打印的pid每個都不一樣,for循環中每創建一個process對象都年開一個進程。其他相關的方法基本類似。
2.2 用法
創建進程的類:
Process([group [, target [, name [, args [, kwargs]]]]]),
target表示調用對象,
args表示調用對象的位置參數元組。
kwargs表示調用對象的字典。
name為進程的別名。
group實質上不使用,為None。
方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,Process以start()啟動某個進程,并自動調用run方法.
屬性:authkey、daemon(要通過start()設置,必須設置在方法start之前)、exitcode(進程在運行時為None、如果為–N,表示被信號N結束)、name、pid。其中daemon是父進程終止后自動終止,且自己不能產生新進程,必須在start()之前設置。
2.3 創建單進程
單線程比較簡單,創建一個 Process的實例對象就好,傳入參數 target 為已經定義好的方法worker以及worker需要的參數
#!/usr/bin/env python
# encoding: utf-8
"""
author: yangyi@youzan.com
time: 2017/7/2 下午6:45
func:
"""
import multiprocessing
import datetime, time
def worker(interval):
????print("process start: {0}".format(datetime.datetime.today()));
????time.sleep(interval)
????print("process end: {0}".format(datetime.datetime.today()));
if __name__ == "__main__":
????p = multiprocessing.Process(target=worker, args=(5,))
????p.start()
????p.join()
????print "end!" 2.4 創建多進程
#!/usr/bin/env python
# encoding: utf-8
"""
author: yangyi@youzan.com
time: 2017/7/2 下午7:50
func:
"""
import multiprocessing
def worker(num):
????print "worker %d" %num
if __name__ == "__main__":
????print("The number of CPU is:" + str(multiprocessing.cpu_count()))
????proc = []
????for i in xrange(5):
????????p = multiprocessing.Process(target=worker, args=(i,))
????????proc.append(p)
????for p in proc:
????????p.start()
????for p in proc:
????????p.join()
????print "end ..." 輸出
The number of CPU is:4
worker 0
worker 1
worker 2
worker 3
worker 4
main process end ... 2.5 線程池
multiprocessing提供進程池的類--Pool,它可以指定程序最大可以調用的進程數量,當有新的請求提交到pool中時,如果進程池還沒有滿,那么就會創建一個新的進程用來執行該請求;但如果進程池中的進程數已經達到規定最大值,那么該請求就會等待,直到池中有進程結束,才會創建新的進程來它。
構造方法:
Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
processes ?: 使用的工作進程的數量,如果processes是None,默認使用os.cpu_count()返回的數量。
initializer: 如果initializer是None,那么每一個工作進程在開始的時候會調用initializer(*initargs)。
maxtasksperchild:工作進程退出之前可以完成的任務數,完成后用一個新的工作進程來替代原進程,來讓閑置的資源被釋放。maxtasksperchild默認是None,意味著只要Pool存在工作進程就會一直存活。
context: 用在制定工作進程啟動時的上下文,一般使用multiprocessing.Pool()或者一個context對象的Pool()方法來創建一個池,兩種方法都適當的設置了context。
實例方法:
apply(func[, args[, kwds]]):同步進程池
apply_async(func[, args[, kwds[, callback[, error_callback]]]]) :異步進程池
close() : 關閉進程池,阻止更多的任務提交到pool,待任務完成后,工作進程會退出。
terminate() : 結束工作進程,不在處理未完成的任務.
join() : 等待工作線程的退出,在調用join()前必須調用close()或者 terminate(),因為被終止的進程需要被父進程調用wait(join等價與wait),否則進程會成為僵尸進程。
#!/usr/bin/env python
# encoding: utf-8
"""
author: yangyi@youzan.com
time: 2017/7/2 下午7:50
func:
"""
from multiprocessing import Pool
import time
def worker(num):
????print "worker %d" %num
????time.sleep(2)
????print "end worker %d" %num
if __name__ == "__main__":
????proc_pool = Pool(2)
????for i in xrange(4):
????????proc_pool.apply_async(worker, (i,)) #使用了異步調用,從輸出結果可以看出來
????proc_pool.close()
????proc_pool.join()
????print "main process end ..." 輸出結果
worker 0
worker 1
end worker 0
end worker 1
worker 2
worker 3
end worker 2
end worker 3
main process end .. 解釋:創建一個進程池pool 對象proc_pool,并設定進程的數量為2,xrange(4)會相繼產生四個對象[0, 1, 2, 4],四個對象被提交到pool中,因pool指定進程數為2,所以0、1會直接送到進程中執行,當其中的2個任務執行完之后才空出2進程處理對象2和3,所以會出現輸出?worker 2 worker 3 出現在end worker 0 end worker 1之后。思考一下如果調用 ?proc_pool.apply(worker, (i,)) 的輸出結果會是什么樣的?
2.6 使用queue
multiprocessing提供隊列類,可以通過調用multiprocessing.Queue(maxsize) 初始化隊列對象,maxsize表示隊列里面最多的元素個數。
例子 創建了兩個函數入隊,出隊,出隊處理時使用了lock特性,串行化取數據。
#!/usr/bin/env python
# encoding: utf-8
"""
author: yangyi@youzan.com
time: 2017/7/2 下午9:03
func:
"""
import time
from multiprocessing import Process, current_process,Lock,Queue
import datetime
def inputQ(queue):
????time.sleep(1)
????info = "proc_name: " + current_process().name + ' was putted in queue at: ' + str(datetime.datetime.today())
????queue.put(info)
def outputQ(queue,lock):
????info = queue.get()
????lock.acquire()
????print ("proc_name: " + current_process().name + ' gets info :' + info)
????lock.release()
if __name__ == '__main__':
????record1 = [] # store input processes
????record2 = [] # store output processes
????lock = Lock() # To prevent messy print
????queue = Queue(3)
????for i in range(10):
????????process = Process(target=inputQ, args=(queue,))
????????process.start()
????????record1.append(process)
????for i in range(10):
????????process = Process(target=outputQ, args=(queue,lock))
????????process.start()
????????record2.append(process)
????for p in record1:
????????p.join()
????queue.close() # No more object will come, close the queue
????for p in record2:
????????p.join() 2.7 使用pipe?
Pipe可以是單向(half-duplex),也可以是雙向(duplex)。我們通過mutiprocessing.Pipe(duplex=False)創建單向管道 (默認為雙向)。一個進程從PIPE一端輸入對象,然后被PIPE另一端的進程接收,單向管道只允許管道一端的進程輸入,而雙向管道則允許從兩端輸入。
用法 multiprocessing.Pipe([duplex])
該類返回一組對象實例(conn1, conn2),分別代表發送和接受消息的兩端。
#!/usr/bin/env python
# encoding: utf-8
"""
author: yangyi@youzan.com
time: 2017/7/2 下午8:01
func:
"""
from multiprocessing import Process, Pipe
def p1(conn, name):
????conn.send('hello ,{name}'.format(name=name))
????print "p1 receive :", conn.recv()
????conn.close()
def p2(conn, name):
????conn.send('hello ,{name}'.format(name=name))
????print "p2 receive :", conn.recv()
????conn.close()
if __name__ == '__main__':
????parent_conn, child_conn = Pipe()
????proc1 = Process(target=p1, args=(child_conn, "parent_conn"))
????proc2 = Process(target=p2, args=(parent_conn, "child_conn"))
????proc1.start()
????proc2.start()
????proc1.join()
????proc2.join() 輸出:
p1 receive : hello ,child_conn
p2 receive : hello ,parent_conn 該例子中 p1 p2 通過pipe 給彼此相互發送信息,p1 發送"parent_conn" 給 p2 ,p2 發送"child_conn" 給p1.
2.8 daemon程序對比結果
import multiprocessing
import datetime, time
def worker(interval):
????print("process start: {0}".format(datetime.datetime.today()));
????time.sleep(interval)
????print("process end: {0}".format(datetime.datetime.today()));
if __name__ == "__main__":
????p = multiprocessing.Process(target=worker, args=(5,))
????p.start()
????print "end!" 輸出:
end! process start: 2017-07-02 18:47:30.656244 process end: 2017-07-02 18:47:35.657464
設置 daemon?=?True,程序隨著主程序結束而不等待子進程。
import multiprocessing
import datetime, time
def worker(interval):
????print("process start: {0}".format(datetime.datetime.today()));
????time.sleep(interval)
????print("process end: {0}".format(datetime.datetime.today()));
if __name__ == "__main__":
????p = multiprocessing.Process(target=worker, args=(5,))
????p.daemon = True
????p.start()
????print "end!" 輸出:
end!
因為子進程設置了daemon屬性,主進程結束,multiprocessing創建的進程對象就隨著結束了。
import multiprocessing
import datetime, time
def worker(interval):
????print("process start: {0}".format(datetime.datetime.today()));
????time.sleep(interval)
????print("process end: {0}".format(datetime.datetime.today()));
if __name__ == "__main__":
????p = multiprocessing.Process(target=worker, args=(5,))
????p.daemon = True ?#
????p.start()
????p.join() #進程執行完畢后再關閉
????print "end!" 輸出:
process start: 2017-07-02 18:48:20.953754
process end: 2017-07-02 18:48:25.954736
2.9 Lock()
當多個進程需要訪問共享資源的時候,Lock可以用來避免訪問的沖突。
實例方法:
acquire([timeout]): 使線程進入同步阻塞狀態,嘗試獲得鎖定。
release(): 釋放鎖。使用前線程必須已獲得鎖定,否則將拋出異常。
例子:
多個進程使用同一個std_out ,使用lock機制確保同一個時刻有一個一個進程獲取輸出。
#!/usr/bin/env python
# encoding: utf-8
"""
author: yangyi@youzan.com
time: 2017/7/2 下午9:28
func:?
"""
from multiprocessing import Process, Lock
def func_with_lock(l, i):
? ? l.acquire()
? ? print 'hello world', i
? ? l.release()
def func_without_lock(i):
? ? print 'hello world', i
if __name__ == '__main__':
? ? lock = Lock()
? ? print "func_with_lock :"
? ? for num in range(10):
? ? ? ? Process(target=func_with_lock, args=(lock, num)).start()
輸出:
func_with_lock :
hello world 0
hello world 1
hello world 2
hello world 3
hello world 4
hello world 5
hello world 6
hello world 7
hello world 8
hello world 9
三 小結
?本文參考官方資料以及其他資源,對multiprocesssing 的使用方式做了總結,還有很多知識需要詳細閱讀官方文檔。紙上來得終覺淺,絕知此事要躬行。參考資料
[1]官方文檔?
[2]Python標準庫10 多進程初步 (multiprocessing包)
? ?使用python進行并發處理多臺機器/多個實例的時候,我們可以使用threading ,但是由于著名的GIL存在,實際上threading 并未提供真正有效的并發處理,要充分利用到多核CPU,我們需要使用多進程。Python提供了非常好用的多進程包--multiprocessing。multiprocessing 可以利用multiprocessing.Process對象來創建一個進程,該Process對象與Threading對象的用法基本相同,具有相同的方法(官方原話:"The multiprocessing package mostly replicates the API of the threading module.") 比如:start(),run(),join()的方法。multiprocessing包中也有Lock/Event/Semaphore/Condition/Pipe/Queue類用于進程之間的通信。話不多說 show me the code!
二使用
2.1 初識異同
下面的程序顯示threading和multiprocessing的在使用方面的異同,相近的函數join(),start(),append() 等,并做同一件事情打印自己的進程pid
點擊(此處)折疊或打開
2.2 用法
創建進程的類:
Process([group [, target [, name [, args [, kwargs]]]]]),
target表示調用對象,
args表示調用對象的位置參數元組。
kwargs表示調用對象的字典。
name為進程的別名。
group實質上不使用,為None。
方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,Process以start()啟動某個進程,并自動調用run方法.
屬性:authkey、daemon(要通過start()設置,必須設置在方法start之前)、exitcode(進程在運行時為None、如果為–N,表示被信號N結束)、name、pid。其中daemon是父進程終止后自動終止,且自己不能產生新進程,必須在start()之前設置。
2.3 創建單進程
單線程比較簡單,創建一個 Process的實例對象就好,傳入參數 target 為已經定義好的方法worker以及worker需要的參數
點擊(此處)折疊或打開
multiprocessing提供進程池的類--Pool,它可以指定程序最大可以調用的進程數量,當有新的請求提交到pool中時,如果進程池還沒有滿,那么就會創建一個新的進程用來執行該請求;但如果進程池中的進程數已經達到規定最大值,那么該請求就會等待,直到池中有進程結束,才會創建新的進程來它。
構造方法:
Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
processes ?: 使用的工作進程的數量,如果processes是None,默認使用os.cpu_count()返回的數量。
initializer: 如果initializer是None,那么每一個工作進程在開始的時候會調用initializer(*initargs)。
maxtasksperchild:工作進程退出之前可以完成的任務數,完成后用一個新的工作進程來替代原進程,來讓閑置的資源被釋放。maxtasksperchild默認是None,意味著只要Pool存在工作進程就會一直存活。
context: 用在制定工作進程啟動時的上下文,一般使用multiprocessing.Pool()或者一個context對象的Pool()方法來創建一個池,兩種方法都適當的設置了context。
實例方法:
apply(func[, args[, kwds]]):同步進程池
apply_async(func[, args[, kwds[, callback[, error_callback]]]]) :異步進程池
close() : 關閉進程池,阻止更多的任務提交到pool,待任務完成后,工作進程會退出。
terminate() : 結束工作進程,不在處理未完成的任務.
join() : 等待工作線程的退出,在調用join()前必須調用close()或者 terminate(),因為被終止的進程需要被父進程調用wait(join等價與wait),否則進程會成為僵尸進程。
點擊(此處)折疊或打開
2.6 使用queue
multiprocessing提供隊列類,可以通過調用multiprocessing.Queue(maxsize) 初始化隊列對象,maxsize表示隊列里面最多的元素個數。
例子 創建了兩個函數入隊,出隊,出隊處理時使用了lock特性,串行化取數據。
Pipe可以是單向(half-duplex),也可以是雙向(duplex)。我們通過mutiprocessing.Pipe(duplex=False)創建單向管道 (默認為雙向)。一個進程從PIPE一端輸入對象,然后被PIPE另一端的進程接收,單向管道只允許管道一端的進程輸入,而雙向管道則允許從兩端輸入。
用法 multiprocessing.Pipe([duplex])
該類返回一組對象實例(conn1, conn2),分別代表發送和接受消息的兩端。
點擊(此處)折疊或打開
2.8 daemon程序對比結果
點擊(此處)折疊或打開
設置 daemon?=?True,程序隨著主程序結束而不等待子進程。
end!
因為子進程設置了daemon屬性,主進程結束,multiprocessing創建的進程對象就隨著結束了。
點擊(此處)折疊或打開
當多個進程需要訪問共享資源的時候,Lock可以用來避免訪問的沖突。
實例方法:
acquire([timeout]): 使線程進入同步阻塞狀態,嘗試獲得鎖定。
release(): 釋放鎖。使用前線程必須已獲得鎖定,否則將拋出異常。
例子:
多個進程使用同一個std_out ,使用lock機制確保同一個時刻有一個一個進程獲取輸出。
# encoding: utf-8
"""
author: yangyi@youzan.com
time: 2017/7/2 下午9:28
func:?
"""
from multiprocessing import Process, Lock
def func_with_lock(l, i):
? ? l.acquire()
? ? print 'hello world', i
? ? l.release()
def func_without_lock(i):
? ? print 'hello world', i
if __name__ == '__main__':
? ? lock = Lock()
? ? print "func_with_lock :"
? ? for num in range(10):
? ? ? ? Process(target=func_with_lock, args=(lock, num)).start()
點擊(此處)折疊或打開
三 小結
?本文參考官方資料以及其他資源,對multiprocesssing 的使用方式做了總結,還有很多知識需要詳細閱讀官方文檔。紙上來得終覺淺,絕知此事要躬行。參考資料
[1]官方文檔?
[2]Python標準庫10 多進程初步 (multiprocessing包)
總結
以上是生活随笔為你收集整理的【Python】浅谈 multiprocessing的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: eclipse更新time out的问题
- 下一篇: python3 快速排序