進(jìn)程之間的數(shù)據(jù)共享 基于消息傳遞的并發(fā)編程是大勢所趨, 即便是使用線程,推薦做法也是將程序設(shè)計為大量獨(dú)立的線程集合,通過消息隊列交換數(shù)據(jù)。 這樣極大地減少了對使用鎖和其他同步手段的需求,還可以擴(kuò)展到分布式系統(tǒng)中。 但進(jìn)程間應(yīng)該盡量避免通信,即便需要通信,也應(yīng)該選擇進(jìn)程安全的工具來避免加鎖帶來的問題。 以后我們會嘗試使用數(shù)據(jù)庫來解決現(xiàn)在進(jìn)程之間的數(shù)據(jù)共享問題。
進(jìn)程間數(shù)據(jù)是獨(dú)立的,可以借助于隊列或管道實(shí)現(xiàn)通信,二者都是基于消息傳遞的。 雖然進(jìn)程間數(shù)據(jù)獨(dú)立,但可以通過Manager實(shí)現(xiàn)數(shù)據(jù)共享,事實(shí)上Manager的功能遠(yuǎn)不止于此。
A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.
Manager支持的類型也很多。
例:
from multiprocessing import Manager,Process,Lock
def work(d,lock):
# with lock 就相當(dāng)于一組 lock.acquire() lock.release()
# 上下文管理 :必須有一個開始動作 和 一個結(jié)束動作的時候with lock: #不加鎖而操作共享的數(shù)據(jù),肯定會出現(xiàn)數(shù)據(jù)錯亂d['count']-=1if __name__ == '__main__':lock=Lock()with Manager() as m:dic=m.dict({'count':100})p_l=[]for i in range(98):p=Process(target=work, args=(dic, lock))p_l.append(p)p.start()for p in p_l:p.join()print(dic)
進(jìn)程池和multiprocess.Pool模塊 進(jìn)程池 為什么要有進(jìn)程池?進(jìn)程池的概念。 在程序?qū)嶋H處理問題過程中,忙時會有成千上萬的任務(wù)需要被執(zhí)行,閑時可能只有零星任務(wù)。 那么在成千上萬個任務(wù)需要被執(zhí)行的時候,我們就需要去創(chuàng)建成千上萬個進(jìn)程么? 首先,創(chuàng)建進(jìn)程需要消耗時間,銷毀進(jìn)程也需要消耗時間。 第二即便開啟了成千上萬的進(jìn)程,操作系統(tǒng)也不能讓他們同時執(zhí)行,這樣反而會影響程序的效率。 因此我們不能無限制的根據(jù)任務(wù)開啟或者結(jié)束進(jìn)程。 那么我們要怎么做呢? 在這里,要給大家介紹一個進(jìn)程池的概念,定義一個池子,在里面放上固定數(shù)量的進(jìn)程, 有需求來了,就拿一個池中的進(jìn)程來處理任務(wù),等到處理完畢,進(jìn)程并不關(guān)閉, 而是將進(jìn)程再放回進(jìn)程池中繼續(xù)等待任務(wù)。 如果有很多任務(wù)需要執(zhí)行,池中的進(jìn)程數(shù)量不夠, 任務(wù)就要等待之前的進(jìn)程執(zhí)行任務(wù)完畢歸來,拿到空閑進(jìn)程才能繼續(xù)執(zhí)行。 也就是說,池中進(jìn)程的數(shù)量是固定的,那么同一時間最多有固定數(shù)量的進(jìn)程在運(yùn)行。 這樣不會增加操作系統(tǒng)的調(diào)度難度,還節(jié)省了開閉進(jìn)程的時間,也一定程度上能夠?qū)崿F(xiàn)并發(fā)效果。
multiprocess.Pool模塊 概念介紹 Pool([numprocess [,initializer [, initargs]]]):創(chuàng)建進(jìn)程池
參數(shù)介紹: 1 numprocess:要創(chuàng)建的進(jìn)程數(shù),如果省略,將默認(rèn)使用cpu_count()的值 2 initializer:是每個工作進(jìn)程啟動時要執(zhí)行的可調(diào)用對象,默認(rèn)為None 3 initargs:是要傳給initializer的參數(shù)組
主要方法:
p.apply(func [, args [, kwargs]]):在一個池工作進(jìn)程中執(zhí)行func(*args,**kwargs),然后返回結(jié)果。
'''需要強(qiáng)調(diào)的是:此操作并不會在所有池工作進(jìn)程中并執(zhí)行func函數(shù)。
如果要通過不同參數(shù)并發(fā)地執(zhí)行func函數(shù),必須從不同線程調(diào)用p.apply()函數(shù)或者使用p.apply_async()'''p.apply_async(func [, args [, kwargs]]):在一個池工作進(jìn)程中執(zhí)行func(*args,**kwargs),然后返回結(jié)果。
'''此方法的結(jié)果是AsyncResult類的實(shí)例,callback是可調(diào)用對象,接收輸入?yún)?shù)。當(dāng)func的結(jié)果變?yōu)榭捎脮r,將理解傳遞給callback。callback禁止執(zhí)行任何阻塞操作,否則將接收其他異步操作中的結(jié)果。'''p.close():關(guān)閉進(jìn)程池,防止進(jìn)一步操作。如果所有操作持續(xù)掛起,它們將在工作進(jìn)程終止前完成。P.jion():等待所有工作進(jìn)程退出。此方法只能在close()或teminate()之后調(diào)用。方法apply_async()和map_async()的返回值是AsyncResul的實(shí)例obj。
實(shí)例具有以下方法
obj.get():返回結(jié)果,如果有必要則等待結(jié)果到達(dá)。
timeout是可選的。如果在指定時間內(nèi)還沒有到達(dá),將引發(fā)異常。
如果遠(yuǎn)程操作中引發(fā)了異常,它將在調(diào)用此方法時再次被引發(fā)。
obj.ready():如果調(diào)用完成,返回True。
obj.successful():如果調(diào)用完成且沒有引發(fā)異常,返回True,如果在結(jié)果就緒之前調(diào)用此方法,引發(fā)異常。
obj.wait([timeout]):等待結(jié)果變?yōu)榭捎谩?obj.terminate():立即終止所有工作進(jìn)程,同時不執(zhí)行任何清理或結(jié)束任何掛起工作。
如果p被垃圾回收,將自動調(diào)用此函數(shù)。例:進(jìn)程池的同步調(diào)用
import os,time
from multiprocessing import Pooldef work(n):print('%s run' %os.getpid())time.sleep(2)return n**2if __name__ == '__main__':p=Pool(3) #進(jìn)程池中從無到有創(chuàng)建三個進(jìn)程,以后一直是這三個進(jìn)程在執(zhí)行任務(wù)res_l=[]for i in range(10):res_l.append(p.apply(work,args=(i,)))# 同步調(diào)用,直到本次任務(wù)執(zhí)行完畢拿到return,# 等待任務(wù)work執(zhí)行的過程中可能有阻塞也可能沒有阻塞# 但不管該任務(wù)是否存在阻塞,同步調(diào)用都會在原地等著print(res_l)972 run
8036 run
892 run
972 run
8036 run
892 run
972 run
8036 run
892 run
972 run
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]例:進(jìn)程池的異步調(diào)用
import os
import time
import random
from multiprocessing import Pooldef work(n):print('%s run' %os.getpid())time.sleep(random.random())return n**2if __name__ == '__main__':p=Pool(3) #進(jìn)程池中從無到有創(chuàng)建三個進(jìn)程,以后一直是這三個進(jìn)程在執(zhí)行任務(wù)res_l=[]for i in range(10):res = p.apply_async(work, args=(i,))# 異步運(yùn)行,根據(jù)進(jìn)程池中有的進(jìn)程數(shù),每次最多3個子進(jìn)程在異步執(zhí)行# 返回結(jié)果之后,將結(jié)果放入列表,歸還進(jìn)程,之后再執(zhí)行新的任務(wù)# 需要注意的是,進(jìn)程池中的三個進(jìn)程不會同時開啟或者同時結(jié)束# 而是執(zhí)行完一個就釋放一個進(jìn)程,這個進(jìn)程就去接收新的任務(wù)。res_l.append(res)# 異步apply_async用法:如果使用異步提交的任務(wù),主進(jìn)程需要使用jion,# 等待進(jìn)程池內(nèi)任務(wù)都處理完,然后可以用get收集結(jié)果# 否則,主進(jìn)程結(jié)束,進(jìn)程池可能還沒來得及執(zhí)行,也就跟著一起結(jié)束了p.close()p.join()for res in res_l:print(res.get())# 使用get來獲取apply_aync的結(jié)果,如果是apply,則沒有g(shù)et方法,# 因?yàn)閍pply是同步執(zhí)行,立刻獲取結(jié)果,也根本無需get。例:server:進(jìn)程池版socket并發(fā)聊天#!/usr/bin/env python
# _*_ coding: utf-8 _*_# Pool內(nèi)的進(jìn)程數(shù)默認(rèn)是cpu核數(shù),假設(shè)為4(查看方法os.cpu_count())
# 開啟6個客戶端,會發(fā)現(xiàn)2個客戶端處于等待狀態(tài)
# 在每個進(jìn)程內(nèi)查看pid,會發(fā)現(xiàn)pid使用為4個,即多個客戶端公用4個進(jìn)程
from socket import *
from multiprocessing import Pool
import osserver = socket(AF_INET, SOCK_STREAM)
server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
server.bind(('127.0.0.1', 9527))
server.listen(5)def talk(conn):print('進(jìn)程pid: %s' % os.getpid())while True:try:msg = conn.recv(1024)if not msg: breakconn.send(msg.upper())except Exception:breakif __name__ == '__main__':p = Pool(4)while True:conn, *_ = server.accept()p.apply_async(talk, args=(conn,))client端
#!/usr/bin/env python
# _*_ coding: utf-8 _*_from socket import *client = socket(AF_INET, SOCK_STREAM)
client.connect(('127.0.0.1', 9527))while True:msg=input('>>: ').strip()if not msg:continueclient.send(msg.encode('utf-8'))msg=client.recv(1024)print(msg.decode('utf-8'))效果:
進(jìn)程pid: 7980
進(jìn)程pid: 6252
進(jìn)程pid: 7156
進(jìn)程pid: 7564
進(jìn)程pid: 7980同時只能開啟4個進(jìn)程,開第五個進(jìn)程的時候會被阻塞,直到手動關(guān)掉前四個中的其中一個,第五個才能進(jìn)到池子里運(yùn)行。
并發(fā)開啟多個客戶端,服務(wù)端同一時間只有4個不同的pid,只能結(jié)束一個客戶端,另外一個客戶端才會進(jìn)來.回調(diào)函數(shù) callback=
需要回調(diào)函數(shù)的場景:進(jìn)程池中任何一個任務(wù)一旦處理完了,就立即告知主進(jìn)程:我好了,你可以處理我的結(jié)果了。主進(jìn)程則調(diào)用一個函數(shù)去處理該結(jié)果,該函數(shù)即回調(diào)函數(shù)我們可以把耗時間(阻塞)的任務(wù)放到進(jìn)程池中,然后指定回調(diào)函數(shù)(主進(jìn)程負(fù)責(zé)執(zhí)行),這樣主進(jìn)程在執(zhí)行回調(diào)函數(shù)時就省去了I/O的過程,直接拿到的是任務(wù)的結(jié)果。常用于爬蟲場景。例:使用多進(jìn)程請求多個url來減少網(wǎng)絡(luò)等待浪費(fèi)的時間
#!/usr/bin/env python
# _*_ coding: utf-8 _*_from multiprocessing import Pool
import requests
import osdef get_page(url):print('<進(jìn)程%s> get %s' % (os.getpid(), url))respone = requests.get(url)if respone.status_code == 200:return {'url': url, 'text': respone.text}def pasrse_page(res):print('<進(jìn)程%s> parse %s' % (os.getpid(), res['url']))parse_res = 'url:<%s> size:[%s]\n' % (res['url'], len(res['text']))with open('db.txt', 'a') as f:f.write(parse_res)if __name__ == '__main__':urls = ['https://www.baidu.com','https://www.python.org','https://www.openstack.org','https://help.github.com/','http://www.sina.com.cn/']p = Pool(3)res_l = []for url in urls:res = p.apply_async(get_page, args=(url,), callback=pasrse_page)res_l.append(res)p.close()p.join()print([res.get() for res in res_l]) # 拿到的是get_page的結(jié)果,其實(shí)完全沒必要拿該結(jié)果,該結(jié)果已經(jīng)傳給回調(diào)函數(shù)處理了爬蟲實(shí)例:
#!/usr/bin/env python
# _*_ coding: utf-8 _*_
import re
from urllib.request import urlopen
from multiprocessing import Pooldef get_page(url, pattern):response = urlopen(url).read().decode('utf-8')return pattern, responsedef parse_page(info):pattern, page_content = infores = re.findall(pattern, page_content)for item in res:dic = {'index': item[0].strip(),'title': item[1].strip(),'actor': item[2].strip(),'time': item[3].strip(),}print(dic)if __name__ == '__main__':regex = r'<dd>.*?<.*?class="board-index.*?>(\d+)</i>.*?title="(.*?)".*?class="movie-item-info".*?<p class="star">(.*?)</p>.*?<p class="releasetime">(.*?)</p>'pattern1 = re.compile(regex, re.S)url_dic = {'http://maoyan.com/board/7': pattern1,}p = Pool()res_l = []for url, pattern in url_dic.items():res = p.apply_async(get_page, args=(url, pattern), callback=parse_page)res_l.append(res)for i in res_l:i.get()效果:
{'index': '1', 'actor': '主演:泰爾·謝里丹,奧利維亞·庫克,本·門德爾森', 'time': '上映時間:2018-03-30', 'title': '頭號玩家'}
{'index': '2', 'actor': '主演:道恩·強(qiáng)森,娜奧米·哈里斯,杰弗里·迪恩·摩根', 'time': '上映時間:2018-04-13', 'title': '狂暴巨獸'}
{'index': '3', 'actor': '主演:帕拉巴斯,拉納·達(dá)格巴帝,安努舒卡·謝蒂', 'time': '上映時間:2018-05-04', 'title': '巴霍巴利王2:終結(jié)'}
{'index': '4', 'actor': '主演:小羅伯特·唐尼,克里斯·海姆斯沃斯,馬克·魯法洛', 'time': '上映時間:2018-05-11', 'title': '復(fù)仇者聯(lián)盟3:無限戰(zhàn)爭'}
{'index': '5', 'actor': '主演:奧古斯特·迪赫,史特凡·柯納斯克,薇姬·克里普斯', 'time': '上映時間:2018-05-05', 'title': '青年馬克思'}
{'index': '6', 'actor': '主演:閆妮,鄒元清,吳若甫', 'time': '上映時間:2018-05-11', 'title': '我是你媽'}
{'index': '7', 'actor': '主演:凱特·瑪拉,湯姆·費(fèi)爾頓,布萊德利·惠特福德', 'time': '上映時間:2018-05-11', 'title': '戰(zhàn)犬瑞克斯'}
{'index': '8', 'actor': '主演:郭京飛,迪麗熱巴,大鵬', 'time': '上映時間:2018-04-20', 'title': '21克拉'}
{'index': '9', 'actor': '主演:杰森·格里菲,勞里·海梅斯,迪·布拉雷·貝克爾', 'time': '上映時間:2018-04-05', 'title': '冰雪女王3:火與冰'}
{'index': '10', 'actor': '主演:井柏然,周冬雨,田壯壯', 'time': '上映時間:2018-04-28', 'title': '后來的我們'}如果在主進(jìn)程中等待進(jìn)程池中所有任務(wù)都執(zhí)行完畢后,再統(tǒng)一處理結(jié)果,則無需回調(diào)函數(shù)。例:
#!/usr/bin/env python
# _*_ coding: utf-8 _*_from multiprocessing import Pool
import timedef work(n):time.sleep(1)return n ** 2if __name__ == '__main__':p = Pool()res_l = []for i in range(10):res = p.apply_async(work, args=(i,))res_l.append(res)p.close()p.join() # 等待進(jìn)程池中所有進(jìn)程執(zhí)行完畢nums = []for res in res_l:nums.append(res.get()) # 拿到所有結(jié)果print(nums) # 主進(jìn)程拿到所有的處理結(jié)果,可以在主進(jìn)程中進(jìn)行統(tǒng)一進(jìn)行處理[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
end
轉(zhuǎn)載于:https://www.cnblogs.com/tielemao/p/9043008.html
總結(jié)
以上是生活随笔 為你收集整理的铁乐学python_Day40_进程池 的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔 網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔 推薦給好友。