在這個示例中,我們創建了一個隊列 q
,并通過 multiprocessing.Manager().Queue()
來確保隊列可以在多個進程之間共享。我們定義了 consumer
和 producer
函數,分別用于從隊列中獲取數據和向隊列中放入數據。
在主進程中,我們創建了多個消費者和生產者進程,并將它們啟動。生產者進程將數據放入隊列,消費者進程從隊列中取出數據并處理。生產者進程完成后,我們向隊列發送 None
作為結束信號,告知消費者沒有更多數據。每個消費者在接收到 None
后會停止工作。
注意,我們在 consumer
函數中使用了 queue.task_done()
來標記任務完成。這是可選的,但在使用 join()
方法等待隊列中的所有任務完成時很有用。
這個模式允許多個生產者并發地向隊列中放入數據,同時多個消費者并發地從隊列中取出并處理數據,直到所有生產者完成生產,消費者接收到結束信號。
當使用 multiprocessing.Queue
進行多個生產者和多個消費者的場景時,隊列可以很好地協調這些進程。以下是一個示例,展示了如何創建多個生產者和多個消費者,它們共享同一個隊列:
# encoding:utf-8
import multiprocessing
import time
import randomdef consumer(queue):"""作者:闕輝"""while True:item = queue.get() # 從隊列中獲取數據if item is None:print(f"Consumer {multiprocessing.current_process().name} received end signal.")queue.task_done() # 標記任務完成breakprint(f"Consumer {multiprocessing.current_process().name} received {item}")time.sleep(random.uniform(0.5, 1.5)) # 模擬處理時間queue.task_done() # 標記任務完成def producer(queue, items):"""作者:闕輝"""for item in items:print(f"Producer {multiprocessing.current_process().name} sent {item}")queue.put(item)time.sleep(random.uniform(0.5, 1.5)) # 模擬生產時間if __name__ == '__main__':manager = multiprocessing.Manager()q = manager.Queue() # 使用 Manager.Queue 來支持多個生產者和消費者模式# 創建多個消費者進程consumers = [multiprocessing.Process(target=consumer, args=(q,)) for _ in range(4)]# 創建多個生產者進程producers = [multiprocessing.Process(target=producer, args=(q, range(20))) for _ in range(4)]# 啟動所有消費者進程for c in consumers:c.start()# 啟動所有生產者進程for p in producers:p.start()# 等待所有生產者完成for p in producers:p.join()# 發送結束信號,告知所有消費者沒有更多數據for _ in consumers:q.put(None)# 等待所有消費者完成for c in consumers:c.join()print("All tasks completed.")