日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

Cpython解释器支持的进程与线程

發布時間:2023/12/10 python 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Cpython解释器支持的进程与线程 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一、理論部分

一 什么是進程

? ? 進程:正在進行的一個過程或者說一個任務。而負責執行任務則是cpu。

? ? 舉例(單核+多道,實現多個進程的并發執行):

? ? egon在一個時間段內有很多任務要做:python備課的任務,寫書的任務,交女朋友的任務,王者榮耀上分的任務,  

? ? 但egon同一時刻只能做一個任務(cpu同一時間只能干一個活),如何才能玩出多個任務并發執行的效果?

? ? egon備一會課,再去跟李杰的女朋友聊聊天,再去打一會王者榮耀....這就保證了每個任務都在進行中.

二 進程與程序的區別

程序僅僅只是一堆代碼而已,而進程指的是程序的運行過程。

舉例:

想象一位有一手好廚藝的計算機科學家egon正在為他的女兒元昊烘制生日蛋糕。

他有做生日蛋糕的食譜,

廚房里有所需的原料:面粉、雞蛋、韭菜,蒜泥等。

在這個比喻中:

? ? 做蛋糕的食譜就是程序(即用適當形式描述的算法)

? ? 計算機科學家就是處理器(cpu)

? ? 而做蛋糕的各種原料就是輸入數據

? ?進程就是廚師閱讀食譜、取來各種原料以及烘制蛋糕等一系列動作的總和

?

現在假設計算機科學家egon的兒子alex哭著跑了進來,說:XXXXXXXXXXXXXX。

科學家egon想了想,處理兒子alex蟄傷的任務比給女兒元昊做蛋糕的任務更重要,于是

計算機科學家就記錄下他照著食譜做到哪兒了(保存進程的當前狀態),然后拿出一本急救手冊,按照其中的指示處理蟄傷。這里,我們看到處理機從一個進程(做蛋糕)切換到另一個高優先級的進程(實施醫療救治),每個進程擁有各自的程序(食譜和急救手冊)。當蜜蜂蟄傷處理完之后,這位計算機科學家又回來做蛋糕,從他
離開時的那一步繼續做下去。

需要強調的是:同一個程序執行兩次,那也是兩個進程,比如打開暴風影音,雖然都是同一個軟件,但是一個可以播放蒼井空,一個可以播放飯島愛。

三 并發與并行

無論是并行還是并發,在用戶看來都是'同時'運行的,不管是進程還是線程,都只是一個任務而已,真是干活的是cpu,cpu來做這些任務,而一個cpu同一時刻只能執行一個任務

? ? ? 一 并發:是偽并行,即看起來是同時運行。單個cpu+多道技術就可以實現并發,(并行也屬于并發)

? ? ?二 并行:同時運行,只有具備多個cpu才能實現并行

? ? ? ? ?單核下,可以利用多道技術,多個核,每個核也都可以利用多道技術(多道技術是針對單核而言的

? ? ? ? ?有四個核,六個任務,這樣同一時間有四個任務被執行,假設分別被分配給了cpu1,cpu2,cpu3,cpu4,

? ? ? ? ?一旦任務1遇到I/O就被迫中斷執行,此時任務5就拿到cpu1的時間片去執行,這就是單核下的多道技術

? ? ? ? ?而一旦任務1的I/O結束了,操作系統會重新調用它(需知進程的調度、分配給哪個cpu運行,由操作系統說了算),可能被分配給四個cpu中的任意一個去執行

??

所有現代計算機經常會在同一時間做很多件事,一個用戶的PC(無論是單cpu還是多cpu),都可以同時運行多個任務(一個任務可以理解為一個進程)。

    啟動一個進程來殺毒(360軟件)

    啟動一個進程來看電影(暴風影音)

    啟動一個進程來聊天(騰訊QQ)

所有的這些進程都需被管理,于是一個支持多進程的多道程序系統是至關重要的

多道技術概念回顧:內存中同時存入多道(多個)程序,cpu從一個進程快速切換到另外一個,使每個進程各自運行幾十或幾百毫秒,這樣,雖然在某一個瞬間,一個cpu只能執行一個任務,但在1秒內,cpu卻可以運行多個進程,這就給人產生了并行的錯覺,即偽并發,以此來區分多處理器操作系統的真正硬件并行(多個cpu共享同一個物理內存)

四 同步與異步

同步執行:一個進程在執行某個任務時,另外一個進程必須等待其執行完畢,才能繼續執行
異步執行:一個進程在執行某個任務時,另外一個進程無需等待其執行完畢,就可以繼續執行,當有消息返回時,系統會通知后者進行處理,這樣可以提高執行效率

舉個例子,打電話時就是同步通信,發短息時就是異步通信。

同步\異步and阻塞\非阻塞(重點)

同步:

#所謂同步,就是在發出一個功能調用時,在沒有得到結果之前,該調用就不會返回。按照這個定義,其實絕大多數函數都是同步調用。但是一般而言,我們在說同步、異步的時候,特指那些需要其他部件協作或者需要一定時間完成的任務。 #舉例: #1. multiprocessing.Pool下的apply #發起同步調用后,就在原地等著任務結束,根本不考慮任務是在計算還是在io阻塞,總之就是一股腦地等任務結束 #2. concurrent.futures.ProcessPoolExecutor().submit(func,).result() #3. concurrent.futures.ThreadPoolExecutor().submit(func,).result()

異步:

#異步的概念和同步相對。當一個異步功能調用發出后,調用者不能立刻得到結果。當該異步功能完成后,通過狀態、通知或回調來通知調用者。如果異步功能用狀態來通知,那么調用者就需要每隔一定時間檢查一次,效率就很低(有些初學多線程編程的人,總喜歡用一個循環去檢查某個變量的值,這其實是一 種很嚴重的錯誤)。如果是使用通知的方式,效率則很高,因為異步功能幾乎不需要做額外的操作。至于回調函數,其實和通知沒太多區別。 #舉例: #1. multiprocessing.Pool().apply_async() #發起異步調用后,并不會等待任務結束才返回,相反,會立即獲取一個臨時結果(并不是最終的結果,可能是封裝好的一個對象)。 #2. concurrent.futures.ProcessPoolExecutor(3).submit(func,) #3. concurrent.futures.ThreadPoolExecutor(3).submit(func,)

阻塞:

#阻塞調用是指調用結果返回之前,當前線程會被掛起(如遇到io操作)。函數只有在得到結果之后才會將阻塞的線程激活。有人也許會把阻塞調用和同步調用等同起來,實際上他是不同的。對于同步調用來說,很多時候當前線程還是激活的,只是從邏輯上當前函數沒有返回而已。 #舉例: #1. 同步調用:apply一個累計1億次的任務,該調用會一直等待,直到任務返回結果為止,但并未阻塞住(即便是被搶走cpu的執行權限,那也是處于就緒態); #2. 阻塞調用:當socket工作在阻塞模式的時候,如果沒有數據的情況下調用recv函數,則當前線程就會被掛起,直到有數據為止。

非阻塞:

#非阻塞和阻塞的概念相對應,指在不能立刻得到結果之前也會立刻返回,同時該函數不會阻塞當前線程。

小結:

#1. 同步與異步針對的是函數/任務的調用方式:同步就是當一個進程發起一個函數(任務)調用的時候,一直等到函數(任務)完成,而進程繼續處于激活狀態。而異步情況下是當一個進程發起一個函數(任務)調用的時候,不會等函數返回,而是繼續往下執行當,函數返回的時候通過狀態、通知、事件等方式通知進程任務完成。#2. 阻塞與非阻塞針對的是進程或線程:阻塞是當請求不能滿足的時候就將進程掛起,而非阻塞則不會阻塞當前進程

?

五 進程的創建(了解)

  但凡是硬件,都需要有操作系統去管理,只要有操作系統,就有進程的概念,就需要有創建進程的方式,一些操作系統只為一個應用程序設計,比如微波爐中的控制器,一旦啟動微波爐,所有的進程都已經存在。

  而對于通用系統(跑很多應用程序),需要有系統運行過程中創建或撤銷進程的能力,主要分為4中形式創建新的進程

  1. 系統初始化(查看進程linux中用ps命令,windows中用任務管理器,前臺進程負責與用戶交互,后臺運行的進程與用戶無關,運行在后臺并且只在需要時才喚醒的進程,稱為守護進程,如電子郵件、web頁面、新聞、打印)

  2. 一個進程在運行過程中開啟了子進程(如nginx開啟多進程,os.fork,subprocess.Popen等)

  3. 用戶的交互式請求,而創建一個新進程(如用戶雙擊暴風影音)

  4. 一個批處理作業的初始化(只在大型機的批處理系統中應用)

  

  無論哪一種,新進程的創建都是由一個已經存在的進程執行了一個用于創建進程的系統調用而創建的:

  1. 在UNIX中該系統調用是:fork,fork會創建一個與父進程一模一樣的副本,二者有相同的存儲映像、同樣的環境字符串和同樣的打開文件(在shell解釋器進程中,執行一個命令就會創建一個子進程)

  2. 在windows中該系統調用是:CreateProcess,CreateProcess既處理進程的創建,也負責把正確的程序裝入新進程。

 

  關于創建的子進程,UNIX和windows

  1.相同的是:進程創建后,父進程和子進程有各自不同的地址空間(多道技術要求物理層面實現進程之間內存的隔離),任何一個進程的在其地址空間中的修改都不會影響到另外一個進程。

  2.不同的是:在UNIX中,子進程的初始地址空間是父進程的一個副本,提示:子進程和父進程是可以有只讀的共享內存區的。但是對于windows系統來說,從一開始父進程與子進程的地址空間就是不同的。

六 進程的終止(了解)

  1. 正常退出(自愿,如用戶點擊交互式頁面的叉號,或程序執行完畢調用發起系統調用正常退出,在linux中用exit,在windows中用ExitProcess)

  2. 出錯退出(自愿,python a.py中a.py不存在)

  3. 嚴重錯誤(非自愿,執行非法指令,如引用不存在的內存,1/0等,可以捕捉異常,try...except...)

  4. 被其他進程殺死(非自愿,如kill -9)

七 進程的層次結構

  無論UNIX還是windows,進程只有一個父進程,不同的是:

  1. 在UNIX中所有的進程,都是以init進程為根,組成樹形結構。父子進程共同組成一個進程組,這樣,當從鍵盤發出一個信號時,該信號被送給當前與鍵盤相關的進程組中的所有成員。

  2. 在windows中,沒有進程層次的概念,所有的進程都是地位相同的,唯一類似于進程層次的暗示,是在創建進程時,父進程得到一個特別的令牌(稱為句柄),該句柄可以用來控制子進程,但是父進程有權把該句柄傳給其他子進程,這樣就沒有層次了。

八 進程的狀態

  tail -f access.log |grep '404'

  執行程序tail,開啟一個子進程,執行程序grep,開啟另外一個子進程,兩個進程之間基于管道'|'通訊,將tail的結果作為grep的輸入。

  進程grep在等待輸入(即I/O)時的狀態稱為阻塞,此時grep命令都無法運行

  其實在兩種情況下會導致一個進程在邏輯上不能運行,

  1. 進程掛起是自身原因,遇到I/O阻塞,便要讓出CPU讓其他進程去執行,這樣保證CPU一直在工作

  2. 與進程無關,是操作系統層面,可能會因為一個進程占用時間過多,或者優先級等原因,而調用其他的進程去使用CPU。

  因而一個進程由三種狀態

九 進程并發的實現(了解)

  進程并發的實現在于,硬件中斷一個正在運行的進程,把此時進程運行的所有狀態保存下來,為此,操作系統維護一張表格,即進程表(process table),每個進程占用一個進程表項(這些表項也稱為進程控制塊)

  該表存放了進程狀態的重要信息:程序計數器、堆棧指針、內存分配狀況、所有打開文件的狀態、帳號和調度信息,以及其他在進程由運行態轉為就緒態或阻塞態時,必須保存的信息,從而保證該進程在再次啟動時,就像從未被中斷過一樣。

二、代碼知識部分

一 multiprocessing模塊介紹:

python中的多線程無法利用多核優勢,如果想要充分地使用多核CPU的資源(os.cpu_count()查看),在python中大部分情況需要使用多進程。Python提供了multiprocessing。
? ? multiprocessing模塊用來開啟子進程,并在子進程中執行我們定制的任務(比如函數),該模塊與多線程模塊threading的編程接口類似。

 ?multiprocessing模塊的功能眾多:支持子進程、通信和共享數據、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。

? ? 需要再次強調的一點是:與線程不同,進程沒有任何共享狀態,進程修改的數據,改動僅限于該進程內。

?

二 Process類的介紹

?創建進程的類

Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化得到的對象,表示一個子進程中的任務(尚未啟動)強調: 1. 需要使用關鍵字的方式來指定參數 2. args指定的為傳給target函數的位置參數,是一個元組形式,必須有逗號

??參數介紹:

group參數未使用,值始終為None

target表示調用對象,即子進程要執行的任務

args表示調用對象的位置參數元組,args=(1,2,'egon',)

kwargs表示調用對象的字典,kwargs={'name':'egon','age':18}

name為子進程的名稱

?

?方法介紹:

p.start():啟動進程,并調用該子進程中的p.run()
p.run():進程啟動時運行的方法,正是它去調用target指定的函數,我們自定義類的類中一定要實現該方法

p.terminate():強制終止進程p,不會進行任何清理操作,如果p創建了子進程,該子進程就成了僵尸進程,使用該方法需要特別小心這種情況。如果p還保存了一個鎖那么也將不會被釋放,進而導致死鎖
p.is_alive():如果p仍然運行,返回True

p.join([timeout]):主線程等待p終止(強調:是主線程處于等的狀態,而p是處于運行的狀態)。timeout是可選的超時時間,需要強調的是,p.join只能join住start開啟的進程,而不能join住run開啟的進程

屬性介紹:

1 p.daemon:默認值為False,如果設為True,代表p為后臺運行的守護進程,當p的父進程終止時,p也隨之終止,并且設定為True后,p不能創建自己的新進程,必須在p.start()之前設置 2 3 p.name:進程的名稱 4 5 p.pid:進程的pid 6 7 p.exitcode:進程在運行時為None、如果為–N,表示被信號N結束(了解即可) 8 9 p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是為涉及網絡連接的底層進程間通信提供安全性,這類連接只有在具有相同的身份驗證鍵時才能成功(了解即可)

三 Process類的使用

注意:在windows中Process()必須放到# if __name__ == '__main__':下

Since Windows has no fork, the multiprocessing module starts a new Python process and imports the calling module. If Process() gets called upon import, then this sets off an infinite succession of new processes (or until your machine runs out of resources). This is the reason for hiding calls to Process() insideif __name__ == "__main__" since statements inside this if-statement will not get called upon import. 由于Windows沒有fork,多處理模塊啟動一個新的Python進程并導入調用模塊。 如果在導入時調用Process(),那么這將啟動無限繼承的新進程(或直到機器耗盡資源)。 這是隱藏對Process()內部調用的原,使用if __name__ == “__main __”,這個if語句中的語句將不會在導入時被調用。

創建并開啟子進程的兩種方式

#開進程的方法一: import time import random from multiprocessing import Process def piao(name):print('%s piaoing' %name)time.sleep(random.randrange(1,5))print('%s piao end' %name)p1=Process(target=piao,args=('egon',)) #必須加,號 p2=Process(target=piao,args=('alex',)) p3=Process(target=piao,args=('wupeqi',)) p4=Process(target=piao,args=('yuanhao',))p1.start() p2.start() p3.start() p4.start() print('主線程')方法一 #開進程的方法二: import time import random from multiprocessing import Processclass Piao(Process):def __init__(self,name):super().__init__()self.name=namedef run(self):print('%s piaoing' %self.name)time.sleep(random.randrange(1,5))print('%s piao end' %self.name)p1=Piao('egon') p2=Piao('alex') p3=Piao('wupeiqi') p4=Piao('yuanhao')p1.start() #start會自動調用run p2.start() p3.start() p4.start() print('主線程')

練習1:把所學的socket通信變成并發的形式

from socket import * from multiprocessing import Processserver=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5)def talk(conn,client_addr):while True:try:msg=conn.recv(1024)if not msg:breakconn.send(msg.upper())except Exception:breakif __name__ == '__main__': #windows下start進程一定要寫到這下面while True:conn,client_addr=server.accept()p=Process(target=talk,args=(conn,client_addr))p.start()server端 from socket import *client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080))while True:msg=input('>>: ').strip()if not msg:continueclient.send(msg.encode('utf-8'))msg=client.recv(1024)print(msg.decode('utf-8'))多個client端 每來一個客戶端,都在服務端開啟一個進程,如果并發來一個萬個客戶端,要開啟一萬個進程嗎,你自己嘗試著在你自己的機器上開啟一萬個,10萬個進程試一試。 解決方法:進程池

Process對象的join方法

join:主進程等,等待子進程結束

from multiprocessing import Process import time import randomclass Piao(Process):def __init__(self,name):self.name=namesuper().__init__()def run(self):print('%s is piaoing' %self.name)time.sleep(random.randrange(1,3))print('%s is piao end' %self.name)p=Piao('egon') p.start() p.join(0.0001) #等待p停止,等0.0001秒就不再等了 print('開始')

有了join,程序不就是串行了嗎???

from multiprocessing import Process import time import random def piao(name):print('%s is piaoing' %name)time.sleep(random.randint(1,3))print('%s is piao end' %name)p1=Process(target=piao,args=('egon',)) p2=Process(target=piao,args=('alex',)) p3=Process(target=piao,args=('yuanhao',)) p4=Process(target=piao,args=('wupeiqi',))p1.start() p2.start() p3.start() p4.start()#有的同學會有疑問:既然join是等待進程結束,那么我像下面這樣寫,進程不就又變成串行的了嗎? #當然不是了,必須明確:p.join()是讓誰等? #很明顯p.join()是讓主線程等待p的結束,卡住的是主線程而絕非進程p,#詳細解析如下: #進程只要start就會在開始運行了,所以p1-p4.start()時,系統中已經有四個并發的進程了 #而我們p1.join()是在等p1結束,沒錯p1只要不結束主線程就會一直卡在原地,這也是問題的關鍵 #join是讓主線程等,而p1-p4仍然是并發執行的,p1.join的時候,其余p2,p3,p4仍然在運行,等#p1.join結束,可能p2,p3,p4早已經結束了,這樣p2.join,p3.join.p4.join直接通過檢測,無需等待 # 所以4個join花費的總時間仍然是耗費時間最長的那個進程運行的時間 p1.join() p2.join() p3.join() p4.join()print('主線程')#上述啟動進程與join進程可以簡寫為 # p_l=[p1,p2,p3,p4] # # for p in p_l: # p.start() # # for p in p_l: # p.join()

?

Process對象的其他方法或屬性(了解)

terminate與is_alive #進程對象的其他方法一:terminate,is_alive from multiprocessing import Process import time import randomclass Piao(Process):def __init__(self,name):self.name=namesuper().__init__()def run(self):print('%s is piaoing' %self.name)time.sleep(random.randrange(1,5))print('%s is piao end' %self.name)p1=Piao('egon1') p1.start()p1.terminate()#關閉進程,不會立即關閉,所以is_alive立刻查看的結果可能還是存活 print(p1.is_alive()) #結果為Trueprint('開始') print(p1.is_alive()) #結果為False
name與pid from multiprocessing import Process import time import random class Piao(Process):def __init__(self,name):# self.name=name# super().__init__() #Process的__init__方法會執行self.name=Piao-1,# #所以加到這里,會覆蓋我們的self.name=name#為我們開啟的進程設置名字的做法super().__init__()self.name=namedef run(self):print('%s is piaoing' %self.name)time.sleep(random.randrange(1,3))print('%s is piao end' %self.name)p=Piao('egon') p.start() print('開始') print(p.pid) #查看pid

?

四 守護進程

主進程創建守護進程

  其一:守護進程會在主進程代碼執行結束后就終止

  其二:守護進程內無法再開啟子進程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children

注意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止

?

from multiprocessing import Process import time import randomclass Piao(Process):def __init__(self,name):self.name=namesuper().__init__()def run(self):print('%s is piaoing' %self.name)time.sleep(random.randrange(1,3))print('%s is piao end' %self.name)p=Piao('egon') p.daemon=True #一定要在p.start()前設置,設置p為守護進程,禁止p創建子進程,并且父進程代碼執行結束,p即終止運行 p.start() print('') #主進程代碼運行完畢,守護進程就會結束 from multiprocessing import Process from threading import Thread import time def foo():print(123)time.sleep(1)print("end123")def bar():print(456)time.sleep(3)print("end456")p1=Process(target=foo) p2=Process(target=bar)p1.daemon=True p1.start() p2.start() print("main-------") #打印該行則主進程代碼結束,則守護進程p1應該被終止,可能會有p1任務執行的打印信息123,因為主進程打印main----時,p1也執行了,但是隨即被終止 迷惑人的例子

五 進程同步(鎖)

進程之間數據不共享,但是共享同一套文件系統,所以訪問同一個文件,或同一個打印終端,是沒有問題的,

競爭帶來的結果就是錯亂,如何控制,就是加鎖處理

part1:多個進程共享同一打印終端

并發運行,效率高,但競爭同一打印終端,帶來了打印錯亂 #并發運行,效率高,但競爭同一打印終端,帶來了打印錯亂 from multiprocessing import Process import os,time def work():print('%s is running' %os.getpid())time.sleep(2)print('%s is done' %os.getpid())if __name__ == '__main__':for i in range(3):p=Process(target=work)p.start()
加鎖:由并發變成了串行,犧牲了運行效率,但避免了競爭 #由并發變成了串行,犧牲了運行效率,但避免了競爭 from multiprocessing import Process,Lock import os,time def work(lock):lock.acquire()print('%s is running' %os.getpid())time.sleep(2)print('%s is done' %os.getpid())lock.release() if __name__ == '__main__':lock=Lock()for i in range(3):p=Process(target=work,args=(lock,))p.start()

part2:多個進程共享同一文件

文件當數據庫,模擬搶票

并發運行,效率高,但競爭寫同一文件,數據寫入錯亂 #文件db的內容為:{"count":1} #注意一定要用雙引號,不然json無法識別 from multiprocessing import Process,Lock import time,json,random def search():dic=json.load(open('db.txt'))print('\033[43m剩余票數%s\033[0m' %dic['count'])def get():dic=json.load(open('db.txt'))time.sleep(0.1) #模擬讀數據的網絡延遲if dic['count'] >0:dic['count']-=1time.sleep(0.2) #模擬寫數據的網絡延遲json.dump(dic,open('db.txt','w'))print('\033[43m購票成功\033[0m')def task(lock):search()get() if __name__ == '__main__':lock=Lock()for i in range(100): #模擬并發100個客戶端搶票p=Process(target=task,args=(lock,))p.start()
加鎖:購票行為由并發變成了串行,犧牲了運行效率,但保證了數據安全 #文件db的內容為:{"count":1} #注意一定要用雙引號,不然json無法識別 from multiprocessing import Process,Lock import time,json,random def search():dic=json.load(open('db.txt'))print('\033[43m剩余票數%s\033[0m' %dic['count'])def get():dic=json.load(open('db.txt'))time.sleep(0.1) #模擬讀數據的網絡延遲if dic['count'] >0:dic['count']-=1time.sleep(0.2) #模擬寫數據的網絡延遲json.dump(dic,open('db.txt','w'))print('\033[43m購票成功\033[0m')def task(lock):search()lock.acquire()get()lock.release() if __name__ == '__main__':lock=Lock()for i in range(100): #模擬并發100個客戶端搶票p=Process(target=task,args=(lock,))p.start()

總結:

加鎖可以保證多個進程修改同一塊數據時,同一時間只能有一個任務可以進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。
雖然可以用文件共享數據實現進程間通信,但問題是:
1.效率低
2.需要自己加鎖處理

?

為此mutiprocessing模塊為我們提供了基于消息的IPC通信機制:隊列和管道。
1 隊列和管道都是將數據存放于內存中
2 隊列又是基于(管道+鎖)實現的,可以讓我們從復雜的鎖問題中解脫出來,
我們應該盡量避免使用共享數據,盡可能使用消息傳遞和隊列,避免處理復雜的同步和鎖問題,而且在進程數目增多時,往往可以獲得更好的可獲展性。

?

六 隊列(推薦使用)

?? 進程彼此之間互相隔離,要實現進程間通信(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的

 創建隊列的類(底層就是以管道和鎖定的方式實現)

1 Queue([maxsize]):創建共享的進程隊列,Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。?

? ??參數介紹:

1 maxsize是隊列中允許最大項數,省略則無大小限制。

 ?方法介紹:

主要方法: q.put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。如果blocked為True(默認值),并且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩余的空間。如果超時,會拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會立即拋出Queue.Full異常。 q.get方法可以從隊列讀取并且刪除一個元素。同樣,get方法有兩個可選參數:blocked和timeout。如果blocked為True(默認值),并且timeout為正值,那么在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果隊列為空,則立即拋出Queue.Empty異常.q.get_nowait():同q.get(False) q.put_nowait():同q.put(False)q.empty():調用此方法時q為空則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中又加入了項目。 q.full():調用此方法時q已滿則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中的項目被取走。 q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()一樣

其他方法(了解):

1 q.cancel_join_thread():不會在進程退出時自動連接后臺線程。可以防止join_thread()方法阻塞 2 q.close():關閉隊列,防止隊列中加入更多數據。調用此方法,后臺線程將繼續寫入那些已經入隊列但尚未寫入的數據,但將在此方法完成時馬上關閉。如果q被垃圾收集,將調用此方法。關閉隊列不會在隊列使用者中產生任何類型的數據結束信號或異常。例如,如果某個使用者正在被阻塞在get()操作上,關閉生產者中的隊列不會導致get()方法返回錯誤。 3 q.join_thread():連接隊列的后臺線程。此方法用于在調用q.close()方法之后,等待所有隊列項被消耗。默認情況下,此方法由不是q的原始創建者的所有進程調用。調用q.cancel_join_thread方法可以禁止這種行為

? 應用:

''' multiprocessing模塊支持進程間通信的兩種主要形式:管道和隊列 都是基于消息傳遞實現的,但是隊列接口 '''from multiprocessing import Process,Queue import time q=Queue(3)#put ,get ,put_nowait,get_nowait,full,empty q.put(3) q.put(3) q.put(3) print(q.full()) #滿了print(q.get()) print(q.get()) print(q.get()) print(q.empty()) #空了

生產者消費者模型

在并發編程中使用生產者和消費者模式能夠解決絕大多數并發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。

? ? 為什么要使用生產者和消費者模式

在線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大于生產者,那么消費者就必須等待生產者。為了解決這個問題于是引入了生產者和消費者模式。

? ? 什么是生產者消費者模式

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當于一個緩沖區,平衡了生產者和消費者的處理能力。

基于隊列實現生產者消費者模型

from multiprocessing import Process,Queue import time,random,os def consumer(q):while True:res=q.get()time.sleep(random.randint(1,3))print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))def producer(q):for i in range(10):time.sleep(random.randint(1,3))res='包子%s' %iq.put(res)print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))if __name__ == '__main__':q=Queue()#生產者們:即廚師們p1=Process(target=producer,args=(q,))#消費者們:即吃貨們c1=Process(target=consumer,args=(q,))#開始 p1.start()c1.start()print('')

此時的問題是主進程永遠不會結束,原因是:生產者p在生產完后就結束了,但是消費者c在取空了q之后,則一直處于死循環中且卡在q.get()這一步。

解決方式無非是讓生產者在生產完畢后,往隊列中再發一個結束信號,這樣消費者在接收到結束信號后就可以break出死循環

?

生產者在生產完畢后發送結束信號None from multiprocessing import Process,Queue import time,random,os def consumer(q):while True:res=q.get()if res is None:break #收到結束信號則結束time.sleep(random.randint(1,3))print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))def producer(q):for i in range(10):time.sleep(random.randint(1,3))res='包子%s' %iq.put(res)print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))q.put(None) #發送結束信號 if __name__ == '__main__':q=Queue()#生產者們:即廚師們p1=Process(target=producer,args=(q,))#消費者們:即吃貨們c1=Process(target=consumer,args=(q,))#開始 p1.start()c1.start()print('')

注意:結束信號None,不一定要由生產者發,主進程里同樣可以發,但主進程需要等生產者結束后才應該發送該信號

主進程在生產者生產完畢后發送結束信號None from multiprocessing import Process,Queue import time,random,os def consumer(q):while True:res=q.get()if res is None:break #收到結束信號則結束time.sleep(random.randint(1,3))print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))def producer(q):for i in range(2):time.sleep(random.randint(1,3))res='包子%s' %iq.put(res)print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))if __name__ == '__main__':q=Queue()#生產者們:即廚師們p1=Process(target=producer,args=(q,))#消費者們:即吃貨們c1=Process(target=consumer,args=(q,))#開始 p1.start()c1.start()p1.join()q.put(None) #發送結束信號print('')

但上述解決方式,在有多個生產者和多個消費者時,我們則需要用一個很low的方式去解決

有幾個生產者就需要發送幾次結束信號:相當low from multiprocessing import Process,Queue import time,random,os def consumer(q):while True:res=q.get()if res is None:break #收到結束信號則結束time.sleep(random.randint(1,3))print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))def producer(name,q):for i in range(2):time.sleep(random.randint(1,3))res='%s%s' %(name,i)q.put(res)print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))if __name__ == '__main__':q=Queue()#生產者們:即廚師們p1=Process(target=producer,args=('包子',q))p2=Process(target=producer,args=('骨頭',q))p3=Process(target=producer,args=('泔水',q))#消費者們:即吃貨們c1=Process(target=consumer,args=(q,))c2=Process(target=consumer,args=(q,))#開始 p1.start()p2.start()p3.start()c1.start()p1.join() #必須保證生產者全部生產完畢,才應該發送結束信號 p2.join()p3.join()q.put(None) #有幾個生產者就應該發送幾次結束信號Noneq.put(None) #發送結束信號q.put(None) #發送結束信號print('')

?

其實我們的思路無非是發送結束信號而已,有另外一種隊列提供了這種機制

#JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列允許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。#參數介紹: maxsize是隊列中允許最大項數,省略則無大小限制。 #方法介紹: JoinableQueue的實例p除了與Queue對象相同的方法之外還具有:q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。如果調用此方法的次數大于從隊列中刪除項目的數量,將引發ValueError異常q.join():生產者調用此方法進行阻塞,直到隊列中所有的項目均被處理。阻塞將持續到隊列中的每個項目均調用q.task_done()方法為止 from multiprocessing import Process,JoinableQueue import time,random,os def consumer(q):while True:res=q.get()time.sleep(random.randint(1,3))print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))q.task_done() #向q.join()發送一次信號,證明一個數據已經被取走了def producer(name,q):for i in range(10):time.sleep(random.randint(1,3))res='%s%s' %(name,i)q.put(res)print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))q.join()if __name__ == '__main__':q=JoinableQueue()#生產者們:即廚師們p1=Process(target=producer,args=('包子',q))p2=Process(target=producer,args=('骨頭',q))p3=Process(target=producer,args=('泔水',q))#消費者們:即吃貨們c1=Process(target=consumer,args=(q,))c2=Process(target=consumer,args=(q,))c1.daemon=Truec2.daemon=True#開始p_l=[p1,p2,p3,c1,c2]for p in p_l:p.start()p1.join()p2.join()p3.join()print('') #主進程等--->p1,p2,p3等---->c1,c2#p1,p2,p3結束了,證明c1,c2肯定全都收完了p1,p2,p3發到隊列的數據#因而c1,c2也沒有存在的價值了,應該隨著主進程的結束而結束,所以設置成守護進程

七 管道

進程間通信(IPC)方式二:管道(不推薦使用,了解即可)

?

#創建管道的類: Pipe([duplex]):在進程之間創建一條管道,并返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的連接對象,強調一點:必須在產生Process對象之前產生管道 #參數介紹: dumplex:默認管道是全雙工的,如果將duplex射成False,conn1只能用于接收,conn2只能用于發送。 #主要方法: conn1.recv():接收conn2.send(obj)發送的對象。如果沒有消息可接收,recv方法會一直阻塞。如果連接的另外一端已經關閉,那么recv方法會拋出EOFError。conn1.send(obj):通過連接發送對象。obj是與序列化兼容的任意對象#其他方法: conn1.close():關閉連接。如果conn1被垃圾回收,將自動調用此方法 conn1.fileno():返回連接使用的整數文件描述符 conn1.poll([timeout]):如果連接上的數據可用,返回True。timeout指定等待的最長時限。如果省略此參數,方法將立即返回結果。如果將timeout射成None,操作將無限期地等待數據到達。conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。maxlength指定要接收的最大字節數。如果進入的消息,超過了這個最大值,將引發IOError異常,并且在連接上無法進行進一步讀取。如果連接的另外一端已經關閉,再也不存在任何數據,將引發EOFError異常。 conn.send_bytes(buffer [, offset [, size]]):通過連接發送字節數據緩沖區,buffer是支持緩沖區接口的任意對象,offset是緩沖區中的字節偏移量,而size是要發送字節數。結果數據以單條消息的形式發出,然后調用c.recv_bytes()函數進行接收 conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,并把它保存在buffer對象中,該對象支持可寫入的緩沖區接口(即bytearray對象或類似的對象)。offset指定緩沖區中放置消息處的字節位移。返回值是收到的字節數。如果消息長度大于可用的緩沖區空間,將引發BufferTooShort異常。介紹

?

基于管道實現進程間通信(與隊列的方式是類似的,隊列就是管道加鎖實現的) from multiprocessing import Process,Pipeimport time,os def consumer(p,name):left,right=pleft.close()while True:try:baozi=right.recv()print('%s 收到包子:%s' %(name,baozi))except EOFError:right.close()break def producer(seq,p):left,right=pright.close()for i in seq:left.send(i)# time.sleep(1)else:left.close() if __name__ == '__main__':left,right=Pipe()c1=Process(target=consumer,args=((left,right),'c1'))c1.start()seq=(i for i in range(10))producer(seq,(left,right))right.close()left.close()c1.join()print('主進程')

注意:生產者和消費者都沒有使用管道的某個端點,就應該將其關閉,如在生產者中關閉管道的右端,在消費者中關閉管道的左端。如果忘記執行這些步驟,程序可能再消費者中的recv()操作上掛起。管道是由操作系統進行引用計數的,必須在所有進程中關閉管道后才能生產EOFError異常。因此在生產者中關閉管道不會有任何效果,付費消費者中也關閉了相同的管道端點。

?

管道可以用于雙向通信,利用通常在客戶端/服務器中使用的請求/響應模型或遠程過程調用,就可以使用管道編寫與進程交互的程序 from multiprocessing import Process,Pipeimport time,os def adder(p,name):server,client=pclient.close()while True:try:x,y=server.recv()except EOFError:server.close()breakres=x+yserver.send(res)print('server done') if __name__ == '__main__':server,client=Pipe()c1=Process(target=adder,args=((server,client),'c1'))c1.start()server.close()client.send((10,20))print(client.recv())client.close()c1.join()print('主進程') #注意:send()和recv()方法使用pickle模塊對對象進行序列化。

?

八 共享數據

展望未來,基于消息傳遞的并發編程是大勢所趨

即便是使用線程,推薦做法也是將程序設計為大量獨立的線程集合

通過消息隊列交換數據。這樣極大地減少了對使用鎖定和其他同步手段的需求,

還可以擴展到分布式系統中

進程間通信應該盡量避免使用本節所講的共享數據的方式

進程間數據是獨立的,可以借助于隊列或管道實現通信,二者都是基于消息傳遞的雖然進程間數據獨立,但可以通過Manager實現數據共享,事實上Manager的功能遠不止于此A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. For example, #進程之間操作共享的數據
from
multiprocessing import Manager,Process,Lock import os def work(d,lock):# with lock: #不加鎖而操作共享的數據,肯定會出現數據錯亂d['count']-=1if __name__ == '__main__':lock=Lock()with Manager() as m:dic=m.dict({'count':100})p_l=[]for i in range(100):p=Process(target=work,args=(dic,lock))p_l.append(p)p.start()for p in p_l:p.join()print(dic)#{'count': 94}

?

九 信號量(了解)

互斥鎖 同時只允許一個線程更改數據,而Semaphore是同時允許一定數量的線程更改數據 ,比如廁所有3個坑,那最多只允許3個人上廁所,后面的人只能等里面有人出來了才能再進去,如果指定信號量為3,那么來一個人獲得一把鎖,計數加1,當計數等于3時,后面的人均需要等待。一旦釋放,就有人可以獲得一把鎖信號量與進程池的概念很像,但是要區分開,信號量涉及到加鎖的概念from multiprocessing import Process,Semaphore import time,randomdef go_wc(sem,user):sem.acquire()print('%s 占到一個茅坑' %user)time.sleep(random.randint(0,3)) #模擬每個人拉屎速度不一樣,0代表有的人蹲下就起來了 sem.release()if __name__ == '__main__':sem=Semaphore(5)p_l=[]for i in range(13):p=Process(target=go_wc,args=(sem,'user%s' %i,))p.start()p_l.append(p)for i in p_l:i.join()print('============》')信號量Semahpore(同線程一樣)

十 事件(了解)

python線程的事件用于主線程控制其他線程的執行,事件主要提供了三個方法 set、wait、clear。事件處理的機制:全局定義了一個“Flag”,如果“Flag”值為 False,那么當程序執行 event.wait 方法時就會阻塞,如果“Flag”值為True,那么event.wait 方法時便不再阻塞。clear:將“Flag”設置為False set:將“Flag”設置為True#_*_coding:utf-8_*_ #!/usr/bin/env pythonfrom multiprocessing import Process,Event import time,randomdef car(e,n):while True:if not e.is_set(): #Flaseprint('\033[31m紅燈亮\033[0m,car%s等著' %n)e.wait()print('\033[32m車%s 看見綠燈亮了\033[0m' %n)time.sleep(random.randint(3,6))if not e.is_set():continueprint('走你,car', n)breakdef police_car(e,n):while True:if not e.is_set():print('\033[31m紅燈亮\033[0m,car%s等著' % n)e.wait(1)print('燈的是%s,警車走了,car %s' %(e.is_set(),n))breakdef traffic_lights(e,inverval):while True:time.sleep(inverval)if e.is_set():e.clear() #e.is_set() ---->Falseelse:e.set()if __name__ == '__main__':e=Event()# for i in range(10):# p=Process(target=car,args=(e,i,))# p.start()for i in range(5):p = Process(target=police_car, args=(e, i,))p.start()t=Process(target=traffic_lights,args=(e,10))t.start()print('============》')Event(同線程一樣)

?

十一 進程池

在利用Python進行系統管理的時候,特別是同時操作多個文件目錄,或者遠程控制多臺主機,并行操作可以節約大量的時間。多進程是實現并發的手段之一,需要注意的問題是:

  • 很明顯需要并發執行的任務通常要遠大于核數
  • 一個操作系統不可能無限開啟進程,通常有幾個核就開幾個進程
  • 進程開啟過多,效率反而會下降(開啟進程是需要占用系統資源的,而且開啟多余核數目的進程也無法做到并行)
  • 例如當被操作對象數目不大時,可以直接利用multiprocessing中的Process動態成生多個進程,十幾個還好,但如果是上百個,上千個。。。手動的去限制進程數量卻又太過繁瑣,此時可以發揮進程池的功效。

    我們就可以通過維護一個進程池來控制進程數目,比如httpd的進程模式,規定最小進程數和最大進程數...?
    ps:對于遠程過程調用的高級應用程序而言,應該使用進程池,Pool可以提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,如果池還沒有滿,那么就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到規定最大值,那么該請求就會等待,直到池中有進程結束,就重用進程池中的進程。

    ? ??創建進程池的類:如果指定numprocess為3,則進程池會從無到有創建三個進程,然后自始至終使用這三個進程去執行所有任務,不會開啟其他進程

    1 Pool([numprocess [,initializer [, initargs]]]):創建進程池?

    ? ??參數介紹:

    1 numprocess:要創建的進程數,如果省略,將默認使用cpu_count()的值 2 initializer:是每個工作進程啟動時要執行的可調用對象,默認為None 3 initargs:是要傳給initializer的參數組

     ?方法介紹:

    主要方法: p.apply(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然后返回結果。需要強調的是:此操作并不會在所有池工作進程中并執行func函數。如果要通過不同參數并發地執行func函數,必須從不同線程調用p.apply()函數或者使用p.apply_async() p.apply_async(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然后返回結果。此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變為可用時,將理解傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他異步操作中的結果。p.close():關閉進程池,防止進一步操作。如果所有操作持續掛起,它們將在工作進程終止前完成 P.jion():等待所有工作進程退出。此方法只能在close()或teminate()之后調用

    ?其他方法(了解部分)

    方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具有以下方法 obj.get():返回結果,如果有必要則等待結果到達。timeout是可選的。如果在指定時間內還沒有到達,將引發一場。如果遠程操作中引發了異常,它將在調用此方法時再次被引發。 obj.ready():如果調用完成,返回True obj.successful():如果調用完成且沒有引發異常,返回True,如果在結果就緒之前調用此方法,引發異常 obj.wait([timeout]):等待結果變為可用。 obj.terminate():立即終止所有工作進程,同時不執行任何清理或結束任何掛起工作。如果p被垃圾回收,將自動調用此函數

    ??應用:

    apply同步執行:阻塞式 from multiprocessing import Pool import os,time def work(n):print('%s run' %os.getpid())time.sleep(3)return n**2if __name__ == '__main__':p=Pool(3) #進程池中從無到有創建三個進程,以后一直是這三個進程在執行任務res_l=[]for i in range(10):res=p.apply(work,args=(i,)) #同步運行,阻塞、直到本次任務執行完畢拿到res res_l.append(res)print(res_l)
    apply_async異步執行:非阻塞 from multiprocessing import Pool import os,time def work(n):print('%s run' %os.getpid())time.sleep(3)return n**2if __name__ == '__main__':p=Pool(3) #進程池中從無到有創建三個進程,以后一直是這三個進程在執行任務res_l=[]for i in range(10):res=p.apply_async(work,args=(i,)) #同步運行,阻塞、直到本次任務執行完畢拿到res res_l.append(res)#異步apply_async用法:如果使用異步提交的任務,主進程需要使用jion,等待進程池內任務都處理完,然后可以用get收集結果,否則,主進程結束,進程池可能還沒來得及執行,也就跟著一起結束了 p.close()p.join()for res in res_l:print(res.get()) #使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需get

    ?

    詳解:apply_async與apply

    #一:使用進程池(非阻塞,apply_async) #coding: utf-8 from multiprocessing import Process,Pool import timedef func(msg):print( "msg:", msg)time.sleep(1)return msgif __name__ == "__main__":pool = Pool(processes = 3)res_l=[]for i in range(10):msg = "hello %d" %(i)res=pool.apply_async(func, (msg, )) #維持執行的進程總數為processes,當一個進程執行完畢后會添加新的進程進去 res_l.append(res)print("==============================>") #沒有后面的join,或get,則程序整體結束,進程池中的任務還沒來得及全部執行完也都跟著主進程一起結束了 pool.close() #關閉進程池,防止進一步操作。如果所有操作持續掛起,它們將在工作進程終止前完成pool.join() #調用join之前,先調用close函數,否則會出錯。執行完close后不會有新的進程加入到pool,join函數等待所有子進程結束print(res_l) #看到的是<multiprocessing.pool.ApplyResult object at 0x10357c4e0>對象組成的列表,而非最終的結果,但這一步是在join后執行的,證明結果已經計算完畢,剩下的事情就是調用每個對象下的get方法去獲取結果for i in res_l:print(i.get()) #使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需get#二:使用進程池(阻塞,apply) #coding: utf-8 from multiprocessing import Process,Pool import timedef func(msg):print( "msg:", msg)time.sleep(0.1)return msgif __name__ == "__main__":pool = Pool(processes = 3)res_l=[]for i in range(10):msg = "hello %d" %(i)res=pool.apply(func, (msg, )) #維持執行的進程總數為processes,當一個進程執行完畢后會添加新的進程進去res_l.append(res) #同步執行,即執行完一個拿到結果,再去執行另外一個print("==============================>")pool.close()pool.join() #調用join之前,先調用close函數,否則會出錯。執行完close后不會有新的進程加入到pool,join函數等待所有子進程結束print(res_l) #看到的就是最終的結果組成的列表for i in res_l: #apply是同步的,所以直接得到結果,沒有get()方法print(i)

    練習2:使用進程池維護固定數目的進程(重寫練習1)

    server端 #Pool內的進程數默認是cpu核數,假設為4(查看方法os.cpu_count()) #開啟6個客戶端,會發現2個客戶端處于等待狀態 #在每個進程內查看pid,會發現pid使用為4個,即多個客戶端公用4個進程 from socket import * from multiprocessing import Pool import osserver=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5)def talk(conn,client_addr):print('進程pid: %s' %os.getpid())while True:try:msg=conn.recv(1024)if not msg:breakconn.send(msg.upper())except Exception:breakif __name__ == '__main__':p=Pool()while True:conn,client_addr=server.accept()p.apply_async(talk,args=(conn,client_addr))# p.apply(talk,args=(conn,client_addr)) #同步的話,則同一時間只有一個客戶端能訪問

    ?


    客戶端

    from socket import *client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080))while True:msg=input('>>: ').strip()if not msg:continueclient.send(msg.encode('utf-8'))msg=client.recv(1024)print(msg.decode('utf-8'))

    ?

    發現:并發開啟多個客戶端,服務端同一時間只有3個不同的pid,干掉一個客戶端,另外一個客戶端才會進來,被3個進程之一處理

    ?

      回掉函數:

    需要回調函數的場景:進程池中任何一個任務一旦處理完了,就立即告知主進程:我好了額,你可以處理我的結果了。主進程則調用一個函數去處理該結果,該函數即回調函數

    我們可以把耗時間(阻塞)的任務放到進程池中,然后指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。

    from multiprocessing import Pool import requests import json import osdef get_page(url):print('<進程%s> get %s' %(os.getpid(),url))respone=requests.get(url)if respone.status_code == 200:return {'url':url,'text':respone.text}def pasrse_page(res):print('<進程%s> parse %s' %(os.getpid(),res['url']))parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))with open('db.txt','a') as f:f.write(parse_res)if __name__ == '__main__':urls=['https://www.baidu.com','https://www.python.org','https://www.openstack.org','https://help.github.com/','http://www.sina.com.cn/']p=Pool(3)res_l=[]for url in urls:res=p.apply_async(get_page,args=(url,),callback=pasrse_page)res_l.append(res)p.close()p.join()print([res.get() for res in res_l]) #拿到的是get_page的結果,其實完全沒必要拿該結果,該結果已經傳給回調函數處理了''' 打印結果: <進程3388> get https://www.baidu.com <進程3389> get https://www.python.org <進程3390> get https://www.openstack.org <進程3388> get https://help.github.com/ <進程3387> parse https://www.baidu.com <進程3389> get http://www.sina.com.cn/ <進程3387> parse https://www.python.org <進程3387> parse https://help.github.com/ <進程3387> parse http://www.sina.com.cn/ <進程3387> parse https://www.openstack.org [{'url': 'https://www.baidu.com', 'text': '<!DOCTYPE html>\r\n...',...}] '''

    爬蟲案例

    from multiprocessing import Pool import time,random import requests import redef get_page(url,pattern):response=requests.get(url)if response.status_code == 200:return (response.text,pattern)def parse_page(info):page_content,pattern=infores=re.findall(pattern,page_content)for item in res:dic={'index':item[0],'title':item[1],'actor':item[2].strip()[3:],'time':item[3][5:],'score':item[4]+item[5]}print(dic) if __name__ == '__main__':pattern1=re.compile(r'<dd>.*?board-index.*?>(\d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<',re.S)url_dic={'http://maoyan.com/board/7':pattern1,}p=Pool()res_l=[]for url,pattern in url_dic.items():res=p.apply_async(get_page,args=(url,pattern),callback=parse_page)res_l.append(res)for i in res_l:i.get()# res=requests.get('http://maoyan.com/board/7')# print(re.findall(pattern,res.text))

    如果在主進程中等待進程池中所有任務都執行完畢后,再統一處理結果,則無需回調函數

    from multiprocessing import Pool import time,random,osdef work(n):time.sleep(1)return n**2 if __name__ == '__main__':p=Pool()res_l=[]for i in range(10):res=p.apply_async(work,args=(i,))res_l.append(res)p.close()p.join() #等待進程池中所有進程執行完畢 nums=[]for res in res_l:nums.append(res.get()) #拿到所有結果print(nums) #主進程拿到所有的處理結果,可以在主進程中進行統一進行處理

    進程池的其他實現方式:https://docs.python.org/dev/library/concurrent.futures.html

    ?

    轉載于:https://www.cnblogs.com/ctztake/p/7445428.html

    總結

    以上是生活随笔為你收集整理的Cpython解释器支持的进程与线程的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。