文章目錄
- 如何在一個事件驅動的生產者-消費者模型中使用觀察者進行通知與解耦?
- 1 假設場景設計
- 2 Codes
- 3 流程圖
- 4 優劣勢
- 5 風險可能
如何在一個事件驅動的生產者-消費者模型中使用觀察者進行通知與解耦?
1 假設場景設計
- Producer(生產者):生成任務并推送到隊列中。
- TaskQueue(主題/被觀察者):任務隊列,同時也是一個“可被觀察”的對象,它在收到新任務后,會主動通知觀察者(消費者)。
- Consumer(觀察者):注冊到隊列中,當有新任務時被通知,并從隊列中拉取任務。
避免了消費者主動等待(如傳統條件變量 wait),改用回調通知。
2 Codes
#include <iostream>
#include <vector>
#include <queue>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <functional>
#include <memory>
#include <atomic>// ========== Observer 接口 ==========
class Observer {
public:virtual void onNotified() = 0;virtual ~Observer() = default;
};// ========== 主題(被觀察者) ==========
class TaskQueue {
public:void addObserver(std::shared_ptr<Observer> obs) {std::lock_guard<std::mutex> lock(observerMutex_);observers_.push_back(obs);}void pushTask(int task) {{std::lock_guard<std::mutex> lock(queueMutex_);queue_.push(task);}notifyObservers();}bool popTask(int& task) {std::lock_guard<std::mutex> lock(queueMutex_);if (queue_.empty()) return false;task = queue_.front();queue_.pop();return true;}bool hasTask() {std::lock_guard<std::mutex> lock(queueMutex_);return !queue_.empty();}private:void notifyObservers() {std::lock_guard<std::mutex> lock(observerMutex_);for (auto& obs : observers_) {if (obs) obs->onNotified(); // 回調通知}}private:std::queue<int> queue_;std::mutex queueMutex_;std::vector<std::shared_ptr<Observer>> observers_;std::mutex observerMutex_;
};// ========== 消費者(觀察者) ==========
class Consumer : public Observer, public std::enable_shared_from_this<Consumer> {
public:Consumer(std::shared_ptr<TaskQueue> queue, int id): queue_(queue), id_(id), stopFlag_(false) {}void start() {thread_ = std::thread([self = shared_from_this()] {self->run();});}void stop() {stopFlag_ = true;cv_.notify_all(); // 所有線程都喚醒}void onNotified() override {cv_.notify_one(); // 喚醒 run 中等待的線程}private:void run() {while (true) {std::unique_lock<std::mutex> lock(cvMutex_);cv_.wait(lock, [this]() {return stopFlag_ || queue_->hasTask(); });if (stopFlag_ && !queue_->hasTask()) break; int task;while (queue_->popTask(task)) {std::cout << "[Consumer " << id_ << "] Consumed task: " << task << std::endl;}}}private:std::shared_ptr<TaskQueue> queue_;int id_;std::thread thread_;std::atomic<bool> stopFlag_;std::condition_variable cv_;std::mutex cvMutex_;
};// ========== 生產者 ==========
void producer(std::shared_ptr<TaskQueue> queue) {for (int i = 0; i < 10; ++i) {std::this_thread::sleep_for(std::chrono::milliseconds(200));std::cout << "[Producer] Produced task: " << i << std::endl;queue->pushTask(i);}
}int main() {auto queue = std::make_shared<TaskQueue>();// 啟動兩個消費者auto consumer1 = std::make_shared<Consumer>(queue, 1);auto consumer2 = std::make_shared<Consumer>(queue, 2);queue->addObserver(consumer1);queue->addObserver(consumer2);consumer1->start();consumer2->start();// 啟動生產者線程std::thread prodThread(producer, queue);prodThread.join();std::this_thread::sleep_for(std::chrono::seconds(1));consumer1->stop();consumer2->stop();return 0;
}
輸出
[Producer] Produced task: 0
[Consumer 2] Consumed task: 0
[Producer] Produced task: 1
[Consumer 2] Consumed task: 1
[Producer] Produced task: 2
[Consumer 2] Consumed task: 2
[Producer] Produced task: 3
[Consumer 2] Consumed task: 3
[Producer] Produced task: 4
[Consumer 2] Consumed task: 4
[Producer] Produced task: 5
[Consumer 2] Consumed task: 5
[Producer] Produced task: 6
[Consumer 2] Consumed task: 6
[Producer] Produced task: 7
[Consumer 2] Consumed task: 7
[Producer] Produced task: 8
[Consumer 2] Consumed task: 8
[Producer] Produced task: 9
[Consumer 2] Consumed task: 9
關鍵代碼解讀:
Consumer
類中的 onNotified()
和 run()
方法是如何配合實現消費者監聽通知的 ==》背后即“觀察者 + 條件變量”的事件驅動機制
TaskQueue::notifyObservers() 調用Consumer::onNotified,喚醒等待的 Consumer::run() 線程。
二者配合流程詳解
-
run()
是消費者線程主循環(由start()
啟動)- 每個
Consumer
啟動后會在一個獨立線程中運行run()
方法; - 它使用
cv_.wait(lock)
進入 阻塞等待狀態,直到被通知(由notify_one()
喚醒); - 喚醒后嘗試從
TaskQueue
中popTask()
,直到隊列為空; - 然后再次進入等待。
- 每個
-
onNotified()
是“被觀察者”的回調通知函數TaskQueue::notifyObservers()
被調用(例如pushTask()
中調用)時,會遍歷注冊的觀察者;- 每個觀察者(即
Consumer
)都會被調用onNotified()
; onNotified()
會調用cv_.notify_one()
,喚醒run()
中正在等待的線程。
3 流程圖
[Producer]↓ pushTask()
[TaskQueue]↓ notifyObservers()
[Consumer]↓ onNotified()→ cv_.notify_one()↓
[run() loop]→ cv_.wait() 被喚醒↓→ popTask()↓→ 處理任務
4 優劣勢
編碼可能遇到的問題 | 原因/應對 |
---|---|
cv.wait() 可能虛假喚醒 | 可用 cv.wait(lock, condition) 代替裸 wait() ,避免無任務時誤喚醒。 |
多個消費者搶任務 | 多個消費者被喚醒時要競爭 queue_ 鎖,可通過加任務標簽或調度器來分配。 |
重復喚醒開銷大 | 若任務頻繁到達,建議合并通知、或按“任務計數”通知。 |
優點 | 描述 |
---|---|
解耦 | 消費者不需要主動輪詢,事件驅動機制帶來良好模塊化。 |
可擴展 | 支持多個消費者動態注冊,符合微服務或事件分發模型。 |
降低等待 | 利用通知機制喚醒消費者,避免空輪詢帶來的 CPU 消耗。 |
靈活性 | 可輕松拓展為異步觀察者隊列、支持任務優先級、過濾等機制。 |
5 風險可能
- 若消費者數量多,且頻繁 wakeup,可能存在“驚群效應”。
- 可以通過線程綁定或負載均衡策略來優化通知粒度。
- 可擴展為事件過濾、類型區分(如不同類型的消費者響應不同事件)。