【C/C++】如何在一個事件驅動的生產者-消費者模型中使用觀察者進行通知與解耦

文章目錄

  • 如何在一個事件驅動的生產者-消費者模型中使用觀察者進行通知與解耦?
    • 1 假設場景設計
    • 2 Codes
    • 3 流程圖
    • 4 優劣勢
    • 5 風險可能

如何在一個事件驅動的生產者-消費者模型中使用觀察者進行通知與解耦?


1 假設場景設計

  • Producer(生產者):生成任務并推送到隊列中。
  • TaskQueue(主題/被觀察者):任務隊列,同時也是一個“可被觀察”的對象,它在收到新任務后,會主動通知觀察者(消費者)
  • Consumer(觀察者):注冊到隊列中,當有新任務時被通知,并從隊列中拉取任務。

避免了消費者主動等待(如傳統條件變量 wait),改用回調通知


2 Codes

#include <iostream>
#include <vector>
#include <queue>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <functional>
#include <memory>
#include <atomic>// ========== Observer 接口 ==========
class Observer {
public:virtual void onNotified() = 0;virtual ~Observer() = default;
};// ========== 主題(被觀察者) ==========
class TaskQueue {
public:void addObserver(std::shared_ptr<Observer> obs) {std::lock_guard<std::mutex> lock(observerMutex_);observers_.push_back(obs);}void pushTask(int task) {{std::lock_guard<std::mutex> lock(queueMutex_);queue_.push(task);}notifyObservers();}bool popTask(int& task) {std::lock_guard<std::mutex> lock(queueMutex_);if (queue_.empty()) return false;task = queue_.front();queue_.pop();return true;}bool hasTask() {std::lock_guard<std::mutex> lock(queueMutex_);return !queue_.empty();}private:void notifyObservers() {std::lock_guard<std::mutex> lock(observerMutex_);for (auto& obs : observers_) {if (obs) obs->onNotified();  // 回調通知}}private:std::queue<int> queue_;std::mutex queueMutex_;std::vector<std::shared_ptr<Observer>> observers_;std::mutex observerMutex_;
};// ========== 消費者(觀察者) ==========
class Consumer : public Observer, public std::enable_shared_from_this<Consumer> {
public:Consumer(std::shared_ptr<TaskQueue> queue, int id): queue_(queue), id_(id), stopFlag_(false) {}void start() {thread_ = std::thread([self = shared_from_this()] {self->run();});}void stop() {stopFlag_ = true;cv_.notify_all();  // 所有線程都喚醒}void onNotified() override {cv_.notify_one();  // 喚醒 run 中等待的線程}private:void run() {while (true) {std::unique_lock<std::mutex> lock(cvMutex_);cv_.wait(lock, [this]() {return stopFlag_ || queue_->hasTask(); });if (stopFlag_ && !queue_->hasTask()) break; int task;while (queue_->popTask(task)) {std::cout << "[Consumer " << id_ << "] Consumed task: " << task << std::endl;}}}private:std::shared_ptr<TaskQueue> queue_;int id_;std::thread thread_;std::atomic<bool> stopFlag_;std::condition_variable cv_;std::mutex cvMutex_;
};// ========== 生產者 ==========
void producer(std::shared_ptr<TaskQueue> queue) {for (int i = 0; i < 10; ++i) {std::this_thread::sleep_for(std::chrono::milliseconds(200));std::cout << "[Producer] Produced task: " << i << std::endl;queue->pushTask(i);}
}int main() {auto queue = std::make_shared<TaskQueue>();// 啟動兩個消費者auto consumer1 = std::make_shared<Consumer>(queue, 1);auto consumer2 = std::make_shared<Consumer>(queue, 2);queue->addObserver(consumer1);queue->addObserver(consumer2);consumer1->start();consumer2->start();// 啟動生產者線程std::thread prodThread(producer, queue);prodThread.join();std::this_thread::sleep_for(std::chrono::seconds(1));consumer1->stop();consumer2->stop();return 0;
}

輸出

[Producer] Produced task: 0
[Consumer 2] Consumed task: 0
[Producer] Produced task: 1
[Consumer 2] Consumed task: 1
[Producer] Produced task: 2
[Consumer 2] Consumed task: 2
[Producer] Produced task: 3
[Consumer 2] Consumed task: 3
[Producer] Produced task: 4
[Consumer 2] Consumed task: 4
[Producer] Produced task: 5
[Consumer 2] Consumed task: 5
[Producer] Produced task: 6
[Consumer 2] Consumed task: 6
[Producer] Produced task: 7
[Consumer 2] Consumed task: 7
[Producer] Produced task: 8
[Consumer 2] Consumed task: 8
[Producer] Produced task: 9
[Consumer 2] Consumed task: 9

關鍵代碼解讀:
Consumer 類中的 onNotified()run() 方法是如何配合實現消費者監聽通知的 ==》背后即“觀察者 + 條件變量”的事件驅動機制
TaskQueue::notifyObservers() 調用Consumer::onNotified,喚醒等待的 Consumer::run() 線程。

二者配合流程詳解

  1. run() 是消費者線程主循環(由 start() 啟動)

    • 每個 Consumer 啟動后會在一個獨立線程中運行 run() 方法;
    • 它使用 cv_.wait(lock) 進入 阻塞等待狀態,直到被通知(由 notify_one() 喚醒);
    • 喚醒后嘗試從 TaskQueuepopTask(),直到隊列為空;
    • 然后再次進入等待。
  2. onNotified() 是“被觀察者”的回調通知函數

    • TaskQueue::notifyObservers() 被調用(例如 pushTask() 中調用)時,會遍歷注冊的觀察者;
    • 每個觀察者(即 Consumer)都會被調用 onNotified()
    • onNotified() 會調用 cv_.notify_one(),喚醒 run() 中正在等待的線程。

3 流程圖

[Producer]↓ pushTask()
[TaskQueue]↓ notifyObservers()
[Consumer]↓ onNotified()→ cv_.notify_one()↓
[run() loop]→ cv_.wait() 被喚醒↓→ popTask()↓→ 處理任務

4 優劣勢

編碼可能遇到的問題原因/應對
cv.wait() 可能虛假喚醒可用 cv.wait(lock, condition) 代替裸 wait(),避免無任務時誤喚醒。
多個消費者搶任務多個消費者被喚醒時要競爭 queue_ 鎖,可通過加任務標簽或調度器來分配。
重復喚醒開銷大若任務頻繁到達,建議合并通知、或按“任務計數”通知。
優點描述
解耦消費者不需要主動輪詢,事件驅動機制帶來良好模塊化。
可擴展支持多個消費者動態注冊,符合微服務或事件分發模型。
降低等待利用通知機制喚醒消費者,避免空輪詢帶來的 CPU 消耗。
靈活性可輕松拓展為異步觀察者隊列、支持任務優先級、過濾等機制。

5 風險可能

  • 若消費者數量多,且頻繁 wakeup,可能存在“驚群效應”。
  • 可以通過線程綁定負載均衡策略來優化通知粒度。
  • 可擴展為事件過濾、類型區分(如不同類型的消費者響應不同事件)。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/82467.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/82467.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/82467.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

MVC和MVVM架構的區別

MVC和MVVM都是前端開發中常用的設計模式&#xff0c;都是為了解決前端開發中的復雜性而設計的&#xff0c;而MVVM模式則是一種基于MVC模式的新模式。 MVC(Model-View-Controller)的三個核心部分&#xff1a;模型、視圖、控制器相較于MVVM(Model-View-ViewModel)的三個核心部分…

蘭亭妙微 | 圖標設計公司 | UI設計案例復盤

在「33」「312」新高考模式下&#xff0c;選科決策成為高中生和家長的「頭等大事」。蘭亭妙微公司受委托優化高考選科決策平臺個人診斷報告界面&#xff0c;核心挑戰是&#xff1a;如何將復雜的測評數據&#xff08;如學習能力傾向、學科報考機會、職業興趣等&#xff09;轉化為…

有銅半孔的設計規范與材料創新

設計關鍵參數 孔徑與間距限制 最小孔徑需≥0.6mm&#xff0c;孔邊距≥0.5mm&#xff0c;避免銅層脫落&#xff1b;拼版時半孔區域需預留2mm間距防止撕裂。 阻焊橋設計 必須保留阻焊橋&#xff08;寬度≥0.1mm&#xff09;&#xff0c;防止焊錫流入孔內造成短路。 獵板的材料…

Engineering a direct k-way Hypergraph Partitioning Algorithm【2017 ALENEX】

文章目錄 一、作者二、摘要三、相關工作四、算法概述五、實驗結果六、主要貢獻 一、作者 Yaroslav Akhremtsev, Tobias Heuer, Peter Sanders, Sebastian Schlag 二、摘要 我們開發了一種快速且高質量的多層算法&#xff0c;能夠直接將超圖劃分為 k 個平衡的塊 —— 無需借助遞…

視頻問答功能播放器(視頻問答)視頻彈題功能實例

視頻問答播放器是一種互動教學工具&#xff0c;在視頻播放過程中彈出題目卡&#xff0c;學員答題后才能繼續觀看&#xff0c;提升學習參與度。視頻問答功能播放器(視頻問答)視頻彈題功能實例&#xff1a; 視頻播放器的視頻問答功能&#xff08;也叫問答播放器、視頻彈題、視頻問…

2025年AI代理演進全景:從技術成熟度曲線到產業重構

2025年AI代理演進全景&#xff1a;從技術成熟度曲線到產業重構 一、技術成熟度曲線定位&#xff1a;AI代理的“期望膨脹期” 根據Gartner技術成熟度曲線&#xff08;Hype Cycle?&#xff09;&#xff0c;AI代理&#xff08;Agentic AI&#xff09;當前正處于期望膨脹期向泡沫…

基于python的機器學習(八)—— 評估算法(一)

目錄 一、機器學習評估的基本概念 1.1 評估的定義與目標 1.2 常見評估指標 1.3 訓練集、驗證集與測試集的劃分 二、分離數據集 2.1 分離訓練數據集和評估數據集 2.2 k折交叉驗證分離 2.3 棄一交叉驗證分離 2.4 重復隨機評估和訓練數據集分離 三、交叉驗證技術 3.…

Win11 系統登入時綁定微軟郵箱導致用戶名欠缺

Win11 系統登入時綁定微軟郵箱導致用戶名欠缺 解決思路 -> 解綁當前微軟郵箱和用戶名 -> 斷網離線建立本地賬戶 -> 設置本地賬戶為Admin權限 -> 注銷當前賬戶&#xff0c;登入新建的用戶 -> 聯網綁定微軟郵箱 -> 刪除舊的用戶命令步驟 管理員權限打開…

Mac系統-最方便的一鍵環境部署軟件ServBay(支持php,java,python,node,go,mysql等)沒有之一,已親自使用!

自從換成Mac電腦以后&#xff0c;做開發有時候要部署各種環境&#xff0c;如php&#xff0c;mysql&#xff0c;nginx&#xff0c;pgsql&#xff0c;java&#xff0c;node&#xff0c;python&#xff0c;go時&#xff0c;嘗試過原生環境部署&#xff0c;各種第三方軟件部署&…

Flink中Kafka連接器的基本應用

文章目錄 前言Kafka連接器基礎案例演示前置說明和環境準備步驟Kafka連接器基本配置關聯數據源映射轉換案例效果演示基于Kafka連接器同步數據到MySQL案例說明前置準備Kafka連接器消費位點調整映射轉換與數據投遞MysqlSlink持久化收集器數據最終效果演示小結參考前言 本文將基于…

Leetcode 刷題記錄 11 —— 二叉樹第二彈

本系列為筆者的 Leetcode 刷題記錄&#xff0c;順序為 Hot 100 題官方順序&#xff0c;根據標簽命名&#xff0c;記錄筆者總結的做題思路&#xff0c;附部分代碼解釋和疑問解答&#xff0c;01~07為C語言&#xff0c;08及以后為Java語言。 01 二叉樹的層序遍歷 /*** Definition…

【R語言科研繪圖】

R語言在繪制SCI期刊圖像時具有顯著優勢&#xff0c;以下從功能、靈活性和學術適配性三個方面分析其適用性&#xff1a; 數據可視化庫豐富 R語言擁有ggplot2、lattice、ggpubr等專業繪圖包&#xff0c;支持生成符合SCI期刊要求的高分辨率圖像&#xff08;如TIFF/PDF格式&#…

【Node.js】Web開發框架

個人主頁&#xff1a;Guiat 歸屬專欄&#xff1a;node.js 文章目錄 1. Node.js Web框架概述1.1 Web框架的作用1.2 Node.js主要Web框架生態1.3 框架選擇考慮因素 2. Express.js2.1 Express.js概述2.2 基本用法2.2.1 安裝Express2.2.2 創建基本服務器 2.3 路由2.4 中間件2.5 請求…

PDF 轉 JPG 圖片小工具:CodeBuddy 助力解決轉換痛點

本文所使用的 CodeBuddy 免費下載鏈接&#xff1a;騰訊云代碼助手 CodeBuddy - AI 時代的智能編程伙伴 前言 在數字化辦公與內容創作的浪潮中&#xff0c;將 PDF 文件轉換為 JPG 圖片格式的需求日益頻繁。無論是學術文獻中的圖表提取&#xff0c;還是宣傳資料的視覺化呈現&am…

Linux 文件系統層次結構

Linux 的文件系統遵循 Filesystem Hierarchy Standard (FHS) 標準&#xff0c;其目錄結構是層次化的&#xff0c;每個目錄都有明確的用途。以下是 Linux 中部分目錄的作用解析&#xff1a; 1. 根目錄 / 作用&#xff1a;根目錄是整個文件系統的頂層目錄&#xff0c;所有其他目…

密碼學標準(Cryptography Standards)介紹

密碼學標準(Cryptography Standards)是為確保信息安全傳輸、存儲和處理而制定的一系列技術規范和協議,廣泛應用于通信、金融、互聯網等領域。以下從分類、主流標準、應用場景和發展趨勢四個方面進行詳細介紹: 一、密碼學標準的分類 密碼學標準可根據技術原理和應用場景分…

ubuntu 22.04安裝和使用docker介紹

docker安裝和使用 準備環境常見的docker操作linux系統常用的配置卸載docker 準備環境 本機環境&#xff1a; Linux yz-MS-7E06 6.8.0-59-generic #61~22.04.1-Ubuntu SMP PREEMPT_DYNAMIC Tue Apr 15 17:03:15 UTC 2 x86_64 x86_64 x86_64 GNU/Linux安裝依賴軟件&#xff1a;…

obsidian 中的查找和替換插件,支持正則

最近用著 obsidian 時&#xff0c;發現想要在當前文檔中 查找和替換 內容時&#xff0c;沒有自動查找和替換的功能&#xff0c;去插件市場查找也沒有發現好用的插件&#xff0c;那就自己寫一個吧。 全程用的 AI 來寫的&#xff0c;當然&#xff0c;我對 JS/CSS/TypeScript 等沒…

針對vue項目的webpack優化攻略

一、開發階段優化 1. 熱更新加速&#xff08;HMR&#xff09; // vue.config.js module.exports {devServer: {hot: true, // 開啟熱更新injectClient: true, // 自動注入HMR客戶端watchOptions: {ignored: /node_modules/, // 忽略node_modules變化aggregateTimeout: 300…

BTC官網關注巨鯨12億美元平倉,XBIT去中心化交易平臺表現穩定

在全球加密貨幣市場波動加劇的背景下&#xff0c;2025年5月25日傳出重磅消息。據今日最新國際報道&#xff0c;知名巨鯨James Wynn完全平倉價值12億美元的BTC多頭倉位&#xff0c;整體盈利約845萬美元&#xff0c;此舉引發市場廣泛關注。與此同時&#xff0c;收益型穩定幣市場迎…