python 协程池gevent.pool_进程池\线程池,协程,gevent
目錄
1. 進程池與線程池
2. 協(xié)程
3. gevent
4. 單線程下實現(xiàn)并發(fā)的套接字通信
首先寫一個基于多線程的套接字
服務(wù)端:
from socket import *
from threading import Thread
def comunicate(conn):
while True: # 通信循環(huán)
try:
data = conn.recv(1024)
if len(data) == 0: break
conn.send(data.upper())
except ConnectionResetError:
break
conn.close()
def server(ip, port, backlog=5):
server = socket(AF_INET, SOCK_STREAM)
server.bind((ip, port))
server.listen(backlog)
while True: # 鏈接循環(huán)
conn, client_addr = server.accept()
print(client_addr)
# 通信
t=Thread(target=comunicate,args=(conn,))
t.start()
if __name__ == '__main__':
s=Thread(target=server,args=('127.0.0.1',8081))
s.start()
每連接上一個客戶端便會創(chuàng)造一個線程 , 那么如果有一萬個客戶端的話服務(wù)端會產(chǎn)生一萬個線程 , 然后服務(wù)端就炸了 , 所以要想個辦法限制連接個數(shù) , 即限制
1. 進程池\線程池
開啟一個進程池 , 會開啟一定個數(shù)的進程 , 然后將任務(wù)提交給進程就可以了
1 介紹
concurrent.futures模塊提供了高度封裝的異步調(diào)用接口
ThreadPoolExecutor:線程池,提供異步調(diào)用
ProcessPoolExecutor: 進程池,提供異步調(diào)用
Both implement the same interface, which is defined by the abstract Executor class.
2 基本方法
submit(fn, *args, **kwargs)
異步提交任務(wù)
map(func, *iterables, timeout=None, chunksize=1)
取代for循環(huán)submit的操作
shutdown(wait=True)
相當于進程池的pool.close()+pool.join()操作
wait=True,等待池內(nèi)所有任務(wù)執(zhí)行完畢回收完資源后才繼續(xù)
wait=False,立即返回,并不會等待池內(nèi)的任務(wù)執(zhí)行完畢
但不管wait參數(shù)為何值,整個程序都會等到所有任務(wù)執(zhí)行完畢
submit和map必須在shutdown之前
result(timeout=None)
取得結(jié)果
add_done_callback(fn)
回調(diào)函數(shù)
導(dǎo)入模塊
from concurrent.futures import ProcessPoolExecutor
創(chuàng)建一個進程池
p=ProcessPoolExecutor(4)#進程數(shù)為4
提交任務(wù), 有兩種方式
a.同步調(diào)用:同步調(diào)用:提交完一個任務(wù)之后,就在原地等待,等待任務(wù)完完整整地運行完畢拿到結(jié)果后,再執(zhí)行下一行代碼,會導(dǎo)致任務(wù)是串行執(zhí)行的
res=p.submit(function,參數(shù)一...).result()
b. 異步調(diào)用:提交完一個任務(wù)之后,不在原地等待,而是直接執(zhí)行下一行代碼,會導(dǎo)致任務(wù)是并發(fā)執(zhí)行的,,結(jié)果futrue對象會在任務(wù)運行完畢后自動傳給回調(diào)函數(shù)
res=p.submit(function,參數(shù)一...)
回調(diào)函數(shù)(用于異步調(diào)用)
每提交一個任務(wù) , 會產(chǎn)生一個對象 , 給這個任務(wù)綁定了一個函數(shù) , 這個函數(shù)會在你提交的任務(wù)完成后自動觸發(fā) , 且會將這個對象當作參數(shù)傳給這個函數(shù)
這個函數(shù)用于處理子進程運行完之后產(chǎn)生的結(jié)果
多進程下的回調(diào)函數(shù)
from concurrent.futures import ProcessPoolExecutor
import time,os
import requests
def get(url):
print('%s GET %s' %(os.getpid(),url))
time.sleep(3)#處理的太快看不出效果 , 模擬多處理3秒
response=requests.get(url)#爬取網(wǎng)站內(nèi)容
if response.status_code == 200:
res=response.text
else:
res='下載失敗'
return res#返回爬取數(shù)據(jù)
# 到這里任務(wù)運行完了之后自動調(diào)用parse函數(shù) ,
# 回調(diào)函數(shù) , 處理任務(wù)的結(jié)果用
def parse(future):
time.sleep(1)
res=future.result()#future對象下的result為任務(wù)的返回值
print('%s 解析結(jié)果為%s' %(os.getpid(),len(res)))
if __name__ == '__main__':
urls=[
'https://www.baidu.com',
'https://www.sina.com.cn',
'https://www.tmall.com',
'https://www.jd.com',
'https://www.python.org',
'https://www.openstack.org',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
]
#開啟進程數(shù)為9的進程池
p=ProcessPoolExecutor(9)
start=time.time()
for url in urls:
# 異步調(diào)用:提交完一個任務(wù)之后,不在原地等待,而是直接執(zhí)行下一行代碼,會導(dǎo)致任務(wù)是并發(fā)執(zhí)行的,,結(jié)果futrue對象會在任務(wù)運行完畢后自動傳給回調(diào)函數(shù)
future=p.submit(get,url)
#將parse設(shè)為回調(diào)函數(shù)
future.add_done_callback(parse) #parse會在任務(wù)運行完畢后自動觸發(fā),然后接收一個參數(shù)future對象
p.shutdown(wait=True)
print('主',time.time()-start)
print('主',os.getpid())
多線程與多進程相同,只需將p=ProcessPoolExecutor(9)改為p=ThreadPoolExecutor(9)就可以了
2. 單線程下實現(xiàn)并發(fā)-------協(xié)程
目標:
在單線程下實現(xiàn)并發(fā),又稱微線程,纖程。英文名Coroutine。一句話說明什么是線程:協(xié)程是一種用戶態(tài)的輕量級線程,即協(xié)程是由用戶程序自己控制調(diào)度的。
并發(fā)(多個任務(wù)看起來是同時執(zhí)行就是并發(fā)):切換+保存狀態(tài)
協(xié)程:
協(xié)程是單線程實現(xiàn)并發(fā)
注意:協(xié)程是程序員意淫出來的東西,操作系統(tǒng)里只有進程和線程的概念(操作系統(tǒng)調(diào)度的是線程)
在單線程下實現(xiàn)多個任務(wù)間遇到IO就切換就可以降低單線程的IO時間,從而最大限度地提升單線程的效率
強調(diào)
python的線程屬于內(nèi)核級別的,即由操作系統(tǒng)控制調(diào)度(如單線程遇到io或執(zhí)行時間過長就會被迫交出cpu執(zhí)行權(quán)限,切換其他線程運行)
單線程內(nèi)開啟協(xié)程,一旦遇到io,就會從應(yīng)用程序級別(而非操作系統(tǒng))控制切換,以此來提升效率(!!!非io操作的切換與效率無關(guān))
對比操作系統(tǒng)控制線程的切換,用戶在單線程內(nèi)控制協(xié)程的切換
優(yōu)點如下
協(xié)程的切換開銷更小,屬于程序級別的切換,操作系統(tǒng)完全感知不到,因而更加輕量級
單線程內(nèi)就可以實現(xiàn)并發(fā)的效果,最大限度地利用cpu
缺點如下
協(xié)程的本質(zhì)是單線程下,無法利用多核,可以是一個程序開啟多個進程,每個進程內(nèi)開啟多個線程,每個線程內(nèi)開啟協(xié)程
協(xié)程指的是單個線程,因而一旦協(xié)程出現(xiàn)阻塞,將會阻塞整個線程
總結(jié)
必須在只有一個單線程里實現(xiàn)并發(fā)
修改共享數(shù)據(jù)不需加鎖
用戶程序里自己保存多個控制流的上下文棧
附加:一個協(xié)程遇到IO操作自動切換到其它協(xié)程(如何實現(xiàn)檢測IO,yield、greenlet都無法實現(xiàn),就用到了gevent模塊(select機制))
3. gevent
他是一個第三方的庫 , 可以實現(xiàn)在單線程內(nèi)遇到 IO 任務(wù)自動切換
geven是不能直接識別的 需要在整個文件最前面加上一行代碼
from gevent import monkey;monkey.patch_all()
步驟 :
打補丁
導(dǎo)入from gevent import spawn
定義有 IO 操作的任務(wù)(函數(shù))
將多個任務(wù)分別提交給協(xié)程
g1=spawn(函數(shù)名,函數(shù)的參數(shù))
g2=spawn(函數(shù)名,函數(shù)的參數(shù))
等待兩個協(xié)程運行完 ,
g1.join()
g2.join()
:因為這里是異步調(diào)用 , 主線程代碼運行完了 , 主線程就會死掉 , 協(xié)程里面的任務(wù)也不會運行完 就跟著死了, 所以要加上join方法 ,如果主線程要運行很久 , 或者是一個死循環(huán) , 就不用加join方法 ,即上面的的第五步就可以忽略
from gevent import monkey;monkey.patch_all()
from gevent import spawn,joinall #pip3 install gevent
import time
def play(name):
print('%s play 1' %name)
time.sleep(5)
print('%s play 2' %name)
def eat(name):
print('%s eat 1' %name)
time.sleep(3)
print('%s eat 2' %name)
start=time.time()
g1=spawn(play,'王昭錦')
g2=spawn(eat,'王昭錦')
g1.join()
g2.join()
# joinall([g1,g2]) #上面兩步可以并成這一步
print('主',time.time()-start)
運行結(jié)果如下:
'''
王昭錦 play 1
王昭錦 eat 1
王昭錦 eat 2
王昭錦 play 2
主 5.009259223937988
'''
4. 單線程下實現(xiàn)并發(fā)的套接字通信
服務(wù)端:
from gevent import monkey;monkey.patch_all()
from socket import *
from gevent import spawn
def comunicate(conn):
while True: # 通信循環(huán)
try:
data = conn.recv(1024)
if len(data) == 0: break
conn.send(data.upper())
except ConnectionResetError:
break
conn.close()
def server(ip, port, backlog=5):
server = socket(AF_INET, SOCK_STREAM)
server.bind((ip, port))
server.listen(backlog)
while True: # 鏈接循環(huán)
conn, client_addr = server.accept()
print(client_addr)
# 通信
spawn(comunicate,conn)
if __name__ == '__main__':
g1=spawn(server,'127.0.0.1',8080)
g1.join()
總結(jié)
以上是生活随笔為你收集整理的python 协程池gevent.pool_进程池\线程池,协程,gevent的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: pavsched.exe是什么进程 pa
- 下一篇: 1.深度学习练习:Python Basi