from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os,time,random'''
sockect網絡通信是IO操作,所以用多線程
計算密集型:用多進程
'''def task(name):print('name:%s pid:%s run' %(name,os.getpid()))time.sleep(random.randint(1,3))if__name__ == '__main__':# pool = ProcessPoolExecutor(4) # 進程池最多裝4個進程,不指定的話默認是cpu的核數pool = ThreadPoolExecutor(5)for i in range(10):pool.submit(task,'yang%s' %i) # 異步調用池子收了10個任務,但同一時間只有4個任務在進行pool.shutdown(wait=True) # 類似join 代表往池子里面丟任務的入口關掉 計數器-1print('主')
'''
打印結果:
name:yang0 pid:11120 run
name:yang1 pid:11120 run
name:yang2 pid:11120 run
name:yang3 pid:11120 run
name:yang4 pid:11120 runname:yang5 pid:11120 run
name:yang6 pid:11120 run
name:yang7 pid:11120 runname:yang8 pid:11120 run
name:yang9 pid:11120 run
主
'''from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
from threading import currentThread
import os,time,randomdef task():print('name:%s pid:%s run' %(currentThread().getName(),os.getpid()))time.sleep(random.randint(1,3))if__name__ == '__main__':# pool = ProcessPoolExecutor(4) # 進程池最多裝4個進程,不指定的話默認是cpu的核數pool = ThreadPoolExecutor(5)for i in range(10):pool.submit(task) # 異步調用池子收了10個任務,但同一時間只有4個任務在進行pool.shutdown(wait=True) # 類似join 代表往池子里面丟任務的入口關掉 計數器-1print('主')
'''
打印結果:
name:ThreadPoolExecutor-0_0 pid:14052 run
name:ThreadPoolExecutor-0_1 pid:14052 run
name:ThreadPoolExecutor-0_2 pid:14052 run
name:ThreadPoolExecutor-0_3 pid:14052 run
name:ThreadPoolExecutor-0_4 pid:14052 run
name:ThreadPoolExecutor-0_2 pid:14052 run
name:ThreadPoolExecutor-0_1 pid:14052 run
name:ThreadPoolExecutor-0_3 pid:14052 run
name:ThreadPoolExecutor-0_4 pid:14052 run
name:ThreadPoolExecutor-0_0 pid:14052 run
主
'''進程池|線程池
#1.同步調用:提交完任務后,就在原地等待任務執(zhí)行完畢,拿到結果,再執(zhí)行下一行代碼,導致程序是串行執(zhí)行from concurrent.futures import ThreadPoolExecutor
import time
import randomdef la(name):print('%s is laing' %name)time.sleep(random.randint(3,5))res = random.randint(7,13)*'#'return {'name':name,'res':res}def weigh(shit):name = shit['name']size = len(shit['res'])print('%s 拉了 <%s>kg' %(name,size))if__name__ == '__main__':pool = ThreadPoolExecutor(10)shit1 = pool.submit(la,'alex').result()weigh(shit1)shit2 = pool.submit(la,'yang').result()weigh(shit2)shit3 = pool.submit(la,'hang').result()weigh(shit3)
'''
打印結果:
alex is laing
alex 拉了 <8>kg
yang is laing
yang 拉了 <8>kg
hang is laing
hang 拉了 <7>kg
'''同步調用#2.異步調用:提交完任務后,不在原地等待任務執(zhí)行完from concurrent.futures import ThreadPoolExecutor
import time
import randomdef la(name):print('%s is laing' %name)time.sleep(random.randint(3,5))res = random.randint(7,13)*'#'return {'name':name,'res':res}# weigh({'name':name,'res':res}) # 這樣寫,所有功能 不能體現出解耦合def weigh(shit):shit = shit.result() # 拿到是一個對象,需要進行result()name = shit['name']size = len(shit['res'])print('%s 拉了 <%s>kg' %(name,size))if__name__ == '__main__':pool = ThreadPoolExecutor(10)shit1 = pool.submit(la,'alex').add_done_callback(weigh)shit2 = pool.submit(la,'yang').add_done_callback(weigh)shit3 = pool.submit(la,'hang').add_done_callback(weigh)
'''
打印結果:
alex is laing
yang is laing
hang is laing
hang 拉了 <10>kg
alex 拉了 <7>kg
yang 拉了 <12>kg
'''異步調用
異步調用的應用
from concurrent.futures import ThreadPoolExecutor
import requests
import timedef get(url):print('GET %s'%url)response = requests.get(url)time.sleep(3)return {'url':url,'content':response.text}def parse(res):res = res.result()print('%s parse res is %s' %(res['url'],len(res['content'])))if__name__ == '__main__':urls = ['http://www.cnblogs.com/linhaifeng','https://www.python.org','https://www.openstack.org',]pool = ThreadPoolExecutor(2)for url in urls:pool.submit(get,url).add_done_callback(parse)
'''
打印結果:
GET http://www.cnblogs.com/linhaifeng
GET https://www.python.org
http://www.cnblogs.com/linhaifeng parse res is 16320
GET https://www.openstack.org
https://www.python.org parse res is 49273
https://www.openstack.org parse res is 64040
'''應用