日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

Python线程同步机制: Locks, RLocks, Semaphores, Condition

發布時間:2025/6/15 python 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Python线程同步机制: Locks, RLocks, Semaphores, Condition 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

為什么80%的碼農都做不了架構師?>>> ??

翻譯自Laurent Luce的博客
原文名稱:Python threads synchronization: Locks, RLocks, Semaphores, Conditions, Events and Queues
原文連接:http://www.laurentluce.com/posts/python-threads-synchronization-locks-rlocks-semaphores-conditions-events-and-queues/

本文詳細地闡述了Python線程同步機制。你將學習到以下有關Python線程同步機制:Lock,RLock,Semaphore,Condition,Event和Queue,還有Python的內部是如何實現這些機制的。 本文給出的程序的源代碼可以在github上找到。

首先讓我們來看一個沒有使用線程同步的簡單程序。

線程(Threading)

我們希望編程一個從一些URL中獲得內容并且將內容寫入文件的程序,完成這個程序可以不使用線程,為了加快獲取的速度,我們使用2個線程,每個線程處理一半的URL。

注:完成這個程序的最好方式是使用一個URL隊列,但是以下面的例子開始我的講解更加合適。

類FetchUrls是threading.Thread的子類,他擁有一個URL列表和一個寫URL內容的文件對象。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 class FetchUrls(threading.Thread): """ 下載URL內容的線程 """ def __init__(self, urls, output): """ 構造器 @param urls 需要下載的URL列表 @param output 寫URL內容的輸出文件 """ threading.Thread.__init__(self) self.urls = urls self.output = output def run(self): """ 實現父類Thread的run方法,打開URL,并且一個一個的下載URL的內容 """ while self.urls: url = self.urls.pop() req = urllib2.Request(url) try: d = urllib2.urlopen(req) except urllib2.URLError, e: print 'URL %s failed: %s' % (url, e.reason) self.output.write(d.read()) print 'write done by %s' % self.name print 'URL %s fetched by %s' % (url, self.name)

main函數啟動了兩個線程,之后讓他們下載URL內容。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 def main(): # URL列表1 urls1 = ['http://www.google.com', 'http://www.facebook.com'] # URL列表2 urls2 = ['http://www.yahoo.com', 'http://www.youtube.com'] f = open('output.txt', 'w+') t1 = FetchUrls(urls1, f) t2 = FetchUrls(urls2, f) t1.start() t2.start() t1.join() t2.join() f.close() if __name__ == '__main__': main()

上面的程序將出現兩個線程同時寫一個文件的情況,導致文件一團亂碼。我們需要找到一種在給定的時間里只有一個線程寫文件的方法。實現的方法就是使用像鎖(Locks)這樣的線程同步機制。

鎖(Lock)

鎖有兩種狀態:被鎖(locked)和沒有被鎖(unlocked)。擁有acquire()和release()兩種方法,并且遵循一下的規則:

  • 如果一個鎖的狀態是unlocked,調用acquire()方法改變它的狀態為locked;
  • 如果一個鎖的狀態是locked,acquire()方法將會阻塞,直到另一個線程調用release()方法釋放了鎖;
  • 如果一個鎖的狀態是unlocked調用release()會拋出RuntimeError異常;
  • 如果一個鎖的狀態是locked,調用release()方法改變它的狀態為unlocked。

解決上面兩個線程同時寫一個文件的問題的方法就是:我們給類FetchUrls的構造器中傳入一個鎖(lock),使用這個鎖來保護文件操作,實現在給定的時間只有一個線程寫文件。下面的代碼只顯示了關于lock部分的修改。完整的源碼可以在threads/lock.py中找到。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 class FetchUrls(threading.Thread): ... def __init__(self, urls, output, lock): ... self.lock = lock #傳入的lock對象 def run(self): ... while self.urls: ... self.lock.acquire() #獲得lock對象,lock狀態變為locked,并且阻塞其他線程獲取lock對象(寫文件的權利) print 'lock acquired by %s' % self.name self.output.write(d.read()) print 'write done by %s' % self.name print 'lock released by %s' % self.name self.lock.release() #釋放lock對象,lock狀態變為unlocked,其他的線程可以重新獲取lock對象 ... def main(): ... lock = threading.Lock() ... t1 = FetchUrls(urls1, f, lock) t2 = FetchUrls(urls2, f, lock) ...

以下是程序的輸出:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 $ python locks.py lock acquired by Thread-2 write done by Thread-2 lock released by Thread-2 URL http://www.youtube.com fetched by Thread-2 lock acquired by Thread-1 write done by Thread-1 lock released by Thread-1 URL http://www.facebook.com fetched by Thread-1 lock acquired by Thread-2 write done by Thread-2 lock released by Thread-2 URL http://www.yahoo.com fetched by Thread-2 lock acquired by Thread-1 write done by Thread-1 lock released by Thread-1 URL http://www.google.com fetched by Thread-1

從上面的輸出我們可以看出,寫文件的操作被鎖保護,沒有出現兩個線程同時寫一個文件的現象。

下面我們看一下Python內部是如何實現鎖(Lock)的。我正在使用的Python版本是Linux操作系統上的Python 2.6.6。

threading模塊的Lock()方法就是thread.allocate_lock,代碼可以在Lib/threading.py中找到。

1 2 Lock = _allocate_lock _allocate_lock = thread.allocate_lock

C的實現在Python/thread_pthread.h中。程序假定你的系統支持POSIX信號量(semaphores)。sem_init()初始化鎖(Lock)所在地址的信號量。初始的信號量值是1,意味著鎖沒有被鎖(unlocked)。信號量將在處理器的不同線程之間共享。

1 2 3 4 5 6 7 8 9 10 11 12 13 PyThread_type_lock PyThread_allocate_lock(void) { ... lock = (sem_t *)malloc(sizeof(sem_t)); if (lock) { status = sem_init(lock,0,1); CHECK_STATUS("sem_init"); .... } ... }

當acquire()方法被調用時,下面的C代碼將被執行。默認的waitflag值是1,表示調用將被被阻塞直到鎖被釋放。sem_wait()方法減少信號量的值或者被阻塞直到信號量大于零。

1 2 3 4 5 6 7 8 9 10 11 12 int PyThread_acquire_lock(PyThread_type_lock lock, int waitflag) { ... do { if (waitflag) status = fix_status(sem_wait(thelock)); else status = fix_status(sem_trywait(thelock)); } while (status == EINTR); /* Retry if interrupted by a signal */ .... }

當release()方法被調用時,下面的C代碼將被執行。sem_post()方法增加信號量。

1 2 3 4 5 6 7 void PyThread_release_lock(PyThread_type_lock lock) { ... status = sem_post(thelock); ... }

可以將鎖(Lock)與“with”語句一起使用,鎖可以作為上下文管理器(context manager)。使用“with”語句的好處是:當程序執行到“with”語句時,acquire()方法將被調用,當程序執行完“with”語句時,release()方法會被調用(譯注:這樣我們就不用顯示地調用acquire()和release()方法,而是由“with”語句根據上下文來管理鎖的獲取和釋放。)下面我們用“with”語句重寫FetchUrls類。

1 2 3 4 5 6 7 8 9 10 11 12 class FetchUrls(threading.Thread): ... def run(self): ... while self.urls: ... with self.lock: #使用“with”語句管理鎖的獲取和釋放 print 'lock acquired by %s' % self.name self.output.write(d.read()) print 'write done by %s' % self.name print 'lock released by %s' % self.name ...

可重入鎖(RLock)

RLock是可重入鎖(reentrant lock),acquire()能夠不被阻塞的被同一個線程調用多次。要注意的是release()需要調用與acquire()相同的次數才能釋放鎖。

使用Lock,下面的代碼第二次調用acquire()時將被阻塞:

1 2 3 lock = threading.Lock() lock.acquire() lock.acquire()

如果你使用的是RLock,下面的代碼第二次調用acquire()不會被阻塞:

1 2 3 rlock = threading.RLock() rlock.acquire() rlock.acquire()

RLock使用的同樣是thread.allocate_lock(),不同的是他跟蹤宿主線程(the owner thread)來實現可重入的特性。下面是RLock的acquire()實現。如果調用acquire()的線程是資源的所有者,記錄調用acquire()次數的計數器就會加1。如果不是,就將試圖去獲取鎖。線程第一次獲得鎖時,鎖的擁有者將會被保存,同時計數器初始化為1。

1 2 3 4 5 6 7 8 9 10 11 12 13 def acquire(self, blocking=1): me = _get_ident() if self.__owner == me: self.__count = self.__count + 1 ... return 1 rc = self.__block.acquire(blocking) if rc: self.__owner = me self.__count = 1 ... ... return rc

下面我們看一下可重入鎖(RLock)的release()方法。首先它會去確認調用者是否是鎖的擁有者。如果是的話,計數器減1;如果計數器為0,那么鎖將會被釋放,這時其他線程就可以去獲取鎖了。

1 2 3 4 5 6 7 8 9 def release(self): if self.__owner != _get_ident(): raise RuntimeError("cannot release un-acquired lock") self.__count = count = self.__count - 1 if not count: self.__owner = None self.__block.release() ... ...

條件(Condition)

條件同步機制是指:一個線程等待特定條件,而另一個線程發出特定條件滿足的信號。 解釋條件同步機制的一個很好的例子就是生產者/消費者(producer/consumer)模型。生產者隨機的往列表中“生產”一個隨機整數,而消費者從列表中“消費”整數。完整的源碼可以在threads/condition.py中找到

在producer類中,producer獲得鎖,生產一個隨機整數,通知消費者有了可用的“商品”,并且釋放鎖。producer無限地向列表中添加整數,同時在兩個添加操作中間隨機的停頓一會兒。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 class Producer(threading.Thread): """ 向列表中生產隨機整數 """ def __init__(self, integers, condition): """ 構造器 @param integers 整數列表 @param condition 條件同步對象 """ threading.Thread.__init__(self) self.integers = integers self.condition = condition def run(self): """ 實現Thread的run方法。在隨機時間向列表中添加一個隨機整數 """ while True: integer = random.randint(0, 256) self.condition.acquire() #獲取條件鎖 print 'condition acquired by %s' % self.name self.integers.append(integer) print '%d appended to list by %s' % (integer, self.name) print 'condition notified by %s' % self.name self.condition.notify() #喚醒消費者線程 print 'condition released by %s' % self.name self.condition.release() #釋放條件鎖 time.sleep(1) #暫停1秒鐘

下面是消費者(consumer)類。它獲取鎖,檢查列表中是否有整數,如果沒有,等待生產者的通知。當消費者獲取整數之后,釋放鎖。
注意在wait()方法中會釋放鎖,這樣生產者就能獲得資源并且生產“商品”。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 class Consumer(threading.Thread): """ 從列表中消費整數 """ def __init__(self, integers, condition): """ 構造器 @param integers 整數列表 @param condition 條件同步對象 """ threading.Thread.__init__(self) self.integers = integers self.condition = condition def run(self): """ 實現Thread的run()方法,從列表中消費整數 """ while True: self.condition.acquire() #獲取條件鎖 print 'condition acquired by %s' % self.name while True: if self.integers: #判斷是否有整數 integer = self.integers.pop() print '%d popped from list by %s' % (integer, self.name) break print 'condition wait by %s' % self.name self.condition.wait() #等待商品,并且釋放資源 print 'condition released by %s' % self.name self.condition.release() #最后釋放條件鎖

下面我們編寫main方法,創建兩個線程:

1 2 3 4 5 6 7 8 9 10 11 12 def main(): integers = [] condition = threading.Condition() t1 = Producer(integers, condition) t2 = Consumer(integers, condition) t1.start() t2.start() t1.join() t2.join() if __name__ == '__main__': main()

下面是程序的輸出:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 $ python condition.py condition acquired by Thread-1 159 appended to list by Thread-1 condition notified by Thread-1 condition released by Thread-1 condition acquired by Thread-2 159 popped from list by Thread-2 condition released by Thread-2 condition acquired by Thread-2 condition wait by Thread-2 condition acquired by Thread-1 116 appended to list by Thread-1 condition notified by Thread-1 condition released by Thread-1 116 popped from list by Thread-2 condition released by Thread-2 condition acquired by Thread-2 condition wait by Thread-2

Thread-1添加159到列表中,通知消費者同時釋放鎖,Thread-2獲得鎖,取回159,并且釋放鎖。此時因為執行time.sleep(1),生產者正在睡眠,當消費者再次試圖獲取整數時,列表中并沒有整數,這時消費者進入等待狀態,等待生產者的通知。當wait()被調用時,它會釋放資源,從而生產者能夠利用資源生產整數。

下面我們看一下Python內部是如何實現條件同步機制的。如果用戶沒有傳入鎖(lock)對象,condition類的構造器創建一個可重入鎖(RLock),這個鎖將會在調用acquire()和release()時使用。

1 2 3 4 5 6 7 class _Condition(_Verbose): def __init__(self, lock=None, verbose=None): _Verbose.__init__(self, verbose) if lock is None: lock = RLock() self.__lock = lock

接下來是wait()方法。為了簡化說明,我們假定在調用wait()方法時不使用timeout參數。wait()方法創建了一個名為waiter的鎖,并且設置鎖的狀態為locked。這個waiter鎖用于線程間的通訊,這樣生產者(在生產完整數之后)就可以通知消費者釋放waiter()鎖。鎖對象將會被添加到等待者列表,并且在調用waiter.acquire()時被阻塞。一開始condition鎖的狀態被保存,并且在wait()結束時被恢復。

1 2 3 4 5 6 7 8 9 10 11 12 13 def wait(self, timeout=None): ... waiter = _allocate_lock() waiter.acquire() self.__waiters.append(waiter) saved_state = self._release_save() try: # 無論如何恢復狀態 (例如, KeyboardInterrupt) if timeout is None: waiter.acquire() ... ... finally: self._acquire_restore(saved_state)

當生產者調用notify()方法時,notify()釋放waiter鎖,喚醒被阻塞的消費者。

1 2 3 4 5 6 7 8 9 10 11 def notify(self, n=1): ... __waiters = self.__waiters waiters = __waiters[:n] ... for waiter in waiters: waiter.release() try: __waiters.remove(waiter) except ValueError: pass

同樣Condition對象也可以和“with”語句一起使用,這樣“with”語句上下文會幫我們調用acquire()和release()方法。下面的代碼使用“with”語句改寫了生產者和消費者類。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 class Producer(threading.Thread): ... def run(self): while True: integer = random.randint(0, 256) with self.condition: print 'condition acquired by %s' % self.name self.integers.append(integer) print '%d appended to list by %s' % (integer, self.name) print 'condition notified by %s' % self.name self.condition.notify() print 'condition released by %s' % self.name time.sleep(1) class Consumer(threading.Thread): ... def run(self): while True: with self.condition: print 'condition acquired by %s' % self.name while True: if self.integers: integer = self.integers.pop() print '%d popped from list by %s' % (integer, self.name) break print 'condition wait by %s' % self.name self.condition.wait() print 'condition released by %s' % self.name

信號量(Semaphore)

信號量同步基于內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器為0時,acquire()調用被阻塞。這是迪科斯徹(Dijkstra)信號量概念P()和V()的Python實現。信號量同步機制適用于訪問像服務器這樣的有限資源。

信號量同步的例子:

1 2 3 4 5 semaphore = threading.Semaphore() semaphore.acquire() # 使用共享資源 ... semaphore.release()

讓我們看一下信號量同步在Python內部是如何實現的。構造器使用參數value來表示計數器的初始值,默認值為1。一個條件鎖實例用于保護計數器,同時當信號量被釋放時通知其他線程。

1 2 3 4 5 6 7 class _Semaphore(_Verbose): ... def __init__(self, value=1, verbose=None): _Verbose.__init__(self, verbose) self.__cond = Condition(Lock()) self.__value = value ...

acquire()方法。如果信號量為0,線程被條件鎖的wait()方法阻塞,直到被其他線程喚醒;如果計數器大于0,調用acquire()使計數器減1。

1 2 3 4 5 6 7 8 9 10 11 def acquire(self, blocking=1): rc = False self.__cond.acquire() while self.__value == 0: ... self.__cond.wait() else: self.__value = self.__value - 1 rc = True self.__cond.release() return rc

信號量類的release()方法增加計數器的值并且喚醒其他線程。

1 2 3 4 5 def release(self): self.__cond.acquire() self.__value = self.__value + 1 self.__cond.notify() self.__cond.release()

還有一個“有限”(bounded)信號量類,可以確保release()方法的調用次數不能超過給定的初始信號量數值(value參數),下面是“有限”信號量類的Python代碼:

1 2 3 4 5 6 7 8 9 10 class _BoundedSemaphore(_Semaphore): """檢查release()的調用次數是否小于等于acquire()次數""" def __init__(self, value=1, verbose=None): _Semaphore.__init__(self, value, verbose) self._initial_value = value def release(self): if self._Semaphore__value >= self._initial_value: raise ValueError, "Semaphore released too many times" return _Semaphore.release(self)

同樣信號量(Semaphore)對象可以和“with”一起使用:

1 2 3 4 semaphore = threading.Semaphore() with semaphore: # 使用共享資源 ...

事件(Event)

基于事件的同步是指:一個線程發送/傳遞事件,另外的線程等待事件的觸發。 讓我們再來看看前面的生產者和消費者的例子,現在我們把它轉換成使用事件同步而不是條件同步。完整的源碼可以在threads/event.py里面找到。

首先是生產者類,我們傳入一個Event實例給構造器而不是Condition實例。一旦整數被添加進列表,事件(event)被設置和發送去喚醒消費者。注意事件(event)實例默認是被發送的。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 class Producer(threading.Thread): """ 向列表中生產隨機整數 """ def __init__(self, integers, event): """ 構造器 @param integers 整數列表 @param event 事件同步對象 """ threading.Thread.__init__(self) self.integers = integers self.event = event def run(self): """ 實現Thread的run方法。在隨機時間向列表中添加一個隨機整數 """ while True: integer = random.randint(0, 256) self.integers.append(integer) print '%d appended to list by %s' % (integer, self.name) print 'event set by %s' % self.name self.event.set() #設置事件 self.event.clear() #發送事件 print 'event cleared by %s' % self.name time.sleep(1)

同樣我們傳入一個Event實例給消費者的構造器,消費者阻塞在wait()方法,等待事件被觸發,即有可供消費的整數。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 class Consumer(threading.Thread): """ 從列表中消費整數 """ def __init__(self, integers, event): """ 構造器 @param integers 整數列表 @param event 事件同步對象 """ threading.Thread.__init__(self) self.integers = integers self.event = event def run(self): """ 實現Thread的run()方法,從列表中消費整數 """ while True: self.event.wait() #等待事件被觸發 try: integer = self.integers.pop() print '%d popped from list by %s' % (integer, self.name) except IndexError: # catch pop on empty list time.sleep(1)

下面是程序的輸出,Thread-1添加124到整數列表中,然后設置事件并且喚醒消費者。消費者從wait()方法中喚醒,在列表中獲取到整數。

1 2 3 4 5 6 7 8 9 $ python event.py 124 appended to list by Thread-1 event set by Thread-1 event cleared by Thread-1 124 popped from list by Thread-2 223 appended to list by Thread-1 event set by Thread-1 event cleared by Thread-1 223 popped from list by Thread-2

事件鎖的Python內部實現,首先是Event鎖的構造器。構造器中創建了一個條件(Condition)鎖,來保護事件標志(event flag),同事喚醒其他線程當事件被設置時。

1 2 3 4 5 class _Event(_Verbose): def __init__(self, verbose=None): _Verbose.__init__(self, verbose) self.__cond = Condition(Lock()) self.__flag = False

接下來是set()方法,它設置事件標志為True,并且喚醒其他線程。條件鎖對象保護程序修改事件標志狀態的關鍵部分。

1 2 3 4 5 6 7 def set(self): self.__cond.acquire() try: self.__flag = True self.__cond.notify_all() finally: self.__cond.release()

而clear()方法正好相反,它設置時間標志為False。

1 2 3 4 5 6 def clear(self): self.__cond.acquire() try: self.__flag = False finally: self.__cond.release()

最后,wait()方法將阻塞直到調用了set()方法,當事件標志為True時,wait()方法就什么也不做。

1 2 3 4 5 6 7 def wait(self, timeout=None): self.__cond.acquire() try: if not self.__flag: #如果flag不為真 self.__cond.wait(timeout) finally: self.__cond.release()

隊列(Queue)

隊列是一個非常好的線程同步機制,使用隊列我們不用關心鎖,隊列會為我們處理鎖的問題。 隊列(Queue)有以下4個用戶感興趣的方法:

  • put:?向隊列中添加一個項;
  • get:?從隊列中刪除并返回一個項;
  • task_done:?當某一項任務完成時調用;
  • join:?阻塞知道所有的項目都被處理完。

下面我們將上面的生產者/消費者的例子轉換成使用隊列。源代碼可以在threads/queue.py中找到。

首先是生產者類,我們不需要傳入一個整數列表,因為我們使用隊列就可以存儲生成的整數。生產者線程在一個無限循環中生成整數并將生成的整數添加到隊列中。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 class Producer(threading.Thread): """ 向隊列中生產隨機整數 """ def __init__(self, queue): """ 構造器 @param integers 整數列表 #譯注:不需要這個參數 @param queue 隊列同步對象 """ threading.Thread.__init__(self) self.queue = queue def run(self): """ 實現Thread的run方法。在隨機時間向隊列中添加一個隨機整數 """ while True: integer = random.randint(0, 256) self.queue.put(integer) #將生成的整數添加到隊列 print '%d put to queue by %s' % (integer, self.name) time.sleep(1)

下面是消費者類。線程從隊列中獲取整數,并且在任務完成時調用task_done()方法。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 class Consumer(threading.Thread): """ 從隊列中消費整數 """ def __init__(self, queue): """ 構造器 @param integers 整數列表 #譯注:不需要這個參數 @param queue 隊列同步對象 """ threading.Thread.__init__(self) self.queue = queue def run(self): """ 實現Thread的run()方法,從隊列中消費整數 """ while True: integer = self.queue.get() print '%d popped from list by %s' % (integer, self.name) self.queue.task_done()

以下是程序的輸出:

1 2 3 4 5 $ python queue.py 61 put to queue by Thread-1 61 popped from list by Thread-2 6 put to queue by Thread-1 6 popped from list by Thread-2

隊列同步的最大好處就是隊列幫我們處理了鎖?,F在讓我們去看看在Python內部是如何實現隊列同步機制的。

隊列(Queue)構造器創建一個鎖,保護隊列元素的添加和刪除操作。同時創建了一些條件鎖對象處理隊列事件,比如隊列不空事件(削除get()的阻塞),隊列不滿事件(削除put()的阻塞)和所有項目都被處理完事件(削除join()的阻塞)。

1 2 3 4 5 6 7 8 class Queue: def __init__(self, maxsize=0): ... self.mutex = threading.Lock() self.not_empty = threading.Condition(self.mutex) self.not_full = threading.Condition(self.mutex) self.all_tasks_done = threading.Condition(self.mutex) self.unfinished_tasks = 0

put()方法向隊列中添加一個項,或者阻塞如果隊列已滿。這時隊列非空,它喚醒阻塞在get()方法中的線程。更多關于Condition鎖的內容請查看上面的講解。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 def put(self, item, block=True, timeout=None): ... self.not_full.acquire() try: if self.maxsize > 0: ... elif timeout is None: while self._qsize() == self.maxsize: self.not_full.wait() self._put(item) self.unfinished_tasks += 1 self.not_empty.notify() finally: self.not_full.release()

get()方法從隊列中獲得并刪除一個項,或者阻塞當隊列為空時。這時隊列不滿,他喚醒阻塞在put()方法中的線程。

1 2 3 4 5 6 7 8 9 10 11 12 13 def get(self, block=True, timeout=None): ... self.not_empty.acquire() try: ... elif timeout is None: while not self._qsize(): self.not_empty.wait() item = self._get() self.not_full.notify() return item finally: self.not_empty.release()

當調用task_done()方法時,未完成任務的數量減1。如果未完成任務的數量為0,線程等待隊列完成join()方法。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 def task_done(self): self.all_tasks_done.acquire() try: unfinished = self.unfinished_tasks - 1 if unfinished <= 0: if unfinished < 0: raise ValueError('task_done() called too many times') self.all_tasks_done.notify_all() self.unfinished_tasks = unfinished finally: self.all_tasks_done.release() def join(self): self.all_tasks_done.acquire() try: while self.unfinished_tasks: self.all_tasks_done.wait() finally: self.all_tasks_done.release()

本文到此結束,希望您喜歡這篇文章。歡迎您的留言和反饋。

---EOF---

轉載于:https://my.oschina.net/sukai/blog/647517

總結

以上是生活随笔為你收集整理的Python线程同步机制: Locks, RLocks, Semaphores, Condition的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。