python分布式进程(windows下)
分布式進(jìn)程:
在Thread和Process中,應(yīng)當(dāng)優(yōu)選Process,因?yàn)镻rocess更穩(wěn)定,而且,Process可以分布到多臺機(jī)器上,而Thread最多只能分布到同一臺機(jī)器的多個(gè)CPU上。
Python的multiprocessing模塊不但支持多進(jìn)程,其中managers子模塊還支持把多進(jìn)程分布到多臺機(jī)器上。一個(gè)服務(wù)進(jìn)程可以作為調(diào)度者,將任務(wù)分布到其他多個(gè)進(jìn)程中,依靠網(wǎng)絡(luò)通信。由于managers模塊封裝很好,不必了解網(wǎng)絡(luò)通信的細(xì)節(jié),就可以很容易地編寫分布式多進(jìn)程程序。
舉個(gè)例子:如果我們已經(jīng)有一個(gè)通過Queue通信的多進(jìn)程程序在同一臺機(jī)器上運(yùn)行,現(xiàn)在,由于處理任務(wù)的進(jìn)程任務(wù)繁重,希望把發(fā)送任務(wù)的進(jìn)程和處理任務(wù)的進(jìn)程分布到兩臺機(jī)器上。怎么用分布式進(jìn)程實(shí)現(xiàn)?
原有的Queue可以繼續(xù)使用,但是,通過managers模塊把Queue通過網(wǎng)絡(luò)暴露出去,就可以讓其他機(jī)器的進(jìn)程訪問Queue了。
我們先看服務(wù)進(jìn)程,服務(wù)進(jìn)程負(fù)責(zé)啟動Queue,把Queue注冊到網(wǎng)絡(luò)上,然后往Queue里面寫入任務(wù):
下面的代碼是在windows下運(yùn)行的,所以出現(xiàn)了各種問題:
# coding=utf-8 import random, time, Queue from multiprocessing.managers import BaseManager# 發(fā)送任務(wù)的隊(duì)列: task_queue =Queue.Queue() # 接收結(jié)果的隊(duì)列: result_queue = Queue.Queue()# 從BaseManager繼承的QueueManager: class QueueManager(BaseManager):pass# 把兩個(gè)Queue都注冊到網(wǎng)絡(luò)上, callable參數(shù)關(guān)聯(lián)了Queue對象: QueueManager.register('get_task_queue', callable=lambda: task_queue) QueueManager.register('get_result_queue', callable=lambda: result_queue) # 綁定端口5000, 設(shè)置驗(yàn)證碼'abc': manager = QueueManager(address=('', 5000), authkey=b'abc') # 啟動Queue: manager.start() # 獲得通過網(wǎng)絡(luò)訪問的Queue對象: task = manager.get_task_queue() result = manager.get_result_queue() # 放幾個(gè)任務(wù)進(jìn)去: for i in range(10):n = random.randint(0, 10000)print('Put task %d...' % n)task.put(n) # 從result隊(duì)列讀取結(jié)果: print('Try get results...') for i in range(10):r = result.get(timeout=10)print('Result: %s' % r) # 關(guān)閉: manager.shutdown() print('master exit.')在windows命令行終端的運(yùn)行結(jié)果:
由錯誤信息改代碼:
# coding=utf-8import random,time, Queue from multiprocessing.managers import BaseManager from multiprocessing import freeze_supporttask_queue = Queue.Queue() # 發(fā)送任務(wù)的隊(duì)列: result_queue = Queue.Queue() # 接收結(jié)果的隊(duì)列: class QueueManager(BaseManager): # 從BaseManager繼承的QueueManager:pass # windows下運(yùn)行 def return_task_queue():global task_queuereturn task_queue # 返回發(fā)送任務(wù)隊(duì)列 def return_result_queue ():global result_queuereturn result_queue # 返回接收結(jié)果隊(duì)列def test():# 把兩個(gè)Queue都注冊到網(wǎng)絡(luò)上, callable參數(shù)關(guān)聯(lián)了Queue對象,它們用來進(jìn)行進(jìn)程間通信,交換對象#QueueManager.register('get_task_queue', callable=lambda: task_queue)#QueueManager.register('get_result_queue', callable=lambda: result_queue)QueueManager.register('get_task_queue', callable=return_task_queue) QueueManager.register('get_result_queue', callable=return_result_queue)# 綁定端口5000, 設(shè)置驗(yàn)證碼'abc':#manager = QueueManager(address=('', 5000), authkey=b'abc')# windows需要寫ip地址manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')manager.start() # 啟動Queue: # 獲得通過網(wǎng)絡(luò)訪問的Queue對象:task = manager.get_task_queue() result = manager.get_result_queue()for i in range(10): # 放幾個(gè)任務(wù)進(jìn)去:n = random.randint(0, 10000)print('Put task %d...' % n)task.put(n)# 從result隊(duì)列讀取結(jié)果:print('Try get results...') for i in range(10):# 這里加了異常捕獲try:r = result.get(timeout=5)print('Result: %s' % r)except Queue.Empty:print('result queue is empty.')# 關(guān)閉: manager.shutdown() print('master exit.') if __name__=='__main__':freeze_support()print('start!')test()運(yùn)行結(jié)果:
對比上段代碼改變的地方有:
# 把兩個(gè)Queue都注冊到網(wǎng)絡(luò)上, callable參數(shù)關(guān)聯(lián)了Queue對象 QueueManager.register('get_task_queue',callable=return_task_queue) QueueManager.register('get_result_queue',callable=return_result_queue)其中task_queue和result_queue是兩個(gè)隊(duì)列,分別存放任務(wù)和結(jié)果。它們用來進(jìn)行進(jìn)程間通信,交換對象。
官網(wǎng)上有如下例子。
# coding=utf-8 from multiprocessing import Process, Queue def f(queue):queue.put([42, None, 'hello'])if __name__ == '__main__': q = Queue() # 創(chuàng)建隊(duì)列qp = Process(target=f, args=(q,)) # 創(chuàng)建一個(gè)進(jìn)程p.start()print(q.get()) # 打印列表[42, None, 'hello']p.join()其中列表[42, None, ‘hello’]從新建p進(jìn)程傳到了主進(jìn)程中。
因?yàn)槭欠植际降沫h(huán)境,放入queue中的數(shù)據(jù)需要等待Workers機(jī)器運(yùn)算處理后再進(jìn)行讀取,這樣就需要對queue用QueueManager進(jìn)行封裝放到網(wǎng)絡(luò)中。這是通過下面這句
QueueManager.register('get_task_queue',callable=return_task_queue)實(shí)現(xiàn)的,我們給return_task_queue的網(wǎng)絡(luò)調(diào)用接口取了一個(gè)名get_task_queue,而return_result_queue的名字是get_result_queue,方便區(qū)分對哪個(gè)queue進(jìn)行操作。
task.put(n)即是對task_queue進(jìn)行寫入數(shù)據(jù),相當(dāng)于分配任務(wù)。而result.get()即是等待workers處理后返回的結(jié)果
# windows需要寫ip地址manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')這點(diǎn)不同于linux操作系統(tǒng),必須寫ip地址
if __name__=='__main__':freeze_support()print('start!')test()windows必須有 if name==’main‘: 這點(diǎn)從報(bào)錯的信息可以看出
中間加入了捕獲異常,使代碼運(yùn)行完整,運(yùn)行結(jié)果更容易看懂,在運(yùn)行的時(shí)候最好用cmd終端。
下面是Worker的代碼:
# coding=utf-8 import time, sys,Queue from multiprocessing.managers import BaseManager# 創(chuàng)建類似的QueueManager: class QueueManager(BaseManager):pass# 由于這個(gè)QueueManager只從網(wǎng)絡(luò)上獲取Queue,所以注冊時(shí)只提供名字: QueueManager.register('get_task_queue') QueueManager.register('get_result_queue')# 連接到服務(wù)器,也就是運(yùn)行task_master.py的機(jī)器: server_addr = '127.0.0.1' print('Connect to server %s...' % server_addr) # 端口和驗(yàn)證碼注意保持與task_master.py設(shè)置的完全一致: m = QueueManager(address=(server_addr, 5000), authkey=b'abc') # 從網(wǎng)絡(luò)連接: try:m.connect() except:print('請先啟動task_master.py!')#sys.exit("sorry, goodbye!"); # 獲取Queue的對象: task = m.get_task_queue() result = m.get_result_queue() # 從task隊(duì)列取任務(wù),并把結(jié)果寫入result隊(duì)列: for i in range(10):try:n = task.get(timeout=1)print('run task %d * %d...' % (n, n))r = '%d * %d = %d' % (n, n, n*n)time.sleep(1)result.put(r)except Queue.Empty:print('task queue is empty.') # 處理結(jié)束: print('worker exit.')這個(gè)簡單的Master/Worker模型有什么用?其實(shí)這就是一個(gè)簡單但真正的分布式計(jì)算,把代碼稍加改造,啟動多個(gè)worker,就可以把任務(wù)分布到幾臺甚至幾十臺機(jī)器上,比如把計(jì)算n*n的代碼換成發(fā)送郵件,就實(shí)現(xiàn)了郵件隊(duì)列的異步發(fā)送。
Queue對象存儲在哪?注意到task_worker.py中根本沒有創(chuàng)建Queue的代碼,所以,Queue對象存儲在task_master.py進(jìn)程中:
task_worker這里的QueueManager注冊的名字必須和task_manager中的一樣。對比上面的例子,可以看出Queue對象從另一個(gè)進(jìn)程通過網(wǎng)絡(luò)傳遞了過來。只不過這里的傳遞和網(wǎng)絡(luò)通信由QueueManager完成。
運(yùn)行結(jié)果:
運(yùn)行task_master.py
運(yùn)行task_worker.py
此運(yùn)行是在同一臺電腦上
參考:
https://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/001431929340191970154d52b9d484b88a7b343708fcc60000
總結(jié)
以上是生活随笔為你收集整理的python分布式进程(windows下)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Python中的ThreadLocal变
- 下一篇: python中list,tuple,st