在Python中,queue
模塊提供了同步的、線程安全的隊列類,這使得在多線程環境下共享數據變得簡單。下面是一個使用 queue.Queue
的并發編程示例,其中使用了 threading
模塊來創建多個線程,這些線程將向隊列中添加元素并從隊列中取出元素。
import queue
import threading
import time
import random# 定義一個工作函數,用于模擬生產者(向隊列中添加數據)和消費者(從隊列中取出數據)
def worker(q, job_type):while True:item = random.randint(1, 100) # 模擬生成數據if job_type == 'producer':q.put(item) # 生產者向隊列中添加數據print(f'{threading.current_thread().name} 生產了 {item}')time.sleep(random.random()) # 模擬耗時操作elif job_type == 'consumer':item = q.get() # 消費者從隊列中取出數據q.task_done() # 表示隊列中之前入隊的一個任務已經完成print(f'{threading.current_thread().name} 消費了 {item}')time.sleep(random.random()) # 模擬耗時操作# 創建一個隊列
q = queue.Queue()# 創建并啟動生產者線程
for i in range(2): # 假設有兩個生產者t = threading.Thread(target=worker, args=(q, 'producer'), name=f'生產者-{i+1}')t.daemon = True # 設置為守護線程,主線程結束時守護線程也會結束t.start()# 創建并啟動消費者線程
for i in range(3): # 假設有三個消費者t = threading.Thread(target=worker, args=(q, 'consumer'), name=f'消費者-{i+1}')t.daemon = True # 設置為守護線程t.start()# 主線程等待所有任務完成(這里假設所有任務都會很快完成,實際情況中可能需要更復雜的同步機制)
q.join() # 等待隊列中的所有項目都被處理print("所有任務完成。")
注意:
-
在這個示例中,我使用了
random.randint(1, 100)
來模擬生產的數據,以及random.random()
來模擬生產者和消費者的耗時操作。 -
q.put(item)
用于生產者向隊列中添加元素,而q.get()
用于消費者從隊列中取出元素。q.task_done()
表示隊列中之前入隊的一個任務已經完成,每當消費者線程完成一個元素的消費后,就應當調用這個方法。 -
使用了
threading.Thread
來創建線程,并設置了daemon
屬性為True
,這意味著這些線程是守護線程,當主線程結束時,它們也會自動結束。 -
使用了
q.join()
方法來等待隊列中的所有元素都被處理。但是,需要注意的是,在這個示例中,由于生產者線程是無限循環的(沒有明確的退出條件),所以實際上q.join()
可能永遠不會返回,除非在外部有某種機制來停止生產者線程(例如,通過共享變量或使用threading.Event
)。在實際應用中,你可能需要設計更復雜的邏輯來確保生產者線程在適當的時候停止。 -
為了簡化示例,這里沒有包含優雅地關閉線程或隊列的代碼。在實際應用中,你可能需要實現某種形式的信號機制來通知線程何時停止工作。