快速了解Python并发编程的工程实现(下)
關(guān)于我
編程界的一名小程序猿,目前在一個(gè)創(chuàng)業(yè)團(tuán)隊(duì)任team lead,技術(shù)棧涉及Android、Python、Java和Go,這個(gè)也是我們團(tuán)隊(duì)的主要技術(shù)棧。 聯(lián)系:hylinux1024@gmail.com
0x00 使用進(jìn)程實(shí)現(xiàn)并發(fā)
上一篇文章介紹了線程的使用。然而Python中由于Global Interpreter Lock(全局解釋鎖GIL)的存在,每個(gè)線程在在執(zhí)行時(shí)需要獲取到這個(gè)GIL,在同一時(shí)刻中只有一個(gè)線程得到解釋鎖的執(zhí)行,Python中的線程并沒(méi)有真正意義上的并發(fā)執(zhí)行,多線程的執(zhí)行效率也不一定比單線程的效率更高。 如果要充分利用現(xiàn)代多核CPU的并發(fā)能力,就要使用multipleprocessing模塊了。
0x01 multipleprocessing
與使用線程的threading模塊類似,multipleprocessing模塊提供許多高級(jí)API。最常見(jiàn)的是Pool對(duì)象了,使用它的接口能很方便地寫出并發(fā)執(zhí)行的代碼。
from multiprocessing import Pooldef f(x):return x * xif __name__ == '__main__':with Pool(5) as p:# map方法的作用是將f()方法并發(fā)地映射到列表中的每個(gè)元素print(p.map(f, [1, 2, 3]))# 執(zhí)行結(jié)果 # [1, 4, 9] 復(fù)制代碼關(guān)于Pool下文中還會(huì)提到,這里我們先來(lái)看Process。
Process
要?jiǎng)?chuàng)建一個(gè)進(jìn)程可以使用Process類,使用start()方法啟動(dòng)進(jìn)程。
from multiprocessing import Process import osdef echo(text):# 父進(jìn)程IDprint("Process Parent ID : ", os.getppid())# 進(jìn)程IDprint("Process PID : ", os.getpid())print('echo : ', text)if __name__ == '__main__':p = Process(target=echo, args=('hello process',))p.start()p.join()# 執(zhí)行結(jié)果 # Process Parent ID : 27382 # Process PID : 27383 # echo : hello process 復(fù)制代碼進(jìn)程池
正如開(kāi)篇提到的multiprocessing模塊提供了Pool類可以很方便地實(shí)現(xiàn)一些簡(jiǎn)單多進(jìn)程場(chǎng)景。 它主要有以下接口
- apply(func[, args[, kwds]])
執(zhí)行func(args,kwds)方法,在方法結(jié)束返回前會(huì)阻塞。 - apply_async(func[, args[, kwds[, callback[, error_callback]]]])
異步執(zhí)行func(args,kwds),會(huì)立即返回一個(gè)result對(duì)象,如果指定了callback參數(shù),結(jié)果會(huì)通過(guò)回調(diào)方法返回,還可以指定執(zhí)行出錯(cuò)的回調(diào)方法error_callback() - map(func, iterable[, chunksize])
類似內(nèi)置函數(shù)map(),可以并發(fā)執(zhí)行func,是同步方法 - map_async(func, iterable[, chunksize[, callback[, error_callback]]])
異步版本的map - close()
關(guān)閉進(jìn)程池。當(dāng)池中的所有工作進(jìn)程都執(zhí)行完畢時(shí),進(jìn)程會(huì)退出。 - terminate()
終止進(jìn)程池 - join()
等待工作進(jìn)程執(zhí)行完,必需先調(diào)用close()或者terminate()
map_async()和apply_async()執(zhí)行后會(huì)返回一個(gè)class multiprocessing.pool.AsyncResult對(duì)象,通過(guò)它的get()可以獲取到執(zhí)行結(jié)果,ready()可以判斷AsyncResult的結(jié)果是否準(zhǔn)備好。
進(jìn)程間數(shù)據(jù)的傳輸
multiprocessing模塊提供了兩種方式用于進(jìn)程間的數(shù)據(jù)共享:隊(duì)列(Queue)和管道(Pipe)
Queue是線程安全,也是進(jìn)程安全的。使用Queue可以實(shí)現(xiàn)進(jìn)程間的數(shù)據(jù)共享,例如下面的demo中子進(jìn)程put一個(gè)對(duì)象,在主進(jìn)程中就能get到這個(gè)對(duì)象。 任何可以序列化的對(duì)象都可以通過(guò)Queue來(lái)傳輸。
from multiprocessing import Process, Queuedef f(q):q.put([42, None, 'hello'])if __name__ == '__main__':# 使用Queue進(jìn)行數(shù)據(jù)通信q = Queue()p = Process(target=f, args=(q,))p.start()# 主進(jìn)程取得子進(jìn)程中的數(shù)據(jù)print(q.get()) # prints "[42, None, 'hello']"p.join()# 執(zhí)行結(jié)果 # [42, None, 'hello'] 復(fù)制代碼Pipe()返回一對(duì)通過(guò)管道連接的Connection對(duì)象。這兩個(gè)對(duì)象可以理解為管道的兩端,它們通過(guò)send()和recv()發(fā)送和接收數(shù)據(jù)。
from multiprocessing import Process, Pipedef write(conn):# 子進(jìn)程中發(fā)送一個(gè)對(duì)象conn.send([42, None, 'hello'])conn.close()def read(conn):# 在讀的進(jìn)程中通過(guò)recv接收對(duì)象data = conn.recv()print(data)if __name__ == '__main__':# Pipe()方法返回一對(duì)連接對(duì)象w_conn, r_conn = Pipe()wp = Process(target=write, args=(w_conn,))rp = Process(target=read, args=(r_conn,))wp.start()rp.start()# 執(zhí)行結(jié)果 # [42, None, 'hello']復(fù)制代碼需要注意的是,兩個(gè)進(jìn)程不能同時(shí)對(duì)一個(gè)連接對(duì)象進(jìn)行send或recv操作。
同步
我們知道線程間的同步是通過(guò)鎖機(jī)制來(lái)實(shí)現(xiàn)的,進(jìn)程也一樣。
from multiprocessing import Process, Lock import timedef print_with_lock(l, i):l.acquire()try:time.sleep(1)print('hello world', i)finally:l.release()def print_without_lock(i):time.sleep(1)print('hello world', i)if __name__ == '__main__':lock = Lock()# 先執(zhí)行有鎖的for num in range(5):Process(target=print_with_lock, args=(lock, num)).start()# 再執(zhí)行無(wú)鎖的# for num in range(5):# Process(target=print_without_lock, args=(num,)).start()復(fù)制代碼有鎖的代碼將每秒依次打印
hello world 0 hello world 1 hello world 2 hello world 3 hello world 4 復(fù)制代碼如果執(zhí)行無(wú)鎖的代碼,則在我的電腦上執(zhí)行結(jié)果是這樣的
hello worldhello world 0 1 hello world 2 hello world 3 hello world 4 復(fù)制代碼除了Lock,還包括RLock、Condition、Semaphore和Event等進(jìn)程間的同步原語(yǔ)。其用法也與線程間的同步原語(yǔ)很類似。API使用可以參考文末中引用的文檔鏈接。
在工程中實(shí)現(xiàn)進(jìn)程間的數(shù)據(jù)共享應(yīng)當(dāng)優(yōu)先使用隊(duì)列或管道。
0x02 總結(jié)
本文對(duì)multiprocessing模塊中常見(jiàn)的API作了簡(jiǎn)單的介紹。講述了Process和Pool的常見(jiàn)用法,同時(shí)介紹了進(jìn)程間的數(shù)據(jù)方式:隊(duì)列和管道。最后簡(jiǎn)單了解了進(jìn)程間的同步原語(yǔ)。
通過(guò)與上篇的對(duì)比學(xué)習(xí),本文的內(nèi)容應(yīng)該是更加容易掌握的。
0x03 引用
- python-parallel-programmning-cookbook.readthedocs.io
- docs.python.org/3/library/t…
- docs.python.org/3.7/library…
- docs.python.org/3/glossary.…
- docs.python.org/3/library/c…
轉(zhuǎn)載于:https://juejin.im/post/5cefdc60f265da1bca51c0cf
總結(jié)
以上是生活随笔為你收集整理的快速了解Python并发编程的工程实现(下)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: swoole的process模块创建和使
- 下一篇: websocket python爬虫_p