线程queue、事件event及协程
線程queue、事件event及協(xié)程
線程queue
多線程搶占資源,讓其保持串行的兩種方式:
? 1、互斥鎖
? 2、隊列
線程隊列分為以下三種:
1、Queue(先進先出)
import queueq = queue.Queue(3) q.put(1) q.put(2) q.put(3) # q.put(4,block=False) # 若不設(shè)置block參數(shù),默認為True,大于隊列長度進入阻塞狀態(tài),若設(shè)置block為False,大于對列長度直接報錯 print(q.get()) print(q.get()) print(q.get()) # print(q.get(timeout=2)) 阻塞2s 還沒有值直接報錯 # 結(jié)果 1 2 32、LifoQueue(后進先出)
import queueq = queue.LifoQueue(3) q.put(1) q.put(2) q.put(3) print(q.get()) print(q.get()) print(q.get()) # 結(jié)果: 3 2 13、PriorityQueue(優(yōu)先級隊列)
import queueq = queue.PriorityQueue(3) q.put((-1,'awe')) # 操作對象為元祖,第一個位置的數(shù)字越小,優(yōu)先級越高 q.put((2,6)) q.put((0,3)) print(q.get()) print(q.get()) print(q.get()) # 結(jié)果: (-1, 'awe') (0, 3) (2, 6)事件event
開啟兩個線程,一個線程運行到中間的某個階段,觸發(fā)另個線程執(zhí)行.兩個線程增加了耦合性.
引入事件event的兩個階段:
版本1:(判斷全局變量狀態(tài))
from threading import Thread from threading import current_thread import timeflag =False def check():print(f'{current_thread().name}監(jiān)測服務(wù)器是否開啟')time.sleep(3)global flagflag = Trueprint('服務(wù)器已開啟')def connect():while not flag:print(f'{current_thread().name}等待連接')time.sleep(0.5)else:print(f'{current_thread().name} 連接成功...')t1 = Thread(target=check,) t2 = Thread(target=connect,) t1.start() t2.start() # 結(jié)果: Thread-1監(jiān)測服務(wù)器是否開啟 Thread-2等待連接 Thread-2等待連接 Thread-2等待連接 Thread-2等待連接 Thread-2等待連接 Thread-2等待連接 服務(wù)器已開啟 Thread-2 連接成功...版本2:(事件Event)
from threading import Thread from threading import current_thread from threading import Event import timeevent = Event() # 創(chuàng)建事件對象def check():print(f'{current_thread().name}監(jiān)測服務(wù)器是否開啟')time.sleep(3)print(event.is_set()) # 判斷事件是否設(shè)置event.set() # 設(shè)置事件print(event.is_set())print('服務(wù)器已開啟')def connect():print(f'{current_thread().name}等待連接')event.wait() # 等待事件設(shè)置,阻塞狀態(tài)print(f'{current_thread().name} 連接成功...')t1 = Thread(target=check,) t2 = Thread(target=connect,) t1.start() t2.start()小練習:
將上述例子改為一個線程監(jiān)測服務(wù)器狀態(tài),另一個線程判斷服務(wù)器狀態(tài),如果服務(wù)器狀態(tài)開啟,則顯示連接成功,此線程每1秒嘗試連接服務(wù)器一次,一共連接3次,還沒連接成功,則顯示連接失敗
from threading import Thread from threading import current_thread from threading import Event import timeevent = Event() # 創(chuàng)建事件對象def check():print(f'{current_thread().name}監(jiān)測服務(wù)器是否開啟')time.sleep(3)event.set() # 設(shè)置事件print('服務(wù)器已開啟')def connect():count = 1while 1:print(f'{current_thread().name}等待連接')event.wait(1) # 等待事件設(shè)置,阻塞狀態(tài)if count == 4:print(f'{current_thread().name}連接成功')count += 1print(f'{current_thread().name}嘗試連接{count}次...')else:print(f'{current_thread().name}連接成功')t1 = Thread(target=check,) t2 = Thread(target=connect,) t1.start() t2.start()協(xié)程
協(xié)程:簡單的來說就是一個線程并發(fā)的處理任務(wù).
串行:一個線程執(zhí)行一個任務(wù),執(zhí)行完畢之后,執(zhí)行下一個任務(wù).
并行: 多個cpu執(zhí)行多個任務(wù), 4個cpu 執(zhí)行4個任務(wù).
并發(fā): 一個cpu執(zhí)行多個任務(wù),看起來像是同時運行.
并發(fā)真正的核心:切換并且保持狀態(tài).
多線程的并發(fā): 3個線程處理10個任務(wù),如果線程1處理的這個任務(wù),遇到阻塞,cpu被操作系統(tǒng)切換到另一個線程,
一個線程并發(fā)處理任務(wù):以一個線程執(zhí)行3個任務(wù)為例:
協(xié)程定義:協(xié)程是一種用戶態(tài)的輕量級線程,即協(xié)程是由用戶程序自己控制調(diào)度的。
單個cpu并發(fā)執(zhí)行10個任務(wù)的三種方式:
? 1、方式一:開啟多進程并發(fā)執(zhí)行, 操作系統(tǒng)切換+保持狀態(tài).
? 2、方式二:開啟多線程并發(fā)執(zhí)行,操作系統(tǒng)切換+保持狀態(tài).
? 3、方式三:開啟協(xié)程并發(fā)的執(zhí)行, 自己的程序 把控著cpu 在3個任務(wù)之間來回切換+保持狀態(tài).
以上三種實現(xiàn)方式,協(xié)程最好,這是因為:
? 1.協(xié)程的切換開銷更小,屬于程序級別的切換,操作系統(tǒng)完全感知不到,因而更加輕量級
? 2.協(xié)程的運行速度更快
? 3.協(xié)程會長期霸占cpu只執(zhí)行我程序里面的所有任務(wù).
協(xié)程的特點:
Greenlet
Greenlet是python中的一個第三方模塊,真正的協(xié)程模塊就是使用greenlet完成的切換
并發(fā)的兩個核心:切換并且保持狀態(tài).接下來我們從一個例子慢慢引入此模塊的用法
# 版本一:單切換 def func1():print('in func1')def func2():print('in func2')func1()print('end')func2()# 版本二:切換+保持狀態(tài) import time def gen():while 1:yield 1time.sleep(0.5) # 手動設(shè)置IO,遇到IO無法自動切換def func():obj = gen()for i in range(10):next(obj) func()# 版本三:切換+保持狀態(tài),遇到IO自動切換 from greenlet import greenlet import time def eat(name):print('%s eat 1' %name) # 2g2.switch('taibai') # 3time.sleep(3)print('%s eat 2' %name) # 6g2.switch() # 7def play(name):print('%s play 1' %name) # 4g1.switch() # 5print('%s play 2' %name) # 8g1=greenlet(eat) g2=greenlet(play)g1.switch('taibai') # 1 切換到eat任務(wù)協(xié)程模塊gevent
gevent 是一個第三方庫,可以輕松通過gevent實現(xiàn)并發(fā)同步或異步編程,在gevent中用到的主要模式是greenlet, 它是以C擴展模塊形式接入Python的輕量級協(xié)程。 Greenlet全部運行在主程序操作系統(tǒng)進程的內(nèi)部,但它們被協(xié)作式地調(diào)度。
# gevent模塊的幾個用法 # 用法: g1=gevent.spawn(func,1,2,3,x=4,y=5)創(chuàng)建一個協(xié)程對象g1,spawn括號內(nèi)第一個參數(shù)是函數(shù)名,如eat,后面可以有多個參數(shù),可以是位置實參或關(guān)鍵字實參,都是傳給函數(shù)eat的,spawn是異步提交任務(wù)g2=gevent.spawn(func2)g1.join() # 等待g1結(jié)束g2.join() # 等待g2結(jié)束 有人測試的時候會發(fā)現(xiàn),不寫第二個join也能執(zhí)行g(shù)2,是的,協(xié)程幫你切換執(zhí)行了,但是你會發(fā)現(xiàn),如果g2里面的任務(wù)執(zhí)行的時間長,但是不寫join的話,就不會執(zhí)行完等到g2剩下的任務(wù)了# 或者上述兩步合作一步:gevent.joinall([g1,g2])使用time.sleep模擬程序中遇到的阻塞:
import gevent import time from threading import current_thread def eat(name):print('%s eat 1' %name)print(current_thread().name)# gevent.sleep(2)time.sleep(2)print('%s eat 2' %name)def play(name):print('%s play 1' %name)print(current_thread().name)# gevent.sleep(1) # gevent.sleep(1)模擬的是gevent可以識別的io阻塞time.sleep(1)# time.sleep(1)或其他的阻塞,gevent是不能直接識別的需要用下面一行代碼,打補丁,就可以識別了print('%s play 2' %name)g1 = gevent.spawn(eat,'egon') g2 = gevent.spawn(play,name='egon') print(f'主{current_thread().name}') g1.join() g2.join() # 結(jié)果: 主MainThread egon eat 1 MainThread egon eat 2 egon play 1 MainThread egon play 2最終版本:
import gevent from gevent import monkey monkey.patch_all() # 打補丁: 將下面的所有的任務(wù)的阻塞都打上標記 def eat(name):print('%s eat 1' %name)time.sleep(2)print('%s eat 2' %name)def play(name):print('%s play 1' %name)time.sleep(1)print('%s play 2' %name)g1 = gevent.spawn(eat,'egon') g2 = gevent.spawn(play,name='egon')# g1.join() # g2.join() gevent.joinall([g1,g2]) # 結(jié)果: egon eat 1 egon play 1 egon play 2 egon eat 2負載均衡:就是指將負載(工作任務(wù))進行平衡、分攤到多個操作單元上進行運行
Nginx:Nginx是一款輕量級的Web服務(wù)器/反向代理服務(wù)器及電子郵件(IMAP/POP3)代理服務(wù)器,其特點是占有內(nèi)存少,并發(fā)能力強。
一般在工作中我們都是進程+線程+協(xié)程的方式來實現(xiàn)并發(fā),以達到最好的并發(fā)效果,如果是4核的cpu,一般起5個進程,每個進程中20個線程(5倍cpu數(shù)量),每個線程可以起500個協(xié)程,大規(guī)模爬取頁面的時候,等待網(wǎng)絡(luò)延遲的時間的時候,我們就可以用協(xié)程去實現(xiàn)并發(fā)。 并發(fā)數(shù)量 = 5 * 20 * 500 = 50000個并發(fā),這是一般一個4cpu的機器最大的并發(fā)數(shù)。nginx在負載均衡的時候最大承載量就是5w個
單線程里的這20個任務(wù)的代碼通常會既有計算操作又有阻塞操作,我們完全可以在執(zhí)行任務(wù)1時遇到阻塞,就利用阻塞的時間去執(zhí)行任務(wù)2。。。。如此,才能提高效率,這就用到了Gevent模塊。
轉(zhuǎn)載于:https://www.cnblogs.com/lifangzheng/p/11420036.html
總結(jié)
以上是生活随笔為你收集整理的线程queue、事件event及协程的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 童年经典 《大富翁11》官宣:Q4登陆P
- 下一篇: this关键字+super关键字