日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

python mutilprocessing多进程编程

發(fā)布時間:2024/1/23 python 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python mutilprocessing多进程编程 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

??`為了更好的理解本文內(nèi)容,請務必先了解Synchronization、Asynchronization、Concurrent、Mutex等基本概念

??multiprocessing是一個類似于Threading模塊的由API產(chǎn)生進程的包,關于Threading模塊可以參考我的博客文章。multiprocessing能夠 提供本地和遠程兩種并發(fā)模式,通過使用子進程而不是線程有效地避開了GIL。因此,multiprocessing允許程序員充分利用機器上的多個處理器,且該包支持在Unix系統(tǒng)和Windows系統(tǒng)上運行。

??mutilprocessing還引入了在Threading模塊中沒有相類似的API。比如Pool對象,Pool對象提供了一種方便的方法,可以跨多個輸入值并行化函數(shù)的執(zhí)行,跨進程分配輸入數(shù)據(jù)(數(shù)據(jù)并行)。使用方法可以看看下面的例子:

from multiprocessing import Pooldef f(x):return x * xif __name__ == '__main__':with Pool(5) as p:print(p.map(f, [1, 2, 3, 4, 5, 6, 7]))# [1, 4, 9, 16, 25, 36, 49]

Process類

class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={},daemon=None)

?group必須為None,設置該參數(shù)僅僅是為了與Threading模塊保持一致

?target是run()方法調(diào)用的可調(diào)用對象

?name是指進程名

?daemon指示是否設置為守護進程

  • run()

    ??表示進程活動的方法,可在子類中重寫此方法。標準run()方法調(diào)用傳遞給對象構(gòu)造函數(shù)的可調(diào)用對象作為目標參數(shù)(如果有),分別使用args和kwargs參數(shù)中的順序和關鍵字參數(shù)。

  • start()

    ??啟動進程的活動,每個進程對象最多只能調(diào)用一次,在一個單獨的進程中調(diào)用對象的run()方法

  • join([timeout])

    ??如果可選參數(shù)timeout為None(缺省值),則該方法將阻塞,直到調(diào)用其join()方法的進程終止。如果timeout是一個正數(shù),它最多會阻塞timeout秒。請注意,如果方法的進程終止或方法超時,則該方法返回None。檢查進程的exitcode以確定它是否終止。

  • name

    ??進程名

  • is_alive()

    ??指示進程是否還活著

  • daemon

    ??daemon flag, a Boolean value, 必須在進程start之前設置

  • pid

    ??process ID

  • exitcode

    ??負值-N表示孩子被信號N終止,默認為None,表示進程未被終止

  • authkey

    ??The process’s authentication key (a byte string)

  • sentinel

    ?系統(tǒng)對象的數(shù)字句柄,當進程結(jié)束時將變?yōu)椤皉eady”?

  • terminate()

    ??終止進程,但注意子進程不會被終止,只是會成孤兒

請注意,start(),join(),is_alive(),terminate()和exitcode方法只應由創(chuàng)建過程對象的進程調(diào)用。

>>> import multiprocessing, time, signal >>> p = multiprocessing.Process(target=time.sleep, args=(1000,)) >>> print(p, p.is_alive()) <Process(Process-1, initial)> False >>> p.start() >>> print(p, p.is_alive()) <Process(Process-1, started)> True >>> p.terminate() >>> time.sleep(0.1) >>> print(p, p.is_alive()) <Process(Process-1, stopped[SIGTERM])> False >>> p.exitcode == -signal.SIGTERM True

??在multiprocessing中,通過創(chuàng)建Process對象然后調(diào)用其start()方法來生成進程,其使用方法和threading.Thread一樣。我們看下面的例子:

from multiprocessing import Processdef f(name):print('hello', name)if __name__ == '__main__': # 這句話是必要的,不可去掉p = Process(target=f, args=('bob',))p.start()p.join()

我們可以通過進程號來區(qū)分不同的進程:

from multiprocessing import Process import osdef info(title):print(title)print('module name:', __name__)print('parent process:', os.getppid())print('process id:', os.getpid(), '\n')def f(name):info('function f')print('hello', name)if __name__ == '__main__':info('main line')p = Process(target=f, args=('bob',)) # 創(chuàng)建新進程p.start() # 啟動進程p.join()

進程啟動

??根據(jù)平臺的不同,multiprocessing支持三種啟動進程的方法。這些啟動方法是:

  • spawn

    spawn

    ?調(diào)用改方法,父進程會啟動一個新的python進程,子進程只會繼承運行進程對象run()方法所需的那些資源。特別地,子進程不會繼承父進程中不必要的文件描述符和句柄。與使用fork或forkserver相比,使用此方法啟動進程相當慢。

    ? Available on Unix and Windows. The default on Windows.

  • fork

    ?父進程使用os.fork()來fork Python解釋器。子進程在開始時實際上與父進程相同,父進程的所有資源都由子進程繼承。請注意,安全創(chuàng)建多線程進程尚存在一定的問題。

    ?Available on Unix only. The default on Unix.

  • forkserver

    ?當程序啟動并選擇forkserverstart方法時,將啟動服務器進程。從那時起,每當需要一個新進程時,父進程就會連接到服務器并請求它fork一個新進程。 fork服務器進程是單線程的,因此使用os.fork()是安全的。沒有不必要的資源被繼承。

    ?Available on Unix platforms which support passing file descriptors over Unix pipes.

    ??要選擇以上某一種start方法,請在主模塊的if __name__ == '__ main__'子句中使用mp.set_start_method()。

    并且mp.set_start_method()在一個程序中僅僅能使用一次。

    import multiprocessing as mpdef foo(q):q.put('hello')if __name__ == '__main__':mp.set_start_method('spawn')q = mp.Queue()p = mp.Process(target=foo, args=(q,))p.start()print(q.get())p.join()

    ??或者,您可以使用get_context()來獲取上下文對象。上下文對象與多處理模塊具有相同的API,并允許在同一程序中使用多個啟動方法。

    import multiprocessing as mpdef foo(q):q.put('hello')if __name__ == '__main__':ctx = mp.get_context('spawn')q = ctx.Queue()p = ctx.Process(target=foo, args=(q,))p.start()print(q.get())p.join()

    ?注意,與一個context相關的對象可能與不同context的進程不兼容。特別是,使用fork context創(chuàng)建的鎖不能傳遞給使用spawn或forkserver start方法啟動的進程。

進程通信

??當使用多個進程時,通常使用消息傳遞來進行進程之間的通信,并避免必須使用任何synchronization primitives(如鎖)。對于傳遞消息,可以使用Pipe(用于兩個進程之間的連接)或Queue(允許多個生產(chǎn)者和消費者)。

Queues

??class multiprocessing.Queue([maxsize])

???Queue實現(xiàn)queue.Queue的所有方法,但task_done()和join()除外。Queue是進程、線程安全的模型

from multiprocessing import Process, Queuedef f(q):q.put([42, None, 'hello'])if __name__ == '__main__':q = Queue()p = Process(target=f, args=(q,))p.start()print(q.get()) # prints "[42, None, 'hello']"p.join()
Pipes

??Class multiprocessing.Pipe([duplex])

??返回一對(conn1, conn2) of Connection 對象代表pipe的兩端。如果duplex為True(默認值),則管道是雙向的;如果duplex為False,則管道是單向的:conn1只能用于接收消息,conn2只能用于發(fā)送消息。Pipe()`函數(shù)返回一個由Pipe連接的連接對象,默認情況下是全雙工雙向通信(duplex)。例如:

from multiprocessing import Process, Pipedef f(conn):conn.send([42, None, 'hello'])conn.close()if __name__ == '__main__':parent_conn, child_conn = Pipe()p = Process(target=f, args=(child_conn,))p.start()print(parent_conn.recv()) # prints "[42, None, 'hello']"p.join()

??Pipe()返回的兩個連接對象代表管道的兩端,每個連接對象都有send()和recv()方法。需要注意的是,管道中的數(shù)據(jù)可能會不一致或被破壞,如當兩個進程(或線程)嘗試同時讀取或?qū)懭牍艿赖耐欢?。當?#xff0c;同時使用管道的不同端部的過程不存在損壞的風險。

進程共享狀態(tài)

??在進行并發(fā)編程時,通常最好避免使用共享狀態(tài),但是,如果你確實需要使用某些共享數(shù)據(jù),那么multiprocessing提供了以下兩種方法:

Shared Memory

?可以使用Value或Array將數(shù)據(jù)存儲在共享內(nèi)存的map(映射)中。例如,以下代碼:

from multiprocessing import Process, Value, Arraydef f(n, a):n.value = 3.1415927for i in range(len(a)):a[i] = -a[i]if __name__ == '__main__':num = Value('d', 0.0)arr = Array('i', range(10))p = Process(target=f, args=(num, arr))p.start()p.join()print(num.value)print(arr[:])# 3.1415927# [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

??創(chuàng)建num和arr時使用的’d’和’i’參數(shù)是array module使用的類型的類型代碼:'d’表示雙精度浮點數(shù),'i’表示有符號整數(shù)。這些共享對象將是進程和線程安全的。為了更靈活地使用共享內(nèi)存,可以使用multiprocessing.sharedctypes模塊,該模塊支持創(chuàng)建從共享內(nèi)存分配的任意ctypes對象。但還是那句話,在進行并發(fā)編程時,通常最好避免使用共享狀態(tài)。

Server Process

??Manager()返回的Manager對象控制一個服務器進程(server process),該進程保存Python對象并允許其他進程使用代理操作它們。Manager對象支持的對象包括list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value 以及 Array。Managers提供了一種創(chuàng)建可在不同進程之間共享的數(shù)據(jù)的方法,包括在不同計算機上運行的進程之間通過網(wǎng)絡共享。管理器對象控制管理共享對象的服務器進程。其他進程可以使用代理訪問共享對象。

from multiprocessing import Process, Managerdef f(d, l):d[1] = '1'd['2'] = 2d[0.25] = Nonel.reverse()if __name__ == '__main__':with Manager() as manager:d = manager.dict()l = manager.list(range(10))p = Process(target=f, args=(d, l))p.start()p.join()print(d)print(l)#{0.25: None, 1: '1', '2': 2} #[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
Proxy

??代理是一個對象,它指的是(可能)在不同的進程中存在的共享對象。共享對象被認為是代理的指示對象。多個代理對象可能具有相同的指示對象。代理對象具有調(diào)用其引用對象的相應方法的方法。代理對象的一個重要特性是它們是pickable的,因此它們可以在進程之間傳遞。

>>> from multiprocessing import Manager >>> manager = Manager() >>> l = manager.list([i*i for i in range(10)]) >>> print(l) # l即是一個代理對象 [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] >>> print(repr(l)) <ListProxy object, typeid 'list' at 0x...> >>> l[4] 16 >>> l[2:5] [4, 9, 16]
Connection

?&esmp;connection對象允許發(fā)送和接收可序列化對象或字符串。它們可以被認為是面向消息的連接套接字,我們再上面介紹Pipe的時候所實例化的對象就是connection對象。

  • send(obj)

    將對象發(fā)送到連接的另一端,應使用recv()讀取,且該對象必須是pickable的,>32 MB的對象可能會引發(fā)ValueError異常。

  • recv()

    返回從連接另一端發(fā)送的對象。阻塞直到接收到東西。如果沒有剩余要接收和另一端被關閉,則引發(fā)EOFError。

  • fileno()

    返回conn所使用的文件描述符或句柄

  • close()

    關閉連接

  • poll([timeout])

    返回是否有可供讀取的數(shù)據(jù),如果未指定超時,則會立即返回;如果timeout是一個數(shù)字,則指定阻止的最長時間(以秒為單位);如果timeout為None,則使用無限超時。

  • send_bytes(buffer[, offset[, size]])

    發(fā)送字節(jié)數(shù)據(jù)

  • recv_bytes([maxlength])

    接受字節(jié)數(shù)據(jù)

  • recv_bytes_into(buffer[, offset])

    讀取從連接另一端發(fā)送的字節(jié)數(shù)據(jù)的完整消息到buffer,并返回消息中的字節(jié)數(shù)。

    >>> from multiprocessing import Pipe >>> a, b = Pipe() >>> a.send([1, 'hello', None]) >>> b.recv() [1, 'hello', None] >>> b.send_bytes(b'thank you') >>> a.recv_bytes() b'thank you' >>> import array >>> arr1 = array.array('i', range(5)) >>> arr2 = array.array('i', [0] * 10) >>> a.send_bytes(arr1) >>> count = b.recv_bytes_into(arr2) >>> assert count == len(arr1) * arr1.itemsize >>> arr2 array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
summary

??Server process Manager比使用共享內(nèi)存對象更靈活,因為它們可以支持任意對象類型。此外,單個管理器可以通過網(wǎng)絡在不同計算機上的進程共享。但它比使用共享內(nèi)存慢。

Synchronization

??同步原語和Threading模塊幾乎一致,具體請參考Python Threading 多線程編程

Lock
from multiprocessing import Process, Lockdef f(l, i):"""保證同一時間只有一個標準輸出流"""l.acquire()try:print('hello world', i)finally:l.release()if __name__ == '__main__':lock = Lock()for num in range(10):Process(target=f, args=(lock, num)).start()# output hello world 1 hello world 0 hello world 2 hello world 4 hello world 3 hello world 6 hello world 9 hello world 5 hello world 8 hello world 7

Pool類

??Pool類用于創(chuàng)建進程池

主要方法有,具體例子見代碼,并請注意,pool對象的方法只能由創(chuàng)建它的進程使用:

  • pool.map()
  • pool.imap() Equivalent of map() – can be MUCH slower than Pool.map().
  • pool.starmap() Like map() method but the elements of the iterable are expected to be iterables as well and will be unpacked as arguments.
  • pool.starmap_async Asynchronous version of starmap() method
  • pool.map_async Asynchronous version of map() method.
  • pool.imap_unordered()
  • pool.apply()
  • pool.apply_async() Asynchronous version of apply() method.
from multiprocessing import Pool, TimeoutError import time import osdef f(x):return x*xif __name__ == '__main__':# start 4 worker processeswith Pool(processes=4) as pool:# print "[0, 1, 4,..., 81]"print(pool.map(f, range(10)))# print same numbers in arbitrary orderfor i in pool.imap_unordered(f, range(10)):print(i, end='\t')print()# evaluate "f(20)" asynchronouslyres = pool.apply_async(f, (20,)) # runs in *only* one processprint(res.get(timeout=1)) # prints "400"# evaluate "os.getpid()" asynchronouslyres = pool.apply_async(os.getpid, ()) # runs in *only* one processprint(res.get(timeout=1)) # prints the PID of that process# launching multiple evaluations asynchronously *may* use more processesmultiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]print([res.get(timeout=1) for res in multiple_results])# make a single worker sleep for 10 secsres = pool.apply_async(time.sleep, (10,))try:print(res.get(timeout=1))except TimeoutError:print("We lacked patience and got a multiprocessing.TimeoutError")print("For the moment, the pool remains available for more work")# exiting the 'with'-block has stopped the poolprint("Now the pool is closed and no longer available")# [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] # 0 1 4 9 16 25 36 49 64 81 # 400 # 2696 # [2696, 2696, 2696, 2696] # We lacked patience and got a multiprocessing.TimeoutError # For the moment, the pool remains available for more work # Now the pool is closed and no longer available

Miscellaneous

  • multiprocessing.active_children()

    返回當前進程的所有活子進 程的列表

  • multiprocessing.cpu_count()

    返回系統(tǒng)中的CPU數(shù)量,此數(shù)字不等于當前進程可以使用的CPU數(shù)量??梢允褂胠en(os.sched_getaffinity(0))獲得可用CPU的數(shù)量

  • multiprocessing.current_process()

    返回與當前進程對應的Process對象

  • multiprocessing.freeze_support()

    為程序打包成exe可執(zhí)行文件提供支持,在Windows以外的任何操作系統(tǒng)上調(diào)用時,調(diào)用freeze_support()無效。此外,如果模塊由Windows上的Python解釋器正常運行(程序尚未凍結(jié)),則freeze_support()無效

    from multiprocessing import Process, freeze_supportdef f():print('hello world!')if __name__ == '__main__':freeze_support()Process(target=f).start()
  • multiprocessing.get_all_start_methods()

    返回支持的start方法列表,第一個是默認方法。可能的啟動方法是’fork’,‘spawn’和’forkserver’。在Windows上只有“spawn”可用。在Unix上’fork’和’spawn’總是受支持,'fork’是默認值。

  • multiprocessing.get_context(method=None)

    返回與multiprocessing模塊具有相同屬性的上下文對象,具體用法前面已經(jīng)有過例子

  • multiprocessing.get_start_method(allow_none=False)

    返回用于啟動進程的start方法的名稱,返回值可以是’fork’,‘spawn’,'forkserver’或None。 'fork’是Unix上的默認值,而’spawn’是Windows上的默認值。

  • multiprocessing.set_executable()

    設置啟動子進程時要使用的Python解釋器的路徑

  • multiprocessing.set_start_method(method)

    設置用于啟動子進程的方法。方法可以是’fork’,‘spawn’或’forkserver’。請注意,改法最多調(diào)用一次,并且應該寫在主模塊的if name ==’__ main__'子句中。

總結(jié)

以上是生活随笔為你收集整理的python mutilprocessing多进程编程的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。