日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

asyncio协程与并发

發布時間:2024/1/17 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 asyncio协程与并发 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

并發編程

Python的并發實現有三種方法。

  • 多線程
  • 多進程
  • 協程(生成器)
  • 基本概念

    串行:同時只能執行單個任務
    并行:同時執行多個任務

    在Python中,雖然嚴格說來多線程與協程都是串行的,但其效率高,在遇到阻塞時會將阻塞任務交給系統執行,通過合理調度任務,使得程序高效。

    最高效的當然是多進程了,但由于多進程依賴硬件配置,并且當任務量超過CPU核心數時,多進程會有進程上下文切換開銷,而這個開銷很大,所以不是最佳解決方案。

    常見耗時場景

  • CPU計算密集型
  • 磁盤IO密集型
  • 網絡IO密集型
  • CPU計算密集型

    多線程對比單線程,由于GIL的存在,切換線程需要不斷加鎖、釋放鎖,效率反而更低;多進程相當于多個CPU同時工作,因此效率很高。

    IO密集型

    IO密集型可以是磁盤IO、網絡IO、數據庫IO等,都屬于計算量小,IO等待浪費高。越是IO等待時間長,則多線程的優勢相比單線程越明顯,多進程效率高但依賴配置資源。

    結論

    單線程總是最慢的,多線程適合在IO密集型場景使用,多進程適合CPU計算要求高的場景下使用,多進程雖然總是最快的,但需要CPU資源支持。

    多線程

    Python創建多線程有兩種方法。

  • 函數
  • 用函數創建多線程

    from threading import Threaddef func():for i in range(2):print('Hello world!')sleep(1)th1 = Thread(target=func) th1.start() th2 = Thread(target=func) th2.start()

    用類創建多線程

    這個類必須繼承Thread,必須重載run()方法

    from threading import Threadclass MyThread(Thread):def __init__(self):super().__init__()self.name = 'Bob'def run(self):for i in range(2):print('Hello world!')sleep(1)th1 = MyThread() th2 = MyThread()th1.start() th2.start()

    常用方法

    • threading.Thread(target=func, args=())
      • start() # 啟動子線程
      • join() # 阻塞子線程
      • is_alive()/isAlive() # 判斷線程執行狀態,正在執行返回True,否則False
      • daemon # 設置線程是否隨主線程退出而退出,默認False
      • name # 設置線程名

    線程鎖

    import threadinglock = threading.Lock() # 生成鎖,全局唯一lock.acquire() # 加鎖lock.release() # 釋放鎖

    加鎖與解鎖必須成對出現,或者使用上下文管理器with來管理鎖。

    可重入鎖

    在Redis分布式鎖中提到過,用于讓非阻塞線程重復獲得鎖來發送或讀取數據,這里的可重入鎖僅指讓同一線程可以多次獲取鎖。

    import threadingrlock = threading.RLock() # 生成可重入鎖

    死鎖

    死鎖通常有兩種。

  • 同一線程內嵌套獲取同一把鎖,造成死鎖(解決方案是用可重入鎖)
  • 多個線程不按順序同時獲得多個鎖,造成死鎖(解決方案一是靠編程人員人工識別,二是對鎖排序)
  • GIL全局鎖

    多進程是真正的并行,而多線程是偽并行,實際是多個線程交替執行。

    遇到GIL影響性能的情況,要么考慮用多進程替代多線程,要么更換Python解釋器。

    線程通信

    常用線程通信方法。

  • threading.Event
  • threading.Condition
  • queue.Queue
  • Event事件

    import threadingevent = threading.Event()event.clear() # 重置event,使所有該event事件都處于待命狀態event.wait() # 等待接收event指令,決定是否阻塞程序執行evnet.set() # 發送event指令,所有該event事件的線程開始執行 import timeimport threadingclass MyThread(threading.Thread):def __init__(self, name, event):super().__init__()self.name = nameself.event = eventdef run(self):self.event.wait() # 等待event.set()才能執行下去time.sleep(1)print('{} Done'.format(self.name))threads = [] event = threading.Event()for i in range(5):threads.append(MyThread(event))event.clear() # 重置event,使event.wait()生效for t in threads:t.start()print('Waiting 3s') time.sleep(3)print('Awake all threads') event.set() # 發送event指令,所有綁定了event的線程開始執行

    所有線程在調用start()方法后并不會執行完,而是在event.wait()處停住了,需要發送event.set()指令才能繼續執行。

    Condition狀態

    import threadingcond = threading.Condition()cond.acquire()cond.release()cond.wait() # 等待指令觸發,同時臨時釋放鎖,直到調用notify才重新占有鎖cond.notify() # 發送指令

    Condition與Event很類似,不過由于wait()與notify()可以反復調用,因此一般作為編程人員可控調用鎖來使用,放在run()方法下。

    Queue隊列

    隊列是線程安全的,通過put()和get()方法來操作隊列。

    from queue import Queueq = Queue(maxsize=0) # 設置0表示無限長隊列q.get(timeout=0.5) # 阻塞程序,等待隊列消息,可以設置超時時間q.put() # 發送消息q.join() # 等待所有消息被消費完# 不常用但要了解的方法 q.qsize() # 返回消息個數 q.empty() # 返回bool值,隊列是否空 q.full() # 返回bool值,隊列是否滿

    Queue是FIFO隊列,還有queue.LifoQueue,queue.PriorityQueue。

    線程隔離

    兩個線程的變量不能被相互訪問。

    通常使用threading.local類來實現,該類的實例是一個字典型對象,直接通過key-value形式存入變量,如threading.local().name = 'bob'。

    如果想要實現一個線程內的全局變量或實現線程間的信息隔離,就使用local類。

    線程池

    多線程并不是越多越好,因為在切換線程時會切換上下文環境(當然相比多進程的開銷要小的多),在量大時依然會造成CPU的開銷。

    因此出現了線程池的概念,即預先創建好合適數量的線程,使任務能立刻使用。

    通過concurrent.futures庫的ThreadPoolExecutor類來實現。

    import time import threading from concurrent.futures import ThreadPoolExecutordef target():for i in range(5):print('{}-{}\n'.format(threading.get_ident(), i)time.sleep(1)pool = ThreadPoolExecutor(5) # 線程池數量限制為5for i in range(100):pool.submit(target) # 往線程中提交并運行

    協程

    學習協程,要先理解生成器,因為Python的協程是從生成器中誕生并演變到現在這個樣子的。

    可迭代、迭代器、生成器

    可迭代對象,其類或元類都實現了__iter__()方法,而該方法返回一個對象支持迭代,既可以是string/list/tuple/dict等內置類型的對象,也可以是自己寫的對象(這個對象的類實現了遍歷元素的__iter__方法)。

    迭代器對象,可迭代對象是迭代器的基礎,迭代器只是比可迭代對象多了一個__next__()方法,這個方法讓我們可以不再用for循環來獲取元素。

    生成器對象,在迭代器的基礎上,實現了yield,相當于函數中的return,在每次for循環遍歷或調用next()時,都會返回一個值并阻塞等待下一次調用。

    可迭代對象、迭代器都是將所有元素放在內存里,而生成器則是需要時臨時生成元素,所以生成器節省時間、空間。

    如何運行/激活生成器

    兩個方法。

  • next()
  • send(None)
  • 這兩個方法是等價的,但由于send方法可以傳值進去,所以在協程中大有用處。

    生成器的執行狀態

    通過inspect庫的getgeneratorstate方法獲取狀態信息。

  • GEN_CREATED 等待開始執行
  • GEN_RUNNING 解釋器正在執行(只有多線程中才能看到)
  • GEN_SUSPENDED 在yield表達式處暫停
  • GEN_CLOSED 執行結束
  • 生成器的異常

    StopIteration

    從生成器過渡到協程:yield

    生成器引入了函數暫停執行(yield)功能,后來又引入了向暫停的生成器發送信息的功能(send),并以此催生了協程。

    協程是為非搶占式多任務產生子程序的計算機程序組件,協程允許不同入口點在不同位置暫停或開始執行程序。

    協程和線程有相似點,多個協程之間與線程一樣,只會交叉串行執行;也有不同點,線程之間要頻繁切換,加鎖、解鎖,協程不需要。

    協程通過yield暫停生成器,將程序的執行流程交給其它子程序,從而實現不同子程序之間的交替執行。

    通過例子演示如何向生成器發送信息。

    def func(n):index = 0while index < n:num = yield index # 這里分成兩部分,yield index將index return給外部程序, num = yield接受外部send的信息并賦值給numif num is None:num = 1index += numf = func(5) print(next(f)) # 0 print(f.send(2)) # 2 print(next(f)) # 3 print(f.send(-1)) # 2

    yield from語法

    從Python3.3才出現的語法。

    yield from后面需要添加可迭代對象(迭代器、生成器當然滿足要求)。

    # 拼接一個可迭代對象 # 使用yield astr = 'ABC'alist = [1, 2, 3]adict = dict(name='kct', age=18)agen = (i for i in range(5))def gen(*args):for item in args:for i in item:yield inew_list = gen(astr, alist, adict, agen)print("use yield:", list(new_list))# 使用yield fromdef gen(*args):for item in args:yield from itemnew_flist = fgen(astr, alist, adict, agen)print("use yield from:", list(new_flist))

    可以看出,使用yield from可以直接從可迭代對象中yield所有元素,減少了一個for循環,代碼更簡潔,當然yield from不止做了這件事。

    yield from后可以接生成器,以此形成生成器嵌套,yield from就幫我們處理了各種異常,讓我們只需專心于業務代碼即可。

    具體講解yield from前先了解幾個概念:

  • 調用函數:調用委托生成器的代碼
  • 委托生成器:包含yield from表達式的生成器函數
  • 子生成器:yield from后接的生成器函數
  • 舉個例子,實時計算平均值

    # 子生成器 def average_gen():total = 0count = 0average = 0while True:num = yield averagecount += 1total += numaverage = total/count# 委托生成器 def proxy_gen():while True:yield from average_gen()# 調用函數 def main():get_average = proxy_gen()next(get_average) # 第一次調用不傳值,讓子生成器開始運行print(get_average.send(10)) # 10print(get_average.send(20)) # 15print(get_average.send(30)) # 20

    委托生成器的作用是在調用函數與子生成器之間建立一個雙向通信通道,調用函數可以send消息給子生成器,子生成器yield值也是直接返回給調用函數。

    有時會在yield from前作賦值操作,這是用于做結束操作,改造上面的例子。

    # 子生成器 def average_gen():total = 0count = 0average = 0while True:num = yield averageif num is None:breakcount += 1total += numaverage = total/countreturn total, count, average # 當協程結束時,調用return# 委托生成器 def proxy_gen():while True:total, count, average = yield from average_gen() # 只有子生成器的協程結束了才會進行賦值,后面的語句才會執行print('Count for {} times, Total is {}, Average is {}'.format(count, total, average))# 調用函數 def main():get_average = proxy_gen()next(get_average) # 第一次調用不傳值,讓子生成器開始運行print(get_average.send(10)) # 10print(get_average.send(20)) # 15print(get_average.send(30)) # 20get_average.send(None) # 結束協程,如果后面再調用send,將會另起一協程

    為什么不直接調用子生成器?

    yield from做了全面的異常處理。直接調用子生成器,首先就要處理StopIteration異常,其次若子生成器不是協程生成器而是迭代器,則會有其它異常拋出,因此需要知道,委托生成器在這之中扮演著重要角色,不可忽略。

    asyncio

    asyncio是Python3.4引入的標準庫,直接內置對異步IO的支持。

    雖然學了yield和yield from,但還是不知如何入手去做并發,asyncio則是為了提供這么個框架來精簡復雜的代碼操作。

    如何定義創建協程

    通過前面學習,我們知道調用函數/委托生成器/子生成器這三劍客中,子生成器就是協程,那么asyncio如何來定義創建協程呢?

    asyncio通過在函數定義前增加async關鍵字來定義協程對象,通過isinstance(obj, Coroutine)即可判斷是否是協程,這個協程類從collections.abc包導入。

    我們也知道,生成器是協程的基礎,那么有什么辦法將生成器變成協程來使用?

    通過@asyncio.coroutine裝飾器可以標記生成器函數為協程對象,但是通過isinstance(obj, Generator)、isinstance(obj, Coroutine)仍然可以看到,這個生成器函數只是被標記為協程了,但其本質依然是生成器。

    重要概念

  • event_loop事件循環,將協程注冊到時間循環中,當滿足事件發生時調用相應的協程函數;
  • coroutine協程,一個使用async關鍵字定義的函數,調用它不會立即執行函數,而是返回一個協程對象,這個協程對象需要注冊到事件循環中,由事件循環調用;
  • future對象,代表將來執行或沒有執行的任務的結果,和task沒有本質區別;
  • task任務,一個協程對象就是一個原生可以掛起的函數,任務則是對協程的進一步包裝,其中包含任務的各種狀態,task對象是future的子類,它將coroutine和future聯系在一起,將coroutine封裝成一個future對象;
  • async/await關鍵字,async定義一個協程,await用于掛起阻塞的異步調用接口(作用類似yield但不完全是)。
  • 協程工作流程

  • 定義/創建協程對象
  • 將協程轉換為task
  • 定義事件循環容器
  • 把task任務扔進事件循環中并觸發
  • import asyncioasync def hello(name):print('Hello, ', name)coroutine = hello('World')# 創建事件循環 loop = asyncio.get_event_loop()# 將協程轉換為任務 task = loop.create_task(coroutine)# 將任務放入事件循環對象中觸發 loop.run_until_complete(task)

    await和yield

    這兩者都能實現暫停的效果,但功能是不兼容的,在生成器中不能用await,在async定義的協程中不能用yield。

    并且,yield from后可接可迭代對象、迭代器、生成器、future對象、協程對象,await后只能接future對象、協程對象。

    創建future對象

    前面我們知道通過async可以定義一個協程對象,那么如何創建一個future對象呢?

    答案是通過task,只需要創建一個task對象即可。

    # 在前一個例子中,我們先創建了事件循環,然后通過事件循環創建了task,我們來測試下 import asyncio from asyncio.futures import Futureasync def hello(name):print('Hello, ', name)coroutine = hello('World')# 創建事件循環 loop = asyncio.get_event_loop()# 將協程轉換為任務 task = loop.create_task(coroutine)print(isinstance(task, Future)) # 結果是True# 不建立事件循環的方法 task = asyncio.ensure_future(coroutine)print(isinstance(task, Future)) # 結果也是True

    知道了創建future對象(也即是創建task對象)的方法,那么我們驗證下await和yield后接coroutine和future對象。

    import sys import asyncioasync def f1():await asyncio.sleep(2)return 'Hello, {}'.format(sys._getframe().f_code.co_name)@asyncio.coroutine def f2():yield from asyncio.sleep(2)return 'Hello, {}'.format(sys._getframe().f_code.co_name)async def f3():await asyncio.ensure_future(asyncio.sleep(2))return 'Hello, {}'.format(sys._getframe().f_code.co_name)@asyncio.coroutine def f4():yield from asyncio.ensure_future(asyncio.sleep(2))return 'Hello, {}'.format(sys._getframe().f_code.co_name)tasks = [asyncio.ensure_future(f1()),asyncio.ensure_future(f2()),asyncio.ensure_future(f3()),asyncio.ensure_future(f4()) ]loop = asyncio.get_event_loop()loop.run_until_complete(asyncio.wait(tasks))for task in tasks:print(task.result())loop.close()

    綁定回調函數

    異步IO都是在IO高的地方掛起,等IO操作結束后再繼續執行,大多數時候,我們后續的代碼執行都是需要依賴IO的返回值的,此時就要用到回調了。

    回調的實現有兩種方式。

    第一種,利用同步編程實現的回調

    這種方法要求我們能夠取得協程的await的返回值。通過task對象的result()方法可以獲得返回結果。

    import time import asyncioasync def _sleep(x):time.sleep(x)return 'Stopped {} seconds!'.format(x)coroutine = _sleep(2)loop = asyncio.get_event_loop()task = asyncio.ensure_future(coroutine)loop.run_until_complete(task)# 直接通過task獲取任務結果 print('Result: {}'.format(task.result()))

    第二種,通過asyncio自帶的添加回調函數功能實現

    import time import asyncioasync def _sleep(x):time.sleep(x)return 'Stopped {} seconds!'.format(x)def callback(future):print('Result: {}'.format(future.result()))coroutine = _sleep(2)loop = asyncio.get_event_loop()task = asyncio.ensure_future(coroutine)# 添加回調函數 task.add_done_callback(callback)loop.run_until_complete(task)

    協程中的并發

    asyncio實現并發,就需要多個協程來完成任務,前面做await和yield的驗證時就用了并發。

    每當有任務阻塞的時候就await,然后其他協程繼續工作。

    第一步,創建多個協程的列表

    # 協程函數 async def worker(n):print('Waiting: {}'.format(n))await asyncio.sleep(n)return 'Done {}'.format(n)# 協程對象 c1 = worker(1) c2 = worker(2) c3 = worker(4)# 協程轉換為task tasks = [asyncio.ensure_future(c1),asyncio.ensure_future(c2),asyncio.ensure_future(c3)]loop = asyncio.get_event_loop()

    第二步,將列表注冊到事件循環中

    有兩種方法,這兩種方法的區別后面說。

    return的結果可以通過task.result()查看。

    # asyncio.wait() loop.run_until_complete(asyncio.wait(tasks))# asyncio.gather() loop.run_until_complete(asyncio.gather(*tasks)) # *不能省略# 查看結果 for task in tasks:print('Result: {}'.format(task.result()))

    協程中的嵌套

    使用async可以定義協程,協程用于耗時的IO操作,我們也可以封裝更多的IO操作過程,實現一個協程中await另一個協程,實現協程的嵌套。

    # 內部協程函數 async def worker(n):print('Waiting: {}'.format(n))await asyncio.sleep(n)return 'Done {}'.format(n)# 外部協程函數 async def main():c1 = worker(1)c2 = worker(2)c3 = worker(4)tasks = [asyncio.ensure_future(c1),asyncio.ensure_future(c2),asyncio.ensure_future(c3)]dones, pendings = await asyncio.wait(tasks)for task in tasks:print('Result: {}'.format(task.result()))loop = asyncio.get_event_loop() loop.run_until_complete(main())

    如果外部協程使用的asyncio.gather(),那么作如下替換。

    results = await asyncio.gather(*tasks)for result in results:print('Result: {}'.format(result))

    協程中的狀態

    講生成器時提到了四種狀態,對協程我們也了解一下其狀態(準確地說是future/task對象的狀態)。

  • Pending:創建Future,還未執行
  • Running:事件循環正在調用執行任務
  • Done:任務執行完畢
  • Cancelled:Task被取消后的狀態
  • gather和wait

    接收的參數不同

    wait接收的tasks,必須是一個list對象,該list對象中存放多個task,既可以通過asyncio.ensure_future轉為task對象也可以不轉。

    gather也可以接收list對象,但*不能省,也可以直接將多個task作為可變長參數傳入,參數可以是協程對象或future對象。

    返回結果不同

    wait返回dones和pendings,前者表示已完成的任務,后者表示未完成的任務,需要通過task.result()手工獲取結果。

    gather直接將值返回。

    協程控制功能

    # FIRST_COMPLETED:完成第一個任務就返回 # FIRST_EXCEPTION:產生第一個異常就返回 # ALL_COMPLETED:所有任務完成再返回(默認選項) dones, pendings = loop.run_until_complete(asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED))# 控制運行時間:1秒后返回 dones, pendings = loop.run_until_complete(asyncio.wait(tasks, timeout=1))

    動態添加協程

    在asyncio中如何動態添加協程到事件循環中?

    兩種方法,一種是同步的,一種是異步的。

    import time import asyncio from queue import Queue from threading import Thread# 在后臺永遠運行的事件循環 def start_loop(loop):asyncio.set_event_loop(loop)loop.run_forever()def do_sleep(x, queue, msg=""):time.sleep(x)queue.put(msg)queue = Queue()new_loop = asyncio.new_event_loop()t = Thread(target=start_loop, args=(new_loop,)) t.start()print(time.ctime())# 動態添加兩個協程 # 這種方法在主線程是同步的 new_loop.call_soon_threadsafe(do_sleep, 6, queue, 'First') new_loop.call_soon_threadsafe(do_sleep, 3, queue, 'Second')while True:msg = queue.get()print('{} is done'.format(msg))print(time.ctime()) import time import asyncio from queue import Queue from threading import Thread# 在后臺永遠運行的事件循環 def start_loop(loop):asyncio.set_event_loop(loop)loop.run_forever()async def do_sleep(x, queue, msg=""):await asyncio.sleep(x)queue.put(msg)queue = Queue()new_loop = asyncio.new_event_loop()t = Thread(target=start_loop, args=(new_loop,)) t.start()print(time.ctime())# 動態添加兩個協程 # 這種方法在主線程是異步的 asyncio.run_coroutine_threadsafe(do_sleep(6, queue, 'First'), new_loop) asyncio.run_coroutine_threadsafe(do_sleep(3, queue, 'Second'), new_loop)while True:msg = queue.get()print('{} is done'.format(msg))print(time.ctime())

    轉載于:https://www.cnblogs.com/ikct2017/p/9534557.html

    總結

    以上是生活随笔為你收集整理的asyncio协程与并发的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。