文章目錄
- 一、生產者消費者模型的基本原則
- 💕💕生產者-消費者模型的 321 原則💕💕
- 二、為何要使用生產者消費者模型
- 1. 解耦
- 2. 支持并發 (提高效率)
- 3. 忙閑不均的支持
- 三、基于 BlockingQueue 的生產者消費者模型
- 1. 阻塞隊列的特點
- 2. C++ 模擬實現
- 3. 封裝更精細的版本
- 五、總結
一、生產者消費者模型的基本原則
在多線程編程中,生產者消費者模型是一種常見的并發設計模式。它通過一個**緩沖區(阻塞隊列)**來解耦生產者和消費者,從而解決兩者之間的強耦合問題。生產者只需要負責將數據放入隊列,而消費者只需要負責從隊列中取數據。
這樣設計的好處在于,生產者在完成數據生成后無需等待消費者處理,可以立即返回繼續生成;而消費者也無需關心數據來自哪里,只需要從隊列里取出任務并處理即可。這種方式既保證了系統的高效性,也增強了并發性。
可以把 321 原則用層次化結構來表達,比表格更直觀:
💕💕生產者-消費者模型的 321 原則💕💕
3 個關系
- 生產者之間互斥:多個生產者不能同時寫入隊列,否則會破壞數據一致性。
- 消費者之間互斥:多個消費者不能同時讀取同一數據,否則會出現重復消費。
- 生產者與消費者之間互斥與同步:隊列為空時消費者等待,隊列滿時生產者等待,二者既互斥又必須協同。
2 個角色
- 生產者:負責不斷產生數據并放入隊列。
- 消費者:負責從隊列中取出數據并進行處理。
1 個交易場所
阻塞隊列 / 環形隊列:作為共享緩沖區,承載生產與消費的銜接。
二、為何要使用生產者消費者模型
1. 解耦
生產者和消費者之間沒有直接依賴關系,它們通過阻塞隊列完成數據交互,降低了耦合度。
2. 支持并發 (提高效率)
生產者和消費者可以并發執行,充分利用 CPU 資源。這里提高效率并不是說生產者消費模型可以更快的派發任務,而是通過一個串行的交易場所,可以將任務派發給不同的線程,也可以由不同的線程同時生產,在生產者和消費者自己之間可以做到并發執行,因此提高了效率,畢竟將來派發任務只占有一點點的份額,執行任務才是大頭。
3. 忙閑不均的支持
如果生產者生成速度快于消費者處理速度,多余的數據會存放在阻塞隊列中;反之,消費者可以在數據不足時自動等待。這樣有效平衡了生產和消費之間的速度差異。
三、基于 BlockingQueue 的生產者消費者模型
1. 阻塞隊列的特點
在普通隊列中,如果取數據時隊列為空會直接返回錯誤,而阻塞隊列則會讓取數據的線程阻塞等待直到有數據為止;同樣地,如果存放數據時隊列已滿,線程也會被阻塞,直到有空余位置。這種機制天生適合生產者消費者模型。
2. C++ 模擬實現
BlockQueue.hpp
// 阻塞隊列的實現
#pragma once#include <iostream>
#include <string>
#include <queue>
#include <pthread.h>const int defaultcap = 5; // for testtemplate <typename T>
class BlockQueue
{
private:bool IsFull() { return _q.size() >= _cap; }bool IsEmpty() { return _q.empty(); }public:BlockQueue(int cap = defaultcap) : _cap(cap), _csleep_num(0), _psleep_num(0){pthread_mutex_init(&_mutex, NULL);pthread_cond_init(&_full_cond, NULL);pthread_cond_init(&_empty_cond, NULL);}void Equeue(const T &in){pthread_mutex_lock(&_mutex);while (IsFull()){_psleep_num++;std::cout << "生產者,進入休眠了: _psleep_num" << _psleep_num << std::endl;pthread_cond_wait(&_full_cond, &_mutex);_psleep_num--;}// 走到這里一定有空間了_q.push(in);if (_csleep_num > 0){pthread_cond_signal(&_empty_cond);std::cout << "喚醒消費者..." << std::endl;}pthread_mutex_unlock(&_mutex);}T Pop(){// 消費者調用pthread_mutex_lock(&_mutex);while (IsEmpty()){_csleep_num++;pthread_cond_wait(&_empty_cond, &_mutex);_csleep_num--;}T data = _q.front();_q.pop();if (_psleep_num > 0){pthread_cond_signal(&_full_cond);std::cout << "喚醒消費者" << std::endl;}// pthread_cond_signal(&_full_cond);pthread_mutex_unlock(&_mutex);return data;}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_full_cond);pthread_cond_destroy(&_empty_cond);}private:std::queue<T> _q;int _cap;pthread_mutex_t _mutex;pthread_cond_t _full_cond;pthread_cond_t _empty_cond;int _csleep_num; // 消費者休眠的個數int _psleep_num; // 生產者休眠的個數
};
3. 封裝更精細的版本
Cond.hpp
// 阻塞隊列的實現
#pragma once#include <iostream>
#include <string>
#include <queue>
#include "Mutex.hpp"
#include "Cond.hpp"const int defaultcap = 10; // for testusing namespace MutexModule;
using namespace CondModule;template <typename T>
class BlockQueue
{
private:bool IsFull() { return _q.size() >= _cap; }bool IsEmpty() { return _q.empty(); }public:BlockQueue(int cap = defaultcap): _cap(cap), _csleep_num(0), _psleep_num(0){}void Equeue(const T &in){{LockGuard lockguard(_mutex);// 生產者調用while (IsFull()){// 應該讓生產者線程進行等待// 重點1:pthread_cond_wait調用成功,掛起當前線程之前,要先自動釋放鎖!!// 重點2:當線程被喚醒的時候,默認就在臨界區內喚醒!要從pthread_cond_wait// 成功返回,需要當前線程,重新申請_mutex鎖!!!// 重點3:如果我被喚醒,但是申請鎖失敗了??我就會在鎖上阻塞等待!!!_psleep_num++;std::cout << "生產者,進入休眠了: _psleep_num" << _psleep_num << std::endl;// 問題1: pthread_cond_wait是函數嗎?有沒有可能失敗?pthread_cond_wait立即返回了// 問題2:pthread_cond_wait可能會因為,條件其實不滿足,pthread_cond_wait 偽喚醒_full_cond.Wait(_mutex);_psleep_num--;}// 100%確定:隊列有空間_q.push(in);// 臨時方案// v2if (_csleep_num > 0){_empty_cond.Signal();std::cout << "喚醒消費者..." << std::endl;}}}T Pop(){T data;{// 消費者調用LockGuard lockguard(_mutex);while (IsEmpty()){_csleep_num++;_empty_cond.Wait(_mutex);_csleep_num--;}data = _q.front();_q.pop();if (_psleep_num > 0){_full_cond.Signal();std::cout << "喚醒生產者" << std::endl;}}return data;}~BlockQueue(){}private:std::queue<T> _q; // 臨界資源!!!int _cap; // 容量大小Mutex _mutex;Cond _full_cond;Cond _empty_cond;int _csleep_num; // 消費者休眠的個數int _psleep_num; // 生產者休眠的個數
};
五、總結
生產者消費者模型是一種經典的多線程設計模式。它利用阻塞隊列實現生產與消費的解耦,不僅提高了系統的并發能力,還能平衡兩者之間的處理速度差異。在 C++ 中,我們可以通過 queue + pthread + 條件變量
實現一個簡易的阻塞隊列,再結合任務封裝,實現靈活的生產者消費者模型。
未來在實際工程中,如果使用 C++11 及以上版本,可以考慮用 std::mutex
和 std::condition_variable
替代 pthread
,寫法會更簡潔。