日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

python并发处理list数据_python并发编程之多进程2--------数据共享及进程池和回调函数...

發布時間:2025/3/19 python 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python并发处理list数据_python并发编程之多进程2--------数据共享及进程池和回调函数... 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一、數據共享

1.進程間的通信應該盡量避免共享數據的方式

2.進程間的數據是獨立的,可以借助隊列或管道實現通信,二者都是基于消息傳遞的。

雖然進程間數據獨立,但可以用過Manager實現數據共享,事實上Manager的功能遠不止于此。

1 命令就是一個程序,按回車就會執行(這個只是在windows情況下)

2 tasklist 查看進程

3 tasklist | findstr pycharm #(findstr是進行過濾的),|就是管道(tasklist執行的內容就放到管道里面了,

4 管道后面的findstr pycharm就接收了)

3.(IPC)進程之間的通信有兩種實現方式:管道和隊列

1 from multiprocessing import Manager,Process,Lock

2 def work(dic,mutex):

3 # mutex.acquire()

4 # dic['count']-=1

5 # mutex.release()

6 # 也可以這樣加鎖

7 with mutex:

8 dic['count'] -= 1

9 if __name__ == '__main__':

10 mutex = Lock()

11 m = Manager() #實現共享,由于字典是共享的字典,所以得加個鎖

12 share_dic = m.dict({'count':100})

13 p_l = []

14 for i in range(100):

15 p = Process(target=work,args=(share_dic,mutex))

16 p_l.append(p) #先添加進去

17 p.start()

18 for i in p_l:

19 i.join()

20 print(share_dic)

21 # 共享就意味著會有競爭,

22

23數據共享 View Code

二、進程池

在利用Python進行系統管理的時候,特別是同時操作多個文件目錄,或者遠程控制多臺主機,并行操作可以節約大量的時間。多進程是實現并發的手段之一,需要注意的問題是:很明顯需要并發執行的任務通常要遠大于核數

一個操作系統不可能無限開啟進程,通常有幾個核就開幾個進程

進程開啟過多,效率反而會下降(開啟進程是需要占用系統資源的,而且開啟多余核數目的進程也無法做到并行)

例如當被操作對象數目不大時,可以直接利用multiprocessing中的Process動態成生多個進程,十幾個還好,但如果是上百個,上千個。。。手動的去限制進程數量卻又太過繁瑣,此時可以發揮進程池的功效。

那么什么是進程池呢?進程池就是控制進程數目

1 ps:對于遠程過程調用的高級應用程序而言,應該使用進程池,Pool可以提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,如果池還沒有滿,那么就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到規定最大值,那么該請求就會等待,直到池中有進程結束,就重用進程池中的進程。

進程池的結構:

創建進程池的類:如果指定numprocess為3,則進程池會從無到有創建三個進程,然后自始至終使用這三個進程去執行所有任務,不會開啟其他進程

1.創建進程池

1 Pool([numprocess [,initializer [, initargs]]]):創建進程池

2.參數介紹

1 numprocess:要創建的進程數,如果省略,將默認為cpu_count()的值,可os.cpu_count()查看

2 initializer:是每個工作進程啟動時要執行的可調用對象,默認為None

3 initargs:是要傳給initializer的參數組

3.方法介紹

1 p.apply(func [, args [, kwargs]]):在一個池工作進程中執行

2 func(*args,**kwargs),然后返回結果。

3 需要強調的是:此操作并不會在所有池工作進程中并執行func函數。

4 如果要通過不同參數并發地執行func函數,必須從不同線程調用p.apply()

5 函數或者使用p.apply_async()

6

7

8 p.apply_async(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然后返回結果。此方法的結果是AsyncResult類的實例,

9 callback是可調用對象,接收輸入參數。當func的結果變為可用時,

10 將理解傳遞給callback。callback禁止執行任何阻塞操作,

11 否則將接收其他異步操作中的結果。

12

13 p.close():關閉進程池,防止進一步操作。禁止往進程池內在添加任務(需要注意的是一定要寫在close()的上方)

14

1 P.jion():等待所有工作進程退出。此方法只能在close()或teminate()之后調用

應用1:

1 from multiprocessing import Pool

2 import os,time

3 def task(n):

4 print('[%s] is running'%os.getpid())

5 time.sleep(2)

6 print('[%s] is done'%os.getpid())

7 return n**2

8 if __name__ == '__main__':

9 # print(os.cpu_count()) #查看cpu個數

10 p = Pool(4) #最大四個進程

11 for i in range(1,7):#開7個任務

12 res = p.apply(task,args=(i,)) #同步的,等著一個運行完才執行另一個

13 print('本次任務的結束:%s'%res)

14 p.close()#禁止往進程池內在添加任務

15 p.join() #在等進程池

16 print('主')apply同步進程池(阻塞)(串行) View Code

1 # ----------------

2 # 那么我們為什么要用進程池呢?這是因為進程池使用來控制進程數目的,

3 # 我們需要幾個就開幾個進程。如果不用進程池實現并發的話,會開很多的進程

4 # 如果你開的進程特別多,那么你的機器就會很卡,所以我們把進程控制好,用幾個就

5 # 開幾個,也不會太占用內存

6 from multiprocessing import Pool

7 import os,time

8 def walk(n):

9 print('task[%s] running...'%os.getpid())

10 time.sleep(3)

11 return n**2

12 if __name__ == '__main__':

13 p = Pool(4)

14 res_obj_l = []

15 for i in range(10):

16 res = p.apply_async(walk,args=(i,))

17 # print(res) #打印出來的是對象

18 res_obj_l.append(res) #那么現在拿到的是一個列表,怎么得到值呢?我們用個.get方法

19 p.close() #禁止往進程池里添加任務

20 p.join()

21 # print(res_obj_l)

22 print([obj.get() for obj in res_obj_l]) #這樣就得到了

23apply_async異步進程池(非阻塞)(并行) View Code

那么什么是同步,什么是異步呢?

同步就是指一個進程在執行某個請求的時候,若該請求需要一段時間才能返回信息,那么這個進程將會一直等待下去,直到收到返回信息才繼續執行下去

異步是指進程不需要一直等下去,而是繼續執行下面的操作,不管其他進程的狀態。當有消息返回時系統會通知進程進行處理,這樣可以提高執行的效率。

什么是串行,什么是并行呢?

舉例:能并排開幾輛車的就可以說是“并行”,只能一輛一輛開的就屬于“串行”了。很明顯,并行的速度要比串行的快得多。(并行互不影響,串行的等著一個完了才能接著另一個)

應用2:

使用進程池維護固定數目的進程(以前客戶端和服務端的改進

1 from socket import *

2 from multiprocessing import Pool

3 s = socket(AF_INET,SOCK_STREAM)

4 s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #端口重用

5 s.bind(('127.0.0.1',8081))

6 s.listen(5)

7 print('start running...')

8 def talk(coon,addr):

9 while True: # 通信循環

10 try:

11 cmd = coon.recv(1024)

12 print(cmd.decode('utf-8'))

13 if not cmd: break

14 coon.send(cmd.upper())

15 print('發送的是%s'%cmd.upper().decode('utf-8'))

16 except Exception:

17 break

18 coon.close()

19 if __name__ == '__main__':

20 p = Pool(4)

21 while True:#鏈接循環

22 coon,addr = s.accept()

23 print(coon,addr)

24 p.apply_async(talk,args=(coon,addr))

25 s.close()

26 #因為是循環,所以就不用p.join了

27

28服務端 View Code

1 from socket import *

2 c = socket(AF_INET,SOCK_STREAM)

3 c.connect(('127.0.0.1',8081))

4 while True:

5 cmd = input('>>:').strip()

6 if not cmd:continue

7 c.send(cmd.encode('utf-8'))

8 data = c.recv(1024)

9 print('接受的是%s'%data.decode('utf-8'))

10 c.close()

11客戶端 View Code

三、回調函數

1 回調函數什么時候用?(回調函數在爬蟲中最常用)

2 造數據的非常耗時

3 處理數據的時候不耗時

4

5 你下載的地址如果完成了,就自動提醒讓主進程解析

6 誰要是好了就通知解析函數去解析(回調函數的強大之處)

需要回調函數的場景:進程池中任何一個任務一旦處理完了,就立即告知主進程:我好了額,你可以處理我的結果了。主進程則調用一個函數去處理該結果,該函數即回調函數

我們可以把耗時間(阻塞)的任務放到進程池中,然后指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。

1 from multiprocessing import Pool

2 import requests

3 import os

4 import time

5 def get_page(url):

6 print('<%s> is getting [%s]' %(os.getpid(),url))

7 response = requests.get(url) #得到地址

8 time.sleep(2)

9 print('<%s> is done [%s]'%(os.getpid(),url))

10 return {'url':url,'text':response.text}

11 def parse_page(res):

12 '''解析函數'''

13 print('<%s> parse [%s]'%(os.getpid(),res['url']))

14 with open('db.txt','a') as f:

15 parse_res = 'url:%s size:%s\n' %(res['url'],len(res['text']))

16 f.write(parse_res)

17 if __name__ == '__main__':

18 p = Pool(4)

19 urls = [

20 'https://www.baidu.com',

21 'http://www.openstack.org',

22 'https://www.python.org',

23 'https://help.github.com/',

24 'http://www.sina.com.cn/'

25 ]

26 for url in urls:

27 obj = p.apply_async(get_page,args=(url,),callback=parse_page)

28 p.close()

29 p.join()

30 print('主',os.getpid()) #都不用.get()方法了

31

32回調函數(下載網頁的小例子) View Code

如果在主進程中等待進程池中所有任務都執行完畢后,再統一處理結果,則無需回調函數

1 from multiprocessing import Pool

2 import requests

3 import os

4 def get_page(url):

5 print('<%os> get [%s]' %(os.getpid(),url))

6 response = requests.get(url) #得到地址 response響應

7 return {'url':url,'text':response.text}

8 if __name__ == '__main__':

9 p = Pool(4)

10 urls = [

11 'https://www.baidu.com',

12 'http://www.openstack.org',

13 'https://www.python.org',

14 'https://help.github.com/',

15 'http://www.sina.com.cn/'

16 ]

17 obj_l= []

18 for url in urls:

19 obj = p.apply_async(get_page,args=(url,))

20 obj_l.append(obj)

21 p.close()

22 p.join()

23 print([obj.get() for obj in obj_l])

24

25下載網頁小例子(無需回調函數) View Code

歸類:網絡編程socket

總結

以上是生活随笔為你收集整理的python并发处理list数据_python并发编程之多进程2--------数据共享及进程池和回调函数...的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。