Python中的并行处理(Pool.map()、Pool.starmap()、Pool.apply()、)
1.并行處理
? ? ?并行處理是一種在同一臺計算機的多個處理器中同時運行任務的工作模式。這種工作模式的目的就是減少總的任務處理時間,但是進程之間的通信會有額外的開銷,因此對小的任務而言,總的任務時間會有所增加而不是是減少。
? ? ? 在Python語言中,multiprocessing模塊通過使用子進程(而不是線程)來運行獨立的并行進程。它可以讓您利用機器上的多個處理器(Windows和Unix),也就是說,多個進程可以完全獨立的在內存中運行
2.自己的設備最多可以進行多少個并行處理
? ? ? 一次可以運行的最大進程數受計算機中處理器數量的限制。可以使用multiprocessing模塊中的cpu_count()函數進行顯示
import multiprocessing as mpprint("Number of processers: ", mp.cpu_count())像我電腦只有四個:
3.同步執行和異步執行
? ? 并行處理中,有兩種執行類型:? ?同步和異步
? ? 同步執行就是各個進程按照啟動的先后順序完成。這是通過鎖定主程序直到相應的進程執行完畢來實現的。
? ??異步執行,換句話說,進程的執行不涉及鎖定,這樣做的結果就是,進程結果返回的順序可能混淆,但通常情況下,異步執行會更快完成。
multiprocessing模塊中有兩個對象是用來實現函數并行執行的:Pool類和Process類
? ??
?
4.解決實際問題實例 :?計算每行中給定數值范圍內的元素個數
? ? ? 給定一個二維矩陣(或者列表和多維列表),計算每行中給定數值范圍內的元素個數
import numpy as np from time import time# RandomState()是一個偽隨機數生成器 np.random.RandomState(100) # 0, 10 : 生成0到10的隨機整數 # size=[200000, 5] 即生成200000行,一列的 ndarray(二維矩陣的形式,每個里面5個元素) arr = np.random.randint(0, 10, size=[200000, 5]) data = arr.tolist() # 將numpy.ndarray 轉化為list # 因為是隨機的,所以每次的數字不確定 data = data[:5] print("數據為:", data)""" 運行結果: 數據為: [[5, 6, 7, 0, 9], [4, 0, 6, 7, 4], [7, 3, 8, 3, 9], [2, 1, 9, 3, 2], [0, 0, 9, 5, 2]]"""? 4.1?不使用并行處理的參考代碼
? ? 函數howmany_within_range()進行重復以檢查在范圍內的數有多少個病返回計數
"""不使用并行處理"""def howmany_within_range(row, minimum, maximum):count = 0for n in row:if minimum <= n <= maximum:count += 1return countresult = [] for row in data:result.append(howmany_within_range(row, minimum=4, maximum=8)) print("給定數值范圍中的元素個數:", result[:10]) """ 注意:以下只是參考輸出,因為輸入序列是隨機的,每次輸出結果并不固定 運行結果: 給定數值范圍中的元素 [3, 2, 3, 4, 2, 3, 3, 2, 2, 2] """? 4.2 對函數進行并行化處理
? ? ? 對代碼進行并行處理通常的做法是取出其中可以多次運行的特定函數,將其放在不同的處理器上并運行,要做到這一點,就需要使用Pool類對數目為n的處理器進行初始化,之后將想要并運行的函數傳遞給Pool類中并行方法。
multipprocessing.Pool()中提供了apply(),map()和starmap()等方法對傳入的函數并行運行。
? apply()和map()? 之間又有什么區別呢?
apply()和map()都是把要進行并行化的函數作為主要參數,但是不同的是,apply()?接收args參數,通過args將各個參數傳送給被并行化處理的函數,而map僅將一個迭代器作為參數。
?因此使用,對于簡單的可迭代的操作,使用map()進行并行處理更適合,而且能更快完成工作
?4.2.1? Pool.apply()進行并行化處理
if __name__ == '__main__':# 1.初始化 multiprocessing.Pool()pool = mp.Pool(mp.cpu_count())# 2.使用apply(), 將函數howmany_within_range作為主參傳進去results = [pool.apply(howmany_within_range, args=(row, 4, 8)) for row in data]# 3. 不要忘記關閉進程pool.close()print(results[:10])? 注意:?使用? if __name__ == '__main__':? 將你的代碼放到下面去執行,不然會報錯
? ? ??The "freeze_support()" line can be omitted if the program
? ? ? ? is not going to be frozen to produce an executable.??
? ?我們如果在這段程序之外打印,會發現,程序會有個并行進行運行,也就多打印程序外的內容多次
4.2.2? Parallelizing using Pool.map()
? ? ? ?Pool.map()僅接受一個迭代器參數。對howmany_within_range()函數進行簡單的修改,修改為howmany_within_range_rowonly()把minimum和maximum設置為固定值,即為? 只接受行數據列表迭代器作為輸入,不是最好的辦法,但清楚的顯示了它與apply()的不同之處
import multiprocessing as mpdef howmany_within_range_rowonly(row, minimum=4, maximum=8):count = 0for n in row:if minimum <= n <= maximum:count += 1return countpool = mp.Pool(mp.cpu_count())results = pool.map(howmany_within_range_rowonly,[row for row in data])pool.close()print(results[:10])?4.2.3? 使用Pool.starmap()進行并行化
? ? ? ? 與Pool.map()一樣,Pool.starmap()也只僅接受一個迭代器參數,但在starmap()中,迭代器中的每一個元件也是一個迭代器。你可以通過這個內部迭代器向被并行化處理的函數傳遞參數,在執行時再順序解開,只要傳遞和解開的順序一致就行
?實際上,Pool.starmap()就像是一個接受參數的Pool.map()版本
import multiprocessing as mppool = mp.Pool(mp.cpu_count())results = pool.starmap(howmany_within_range, [(row, 4, 8) for row in data])pool.close()print(results[:10])5.異步并行處理
? ?和同步并行處理對等的異步并行處理函數apply_async(),map_async()和starmap_async()允許以異步方式并行執行進程,即下一個進程可以在前一個進程完成時立即啟動,而不考慮啟動順序。因此,無法保證結果與輸入的順序相同
6.?使用Pool.apply_async()進行并行化
持續更新
?
總結
以上是生活随笔為你收集整理的Python中的并行处理(Pool.map()、Pool.starmap()、Pool.apply()、)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Python中的__new__(new函
- 下一篇: 1.设计模式中监听模式(观察者模式)(P