日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 >

Python并发之协程gevent基础

發(fā)布時間:2025/3/15 38 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Python并发之协程gevent基础 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

基本示例

from gevent import monkey monkey.patch_all() # 記住一定放在第一行,這里是打補(bǔ)丁的意思,time模塊在使用協(xié)程gevent模塊的時候,必須打補(bǔ)丁才行,記得放在第一行。 import gevent import timedef eat(name):print(f"{name} eat first")time.sleep(3)print(f"{name} eat second")def play(name):print(f"{name} play phone 1")time.sleep(2)print(f"{name} play phone 2")g1 = gevent.spawn(eat, "lily") g2 = gevent.spawn(play, name="lily") g1.join() g2.join()

?

1,gevent介紹

gevent是第三方庫,通過?greenlet?實現(xiàn)?coroutine,創(chuàng)建、調(diào)度的開銷比?線程(thread)?還小,因此程序內(nèi)部的?執(zhí)行流?效率高。

gevent 實現(xiàn)了 python 標(biāo)準(zhǔn)庫中一些阻塞庫的非阻塞版本,如 socket、os、select 等 (全部的可參考?gevent1.0 的 monkey.py 源碼),可用這些非阻塞的庫替代 python 標(biāo)準(zhǔn)庫中的阻塞的庫。

gevent 提供的 API 與 python 標(biāo)準(zhǔn)庫中的用法和名稱類似。

其基本思想是:當(dāng)一個greenlet遇到IO操作時,比如訪問網(wǎng)絡(luò),就自動切換到其他的greenlet,等到IO操作完成,再在適當(dāng)?shù)臅r候切換回來繼續(xù)執(zhí)行。由于IO操作非常耗時,經(jīng)常使程序處于等待狀態(tài),有了gevent為我們自動切換協(xié)程,就保證總有g(shù)reenlet在運行,而不是等待IO。

gevent是基于協(xié)程的Python網(wǎng)絡(luò)庫。特點:

  • 基于libev的快速事件循環(huán)(Linux上epoll,FreeBSD上kqueue)。
  • 基于greenlet的輕量級執(zhí)行單元。
  • API的概念和Python標(biāo)準(zhǔn)庫一致(如事件,隊列)。
  • 可以配合socket,ssl模塊使用。
  • 能夠使用標(biāo)準(zhǔn)庫和第三方模塊創(chuàng)建標(biāo)準(zhǔn)的阻塞套接字(gevent.monkey)。
  • 默認(rèn)通過線程池進(jìn)行DNS查詢,也可通過c-are(通過GEVENT_RESOLVER=ares環(huán)境變量開啟)。
  • TCP/UDP/HTTP服務(wù)器
  • 子進(jìn)程支持(通過gevent.subprocess)
  • 線程池

gevent常用方法:

gevent.spawn()? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 創(chuàng)建一個普通的Greenlet對象并切換
gevent.spawn_later(seconds=3)?? ?延時創(chuàng)建一個普通的Greenlet對象并切換
gevent.spawn_raw()? ? ? ? ? ? ? ? ? ? ? ?創(chuàng)建的協(xié)程對象屬于一個組
gevent.getcurrent()? ? ? ? ? ? ? ? ? ? ? ? ?返回當(dāng)前正在執(zhí)行的greenlet
gevent.joinall(jobs)? ? ? ? ? ? ? ? ? ? ? ? ? 將協(xié)程任務(wù)添加到事件循環(huán),接收一個任務(wù)列表
gevent.wait()? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 可以替代join函數(shù)等待循環(huán)結(jié)束,也可以傳入?yún)f(xié)程對象列表
gevent.kill()? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 殺死一個協(xié)程
gevent.killall()? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?殺死一個協(xié)程列表里的所有協(xié)程
monkey.patch_all()? ? ? ? ? ? ? ? ? ? ? ? ? ? 非常重要,會自動將python的一些標(biāo)準(zhǔn)模塊替換成gevent框架

greenlet常用實例方法:

# Greenlet對象 from gevent import Greenlet# Greenlet對象創(chuàng)建 job = Greenlet(target0, 3) Greenlet.spawn() # 創(chuàng)建一個協(xié)程并啟動 Greenlet.spawn_later(seconds=3) # 延時啟動# 協(xié)程啟動 job.start() # 將協(xié)程加入循環(huán)并啟動協(xié)程 job.start_later(3) # 延時啟動# 等待任務(wù)完成 job.join() # 等待任務(wù)完成 job.get() # 獲取協(xié)程返回的值# 任務(wù)中斷和判斷任務(wù)狀態(tài) job.dead() # 判斷協(xié)程是否死亡 job.kill() # 殺死正在運行的協(xié)程并喚醒其他的協(xié)程,這個協(xié)程將不會再執(zhí)行,可以 job.ready() # 任務(wù)完成返回一個真值 job.successful() # 任務(wù)成功完成返回真值,否則拋出錯誤# 獲取屬性 job.loop # 時間循環(huán)對象 job.value # 獲取返回的值# 捕捉異常 job.exception # 如果運行有錯誤,獲取它 job.exc_info # 錯誤的詳細(xì)信息# 設(shè)置回調(diào)函數(shù) job.rawlink(back) # 普通回調(diào),將job對象作為回調(diào)函數(shù)的參數(shù) job.unlink() # 刪除回調(diào)函數(shù) # 執(zhí)行成功的回調(diào)函數(shù) job.link_value(back) # 執(zhí)行失敗的回調(diào)函數(shù) job.link_exception(back)

?gevent.Pool的特殊方法:

pool.wait_available():等待直到有一個協(xié)程有結(jié)果 pool.dd(greenlet):向進(jìn)程池添加一個方法并跟蹤,非阻塞 pool.discard(greenlet):停止跟蹤某個協(xié)程 pool.start(greenlet):加入并啟動協(xié)程 pool.join():阻塞等待結(jié)束 pool.kill():殺死所有跟蹤的協(xié)程 pool.killone(greenlet):殺死一個協(xié)程

2,什么時候用/不用gevent

gevent 的優(yōu)勢:

  • 可以通過同步的邏輯實現(xiàn)并發(fā)操作,大大降低了編寫并行/并發(fā)程序的難度
  • 在一個進(jìn)程中使用 gevent 可以有效避免對?臨界資源?的互斥訪問

如果程序涉及較多的 I/O,可用 gevent 替代多線程來提高程序效率。但由于

  • gevent 中 coroutine 的調(diào)度是由使用者而非操作系統(tǒng)決定
  • 主要解決的是 I/O 問題,提高?IO-bound?類型的程序的效率
  • 由于是在一個進(jìn)程中實現(xiàn) coroutine,且操作系統(tǒng)以進(jìn)程為單位分配處理機(jī)資源 (一個進(jìn)程分配一個處理機(jī))
    ?

因此,gevent 不適合在以下場景中使用:

  • 對任務(wù)延遲有要求的場景,如交互式程序中 (此時需要操作系統(tǒng)進(jìn)行?公平調(diào)度)
  • CPU-bound?任務(wù)
  • 當(dāng)需要使用多處理機(jī)時 (可通過運行多個進(jìn)程,每個進(jìn)程內(nèi)實現(xiàn) coroutine 來解決這個問題)

3,gevent操作

如何生成 greenlet instance

一般有兩種方法:

  • 使用?gevent.spawn()?API
  • subclass?Greenlet

第一種方法是調(diào)用了?Greenlet?class 中的?spawn?類方法,且生成 greenlet instance 后將其放入 coroutine 的調(diào)度隊列中。第二種方法需要手動通過?instance.start()?方法手動將其加入到 coroutine 的調(diào)度隊列中。
代碼示例:

import gevent from gevent import Greenletclass MyGreen(Greenlet):def __init__(self, timeout, msg):Greenlet.__init__(self)self.timeout = timeoutself.msg = msgdef _run(self):print("I'm from subclass of Greenlet and want to say: %s" % (self.msg,))gevent.sleep(self.timeout)print("I'm from subclass of Greenlet and done!")class TestMultigreen(object):def __init__(self, timeout=0):self.timeout = timeoutdef run(self):green0 = gevent.spawn(self._task, 0, 'just 0 test') #方式一:使用gevent的spawn方法創(chuàng)建greenlet實例green1 = Greenlet.spawn(self._task, 1, 'just 1 test') #方式一:使用Greenlet的spawn方法創(chuàng)建greenlet實例green2 = MyGreen(self.timeout, 'just 2 test') #方式二:使用自定義的Greenlet子類創(chuàng)建實例,需要調(diào)用start()手動將greenlet實例加入到 coroutine 的調(diào)度隊列中g(shù)reen2.start()gevent.joinall([green0, green1, green2])print('Tasks done!')def _task(self, pid, msg):print("I'm task %d and want to say: %s" % (pid, msg))gevent.sleep(self.timeout)print("Task %d done." % (pid,))if __name__ == '__main__':test = TestMultigreen()test.run()

需要注意:

  • 若僅是想生成 greenlet instance 并置于調(diào)度隊列中,最好采用?gevent.spawn()?API
  • 若想僅生成 greenlet instance 且暫時不想加入到調(diào)度隊列,則可采用第二種方法。之后若想將其加入到調(diào)度隊列,則手動執(zhí)行?instance.start()?方法。
    ?

如何進(jìn)行主線程到 hub greenlet instance 的切換

  • gevent.sleep()
  • Greenlet 或 Greenlet 子類的 instance 的?join()?方法
  • monkey patch 的庫或方法 (參見?monkey.py):
  • ? ? socket
  • ? ? ssl
  • ? ? os.fork
  • ? ? time.sleep
  • ? ? select.select
  • ? ? thread
  • ? ? subprocess
  • ? ? sys.stdin,sys.stdout,sys.stderr

4,gevent核心功能

  • Greenlets
  • 同步和異步執(zhí)行
  • 確定性
  • 創(chuàng)建Greenlets
  • Greenlet狀態(tài)
  • 程序停止
  • 超時
  • 猴子補(bǔ)丁

?

4.1,Greenlets

在gevent中用到的主要模式是Greenlet, 它是以C擴(kuò)展模塊形式接入Python的輕量級協(xié)程。 Greenlet全部運行在主程序操作系統(tǒng)進(jìn)程的內(nèi)部,但它們被協(xié)作式地調(diào)度。

????一個 “greenlet” 是一個小型的獨立偽線程。可以把它想像成一些棧幀,棧底是初始調(diào)用的函數(shù),而棧頂是當(dāng)前greenlet的暫停位置。你使用greenlet創(chuàng)建一堆這樣的堆棧,然后在他們之間跳轉(zhuǎn)執(zhí)行。跳轉(zhuǎn)必須顯式聲明的:一個greenlet必須選擇要跳轉(zhuǎn)到的另一個greenlet,這會讓前一個掛起,而后一個在此前掛起處恢復(fù)執(zhí)行。不同greenlets之間的跳轉(zhuǎn)稱為切換(switching) 。

??????greenlet不是一種真正的并發(fā)機(jī)制,而是在同一線程內(nèi),在不同函數(shù)的執(zhí)行代碼塊之間切換,實施“你運行一會、我運行一會”,并且在進(jìn)行切換時必須指定何時切換以及切換到哪。

greenlet類主要有兩個方法:

  • switch:用來切換協(xié)程;
  • throw():用來拋出異常同時終止程序;
from greenlet import greenlet import timedef test1(gr,g):for i in range(100):print("---A--")gr.switch(g, gr) # 切換到另一個協(xié)程執(zhí)行time.sleep(0.5)def test2(gr, g):for i in range(100):print("---B--")gr.switch(g, gr)# gr.throw(AttributeError)time.sleep(0.5)if __name__ == '__main__':# 創(chuàng)建一個協(xié)程1gr1 = greenlet(test1)# 創(chuàng)建一個協(xié)程2gr2 = greenlet(test2)# 啟動協(xié)程gr1.switch(gr2, gr1)

4.2,同步和異步執(zhí)行

并發(fā)的核心思想在于,大的任務(wù)可以分解成一系列的子任務(wù),后者可以被調(diào)度成 同時執(zhí)行或異步執(zhí)行,而不是一次一個地或者同步地執(zhí)行。兩個子任務(wù)之間的 切換也就是上下文切換。在gevent里面,上下文切換是通過yielding來完成的.

當(dāng)我們在受限于網(wǎng)絡(luò)或IO的函數(shù)中使用gevent,這些函數(shù)會被協(xié)作式的調(diào)度, gevent的真正能力會得到發(fā)揮。Gevent處理了所有的細(xì)節(jié), 來保證你的網(wǎng)絡(luò)庫會在可能的時候,隱式交出greenlet上下文的執(zhí)行權(quán)。

示例如下:

例子中的select()函數(shù)通常是一個在各種文件描述符上輪詢的阻塞調(diào)用。

import time import gevent start = time.time() tic = lambda: 'at %1.1f seconds' % (time.time() - start) def gr1():print('Started Polling: %s' % tic())select.select([], [], [], 1)print('Ended Polling: %s' % tic()) def gr2():print('Started Polling: %s' % tic())select.select([], [], [], 2)print('Ended Polling: %s' % tic()) def gr3():print("Hey lets do some stuff while the greenlets poll, %s" % tic())gevent.sleep(3)print('Ended Polling: %s' % tic()) gevent.joinall([gevent.spawn(gr1),gevent.spawn(gr2),gevent.spawn(gr3), ])

輸出:

Started Polling: at 0.0 seconds Started Polling: at 0.0 seconds Hey lets do some stuff while the greenlets poll, at 0.0 seconds Ended Polling: at 1.0 seconds Ended Polling: at 2.0 seconds Ended Polling: at 3.0 seconds

同步vs異步

下面是另外一個多少有點人造色彩的例子,定義一個非確定性的(non-deterministic)?的task函數(shù)(給定相同輸入的情況下,它的輸出不保證相同)。 此例中執(zhí)行這個函數(shù)的副作用就是,每次task在它的執(zhí)行過程中都會隨機(jī)地停某些秒。

import gevent import randomdef task(pid):gevent.sleep(random.randint(0,2)*0.001)print('task {} done'.format(pid))def synchronous():for i in range(5):task(i)def asynchronous():gev_list = [gevent.spawn(task, i) for i in range(5)]gevent.joinall(gev_list)print("synchronous:") synchronous()print("asynchronous:") asynchronous()

運行結(jié)果:

synchronous: task 0 done task 1 done task 2 done task 3 done task 4 done asynchronous: task 4 done task 3 done task 0 done task 1 done task 2 done

上例中,在同步的部分,所有的task都同步的執(zhí)行, 結(jié)果當(dāng)每個task在執(zhí)行時主流程被阻塞(主流程的執(zhí)行暫時停住)。

程序的重要部分是將task函數(shù)封裝到Greenlet內(nèi)部線程的gevent.spawn。 初始化的greenlet列表存放在數(shù)組threads中,此數(shù)組被傳給gevent.joinall?函數(shù),后者阻塞當(dāng)前流程,并執(zhí)行所有給定的greenlet。執(zhí)行流程只會在 所有g(shù)reenlet執(zhí)行完后才會繼續(xù)向下走。

要重點留意的是,異步的部分本質(zhì)上是隨機(jī)的,而且異步部分的整體運行時間比同步 要大大減少。事實上,同步部分的最大運行時間,即是每個task停0.002秒,結(jié)果整個 隊列要停0.02秒。而異步部分的最大運行時間大致為0.002秒,因為沒有任何一個task會 阻塞其它task的執(zhí)行。
?

4.3,確定性

greenlet具有確定性。在相同配置相同輸入的情況下,它們總是會產(chǎn)生相同的輸出。

下面是另外一個多少有點人造色彩的例子,定義一個非確定性的(non-deterministic)?的task函數(shù)(給定相同輸入的情況下,它的輸出不保證相同)。 此例中執(zhí)行這個函數(shù)的副作用就是,每次task在它的執(zhí)行過程中都會隨機(jī)地停某些秒。
?

import time def echo(i):time.sleep(0.001)return i # Non Deterministic Process Pool from multiprocessing.pool import Pool p = Pool(10) run1 = [a for a in p.imap_unordered(echo, range(10))] run2 = [a for a in p.imap_unordered(echo, range(10))] run3 = [a for a in p.imap_unordered(echo, range(10))] run4 = [a for a in p.imap_unordered(echo, range(10))] print(run1 == run2 == run3 == run4) # Deterministic Gevent Pool from gevent.pool import Pool p = Pool(10) run1 = [a for a in p.imap_unordered(echo, range(10))] run2 = [a for a in p.imap_unordered(echo, range(10))] run3 = [a for a in p.imap_unordered(echo, range(10))] run4 = [a for a in p.imap_unordered(echo, range(10))] print(run1 == run2 == run3 == run4)

執(zhí)行結(jié)果

False True

即使gevent通常帶有確定性,當(dāng)開始與如socket或文件等外部服務(wù)交互時, 不確定性也可能溜進(jìn)你的程序中。因此盡管gevent線程是一種“確定的并發(fā)”形式, 使用它仍然可能會遇到像使用POSIX線程或進(jìn)程時遇到的那些問題。

涉及并發(fā)長期存在的問題就是競爭條件(race condition)(當(dāng)兩個并發(fā)線程/進(jìn)程都依賴于某個共享資源同時都嘗試去修改它的時候, 就會出現(xiàn)競爭條件),這會導(dǎo)致資源修改的結(jié)果狀態(tài)依賴于時間和執(zhí)行順序。 這個問題,會導(dǎo)致整個程序行為變得不確定。

解決辦法: 始終避免所有全局的狀態(tài).
?

4.4,創(chuàng)建Greenlets

gevent對Greenlet初始化提供了一些封裝.

import gevent from gevent import Greenlet def foo(message, n):gevent.sleep(n)print(message) thread1 = Greenlet.spawn(foo, "Hello", 1) thread2 = gevent.spawn(foo, "I live!", 2) thread3 = gevent.spawn(lambda x: (x+1), 2) threads = [thread1, thread2, thread3] gevent.joinall(threads)

執(zhí)行結(jié)果:

Hello I live!

除使用基本的Greenlet類之外,你也可以子類化Greenlet類,重載它的_run方法。

import gevent from gevent import Greenlet class MyGreenlet(Greenlet):def __init__(self, message, n):Greenlet.__init__(self)self.message = messageself.n = ndef _run(self):print(self.message)gevent.sleep(self.n) g = MyGreenlet("Hi there!", 3) g.start() g.join()

執(zhí)行結(jié)果

Hi there!

4.5,Greenlet狀態(tài)

greenlet的狀態(tài)通常是一個依賴于時間的參數(shù):

  • started – Boolean, 指示此Greenlet是否已經(jīng)啟動
  • ready() – Boolean, 指示此Greenlet是否已經(jīng)停止
  • successful() – Boolean, 指示此Greenlet是否已經(jīng)停止而且沒拋異常
  • value – 任意值, 此Greenlet代碼返回的值
  • exception – 異常, 此Greenlet內(nèi)拋出的未捕獲異常

代碼示例:
?

import geventdef win():return 'win game' def fail():raise Exception('You failed.')winner = gevent.spawn(win) loser = gevent.spawn(fail) print(winner.started) print(loser.started) # Greenlet異常會保存在Greenlet,不會上拋給主進(jìn)程. try:gevent.joinall([winner, loser]) except Exception as e:print('This will never be reached') #此處不能捕獲Greenlet異常,永遠(yuǎn)不會觸發(fā)print(loser.exception) #Greenlet異常print(winner.value) # 'You win!' print(loser.value) # Noneprint(winner.ready()) # True print(loser.ready()) # True print(winner.successful()) # True print(loser.successful()) # False

執(zhí)行結(jié)果

True True You failed. win game None True True True False Traceback (most recent call last):File "src/gevent/greenlet.py", line 716, in gevent._greenlet.Greenlet.runFile "coroutine.py", line 121, in failraise Exception('You failed.') Exception: You failed. 2019-01-22T09:05:05Z <Greenlet "Greenlet-0" at 0x103d02848: fail> failed with Exception

4.6,程序停止
當(dāng)主程序(main program)收到一個SIGQUIT信號時,不能成功做yield操作的 Greenlet可能會令意外地掛起程序的執(zhí)行。這導(dǎo)致了所謂的僵尸進(jìn)程, 它需要在Python解釋器之外被kill掉。

通用的處理模式就是在主程序中監(jiān)聽SIGQUIT信號,調(diào)用gevent.shutdown退出程序。
?

import gevent import signal def run_forever():gevent.sleep(1000)if __name__ == '__main__':gevent.signal(signal.SIGQUIT, gevent.shutdown)thread = gevent.spawn(run_forever)thread.join()

4.7,超時

通過超時可以對代碼塊兒或一個Greenlet的運行時間進(jìn)行約束。

import gevent from gevent import Timeout seconds = 3 timeout = Timeout(seconds) timeout.start()def wait():gevent.sleep(4)try:gevent.spawn(wait).join() except Timeout:print('Could not complete')

執(zhí)行結(jié)果:

Could not complete

超時類

import gevent from gevent import Timeouttime_to_wait = 5class TimeLong(Exception):passwith Timeout(time_to_wait, TimeLong):gevent.sleep(6)

4.8,猴子補(bǔ)丁(Monkey patching)

我們現(xiàn)在來到gevent的死角了. 在此之前,我已經(jīng)避免提到猴子補(bǔ)丁(monkey patching) 以嘗試使gevent這個強(qiáng)大的協(xié)程模型變得生動有趣,但現(xiàn)在到了討論猴子補(bǔ)丁的黑色藝術(shù) 的時候了。你之前可能注意到我們提到了monkey.patch_socket()這個命令,這個 純粹副作用命令是用來改變標(biāo)準(zhǔn)socket庫的。
?

import socket print(socket.socket) print("After monkey patch") from gevent import monkey monkey.patch_socket() print(socket.socket)import select print(select.select) monkey.patch_select() print("After monkey patch") print(select.select)

執(zhí)行結(jié)果:

<class 'socket.socket'> After monkey patch <class 'gevent._socket3.socket'> <built-in function select> After monkey patch <function select at 0x1074631e0>

Python的運行環(huán)境允許我們在運行時修改大部分的對象,包括模塊,類甚至函數(shù)。 這是個一般說來令人驚奇的壞主意,因為它創(chuàng)造了“隱式的副作用”,如果出現(xiàn)問題 它很多時候是極難調(diào)試的。雖然如此,在極端情況下當(dāng)一個庫需要修改Python本身 的基礎(chǔ)行為的時候,猴子補(bǔ)丁就派上用場了。在這種情況下,gevent能夠修改標(biāo)準(zhǔn)庫里面大部分的阻塞式系統(tǒng)調(diào)用,包括socket、ssl、threading和 select等模塊,而變?yōu)閰f(xié)作式運行。

例如,Redis的python綁定一般使用常規(guī)的tcp socket來與redis-server實例通信。 通過簡單地調(diào)用gevent.monkey.patch_all(),可以使得redis的綁定協(xié)作式的調(diào)度 請求,與gevent棧的其它部分一起工作。

這讓我們可以將一般不能與gevent共同工作的庫結(jié)合起來,而不用寫哪怕一行代碼。 雖然猴子補(bǔ)丁仍然是邪惡的(evil),但在這種情況下它是“有用的邪惡(useful evil)”。

?

參考文獻(xiàn):

https://blog.csdn.net/xumesang/article/details/53288363

http://blog.chinaunix.net/uid-9162199-id-4738168.html

https://www.cnblogs.com/cwp-bg/p/9593405.html
?

?

?

總結(jié)

以上是生活随笔為你收集整理的Python并发之协程gevent基础的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。