日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 >

铁乐学python_Day40_进程池

發(fā)布時間:2025/7/14 56 豆豆
生活随笔 收集整理的這篇文章主要介紹了 铁乐学python_Day40_进程池 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

進(jìn)程之間的數(shù)據(jù)共享

基于消息傳遞的并發(fā)編程是大勢所趨,
即便是使用線程,推薦做法也是將程序設(shè)計為大量獨(dú)立的線程集合,通過消息隊列交換數(shù)據(jù)。
這樣極大地減少了對使用鎖和其他同步手段的需求,還可以擴(kuò)展到分布式系統(tǒng)中。
但進(jìn)程間應(yīng)該盡量避免通信,即便需要通信,也應(yīng)該選擇進(jìn)程安全的工具來避免加鎖帶來的問題。
以后我們會嘗試使用數(shù)據(jù)庫來解決現(xiàn)在進(jìn)程之間的數(shù)據(jù)共享問題。

進(jìn)程間數(shù)據(jù)是獨(dú)立的,可以借助于隊列或管道實(shí)現(xiàn)通信,二者都是基于消息傳遞的。
雖然進(jìn)程間數(shù)據(jù)獨(dú)立,但可以通過Manager實(shí)現(xiàn)數(shù)據(jù)共享,事實(shí)上Manager的功能遠(yuǎn)不止于此。

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.

Manager支持的類型也很多。

例: from multiprocessing import Manager,Process,Lock def work(d,lock): # with lock 就相當(dāng)于一組 lock.acquire() lock.release() # 上下文管理 :必須有一個開始動作 和 一個結(jié)束動作的時候with lock: #不加鎖而操作共享的數(shù)據(jù),肯定會出現(xiàn)數(shù)據(jù)錯亂d['count']-=1if __name__ == '__main__':lock=Lock()with Manager() as m:dic=m.dict({'count':100})p_l=[]for i in range(98):p=Process(target=work, args=(dic, lock))p_l.append(p)p.start()for p in p_l:p.join()print(dic)

進(jìn)程池和multiprocess.Pool模塊

進(jìn)程池

為什么要有進(jìn)程池?進(jìn)程池的概念。
在程序?qū)嶋H處理問題過程中,忙時會有成千上萬的任務(wù)需要被執(zhí)行,閑時可能只有零星任務(wù)。
那么在成千上萬個任務(wù)需要被執(zhí)行的時候,我們就需要去創(chuàng)建成千上萬個進(jìn)程么?
首先,創(chuàng)建進(jìn)程需要消耗時間,銷毀進(jìn)程也需要消耗時間。
第二即便開啟了成千上萬的進(jìn)程,操作系統(tǒng)也不能讓他們同時執(zhí)行,這樣反而會影響程序的效率。
因此我們不能無限制的根據(jù)任務(wù)開啟或者結(jié)束進(jìn)程。
那么我們要怎么做呢?
在這里,要給大家介紹一個進(jìn)程池的概念,定義一個池子,在里面放上固定數(shù)量的進(jìn)程,
有需求來了,就拿一個池中的進(jìn)程來處理任務(wù),等到處理完畢,進(jìn)程并不關(guān)閉,
而是將進(jìn)程再放回進(jìn)程池中繼續(xù)等待任務(wù)。
如果有很多任務(wù)需要執(zhí)行,池中的進(jìn)程數(shù)量不夠,
任務(wù)就要等待之前的進(jìn)程執(zhí)行任務(wù)完畢歸來,拿到空閑進(jìn)程才能繼續(xù)執(zhí)行。
也就是說,池中進(jìn)程的數(shù)量是固定的,那么同一時間最多有固定數(shù)量的進(jìn)程在運(yùn)行。
這樣不會增加操作系統(tǒng)的調(diào)度難度,還節(jié)省了開閉進(jìn)程的時間,也一定程度上能夠?qū)崿F(xiàn)并發(fā)效果。

multiprocess.Pool模塊

概念介紹
Pool([numprocess [,initializer [, initargs]]]):創(chuàng)建進(jìn)程池

參數(shù)介紹:
1 numprocess:要創(chuàng)建的進(jìn)程數(shù),如果省略,將默認(rèn)使用cpu_count()的值
2 initializer:是每個工作進(jìn)程啟動時要執(zhí)行的可調(diào)用對象,默認(rèn)為None
3 initargs:是要傳給initializer的參數(shù)組

主要方法: p.apply(func [, args [, kwargs]]):在一個池工作進(jìn)程中執(zhí)行func(*args,**kwargs),然后返回結(jié)果。 '''需要強(qiáng)調(diào)的是:此操作并不會在所有池工作進(jìn)程中并執(zhí)行func函數(shù)。 如果要通過不同參數(shù)并發(fā)地執(zhí)行func函數(shù),必須從不同線程調(diào)用p.apply()函數(shù)或者使用p.apply_async()'''p.apply_async(func [, args [, kwargs]]):在一個池工作進(jìn)程中執(zhí)行func(*args,**kwargs),然后返回結(jié)果。 '''此方法的結(jié)果是AsyncResult類的實(shí)例,callback是可調(diào)用對象,接收輸入?yún)?shù)。當(dāng)func的結(jié)果變?yōu)榭捎脮r,將理解傳遞給callback。callback禁止執(zhí)行任何阻塞操作,否則將接收其他異步操作中的結(jié)果。'''p.close():關(guān)閉進(jìn)程池,防止進(jìn)一步操作。如果所有操作持續(xù)掛起,它們將在工作進(jìn)程終止前完成。P.jion():等待所有工作進(jìn)程退出。此方法只能在close()或teminate()之后調(diào)用。方法apply_async()和map_async()的返回值是AsyncResul的實(shí)例obj。 實(shí)例具有以下方法 obj.get():返回結(jié)果,如果有必要則等待結(jié)果到達(dá)。 timeout是可選的。如果在指定時間內(nèi)還沒有到達(dá),將引發(fā)異常。 如果遠(yuǎn)程操作中引發(fā)了異常,它將在調(diào)用此方法時再次被引發(fā)。 obj.ready():如果調(diào)用完成,返回True。 obj.successful():如果調(diào)用完成且沒有引發(fā)異常,返回True,如果在結(jié)果就緒之前調(diào)用此方法,引發(fā)異常。 obj.wait([timeout]):等待結(jié)果變?yōu)榭捎谩?obj.terminate():立即終止所有工作進(jìn)程,同時不執(zhí)行任何清理或結(jié)束任何掛起工作。 如果p被垃圾回收,將自動調(diào)用此函數(shù)。例:進(jìn)程池的同步調(diào)用 import os,time from multiprocessing import Pooldef work(n):print('%s run' %os.getpid())time.sleep(2)return n**2if __name__ == '__main__':p=Pool(3) #進(jìn)程池中從無到有創(chuàng)建三個進(jìn)程,以后一直是這三個進(jìn)程在執(zhí)行任務(wù)res_l=[]for i in range(10):res_l.append(p.apply(work,args=(i,)))# 同步調(diào)用,直到本次任務(wù)執(zhí)行完畢拿到return,# 等待任務(wù)work執(zhí)行的過程中可能有阻塞也可能沒有阻塞# 但不管該任務(wù)是否存在阻塞,同步調(diào)用都會在原地等著print(res_l)972 run 8036 run 892 run 972 run 8036 run 892 run 972 run 8036 run 892 run 972 run [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]例:進(jìn)程池的異步調(diào)用 import os import time import random from multiprocessing import Pooldef work(n):print('%s run' %os.getpid())time.sleep(random.random())return n**2if __name__ == '__main__':p=Pool(3) #進(jìn)程池中從無到有創(chuàng)建三個進(jìn)程,以后一直是這三個進(jìn)程在執(zhí)行任務(wù)res_l=[]for i in range(10):res = p.apply_async(work, args=(i,))# 異步運(yùn)行,根據(jù)進(jìn)程池中有的進(jìn)程數(shù),每次最多3個子進(jìn)程在異步執(zhí)行# 返回結(jié)果之后,將結(jié)果放入列表,歸還進(jìn)程,之后再執(zhí)行新的任務(wù)# 需要注意的是,進(jìn)程池中的三個進(jìn)程不會同時開啟或者同時結(jié)束# 而是執(zhí)行完一個就釋放一個進(jìn)程,這個進(jìn)程就去接收新的任務(wù)。res_l.append(res)# 異步apply_async用法:如果使用異步提交的任務(wù),主進(jìn)程需要使用jion,# 等待進(jìn)程池內(nèi)任務(wù)都處理完,然后可以用get收集結(jié)果# 否則,主進(jìn)程結(jié)束,進(jìn)程池可能還沒來得及執(zhí)行,也就跟著一起結(jié)束了p.close()p.join()for res in res_l:print(res.get())# 使用get來獲取apply_aync的結(jié)果,如果是apply,則沒有g(shù)et方法,# 因?yàn)閍pply是同步執(zhí)行,立刻獲取結(jié)果,也根本無需get。例:server:進(jìn)程池版socket并發(fā)聊天#!/usr/bin/env python # _*_ coding: utf-8 _*_# Pool內(nèi)的進(jìn)程數(shù)默認(rèn)是cpu核數(shù),假設(shè)為4(查看方法os.cpu_count()) # 開啟6個客戶端,會發(fā)現(xiàn)2個客戶端處于等待狀態(tài) # 在每個進(jìn)程內(nèi)查看pid,會發(fā)現(xiàn)pid使用為4個,即多個客戶端公用4個進(jìn)程 from socket import * from multiprocessing import Pool import osserver = socket(AF_INET, SOCK_STREAM) server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) server.bind(('127.0.0.1', 9527)) server.listen(5)def talk(conn):print('進(jìn)程pid: %s' % os.getpid())while True:try:msg = conn.recv(1024)if not msg: breakconn.send(msg.upper())except Exception:breakif __name__ == '__main__':p = Pool(4)while True:conn, *_ = server.accept()p.apply_async(talk, args=(conn,))client端 #!/usr/bin/env python # _*_ coding: utf-8 _*_from socket import *client = socket(AF_INET, SOCK_STREAM) client.connect(('127.0.0.1', 9527))while True:msg=input('>>: ').strip()if not msg:continueclient.send(msg.encode('utf-8'))msg=client.recv(1024)print(msg.decode('utf-8'))效果: 進(jìn)程pid: 7980 進(jìn)程pid: 6252 進(jìn)程pid: 7156 進(jìn)程pid: 7564 進(jìn)程pid: 7980同時只能開啟4個進(jìn)程,開第五個進(jìn)程的時候會被阻塞,直到手動關(guān)掉前四個中的其中一個,第五個才能進(jìn)到池子里運(yùn)行。 并發(fā)開啟多個客戶端,服務(wù)端同一時間只有4個不同的pid,只能結(jié)束一個客戶端,另外一個客戶端才會進(jìn)來.回調(diào)函數(shù) callback= 需要回調(diào)函數(shù)的場景:進(jìn)程池中任何一個任務(wù)一旦處理完了,就立即告知主進(jìn)程:我好了,你可以處理我的結(jié)果了。主進(jìn)程則調(diào)用一個函數(shù)去處理該結(jié)果,該函數(shù)即回調(diào)函數(shù)我們可以把耗時間(阻塞)的任務(wù)放到進(jìn)程池中,然后指定回調(diào)函數(shù)(主進(jìn)程負(fù)責(zé)執(zhí)行),這樣主進(jìn)程在執(zhí)行回調(diào)函數(shù)時就省去了I/O的過程,直接拿到的是任務(wù)的結(jié)果。常用于爬蟲場景。例:使用多進(jìn)程請求多個url來減少網(wǎng)絡(luò)等待浪費(fèi)的時間 #!/usr/bin/env python # _*_ coding: utf-8 _*_from multiprocessing import Pool import requests import osdef get_page(url):print('<進(jìn)程%s> get %s' % (os.getpid(), url))respone = requests.get(url)if respone.status_code == 200:return {'url': url, 'text': respone.text}def pasrse_page(res):print('<進(jìn)程%s> parse %s' % (os.getpid(), res['url']))parse_res = 'url:<%s> size:[%s]\n' % (res['url'], len(res['text']))with open('db.txt', 'a') as f:f.write(parse_res)if __name__ == '__main__':urls = ['https://www.baidu.com','https://www.python.org','https://www.openstack.org','https://help.github.com/','http://www.sina.com.cn/']p = Pool(3)res_l = []for url in urls:res = p.apply_async(get_page, args=(url,), callback=pasrse_page)res_l.append(res)p.close()p.join()print([res.get() for res in res_l]) # 拿到的是get_page的結(jié)果,其實(shí)完全沒必要拿該結(jié)果,該結(jié)果已經(jīng)傳給回調(diào)函數(shù)處理了爬蟲實(shí)例: #!/usr/bin/env python # _*_ coding: utf-8 _*_ import re from urllib.request import urlopen from multiprocessing import Pooldef get_page(url, pattern):response = urlopen(url).read().decode('utf-8')return pattern, responsedef parse_page(info):pattern, page_content = infores = re.findall(pattern, page_content)for item in res:dic = {'index': item[0].strip(),'title': item[1].strip(),'actor': item[2].strip(),'time': item[3].strip(),}print(dic)if __name__ == '__main__':regex = r'<dd>.*?<.*?class="board-index.*?>(\d+)</i>.*?title="(.*?)".*?class="movie-item-info".*?<p class="star">(.*?)</p>.*?<p class="releasetime">(.*?)</p>'pattern1 = re.compile(regex, re.S)url_dic = {'http://maoyan.com/board/7': pattern1,}p = Pool()res_l = []for url, pattern in url_dic.items():res = p.apply_async(get_page, args=(url, pattern), callback=parse_page)res_l.append(res)for i in res_l:i.get()效果: {'index': '1', 'actor': '主演:泰爾·謝里丹,奧利維亞·庫克,本·門德爾森', 'time': '上映時間:2018-03-30', 'title': '頭號玩家'} {'index': '2', 'actor': '主演:道恩·強(qiáng)森,娜奧米·哈里斯,杰弗里·迪恩·摩根', 'time': '上映時間:2018-04-13', 'title': '狂暴巨獸'} {'index': '3', 'actor': '主演:帕拉巴斯,拉納·達(dá)格巴帝,安努舒卡·謝蒂', 'time': '上映時間:2018-05-04', 'title': '巴霍巴利王2:終結(jié)'} {'index': '4', 'actor': '主演:小羅伯特·唐尼,克里斯·海姆斯沃斯,馬克·魯法洛', 'time': '上映時間:2018-05-11', 'title': '復(fù)仇者聯(lián)盟3:無限戰(zhàn)爭'} {'index': '5', 'actor': '主演:奧古斯特·迪赫,史特凡·柯納斯克,薇姬·克里普斯', 'time': '上映時間:2018-05-05', 'title': '青年馬克思'} {'index': '6', 'actor': '主演:閆妮,鄒元清,吳若甫', 'time': '上映時間:2018-05-11', 'title': '我是你媽'} {'index': '7', 'actor': '主演:凱特·瑪拉,湯姆·費(fèi)爾頓,布萊德利·惠特福德', 'time': '上映時間:2018-05-11', 'title': '戰(zhàn)犬瑞克斯'} {'index': '8', 'actor': '主演:郭京飛,迪麗熱巴,大鵬', 'time': '上映時間:2018-04-20', 'title': '21克拉'} {'index': '9', 'actor': '主演:杰森·格里菲,勞里·海梅斯,迪·布拉雷·貝克爾', 'time': '上映時間:2018-04-05', 'title': '冰雪女王3:火與冰'} {'index': '10', 'actor': '主演:井柏然,周冬雨,田壯壯', 'time': '上映時間:2018-04-28', 'title': '后來的我們'}如果在主進(jìn)程中等待進(jìn)程池中所有任務(wù)都執(zhí)行完畢后,再統(tǒng)一處理結(jié)果,則無需回調(diào)函數(shù)。例: #!/usr/bin/env python # _*_ coding: utf-8 _*_from multiprocessing import Pool import timedef work(n):time.sleep(1)return n ** 2if __name__ == '__main__':p = Pool()res_l = []for i in range(10):res = p.apply_async(work, args=(i,))res_l.append(res)p.close()p.join() # 等待進(jìn)程池中所有進(jìn)程執(zhí)行完畢nums = []for res in res_l:nums.append(res.get()) # 拿到所有結(jié)果print(nums) # 主進(jìn)程拿到所有的處理結(jié)果,可以在主進(jìn)程中進(jìn)行統(tǒng)一進(jìn)行處理[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

end

轉(zhuǎn)載于:https://www.cnblogs.com/tielemao/p/9043008.html

總結(jié)

以上是生活随笔為你收集整理的铁乐学python_Day40_进程池的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。