日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > python >内容正文

python

python 多进程 调用模块内函数_Python进程池multiprocessing.Pool的用法

發(fā)布時間:2024/9/19 python 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python 多进程 调用模块内函数_Python进程池multiprocessing.Pool的用法 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

一、multiprocessing模塊

multiprocessing模塊提供了一個Process類來代表一個進程對象,multiprocessing模塊像線程一樣管理進程,這個是multiprocessing的核心,它與threading很相似,對多核CPU的利用率會比threading好的多

看一下Process類的構(gòu)造方法:

__init__(self, group=None, target=None, name=None, args=(), kwargs={})

參數(shù)說明:

group:進程所屬組(基本不用)

target:表示調(diào)用對象

args:表示調(diào)用對象的位置參數(shù)元組

name:別名

kwargs:表示調(diào)用對象的字典

示例:

importmultiprocessingdef do(n): #參數(shù)n由args=(1,)傳入

name = multiprocessing.current_process().name #獲取當(dāng)前進程的名字

print(name, 'starting')print("worker", n)return

if __name__ == '__main__':

numList=[]for i in range(5):

p= multiprocessing.Process(target=do, args=(i,)) #(i,)中加入","表示元祖

numList.append(p)print(numList)

p.start()#用start()方法啟動進程,執(zhí)行do()方法

p.join() #等待子進程結(jié)束以后再繼續(xù)往下運行,通常用于進程間的同步

print("Process end.")

運行結(jié)果:

[]

Process-1starting

worker 0

Process end.

[, ]

Process-2starting

worker1Process end.

[, , ]

Process-3starting

worker2Process end.

[, , , ]

Process-4starting

worker3Process end.

[, , , , ]

Process-5starting

worker4Process end.

通過打印numList可以看出當(dāng)前進程結(jié)束后,再開始下一個進程

注意:

在Windows上要想使用進程模塊,就必須把有關(guān)進程的代碼寫在當(dāng)前.py文件的if __name__ == ‘__main__’ :語句的下面,才能正常使用Windows下的進程模塊。Unix/Linux下則不需要

二、Pool類

Pool類可以提供指定數(shù)量的進程供用戶調(diào)用,當(dāng)有新的請求提交到Pool中時,如果池還沒有滿,就會創(chuàng)建一個新的進程來執(zhí)行請求。如果池滿,請求就會告知先等待,直到池中有進程結(jié)束,才會創(chuàng)建新的進程來執(zhí)行這些請求

下面介紹一下multiprocessing 模塊下的Pool類下的幾個方法:

1.apply()

函數(shù)原型:apply(func[, args=()[, kwds={}]])

該函數(shù)用于傳遞不定參數(shù),同python中的apply函數(shù)一致,主進程會被阻塞直到函數(shù)執(zhí)行結(jié)束(不建議使用,并且3.x以后不再出現(xiàn))

2.apply_async

函數(shù)原型:apply_async(func[, args=()[, kwds={}[, callback=None]]])

與apply用法一致,但它是非阻塞的且支持結(jié)果返回后進行回調(diào)

3.map()

函數(shù)原型:map(func, iterable[, chunksize=None])

Pool類中的map方法,與內(nèi)置的map函數(shù)用法行為基本一致,它會使進程阻塞直到結(jié)果返回

注意:雖然第二個參數(shù)是一個迭代器,但在實際使用中,必須在整個隊列都就緒后,程序才會運行子進程

4.map_async()

函數(shù)原型:map_async(func, iterable[, chunksize[, callback]])

與map用法一致,但是它是非阻塞的

5.close()

關(guān)閉進程池(pool),使其不再接受新的任務(wù)

6.terminal()

結(jié)束工作進程,不再處理未處理的任務(wù)

7.join()

主進程阻塞等待子進程的退出, join方法要在close或terminate之后使用

示例1--使用map()函數(shù)

importtimefrom multiprocessing importPooldefrun(fn):#fn: 函數(shù)參數(shù)是數(shù)據(jù)列表的一個元素

time.sleep(1)print(fn *fn)if __name__ == "__main__":

testFL= [1, 2, 3, 4, 5, 6]print('shunxu:') #順序執(zhí)行(也就是串行執(zhí)行,單進程)

s =time.time()for fn intestFL:

run(fn)

t1=time.time()print("順序執(zhí)行時間:", int(t1 -s))print('concurrent:') #創(chuàng)建多個進程,并行執(zhí)行

pool = Pool(3) #創(chuàng)建擁有3個進程數(shù)量的進程池

#testFL:要處理的數(shù)據(jù)列表,run:處理testFL列表中數(shù)據(jù)的函數(shù)

pool.map(run, testFL)

pool.close()#關(guān)閉進程池,不再接受新的進程

pool.join() #主進程阻塞等待子進程的退出

t2 =time.time()print("并行執(zhí)行時間:", int(t2 - t1))

運行結(jié)果:

1、map函數(shù)中testFL為可迭代對象--列表

2、當(dāng)創(chuàng)建3個進程時,會一次打印出3個結(jié)果“1,4,9”,當(dāng)當(dāng)創(chuàng)建2個進程時,會一次打印出2個結(jié)果“1,4”,以此類推,當(dāng)創(chuàng)建多余6個進程時,會一次打印出所有結(jié)果

3、如果使用Pool(),不傳入?yún)?shù),可以創(chuàng)建一個動態(tài)控制大小的進程池

從結(jié)果可以看出,并發(fā)執(zhí)行的時間明顯比順序執(zhí)行要快很多,但是進程是要耗資源的,所以平時工作中,進程數(shù)也不能開太大。 對Pool對象調(diào)用join()方法會等待所有子進程執(zhí)行完畢,調(diào)用join()之前必須先調(diào)用close(),讓其不再接受新的Process了

示例2--使用map()_async函數(shù)

print('concurrent:') #創(chuàng)建多個進程,并行執(zhí)行

pool = Pool(3) #創(chuàng)建擁有3個進程數(shù)量的進程池

#testFL:要處理的數(shù)據(jù)列表,run:處理testFL列表中數(shù)據(jù)的函數(shù)

pool.map_async(run, testFL)

pool.close() #關(guān)閉進程池,不再接受新的進程

pool.join() #主進程阻塞等待子進程的退出

t2 =time.time()print("并行執(zhí)行時間:", int(t2 - t1))

運行結(jié)果:

從結(jié)果可以看出,map_async()和map()用時相同。目前還沒有看出兩者的區(qū)別,后面知道后再完善

示例3--使用apply()函數(shù)

print('concurrent:') #創(chuàng)建多個進程,并行執(zhí)行

pool = Pool(3) #創(chuàng)建擁有3個進程數(shù)量的進程池

#testFL:要處理的數(shù)據(jù)列表,run:處理testFL列表中數(shù)據(jù)的函數(shù)

for fn intestFL:

pool.apply(run, (fn,))

pool.close()#關(guān)閉進程池,不再接受新的進程

pool.join() #主進程阻塞等待子進程的退出

t2 =time.time()print("并行執(zhí)行時間:", int(t2 - t1))

運行結(jié)果:

可見,使用apply()方法,并行執(zhí)行和順序執(zhí)行用時相同,經(jīng)過試驗,進程數(shù)目增大也不會減少并行執(zhí)行的時間

原因:以阻塞的形式產(chǎn)生進程任務(wù),生成1個任務(wù)進程并等它執(zhí)行完出池,第2個進程才會進池,主進程一直阻塞等待,每次只執(zhí)行1個進程任務(wù)

示例4--使用apply_async()函數(shù)

print('concurrent:') #創(chuàng)建多個進程,并行執(zhí)行

pool = Pool(3) #創(chuàng)建擁有3個進程數(shù)量的進程池

#testFL:要處理的數(shù)據(jù)列表,run:處理testFL列表中數(shù)據(jù)的函數(shù)

for fn intestFL:pool.apply_async(run, (fn,))

pool.close()#關(guān)閉進程池,不再接受新的進程

pool.join() #主進程阻塞等待子進程的退出

t2 =time.time()print("并行執(zhí)行時間:", int(t2 - t1))

運行結(jié)果:

可見,使用apply_async()方法,并行執(zhí)行時間與使用map()、map_async()方法相同

注意:

map_async()和map()方法,第2個參數(shù)可以是列表也可以是元祖,如下圖:

而使用apply()和apply_async()方法時,第2個參數(shù)只能傳入元祖,傳入列表進程不會被執(zhí)行,如下圖:

三、apply_async()方法callback參數(shù)的用法

示例:

from multiprocessing importPoolimporttimedeffun_01(i):

time.sleep(2)print('start_time:', time.ctime())return i + 100

deffun_02(arg):print('end_time:', arg, time.ctime())if __name__ == '__main__':

pool= Pool(3)for i in range(4):

pool.apply_async(func=fun_01, args=(i,), callback=fun_02) #fun_02的入?yún)閒un_01的返回值

#pool.apply_async(func=fun_01, args=(i,))

pool.close()

pool.join()print('done')

運行結(jié)果:

start_time: Thu Nov 14 16:31:41 2019end_time:100 Thu Nov 14 16:31:41 2019start_time: Thu Nov14 16:31:41 2019end_time:101 Thu Nov 14 16:31:41 2019start_time: Thu Nov14 16:31:41 2019end_time:102 Thu Nov 14 16:31:41 2019start_time: Thu Nov14 16:31:43 2019end_time:103 Thu Nov 14 16:31:43 2019done

map_async()方法callback參數(shù)的用法與apply_async()相同

四、使用進程池并關(guān)注結(jié)果

importmultiprocessingimporttimedeffunc(msg):print('hello :', msg, time.ctime())

time.sleep(2)print('end', time.ctime())return 'done' +msgif __name__ == '__main__':

pool= multiprocessing.Pool(2)

result=[]for i in range(3):

msg= 'hello %s' %i

result.append(pool.apply_async(func=func, args=(msg,)))

pool.close()

pool.join()for res inresult:print('***:', res.get()) #get()函數(shù)得出每個返回結(jié)果的值

print('All end--')

運行結(jié)果:

五、多進程執(zhí)行多個函數(shù)

使用apply_async()或者apply()方法,可以實現(xiàn)多進程執(zhí)行多個方法

示例:

importmultiprocessingimporttimeimportosdefLee():print('\nRun task Lee--%s******ppid:%s' % (os.getpid(), os.getppid()), '~~~~', time.ctime())

start=time.time()

time.sleep(5)

end=time.time()print('Task Lee,runs %0.2f seconds.' % (end - start), '~~~~', time.ctime())defMarlon():print("\nRun task Marlon-%s******ppid:%s" % (os.getpid(), os.getppid()), '~~~~', time.ctime())

start=time.time()

time.sleep(10)

end=time.time()print('Task Marlon runs %0.2f seconds.' % (end - start), '~~~~', time.ctime())defAllen():print("\nRun task Allen-%s******ppid:%s" % (os.getpid(), os.getppid()), '~~~~', time.ctime())

start=time.time()

time.sleep(15)

end=time.time()print('Task Allen runs %0.2f seconds.' % (end - start), '~~~~', time.ctime())defFrank():print("\nRun task Frank-%s******ppid:%s" % (os.getpid(), os.getppid()), '~~~~', time.ctime())

start=time.time()

time.sleep(20)

end=time.time()print('Task Frank runs %0.2f seconds.' % (end - start), '~~~~', time.ctime())if __name__ == '__main__':

func_list=[Lee, Marlon, Allen, Frank]print('parent process id %s' %os.getpid())

pool= multiprocessing.Pool(4)for func infunc_list:

pool.apply_async(func)print('Waiting for all subprocesses done...')

pool.close()

pool.join()print('All subprocesses done.')

運行結(jié)果:

parent process id 84172Waitingforall subprocesses done...

Run task Lee--84868******ppid:84172 ~~~~ Thu Nov 14 17:44:14 2019Run task Marlon-84252******ppid:84172 ~~~~ Thu Nov 14 17:44:14 2019Run task Allen-85344******ppid:84172 ~~~~ Thu Nov 14 17:44:14 2019Run task Frank-85116******ppid:84172 ~~~~ Thu Nov 14 17:44:14 2019Task Lee,runs5.00 seconds. ~~~~ Thu Nov 14 17:44:19 2019Task Marlon runs10.00 seconds. ~~~~ Thu Nov 14 17:44:24 2019Task Allen runs15.00 seconds. ~~~~ Thu Nov 14 17:44:29 2019Task Frank runs20.00 seconds. ~~~~ Thu Nov 14 17:44:34 2019All subprocesses done.

六、其他

1、獲取當(dāng)前計算機的CPU數(shù)量

總結(jié)

以上是生活随笔為你收集整理的python 多进程 调用模块内函数_Python进程池multiprocessing.Pool的用法的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。