日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > python >内容正文

python

Python | threading03 - 使用条件对象,实现线程间的同步

發(fā)布時間:2025/3/15 python 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Python | threading03 - 使用条件对象,实现线程间的同步 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

文章目錄

  • 一、前言
  • 二、生產(chǎn)者-消費者的模型
    • 2.1、代碼
    • 2.2、運行
    • 2.3、wait( )方法會將互斥鎖釋放
  • 三、條件同步 - threading.Condition( )
    • 3.1、相關(guān)API
    • 3.2、acquire( )
    • 3.2、release( )
    • 3.3、wait( )
    • 3.4、notify(n=1)
    • 3.5、notify_all( )
    • 3.6、wait_for(predicate,timeout=None)
  • 四、with語句讓代碼更加簡潔
    • 4.1、代碼
    • 4.2、運行結(jié)果

一、前言


上一篇博文學(xué)習(xí)了線程的互斥鎖(Python | threading02 - 互斥鎖解決多個線程之間隨機調(diào)度,造成“線程不安全”的問題 ),線程的互斥鎖的目的是解決“線程不安全”的問題。值得注意的是互斥鎖解決了”線程不安全“問題,但產(chǎn)生了另外一些問題,如“死鎖”,“加鎖不合理導(dǎo)致程序的效率低下”等。

今天繼續(xù)學(xué)習(xí)python線程的另外一個工具 - 條件對象,條件對象用于實現(xiàn)線程間的同步。

官方文檔:

threading - Thread-based parallelism - Python 3.10.1 documentation

二、生產(chǎn)者-消費者的模型


2.1、代碼

代碼的目的:

  • customer每隔2秒將item減少1(可以理解為消耗一個item)。當(dāng)發(fā)現(xiàn)item=0時(即沒有item可以消耗時,趕緊通知producer生產(chǎn)item)。
  • producer每隔1秒將item增加1。當(dāng)item等于5時(可以理解為沒有地方存放item了),馬上停止生產(chǎn)item。
# python3.9 import threading import timec = threading.Condition() # 聲明一個條件對象 item = 0 # 用于計數(shù)def producer():"""子線程-生產(chǎn)者"""global c # c是全局變量global item # item是全局變量while True:time.sleep(1) # 線程休眠1秒c.acquire() # 獲取鎖# 如果item等于5,就進(jìn)行阻塞if item == 5:print("子線程%s發(fā)現(xiàn)item = %d,time = %s,將進(jìn)入阻塞態(tài)" % (threading.current_thread().getName(),item,time.perf_counter()))c.wait() # 進(jìn)入阻塞態(tài),并釋放鎖,,等待其他線程notifyitem += 1 # 每一秒增加1print("子線程%s將item增加1,item = %d,time = %s" % (threading.current_thread().getName(),item,time.perf_counter()))c.notify() # 通知customer解除阻塞態(tài)c.release() # 釋放鎖def customer():"""子線程-消費者"""global c # c是全局變量global item # item是全局變量while True:time.sleep(2) # 線程休眠2秒c.acquire() #獲取鎖if item == 0:print("子線程%s發(fā)現(xiàn)item = %d,time = %s,將進(jìn)入阻塞態(tài)" % (threading.current_thread().getName(),item,time.perf_counter()))print("通知producer恢復(fù)運行(解除阻塞態(tài))")c.notify() # 通知producer線程解除阻塞態(tài)c.wait() # 進(jìn)入阻塞態(tài),并釋放鎖。等待其他線程notifyitem -= 1print("子線程%s將item減少1,item = %d,time = %s" % (threading.current_thread().getName(),item,time.perf_counter()))c.release() # 釋放鎖def main():t1 = threading.Thread(target=customer,name="thread_customer",daemon=True) # 創(chuàng)建producer子線程t2 = threading.Thread(target=producer,name="thread_producer",daemon=True) # 創(chuàng)建customer子線程t1.start() # 啟動customer線程 t2.start() # 啟動producer線程t1.join() # 子線程customer是無限循環(huán)的線程,所以主線程需要等待它運行結(jié)束t2.join() # 子線程producer是無限循環(huán)的線程,所以主線程需要等待它運行結(jié)束print("主線程運行結(jié)束!")if __name__ == "__main__":main()

2.2、運行

運行的結(jié)果:

2.3、wait( )方法會將互斥鎖釋放

剛開始我就犯了一個錯誤,c.wait( )之后緊跟著c.release( )。根據(jù)《Python并行編程》的解釋,wait( )方法會將互斥鎖釋放回去。

三、條件同步 - threading.Condition( )


3.1、相關(guān)API

  • acquire() — 線程鎖,注意線程條件變量Condition中的所有相關(guān)函數(shù)使用必須在acquire() /release() 內(nèi)部操作;
  • release() — 釋放鎖,注意線程條件變量Condition中的所有相關(guān)函數(shù)使用必須在acquire() /release() 內(nèi)部操作;
  • wait(timeout) — 線程掛起(阻塞狀態(tài)),直到收到一個notify通知或者超時才會被喚醒繼續(xù)運行(超時參數(shù)默認(rèn)不設(shè)置,可選填,類型是浮點數(shù),單位是秒)。wait()必須在已獲得Lock前提下才能調(diào)用,否則會觸發(fā)RuntimeError;
  • notify(n=1) — 通知其他線程,那些掛起的線程接到這個通知之后會開始運行,缺省參數(shù),默認(rèn)是通知一個正等待通知的線程,最多則喚醒n個等待的線程。notify()必須在已獲得Lock前提下才能調(diào)用,否則會觸發(fā)RuntimeError,notify()不會主動釋放Lock;
  • notifyAll() — 如果wait狀態(tài)線程比較多,notifyAll的作用就是通知所有線程;
  • wait_for(predicate,timeout=None) — 這個實用方法會重復(fù)地調(diào)用wait( ),直到滿足判斷式或者發(fā)生超時。

3.2、acquire( )

從代碼可以看到,使用c.notify( )與c.wait( )之前需要先調(diào)用c.acquire( )獲取互斥鎖,否則會拋出異常。

3.2、release( )

沒什么好說的,就是釋放互斥鎖。

3.3、wait( )

讓調(diào)用wait( )的線程進(jìn)入阻塞態(tài),等待其他線程調(diào)用notify( )來進(jìn)行同步,可以設(shè)置timeout。

3.4、notify(n=1)

讓其他調(diào)用wait( )的線程從阻塞態(tài)恢復(fù)至運行態(tài),繼續(xù)運行后面的代碼。

3.5、notify_all( )

跟notify( )類似,notify_all( )不止喚醒一個線程,而是喚醒所有因為wait( )而進(jìn)入阻塞態(tài)的線程。

3.6、wait_for(predicate,timeout=None)

這個wait_for相當(dāng)于以下代碼(來自官方的解釋):

while not predicate():c.wait()

使用代碼測試一下wait_for( )的用法。

# python3.9 import threading import timec = threading.Condition() # 聲明一個條件對象 item = 0 # 用于計數(shù)def customer_Wait_For():""""""global itemif item == 0:print("子線程%s發(fā)現(xiàn)item = %d,time = %s,將進(jìn)入阻塞態(tài)" % (threading.current_thread().getName(),item,time.perf_counter()))print("通知producer恢復(fù)運行(解除阻塞態(tài))")c.notify() # 通知producer線程解除阻塞態(tài)return False # 相當(dāng)于運行c.wait()else:return True # 相當(dāng)于不運行c.wait()def producer_Wait_For():""""""global itemif item == 5:print("子線程%s發(fā)現(xiàn)item = %d,time = %s,將進(jìn)入阻塞態(tài)" % (threading.current_thread().getName(),item,time.perf_counter()))return False # 相當(dāng)于運行c.wait()else:return True # 相當(dāng)于不運行c.wait()def producer():"""子線程-生產(chǎn)者"""global c # c是全局變量global item # item是全局變量while True:time.sleep(1) # 線程休眠1秒c.acquire() # 獲取鎖c.wait_for(producer_Wait_For) # 看看是否需要阻塞item += 1 # 每一秒增加1print("子線程%s將item增加1,item = %d,time = %s" % (threading.current_thread().getName(),item,time.perf_counter()))c.notify() # 通知customer解除阻塞態(tài)c.release() # 釋放鎖def customer():"""子線程-消費者"""global c # c是全局變量global item # item是全局變量while True:time.sleep(2) # 線程休眠2秒c.acquire() #獲取鎖c.wait_for(customer_Wait_For) # 看看是否需要阻塞item -= 1print("子線程%s將item減少1,item = %d,time = %s" % (threading.current_thread().getName(),item,time.perf_counter()))c.release() # 釋放鎖def main():t1 = threading.Thread(target=customer,name="thread_customer",daemon=True) # 創(chuàng)建producer子線程t2 = threading.Thread(target=producer,name="thread_producer",daemon=True) # 創(chuàng)建customer子線程t1.start() # 啟動customer線程 t2.start() # 啟動producer線程t1.join() # 子線程customer是無限循環(huán)的線程,所以主線程需要等待它運行結(jié)束t2.join() # 子線程producer是無限循環(huán)的線程,所以主線程需要等待它運行結(jié)束print("主線程運行結(jié)束!")if __name__ == "__main__":main()

從運行的結(jié)果看來,跟c.wait_for()只是將條件判斷(是否阻塞)封裝在一個函數(shù)里面而已,使得線程的函數(shù)變得更加簡潔。

四、with語句讓代碼更加簡潔


從官方文檔了解到,條件對象也支持with語句。

4.1、代碼

從下面的代碼可以看到,with語句幫我們管理了acquire( )方法與release( )方法(代碼里找不到acquire( )方法與release( )方法了。)

# python3.9 import threading import timec = threading.Condition() # 聲明一個條件對象 item = 0 # 用于計數(shù)def producer():"""子線程-生產(chǎn)者"""global c # c是全局變量global item # item是全局變量while True:time.sleep(1) # 線程休眠1秒with c:# 如果item等于5,就進(jìn)行阻塞if item == 5:print("子線程%s發(fā)現(xiàn)item = %d,time = %s,將進(jìn)入阻塞態(tài)" % (threading.current_thread().getName(),item,time.perf_counter()))c.wait() # 進(jìn)入阻塞態(tài),并釋放鎖,,等待其他線程notifyitem += 1 # 每一秒增加1print("子線程%s將item增加1,item = %d,time = %s" % (threading.current_thread().getName(),item,time.perf_counter()))c.notify() # 通知customer解除阻塞態(tài)def customer():"""子線程-消費者"""global c # c是全局變量global item # item是全局變量while True:time.sleep(2) # 線程休眠2秒with c:if item == 0:print("子線程%s發(fā)現(xiàn)item = %d,time = %s,將進(jìn)入阻塞態(tài)" % (threading.current_thread().getName(),item,time.perf_counter()))print("通知producer恢復(fù)運行(解除阻塞態(tài))")c.notify() # 通知producer線程解除阻塞態(tài)c.wait() # 進(jìn)入阻塞態(tài),并釋放鎖。等待其他線程notify item -= 1print("子線程%s將item減少1,item = %d,time = %s" % (threading.current_thread().getName(),item,time.perf_counter()))def main():t1 = threading.Thread(target=customer,name="thread_customer",daemon=True) # 創(chuàng)建producer子線程t2 = threading.Thread(target=producer,name="thread_producer",daemon=True) # 創(chuàng)建customer子線程t1.start() # 啟動customer線程 t2.start() # 啟動producer線程t1.join() # 子線程customer是無限循環(huán)的線程,所以主線程需要等待它運行結(jié)束t2.join() # 子線程producer是無限循環(huán)的線程,所以主線程需要等待它運行結(jié)束print("主線程運行結(jié)束!")if __name__ == "__main__":main()

4.2、運行結(jié)果

總結(jié)

以上是生活随笔為你收集整理的Python | threading03 - 使用条件对象,实现线程间的同步的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。