Gevent简明教程
2019獨(dú)角獸企業(yè)重金招聘Python工程師標(biāo)準(zhǔn)>>>
1、前述
進(jìn)程 線程 協(xié)程 異步
并發(fā)編程(不是并行)目前有四種方式:多進(jìn)程、多線程、協(xié)程和異步。
- 多進(jìn)程編程在python中有類似C的os.fork,更高層封裝的有multiprocessing標(biāo)準(zhǔn)庫
- 多線程編程python中有Thread和threading
- 異步編程在linux下主+要有三種實(shí)現(xiàn)select,poll,epoll
- 協(xié)程在python中通常會說到y(tǒng)ield,關(guān)于協(xié)程的庫主要有g(shù)reenlet,stackless,gevent,eventlet等實(shí)現(xiàn)
進(jìn)程
- 不共享任何狀態(tài)
- 調(diào)度由操作系統(tǒng)完成
- 有獨(dú)立的內(nèi)存空間(上下文切換的時候需要保存棧、cpu寄存器、虛擬內(nèi)存、以及打開的相關(guān)句柄等信息,開銷大)
- 通訊主要通過信號傳遞的方式來實(shí)現(xiàn)(實(shí)現(xiàn)方式有多種,信號量、管道、事件等,通訊都需要過內(nèi)核,效率低)
線程
- 共享變量(解決了通訊麻煩的問題,但是對于變量的訪問需要加鎖)
- 調(diào)度由操作系統(tǒng)完成(由于共享內(nèi)存,上下文切換變得高效)
- 一個進(jìn)程可以有多個線程,每個線程會共享父進(jìn)程的資源(創(chuàng)建線程開銷占用比進(jìn)程小很多,可創(chuàng)建的數(shù)量也會很多)
- 通訊除了可使用進(jìn)程間通訊的方式,還可以通過共享內(nèi)存的方式進(jìn)行通信(通過共享內(nèi)存通信比通過內(nèi)核要快很多)
協(xié)程
- 調(diào)度完全由用戶控制
- 一個線程(進(jìn)程)可以有多個協(xié)程
- 每個線程(進(jìn)程)循環(huán)按照指定的任務(wù)清單順序完成不同的任務(wù)(當(dāng)任務(wù)被堵塞時,執(zhí)行下一個任務(wù);當(dāng)恢復(fù)時,再回來執(zhí)行這個任務(wù);任務(wù)間切換只需要保存任務(wù)的上下文,沒有內(nèi)核的開銷,可以不加鎖的訪問全局變量)
- 協(xié)程需要保證是非堵塞的且沒有相互依賴
- 協(xié)程基本上不能同步通訊,多采用異步的消息通訊,效率比較高
總結(jié)
協(xié)程 > 多進(jìn)程 >多線程 使用gevent,可以獲得極高的并發(fā)性能,但gevent只能在Unix/Linux下運(yùn)行,在Windows下不保證正常安裝和運(yùn)行既然有了線程為什么還要協(xié)程呢?因?yàn)榫€程是系統(tǒng)級別的,在做切換的時候消耗是特別大的,具體為什么這么大等我研究好了再告訴你;同時線程的切換是由CPU決定的,可能你剛好執(zhí)行到一個地方的時候就要被迫終止,這個時候你需要用各種措施來保證你的數(shù)據(jù)不出錯,所以線程對于數(shù)據(jù)安全的操作是比較復(fù)雜的。而協(xié)程是用戶級別的切換,且切換是由自己控制,不受外力終止.
- 進(jìn)程擁有自己獨(dú)立的堆和棧,既不共享堆,亦不共享?xiàng)?#xff0c;進(jìn)程由操作系統(tǒng)調(diào)度
- 線程擁有自己獨(dú)立的棧和共享的堆,共享堆,不共享?xiàng)?#xff0c;線程亦由操作系統(tǒng)調(diào)度(標(biāo)準(zhǔn)線程是的)
- 協(xié)程和線程一樣共享堆,不共享?xiàng)?#xff0c;協(xié)程由程序員在協(xié)程的代碼里顯示調(diào)度
聊聊協(xié)程
協(xié)程,又稱微線程,纖程。 Python的線程并不是標(biāo)準(zhǔn)線程,是系統(tǒng)級進(jìn)程,線程間上下文切換有開銷,而且Python在執(zhí)行多線程時默認(rèn)加了一個全局解釋器鎖(GIL),因此Python的多線程其實(shí)是串行的,所以并不能利用多核的優(yōu)勢,也就是說一個進(jìn)程內(nèi)的多個線程只能使用一個CPU。
def coroutine(func):def ret():f = func()f.next()return freturn ret@coroutine def consumer():print "Wait to getting a task"while True:n = (yield)print "Got %s",nimport time def producer():c = consumer()task_id = 0while True:time.sleep(1)print "Send a task to consumer" % task_idc.send("task %s" % task_id)if __name__ == "__main__":producer()運(yùn)行結(jié)果:Wait to getting a task Send a task 0 to consumer Got task 0 Send a task 1 to consumer Got task 1 Send a task 2 to consumer Got task 2傳統(tǒng)的生產(chǎn)者-消費(fèi)者模型是一個線程寫消息,一個線程取消息,通過鎖機(jī)制控制隊(duì)列和等待,但容易死鎖。 如果改用協(xié)程,生產(chǎn)者生產(chǎn)消息后,直接通過yield跳轉(zhuǎn)到消費(fèi)者開始執(zhí)行,待消費(fèi)者執(zhí)行完畢后,切換回生產(chǎn)者繼續(xù)生產(chǎn),效率極高。
2、Gevent
介紹
gevent是基于協(xié)程的Python網(wǎng)絡(luò)庫。特點(diǎn):
- 基于libev的快速事件循環(huán)(Linux上epoll,FreeBSD上kqueue)。
- 基于greenlet的輕量級執(zhí)行單元。
- API的概念和Python標(biāo)準(zhǔn)庫一致(如事件,隊(duì)列)。
- 可以配合socket,ssl模塊使用。
- 能夠使用標(biāo)準(zhǔn)庫和第三方模塊創(chuàng)建標(biāo)準(zhǔn)的阻塞套接字(gevent.monkey)。
- 默認(rèn)通過線程池進(jìn)行DNS查詢,也可通過c-are(通過GEVENT_RESOLVER=ares環(huán)境變量開啟)。
- TCP/UDP/HTTP服務(wù)器
- 子進(jìn)程支持(通過gevent.subprocess)
- 線程池
核心部分
- Greenlets
- 同步和異步執(zhí)行
- 確定性
- 創(chuàng)建Greenlets
- Greenlet狀態(tài)
- 程序停止
- 超時
- 猴子補(bǔ)丁
Greenlets
gevent中的主要模式, 它是以C擴(kuò)展模塊形式接入Python的輕量級協(xié)程。 全部運(yùn)行在主程序操作系統(tǒng)進(jìn)程的內(nèi)部,但它們被程序員協(xié)作式地調(diào)度。
在任何時刻,只有一個協(xié)程在運(yùn)行。區(qū)別于multiprocessing、threading等提供真正并行構(gòu)造的庫, 這些庫輪轉(zhuǎn)使用操作系統(tǒng)調(diào)度的進(jìn)程和線程,是真正的并行。
同步和異步執(zhí)行
并發(fā)的核心思想在于,大的任務(wù)可以分解成一系列的子任務(wù),后者可以被調(diào)度成 同時執(zhí)行或異步執(zhí)行,而不是一次一個地或者同步地執(zhí)行。兩個子任務(wù)之間的 切換也就是上下文切換。
在gevent里面,上下文切換是通過yielding來完成的.
import gevent def foo():print('Running in foo')gevent.sleep(0)print('Explicit context switch to foo again') def bar():print('Explicit context to bar')gevent.sleep(0)print('Implicit context switch back to bar') gevent.joinall([gevent.spawn(foo),gevent.spawn(bar), ])Running in foo Explicit context to bar Explicit context switch to foo again Implicit context switch back to bar網(wǎng)絡(luò)延遲或IO阻塞隱式交出greenlet上下文的執(zhí)行權(quán).
import time import gevent from gevent import select start = time.time() tic = lambda: 'at %1.1f seconds' % (time.time() - start) def gr1():print('Started Polling: %s' % tic())select.select([], [], [], 1)print('Ended Polling: %s' % tic()) def gr2():print('Started Polling: %s' % tic())select.select([], [], [], 2)print('Ended Polling: %s' % tic()) def gr3():print("Hey lets do some stuff while the greenlets poll, %s" % tic())gevent.sleep(1) gevent.joinall([gevent.spawn(gr1),gevent.spawn(gr2),gevent.spawn(gr3), ])運(yùn)行結(jié)果: Started Polling: at 0.0 seconds Started Polling: at 0.0 seconds Hey lets do some stuff while the greenlets poll, at 0.0 seconds Ended Polling: at 1.0 seconds Ended Polling: at 2.0 seconds同步vs異步
import gevent import random def task(pid):gevent.sleep(random.randint(0,2)*0.001)print('Task %s done' % pid) def synchronous():for i in xrange(5):task(i) def asynchronous():threads = [gevent.spawn(task, i) for i in xrange(5)]gevent.joinall(threads)print('Synchronous:')synchronous()print('Asynchronous:')asynchronous()運(yùn)行結(jié)果:Synchronous: Task 0 done Task 1 done Task 2 done Task 3 done Task 4 done Asynchronous: Task 2 done Task 0 done Task 1 done Task 3 done Task 4 done確定性
greenlet具有確定性。在相同配置相同輸入的情況下,它們總是會產(chǎn)生相同的輸出
import time def echo(i):time.sleep(0.001)return i # Non Deterministic Process Pool from multiprocessing.pool import Pool p = Pool(10) run1 = [a for a in p.imap_unordered(echo, xrange(10))] run2 = [a for a in p.imap_unordered(echo, xrange(10))] run3 = [a for a in p.imap_unordered(echo, xrange(10))] run4 = [a for a in p.imap_unordered(echo, xrange(10))] print(run1 == run2 == run3 == run4) # Deterministic Gevent Pool from gevent.pool import Pool p = Pool(10) run1 = [a for a in p.imap_unordered(echo, xrange(10))] run2 = [a for a in p.imap_unordered(echo, xrange(10))] run3 = [a for a in p.imap_unordered(echo, xrange(10))] run4 = [a for a in p.imap_unordered(echo, xrange(10))] print(run1 == run2 == run3 == run4)運(yùn)行結(jié)果:False True即使gevent通常帶有確定性,當(dāng)開始與如socket或文件等外部服務(wù)交互時, 不確定性也可能溜進(jìn)你的程序中。因此盡管gevent線程是一種“確定的并發(fā)”形式, 使用它仍然可能會遇到像使用POSIX線程或進(jìn)程時遇到的那些問題。
涉及并發(fā)長期存在的問題就是競爭條件(race condition)(當(dāng)兩個并發(fā)線程/進(jìn)程都依賴于某個共享資源同時都嘗試去修改它的時候, 就會出現(xiàn)競爭條件),這會導(dǎo)致資源修改的結(jié)果狀態(tài)依賴于時間和執(zhí)行順序。 這個問題,會導(dǎo)致整個程序行為變得不確定。
解決辦法: 始終避免所有全局的狀態(tài).
創(chuàng)建Greenlets
gevent對Greenlet初始化提供了一些封裝.
import gevent from gevent import Greenlet def foo(message, n):gevent.sleep(n)print(message)thread1 = Greenlet.spawn(foo, "Hello", 1)thread2 = gevent.spawn(foo, "I live!", 2)thread3 = gevent.spawn(lambda x: (x+1), 2)threads = [thread1, thread2, thread3]gevent.joinall(threads)執(zhí)行結(jié)果:Hello I live!除使用基本的Greenlet類之外,你也可以子類化Greenlet類,重載它的_run方法.
import gevent from gevent import Greenlet class MyGreenlet(Greenlet):def __init__(self, message, n):Greenlet.__init__(self)self.message = messageself.n = ndef _run(self):print(self.message)gevent.sleep(self.n) g = MyGreenlet("Hi there!", 3) g.start() g.join()執(zhí)行結(jié)果: Hi there!Greenlet狀態(tài)
greenlet的狀態(tài)通常是一個依賴于時間的參數(shù):
- started – Boolean, 指示此Greenlet是否已經(jīng)啟動
- ready() – Boolean, 指示此Greenlet是否已經(jīng)停止
- successful() – Boolean, 指示此Greenlet是否已經(jīng)停止而且沒拋異常
- value – 任意值, 此Greenlet代碼返回的值
- exception – 異常, 此Greenlet內(nèi)拋出的未捕獲異常
程序停止
當(dāng)主程序(main program)收到一個SIGQUIT信號時,不能成功做yield操作的 Greenlet可能會令意外地掛起程序的執(zhí)行。這導(dǎo)致了所謂的僵尸進(jìn)程, 它需要在Python解釋器之外被kill掉
通用的處理模式就是在主程序中監(jiān)聽SIGQUIT信號,調(diào)用gevent.shutdown退出程序
import gevent import signal def run_forever():gevent.sleep(1000)if __name__ == '__main__':gevent.signal(signal.SIGQUIT, gevent.shutdown)thread = gevent.spawn(run_forever)thread.join()超時
通過超時可以對代碼塊兒或一個Greenlet的運(yùn)行時間進(jìn)行約束
import gevent from gevent import Timeout seconds = 10 timeout = Timeout(seconds) timeout.start() def wait():gevent.sleep(10)try:gevent.spawn(wait).join()except Timeout:print('Could not complete')超時類
import gevent from gevent import Timeout time_to_wait = 5 # secondsclass TooLong(Exception):passwith Timeout(time_to_wait, TooLong):gevent.sleep(10)另外,對各種Greenlet和數(shù)據(jù)結(jié)構(gòu)相關(guān)的調(diào)用,gevent也提供了超時參數(shù)
import gevent from gevent import Timeout def wait():gevent.sleep(2) timer = Timeout(1).start() thread1 = gevent.spawn(wait) try:thread1.join(timeout=timer) except Timeout:print('Thread 1 timed out') # -- timer = Timeout.start_new(1) thread2 = gevent.spawn(wait) try:thread2.get(timeout=timer) except Timeout:print('Thread 2 timed out') # -- try:gevent.with_timeout(1, wait) except Timeout:print('Thread 3 timed out')運(yùn)行結(jié)果:Thread 1 timed out Thread 2 timed out Thread 3 timed out猴子補(bǔ)丁(Monkey patching)
gevent的死角.
import socket print(socket.socket) print("After monkey patch") from gevent import monkey monkey.patch_socket() print(socket.socket) import select print(select.select) monkey.patch_select() print("After monkey patch") print(select.select)運(yùn)行結(jié)果:class 'socket.socket' After monkey patch class 'gevent.socket.socket' built-in function select After monkey patch function select at 0x1924de8Python的運(yùn)行環(huán)境允許我們在運(yùn)行時修改大部分的對象,包括模塊,類甚至函數(shù)。 這是個一般說來令人驚奇的壞主意,因?yàn)樗鼊?chuàng)造了“隱式的副作用”,如果出現(xiàn)問題 它很多時候是極難調(diào)試的。雖然如此,在極端情況下當(dāng)一個庫需要修改Python本身 的基礎(chǔ)行為的時候,猴子補(bǔ)丁就派上用場了。在這種情況下,gevent能夠修改標(biāo)準(zhǔn)庫里面大部分的阻塞式系統(tǒng)調(diào)用,包括socket、ssl、threading和 select等模塊,而變?yōu)閰f(xié)作式運(yùn)行。
例如,Redis的python綁定一般使用常規(guī)的tcp socket來與redis-server實(shí)例通信。 通過簡單地調(diào)用gevent.monkey.patch_all(),可以使得redis的綁定協(xié)作式的調(diào)度 請求,與gevent棧的其它部分一起工作。
這讓我們可以將一般不能與gevent共同工作的庫結(jié)合起來,而不用寫哪怕一行代碼。 雖然猴子補(bǔ)丁仍然是邪惡的(evil),但在這種情況下它是“有用的邪惡(useful evil)”
數(shù)據(jù)結(jié)構(gòu)
- 事件(event)
- 隊(duì)列(Queue)
- 組和池(group/pool)
- 鎖和信號量
- 線程局部變量
- 子進(jìn)程
- Actors
事件(event)
事件(event)是一個在Greenlet之間異步通信的形式
import gevent from gevent.event import Eventevt = Event()def setter():print('A: Hey wait for me, I have to do something')gevent.sleep(3)print("Ok, I'm done")evt.set() def waiter():print("I'll wait for you")evt.wait() # blockingprint("It's about time") def main():gevent.joinall([gevent.spawn(setter),gevent.spawn(waiter),gevent.spawn(waiter),gevent.spawn(waiter)]) if __name__ == '__main__': main()運(yùn)行結(jié)果:A: Hey wait for me, I have to do something I'll wait for you I'll wait for you I'll wait for you Ok, I'm done It's about time It's about time It's about time事件對象的一個擴(kuò)展是AsyncResult,它允許你在喚醒調(diào)用上附加一個值。 它有時也被稱作是future或defered,因?yàn)樗钟幸粋€指向?qū)砣我鈺r間可設(shè)置為任何值的引用
import gevent from gevent.event import AsyncResult a = AsyncResult() def setter():gevent.sleep(3)a.set('Hello!') def waiter():print(a.get()) gevent.joinall([gevent.spawn(setter),gevent.spawn(waiter), ])隊(duì)列(Queue)
隊(duì)列是一個排序的數(shù)據(jù)集合,它有常見的put / get操作, 但是它是以在Greenlet之間可以安全操作的方式來實(shí)現(xiàn)的
import gevent from gevent.queue import Queue tasks = Queue() def worker(n):while not tasks.empty():task = tasks.get()print('Worker %s got task %s' % (n, task))gevent.sleep(0)print('Quitting time!') def boss():for i in xrange(1,10):tasks.put_nowait(i) gevent.spawn(boss).join() gevent.joinall([gevent.spawn(worker, 'steve'),gevent.spawn(worker, 'john'),gevent.spawn(worker, 'nancy'), ])執(zhí)行結(jié)果:Worker steve got task 1 Worker john got task 2 Worker nancy got task 3 Worker steve got task 4 Worker john got task 5 Worker nancy got task 6 Worker steve got task 7 Worker john got task 8 Worker nancy got task 9 Quitting time! Quitting time! Quitting time!put和get操作都是阻塞的,put_nowait和get_nowait不會阻塞, 然而在操作不能完成時拋出gevent.queue.Empty或gevent.queue.Full異常
組和池(group/pool)
組(group)是一個運(yùn)行中g(shù)reenlet集合,集合中的greenlet像一個組一樣會被共同管理和調(diào)度。 它也兼飾了像Python的multiprocessing庫那樣的平行調(diào)度器的角色,主要用在在管理異步任務(wù)的時候進(jìn)行分組.
import gevent from gevent.pool import Group def talk(msg):for i in xrange(2):print(msg) g1 = gevent.spawn(talk, 'bar') g2 = gevent.spawn(talk, 'foo') g3 = gevent.spawn(talk, 'fizz') group = Group() group.add(g1) group.add(g2) group.join() group.add(g3) group.join()運(yùn)行結(jié)果:bar bar foo foo fizz fizz池(pool)是一個為處理數(shù)量變化并且需要限制并發(fā)的greenlet而設(shè)計(jì)的結(jié)構(gòu) 不使用Pool
from gevent import monkeymonkey.patch_all() import gevent import requestsurls = ["https://www.python.org/", "https://www.yahoo.com/", "https://github.com/"]def get(url):print(requests.get(url).url)ts = [gevent.spawn(get, url) for url in urls]gevent.joinall(ts)使用Pool
from gevent import monkeymonkey.patch_all() import requests from gevent.pool import Poolurls = ["https://www.python.org/", "https://www.yahoo.com/", "https://github.com/"]def get(url):print(requests.get(url).url)p = Pool(3) p.map(get, urls)構(gòu)造一個socket池的類,在各個socket上輪詢
from gevent.pool import Pool class SocketPool(object):def __init__(self):self.pool = Pool(10)self.pool.start()def listen(self, socket):while True:socket.recv()def add_handler(self, socket):if self.pool.full():raise Exception("At maximum pool size")else:self.pool.spawn(self.listen, socket)def shutdown(self):self.pool.kill()鎖和信號量
信號量是一個允許greenlet相互合作,限制并發(fā)訪問或運(yùn)行的低層次的同步原語。 信號量有兩個方法,acquire和release。在信號量是否已經(jīng)被 acquire或release,和擁有資源的數(shù)量之間不同,被稱為此信號量的范圍 (the bound of the semaphore)。如果一個信號量的范圍已經(jīng)降低到0,它會 阻塞acquire操作直到另一個已經(jīng)獲得信號量的greenlet作出釋放
from gevent import sleep from gevent.pool import Pool from gevent.coros import BoundedSemaphore sem = BoundedSemaphore(2) def worker1(n):sem.acquire()print('Worker %i acquired semaphore' % n)sleep(0)sem.release()print('Worker %i released semaphore' % n) def worker2(n):with sem:print('Worker %i acquired semaphore' % n)sleep(0)print('Worker %i released semaphore' % n) pool = Pool() pool.map(worker1, xrange(0,2))運(yùn)行結(jié)果:Worker 0 acquired semaphore Worker 1 acquired semaphore Worker 0 released semaphore Worker 1 released semaphore鎖(lock)是范圍為1的信號量。它向單個greenlet提供了互斥訪問。 信號量和鎖常被用來保證資源只在程序上下文被單次使用
線程局部變量
Gevent允許程序員指定局部于greenlet上下文的數(shù)據(jù)。 在內(nèi)部,它被實(shí)現(xiàn)為以greenlet的getcurrent()為鍵, 在一個私有命名空間尋址的全局查找
import gevent from gevent.local import local stash = local() def f1():stash.x = 1print(stash.x) def f2():stash.y = 2print(stash.y)try:stash.xexcept AttributeError:print("x is not local to f2") g1 = gevent.spawn(f1) g2 = gevent.spawn(f2) gevent.joinall([g1, g2])運(yùn)行結(jié)果:1 2 x is not local to f2很多集成了gevent的web框架將HTTP會話對象以線程局部變量的方式存儲在gevent內(nèi)。 例如使用Werkzeug實(shí)用庫和它的proxy對象,我們可以創(chuàng)建Flask風(fēng)格的請求對象
from gevent.local import local from werkzeug.local import LocalProxy from werkzeug.wrappers import Request from contextlib import contextmanager from gevent.wsgi import WSGIServer _requests = local() request = LocalProxy(lambda: _requests.request) @contextmanager def sessionmanager(environ):_requests.request = Request(environ)yield_requests.request = None def logic():return "Hello " + request.remote_addr def application(environ, start_response):status = '200 OK'with sessionmanager(environ):body = logic()headers = [('Content-Type', 'text/html')]start_response(status, headers)return [body]WSGIServer(('', 8000), application).serve_forever()子進(jìn)程
從gevent 1.0起,支持gevent.subprocess,支持協(xié)作式的等待子進(jìn)程
import gevent from gevent.subprocess import Popen, PIPE def cron():while True:print("cron")gevent.sleep(0.2) g = gevent.spawn(cron) sub = Popen(['sleep 1; uname'], stdout=PIPE, shell=True) out, err = sub.communicate() g.kill() print(out.rstrip())運(yùn)行結(jié)果:cron cron cron cron cron Linux很多人也想將gevent和multiprocessing一起使用。最明顯的挑戰(zhàn)之一 就是multiprocessing提供
import gevent from multiprocessing import Process, Pipe from gevent.socket import wait_read, wait_write # To Process a, b = Pipe() # From Process c, d = Pipe() def relay():for i in xrange(5):msg = b.recv()c.send(msg + " in " + str(i)) def put_msg():for i in xrange(5):wait_write(a.fileno())a.send('hi') def get_msg():for i in xrange(5):wait_read(d.fileno())print(d.recv()) if __name__ == '__main__':proc = Process(target=relay)proc.start()g1 = gevent.spawn(get_msg)g2 = gevent.spawn(put_msg)gevent.joinall([g1, g2], timeout=1)執(zhí)行結(jié)果:hi in 0 hi in 1 hi in 2 hi in 3 hi in 4然而要注意,組合multiprocessing和gevent必定帶來 依賴于操作系統(tǒng)(os-dependent)的缺陷,其中有:
在兼容POSIX的系統(tǒng)創(chuàng)建子進(jìn)程(forking)之后, 在子進(jìn)程的gevent的狀態(tài)是不適定的(ill-posed)。一個副作用就是, multiprocessing.Process創(chuàng)建之前的greenlet創(chuàng)建動作,會在父進(jìn)程和子進(jìn)程兩方都運(yùn)行。
上例的put_msg()中的a.send()可能依然非協(xié)作式地阻塞調(diào)用的線程:一個 ready-to-write事件只保證寫了一個byte。在嘗試寫完成之前底下的buffer可能是滿的。
上面表示的基于wait_write()/wait_read()的方法在Windows上不工作 (IOError: 3 is not a socket (files are not supported)),因?yàn)閃indows不能監(jiān)視 pipe事件。
Python包gipc以大體上透明的方式在 兼容POSIX系統(tǒng)和Windows上克服了這些挑戰(zhàn)。它提供了gevent感知的基于 multiprocessing.Process的子進(jìn)程和gevent基于pipe的協(xié)作式進(jìn)程間通信
Actors
actor模型是一個由于Erlang變得普及的更高層的并發(fā)模型。 簡單的說它的主要思想就是許多個獨(dú)立的Actor,每個Actor有一個可以從 其它Actor接收消息的收件箱。Actor內(nèi)部的主循環(huán)遍歷它收到的消息,并根據(jù)它期望的行為來采取行動。
Gevent沒有原生的Actor類型,但在一個子類化的Greenlet內(nèi)使用隊(duì)列, 我們可以定義一個非常簡單的
import gevent from gevent.queue import Queue class Actor(gevent.Greenlet):def __init__(self):self.inbox = Queue()Greenlet.__init__(self)def receive(self, message):"""Define in your subclass."""raise NotImplemented()def _run(self):self.running = Truewhile self.running:message = self.inbox.get()self.receive(message)下面是一個使用的例子:
import gevent from gevent.queue import Queue from gevent import Greenlet class Pinger(Actor):def receive(self, message):print(message)pong.inbox.put('ping')gevent.sleep(0) class Ponger(Actor):def receive(self, message):print(message)ping.inbox.put('pong')gevent.sleep(0) ping = Pinger() pong = Ponger() ping.start() pong.start() ping.inbox.put('start') gevent.joinall([ping, pong])實(shí)際應(yīng)用
簡單server
# On Unix: Access with ``$ nc 127.0.0.1 5000`` # On Window: Access with ``$ telnet 127.0.0.1 5000`` from gevent.server import StreamServer def handle(socket, address):socket.send("Hello from a telnet!\n")for i in range(5):socket.send(str(i) + '\n')socket.close() server = StreamServer(('127.0.0.1', 5000), handle) server.serve_forever()WSGI Servers And Websockets
Gevent為HTTP內(nèi)容服務(wù)提供了兩種WSGI server。從今以后就稱為 wsgi和pywsgi
- gevent.wsgi.WSGIServer
- gevent.pywsgi.WSGIServer
glb中使用
import click from flask import Flask from gevent.pywsgi import WSGIServer from geventwebsocket.handler import WebSocketHandler import v1 from .settings import Config from .sockethandler import handle_websocket def create_app(config=None):app = Flask(__name__, static_folder='static')if config:app.config.update(config)else:app.config.from_object(Config)app.register_blueprint(v1.bp,url_prefix='/v1')return app def wsgi_app(environ, start_response):path = environ['PATH_INFO']if path == '/websocket':handle_websocket(environ['wsgi.websocket'])else:return create_app()(environ, start_response) @click.command() @click.option('-h', '--host_port', type=(unicode, int),default=('0.0.0.0', 5000), help='Host and port of server.') @click.option('-r', '--redis', type=(unicode, int, int),default=('127.0.0.1', 6379, 0),help='Redis url of server.') @click.option('-p', '--port_range', type=(int, int),default=(50000, 61000),help='Port range to be assigned.') def manage(host_port, redis=None, port_range=None):Config.REDIS_URL = 'redis://%s:%s/%s' % redisConfig.PORT_RANGE = port_rangehttp_server = WSGIServer(host_port,wsgi_app, handler_class=WebSocketHandler)print '----GLB Server run at %s:%s-----' % host_portprint '----Redis Server run at %s:%s:%s-----' % redishttp_server.serve_forever()3、缺陷
和其他異步I/O框架一樣,gevent也有一些缺陷:
- 阻塞(真正的阻塞,在內(nèi)核級別)在程序中的某個地方停止了所有的東西.這很像C代碼中monkey patch沒有生效
- 保持CPU處于繁忙狀態(tài).greenlet不是搶占式的,這可能導(dǎo)致其他greenlet不會被調(diào)度.
- 在greenlet之間存在死鎖的可能.
一個gevent回避的缺陷是,你幾乎不會碰到一個和異步無關(guān)的Python庫–它將阻塞你的應(yīng)用程序,因?yàn)榧働ython庫使用的是monkey patch的stdlib
轉(zhuǎn)載于:https://my.oschina.net/u/2474096/blog/1586762
總結(jié)
以上是生活随笔為你收集整理的Gevent简明教程的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 第一章 关于python
- 下一篇: 从放弃迅雷和IDM到自己开发下载工具