日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

Python并发编程之多进程(二)

發布時間:2024/9/30 python 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Python并发编程之多进程(二) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

十、進程同步

進程之間數據不共享,但是共享同一套文件系統,所以訪問同一個文件,或同一個打印終端,是沒有問題的,

而共享帶來的是競爭,競爭帶來的結果就是錯亂,如何控制,就是加鎖處理

--------------------------------------------------------------------注:如果你對python感興趣,我這有個學習Python基地,里面有很多學習資料,感興趣的+Q群:895817687--------------------------------------------------------------------# 模擬搶票,購票行為由并行變成了串行,犧牲了效率,提高了數據安全性 from multiprocessing import Process, Lock import json import time import osdef search_ticket():with open("file/ticket", mode="r", encoding="utf-8") as f:ticket_num = int(f.read())print("剩余票數:{0}".format(ticket_num))def get_ticket():with open("file/ticket", mode="r", encoding="utf-8") as f:ticket = int(f.read())time.sleep(0.1) # 模擬搶票延時if ticket:ticket -= 1print("{0}搶到了一張票,還剩{1}張票".format(os.getpid(), ticket))else:print("{0}沒有搶到票".format(os.getpid()))f = open("file/ticket", mode="w", encoding="utf-8")f.write(str(ticket))def task(lock):search_ticket()lock.acquire()get_ticket()lock.release()if __name__ == '__main__':lock = Lock()for i in range(100):p = Process(target=task, args=(lock,))p.start()

總結:加鎖可以保證多個進程修改同一塊數據時,同一時間只能有一個任務可以進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。

1:雖然可以用文件共享數據實現進程間通信,但問題是:

2:效率低(共享數據基于文件,而文件是硬盤上的數據)
需要自己加鎖處理

十一、隊列(推薦使用)

進程彼此之間互相隔離,要實現進程間通信(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的

Queue(底層就是以管道和鎖的方式實現)

方法介紹

maxsize是隊列中允許最大項數,省略則無大小限制。 q.put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。如果blocked為True(默認值),并且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩余的空間。如果超時,會拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會立即拋出Queue.Full異常。q.get方法可以從隊列讀取并且刪除一個元素。同樣,get方法有兩個可選參數:blocked和timeout。如果blocked為True(默認值),并且timeout為正值,那么在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果隊列為空,則立即拋出Queue.Empty異常.q.get_nowait():同q.get(False) q.put_nowait():同q.put(False)q.empty():調用此方法時q為空則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中又加入了項目。 q.full():調用此方法時q已滿則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中的項目被取走。 q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()一樣q.cancel_join_thread():不會在進程退出時自動連接后臺線程。可以防止join_thread()方法阻塞q.close():關閉隊列,防止隊列中加入更多數據。調用此方法,后臺線程將繼續寫入那些已經入隊列但尚未寫入的數據,但將在此方法完成時馬上關閉。如果q被垃圾收集,將調用此方法。關閉隊列不會在隊列使用者中產生任何類型的數據結束信號或異常。例如,如果某個使用者正在被阻塞在get()操作上,關閉生產者中的隊列不會導致get()方法返回錯誤。q.join_thread():連接隊列的后臺線程。此方法用于在調用q.close()方法之后,等待所有隊列項被消耗。默認情況下,此方法由不是q的原始創建者的所有進程調用。調用q.cancel_join_thread方法可以禁止這種行為 from multiprocessing import Queueq = Queue(maxsize=3)q.put(1) q.put({"name":"dogfa"}) q.put([1,2,3])print(q.full()) # Trueprint(q.get()) # 1 print(q.get()) # {'name': 'dogfa'} print(q.get()) # [1, 2, 3]print(q.empty()) # True

2:生產者消費者模型

在并發編程中使用生產者和消費者模式能夠解決絕大多數并發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。

3:為什么使用生產者消費者模式

在線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大于生產者,那么消費者就必須等待生產者。為了解決這個問題于是引入了生產者和消費者模式。

4:什么是生產者消費者模式

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當于一個緩沖區,平衡了生產者和消費者的處理能力。

5:JoinableQueue()

#JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列允許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。#參數介紹:maxsize是隊列中允許最大項數,省略則無大小限制。 #方法介紹:JoinableQueue的實例p除了與Queue對象相同的方法之外還具有:q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。如果調用此方法的次數大于從隊列中刪除項目的數量,將引發ValueError異常q.join():生產者調用此方法進行阻塞,直到隊列中所有的項目均被處理。阻塞將持續到隊列中的每個項目均調用q.task_done()方法為止

6:生產者消費者模型的實現

from multiprocessing import JoinableQueue, Process import os import random import timedef customer(q):while 1:time.sleep(0.5)print("{0}號顧客吃了{1}".format(os.getpid(), q.get()))q.task_done()def producter(food, q):for i in range(10):time.sleep(random.randint(1, 2))q.put(food)print("{0}號廚師完成了{1}的制作".format(os.getpid(), food))q.join()if __name__ == '__main__':q = JoinableQueue()pro1 = Process(target=producter, args=("包子", q))pro2 = Process(target=producter, args=("油條", q))pro3 = Process(target=producter, args=("花卷", q))cus1 = Process(target=customer, args=(q,))cus2 = Process(target=customer, args=(q,))cus1.daemon = Truecus2.daemon = Truelst = [pro1, pro2, pro3, cus1, cus2][i.start() for i in lst]pro1.join()pro2.join()pro3.join()print("ending...")# 主進程等待pro1,Pro2,pro3執行完成,當pro執行完成意味著cus必定執行完成,所以可以將cus設置成守護進程

7:生產者消費者模式總結

#程序中有兩類角色一類負責生產數據(生產者)一類負責處理數據(消費者)#引入生產者消費者模型為了解決的問題是:平衡生產者與消費者之間的工作能力,從而提高程序整體處理數據的速度#如何實現:生產者<-->隊列<——>消費者#生產者消費者模型實現類程序的解耦和

十二、管道(不推薦使用)

1:管道

#創建管道的類: Pipe([duplex]):在進程之間創建一條管道,并返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的連接對象,強調一點:必須在產生Process對象之前產生管道 #參數介紹: dumplex:默認管道是全雙工的,如果將duplex射成False,conn1只能用于接收,conn2只能用于發送。 #主要方法:conn1.recv():接收conn2.send(obj)發送的對象。如果沒有消息可接收,recv方法會一直阻塞。如果連接的另外一端已經關閉,那么recv方法會拋出EOFError。conn1.send(obj):通過連接發送對象。obj是與序列化兼容的任意對象#其他方法: conn1.close():關閉連接。如果conn1被垃圾回收,將自動調用此方法 conn1.fileno():返回連接使用的整數文件描述符 conn1.poll([timeout]):如果連接上的數據可用,返回True。timeout指定等待的最長時限。如果省略此參數,方法將立即返回結果。如果將timeout射成None,操作將無限期地等待數據到達。conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。maxlength指定要接收的最大字節數。如果進入的消息,超過了這個最大值,將引發IOError異常,并且在連接上無法進行進一步讀取。如果連接的另外一端已經關閉,再也不存在任何數據,將引發EOFError異常。 conn.send_bytes(buffer [, offset [, size]]):通過連接發送字節數據緩沖區,buffer是支持緩沖區接口的任意對象,offset是緩沖區中的字節偏移量,而size是要發送字節數。結果數據以單條消息的形式發出,然后調用c.recv_bytes()函數進行接收 conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,并把它保存在buffer對象中,該對象支持可寫入的緩沖區接口(即bytearray對象或類似的對象)。offset指定緩沖區中放置消息處的字節位移。返回值是收到的字節數。如果消息長度大于可用的緩沖區空間,將引發BufferTooShort異常。

2:利用管道實現進程間的通信

from multiprocessing import Process,Pipeimport time,os def consumer(p,name):left,right=pleft.close()while True:try:baozi=right.recv()print('%s 收到包子:%s' %(name,baozi))except EOFError:right.close()break def producer(seq,p):left,right=pright.close()for i in seq:left.send(i)# time.sleep(1)else:left.close() if __name__ == '__main__':left,right=Pipe()c1=Process(target=consumer,args=((left,right),'c1'))c1.start()seq=(i for i in range(10))producer(seq,(left,right))right.close()left.close()c1.join()print('主進程')

十三、數據共享

進程間數據是獨立的,可以借助于隊列或管道實現通信,二者都是基于消息傳遞的,雖然進程間數據獨立,但可以通過Manager實現數據共享

from multiprocessing import Process, Manager, Lock import os import random import timedef func(dic, lock):lock.acquire() # 不加鎖肯定會造成數據混亂time.sleep(random.randrange(2))dic["count"] -= 1lock.release()if __name__ == '__main__':lock = Lock()with Manager() as m:dic = m.dict({"count": 100})lst = []for i in range(100):p = Process(target=func, args=(dic, lock))lst.append(p)p.start()[i.join() for i in lst]print(dic["count"])

進程間通信應該盡量避免使用上述共享數據的方式

十三、進程池

在利用Python進行系統管理的時候,特別是同時操作多個文件目錄,或者遠程控制多臺主機,并行操作可以節約大量的時間。多進程是實現并發的手段之一,需要注意的問題是:

  • 很明顯需要并發執行的任務通常要遠大于核數
  • 一個操作系統不可能無限開啟進程,通常有幾個核就開幾個進程
  • 進程開啟過多,效率反而會下降(開啟進程是需要占用系統資源的,而且開啟多余核數目的進程也無法做到并行)
  • 如果當被操作對象數目不大時,可以直接利用multiprocessing中的Process動態成生多個進程,十幾個還好,但如果是上百個,上千個。。。手動的去限制進程數量卻又太過繁瑣,此時可以發揮進程池的功效。我們就可以通過維護一個進程池來控制進程數目。

    ps:對于遠程過程調用的高級應用程序而言,應該使用進程池,Pool可以提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,如果池還沒有滿,那么就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到規定最大值,那么該請求就會等待,直到池中有進程結束,就重用進程池中的進程。

    創建進程池的類:如果指定numprocess為3,則進程池會從無到有創建三個進程,然后自始至終使用這三個進程去執行所有任務,不會開啟其他進程

    1:創建進程池

    ool([numprocess [,initializer [, initargs]]]):創建進程池

    2:參數介紹

    numprocess:要創建的進程數,如果省略,將默認使用cpu_count()的值
    initializer:是每個工作進程啟動時要執行的可調用對象,默認為None
    initargs:是要傳給initializer的參數組

    3:方法介紹

    p.apply(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然后返回結果。需要強調的是:此操作并不會在所有池工作進程中并執行func函數。如果要通過不同參數并發地執行func函數,必須從不同線程調用p.apply()函數或者使用p.apply_async()p.apply_async(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然后返回結果。此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變為可用時,將理解傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他異步操作中的結果。p.close():關閉進程池,防止進一步操作。如果所有操作持續掛起,它們將在工作進程終止前完成 P.jion():等待所有工作進程退出。此方法只能在close()或teminate()之后調用方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具有以下方法:obj.get():返回結果,如果有必要則等待結果到達。 obj.ready():如果調用完成,返回True obj.successful():如果調用完成且沒有引發異常,返回True,如果在結果就緒之前調用此方法,引發異常 obj.wait([timeout]):等待結果變為可用。 obj.terminate():立即終止所有工作進程,同時不執行任何清理或結束任何掛起工作。如果p被垃圾回收,將自動調用此函數

    4:進程池的使用
    4.1:使用進程池(異步調用,apply_async)

    from multiprocessing import Process, Pool import os import time import socketdef func(i):print(i)time.sleep(1)return i ** 2if __name__ == '__main__':pool = Pool(os.cpu_count() + 1)ret_lst = []for i in range(100):# 維持執行的進程總數為processes,當一個進程執行完畢后會添加新的進程進去ret = pool.apply_async(func, args=(i,))ret_lst.append(ret)# 沒有后面的join,或get,則程序整體結束,進程池中的任務還沒來得及全部執行完也都跟著主進程一起結束了print("=======================")# 關閉進程池,防止進一步操作。如果所有操作持續掛起,它們將在工作進程終止前完成pool.close()# 調用join之前,先調用close函數,否則會出錯。執行完close后不會有新的進程加入到pool,join函數等待所有子進程結束pool.join()# 看到的是<multiprocessing.pool.ApplyResult object at 0x10357c4e0>對象組成的列表,而非最終的結果,但這一步是在join后執行的,證明結果已經計算完畢,剩下的事情就是調用每個對象下的get方法去獲取結果print(ret_lst)# 使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需get[print(i.get()) for i in ret_lst]

    4.2:使用進程池(同步調用,apply)

    from multiprocessing import Process, Pool import os import time import socketdef func(i):print(i)time.sleep(1)return i ** 2if __name__ == '__main__':# 維持執行的進程總數為processes,當一個進程執行完畢后會添加新的進程進去pool = Pool(os.cpu_count() + 1)# 同步執行,即執行完一個拿到結果,再去執行另外一個ret_lst = []for i in range(100):ret = pool.apply(func, args=(i,))ret_lst.append(ret)# 看到的就是最終的結果組成的列表,apply是同步的,所以直接得到結果,沒有get()方法print(ret_lst)

    4.3:進程池實現基于TCP協議的socket并發效果

    # 服務端 from multiprocessing import Process, Pool import os import time import socketdef func(conn, client_addr):print("進程:{0}".format(os.getpid()))while 1:try:c_msg = conn.recv(1024).decode("utf-8")if not c_msg: breakprint(c_msg)conn.send(c_msg.upper().encode("utf-8"))except Exception:breakif __name__ == '__main__':sk = socket.socket()sk.bind(("127.0.0.1", 8080))sk.listen(5)pool = Pool(os.cpu_count() + 1)while 1:conn, addr = sk.accept()pool.apply_async(func, args=(conn, addr))# 服務端 import socketsk = socket.socket()sk.connect(("127.0.0.1", 8080))while 1:c_msg = input(">>")if not c_msg: continuesk.send(c_msg.encode("utf-8"))s_msg = sk.recv(1024).decode("utf-8")print(s_msg)

    當連接數達到開啟的進程池中的最大進程數量時,再有其它客戶端進行連接,將會阻塞等待,當另外的客戶端結束連接時才會建立起會話連接。

    4.5:回調函數(callback())

    需要回調函數的場景:進程池中任何一個任務一旦處理完了,就立即告知主進程:我好了額,你可以處理我的結果了。主進程則調用一個函數去處理該結果,該函數即回調函數

    我們可以把耗時間(阻塞)的任務放到進程池中,然后指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。

    from multiprocessing import Pool import requests import json import osdef get_page(url):print('<進程%s> get %s' %(os.getpid(),url))respone=requests.get(url)if respone.status_code == 200:return {'url':url,'text':respone.text}def pasrse_page(res):print('<進程%s> parse %s' %(os.getpid(),res['url']))parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))with open('db.txt','a') as f:f.write(parse_res)if __name__ == '__main__':urls=['https://www.baidu.com','https://www.python.org','https://www.openstack.org','https://help.github.com/','http://www.sina.com.cn/']p=Pool(3)res_l=[]for url in urls:res=p.apply_async(get_page,args=(url,),callback=pasrse_page)res_l.append(res)p.close()p.join()print([res.get() for res in res_l]) # 拿到的是get_page的結果,其實完全沒必要拿該結果,該結果已經傳給回調函數處理了''' 打印結果: <進程3388> get https://www.baidu.com <進程3389> get https://www.python.org <進程3390> get https://www.openstack.org <進程3388> get https://help.github.com/ <進程3387> parse https://www.baidu.com <進程3389> get http://www.sina.com.cn/ <進程3387> parse https://www.python.org <進程3387> parse https://help.github.com/ <進程3387> parse http://www.sina.com.cn/ <進程3387> parse https://www.openstack.org [{'url': 'https://www.baidu.com', 'text': '<!DOCTYPE html>\r\n...',...}] '''

    如果在主進程中等待進程池中所有任務都執行完畢后,再統一處理結果,則無需回調函數

    十四、信號量

    互斥鎖 同時只允許一個線程更改數據,而Semaphore是同時允許一定數量的線程更改數據 ,比如廁所有3個坑,那最多只允許3個人上廁所,后面的人只能等里面有人出來了才能再進去,如果指定信號量為3,那么來一個人獲得一把鎖,計數加1,當計數等于3時,后面的人均需要等待。一旦釋放,就有人可以獲得一把鎖

    信號量與進程池的概念很像,但是要區分開,信號量涉及到加鎖的概念

    from multiprocessing import Process,Semaphore import time,randomdef go_wc(sem,user):sem.acquire()print('%s 占到一個茅坑' %user)time.sleep(random.randint(0,3)) #模擬每個人拉屎速度不一樣,0代表有的人蹲下就起來了sem.release()if __name__ == '__main__':sem=Semaphore(5)p_l=[]for i in range(13):p=Process(target=go_wc,args=(sem,'user%s' %i,))p.start()p_l.append(p)for i in p_l:i.join()print('============》')

    十五、事件

    Python線程的事件用于主線程控制其他線程的執行,事件主要提供了三個方法 set、wait、clear。

    事件處理的機制:全局定義了一個“Flag”,如果“Flag”值為 False,那么當程序執行 event.wait 方法時就會阻塞,如果“Flag”值為True,那么event.wait 方法時便不再阻塞。

    clear:將“Flag”設置為False
    set:將“Flag”設置為True

    from multiprocessing import Process,Event import time,randomdef car(e,n):while True:if not e.is_set(): #Flaseprint('\033[31m紅燈亮\033[0m,car%s等著' %n)e.wait()print('\033[32m車%s 看見綠燈亮了\033[0m' %n)time.sleep(random.randint(3,6))if not e.is_set():continueprint('走你,car', n)breakdef police_car(e,n):while True:if not e.is_set():print('\033[31m紅燈亮\033[0m,car%s等著' % n)e.wait(1)print('燈的是%s,警車走了,car %s' %(e.is_set(),n))breakdef traffic_lights(e,inverval):while True:time.sleep(inverval)if e.is_set():e.clear() #e.is_set() ---->Falseelse:e.set()if __name__ == '__main__':e=Event()# for i in range(10):# p=Process(target=car,args=(e,i,))# p.start()for i in range(5):p = Process(target=police_car, args=(e, i,))p.start()t=Process(target=traffic_lights,args=(e,10))t.start()print('============》')

    總結

    以上是生活随笔為你收集整理的Python并发编程之多进程(二)的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。