python 协程池gevent.pool_进程池\线程池,协程,gevent
目錄
1. 進程池與線程池
2. 協程
3. gevent
4. 單線程下實現并發的套接字通信
首先寫一個基于多線程的套接字
服務端:
from socket import *
from threading import Thread
def comunicate(conn):
while True: # 通信循環
try:
data = conn.recv(1024)
if len(data) == 0: break
conn.send(data.upper())
except ConnectionResetError:
break
conn.close()
def server(ip, port, backlog=5):
server = socket(AF_INET, SOCK_STREAM)
server.bind((ip, port))
server.listen(backlog)
while True: # 鏈接循環
conn, client_addr = server.accept()
print(client_addr)
# 通信
t=Thread(target=comunicate,args=(conn,))
t.start()
if __name__ == '__main__':
s=Thread(target=server,args=('127.0.0.1',8081))
s.start()
每連接上一個客戶端便會創造一個線程 , 那么如果有一萬個客戶端的話服務端會產生一萬個線程 , 然后服務端就炸了 , 所以要想個辦法限制連接個數 , 即限制
1. 進程池\線程池
開啟一個進程池 , 會開啟一定個數的進程 , 然后將任務提交給進程就可以了
1 介紹
concurrent.futures模塊提供了高度封裝的異步調用接口
ThreadPoolExecutor:線程池,提供異步調用
ProcessPoolExecutor: 進程池,提供異步調用
Both implement the same interface, which is defined by the abstract Executor class.
2 基本方法
submit(fn, *args, **kwargs)
異步提交任務
map(func, *iterables, timeout=None, chunksize=1)
取代for循環submit的操作
shutdown(wait=True)
相當于進程池的pool.close()+pool.join()操作
wait=True,等待池內所有任務執行完畢回收完資源后才繼續
wait=False,立即返回,并不會等待池內的任務執行完畢
但不管wait參數為何值,整個程序都會等到所有任務執行完畢
submit和map必須在shutdown之前
result(timeout=None)
取得結果
add_done_callback(fn)
回調函數
導入模塊
from concurrent.futures import ProcessPoolExecutor
創建一個進程池
p=ProcessPoolExecutor(4)#進程數為4
提交任務, 有兩種方式
a.同步調用:同步調用:提交完一個任務之后,就在原地等待,等待任務完完整整地運行完畢拿到結果后,再執行下一行代碼,會導致任務是串行執行的
res=p.submit(function,參數一...).result()
b. 異步調用:提交完一個任務之后,不在原地等待,而是直接執行下一行代碼,會導致任務是并發執行的,,結果futrue對象會在任務運行完畢后自動傳給回調函數
res=p.submit(function,參數一...)
回調函數(用于異步調用)
每提交一個任務 , 會產生一個對象 , 給這個任務綁定了一個函數 , 這個函數會在你提交的任務完成后自動觸發 , 且會將這個對象當作參數傳給這個函數
這個函數用于處理子進程運行完之后產生的結果
多進程下的回調函數
from concurrent.futures import ProcessPoolExecutor
import time,os
import requests
def get(url):
print('%s GET %s' %(os.getpid(),url))
time.sleep(3)#處理的太快看不出效果 , 模擬多處理3秒
response=requests.get(url)#爬取網站內容
if response.status_code == 200:
res=response.text
else:
res='下載失敗'
return res#返回爬取數據
# 到這里任務運行完了之后自動調用parse函數 ,
# 回調函數 , 處理任務的結果用
def parse(future):
time.sleep(1)
res=future.result()#future對象下的result為任務的返回值
print('%s 解析結果為%s' %(os.getpid(),len(res)))
if __name__ == '__main__':
urls=[
'https://www.baidu.com',
'https://www.sina.com.cn',
'https://www.tmall.com',
'https://www.jd.com',
'https://www.python.org',
'https://www.openstack.org',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
]
#開啟進程數為9的進程池
p=ProcessPoolExecutor(9)
start=time.time()
for url in urls:
# 異步調用:提交完一個任務之后,不在原地等待,而是直接執行下一行代碼,會導致任務是并發執行的,,結果futrue對象會在任務運行完畢后自動傳給回調函數
future=p.submit(get,url)
#將parse設為回調函數
future.add_done_callback(parse) #parse會在任務運行完畢后自動觸發,然后接收一個參數future對象
p.shutdown(wait=True)
print('主',time.time()-start)
print('主',os.getpid())
多線程與多進程相同,只需將p=ProcessPoolExecutor(9)改為p=ThreadPoolExecutor(9)就可以了
2. 單線程下實現并發-------協程
目標:
在單線程下實現并發,又稱微線程,纖程。英文名Coroutine。一句話說明什么是線程:協程是一種用戶態的輕量級線程,即協程是由用戶程序自己控制調度的。
并發(多個任務看起來是同時執行就是并發):切換+保存狀態
協程:
協程是單線程實現并發
注意:協程是程序員意淫出來的東西,操作系統里只有進程和線程的概念(操作系統調度的是線程)
在單線程下實現多個任務間遇到IO就切換就可以降低單線程的IO時間,從而最大限度地提升單線程的效率
強調
python的線程屬于內核級別的,即由操作系統控制調度(如單線程遇到io或執行時間過長就會被迫交出cpu執行權限,切換其他線程運行)
單線程內開啟協程,一旦遇到io,就會從應用程序級別(而非操作系統)控制切換,以此來提升效率(!!!非io操作的切換與效率無關)
對比操作系統控制線程的切換,用戶在單線程內控制協程的切換
優點如下
協程的切換開銷更小,屬于程序級別的切換,操作系統完全感知不到,因而更加輕量級
單線程內就可以實現并發的效果,最大限度地利用cpu
缺點如下
協程的本質是單線程下,無法利用多核,可以是一個程序開啟多個進程,每個進程內開啟多個線程,每個線程內開啟協程
協程指的是單個線程,因而一旦協程出現阻塞,將會阻塞整個線程
總結
必須在只有一個單線程里實現并發
修改共享數據不需加鎖
用戶程序里自己保存多個控制流的上下文棧
附加:一個協程遇到IO操作自動切換到其它協程(如何實現檢測IO,yield、greenlet都無法實現,就用到了gevent模塊(select機制))
3. gevent
他是一個第三方的庫 , 可以實現在單線程內遇到 IO 任務自動切換
geven是不能直接識別的 需要在整個文件最前面加上一行代碼
from gevent import monkey;monkey.patch_all()
步驟 :
打補丁
導入from gevent import spawn
定義有 IO 操作的任務(函數)
將多個任務分別提交給協程
g1=spawn(函數名,函數的參數)
g2=spawn(函數名,函數的參數)
等待兩個協程運行完 ,
g1.join()
g2.join()
:因為這里是異步調用 , 主線程代碼運行完了 , 主線程就會死掉 , 協程里面的任務也不會運行完 就跟著死了, 所以要加上join方法 ,如果主線程要運行很久 , 或者是一個死循環 , 就不用加join方法 ,即上面的的第五步就可以忽略
from gevent import monkey;monkey.patch_all()
from gevent import spawn,joinall #pip3 install gevent
import time
def play(name):
print('%s play 1' %name)
time.sleep(5)
print('%s play 2' %name)
def eat(name):
print('%s eat 1' %name)
time.sleep(3)
print('%s eat 2' %name)
start=time.time()
g1=spawn(play,'王昭錦')
g2=spawn(eat,'王昭錦')
g1.join()
g2.join()
# joinall([g1,g2]) #上面兩步可以并成這一步
print('主',time.time()-start)
運行結果如下:
'''
王昭錦 play 1
王昭錦 eat 1
王昭錦 eat 2
王昭錦 play 2
主 5.009259223937988
'''
4. 單線程下實現并發的套接字通信
服務端:
from gevent import monkey;monkey.patch_all()
from socket import *
from gevent import spawn
def comunicate(conn):
while True: # 通信循環
try:
data = conn.recv(1024)
if len(data) == 0: break
conn.send(data.upper())
except ConnectionResetError:
break
conn.close()
def server(ip, port, backlog=5):
server = socket(AF_INET, SOCK_STREAM)
server.bind((ip, port))
server.listen(backlog)
while True: # 鏈接循環
conn, client_addr = server.accept()
print(client_addr)
# 通信
spawn(comunicate,conn)
if __name__ == '__main__':
g1=spawn(server,'127.0.0.1',8080)
g1.join()
總結
以上是生活随笔為你收集整理的python 协程池gevent.pool_进程池\线程池,协程,gevent的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: pavsched.exe是什么进程 pa
- 下一篇: websocket python爬虫_p