进程同步(multiprocess.Lock、multiprocess.Semaphore、multiprocess.Event) day38
進(jìn)程同步(multiprocess.Lock、multiprocess.Semaphore、multiprocess.Event)
鎖 —— multiprocess.Lock
? ? ? 通過剛剛的學(xué)習(xí),我們千方百計(jì)實(shí)現(xiàn)了程序的異步,讓多個(gè)任務(wù)可以同時(shí)在幾個(gè)進(jìn)程中并發(fā)處理,他們之間的運(yùn)行沒有順序,一旦開啟也不受我們控制。盡管并發(fā)編程讓我們能更加充分的利用IO資源,但是也給我們帶來了新的問題。
當(dāng)多個(gè)進(jìn)程使用同一份數(shù)據(jù)資源的時(shí)候,就會(huì)引發(fā)數(shù)據(jù)安全或順序混亂問題。?
#加鎖可以保證多個(gè)進(jìn)程修改同一塊數(shù)據(jù)時(shí),同一時(shí)間只能有一個(gè)任務(wù)可以進(jìn)行修改,即串行的修改,沒錯(cuò),速度是慢了,但犧牲了速度卻保證了數(shù)據(jù)安全。 #雖然可以用文件共享數(shù)據(jù)實(shí)現(xiàn)進(jìn)程間通信,但問題是: #1.效率低(共享數(shù)據(jù)基于文件,而文件是硬盤上的數(shù)據(jù)) #2.需要自己加鎖處理#因此我們最好找尋一種解決方案能夠兼顧:1、效率高(多個(gè)進(jìn)程共享一塊內(nèi)存的數(shù)據(jù))2、幫我們處理好鎖問題。這就是mutiprocessing模塊為我們提供的基于消息的IPC通信機(jī)制:隊(duì)列和管道。 #隊(duì)列和管道都是將數(shù)據(jù)存放于內(nèi)存中 #隊(duì)列又是基于(管道+鎖)實(shí)現(xiàn)的,可以讓我們從復(fù)雜的鎖問題中解脫出來, #我們應(yīng)該盡量避免使用共享數(shù)據(jù),盡可能使用消息傳遞和隊(duì)列,避免處理復(fù)雜的同步和鎖問題,而且在進(jìn)程數(shù)目增多時(shí),往往可以獲得更好的可獲展性。信號(hào)量 —— multiprocess.Semaphore(了解)
#互斥鎖同時(shí)只允許一個(gè)線程更改數(shù)據(jù),而信號(hào)量Semaphore是同時(shí)允許一定數(shù)量的線程更改數(shù)據(jù) 。 #假設(shè)商場(chǎng)里有4個(gè)迷你唱吧,所以同時(shí)可以進(jìn)去4個(gè)人,如果來了第五個(gè)人就要在外面等待,等到有人出來才能再進(jìn)去玩。 #實(shí)現(xiàn): #信號(hào)量同步基于內(nèi)部計(jì)數(shù)器,每調(diào)用一次acquire(),計(jì)數(shù)器減1;每調(diào)用一次release(),計(jì)數(shù)器加1.當(dāng)計(jì)數(shù)器為0時(shí),acquire()調(diào)用被阻塞。這是迪科斯徹(Dijkstra)信號(hào)量概念P()和#V()的Python實(shí)現(xiàn)。信號(hào)量同步機(jī)制適用于訪問像服務(wù)器這樣的有限資源。 #信號(hào)量與進(jìn)程池的概念很像,但是要區(qū)分開,信號(hào)量涉及到加鎖的概念?
# 信號(hào)量介紹Semaphore # 多進(jìn)程中的組件 # ktv # 4個(gè) # 一套資源 同一時(shí)間 只能被n個(gè)人訪問 # 某一段代碼 同一時(shí)間 只能被n個(gè)進(jìn)程執(zhí)行 import time#引入時(shí)間模塊 import random#引入隨機(jī)數(shù) from multiprocessing import Process#引入進(jìn)程模塊 from multiprocessing import Semaphore#引入信號(hào)模塊# sem = Semaphore(4)#實(shí)例化4個(gè)信號(hào) # sem.acquire() # print('拿到第一把鑰匙') # sem.acquire() # print('拿到第二把鑰匙') # sem.acquire() # print('拿到第三把鑰匙') # sem.acquire() # print('拿到第四把鑰匙') # sem.acquire() # print('拿到第五把鑰匙') def ktv(i,sem):sem.acquire() #獲取鑰匙print('%s走進(jìn)ktv'%i) #進(jìn)入ktvtime.sleep(random.randint(1,5))#隨機(jī)選擇1到5之間的數(shù)print('%s走出ktv'%i)#打印走出ktvsem.release() #還鑰匙if __name__ == '__main__' :#如果為真sem = Semaphore(4)#實(shí)例化一個(gè)紅綠燈for i in range(20):#循環(huán)20個(gè)數(shù)p = Process(target=ktv,args=(i,sem))#開啟一個(gè)進(jìn)程對(duì)象p.start()#開啟這個(gè)進(jìn)程?
事件 —— multiprocess.Event(了解)
#python線程的事件用于主線程控制其他線程的執(zhí)行,事件主要提供了三個(gè)方法 set、wait、clear。#事件處理的機(jī)制:全局定義了一個(gè)“Flag”,如果“Flag”值為 False,那么當(dāng)程序執(zhí)行 #event.wait 方法時(shí)就會(huì)阻塞,如果“Flag”值為True,那么event.wait 方法時(shí)便不再阻塞。#clear:將“Flag”設(shè)置為False #set:將“Flag”設(shè)置為True?
?
#紅綠燈示例 # 通過一個(gè)信號(hào) 來控制 多個(gè)進(jìn)程 同時(shí) 執(zhí)行或者阻塞 # 事件 # from multiprocessing import Event # 一個(gè)信號(hào)可以使所有的進(jìn)程都進(jìn)入阻塞狀態(tài) # 也可以控制所有的進(jìn)程解除阻塞 # 一個(gè)事件被創(chuàng)建之后,默認(rèn)是阻塞狀態(tài) # e = Event() # 創(chuàng)建了一個(gè)事件 # print(e.is_set()) # 查看一個(gè)事件的狀態(tài),默認(rèn)被設(shè)置成阻塞 # e.set() # 將這個(gè)事件的狀態(tài)改為True # print(e.is_set()) # e.wait() # 是依據(jù)e.is_set()的值來決定是否阻塞的 # print(123456) # e.clear() # 將這個(gè)事件的狀態(tài)改為False # print(e.is_set()) # e.wait() # 等待 事件的信號(hào)被變成True # print('*'*10)# set 和 clear# 分別用來修改一個(gè)事件的狀態(tài) True或者False # is_set 用來查看一個(gè)事件的狀態(tài) # wait 是依據(jù)事件的狀態(tài)來決定自己是否在wait處阻塞# False阻塞 True不阻塞# 紅綠燈事件 import time#引入時(shí)間模塊 import random#引入隨機(jī)模塊 from multiprocessing import Event,Process#引入進(jìn)程模塊和時(shí)間模塊 def cars(e,i):#定義一個(gè)函數(shù)if not e.is_set():#如果信號(hào)燈為真的時(shí)候print('car%i在等待'%i)#打印內(nèi)容e.wait() # 阻塞 直到得到一個(gè) 事件狀態(tài)變成 True 的信號(hào)print('\033[0;32;40mcar%i通過\033[0m' % i)#打印通過def light(e):#定義一個(gè)燈while True:#循環(huán)為真if e.is_set():#如果事件狀態(tài)為真e.clear()#則清除信號(hào)燈print('\033[31m紅燈亮了\033[0m')#打印紅燈亮了else:#否則e.set()#設(shè)置狀態(tài)為真print('\033[32m綠燈亮了\033[0m')#打印綠燈亮了time.sleep(2)#睡2秒if __name__ == '__main__':#如果為真e = Event()#實(shí)例化一個(gè)事件traffic = Process(target=light,args=(e,))#定義一個(gè)燈的進(jìn)程traffic.start()#開始進(jìn)程for i in range(20):#循環(huán)20次car = Process(target=cars, args=(e,i))#創(chuàng)建20個(gè)汽車進(jìn)程car.start()#啟動(dòng)汽車進(jìn)程time.sleep(random.random())#隨機(jī)睡,隨機(jī)出現(xiàn)0~1之間的小數(shù)?
進(jìn)程間通信——隊(duì)列和管道(multiprocess.Queue、multiprocess.Pipe)
進(jìn)程間通信
IPC(Inter-Process Communication) #進(jìn)程間通信
隊(duì)列?
概念介紹
創(chuàng)建共享的進(jìn)程隊(duì)列,Queue是多進(jìn)程安全的隊(duì)列,可以使用Queue實(shí)現(xiàn)多進(jìn)程之間的數(shù)據(jù)傳遞。?
#Queue([maxsize]) #創(chuàng)建共享的進(jìn)程隊(duì)列。 #參數(shù) :maxsize是隊(duì)列中允許的最大項(xiàng)數(shù)。如果省略此參數(shù),則無(wú)大小限制。 #底層隊(duì)列使用管道和鎖定實(shí)現(xiàn)。?
#方法介紹 Queue([maxsize]) 創(chuàng)建共享的進(jìn)程隊(duì)列。maxsize是隊(duì)列中允許的最大項(xiàng)數(shù)。如果省略此參數(shù),則無(wú)大小限制。底層隊(duì)列使用管道和鎖定實(shí)現(xiàn)。另外,還需要運(yùn)行支持線程以便隊(duì)列中的數(shù)據(jù)傳輸?shù)降讓庸艿乐小? Queue的實(shí)例q具有以下方法:q.get( [ block [ ,timeout ] ] ) 返回q中的一個(gè)項(xiàng)目。如果q為空,此方法將阻塞,直到隊(duì)列中有項(xiàng)目可用為止。block用于控制阻塞行為,默認(rèn)為True. 如果設(shè)置為False,將引發(fā)Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時(shí)時(shí)間,用在阻塞模式中。如果在制定的時(shí)間間隔內(nèi)沒有項(xiàng)目變?yōu)榭捎?#xff0c;將引發(fā)Queue.Empty異常。#q.get_nowait( ) 同q.get(False)方法。#q.put(item [, block [,timeout ] ] ) 將item放入隊(duì)列。如果隊(duì)列已滿,此方法將阻塞至有空間可用為止。block控制阻塞行為,默認(rèn)為True。如果設(shè)置為False,將引發(fā)Queue.Empty異常(定義在Queue庫(kù)模塊中)。timeout指定在阻塞模式中等待可用空間的時(shí)間長(zhǎng)短。超時(shí)后將引發(fā)Queue.Full異常。#q.qsize() 返回隊(duì)列中目前項(xiàng)目的正確數(shù)量。此函數(shù)的結(jié)果并不可靠,因?yàn)樵诜祷亟Y(jié)果和在稍后程序中使用結(jié)果之間,隊(duì)列中可能添加或刪除了項(xiàng)目。在某些系統(tǒng)上,此方法可能引發(fā)NotImplementedError異常。#q.empty() 如果調(diào)用此方法時(shí) q為空,返回True。如果其他進(jìn)程或線程正在往隊(duì)列中添加項(xiàng)目,結(jié)果是不可靠的。也就是說,在返回和使用結(jié)果之間,隊(duì)列中可能已經(jīng)加入新的項(xiàng)目。#q.full() 如果q已滿,返回為True. 由于線程的存在,結(jié)果也可能是不可靠的(參考q.empty()方法)。。?
#其他方法 #q.close() 關(guān)閉隊(duì)列,防止隊(duì)列中加入更多數(shù)據(jù)。調(diào)用此方法時(shí),后臺(tái)線程將繼續(xù)寫入那些已入隊(duì)列但尚未寫入的數(shù)據(jù),但將在此方法完成時(shí)馬上關(guān)閉。如果q被垃圾收集,將自動(dòng)調(diào)用此方法。關(guān)閉隊(duì)列不會(huì)在隊(duì)列使用者中生成任何類型的數(shù)據(jù)結(jié)束信號(hào)或異常。例如,如果某個(gè)使用者正被阻塞在#get()操作上,關(guān)閉生產(chǎn)者中的隊(duì)列不會(huì)導(dǎo)致get()方法返回錯(cuò)誤。#q.cancel_join_thread() 不會(huì)再進(jìn)程退出時(shí)自動(dòng)連接后臺(tái)線程。這可以防止join_thread()方法阻塞。#q.join_thread() 連接隊(duì)列的后臺(tái)線程。此方法用于在調(diào)用q.close()方法后,等待所有隊(duì)列項(xiàng)被消耗。默認(rèn)情況下,此方法由不是q的原始創(chuàng)建者的所有進(jìn)程調(diào)用。調(diào)用q.cancel_join_thread()方法可以禁止這種行為。?
代碼實(shí)例
''' multiprocessing模塊支持進(jìn)程間通信的兩種主要形式:管道和隊(duì)列 都是基于消息傳遞實(shí)現(xiàn)的,但是隊(duì)列接口 '''from multiprocessing import Queue#引入一個(gè)隊(duì)列模塊 q=Queue(3)#實(shí)例化一個(gè)隊(duì)列#put ,get ,put_nowait,get_nowait,full,empty q.put(3)#放入隊(duì)列中 q.put(3)#放入隊(duì)列中 q.put(3)#放入隊(duì)列中 # q.put(3) # 如果隊(duì)列已經(jīng)滿了,程序就會(huì)停在這里,等待數(shù)據(jù)被別人取走,再將數(shù)據(jù)放入隊(duì)列。# 如果隊(duì)列中的數(shù)據(jù)一直不被取走,程序就會(huì)永遠(yuǎn)停在這里。 try:#異常處理q.put_nowait(3) # 可以使用put_nowait,如果隊(duì)列滿了不會(huì)阻塞,但是會(huì)因?yàn)殛?duì)列滿了而報(bào)錯(cuò)。 except: # 因此我們可以用一個(gè)try語(yǔ)句來處理這個(gè)錯(cuò)誤。這樣程序不會(huì)一直阻塞下去,但是會(huì)丟掉這個(gè)消息。print('隊(duì)列已經(jīng)滿了')# 因此,我們?cè)俜湃霐?shù)據(jù)之前,可以先看一下隊(duì)列的狀態(tài),如果已經(jīng)滿了,就不繼續(xù)put了。 print(q.full()) #滿了print(q.get())#取出一個(gè) print(q.get())#取出一個(gè) print(q.get())#取出一個(gè) # print(q.get()) # 同put方法一樣,如果隊(duì)列已經(jīng)空了,那么繼續(xù)取就會(huì)出現(xiàn)阻塞。 try:#異常處理q.get_nowait(3) # 可以使用get_nowait,如果隊(duì)列滿了不會(huì)阻塞,但是會(huì)因?yàn)闆]取到值而報(bào)錯(cuò)。 except: # 因此我們可以用一個(gè)try語(yǔ)句來處理這個(gè)錯(cuò)誤。這樣程序不會(huì)一直阻塞下去。print('隊(duì)列已經(jīng)空了')print(q.empty()) #空了上面這個(gè)例子還沒有加入進(jìn)程通信,只是先來看看隊(duì)列為我們提供的方法,以及這些方法的使用和現(xiàn)象。
#子進(jìn)程發(fā)送數(shù)據(jù)給父進(jìn)程 import time#引入一個(gè)時(shí)間模塊 from multiprocessing import Process, Queue#引入一個(gè)進(jìn)程和隊(duì)列模塊def f(q):#定義一個(gè)函數(shù)q.put([time.asctime(), 'from Eva', 'hello']) #調(diào)用主函數(shù)中p進(jìn)程傳遞過來的進(jìn)程參數(shù) put函數(shù)為向隊(duì)列中添加一條數(shù)據(jù)。if __name__ == '__main__':#定義一個(gè)函數(shù)q = Queue() #創(chuàng)建一個(gè)Queue對(duì)象p = Process(target=f, args=(q,)) #創(chuàng)建一個(gè)進(jìn)程p.start()#開始進(jìn)程print(q.get())#拿出一個(gè)p.join()#感知子進(jìn)程結(jié)束上面是一個(gè)queue的簡(jiǎn)單應(yīng)用,使用隊(duì)列q對(duì)象調(diào)用get函數(shù)來取得隊(duì)列中最先進(jìn)入的數(shù)據(jù)。 接下來看一個(gè)稍微復(fù)雜一些的例子:
#批量生產(chǎn)數(shù)據(jù)放入隊(duì)列再批量獲取結(jié)果 import os#引入操作系統(tǒng)模塊 import time#引入時(shí)間模塊 import multiprocessing#引入多元進(jìn)程模塊# 向queue中輸入數(shù)據(jù)的函數(shù) def inputQ(queue):#定義一個(gè)函數(shù)info = str(os.getpid()) + '(put):' + str(time.asctime())queue.put(info)向隊(duì)列中放入一個(gè)信息# 向queue中輸出數(shù)據(jù)的函數(shù) def outputQ(queue):#取隊(duì)列中的數(shù)據(jù)info = queue.get()#取信息print ('%s%s\033[32m%s\033[0m'%(str(os.getpid()), '(get):',info))#打印這個(gè)內(nèi)容# Main if __name__ == '__main__':#如果用戶名是當(dāng)前用戶名multiprocessing.freeze_support()# record1 = [] # store input processesrecord2 = [] # store output processesqueue = multiprocessing.Queue(3)#實(shí)例化一個(gè)隊(duì)列# 輸入進(jìn)程for i in range(10):#循環(huán)10個(gè)數(shù)process = multiprocessing.Process(target=inputQ,args=(queue,))#創(chuàng)建一個(gè)進(jìn)程process.start()#開始這個(gè)進(jìn)程record1.append(process)#添加到列表中# 輸出進(jìn)程for i in range(10):#循環(huán)10個(gè)數(shù)process = multiprocessing.Process(target=outputQ,args=(queue,))#創(chuàng)建一個(gè)進(jìn)程process.start()#開始進(jìn)程record2.append(process)#添加到列表里for p in record1:#循環(huán)這個(gè)列表p.join()#感知子進(jìn)程結(jié)束for p in record2:#循環(huán)這個(gè)進(jìn)程p.join()#感知子進(jìn)程結(jié)束生產(chǎn)者消費(fèi)者模型
在并發(fā)編程中使用生產(chǎn)者和消費(fèi)者模式能夠解決絕大多數(shù)并發(fā)問題。該模式通過平衡生產(chǎn)線程和消費(fèi)線程的工作能力來提高程序的整體處理數(shù)據(jù)的速度。
為什么要使用生產(chǎn)者和消費(fèi)者模式
在線程世界里,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線程,消費(fèi)者就是消費(fèi)數(shù)據(jù)的線程。在多線程開發(fā)當(dāng)中,如果生產(chǎn)者處理速度很快,而消費(fèi)者處理速度很慢,那么生產(chǎn)者就必須等待消費(fèi)者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理,如果消費(fèi)者的處理能力大于生產(chǎn)者,那么消費(fèi)者就必須等待生產(chǎn)者。為了解決這個(gè)問題于是引入了生產(chǎn)者和消費(fèi)者模式。
什么是生產(chǎn)者消費(fèi)者模式
生產(chǎn)者消費(fèi)者模式是通過一個(gè)容器來解決生產(chǎn)者和消費(fèi)者的強(qiáng)耦合問題。生產(chǎn)者和消費(fèi)者彼此之間不直接通訊,而通過阻塞隊(duì)列來進(jìn)行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費(fèi)者處理,直接扔給阻塞隊(duì)列,消費(fèi)者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊(duì)列里取,阻塞隊(duì)列就相當(dāng)于一個(gè)緩沖區(qū),平衡了生產(chǎn)者和消費(fèi)者的處理能力。
基于隊(duì)列實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型
#基于隊(duì)列實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型 # 隊(duì)列 # 生產(chǎn)者消費(fèi)者模型# 生產(chǎn)者 進(jìn)程 # 消費(fèi)者 進(jìn)程 import time#引入時(shí)間模塊 import random#引入隨機(jī)數(shù) from multiprocessing import Process,Queue#引入進(jìn)程模塊和隊(duì)列模塊 def consumer(q,name):#定義一個(gè)消費(fèi)者函數(shù)while True:#循環(huán)為真food = q.get()#拿出食物if food is None:#如果食物為空print('%s獲取到了一個(gè)空'%name)#打印胡去到一個(gè)空break#打斷print('\033[31m%s消費(fèi)了%s\033[0m' % (name,food))#打印誰(shuí)消費(fèi)了什么食物time.sleep(random.randint(1,3))#隨機(jī)睡1~3秒def producer(name,food,q):#定義一個(gè)生產(chǎn)者函數(shù)for i in range(4):#循環(huán)4次time.sleep(random.randint(1,3))#隨機(jī)睡1~3秒f = '%s生產(chǎn)了%s%s'%(name,food,i)#誰(shuí)生產(chǎn)了什么食物print(f)#打印內(nèi)容q.put(f)#把食物放到隊(duì)列里if __name__ == '__main__':#如果名稱是當(dāng)前名稱q = Queue(20)#實(shí)例化一個(gè)隊(duì)列20p1 = Process(target=producer,args=('Egon','包子',q))#創(chuàng)建一個(gè)進(jìn)程p2 = Process(target=producer, args=('wusir','泔水', q))#創(chuàng)建一個(gè)進(jìn)程c1 = Process(target=consumer, args=(q,'alex'))#創(chuàng)建一個(gè)進(jìn)程c2 = Process(target=consumer, args=(q,'jinboss'))#創(chuàng)建一個(gè)進(jìn)程p1.start()#啟動(dòng)一個(gè)進(jìn)程p2.start()#啟動(dòng)一個(gè)進(jìn)程c1.start()#啟動(dòng)一個(gè)進(jìn)程c2.start()#啟動(dòng)一個(gè)進(jìn)程p1.join()#感知p1進(jìn)程結(jié)p2.join()#感知p2進(jìn)程結(jié)束q.put(None)#往隊(duì)列中添加一個(gè)Noneq.put(None)#往隊(duì)列中添加一個(gè)None?
此時(shí)的問題是主進(jìn)程永遠(yuǎn)不會(huì)結(jié)束,原因是:生產(chǎn)者p在生產(chǎn)完后就結(jié)束了,但是消費(fèi)者c在取空了q之后,則一直處于死循環(huán)中且卡在q.get()這一步。
解決方式無(wú)非是讓生產(chǎn)者在生產(chǎn)完畢后,往隊(duì)列中再發(fā)一個(gè)結(jié)束信號(hào),這樣消費(fèi)者在接收到結(jié)束信號(hào)后就可以break出死循環(huán)。
?
注意:結(jié)束信號(hào)None,不一定要由生產(chǎn)者發(fā),主進(jìn)程里同樣可以發(fā),但主進(jìn)程需要等生產(chǎn)者結(jié)束后才應(yīng)該發(fā)送該信號(hào)
JoinableQueue([maxsize])?
創(chuàng)建可連接的共享進(jìn)程隊(duì)列。這就像是一個(gè)Queue對(duì)象,但隊(duì)列允許項(xiàng)目的使用者通知生產(chǎn)者項(xiàng)目已經(jīng)被成功處理。通知進(jìn)程是使用共享的信號(hào)和條件變量來實(shí)現(xiàn)的。?
#JoinableQueue的實(shí)例p除了與Queue對(duì)象相同的方法之外,還具有以下方法:#q.task_done() #使用者使用此方法發(fā)出信號(hào),表示q.get()返回的項(xiàng)目已經(jīng)被處理。如果調(diào)用此方法的次數(shù)大于從隊(duì)列中刪除的項(xiàng)目數(shù)量,將引發(fā)ValueError異常。#q.join() #生產(chǎn)者將使用此方法進(jìn)行阻塞,直到隊(duì)列中所有項(xiàng)目均被處理。阻塞將持續(xù)到為隊(duì)列中的每個(gè)項(xiàng)目均調(diào)用q.task_done()方法為止。 #下面的例子說明如何建立永遠(yuǎn)運(yùn)行的進(jìn)程,使用和處理隊(duì)列上的項(xiàng)目。生產(chǎn)者將項(xiàng)目放入隊(duì)列,并等待它們被處理。 #JoinableQueue隊(duì)列實(shí)現(xiàn)消費(fèi)之生產(chǎn)者模型 import time#引入一個(gè)時(shí)間模塊 import random#引入一個(gè)隨機(jī)數(shù)模塊 from multiprocessing import Process,JoinableQueue#引入進(jìn)程模塊和隊(duì)列模塊 def consumer(q,name):#定義一個(gè)消費(fèi)者函數(shù)while True:#循環(huán)為真food = q.get()#從隊(duì)列中拿出食物print('\033[31m%s消費(fèi)了%s\033[0m' % (name,food))#打印內(nèi)容time.sleep(random.randint(1,3))#隨機(jī)睡1~3秒q.task_done() # count - 1#def producer(name,food,q):#生產(chǎn)者for i in range(4):#循環(huán)4次time.sleep(random.randint(1,3))#隨機(jī)睡1~3秒f = '%s生產(chǎn)了%s%s'%(name,food,i)#誰(shuí)生產(chǎn)了食物print(f)#打印這個(gè)內(nèi)容q.put(f)#放入到隊(duì)列里q.join() # 阻塞 直到一個(gè)隊(duì)列中的所有數(shù)據(jù) 全部被處理完畢if __name__ == '__main__':#如果文件名為當(dāng)前名稱q = JoinableQueue(20)#實(shí)例化一個(gè)隊(duì)列對(duì)象p1 = Process(target=producer,args=('Egon','包子',q))#創(chuàng)建一個(gè)生產(chǎn)者進(jìn)程p2 = Process(target=producer, args=('wusir','泔水', q))#創(chuàng)建一個(gè)生產(chǎn)著進(jìn)程c1 = Process(target=consumer, args=(q,'alex'))#創(chuàng)建一個(gè)消費(fèi)者c2 = Process(target=consumer, args=(q,'jinboss'))#創(chuàng)建一個(gè)消費(fèi)者進(jìn)程p1.start()#開啟一個(gè)生產(chǎn)者進(jìn)程p2.start()#開啟一個(gè)生產(chǎn)者進(jìn)程c1.daemon = True # 設(shè)置為守護(hù)進(jìn)程 主進(jìn)程中的代碼執(zhí)行完畢之后,子進(jìn)程自動(dòng)結(jié)束c2.daemon = True #設(shè)置守護(hù)進(jìn)程c1.start() #開啟一個(gè)消費(fèi)者進(jìn)程c2.start() #開啟一個(gè)消費(fèi)者進(jìn)程p1.join() #感知一個(gè)生產(chǎn)者進(jìn)程結(jié)束p2.join() # 感知一個(gè)進(jìn)程的結(jié)束# 在消費(fèi)者這一端:# 每次獲取一個(gè)數(shù)據(jù)# 處理一個(gè)數(shù)據(jù)# 發(fā)送一個(gè)記號(hào) : 標(biāo)志一個(gè)數(shù)據(jù)被處理成功# 在生產(chǎn)者這一端:# 每一次生產(chǎn)一個(gè)數(shù)據(jù),# 且每一次生產(chǎn)的數(shù)據(jù)都放在隊(duì)列中# 在隊(duì)列中刻上一個(gè)記號(hào)# 當(dāng)生產(chǎn)者全部生產(chǎn)完畢之后,# join信號(hào) : 已經(jīng)停止生產(chǎn)數(shù)據(jù)了# 且要等待之前被刻上的記號(hào)都被消費(fèi)完# 當(dāng)數(shù)據(jù)都被處理完時(shí),join阻塞結(jié)束# consumer 中把所有的任務(wù)消耗完 # producer 端 的 join感知到,停止阻塞 # 所有的producer進(jìn)程結(jié)束 # 主進(jìn)程中的p.join結(jié)束 # 主進(jìn)程中代碼結(jié)束 # 守護(hù)進(jìn)程(消費(fèi)者的進(jìn)程)結(jié)束
?
管道(了解)
#創(chuàng)建管道的類: Pipe([duplex]):在進(jìn)程之間創(chuàng)建一條管道,并返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的連接對(duì)象,強(qiáng)調(diào)一點(diǎn):必須在產(chǎn)生Process對(duì)象之前產(chǎn)生管道 #參數(shù)介紹: dumplex:默認(rèn)管道是全雙工的,如果將duplex射成False,conn1只能用于接收,conn2只能用于發(fā)送。 #主要方法: conn1.recv():接收conn2.send(obj)發(fā)送的對(duì)象。如果沒有消息可接收,recv方法會(huì)一直阻塞。如果連接的另外一端已經(jīng)關(guān)閉,那么recv方法會(huì)拋出EOFError。conn1.send(obj):通過連接發(fā)送對(duì)象。obj是與序列化兼容的任意對(duì)象#其他方法: conn1.close():關(guān)閉連接。如果conn1被垃圾回收,將自動(dòng)調(diào)用此方法 conn1.fileno():返回連接使用的整數(shù)文件描述符 conn1.poll([timeout]):如果連接上的數(shù)據(jù)可用,返回True。timeout指定等待的最長(zhǎng)時(shí)限。如果省略此參數(shù),方法將立即返回結(jié)果。如果將timeout射成None,操作將無(wú)限期地等待數(shù)據(jù)到達(dá)。conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發(fā)送的一條完整的字節(jié)消息。maxlength指定要接收的最大字節(jié)數(shù)。如果進(jìn)入的消息,超過了這個(gè)最大值,將引發(fā)IOError異常,并且在連接上無(wú)法進(jìn)行進(jìn)一步讀取。如果連接的另外一端已經(jīng)關(guān)閉,再也不存在任何數(shù)據(jù),將引發(fā)EOFError異常。 conn.send_bytes(buffer [, offset [, size]]):通過連接發(fā)送字節(jié)數(shù)據(jù)緩沖區(qū),buffer是支持緩沖區(qū)接口的任意對(duì)象,offset是緩沖區(qū)中的字節(jié)偏移量,而size是要發(fā)送字節(jié)數(shù)。結(jié)果數(shù)據(jù)以單條消息的形式發(fā)出,然后調(diào)用c.recv_bytes()函數(shù)進(jìn)行接收 conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節(jié)消息,并把它保存在buffer對(duì)象中,該對(duì)象支持可寫入的緩沖區(qū)接口(即bytearray對(duì)象或類似的對(duì)象)。offset指定緩沖區(qū)中放置消息處的字節(jié)位移。返回值是收到的字節(jié)數(shù)。如果消息長(zhǎng)度大于可用的緩沖區(qū)空間,將引發(fā)BufferTooShort異常。 from multiprocessing import Process, Pipe#引入進(jìn)程模塊和管道模塊def f(conn):#定義一個(gè)函數(shù)conn.send("Hello The_Third_Wave")#發(fā)送一條信息conn.close()#關(guān)閉這個(gè)進(jìn)程if __name__ == '__main__':#如果名字等于當(dāng)前名稱parent_conn, child_conn = Pipe()#接收兩個(gè)參數(shù)p = Process(target=f, args=(child_conn,))#創(chuàng)建一個(gè)進(jìn)程p.start()#啟動(dòng)進(jìn)程print(parent_conn.recv())#接收一個(gè)信息p.join()#等待進(jìn)程結(jié)束?
應(yīng)該特別注意管道端點(diǎn)的正確管理問題。如果是生產(chǎn)者或消費(fèi)者中都沒有使用管道的某個(gè)端點(diǎn),就應(yīng)將它關(guān)閉。這也說明了為何在生產(chǎn)者中關(guān)閉了管道的輸出端,在消費(fèi)者中關(guān)閉管道的輸入端。如果忘記執(zhí)行這些步驟,程序可能在消費(fèi)者中的recv()操作上掛起。管道是由操作系統(tǒng)進(jìn)行引用計(jì)數(shù)的,必須在所有進(jìn)程中關(guān)閉管道后才能生成EOFError異常。因此,在生產(chǎn)者中關(guān)閉管道不會(huì)有任何效果,除非消費(fèi)者也關(guān)閉了相同的管道端點(diǎn)。?
#引發(fā)EOFError from multiprocessing import Process, Pipe引入進(jìn)程模塊和管道模塊def f(parent_conn,child_conn):#定義一個(gè)函數(shù)傳入兩個(gè)參數(shù)#parent_conn.close() #不寫close將不會(huì)引發(fā)EOFErrorwhile True:#循環(huán)為真try:#異常處理print(child_conn.recv())#打印接收的值except EOFError:#萬(wàn)能異常child_conn.close()#關(guān)閉連接if __name__ == '__main__':#如果用戶名是當(dāng)前用戶名parent_conn, child_conn = Pipe()#接受兩個(gè)參數(shù)p = Process(target=f, args=(parent_conn,child_conn,))#實(shí)例化一個(gè)進(jìn)程p.start()#而開始進(jìn)程child_conn.close()#關(guān)閉客戶端連接parent_conn.send('hello')#發(fā)送信息parent_conn.close()#冠詞這個(gè)信息p.join()#等待進(jìn)程結(jié)束 #pipe實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者 from multiprocessing import Process,Pipe#引入兩個(gè)模塊def consumer(p,name):#定義一個(gè)消費(fèi)者produce, consume=p#接收兩個(gè)參數(shù)produce.close()#關(guān)閉生產(chǎn)者while True:#循環(huán)為真try:#異常處理baozi=consume.recv()#接收信息print('%s 收到包子:%s' %(name,baozi))#打印內(nèi)容except EOFError:#異常處理break#打斷def producer(seq,p):#定義一個(gè)生產(chǎn)者produce, consume=p#接受兩個(gè)參數(shù)consume.close()#關(guān)閉生產(chǎn)者for i in seq:#循環(huán)打印produce.send(i)#發(fā)送if __name__ == '__main__':#如果用戶名等于當(dāng)前用戶名produce,consume=Pipe()#接受兩個(gè)信息 c1=Process(target=consumer,args=((produce,consume),'c1'))#創(chuàng)建一個(gè)進(jìn)程c1.start()#開始這個(gè)進(jìn)程 seq=(i for i in range(10))#循環(huán)是個(gè)數(shù)producer(seq,(produce,consume))#運(yùn)行生產(chǎn)者函數(shù) produce.close()#關(guān)閉生產(chǎn)者consume.close()#關(guān)閉消費(fèi)者 c1.join()#等待進(jìn)程結(jié)束print('主進(jìn)程')?
#多個(gè)消費(fèi)之之間的競(jìng)爭(zhēng)問題帶來的數(shù)據(jù)不安全問題 from multiprocessing import Process,Pipe,Lock#引入進(jìn)程模塊,管道,鎖def consumer(p,name,lock):#定義一個(gè)消費(fèi)者produce, consume=p#傳入兩個(gè)參數(shù)produce.close()#關(guān)閉生產(chǎn)者while True:#循環(huán)為真lock.acquire()#拿鑰匙baozi=consume.recv()#接收信息lock.release()#還鑰匙if baozi:#如果有信息print('%s 收到包子:%s' %(name,baozi))#打印接收到包子else:#否則consume.close()#關(guān)閉消費(fèi)者break#打斷def producer(p,n):#定義一個(gè)生產(chǎn)者produce, consume=p#接收兩個(gè)參數(shù)consume.close()#關(guān)閉消費(fèi)者for i in range(n):#循環(huán)produce.send(i)#發(fā)送iproduce.send(None)#生產(chǎn)者發(fā)送一個(gè)noneproduce.send(None)#生產(chǎn)者發(fā)送一個(gè)noneproduce.close()#關(guān)閉生產(chǎn)者if __name__ == '__main__':#如果名字等于當(dāng)前用戶名produce,consume=Pipe()#接收兩個(gè)參數(shù)lock = Lock()#實(shí)例化一個(gè)鎖c1=Process(target=consumer,args=((produce,consume),'c1',lock))#創(chuàng)建一個(gè)消費(fèi)者進(jìn)程c2=Process(target=consumer,args=((produce,consume),'c2',lock))#創(chuàng)建一個(gè)消費(fèi)者進(jìn)程p1=Process(target=producer,args=((produce,consume),10))#創(chuàng)建一個(gè)生產(chǎn)者進(jìn)程c1.start()#開啟進(jìn)程c2.start()#開啟進(jìn)程p1.start()#開啟進(jìn)程 produce.close()#關(guān)閉生產(chǎn)者consume.close()#關(guān)閉消費(fèi)者 c1.join()#等待進(jìn)程接收c2.join()#等待進(jìn)程接收p1.join()#等待進(jìn)程接收print('主進(jìn)程') #多個(gè)消費(fèi)之之間的競(jìng)爭(zhēng)問題帶來的數(shù)據(jù)不安全問題 from multiprocessing import Process,Pipe,Lock#引入進(jìn)程模塊,管道,鎖def consumer(p,name,lock):#定義一個(gè)消費(fèi)者produce, consume=p#傳入兩個(gè)參數(shù)produce.close()#關(guān)閉生產(chǎn)者while True:#循環(huán)為真lock.acquire()#拿鑰匙baozi=consume.recv()#接收信息lock.release()#還鑰匙if baozi:#如果有信息print('%s 收到包子:%s' %(name,baozi))#打印接收到包子else:#否則consume.close()#關(guān)閉消費(fèi)者break#打斷def producer(p,n):#定義一個(gè)生產(chǎn)者produce, consume=p#接收兩個(gè)參數(shù)consume.close()#關(guān)閉消費(fèi)者for i in range(n):#循環(huán)produce.send(i)#發(fā)送iproduce.send(None)#生產(chǎn)者發(fā)送一個(gè)noneproduce.send(None)#生產(chǎn)者發(fā)送一個(gè)noneproduce.close()#關(guān)閉生產(chǎn)者if __name__ == '__main__':#如果名字等于當(dāng)前用戶名produce,consume=Pipe()#接收兩個(gè)參數(shù)lock = Lock()#實(shí)例化一個(gè)鎖c1=Process(target=consumer,args=((produce,consume),'c1',lock))#創(chuàng)建一個(gè)消費(fèi)者進(jìn)程c2=Process(target=consumer,args=((produce,consume),'c2',lock))#創(chuàng)建一個(gè)消費(fèi)者進(jìn)程p1=Process(target=producer,args=((produce,consume),10))#創(chuàng)建一個(gè)生產(chǎn)者進(jìn)程c1.start()#開啟進(jìn)程c2.start()#開啟進(jìn)程p1.start()#開啟進(jìn)程 produce.close()#關(guān)閉生產(chǎn)者consume.close()#關(guān)閉消費(fèi)者 c1.join()#等待進(jìn)程接收c2.join()#等待進(jìn)程接收p1.join()#等待進(jìn)程接收print('主進(jìn)程')?
進(jìn)程之間的數(shù)據(jù)共享
展望未來,基于消息傳遞的并發(fā)編程是大勢(shì)所趨
即便是使用線程,推薦做法也是將程序設(shè)計(jì)為大量獨(dú)立的線程集合,通過消息隊(duì)列交換數(shù)據(jù)。
這樣極大地減少了對(duì)使用鎖定和其他同步手段的需求,還可以擴(kuò)展到分布式系統(tǒng)中。
但進(jìn)程間應(yīng)該盡量避免通信,即便需要通信,也應(yīng)該選擇進(jìn)程安全的工具來避免加鎖帶來的問題。
以后我們會(huì)嘗試使用數(shù)據(jù)庫(kù)來解決現(xiàn)在進(jìn)程之間的數(shù)據(jù)共享問題。
#Manger模塊介紹 #進(jìn)程間數(shù)據(jù)是獨(dú)立的,可以借助于隊(duì)列或管道實(shí)現(xiàn)通信,二者都是基于消息傳遞的 #雖然進(jìn)程間數(shù)據(jù)獨(dú)立,但可以通過Manager實(shí)現(xiàn)數(shù)據(jù)共享,事實(shí)上Manager的功能遠(yuǎn)不止于此#A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.#A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. from multiprocessing import Manager,Process,Lock#引入進(jìn)程模塊,鎖模塊 def work(d,lock):#定義一個(gè)工作方法with lock: #不加鎖而操作共享的數(shù)據(jù),肯定會(huì)出現(xiàn)數(shù)據(jù)錯(cuò)亂d['count']-=1if __name__ == '__main__':#如果用戶名等于當(dāng)前用戶名lock=Lock()#實(shí)例化一個(gè)鎖 with Manager() as m:dic=m.dict({'count':100})#傳入一個(gè)字典p_l=[]#創(chuàng)建一個(gè)空列表for i in range(100):#循環(huán)100個(gè)數(shù)p=Process(target=work,args=(dic,lock))##創(chuàng)建一個(gè)進(jìn)程p_l.append(p)#添加到列表里p.start()#開始進(jìn)程for p in p_l:#循環(huán)列表p.join()#等待進(jìn)程結(jié)束print(dic)#打印這個(gè)字典?
進(jìn)程池和multiprocess.Pool模塊
進(jìn)程池
為什么要有進(jìn)程池?進(jìn)程池的概念。
在程序?qū)嶋H處理問題過程中,忙時(shí)會(huì)有成千上萬(wàn)的任務(wù)需要被執(zhí)行,閑時(shí)可能只有零星任務(wù)。那么在成千上萬(wàn)個(gè)任務(wù)需要被執(zhí)行的時(shí)候,我們就需要去創(chuàng)建成千上萬(wàn)個(gè)進(jìn)程么?首先,創(chuàng)建進(jìn)程需要消耗時(shí)間,銷毀進(jìn)程也需要消耗時(shí)間。第二即便開啟了成千上萬(wàn)的進(jìn)程,操作系統(tǒng)也不能讓他們同時(shí)執(zhí)行,這樣反而會(huì)影響程序的效率。因此我們不能無(wú)限制的根據(jù)任務(wù)開啟或者結(jié)束進(jìn)程。那么我們要怎么做呢?
在這里,要給大家介紹一個(gè)進(jìn)程池的概念,定義一個(gè)池子,在里面放上固定數(shù)量的進(jìn)程,有需求來了,就拿一個(gè)池中的進(jìn)程來處理任務(wù),等到處理完畢,進(jìn)程并不關(guān)閉,而是將進(jìn)程再放回進(jìn)程池中繼續(xù)等待任務(wù)。如果有很多任務(wù)需要執(zhí)行,池中的進(jìn)程數(shù)量不夠,任務(wù)就要等待之前的進(jìn)程執(zhí)行任務(wù)完畢歸來,拿到空閑進(jìn)程才能繼續(xù)執(zhí)行。也就是說,池中進(jìn)程的數(shù)量是固定的,那么同一時(shí)間最多有固定數(shù)量的進(jìn)程在運(yùn)行。這樣不會(huì)增加操作系統(tǒng)的調(diào)度難度,還節(jié)省了開閉進(jìn)程的時(shí)間,也一定程度上能夠?qū)崿F(xiàn)并發(fā)效果。
multiprocess.Pool模塊
概念介紹
#Pool([numprocess [,initializer [, initargs]]]):創(chuàng)建進(jìn)程池 #1 numprocess:要?jiǎng)?chuàng)建的進(jìn)程數(shù),如果省略,將默認(rèn)使用cpu_count()的值 #2 initializer:是每個(gè)工作進(jìn)程啟動(dòng)時(shí)要執(zhí)行的可調(diào)用對(duì)象,默認(rèn)為None #3 initargs:是要傳給initializer的參數(shù)組?
#1 p.apply(func [, args [, kwargs]]):在一個(gè)池工作進(jìn)程中執(zhí)行func(*args,**kwargs),然后返回結(jié)果。 #2 '''需要強(qiáng)調(diào)的是:此操作并不會(huì)在所有池工作進(jìn)程中并執(zhí)行func函數(shù)。如果要通過不同參數(shù)并發(fā)地執(zhí)行func函數(shù),必須從不同線程調(diào)用p.apply()函數(shù)或者使用p.apply_async()''' #3 #4 p.apply_async(func [, args [, kwargs]]):在一個(gè)池工作進(jìn)程中執(zhí)行func(*args,**kwargs),然后返回結(jié)果。 #5 '''此方法的結(jié)果是AsyncResult類的實(shí)例,callback是可調(diào)用對(duì)象,接收輸入?yún)?shù)。當(dāng)func的結(jié)果變?yōu)榭捎脮r(shí),將理解傳遞給callback。callback禁止執(zhí)行任何阻塞操作,否則將接收其他異步操作中的結(jié)果。''' #6 #7 p.close():關(guān)閉進(jìn)程池,防止進(jìn)一步操作。如果所有操作持續(xù)掛起,它們將在工作進(jìn)程終止前完成 #8 #9 P.jion():等待所有工作進(jìn)程退出。此方法只能在close()或teminate()之后調(diào)用?
#1 方法apply_async()和map_async()的返回值是AsyncResul的實(shí)例obj。實(shí)例具有以下方法 #2 obj.get():返回結(jié)果,如果有必要?jiǎng)t等待結(jié)果到達(dá)。timeout是可選的。如果在指定時(shí)間內(nèi)還沒有到達(dá),將引發(fā)一場(chǎng)。如果遠(yuǎn)程操作中引發(fā)了異常,它將在調(diào)用此方法時(shí)再次被引發(fā)。 #3 obj.ready():如果調(diào)用完成,返回True #4 obj.successful():如果調(diào)用完成且沒有引發(fā)異常,返回True,如果在結(jié)果就緒之前調(diào)用此方法,引發(fā)異常 #5 obj.wait([timeout]):等待結(jié)果變?yōu)榭捎谩?/span> #6 obj.terminate():立即終止所有工作進(jìn)程,同時(shí)不執(zhí)行任何清理或結(jié)束任何掛起工作。如果p被垃圾回收,將自動(dòng)調(diào)用此函數(shù)?
代碼實(shí)例
進(jìn)程池和多進(jìn)程效率對(duì)比
同步和異步
#進(jìn)程池的同步調(diào)用 import os,time#引入系統(tǒng)模塊和時(shí)間模塊 from multiprocessing import Pool#引入進(jìn)程池模塊def work(n):#定義一個(gè)函數(shù)print('%s run' %os.getpid())#打印idtime.sleep(3)#睡3秒return n**2#返回一個(gè)n平方if __name__ == '__main__':#如果文件名等于當(dāng)前文件名p=Pool(3) #進(jìn)程池中從無(wú)到有創(chuàng)建三個(gè)進(jìn)程,以后一直是這三個(gè)進(jìn)程在執(zhí)行任務(wù)res_l=[]#創(chuàng)建一個(gè)列表for i in range(10):#循環(huán)十個(gè)數(shù)res=p.apply(work,args=(i,)) # 同步調(diào)用,直到本次任務(wù)執(zhí)行完畢拿到res,等待任務(wù)work執(zhí)行的過程中可能有阻塞也可能沒有阻塞# 但不管該任務(wù)是否存在阻塞,同步調(diào)用都會(huì)在原地等著print(res_l)#打印列表?
import os#引入系統(tǒng)模塊 import time#引入時(shí)間模塊 import random#引入隨機(jī)數(shù)模塊 from multiprocessing import Pool#引入進(jìn)程池模塊def work(n):#定義一個(gè)函數(shù)print('%s run' %os.getpid())#打印內(nèi)容time.sleep(random.random())#隨機(jī)睡一會(huì)return n**2#返回n*nif __name__ == '__main__':#如果文件名等于當(dāng)前用戶名p=Pool(3) #進(jìn)程池中從無(wú)到有創(chuàng)建三個(gè)進(jìn)程,以后一直是這三個(gè)進(jìn)程在執(zhí)行任務(wù)res_l=[]#得到一個(gè)空列表for i in range(10):#循環(huán)十個(gè)數(shù)res=p.apply_async(work,args=(i,)) # 異步運(yùn)行,根據(jù)進(jìn)程池中有的進(jìn)程數(shù),每次最多3個(gè)子進(jìn)程在異步執(zhí)行# 返回結(jié)果之后,將結(jié)果放入列表,歸還進(jìn)程,之后再執(zhí)行新的任務(wù)# 需要注意的是,進(jìn)程池中的三個(gè)進(jìn)程不會(huì)同時(shí)開啟或者同時(shí)結(jié)束# 而是執(zhí)行完一個(gè)就釋放一個(gè)進(jìn)程,這個(gè)進(jìn)程就去接收新的任務(wù)。 res_l.append(res)# 異步apply_async用法:如果使用異步提交的任務(wù),主進(jìn)程需要使用jion,等待進(jìn)程池內(nèi)任務(wù)都處理完,然后可以用get收集結(jié)果# 否則,主進(jìn)程結(jié)束,進(jìn)程池可能還沒來得及執(zhí)行,也就跟著一起結(jié)束了 p.close()p.join()for res in res_l:print(res.get()) #使用get來獲取apply_aync的結(jié)果,如果是apply,則沒有g(shù)et方法,因?yàn)閍pply是同步執(zhí)行,立刻獲取結(jié)果,也根本無(wú)需get?
練習(xí)
?server:進(jìn)程池版socket并發(fā)聊天 ?client發(fā)現(xiàn):并發(fā)開啟多個(gè)客戶端,服務(wù)端同一時(shí)間只有4個(gè)不同的pid,只能結(jié)束一個(gè)客戶端,另外一個(gè)客戶端才會(huì)進(jìn)來.
進(jìn)程池的其他實(shí)現(xiàn)方式:https://docs.python.org/dev/library/concurrent.futures.html
參考資料http://www.cnblogs.com/linhaifeng/articles/6817679.html
https://www.jianshu.com/p/1200fd49b583
https://www.jianshu.com/p/aed6067eeac9
?
轉(zhuǎn)載于:https://www.cnblogs.com/chongdongxiaoyu/p/8658379.html
總結(jié)
以上是生活随笔為你收集整理的进程同步(multiprocess.Lock、multiprocess.Semaphore、multiprocess.Event) day38的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Mybatis_接口编程
- 下一篇: 吴恩达机器学习笔记(二) —— Logi