Python 的协程库 greenlet 和 gevent
greenlet 官方文檔:https://greenlet.readthedocs.io/en/latest/
From:https://www.jianshu.com/u/3ab212f28d91
Python Gevent – 高性能的 Python 并發框架:https://blog.csdn.net/freeking101/article/details/53097420
? ? ? ? 在 Python 里,按照官方解釋 greenlet 是輕量級的并行編程,gevent 就是利用 greenlet 實現的基于協程(coroutine)的 python 的網絡 library,通過使用 greenlet 提供了一個在 libev 事件循環頂部的高級別并發 API。即?gevent 是對 greenlet 的高級封裝。
greenlet
為了更好使用 協程 來完成多任務,python 中 greenlet 模塊對其封裝,從而使得切換任務變得更加簡單
安裝方式:pip3 install greenlet
官網示例:
from greenlet import greenletdef test1():print(12)gr2.switch()print(34)def test2():print(56)gr1.switch()print(78)gr1 = greenlet(test1) gr2 = greenlet(test2)gr1.switch() # 切換到 gr1 開始運行, 即 從 gr1 對應 的 test1 開始運行運行代碼,輸出為:12 56 34
當創建一個 greenlet 時,首先初始化一個空的棧, switch 到這個棧的時候,會運行在 greenlet 構造時傳入的函數(首先在test1中打印 12), 如果在這個函數(test1)中 switch 到其他協程(到了test2 打印 56),那么該協程會被掛起,等到切換回來(在test2 中切換到 test1?打印34)。當這個協程對應函數執行完畢,那么這個協程就變成dead狀態。
看下面代碼:
from greenlet import greenletdef test1():print(12)gr2.switch()print(34)def test2():print(56)gr1.switch()print(78)gr1 = greenlet(test1) gr2 = greenlet(test2)gr2.switch() # 切換到 gr2 開始運行, 即 從 gr2 對應 的 test2 開始運行運行代碼,輸出為:56 12 78
greenlet 的 module 與 class
一起看一下greenlet中的屬性:
其中,比較重要的是: getcurrent(), 類greenlet、異常類GreenletExit?。
getcurrent() :返回當前的greenlet實例;
GreenletExit:是一個特殊的異常,當觸發了這個異常的時候,即使不處理,也不會拋到其parent(后面會提到協程中對返回值或者異常的處理)
然后我們再來看看 greenlet.greenlet 這個類:
比較重要的幾個屬性:
注意,本文后面提到的 greenlet 大多都是指 greenlet.greenlet 這個 class,注意區分
Switch not call
對于 greenlet,最常用的寫法是 x = gr.switch(y)。 這句話的意思是切換到 gr,傳入參數 y。當從其他協程(不一定是這個gr)切換回來的時候,將值付給 x 。
import greenletdef test1(x, y):z = gr2.switch(x+y)print('test1 ', z)def test2(u):print('test2 ', u)gr1.switch(10)gr1 = greenlet.greenlet(test1) gr2 = greenlet.greenlet(test2) print(gr1.switch("hello", " world"))輸出:
'test2 ' 'hello world'
'test1 ' 10
None
上面的例子,第12行從 main greenlet 切換到了gr1,test1 第3行切換到了gr2,然后 gr1 掛起,第8行從 gr2 切回 gr1 時,將值(10)返回值給了 z。
每一個 Greenlet 都有一個 parent,一個新的 greenlet 在哪里創生,當前環境的 greenlet 就是這個新 greenlet 的 parent。所有的greenlet 構成一棵樹,其跟節點就是還沒有手動創建 greenlet 時候的 ”main” greenlet(事實上,在首次 import greenlet 的時候實例化)。當一個協程 正常結束,執行流程回到其對應的parent;或者在一個協程中拋出未被捕獲的異常,該異常也是傳遞到其parent。
學習 python的時候,有一句話會被無數次重復 ”everything is oblect”,即 一切皆對象。
在學習 greenlet 的調用中,同樣有一句話應該深刻理解, “switch not call”。即 切換 不是 調用。
由這個例子可以看出,盡管是從 test1 所在的協程 gr1 切換到了 gr2,但 gr2 的 parent 還是 ’main’ greenlet,因為默認的 parent取決于 greenlet 的創生環境。另外,在 test2 中 return 之后整個返回值返回到了其 parent,而不是 switch 到該協程的地方(即不是 test1 ),這個跟我們平時的函數調用不一樣,記住 “switch not call” 。對于異常,也是展開至 parent:
import greenletdef test1(x, y):try:z = gr2.switch(x+y)except Exception:print('catch Exception in test1')def test2(u):assert Falsegr1 = greenlet.greenlet(test1) gr2 = greenlet.greenlet(test2) try:gr1.switch("hello", " world") except:print('catch Exception in main')輸出為:
catch Exception in main
greenlet 生命周期
本文開始的地方提到第一個例子中的 gr2 其實并沒有正常結束,我們可以用 greenlet.dead 這個屬性來查看:
from greenlet import greenletdef test1():gr2.switch(1)print('test1 finished')def test2(x):print('test2 first', x)z = gr1.switch()print('test2 back', z)gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch() print('gr1 is dead?: %s, gr2 is dead?: %s' % (gr1.dead, gr2.dead)) gr2.switch() print('gr1 is dead?: %s, gr2 is dead?: %s' % (gr1.dead, gr2.dead)) print(gr2.switch(10))輸出如下:
test2 first 1 test1 finished gr1 is dead?: True, gr2 is dead?: False test2 back () gr1 is dead?: True, gr2 is dead?: True 10從這個例子可以看出:
1.只有當協程對應的函數執行完畢,協程才會 die,所以第一次 Check 的時候 gr2 并沒有 die,因為第 9 行切換出去了就沒切回來。在 main 中再 switch 到 gr2 的時候, 執行后面的邏輯,gr2 die
2.如果試圖再次 switch 到一個已經是 dead 狀態的 greenlet 會怎么樣呢,事實上會切換到其 parent greenlet。
Greenlet Traceing
Greenlet 也提供了接口使得程序員可以監控 greenlet 的整個調度流程。主要是 gettrace 和 settrace(callback) 函數。
import greenletdef test_greenlet_tracing():def callback(event, args):print(event, 'from', id(args[0]), 'to', id(args[1]))def dummy():g2.switch()def dummyexception():raise Exception('excep in coroutine')main = greenlet.getcurrent()g1 = greenlet.greenlet(dummy)g2 = greenlet.greenlet(dummyexception)print('main id %s, gr1 id %s, gr2 id %s' % (id(main), id(g1), id(g2)))oldtrace = greenlet.settrace(callback)try:g1.switch()except BaseException as e:print('Exception : ', e)finally:greenlet.settrace(oldtrace)test_greenlet_tracing()結果:
main id 1397838280136, gr1 id 1397838280312, gr2 id 1397838280488 switch from 1397838280136 to 1397838280312 switch from 1397838280312 to 1397838280488 throw from 1397838280488 to 1397838280136 Exception : excep in coroutine其中 callback 函數 event 是 switch 或者 throw 之一,表明是正常調度還是異常跑出;args 是二元組,表示是從協程 args[0] 切換到了協程 args[1]。上面的輸出展示了切換流程:從 main 到 gr1,然后到 gr2,最后回到 main。
greenlet使用建議
使用greenlet需要注意一下三點:
- 1. greenlet 創建之后,一定要結束,不能 switch 出去就不回來了,否則容易造成內存泄露
- 2. python 中每個線程都有自己的 main greenlet 及其對應的 sub-greenlet ,不同線程之間的 greenlet 是不能相互切換的
- 3. 不能存在循環引用,這個是官方文檔明確說明:”Greenlets do not participate in garbage collection; cycles involving data that is present in a greenlet’s frames will not be detected. “
來看一個例子:
from greenlet import greenlet, GreenletExithuge = []def show_leak():def test1():gr2.switch()def test2():huge.extend([x * x for x in range(100)])gr1.switch()print('finish switch del huge')del huge[:]gr1 = greenlet(test1)gr2 = greenlet(test2)gr1.switch()gr1 = gr2 = Noneprint('length of huge is zero ? %s' % len(huge))if __name__ == '__main__':show_leak()# output: length of huge is zero ? 100在test2函數中,第11行,我們將huge清空,然后再第16行將gr1、gr2的引用計數降到了0。但運行結果告訴我們,第11行并沒有執行,所以如果一個協程沒有正常結束是很危險的,往往不符合程序員的預期。greenlet提供了解決這個問題的辦法,官網文檔提到:如果一個greenlet實例的引用計數變成0,那么會在上次掛起的地方拋出GreenletExit異常,這就使得我們可以通過try ... finally 處理資源泄露的情況。如下面的代碼:
from greenlet import greenlet, GreenletExithuge = []def show_leak():def test1():gr2.switch()def test2():huge.extend([x * x for x in range(100)])try:gr1.switch()finally:print('finish switch del huge')del huge[:]gr1 = greenlet(test1)gr2 = greenlet(test2)gr1.switch()gr1 = gr2 = Noneprint('length of huge is zero ? %s' % len(huge))if __name__ == '__main__':show_leak()# output : # finish switch del huge # length of huge is zero ? 0上述代碼的switch流程:main greenlet --> gr1 --> gr2 --> gr1 --> main greenlet, 很明顯gr2沒有正常結束(在第10行掛起了)。第18行之后gr1,gr2的引用計數都變成0,那么會在第10行拋出GreenletExit異常,因此finally語句有機會執行。同時,在文章開始介紹Greenlet module的時候也提到了,GreenletExit這個異常并不會拋出到parent,所以main greenlet也不會出異常。
看上去貌似解決了問題,但這對程序員要求太高了,百密一疏。所以最好的辦法還是保證協程的正常結束。
gevent
安裝:pip3 install gevent?
? ? ? ? 因為 python 線程的性能問題,在python中使用多線程運行代碼經常不能達到預期的效果。而有些時候我們的邏輯中又需要開更高的并發,或者簡單的說,就是讓我們的代碼跑的更快,在同樣時間內執行更多的有效邏輯、減少無用的等待。gevent 就是一個現在很火、支持也很全面的 python第三方協程庫。?
? ? ? ? Python 通過?yield?提供了對 協程 的基本支持,但是不完全。greenlet 已經實現了協程,但是這個需要工人切換,有點麻煩。python 還有一個比 greenlet 更強大的并且能夠自動切換任務的模塊?gevent,gevent 為 Python提供了比較完善的協程支持。
? ? ? ? gevent 是 Python 的第三方并發框架庫,以微線程greenlet為核心,使用了epoll事件監聽機制以及諸多其他優化而變得高效。而且其中有個monkey類,將現有基于Python 線程直接轉化為 greenlet(類似于打patch),通過 greenlet 實現協程,其基本思想是:當一個 greenlet 遇到IO操作時,比如訪問網絡,就自動切換到其他的 greenlet,等到 IO 操作完成,再在適當的時候切換回來繼續執行。由于IO操作非常耗時,經常使程序處于等待狀態,有了 gevent 為我們自動切換協程,就保證總有 greenlet 在運行,而不是等待IO。同時也因為只有一個線程在執行,會極大的減少上下文切換的成本。
gevent 基本使用
示例代碼:
from gevent import monkey; monkey.patch_socket() import geventdef f(n):for i in range(n):print(gevent.getcurrent(), i)g1 = gevent.spawn(f, 5) g2 = gevent.spawn(f, 5) g3 = gevent.spawn(f, 5) g1.join() g2.join() g3.join()運行結果:
<Greenlet at 0x10e49f550: f(5)> 0 <Greenlet at 0x10e49f550: f(5)> 1 <Greenlet at 0x10e49f550: f(5)> 2 <Greenlet at 0x10e49f550: f(5)> 3 <Greenlet at 0x10e49f550: f(5)> 4 <Greenlet at 0x10e49f910: f(5)> 0 <Greenlet at 0x10e49f910: f(5)> 1 <Greenlet at 0x10e49f910: f(5)> 2 <Greenlet at 0x10e49f910: f(5)> 3 <Greenlet at 0x10e49f910: f(5)> 4 <Greenlet at 0x10e49f4b0: f(5)> 0 <Greenlet at 0x10e49f4b0: f(5)> 1 <Greenlet at 0x10e49f4b0: f(5)> 2 <Greenlet at 0x10e49f4b0: f(5)> 3 <Greenlet at 0x10e49f4b0: f(5)> 4可以看到,3個 greenlet 是依次運行而不是交替運行。
要讓 greenlet 交替運行,可以通過?gevent.sleep()交出控制權:
def f(n):for i in range(n):print(gevent.getcurrent(), i)gevent.sleep(0)執行結果:
<Greenlet at 0x10cd58550: f(5)> 0 <Greenlet at 0x10cd58910: f(5)> 0 <Greenlet at 0x10cd584b0: f(5)> 0 <Greenlet at 0x10cd58550: f(5)> 1 <Greenlet at 0x10cd584b0: f(5)> 1 <Greenlet at 0x10cd58910: f(5)> 1 <Greenlet at 0x10cd58550: f(5)> 2 <Greenlet at 0x10cd58910: f(5)> 2 <Greenlet at 0x10cd584b0: f(5)> 2 <Greenlet at 0x10cd58550: f(5)> 3 <Greenlet at 0x10cd584b0: f(5)> 3 <Greenlet at 0x10cd58910: f(5)> 3 <Greenlet at 0x10cd58550: f(5)> 4 <Greenlet at 0x10cd58910: f(5)> 4 <Greenlet at 0x10cd584b0: f(5)> 43個 greenlet 交替運行,把循環次數改為 500000,運行時間長一點,然后在操作系統的進程管理器中看,線程數只有1個。
示例代碼:
# -*- coding: utf-8 -*-import geventdef f1():for i in range(5):print('run func: f1, index: %s ' % i)gevent.sleep(0)def f2():for i in range(5):print('run func: f2, index: %s ' % i)gevent.sleep(0)t1 = gevent.spawn(f1) t2 = gevent.spawn(f2) gevent.joinall([t1, t2])運行后輸出如下圖所示:
由圖中可以看出,f1 和 f2 是交叉打印信息的,因為在代碼執行的過程中,我們人為使用 gevent.sleep(0) 創建了一個阻塞,gevent 在運行到這里時就會自動切換函數切換函數。也可以在執行的時候 sleep 更長時間,可以發現兩個函數基本是同時運行然后各自等待。
????在實際運用的過程中,我們如果有需要通過人為 sleep 來增加時間間隔或者確保部分邏輯安全的時候,此處使用就很方便了。當然,更多時候我們還是在需要進行網絡請求的時候使用 gevent,由于切換是在 IO 操作時自動完成,所以 gevent 需要修改 Python 自帶的一些標準庫,這一過程在啟動時通過 monkey patch 完成:
# -*- coding: utf-8 -*-from gevent import monkey; monkey.patch_all() import gevent import requests from datetime import datetimedef f(url):print(f'time: {datetime.now()}, GET: {url}')resp = requests.get(url)print(f'time: {datetime.now()}, {len(resp.text)} bytes received from {url}.')gevent.joinall([gevent.spawn(f, 'https://www.python.org/'),gevent.spawn(f, 'https://www.yahoo.com/'),gevent.spawn(f, 'https://github.com/'), ])運行上述代碼,結果如下:
由上圖可以看出,程序基本在同一時間觸發了對三個網站的請求,然后各自進行,分別結束,而且結束順序不同,也就是當 gevent 發現阻塞之后,讓當前繼續執行,然后自動切換到了另外的請求中運行。而且程序只有一個線程。
示例代碼:
from gevent import monkey# 有耗時操作時需要 monkey.patch_all() # 將程序中用到的耗時操作的代碼,換為 gevent中自己實現的模塊import gevent import random import timedef coroutine_work(coroutine_name):for i in range(10):print(coroutine_name, i)time.sleep(random.random())gevent.joinall([gevent.spawn(coroutine_work, "work1"),gevent.spawn(coroutine_work, "work2")] )加鎖
如果需要在使用 gevent 的時候加鎖,也是非常方便的:
# -*- coding: utf-8 -*-import gevent from gevent.lock import Semaphoresem = Semaphore(1)def f1():for i in range(5):sem.acquire()print('run f1, this is ', i)sem.release()gevent.sleep(1)def f2():for i in range(5):sem.acquire()print('run f2, that is ', i)sem.release()gevent.sleep(0.3)t1 = gevent.spawn(f1) t2 = gevent.spawn(f2) gevent.joinall([t1, t2])運行結果如下:
由輸出可以發現,程序會同時判斷是否在 sleep 以及是否有鎖兩種情況,然后執行當前的最有操作。
gevent 實現 生產者 - 消費者
# -*- coding: utf-8 -*-from gevent import monkey# 猴子補丁,all是所有能切換協程的地方都切換,包含了socket,所以一般都用all monkey.patch_all() from gevent.queue import Queue # 隊列 gevent中的隊列 import gevent import randomqq = Queue(3)def producer(index=1):while True:print(f'producer [{index}] --> ', end='')item = random.randint(0, 99)qq.put(item)print("生產了:", item)def consumer(index=1):while True:print(f'consumer [{index}] --> ', end='')item = qq.get()print("消費了:", item)def main_1():thread_1 = gevent.spawn(producer)thread_2 = gevent.spawn(consumer)thread_3 = gevent.spawn(consumer, 2)thread_list = [thread_1, thread_2, thread_3]gevent.joinall(thread_list)if __name__ == '__main__':main_1()# main_2()passgevent 調度過程解析
https://www.jianshu.com/p/f55148c41f54
gevent 比較重要的模塊
https://www.jianshu.com/p/0a0feb3fe361
gevent 比較重要的模塊,包括Timeout,Event/AsynResult,Semphore,socket patch,這些模塊都涉及當前協程與 hub 的切換。
總結
以上是生活随笔為你收集整理的Python 的协程库 greenlet 和 gevent的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Python爬取大量数据时防止被封IP
- 下一篇: 简明Python教程学习笔记_8_异常