日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 >

gevent拾遗

發(fā)布時間:2025/3/20 49 豆豆
生活随笔 收集整理的這篇文章主要介紹了 gevent拾遗 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

  在前文已經(jīng)介紹過了gevent的調(diào)度流程,本文介紹gevent一些重要的模塊,包括Timeout,Event\AsynResult, Semphore, socket patch,這些模塊都涉及當(dāng)前協(xié)程與hub的切換。本文分析的gevent版本為1.2

Timeout

  這個類在gevent.timeout模塊,其作用是超時后在當(dāng)前協(xié)程拋出異常,這樣執(zhí)行流程也強(qiáng)制回到了當(dāng)前協(xié)程。看一個簡單的例子:

1 SLEEP = 6 2 TIMEOUT = 5 3 4 timeout = Timeout(TIMEOUT) 5 timeout.start() 6 7 def wait(): 8 gevent.sleep(SLEEP) 9 print('log in wait') 10 11 begin = time.time() 12 try: 13 gevent.spawn(wait).join() 14 except Timeout: 15 print('after %s catch Timeout Exception' % (time.time() - begin)) 16 finally: 17 timeout.cancel()

  輸出為:after 5.00100016594 catch Timeout Exception。可以看出,在5s之后在main協(xié)程拋出了Timeout異常(繼承自BaseException)。Timeout的實現(xiàn)很簡單,核心在start函數(shù):

1 def start(self): 2 """Schedule the timeout.""" 3 assert not self.pending, '%r is already started; to restart it, cancel it first' % self 4 if self.seconds is None: # "fake" timeout (never expires) 5 return 6 7 if self.exception is None or self.exception is False or isinstance(self.exception, string_types): 8 # timeout that raises self 9 self.timer.start(getcurrent().throw, self) 10 else: # regular timeout with user-provided exception 11 self.timer.start(getcurrent().throw, self.exception)

?

  從源碼可以看到,在超時之后調(diào)用了getcurrent().throw(),throw方法會切換協(xié)程,并拋出異常(在上面的代碼中默認(rèn)拋出Timeout異常)。使用Timeout有兩點需要注意:

  第一:一定要記得在finally調(diào)用cancel,否則如果協(xié)程先于TIMEOUT時間恢復(fù),之后還會拋出異常,例如下面的代碼:

1 import gevent 2 from gevent import Timeout 3 4 SLEEP = 4 5 TIMEOUT = 5 6 7 timeout = Timeout(TIMEOUT) 8 timeout.start() 9 10 def wait(): 11 gevent.sleep(SLEEP) 12 print('log in wait') 13 14 begin = time.time() 15 try: 16 gevent.spawn(wait).join() 17 except Timeout: 18 print('after %s catch Timeout Exception' % (time.time() - begin)) 19 # finally: 20 # timeout.cancel() 21 22 gevent.sleep(2) 23 print 'program will finish' 協(xié)程先于超時恢復(fù)

  上述的代碼運行會拋出Timeout異常,在這個例子中,協(xié)程先于超時恢復(fù)(SLEEP < TIMEOUT),且沒有在finally中調(diào)用Timeout.cancel。最后的兩行保證程序不要過早結(jié)束退出,那么在hub調(diào)度的時候會重新拋出異常。

  由于Timeout實現(xiàn)了with協(xié)議(__enter__和__exit__方法),更好的寫法是將TImeout寫在with語句中,如下面的代碼:

1 import gevent 2 from gevent import Timeout 3 4 SLEEP = 4 5 TIMEOUT = 5 6 7 8 def wait(): 9 gevent.sleep(SLEEP) 10 print('log in wait') 11 12 with Timeout(TIMEOUT): 13 begin = time.time() 14 try: 15 gevent.spawn(wait).join() 16 except Timeout: 17 print('after %s catch Timeout Exception' % (time.time() - begin)) 18 19 gevent.sleep(2) 20 print 'program will finish' Timeout with

?

  第二:Timeout只是切換到當(dāng)前協(xié)程,并不會取消已經(jīng)注冊的協(xié)程(上面通過spawn發(fā)起的協(xié)程),我們改改代碼:

1 import gevent 2 from gevent import Timeout 3 4 SLEEP = 6 5 TIMEOUT = 5 6 7 timeout = Timeout(TIMEOUT) 8 timeout.start() 9 10 def wait(): 11 gevent.sleep(SLEEP) 12 print('log in wait') 13 14 begin = time.time() 15 try: 16 gevent.spawn(wait).join() 17 except Timeout: 18 print('after %s catch Timeout Exception' % (time.time() - begin)) 19 finally: 20 timeout.cancel() 21 22 gevent.sleep(2) 23 print 'program will finish' 24 # output: 25 # after 5.00100016594 catch Timeout Exception 26 # log in wait 27 # program will finish Timeout不影響發(fā)起的協(xié)程

  從輸出可以看到,即使因為超時切回了main greenlet,但spawn發(fā)起的協(xié)程并不受影響。如果希望超時取消之前發(fā)起的協(xié)程,那么可以在捕獲到異常之后調(diào)用 Greenlet.kill

?  第三:gevent對可能導(dǎo)致當(dāng)前協(xié)程掛起的函數(shù)都提供了timeout參數(shù),用于在指定時間到達(dá)之后恢復(fù)被掛起的協(xié)程。在函數(shù)內(nèi)部會捕獲Timeout異常,并不會拋出。例如:

1 SLEEP = 6 2 TIMEOUT = 5 3 4 5 def wait(): 6 gevent.sleep(SLEEP) 7 print('log in wait') 8 9 begin = time.time() 10 try: 11 gevent.spawn(wait).join(TIMEOUT) 12 except Timeout: 13 print('after %s catch Timeout Exception' % (time.time() - begin)) 14 15 print 'program will exit', time.time() - begin 函數(shù)的timeout參數(shù)

?

Event & AsyncResult:

  Event用來在Greenlet之間同步,tutorial上的例子簡單明了:

1 import gevent 2 from gevent.event import Event 3 4 ''' 5 Illustrates the use of events 6 ''' 7 8 9 evt = Event() 10 11 def setter(): 12 '''After 3 seconds, wake all threads waiting on the value of evt''' 13 print('A: Hey wait for me, I have to do something') 14 gevent.sleep(3) 15 print("Ok, I'm done") 16 evt.set() 17 18 19 def waiter(): 20 '''After 3 seconds the get call will unblock''' 21 print("I'll wait for you") 22 evt.wait() # blocking 23 print("It's about time") 24 25 def main(): 26 gevent.joinall([ 27 gevent.spawn(setter), 28 gevent.spawn(waiter), 29 gevent.spawn(waiter), 30 31 ]) 32 33 if __name__ == '__main__': main() Event Example

?

  Event主要的兩個方法是set和wait:wait等待事件發(fā)生,如果事件未發(fā)生那么掛起該協(xié)程;set通知事件發(fā)生,然后hub會喚醒所有wait在該事件的協(xié)程。從輸出可知, 一次event觸發(fā)可以喚醒所有在該event上等待的協(xié)程。AsyncResult同Event類似,只不過可以在協(xié)程喚醒的時候傳值(有點類似generator的next send的區(qū)別)。接下來大致看看Event的set和wait方法。

  Event.wait的核心代碼在gevent.event._AbstractLinkable._wait_core,其中_AbstractLinkable是Event的基類。_wait_core源碼如下:

1 def _wait_core(self, timeout, catch=Timeout): 2 # The core of the wait implementation, handling 3 # switching and linking. If *catch* is set to (), 4 # a timeout that elapses will be allowed to be raised. 5 # Returns a true value if the wait succeeded without timing out. 6 switch = getcurrent().switch 7 self.rawlink(switch) 8 try: 9 timer = Timeout._start_new_or_dummy(timeout) 10 try: 11 try: 12 result = self.hub.switch() 13 if result is not self: # pragma: no cover 14 raise InvalidSwitchError('Invalid switch into Event.wait(): %r' % (result, )) 15 return True 16 except catch as ex: 17 if ex is not timer: 18 raise 19 # test_set_and_clear and test_timeout in test_threading 20 # rely on the exact return values, not just truthish-ness 21 return False 22 finally: 23 timer.cancel() 24 finally: 25 self.unlink(switch)

  首先是將當(dāng)前協(xié)程的switch加入到Event的callback列表,然后切換到hub。

  接下來是set函數(shù):

1 def set(self): 2 self._flag = True # make event ready 3 self._check_and_notify() 1 def _check_and_notify(self): 2 # If this object is ready to be notified, begin the process. 3 if self.ready(): 4 if self._links and not self._notifier: 5 self._notifier = self.hub.loop.run_callback(self._notify_links)

?

  _check_and_notify函數(shù)通知hub調(diào)用_notify_links, 在這個函數(shù)中將調(diào)用Event的callback列表(記錄的是之前各個協(xié)程的switch函數(shù)),這樣就恢復(fù)了所有wait的協(xié)程。

?

Semaphore & Lock

  Semaphore是gevent提供的信號量,實例化為Semaphore(value), value代表了可以并發(fā)的量。當(dāng)value為1,就變成了互斥鎖(Lock)。Semaphore提供了兩個函數(shù),acquire(P操作)和release(V操作)。當(dāng)acquire操作導(dǎo)致資源數(shù)量將為0之后,就會在當(dāng)前協(xié)程wait,源代碼如下(gevent._semaphore.Semaphore.acquire):

1 def acquire(self, blocking=True, timeout=None): 2 3 if self.counter > 0: 4 self.counter -= 1 5 return True 6 7 if not blocking: 8 return False 9 10 timeout = self._do_wait(timeout) 11 if timeout is not None: 12 # Our timer expired. 13 return False 14 15 # Neither our timer no another one expired, so we blocked until 16 # awoke. Therefore, the counter is ours 17 self.counter -= 1 18 assert self.counter >= 0 19 return True

?

  邏輯比較簡單,如果counter數(shù)量大于0,那么表示可并發(fā)。否則進(jìn)入wait,_do_wait的實現(xiàn)與Event.wait十分類似,都是記錄當(dāng)前協(xié)程的switch,并切換到hub。當(dāng)資源足夠切換回到當(dāng)前協(xié)程,此時counter一定是大于0的。由于協(xié)程的并發(fā)并不等同于線程的并發(fā),在任意時刻,一個線程內(nèi)只可能有一個協(xié)程在調(diào)度,所以上面對counter的操作也不用加鎖

?

Monkey-Patch

  對于python這種動態(tài)語言,在運行時替換模塊、類、實例的屬性都是非常容易的。我們以patch_socket為例:

>>> import socket
>>> print(socket.socket)
<class 'gevent._socket2.socket'>
>>> from gevent import monkey
>>> monkey.patch_socket()
>>> print(socket.socket)
<class 'gevent._socket2.socket'>
>>>

  可見在patch前后,同一個名字(socket)所指向的對象是不一樣的。在python2.x環(huán)境下,patch后的socket源碼在gevent._socket2.py,如果是python3.x,那么對應(yīng)的源碼在gevent._socket3.py.。至于為什么patch之后就讓原生的socket操作可以在協(xié)程之間協(xié)作,看兩個函數(shù)socket.__init__ 和 socket.recv就明白了。

  __init__函數(shù)(gevent._socket2.socket.__init__):

1 def __init__(self, family=AF_INET, type=SOCK_STREAM, proto=0, _sock=None): 2 if _sock is None: 3 self._sock = _realsocket(family, type, proto) # 原生的socket 4 self.timeout = _socket.getdefaulttimeout() 5 else: 6 if hasattr(_sock, '_sock'): 7 self._sock = _sock._sock 8 self.timeout = getattr(_sock, 'timeout', False) 9 if self.timeout is False: 10 self.timeout = _socket.getdefaulttimeout() 11 else: 12 self._sock = _sock 13 self.timeout = _socket.getdefaulttimeout() 14 if PYPY: 15 self._sock._reuse() 16 self._sock.setblocking(0) #設(shè)置成非阻塞 17 fileno = self._sock.fileno() 18 self.hub = get_hub() # hub 19 io = self.hub.loop.io 20 self._read_event = io(fileno, 1) # 監(jiān)聽事件 21 self._write_event = io(fileno, 2)

  從init函數(shù)可以看到,patch后的socket還是會維護(hù)原生的socket對象,并且將原生的socket設(shè)置成非阻塞(line16),當(dāng)一個socket是非阻塞時,如果讀寫數(shù)據(jù)沒有準(zhǔn)備好,那么會拋出EWOULDBLOCK\EAGIN異常。最后兩行注冊socket的可讀和可寫事件。再來看看recv函數(shù)(gevent._socket2.socket.recv):

1 def recv(self, *args): 2 sock = self._sock # keeping the reference so that fd is not closed during waiting 3 while True: 4 try: 5 return sock.recv(*args) # 如果數(shù)據(jù)準(zhǔn)備好了,直接返回 6 except error as ex: 7 if ex.args[0] != EWOULDBLOCK or self.timeout == 0.0: 8 raise 9 # QQQ without clearing exc_info test__refcount.test_clean_exit fails 10 sys.exc_clear() 11 self._wait(self._read_event) # 等待數(shù)據(jù)可讀的watcher

?  如果在while循環(huán)中讀到了數(shù)據(jù),那么直接返回。但實際很大概率數(shù)據(jù)并沒有準(zhǔn)備好,對于非阻塞socket,拋出EWOULDBLOCK異常(line7)。在第11行,調(diào)用wait,注冊當(dāng)前協(xié)程switch,并切換到hub,當(dāng)read_event觸發(fā)時,表示socket可讀,這個時候就會切回當(dāng)前協(xié)程,進(jìn)入下一次while循環(huán)。

?

references:

http://sdiehl.github.io/gevent-tutorial/

http://www.cnblogs.com/xybaby/p/6370799.html

?

總結(jié)

以上是生活随笔為你收集整理的gevent拾遗的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。