日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

Python3.2+ 的 concurrent.futures 模块,利用 multiprocessing 实现高并发。

發布時間:2024/7/23 python 38 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Python3.2+ 的 concurrent.futures 模块,利用 multiprocessing 实现高并发。 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

From:https://www.cnblogs.com/weihengblog/p/9812110.html

concurrent.futures 官方文檔:https://docs.python.org/3/library/concurrent.futures.html

concurrent.futures: 線程池, 讓你更加高效, 并發的處理任務:https://www.h3399.cn/201906/703751.html

python 因為其全局解釋器鎖 GIL 而無法通過線程實現真正的平行計算。這個論斷我們不展開,但是有個概念我們要說明,

IO 密集型 vs?計算密集型:

  • ?IO密集型:讀取文件,讀取網絡套接字頻繁。
  • ?計算密集型:大量消耗CPU的數學與邏輯運算,也就是我們這里說的平行計算。
  • 而 concurrent.futures 模塊,可以利用 multiprocessing 實現真正的平行計算。

    核心原理是:concurrent.futures 會以子進程的形式,平行的運行多個 python 解釋器,從而令 python 程序可以利用多核 CPU 來提升執行速度。由于 子進程 與 主解釋器 相分離,所以他們的全局解釋器鎖也是相互獨立的。每個子進程都能夠完整的使用一個CPU 內核。

    解釋 2:concurrent.futures 中的 ProcessPoolExecutor類把工作分配給多個Python進程處理,因此,如果需要做CPU密集型處理,使用這個模塊能繞開GIL,利用所有的CPU核心。
    其原理是一個ProcessPoolExecutor創建了N個獨立的Python解釋器,N是系統上面可用的CPU核數。使用方法和ThreadPoolExecutor方法一樣

    Python:使用 Future、asyncio 處理并發

    :https://blog.csdn.net/sinat_38682860/article/details/105419842

    future 初始 -- 處理并發:https://www.cnblogs.com/zhaof/p/7679529.html

    從Python3.2開始,標準庫為我們提供了 concurrent.futures 模塊,它提供了 ThreadPoolExecutor (線程池)和ProcessPoolExecutor (進程池)兩個類。

    相比 threading 等模塊,該模塊通過 submit 返回的是一個 future 對象,它是一個未來可期的對象,通過它可以獲悉線程的狀態主線程(或進程)中可以獲取某一個線程(進程)執行的狀態或者某一個任務執行的狀態及返回值:

  • 主線程可以獲取某一個線程(或者任務的)的狀態,以及返回值。
  • 當一個線程完成的時候,主線程能夠立即知道。
  • 讓多線程和多進程的編碼接口一致。
  • Python 模塊 - Concurrent.futures

    從 Python3.2開始,Python?標準庫提供了 concurrent.futures?模塊,為開發人員提供了啟動異步任務的高級接口。 它提供了 ThreadPoolExecutor 和 ProcessPoolExecutor 兩個類,實現了對 threading 和 multiprocessing 的更高級的抽象,對編寫 線程池/進程池 提供了直接的支持。?可以將相應的 tasks 直接放入線程池/進程池,不需要維護Queue來操心死鎖的問題線程池/進程池會自動幫我們調度。

    Future總結

    1. python3自帶,python2需要安裝2. Executer對象它是一個抽象類,它提供了異步執行的方法,他不能直接使用,但可以通過它的子類ThreadPoolExecuter和ProcessPoolExecuter2.1 Executer.submit(fn, *args, **kwargs)fn: 需要異步執行的函數*args,**kwargs fn 接受的參數該方法的作用就是提交一個可執行的回調 task,它返回一個 Future 對象2.2 map(fn, *iterables, timeout=None, chunksize=1)map(task,URLS) # 返回一個 map()迭代器,這個迭代器中的回調執行返回的結果是有序的3. Future對象相關future可以理解為一個在未來完成的操作,這是異步編程的基礎通常情況下我們在遇到IO操作的時候,將會發生阻塞,cpu不能做其他事情而future的引入幫助我們在這段等待時間可以完成其他的操作3.1 done():如果當前線程已取消/已成功,返回True。3.2 cance():如果當前線程正在執行,并且不能取消調用,返回Flase。否則調用取消,返回True3.3 running():如果當前的線程正在執行,則返回True3.4 result():返回調用返回的值,如果調用尚未完成,則此方法等待如果等待超時,會拋出concurrent.futures.TimeoutError如果沒有指定超時時間,則等待無時間限制如果在完成之前,取消了Future,則會引發CancelledError4. as_completed():在多個Future實例上的迭代器將會被返回這些Future實例由fs完成時產生。由fs返回的任何重復的Future,都會被返回一次。里面保存的都是已經執行完成的Future對象5. wait():返回一個元祖,元祖包含兩個元素1. 已完成的future集合2. 未完成的future集合

    初體驗:

    # coding=utf-8 from concurrent import futures from concurrent.futures import Future import timedef return_future(msg):time.sleep(3)return msgpool = futures.ThreadPoolExecutor(max_workers=2)t1 = pool.submit(return_future,'hello') t2 = pool.submit(return_future,'world')time.sleep(3) print(t1.done()) # 如果順利完成,則返回True time.sleep(3) print(t2.done())print(t1.result()) # 獲取future的返回值 time.sleep(3) print(t2.result())print("主線程")

    map(func,* iterables,timeout = None,chunksize = 1 )

    # coding=utf-8import time from concurrent.futures import Future,as_completed from concurrent.futures import ThreadPoolExecutor as Pool import requests import timeURLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com']def task(url,timeout=10):return requests.get(url=url,timeout=timeout)pool = Pool() result = pool.map(task,URLS)start_time = time.time()# 按照 URLS 的順序返回 for res in result:print("{} {}".format(res.url,len(res.content)))# 無序的 with Pool(max_workers=3) as executer:future_task = [executer.submit(task,url) for url in URLS]for f in as_completed(future_task):if f.done():f_ret = f.result() # f.result()得到task的返回值,requests對象print('%s, done, result: %s, %s' % (str(f), f_ret.url, len(f_ret.content)))print("耗時", time.time() - start_time) print("主線程")

    Future對象

    Future可以理解為一個未來完成的操作
    當我們執行io操作的時候,在等待返回結果之前會產生阻塞
    cpu不能做其他事情,而Future的引入幫助我們在等待的這段時間可以完成其他操作

    from concurrent.futures import ThreadPoolExecutor as Pool from concurrent.futures import as_completed import requests import timeURLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com']def task(url,timeout=10):return requests.get(url=url,timeout=timeout)# start_time = time.time() # for url in URLS: # ret = task(url) # print("{} {}".format(ret.url,len(ret.content))) # print("耗時",time.time() - start_time) with Pool(max_workers=3) as executor:# 創建future任務future_task = [executor.submit(task,url) for url in URLS]for f in future_task:if f.running():print("%s is running"%str(f))for f in as_completed(future_task):try:ret = f.done()if ret:f_ret = f.result()print('%s, done, result: %s, %s' % (str(f), f_ret.url, len(f_ret.content)))except Exception as e:f.cance()print(e)""" url不是按照順序返回的,說明并發時,當訪問某一個url時,如果沒有得到返回結果,不會發生阻塞 <Future at 0x1c63990e6d8 state=running> is running <Future at 0x1c639922780 state=running> is running <Future at 0x1c639922d30 state=running> is running <Future at 0x1c63990e6d8 state=finished returned Response>, done, result: http://www.baidu.com/, 2381 <Future at 0x1c639922780 state=finished returned Response>, done, result: https://www.qq.com?fromdefault, 243101 <Future at 0x1c639922d30 state=finished returned Response>, done, result: http://sina.com/, 23103 """

    模塊方法

    concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)

    wait() 會返回一個tuple,tuple 會包含兩個集合:已完成的集合?和 未完成的集合。使用 wait() 會獲得更大的自由度,他接受三個參數:FIRST_COMPLETEDFIRST_EXCEPTIONALL_COMPLETE默認為 ALL_COMPLETE

    如果采用默認的 ALL_COMPLETED,程序會阻塞直到線程池里面的所有任務都完成,再執行主線程:

    from concurrent.futures import Future from concurrent.futures import ThreadPoolExecutor as Pool from concurrent.futures import as_completed, wait import requestsURLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com']def task(url, timeout=10):r = requests.get(url=url, timeout=timeout)print(r.status_code)with Pool(max_workers=3) as execute:future_task = [execute.submit(task, url) for url in URLS]for f in future_task:if f.running():print("%s" % (str(f)))"""并且wait還有timeout和return_when兩個參數return_when有三個常量 (默認是 ALL_COMPLETED)FIRST_COMPLETED 任何一個future_task執行完成時/取消時,改函數返回FIRST_EXCEPTION 任何一個future_task發生異常時,該函數返回,如果沒有異常發生,等同于ALL_COMPLETED ALL_COMPLETED 當所有的future_task執行完畢返回。"""results = wait(future_task, return_when="FIRST_COMPLETED") #done = results[0]for d in done:print(d)

    concurrent.futures.as_completed(fs, timeout=None)

    在多個 Future 實例上的迭代器將會被返回,這些 Future 實例由 fs 完成時產生。由 fs 返回的任何重復的 Future,都會被返回一次。里面保存的都是已經執行完成的 Future 對象。

    from concurrent.futures import ThreadPoolExecutor as Pool from concurrent.futures import as_completed import requests import timeURLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com']def task(url,timeout=10):return requests.get(url=url,timeout=timeout)with Pool(max_workers=3) as executor:# 創建future任務future_task = [executor.submit(task,url) for url in URLS]for f in future_task:if f.running():print("%s is running"%str(f))for f in as_completed(future_task):try:ret = f.done()if ret:f_ret = f.result()print('%s, done, result: %s, %s' % (str(f), f_ret.url, len(f_ret.content)))except Exception as e:f.cance()print(e)

    下面我們將學習?concurrent.futures?模塊中的類。concurrent.futures 基礎模塊是 executor 和 future。

    使用示例代碼:

    # -*- coding:utf-8 -*-import redis from redis import WatchError from concurrent.futures import ProcessPoolExecutorr = redis.Redis(host='127.0.0.1', port=6379)# 減庫存函數, 循環直到減庫存完成 # 庫存充足, 減庫存成功, 返回True # 庫存不足, 減庫存失敗, 返回Falsedef reduce_stock():# python中redis事務是通過pipeline的封裝實現的with r.pipeline() as pipe:while True:try:# watch庫存鍵, multi后如果該key被其他客戶端改變, 事務操作會拋出WatchError異常pipe.watch('stock:count')count = int(pipe.get('stock:count'))if count > 0: # 有庫存# 事務開始pipe.multi()pipe.decr('stock:count')# 把命令推送過去# execute返回命令執行結果列表, 這里只有一個decr返回當前值print(pipe.execute()[0])return Trueelse:return Falseexcept WatchError as ex:# 打印WatchError異常, 觀察被watch鎖住的情況print(ex)pipe.unwatch()def worker():while True:# 沒有庫存就退出if not reduce_stock():breakif __name__ == "__main__":# 設置庫存為100r.set("stock:count", 100)# 多進程模擬多個客戶端提交with ProcessPoolExecutor() as pool:for _ in range(10):pool.submit(worker)

    concurrent.futures 模塊詳解

    1. Executor對象

    class?concurrent.futures.Executor

    Executor 是一個抽象類,它提供了異步執行調用的方法。它不能直接使用,但可以通過它的兩個子類 ThreadPoolExecutor 或者 ProcessPoolExecutor 進行調用。

    1.1 Executor.submit(fn, *args, **kwargs)

    fn:需要異步執行的函數
    *args, **kwargs:fn 的參數

    示例代碼:

    # -*- coding:utf-8 -*- from concurrent import futuresdef test(num):import timereturn time.ctime(), numwith futures.ThreadPoolExecutor(max_workers=1) as executor:future = executor.submit(test, 1)print(future.result())

    線程池的基本使用

    # coding: utf-8 from concurrent.futures import ThreadPoolExecutor import timedef spider(page):time.sleep(page)print(f"crawl task{page} finished")return pagewith ThreadPoolExecutor(max_workers=5) as t: # 創建一個最大容納數量為5的線程池task1 = t.submit(spider, 1)task2 = t.submit(spider, 2) # 通過submit提交執行的函數到線程池中task3 = t.submit(spider, 3)print(f"task1: {task1.done()}") # 通過done來判斷線程是否完成print(f"task2: {task2.done()}")print(f"task3: {task3.done()}")time.sleep(2.5)print(f"task1: {task1.done()}")print(f"task2: {task2.done()}")print(f"task3: {task3.done()}")print(task1.result()) # 通過result來獲取返回值
  • 使用 with 語句 ,通過 ThreadPoolExecutor 構造實例,同時傳入 max_workers 參數來設置線程池中最多能同時運行的線程數目。

  • 使用 submit 函數來提交線程需要執行的任務到線程池中,并返回該任務的句柄(類似于文件、畫圖),注意 submit() 不是阻塞的,而是立即返回。

  • 通過使用 done() 方法判斷該任務是否結束。上面的例子可以看出,提交任務后立即判斷任務狀態,顯示四個任務都未完成。在延時2.5后,task1 和 task2 執行完畢,task3 仍在執行中。

  • 使用 result() 方法可以獲取任務的返回值。

  • import time import random from concurrent.futures import ThreadPoolExecutor, waitdef func_test(int_1, int_2):sleep_second = random.randint(int_1, int_2)print(f'睡眠時間 {sleep_second}')time.sleep(t)passdef main():with ThreadPoolExecutor(max_workers=100) as tp_executor:future_task = [tp_executor.submit(func_test, 2, 5) for _ in range(100)]wait(future_task)if __name__ == '__main__':main()pass

    wait(fs, timeout=None, return_when=ALL_COMPLETED)
    wait 接受三個參數:
    ? ? ? ? fs: 表示需要執行的序列
    ? ? ? ? timeout: 等待的最大時間,如果超過這個時間即使線程未執行完成也將返回
    ? ? ? ? return_when:表示wait返回結果的條件,默認為 ALL_COMPLETED 全部執行完成再返回

    示例代碼:

    from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED, ALL_COMPLETED import timedef spider(page):time.sleep(page)print(f"crawl task{page} finished")return pagewith ThreadPoolExecutor(max_workers=5) as t:all_task = [t.submit(spider, page) for page in range(1, 5)]wait(all_task, return_when=FIRST_COMPLETED)print('finished')print(wait(all_task, timeout=2.5))
  • 代碼中返回的條件是:當完成第一個任務的時候,就停止等待,繼續主線程任務
  • 由于設置了延時, 可以看到最后只有 task4 還在運行中
  • as_completed

    上面雖然提供了判斷任務是否結束的方法,但是不能在主線程中一直判斷啊。最好的方法是當某個任務結束了,就給主線程返回結果,而不是一直判斷每個任務是否結束。

    ThreadPoolExecutorThreadPoolExecutor 中 的 as_completed() 就是這樣一個方法,當子線程中的任務執行完后,直接用 result() 獲取返回結果

    # coding: utf-8 from concurrent.futures import ThreadPoolExecutor, as_completed import timedef spider(page):time.sleep(page)print(f"crawl task{page} finished")return pagedef main():with ThreadPoolExecutor(max_workers=5) as t:obj_list = []for page in range(1, 5):obj = t.submit(spider, page)obj_list.append(obj)for future in as_completed(obj_list):data = future.result()print(f"main: {data}")

    as_completed() 方法是一個生成器,在沒有任務完成的時候,會一直阻塞,除非設置了 timeout。

    當有某個任務完成的時候,會 yield 這個任務,就能執行 for 循環下面的語句,然后繼續阻塞住,循環到所有的任務結束。同時,先完成的任務會先返回給主線程

    map(fn, *iterables, timeout=None)
    ? ? fn: 第一個參數 fn 是需要線程執行的函數;
    ? ? iterables:第二個參數接受一個可迭代對象;
    ? ? timeout: 第三個參數 timeout 跟 wait() 的 timeout 一樣,
    ?? ? ? ? ? ? ?但由于 map 是返回線程執行的結果,
    ?? ??? ??? ? ?如果 timeout小于線程執行時間會拋異常 TimeoutError。

    用法如下:

    import time from concurrent.futures import ThreadPoolExecutordef spider(page):time.sleep(page)return pagestart = time.time() executor = ThreadPoolExecutor(max_workers=4)i = 1 for result in executor.map(spider, [2, 3, 1, 4]):print("task{}:{}".format(i, result))i += 1

    使用 map 方法,無需提前使用 submit 方法,map 方法與 python 高階函數 map 的含義相同,都是將序列中的每個元素都執行同一個函數。

    上面的代碼對列表中的每個元素都執行 spider() 函數,并分配各線程池。

    可以看到執行結果與上面的 as_completed() 方法的結果不同,輸出順序和列表的順序相同,就算 1s 的任務先執行完成,也會先打印前面提交的任務返回的結果。

    1.2 Executor.map(func, *iterables, timeout=None)

    相當于map(func, *iterables),但是func是異步執行。timeout的值可以是int或float,如果操作超時,會返回raisesTimeoutError;如果不指定timeout參數,則不設置超時間。

    func:需要異步執行的函數
    *iterables:可迭代對象,如列表等。每一次func執行,都會從iterables中取參數。
    timeout:設置每次異步操作的超時時間

    示例代碼:

    # -*- coding:utf-8 -*- from concurrent import futuresdef test(num):import timereturn time.ctime(), numdata = [1, 2, 3] with futures.ThreadPoolExecutor(max_workers=1) as executor:for future in executor.map(test, data):print(future)

    1.3 Executor.shutdown(wait=True)

    釋放系統資源,在Executor.submit()或 Executor.map()等異步操作后調用。使用with語句可以避免顯式調用此方法

    2. ThreadPoolExecutor對象

    ThreadPoolExecutor類是Executor子類,使用線程池執行異步調用.

    class concurrent.futures.ThreadPoolExecutor(max_workers),使用 max_workers 數目的線程池執行異步調用

    python3標準庫concurrent.futures比原Thread封裝更高,多線程concurrent.futures.ThreadPoolExecutor,多進程concurrent.futures.ProcessPoolExecutor
    利用concurrent.futures.Future來進行各種便捷的數據交互,包括處理異常,都在result()中再次拋出。

    示例代碼:

    import time from concurrent import futures from concurrent.futures import ThreadPoolExecutordef display(args):print(time.strftime('[%H:%M:%S]', time.localtime()), end=' ')print(args)def task(n):"""只是休眠"""display('begin sleep {}s.'.format(n))time.sleep(n)display('ended sleep {}s.'.format(n))def do_many_task_inorder():"""多線程按任務發布順序依次等待完成"""tasks = [5, 4, 3, 2, 1]with ThreadPoolExecutor(max_workers=3) as executor:future_list = [executor.submit(task, arg) for arg in tasks]display('非阻塞運行')for future in future_list:display(future)display('統一結束(有序)')for future in future_list:display(future.result())def do_many_task_disorder():"""多線程執行先完成先顯示"""tasks = [5, 4, 3, 2, 1]with ThreadPoolExecutor(max_workers=3) as executor:future_list = [executor.submit(task, arg) for arg in tasks]display('非阻塞運行')for future in future_list:display(future)display('統一結束(無序)')done_iter = futures.as_completed(future_list) # generatorfor done in done_iter:display(done)if __name__ == '__main__':do_many_task_inorder()do_many_task_disorder()

    3. ProcessPoolExecutor對象

    ThreadPoolExecutor類是Executor子類,使用進程池執行異步調用.

    class concurrent.futures.ProcessPoolExecutor(max_workers=None),使用 max_workers數目的進程池執行異步調用,如果max_workers為None則使用機器的處理器數目(如4核機器max_worker配置為None時,則使用4個進程進行異步并發)。

    示例代碼:

    # -*- coding:utf-8 -*- from concurrent import futuresdef test(num):import timereturn time.ctime(), numdef muti_exec(m, n):# m 并發次數# n 運行次數with futures.ProcessPoolExecutor(max_workers=m) as executor: # 多進程# with futures.ThreadPoolExecutor(max_workers=m) as executor: #多線程executor_dict = dict((executor.submit(test, times), times) for times in range(m * n))for future in futures.as_completed(executor_dict):times = executor_dict[future]if future.exception() is not None:print('%r generated an exception: %s' % (times, future.exception()))else:print('RunTimes:%d,Res:%s' % (times, future.result()))if __name__ == '__main__':muti_exec(5, 1)

    調度單個任務

    執行者類Executor調度單個任務,使用submit() 函數,然后用返回的 Future 實例等待任務結果。

    Executor 是一個 Python concurrent.futures?模塊的抽象類。 它不能直接使用,我們需要使用以下具體子類之一 -

    • ThreadPoolExecutor:線程池
    • ProcessPoolExecutor:進程池

    示例代碼:

    from concurrent import futures import time import randomdef task(n):time.sleep(random.randint(1, 10))return nexecutor = futures.ThreadPoolExecutor(max_workers=3) future = executor.submit(task, 5) print('future: {}'.format(future)) result = future.result() print('result: {}'.format(result))

    線程池?和 進程池

  • ThreadPoolExecutor 是?Executor類的具體子類之一。 子類使用多線程,我們得到一個提交任務的線程池。 該池將任務分配給可用線程并安排它們運行。
  • ProcessPoolExecutor 是Executor類的具體子類之一。 它使用多重處理,并且我們獲得提交任務的進程池。 此池將任務分配給可用的進程并安排它們運行。
  • 如何創建一個 ThreadPoolExecutor 或者 ProcessPoolExecutor?

    ? ? ? ? 在concurrent.futures模塊及其具體子類Executor的幫助下,可以很容易地創建一個線程池或者進程池。 需要使用我們想要的池中的線程數構造一個ThreadPoolExecutor 或者?ProcessPoolExecutor。 默認情況下,數字是5。然后可以提交一個任務到線程池或者進程池。 當submit()任務時,會返回Future對象。 Future對象有一個名為done()的方法,它告訴Future是否已經解決。 有了這個,為這個特定的Future對象設定了一個值。 當任務完成時,線程池執行器將該值設置為Future的對象。

    線程池 示例代碼:

    from concurrent.futures import ThreadPoolExecutor from time import sleepdef task(message):sleep(2)return messagedef main():executor = ThreadPoolExecutor(5)future = executor.submit(task, "Completed")print(future.done())sleep(2)print(future.done())print(future.result())if __name__ == '__main__':main()

    結果截圖:

    在上面的例子中,一個ThreadPoolExecutor已經由5個線程構造而成。 然后,在提供消息之前等待2秒的任務被提交給線程池執行器。 從輸出中可以看出,任務直到2秒才完成,所以第一次調用done()將返回False。 2秒后,任務完成,我們通過調用result()方法得到future的結果。

    進程池 示例代碼:

    from concurrent.futures import ProcessPoolExecutor from time import sleepdef task(message):sleep(2)return messagedef main():executor = ProcessPoolExecutor(5)future = executor.submit(task, ("Completed"))print(future.done())sleep(2)print(future.done())print(future.result())if __name__ == '__main__':main()

    實例化ThreadPoolExecutor 或者?ProcessPoolExecutor? 之?上下文管理器
    另一種實例化ThreadPoolExecutor的方法是在上下文管理器的幫助下完成的。 它的工作方式與上例中使用的方法類似。 使用上下文管理器的主要優點是它在語法上看起來不錯。 實例化可以在下面的代碼的幫助下完成

    with ThreadPoolExecutor(max_workers = 5) as executor 或者 with ProcessPoolExecutor(max_workers = 5) as executor

    示例

    以下示例是從 Python 文檔借用的。 在這個例子中,首先必須導入?concurrent.futures?模塊。 然后創建一個名為?load_url()的函數,它將加載請求的url。 然后該函數用池中的5個線程創建?ThreadPoolExecutor。 ThreadPoolExecutor?已被用作上下文管理器。 我們可以通過調用?result()方法來獲得?future的結果。

    import concurrent.futures import urllib.requestURLS = ['http://www.foxnews.com/','https://www.yiibai.com/','http://europe.wsj.com/','http://www.bbc.co.uk/','http://some-made-up-domain.com/' ]def load_url(url, timeout):with urllib.request.urlopen(url, timeout=timeout) as conn:return conn.read()with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}for future in concurrent.futures.as_completed(future_to_url):url = future_to_url[future]try:data = future.result()except Exception as exc:print('%r generated an exception: %s' % (url, exc))else:print('%r page is %d bytes' % (url, len(data)))

    以下將是上面的Python腳本的輸出 -

    'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed> 'http://www.foxnews.com/' page is 229313 bytes 'http://www.yiibai.com/' page is 168933 bytes 'http://www.bbc.co.uk/' page is 283893 bytes 'http://europe.wsj.com/' page is 938109 bytes

    進程池:

    import concurrent.futures from concurrent.futures import ProcessPoolExecutor import urllib.requestURLS = ['http://www.foxnews.com/','http://www.cnn.com/','http://europe.wsj.com/','http://www.bbc.co.uk/','http://some-made-up-domain.com/']def load_url(url, timeout):with urllib.request.urlopen(url, timeout=timeout) as conn:return conn.read()def main():with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}for future in concurrent.futures.as_completed(future_to_url):url = future_to_url[future]try:data = future.result()except Exception as exc:print('%r generated an exception: %s' % (url, exc))else:print('%r page is %d bytes' % (url, len(data)))if __name__ == '__main__':main()

    使用 map() 調度多任務,有序返回

    使用map(),多個worker并發地從輸入迭代器里取數據,處理,然后按順序返回結果。

    示例代碼:

    from concurrent import futures import time import randomdef task(n):time.sleep(random.randint(1, 10))return nexecutor = futures.ThreadPoolExecutor(max_workers=3) results = executor.map(task, range(1, 10)) print('unprocessed results: {}'.format(results)) real_results = list(results) print('real results: {}'.format(real_results))

    使用 Executor.map() 函數

    Python map()函數廣泛用于許多任務。 一個這樣的任務是對可迭代內的每個元素應用某個函數。 同樣,可以將迭代器的所有元素映射到一個函數,并將這些作為獨立作業提交到ThreadPoolExecutor之外。 考慮下面的Python腳本示例來理解函數的工作原理。

    示例
    在下面的示例中,map函數用于將square()函數應用于values數組中的每個值。

    from concurrent.futures import ThreadPoolExecutor from concurrent.futures import as_completedvalues = [2, 3, 4, 5]def square(n):return n * ndef main():with ThreadPoolExecutor(max_workers=3) as executor:results = executor.map(square, values)for result in results:print(result)if __name__ == '__main__':main()

    以下將是上面的Python腳本的輸出 :

    進程池:

    from concurrent.futures import ProcessPoolExecutor from concurrent.futures import as_completedvalues = [2, 3, 4, 5]def square(n):return n * ndef main():with ProcessPoolExecutor(max_workers=3) as executor:results = executor.map(square, values)for result in results:print(result)if __name__ == '__main__':main()

    多任務調度,無序返回

    不斷將任務submit到executor,返回future列表,使用as_completed無序產生每個任務的結果。

    示例代碼:

    from concurrent import futures import time import randomdef task(n):time.sleep(random.randint(1, 10))return nexecutor = futures.ThreadPoolExecutor(max_workers=3) future_list = [executor.submit(task, i) for i in range(1, 10)] for f in futures.as_completed(future_list):print(f.result())

    何時使用ProcessPoolExecutor 和 ThreadPoolExecutor ?

    現在我們已經學習了兩個Executor類 - ThreadPoolExecutor和ProcessPoolExecutor,我們需要知道何時使用哪個執行器。需要在受CPU限制的工作負載情況下選擇ProcessPoolExecutor,而在受I/O限制的工作負載情況下則需要選擇ThreadPoolExecutor。

    如果使用ProcessPoolExecutor,那么不需要擔心GIL,因為它使用多處理。 而且,與ThreadPoolExecution相比,執行時間會更少。

    總結

    以上是生活随笔為你收集整理的Python3.2+ 的 concurrent.futures 模块,利用 multiprocessing 实现高并发。的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。