日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

Python 多进程 multiprocessing 使用示例

發布時間:2024/7/23 python 47 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Python 多进程 multiprocessing 使用示例 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.


參考:http://blog.csdn.net/qdx411324962/article/details/46810421
參考:http://www.lxway.com/4488626156.htm

廖雪峰官網?進程和線程、多進程、多線程、ThreadLocal、進程 vs. 線程、分布式進程

multiprocessing?文檔:?https://docs.python.org/2/library/multiprocessing.html#managers
Python 中的進程、線程、協程、同步、異步、回調:https://segmentfault.com/a/1190000001813992
multiprocessing?多進程的用法 :https://cuiqingcai.com/3335.html

由于要做把一個多線程改成多進程,看一下相關方面的東西,總結一下,主要是以下幾個相關的標準庫

  • subprocess
  • signal
  • threading
  • multiprocessing
  • 從 Python3.2 開始,標準庫提供了concurrent.futures 模塊,它提供了 ThreadPoolExecutor 和 ProcessPoolExecutor 兩個類,實現了對 threading 和 multiprocessing 的更高級的抽象,對編寫 線程池/進程池 提供了直接的支持。?
    concurrent.futures 基礎模塊是 executor 和 future。

    concurrent.futures 官方文檔:https://docs.python.org/3/library/concurrent.futures.html

    Python3 模塊 - Concurrent.futures 教程:
    https://www.yiibai.com/concurrency_in_python/concurrency_in_python_pool_of_processes.html

    使用示例代碼:

    # -*- coding: utf-8 -*-import redis from redis import WatchError from concurrent.futures import ProcessPoolExecutorr = redis.Redis(host='127.0.0.1', port=6379)# 減庫存函數, 循環直到減庫存完成 # 庫存充足, 減庫存成功, 返回True # 庫存不足, 減庫存失敗, 返回Falsedef reduce_stock():# python中redis事務是通過pipeline的封裝實現的with r.pipeline() as pipe:while True:try:# watch庫存鍵, multi后如果該key被其他客戶端改變, # 事務操作會拋出WatchError異常pipe.watch('stock:count')count = int(pipe.get('stock:count'))if count > 0: # 有庫存# 事務開始pipe.multi()pipe.decr('stock:count')# 把命令推送過去# execute返回命令執行結果列表, 這里只有一個decr返回當前值print(pipe.execute()[0])return Trueelse:return Falseexcept WatchError as ex:# 打印WatchError異常, 觀察被watch鎖住的情況print(ex)pipe.unwatch()def worker():while True:# 沒有庫存就退出if not reduce_stock():breakif __name__ == "__main__":# 設置庫存為100r.set("stock:count", 100)# 多進程模擬多個客戶端提交with ProcessPoolExecutor() as pool:for _ in range(10):pool.submit(worker)

    python 單線程 和 多線程

    python 單線程

    # -*- coding:utf-8 -*-import time import datetimedef music(argv):for i in range(2):print("聽音樂 %s. %s" % (argv, datetime.datetime.now()))time.sleep(1)def movie(argv):for i in range(2):print("看電影 {}. {}".format(argv, datetime.datetime.now()))time.sleep(5)if __name__ == '__main__':music('trouble is a friend')movie('變形金剛')print(f"all over {datetime.datetime.now()}")

    python 多線程

    Python 中使用線程有兩種方式:函數 或者 用類來包裝線程對象

    • 創建多線程:使用 函數 方法。
      ? ? ? ? 函數式 ?:調用 thread 模塊中的 start_new_thread() 函數來產生新線程。
      ? ? ? ? 語法如下: thread.start_new_thread(function, args[, kwargs])
      ? ? ? ? 參數說明:
      ? ? ? ? ? ? function ?線程函數。
      ? ? ? ? ? ? args? ? ??傳遞給線程函數的參數,必須是個 tuple 類型。
      ? ? ? ? ? ? kwargs? ??可選參數。

    使用示例:( Python2 代碼?)

    import thread import timedef print_time(thread_name, delay):count = 0while count < 5:time.sleep(delay)count += 1print "%s: %s" % (thread_name, time.ctime(time.time()))if __name__ == "__main__":try:thread.start_new_thread(print_time, ("Thread-1", 2))thread.start_new_thread(print_time, ("Thread-2", 4))except BaseException as e:print eprint "Error: unable to start thread"while 1:pass
    • 創建多線程:通過 類繼承 。使用 Threading 模塊創建線程,直接從 threading.Thread 繼承,然后重寫 __init__ 方法和 run 方法:
    import threading import time import _threadexitFlag = 0 # 是否每個線程要進行工作后再退出,設定1則所有線程啟動后直接退出class MyThread(threading.Thread): # 繼承父類 threading.Threaddef __init__(self, thread_id, name, counter):super().__init__() #self.thread_id = thread_idself.name = nameself.counter = counterdef run(self):# 把要執行的代碼寫到 run 函數里面線程在創建后會直接運行 run 函數print("Starting " + self.name)print_time(self.name, 5, self.counter)print("Exiting " + self.name)def print_time(thread_name, delay, counter):while counter:if exitFlag:_thread.exit() # 這個是讓線程主動退出time.sleep(delay)print("%s: %s" % (thread_name, time.ctime(time.time())))counter -= 1# 創建新線程 thread_1 = MyThread(1, "Thread-1", 1) thread_2 = MyThread(2, "Thread-2", 2) # 開啟線程 thread_1.start() thread_2.start() print("Exiting Main Thread")
    • thread 和?threading 模塊(?強烈建議直接使用 threading )。python 提供了兩個模塊來實現多線程 thread 和 threading 。 thread 有一些缺點,在 threading 得到了彌補,但是還是強烈建議直接使用 threading。
    # -*- coding: utf-8 -*-import threading from time import ctime, sleepdef music(argv):for i in range(2):print("listen music %s. %s" % (argv, ctime()))sleep(1)def movie(argv):for i in range(2):print("watch movie %s! %s" % (argv, ctime()))sleep(5)threads = [] t1 = threading.Thread(target=music, args=('trouble is a friend',)) threads.append(t1) t2 = threading.Thread(target=movie, args=('變形金剛',)) threads.append(t2)if __name__ == '__main__':for t in threads:t.setDaemon(True)t.start()print("all over %s" % ctime())pass
    • 設置 精靈進程 ?setDaemon(True) 將線程聲明為守護線程,必須在 start() 方法調用之前設置,如果不設置為守護線程程序會被無限掛起。子線程啟動后,父線程也繼續執行下去,當父線程執行完最后一條語句 print "all over %s" %ctime()后,沒有等待子線程,直接就退出了,同時子線程也一同結束。
    if __name__ == '__main__':for t in threads:t.setDaemon(True)t.start() # start()開始線程活動。t.join()print "all over %s" %ctime()
    • join() 方法,用于等待線程終止。join() 的作用是,在子線程完成運行之前,這個子線程的父線程將一直被阻塞。注意:上面程序中 join() 方法的位置是在 for 循環外的,也就是說必須等待for循環里的兩個進程都結束后,才去執行主進程。
    import threading import timedef worker(num):time.sleep(1)print("The num is %d" % num)print t.getName()returnfor i in range(20):t = threading.Thread(target=worker, args=(i,), name="testThread")t.start()

    Thread 方法說明

    t.start() 激活線程, t.getName() 獲取線程的名稱 t.setName() 設置線程的名稱 t.name 獲取或設置線程的名稱 t.is_alive() 判斷線程是否為激活狀態 t.isAlive() 判斷線程是否為激活狀態 t.setDaemon() 設置為后臺線程或前臺線程(默認:False);通過一個布爾值設置線程是否為守護線程,必須在執行start()方法之后才可以使用。如果是后臺線程,主線程執行過程中,后臺線程也在進行,主線程執行完畢后,后臺線程不論成功與否,均停止;如果是前臺線程,主線程執行過程中,前臺線程也在進行,主線程執行完畢后,等待前臺線程也執行完成后,程序停止 t.isDaemon() 判斷是否為守護線程 t.ident 獲取線程的標識符。線程標識符是一個非零整數,只有在調用了start()方法之后該屬性才有效,否則它只返回None。 t.join() 逐個執行每個線程,執行完畢后繼續往下執行,該方法使得多線程變得無意義 t.run() 線程被cpu調度后自動執行線程對象的run方法

    線程同步

    ? ? ? ? 如果多個線程共同對某個數據修改,則可能出現不可預料的結果,為了保證數據的正確性,需要對多個線程進行同步。

    線程鎖

    ? ? ? ? 多線程的優勢在于可以同時運行多個任務(至少感覺起來是這樣)。但是當線程需要共享數據時,可能存在數據不同步的問題。考慮這樣一種情況:一個列表里所有元素都是 0,線程 "set" 從后向前把所有元素改成1,而線程 "print" 負責從前往后讀取列表并打印。那么,可能線程 "set" 開始改的時候,線程 "print" 便來打印列表了,輸出就成了一半 0 一半 1,這就是?數據的不同步。
    為了避免這種情況,引入了
    的概念。鎖有兩種狀態:鎖定未鎖定
    每當一個線程比如 "set" 要訪問共享數據時,必須先獲得鎖定;如果已經有別的線程比如 "print" 獲得鎖定了,那么就讓線程 "set" 暫停,也就是同步阻塞;等到線程 "print" 訪問完畢,釋放鎖以后,再讓線程 "set" 繼續。經過這樣的處理,打印列表時要么全部輸出0,要么全部輸出1,不會再出現一半0一半1的尷尬場面。

    ? ? ? ? 使用 Thread對象Lock Rlock 可以實現簡單的線程同步,這兩個對象都有 acquire方法 release方法。對于那些需要每次只允許一個線程操作的數據,可以將其操作放到 acquire 和release 方法之間。

    threading.RLockthreading.Lock 的區別:

    • RLock 允許在同一線程中被多次 acquire。而 Lock 卻不允許這種情況。使用 RLock 時 acquire 和 release 必須成對出現,即調用了 n 次 acquire,必須調用 n 次的release 才能真正釋放所占用的瑣。
    # -*- coding: utf-8 -*-import threadinglock = threading.Lock() # Lock對象 rLock = threading.RLock() # RLock對象 def main_1():lock.acquire()lock.acquire() # 產生了死瑣。lock.release()lock.release()def main_2(): rLock.acquire()rLock.acquire() # 在同一線程內,程序不會堵塞。 rLock.release()rLock.release()

    示例:(?線程鎖 )?

    # -*- coding: utf-8 -*-import time import threading# 定義一個 "線程鎖" threadLock = threading.Lock()class MyThread(threading.Thread):def __init__(self, thread_id, name, counter):threading.Thread.__init__(self)self.thread_id = thread_idself.name = nameself.counter = counterdef run(self):print("Starting " + self.name)# 獲得鎖,成功獲得鎖定后返回 True# 可選的 timeout 參數不填時將一直阻塞直到獲得鎖定# 否則超時后將返回 FalsethreadLock.acquire()print_time(self.name, self.counter, 3)# 釋放鎖threadLock.release()def print_time(thread_name, delay, counter):while counter:time.sleep(delay)print("%s: %s" % (thread_name, time.ctime(time.time())))counter -= 1threads = [] # 創建新線程 thread1 = MyThread(1, "Thread-1", 1) thread2 = MyThread(2, "Thread-2", 2) # 開啟新線程 thread1.start() thread2.start() # 添加線程到線程列表中 threads.append(thread1) threads.append(thread2) # 等待所有線程完成 for t in threads:t.join() print("Exiting Main Thread")

    示例:

    import threading import time globals_num = 0 lock = threading.RLock()def func():lock.acquire() # 獲得鎖global globals_numglobals_num += 1time.sleep(1)print(globals_num)lock.release() # 釋放鎖for i in range(10):t = threading.Thread(target=func)t.start()pass

    Python 的 queue ( 線程安全?)

    Python 的 queue 模塊中提供了同步的、線程安全的隊列類。包括

    • FIFO(先入先出) 隊列
    • LIFO(后入先出)隊列
    • 優先級隊列 PriorityQueue

    這些隊列都實現了 鎖原語,能夠在多線程中直接使用。可以使用隊列來實現線程間的同步。

    ?Queue 模塊中的常用方法:

    Queue.qsize() 返回隊列的大小 Queue.empty() 如果隊列為空,返回True,反之False Queue.full() 如果隊列滿了,返回True,反之False Queue.full 與 maxsize 大小對應 Queue.get([block[, timeout]]) 獲取隊列,timeout是等待時間 Queue.get_nowait() 相當Queue.get(False) Queue.put(item) 寫入隊列,timeout是等待時間 Queue.put_nowait(item) 相當Queue.put(item, False) Queue.task_done() 在完成一項工作之后,Queue.task_done()函數向任務已經完成的隊列發送一個信號 Queue.join() 實際上意味著等到隊列為空,再執行別的操作

    示例:

    # -*- coding: utf-8 -*-import time import queue import threadingtask_queue = queue.Queue()def produce():while True:for num in range(100):task_queue.put(num)time.sleep(0.1)def consume():while True:if task_queue.empty():print('隊列為空')continuenum = task_queue.get()print(num)time.sleep(1)if __name__ == '__main__':thread_list = []t1 = threading.Thread(target=produce)thread_list.append(t1)for i in range(3):t_id = threading.Thread(target=consume)thread_list.append(t_id)for index in thread_list:index.start()for index in thread_list:index.join()

    queue 是線程安全的,這是為了演示,給 queue 加鎖

    # -*- coding: utf-8 -*-import queue import threading import timeexitFlag = 0 threading_lock = threading.Lock() workQueue = queue.Queue(10)class MyThread(threading.Thread):def __init__(self, thread_id, name, q):threading.Thread.__init__(self)self.thread_id = thread_idself.name = nameself.q = qdef run(self):print("Starting " + self.name)process_data(self.name, self.q)print("Exiting " + self.name)def process_data(thread_name, q):while not exitFlag:threading_lock.acquire()if not workQueue.empty():data = q.get()threading_lock.release()print("%s processing %s" % (thread_name, data))else:threading_lock.release()time.sleep(1)def main():thread_list = ["Thread-1", "Thread-2", "Thread-3"]name_list = ["One", "Two", "Three", "Four", "Five"]threads = []thread_id = 1# 創建線程for tName in thread_list:thread = MyThread(thread_id, tName, workQueue)thread.start()threads.append(thread)thread_id += 1# 填充隊列threading_lock.acquire()for word in name_list:workQueue.put(word)threading_lock.release()# 等待隊列清空while not workQueue.empty():pass# 通知線程退出exitFlag = 1# 等待所有線程完成for t in threads:t.join()print("Exiting Main Thread")if __name__ == '__main__':main()pass

    threading.Condition

    一個 condition 變量總是與某些類型的鎖相聯系,當幾個condition變量必須共享和同一個鎖的時候,是很有用的。鎖 是 conditon 對象的一部分:沒有必要分別跟蹤。

    Condition 類實現了一個 conditon 變量。這個 conditiaon 變量允許一個或多個線程等待,直到他們被另一個線程通知。

    • 如果 lock 參數非空,那么他必須是一個 lock 或者 Rlock 對象,它用來做底層鎖。
    • 如果 lock 參數為空,則會創建一個新的 Rlock 對象,用來做底層鎖。

    condition 變量服從上下文管理協議:with 語句塊封閉之前可以獲取與鎖的聯系。
    acquire() 和 release() 會調用與鎖相關聯的相應的方法。
    其他和鎖關聯的方法必須被調用,wait()方法會釋放鎖,
    當另外一個線程使用 notify() or notify_all()喚醒它之前會一直阻塞。一旦被喚醒,wait()會重新獲得鎖并返回,
    wait(timeout=None) :等待通知,或者等到設定的超時時間。
    當調用這wait()方法時,如果調用它的線程沒有得到鎖,那么會拋出一個RuntimeError異常。
    wati()釋放鎖以后,在被調用相同條件的另一個進程用notify() or notify_all() 叫醒之前會一直阻塞。
    wait()還可以指定一個超時時間。 如果有等待的線程,notify()方法會喚醒一個在等待conditon變量的線程。notify_all() 則會喚醒所有在等待conditon變量的線程。

    注意: notify()和notify_all()不會釋放鎖,也就是說,線程被喚醒后不會立刻返回他們的wait() 調用。
    除非線程調用notify()和notify_all()之后放棄了鎖的所有權。
    在典型的設計風格里,利用condition變量用鎖去通許訪問一些共享狀態,線程在獲取到它想得到的狀態前,會反復調用wait()。
    修改狀態的線程在他們狀態改變時調用 notify() or notify_all(),用這種方式,線程會盡可能的獲取到想要的一個等待者狀態。

    例子:生產者-消費者模型

    import threading import timedef consumer(cond):with cond:print("consumer before wait")cond.wait()print("consumer after wait")def producer(cond):with cond:print("producer before notifyAll")cond.notifyAll()print("producer after notifyAll")condition = threading.Condition() consumer_1 = threading.Thread(name="c1", target=consumer, args=(condition,)) consumer_2 = threading.Thread(name="c2", target=consumer, args=(condition,)) producer = threading.Thread(name="p", target=producer, args=(condition,))consumer_1.start() time.sleep(2) consumer_2.start() time.sleep(2) producer.start()

    python 多進程共享變量

    https://my.oschina.net/leejun2005/blog/203148

    共享內存 (Shared memory)

    Data can be stored in a shared memory map using Value or Array.?

    For example, the following code. ??https://docs.python.org/2/library/multiprocessing.html#sharing-state-between-processes

    在使用并發設計的時候最好盡可能的避免共享數據,尤其是在使用多進程的時候。如果你真有需要要共享數據, multiprocessing提供了兩種方式。

    multiprocessing 中的 Array 和 Value。數據可以用 Value 或 Array 存儲在一個共享內存地圖里,如下:

    from multiprocessing import Array, Value, Processdef func(a, b):a.value = 3.333333333333333for j in range(len(b)):b[j] = -b[j]if __name__ == "__main__":num = Value('d', 0.0)arr = Array('i', range(11))if 0:t = Process(target=func, args=(num, arr))t.start()t.join()else:c = Process(target=func, args=(num, arr))d = Process(target=func, args=(num, arr))c.start()d.start()c.join()d.join()print(num.value)print(arr[:])for i in arr:print i,

    輸出

    3.33333333333 0 1 2 3 4 5 6 7 8 9 10

    創建 num 和 arr 時,“d”和“i”參數 由Array模塊使用的typecodes創建:“d”表示一個雙精度的浮點數,“i”表示一個有符號的整數,這些共享對象將被線程安全的處理。

    Array(‘i’, range(10))中的‘i’參數: ‘c’: ctypes.c_char ‘u’: ctypes.c_wchar ‘b’: ctypes.c_byte ‘B’: ctypes.c_ubyte ‘h’: ctypes.c_short ‘H’: ctypes.c_ushort ‘i’: ctypes.c_int ‘I’: ctypes.c_uint ‘l’: ctypes.c_long, ‘L’: ctypes.c_ulong ‘f’: ctypes.c_float ‘d’: ctypes.c_double

    Server process

    A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
    A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value and Array.

    https://docs.python.org/2/library/multiprocessing.html#managers

    multiprocessing 中的 Manager()

    Python中進程間共享數據,除了基本的queue,pipe和value+array外,還提供了更高層次的封裝。使用multiprocessing.Manager可以簡單地使用這些高級接口。
    Manager()返回的manager對象控制了一個server進程,此進程包含的python對象可以被其他的進程通過proxies來訪問。從而達到多進程間數據通信且安全。
    Manager支持的類型有list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。

    from multiprocessing import Process, Managerdef f(d, l):d["name"] = "king"d["age"] = 100d["Job"] = "python"l.reverse()if __name__ == "__main__":with Manager() as man:d_temp = man.dict()l_temp = man.list(range(10))p = Process(target=f, args=(d_temp, l_temp))p.start()p.join()print(d_temp)print(l_temp)

    Server process manager 比 shared memory 更靈活,因為它可以支持任意的對象類型。另外,一個單獨的manager可以通過進程在網絡上不同的計算機之間共享,不過他比shared memory要慢。

    threading.Event

    python 線程的事件用于主線程控制其他線程的執行。事件主要提供了三個方法

    • set 。將 “Flag” 設置為 False
    • wait 。將 “Flag” 設置為 True
    • clear 。判斷標識位是否為Ture。

    事件處理的機制:全局定義了一個 “Flag”,如果 “Flag” 值為 False,那么當程序執行 event.wait 方法時就會阻塞,如果 “Flag” 值為True,那么 event.wait 方法時便不再阻塞。

    import threadingdef do(event):print('start')event.wait()print('execute')event_obj = threading.Event() for i in range(10):t = threading.Thread(target=do, args=(event_obj,))t.start()event_obj.clear() # inp = input('input:') inp = raw_input('input:') if inp == 'true':event_obj.set()

    當線程執行的時候,如果 flag 為False,則線程會阻塞,當 flag 為True 的時候,線程不會阻塞。它提供了 本地 和 遠程 的并發性。

    python 協程

    關于協程,可以參考 greenlet、stackless、gevent、eventlet 等的實現。

    我們知道并發(不是并行)編程目前有四種方式,多進程,多線程,異步,和協程。

    多進程編程在 python 中有類似 C 的 os.fork,當然還有更高層封裝的 multiprocessing 標準庫,在之前寫過的python高可用程序設計方法? http://www.cnblogs.com/hymenz/p/3488837.html? 中提供了類似nginx中master process和worker process間信號處理的方式,保證了業務進程的退出可以被主進程感知。

    多線程編程 Python 中有 Thread 和 threading,在 linux 下所謂的線程,實際上是 LWP 輕量級進程,其在內核中具有和進程相同的調度方式,有關 LWP,COW(寫時拷貝),fork,vfork,clone等的資料較多,這里不再贅述。異步在 linux 下主要有三種實現 select,poll,epoll 。

    協程 又稱 微線程?。英文名 Coroutine。?

    協程的好處:

    • 無需線程上下文切換的開銷
    • 無需原子操作鎖定及同步的開銷
    • 方便切換控制流,簡化編程模型
    • 高并發+高擴展性+低成本:一個CPU支持上萬的協程都不是問題。所以很適合用于高并發處理。

    缺點:

    • 無法利用多核資源:協程的本質是個單線程,它不能同時將單個 CPU 的多個核用上,協程需要和進程配合才能運行在多CPU上。當然我們日常所編寫的絕大部分應用都沒有這個必要,除非是cpu密集型應用。
    • 進行阻塞(Blocking)操作(如IO時)會阻塞掉整個程序

    "函數 ( 又叫?子程序?)?" 在所有語言中都是層級調用,比如 A 調用 B,B 在執行過程中又調用了 C,C 執行完畢返回,B 執行完畢返回,最后是 A 執行完畢。所以子程序調用是通過棧實現的,一個線程就是執行一個子程序。

    • 子程序調用總是一個入口,一次返回,調用順序是明確的。
    • 協程的調用 和 子程序不同。協程看上去也是子程序,但執行過程中,在子程序內部可中斷,然后轉而執行別的子程序,在適當的時候再返回來接著執行。注意:是在一個子程序中中斷,去執行其他子程序,不是函數調用,有點類似CPU的中斷。

    比如:子程序 A、B:

    def A():print '1'print '2'print '3'def B():print 'x'print 'y'print 'z'

    假設由協程執行,在執行A的過程中,可以隨時中斷,去執行B,B 也可能在執行過程中中斷再去執行A,結果可能是:

    1 2 x y 3 z

    但是在A中是沒有調用B的,所以協程的調用比函數調用理解起來要難一些。看起來 A、B 的執行有點像多線程,但協程的特點在于是一個線程執行,

    協程和多線程比,協程有何優勢?

    • 協程最大的優勢就是協程極高的執行效率。因為是子程序切換不是線程切換,而是由程序自身控制,因此,沒有線程切換的開銷,和多線程比,線程數量越多,協程的性能優勢就越明顯。
    • 第二大優勢就是不需要多線程的鎖機制。因為只有一個線程,也不存在同時寫變量沖突,在協程中控制共享資源不加鎖,只需要判斷狀態就好了,所以執行效率比多線程高很多。

    因為協程是一個線程執行,那怎么利用多核CPU呢 ?

    • 多進程+協程,既充分利用多核,又充分發揮協程的高效率,可獲得極高的性能。

    一個例子:

    傳統的生產者-消費者模型是一個線程寫消息,一個線程取消息,通過鎖機制控制隊列和等待,但一不小心就可能死鎖。如果改用協程,生產者生產消息后,直接通過yield跳轉到消費者開始執行,待消費者執行完畢后,切換回生產者繼續生產,效率極高:

    import timedef consumer():r = ''while True:n = yield rif not n:returnprint('[CONSUMER] Consuming %s...' % n)time.sleep(1)r = '200 OK'def produce(c):c.next()n = 0while n < 5:n = n + 1print('[PRODUCER] Producing %s...' % n)r = c.send(n)print('[PRODUCER] Consumer return: %s' % r)c.close()if __name__=='__main__':c = consumer()produce(c)

    執行結果:

    [PRODUCER] Producing 1... [CONSUMER] Consuming 1... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 2... [CONSUMER] Consuming 2... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 3... [CONSUMER] Consuming 3... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 4... [CONSUMER] Consuming 4... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 5... [CONSUMER] Consuming 5... [PRODUCER] Consumer return: 200 OK

    注意到consumer函數是一個generator(生成器),把一個consumer傳入produce后:
    ? ? ? ? 1. 首先調用c.next()啟動生成器;
    ? ? ? ??2. 然后,一旦生產了東西,通過c.send(n)切換到consumer執行;
    ? ? ? ??3. consumer通過yield拿到消息,處理,又通過yield把結果傳回;
    ? ? ? ??4. produce拿到consumer處理的結果,繼續生產下一條消息;
    ? ? ? ??5. produce決定不生產了,通過c.close()關閉consumer,整個過程結束。

    整個流程無鎖,由一個線程執行,produce和consumer協作完成任務,所以稱為“協程”,而非線程的搶占式多任務。

    最后套用Donald Knuth的一句話總結協程的特點:“子程序就是協程的一種特例

    線程和進程的操作是由程序觸發系統接口,最后的執行者是系統;協程的操作則是程序員。
    協程存在的意義:對于多線程應用,CPU通過切片的方式來切換線程間的執行,線程切換時需要耗時(保存狀態,下次繼續)。
    協程,則只使用一個線程,在一個線程中規定某個代碼塊執行順序。
    協程的適用場景:當程序中存在大量不需要CPU的操作時(IO),
    event loop是協程執行的控制點,如果你希望執行協程,就需要用到它們。
    event loop提供了如下的特性:?

    ? ? ? ? ? ? ? ? ?注冊、執行、取消延時調用(異步函數)、創建用于通信的client和server協議(工具)、創建和別的程序通信的子進程和協議(工具) 把函數調用送入線程池中

    協程示例:

    #---------python3_start--------------- import asyncio async def cor1():print("COR1 start")await cor2()print("COR1 end")async def cor2():print("COR2")loop = asyncio.get_event_loop() loop.run_until_complete(cor1()) loop.close() #---------python3_end---------------

    最后三行是重點。
    ? ? ? ? asyncio.get_event_loop() : asyncio啟動默認的event loop
    ? ? ? ? run_until_complete() : ? ? ? ? 這個函數是阻塞執行的,知道所有的異步函數執行完成,
    ? ? ? ? close() : ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 關閉 event loop。

    python 的 greenlet 模塊

    import greenlet def fun1():print("12")gr2.switch()print("56")gr2.switch()def fun2():print("34")gr1.switch()print("78")gr1 = greenlet.greenlet(fun1) gr2 = greenlet.greenlet(fun2) gr1.switch()

    gevent

    gevent 屬于第三方模塊需要下載安裝包
    pip3 install --upgrade pip3
    pip3 install gevent

    import gevent def fun1():print("www.baidu.com") # 第一步gevent.sleep(0)print("end the baidu.com") # 第三步def fun2():print("www.zhihu.com") # 第二步gevent.sleep(0)print("end th zhihu.com") # 第四步gevent.joinall([gevent.spawn(fun1),gevent.spawn(fun2), ])

    遇到 IO 操作自動切換:

    import gevent import requestsdef func(url):print("get: %s" % url)gevent.sleep(0)proxies = {"http": "http://172.17.18.80:8080","https": "http://172.17.18.80:8080",}date = requests.get(url, proxies=proxies)ret = date.textprint(url, len(ret))gevent.joinall([gevent.spawn(func, 'https://www.baidu.com/'),gevent.spawn(func, 'http://www.sina.com.cn/'),gevent.spawn(func, 'http://www.qq.com/'), ])

    http://www.cnblogs.com/zingp/p/5911537.html

    http://python.jobbole.com/87310/

    http://www.cnblogs.com/gide/p/6187080.html

    python中多進程+協程的使用以及為什么要用它:?http://blog.csdn.net/lambert310/article/details/51162634

    從兩個簡單例子窺視協程的驚人性能(Python):http://walkerqt.blog.51cto.com/1310630/1439034

    greenlet:http://greenlet.readthedocs.org/en/latest/
    eventlet:?http://eventlet.net/
    http://gashero.iteye.com/blog/442177

    示例代碼:

    """ 對于有些人來說Gevent和multiprocessing組合在一起使用算是個又高大上又奇葩的工作模式. Python的多線程受制于GIL全局鎖的特性,Gevent身為協程也是線程的一種,只是io調度上自己說了算而已。那么如何使用多個cpu核心? 可以利用多進程 mutliprocessing 來進行多核并行工作, 在多進程里面使用gevent協程框架可以更好的做io調度,相比線程來說減少了無謂的上下文切換.廢話少說,直接上個例子. 下面是多進程下生產者消費者的工作模式 """from multiprocessing import Process, cpu_count, Queue, JoinableQueue from gevent import monkeymonkey.patch_all() import gevent import datetimeclass Consumer(object):def __init__(self, q, no_tasks, name):self._no_tasks = no_tasksself._queue = qself.name = nameself._rungevent(self._queue, self._no_tasks)def _rungevent(self, q, no_tasks):jobs = [gevent.spawn(self._printq) for x in range(no_tasks)]gevent.joinall(jobs)def _printq(self):while 1:value = self._queue.get()if value is None:self._queue.task_done()breakelse:print("{0} time: {1}, value: {2}".format(self.name, datetime.datetime.now(), value))returnclass Producer(object):def __init__(self, q, no_tasks, name, consumers_tasks):print(name)self._q = qself._no_tasks = no_tasksself.name = nameself.consumer_tasks = consumers_tasksself._rungevent()def _rungevent(self):jobs = [gevent.spawn(self.produce) for x in range(self._no_tasks)]gevent.joinall(jobs)for x in range(self.consumer_tasks):self._q.put_nowait(None)self._q.close()def produce(self):for no in range(10000):print(no)self._q.put(no, block=False)returndef main():total_cores = cpu_count()total_processes = total_cores * 2q = JoinableQueue()print("Gevent on top multiprocessing with 17 gevent coroutines ""\n 10 producers gevent and 7 consumers gevent")producer_gevents = 10consumer_gevents = 7jobs = []start = datetime.datetime.now()for x in range(total_cores):if not x % 2:p = Process(target=Producer, args=(q, producer_gevents, "producer %d" % 1, consumer_gevents))p.start()jobs.append(p)else:p = Process(target=Consumer, args=(q, consumer_gevents, "consumer %d" % x))p.start()jobs.append(p)for job in jobs:job.join()print("{0} process with {1} producer gevents and {2} consumer gevents took{3}\seconds to produce {4} numbers and consume".format(total_processes,producer_gevents * total_cores,consumer_gevents * total_cores,datetime.datetime.now() - start,producer_gevents * total_cores * 10000))if __name__ == '__main__':main()

    mutilprocess 簡介

    由于 Python 設計的限制 ( 這里指 CPython,GLI )。最多只能用滿1個CPU核心。但是 Python 的多進程包 multiprocessing 可以輕松完成從單進程到并發執行的轉換。像 線程一樣管理進程,這個是 mutilprocess 的核心,他與 threading 很是相像,對多核CPU的利用率會比 threading 好的多。

    簡單的創建進程

    import multiprocessingdef worker(num):"""thread worker function"""print 'Worker:', numreturnif __name__ == '__main__':jobs = []for i in range(5):p = multiprocessing.Process(target=worker, args=(i,))jobs.append(p)p.start()

    示例:

    # -*- coding: utf-8 -*-import time import multiprocessingdef func(msg):for i in range(3):print(msg)time.sleep(1)if __name__ == "__main__":p = multiprocessing.Process(target=func, args=("hello",))p.start()p.join()print("Sub-process done.")

    確定當前的進程,即是給進程命名,方便標識區分,跟蹤

    import multiprocessing import timedef worker():name = multiprocessing.current_process().nameprint(name, 'Starting')time.sleep(2)print(name, 'Exiting')def my_service():name = multiprocessing.current_process().nameprint(name, 'Starting')time.sleep(3)print(name, 'Exiting')if __name__ == '__main__':service = multiprocessing.Process(name='my_service', target=my_service)worker_1 = multiprocessing.Process(name='worker 1', target=worker)worker_2 = multiprocessing.Process(target=worker) # default nameworker_1.start()worker_2.start()service.start()

    使用 進程池(非阻塞、阻塞)

    是的,你沒有看錯,不是線程池。它可以讓你跑滿多核CPU,而且使用方法非常簡單。

    注意要用 apply_async,如果落下 async,就變成阻塞版本了。

    使用 進程池( 非阻塞 版本)

    # -*- coding: utf-8 -*-import multiprocessing import timedef func(msg):for i in range(3):print(msg)time.sleep(1)if __name__ == "__main__":# processes=4 是最多并發進程數量。pool = multiprocessing.Pool(processes=4)for index in range(10):msg = f"hello {index}"pool.apply_async(func, (msg,))pool.close()pool.join()print("Sub-process(es) done.")

    函數解釋

    • apply_async(func[, args[, kwds[, callback]]]) 是非阻塞,apply(func[, args[, kwds]])是阻塞
    • close() ? ?關閉 pool,使其不在接受新的任務。
    • terminate() ? ?結束工作進程,不在處理未完成的任務。
    • join() ? ?主進程阻塞,等待子進程的退出, join 方法要在 close 或 terminate 之后使用。

    示例:

    # -*- coding: utf-8 -*-import os import time import random import multiprocessingdef Lee():print("\nRun task Lee-%s" % (os.getpid())) # os.getpid()獲取當前的進程的IDstart = time.time()time.sleep(random.random() * 10) # random.random()隨機生成0-1之間的小數end = time.time()print('Task Lee, runs %0.2f seconds.' % (end - start))def Marlon():print("\nRun task Marlon-%s" % (os.getpid()))start = time.time()time.sleep(random.random() * 40)end = time.time()print('Task Marlon runs %0.2f seconds.' % (end - start))def Allen():print("\nRun task Allen-%s" % (os.getpid()))start = time.time()time.sleep(random.random() * 30)end = time.time()print('Task Allen runs %0.2f seconds.' % (end - start))def Frank():print("\nRun task Frank-%s" % (os.getpid()))start = time.time()time.sleep(random.random() * 20)end = time.time()print('Task Frank runs %0.2f seconds.' % (end - start))if __name__ == '__main__':function_list = [Lee, Marlon, Allen, Frank]print("parent process %s" % (os.getpid()))pool = multiprocessing.Pool(4)for func in function_list:# Pool執行函數,當有一個進程執行完畢后,會添加一個新的進程到pool中pool.apply_async(func)print('Waiting for all subprocesses done...')pool.close()# 調用join之前,一定要先調用close() 函數,否則會出錯# close()執行后不會有新的進程加入到 pool, join 函數等待素有子進程結束pool.join()print('All subprocesses done.')pass

    使用 進程池( 阻塞 版本)

    #coding: utf-8 import multiprocessing import timedef func(msg):print "msg:", msgtime.sleep(3)print "end"if __name__ == "__main__":pool = multiprocessing.Pool(processes = 3)for i in xrange(4):msg = "hello %d" %(i)pool.apply(func, (msg, )) #維持執行的進程總數為processes,當一個進程執行完畢后會添加新的進程進去print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"pool.close()pool.join() #調用join之前,先調用close函數,否則會出錯。執行完close后不會有新的進程加入到pool,join函數等待所有子進程結束print "Sub-process(es) done."

    使用 Pool,并需要關注結果

    更多的時候,我們不僅需要多進程執行,還需要關注每個進程的執行結果,如下:

    # -*- coding: utf-8 -*-import multiprocessing import timedef func(msg):for i in range(3):print(msg)time.sleep(1)return "done " + msgif __name__ == "__main__":pool = multiprocessing.Pool(processes=4)result = []for index in range(10):msg = f"hello {index}"result.append(pool.apply_async(func, (msg,)))pool.close()pool.join()for res in result:print(res.get())print("Sub-process(es) done.")

    示例?

    import multiprocessingdef do_calculation(data):return data * 2def start_process():print('Starting', multiprocessing.current_process().name)if __name__ == '__main__':inputs = list(range(10))print('Inputs :', inputs)builtin_output = list(map(do_calculation, inputs))print('Build-In :', builtin_output)pool_size = multiprocessing.cpu_count() * 2pool = multiprocessing.Pool(processes=pool_size, initializer=start_process, )# 默認情況下,Pool會創建固定數目的工作進程,并向這些工作進程傳遞作業,直到再沒有更多作業為止。# maxtasksperchild 參數為每個進程執行 task 的最大數目,# 設置 maxtasksperchild參數可以告訴池在完成一定數量任務之后重新啟動一個工作進程,# 來避免運行時間很長的工作進程消耗太多的系統資源。# pool = multiprocessing.Pool(processes=pool_size, initializer=start_process, maxtasksperchild=2)print('-' * 20)pool_outputs = pool.map(do_calculation, inputs)pool.close()pool.join()print('Pool :', pool_outputs)

    multiprocessing 的 pool.map 使用

    #coding: utf-8 import multiprocessing def m1(x): print x * x if __name__ == '__main__': pool = multiprocessing.Pool(multiprocessing.cpu_count()) i_list = range(8)pool.map(m1, i_list)

    示例:

    import numpy as np from time import time from multiprocessing import Process, Queue import multiprocessing as mp import randomdef my_func(x):s0 = time()res = 0for _ in range(x*1000000):res += 1print(mp.current_process(),'run time:%.3f s, result:%.1f'%(time()-s0,res))return res''' multiprocessing.Pool 只是用來啟動多個進程而不是在每個core上啟動一個進程。 換句話說Python解釋器本身不會去在每個core或者processor去做負載均衡。 這個是由操作系統決定的。如果你的工作特別的計算密集型的話,操作系統確實會分配更多的core,但這也不是Python或者代碼所能控制的或指定的。 multiprocessing.Pool(num)中的num可以很小也可以很大,比如I/O密集型的操作,這個值完全可以大于cpu的個數。 硬件系統的資源分配是由操作系統決定的,如果你希望每個core都在工作,就需要更多的從操作系統出發了~ 這段話轉自https://segmentfault.com/q/1010000011117956 ''' def main():pool = mp.Pool(processes=mp.cpu_count())st = time()result = pool.map(my_func, [30]*8)print('total run time: %.3f s'%(time()-st))print(result)if __name__ == "__main__":main()

    守護進程

    守護進程就是不阻擋主程序退出,自己干自己的。?mutilprocess.setDaemon(True)就這句。

    等待守護進程退出,要加上 join,join 可以傳入浮點數值,等待n久就不等了

    import multiprocessing import time import sysdef daemon():name = multiprocessing.current_process().nameprint('Starting:', name)time.sleep(2)print('Exiting :', name)def non_daemon():name = multiprocessing.current_process().nameprint('Starting:', name)print('Exiting :', name)if __name__ == '__main__':d = multiprocessing.Process(name='daemon',target=daemon)d.daemon = Truen = multiprocessing.Process(name='non-daemon',target=non_daemon)n.daemon = Falsed.start()n.start()d.join(1)print 'd.is_alive()', d.is_alive()n.join()

    終止進程

    最好使用 poison pill,強制的使用 terminate()。注意 terminate 之后要 join,使其可以更新狀態

    import multiprocessing import timedef slow_worker():print('Starting worker')time.sleep(0.1)print('Finished worker')if __name__ == '__main__':p = multiprocessing.Process(target=slow_worker)print('BEFORE:', p, p.is_alive())p.start()print('DURING:', p, p.is_alive())p.terminate()print('TERMINATED:', p, p.is_alive())p.join()print('JOINED:', p, p.is_alive())

    進程的退出狀態

    • ?== 0 ? ? 未生成任何錯誤
    • ?0 ? ? ? ? ? 進程有一個錯誤,并以該錯誤碼退出
    • ?< 0 ? ? ? 進程由一個-1 * exitcode信號結束
    import multiprocessing import sys import timedef exit_error():sys.exit(1)def exit_ok():returndef return_value():return 1def raises():raise RuntimeError('There was an error!')def terminated():time.sleep(3)if __name__ == '__main__':jobs = []for f in [exit_error, exit_ok, return_value, raises, terminated]:print('Starting process for', f.func_name)j = multiprocessing.Process(target=f, name=f.func_name)jobs.append(j)j.start()jobs[-1].terminate()for j in jobs:j.join()print('%15s.exitcode = %s' % (j.name, j.exitcode))

    日志

    方便的調試,可以用logging

    import multiprocessing import logging import sysdef worker():print 'Doing some work'sys.stdout.flush()if __name__ == '__main__':multiprocessing.log_to_stderr()logger = multiprocessing.get_logger()logger.setLevel(logging.INFO)p = multiprocessing.Process(target=worker)p.start()p.join()

    派生進程

    利用 class 來創建進程,定制子類

    import multiprocessingclass Worker(multiprocessing.Process):def run(self):print('In %s' % self.name)returnif __name__ == '__main__':jobs = []for i in range(5):p = Worker()jobs.append(p)p.start()for j in jobs:j.join()

    python 進程間傳遞消息

    一般的情況是 Queue 來傳遞。

    import multiprocessingclass MyFancyClass(object):def __init__(self, name):self.name = namedef do_something(self):proc_name = multiprocessing.current_process().nameprint 'Doing something fancy in %s for %s!' % \(proc_name, self.name)def worker(q):obj = q.get()obj.do_something()if __name__ == '__main__':queue = multiprocessing.Queue()p = multiprocessing.Process(target=worker, args=(queue,))p.start()queue.put(MyFancyClass('Fancy Dan'))# Wait for the worker to finishqueue.close()queue.join_thread()p.join()import multiprocessing import timeclass Consumer(multiprocessing.Process):def __init__(self, task_queue, result_queue):multiprocessing.Process.__init__(self)self.task_queue = task_queueself.result_queue = result_queuedef run(self):proc_name = self.namewhile True:next_task = self.task_queue.get()if next_task is None:# Poison pill means shutdownprint '%s: Exiting' % proc_nameself.task_queue.task_done()breakprint '%s: %s' % (proc_name, next_task)answer = next_task()self.task_queue.task_done()self.result_queue.put(answer)returnclass Task(object):def __init__(self, a, b):self.a = aself.b = bdef __call__(self):time.sleep(0.1) # pretend to take some time to do the workreturn '%s * %s = %s' % (self.a, self.b, self.a * self.b)def __str__(self):return '%s * %s' % (self.a, self.b)if __name__ == '__main__':# Establish communication queuestasks = multiprocessing.JoinableQueue()results = multiprocessing.Queue()# Start consumersnum_consumers = multiprocessing.cpu_count() * 2print 'Creating %d consumers' % num_consumersconsumers = [ Consumer(tasks, results)for i in xrange(num_consumers) ]for w in consumers:w.start()# Enqueue jobsnum_jobs = 10for i in xrange(num_jobs):tasks.put(Task(i, i))# Add a poison pill for each consumerfor i in xrange(num_consumers):tasks.put(None)# Wait for all of the tasks to finishtasks.join()# Start printing resultswhile num_jobs:result = results.get()print 'Result:', resultnum_jobs -= 1

    進程間信號傳遞

    Event 提供一種簡單的方法,可以在進程間傳遞狀態信息。事件可以切換設置和未設置狀態。通過使用一個可選的超時值,時間對象的用戶可以等待其狀態從未設置變為設置。

    import multiprocessing import timedef wait_for_event(e):"""Wait for the event to be set before doing anything"""print('wait_for_event: starting')e.wait()print('wait_for_event: e.is_set()->', e.is_set())def wait_for_event_timeout(e, t):"""Wait t seconds and then timeout"""print('wait_for_event_timeout: starting')e.wait(t)print('wait_for_event_timeout: e.is_set()->', e.is_set())if __name__ == '__main__':e = multiprocessing.Event()w1 = multiprocessing.Process(name='block', target=wait_for_event, args=(e,))w1.start()w2 = multiprocessing.Process(name='nonblock', target=wait_for_event_timeout, args=(e, 2))w2.start()print('main: waiting before calling Event.set()')time.sleep(3)e.set()print('main: event is set')

    Python 多進程 multiprocessing.Pool類詳解

    multiprocessing 模塊

    multiprocessing 包是 Python 中的 多進程 管理包。它與 threading.Thread 類似,可以利用multiprocessing.Process 對象來創建一個進程。該進程可以允許放在 Python程序內部編寫的函數中。該 Process對象與Thread對象的用法相同,擁有 is_alive()、join([timeout])、run()、start()、terminate() 等方法。屬性有:authkey、daemon(要通過start()設置)、exitcode(進程在運行時為None、如果為–N,表示被信號N結束)、name、pid。此外 multiprocessing包中也有Lock/Event/Semaphore/Condition類,用來同步進程,其用法也與 threading 包中的同名類一樣。multiprocessing 的很大一部份與 threading 使用同一套 API,只不過換到了多進程的情境。

    這個模塊表示像線程一樣管理進程,這個是 multiprocessing 的核心,它與 threading 很相似,對多核 CPU 的利用率會比 threading 好的多。

    看一下 Process?類的構造方法:

    __init__(self, group=None, target=None, name=None, args=(), kwargs={})

    參數說明:

    • group:進程所屬組。基本不用
    • target:表示調用對象。
    • args:表示調用對象的位置參數元組。
    • name:別名
    • kwargs:表示調用對象的字典。

    創建進程的簡單實例:

    # -*- coding: utf-8 -*-import multiprocessingdef do(n):# 獲取當前線程的名字name = multiprocessing.current_process().nameprint(name, 'starting')print("worker ", n)returnif __name__ == '__main__':numList = []for i in range(5):p = multiprocessing.Process(target=do, args=(i,))numList.append(p)p.start()p.join()print("Process end.")

    執行結果:

    Process-1 starting worker 0 Process end. Process-2 starting worker 1 Process end. Process-3 starting worker 2 Process end. Process-4 starting worker 3 Process end. Process-5 starting worker 4 Process end.

    創建子進程時,只需要傳入一個 執行函數函數的參數,然后用?start() 方法啟動。
    join() 方法表示等待子進程結束以后再繼續往下運行,通常用于進程間的同步。

    注意:在 Windows 上要想使用進程模塊,就必須把有關進程的代碼寫在當前.py文件的 if __name__ == ‘__main__’ :語句的下面,才能正常使用 Windows 下的進程模塊。Unix/Linux下則不需要。

    Pool 類 ( 進程池 )

    在使用 Python 進行系統管理時,特別是同時操作多個文件目錄或者遠程控制多臺主機,并行操作可以節約大量的時間。如果操作的對象數目不大時,還可以直接使用 Process類動態的生成多個進程,十幾個還好,但是如果上百個甚至更多,那手動去限制進程數量就顯得特別的繁瑣,此時 進程池 就派上用場了。進程池 (Process Pool) 可以創建多個進程。這些進程就像是隨時待命的士兵,準備執行任務(程序)。一個進程池中可以容納多個待命的士兵。比如下面的程序:

    import multiprocessing as muldef func_test(x):return x ** 2if __name__ == '__main__':pool = mul.Pool(5)rel = pool.map(func_test, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])print(rel)pass

    Pool 類可以提供指定數量的進程供用戶調用,當有新的請求提交到Pool中時,如果池還沒有滿,就會創建一個新的進程來執行請求。如果池滿,請求就會告知先等待,直到池中有進程結束,才會創建新的進程來執行這些請求。

    Pool 類描述了一個工作進程池,他有幾種不同的方法讓任務卸載工作進程。 進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進程,那么程序就會等待,直到進程池中有可用進程為止。我們可以用 Pool 類創建一個進程池,展開提交的任務給進程池。

    一個進程池對象可以控制工作進程池的哪些工作可以被提交,它支持 超時 和 回調的異步結果,有一個類似 map 的實現。

    參數

    • processes :進程的數量,如果 processes 是 None 那么使用 os.cpu_count() 返回的數量。
    • initializer:如果是 None,那么每一個工作進程在開始的時候會調 initializer(*initargs)。?
    • maxtasksperchild:工作進程退出之前可以完成的任務數,完成后用一個新的工作進程來替代原進程,來讓閑置的資源被釋放。maxtasksperchild 默認是 None,意味著只要 Pool 存在工作進程就會一直存活。?
    • context:用在制定工作進程啟動時的上下文,一般使用 multiprocessing.Pool() 或者 一個context 對象的 Pool() 方法來創建一個池,兩種方法都適當的設置了context

    注意:Pool 對象的方法只可以被創建 pool 的進程所調用。

    下面介紹一下 multiprocessing 模塊下的 Pool 類下的幾個方法

    進程池的方法?

    • apply(func[, args[, kwds]]) :調用 func 函數并傳遞 args 和 kwds,結果返回前會一直阻塞,由于這個原因,apply_async() 更適合并發執行,另外,func函數僅被 pool 中的一個進程運行。 ??
    • apply_async(func[, args[, kwds[, callback[, error_callback]]]]) :?apply() 方法的一個變體,會返回一個結果對象。?如果 callback 被指定,那么 callback 可以接收一個參數然后被調用,當結果準備好回調時會調用 callback,調用失敗時,則用 error_callback 替換 callback。 Callbacks 應被立即完成,否則處理結果的線程會被阻塞。 ??
    • close() :阻止更多的任務提交到 pool,待任務完成后,工作進程會退出。 ??
    • terminate() :不管任務是否完成,立即停止工作進程。在對 pool 對象進程垃圾回收的時候,會立即調用 terminate()。
    • join():wait 工作線程的退出,在調用 join() 前,必須調用 close() 或者?terminate()。這樣是因為被終止的進程需要被父進程調用 wait(join等價與wait),否則進程會成為僵尸進程。 ??
    • map(func, iterable[, chunksize]) ??
    • map_async(func, iterable[, chunksize[, callback[, error_callback]]])
    • imap(func, iterable[, chunksize])
    • imap_unordered(func, iterable[, chunksize]) ??
    • starmap(func, iterable[, chunksize])
    • starmap_async(func, iterable[, chunksize[, callback[, error_back]]])

    apply()

    函數原型:apply(func[, args=()[, kwds={}]])? ? 該函數用于傳遞不定參數,主進程會被阻塞直到函數執行結束(不建議使用,并且 3.x 以后不在出現)。apply 方法 示例:

    import time from multiprocessing import Pooldef f1(arg):time.sleep(0.5)print(arg)return arg + 100if __name__ == "__main__":pool = Pool(5)for i in range(1, 10):pool.apply(func=f1, args=(i,))pass

    apply_async()

    函數原型:apply_async(func[, args=()[, kwds={}[, callback=None]]])

    與 apply 用法一樣,但它是非阻塞且支持結果返回進行回調。apply_async 方法 示例:

    import time from multiprocessing import Pooldef f1(i):time.sleep(1)print(i)return i + 100def f2(arg):print(arg)if __name__ == "__main__":pool = Pool(5)for i in range(1, 10):pool.apply_async(func=f1, args=(i,), callback=f2)print('主進程等待')pool.close()pool.join()

    ??

    map()

    函數原型:map(func, iterable[, chunksize=None])

    Pool 類中的 map 方法,與內置的 map 函數用法行為基本一致,它會使進程阻塞直到返回結果。
    注意,雖然第二個參數是一個迭代器,但在實際使用中,必須在整個隊列都就緒后,程序才會運行子進程。

    import time from multiprocessing import Pooldef run(arg=None):time.sleep(1)print('arg * arg')return arg * argif __name__ == "__main__":temp_list = [1, 2, 3, 4, 5, 6]start_time = time.time()for item in temp_list:run(item)end_time = time.time()print("順序執行時間:", int(end_time - start_time))pool = Pool(5) # 創建擁有5個進程數量的進程池start_time = time.time()result = pool.map(run, temp_list) # 使進程阻塞直到返回結果pool.close() # 關閉進程池,不再接受新的進程pool.join() # 主進程阻塞等待子進程的退出end_time = time.time()print("并行執行時間:", int(end_time - start_time))print(f'map 的所有子進程返回的結果列表: {result}')

    上例是一個創建多個進程并發處理與順序執行處理同一數據,所用時間的差別。從結果可以看出,并發執行的時間明顯比順序執行要快很多,但是進程是要耗資源的,所以平時工作中,進程數也不能開太大。程序中的 result?表示全部進程執行結束后全部的返回結果集run 函數有返回值,所以一個進程對應一個返回結果,這個結果存在一個列表中,也就是一個結果堆中,實際上是用了隊列的原理,等待所有進程都執行完畢,就返回這個列表(列表的順序不定)。

    對 Pool對象調用 join() 方法會等待所有子進程執行完畢,調用 join() 之前必須先調用 close(),讓其不再接受新的 Process。

    結果中為什么還有 空行 和沒有 換行 的數據呢?其實這跟進程調度有關,當有多個進程并行執行時,每個進程得到的時間片時間不一樣,哪個進程接受哪個請求以及執行完成時間都是不定的,所以會出現輸出亂序的情況。那為什么又會有沒這行和空行的情況呢?因為有可能在執行第一個進程時,剛要打印換行符時,切換到另一個進程,這樣就極有可能兩個數字打印到同一行,并且再次切換回第一個進程時會打印一個換行符,所以就會出現空行的情況。

    示例:

    import time from multiprocessing import Pooldef run(arg=None):time.sleep(2)print(arg)if __name__ == "__main__":startTime = time.time()temp_list = [1, 2, 3, 4, 5]pool = Pool(10) # 可以同時跑10個進程pool.map(run, temp_list)pool.close()pool.join()endTime = time.time()print("time :", endTime - startTime)

    close()

    關閉進程池(pool),使其不在接受新的任務。

    terminate()

    結束工作進程,不在處理未處理的任務。

    join()

    主進程阻塞等待子進程的退出,join 方法必須在 close 或 terminate 之后使用。

    threading 和 multiprocessing

    (請盡量先閱讀?Python多線程與同步?)

    multiprocessing包是Python中的多進程管理包。與threading.Thread類似,它可以利用multiprocessing.Process對象來創建一個進程。該進程可以運行在Python程序內部編寫的函數。該Process對象與Thread對象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition類 (這些對象可以像多線程那樣,通過參數傳遞給各個進程),用以同步進程,其用法與threading包中的同名類一致。所以,multiprocessing的很大一部份與threading使用同一套API,只不過換到了多進程的情境。

    但在使用這些共享API的時候,我們要注意以下幾點:

    • 在UNIX平臺上,當某個進程終結之后,該進程需要被其父進程調用wait,否則進程成為僵尸進程(Zombie)。所以,有必要對每個Process對象調用join()方法 (實際上等同于wait)。對于多線程來說,由于只有一個進程,所以不存在此必要性。
    • multiprocessing 提供了 threading 包中沒有的 IPC ( 比如:Pipe 和 Queue ),效率上更高。應優先考慮 Pipe 和 Queue,避免使用 Lock/Event/Semaphore/Condition 等同步方式 (因為它們占據的不是用戶進程的資源 )。
    • 多進程應該避免共享資源。在多線程中,我們可以比較容易地共享資源,比如?使用全局變量或者傳遞參數。在多進程情況下,由于每個進程有自己獨立的內存空間,以上方法并不合適。此時我們可以通過?共享內存?和 Manager 的方法來共享資源。但這樣做提高了程序的復雜度,并因為同步的需要而降低了程序的效率。

    Process.PID中保存有PID,如果進程還沒有start(),則PID為None。

    進程 同步

    我們可以從下面的程序中看到 Thread 對象和 Process對象 在使用上的相似性與結果上的不同。各個線程和進程都做一件事:打印PID。但問題是,所有的任務在打印的時候都會向同一個標準輸出(stdout)輸出。這樣輸出的字符會混合在一起,無法閱讀。使用 Lock 同步,在一個任務輸出完成之后,再允許另一個任務輸出,可以避免多個任務同時向終端輸出。

    import os import threading import multiprocessing# worker function def worker(sign=None, t_lock=None):t_lock.acquire()print(sign, os.getpid())t_lock.release()# Main print('Main:', os.getpid())# Multi-thread record = [] threading_lock = threading.Lock() for i in range(5):thread = threading.Thread(target=worker, args=('thread', threading_lock))thread.start()record.append(thread)for thread in record:thread.join()# Multi-process record = [] process_lock = multiprocessing.Lock() for i in range(5):process = multiprocessing.Process(target=worker, args=('process', process_lock))process.start()record.append(process)for process in record:process.join()

    所有 Thread 的 PID 都與主程序相同,而每個 Process 都有一個不同的 PID。

    Pipe ( 管道 ) 和??mutiprocessing.Queue( 隊列 )

    正如我們在?Linux多線程?中介紹的管道PIPE和消息隊列 message queue,multiprocessing 包中有Pipe類?和 Queue類 來分別支持這兩種 IPC 機制。Pipe 和 Queue 可以用來傳送常見的對象。

    Pipe 可以是單向(half-duplex),也可以是雙向(duplex)。

    通過mutiprocessing.Pipe(duplex=False) 創建單向管道 (默認為雙向)。一個進程從 PIPE 一端輸入對象,然后被 PIPE 另一端的進程接收,單向管道只允許管道一端的進程輸入,而雙向管道則允許從兩端輸入。下面的程序展示了 Pipe 的使用:(?這里的 Pipe 是雙向的。 )

    import multiprocessing as muldef proc1(pipe=None):pipe.send('hello')print('proc1 rec:', pipe.recv())def proc2(pipe=None):print('proc2 rec:', pipe.recv())pipe.send('hello, too')# Build a pipe pipe = mul.Pipe()# Pass an end of the pipe to process 1 p1 = mul.Process(target=proc1, args=(pipe[0],)) # Pass the other end of the pipe to process 2 p2 = mul.Process(target=proc2, args=(pipe[1],)) p1.start() p2.start() p1.join() p2.join()

    Pipe 對象建立的時候,返回一個含有兩個元素的表,每個元素代表 Pipe 的一端(Connection對象)。對 Pipe 的某一端調用 send() 方法來傳送對象,在另一端使用 recv() 來接收。

    mutiprocessing.Queue

    Queue 與 Pipe 相類似,都是先進先出的結構。但 Queue 允許多個進程放入,多個進程從隊列取出對象。Queue 使用 mutiprocessing.Queue(maxsize) 創建,maxsize 表示隊列中可以存放對象的最大數量。下面的程序展示了 Queue 的使用:

    import os import multiprocessing import time# input worker def input_queue(queue=None):info = str(os.getpid()) + '(put):' + str(time.time())queue.put(info)# output worker def output_queue(queue=None, lock):info = queue.get()lock.acquire()print(str(os.getpid()) + '(get):' + info)lock.release()# =================== # Main record1 = [] # store input processes record2 = [] # store output processes lock = multiprocessing.Lock() # To prevent messy print queue = multiprocessing.Queue(3)# input processes for i in range(10):process = multiprocessing.Process(target=input_queue, args=(queue,))process.start()record1.append(process)# output processes for i in range(10):process = multiprocessing.Process(target=output_queue, args=(queue, lock))process.start()record2.append(process)for p in record1:p.join()queue.close() # No more object will come, close the queuefor p in record2:p.join()

    一些進程使用 put() 在 Queue 中放入字符串,這個字符串中包含 PID 和時間。另一些進程從Queue 中取出,并打印自己的 PID 以及 get() 的字符串

    共享資源

    在Python多進程初步已經提到,我們應該盡量避免多進程共享資源。多進程共享資源必然會帶來進程間相互競爭。而這種競爭又會造成race condition,我們的結果有可能被競爭的不確定性所影響。但如果需要,我們依然可以通過 共享內存Manager對象 這么做。

    共享內存

    在?Linux進程間通信?中,已經說過共享內存(shared memory) 的原理,這里給出用 Python 實現的例子:

    import multiprocessingdef f(n, a):n.value = 3.14a[0] = 5num = multiprocessing.Value('d', 0.0) arr = multiprocessing.Array('i', range(10))p = multiprocessing.Process(target=f, args=(num, arr)) p.start() p.join()print(num.value) print(arr[:])

    這里我們實際上只有主進程和Process對象代表的進程。我們在主進程的內存空間中創建共享的內存,也就是Value和Array兩個對象。對象Value被設置成為雙精度數(d), 并初始化為0.0。而Array則類似于C中的數組,有固定的類型(i, 也就是整數)。在Process進程中,我們修改了Value和Array對象。回到主程序,打印出結果,主程序也看到了兩個對象的改變,說明資源確實在兩個進程之間共享。

    Manager

    Manager 對象類似于 服務器 與 客戶 之間的通信 (server-client),與我們在 Internet 上的活動很類似。我們用一個進程作為服務器,建立 Manager 來真正存放資源。其它的進程可以通過參數傳遞或者根據地址來訪問Manager,建立連接后,操作服務器上的資源。在防火墻允許的情況下,我們完全可以將Manager運用于多計算機,從而模仿了一個真實的網絡情境。下面的例子中,我們對Manager的使用類似于shared memory,但可以共享更豐富的對象類型。

    import multiprocessingdef f(x, arr, l):x.value = 3.14arr[0] = 5l.append('Hello')server = multiprocessing.Manager() x = server.Value('d', 0.0) arr = server.Array('i', range(10)) l = server.list()proc = multiprocessing.Process(target=f, args=(x, arr, l)) proc.start() proc.join()print(x.value) print(arr) print(l)

    Manager 利用 list() 方法提供了表的共享方式。實際上你可以利用 dict() 來共享詞典,Lock() 來共享 threading.Lock ( 注意,我們共享的是 threading.Lock,而不是進程的 mutiprocessing.Lock。后者本身已經實現了進程共享) 等。 這樣 Manager 就允許我們共享更多樣的對象。

    • Python多進程通信Queue、Pipe、Value、Array實例
    • Python中使用Queue和Condition進行線程同步的方法
    • Python Queue模塊詳解
    • python基于queue和threading實現多線程下載實例
    • 淺析Python中的多進程與多線程的使用
    • Python多進程同步Lock、Semaphore、Event實例
    • python 多進程通信模塊的簡單實現
    • 探究Python多進程編程下線程之間變量的共享問題
    • python多進程操作實例
    • 簡單談談python中的Queue與多進程

    總結

    以上是生活随笔為你收集整理的Python 多进程 multiprocessing 使用示例的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。