日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

进程与multiprocessing模块

發(fā)布時(shí)間:2025/3/15 编程问答 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 进程与multiprocessing模块 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

一 進(jìn)程?

  進(jìn)程(Process)是計(jì)算機(jī)中的程序關(guān)于某數(shù)據(jù)集合上的一次運(yùn)行活動(dòng),是系統(tǒng)進(jìn)行資源分配和調(diào)度的基本單位,是操作系統(tǒng)結(jié)構(gòu)的基礎(chǔ)。在早期面向進(jìn)程設(shè)計(jì)的計(jì)算機(jī)結(jié)構(gòu)中,進(jìn)程是程序的基本執(zhí)行實(shí)體;在當(dāng)代面向線程設(shè)計(jì)的計(jì)算機(jī)結(jié)構(gòu)中,進(jìn)程是線程的容器。程序是指令、數(shù)據(jù)及其組織形式的描述,進(jìn)程是程序的實(shí)體。它是操作系統(tǒng)動(dòng)態(tài)執(zhí)行的基本單元,在傳統(tǒng)的操作系統(tǒng)中,進(jìn)程既是基本的分配單元,也是基本的執(zhí)行單元。 ?————百度百科

  PS ? os模塊的getpid方法就是獲取當(dāng)前進(jìn)程的進(jìn)程號(id)。

  多道技術(shù)產(chǎn)生的背景:針對單核,實(shí)現(xiàn)并發(fā)。

  多路復(fù)用分為時(shí)間上的復(fù)用和空間上的復(fù)用。

  空間上的復(fù)用:將內(nèi)存分為幾個(gè)部分,互不干擾。

  時(shí)間上的復(fù)用:

         1 遇到I/O阻塞時(shí)切換任務(wù)。

         2 任務(wù)執(zhí)行固定時(shí)間后主動(dòng)切換。

?

壹:Process類,創(chuàng)建子進(jìn)程

一 創(chuàng)建子進(jìn)程

  創(chuàng)建子進(jìn)程的方法一

import multiprocessing import time import os def foo():time.sleep(1)print('子進(jìn)程 %s 父進(jìn)程 %s' %(os.getpid(),os.getppid())) if __name__ == '__main__': #在windows下必須加上這一句代碼 p1=multiprocessing.Process(target=foo) p2=multiprocessing.Process(target=foo) p1.start() p2.start() p1.join() #主進(jìn)程等待子進(jìn)程完成,在執(zhí)行主進(jìn)程 p2.join() print('主進(jìn)程 %s 主進(jìn)程的父進(jìn)程是 %s,這是pycharm的進(jìn)程'%(os.getpid(),os.getppid()))

  輸出:

子進(jìn)程 1260 父進(jìn)程 12808 子進(jìn)程 2256 父進(jìn)程 12808 主進(jìn)程 12808 主進(jìn)程的父進(jìn)程是 8804,這是pycharm的進(jìn)程

  創(chuàng)建子進(jìn)程的方法二

import multiprocessing import os class Pro(multiprocessing.Process):def __init__(self,name):super().__init__()self.name=name def run(self): print('進(jìn)行姓名',self.name) print('子進(jìn)程 %s 父進(jìn)程 %s'%(os.getpid(),os.getppid())) if __name__ == '__main__': p=Pro('egon') p.start() print('主進(jìn)程 %s' %os.getpid())

  輸出:

主進(jìn)程 12632 進(jìn)行姓名 egon 子進(jìn)程 10300 父進(jìn)程 12632

  

  創(chuàng)建子進(jìn)程方法二的應(yīng)用

import multiprocessing import socket server = socket.socket() server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) server.bind(('127.0.0.1', 8080)) server.listen(5) class Myprocess(multiprocessing.Process):def __init__(self,conn):super().__init__()self.conn = conndef run(self):while True:try:while True:date=self.conn.recv(1024)if date==b'q':breakself.conn.send(date.upper())except Exception:break if __name__ == '__main__':while True:conn,addr=server.accept()print('conn',conn,'addr',addr)s=Myprocess(conn)s.start() server.close()

  在服務(wù)端,用multiprocessing模塊開啟多個(gè)子進(jìn)程時(shí),格式是這樣的:

  重要!

import multiprocessing import socket class Myprocess(multiprocessing.Process):def __init__(self,conn):self.conn=conndef run(self):pass #核心代碼 if __name__ == '__main__':server=socket.socket() # server.bind(('127.0.0.1',8080)) # server.listen() #這三行代碼固定不動(dòng)while True: conn,addr=server.accept() #其實(shí)一個(gè)子進(jìn)程就是一個(gè)connp=Myprocess(conn) #服務(wù)端創(chuàng)建多個(gè)子進(jìn)程本應(yīng)該就是把conn當(dāng)做參數(shù)傳給Myprocessp.start() #生成p對象,p.start()子進(jìn)程開啟,conn有了一個(gè)屬于自己的子進(jìn)程。

?

?

?

二 Process類的其他方法

  1 ?join方法

  官方文檔的意思是:阻塞當(dāng)前進(jìn)程,直到調(diào)用join方法的那個(gè)進(jìn)程執(zhí)行完,再繼續(xù)執(zhí)行當(dāng)前進(jìn)程。

import multiprocessing class Myprocess(multiprocessing.Process):def __init__(self,x):super().__init__()self.x=xdef run(self):print('子進(jìn)程','----') if __name__ == '__main__':p1=Myprocess(1)p1.start()print('主進(jìn)程','====')

  輸出:

主進(jìn)程 ==== 子進(jìn)程 ----

  在加入p1.join()代碼之后,p1子進(jìn)程會(huì)先執(zhí)行完,在執(zhí)行主進(jìn)程。

import multiprocessing class Myprocess(multiprocessing.Process):def __init__(self,x):super().__init__()self.x=xdef run(self):print('子進(jìn)程','----') if __name__ == '__main__':p1=Myprocess(1)p1.start()p1.join()print('主進(jìn)程','====')

  輸出:

子進(jìn)程 ---- 主進(jìn)程 ====

  2 daemon() 守護(hù)進(jìn)程

  守護(hù)進(jìn)程(daemon)是一類在后臺運(yùn)行的特殊進(jìn)程,用于執(zhí)行特定的系統(tǒng)任務(wù)。很多守護(hù)進(jìn)程在系統(tǒng)引導(dǎo)的時(shí)候啟動(dòng),并且一直運(yùn)行直到系統(tǒng)關(guān)閉。

  守護(hù)進(jìn)程會(huì)在主進(jìn)程代碼執(zhí)行完畢后終止。

  守護(hù)進(jìn)程內(nèi)無法再開啟子進(jìn)程,如果這樣做,會(huì)報(bào)錯(cuò)。

  使用方法  

    p1.daemon=True,放在start()方法之前。

import multiprocessing import time class Myprocess(multiprocessing.Process):def __init__(self,x):super().__init__()self.x=xdef run(self):print('子進(jìn)程{}'.format(self.x),'----')time.sleep(2)print('子進(jìn)程{}'.format(self.x),'=====') if __name__ == '__main__':p1=Myprocess(1)p2=Myprocess(2)p1.daemon=True #p1是守護(hù)進(jìn)程,主進(jìn)程代碼執(zhí)行完畢后,立馬結(jié)束。 p1.start()p2.start()time.sleep(1) #一秒鐘,足骨歐p1,p2開啟子進(jìn)程print('主進(jìn)程','====')

  輸出:

子進(jìn)程1 ---- 子進(jìn)程2 ---- 主進(jìn)程 ==== 子進(jìn)程2 ===== #因?yàn)閜1是守護(hù)進(jìn)程,主進(jìn)程代碼執(zhí)行完畢后,就立馬結(jié)束了。所以沒有打印‘子進(jìn)程1 ===’

?

貳: ?Lock類,創(chuàng)建互斥鎖。只能一次acquire,然后release才能使用。

   Rlock類,創(chuàng)建遞歸所。解決死鎖問題。遞歸所有個(gè)引用計(jì)數(shù),可以多次acquire,release。

  同步能夠保證多個(gè)線程安全訪問競爭資源,最簡單的同步機(jī)制是引入互斥鎖。互斥鎖為資源引入一個(gè)狀態(tài):鎖定/非鎖定。某個(gè)線程要更改共享數(shù)據(jù)時(shí),先將其鎖定,此時(shí)資源的狀態(tài)為“鎖定”,其他線程不能更改;直到該釋放資源,將資源的狀態(tài)變成“非鎖定”,其他的線程才能再次鎖定該資源。互斥鎖保證了每次只有一個(gè)進(jìn)行寫入操作,從而保證了多情況下數(shù)據(jù)的正確性。

  與join方法類似,作用是并發(fā)變?yōu)榇小?/p>

  應(yīng)用:

  搶票的過程,分為查票的余量和買票。查票的余量應(yīng)該是并發(fā),買票應(yīng)該是串行。

import json import time import random import multiprocessing def search():date=json.load(open('db.txt','r'))print('票數(shù):{}'.format(date['count'])) def get(i):date = json.load(open('db.txt', 'r'))if date['count']>0:date['count']-=1time.sleep(random.randint(1,3)) #模擬網(wǎng)絡(luò)延遲json.dump(date,open('db.txt','w'))print('{} 搶票成功!'.format(i))def rob_ticket(i,lock): #其實(shí)可以沒有g(shù)et,search函數(shù)。完全可以合并在一起。但是,分為兩個(gè)小函數(shù),邏輯非常清晰。加鎖也變得更加容易,不出錯(cuò)。search()lock.acquire() #加鎖 #with lock: 和文件操作一樣,也可以簡化。 get(i) # get(i)lock.release() #釋放鎖,解鎖if __name__ == '__main__':lock = multiprocessing.Lock()print('lock',lock) #主進(jìn)程創(chuàng)建了一個(gè)互斥鎖,作為參數(shù)傳給子進(jìn)程。for i in range(1,21): #創(chuàng)建20個(gè)子進(jìn)程p=multiprocessing.Process(target=rob_ticket,args=(i,lock))p.start()

  輸出:

lock <Lock(owner=None)> 票數(shù):2 票數(shù):2 票數(shù):2 票數(shù):2 票數(shù):2 票數(shù):2 票數(shù):2 票數(shù):2 票數(shù):2 票數(shù):2 票數(shù):2 票數(shù):2 票數(shù):2 票數(shù):2 票數(shù):2 票數(shù):2 票數(shù):2 票數(shù):2 票數(shù):2 票數(shù):2 1 搶票成功! 3 搶票成功!

  總結(jié):互斥鎖限定了在同一時(shí)刻只有一個(gè)進(jìn)程能夠?qū)蚕碣Y源進(jìn)行修改。弊端是涉及到文件的修改,文件在硬盤上,效率不可避免的會(huì)很低。

?    而且還需要自己進(jìn)行加鎖解鎖處理。所以,如果可以,盡量尋求更好的方法。IPC機(jī)制便是答案。

?

?

叁:進(jìn)程間通信(IPC,Inter-Process Communication)指至少兩個(gè)進(jìn)程或線程間傳送數(shù)據(jù)或信號的一些技術(shù)或方法。

python中提供了隊(duì)列(Queue)和管道(Pipe)兩種方法。

  1 隊(duì)列和管道都是將數(shù)據(jù)存放在內(nèi)存中,比起硬盤,速度會(huì)快很多。

  2 隊(duì)列是基于 管道+鎖 實(shí)現(xiàn)的,可以幫我們從加鎖的繁瑣代碼中解脫出來。推薦使用

?

肆 Queue類。隊(duì)列。

  Queue([maxsize]),?創(chuàng)建共享的進(jìn)程隊(duì)列。maxsize是隊(duì)列中允許的最大項(xiàng)數(shù)。如果省略此參數(shù),則無大小限制。底層隊(duì)列使用管道和鎖定實(shí)現(xiàn)。

  put方法 ?put_mowait()方法

  get方法 ?get_nowait()方法  

import multiprocessingq=multiprocessing.Queue(3) print(q,) q.put('1') q.put('2') q.put('3') print(q.get()) print(q.get()) print(q.get())

  輸出:

<multiprocessing.queues.Queue object at 0x000002132885A048> 1 2 3

?

伍 ?JoinableQueue類

  q=JoinableQueue()

  提供了Queue類兩個(gè)沒有的方法。

    join():阻塞,直到隊(duì)列q中沒有item。  

    task_done():必須跟在get()方法后面。

    

  from multiprocessing import JoinableQueueq = JoinableQueue()q.task_done() # Signal task completionq.join() # Wait for completion。
  
  

JoinableQueue, a Queue subclass, is a queue which additionally has task_done() and join() methods.

task_done()

Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.

join()

Block until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks

from multiprocessing import Process,JoinableQueue import time,random def consumer(name,q):while True:time.sleep(random.randint(1,2))res=q.get()if res is None:breakprint('%s 吃了 %s'%(name,res))q.task_done() #一個(gè)get()跟著一個(gè)task_done() q.task_done()是放在消費(fèi)者模型這邊的。 def produce(name,q):for i in range(10):time.sleep(random.randint(1,2))res='包子%s'%iq.put(res)print('%s 生產(chǎn)了 %s '%(name,res))q.join() #如果注釋掉,最后顯示的時(shí)間是15秒左右,因?yàn)閜1,p2代碼執(zhí)行完后,不管隊(duì)列q中 if __name__ == '__main__': #有沒有item,p1,p2的完成代表著主進(jìn)程的完成,c又是守護(hù)進(jìn)程。c盡管沒有消費(fèi)完所有數(shù)據(jù),也會(huì)終結(jié)。 q.join()是放在生產(chǎn)者模型這邊的。start_time=time.time() #加上join,便是阻塞狀態(tài),知道q隊(duì)列中的item被c進(jìn)程全部完,這樣主進(jìn)程代碼執(zhí)行完畢。c作為守護(hù)進(jìn)程,也會(huì)隨之終結(jié)。用時(shí)大約30秒q=JoinableQueue()p1=Process(target=produce,args=('egon',q))p2=Process(target=produce,args=('wupeoqi',q))c=Process(target=consumer,args=('alex',q))c.daemon=Truec.start()p1.start()p2.start()p1.join()p2.join()print(time.time()-start_time)

?

陸 Manager() 

  Python中進(jìn)程間共享數(shù)據(jù),處理基本的queue,pipe和value+array外,還提供了更高層次的封裝。使用multiprocessing.Manager可以簡單地使用這些高級接口。?

Manager()返回的manager對象控制了一個(gè)server進(jìn)程,此進(jìn)程包含的python對象可以被其他的進(jìn)程通過proxies來訪問。從而達(dá)到多進(jìn)程間數(shù)據(jù)通信且安全。

  Manager支持的類型有l(wèi)ist,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。?

  Manager()也是創(chuàng)建的內(nèi)存中的空間。

import time def foo(d,lock):with lock:temp=d['x']time.sleep(0.001)d['x']=temp-1from multiprocessing import Manager,Process,Lock if __name__ == '__main__':m=Manager()lock=Lock()d=m.dict({'x':10})l=[]for i in range(10):p=Process(target=foo,args=(d,lock))l.append(p)p.start()for p in l:p.join()print(d)

?

柒 Pool() ? ? ? ? !!!!!http://www.cnblogs.com/Tour/p/4564710.html ?!!!很好的博客地址

?  在使用Python進(jìn)行系統(tǒng)管理時(shí),特別是同時(shí)操作多個(gè)文件目錄或者遠(yuǎn)程控制多臺主機(jī),并行操作可以節(jié)約大量的時(shí)間。如果操作的對象數(shù)目不大時(shí),還可以直接使用Process類動(dòng)態(tài)的生成多個(gè)進(jìn)程,十幾個(gè)還好,但是如果上百個(gè)甚至更多,那手動(dòng)去限制進(jìn)程數(shù)量就顯得特別的繁瑣,此時(shí)進(jìn)程池就派上用場了。?
Pool類可以提供指定數(shù)量的進(jìn)程供用戶調(diào)用,當(dāng)有新的請求提交到Pool中時(shí),如果池還沒有滿,就會(huì)創(chuàng)建一個(gè)新的進(jìn)程來執(zhí)行請求。如果池滿,請求就會(huì)告知先等待,直到池中有進(jìn)程結(jié)束,才會(huì)創(chuàng)建新的進(jìn)程來執(zhí)行這些請求。?
下面介紹一下multiprocessing 模塊下的Pool類下的幾個(gè)方法。

  apply():

    該函數(shù)用于傳遞不定參數(shù),主進(jìn)程會(huì)被阻塞直到函數(shù)執(zhí)行結(jié)束,不建議使用。同步調(diào)用

  apply_async():

    apply_async(func[, args=()[, kwds={}[, callback=None]]])

    非阻塞且支持結(jié)果返回進(jìn)行回調(diào)

  

    首先來看apply_async方法,源碼如下:

def apply_async(self, func, args=(), kwds={}, callback=None):assert self._state == RUNresult = ApplyResult(self._cache, callback)self._taskqueue.put(([(result._job, None, func, args, kwds)], None))return result
func表示執(zhí)行此任務(wù)的方法
args、kwds分別表func的位置參數(shù)和關(guān)鍵字參數(shù)
callback表示一個(gè)單參數(shù)的方法,當(dāng)有結(jié)果返回時(shí),callback方法會(huì)被調(diào)用,參數(shù)即為任務(wù)執(zhí)行后的結(jié)果

    每調(diào)用一次apply_result方法,實(shí)際上就向_taskqueue中添加了一條任務(wù),注意這里采用了非阻塞(異步)的調(diào)用方式,即apply_async方法中新建的任務(wù)只是被添加到任務(wù)隊(duì)列中,還并未執(zhí)行,不需要等待,直接返回創(chuàng)建的ApplyResult對象,注意在創(chuàng)建ApplyResult對象時(shí),將它放入進(jìn)程池的緩存_cache中。

    任務(wù)隊(duì)列中有了新創(chuàng)建的任務(wù),那么根據(jù)上節(jié)分析的處理流程,進(jìn)程池的_task_handler線程,將任務(wù)從taskqueue中獲取出來,放入_inqueue中,觸發(fā)worker進(jìn)程根據(jù)args和kwds調(diào)用func,運(yùn)行結(jié)束后,將結(jié)果放入_outqueue,再由進(jìn)程池中的_handle_results線程,將運(yùn)行結(jié)果從_outqueue中取出,并找到_cache緩存中的ApplyResult對象,_set其運(yùn)行結(jié)果,等待調(diào)用端獲取。

  close():

    關(guān)閉進(jìn)程池,使其不再接收新的任務(wù)。

  join():

    主進(jìn)程阻塞等待子進(jìn)程的退出,join方法必須用在close()方法之后,兩者搭配使用。

  PS?回調(diào)函數(shù):

    回調(diào)函數(shù)就是一個(gè)通過函數(shù)指針調(diào)用的函數(shù)。如果你把函數(shù)的指針(地址)作為參數(shù)傳遞給另一個(gè)函數(shù),當(dāng)這個(gè)指針被用來調(diào)用其所指向的函數(shù)時(shí),我們就說這是回調(diào)函數(shù)。回調(diào)函數(shù)不是由該函數(shù)的實(shí)現(xiàn)方直接調(diào)用,而是在特定的事件或條件發(fā)生時(shí)由另外的一方調(diào)用的,用于對該事件或條件進(jìn)行響應(yīng)。 ? ? ? ? ? ? ? ?——百度百科

你到一個(gè)商店買東西,剛好你要的東西沒有貨,于是你在店員那里留下了你的電話,過了幾天店里有貨了,店員就打了你的電話,然后你接到電話后就到店里去取了貨。在這個(gè)例子里,你的電話號碼就叫回調(diào)函數(shù),你把電話留給店員就叫登記回調(diào)函數(shù),店里后來有貨了叫做觸發(fā)了回調(diào)關(guān)聯(lián)的事件,店員給你打電話叫做調(diào)用回調(diào)函數(shù),你到店里去取貨叫做響應(yīng)回調(diào)事件。 ?——知乎回答     

    callback函數(shù)是一個(gè)以參數(shù)形式傳遞給另一個(gè)函數(shù)的函數(shù),并且該函數(shù)(指callback函數(shù))必須等另一個(gè)函數(shù)執(zhí)行完才會(huì)被調(diào)用!(當(dāng)被調(diào)用時(shí),另一個(gè)函數(shù)就是callback函數(shù)的父函數(shù))。

    理解起來可能有點(diǎn)繞,通俗點(diǎn)的例子:

    函數(shù)a有一個(gè)參數(shù),這個(gè)參數(shù)是個(gè)函數(shù)b,當(dāng)函數(shù)a執(zhí)行完以后執(zhí)行函數(shù)b。那么這個(gè)過程就叫回調(diào)。

    這里必須強(qiáng)調(diào)的一點(diǎn):函數(shù)b是你以參數(shù)形式傳給函數(shù)a的,那么函數(shù)b被調(diào)用時(shí)就叫回調(diào)函數(shù)。


  PS 同步與異步   

    同步和異步關(guān)注的是消息通信機(jī)制 (synchronous communication/ asynchronous communication)
  所謂同步,就是在發(fā)出一個(gè)*調(diào)用*時(shí),在沒有得到結(jié)果之前,該*調(diào)用*就不返回。但是一旦調(diào)用返回,就得到返回值了。
  換句話說,就是由*調(diào)用者*主動(dòng)等待這個(gè)*調(diào)用*的結(jié)果。

    而異步則是相反,*調(diào)用*在發(fā)出之后,這個(gè)調(diào)用就直接返回了,所以沒有返回結(jié)果。換句話說,當(dāng)一個(gè)異步過程調(diào)用發(fā)出后,調(diào)用者不會(huì)立刻得到結(jié)果。而是在*調(diào)用*發(fā)出后,*被調(diào)用者*通過狀態(tài)、通知來通知調(diào)用者,或通過回調(diào)函數(shù)處理這個(gè)調(diào)用。

    同步I/O操作:導(dǎo)致請求進(jìn)程阻塞,直到I/O操作完成;

    異步I/O操作:不導(dǎo)致請求進(jìn)程阻塞。

from multiprocessing import Process,Pool import time,os def foo(n):print(n)time.sleep(5)print('%s is working '%os.getpid())return n**2if __name__ == '__main__':p=Pool(5)objs=[]for i in range(10):obj=p.apply_async(foo,args=(i,))objs.append(obj)p.close()p.join()print(objs)for obj in objs:print(obj.get())

  輸出:?

0 1 2 3 4 14344 is working 5 15624 is working 6 5312 is working 7 16000 is working 8 11868 is working 9 14344 is working 15624 is working 5312 is working 16000 is working 11868 is working [<multiprocessing.pool.ApplyResult object at 0x000001B9953AF860>, <multiprocessing.pool.ApplyResult object at 0x000001B9953AF908>, <multiprocessing.pool.ApplyResult object at 0x000001B9953AF9B0>, <multiprocessing.pool.ApplyResult object at 0x000001B9953AFA90>, <multiprocessing.pool.ApplyResult object at 0x000001B9953AFB70>, <multiprocessing.pool.ApplyResult object at 0x000001B9953AFC50>, <multiprocessing.pool.ApplyResult object at 0x000001B9953AFD30>, <multiprocessing.pool.ApplyResult object at 0x000001B9953AFE10>, <multiprocessing.pool.ApplyResult object at 0x000001B9953AFEF0>, <multiprocessing.pool.ApplyResult object at 0x000001B9953AFFD0>] 0 1 4 9 16 25 36 49 64 81

  注意:看obj是什么,用get()方法取其值。

?

  Pool類的異步以及回調(diào)函數(shù)。回調(diào)函數(shù)可以用在爬蟲上。

import requests,os from multiprocessing import Pool,Process def get(url):r=requests.get(url)print('進(jìn)程%s get %s'%(os.getpid(),url))return {'url':url,'text':len(r.text)} def search(dic):with open('db.txt','a')as f: # a 模式 也可以創(chuàng)建不存在的文件名date='url: %s lenth: %s\n'%(dic['url'],dic['text'])f.write(date) if __name__ == '__main__':p=Pool(3)l=[]url_l=['http://cn.bing.com/','http://www.cnblogs.com/wupeiqi/','http://www.cnblogs.com/654321cc/','https://www.cnblogs.com/','http://society.people.com.cn/n1/2017/1012/c1008-29581930.html','http://www.xilu.com/news/shaonianxinzangyou5gedong.html',]for url in url_l:obj=p.apply_async(get,(url,),callback=search) #在這里,apply_async,創(chuàng)建了進(jìn)程。search是回調(diào)函數(shù),有且唯一參數(shù)是get函數(shù)的返回值,l.append(obj) #obj一直是ApplyResult object p.close()p.join()print(l)for obj in l:print(obj.get()) #obj.get()一直是get()函數(shù)的返回值,不管有沒有回調(diào)函數(shù)。

  輸出:

進(jìn)程14044 get http://www.cnblogs.com/wupeiqi/ 進(jìn)程13000 get http://www.cnblogs.com/654321cc/ 進(jìn)程15244 get http://cn.bing.com/ 進(jìn)程15244 get http://www.xilu.com/news/shaonianxinzangyou5gedong.html 進(jìn)程14044 get https://www.cnblogs.com/ 進(jìn)程13000 get http://society.people.com.cn/n1/2017/1012/c1008-29581930.html [<multiprocessing.pool.ApplyResult object at 0x0000027D4C893BE0>, <multiprocessing.pool.ApplyResult object at 0x0000027D4C893C88>, <multiprocessing.pool.ApplyResult object at 0x0000027D4C893D30>, <multiprocessing.pool.ApplyResult object at 0x0000027D4C893DD8>, <multiprocessing.pool.ApplyResult object at 0x0000027D4C893E80>, <multiprocessing.pool.ApplyResult object at 0x0000027D4C893F60>] {'url': 'http://cn.bing.com/', 'text': 127210} {'url': 'http://www.cnblogs.com/wupeiqi/', 'text': 21292} {'url': 'http://www.cnblogs.com/654321cc/', 'text': 13268} {'url': 'https://www.cnblogs.com/', 'text': 40331} {'url': 'http://society.people.com.cn/n1/2017/1012/c1008-29581930.html', 'text': 23641} {'url': 'http://www.xilu.com/news/shaonianxinzangyou5gedong.html', 'text': 51247}

?

轉(zhuǎn)載于:https://www.cnblogs.com/654321cc/p/7650190.html

總結(jié)

以上是生活随笔為你收集整理的进程与multiprocessing模块的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。