Python并发之协程gevent基础
基本示例
from gevent import monkey monkey.patch_all() # 記住一定放在第一行,這里是打補(bǔ)丁的意思,time模塊在使用協(xié)程gevent模塊的時候,必須打補(bǔ)丁才行,記得放在第一行。 import gevent import timedef eat(name):print(f"{name} eat first")time.sleep(3)print(f"{name} eat second")def play(name):print(f"{name} play phone 1")time.sleep(2)print(f"{name} play phone 2")g1 = gevent.spawn(eat, "lily") g2 = gevent.spawn(play, name="lily") g1.join() g2.join()?
1,gevent介紹
gevent是第三方庫,通過?greenlet?實現(xiàn)?coroutine,創(chuàng)建、調(diào)度的開銷比?線程(thread)?還小,因此程序內(nèi)部的?執(zhí)行流?效率高。
gevent 實現(xiàn)了 python 標(biāo)準(zhǔn)庫中一些阻塞庫的非阻塞版本,如 socket、os、select 等 (全部的可參考?gevent1.0 的 monkey.py 源碼),可用這些非阻塞的庫替代 python 標(biāo)準(zhǔn)庫中的阻塞的庫。
gevent 提供的 API 與 python 標(biāo)準(zhǔn)庫中的用法和名稱類似。
其基本思想是:當(dāng)一個greenlet遇到IO操作時,比如訪問網(wǎng)絡(luò),就自動切換到其他的greenlet,等到IO操作完成,再在適當(dāng)?shù)臅r候切換回來繼續(xù)執(zhí)行。由于IO操作非常耗時,經(jīng)常使程序處于等待狀態(tài),有了gevent為我們自動切換協(xié)程,就保證總有g(shù)reenlet在運行,而不是等待IO。
gevent是基于協(xié)程的Python網(wǎng)絡(luò)庫。特點:
- 基于libev的快速事件循環(huán)(Linux上epoll,FreeBSD上kqueue)。
- 基于greenlet的輕量級執(zhí)行單元。
- API的概念和Python標(biāo)準(zhǔn)庫一致(如事件,隊列)。
- 可以配合socket,ssl模塊使用。
- 能夠使用標(biāo)準(zhǔn)庫和第三方模塊創(chuàng)建標(biāo)準(zhǔn)的阻塞套接字(gevent.monkey)。
- 默認(rèn)通過線程池進(jìn)行DNS查詢,也可通過c-are(通過GEVENT_RESOLVER=ares環(huán)境變量開啟)。
- TCP/UDP/HTTP服務(wù)器
- 子進(jìn)程支持(通過gevent.subprocess)
- 線程池
gevent常用方法:
gevent.spawn()? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 創(chuàng)建一個普通的Greenlet對象并切換
gevent.spawn_later(seconds=3)?? ?延時創(chuàng)建一個普通的Greenlet對象并切換
gevent.spawn_raw()? ? ? ? ? ? ? ? ? ? ? ?創(chuàng)建的協(xié)程對象屬于一個組
gevent.getcurrent()? ? ? ? ? ? ? ? ? ? ? ? ?返回當(dāng)前正在執(zhí)行的greenlet
gevent.joinall(jobs)? ? ? ? ? ? ? ? ? ? ? ? ? 將協(xié)程任務(wù)添加到事件循環(huán),接收一個任務(wù)列表
gevent.wait()? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 可以替代join函數(shù)等待循環(huán)結(jié)束,也可以傳入?yún)f(xié)程對象列表
gevent.kill()? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 殺死一個協(xié)程
gevent.killall()? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?殺死一個協(xié)程列表里的所有協(xié)程
monkey.patch_all()? ? ? ? ? ? ? ? ? ? ? ? ? ? 非常重要,會自動將python的一些標(biāo)準(zhǔn)模塊替換成gevent框架
greenlet常用實例方法:
# Greenlet對象 from gevent import Greenlet# Greenlet對象創(chuàng)建 job = Greenlet(target0, 3) Greenlet.spawn() # 創(chuàng)建一個協(xié)程并啟動 Greenlet.spawn_later(seconds=3) # 延時啟動# 協(xié)程啟動 job.start() # 將協(xié)程加入循環(huán)并啟動協(xié)程 job.start_later(3) # 延時啟動# 等待任務(wù)完成 job.join() # 等待任務(wù)完成 job.get() # 獲取協(xié)程返回的值# 任務(wù)中斷和判斷任務(wù)狀態(tài) job.dead() # 判斷協(xié)程是否死亡 job.kill() # 殺死正在運行的協(xié)程并喚醒其他的協(xié)程,這個協(xié)程將不會再執(zhí)行,可以 job.ready() # 任務(wù)完成返回一個真值 job.successful() # 任務(wù)成功完成返回真值,否則拋出錯誤# 獲取屬性 job.loop # 時間循環(huán)對象 job.value # 獲取返回的值# 捕捉異常 job.exception # 如果運行有錯誤,獲取它 job.exc_info # 錯誤的詳細(xì)信息# 設(shè)置回調(diào)函數(shù) job.rawlink(back) # 普通回調(diào),將job對象作為回調(diào)函數(shù)的參數(shù) job.unlink() # 刪除回調(diào)函數(shù) # 執(zhí)行成功的回調(diào)函數(shù) job.link_value(back) # 執(zhí)行失敗的回調(diào)函數(shù) job.link_exception(back)?gevent.Pool的特殊方法:
pool.wait_available():等待直到有一個協(xié)程有結(jié)果 pool.dd(greenlet):向進(jìn)程池添加一個方法并跟蹤,非阻塞 pool.discard(greenlet):停止跟蹤某個協(xié)程 pool.start(greenlet):加入并啟動協(xié)程 pool.join():阻塞等待結(jié)束 pool.kill():殺死所有跟蹤的協(xié)程 pool.killone(greenlet):殺死一個協(xié)程2,什么時候用/不用gevent
gevent 的優(yōu)勢:
- 可以通過同步的邏輯實現(xiàn)并發(fā)操作,大大降低了編寫并行/并發(fā)程序的難度
- 在一個進(jìn)程中使用 gevent 可以有效避免對?臨界資源?的互斥訪問
如果程序涉及較多的 I/O,可用 gevent 替代多線程來提高程序效率。但由于
- gevent 中 coroutine 的調(diào)度是由使用者而非操作系統(tǒng)決定
- 主要解決的是 I/O 問題,提高?IO-bound?類型的程序的效率
- 由于是在一個進(jìn)程中實現(xiàn) coroutine,且操作系統(tǒng)以進(jìn)程為單位分配處理機(jī)資源 (一個進(jìn)程分配一個處理機(jī))
?
因此,gevent 不適合在以下場景中使用:
- 對任務(wù)延遲有要求的場景,如交互式程序中 (此時需要操作系統(tǒng)進(jìn)行?公平調(diào)度)
- CPU-bound?任務(wù)
- 當(dāng)需要使用多處理機(jī)時 (可通過運行多個進(jìn)程,每個進(jìn)程內(nèi)實現(xiàn) coroutine 來解決這個問題)
3,gevent操作
如何生成 greenlet instance
一般有兩種方法:
- 使用?gevent.spawn()?API
- subclass?Greenlet
第一種方法是調(diào)用了?Greenlet?class 中的?spawn?類方法,且生成 greenlet instance 后將其放入 coroutine 的調(diào)度隊列中。第二種方法需要手動通過?instance.start()?方法手動將其加入到 coroutine 的調(diào)度隊列中。
代碼示例:
需要注意:
- 若僅是想生成 greenlet instance 并置于調(diào)度隊列中,最好采用?gevent.spawn()?API
- 若想僅生成 greenlet instance 且暫時不想加入到調(diào)度隊列,則可采用第二種方法。之后若想將其加入到調(diào)度隊列,則手動執(zhí)行?instance.start()?方法。
?
如何進(jìn)行主線程到 hub greenlet instance 的切換
- gevent.sleep()
- Greenlet 或 Greenlet 子類的 instance 的?join()?方法
- monkey patch 的庫或方法 (參見?monkey.py):
- ? ? socket
- ? ? ssl
- ? ? os.fork
- ? ? time.sleep
- ? ? select.select
- ? ? thread
- ? ? subprocess
- ? ? sys.stdin,sys.stdout,sys.stderr
4,gevent核心功能
- Greenlets
- 同步和異步執(zhí)行
- 確定性
- 創(chuàng)建Greenlets
- Greenlet狀態(tài)
- 程序停止
- 超時
- 猴子補(bǔ)丁
?
4.1,Greenlets
在gevent中用到的主要模式是Greenlet, 它是以C擴(kuò)展模塊形式接入Python的輕量級協(xié)程。 Greenlet全部運行在主程序操作系統(tǒng)進(jìn)程的內(nèi)部,但它們被協(xié)作式地調(diào)度。
????一個 “greenlet” 是一個小型的獨立偽線程。可以把它想像成一些棧幀,棧底是初始調(diào)用的函數(shù),而棧頂是當(dāng)前greenlet的暫停位置。你使用greenlet創(chuàng)建一堆這樣的堆棧,然后在他們之間跳轉(zhuǎn)執(zhí)行。跳轉(zhuǎn)必須顯式聲明的:一個greenlet必須選擇要跳轉(zhuǎn)到的另一個greenlet,這會讓前一個掛起,而后一個在此前掛起處恢復(fù)執(zhí)行。不同greenlets之間的跳轉(zhuǎn)稱為切換(switching) 。
??????greenlet不是一種真正的并發(fā)機(jī)制,而是在同一線程內(nèi),在不同函數(shù)的執(zhí)行代碼塊之間切換,實施“你運行一會、我運行一會”,并且在進(jìn)行切換時必須指定何時切換以及切換到哪。
greenlet類主要有兩個方法:
- switch:用來切換協(xié)程;
- throw():用來拋出異常同時終止程序;
4.2,同步和異步執(zhí)行
并發(fā)的核心思想在于,大的任務(wù)可以分解成一系列的子任務(wù),后者可以被調(diào)度成 同時執(zhí)行或異步執(zhí)行,而不是一次一個地或者同步地執(zhí)行。兩個子任務(wù)之間的 切換也就是上下文切換。在gevent里面,上下文切換是通過yielding來完成的.
當(dāng)我們在受限于網(wǎng)絡(luò)或IO的函數(shù)中使用gevent,這些函數(shù)會被協(xié)作式的調(diào)度, gevent的真正能力會得到發(fā)揮。Gevent處理了所有的細(xì)節(jié), 來保證你的網(wǎng)絡(luò)庫會在可能的時候,隱式交出greenlet上下文的執(zhí)行權(quán)。
示例如下:
例子中的select()函數(shù)通常是一個在各種文件描述符上輪詢的阻塞調(diào)用。
import time import gevent start = time.time() tic = lambda: 'at %1.1f seconds' % (time.time() - start) def gr1():print('Started Polling: %s' % tic())select.select([], [], [], 1)print('Ended Polling: %s' % tic()) def gr2():print('Started Polling: %s' % tic())select.select([], [], [], 2)print('Ended Polling: %s' % tic()) def gr3():print("Hey lets do some stuff while the greenlets poll, %s" % tic())gevent.sleep(3)print('Ended Polling: %s' % tic()) gevent.joinall([gevent.spawn(gr1),gevent.spawn(gr2),gevent.spawn(gr3), ])輸出:
Started Polling: at 0.0 seconds Started Polling: at 0.0 seconds Hey lets do some stuff while the greenlets poll, at 0.0 seconds Ended Polling: at 1.0 seconds Ended Polling: at 2.0 seconds Ended Polling: at 3.0 seconds同步vs異步
下面是另外一個多少有點人造色彩的例子,定義一個非確定性的(non-deterministic)?的task函數(shù)(給定相同輸入的情況下,它的輸出不保證相同)。 此例中執(zhí)行這個函數(shù)的副作用就是,每次task在它的執(zhí)行過程中都會隨機(jī)地停某些秒。
import gevent import randomdef task(pid):gevent.sleep(random.randint(0,2)*0.001)print('task {} done'.format(pid))def synchronous():for i in range(5):task(i)def asynchronous():gev_list = [gevent.spawn(task, i) for i in range(5)]gevent.joinall(gev_list)print("synchronous:") synchronous()print("asynchronous:") asynchronous()運行結(jié)果:
synchronous: task 0 done task 1 done task 2 done task 3 done task 4 done asynchronous: task 4 done task 3 done task 0 done task 1 done task 2 done上例中,在同步的部分,所有的task都同步的執(zhí)行, 結(jié)果當(dāng)每個task在執(zhí)行時主流程被阻塞(主流程的執(zhí)行暫時停住)。
程序的重要部分是將task函數(shù)封裝到Greenlet內(nèi)部線程的gevent.spawn。 初始化的greenlet列表存放在數(shù)組threads中,此數(shù)組被傳給gevent.joinall?函數(shù),后者阻塞當(dāng)前流程,并執(zhí)行所有給定的greenlet。執(zhí)行流程只會在 所有g(shù)reenlet執(zhí)行完后才會繼續(xù)向下走。
要重點留意的是,異步的部分本質(zhì)上是隨機(jī)的,而且異步部分的整體運行時間比同步 要大大減少。事實上,同步部分的最大運行時間,即是每個task停0.002秒,結(jié)果整個 隊列要停0.02秒。而異步部分的最大運行時間大致為0.002秒,因為沒有任何一個task會 阻塞其它task的執(zhí)行。
?
4.3,確定性
greenlet具有確定性。在相同配置相同輸入的情況下,它們總是會產(chǎn)生相同的輸出。
下面是另外一個多少有點人造色彩的例子,定義一個非確定性的(non-deterministic)?的task函數(shù)(給定相同輸入的情況下,它的輸出不保證相同)。 此例中執(zhí)行這個函數(shù)的副作用就是,每次task在它的執(zhí)行過程中都會隨機(jī)地停某些秒。
?
執(zhí)行結(jié)果
False True即使gevent通常帶有確定性,當(dāng)開始與如socket或文件等外部服務(wù)交互時, 不確定性也可能溜進(jìn)你的程序中。因此盡管gevent線程是一種“確定的并發(fā)”形式, 使用它仍然可能會遇到像使用POSIX線程或進(jìn)程時遇到的那些問題。
涉及并發(fā)長期存在的問題就是競爭條件(race condition)(當(dāng)兩個并發(fā)線程/進(jìn)程都依賴于某個共享資源同時都嘗試去修改它的時候, 就會出現(xiàn)競爭條件),這會導(dǎo)致資源修改的結(jié)果狀態(tài)依賴于時間和執(zhí)行順序。 這個問題,會導(dǎo)致整個程序行為變得不確定。
解決辦法: 始終避免所有全局的狀態(tài).
?
4.4,創(chuàng)建Greenlets
gevent對Greenlet初始化提供了一些封裝.
import gevent from gevent import Greenlet def foo(message, n):gevent.sleep(n)print(message) thread1 = Greenlet.spawn(foo, "Hello", 1) thread2 = gevent.spawn(foo, "I live!", 2) thread3 = gevent.spawn(lambda x: (x+1), 2) threads = [thread1, thread2, thread3] gevent.joinall(threads)執(zhí)行結(jié)果:
Hello I live!除使用基本的Greenlet類之外,你也可以子類化Greenlet類,重載它的_run方法。
import gevent from gevent import Greenlet class MyGreenlet(Greenlet):def __init__(self, message, n):Greenlet.__init__(self)self.message = messageself.n = ndef _run(self):print(self.message)gevent.sleep(self.n) g = MyGreenlet("Hi there!", 3) g.start() g.join()執(zhí)行結(jié)果
Hi there!4.5,Greenlet狀態(tài)
greenlet的狀態(tài)通常是一個依賴于時間的參數(shù):
- started – Boolean, 指示此Greenlet是否已經(jīng)啟動
- ready() – Boolean, 指示此Greenlet是否已經(jīng)停止
- successful() – Boolean, 指示此Greenlet是否已經(jīng)停止而且沒拋異常
- value – 任意值, 此Greenlet代碼返回的值
- exception – 異常, 此Greenlet內(nèi)拋出的未捕獲異常
代碼示例:
?
執(zhí)行結(jié)果
True True You failed. win game None True True True False Traceback (most recent call last):File "src/gevent/greenlet.py", line 716, in gevent._greenlet.Greenlet.runFile "coroutine.py", line 121, in failraise Exception('You failed.') Exception: You failed. 2019-01-22T09:05:05Z <Greenlet "Greenlet-0" at 0x103d02848: fail> failed with Exception4.6,程序停止
當(dāng)主程序(main program)收到一個SIGQUIT信號時,不能成功做yield操作的 Greenlet可能會令意外地掛起程序的執(zhí)行。這導(dǎo)致了所謂的僵尸進(jìn)程, 它需要在Python解釋器之外被kill掉。
通用的處理模式就是在主程序中監(jiān)聽SIGQUIT信號,調(diào)用gevent.shutdown退出程序。
?
4.7,超時
通過超時可以對代碼塊兒或一個Greenlet的運行時間進(jìn)行約束。
import gevent from gevent import Timeout seconds = 3 timeout = Timeout(seconds) timeout.start()def wait():gevent.sleep(4)try:gevent.spawn(wait).join() except Timeout:print('Could not complete')執(zhí)行結(jié)果:
Could not complete超時類
import gevent from gevent import Timeouttime_to_wait = 5class TimeLong(Exception):passwith Timeout(time_to_wait, TimeLong):gevent.sleep(6)4.8,猴子補(bǔ)丁(Monkey patching)
我們現(xiàn)在來到gevent的死角了. 在此之前,我已經(jīng)避免提到猴子補(bǔ)丁(monkey patching) 以嘗試使gevent這個強(qiáng)大的協(xié)程模型變得生動有趣,但現(xiàn)在到了討論猴子補(bǔ)丁的黑色藝術(shù) 的時候了。你之前可能注意到我們提到了monkey.patch_socket()這個命令,這個 純粹副作用命令是用來改變標(biāo)準(zhǔn)socket庫的。
?
執(zhí)行結(jié)果:
<class 'socket.socket'> After monkey patch <class 'gevent._socket3.socket'> <built-in function select> After monkey patch <function select at 0x1074631e0>Python的運行環(huán)境允許我們在運行時修改大部分的對象,包括模塊,類甚至函數(shù)。 這是個一般說來令人驚奇的壞主意,因為它創(chuàng)造了“隱式的副作用”,如果出現(xiàn)問題 它很多時候是極難調(diào)試的。雖然如此,在極端情況下當(dāng)一個庫需要修改Python本身 的基礎(chǔ)行為的時候,猴子補(bǔ)丁就派上用場了。在這種情況下,gevent能夠修改標(biāo)準(zhǔn)庫里面大部分的阻塞式系統(tǒng)調(diào)用,包括socket、ssl、threading和 select等模塊,而變?yōu)閰f(xié)作式運行。
例如,Redis的python綁定一般使用常規(guī)的tcp socket來與redis-server實例通信。 通過簡單地調(diào)用gevent.monkey.patch_all(),可以使得redis的綁定協(xié)作式的調(diào)度 請求,與gevent棧的其它部分一起工作。
這讓我們可以將一般不能與gevent共同工作的庫結(jié)合起來,而不用寫哪怕一行代碼。 雖然猴子補(bǔ)丁仍然是邪惡的(evil),但在這種情況下它是“有用的邪惡(useful evil)”。
?
參考文獻(xiàn):
https://blog.csdn.net/xumesang/article/details/53288363
http://blog.chinaunix.net/uid-9162199-id-4738168.html
https://www.cnblogs.com/cwp-bg/p/9593405.html
?
?
?
總結(jié)
以上是生活随笔為你收集整理的Python并发之协程gevent基础的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 判断当前用户是否为root
- 下一篇: 设置ubuntu默认python3设置