日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > python >内容正文

python

python之路-进程

發(fā)布時間:2023/12/31 python 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python之路-进程 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

理論知識

操作系統(tǒng)背景知識

顧名思義,進(jìn)程即程序正在執(zhí)行的一個過程,進(jìn)程是對正在運(yùn)行的程序的一個抽象.

進(jìn)程的概念起源于操作系統(tǒng),是操作系統(tǒng)最核心的概念,也是操作系統(tǒng)提供的最古老也是最總要的抽象概念之一.操作系統(tǒng)的其他所有內(nèi)容都是圍繞進(jìn)程概念展開的.

所以想要真正了解進(jìn)程,必須事先了解操作系統(tǒng)

PS:即使可以利用cpu只有一個(早期的計算機(jī)確實如此),也能保證支持(偽)并發(fā)的能力,將一個單獨(dú)的cpu變成多個虛擬的cup(多道技術(shù):時間多路復(fù)用和空間多路復(fù)用+硬件上支持隔離),沒有進(jìn)程的抽象,現(xiàn)代計算機(jī)將不復(fù)存在.

必備的理論基礎(chǔ):

一 操作系統(tǒng)的作用:

  1:隱藏丑陋復(fù)雜的硬件接口,提供良好的抽象接口

  2:管理,調(diào)度進(jìn)程,并且將多個進(jìn)程對硬件的競爭變的有序

二 多道技術(shù):

  1.產(chǎn)生背景:針對單核,實現(xiàn)開發(fā)

  ps:

  現(xiàn)在的主機(jī)一般是多核,那么每個核都會利用多道技術(shù)

  有4個cpu,運(yùn)行于cpu1的某個程序遇到io阻塞,會等到io結(jié)束再重新調(diào)度,會被調(diào)度到4個cpu中的任意一個,具體由操作系統(tǒng)調(diào)度算法決定.

  2.空間上的復(fù)用:如內(nèi)存中同時有多道程序

  3.時間上的復(fù)用:復(fù)用一個cpu的時間片

    強(qiáng)調(diào):遇到io切,占用cpu時間過長也切,核心在于切之前將進(jìn)程的狀態(tài)保存下來,這樣才能保證下次切換回來時,能基于上次切走的位置繼續(xù)運(yùn)行

什么是進(jìn)程

進(jìn)程(Process)是計算機(jī)中程序關(guān)于某數(shù)據(jù)集合上的一次運(yùn)行活動,是系統(tǒng)進(jìn)行資源分配好調(diào)度的基本單位,是操作系統(tǒng)結(jié)構(gòu)的基礎(chǔ).在早期面向進(jìn)程設(shè)計的計算機(jī)結(jié)構(gòu)中,進(jìn)程是程序 基本執(zhí)行實體;在當(dāng)代面向線程設(shè)計的計算機(jī)結(jié)構(gòu)中,進(jìn)程是線程的容器,線程是程序的基本執(zhí)行實體.程序是指令,數(shù)據(jù)以及其組織形式的描述,進(jìn)程是程序的實體.

狹義的定義:進(jìn)程是正在運(yùn)行的程序的實例(an instance of a computer program that exected)

廣義定義:進(jìn)程是一個具有一定獨(dú)立功能的程序關(guān)于某個數(shù)據(jù)集合的一次運(yùn)行活動。它是操作系統(tǒng)動態(tài)執(zhí)行的基本單元,在傳統(tǒng)的操作系統(tǒng)中,進(jìn)程既是基本的分配單元,也是基本的執(zhí)行單元。

進(jìn)程的概念

第一,進(jìn)程是一個實體。每一個進(jìn)程都有它自己的地址空間,一般情況下,包括文本區(qū)域(text region)、數(shù)據(jù)區(qū)域(data region)和堆棧(stack region)。文本區(qū)域存儲處理器執(zhí)行的代碼;數(shù)據(jù)區(qū)域存儲變量和進(jìn)程執(zhí)行期間使用的動態(tài)分配的內(nèi)存;堆棧區(qū)域存儲著活動過程調(diào)用的指令和本地變量。
第二,進(jìn)程是一個“執(zhí)行中的程序”。程序是一個沒有生命的實體,只有處理器賦予程序生命時(操作系統(tǒng)執(zhí)行之),它才能成為一個活動的實體,我們稱其為進(jìn)程。[3]
進(jìn)程是操作系統(tǒng)中最基本、重要的概念。是多道程序系統(tǒng)出現(xiàn)后,為了刻畫系統(tǒng)內(nèi)部出現(xiàn)的動態(tài)情況,描述系統(tǒng)內(nèi)部各道程序的活動規(guī)律引進(jìn)的一個概念,所有多道程序設(shè)計操作系統(tǒng)都建立在進(jìn)程的基礎(chǔ)上。

操作系統(tǒng)引入進(jìn)程的概念的原因

從理論角度看,是對正在運(yùn)行的程序過程的抽象;

從實現(xiàn)角度看,是一種數(shù)據(jù)結(jié)構(gòu),目的在于清晰地刻畫動態(tài)系統(tǒng)的內(nèi)在規(guī)律,有效管理和調(diào)度進(jìn)入計算機(jī)系統(tǒng)主存儲器運(yùn)行的程序.

進(jìn)程的特征

動態(tài)性:進(jìn)程的實質(zhì)是程序 在多道程序系統(tǒng)中的一次執(zhí)行過程,進(jìn)程是動態(tài)產(chǎn)生,動態(tài)消亡的.

并發(fā)性:任何進(jìn)程都可以同其他進(jìn)程一起并發(fā)執(zhí)行

獨(dú)立性:進(jìn)程是一個獨(dú)立運(yùn)行的基本單位,同時也是系統(tǒng)分配資源和調(diào)度的獨(dú)立單位;

異步性:由于進(jìn)程間的相互制約,使進(jìn)程具有執(zhí)行的間斷性,即進(jìn)程按各自獨(dú)立的,不可預(yù)知的速度向前推進(jìn)

結(jié)構(gòu)特征:進(jìn)程由程序,數(shù)據(jù)和進(jìn)程控制塊三部分組成.

多個不同的進(jìn)程可以包含相同的程序:一個程序在不同的數(shù)據(jù)集里就構(gòu)成不同的進(jìn)程,嫩得到不同的結(jié)果;但是執(zhí)行過程中,程序不能發(fā)生改變.

進(jìn)程與程序中的區(qū)別

程序是指令和數(shù)據(jù)的有序集合,其本身沒有任何運(yùn)行的含義,是一個靜態(tài)的概念.

而進(jìn)程是程序在處理機(jī)上的一次執(zhí)行過程,它是一個動態(tài)的概念.

程序可以作為一種軟件資料長期存在,而進(jìn)程是有一定生命期的.

程序是永久的,進(jìn)程是暫時的.

注意:同一個程序執(zhí)行兩次,就會在操作系統(tǒng)中出現(xiàn)兩個進(jìn)程,所以我們可以同時運(yùn)行一個軟件,分別做不同的事情也不會混亂.

進(jìn)程調(diào)度

想要多個進(jìn)程交替運(yùn)行,操作系統(tǒng)必須對這些進(jìn)程進(jìn)行調(diào)度,這個調(diào)度也不是隨機(jī)進(jìn)行的,而是需要遵循一定的原則,由此就有了進(jìn)程的調(diào)度算法.

先來先服務(wù)調(diào)度算法:

先來先服務(wù)(FCFS)調(diào)度算法是一種最簡單的調(diào)度算法,該算法即可以用于作業(yè)調(diào)度,也可用于進(jìn)程調(diào)度,FCFS算法比較有利于長作業(yè)(進(jìn)程),而不利于短作業(yè)(進(jìn)程).由此可知,本算法適合于CPU繁忙型作業(yè),而不利于I/O繁忙型的作業(yè)(進(jìn)程)

短作業(yè)優(yōu)先調(diào)度算法

短作業(yè)(進(jìn)程)優(yōu)先調(diào)度算法(SJ/PF)是指對短作業(yè)或者短進(jìn)程優(yōu)先調(diào)度的算法,該算法即可用于作業(yè)調(diào)度,也可用于進(jìn)程調(diào)度,但其對長作業(yè)不利;不能保證緊迫性作業(yè)(進(jìn)程)被及時處理;作業(yè)長短只是被估算出來的.

時間片輪轉(zhuǎn)法

時間片輪轉(zhuǎn)(Round Robin,RR)法的基本思路是讓每個進(jìn)程在就緒隊列中的等待時間與享受服務(wù)的時間成比例。在時間片輪轉(zhuǎn)法中,需要將CPU的處理時間分成固定大小的時間片,例如,幾十毫秒至幾百毫秒。如果一個進(jìn)程在被調(diào)度選中之后用完了系統(tǒng)規(guī)定的時間片,但又未完成要求的任務(wù),則它自行釋放自己所占有的CPU而排到就緒隊列的末尾,等待下一次調(diào)度。同時,進(jìn)程調(diào)度程序又去調(diào)度當(dāng)前就緒隊列中的第一個進(jìn)程。
顯然,輪轉(zhuǎn)法只能用來調(diào)度分配一些可以搶占的資源。這些可以搶占的資源可以隨時被剝奪,而且可以將它們再分配給別的進(jìn)程。CPU是可搶占資源的一種。但打印機(jī)等資源是不可搶占的。由于作業(yè)調(diào)度是對除了CPU之外的所有系統(tǒng)硬件資源的分配,其中包含有不可搶占資源,所以作業(yè)調(diào)度不使用輪轉(zhuǎn)法。
在輪轉(zhuǎn)法中,時間片長度的選取非常重要。首先,時間片長度的選擇會直接影響到系統(tǒng)的開銷和響應(yīng)時間。如果時間片長度過短,則調(diào)度程序搶占處理機(jī)的次數(shù)增多。這將使進(jìn)程上下文切換次數(shù)也大大增加,從而加重系統(tǒng)開銷。反過來,如果時間片長度選擇過長,例如,一個時間片能保證就緒隊列中所需執(zhí)行時間最長的進(jìn)程能執(zhí)行完畢,則輪轉(zhuǎn)法變成了先來先服務(wù)法。時間片長度的選擇是根據(jù)系統(tǒng)對響應(yīng)時間的要求和就緒隊列中所允許最大的進(jìn)程數(shù)來確定的。
在輪轉(zhuǎn)法中,加入到就緒隊列的進(jìn)程有3種情況:
一種是分給它的時間片用完,但進(jìn)程還未完成,回到就緒隊列的末尾等待下次調(diào)度去繼續(xù)執(zhí)行。
另一種情況是分給該進(jìn)程的時間片并未用完,只是因為請求I/O或由于進(jìn)程的互斥與同步關(guān)系而被阻塞。當(dāng)阻塞解除之后再回到就緒隊列。
第三種情況就是新創(chuàng)建進(jìn)程進(jìn)入就緒隊列。
如果對這些進(jìn)程區(qū)別對待,給予不同的優(yōu)先級和時間片從直觀上看,可以進(jìn)一步改善系統(tǒng)服務(wù)質(zhì)量和效率。例如,我們可把就緒隊列按照進(jìn)程到達(dá)就緒隊列的類型和進(jìn)程被阻塞時的阻塞原因分成不同的就緒隊列,每個隊列按FCFS原則排列,各隊列之間的進(jìn)程享有不同的優(yōu)先級,但同一隊列內(nèi)優(yōu)先級相同。這樣,當(dāng)一個進(jìn)程在執(zhí)行完它的時間片之后,或從睡眠中被喚醒以及被創(chuàng)建之后,將進(jìn)入不同的就緒隊列。

多級反饋隊列

前面介紹的各種用作進(jìn)程調(diào)度的算法都有一定的局限性,如短進(jìn)程優(yōu)先的調(diào)度算法,僅照顧了短進(jìn)程而忽略了長進(jìn)程,而且如果并未指明進(jìn)程的長度,則短進(jìn)程優(yōu)先和基于進(jìn)程長度的搶占式調(diào)度算法都將無法使用.

而多級反饋隊列調(diào)度算法則不必事先知道各種進(jìn)程所需的執(zhí)行時間,而且還可以滿足各種類型進(jìn)程的需要,因而它是目前被公認(rèn)的一種較好的進(jìn)程調(diào)度算法,在采用多級反饋隊列調(diào)度算法的系統(tǒng)中,掉度算法的實施過程如下所述.

(1)應(yīng)設(shè)置多個就緒隊列,并為各個隊列賦予不同的優(yōu)先級,第一個隊列的優(yōu)先級最高,第二個隊列次之,其余各隊列的優(yōu)先權(quán)逐個降低.該算法賦予各個隊列中進(jìn)程執(zhí)行時間片的大小也各不相同,在優(yōu)先權(quán)愈高的隊列中,為每個進(jìn)程所規(guī)定的執(zhí)行時間片愈小.例如,第二個隊列的時間片要比第一個隊列的時間片長一杯,......,第i+1個隊列的時間片要比第i個列隊的時間片長一倍.

(2) 當(dāng)一個新進(jìn)程進(jìn)入內(nèi)存后,首先將它放入第一隊列的末尾,按FCFS原則排隊等待調(diào)度。當(dāng)輪到該進(jìn)程執(zhí)行時,如它能在該時間片內(nèi)完成,便可準(zhǔn)備撤離系統(tǒng);如果它在一個時間片結(jié)束時尚未完成,調(diào)度程序便將該進(jìn)程轉(zhuǎn)入第二隊列的末尾,再同樣地按FCFS原則等待調(diào)度執(zhí)行;如果它在第二隊列中運(yùn)行一個時間片后仍未完成,再依次將它放入第三隊列,……,如此下去,當(dāng)一個長作業(yè)(進(jìn)程)從第一隊列依次降到第n隊列后,在第n 隊列便采取按時間片輪轉(zhuǎn)的方式運(yùn)行。

(3) 僅當(dāng)?shù)谝魂犃锌臻e時,調(diào)度程序才調(diào)度第二隊列中的進(jìn)程運(yùn)行;僅當(dāng)?shù)?~(i-1)隊列均空時,才會調(diào)度第i隊列中的進(jìn)程運(yùn)行。如果處理機(jī)正在第i隊列中為某進(jìn)程服務(wù)時,又有新進(jìn)程進(jìn)入優(yōu)先權(quán)較高的隊列(第1~(i-1)中的任何一個隊列),則此時新進(jìn)程將搶占正在運(yùn)行進(jìn)程的處理機(jī),即由調(diào)度程序把正在運(yùn)行的進(jìn)程放回到第i隊列的末尾,把處理機(jī)分配給新到的高優(yōu)先權(quán)進(jìn)程

進(jìn)程的并行與并發(fā)

并行:并行是指兩者同時執(zhí)行,比如有兩條車道,在某一個時間點,兩條車道上都有車在跑;(資源夠用,比如三個線程,四核的cpu)

并發(fā):并發(fā)是指資源有限的情況下,兩者交替輪流使用資源,比如只有一條車道(單核cpu資源),那么就是A車先走,在某個時刻A車退出把道路讓給B走,B走完了繼續(xù)給A,交替使用,目的是提高效率.

區(qū)別:

并行是從微觀上,也就是一個精確的時間片刻,有不同的程序在執(zhí)行,這就要求必須有多個處理器.

并發(fā)是從宏觀上,在一個時間段上可以看出是同時執(zhí)行的,比如一個服務(wù)器同時處理多個session.

注意:早期單核cpu時候,對于進(jìn)程也是微觀上串行,(站在cpu角度看),宏觀上并行(站在人的角度看就是同時有很多程序在執(zhí)行).

同步異步阻塞非阻塞

狀態(tài)介紹

  在了解其他概念之前,我們首先了解進(jìn)程的幾個狀態(tài),在程序運(yùn)行的過程中,由于被操作系統(tǒng)的調(diào)度算法控制,程序會進(jìn)入幾個狀態(tài):就緒,運(yùn)行,阻塞.

(1)就緒(Ready)狀態(tài)

  在進(jìn)程已經(jīng)分配到除cpu以外的所有必要的資源,只要獲得處理機(jī)便立即執(zhí)行,這時的進(jìn)程狀態(tài)稱為就緒狀態(tài).

(2)執(zhí)行/運(yùn)行(Running)狀態(tài)當(dāng)進(jìn)程已獲得處理機(jī),其程序正在處理機(jī)上執(zhí)行,此時的進(jìn)程狀態(tài)稱為執(zhí)行狀態(tài).

(3)阻塞(Blocked)狀態(tài)正在執(zhí)行的過程,由于等待某個事件發(fā)生而無法執(zhí)行時,便放棄處理機(jī)而處于阻塞狀態(tài).引起進(jìn)程的時間可能有多種,例如,等待I/O完成,(input),申請緩沖區(qū)不能滿足,等待信件(信號)等.

同步和異步

  所謂同步就是一個任務(wù)的完成需要依賴另外一個任務(wù)時,只有等待被依賴的任務(wù)完成后,依賴的任務(wù)才能算完成,這是一種可靠的任務(wù)序列。要么成功都成功,失敗都失敗,兩個任務(wù)的狀態(tài)可以保持一致。

  所謂異步是不需要等待被依賴的任務(wù)完成,只是通知被依賴的任務(wù)要完成什么工作,依賴的任務(wù)也立即執(zhí)行,只要自己完成了整個任務(wù)就算完成了。至于被依賴的任務(wù)最終是否真正完成,依賴它的任務(wù)無法確定,所以它是不可靠的任務(wù)序列。

例如:

比如我去銀行辦理業(yè)務(wù),可能會有兩種方式:

第一種:選擇排隊等候;

第二種:選擇取一個小紙條上面有我的號碼,等到排到我這一號時由柜臺的人通知我輪到我去辦理業(yè)務(wù)了;

第一種:前者(排隊等候)就是同步等待消息通知,也就是我要一直在等待銀行辦理業(yè)務(wù)情況;

第二種:后者(等待別人通知)就是異步等待消息通知.在異步消息處理中,等待消息通知者(在這個例子中就是等待辦理業(yè)務(wù)的人)往往注冊一個回調(diào)機(jī)制,在所等待的時間被觸發(fā)時由觸發(fā)機(jī)制(在這里是柜臺的人)通過某種機(jī)制(在這里是寫在小紙條上的號碼,喊號)找到等待該事件的人

阻塞與非阻塞

  阻塞和非阻塞這兩個概念與程序(線程)等待消息通知(無所謂同步或者異步)時的狀態(tài)有關(guān).也就是說阻塞與非阻塞主要程序(線程)等待消息通知時的狀態(tài)角度來說的

例子:

繼續(xù)上面的那個例子,不論是排隊還是使用號碼等待通知,如果在這個等待的過程中,等待者除了等待消息通知之外不能做其它的事情,那么該機(jī)制就是阻塞的,表現(xiàn)在程序中,也就是該程序一直阻塞在該函數(shù)調(diào)用處不能繼續(xù)往下執(zhí)行。

相反,有的人喜歡在銀行辦理這些業(yè)務(wù)的時候一邊打打電話發(fā)發(fā)短信一邊等待,這樣的狀態(tài)就是非阻塞的,因為他(等待者)沒有阻塞在這個消息通知上,而是一邊做自己的事情一邊等待。

注意:同步非阻塞形式實際上是效率低下的,想象一下你一邊打著電話一邊還需要抬頭看到底隊伍排到你了沒有。如果把打電話和觀察排隊的位置看成是程序的兩個操作的話,這個程序需要在這兩種不同的行為之間來回的切換,效率可想而知是低下的;而異步非阻塞形式卻沒有這樣的問題,因為打電話是你(等待者)的事情,而通知你則是柜臺(消息觸發(fā)機(jī)制)的事情,程序沒有在兩種不同的操作中來回切換。

同步/異步與阻塞/非阻塞

  1.同步阻塞形式

  效率最低,拿上面的例子來說,就是你專心排隊,什么別的事都不做.

  2.異步阻塞形式

如果在銀行等待辦理業(yè)務(wù)的人采用的是異步的方式去等待消息被觸發(fā)(通知),也就是領(lǐng)了一張小紙條,假如在這段時間里他不能離開銀行做其它的事情,那么很顯然,這個人被阻塞在了這個等待的操作上面;

  異步操作是可以被阻塞住的,只不過它不是在處理消息時阻塞,而是在等待消息通知時被阻塞。

  • 同步非阻塞形式
  •   實際上是效率低下的。

      想象一下你一邊打著電話一邊還需要抬頭看到底隊伍排到你了沒有(兩個操作不能同時執(zhí)行,因為是同步),如果把打電話和觀察排隊的位置看成是程序的兩個操作的話,這個程序需要在這兩種不同的行為之間來回的切換,效率可想而知是低下的。

  • 異步非阻塞形式
  •   效率更高,

      因為打電話是你(等待者)的事情,而通知你則是柜臺(消息觸發(fā)機(jī)制)的事情,程序沒有在兩種不同的操作中來回切換。

      比如說,這個人突然發(fā)覺自己煙癮犯了,需要出去抽根煙,于是他告訴大堂經(jīng)理說,排到我這個號碼的時候麻煩到外面通知我一下,那么他就沒有被阻塞在這個等待的操作上面,自然這個就是異步+非阻塞的方式了。

      

    很多人會把同步和阻塞混淆,是因為很多時候同步操作會以阻塞的形式表現(xiàn)出來,同樣的,很多人也會把異步和非阻塞混淆,因為異步操作一般都不會在真正的IO操作處被阻塞。

    進(jìn)程的創(chuàng)建與結(jié)束

    進(jìn)程的創(chuàng)建

      但凡是硬件,都需要有操作系統(tǒng)去管理,只要有操作系統(tǒng),就有進(jìn)程的概念,就需要有創(chuàng)建進(jìn)程的方式,一些操作系統(tǒng)只為一個應(yīng)用程序設(shè)計,比如微波爐中的控制器,一旦啟動微波爐,所有的進(jìn)程都已經(jīng)存在。

      而對于通用系統(tǒng)(跑很多應(yīng)用程序),需要有系統(tǒng)運(yùn)行過程中創(chuàng)建或撤銷進(jìn)程的能力,主要分為4中形式創(chuàng)建新的進(jìn)程:

      1. 系統(tǒng)初始化(查看進(jìn)程linux中用ps命令,windows中用任務(wù)管理器,前臺進(jìn)程負(fù)責(zé)與用戶交互,后臺運(yùn)行的進(jìn)程與用戶無關(guān),運(yùn)行在后臺并且只在需要時才喚醒的進(jìn)程,稱為守護(hù)進(jìn)程,如電子郵件、web頁面、新聞、打印)

      2. 一個進(jìn)程在運(yùn)行過程中開啟了子進(jìn)程(如nginx開啟多進(jìn)程,os.fork,subprocess.Popen等)

      3. 用戶的交互式請求,而創(chuàng)建一個新進(jìn)程(如用戶雙擊暴風(fēng)影音)

      4. 一個批處理作業(yè)的初始化(只在大型機(jī)的批處理系統(tǒng)中應(yīng)用)

      無論哪一種,新進(jìn)程的創(chuàng)建都是由一個已經(jīng)存在的進(jìn)程執(zhí)行了一個用于創(chuàng)建進(jìn)程的系統(tǒng)調(diào)用而創(chuàng)建的。

    1. 在UNIX中該系統(tǒng)調(diào)用是:fork,fork會創(chuàng)建一個與父進(jìn)程一模一樣的副本,二者有相同的存儲映像、同樣的環(huán)境字符串和同樣的打開文件(在shell解釋器進(jìn)程中,執(zhí)行一個命令就會創(chuàng)建一個子進(jìn)程)2. 在windows中該系統(tǒng)調(diào)用是:CreateProcess,CreateProcess既處理進(jìn)程的創(chuàng)建,也負(fù)責(zé)把正確的程序裝入新進(jìn)程。關(guān)于創(chuàng)建子進(jìn)程,UNIX和windows1.相同的是:進(jìn)程創(chuàng)建后,父進(jìn)程和子進(jìn)程有各自不同的地址空間(多道技術(shù)要求物理層面實現(xiàn)進(jìn)程之間內(nèi)存的隔離),任何一個進(jìn)程的在其地址空間中的修改都不會影響到另外一個進(jìn)程。2.不同的是:在UNIX中,子進(jìn)程的初始地址空間是父進(jìn)程的一個副本,提示:子進(jìn)程和父進(jìn)程是可以有只讀的共享內(nèi)存區(qū)的。但是對于windows系統(tǒng)來說,從一開始父進(jìn)程與子進(jìn)程的地址空間就是不同的。

    進(jìn)程的結(jié)束

      1. 正常退出(自愿,如用戶點擊交互式頁面的叉號,或程序執(zhí)行完畢調(diào)用發(fā)起系統(tǒng)調(diào)用正常退出,在linux中用exit,在windows中用ExitProcess)

      2. 出錯退出(自愿,python a.py中a.py不存在)

      3. 嚴(yán)重錯誤(非自愿,執(zhí)行非法指令,如引用不存在的內(nèi)存,1/0等,可以捕捉異常,try...except...)

      4. 被其他進(jìn)程殺死(非自愿,如kill -9)

    在python程序中的進(jìn)程操作

    之前我們已經(jīng)了解了很多進(jìn)程相關(guān)的理論知識,了解進(jìn)程是什么應(yīng)該不再困難了,剛剛我們已經(jīng)了解了,運(yùn)行中的程序就是一個進(jìn)程。所有的進(jìn)程都是通過它的父進(jìn)程來創(chuàng)建的。因此,運(yùn)行起來的python程序也是一個進(jìn)程,那么我們也可以在程序中再創(chuàng)建進(jìn)程。多個進(jìn)程可以實現(xiàn)并發(fā)效果,也就是說,當(dāng)我們的程序中存在多個進(jìn)程的時候,在某些時候,就會讓程序的執(zhí)行速度變快。以我們之前所學(xué)的知識,并不能實現(xiàn)創(chuàng)建進(jìn)程這個功能,所以我們就需要借助python中強(qiáng)大的模塊。

    ?

    multiprocessing模塊

    ?仔細(xì)說來,multiprocessing不是一個模塊而是python中一個操作、管理進(jìn)程的包。 之所以叫multi是取自multiple的多功能的意思,在這個包中幾乎包含了和進(jìn)程有關(guān)的所有子模塊。由于提供的子模塊非常多,為了方便大家歸類記憶,我將這部分大致分為四個部分:創(chuàng)建進(jìn)程部分,進(jìn)程同步部分,進(jìn)程池部分,進(jìn)程之間數(shù)據(jù)共享。

    multiprocessing.process模塊

    process模塊介紹

    process模塊是一個創(chuàng)建進(jìn)程的模塊,借助這個模塊,就可以完成進(jìn)程的創(chuàng)建。

    Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化得到的對象,表示一個子進(jìn)程中的任務(wù)(尚未啟動)強(qiáng)調(diào): 1. 需要使用關(guān)鍵字的方式來指定參數(shù) 2. args指定的為傳給target函數(shù)的位置參數(shù),是一個元組形式,必須有逗號參數(shù)介紹: group參數(shù)未使用,值始終為None target表示調(diào)用對象,即子進(jìn)程要執(zhí)行的任務(wù) args表示調(diào)用對象的位置參數(shù)元組,args=(1,2,'egon',) kwargs表示調(diào)用對象的字典,kwargs={'name':'egon','age':18} name為子進(jìn)程的名稱 p.start():啟動進(jìn)程,并調(diào)用該子進(jìn)程中的p.run() p.run():進(jìn)程啟動時運(yùn)行的方法,正是它去調(diào)用target指定的函數(shù),我們自定義類的類中一定要實現(xiàn)該方法 p.terminate():強(qiáng)制終止進(jìn)程p,不會進(jìn)行任何清理操作,如果p創(chuàng)建了子進(jìn)程,該子進(jìn)程就成了僵尸進(jìn)程,使用該方法需要特別小心這種情況。如果p還保存了一個鎖那么也將不會被釋放,進(jìn)而導(dǎo)致死鎖 p.is_alive():如果p仍然運(yùn)行,返回True p.join([timeout]):主線程等待p終止(強(qiáng)調(diào):是主線程處于等的狀態(tài),而p是處于運(yùn)行的狀態(tài))。timeout是可選的超時時間,需要強(qiáng)調(diào)的是,p.join只能join住start開啟的進(jìn)程,而不能join住run開啟的進(jìn)程 方法介紹 p.daemon:默認(rèn)值為False,如果設(shè)為True,代表p為后臺運(yùn)行的守護(hù)進(jìn)程,當(dāng)p的父進(jìn)程終止時,p也隨之終止,并且設(shè)定為True后,p不能創(chuàng)建自己的新進(jìn)程,必須在p.start()之前設(shè)置 p.name:進(jìn)程的名稱 p.pid:進(jìn)程的pid p.exitcode:進(jìn)程在運(yùn)行時為None、如果為–N,表示被信號N結(jié)束(了解即可) p.authkey:進(jìn)程的身份驗證鍵,默認(rèn)是由os.urandom()隨機(jī)生成的32字符的字符串。這個鍵的用途是為涉及網(wǎng)絡(luò)連接的底層進(jìn)程間通信提供安全性,這類連接只有在具有相同的身份驗證鍵時才能成功(了解即可)屬性介紹 在Windows操作系統(tǒng)中由于沒有fork(linux操作系統(tǒng)中創(chuàng)建進(jìn)程的機(jī)制),在創(chuàng)建子進(jìn)程的時候會自動
    import 啟動它的這個文件,而在 import 的時候又執(zhí)行了整個文件。因此如果將process()
    直接寫在文件中就會無限遞歸創(chuàng)建子進(jìn)程報錯。所以必須把創(chuàng)建子進(jìn)程的部分使用if __name__ ==‘__main__’
    判斷保護(hù)起來,import 的時候 ,就不會遞歸運(yùn)行了。

    使用process模塊創(chuàng)建進(jìn)程

    在一個python進(jìn)程中開啟子進(jìn)程,start方法和并發(fā)的效果.

    from multiprocessing import Process import timedef func():time.sleep(1)print('這里是son')if __name__ == '__main__':p = Process(target = func)p.start()time.sleep(2)print('這里是father') from multiprocessing import Process import timedef func(name):print('son 的名字是 %s '%name)time.sleep(5)print('這里是son')if __name__ == '__main__':p = Process(target = func,args=('Alex',))p.start()time.sleep(2)p.join()# 代碼執(zhí)行到這里,主進(jìn)程main會停止等待子進(jìn)程執(zhí)行完畢才繼續(xù)print('這里是father') from multiprocessing import Process import osdef func():print('我是子進(jìn)程,我的進(jìn)程id是%s,我爸爸的id是%s'%(os.getpid(),os.getppid()))if __name__ == '__main__':print('我是main爸爸,我的進(jìn)程id是%s' % os.getpid())for i in range(5):p = Process(target = func,args=())p.start() 查看子進(jìn)程和父進(jìn)程的id號

    進(jìn)階,多個進(jìn)程同時執(zhí)行(注意:子進(jìn)程的執(zhí)行順序并不受開啟子進(jìn)程的順序影響)

    from multiprocessing import Process import time def func(i):print('這里是第%s個子進(jìn)程'%(i))time.sleep(1)if __name__ == '__main__':print('這里是main爸爸')for i in range(5):p = Process(target = func,args=(i,))p.start() 多個進(jìn)程同時執(zhí)行 from multiprocessing import Process import time def func(i):print('這里是第%s個子進(jìn)程'%(i))time.sleep(1)if __name__ == '__main__':for i in range(5):p = Process(target = func,args=(i,))p.start()p.join()# main會停在這一句,等子進(jìn)程執(zhí)行完,再繼續(xù)走,也就是才再走下一次for循環(huán)print('這里是main爸爸') join在搞事情(1) from multiprocessing import Process import time def func(i):print('這里是第%s個子進(jìn)程'%(i))time.sleep(1)if __name__ == '__main__':p_l = []for i in range(5):p = Process(target = func,args=(i,))p.start()p_l.append(p)[i.join() for i in p_l]print('這里是main爸爸')

    ?上邊是直接開啟多進(jìn)程,接下來介紹一個高大上的方法 -- 繼承Process類的方式

    from multiprocessing import Process import os class MyProcess(Process):def __init__(self,name):super().__init__()self.name = namedef run(self):print('我是%s,我正在和蒼老師談人生,我的id是%s'%(self.name,os.getpid()))if __name__ == '__main__':p1 = MyProcess('WuSir')p2 = MyProcess('邱老板')p3 = MyProcess('金老板')p1.start()# 調(diào)用start方法,start方法內(nèi)自動調(diào)用run方法p2.start()# p2.run()p3.start()p1.join()p2.join()# 注意,如果調(diào)用run方法,就不能再調(diào)用join方法p3.join()print('我是main爸爸')繼承那些事(Process) 繼承那些事(Process)

    ?多進(jìn)程之間關(guān)于數(shù)據(jù)隔離的那些年那些事兒

    from multiprocessing import Processdef func():global nn = 0print('子進(jìn)程內(nèi) n = %s'%n)if __name__ == '__main__':n = 100p = Process(target=func)p.start()print('主進(jìn)程內(nèi) n = %s'%n)

    守護(hù)進(jìn)程? ? ?

    會隨著父進(jìn)程的結(jié)束而結(jié)束。

    父進(jìn)程創(chuàng)建守護(hù)進(jìn)程

      其一:守護(hù)進(jìn)程會在父進(jìn)程代碼執(zhí)行結(jié)束后就終止

      其二:守護(hù)進(jìn)程內(nèi)無法再開啟子進(jìn)程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children

    注意:進(jìn)程之間是互相獨(dú)立的,父進(jìn)程代碼運(yùn)行結(jié)束,守護(hù)進(jìn)程隨即終止

    from multiprocessing import Process import timedef func():print('子進(jìn)程開始執(zhí)行')time.sleep(2)print('子進(jìn)程結(jié)束執(zhí)行')if __name__ == '__main__':print('主進(jìn)程開始執(zhí)行')p = Process(target=func1,)p.daemon = True# 將p 設(shè)置為守護(hù)進(jìn)程,此代碼一定要在start之前設(shè)置。p.start()time.sleep(1)print('主進(jìn)程結(jié)束執(zhí)行') from multiprocessing import Process import timedef func2():print('子進(jìn)程2開始執(zhí)行')time.sleep(2)print('子進(jìn)程2結(jié)束執(zhí)行')def func1():print('子進(jìn)程開始執(zhí)行')time.sleep(2)print('子進(jìn)程結(jié)束執(zhí)行')if __name__ == '__main__':print('主進(jìn)程開始執(zhí)行')p1 = Process(target=func1,)p2 = Process(target=func2)p1.daemon = True# 將p1 設(shè)置為守護(hù)進(jìn)程,此代碼一定要在start之前設(shè)置。p1.start()p2.start()time.sleep(1)# 此時p1 p2 和main 都已經(jīng)開始執(zhí)行print('主進(jìn)程結(jié)束執(zhí)行')# 當(dāng)主進(jìn)程打印完這句話,代表主進(jìn)程結(jié)束,守護(hù)進(jìn)程p1肯定隨之結(jié)束# 但是p2 不是守護(hù)進(jìn)程,不會結(jié)束,所以此時程序(也就是主進(jìn)程)會等待p2結(jié)束之后才結(jié)束。

    socket tcp協(xié)議并發(fā)實現(xiàn)聊天??

    from multiprocessing import Process import socket from socket import SOL_SOCKET,SO_REUSEADDR SERVER_ADDR = ('127.0.0.1',8080)sk = socket.socket() sk.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) sk.bind(SERVER_ADDR)sk.listen(5)def func(conn):while 1:try:msg_r = conn.recv(1024).decode('utf-8')print(msg_r)if not msg_r:breakconn.send(msg_r.upper().encode('utf-8'))except:breakif __name__ == '__main__':while 1:conn,addr = sk.accept()p = Process(target=func,args=(conn,))p.start()sk.close()socket-tcp協(xié)議并發(fā)實現(xiàn)_server import socketsk = socket.socket() sk.connect(('127.0.0.1',8080))while 1:msg_s = input('>>>')if not msg_s:continuesk.send(msg_s.encode('utf-8'))print(sk.recv(1024).decode('utf-8')) sk.close()

    ?多進(jìn)程中其他方法?

    from multiprocessing import Process import random import timeclass MyProcess(Process):def __init__(self,name):super(MyProcess, self).__init__()self.name = name# name是父類Process中的屬性,這里相當(dāng)于給子進(jìn)程命名def run(self):print('%s 正在撩小姐姐'%self.name)time.sleep(random.randint(1,3))print('%s 還在撩小姐姐'%self.name)if __name__ == '__main__':p = MyProcess('Alex')p.start()time.sleep(0.1)p.terminate()# 將p進(jìn)程殺死的命令。 將任務(wù)提交給操作系統(tǒng),操作系統(tǒng)什么時候執(zhí)行不受用戶決定print(p.is_alive())# 判斷p進(jìn)程是否還存在time.sleep(1)print(p.is_alive())# 判斷p進(jìn)程是否還存在進(jìn)程的terminate和is_alive方法 from multiprocessing import Process import random import timeclass MyProcess(Process):def __init__(self,name):super(MyProcess, self).__init__()self.name = name# name是父類Process中的屬性,這里相當(dāng)于給子進(jìn)程命名def run(self):print('%s 正在撩小姐姐'%self.name)time.sleep(random.randint(1,3))print('%s 還在撩小姐姐'%self.name)if __name__ == '__main__':p = MyProcess('Alex')p.start()print(p.name,p.pid)# 打印進(jìn)程名字,進(jìn)程id號進(jìn)程的name和pid屬性

    進(jìn)程同步(multiprocessing.Lock、multiprocessing.Semaphore、multiprocessing.Event)

    ? ? ? ?在計算機(jī)中,有一些硬件和軟件,例如處理器、打印機(jī)等,都屬于競爭類資源,當(dāng)有需求時,很多進(jìn)程都要爭搶這些資源,而對于這類資源,就屬于臨界資源。當(dāng)多進(jìn)程共同處理某一個數(shù)據(jù)時,這個數(shù)據(jù)也就屬于一個臨界資源。操作系統(tǒng)對計算機(jī)內(nèi)各種資源都使其在競爭中有序化,但是對于數(shù)據(jù)來說,尤其是用戶動態(tài)產(chǎn)生的數(shù)據(jù),當(dāng)處理時就變成了臨界資源,所以我們作為程序猿來說,需要對臨界資源加以保護(hù),否則就會出現(xiàn)數(shù)據(jù)混亂現(xiàn)象。這是在提高程序效率的優(yōu)勢下,帶來的一個隱患。小伙伴們,加油吧!

    鎖 —— multiprocessing.Lock??

    通過剛剛的學(xué)習(xí),我們千方百計實現(xiàn)了程序的異步,讓多個任務(wù)可以同時在幾個進(jìn)程中并發(fā)處理,他們之間的運(yùn)行沒有順序(或者說由操作系統(tǒng)調(diào)度決定他們的順序),一旦開啟也不受我們控制。盡管并發(fā)編程讓我們能更加充分的利用IO資源,但是也給我們帶來了新的問題。

      當(dāng)多個進(jìn)程使用同一份數(shù)據(jù)資源的時候,就會引發(fā)數(shù)據(jù)安全或順序混亂問題。

    from multiprocessing import Process import random import timedef func(addr):print('我是%s'%addr)time.sleep(random.random())print('謝謝!')if __name__ == '__main__':l = ['四川的','湖南的','河南的','江蘇的']for addr in l:p = Process(target=func,args=(addr,))p.start()time.sleep(2)print('\n\n我選%s'%random.choice(l)) # 關(guān)于搶占輸出資源的事情,是指多進(jìn)程并發(fā)執(zhí)行時,并不是一個進(jìn)程執(zhí)行完任務(wù)后其他進(jìn)程再執(zhí)行。 # 比如 此程序會輸出:我是四川的 我是河南的 我是江蘇的 謝謝!謝謝!我是湖南的 謝謝! 謝謝! # 而不是 : 我是四川的 謝謝! 我是河南的 謝謝! ...多進(jìn)程關(guān)于搶占輸出資源的事情 from multiprocessing import Process import random import time from multiprocessing import Lock def func(addr,lock):lock.acquire()print('我是%s'%addr)time.sleep(random.random())print('謝謝!')lock.release()if __name__ == '__main__':lock = Lock()l = ['四川的','湖南的','河南的','江蘇的']for addr in l:p = Process(target=func,args=(addr,lock))p.start()time.sleep(4)print('\n\n我選%s'%random.choice(l))

    ?  上面這種情況,使用了加鎖的形式確保了程序的順序執(zhí)行,但是執(zhí)行又變成了串行,降低了效率,但是不得不說,它確保了數(shù)據(jù)的安全性。

    ? ? ? 下面舉例來說鎖的重要性:模擬12306搶票問題。模擬銀行賬戶的存取款問題

    # 注意,文件中存儲需要以{'c':1}這種形式,c的引號一定要帶 # 否則json識別不出來 # 此代碼的效果,并發(fā)執(zhí)行,但是多進(jìn)程同時讀寫同一個文件數(shù)據(jù),造成數(shù)據(jù)混亂from multiprocessing import Process,Lock import json import timedef check(i,l):with open('a.txt','r',encoding='utf-8') as f:dic = json.load(f)print('第%s個人在查票,余票為%s' % (i, dic['c']))pay(i,l)def pay(i,l):with open('a.txt','r',encoding='utf-8') as f:dic = json.load(f)time.sleep(0.5)# 模擬網(wǎng)絡(luò)延遲,當(dāng)購買過程中也會有網(wǎng)絡(luò)延遲if dic['c']:print('第%s個人買到票了 '%i)dic['c'] -= 1else:print('第%s個人沒買到票'%i)with open('a.txt','w') as f:json.dump(dic,f)if __name__ == '__main__':l = Lock()for i in range(10):p = Process(target=check,args=(i+1,l))p.start()多個人同時搶票

    很明顯,上述例子中,因為多進(jìn)程同時對一個臨界資源(a.txt文件)進(jìn)行了讀寫操作,使文件內(nèi)數(shù)據(jù)混亂,也造成了余票為1張,但是很多人都搶到票的假象。那就加鎖來解決它吧

    from multiprocessing import Process,Lock import json import timedef check(i,l):with open('a.txt','r',encoding='utf-8') as f:dic = json.load(f)print('第%s個人在查票,余票為%s' % (i, dic['c']))l.acquire()pay(i,l)# 為什么在這里加鎖? 因為每個人都可以查票,讀取數(shù)據(jù),不會造成數(shù)據(jù)混亂,但是當(dāng)買票的時候,就需要對臨界資源的寫入,所以對寫操作加鎖,使某一個進(jìn)程在寫文件時候,其他進(jìn)程不能碰此文件。l.release()def pay(i,l):with open('a.txt','r',encoding='utf-8') as f:dic = json.load(f)time.sleep(0.5)# 模擬網(wǎng)絡(luò)延遲,當(dāng)購買過程中也會有網(wǎng)絡(luò)延遲if dic['c']:print('第%s個人買到票了 '%i)dic['c'] -= 1else:print('第%s個人沒買到票'%i)with open('a.txt','w') as f:json.dump(dic,f)if __name__ == '__main__':l = Lock()for i in range(10):p = Process(target=check,args=(i+1,l))p.start()加鎖解決買票問題

    ?關(guān)于銀行存取款的問題。同一個賬戶,某個人一直存,某個人在同一時間一直取,如果不對數(shù)據(jù)進(jìn)行保護(hù)起來,就會造成的一種數(shù)據(jù)混亂問題。

    from multiprocessing import Process, Lock,Valuedef save_money(num):for i in range(100):time.sleep(0.05)num.value += 1def draw_money(num):for i in range(100):time.sleep(0.05)num.value -= 1if __name__ == '__main__':num = Value('i',1000)# 多進(jìn)程中共享數(shù)據(jù),一個int類型的數(shù)據(jù),1000man = Process(target=save_money,args=(num,))woman = Process(target=draw_money,args=(num,))man.start()woman.start()time.sleep(6)print(num.value)錢多錢少怪誰? from multiprocessing import Process, Lock,Valuedef save_money(num,l):for i in range(100):time.sleep(0.05)l.acquire()num.value += 1l.release()def draw_money(num,l):for i in range(100):time.sleep(0.05)l.acquire()# 在操作存取款的數(shù)據(jù)時,先將數(shù)據(jù)鎖住,不允許其他人更改此數(shù)據(jù)num.value -= 1l.release()if __name__ == '__main__':l = Lock()num = Value('i',1000)# 多進(jìn)程中共享數(shù)據(jù),一個int類型的數(shù)據(jù),1000man = Process(target=save_money,args=(num,l))woman = Process(target=draw_money,args=(num,l))man.start()woman.start()time.sleep(6)print(num.value)這樣才對!!!

    信號量 —— multiprocessing.Semaphore(了解)

    # 上述講的Lock,屬于互斥鎖,也就是一把鑰匙配備一把鎖,同時只允許鎖住某一個數(shù)據(jù)。而信號量則是多把鑰匙配備多把鎖,也就是說同時允許鎖住多個數(shù)據(jù)。比如在一個粉紅發(fā)廊,里邊有5位服務(wù)人員,那么這個發(fā)廊最多就同時允許進(jìn)入5位客人,當(dāng)又有第6位客人來的時候,就需要在門外等待;當(dāng)服務(wù)人員服務(wù)完某位客人后,才允許后續(xù)的人再進(jìn)來一個,換句話說,這個發(fā)廊最多同時接待5位客人,多的客人必須等待。信號量同步基于內(nèi)部計數(shù)器,用戶初始化一個計數(shù)器初值(比如上述例子中就初始化為5),每調(diào)用一次acquire(),計數(shù)器減1;每調(diào)用一次release(),計數(shù)器加1。當(dāng)計數(shù)器為0時,acquire()調(diào)用被阻塞。這是迪科斯徹(Dijkstra)信號量概念P()和V()的Python實現(xiàn)。信號量同步機(jī)制適用于訪問像服務(wù)器這樣的有限資源。信號量與進(jìn)程池的概念很像,但是要區(qū)分開,信號量涉及到加鎖的概念 信號量 Semaphore from multiprocessing import Semaphore from multiprocessing import Process import time import randomdef sing(i,se):se.acquire()# 每次進(jìn)來一位客人,信號量內(nèi)部計數(shù)器減1print('%s進(jìn)入小黑屋'%i)time.sleep(random.randint(1,3))print('%s交錢走人'%i)se.release()# 每次離開一位客人,信號量內(nèi)部計數(shù)器加1if __name__ == '__main__':se = Semaphore(5)# 初始化5把鑰匙配備5把鎖for i in range(10): # 模擬10個人要進(jìn)入小黑屋子p = Process(target=sing,args=(i,se))p.start()信號量機(jī)制舉個栗子

    事件 —— multiprocessing.Event(了解)?

    python中的事件機(jī)制,主要用于主進(jìn)程控制其他進(jìn)程的執(zhí)行,事件主要提供了三個方法 set、wait、clear。事件處理的機(jī)制:全局定義了一個“Flag”(event.is_set()),如果“Flag”值為 False,那么當(dāng)程序執(zhí)行 event.wait 方法時就會阻塞,如果“Flag”值為True,那么event.wait 方法時便不再阻塞。clear:將“Flag”設(shè)置為False set:將“Flag”設(shè)置為True is_set:返回全局‘Flag’的bool值事件 Event def traffic_lights(e):while 1:if e.is_set():# 如果True,代表e.wait() 不阻塞time.sleep(5)# 這段時間代表綠燈亮的時間,可以過車print('\033[31m紅燈亮!\033[0m')# 該紅燈亮了e.clear()# 將e.is_set()改成False,else:# 執(zhí)行car函數(shù)的進(jìn)程們,讀e.wait,因為上邊已經(jīng)是False,所以e.wait將會阻塞住time.sleep(5)# 紅燈亮的期間print('\033[32m綠燈亮!\033[0m')# 該綠燈亮了e.set()# 將e.is_set()設(shè)置成True,此時e.wait()將不再阻塞,車可以過def car(i,e):e.wait()# 讀取是否阻塞,如果阻塞,代表在print('第%s輛車過'%i)if __name__ == '__main__':e = Event()triff_light = Process(target=traffic_lights,args=(e,))triff_light.start()for i in range(50):cars = Process(target=car,args=(i,e))cars.start()時間機(jī)制舉個栗子

    進(jìn)程間通信——隊列和管道(multiprocess.Queue、multiprocess.Pipe)

    ?進(jìn)程間通信--IPC(Inter-Process Communication)

    IPC的方法:此處介紹隊列和管道

    隊列 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??

    概念:創(chuàng)建共享的進(jìn)程隊列,Queue是多進(jìn)程安全的隊列,可以使用Queue實現(xiàn)多進(jìn)程之間的數(shù)據(jù)傳遞。

    Queue([maxsize]) 創(chuàng)建共享的進(jìn)程隊列。 參數(shù) :maxsize是隊列中允許存在的最大元素個數(shù)。如果省略此參數(shù),則無大小限制。 底層隊列使用管道和鎖定實現(xiàn)。 Queue([maxsize]) q = Queue([maxsize])q.get( [ block [ ,timeout ] ] ) 返回q中的一個項目。如果q為空,此方法將阻塞,直到隊列中有項目可用為止。block用于控制阻塞行為,默認(rèn)為True. 如果設(shè)置為False,將引發(fā)Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。如果在指定的時間間隔內(nèi)沒有項目可用,將引發(fā)Queue.Empty異常。q.get_nowait( ) 同q.get(False)方法。q.put(item [, block [,timeout ] ] ) 將item放入隊列。如果隊列已滿,此方法將阻塞至有空間可用為止。block控制阻塞行為,默認(rèn)為True。如果設(shè)置為False,將引發(fā)Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時后將引發(fā)Queue.Full異常。q.qsize() 返回隊列中目前項目的正確數(shù)量。此函數(shù)的結(jié)果并不可靠,因為在返回結(jié)果和在稍后程序中使用結(jié)果之間,隊列中可能添加或刪除了項目。在某些系統(tǒng)上,此方法可能引發(fā)NotImplementedError異常。q.empty() 如果調(diào)用此方法時 q為空,返回True。如果其他進(jìn)程或線程正在往隊列中添加項目,結(jié)果是不可靠的。也就是說,在返回和使用結(jié)果之間,隊列中可能已經(jīng)加入新的項目。q.full() 如果q已滿,返回為True. 由于線程的存在,結(jié)果也可能是不可靠的(參考q.empty()方法)。。方法介紹Queue中的方法介紹 q.close() 關(guān)閉隊列,防止隊列中加入更多數(shù)據(jù)。調(diào)用此方法時,后臺線程將繼續(xù)寫入那些已入隊列但尚未寫入的數(shù)據(jù),但將在此方法完成時馬上關(guān)閉。如果q被垃圾收集,將自動調(diào)用此方法。關(guān)閉隊列不會在隊列使用者中生成任何類型的數(shù)據(jù)結(jié)束信號或異常。例如,如果某個使用者正被阻塞在get()操作上,關(guān)閉生產(chǎn)者中的隊列不會導(dǎo)致get()方法返回錯誤。q.cancel_join_thread() 不會再進(jìn)程退出時自動連接后臺線程。這可以防止join_thread()方法阻塞。q.join_thread() 連接隊列的后臺線程。此方法用于在調(diào)用q.close()方法后,等待所有隊列項被消耗。默認(rèn)情況下,此方法由不是q的原始創(chuàng)建者的所有進(jìn)程調(diào)用。調(diào)用q.cancel_join_thread()方法可以禁止這種行為。其他方法(了解)需要你了解的幾個方法

    ?代碼示例??

    from multiprocessing import Queueq = Queue(4) q.put(1) q.put(2) q.put(3) q.put(4) # q.put(5) #如果是put,因為已經(jīng)放滿數(shù)據(jù),所以程序會阻塞在put,等待取出數(shù)據(jù) # q.put_nowait(5) # 如果是put_nowait() ,不會阻塞,直接放入隊列數(shù)據(jù),隊列滿則報異常 try:q.put_nowait(5)# 在此處用try直接處理異常。此時數(shù)據(jù)不會放入到隊列,會被直接丟棄 except:print('隊列已滿')print(q.get()) print(q.get()) print(q.get()) print(q.get()) # print(q.get())# 此處和上邊一樣,因為隊列中已空,所以程序會阻塞在get,等待放入數(shù)據(jù) # q.get_nowait() # 不會阻塞,直接從隊列中獲取數(shù)據(jù),獲取不到則報錯 try:q.get_nowait()# 在此處用try處理錯誤,此時獲取不到數(shù)據(jù),直接跳過 except:print('隊列已空')了解隊列的用法 了解隊列的用法

    ?上面這個例子還沒有加入進(jìn)程通信,只是先來看看隊列為我們提供的方法,以及這些方法的使用和現(xiàn)象。

    from multiprocessing import Queue,Process import time def func(q):# time.sleep(1)q.put('我是四川的')if __name__ == '__main__':q = Queue(5)p = Process(target=func,args=(q,))p.start()#print(q.get_nowait())# 此處,可能會報錯,因為子進(jìn)程和父進(jìn)程同時運(yùn)行,不一定隊列中有數(shù)據(jù)print(q.get())# 此處一定不會報錯,因為get是阻塞獲取數(shù)據(jù),如果隊列沒有就等著多進(jìn)程中使用隊列 多進(jìn)程中使用隊列

    上面是一個queue的簡單應(yīng)用,使用隊列q對象調(diào)用get函數(shù)來取得隊列中最先進(jìn)入的數(shù)據(jù)。 接下來看一個稍微復(fù)雜一些的例子:

    from multiprocessing import Process, Queue,freeze_support import random import osdef put_func(q):info = str(os.getpid()) + '\t:\t' + str(random.randint(0, 100))q.put(info)def get_func(q):print('%s 獲取到數(shù)據(jù) :\033[33m; %s \033[0m' % (os.getpid(), q.get()))if __name__ == '__main__':# freeze_support() 如果有windows系統(tǒng)開啟多進(jìn)程導(dǎo)致程序崩潰,可嘗試調(diào)用此函數(shù)q = Queue(5)l_put = []l_get = []for i in range(10):p_put = Process(target=put_func, args=(q,))p_put.start()l_put.append(p_put)for i in range(10):p_get = Process(target=get_func, args=(q,))p_get.start()l_put.append(p_get)# [i.join() for i in l_put]# [i.join() for i in l_get]批量數(shù)據(jù)放入隊列并批量獲取 批量數(shù)據(jù)放入隊列并批量獲取

    生產(chǎn)者消費(fèi)者模型 ? ??

    在并發(fā)編程中使用生產(chǎn)者和消費(fèi)者模式能夠解決絕大多數(shù)并發(fā)問題。該模式通過平衡生產(chǎn)進(jìn)程和消費(fèi)進(jìn)程的工作能力來提高程序的整體處理數(shù)據(jù)的速度。

    舉個應(yīng)用栗子:全棧開發(fā)時候,前端接收客戶請求,后端處理請求邏輯。當(dāng)某時刻客戶請求過于多的時候,后端處理不過來,此時完全可以借助隊列來輔助,將客戶請求放入隊列中,后端邏輯代碼處理完一批客戶請求后馬上從隊列中繼續(xù)獲取,這樣平衡兩端的效率。

    為什么要使用生產(chǎn)者和消費(fèi)者模式

    進(jìn)程世界里,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的進(jìn)程,消費(fèi)者就是消費(fèi)數(shù)據(jù)的進(jìn)程。在多進(jìn)程開發(fā)當(dāng)中,如果生產(chǎn)者處理速度很快,而消費(fèi)者處理速度很慢,那么生產(chǎn)者就必須等待消費(fèi)者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理,如果消費(fèi)者的處理能力大于生產(chǎn)者,那么消費(fèi)者就必須等待生產(chǎn)者。為了解決這個問題于是引入了生產(chǎn)者和消費(fèi)者模式。

    什么是生產(chǎn)者消費(fèi)者模式

    生產(chǎn)者消費(fèi)者模式是通過一個容器來解決生產(chǎn)者和消費(fèi)者的強(qiáng)耦合問題。生產(chǎn)者和消費(fèi)者彼此之間不直接通訊,而通過阻塞隊列來進(jìn)行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費(fèi)者處理,直接扔給阻塞隊列,消費(fèi)者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊列里取,阻塞隊列就相當(dāng)于一個緩沖區(qū),平衡了生產(chǎn)者和消費(fèi)者的處理能力。

    基于隊列實現(xiàn)生產(chǎn)者消費(fèi)者模型
    from multiprocessing import Queue,Process import time import randomdef get_func(q):while 1:time.sleep(random.randint(1, 3))info = q.get()print('\033[32m廠長拿走了%s \033[0m'%info)def put_func(q):for i in range(20):info = '娃娃%s號'%iq.put(info)print('\033[31m生產(chǎn)了%s \033[0m' % info)time.sleep(random.randint(1, 3))if __name__ == '__main__':q = Queue(5)p_get = Process(target=get_func, args=(q,))p_put = Process(target=put_func, args=(q,))p_get.start()p_put.start()基于隊列的生產(chǎn)者消費(fèi)者模型

    ?上述代碼是基于隊列實現(xiàn)的生產(chǎn)者消費(fèi)者模型,生產(chǎn)者一直在生產(chǎn)娃娃,消費(fèi)者一直在從隊列中獲取娃娃,但是消費(fèi)者因為不知道生產(chǎn)者要生產(chǎn)多少娃娃,也不知道生產(chǎn)者何時就不生產(chǎn)了,所以消費(fèi)者需要一個死循環(huán)一直嘗試去從隊列中獲取娃娃,那么此時問題就出現(xiàn)了,3個進(jìn)程,主進(jìn)程開啟了兩個子進(jìn)程分別為生產(chǎn)者和消費(fèi)者,當(dāng)生產(chǎn)者生產(chǎn)完數(shù)據(jù)后,生產(chǎn)者結(jié)束,消費(fèi)者一直在嘗試接收數(shù)據(jù),那么問題就出在了消費(fèi)者的get方法這里,當(dāng)get不到數(shù)據(jù)時候就一直阻塞,那么主進(jìn)程就一直等待,此時程序就不會結(jié)束了。

    解決方法也很簡單,可以嘗試讓生產(chǎn)者在生產(chǎn)完數(shù)據(jù)后,再往隊列中放一個結(jié)束生產(chǎn)的信號,當(dāng)消費(fèi)者接受到信號后,自動的break出死循環(huán)即可。

    from multiprocessing import Queue,Process import time import randomdef get_func(q):while 1:time.sleep(random.randint(1, 3))info = q.get()if info == None:break# 當(dāng)獲取到結(jié)束生產(chǎn)的標(biāo)識時,消費(fèi)者自動break出死循環(huán)print('\033[32m廠長拿走了%s \033[0m'%info)def put_func(q):for i in range(20):info = '娃娃%s號'%iq.put(info)print('\033[31m生產(chǎn)了%s \033[0m' % info)time.sleep(random.randint(1, 3))q.put(None)# 放入一個結(jié)束生產(chǎn)的標(biāo)識if __name__ == '__main__':q = Queue(5)p_get = Process(target=get_func, args=(q,))p_put = Process(target=put_func, args=(q,))p_get.start()p_put.start()修正版-消費(fèi)者生產(chǎn)者模型

    ?注意:上述代碼中,生產(chǎn)者放入的停止生產(chǎn)的標(biāo)識,放入標(biāo)識這件事其實交給主進(jìn)程來做也可以,但是此時就需要主進(jìn)程獲取到生產(chǎn)者什么時候結(jié)束生產(chǎn)。

    from multiprocessing import Queue,Process import time import randomdef get_func(q):while 1:time.sleep(random.randint(1, 3))info = q.get()if info == None:break# 當(dāng)獲取到結(jié)束生產(chǎn)的標(biāo)識時,消費(fèi)者自動break出死循環(huán)print('\033[32m廠長拿走了%s \033[0m'%info)def put_func(q):for i in range(20):info = '娃娃%s號'%iq.put(info)print('\033[31m生產(chǎn)了%s \033[0m' % info)time.sleep(random.randint(1, 3))if __name__ == '__main__':q = Queue(5)p_get = Process(target=get_func, args=(q,))p_put = Process(target=put_func, args=(q,))p_get.start()p_put.start()p_put.join()# 讓主進(jìn)程可以獲取到生產(chǎn)者結(jié)束生產(chǎn)q.put(None)主進(jìn)程發(fā)送結(jié)束生產(chǎn)標(biāo)識

    ?當(dāng)小伙子們嘗試上述這個代碼時,發(fā)現(xiàn)所有問題都阻擋不了你成功的腳步了,但是!!你有沒有嘗試過多個生產(chǎn)者多個消費(fèi)者的模型?what fuck? 現(xiàn)在的解決方案是不是就很 low bee了!因為你會發(fā)現(xiàn):(標(biāo)題)

    from multiprocessing import Queue,Process import time import randomdef get_func(q,consumer):while 1:time.sleep(random.randint(1, 3))info = q.get()if info == None:break# 當(dāng)獲取到結(jié)束生產(chǎn)的標(biāo)識時,消費(fèi)者自動break出死循環(huán)print('\033[32m%s 拿走了%s \033[0m'%(consumer,info))def put_func(q,product):for i in range(5):info = '%s %s號'%(product,i)q.put(info)print('\033[31m生產(chǎn)了%s \033[0m' % info)time.sleep(random.randint(1, 3))if __name__ == '__main__':q = Queue(5)con1 = Process(target=get_func, args=(q,'Alex'))con2 = Process(target=get_func, args=(q,'WuSir'))pro1 = Process(target=put_func, args=(q,'蒼老師版'))pro2 = Process(target=put_func, args=(q,'小澤瑪雅麗版'))pro3 = Process(target=put_func, args=(q,'島國米飯保你愛版'))pro1.start()pro2.start()pro3.start()con1.start()con2.start()pro1.join()# 讓主進(jìn)程可以獲取到生產(chǎn)者結(jié)束生產(chǎn)pro2.join()# 主進(jìn)程必須等待所有生產(chǎn)者生產(chǎn)完畢后才可以放結(jié)束生產(chǎn)的信號pro3.join()q.put(None)# 有幾個消費(fèi)者就應(yīng)該給幾個信號q.put(None)多個消費(fèi)者:有幾個消費(fèi)者就應(yīng)該放幾個結(jié)束生產(chǎn)標(biāo)識

    JoinableQueue([maxsize])?

    ?創(chuàng)建可連接的共享隊列進(jìn)程。它就好像一個Queue對象,但是它自帶光環(huán),允許消費(fèi)者通知生產(chǎn)者是不是已經(jīng)消費(fèi)完所有的數(shù)據(jù)了。通知進(jìn)程是使用共享的信號和條件變量來實現(xiàn)的。?

    老樣子,先來接收一下JoinableQueue給咱們提供的方法:

    JoinableQueue的實例p除了與Queue對象相同的方法之外,還具有以下方法:q.task_done() 消費(fèi)者使用此方法發(fā)出信號,表示隊列中放入的數(shù)據(jù)已經(jīng)被處理。如果調(diào)用此方法的次數(shù)大于從隊列中獲取的數(shù)據(jù)數(shù)量,將引發(fā)ValueError異常。q.join() 生產(chǎn)者將使用此方法進(jìn)行阻塞,直到隊列中所有項目均被處理。阻塞將持續(xù)到消費(fèi)者為隊列中的每個數(shù)據(jù)均調(diào)用q.task_done()方法為止。 方法介紹

    下面舉個栗子,建立一個永遠(yuǎn)運(yùn)行的進(jìn)程,來無限制的消費(fèi)生產(chǎn)者的數(shù)據(jù),生產(chǎn)者只需要將數(shù)據(jù)放入隊列并等待被處理即可

    from multiprocessing import JoinableQueue,Process import time import randomdef get_func(q,consumer):while 1:time.sleep(random.randint(1, 3))info = q.get()print('\033[32m%s 拿走了%s \033[0m'%(consumer,info))q.task_done()# 消費(fèi)者每消費(fèi)一個數(shù)據(jù),就要返回一次標(biāo)識給q.join,證明數(shù)據(jù)已經(jīng)被消費(fèi)def put_func(q,product):for i in range(5):info = '%s %s號'%(product,i)q.put(info)print('\033[31m生產(chǎn)了%s \033[0m' % info)time.sleep(random.randint(1, 3))q.join()# 生產(chǎn)完畢,使用此方法進(jìn)行阻塞,直到隊列中所有數(shù)據(jù)均被處理。if __name__ == '__main__':q = JoinableQueue(5)con1 = Process(target=get_func, args=(q,'Alex'))con2 = Process(target=get_func, args=(q,'WuSir'))pro1 = Process(target=put_func, args=(q,'蒼老師版'))pro2 = Process(target=put_func, args=(q,'小澤瑪雅麗版'))pro3 = Process(target=put_func, args=(q,'島國米飯保你愛版'))l_p = [pro1,pro2,pro3,con1,con2]for i in l_p:i.start()pro1.join()pro2.join()pro3.join() # 此代碼的大體邏輯:主進(jìn)程開啟3個生產(chǎn)者和2個消費(fèi)者,合計6個進(jìn)程在并發(fā)執(zhí)行。 # 主進(jìn)程等待 pro1 pro2 pro3的執(zhí)行完畢 # pro1 pro2 pro3 進(jìn)程中都有q.join(),會等待消費(fèi)者con1 con2消費(fèi)完所以數(shù)據(jù) # 即 :主進(jìn)程在等pro1 pro2 pro3 ,而pro1 pro2 pro3 在等 con1 con2 # 此代碼中消費(fèi)者進(jìn)程con1 con2 會一直阻塞在q.get等待接收數(shù)據(jù),而造成程序不會結(jié)束 # 如果程序想要結(jié)束.......小伙子,試試把con1 con2設(shè)置成守護(hù)進(jìn)程試試JoinableQueue實現(xiàn)生產(chǎn)者消費(fèi)者模型

    管道(了解)?

    #創(chuàng)建管道的類: Pipe([duplex]):在進(jìn)程之間創(chuàng)建一條管道,并返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的連接對象,強(qiáng)調(diào)一點:必須在產(chǎn)生Process對象之前產(chǎn)生管道 #參數(shù)介紹: dumplex:默認(rèn)管道是全雙工的,即conn1和conn2都是既能收數(shù)據(jù)也能發(fā)數(shù)據(jù)的,如果將duplex設(shè)成False,conn1只能用于接收,conn2只能用于發(fā)送。 #主要方法:conn1.recv():接收conn2.send(obj)發(fā)送的對象。如果沒有消息可接收,recv方法會一直阻塞。如果連接的另外一端已經(jīng)關(guān)閉,那么recv方法會拋出EOFError。conn1.send(obj):通過連接發(fā)送對象。obj是與序列化兼容的任意對象#其他方法: conn1.close():關(guān)閉連接。如果conn1被垃圾回收,將自動調(diào)用此方法 conn1.fileno():返回連接使用的整數(shù)文件描述符 conn1.poll([timeout]):如果連接上的數(shù)據(jù)可用,返回True。timeout指定等待的最長時限。如果省略此參數(shù),方法將立即返回結(jié)果。如果將timeout設(shè)成None,操作將無限期地等待數(shù)據(jù)到達(dá)。conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發(fā)送的一條完整的字節(jié)消息。maxlength指定要接收的最大字節(jié)數(shù)。如果進(jìn)入的消息,超過了這個最大值,將引發(fā)IOError異常,并且在連接上無法進(jìn)行進(jìn)一步讀取。如果連接的另外一端已經(jīng)關(guān)閉,再也不存在任何數(shù)據(jù),將引發(fā)EOFError異常。 conn.send_bytes(buffer [, offset [, size]]):通過連接發(fā)送字節(jié)數(shù)據(jù)緩沖區(qū),buffer是支持緩沖區(qū)接口的任意對象,offset是緩沖區(qū)中的字節(jié)偏移量,而size是要發(fā)送字節(jié)數(shù)。結(jié)果數(shù)據(jù)以單條消息的形式發(fā)出,然后調(diào)用c.recv_bytes()函數(shù)進(jìn)行接收 conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節(jié)消息,并把它保存在buffer對象中,該對象支持可寫入的緩沖區(qū)接口(即bytearray對象或類似的對象)。offset指定緩沖區(qū)中放置消息處的字節(jié)位移。返回值是收到的字節(jié)數(shù)。如果消息長度大于可用的緩沖區(qū)空間,將引發(fā)BufferTooShort異常。介紹管道的介紹 from multiprocessing import Process, Pipe import timedef son(conn2):time.sleep(1)print(conn2.recv())conn2.send('我是你爺爺')if __name__ == '__main__':conn1, conn2 = Pipe()p = Process(target=son, args=(conn2,))p.start()conn1.send('我是你爸爸!') # conn1既能發(fā)數(shù)據(jù)time.sleep(1)print(conn1.recv()) # conn1 也能接數(shù)據(jù)p.join()管道的初使用

    ?應(yīng)該特別注意管道端點的正確管理問題。如果是生產(chǎn)者或消費(fèi)者中都沒有使用管道的某個端點,就應(yīng)將它關(guān)閉。這也說明了為何在生產(chǎn)者中關(guān)閉了管道的輸出端,在消費(fèi)者中關(guān)閉管道的輸入端。如果忘記執(zhí)行這些步驟,程序可能在消費(fèi)者中的recv()操作上掛起。管道是由操作系統(tǒng)進(jìn)行引用計數(shù)的,必須在所有進(jìn)程中關(guān)閉管道后才能生成EOFError異常。因此,在生產(chǎn)者中關(guān)閉管道不會有任何效果,除非消費(fèi)者也關(guān)閉了相同的管道端點。

    from multiprocessing import Process, Pipedef son(conn2,conn1):# conn1.close() # 不寫就不會發(fā)生EOFError異常while 1:try:print(conn2.recv())conn2.send('我是你爺爺')except EOFError:# print(123) # 驗證確實發(fā)生了EOFError異常conn2.close()breakif __name__ == '__main__':conn1, conn2 = Pipe()p = Process(target=son, args=(conn2,conn1))p.start()conn2.close()conn1.send('我是你爸爸!') # conn1既能發(fā)數(shù)據(jù)print(conn1.recv()) # conn1 也能接數(shù)據(jù)conn1.close()自己整個異常玩玩 from multiprocessing import Pipe, Process import time import randomdef consumer(conn, man):conn1, conn2 = connconn1.close()while 1:try:info = conn2.recv()time.sleep(random.random())print('\033[31m%s 取走了%s\033[0m' % (man, info))except EOFError:print('\033[32m沒有娃娃了,等下一期吧\033[0m')conn2.close()break # 借助一定產(chǎn)生的EOFError異常來讓程序結(jié)束def producer(conn, pro):conn1, conn2 = connconn2.close()for i in range(10):info = '%s 的娃娃%s號' % (pro, i)conn1.send(info)conn1.close()if __name__ == '__main__':conn1, conn2 = Pipe()cons1 = Process(target=consumer, args=((conn1, conn2), 'alex'))prod1 = Process(target=producer, args=((conn1, conn2), '蒼老師版'))cons1.start()prod1.start()conn1.close() # 不寫的話,消費(fèi)者進(jìn)程結(jié)束不了。 一定要記住,多進(jìn)程中內(nèi)核會對每個進(jìn)程的管道進(jìn)行計數(shù),必須在所有進(jìn)程中都關(guān)閉管道才會引發(fā)EOFError異常conn2.close()pipe實現(xiàn)的生產(chǎn)者消費(fèi)者模型 from multiprocessing import Pipe, Process, Lockdef consumer(conn, man,color,l):conn1, conn2 = connconn1.close()while 1:l.acquire()info = conn2.recv()l.release()if info:print('\033[%s %s 取走了%s\033[0m' % (color,man, info))else:conn2.close()breakdef producer(conn, pro):conn1, conn2 = connconn2.close()for i in range(20):info = '%s 的娃娃%s號' % (pro, i)conn1.send(info)conn1.send(None)conn1.send(None)conn1.send(None)conn1.close()if __name__ == '__main__':conn1, conn2 = Pipe()l = Lock()cons1 = Process(target=consumer, args=((conn1, conn2), 'alex','31m',l))cons2 = Process(target=consumer, args=((conn1, conn2), 'Wusir','33m',l))cons3 = Process(target=consumer, args=((conn1, conn2), '太白','34m',l))cons4 = Process(target=consumer, args=((conn1, conn2), '彥濤','35m',l))cons5 = Process(target=consumer, args=((conn1, conn2), 'AAA','36m',l))cons6 = Process(target=consumer, args=((conn1, conn2), 'BBB','37m',l))prod1 = Process(target=producer, args=((conn1, conn2), '蒼老師版'))prod2 = Process(target=producer, args=((conn1, conn2), '韓紅版'))l_p = [cons1,cons2,cons3,cons4,cons5,cons6,prod1,prod2,][i.start() for i in l_p]conn1.close()conn2.close()[i.join() for i in l_p]多個消費(fèi)者競爭數(shù)據(jù)帶來了數(shù)據(jù)不安全的問題

    進(jìn)程直接的數(shù)據(jù)共享 ? ? ? ? ? ?

    展望未來,基于消息傳遞的并發(fā)編程是大勢所趨

    即便是使用線程,推薦做法也是將程序設(shè)計為大量獨(dú)立的線程集合,通過消息隊列交換數(shù)據(jù)。

    這樣極大地減少了對使用鎖定和其他同步手段的需求,還可以擴(kuò)展到分布式系統(tǒng)中。

    但進(jìn)程間應(yīng)該盡量避免通信,即便需要通信,也應(yīng)該選擇進(jìn)程安全的工具來避免加鎖帶來的問題。

    以后我們會嘗試使用數(shù)據(jù)庫來解決現(xiàn)在進(jìn)程之間的數(shù)據(jù)共享問題。

    進(jìn)程間數(shù)據(jù)是獨(dú)立的,可以借助于隊列或管道實現(xiàn)通信,二者都是基于消息傳遞的 雖然進(jìn)程間數(shù)據(jù)獨(dú)立,但可以通過Manager實現(xiàn)數(shù)據(jù)共享,事實上Manager的功能遠(yuǎn)不止于此A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.Manager模塊官方說法 from multiprocessing import Manager,Process,Lockdef func(dic,lock):# lock.acquire() # 使用manager模塊,多進(jìn)程共享數(shù)據(jù)時,如果不加鎖,必然會造成數(shù)據(jù)混亂dic[0] -= 1# lock.release()if __name__ == '__main__':m = Manager()lock = Lock()s = m.list([50])print(s)l = []for i in range(50):p = Process(target=func,args=(s,lock))p.start()l.append(p)[p.join() for p in l]print(s[0])Manager模塊的簡單用法

    進(jìn)程池和multiprocessing.Pool 模塊 ? ? ?

    進(jìn)程池 ? ? ? ?

    為什么要有進(jìn)程池?進(jìn)程池的概念。

    在程序?qū)嶋H處理問題過程中,忙時會有成千上萬的任務(wù)需要被執(zhí)行,閑時可能只有零星任務(wù)。那么在成千上萬個任務(wù)需要被執(zhí)行的時候,我們就需要去創(chuàng)建成千上萬個進(jìn)程么?首先,創(chuàng)建進(jìn)程需要消耗時間,銷毀進(jìn)程也需要消耗時間。第二即便開啟了成千上萬的進(jìn)程,操作系統(tǒng)也不能讓他們同時執(zhí)行,這樣反而會影響程序的效率。因此我們不能無限制的根據(jù)任務(wù)開啟或者結(jié)束進(jìn)程。那么我們要怎么做呢?

    在這里,要給大家介紹一個進(jìn)程池的概念,定義一個池子,在里面放上固定數(shù)量的進(jìn)程,有需求來了,就拿這個池中的進(jìn)程來處理任務(wù),等到處理完畢,進(jìn)程并不結(jié)束,而是將進(jìn)程再放回進(jìn)程池中繼續(xù)等待任務(wù)。如果有很多任務(wù)需要執(zhí)行,池中的進(jìn)程數(shù)量不夠,任務(wù)就要等待之前的進(jìn)程執(zhí)行任務(wù)完畢歸來,拿到空閑進(jìn)程才能繼續(xù)執(zhí)行。也就是說,池中進(jìn)程的數(shù)量是固定的,那么同一時間最多有固定數(shù)量的進(jìn)程在運(yùn)行。這樣不會增加操作系統(tǒng)的調(diào)度難度,還節(jié)省了開閉進(jìn)程的時間,也一定程度上能夠?qū)崿F(xiàn)并發(fā)效果。

    舉個栗子:在小黑屋子里(池)有4個小姐姐(進(jìn)程)一直在等待服務(wù)客人,當(dāng)有1個客人來,就派一個小姐姐去服務(wù),當(dāng)有很多人來,就得等哪個小姐姐服務(wù)完上個人之后再繼續(xù)去服務(wù)下一個人。

    multiprocessing.Pool 模塊

    p = Pool([numprocess [,initializer [, initargs]]]):創(chuàng)建進(jìn)程池1 numprocess:要創(chuàng)建的進(jìn)程數(shù),如果省略,將默認(rèn)使用cpu_count()的值 2 initializer:是每個工作進(jìn)程啟動時要執(zhí)行的可調(diào)用對象,默認(rèn)為None 3 initargs:是要傳給initializer的參數(shù)組 p.apply(func [, args [, kwargs]]):在一個池工作進(jìn)程中執(zhí)行func(*args,**kwargs),然后返回結(jié)果。 '''需要強(qiáng)調(diào)的是:此操作并不會在所有池工作進(jìn)程中并執(zhí)行func函數(shù)。如果要通過不同參數(shù)并發(fā)地執(zhí)行func函數(shù),必須從不同線程調(diào)用p.apply()函數(shù)或者使用p.apply_async()'''p.apply_async(func [, args [, kwargs]],callback=None):在一個池工作進(jìn)程中執(zhí)行func(*args,**kwargs),然后返回結(jié)果。 '''此方法的結(jié)果是AsyncResult類的實例,callback是可調(diào)用對象,接收輸入?yún)?shù)。當(dāng)func的結(jié)果變?yōu)榭捎脮r,將結(jié)果傳遞給callback。callback禁止執(zhí)行任何阻塞操作,否則將接收其他異步操作中的結(jié)果。'''p.map( func, iterable):將iterable序列中每一個元素當(dāng)成參數(shù)傳遞給func,異步執(zhí)行func函數(shù)。p.close():關(guān)閉進(jìn)程池,防止進(jìn)一步操作。如果所有操作持續(xù)掛起,它們將在工作進(jìn)程終止前完成P.jion():等待所有工作進(jìn)程退出。此方法只能在close()或teminate()之后調(diào)用主要方法Pool主要方法 方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具有以下方法:1、obj.get():返回結(jié)果,如果有必要則等待結(jié)果到達(dá)。timeout是可選的。如果在指定時間內(nèi)還沒有到達(dá),將引發(fā)一場。如果遠(yuǎn)程操作中引發(fā)了異常,它將在調(diào)用此方法時再次被引發(fā)。 2、obj.ready():如果調(diào)用完成,返回True 3、obj.successful():如果調(diào)用完成且沒有引發(fā)異常,返回True,如果在結(jié)果就緒之前調(diào)用此方法,引發(fā)異常 4、obj.wait([timeout]):等待結(jié)果變?yōu)榭捎谩?5、obj.terminate():立即終止所有工作進(jìn)程,同時不執(zhí)行任何清理或結(jié)束任何掛起工作。如果p被垃圾回收,將自動調(diào)用此函數(shù)其他方法(了解即可)

    代碼實例 ? ? ? ?

    進(jìn)程池和多進(jìn)程效率對比
    from multiprocessing import Pool,Process import time def func(i):i += 1print(i)if __name__ == '__main__':p = Pool(4)# 創(chuàng)建進(jìn)程池,池中有4個進(jìn)程待命start = time.time()t = [i for i in range(100)]p.map(func,t)#p.close()p.join()# 等待進(jìn)程池中所有進(jìn)程都執(zhí)行完所有任務(wù)。print(time.time() - start)# 打印進(jìn)程池做任務(wù)的時間start = time.time()l = []for i in range(100):p = Process(target=func,args=(i,))p.start()l.append(p)[i.join() for i in l]# 等待所有進(jìn)程工作結(jié)束print(time.time() - start)# 打印開啟100個進(jìn)程做任務(wù)的時間print('--'*20+'>')map進(jìn)程池和多進(jìn)程做任務(wù)的效率對比

    ?進(jìn)程池的同步調(diào)用和異步調(diào)用

    from multiprocessing import Pool,Process import time,osdef func(i):i+= 1time.sleep(2)print(i,os.getpid())return i if __name__ == '__main__':p = Pool(5)# 創(chuàng)建進(jìn)程池,池中有5個進(jìn)程待命res_l = []for i in range(10):res = p.apply(func,args=(i,))# 有10個任務(wù)來,交給進(jìn)程池中5個進(jìn)程,5個進(jìn)程同步執(zhí)行任務(wù)直到拿到resres_l.append(res)print(res_l)進(jìn)程池的同步調(diào)用 from multiprocessing import Pool,Process import time,osdef func(i):i+= 1time.sleep(2)print(i,os.getpid())return i if __name__ == '__main__':p = Pool(5)# 創(chuàng)建進(jìn)程池,池中有5個進(jìn)程待命res_l = []for i in range(10):res = p.apply_async(func,args=(i,))# 異步執(zhí)行10個任務(wù),每次最多5個進(jìn)程同時運(yùn)行res_l.append(res)# 拿到結(jié)果后是一個AsyncResul的實例obj,先將結(jié)果放入列表p.close()p.join()# 異步執(zhí)行需要join,也就是讓主進(jìn)程等待進(jìn)程池中所有進(jìn)程執(zhí)行完所有任務(wù),否則可能進(jìn)程池中的進(jìn)程還沒來得及執(zhí)行任務(wù),主進(jìn)程就結(jié)束了。for i in res_l:print(i.get())# 異步機(jī)制,從AsyncResul的實例obj中g(shù)et到實際結(jié)果,同步機(jī)制沒有此方法# 因為同步機(jī)制能直接拿到實際結(jié)果# 其實get是阻塞等待的,也就是說,如果沒有上邊的close和join :# 主進(jìn)程一樣會阻塞在get等待進(jìn)程池中給返回結(jié)果,進(jìn)程池異步執(zhí)行任務(wù)獲取結(jié)果# 每次有一個進(jìn)程返回結(jié)果后,就能get到一個結(jié)果,然后for循環(huán)到下一次繼續(xù)阻塞等待拿結(jié)果進(jìn)程池的異步調(diào)用

    ?練習(xí):進(jìn)程池實現(xiàn)socket并發(fā)聊天

    from multiprocessing import Pool import socketSOURCE_ADDR = ('127.0.0.1', 8090)def communication(conn):while 1:try:msg_r = conn.recv(1024).decode('utf-8')if not msg_r: breakprint(msg_r)conn.send(msg_r.upper().encode('utf-8'))except Exception: # 可能接收比如客戶端直接強(qiáng)制斷開連接等錯誤。print('結(jié)束')breakif __name__ == '__main__':p = Pool(4)sk = socket.socket()sk.bind(SOURCE_ADDR)sk.listen()while 1:conn, addr = sk.accept()p.apply_async(communication, args=(conn,))server端 import socketsk = socket.socket() SOURCE_ADDR = ('127.0.0.1',8090) sk.connect(SOURCE_ADDR)while 1:msg_s = input('>>>')if not msg_s:continuesk.send(msg_s.encode('utf-8'))print(sk.recv(1024).decode('utf-8'))client端

    ?上述代碼體現(xiàn)出:并發(fā)開啟多個客戶端,服務(wù)端同一時間只能接收4個客戶端的請求,當(dāng)再有客戶端請求時,需要等待,只能結(jié)束一個客戶端,另外一個客戶端才會進(jìn)來.

    需要回調(diào)函數(shù)的場景:進(jìn)程池中任何一個任務(wù)一旦處理完了,就立即告知主進(jìn)程:我好了,你可以處理我的結(jié)果了。主進(jìn)程則調(diào)用一個函數(shù)去處理該結(jié)果,該函數(shù)即回調(diào)函數(shù)我們可以把耗時間(阻塞)的任務(wù)放到進(jìn)程池中,然后指定回調(diào)函數(shù)(主進(jìn)程負(fù)責(zé)執(zhí)行),這樣主進(jìn)程在執(zhí)行回調(diào)函數(shù)時就省去了I/O的過程,直接拿到的是任務(wù)的結(jié)果需要注意的是:在進(jìn)程池中,回調(diào)函數(shù)是由主函數(shù)調(diào)用的! import requests from multiprocessing import Pool,Process import time import os def get_url(url):while 1:r = requests.get(url)if r.status_code == 200:return url,r.textdef write_res(result):info = '%s : %s'%(result[0],result[1])with open('info.txt','a',encoding='utf-8') as f:f.write(info)# print('回調(diào)函數(shù)pid:%s'%(os.getpid()))if __name__ == '__main__':p = Pool(4)# print('主函數(shù)pid:%s'%(os.getpid()))urls = ['https://www.baidu.com','http://www.python.org','http://www.jd.com','http://www.taobao.com','http://www.mi.com','http://www.cnblogs.com']res_l = []for url in urls:res = p.apply_async(get_url,args=(url,),callback=write_res)res_l.append(res)p.close()p.join()for i in res_l:print(i.get()[0])使用進(jìn)程池請求多個url以減少網(wǎng)絡(luò)延遲等待 import requests import bs4 from multiprocessing import Poolheader = {'user-agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36 SE 2.X MetaSr 1.0' }# 請求頭def get_img_url(url):'''獲取目標(biāo)網(wǎng)頁中,每個目的圖片的準(zhǔn)確地址'''img_url = []res = requests.get(url, headers=header)if res.status_code == 200:soup = bs4.BeautifulSoup(res.text, 'html.parser')img_addr = soup.find('div', class_='beauty_details_imgs_box').find_all('li')for i in img_addr:img = i.find('img').get('src')img_url.append(img)return img_urldef get_img(img_url):'''進(jìn)程池異步請求 每個目的圖片的準(zhǔn)確地址'''res = requests.get(img_url)if res.status_code == 200:return res, img_urldef save_img(img_url):'''回調(diào)函數(shù),每有一個進(jìn)程獲取到圖片地址后,將圖片寫入本地'''res, img = img_url[0], img_url[1]with open(img.split('/')[-1].split('_')[0], 'wb') as f:f.write(res.content)if __name__ == '__main__':url = 'http://www.xiao4j.com/beauty/photos/30687.html'p = Pool(4)img_addr = get_img_url(url)for img in img_addr:p.apply_async(get_img, args=(img,), callback=save_img)p.close()p.join()爬妹子 爬妹子

    ?在進(jìn)程池中,如果對于每個進(jìn)程返回的結(jié)果并不需要及時處理,需要等待所有結(jié)果完成后再統(tǒng)一處理的話,就無需回調(diào)函數(shù)了。

    def func(num):sum = 0for i in range(num):sum += ireturn sumif __name__ == '__main__':p = Pool(4)res = []for i in range(20):r = p.apply_async(func,args=(i,))res.append(r)p.close()p.join()[print(i.get()) for i in res]不需要用回調(diào)函數(shù)

    進(jìn)程池的其他實現(xiàn)方式:https://docs.python.org/dev/library/concurrent.futures.html

    ?

    參考資料:

    http://www.cnblogs.com/linhaifeng/articles/6817679.html

    http://www.cnblogs.com/Eva-J/articles/8253549.html

    https://www.jianshu.com/p/1200fd49b583

    ?

    轉(zhuǎn)載于:https://www.cnblogs.com/zhaoyang110/p/9508273.html

    總結(jié)

    以上是生活随笔為你收集整理的python之路-进程的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。