日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 >

python 管道队列_关于python:Multiprocessing-管道与队列

發(fā)布時(shí)間:2023/12/20 41 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python 管道队列_关于python:Multiprocessing-管道与队列 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

Python的多處理程序包中的隊(duì)列和管道之間的根本區(qū)別是什么?

在什么情況下應(yīng)該選擇一種? 什么時(shí)候使用Pipe()有優(yōu)勢(shì)? 什么時(shí)候使用Queue()有優(yōu)勢(shì)?

Pipe()只能有兩個(gè)端點(diǎn)。

Queue()可以有多個(gè)生產(chǎn)者和消費(fèi)者。

何時(shí)使用它們

如果需要兩個(gè)以上的點(diǎn)進(jìn)行通信,請(qǐng)使用Queue()。

如果您需要絕對(duì)性能,則Pipe()會(huì)更快,因?yàn)镼ueue()是建立在Pipe()之上的。

績效基準(zhǔn)

假設(shè)您要生成兩個(gè)進(jìn)程并在它們之間盡快發(fā)送消息。這些是使用Pipe()和Queue()進(jìn)行的類似測(cè)試之間的拖動(dòng)競(jìng)賽的計(jì)時(shí)結(jié)果。這是在運(yùn)行Ubuntu 11.10和Python 2.7.2的ThinkpadT61上進(jìn)行的。

僅供參考,我將JoinableQueue()的結(jié)果作為獎(jiǎng)勵(lì); JoinableQueue()在調(diào)用queue.task_done()時(shí)負(fù)責(zé)任務(wù)(它甚至不知道特定任務(wù),它只計(jì)算隊(duì)列中未完成的任務(wù)),因此queue.join()知道工作已完成。

此答案底部的每個(gè)代碼...

mpenning@mpenning-T61:~$ python multi_pipe.py

Sending 10000 numbers to Pipe() took 0.0369849205017 seconds

Sending 100000 numbers to Pipe() took 0.328398942947 seconds

Sending 1000000 numbers to Pipe() took 3.17266988754 seconds

mpenning@mpenning-T61:~$ python multi_queue.py

Sending 10000 numbers to Queue() took 0.105256080627 seconds

Sending 100000 numbers to Queue() took 0.980564117432 seconds

Sending 1000000 numbers to Queue() took 10.1611330509 seconds

mpnening@mpenning-T61:~$ python multi_joinablequeue.py

Sending 10000 numbers to JoinableQueue() took 0.172781944275 seconds

Sending 100000 numbers to JoinableQueue() took 1.5714070797 seconds

Sending 1000000 numbers to JoinableQueue() took 15.8527247906 seconds

mpenning@mpenning-T61:~$

總結(jié)Pipe()大約是Queue()的三倍。除非您確實(shí)必須擁有這些好處,否則甚至不要考慮JoinableQueue()。

獎(jiǎng)勵(lì)材料2

除非您知道一些捷徑,否則多處理會(huì)在信息流中引入微妙的變化,使調(diào)試變得困難。例如,在許多情況下,當(dāng)您通過字典建立索引時(shí),您的腳本可能運(yùn)行良好,但是某些輸入很少會(huì)失敗。

通常,當(dāng)整個(gè)python進(jìn)程崩潰時(shí),我們會(huì)獲得有關(guān)失敗的線索;但是,如果多處理功能崩潰,則不會(huì)在控制臺(tái)上打印未經(jīng)請(qǐng)求的崩潰回溯。很難找到未知的多處理崩潰,而又不知道導(dǎo)致進(jìn)程崩潰的線索。

我發(fā)現(xiàn)跟蹤多處理崩潰信息的最簡(jiǎn)單方法是將整個(gè)多處理功能包裝在try / except中并使用traceback.print_exc():

import traceback

def reader(args):

try:

# Insert stuff to be multiprocessed here

return args[0]['that']

except:

print"FATAL: reader({0}) exited while multiprocessing".format(args)

traceback.print_exc()

現(xiàn)在,當(dāng)您發(fā)現(xiàn)崩潰時(shí),您會(huì)看到類似以下內(nèi)容的信息:

FATAL: reader([{'crash', 'this'}]) exited while multiprocessing

Traceback (most recent call last):

File"foo.py", line 19, in __init__

self.run(task_q, result_q)

File"foo.py", line 46, in run

raise ValueError

ValueError

源代碼:

"""

multi_pipe.py

"""

from multiprocessing import Process, Pipe

import time

def reader_proc(pipe):

## Read from the pipe; this will be spawned as a separate Process

p_output, p_input = pipe

p_input.close() ? ?# We are only reading

while True:

msg = p_output.recv() ? ?# Read from the output pipe and do nothing

if msg=='DONE':

break

def writer(count, p_input):

for ii in xrange(0, count):

p_input.send(ii) ? ? ? ? ? ? # Write 'count' numbers into the input pipe

p_input.send('DONE')

if __name__=='__main__':

for count in [10**4, 10**5, 10**6]:

# Pipes are unidirectional with two endpoints: ?p_input ------> p_output

p_output, p_input = Pipe() ?# writer() writes to p_input from _this_ process

reader_p = Process(target=reader_proc, args=((p_output, p_input),))

reader_p.daemon = True

reader_p.start() ? ? # Launch the reader process

p_output.close() ? ? ? # We no longer need this part of the Pipe()

_start = time.time()

writer(count, p_input) # Send a lot of stuff to reader_proc()

p_input.close()

reader_p.join()

print("Sending {0} numbers to Pipe() took {1} seconds".format(count,

(time.time() - _start)))

"""

multi_queue.py

"""

from multiprocessing import Process, Queue

import time

import sys

def reader_proc(queue):

## Read from the queue; this will be spawned as a separate Process

while True:

msg = queue.get() ? ? ? ? # Read from the queue and do nothing

if (msg == 'DONE'):

break

def writer(count, queue):

## Write to the queue

for ii in range(0, count):

queue.put(ii) ? ? ? ? ? ? # Write 'count' numbers into the queue

queue.put('DONE')

if __name__=='__main__':

pqueue = Queue() # writer() writes to pqueue from _this_ process

for count in [10**4, 10**5, 10**6]:

### reader_proc() reads from pqueue as a separate process

reader_p = Process(target=reader_proc, args=((pqueue),))

reader_p.daemon = True

reader_p.start() ? ? ? ?# Launch reader_proc() as a separate python process

_start = time.time()

writer(count, pqueue) ? ?# Send a lot of stuff to reader()

reader_p.join() ? ? ? ? # Wait for the reader to finish

print("Sending {0} numbers to Queue() took {1} seconds".format(count,

(time.time() - _start)))

"""

multi_joinablequeue.py

"""

from multiprocessing import Process, JoinableQueue

import time

def reader_proc(queue):

## Read from the queue; this will be spawned as a separate Process

while True:

msg = queue.get() ? ? ? ? # Read from the queue and do nothing

queue.task_done()

def writer(count, queue):

for ii in xrange(0, count):

queue.put(ii) ? ? ? ? ? ? # Write 'count' numbers into the queue

if __name__=='__main__':

for count in [10**4, 10**5, 10**6]:

jqueue = JoinableQueue() # writer() writes to jqueue from _this_ process

# reader_proc() reads from jqueue as a different process...

reader_p = Process(target=reader_proc, args=((jqueue),))

reader_p.daemon = True

reader_p.start() ? ? # Launch the reader process

_start = time.time()

writer(count, jqueue) # Send a lot of stuff to reader_proc() (in different process)

jqueue.join() ? ? ? ? # Wait for the reader to finish

print("Sending {0} numbers to JoinableQueue() took {1} seconds".format(count,

(time.time() - _start)))

@Jonathan"總而言之,Pipe()比Queue()快三倍"

但是Pipe()不能安全地與多個(gè)生產(chǎn)者/消費(fèi)者一起使用。

優(yōu)秀的!好的答案,很高興您提供了基準(zhǔn)!我只有兩個(gè)小問題:(1)"快幾個(gè)數(shù)量級(jí)"有點(diǎn)夸大其詞。差異為x3,約為一個(gè)數(shù)量級(jí)的三分之一。只是說。 ;-); (2)比較公平的比較是正在運(yùn)行的N個(gè)工作程序,每個(gè)工作人員都通過點(diǎn)對(duì)點(diǎn)管道與主線程進(jìn)行通信,而運(yùn)行中的N個(gè)工作程序的性能都是從單個(gè)點(diǎn)對(duì)多點(diǎn)隊(duì)列中提取的。

對(duì)您的"獎(jiǎng)金材料" ...是的。如果您是Process的子類,請(qǐng)將大部分run方法放在try塊中。這也是記錄異常的有用方法。復(fù)制普通異常輸出:sys.stderr.write(.join(traceback.format_exception(*(sys.exc_info()))))

通過管道將錯(cuò)誤消息發(fā)送到另一個(gè)進(jìn)程并在另一個(gè)進(jìn)程中處理錯(cuò)誤會(huì)更好嗎?

@ alexpinho98-但是您將需要一些帶外數(shù)據(jù)以及相關(guān)的信令模式,以指示您發(fā)送的不是常規(guī)數(shù)據(jù)而是錯(cuò)誤數(shù)據(jù)。鑒于發(fā)起過程已經(jīng)處于不可預(yù)測(cè)的狀態(tài),這可能要問的太多了。

@邁克,只是想說你很棒。這個(gè)答案對(duì)我很有幫助。

@JJC要對(duì)自己的測(cè)驗(yàn)進(jìn)行測(cè)驗(yàn),3x大約是一個(gè)數(shù)量級(jí),而不是三分之一-sqrt(10)=?3。

在multi-pipe.py中,如何知道在調(diào)用inp_p.close之前將所有項(xiàng)放入管道。

@ideoutrea,同意顯式比隱式好

總結(jié)

以上是生活随笔為你收集整理的python 管道队列_关于python:Multiprocessing-管道与队列的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。