Linux生產者消費者模型

Linux生產者消費者模型

  • Linux生產者消費者模型詳解
    • 生產者消費者模型
      • 生產者消費者模型的概念
      • 生產者消費者模型的特點
      • 生產者消費者模型優點
    • 基于BlockingQueue的生產者消費者模型
      • 基于阻塞隊列的生產者消費者模型
      • 模擬實現基于阻塞隊列的生產消費模型
        • 基礎實現
        • 生產者消費者步調調整
        • 條件喚醒優化
        • 基于計算任務的擴展
    • 總結


Linux生產者消費者模型詳解


生產者消費者模型

生產者消費者模型的概念

生產者消費者模型通過一個容器解決生產者與消費者的強耦合問題。

  • 通信方式:生產者不直接與消費者交互,而是將數據放入容器;消費者從容器取數據。
  • 容器作用:緩沖區,解耦生產者與消費者,平衡雙方處理能力。

生產者消費者模型的特點

生產者消費者模型是多線程同步與互斥的經典場景,具有以下特點:

  1. 三種關系
    • 生產者與生產者:互斥(競爭容器訪問)。
    • 消費者與消費者:互斥(競爭容器訪問)。
    • 生產者與消費者:互斥(共享容器)+同步(生產消費順序)。
  2. 兩種角色:生產者與消費者(線程或進程)。
  3. 一個交易場所:內存緩沖區(如隊列)。

互斥原因:容器是臨界資源,需用互斥鎖保護,多線程競爭訪問。
同步原因

  • 容器滿時,生產者需等待,避免生產失敗。
  • 容器空時,消費者需等待,避免消費失敗。
  • 同步確保有序訪問,防止饑餓,提高效率。

注意:互斥保證數據正確性,同步實現線程協作。

生產者消費者模型優點

  1. 解耦:生產者與消費者獨立運行,通過容器間接交互。
  2. 支持并發:生產者生產時,消費者可同時消費。
  3. 支持忙閑不均:容器緩沖數據,平衡處理速度差異。

對比函數調用(緊耦合),生產者消費者模型是松耦合設計,生產者無需等待消費者處理。


基于BlockingQueue的生產者消費者模型

基于阻塞隊列的生產者消費者模型

在多線程編程中,**阻塞隊列(Blocking Queue)**是實現生產者消費者模型的常用數據結構。

  • 與普通隊列的區別
    • 隊列空時,取元素操作阻塞,直到有數據。
    • 隊列滿時,放元素操作阻塞,直到有空間。
  • 應用場景:類似管道通信。

模擬實現基于阻塞隊列的生產消費模型

基礎實現

以單生產者、單消費者為例,使用C++ queue 實現阻塞隊列:

BlockQueue.hpp

#pragma once
#include <iostream>
#include <pthread.h>
#include <queue>#define NUM 5template<class T>
class BlockQueue {
private:bool IsFull() { return _q.size() == _cap; }bool IsEmpty() { return _q.empty(); }
public:BlockQueue(int cap = NUM) : _cap(cap) {pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_full, nullptr);pthread_cond_init(&_empty, nullptr);}~BlockQueue() {pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_full);pthread_cond_destroy(&_empty);}void Push(const T& data) {pthread_mutex_lock(&_mutex);while (IsFull()) {pthread_cond_wait(&_full, &_mutex); // 隊列滿,等待}_q.push(data);pthread_mutex_unlock(&_mutex);pthread_cond_signal(&_empty); // 喚醒消費者}void Pop(T& data) {pthread_mutex_lock(&_mutex);while (IsEmpty()) {pthread_cond_wait(&_empty, &_mutex); // 隊列空,等待}data = _q.front();_q.pop();pthread_mutex_unlock(&_mutex);pthread_cond_signal(&_full); // 喚醒生產者}
private:std::queue<T> _q; // 阻塞隊列int _cap; // 容量上限pthread_mutex_t _mutex; // 互斥鎖pthread_cond_t _full; // 滿條件變量pthread_cond_t _empty; // 空條件變量
};

main.cpp

#include "BlockQueue.hpp"
#include <unistd.h>void* Producer(void* arg) {BlockQueue<int>* bq = (BlockQueue<int>*)arg;while (true) {sleep(1);int data = rand() % 100 + 1;bq->Push(data);std::cout << "Producer: " << data << std::endl;}return nullptr;
}
void* Consumer(void* arg) {BlockQueue<int>* bq = (BlockQueue<int>*)arg;while (true) {sleep(1);int data;bq->Pop(data);std::cout << "Consumer: " << data << std::endl;}return nullptr;
}
int main() {srand((unsigned int)time(nullptr));pthread_t producer, consumer;BlockQueue<int>* bq = new BlockQueue<int>;pthread_create(&producer, nullptr, Producer, bq);pthread_create(&consumer, nullptr, Consumer, bq);pthread_join(producer, nullptr);pthread_join(consumer, nullptr);delete bq;return 0;
}

說明

  • 單生產者單消費者:無需維護生產者間或消費者間的互斥。
  • 互斥_mutex 保護隊列。
  • 同步_full_empty 條件變量控制生產消費順序。
  • 條件判斷:用 while 防止偽喚醒。
  • 運行結果:生產者與消費者步調一致,每秒交替生產消費。
生產者消費者步調調整
  1. 生產快,消費慢

    void* Producer(void* arg) {BlockQueue<int>* bq = (BlockQueue<int>*)arg;while (true) {int data = rand() % 100 + 1;bq->Push(data);std::cout << "Producer: " << data << std::endl;}
    }
    void* Consumer(void* arg) {BlockQueue<int>* bq = (BlockQueue<int>*)arg;while (true) {sleep(1);int data;bq->Pop(data);std::cout << "Consumer: " << data << std::endl;}
    }
    
    • 生產者快速填滿隊列后等待,消費者消費一個后喚醒生產者,后續步調一致。
  2. 生產慢,消費快

    void* Producer(void* arg) {BlockQueue<int>* bq = (BlockQueue<int>*)arg;while (true) {sleep(1);int data = rand() % 100 + 1;bq->Push(data);std::cout << "Producer: " << data << std::endl;}
    }
    void* Consumer(void* arg) {BlockQueue<int>* bq = (BlockQueue<int>*)arg;while (true) {int data;bq->Pop(data);std::cout << "Consumer: " << data << std::endl;}
    }
    
    • 消費者初始等待生產者生產,消費后繼續等待,步調隨生產者。
條件喚醒優化

調整喚醒條件,例如隊列數據量超一半時喚醒消費者,小于一半時喚醒生產者:

void Push(const T& data) {pthread_mutex_lock(&_mutex);while (IsFull()) {pthread_cond_wait(&_full, &_mutex);}_q.push(data);if (_q.size() >= _cap / 2) {pthread_cond_signal(&_empty); // 超一半喚醒消費者}pthread_mutex_unlock(&_mutex);
}
void Pop(T& data) {pthread_mutex_lock(&_mutex);while (IsEmpty()) {pthread_cond_wait(&_empty, &_mutex);}data = _q.front();_q.pop();if (_q.size() <= _cap / 2) {pthread_cond_signal(&_full); // 少于一半喚醒生產者}pthread_mutex_unlock(&_mutex);
}
  • 效果:生產者快速填滿隊列后等待,消費者消費至一半以下才喚醒生產者。
基于計算任務的擴展

將隊列存儲類型改為任務類,擴展功能:

Task.hpp

#pragma once
#include <iostream>class Task {
public:Task(int x = 0, int y = 0, char op = 0) : _x(x), _y(y), _op(op) {}void Run() {int result = 0;switch (_op) {case '+': result = _x + _y; break;case '-': result = _x - _y; break;case '*': result = _x * _y; break;case '/': if (_y == 0) { std::cout << "Warning: div zero!" << std::endl; result = -1; }else { result = _x / _y; } break;case '%': if (_y == 0) { std::cout << "Warning: mod zero!" << std::endl; result = -1; }else { result = _x % _y; } break;default: std::cout << "error operation!" << std::endl; break;}std::cout << _x << " " << _op << " " << _y << "=" << result << std::endl;}
private:int _x, _y;char _op;
};

main.cpp

#include "BlockQueue.hpp"
#include "Task.hpp"void* Producer(void* arg) {BlockQueue<Task>* bq = (BlockQueue<Task>*)arg;const char* ops = "+-*/%";while (true) {int x = rand() % 100;int y = rand() % 100;char op = ops[rand() % 5];Task t(x, y, op);bq->Push(t);std::cout << "Producer task done" << std::endl;}return nullptr;
}
void* Consumer(void* arg) {BlockQueue<Task>* bq = (BlockQueue<Task>*)arg;while (true) {sleep(1);Task t;bq->Pop(t);t.Run();}return nullptr;
}
int main() {srand((unsigned int)time(nullptr));pthread_t producer, consumer;BlockQueue<Task>* bq = new BlockQueue<Task>;pthread_create(&producer, nullptr, Producer, bq);pthread_create(&consumer, nullptr, Consumer, bq);pthread_join(producer, nullptr);pthread_join(consumer, nullptr);delete bq;return 0;
}
  • 功能:生產者生成計算任務,消費者執行計算并輸出結果。
  • 擴展性:通過定義不同 Task 類實現多樣化任務處理。

總結

  • 模型核心:通過容器解耦生產者與消費者,支持并發與忙閑不均。
  • 實現關鍵:阻塞隊列結合互斥鎖與條件變量,確保互斥與同步。
  • 靈活性:可調整步調、喚醒條件,或擴展為復雜任務處理。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/898808.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/898808.shtml
英文地址,請注明出處:http://en.pswp.cn/news/898808.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【中文翻譯】第9章-The Algorithmic Foundations of Differential Privacy

由于GitHub項目僅翻譯到前5章&#xff0c;我們從第6章開始通過大語言模型翻譯&#xff0c;并導出markdown格式。 大模型難免存在錯漏&#xff0c;請讀者指正。 教材原文地址&#xff1a;https://www.cis.upenn.edu/~aaroth/Papers/privacybook.pdf 9 差分隱私與計算復雜度 到目…

【AI大模型】搭建本地大模型GPT-NeoX:詳細步驟及常見問題處理

搭建本地大模型GPT-NeoX:詳細步驟及常見問題處理 GPT-NeoX是一個開源的大型語言模型框架,由EleutherAI開發,可用于訓練和部署類似GPT-3的大型語言模型。本指南將詳細介紹如何在本地環境中搭建GPT-NeoX,并解決過程中可能遇到的常見問題。 1. 系統要求 1.1 硬件要求 1.2 軟…

Unity跨平臺構建快速回顧

知識點來源&#xff1a;人間自有韜哥在&#xff0c;豆包 目錄 一、發布應用程序1. 修改發布必備設置1.1 打開設置面板1.2 修改公司名、游戲項目名、版本號和默認圖標1.3 修改 Package Name 和 Minimum API Level 2. 發布應用程序2.1 配置 Build Settings2.2 選擇發布選項2.3 構…

低配電腦暢玩《怪物獵人:荒野》,ToDesk云電腦優化從30幀到144幀?

《怪物獵人&#xff1a;荒野&#xff08;Monster Hunter Wilds&#xff09;》自2025年正式發售以來已取得相當亮眼的成績&#xff0c;僅用三天時間便輕松突破800萬銷量&#xff0c;目前順利蟬聯周榜冠軍&#xff1b;憑借著開放世界的宏大場景和豐富的狩獵玩法&#xff0c;該游戲…

Flink基礎簡介和安裝部署

文章目錄 一、Flink基礎簡介1、什么是Flink2、Flink流處理特性3、Flink四大基石4、Flink中的角色 二、Flink集群搭建1、Local模式①上傳Flink安裝包②啟動交互窗口③提交任務測試④訪問WebUI頁面查看④退出停止集群 一、Flink基礎簡介 1、什么是Flink Flink是?個分布式&#…

【2025】基于ssm+jsp的二手商城系統設計與實現(源碼、萬字文檔、圖文修改、調試答疑)

基于SSMJSP的二手商城系統設計與實現系統功能結構圖&#xff1a; 課題背景 隨著經濟的發展和人們生活水平的提高&#xff0c;二手交易市場日益活躍。人們對于閑置物品的處理方式逐漸從傳統的廢品回收轉變為通過二手交易平臺進行再利用。這種交易模式不僅能夠幫助用戶節省開支&a…

幻影星空亮相CAAPA北京展 引領文旅產業升級轉型

3月19日&#xff0c;中國游藝機游樂園協會&#xff08;CAAPA&#xff09;主辦的2025中國&#xff08;北京&#xff09;國際游樂設施設備博覽會及2025北京國際旅游休閑娛樂產業博覽會在北京盛大啟幕。在這場行業盛會上&#xff0c;廣州卓遠旗下的“幻影星空”品牌以創新性的虛擬…

銀河麒麟桌面版包管理器(二)

以下內容摘自《銀河麒麟操作系統進階應用》一書 APT包管理器 APT是Debian及其派生系統的包管理器&#xff0c;構建在dpkg之上&#xff0c;以其強大的依賴性處理能力和豐富的軟件倉庫而聞名。APT具有自動解決依賴關系、提供易于使用的命令行工具&#xff08;如apt-get、apt-ca…

【STM32實物】基于STM32的掃地機器人/小車控制系統設計

基于STM32的掃地機器人/小車控制系統設計 演示視頻: 基于STM32的掃地機器人小車控制系統設計 簡介:掃地機器人系統采用分層結構設計,主要包括底層硬件控制層、中間數據處理層和上層用戶交互層。底層硬件控制層負責對各個硬件模塊進行控制和數據采集,中間數據處理層負責對采…

STM32收發數據包中間件——ProtoFlow,更方便的打包解包助手

引言 在嵌入式開發中&#xff0c;數據包封裝是不可或缺的一環。手動編寫協議不僅耗時&#xff0c;還容易出錯。ProtoFlow 的出現&#xff0c;就是為了讓數據包封裝變得簡單、高效、可靠。它不僅占用資源少&#xff0c;還能適配多種場景&#xff0c;是你項目的理想助手。 項目地…

Xcode16.1使用MonkeyDev運行Tiktok報錯分析

問題1&#xff1a; Build input files cannot be found: /usr/lib/libc.dylib, /usr/lib/libstdc.dylib. Did you forget to declare these files as outputs of any script phases or custom build rules which produce them? 解決辦法&#xff1a;在TARGETS的dylib中的Bui…

R語言交互項-formula

R語言交互項-formula 交互項的模型交互項的幾種情形連續變量和連續變量連續變量和分類變量分類變量和分類變量總結交互項的模型 統計中的交互和相關是完全不同的兩個概念,交互項是指兩個或者多個變量對因變量的協同效應,關注變量對因變量的聯合影響,比如變量X對Y的影響是否因…

圖解AUTOSAR_SWS_IPDUMultiplexer

AUTOSAR IPDUMultiplexer模塊詳解 PDU復用器模塊架構與實現分析 目錄 1. IPDU Multiplexer概述2. 模塊配置模型 2.1 配置結構概述2.2 配置類詳解2.3 配置關系說明3. 架構設計 3.1 模塊位置與接口3.2 內部組件結構3.3 接口交互模式4. 操作序列 4.1 PDU傳輸流程4.2 PDU傳輸流程詳…

手機怎么換網絡IP有什么用?操作指南與場景應用?

在數字化時代&#xff0c;手機已經成為我們日常生活中不可或缺的一部分&#xff0c;無論是工作、學習還是娛樂&#xff0c;手機都扮演著至關重要的角色。而在手機的使用過程中&#xff0c;網絡IP地址作為設備在互聯網上的唯一標識符&#xff0c;其重要性和作用不容忽視。本文將…

CH32V208GBU6沁恒協議棧BUG:在主機Write的同一包notify會造成主機一直Write不成功

從事嵌入式單片機的工作算是符合我個人興趣愛好的,當面對一個新的芯片我即想把芯片盡快搞懂完成項目賺錢,也想著能夠把自己遇到的坑和注意事項記錄下來,即方便自己后面查閱也可以分享給大家,這是一種沖動,但是這個或許并不是原廠希望的,盡管這樣有可能會犧牲一些時間也有哪天原…

unsloth微調QwQ32B(4bit)

unsloth微調QwQ32B(4bit) GPU: 3090 24G unsloth安裝部署 pip 安裝 pip install unsloth --index https://pypi.mirrors.usrc.edu.cn/simplesource /etc/network_turbopip install --force-reinstall --no-cache-dir --no-deps githttps://github.com/unslothai/unsloth.git?…

JavaScript案例0322

以下是一些涵蓋不同高級JavaScript概念和應用的案例&#xff0c;每個案例都有詳細解釋&#xff1a; 案例1&#xff1a;實現 Promise/A 規范的手寫 Promise class MyPromise {constructor(executor) {this.state pending;this.value undefined;this.reason undefined;this.o…

Dify 0.15.3 輸入變量無法被重新賦值問題-解決方法

目錄 一、問題描述 二、解決方法 2.1 原因 2.2 修改源碼 2.3 重新打包 dify-api 鏡像 2.4 修改 docker-compose.yaml 文件 2.5 重啟啟動鏡像 一、問題描述 Dify 0.15.3 是一個比較穩定的版本&#xff0c;Dify 1.0 是一個大版本更新&#xff0c;目前還有很多 Bug。但是&a…

SQL Server查詢計劃操作符(7.3)——查詢計劃相關操作符(11)

7.3. 查詢計劃相關操作符 98&#xff09;Table Scan&#xff1a;該操作符從查詢計劃參數列確定的表中獲取所有數據行。如果其參數列中出現WHERE:()謂詞&#xff0c;則只返回滿足該謂詞的數據行。該操作符為邏輯操作符和物理操作符。該操作符具體如圖7.3-98節點1所示。 圖 7.3-…

數據庫練習2

目錄 1.向heros表中新增一列信息&#xff0c;添加一些約束&#xff0c;并嘗試查詢一些信息 2.課堂代碼練習 插入語句 INSERT INTO 刪除語句DELETE和TRUNCATE 更新語句UPDATE和replace 查詢語句SELECT 條件查詢 select語句中的特殊情況 ???查詢排序 order by 分組查詢…