日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

python 协程池gevent.pool_进程池\线程池,协程,gevent

發(fā)布時間:2023/12/10 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python 协程池gevent.pool_进程池\线程池,协程,gevent 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

目錄

1. 進程池與線程池

2. 協(xié)程

3. gevent

4. 單線程下實現(xiàn)并發(fā)的套接字通信

首先寫一個基于多線程的套接字

服務(wù)端:

from socket import *

from threading import Thread

def comunicate(conn):

while True: # 通信循環(huán)

try:

data = conn.recv(1024)

if len(data) == 0: break

conn.send(data.upper())

except ConnectionResetError:

break

conn.close()

def server(ip, port, backlog=5):

server = socket(AF_INET, SOCK_STREAM)

server.bind((ip, port))

server.listen(backlog)

while True: # 鏈接循環(huán)

conn, client_addr = server.accept()

print(client_addr)

# 通信

t=Thread(target=comunicate,args=(conn,))

t.start()

if __name__ == '__main__':

s=Thread(target=server,args=('127.0.0.1',8081))

s.start()

每連接上一個客戶端便會創(chuàng)造一個線程 , 那么如果有一萬個客戶端的話服務(wù)端會產(chǎn)生一萬個線程 , 然后服務(wù)端就炸了 , 所以要想個辦法限制連接個數(shù) , 即限制

1. 進程池\線程池

開啟一個進程池 , 會開啟一定個數(shù)的進程 , 然后將任務(wù)提交給進程就可以了

1 介紹

concurrent.futures模塊提供了高度封裝的異步調(diào)用接口

ThreadPoolExecutor:線程池,提供異步調(diào)用

ProcessPoolExecutor: 進程池,提供異步調(diào)用

Both implement the same interface, which is defined by the abstract Executor class.

2 基本方法

submit(fn, *args, **kwargs)

異步提交任務(wù)

map(func, *iterables, timeout=None, chunksize=1)

取代for循環(huán)submit的操作

shutdown(wait=True)

相當于進程池的pool.close()+pool.join()操作

wait=True,等待池內(nèi)所有任務(wù)執(zhí)行完畢回收完資源后才繼續(xù)

wait=False,立即返回,并不會等待池內(nèi)的任務(wù)執(zhí)行完畢

但不管wait參數(shù)為何值,整個程序都會等到所有任務(wù)執(zhí)行完畢

submit和map必須在shutdown之前

result(timeout=None)

取得結(jié)果

add_done_callback(fn)

回調(diào)函數(shù)

導(dǎo)入模塊

from concurrent.futures import ProcessPoolExecutor

創(chuàng)建一個進程池

p=ProcessPoolExecutor(4)#進程數(shù)為4

提交任務(wù), 有兩種方式

a.同步調(diào)用:同步調(diào)用:提交完一個任務(wù)之后,就在原地等待,等待任務(wù)完完整整地運行完畢拿到結(jié)果后,再執(zhí)行下一行代碼,會導(dǎo)致任務(wù)是串行執(zhí)行的

res=p.submit(function,參數(shù)一...).result()

b. 異步調(diào)用:提交完一個任務(wù)之后,不在原地等待,而是直接執(zhí)行下一行代碼,會導(dǎo)致任務(wù)是并發(fā)執(zhí)行的,,結(jié)果futrue對象會在任務(wù)運行完畢后自動傳給回調(diào)函數(shù)

res=p.submit(function,參數(shù)一...)

回調(diào)函數(shù)(用于異步調(diào)用)

每提交一個任務(wù) , 會產(chǎn)生一個對象 , 給這個任務(wù)綁定了一個函數(shù) , 這個函數(shù)會在你提交的任務(wù)完成后自動觸發(fā) , 且會將這個對象當作參數(shù)傳給這個函數(shù)

這個函數(shù)用于處理子進程運行完之后產(chǎn)生的結(jié)果

多進程下的回調(diào)函數(shù)

from concurrent.futures import ProcessPoolExecutor

import time,os

import requests

def get(url):

print('%s GET %s' %(os.getpid(),url))

time.sleep(3)#處理的太快看不出效果 , 模擬多處理3秒

response=requests.get(url)#爬取網(wǎng)站內(nèi)容

if response.status_code == 200:

res=response.text

else:

res='下載失敗'

return res#返回爬取數(shù)據(jù)

# 到這里任務(wù)運行完了之后自動調(diào)用parse函數(shù) ,

# 回調(diào)函數(shù) , 處理任務(wù)的結(jié)果用

def parse(future):

time.sleep(1)

res=future.result()#future對象下的result為任務(wù)的返回值

print('%s 解析結(jié)果為%s' %(os.getpid(),len(res)))

if __name__ == '__main__':

urls=[

'https://www.baidu.com',

'https://www.sina.com.cn',

'https://www.tmall.com',

'https://www.jd.com',

'https://www.python.org',

'https://www.openstack.org',

'https://www.baidu.com',

'https://www.baidu.com',

'https://www.baidu.com',

]

#開啟進程數(shù)為9的進程池

p=ProcessPoolExecutor(9)

start=time.time()

for url in urls:

# 異步調(diào)用:提交完一個任務(wù)之后,不在原地等待,而是直接執(zhí)行下一行代碼,會導(dǎo)致任務(wù)是并發(fā)執(zhí)行的,,結(jié)果futrue對象會在任務(wù)運行完畢后自動傳給回調(diào)函數(shù)

future=p.submit(get,url)

#將parse設(shè)為回調(diào)函數(shù)

future.add_done_callback(parse) #parse會在任務(wù)運行完畢后自動觸發(fā),然后接收一個參數(shù)future對象

p.shutdown(wait=True)

print('主',time.time()-start)

print('主',os.getpid())

多線程與多進程相同,只需將p=ProcessPoolExecutor(9)改為p=ThreadPoolExecutor(9)就可以了

2. 單線程下實現(xiàn)并發(fā)-------協(xié)程

目標:

在單線程下實現(xiàn)并發(fā),又稱微線程,纖程。英文名Coroutine。一句話說明什么是線程:協(xié)程是一種用戶態(tài)的輕量級線程,即協(xié)程是由用戶程序自己控制調(diào)度的。

并發(fā)(多個任務(wù)看起來是同時執(zhí)行就是并發(fā)):切換+保存狀態(tài)

協(xié)程:

協(xié)程是單線程實現(xiàn)并發(fā)

注意:協(xié)程是程序員意淫出來的東西,操作系統(tǒng)里只有進程和線程的概念(操作系統(tǒng)調(diào)度的是線程)

在單線程下實現(xiàn)多個任務(wù)間遇到IO就切換就可以降低單線程的IO時間,從而最大限度地提升單線程的效率

強調(diào)

python的線程屬于內(nèi)核級別的,即由操作系統(tǒng)控制調(diào)度(如單線程遇到io或執(zhí)行時間過長就會被迫交出cpu執(zhí)行權(quán)限,切換其他線程運行)

單線程內(nèi)開啟協(xié)程,一旦遇到io,就會從應(yīng)用程序級別(而非操作系統(tǒng))控制切換,以此來提升效率(!!!非io操作的切換與效率無關(guān))

對比操作系統(tǒng)控制線程的切換,用戶在單線程內(nèi)控制協(xié)程的切換

優(yōu)點如下

協(xié)程的切換開銷更小,屬于程序級別的切換,操作系統(tǒng)完全感知不到,因而更加輕量級

單線程內(nèi)就可以實現(xiàn)并發(fā)的效果,最大限度地利用cpu

缺點如下

協(xié)程的本質(zhì)是單線程下,無法利用多核,可以是一個程序開啟多個進程,每個進程內(nèi)開啟多個線程,每個線程內(nèi)開啟協(xié)程

協(xié)程指的是單個線程,因而一旦協(xié)程出現(xiàn)阻塞,將會阻塞整個線程

總結(jié)

必須在只有一個單線程里實現(xiàn)并發(fā)

修改共享數(shù)據(jù)不需加鎖

用戶程序里自己保存多個控制流的上下文棧

附加:一個協(xié)程遇到IO操作自動切換到其它協(xié)程(如何實現(xiàn)檢測IO,yield、greenlet都無法實現(xiàn),就用到了gevent模塊(select機制))

3. gevent

他是一個第三方的庫 , 可以實現(xiàn)在單線程內(nèi)遇到 IO 任務(wù)自動切換

geven是不能直接識別的 需要在整個文件最前面加上一行代碼

from gevent import monkey;monkey.patch_all()

步驟 :

打補丁

導(dǎo)入from gevent import spawn

定義有 IO 操作的任務(wù)(函數(shù))

將多個任務(wù)分別提交給協(xié)程

g1=spawn(函數(shù)名,函數(shù)的參數(shù))

g2=spawn(函數(shù)名,函數(shù)的參數(shù))

等待兩個協(xié)程運行完 ,

g1.join()

g2.join()

:因為這里是異步調(diào)用 , 主線程代碼運行完了 , 主線程就會死掉 , 協(xié)程里面的任務(wù)也不會運行完 就跟著死了, 所以要加上join方法 ,如果主線程要運行很久 , 或者是一個死循環(huán) , 就不用加join方法 ,即上面的的第五步就可以忽略

from gevent import monkey;monkey.patch_all()

from gevent import spawn,joinall #pip3 install gevent

import time

def play(name):

print('%s play 1' %name)

time.sleep(5)

print('%s play 2' %name)

def eat(name):

print('%s eat 1' %name)

time.sleep(3)

print('%s eat 2' %name)

start=time.time()

g1=spawn(play,'王昭錦')

g2=spawn(eat,'王昭錦')

g1.join()

g2.join()

# joinall([g1,g2]) #上面兩步可以并成這一步

print('主',time.time()-start)

運行結(jié)果如下:

'''

王昭錦 play 1

王昭錦 eat 1

王昭錦 eat 2

王昭錦 play 2

主 5.009259223937988

'''

4. 單線程下實現(xiàn)并發(fā)的套接字通信

服務(wù)端:

from gevent import monkey;monkey.patch_all()

from socket import *

from gevent import spawn

def comunicate(conn):

while True: # 通信循環(huán)

try:

data = conn.recv(1024)

if len(data) == 0: break

conn.send(data.upper())

except ConnectionResetError:

break

conn.close()

def server(ip, port, backlog=5):

server = socket(AF_INET, SOCK_STREAM)

server.bind((ip, port))

server.listen(backlog)

while True: # 鏈接循環(huán)

conn, client_addr = server.accept()

print(client_addr)

# 通信

spawn(comunicate,conn)

if __name__ == '__main__':

g1=spawn(server,'127.0.0.1',8080)

g1.join()

總結(jié)

以上是生活随笔為你收集整理的python 协程池gevent.pool_进程池\线程池,协程,gevent的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。