Python的多處理程序包中的隊列和管道之間的根本區別是什么?
在什么情況下應該選擇一種? 什么時候使用Pipe()有優勢? 什么時候使用Queue()有優勢?
Pipe()只能有兩個端點。
Queue()可以有多個生產者和消費者。
何時使用它們
如果需要兩個以上的點進行通信,請使用Queue()。
如果您需要絕對性能,則Pipe()會更快,因為Queue()是建立在Pipe()之上的。
績效基準
假設您要生成兩個進程并在它們之間盡快發送消息。這些是使用Pipe()和Queue()進行的類似測試之間的拖動競賽的計時結果。這是在運行Ubuntu 11.10和Python 2.7.2的ThinkpadT61上進行的。
僅供參考,我將JoinableQueue()的結果作為獎勵; JoinableQueue()在調用queue.task_done()時負責任務(它甚至不知道特定任務,它只計算隊列中未完成的任務),因此queue.join()知道工作已完成。
此答案底部的每個代碼...
mpenning@mpenning-T61:~$ python multi_pipe.py
Sending 10000 numbers to Pipe() took 0.0369849205017 seconds
Sending 100000 numbers to Pipe() took 0.328398942947 seconds
Sending 1000000 numbers to Pipe() took 3.17266988754 seconds
mpenning@mpenning-T61:~$ python multi_queue.py
Sending 10000 numbers to Queue() took 0.105256080627 seconds
Sending 100000 numbers to Queue() took 0.980564117432 seconds
Sending 1000000 numbers to Queue() took 10.1611330509 seconds
mpnening@mpenning-T61:~$ python multi_joinablequeue.py
Sending 10000 numbers to JoinableQueue() took 0.172781944275 seconds
Sending 100000 numbers to JoinableQueue() took 1.5714070797 seconds
Sending 1000000 numbers to JoinableQueue() took 15.8527247906 seconds
mpenning@mpenning-T61:~$
總結Pipe()大約是Queue()的三倍。除非您確實必須擁有這些好處,否則甚至不要考慮JoinableQueue()。
獎勵材料2
除非您知道一些捷徑,否則多處理會在信息流中引入微妙的變化,使調試變得困難。例如,在許多情況下,當您通過字典建立索引時,您的腳本可能運行良好,但是某些輸入很少會失敗。
通常,當整個python進程崩潰時,我們會獲得有關失敗的線索;但是,如果多處理功能崩潰,則不會在控制臺上打印未經請求的崩潰回溯。很難找到未知的多處理崩潰,而又不知道導致進程崩潰的線索。
我發現跟蹤多處理崩潰信息的最簡單方法是將整個多處理功能包裝在try / except中并使用traceback.print_exc():
import traceback
def reader(args):
try:
# Insert stuff to be multiprocessed here
return args[0]['that']
except:
print"FATAL: reader({0}) exited while multiprocessing".format(args)
traceback.print_exc()
現在,當您發現崩潰時,您會看到類似以下內容的信息:
FATAL: reader([{'crash', 'this'}]) exited while multiprocessing
Traceback (most recent call last):
File"foo.py", line 19, in __init__
self.run(task_q, result_q)
File"foo.py", line 46, in run
raise ValueError
ValueError
源代碼:
"""
multi_pipe.py
"""
from multiprocessing import Process, Pipe
import time
def reader_proc(pipe):
## Read from the pipe; this will be spawned as a separate Process
p_output, p_input = pipe
p_input.close() ? ?# We are only reading
while True:
msg = p_output.recv() ? ?# Read from the output pipe and do nothing
if msg=='DONE':
break
def writer(count, p_input):
for ii in xrange(0, count):
p_input.send(ii) ? ? ? ? ? ? # Write 'count' numbers into the input pipe
p_input.send('DONE')
if __name__=='__main__':
for count in [10**4, 10**5, 10**6]:
# Pipes are unidirectional with two endpoints: ?p_input ------> p_output
p_output, p_input = Pipe() ?# writer() writes to p_input from _this_ process
reader_p = Process(target=reader_proc, args=((p_output, p_input),))
reader_p.daemon = True
reader_p.start() ? ? # Launch the reader process
p_output.close() ? ? ? # We no longer need this part of the Pipe()
_start = time.time()
writer(count, p_input) # Send a lot of stuff to reader_proc()
p_input.close()
reader_p.join()
print("Sending {0} numbers to Pipe() took {1} seconds".format(count,
(time.time() - _start)))
"""
multi_queue.py
"""
from multiprocessing import Process, Queue
import time
import sys
def reader_proc(queue):
## Read from the queue; this will be spawned as a separate Process
while True:
msg = queue.get() ? ? ? ? # Read from the queue and do nothing
if (msg == 'DONE'):
break
def writer(count, queue):
## Write to the queue
for ii in range(0, count):
queue.put(ii) ? ? ? ? ? ? # Write 'count' numbers into the queue
queue.put('DONE')
if __name__=='__main__':
pqueue = Queue() # writer() writes to pqueue from _this_ process
for count in [10**4, 10**5, 10**6]:
### reader_proc() reads from pqueue as a separate process
reader_p = Process(target=reader_proc, args=((pqueue),))
reader_p.daemon = True
reader_p.start() ? ? ? ?# Launch reader_proc() as a separate python process
_start = time.time()
writer(count, pqueue) ? ?# Send a lot of stuff to reader()
reader_p.join() ? ? ? ? # Wait for the reader to finish
print("Sending {0} numbers to Queue() took {1} seconds".format(count,
(time.time() - _start)))
"""
multi_joinablequeue.py
"""
from multiprocessing import Process, JoinableQueue
import time
def reader_proc(queue):
## Read from the queue; this will be spawned as a separate Process
while True:
msg = queue.get() ? ? ? ? # Read from the queue and do nothing
queue.task_done()
def writer(count, queue):
for ii in xrange(0, count):
queue.put(ii) ? ? ? ? ? ? # Write 'count' numbers into the queue
if __name__=='__main__':
for count in [10**4, 10**5, 10**6]:
jqueue = JoinableQueue() # writer() writes to jqueue from _this_ process
# reader_proc() reads from jqueue as a different process...
reader_p = Process(target=reader_proc, args=((jqueue),))
reader_p.daemon = True
reader_p.start() ? ? # Launch the reader process
_start = time.time()
writer(count, jqueue) # Send a lot of stuff to reader_proc() (in different process)
jqueue.join() ? ? ? ? # Wait for the reader to finish
print("Sending {0} numbers to JoinableQueue() took {1} seconds".format(count,
(time.time() - _start)))
@Jonathan"總而言之,Pipe()比Queue()快三倍"
但是Pipe()不能安全地與多個生產者/消費者一起使用。
優秀的!好的答案,很高興您提供了基準!我只有兩個小問題:(1)"快幾個數量級"有點夸大其詞。差異為x3,約為一個數量級的三分之一。只是說。 ;-); (2)比較公平的比較是正在運行的N個工作程序,每個工作人員都通過點對點管道與主線程進行通信,而運行中的N個工作程序的性能都是從單個點對多點隊列中提取的。
對您的"獎金材料" ...是的。如果您是Process的子類,請將大部分run方法放在try塊中。這也是記錄異常的有用方法。復制普通異常輸出:sys.stderr.write(.join(traceback.format_exception(*(sys.exc_info()))))
通過管道將錯誤消息發送到另一個進程并在另一個進程中處理錯誤會更好嗎?
@ alexpinho98-但是您將需要一些帶外數據以及相關的信令模式,以指示您發送的不是常規數據而是錯誤數據。鑒于發起過程已經處于不可預測的狀態,這可能要問的太多了。
@邁克,只是想說你很棒。這個答案對我很有幫助。
@JJC要對自己的測驗進行測驗,3x大約是一個數量級,而不是三分之一-sqrt(10)=?3。
在multi-pipe.py中,如何知道在調用inp_p.close之前將所有項放入管道。
@ideoutrea,同意顯式比隱式好