日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

网络编程7_ multiprocessing类-管道.数据共享, 信号量,事件,进程池

發(fā)布時間:2024/4/14 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 网络编程7_ multiprocessing类-管道.数据共享, 信号量,事件,进程池 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

?一. multiprocessing類
????6.?管道
????進程間通信(ipc)方式二: 管道會導致數(shù)據(jù)不安全的情況, 后面我們會說到為什么會帶來數(shù)據(jù)不安全的問題
????創(chuàng)建管道的類:
????Pipe([duplex]): 在進程之間創(chuàng)建一條管道, 并返回元祖(conn1, conn2), 其中conn1和conn2表示管道兩端的連接對象, 強調(diào)一點: 必須在產(chǎn)生Process對象之前產(chǎn)生管道
????參數(shù)介紹:
????dumplex: 默認管道是雙全工的, 如果將duplex改成False, conn1只能用于接收, conn2只能用于發(fā)送
????主要方法:
????conn1.recv(): 接收conn2.send(obj)發(fā)送的對象, 如果沒有消息可接收, recv方法會一直阻塞, 如果連接的另外一端已關(guān)閉, 那么recv方法會拋出EOFError
????conn1.send(): 通過連接發(fā)送對象, obj是序列化兼容的任意對象
????其他方法:?
????conn1.close(): 關(guān)閉連接, 如果conn1被垃圾回收, 將自動調(diào)用此方法
????conn1.fileno(): 返回連接使用的整數(shù)文件描述符
????conn1.poll([timeout]): 如果連接上的數(shù)據(jù)可用, 返回True, timeout指定等待的最長時限, 如果省略此參數(shù), 方法將立即返回結(jié)果, 如果將timeout改成None, 操作將無限期地等待數(shù)據(jù)到達?
????conn1.recv_bytes([maxlength]): 接收c.send_bytes()方法發(fā)送的一條完整的字節(jié)消息, maxlength指定要接收的最大字節(jié)數(shù), 如果進入的消息超過了這個最大值, 將引發(fā)EOFError異常
????conn1.send_bytes(buffer[, offset[, size]]): 通過連接發(fā)送字節(jié)數(shù)據(jù)緩存區(qū), buffer是支持緩沖區(qū)接口的任意對象, offset是緩沖區(qū)中的字節(jié)偏移量, 而size是要發(fā)送的字節(jié)數(shù), 結(jié)果數(shù)據(jù)以單條消息發(fā)送, 然后調(diào)用c.recv_bytes()函數(shù)接收
????conn1.recv_bytes_info(buffer[, offset]): 接收一條完整的字節(jié)消息, 并把它保存在buffer對象中, 該對象支持可寫入的緩沖區(qū)接口(即bytearray對象或類似的對象). offset指定緩沖區(qū)中放置消息處的字節(jié)位移, 返回值是收到的字節(jié)數(shù), 如果消息長度大于可用的緩沖區(qū)大小, 將引發(fā)BufferTooShort異常
????應該特別注意管道端點的正確管理問題, 如果是生產(chǎn)者或消費者中都沒有使用管道的某個端點, 就應該將他關(guān)閉, 這也說明了為何在生產(chǎn)者中關(guān)閉了管道的輸出端, 在消費者中關(guān)閉了管道的輸入端. 如果忘記這些步驟, 程序可能在消費者中的recv()操作上掛起(就是阻塞). 管道是有操作系統(tǒng)進行引用計數(shù)的, 如果在所有進程中關(guān)閉管道的相同一端就會生成EOFError異常, 因此, 在生產(chǎn)者中關(guān)閉管道不會有任何效果, 除非消費者也關(guān)閉了相同的管道端點
????from multiprocessing import Process, Pipe
????def f(parent_conn, child_conn):
????# parent_conn.close()????# 不寫close將不會引發(fā)EOFError
????while 1:
????????try:
????????????print(child_conn.recv())
????????except EOFError:
????????????child_conn.close()
????????????break
????if __name__ == "__main__":
????????parent_conn, child_conn = Pipe()
????????p = Process(target=f, args=(parent_conn, child_conn,))
????????p.start()
????????child_conn.close()
????????parent_conn.send("hello")
????????parent_conn.close()
????????p.join()
????主進程將管道兩端都傳遞給了子進程, 子進程和主進程共用管道的兩種報錯情況, 都是在recv接收的時候報錯的: (1). 主進程和子進程中的管道的相同一端都關(guān)閉了, 出現(xiàn)EOFError (2). 如果你管道的一端在主進程和子進程中都關(guān)閉了, 但是你還用這個關(guān)閉的一端去接收消息, 那么就會出現(xiàn)OSError.
????所以關(guān)閉管道的時候就容易出現(xiàn)問題, 需要將所有只用這個管道的進程中的兩端全部關(guān)閉才行, 當然也可以通過捕獲異常(try: except EOFError:)來處理
????雖然我們在主進程和子進程中都打印了一下conn一端的對象, 發(fā)現(xiàn)兩個不在同一個地址, 但是子進程中的管道 和主進程中的管道還是可以通信的, 因為管道是同一套, 系統(tǒng)能夠記錄
????我們的目標是關(guān)閉所有的管道, 那么主進程和子進程進行通信的時候, 可以給子進程傳管道的一端就夠了, 并且用我們之前學到的, 信息發(fā)送完之后, 再發(fā)送衣蛾結(jié)束信號None, 那么你收到的消息為None的時候直接結(jié)束接收或者說結(jié)束循環(huán), 就不用每次都關(guān)閉各個進程中的管道了
????from multiprocessing import Process, Pipe
????def consumer(p, name):
????????produce, consume = p
????????produce.close()
????????while 1:
????????????try:
????????????????baozi = consume.recv()
????????????????print("%s 收到包子: %s" % (name, baozi))
????????????except EOFError:
????????????????break

????def producer(seq, p):
????????produce, consume = p
????????consume.close()
????????for i in seq:
????????????produce.send(i)

????if __name__ == "__main__":
????????produce, consume = Pipe()
????????c1 = Process(target=consumer, args=((produce, consume), "c1"))
????????c1.start()

????????seq = (i for i in range(10))
????????producer(seq, (produce, consume))

????????produce.close()
????????consume.close()

????????c1.join()
????????print("主進程結(jié)束")
????# c1 收到包子: 0
????# c1 收到包子: 1
????# c1 收到包子: 2
????# c1 收到包子: 3
????# c1 收到包子: 4
????# c1 收到包子: 5
????# c1 收到包子: 6
????# c1 收到包子: 7
????# c1 收到包子: 8
????# c1 收到包子: 9
????# 主進程結(jié)束
????由于Pipe方法返回的兩個連接對象表示管道的兩端, 每個連接對象都有send和recv方法, 注意, 如果兩個進程試圖同時從管道的同一端讀取或?qū)懭霐?shù)據(jù), 那么管道中的數(shù)據(jù)可能會損壞,當然, 在使用管道的不同端部不存在損壞風險
????from multiprocessing import Process, Pipe, Lock

????def consumer(p, name, lock):
????????priduce, consume = p
????????produce.close()
????????while 1:
????????????lock.acquire()
????????????baozi = consume.recv()
????????????lock.release()
????????????if baozi:
????????????????print("%s 收到包子: %s" % (name, baozi))
????????????else:
????????????????consume.close()
????????????????break
????????
????def producer(p, n):
????????produce, consume = p
????????consume.close()
????????for i in range(n):
????????????produce.send(i)
????????produce.send(None)
????????produce.send(None)
????????produce.close()
????
????if __name__ == "__main__":
????????produce, consume = Pipe()
????????lock = Lock()
????????c1 = Process(target=consumer, args=((produce, consume), "c1", lock))
????????c2 = Process(target=consumer, args=((produce, consume), "c2", lock))
????????p1 = Process(target=producer, args=((produce, consume), 10))
????????c1.start()
????????c2.start()
????????p1.start()

????????produce.close()
????????consume.close()

????????c1.join()
????????c2.join()
????????p1.join()
????????print("主程序結(jié)束")
????管道通常用于雙工通信, 通常利用在客戶端/服務端中使用的請求/響應模型, 或者遠程過程調(diào)用, 就可以使用管道編寫與進程交互的程序, 像前面講網(wǎng)絡通信的時候, 我們使用了一個叫subprocess的模塊, 里面有個參數(shù)是pipe通道, 執(zhí)行系統(tǒng)命令, 并通過管道獲取結(jié)果
????7.?數(shù)據(jù)共享
????基于消息傳遞的并發(fā)編程是大勢所趨
????即使是使用線程, 推薦做法也是講程序設計成為大量獨立的線程集合
????通過消息隊列交換數(shù)據(jù), 這樣極大地減少了對使用鎖定和其他同步手段的需求,還可以擴展到分布式系統(tǒng)在哪個
????進程之間應盡量避免通信, 以后我們會嘗試使用數(shù)據(jù)庫來解決進程之間的數(shù)據(jù)共享問題
????進程之間數(shù)據(jù)共享的模塊之一Manager模塊:
????進程間數(shù)據(jù)是獨立的,可以借助于隊列或管道實現(xiàn)通信,二者都是基于消息傳遞的
????雖然進程間數(shù)據(jù)獨立,但可以通過Manager實現(xiàn)數(shù)據(jù)共享,事實上Manager的功能遠不止于此
????多進程共同去處理共享數(shù)據(jù)的時候,就和我們多進程同時去操作一個文件中的數(shù)據(jù)是一樣的,不加鎖就會出現(xiàn)錯誤的結(jié)果,進程不安全的,所以也需要加鎖
????from multiprocessing import Manager, Process, Lock
????def work(d, lock):
????????with lock:??????# 不加鎖而操作共享的數(shù)據(jù), 肯定會出現(xiàn)數(shù)據(jù)錯亂
????????????d["count"] -= 1
????if __name__ == '__main__':
????????lock = Lock()
????????with Manager() as m:
????????????dic = m.dict({"count": 100})
????????????p_1 = []
????????????for i in range(50):
????????????????p = Process(target=work, args=(dic, lock))
????????????????p_1.append(p)
????????????????p.start()
????????????for p in p_1:
????????????????p.join()
????????????print(dic)
????總結(jié)一下: 進程之間的通信: 隊列, 管道, 數(shù)據(jù)共享也算
????下面要講的信號量和事件也相當于鎖,也是全局的,所有進程都能拿到這些鎖的狀態(tài),進程之間這些鎖啊信號量啊事件啊等等的通信,其實底層還是socekt,只不過是基于文件的socket通信,而不是跟上面的數(shù)據(jù)共享啊空間共享啊之類的機制,我們之前學的是基于網(wǎng)絡的socket通信,還記得socket的兩個家族嗎,一個文件的一個網(wǎng)絡的,所以將來如果說這些鎖之類的報錯,可能你看到的就是類似于socket的錯誤,簡單知道一下就可以啦~~~
????工作中常用的是鎖,信號量和事件不常用,但是信號量和事件面試的時候會問到,你能知道就行啦~~~
????8.?信號量
????互斥鎖同時只允許一個線程更改數(shù)據(jù),而信號量Semaphore是同時允許一定數(shù)量的線程更改數(shù)據(jù) 。
????實現(xiàn):
????信號量同步基于內(nèi)部計數(shù)器,每調(diào)用一次acquire(),計數(shù)器減1;每調(diào)用一次release(),計數(shù)器加1.當計數(shù)器為0時,acquire()調(diào)用被阻塞。這是迪科斯徹(Dijkstra)信號量概念P()和V()的Python實現(xiàn)。信號量同步機制適用于訪問像服務器這樣的有限資源。
????信號量與進程池的概念很像,但是要區(qū)分開,信號量涉及到加鎖的概念
????from multiprocessing import Process, Semaphore
????import time, random
????def go_ktv(sem, user):
????????sem.acquire()
????????print("%s 占到一間ktv小屋"% user)
????????time.sleep(random.randint(1, 3))????# 模擬每個人在ktv中呆的時間不長
????????sem.release()
????if __name__ == "__main__":
????????sem = Semaphore(4)
????????p_l = []
????????for i in range(13):
????????????p = Process(target=go_ktv, args=(sem, "user%s"%i,))
????????????p.start()
????????????p_l.append(p)
????????for i in p_l:
????????????i.join()
????????print("主進程結(jié)束")
????9.?事件
????python線程的事件用于主線程控制其他線程的執(zhí)行, 事件主要提供了三個方法: set, wait, clear
????事件處理的機制: 全局定義了一個flag, 如果flag值為false, 那么當程序執(zhí)行event.wait方法時,
????就會阻塞, 如果flag值為true, 那么event.wait方法就不會阻塞
????clear: 將flag設置為flase
????set: 將flag設置為true
二. 進程池
????首先,創(chuàng)建進程需要消耗時間,銷毀進程(空間,變量,文件信息等等的內(nèi)容)也需要消耗時間。第二即便開啟了成千上萬的進程,操作系統(tǒng)也不能讓他們同時執(zhí)行,維護一個很大的進程列表的同時,調(diào)度的時候,還需要進行切換并且記錄每個進程的執(zhí)行節(jié)點,也就是記錄上下文(各種變量等等亂七八糟的東西,雖然你看不到,但是操作系統(tǒng)都要做),這樣反而會影響程序的效率。因此我們不能無限制的根據(jù)任務開啟或者結(jié)束進程。
????在這里,要給大家介紹一個進程池的概念,定義一個池子,在里面放上固定數(shù)量的進程,有需求來了,就拿一個池中的進程來處理任務,等到處理完畢,進程并不關(guān)閉,而是將進程再放回進程池中繼續(xù)等待任務。如果有很多任務需要執(zhí)行,池中的進程數(shù)量不夠,任務就要等待之前的進程執(zhí)行任務完畢歸來,拿到空閑進程才能繼續(xù)執(zhí)行。也就是說,池中進程的數(shù)量是固定的,那么同一時間最多有固定數(shù)量的進程在運行。這樣不會增加操作系統(tǒng)的調(diào)度難度,還節(jié)省了開閉進程的時間,也一定程度上能夠?qū)崿F(xiàn)并發(fā)效果
????1.?multiprocess pool 模塊
????創(chuàng)建進程池的類:如果指定numprocess為3, 則進程池會從無到有創(chuàng)建3個進程, 然后自始至終使用這三個進程去執(zhí)行所有的任務(高級一點的進程池可以根據(jù)并發(fā)量, 搞成動態(tài)增加或減少進程池中的進程數(shù)量的操作), 不會開啟其他進程, 提高操作系統(tǒng)效率, 減少空間的占用等
????Pool([numprocess [, initializer [, initargs]]]): 創(chuàng)建進程池
????參數(shù)介紹:
????numprocess: 要創(chuàng)建的進程數(shù), 如果省略, 將默認使用cpu_count()的值
????initializer: 是每個工作進程啟動時要執(zhí)行的可調(diào)用對象, 默認為None
????initargs: 是要傳給initializer的參數(shù)組
????主要方法介紹:
????p.apply(func [, args [, kwargs]]): 在一個池工作進程中執(zhí)行func(*args, **kwargs), 然后返回結(jié)果,?
????需要強調(diào)的是: 此操作并不會在所有池工作進程中并行執(zhí)行func函數(shù), 如果要通過不同參數(shù)并發(fā)的執(zhí)行func函數(shù),必須從不同線程調(diào)用p.apply()函數(shù)或者使用p.apply_async()
????p.apply_async(func [, args [, kwargs]]): 在一個池工作進程中執(zhí)行func(*args, **kwargs), 然后返回結(jié)果
????此方法的結(jié)果是AsyncResult類的實例, callback是可調(diào)用對象, 接收輸入?yún)?shù), 當func的結(jié)果變?yōu)榭捎脮r, 將結(jié)果傳遞給callback, callback禁止執(zhí)行任何阻塞操作, 否則將接收其他異步操作中的結(jié)果
????p.close(): 關(guān)閉進程池, 組織進一步操作, 如果所有操作持續(xù)掛起, 他們將在工作進程終止前完成
????pjoin(): 等待所有工作進程退出, 此方法只能在close()或terminate()之后調(diào)用
????方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具有以下方法:
????obj.get():返回結(jié)果,如果有必要則等待結(jié)果到達。timeout是可選的。如果在指定時間內(nèi)還沒有到達,將引發(fā)異常。如果遠程操作中引發(fā)了異常,它將在調(diào)用此方法時再次被引發(fā)。
????obj.ready():如果調(diào)用完成,返回True
????obj.successful():如果調(diào)用完成且沒有引發(fā)異常,返回True,如果在結(jié)果就緒之前調(diào)用此方法,引發(fā)異常
????obj.wait([timeout]):等待結(jié)果變?yōu)榭捎谩?br /> ????obj.terminate():立即終止所有工作進程,同時不執(zhí)行任何清理或結(jié)束任何掛起工作。如果p被垃圾回收,將自動調(diào)用此函數(shù)
????2.?進程池的簡單應用及與多線程的效率對比
????import time
????from multiprocessing import Process, Pool
????def f1(n):
????????# time.sleep(2)
????????for i in range(10):
????????????n += i
????if __name__ == '__main__':
????????# 多進程
????????s = time.time()
????????p_list = []
????????for i in range(100):
????????????p = Process(target=f1, args=(i,))
????????????p_list.append(p)
????????????p.start()
????????[pp.join() for pp in p_list]
????????e = time.time()
????????print("多進程用時: %s" % (e - s))
????????# 進程池
????????s = time.time()
????????pool = Pool(4)
????????pool.map(f1, range(100))
????????e = time.time()
????????print("進程池用時: %s" % (e-s))
????# 多進程用時: 12.567207336425781
????# 進程池用時: 0.6605911254882812
????有一點, map是異步執(zhí)行的, 并且自帶close和join
????一般約定俗稱的是進程池中的進程數(shù)量為cpu的數(shù)量, 工作中要看具體情況來考量
????3.?同步和異步兩種執(zhí)行方式:
????(1). 進程的同步調(diào)用
????import os, time
????from multiprocessing import Pool
????def work(n):
????????print("%s run" % os.getpid())
????????time.sleep(1)
????????return n**2
????if __name__ == '__main__':
????????# 進程池中從無到有創(chuàng)建三個進程, 以后一直是這三個進程在執(zhí)行任務
????????s = time.time()
????????p = Pool(4)
????????res_l = []
????????for i in range(10):
????????????# 同步調(diào)用, 直到本次任務執(zhí)行完畢拿到res, 等待任務work執(zhí)行的過程中可能有阻塞也可能沒有阻塞, 但不管該任務是否存在阻塞, 同步調(diào)用都會在原地等著
????????????res = p.apply(work, args=(i,))
????????????res_l.append(res)
????????print(res_l)
????????e = time.time()
????????print("用時:", e-s)
????(2). 進程的異步調(diào)用
????import os, time, random
????from multiprocessing import Pool
????def work(n):
????????print("%s run" % os.getpid())
????????# time.sleep(random.randint(1, 3))
????????time.sleep(1)
????????return n**2
????if __name__ == '__main__':
????????s = time.time()
????????p = Pool(4)
????????res_l =[]
????????for i in range(10):
????????????res = p.apply_async(work, args=(i,))
????????????res_l.append(res)
????????# 結(jié)束進程池接收任務, 確保沒有新任務再提交過來
????????p.close()
????????# 感知進程池中的任務已經(jīng)執(zhí)行結(jié)束, 只有當沒有新任務添加進來的時候, 才能感知到任務結(jié)束了, 所以在join之前必須加上close方法
????????p.join()
????????for res in res_l:
????????????print(res.get())
????????????# 使用get來獲取apply_async的結(jié)果, 如果是apply, 則沒有g(shù)et方法, 因為apply是同步執(zhí)行, 立刻獲取到結(jié)果, 也根本無需get
????????e = time.time()
????????print("用時:", e-s)
????# 異步運行, 根據(jù)進程池中有的進程數(shù), 每次最多4個子進程在異步運行, 并且可以執(zhí)行不同的任務, 傳送任意的參數(shù)了,
????# 返回結(jié)果后, 將結(jié)果放入列表, 歸還進程, 之后再執(zhí)行新的任務
????# 需要注意的是, 進程池中的4個進程不會同時開啟或者同時結(jié)束,
????# 而是執(zhí)行完一個就釋放一個進程, 這個進程就去接收新的任務
????# 異步apply_async用法: 如果使用異步提交的任務, 主進程需要使用join, 等待進程池內(nèi)任務都處理完, 然后可以用get收集結(jié)果, 否則, 主進程結(jié)束, 進程池可能還沒來得及執(zhí)行, 也就跟著一起結(jié)束了,
????(3). 詳解apply_async和apply
????#一:使用進程池(異步調(diào)用,apply_async)
????#coding: utf-8
????from multiprocessing import Process,Pool
????import time
????def func(msg):
????????print( "msg:", msg)
????????time.sleep(1)
????????return msg
????if __name__ == "__main__":
????????pool = Pool(processes = 3)
????????res_l=[]
????????for i in range(10):
????????????msg = "hello %d" %(i)
????????????res=pool.apply_async(func, (msg, ))???#維持執(zhí)行的進程總數(shù)為processes,當一個進程執(zhí)行完畢后會添加新的進程進去
????????????res_l.append(res)
????????????# s = res.get() #如果直接用res這個結(jié)果對象調(diào)用get方法獲取結(jié)果的話,這個程序就變成了同步,因為get方法直接就在這里等著你創(chuàng)建的進程的結(jié)果,第一個進程創(chuàng)建了,并且去執(zhí)行了,那么get就會等著第一個進程的結(jié)果,沒有結(jié)果就一直等著,那么主進程的for循環(huán)是無法繼續(xù)的,所以你會發(fā)現(xiàn)變成了同步的效果
????????print("==============================>") #沒有后面的join,或get,則程序整體結(jié)束,進程池中的任務還沒來得及全部執(zhí)行完也都跟著主進程一起結(jié)束了
????????pool.close() #關(guān)閉進程池,防止進一步操作。如果所有操作持續(xù)掛起,它們將在工作進程終止前完成
????????pool.join()???#調(diào)用join之前,先調(diào)用close函數(shù),否則會出錯。執(zhí)行完close后不會有新的進程加入到pool,join函數(shù)等待所有子進程結(jié)束
????????print(res_l) #看到的是<multiprocessing.pool.ApplyResult object at 0x10357c4e0>對象組成的列表,而非最終的結(jié)果,但這一步是在join后執(zhí)行的,證明結(jié)果已經(jīng)計算完畢,剩下的事情就是調(diào)用每個對象下的get方法去獲取結(jié)果
????????for i in res_l:
????????????print(i.get()) #使用get來獲取apply_aync的結(jié)果,如果是apply,則沒有g(shù)et方法,因為apply是同步執(zhí)行,立刻獲取結(jié)果,也根本無需get
????#二:使用進程池(同步調(diào)用,apply)
????#coding: utf-8
????from multiprocessing import Process,Pool
????import time
????
????def func(msg):
????print( "msg:", msg)
????time.sleep(0.1)
????return msg
????if __name__ == "__main__":
????????pool = Pool(processes = 3)
????????res_l=[]
????????for i in range(10):
????????????msg = "hello %d" %(i)
????????????res=pool.apply(func, (msg, ))???#維持執(zhí)行的進程總數(shù)為processes,當一個進程執(zhí)行完畢后會添加新的進程進去
????????????res_l.append(res) #同步執(zhí)行,即執(zhí)行完一個拿到結(jié)果,再去執(zhí)行另外一個
????????print("==============================>")
????????pool.close()
????????pool.join()???#調(diào)用join之前,先調(diào)用close函數(shù),否則會出錯。執(zhí)行完close后不會有新的進程加入到pool,join函數(shù)等待所有子進程結(jié)束
????
????????print(res_l) #看到的就是最終的結(jié)果組成的列表
????????for i in res_l: #apply是同步的,所以直接得到結(jié)果,沒有g(shù)et()方法
????????????print(i)
????(4). 進程池版的socket并發(fā)聊天代碼示例
????(5). 回調(diào)函數(shù)
????需要回調(diào)函數(shù)的場景:進程池中任何一個任務一旦處理完了,就立即告知主進程:我好了額,你可以處理我的結(jié)果了。主進程則調(diào)用一個函數(shù)去處理該結(jié)果,該函數(shù)即回調(diào)函數(shù),這是進程池特有的,普通進程沒有這個機制,但是我們也可以通過進程通信來拿到返回值,進程池的這個回調(diào)也是進程通信的機制完成的。
????我們可以把耗時間(阻塞)的任務放到進程池中,然后指定回調(diào)函數(shù)(主進程負責執(zhí)行),這樣主進程在執(zhí)行回調(diào)函數(shù)時就省去了I/O的過程,直接拿到的是任務的結(jié)果
????import os, time
????from multiprocessing import Pool
????
????def func1(n):
????????print("func1>>>", os.getpid())
????????print("func1")
????????return n*n
????
????def func2(nn):
????????print("func2>>>", os.getpid())
????????print("func2")
????????# print(nn)
????????# time.sleep(0.5)
????
????if __name__ == '__main__':
????????print("主進程:", os.getpid())
????????p = Pool(4)
????????# args里面的10給了func1, func1的返回值最為回調(diào)函數(shù)的參數(shù)給了callback對應的函數(shù), 不是直接給回調(diào)函數(shù)直接傳遞參數(shù), 只能是任務函數(shù)func1的返回值
????????# p.apply_async(func1, args=(10,), callback=func2)
????
????????# 如果是多個進程來執(zhí)行任務,那么當所有子進程將結(jié)果給了回調(diào)函數(shù)之后,回調(diào)函數(shù)又是在主進程上執(zhí)行的,那么就會出現(xiàn)打印結(jié)果是同步的效果。我們上面func2里面注銷的時間模塊打開看看
????????for i in range(10, 20):
????????????p.apply_async(func1, args=(i,), callback=func2)
????????p.close()
????????p.join()
????
????# 主進程: 9312?????# 回調(diào)函數(shù)是在主進程中完成的
????# func1>>> 9888
????# func1
????# func2>>> 9312
????# func2
????# 100
????回調(diào)函數(shù)在寫的時候注意一點,回調(diào)函數(shù)的形參執(zhí)行有一個,如果你的執(zhí)行函數(shù)有多個返回值,那么也可以被回調(diào)函數(shù)的這一個形參接收,接收的是一個元祖,包含著你執(zhí)行函數(shù)的所有返回值。

轉(zhuǎn)載于:https://www.cnblogs.com/guyannanfei/p/10268912.html

超強干貨來襲 云風專訪:近40年碼齡,通宵達旦的技術(shù)人生

總結(jié)

以上是生活随笔為你收集整理的网络编程7_ multiprocessing类-管道.数据共享, 信号量,事件,进程池的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。