日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > python >内容正文

python

python 线程同步_Python并发编程-线程同步(线程安全)

發(fā)布時間:2024/10/12 python 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python 线程同步_Python并发编程-线程同步(线程安全) 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

Python并發(fā)編程-線程同步(線程安全)

作者:尹正杰

版權(quán)聲明:原創(chuàng)作品,謝絕轉(zhuǎn)載!否則將追究法律責(zé)任。

線程同步,線程間協(xié)調(diào),通過某種技術(shù),讓一個線程訪問某些數(shù)據(jù)時,其它線程不能訪問這些數(shù)據(jù),直到該線程完成對數(shù)據(jù)的操作。

一.Event

1>.Event的常用方法

Event事件,是線程通信機制中最簡單的實現(xiàn),使用一個內(nèi)部的標(biāo)記flag,通過flage的True或False的變化來進行操作。

常用方法如下:

set():

標(biāo)記為True。clear():

標(biāo)記設(shè)置為Flase。

is_set():

查詢標(biāo)記是否為True。wait(timeout=None):

設(shè)置等待標(biāo)記為True的時長,None為無線等待。等到返回True,未等到超時了返回False。

2>.Event使用案例

1 #!/usr/bin/envpython2 #_*_conding:utf-8_*_3 #@author :yinzhengjie4 #blog:http://www.cnblogs.com/yinzhengjie

5

6

7 from threading import Event,Thread8 import logging9

10 FORMAT = "%(asctime)s %(threadName)s %(thread)s %(message)s"

11 logging.basicConfig(format=FORMAT,level=logging.INFO)12

13 def boss(event:Event):14 logging.info("I'm boss,waiting for U")15 event.wait() #阻塞等待,直到event被標(biāo)記為Ture16 logging.info("Good Job.")17

18 def worker(event:Event,count=10):19 logging.info("I'm working for U.")20 cups =[]21 while not event.wait(0.5): #使用wait等待0.5秒(相當(dāng)于"time.sleep(0.5)"),若規(guī)定事件內(nèi)event依舊標(biāo)記依舊沒有設(shè)置為True,則返回False22 logging.info("make 1 cup")23 cups.append(1)24 if len(cups) >=count:25 event.set() #將標(biāo)記設(shè)置為True26 break27 logging.info("I finished my job.cups={}".format(cups))28

29 event =Event()30 print(event.is_set()) #event實例的標(biāo)記默認為False31

32 b = Thread(target=boss,name="boss",args=(event,))33 w = Thread(target=worker,name="worker",args=(event,))34 b.start()35 w.start()

2019-11-23 14:54:53,177 boss 10916 I'm boss,waiting for U

2019-11-23 14:54:53,178 worker 15672 I'm working for U.

False2019-11-23 14:54:53,678 worker 15672 make 1cup2019-11-23 14:54:54,179 worker 15672 make 1cup2019-11-23 14:54:54,680 worker 15672 make 1cup2019-11-23 14:54:55,180 worker 15672 make 1cup2019-11-23 14:54:55,680 worker 15672 make 1cup2019-11-23 14:54:56,181 worker 15672 make 1cup2019-11-23 14:54:56,681 worker 15672 make 1cup2019-11-23 14:54:57,181 worker 15672 make 1cup2019-11-23 14:54:57,681 worker 15672 make 1cup2019-11-23 14:54:58,182 worker 15672 make 1cup2019-11-23 14:54:58,182 worker 15672 I finished my job.cups=[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]2019-11-23 14:54:58,182 boss 10916 Good Job.

以上代碼執(zhí)行結(jié)果戳這里

3>.定時器Timer(延遲執(zhí)行)

1 #!/usr/bin/envpython2 #_*_conding:utf-8_*_3 #@author :yinzhengjie4 #blog:http://www.cnblogs.com/yinzhengjie

5

6 import threading7 import logging8 import time

9

10 FORMAT = "%(asctime)s %(threadName)s %(thread)s %(message)s"

11 logging.basicConfig(level=logging.INFO,format=FORMAT)12

13 def worker():14 logging.info("working")15 time.sleep(2)16

17 """18 Timer是線程Thread的子類,Timer實例內(nèi)部提供了一個finished屬性,該屬性是Event對象。19 Timer是線程Thread的子類,就是線程類,具有線程的能力和特征。20 它的實例時能夠延時執(zhí)行目標(biāo)函數(shù)的線程,在真正執(zhí)行目標(biāo)函數(shù)之前,都可以cancel它。21 cacel方法本質(zhì)使用Event類實現(xiàn)。這并不是說,線程提供了取消的方法。22 """23 t = threading.Timer(4,worker) #當(dāng)t對象調(diào)用start方法后,等待4秒后執(zhí)行worker函數(shù)24 t.setName("timer")25

26 t.start()27 # t.cancel() #本質(zhì)上是在worker函數(shù)執(zhí)行前對finished屬性set方法操作,從而跳過了worker函數(shù)執(zhí)行,達到了取消的效果。28

29 for _ in range(10):30 print(threading.enumerate())31 time.sleep(1)

[<_MainThread(MainThread, started 5332)>, ]

[<_MainThread(MainThread, started 5332)>, ]

[<_MainThread(MainThread, started 5332)>, ]

[<_MainThread(MainThread, started 5332)>, ]

[<_MainThread(MainThread, started 5332)>, ]2019-11-23 15:07:40,987 timer 6656working

[<_MainThread(MainThread, started 5332)>, ]

[<_MainThread(MainThread, started 5332)>]

[<_MainThread(MainThread, started 5332)>]

[<_MainThread(MainThread, started 5332)>]

[<_MainThread(MainThread, started 5332)>]

以上代碼執(zhí)行結(jié)果戳這里

二.Lock

1>.Lock的常用方法

鎖,一旦線程獲得鎖,其它試圖獲取鎖的線程將被阻塞。

鎖:凡是存在資源共享爭搶的地方都可以使用鎖,從而只有一個使用者可以完全使用這個資源。

鎖常用的方法如下:

acquire(blocking=True,timeout=-1):

默認阻塞,阻塞可以設(shè)置超時事件。非阻塞時,timeout禁止設(shè)置。

成功獲取鎖,返回True,否則返回False

release():

釋放鎖。可以從任何線程調(diào)用釋放。

已上鎖的鎖,會被重置為unlocked

若在未上鎖的鎖上調(diào)用,則會拋出RuntimeError異常。

2>.Lock鎖使用案例

1 #!/usr/bin/env python

2 #_*_conding:utf-8_*_

3 #@author :yinzhengjie

4 #blog:http://www.cnblogs.com/yinzhengjie

5

6 from threading importThread,Lock7 importlogging8 importtime9

10 FORMAT = "%(asctime)s %(threadName)s %(thread)s %(message)s"

11 logging.basicConfig(level=logging.INFO,format=FORMAT)12

13 cpus =[]14 lock =Lock()15

16 def worker(count=10):17 logging.info("I'm working for U.")18 flag =False19 whileTrue:20 lock.acquire() #獲取鎖

21 if len(cpus) >=count:22 flag =True23 time.sleep(0.0001) #為了看出線程切換效果

24 if notflag:25 cpus.append(1)26 lock.release()27 ifflag:28 break

29 logging.info("I finished . cups = {}".format(len(cpus)))30

31

32 for _ in range(10):33 Thread(target=worker,args=(1000,)).start()

2019-11-23 16:03:35,225 Thread-1 16204 I'm working for U.

2019-11-23 16:03:35,226 Thread-2 14436 I'm working for U.

2019-11-23 16:03:35,226 Thread-3 1164 I'm working for U.

2019-11-23 16:03:35,226 Thread-4 10460 I'm working for U.

2019-11-23 16:03:35,227 Thread-5 5072 I'm working for U.

2019-11-23 16:03:35,227 Thread-6 12016 I'm working for U.

2019-11-23 16:03:35,227 Thread-7 9732 I'm working for U.

2019-11-23 16:03:35,228 Thread-8 15644 I'm working for U.

2019-11-23 16:03:35,228 Thread-9 104 I'm working for U.

2019-11-23 16:03:35,228 Thread-10 16508 I'm working for U.

2019-11-23 16:03:37,130 Thread-1 16204 I finished . cups = 1000

2019-11-23 16:03:37,132 Thread-3 1164 I finished . cups = 1000

2019-11-23 16:03:37,134 Thread-4 10460 I finished . cups = 1000

2019-11-23 16:03:37,136 Thread-2 14436 I finished . cups = 1000

2019-11-23 16:03:37,138 Thread-5 5072 I finished . cups = 1000

2019-11-23 16:03:37,140 Thread-6 12016 I finished . cups = 1000

2019-11-23 16:03:37,142 Thread-7 9732 I finished . cups = 1000

2019-11-23 16:03:37,144 Thread-8 15644 I finished . cups = 1000

2019-11-23 16:03:37,146 Thread-9 104 I finished . cups = 1000

2019-11-23 16:03:37,148 Thread-10 16508 I finished . cups = 1000

以上代碼執(zhí)行結(jié)果戳這里

3>.加鎖和解鎖(計數(shù)器類案例)

1 #!/usr/bin/env python

2 #_*_conding:utf-8_*_

3 #@author :yinzhengjie

4 #blog:http://www.cnblogs.com/yinzhengjie

5

6 importthreading7 from threading importThread,Lock8 importlogging9 importtime10

11 FORMAT = "%(asctime)s %(threadName)s %(thread)s %(message)s"

12 logging.basicConfig(level=logging.INFO,format=FORMAT)13

14 classCounter:15 def __init__(self):16 self._val =017 self.__lock =Lock()18

19 @property20 defvalue(self):21 with self.__lock:22 returnself._val23

24 definc(self):25 try:26 self.__lock.acquire()27 self._val += 1

28 finally:29 self.__lock.release()30

31 defdec(self):32 with self.__lock:33 self._val -= 1

34

35

36 def worker(c:Counter,count=100):37 for _ inrange(count):38 for i in range(-50,50):39 if i <0:40 c.dec()41 else:42 c.inc()43

44 c =Counter()45 c1 = 10

46 c2 = 10000

47

48 for i inrange(c1):49 Thread(target=worker,args=(c,c2)).start()50

51

52 whileTrue:53 time.sleep(1)54 print(threading.enumerate())55 if threading.active_count() == 1:56 print((c.value))57 break

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , ]

[<_MainThread(MainThread, started 14856)>]

0

以上代碼執(zhí)行結(jié)果戳這里

4>.鎖的應(yīng)用場景

鎖適用于訪問和修改同一個共享資源的時候,即讀寫同一個資源的時候。

如果全部都是讀取同一個共享資源需要鎖嗎?

不需要。因為這時可以認為共享資源是不可變的,每一次讀取它都是一樣的值,所以不用加鎖

使用鎖的注意事項:

少用鎖,必要時用鎖。使用了鎖,多線程訪問被鎖的資源時,就成了串行,要么排隊執(zhí)行,要么爭搶執(zhí)行

舉例,高速公路上車并行跑,可是到了省界只開放了一個收費口,過了這個口,車輛依然可以在多車道上一起跑。過收費口的時候,如果排隊一輛輛過,加不加鎖一樣效率相當(dāng),但是一旦出現(xiàn)爭搶,就必須加鎖一輛輛過。注意,不管加不加鎖,只要是一輛輛過,效率就下降了。

加鎖時間越短越好,不需要就立即釋放鎖

一定要避免死鎖

不使用鎖,有了效率,但是結(jié)果是錯的。

使用了鎖,效率低下,但是結(jié)果是對的。

所以,我們是為了效率要錯誤結(jié)果呢?還是為了對的結(jié)果,讓計算機去計算吧。

5>.非阻塞鎖使用

1 #!/usr/bin/env python

2 #_*_conding:utf-8_*_

3 #@author :yinzhengjie

4 #blog:http://www.cnblogs.com/yinzhengjie

5

6 importthreading7 importlogging8 importtime9

10 FORMAT = "%(asctime)s %(threadName)s %(thread)-10d %(message)s"

11 logging.basicConfig(level=logging.INFO,format=FORMAT)12

13 lock =threading.Lock()14

15 defworker(l:threading.Lock):16 whileTrue:17 flag =l.acquire(False)18 ifflag:19 logging.info("do something.") #為了顯示效果,沒有釋放鎖

20 else:21 logging.info("try again")22 time.sleep(1)23

24 for i in range(5):25 threading.Thread(target=worker,name="worker={}".format(i),args=(lock,)).start()

2019-11-24 15:58:31,932 worker=0 123145354420224do something.2019-11-24 15:58:31,933 worker=0 123145354420224 tryagain2019-11-24 15:58:31,933 worker=1 123145359675392 tryagain2019-11-24 15:58:31,933 worker=2 123145364930560 tryagain2019-11-24 15:58:31,934 worker=3 123145370185728 tryagain2019-11-24 15:58:31,934 worker=4 123145375440896 tryagain2019-11-24 15:58:32,933 worker=0 123145354420224 tryagain2019-11-24 15:58:32,933 worker=1 123145359675392 tryagain2019-11-24 15:58:32,934 worker=2 123145364930560 tryagain2019-11-24 15:58:32,936 worker=4 123145375440896 tryagain2019-11-24 15:58:32,936 worker=3 123145370185728 tryagain2019-11-24 15:58:33,935 worker=0 123145354420224 tryagain2019-11-24 15:58:33,935 worker=1 123145359675392 tryagain2019-11-24 15:58:33,935 worker=2 123145364930560 tryagain2019-11-24 15:58:33,940 worker=4 123145375440896 tryagain2019-11-24 15:58:33,940 worker=3 123145370185728 tryagain2019-11-24 15:58:34,939 worker=0 123145354420224 tryagain2019-11-24 15:58:34,940 worker=1 123145359675392 tryagain2019-11-24 15:58:34,940 worker=2 123145364930560 tryagain2019-11-24 15:58:34,944 worker=4 123145375440896 tryagain2019-11-24 15:58:34,945 worker=3 123145370185728 tryagain2019-11-24 15:58:35,943 worker=1 123145359675392 tryagain2019-11-24 15:58:35,944 worker=0 123145354420224 tryagain2019-11-24 15:58:35,944 worker=2 123145364930560 tryagain2019-11-24 15:58:35,948 worker=4 123145375440896 tryagain2019-11-24 15:58:35,949 worker=3 123145370185728 tryagain

......

以上代碼執(zhí)行結(jié)果戳這里

三.可重入鎖RLock

1>.可重入鎖不可跨越線程

1 #!/usr/bin/env python

2 #_*_conding:utf-8_*_

3 #@author :yinzhengjie

4 #blog:http://www.cnblogs.com/yinzhengjie

5

6 importthreading7 importtime8

9 lock =threading.RLock()10 print(lock.acquire()) #僅對當(dāng)前線程上鎖,但是代碼并不會阻塞而是可以繼續(xù)執(zhí)行

11 print(lock.acquire(blocking=True))12 print(lock.acquire(timeout=3)) #默認"blocking=True",因此可以設(shè)置值阻塞的超時時間,但當(dāng)blocking=False時,timeout無法使用。

13 print(lock.acquire(blocking=False))14

15 print("main thread {}".format(threading.main_thread().ident))16 print("lock in main thread {}".format(lock))17

18 print("{0} 我是分割線 {0}".format("*" * 15))19

20 lock.release()21 lock.release()22 lock.release()23 lock.release()24 #lock.release() #由于上面鎖定的lock調(diào)用了4此鎖定,因此解鎖也只能是4次,若解鎖次數(shù)超過上鎖次數(shù)則拋出"RuntimeError: cannot release un-acquired lock"異常。

25

26 print("main thread {}".format(threading.main_thread().ident))27 print("lock in main thread {}".format(lock))28

29 print("{0} 我是分割線 {0}".format("*" * 15))30

31 print(lock.acquire(blocking=False))32 print(lock.acquire(blocking=False))33 print("main thread {}".format(threading.main_thread().ident))34 print("lock in main thread {}".format(lock))35

36 #threading.Thread(target=lambda l:l.release(),args=(lock,)).start() #可重入鎖不可跨越線程,否則會拋出"RuntimeError: cannot release un-acquired lock"異常。

37 lock.release() #可重入鎖無論是上鎖還是解鎖要求在同一個線程中。

38

39 time.sleep(3)40 print("main thread {}".format(threading.main_thread().ident))41 print("lock in main thread {}".format(lock))

True

True

True

True

main thread18096lockin main thread

*************** 我是分割線 ***************main thread18096lockin main thread

*************** 我是分割線 ***************True

True

main thread18096lockin main thread main thread18096lockin main thread

以上代碼執(zhí)行結(jié)果戳這里

2>.為另一個線程傳入同一個RLock對象

1 #!/usr/bin/env python

2 #_*_conding:utf-8_*_

3 #@author :yinzhengjie

4 #blog:http://www.cnblogs.com/yinzhengjie

5

6 importthreading7 importtime8

9 lock =threading.RLock()10

11 defsub(l:threading.RLock):12 print("{}:{}".format(threading.current_thread(),l.acquire())) #阻塞

13 print("{}:{}".format(threading.current_thread(),l.acquire()))14 print("lock in sub thread {}".format(lock))15 l.release()16 print("release in sub 1")17 print("lock in sub thread {}".format(lock))18 l.release()19 print("release in sub 2")20 print("lock in sub thread {}".format(lock))21

22

23 print(lock.acquire())24 print("main thread {}".format(threading.main_thread().ident))25 print("lock in main thread {}".format(lock))26

27 print("{0} 我是分割線 {0}".format("*" * 15))28

29 threading.Timer(2,sub,(lock,)).start() #為另一個線程傳入同一個lock對象

30

31 print("in main thread, {}".format(lock.acquire()))32 lock.release()33 time.sleep(5)34 print("release lock in main thread =======",end="\n\n")35 lock.release()36 print("lock in main thread {}".format(lock))

True

main thread2456lockin main thread

*************** 我是分割線 ***************

inmain thread, True

release lockin main thread =======lockin main thread

:True:True

lockin sub thread releasein sub 1lockin sub thread releasein sub 2lockin sub thread

以上代碼執(zhí)行結(jié)果戳這里

3>.可重入鎖相關(guān)總結(jié)

可重入鎖,是線程相關(guān)的鎖。可在同一個線程中獲取鎖,并可以繼續(xù)在同一線程不阻塞多次獲取鎖。

當(dāng)鎖未釋放完,其它線程獲取鎖就會阻塞,直到當(dāng)前持有鎖的線程釋放完鎖。

鎖都應(yīng)該使用完后釋放??芍厝腈i也是鎖,應(yīng)該acquire多少次,就release多少次。

四.Condition

1>.Condition常用方法

Condition(lock=None):

可傳入一個Lock或者RLock對象,默認是RLock。

acquire(*args):

獲取鎖。

wait(self,timeout=None):

等待或超時。

notify(n=1):

喚醒至多指定數(shù)目個數(shù)的等待的線程,沒有等待的線程就沒有任何操作。

notify_all():

喚醒所有等待的線程。

2>.生產(chǎn)者消費者模型

1 #!/usr/bin/env python

2 #_*_conding:utf-8_*_

3 #@author :yinzhengjie

4 #blog:http://www.cnblogs.com/yinzhengjie

5

6 importlogging7 from threading importEvent,Thread,Condition8 importtime9 importrandom10

11 FORMAT = "%(asctime)s %(threadName)s %(thread)s %(message)s"

12 logging.basicConfig(format=FORMAT,level=logging.INFO)13

14 classDispachter:15 def __init__(self):16 self.data =None17 self.event = Event() #event只是為了使用方便,與邏輯無關(guān)

18 self.cond =Condition()19

20 defproduce(self,total):21 for _ inrange(total):22 data = random.randint(1,100)23 with self.cond:24 logging.info(data)25 self.data =data26 #self.cond.notify_all()

27 self.cond.notify(2)28 self.event.wait(1)29

30 defconsume(self):31 while notself.event.is_set():32 with self.cond:33 self.cond.wait()34 data =self.data35 logging.info("recieved {}".format(data))36 self.data =None37 self.event.wait(0.5)38

39 d =Dispachter()40 p = Thread(target=d.produce,name="producer",args=(10,))41

42 #增加消費者

43 for i in range(5):44 c = Thread(target=d.consume,name="consumer")45 c.start()46

47 p.start()

2019-11-25 22:24:45,076 producer 12228 64

2019-11-25 22:24:45,076 consumer 7612 recieved 64

2019-11-25 22:24:45,076 consumer 18400recieved None2019-11-25 22:24:46,077 producer 12228 41

2019-11-25 22:24:46,077 consumer 15008 recieved 41

2019-11-25 22:24:46,078 consumer 16440recieved None2019-11-25 22:24:47,077 producer 12228 98

2019-11-25 22:24:47,077 consumer 14832 recieved 98

2019-11-25 22:24:47,077 consumer 7612recieved None2019-11-25 22:24:48,077 producer 12228 39

2019-11-25 22:24:48,077 consumer 18400 recieved 39

2019-11-25 22:24:48,078 consumer 15008recieved None2019-11-25 22:24:49,078 producer 12228 79

2019-11-25 22:24:49,078 consumer 16440 recieved 79

2019-11-25 22:24:49,078 consumer 14832recieved None2019-11-25 22:24:50,079 producer 12228 39

2019-11-25 22:24:50,079 consumer 7612 recieved 39

2019-11-25 22:24:50,080 consumer 15008recieved None

......

以上代碼執(zhí)行結(jié)果戳這里

3>.Condition總結(jié)

Condition用于生產(chǎn)者消費者模型中,解決生產(chǎn)者消費者速度匹配的問題。

采用了通知機制,非常有效率。

使用方式:

使用Condition,必須先acquire,用完了要release,因為內(nèi)部使用了鎖,默認使用RLock,最好的方式是使用with上下文。

消費者這wait,等待通知。

生產(chǎn)者生產(chǎn)好消息,對消費者發(fā)通知,可以使用notify或者notify_all方法。

五.semaphore

1>.semaphore常用方法

和Lock很像,信號量對象內(nèi)部維護一個倒計數(shù)器,每一次acquire都會減1,當(dāng)acquire方法發(fā)現(xiàn)計數(shù)為0就阻塞請求的線程,直到其它線程對信號量release后,計數(shù)大于0,恢復(fù)阻塞的線程。換句話說,計數(shù)器永遠不會低于0,因為acquire的時候,發(fā)現(xiàn)是0,都會被阻塞。

semaphore常用方法如下:

Semaphore(value=1):

構(gòu)造方法。value小于0,拋ValueError異常

acquire(blocking=True, timeout=None):

獲取信號量,計數(shù)器減1,獲取成功返回True

release():

釋放信號量,計數(shù)器加1

2>.基本使用案例(存在release方法超界限的問題)

1 #!/usr/bin/env python

2 #_*_conding:utf-8_*_

3 #@author :yinzhengjie

4 #blog:http://www.cnblogs.com/yinzhengjie

5

6

7 from threading importThread, Semaphore8 importlogging9 importtime10

11 FORMAT = '%(asctime)s %(threadName)-12s %(thread)-8s %(message)s'

12 logging.basicConfig(format=FORMAT, level=logging.INFO)13

14 defworker(s:Semaphore):15 logging.info("in worker thread")16 logging.info(s.acquire())17 logging.info('worker thread over')18

19

20 #定義信號量的個數(shù)為3

21 s = Semaphore(3)22 print(s.__dict__)23

24 logging.info(s.acquire()) #獲取一把鎖之后,"_value"計數(shù)器就會減1。

25

26

27 print(s.acquire(),s._value)28

29 print(s.acquire(),s._value)30

31 Thread(target=worker, name="worker",args=(s,)).start()32 time.sleep(2)33 logging.info(s.acquire(False))34 logging.info(s.acquire(timeout=3))35

36 #釋放一個

37 logging.info('release one')38 s.release()39 print(s.__dict__) #釋放鎖后可以被"worker"線程獲取

40

41 s.release()42 s.release()43 s.release()44 s.release()45 s.release() #此處我們可以故意多釋放幾次鎖

46

47 print(s.__dict__) #竟然內(nèi)置計數(shù)器"_value"達到了6(也有可能是5,因為worker線程中需要獲取一把鎖),這樣實際上超出我們的最大值,需要解決這個問題。

2019-11-26 09:54:22,345 MainThread 140735817298880True2019-11-26 09:54:22,345 worker 123145401769984 inworker thread

{'_cond': , 0)>, '_value': 3}

True1True 02019-11-26 09:54:24,346 MainThread 140735817298880False2019-11-26 09:54:27,349 MainThread 140735817298880False2019-11-26 09:54:27,349 MainThread 140735817298880release one

{'_cond': , 0)>, '_value': 1}

{'_cond': , 0)>, '_value': 6}2019-11-26 09:54:27,350 worker 123145401769984True2019-11-26 09:54:27,350 worker 123145401769984 worker thread over

以上代碼執(zhí)行結(jié)果戳這里

3>.BoundedSemaphore類(有邊界的信號量,不允許使用release超出初始化范圍,否則,拋出“ValueError: Semaphore released too many times”異常)

1 #!/usr/bin/env python

2 #_*_conding:utf-8_*_

3 #@author :yinzhengjie

4 #blog:http://www.cnblogs.com/yinzhengjie

5

6

7 from threading importThread, BoundedSemaphore8 importlogging9 importtime10

11 FORMAT = '%(asctime)s %(threadName)-12s %(thread)-8s %(message)s'

12 logging.basicConfig(format=FORMAT, level=logging.INFO)13

14 defworker(s:BoundedSemaphore):15 logging.info("in worker thread")16 logging.info(s.acquire())17 logging.info('worker thread over')18

19

20 #定義有邊界信號量的個數(shù)為3

21 s = BoundedSemaphore(3)22 print(s.__dict__)23

24 logging.info(s.acquire()) #獲取一把鎖之后,"_value"計數(shù)器就會減1。

25

26

27 print(s.acquire(),s._value)28

29 print(s.acquire(),s._value)30

31 Thread(target=worker, name="worker",args=(s,)).start()32 time.sleep(2)33 logging.info(s.acquire(False))34 logging.info(s.acquire(timeout=3))35

36 #釋放一個

37 logging.info('release one')38 s.release()39 print(s.__dict__) #釋放鎖后可以被"worker"線程獲取

40

41 s.release()42 s.release()43 s.release()44 s.release()45 s.release() #此處我們可以故意多釋放幾次鎖,一旦release超出初始值的范圍就拋出異常!

46

47 print(s.__dict__)

{'_cond': , 0)>, '_value': 3, '_initial_value': 3}

True1True 02019-11-26 10:09:17,632 MainThread 140735817298880True2019-11-26 10:09:17,635 worker 123145507561472 inworker thread2019-11-26 10:09:19,638 MainThread 140735817298880False

{'_cond': , 0)>, '_value': 1, '_initial_value': 3}2019-11-26 10:09:22,643 MainThread 140735817298880False2019-11-26 10:09:22,644 MainThread 140735817298880release one

Traceback (most recent call last):

File"/yinzhengjie/python/devops/python基礎(chǔ)/09.線程/04.信號量.py", line 43, in

2019-11-26 10:09:22,644 worker 123145507561472True2019-11-26 10:09:22,644 worker 123145507561472worker thread over

s.release()

File"/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/threading.py", line 483, inreleaseraise ValueError("Semaphore released too many times")

ValueError: Semaphore released too many times

以上代碼執(zhí)行結(jié)果戳這里

4>.應(yīng)用舉例(一個簡單的連接池)

1 #!/usr/bin/env python

2 #_*_conding:utf-8_*_

3 #@author :yinzhengjie

4 #blog:http://www.cnblogs.com/yinzhengjie

5

6 importrandom7 importthreading8 importlogging9 importtime10

11 FORMAT = '%(asctime)s %(threadName)s %(thread)-8d %(message)s'

12 logging.basicConfig(level=logging.INFO, format=FORMAT)13

14 classConn:15 def __init__(self, name):16 self.name =name17

18 """

19 連接池20 因為資源有限,且開啟一個連接成本高,所以,使用連接池。21

22 一個簡單的連接池23 連接池應(yīng)該有容量(總數(shù)),有一個工廠方法可以獲取連接,能夠把不用的連接返回,供其他調(diào)用者使用。24

25 使用信號量解決資源有限的問題。26 如果池中有資源,請求者獲取資源時信號量減1,拿走資源。當(dāng)請求超過資源數(shù),請求者只能等待。當(dāng)使用者用完歸還資源后信號量加1,等待線程就可以被喚醒拿走資源。27 注意:這個連接池的例子不能用到生成環(huán)境,只是為了說明信號量使用的例子,連接池還有很多未完成功能。28 """

29 classPool:30 def __init__(self, count:int):31 self.count =count32 #池中提前放著連接備用

33 self.pool = [self._connect('conn-{}'.format(i)) for i inrange(self.count)]34 self.semaphore =threading.Semaphore(self.count)35

36 def_connect(self, conn_name):37 #創(chuàng)建連接的方法,返回一個連接對象

38 returnConn(conn_name)39

40 defget_conn(self):41 #從池中拿走一個連接

42 logging.info('get~~~~~~~~~~~~~')43 self.semaphore.acquire()44 logging.info('-------------------------')45 returnself.pool.pop()46

47 defreturn_conn(self, conn: Conn):48 #向池中返回一個連接對象

49 logging.info('return~~~~~~~~~~~~~')50 self.pool.append(conn)51 self.semaphore.release()52

53 defworker(pool:Pool):54 conn =pool.get_conn()55 logging.info(conn)56 #模擬使用了一段時間

57 time.sleep(random.randint(1, 5))58 pool.return_conn(conn)59

60 #初始化連接池

61 pool = Pool(3)62

63 for i in range(6):64 threading.Thread(target=worker, name='worker-{}'.format(i), args=(pool,)).start()

2019-11-26 11:27:58,148 worker-0 123145324670976 get~~~~~~~~~~~~~

2019-11-26 11:27:58,148 worker-0 123145324670976 -------------------------

2019-11-26 11:27:58,148 worker-0 123145324670976 <__main__.Conn object at 0x102db0438>

2019-11-26 11:27:58,149 worker-1 123145329926144 get~~~~~~~~~~~~~

2019-11-26 11:27:58,149 worker-1 123145329926144 -------------------------

2019-11-26 11:27:58,149 worker-1 123145329926144 <__main__.Conn object at 0x102db03c8>

2019-11-26 11:27:58,149 worker-2 123145335181312 get~~~~~~~~~~~~~

2019-11-26 11:27:58,149 worker-2 123145335181312 -------------------------

2019-11-26 11:27:58,150 worker-2 123145335181312 <__main__.Conn object at 0x102db0240>

2019-11-26 11:27:58,150 worker-3 123145340436480 get~~~~~~~~~~~~~

2019-11-26 11:27:58,150 worker-4 123145345691648 get~~~~~~~~~~~~~

2019-11-26 11:27:58,151 worker-5 123145350946816 get~~~~~~~~~~~~~

2019-11-26 11:28:02,153 worker-0 123145324670976 return~~~~~~~~~~~~~

2019-11-26 11:28:02,153 worker-3 123145340436480 -------------------------

2019-11-26 11:28:02,154 worker-3 123145340436480 <__main__.Conn object at 0x102db0438>

2019-11-26 11:28:02,154 worker-1 123145329926144 return~~~~~~~~~~~~~

2019-11-26 11:28:02,154 worker-4 123145345691648 -------------------------

2019-11-26 11:28:02,154 worker-4 123145345691648 <__main__.Conn object at 0x102db03c8>

2019-11-26 11:28:03,154 worker-2 123145335181312 return~~~~~~~~~~~~~

2019-11-26 11:28:03,155 worker-5 123145350946816 -------------------------

2019-11-26 11:28:03,155 worker-5 123145350946816 <__main__.Conn object at 0x102db0240>

2019-11-26 11:28:05,155 worker-4 123145345691648 return~~~~~~~~~~~~~

2019-11-26 11:28:07,154 worker-3 123145340436480 return~~~~~~~~~~~~~

2019-11-26 11:28:08,159 worker-5 123145350946816 return~~~~~~~~~~~~~

以上代碼執(zhí)行結(jié)果戳這里

5>.信號量和鎖

信號量:

可以多個線程訪問共享資源,但這個共享資源數(shù)量有限。

鎖:

可以看做特殊的信號量,即信號量計數(shù)器初值為1。只允許同一個時間一個線程獨占資源。

六.Queue的線程安全

1 #!/usr/bin/env python

2 #_*_conding:utf-8_*_

3 #@author :yinzhengjie

4 #blog:http://www.cnblogs.com/yinzhengjie

5

6 importqueue7

8

9 """

10 標(biāo)準(zhǔn)庫queue模塊,提供FIFO的Queue、LIFO的隊列、優(yōu)先隊列。11

12 Queue類是線程安全的,適用于多線程間安全的交換數(shù)據(jù)。內(nèi)部使用了Lock和Condition。13

14 為什么講魔術(shù)方法時,說實現(xiàn)容器的大小,不準(zhǔn)確?15 1>.如果不加鎖,是不可能獲得準(zhǔn)確的大小的,因為你剛讀取到了一個大小,還沒有取走數(shù)據(jù),就有可能被其他線程改了。16 2>.Queue類的size雖然加了鎖,但是,依然不能保證立即get、put就能成功,因為讀取大小和get、put方法是分開的。17 """

18

19 q = queue.Queue(8)20

21 if q.qsize() == 7:22 q.put("abc") #上下兩句可能被打斷

23

24 if q.qsize() == 1:25 q.get() #未必會成功

總結(jié)

以上是生活随笔為你收集整理的python 线程同步_Python并发编程-线程同步(线程安全)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。