第06讲:多路加速,了解多进程基本原理
在上一課時(shí)我們了解了多線程的基本概念,同時(shí)我們也提到,Python 中的多線程是不能很好發(fā)揮多核優(yōu)勢(shì)的,如果想要發(fā)揮多核優(yōu)勢(shì),最好還是使用多進(jìn)程。
那么本課時(shí)我們就來了解下多進(jìn)程的基本概念和用 Python 實(shí)現(xiàn)多進(jìn)程的方法。
1.多進(jìn)程的含義
進(jìn)程(Process)是具有一定獨(dú)立功能的程序關(guān)于某個(gè)數(shù)據(jù)集合上的一次運(yùn)行活動(dòng),是系統(tǒng)進(jìn)行資源分配和調(diào)度的一個(gè)獨(dú)立單位。
顧名思義,多進(jìn)程就是啟用多個(gè)進(jìn)程同時(shí)運(yùn)行。由于進(jìn)程是線程的集合,而且進(jìn)程是由一個(gè)或多個(gè)線程構(gòu)成的,所以多進(jìn)程的運(yùn)行意味著有大于或等于進(jìn)程數(shù)量的線程在運(yùn)行。
2.Python 多進(jìn)程的優(yōu)勢(shì)
通過上一課時(shí)我們知道,由于進(jìn)程中 GIL 的存在,Python 中的多線程并不能很好地發(fā)揮多核優(yōu)勢(shì),一個(gè)進(jìn)程中的多個(gè)線程,在同一時(shí)刻只能有一個(gè)線程運(yùn)行。
而對(duì)于多進(jìn)程來說,每個(gè)進(jìn)程都有屬于自己的 GIL,所以,在多核處理器下,多進(jìn)程的運(yùn)行是不會(huì)受 GIL 的影響的。因此,多進(jìn)程能更好地發(fā)揮多核的優(yōu)勢(shì)。
當(dāng)然,對(duì)于爬蟲這種 IO 密集型任務(wù)來說,多線程和多進(jìn)程影響差別并不大。對(duì)于計(jì)算密集型任務(wù)來說,Python 的多進(jìn)程相比多線程,其多核運(yùn)行效率會(huì)有成倍的提升。
總的來說,Python 的多進(jìn)程整體來看是比多線程更有優(yōu)勢(shì)的。所以,在條件允許的情況下,能用多進(jìn)程就盡量用多進(jìn)程。
不過值得注意的是,由于進(jìn)程是系統(tǒng)進(jìn)行資源分配和調(diào)度的一個(gè)獨(dú)立單位,所以各個(gè)進(jìn)程之間的數(shù)據(jù)是無法共享的,如多個(gè)進(jìn)程無法共享一個(gè)全局變量,進(jìn)程之間的數(shù)據(jù)共享需要有單獨(dú)的機(jī)制來實(shí)現(xiàn),這在后面也會(huì)講到。
3.多進(jìn)程的實(shí)現(xiàn)
在 Python 中也有內(nèi)置的庫來實(shí)現(xiàn)多進(jìn)程,它就是 multiprocessing。
multiprocessing 提供了一系列的組件,如 Process(進(jìn)程)、Queue(隊(duì)列)、Semaphore(信號(hào)量)、Pipe(管道)、Lock(鎖)、Pool(進(jìn)程池)等,接下來讓我們來了解下它們的使用方法。
直接使用 Process 類
在 multiprocessing 中,每一個(gè)進(jìn)程都用一個(gè) Process 類來表示。它的 API 調(diào)用如下:
Process([group [, target [, name [, args [, kwargs]]]]])- target 表示調(diào)用對(duì)象,你可以傳入方法的名字。
- args 表示被調(diào)用對(duì)象的位置參數(shù)元組,比如 target 是函數(shù) func,他有兩個(gè)參數(shù) m,n,那么 args 就傳入 [m, n] 即可。
- kwargs 表示調(diào)用對(duì)象的字典。
- name 是別名,相當(dāng)于給這個(gè)進(jìn)程取一個(gè)名字。
- group 分組。
我們先用一個(gè)實(shí)例來感受一下:
import multiprocessingdef process(index):print(f'Process: {index}')if __name__ == '__main__':for i in range(5):p = multiprocessing.Process(target=process, args=(i,))p.start()這是一個(gè)實(shí)現(xiàn)多進(jìn)程最基礎(chǔ)的方式:通過創(chuàng)建 Process 來新建一個(gè)子進(jìn)程,其中 target 參數(shù)傳入方法名,args 是方法的參數(shù),是以元組的形式傳入,其和被調(diào)用的方法 process 的參數(shù)是一一對(duì)應(yīng)的。
注意:這里 args
必須要是一個(gè)元組,如果只有一個(gè)參數(shù),那也要在元組第一個(gè)元素后面加一個(gè)逗號(hào),如果沒有逗號(hào)則和單個(gè)元素本身沒有區(qū)別,無法構(gòu)成元組,導(dǎo)致參數(shù)傳遞出現(xiàn)問題。
創(chuàng)建完進(jìn)程之后,我們通過調(diào)用 start 方法即可啟動(dòng)進(jìn)程了。運(yùn)行結(jié)果如下:
Process: 0 Process: 1 Process: 2 Process: 3 Process: 4可以看到,我們運(yùn)行了 5 個(gè)子進(jìn)程,每個(gè)進(jìn)程都調(diào)用了 process 方法。process 方法的 index 參數(shù)通過 Process 的 args 傳入,分別是 0~4 這 5 個(gè)序號(hào),最后打印出來,5 個(gè)子進(jìn)程運(yùn)行結(jié)束。
由于進(jìn)程是 Python 中最小的資源分配單元,因此這些進(jìn)程和線程不同,各個(gè)進(jìn)程之間的數(shù)據(jù)是不會(huì)共享的,每啟動(dòng)一個(gè)進(jìn)程,都會(huì)獨(dú)立分配資源。
另外,在當(dāng)前 CPU 核數(shù)足夠的情況下,這些不同的進(jìn)程會(huì)分配給不同的 CPU 核來運(yùn)行,實(shí)現(xiàn)真正的并行執(zhí)行。
multiprocessing 還提供了幾個(gè)比較有用的方法,如我們可以通過 cpu_count 的方法來獲取當(dāng)前機(jī)器 CPU 的核心數(shù)量,通過 active_children 方法獲取當(dāng)前還在運(yùn)行的所有進(jìn)程。
下面通過一個(gè)實(shí)例來看一下:
import multiprocessing import timedef process(index):time.sleep(index)print(f'Process: {index}')if __name__ == '__main__':for i in range(5):p = multiprocessing.Process(target=process, args=[i])p.start()print(f'CPU number: {multiprocessing.cpu_count()}')for p in multiprocessing.active_children():print(f'Child process name: {p.name} id: {p.pid}')print('Process Ended')運(yùn)行結(jié)果如下:
Process: 0 CPU number: 8 Child process name: Process-5 id: 73595 Child process name: Process-2 id: 73592 Child process name: Process-3 id: 73593 Child process name: Process-4 id: 73594 Process Ended Process: 1 Process: 2 Process: 3 Process: 4在上面的例子中我們通過 cpu_count 成功獲取了 CPU 核心的數(shù)量:8 個(gè),當(dāng)然不同的機(jī)器結(jié)果可能不同。
另外我們還通過 active_children 獲取到了當(dāng)前正在活躍運(yùn)行的進(jìn)程列表。然后我們遍歷了每個(gè)進(jìn)程,并將它們的名稱和進(jìn)程號(hào)打印出來了,這里進(jìn)程號(hào)直接使用 pid 屬性即可獲取,進(jìn)程名稱直接通過 name 屬性即可獲取。
以上我們就完成了多進(jìn)程的創(chuàng)建和一些基本信息的獲取。
4.繼承 Process 類
在上面的例子中,我們創(chuàng)建進(jìn)程是直接使用 Process 這個(gè)類來創(chuàng)建的,這是一種創(chuàng)建進(jìn)程的方式。不過,創(chuàng)建進(jìn)程的方式不止這一種,同樣,我們也可以像線程 Thread 一樣來通過繼承的方式創(chuàng)建一個(gè)進(jìn)程類,進(jìn)程的基本操作我們?cè)谧宇惖?run 方法中實(shí)現(xiàn)即可。
通過一個(gè)實(shí)例來看一下:
from multiprocessing import Process import timeclass MyProcess(Process):def __init__(self, loop):Process.__init__(self)self.loop = loopdef run(self):for count in range(self.loop):time.sleep(1)print(f'Pid: {self.pid} LoopCount: {count}')if __name__ == '__main__':for i in range(2, 5):p = MyProcess(i)p.start()我們首先聲明了一個(gè)構(gòu)造方法,這個(gè)方法接收一個(gè) loop 參數(shù),代表循環(huán)次數(shù),并將其設(shè)置為全局變量。在 run 方法中,又使用這個(gè) loop 變量循環(huán)了 loop 次并打印了當(dāng)前的進(jìn)程號(hào)和循環(huán)次數(shù)。
在調(diào)用時(shí),我們用 range 方法得到了 2、3、4 三個(gè)數(shù)字,并把它們分別初始化了 MyProcess 進(jìn)程,然后調(diào)用 start 方法將進(jìn)程啟動(dòng)起來。
注意:這里進(jìn)程的執(zhí)行邏輯需要在 run 方法中實(shí)現(xiàn),啟動(dòng)進(jìn)程需要調(diào)用 start 方法,調(diào)用之后 run 方法便會(huì)執(zhí)行。
運(yùn)行結(jié)果如下:
Pid: 73667 LoopCount: 0 Pid: 73668 LoopCount: 0 Pid: 73669 LoopCount: 0 Pid: 73667 LoopCount: 1 Pid: 73668 LoopCount: 1 Pid: 73669 LoopCount: 1 Pid: 73668 LoopCount: 2 Pid: 73669 LoopCount: 2 Pid: 73669 LoopCount: 3可以看到,三個(gè)進(jìn)程分別打印出了 2、3、4 條結(jié)果,即進(jìn)程 73667 打印了 2 次 結(jié)果,進(jìn)程 73668 打印了 3 次結(jié)果,進(jìn)程 73669 打印了 4 次結(jié)果。
注意,這里的進(jìn)程 pid 代表進(jìn)程號(hào),不同機(jī)器、不同時(shí)刻運(yùn)行結(jié)果可能不同。
通過上面的方式,我們也非常方便地實(shí)現(xiàn)了一個(gè)進(jìn)程的定義。為了復(fù)用方便,我們可以把一些方法寫在每個(gè)進(jìn)程類里封裝好,在使用時(shí)直接初始化一個(gè)進(jìn)程類運(yùn)行即可。
5.守護(hù)進(jìn)程
在多進(jìn)程中,同樣存在守護(hù)進(jìn)程的概念,如果一個(gè)進(jìn)程被設(shè)置為守護(hù)進(jìn)程,當(dāng)父進(jìn)程結(jié)束后,子進(jìn)程會(huì)自動(dòng)被終止,我們可以通過設(shè)置 daemon 屬性來控制是否為守護(hù)進(jìn)程。
還是原來的例子,增加了 deamon 屬性的設(shè)置:
from multiprocessing import Process import timeclass MyProcess(Process):def __init__(self, loop):Process.__init__(self)self.loop = loopdef run(self):for count in range(self.loop):time.sleep(1)print(f'Pid: {self.pid} LoopCount: {count}')if __name__ == '__main__':for i in range(2, 5):p = MyProcess(i)p.daemon = Truep.start()print('Main Process ended')運(yùn)行結(jié)果如下:
Main Process ended結(jié)果很簡單,因?yàn)橹鬟M(jìn)程沒有做任何事情,直接輸出一句話結(jié)束,所以在這時(shí)也直接終止了子進(jìn)程的運(yùn)行。
這樣可以有效防止無控制地生成子進(jìn)程。這樣的寫法可以讓我們?cè)谥鬟M(jìn)程運(yùn)行結(jié)束后無需額外擔(dān)心子進(jìn)程是否關(guān)閉,避免了獨(dú)立子進(jìn)程的運(yùn)行。
6.進(jìn)程等待
上面的運(yùn)行效果其實(shí)不太符合我們預(yù)期:主進(jìn)程運(yùn)行結(jié)束時(shí),子進(jìn)程(守護(hù)進(jìn)程)也都退出了,子進(jìn)程什么都沒來得及執(zhí)行。
能不能讓所有子進(jìn)程都執(zhí)行完了然后再結(jié)束呢?當(dāng)然是可以的,只需要加入 join 方法即可,我們可以將代碼改寫如下:
processes = [] for i in range(2, 5):p = MyProcess(i)processes.append(p)p.daemon = Truep.start() for p in processes:p.join()運(yùn)行結(jié)果如下:
Pid: 40866 LoopCount: 0 Pid: 40867 LoopCount: 0 Pid: 40868 LoopCount: 0 Pid: 40866 LoopCount: 1 Pid: 40867 LoopCount: 1 Pid: 40868 LoopCount: 1 Pid: 40867 LoopCount: 2 Pid: 40868 LoopCount: 2 Pid: 40868 LoopCount: 3 Main Process ended在調(diào)用 start 和 join 方法后,父進(jìn)程就可以等待所有子進(jìn)程都執(zhí)行完畢后,再打印出結(jié)束的結(jié)果。
默認(rèn)情況下,join 是無限期的。也就是說,如果有子進(jìn)程沒有運(yùn)行完畢,主進(jìn)程會(huì)一直等待。這種情況下,如果子進(jìn)程出現(xiàn)問題陷入了死循環(huán),主進(jìn)程也會(huì)無限等待下去。怎么解決這個(gè)問題呢?可以給 join 方法傳遞一個(gè)超時(shí)參數(shù),代表最長等待秒數(shù)。如果子進(jìn)程沒有在這個(gè)指定秒數(shù)之內(nèi)完成,會(huì)被強(qiáng)制返回,主進(jìn)程不再會(huì)等待。也就是說這個(gè)參數(shù)設(shè)置了主進(jìn)程等待該子進(jìn)程的最長時(shí)間。
例如這里我們傳入 1,代表最長等待 1 秒,代碼改寫如下:
processes = [] for i in range(3, 5):p = MyProcess(i)processes.append(p)p.daemon = Truep.start() for p in processes:p.join(1)運(yùn)行結(jié)果如下:
Pid: 40970 LoopCount: 0 Pid: 40971 LoopCount: 0 Pid: 40970 LoopCount: 1 Pid: 40971 LoopCount: 1 Main Process ended可以看到,有的子進(jìn)程本來要運(yùn)行 3 秒,結(jié)果運(yùn)行 1 秒就被強(qiáng)制返回了,由于是守護(hù)進(jìn)程,該子進(jìn)程被終止了。
到這里,我們就了解了守護(hù)進(jìn)程、進(jìn)程等待和超時(shí)設(shè)置的用法。
7.終止進(jìn)程
當(dāng)然,終止進(jìn)程不止有守護(hù)進(jìn)程這一種做法,我們也可以通過 terminate 方法來終止某個(gè)子進(jìn)程,另外我們還可以通過 is_alive 方法判斷進(jìn)程是否還在運(yùn)行。
下面我們來看一個(gè)實(shí)例:
import multiprocessing import timedef process():print('Starting')time.sleep(5)print('Finished')if __name__ == '__main__':p = multiprocessing.Process(target=process)print('Before:', p, p.is_alive())p.start()print('During:', p, p.is_alive())p.terminate()print('Terminate:', p, p.is_alive())p.join()print('Joined:', p, p.is_alive())在上面的例子中,我們用 Process 創(chuàng)建了一個(gè)進(jìn)程,接著調(diào)用 start 方法啟動(dòng)這個(gè)進(jìn)程,然后調(diào)用 terminate 方法將進(jìn)程終止,最后調(diào)用 join 方法。
另外,在進(jìn)程運(yùn)行不同的階段,我們還通過 is_alive 方法判斷當(dāng)前進(jìn)程是否還在運(yùn)行。
運(yùn)行結(jié)果如下:
Before: <Process(Process-1, initial)> False During: <Process(Process-1, started)> True Terminate: <Process(Process-1, started)> True Joined: <Process(Process-1, stopped[SIGTERM])> False這里有一個(gè)值得注意的地方,在調(diào)用 terminate 方法之后,我們用 is_alive 方法獲取進(jìn)程的狀態(tài)發(fā)現(xiàn)依然還是運(yùn)行狀態(tài)。在調(diào)用 join 方法之后,is_alive 方法獲取進(jìn)程的運(yùn)行狀態(tài)才變?yōu)榻K止?fàn)顟B(tài)。
所以,在調(diào)用 terminate 方法之后,記得要調(diào)用一下 join 方法,這里調(diào)用 join 方法可以為進(jìn)程提供時(shí)間來更新對(duì)象狀態(tài),用來反映出最終的進(jìn)程終止效果。
8.進(jìn)程互斥鎖
在上面的一些實(shí)例中,我們可能會(huì)遇到如下的運(yùn)行結(jié)果:
Pid: 73993 LoopCount: 0 Pid: 73993 LoopCount: 1 Pid: 73994 LoopCount: 0Pid: 73994 LoopCount: 1Pid: 73994 LoopCount: 2 Pid: 73995 LoopCount: 0 Pid: 73995 LoopCount: 1 Pid: 73995 LoopCount: 2 Pid: 73995 LoopCount: 3 Main Process ended我們發(fā)現(xiàn),有的輸出結(jié)果沒有換行。這是什么原因造成的呢?
這種情況是由多個(gè)進(jìn)程并行執(zhí)行導(dǎo)致的,兩個(gè)進(jìn)程同時(shí)進(jìn)行了輸出,結(jié)果第一個(gè)進(jìn)程的換行沒有來得及輸出,第二個(gè)進(jìn)程就輸出了結(jié)果,導(dǎo)致最終輸出沒有換行。
那如何來避免這種問題?如果我們能保證,多個(gè)進(jìn)程運(yùn)行期間的任一時(shí)間,只能一個(gè)進(jìn)程輸出,其他進(jìn)程等待,等剛才那個(gè)進(jìn)程輸出完畢之后,另一個(gè)進(jìn)程再進(jìn)行輸出,這樣就不會(huì)出現(xiàn)輸出沒有換行的現(xiàn)象了。
這種解決方案實(shí)際上就是實(shí)現(xiàn)了進(jìn)程互斥,避免了多個(gè)進(jìn)程同時(shí)搶占臨界區(qū)(輸出)資源。我們可以通過 multiprocessing 中的 Lock 來實(shí)現(xiàn)。Lock,即鎖,在一個(gè)進(jìn)程輸出時(shí),加鎖,其他進(jìn)程等待。等此進(jìn)程執(zhí)行結(jié)束后,釋放鎖,其他進(jìn)程可以進(jìn)行輸出。
我們首先實(shí)現(xiàn)一個(gè)不加鎖的實(shí)例,代碼如下:
from multiprocessing import Process, Lock import timeclass MyProcess(Process):def __init__(self, loop, lock):Process.__init__(self)self.loop = loopself.lock = lockdef run(self):for count in range(self.loop):time.sleep(0.1)# self.lock.acquire()print(f'Pid: {self.pid} LoopCount: {count}')# self.lock.release()if __name__ == '__main__':lock = Lock()for i in range(10, 15):p = MyProcess(i, lock)p.start()運(yùn)行結(jié)果如下:
Pid: 74030 LoopCount: 0 Pid: 74031 LoopCount: 0 Pid: 74032 LoopCount: 0 Pid: 74033 LoopCount: 0 Pid: 74034 LoopCount: 0 Pid: 74030 LoopCount: 1 Pid: 74031 LoopCount: 1 Pid: 74032 LoopCount: 1Pid: 74033 LoopCount: 1Pid: 74034 LoopCount: 1 Pid: 74030 LoopCount: 2 ...可以看到運(yùn)行結(jié)果中有些輸出已經(jīng)出現(xiàn)了不換行的問題。
我們對(duì)其加鎖,取消掉剛才代碼中的兩行注釋,重新運(yùn)行,運(yùn)行結(jié)果如下:
Pid: 74061 LoopCount: 0 Pid: 74062 LoopCount: 0 Pid: 74063 LoopCount: 0 Pid: 74064 LoopCount: 0 Pid: 74065 LoopCount: 0 Pid: 74061 LoopCount: 1 Pid: 74062 LoopCount: 1 Pid: 74063 LoopCount: 1 Pid: 74064 LoopCount: 1 Pid: 74065 LoopCount: 1 Pid: 74061 LoopCount: 2 Pid: 74062 LoopCount: 2 Pid: 74064 LoopCount: 2 ...這時(shí)輸出效果就正常了。
所以,在訪問一些臨界區(qū)資源時(shí),使用 Lock 可以有效避免進(jìn)程同時(shí)占用資源而導(dǎo)致的一些問題。
9.信號(hào)量
進(jìn)程互斥鎖可以使同一時(shí)刻只有一個(gè)進(jìn)程能訪問共享資源,如上面的例子所展示的那樣,在同一時(shí)刻只能有一個(gè)進(jìn)程輸出結(jié)果。但有時(shí)候我們需要允許多個(gè)進(jìn)程來訪問共享資源,同時(shí)還需要限制能訪問共享資源的進(jìn)程的數(shù)量。
這種需求該如何實(shí)現(xiàn)呢?可以用信號(hào)量,信號(hào)量是進(jìn)程同步過程中一個(gè)比較重要的角色。它可以控制臨界資源的數(shù)量,實(shí)現(xiàn)多個(gè)進(jìn)程同時(shí)訪問共享資源,限制進(jìn)程的并發(fā)量。
如果你學(xué)過操作系統(tǒng),那么一定對(duì)這方面非常了解,如果你還不了解信號(hào)量是什么,可以先熟悉一下這個(gè)概念。
我們可以用 multiprocessing 庫中的 Semaphore 來實(shí)現(xiàn)信號(hào)量。
那么接下來我們就用一個(gè)實(shí)例來演示一下進(jìn)程之間利用 Semaphore 做到多個(gè)進(jìn)程共享資源,同時(shí)又限制同時(shí)可訪問的進(jìn)程數(shù)量,代碼如下:
from multiprocessing import Process, Semaphore, Lock, Queue import timebuffer = Queue(10) empty = Semaphore(2) full = Semaphore(0) lock = Lock()class Consumer(Process):def run(self):global buffer, empty, full, lockwhile True:full.acquire()lock.acquire()buffer.get()print('Consumer pop an element')time.sleep(1)lock.release()empty.release()class Producer(Process):def run(self):global buffer, empty, full, lockwhile True:empty.acquire()lock.acquire()buffer.put(1)print('Producer append an element')time.sleep(1)lock.release()full.release()if __name__ == '__main__':p = Producer()c = Consumer()p.daemon = c.daemon = Truep.start()c.start()p.join()c.join()print('Main Process Ended')如上代碼實(shí)現(xiàn)了經(jīng)典的生產(chǎn)者和消費(fèi)者問題。它定義了兩個(gè)進(jìn)程類,一個(gè)是消費(fèi)者,一個(gè)是生產(chǎn)者。
另外,這里使用 multiprocessing 中的 Queue 定義了一個(gè)共享隊(duì)列,然后定義了兩個(gè)信號(hào)量 Semaphore,一個(gè)代表緩沖區(qū)空余數(shù),一個(gè)表示緩沖區(qū)占用數(shù)。
生產(chǎn)者 Producer 使用 acquire 方法來占用一個(gè)緩沖區(qū)位置,緩沖區(qū)空閑區(qū)大小減 1,接下來進(jìn)行加鎖,對(duì)緩沖區(qū)進(jìn)行操作,然后釋放鎖,最后讓代表占用的緩沖區(qū)位置數(shù)量加 1,消費(fèi)者則相反。
運(yùn)行結(jié)果如下:
Producer append an element Producer append an element Consumer pop an element Consumer pop an element Producer append an element Producer append an element Consumer pop an element Consumer pop an element Producer append an element Producer append an element Consumer pop an element Consumer pop an element Producer append an element Producer append an element我們發(fā)現(xiàn)兩個(gè)進(jìn)程在交替運(yùn)行,生產(chǎn)者先放入緩沖區(qū)物品,然后消費(fèi)者取出,不停地進(jìn)行循環(huán)。 你可以通過上面的例子來體會(huì)信號(hào)量 Semaphore 的用法,通過 Semaphore 我們很好地控制了進(jìn)程對(duì)資源的并發(fā)訪問數(shù)量。
10.隊(duì)列
在上面的例子中我們使用 Queue 作為進(jìn)程通信的共享隊(duì)列使用。
而如果我們把上面程序中的 Queue 換成普通的 list,是完全起不到效果的,因?yàn)檫M(jìn)程和進(jìn)程之間的資源是不共享的。即使在一個(gè)進(jìn)程中改變了這個(gè) list,在另一個(gè)進(jìn)程也不能獲取到這個(gè) list 的狀態(tài),所以聲明全局變量對(duì)多進(jìn)程是沒有用處的。
那進(jìn)程如何共享數(shù)據(jù)呢?可以用 Queue,即隊(duì)列。當(dāng)然這里的隊(duì)列指的是 multiprocessing 里面的 Queue。
依然用上面的例子,我們一個(gè)進(jìn)程向隊(duì)列中放入隨機(jī)數(shù)據(jù),然后另一個(gè)進(jìn)程取出數(shù)據(jù)。
from multiprocessing import Process, Semaphore, Lock, Queue import time from random import randombuffer = Queue(10) empty = Semaphore(2) full = Semaphore(0) lock = Lock()class Consumer(Process):def run(self):global buffer, empty, full, lockwhile True:full.acquire()lock.acquire()print(f'Consumer get {buffer.get()}')time.sleep(1)lock.release()empty.release()class Producer(Process):def run(self):global buffer, empty, full, lockwhile True:empty.acquire()lock.acquire()num = random()print(f'Producer put {num}')buffer.put(num)time.sleep(1)lock.release()full.release()if __name__ == '__main__':p = Producer()c = Consumer()p.daemon = c.daemon = Truep.start()c.start()p.join()c.join()print('Main Process Ended')運(yùn)行結(jié)果如下:
Producer put 0.719213647437 Producer put 0.44287326683 Consumer get 0.719213647437 Consumer get 0.44287326683 Producer put 0.722859424381 Producer put 0.525321338921 Consumer get 0.722859424381 Consumer get 0.525321338921在上面的例子中我們聲明了兩個(gè)進(jìn)程,一個(gè)進(jìn)程為生產(chǎn)者 Producer,另一個(gè)為消費(fèi)者 Consumer,生產(chǎn)者不斷向 Queue 里面添加隨機(jī)數(shù),消費(fèi)者不斷從隊(duì)列里面取隨機(jī)數(shù)。
生產(chǎn)者在放數(shù)據(jù)的時(shí)候調(diào)用了 Queue 的 put 方法,消費(fèi)者在取的時(shí)候使用了 get 方法,這樣我們就通過 Queue 實(shí)現(xiàn)兩個(gè)進(jìn)程的數(shù)據(jù)共享了。
11.管道
剛才我們使用 Queue 實(shí)現(xiàn)了進(jìn)程間的數(shù)據(jù)共享,那么進(jìn)程之間直接通信,如收發(fā)信息,用什么比較好呢?可以用 Pipe,管道。
管道,我們可以把它理解為兩個(gè)進(jìn)程之間通信的通道。管道可以是單向的,即 half-duplex:一個(gè)進(jìn)程負(fù)責(zé)發(fā)消息,另一個(gè)進(jìn)程負(fù)責(zé)收消息;也可以是雙向的 duplex,即互相收發(fā)消息。
默認(rèn)聲明 Pipe 對(duì)象是雙向管道,如果要?jiǎng)?chuàng)建單向管道,可以在初始化的時(shí)候傳入 deplex 參數(shù)為 False。
我們用一個(gè)實(shí)例來感受一下:
from multiprocessing import Process, Pipeclass Consumer(Process):def __init__(self, pipe):Process.__init__(self)self.pipe = pipedef run(self):self.pipe.send('Consumer Words')print(f'Consumer Received: {self.pipe.recv()}')class Producer(Process):def __init__(self, pipe):Process.__init__(self)self.pipe = pipedef run(self):print(f'Producer Received: {self.pipe.recv()}')self.pipe.send('Producer Words')if __name__ == '__main__':pipe = Pipe()p = Producer(pipe[0])c = Consumer(pipe[1])p.daemon = c.daemon = Truep.start()c.start()p.join()c.join()print('Main Process Ended')在這個(gè)例子里我們聲明了一個(gè)默認(rèn)為雙向的管道,然后將管道的兩端分別傳給兩個(gè)進(jìn)程。兩個(gè)進(jìn)程互相收發(fā)。觀察一下結(jié)果:
Producer Received: Consumer Words Consumer Received: Producer Words Main Process Ended管道 Pipe 就像進(jìn)程之間搭建的橋梁,利用它我們就可以很方便地實(shí)現(xiàn)進(jìn)程間通信了。
12.進(jìn)程池
在前面,我們講了可以使用 Process 來創(chuàng)建進(jìn)程,同時(shí)也講了如何用 Semaphore 來控制進(jìn)程的并發(fā)執(zhí)行數(shù)量。
假如現(xiàn)在我們遇到這么一個(gè)問題,我有 10000 個(gè)任務(wù),每個(gè)任務(wù)需要啟動(dòng)一個(gè)進(jìn)程來執(zhí)行,并且一個(gè)進(jìn)程運(yùn)行完畢之后要緊接著啟動(dòng)下一個(gè)進(jìn)程,同時(shí)我還需要控制進(jìn)程的并發(fā)數(shù)量,不能并發(fā)太高,不然 CPU 處理不過來(如果同時(shí)運(yùn)行的進(jìn)程能維持在一個(gè)最高恒定值當(dāng)然利用率是最高的)。
那么我們?cè)撊绾蝸韺?shí)現(xiàn)這個(gè)需求呢?
用 Process 和 Semaphore 可以實(shí)現(xiàn),但是實(shí)現(xiàn)起來比較我們可以用 Process 和 Semaphore 解決問題,但是實(shí)現(xiàn)起來比較煩瑣。而這種需求在平時(shí)又是非常常見的。此時(shí),我們就可以派上進(jìn)程池了,即 multiprocessing 中的 Pool。
Pool 可以提供指定數(shù)量的進(jìn)程,供用戶調(diào)用,當(dāng)有新的請(qǐng)求提交到 pool 中時(shí),如果池還沒有滿,就會(huì)創(chuàng)建一個(gè)新的進(jìn)程用來執(zhí)行該請(qǐng)求;但如果池中的進(jìn)程數(shù)已經(jīng)達(dá)到規(guī)定最大值,那么該請(qǐng)求就會(huì)等待,直到池中有進(jìn)程結(jié)束,才會(huì)創(chuàng)建新的進(jìn)程來執(zhí)行它。
我們用一個(gè)實(shí)例來實(shí)現(xiàn)一下,代碼如下:
from multiprocessing import Pool import timedef function(index):print(f'Start process: {index}')time.sleep(3)print(f'End process {index}', )if __name__ == '__main__':pool = Pool(processes=3)for i in range(4):pool.apply_async(function, args=(i,))print('Main Process started')pool.close()pool.join()print('Main Process ended')在這個(gè)例子中我們聲明了一個(gè)大小為 3 的進(jìn)程池,通過 processes 參數(shù)來指定,如果不指定,那么會(huì)自動(dòng)根據(jù)處理器內(nèi)核來分配進(jìn)程數(shù)。接著我們使用 apply_async 方法將進(jìn)程添加進(jìn)去,args 可以用來傳遞參數(shù)。
運(yùn)行結(jié)果如下:
Main Process started Start process: 0 Start process: 1 Start process: 2 End process 0 End process 1 End process 2 Start process: 3 End process 3 Main Process ended進(jìn)程池大小為 3,所以最初可以看到有 3 個(gè)進(jìn)程同時(shí)執(zhí)行,第4個(gè)進(jìn)程在等待,在有進(jìn)程運(yùn)行完畢之后,第4個(gè)進(jìn)程馬上跟著運(yùn)行,出現(xiàn)了如上的運(yùn)行效果。
最后,我們要記得調(diào)用 close 方法來關(guān)閉進(jìn)程池,使其不再接受新的任務(wù),然后調(diào)用 join 方法讓主進(jìn)程等待子進(jìn)程的退出,等子進(jìn)程運(yùn)行完畢之后,主進(jìn)程接著運(yùn)行并結(jié)束。
不過上面的寫法多少有些煩瑣,這里再介紹進(jìn)程池一個(gè)更好用的 map 方法,可以將上述寫法簡化很多。
map 方法是怎么用的呢?第一個(gè)參數(shù)就是要啟動(dòng)的進(jìn)程對(duì)應(yīng)的執(zhí)行方法,第 2 個(gè)參數(shù)是一個(gè)可迭代對(duì)象,其中的每個(gè)元素會(huì)被傳遞給這個(gè)執(zhí)行方法。
舉個(gè)例子:現(xiàn)在我們有一個(gè) list,里面包含了很多 URL,另外我們也定義了一個(gè)方法用來抓取每個(gè) URL 內(nèi)容并解析,那么我們可以直接在 map 的第一個(gè)參數(shù)傳入方法名,第 2 個(gè)參數(shù)傳入 URL 數(shù)組。
我們用一個(gè)實(shí)例來感受一下:
from multiprocessing import Pool import urllib.request import urllib.errordef scrape(url):try:urllib.request.urlopen(url)print(f'URL {url} Scraped')except (urllib.error.HTTPError, urllib.error.URLError):print(f'URL {url} not Scraped')if __name__ == '__main__':pool = Pool(processes=3)urls = ['https://www.baidu.com','http://www.meituan.com/','http://blog.csdn.net/','http://xxxyxxx.net']pool.map(scrape, urls)pool.close()這個(gè)例子中我們先定義了一個(gè) scrape 方法,它接收一個(gè)參數(shù) url,這里就是請(qǐng)求了一下這個(gè)鏈接,然后輸出爬取成功的信息,如果發(fā)生錯(cuò)誤,則會(huì)輸出爬取失敗的信息。
首先我們要初始化一個(gè) Pool,指定進(jìn)程數(shù)為 3。然后我們聲明一個(gè) urls 列表,接著我們調(diào)用了 map 方法,第 1 個(gè)參數(shù)就是進(jìn)程對(duì)應(yīng)的執(zhí)行方法,第 2 個(gè)參數(shù)就是 urls 列表,map 方法會(huì)依次將 urls 的每個(gè)元素作為 scrape 的參數(shù)傳遞并啟動(dòng)一個(gè)新的進(jìn)程,加到進(jìn)程池中執(zhí)行。
運(yùn)行結(jié)果如下:
URL https://www.baidu.com Scraped URL http://xxxyxxx.net not Scraped URL http://blog.csdn.net/ Scraped URL http://www.meituan.com/ Scraped這樣,我們就可以實(shí)現(xiàn) 3 個(gè)進(jìn)程并行運(yùn)行。不同的進(jìn)程相互獨(dú)立地輸出了對(duì)應(yīng)的爬取結(jié)果。
可以看到,我們利用 Pool 的 map 方法非常方便地實(shí)現(xiàn)了多進(jìn)程的執(zhí)行。后面我們也會(huì)在實(shí)戰(zhàn)案例中結(jié)合進(jìn)程池來實(shí)現(xiàn)數(shù)據(jù)的爬取。
以上便是 Python 中多進(jìn)程的基本用法,本節(jié)內(nèi)容比較多,后面的實(shí)戰(zhàn)案例也會(huì)用到這些內(nèi)容,需要好好掌握。
總結(jié)
以上是生活随笔為你收集整理的第06讲:多路加速,了解多进程基本原理的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 第05讲:多路加速,了解多线程基本原理
- 下一篇: 第10讲:高效存储 MongoDB 的用