asyncio协程与并发
并發(fā)編程
Python的并發(fā)實(shí)現(xiàn)有三種方法。
基本概念
串行:同時(shí)只能執(zhí)行單個(gè)任務(wù)
并行:同時(shí)執(zhí)行多個(gè)任務(wù)
在Python中,雖然嚴(yán)格說(shuō)來(lái)多線程與協(xié)程都是串行的,但其效率高,在遇到阻塞時(shí)會(huì)將阻塞任務(wù)交給系統(tǒng)執(zhí)行,通過(guò)合理調(diào)度任務(wù),使得程序高效。
最高效的當(dāng)然是多進(jìn)程了,但由于多進(jìn)程依賴硬件配置,并且當(dāng)任務(wù)量超過(guò)CPU核心數(shù)時(shí),多進(jìn)程會(huì)有進(jìn)程上下文切換開(kāi)銷,而這個(gè)開(kāi)銷很大,所以不是最佳解決方案。
常見(jiàn)耗時(shí)場(chǎng)景
CPU計(jì)算密集型
多線程對(duì)比單線程,由于GIL的存在,切換線程需要不斷加鎖、釋放鎖,效率反而更低;多進(jìn)程相當(dāng)于多個(gè)CPU同時(shí)工作,因此效率很高。
IO密集型
IO密集型可以是磁盤(pán)IO、網(wǎng)絡(luò)IO、數(shù)據(jù)庫(kù)IO等,都屬于計(jì)算量小,IO等待浪費(fèi)高。越是IO等待時(shí)間長(zhǎng),則多線程的優(yōu)勢(shì)相比單線程越明顯,多進(jìn)程效率高但依賴配置資源。
結(jié)論
單線程總是最慢的,多線程適合在IO密集型場(chǎng)景使用,多進(jìn)程適合CPU計(jì)算要求高的場(chǎng)景下使用,多進(jìn)程雖然總是最快的,但需要CPU資源支持。
多線程
Python創(chuàng)建多線程有兩種方法。
用函數(shù)創(chuàng)建多線程
from threading import Threaddef func():for i in range(2):print('Hello world!')sleep(1)th1 = Thread(target=func) th1.start() th2 = Thread(target=func) th2.start()用類創(chuàng)建多線程
這個(gè)類必須繼承Thread,必須重載run()方法
from threading import Threadclass MyThread(Thread):def __init__(self):super().__init__()self.name = 'Bob'def run(self):for i in range(2):print('Hello world!')sleep(1)th1 = MyThread() th2 = MyThread()th1.start() th2.start()常用方法
- threading.Thread(target=func, args=())
- start() # 啟動(dòng)子線程
- join() # 阻塞子線程
- is_alive()/isAlive() # 判斷線程執(zhí)行狀態(tài),正在執(zhí)行返回True,否則False
- daemon # 設(shè)置線程是否隨主線程退出而退出,默認(rèn)False
- name # 設(shè)置線程名
線程鎖
import threadinglock = threading.Lock() # 生成鎖,全局唯一lock.acquire() # 加鎖lock.release() # 釋放鎖加鎖與解鎖必須成對(duì)出現(xiàn),或者使用上下文管理器with來(lái)管理鎖。
可重入鎖
在Redis分布式鎖中提到過(guò),用于讓非阻塞線程重復(fù)獲得鎖來(lái)發(fā)送或讀取數(shù)據(jù),這里的可重入鎖僅指讓同一線程可以多次獲取鎖。
import threadingrlock = threading.RLock() # 生成可重入鎖死鎖
死鎖通常有兩種。
GIL全局鎖
多進(jìn)程是真正的并行,而多線程是偽并行,實(shí)際是多個(gè)線程交替執(zhí)行。
遇到GIL影響性能的情況,要么考慮用多進(jìn)程替代多線程,要么更換Python解釋器。
線程通信
常用線程通信方法。
Event事件
import threadingevent = threading.Event()event.clear() # 重置event,使所有該event事件都處于待命狀態(tài)event.wait() # 等待接收event指令,決定是否阻塞程序執(zhí)行evnet.set() # 發(fā)送event指令,所有該event事件的線程開(kāi)始執(zhí)行 import timeimport threadingclass MyThread(threading.Thread):def __init__(self, name, event):super().__init__()self.name = nameself.event = eventdef run(self):self.event.wait() # 等待event.set()才能執(zhí)行下去time.sleep(1)print('{} Done'.format(self.name))threads = [] event = threading.Event()for i in range(5):threads.append(MyThread(event))event.clear() # 重置event,使event.wait()生效for t in threads:t.start()print('Waiting 3s') time.sleep(3)print('Awake all threads') event.set() # 發(fā)送event指令,所有綁定了event的線程開(kāi)始執(zhí)行所有線程在調(diào)用start()方法后并不會(huì)執(zhí)行完,而是在event.wait()處停住了,需要發(fā)送event.set()指令才能繼續(xù)執(zhí)行。
Condition狀態(tài)
import threadingcond = threading.Condition()cond.acquire()cond.release()cond.wait() # 等待指令觸發(fā),同時(shí)臨時(shí)釋放鎖,直到調(diào)用notify才重新占有鎖cond.notify() # 發(fā)送指令Condition與Event很類似,不過(guò)由于wait()與notify()可以反復(fù)調(diào)用,因此一般作為編程人員可控調(diào)用鎖來(lái)使用,放在run()方法下。
Queue隊(duì)列
隊(duì)列是線程安全的,通過(guò)put()和get()方法來(lái)操作隊(duì)列。
from queue import Queueq = Queue(maxsize=0) # 設(shè)置0表示無(wú)限長(zhǎng)隊(duì)列q.get(timeout=0.5) # 阻塞程序,等待隊(duì)列消息,可以設(shè)置超時(shí)時(shí)間q.put() # 發(fā)送消息q.join() # 等待所有消息被消費(fèi)完# 不常用但要了解的方法 q.qsize() # 返回消息個(gè)數(shù) q.empty() # 返回bool值,隊(duì)列是否空 q.full() # 返回bool值,隊(duì)列是否滿Queue是FIFO隊(duì)列,還有queue.LifoQueue,queue.PriorityQueue。
線程隔離
兩個(gè)線程的變量不能被相互訪問(wèn)。
通常使用threading.local類來(lái)實(shí)現(xiàn),該類的實(shí)例是一個(gè)字典型對(duì)象,直接通過(guò)key-value形式存入變量,如threading.local().name = 'bob'。
如果想要實(shí)現(xiàn)一個(gè)線程內(nèi)的全局變量或?qū)崿F(xiàn)線程間的信息隔離,就使用local類。
線程池
多線程并不是越多越好,因?yàn)樵谇袚Q線程時(shí)會(huì)切換上下文環(huán)境(當(dāng)然相比多進(jìn)程的開(kāi)銷要小的多),在量大時(shí)依然會(huì)造成CPU的開(kāi)銷。
因此出現(xiàn)了線程池的概念,即預(yù)先創(chuàng)建好合適數(shù)量的線程,使任務(wù)能立刻使用。
通過(guò)concurrent.futures庫(kù)的ThreadPoolExecutor類來(lái)實(shí)現(xiàn)。
import time import threading from concurrent.futures import ThreadPoolExecutordef target():for i in range(5):print('{}-{}\n'.format(threading.get_ident(), i)time.sleep(1)pool = ThreadPoolExecutor(5) # 線程池?cái)?shù)量限制為5for i in range(100):pool.submit(target) # 往線程中提交并運(yùn)行協(xié)程
學(xué)習(xí)協(xié)程,要先理解生成器,因?yàn)镻ython的協(xié)程是從生成器中誕生并演變到現(xiàn)在這個(gè)樣子的。
可迭代、迭代器、生成器
可迭代對(duì)象,其類或元類都實(shí)現(xiàn)了__iter__()方法,而該方法返回一個(gè)對(duì)象支持迭代,既可以是string/list/tuple/dict等內(nèi)置類型的對(duì)象,也可以是自己寫(xiě)的對(duì)象(這個(gè)對(duì)象的類實(shí)現(xiàn)了遍歷元素的__iter__方法)。
迭代器對(duì)象,可迭代對(duì)象是迭代器的基礎(chǔ),迭代器只是比可迭代對(duì)象多了一個(gè)__next__()方法,這個(gè)方法讓我們可以不再用for循環(huán)來(lái)獲取元素。
生成器對(duì)象,在迭代器的基礎(chǔ)上,實(shí)現(xiàn)了yield,相當(dāng)于函數(shù)中的return,在每次for循環(huán)遍歷或調(diào)用next()時(shí),都會(huì)返回一個(gè)值并阻塞等待下一次調(diào)用。
可迭代對(duì)象、迭代器都是將所有元素放在內(nèi)存里,而生成器則是需要時(shí)臨時(shí)生成元素,所以生成器節(jié)省時(shí)間、空間。
如何運(yùn)行/激活生成器
兩個(gè)方法。
這兩個(gè)方法是等價(jià)的,但由于send方法可以傳值進(jìn)去,所以在協(xié)程中大有用處。
生成器的執(zhí)行狀態(tài)
通過(guò)inspect庫(kù)的getgeneratorstate方法獲取狀態(tài)信息。
生成器的異常
StopIteration
從生成器過(guò)渡到協(xié)程:yield
生成器引入了函數(shù)暫停執(zhí)行(yield)功能,后來(lái)又引入了向暫停的生成器發(fā)送信息的功能(send),并以此催生了協(xié)程。
協(xié)程是為非搶占式多任務(wù)產(chǎn)生子程序的計(jì)算機(jī)程序組件,協(xié)程允許不同入口點(diǎn)在不同位置暫停或開(kāi)始執(zhí)行程序。
協(xié)程和線程有相似點(diǎn),多個(gè)協(xié)程之間與線程一樣,只會(huì)交叉串行執(zhí)行;也有不同點(diǎn),線程之間要頻繁切換,加鎖、解鎖,協(xié)程不需要。
協(xié)程通過(guò)yield暫停生成器,將程序的執(zhí)行流程交給其它子程序,從而實(shí)現(xiàn)不同子程序之間的交替執(zhí)行。
通過(guò)例子演示如何向生成器發(fā)送信息。
def func(n):index = 0while index < n:num = yield index # 這里分成兩部分,yield index將index return給外部程序, num = yield接受外部send的信息并賦值給numif num is None:num = 1index += numf = func(5) print(next(f)) # 0 print(f.send(2)) # 2 print(next(f)) # 3 print(f.send(-1)) # 2yield from語(yǔ)法
從Python3.3才出現(xiàn)的語(yǔ)法。
yield from后面需要添加可迭代對(duì)象(迭代器、生成器當(dāng)然滿足要求)。
# 拼接一個(gè)可迭代對(duì)象 # 使用yield astr = 'ABC'alist = [1, 2, 3]adict = dict(name='kct', age=18)agen = (i for i in range(5))def gen(*args):for item in args:for i in item:yield inew_list = gen(astr, alist, adict, agen)print("use yield:", list(new_list))# 使用yield fromdef gen(*args):for item in args:yield from itemnew_flist = fgen(astr, alist, adict, agen)print("use yield from:", list(new_flist))可以看出,使用yield from可以直接從可迭代對(duì)象中yield所有元素,減少了一個(gè)for循環(huán),代碼更簡(jiǎn)潔,當(dāng)然yield from不止做了這件事。
yield from后可以接生成器,以此形成生成器嵌套,yield from就幫我們處理了各種異常,讓我們只需專心于業(yè)務(wù)代碼即可。
具體講解yield from前先了解幾個(gè)概念:
舉個(gè)例子,實(shí)時(shí)計(jì)算平均值
# 子生成器 def average_gen():total = 0count = 0average = 0while True:num = yield averagecount += 1total += numaverage = total/count# 委托生成器 def proxy_gen():while True:yield from average_gen()# 調(diào)用函數(shù) def main():get_average = proxy_gen()next(get_average) # 第一次調(diào)用不傳值,讓子生成器開(kāi)始運(yùn)行print(get_average.send(10)) # 10print(get_average.send(20)) # 15print(get_average.send(30)) # 20委托生成器的作用是在調(diào)用函數(shù)與子生成器之間建立一個(gè)雙向通信通道,調(diào)用函數(shù)可以send消息給子生成器,子生成器yield值也是直接返回給調(diào)用函數(shù)。
有時(shí)會(huì)在yield from前作賦值操作,這是用于做結(jié)束操作,改造上面的例子。
# 子生成器 def average_gen():total = 0count = 0average = 0while True:num = yield averageif num is None:breakcount += 1total += numaverage = total/countreturn total, count, average # 當(dāng)協(xié)程結(jié)束時(shí),調(diào)用return# 委托生成器 def proxy_gen():while True:total, count, average = yield from average_gen() # 只有子生成器的協(xié)程結(jié)束了才會(huì)進(jìn)行賦值,后面的語(yǔ)句才會(huì)執(zhí)行print('Count for {} times, Total is {}, Average is {}'.format(count, total, average))# 調(diào)用函數(shù) def main():get_average = proxy_gen()next(get_average) # 第一次調(diào)用不傳值,讓子生成器開(kāi)始運(yùn)行print(get_average.send(10)) # 10print(get_average.send(20)) # 15print(get_average.send(30)) # 20get_average.send(None) # 結(jié)束協(xié)程,如果后面再調(diào)用send,將會(huì)另起一協(xié)程為什么不直接調(diào)用子生成器?
yield from做了全面的異常處理。直接調(diào)用子生成器,首先就要處理StopIteration異常,其次若子生成器不是協(xié)程生成器而是迭代器,則會(huì)有其它異常拋出,因此需要知道,委托生成器在這之中扮演著重要角色,不可忽略。
asyncio
asyncio是Python3.4引入的標(biāo)準(zhǔn)庫(kù),直接內(nèi)置對(duì)異步IO的支持。
雖然學(xué)了yield和yield from,但還是不知如何入手去做并發(fā),asyncio則是為了提供這么個(gè)框架來(lái)精簡(jiǎn)復(fù)雜的代碼操作。
如何定義創(chuàng)建協(xié)程
通過(guò)前面學(xué)習(xí),我們知道調(diào)用函數(shù)/委托生成器/子生成器這三劍客中,子生成器就是協(xié)程,那么asyncio如何來(lái)定義創(chuàng)建協(xié)程呢?
asyncio通過(guò)在函數(shù)定義前增加async關(guān)鍵字來(lái)定義協(xié)程對(duì)象,通過(guò)isinstance(obj, Coroutine)即可判斷是否是協(xié)程,這個(gè)協(xié)程類從collections.abc包導(dǎo)入。
我們也知道,生成器是協(xié)程的基礎(chǔ),那么有什么辦法將生成器變成協(xié)程來(lái)使用?
通過(guò)@asyncio.coroutine裝飾器可以標(biāo)記生成器函數(shù)為協(xié)程對(duì)象,但是通過(guò)isinstance(obj, Generator)、isinstance(obj, Coroutine)仍然可以看到,這個(gè)生成器函數(shù)只是被標(biāo)記為協(xié)程了,但其本質(zhì)依然是生成器。
重要概念
協(xié)程工作流程
await和yield
這兩者都能實(shí)現(xiàn)暫停的效果,但功能是不兼容的,在生成器中不能用await,在async定義的協(xié)程中不能用yield。
并且,yield from后可接可迭代對(duì)象、迭代器、生成器、future對(duì)象、協(xié)程對(duì)象,await后只能接future對(duì)象、協(xié)程對(duì)象。
創(chuàng)建future對(duì)象
前面我們知道通過(guò)async可以定義一個(gè)協(xié)程對(duì)象,那么如何創(chuàng)建一個(gè)future對(duì)象呢?
答案是通過(guò)task,只需要?jiǎng)?chuàng)建一個(gè)task對(duì)象即可。
# 在前一個(gè)例子中,我們先創(chuàng)建了事件循環(huán),然后通過(guò)事件循環(huán)創(chuàng)建了task,我們來(lái)測(cè)試下 import asyncio from asyncio.futures import Futureasync def hello(name):print('Hello, ', name)coroutine = hello('World')# 創(chuàng)建事件循環(huán) loop = asyncio.get_event_loop()# 將協(xié)程轉(zhuǎn)換為任務(wù) task = loop.create_task(coroutine)print(isinstance(task, Future)) # 結(jié)果是True# 不建立事件循環(huán)的方法 task = asyncio.ensure_future(coroutine)print(isinstance(task, Future)) # 結(jié)果也是True知道了創(chuàng)建future對(duì)象(也即是創(chuàng)建task對(duì)象)的方法,那么我們驗(yàn)證下await和yield后接coroutine和future對(duì)象。
import sys import asyncioasync def f1():await asyncio.sleep(2)return 'Hello, {}'.format(sys._getframe().f_code.co_name)@asyncio.coroutine def f2():yield from asyncio.sleep(2)return 'Hello, {}'.format(sys._getframe().f_code.co_name)async def f3():await asyncio.ensure_future(asyncio.sleep(2))return 'Hello, {}'.format(sys._getframe().f_code.co_name)@asyncio.coroutine def f4():yield from asyncio.ensure_future(asyncio.sleep(2))return 'Hello, {}'.format(sys._getframe().f_code.co_name)tasks = [asyncio.ensure_future(f1()),asyncio.ensure_future(f2()),asyncio.ensure_future(f3()),asyncio.ensure_future(f4()) ]loop = asyncio.get_event_loop()loop.run_until_complete(asyncio.wait(tasks))for task in tasks:print(task.result())loop.close()綁定回調(diào)函數(shù)
異步IO都是在IO高的地方掛起,等IO操作結(jié)束后再繼續(xù)執(zhí)行,大多數(shù)時(shí)候,我們后續(xù)的代碼執(zhí)行都是需要依賴IO的返回值的,此時(shí)就要用到回調(diào)了。
回調(diào)的實(shí)現(xiàn)有兩種方式。
第一種,利用同步編程實(shí)現(xiàn)的回調(diào)
這種方法要求我們能夠取得協(xié)程的await的返回值。通過(guò)task對(duì)象的result()方法可以獲得返回結(jié)果。
import time import asyncioasync def _sleep(x):time.sleep(x)return 'Stopped {} seconds!'.format(x)coroutine = _sleep(2)loop = asyncio.get_event_loop()task = asyncio.ensure_future(coroutine)loop.run_until_complete(task)# 直接通過(guò)task獲取任務(wù)結(jié)果 print('Result: {}'.format(task.result()))第二種,通過(guò)asyncio自帶的添加回調(diào)函數(shù)功能實(shí)現(xiàn)
import time import asyncioasync def _sleep(x):time.sleep(x)return 'Stopped {} seconds!'.format(x)def callback(future):print('Result: {}'.format(future.result()))coroutine = _sleep(2)loop = asyncio.get_event_loop()task = asyncio.ensure_future(coroutine)# 添加回調(diào)函數(shù) task.add_done_callback(callback)loop.run_until_complete(task)協(xié)程中的并發(fā)
asyncio實(shí)現(xiàn)并發(fā),就需要多個(gè)協(xié)程來(lái)完成任務(wù),前面做await和yield的驗(yàn)證時(shí)就用了并發(fā)。
每當(dāng)有任務(wù)阻塞的時(shí)候就await,然后其他協(xié)程繼續(xù)工作。
第一步,創(chuàng)建多個(gè)協(xié)程的列表
# 協(xié)程函數(shù) async def worker(n):print('Waiting: {}'.format(n))await asyncio.sleep(n)return 'Done {}'.format(n)# 協(xié)程對(duì)象 c1 = worker(1) c2 = worker(2) c3 = worker(4)# 協(xié)程轉(zhuǎn)換為task tasks = [asyncio.ensure_future(c1),asyncio.ensure_future(c2),asyncio.ensure_future(c3)]loop = asyncio.get_event_loop()第二步,將列表注冊(cè)到事件循環(huán)中
有兩種方法,這兩種方法的區(qū)別后面說(shuō)。
return的結(jié)果可以通過(guò)task.result()查看。
# asyncio.wait() loop.run_until_complete(asyncio.wait(tasks))# asyncio.gather() loop.run_until_complete(asyncio.gather(*tasks)) # *不能省略# 查看結(jié)果 for task in tasks:print('Result: {}'.format(task.result()))協(xié)程中的嵌套
使用async可以定義協(xié)程,協(xié)程用于耗時(shí)的IO操作,我們也可以封裝更多的IO操作過(guò)程,實(shí)現(xiàn)一個(gè)協(xié)程中await另一個(gè)協(xié)程,實(shí)現(xiàn)協(xié)程的嵌套。
# 內(nèi)部協(xié)程函數(shù) async def worker(n):print('Waiting: {}'.format(n))await asyncio.sleep(n)return 'Done {}'.format(n)# 外部協(xié)程函數(shù) async def main():c1 = worker(1)c2 = worker(2)c3 = worker(4)tasks = [asyncio.ensure_future(c1),asyncio.ensure_future(c2),asyncio.ensure_future(c3)]dones, pendings = await asyncio.wait(tasks)for task in tasks:print('Result: {}'.format(task.result()))loop = asyncio.get_event_loop() loop.run_until_complete(main())如果外部協(xié)程使用的asyncio.gather(),那么作如下替換。
results = await asyncio.gather(*tasks)for result in results:print('Result: {}'.format(result))協(xié)程中的狀態(tài)
講生成器時(shí)提到了四種狀態(tài),對(duì)協(xié)程我們也了解一下其狀態(tài)(準(zhǔn)確地說(shuō)是future/task對(duì)象的狀態(tài))。
gather和wait
接收的參數(shù)不同
wait接收的tasks,必須是一個(gè)list對(duì)象,該list對(duì)象中存放多個(gè)task,既可以通過(guò)asyncio.ensure_future轉(zhuǎn)為task對(duì)象也可以不轉(zhuǎn)。
gather也可以接收l(shuí)ist對(duì)象,但*不能省,也可以直接將多個(gè)task作為可變長(zhǎng)參數(shù)傳入,參數(shù)可以是協(xié)程對(duì)象或future對(duì)象。
返回結(jié)果不同
wait返回dones和pendings,前者表示已完成的任務(wù),后者表示未完成的任務(wù),需要通過(guò)task.result()手工獲取結(jié)果。
gather直接將值返回。
協(xié)程控制功能
# FIRST_COMPLETED:完成第一個(gè)任務(wù)就返回 # FIRST_EXCEPTION:產(chǎn)生第一個(gè)異常就返回 # ALL_COMPLETED:所有任務(wù)完成再返回(默認(rèn)選項(xiàng)) dones, pendings = loop.run_until_complete(asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED))# 控制運(yùn)行時(shí)間:1秒后返回 dones, pendings = loop.run_until_complete(asyncio.wait(tasks, timeout=1))動(dòng)態(tài)添加協(xié)程
在asyncio中如何動(dòng)態(tài)添加協(xié)程到事件循環(huán)中?
兩種方法,一種是同步的,一種是異步的。
import time import asyncio from queue import Queue from threading import Thread# 在后臺(tái)永遠(yuǎn)運(yùn)行的事件循環(huán) def start_loop(loop):asyncio.set_event_loop(loop)loop.run_forever()def do_sleep(x, queue, msg=""):time.sleep(x)queue.put(msg)queue = Queue()new_loop = asyncio.new_event_loop()t = Thread(target=start_loop, args=(new_loop,)) t.start()print(time.ctime())# 動(dòng)態(tài)添加兩個(gè)協(xié)程 # 這種方法在主線程是同步的 new_loop.call_soon_threadsafe(do_sleep, 6, queue, 'First') new_loop.call_soon_threadsafe(do_sleep, 3, queue, 'Second')while True:msg = queue.get()print('{} is done'.format(msg))print(time.ctime()) import time import asyncio from queue import Queue from threading import Thread# 在后臺(tái)永遠(yuǎn)運(yùn)行的事件循環(huán) def start_loop(loop):asyncio.set_event_loop(loop)loop.run_forever()async def do_sleep(x, queue, msg=""):await asyncio.sleep(x)queue.put(msg)queue = Queue()new_loop = asyncio.new_event_loop()t = Thread(target=start_loop, args=(new_loop,)) t.start()print(time.ctime())# 動(dòng)態(tài)添加兩個(gè)協(xié)程 # 這種方法在主線程是異步的 asyncio.run_coroutine_threadsafe(do_sleep(6, queue, 'First'), new_loop) asyncio.run_coroutine_threadsafe(do_sleep(3, queue, 'Second'), new_loop)while True:msg = queue.get()print('{} is done'.format(msg))print(time.ctime())轉(zhuǎn)載于:https://www.cnblogs.com/ikct2017/p/9534557.html
總結(jié)
以上是生活随笔為你收集整理的asyncio协程与并发的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 0821-0823
- 下一篇: 关于js渲染网页时爬取数据的思路和全过程