Linux生產者消費者模型
- Linux生產者消費者模型詳解
- 生產者消費者模型
- 生產者消費者模型的概念
- 生產者消費者模型的特點
- 生產者消費者模型優點
- 基于BlockingQueue的生產者消費者模型
- 基于阻塞隊列的生產者消費者模型
- 模擬實現基于阻塞隊列的生產消費模型
- 基礎實現
- 生產者消費者步調調整
- 條件喚醒優化
- 基于計算任務的擴展
- 總結
Linux生產者消費者模型詳解
生產者消費者模型
生產者消費者模型的概念
生產者消費者模型通過一個容器解決生產者與消費者的強耦合問題。
- 通信方式:生產者不直接與消費者交互,而是將數據放入容器;消費者從容器取數據。
- 容器作用:緩沖區,解耦生產者與消費者,平衡雙方處理能力。
生產者消費者模型的特點
生產者消費者模型是多線程同步與互斥的經典場景,具有以下特點:
- 三種關系:
- 生產者與生產者:互斥(競爭容器訪問)。
- 消費者與消費者:互斥(競爭容器訪問)。
- 生產者與消費者:互斥(共享容器)+同步(生產消費順序)。
- 兩種角色:生產者與消費者(線程或進程)。
- 一個交易場所:內存緩沖區(如隊列)。
互斥原因:容器是臨界資源,需用互斥鎖保護,多線程競爭訪問。
同步原因:
- 容器滿時,生產者需等待,避免生產失敗。
- 容器空時,消費者需等待,避免消費失敗。
- 同步確保有序訪問,防止饑餓,提高效率。
注意:互斥保證數據正確性,同步實現線程協作。
生產者消費者模型優點
- 解耦:生產者與消費者獨立運行,通過容器間接交互。
- 支持并發:生產者生產時,消費者可同時消費。
- 支持忙閑不均:容器緩沖數據,平衡處理速度差異。
對比函數調用(緊耦合),生產者消費者模型是松耦合設計,生產者無需等待消費者處理。
基于BlockingQueue的生產者消費者模型
基于阻塞隊列的生產者消費者模型
在多線程編程中,**阻塞隊列(Blocking Queue)**是實現生產者消費者模型的常用數據結構。
- 與普通隊列的區別:
- 隊列空時,取元素操作阻塞,直到有數據。
- 隊列滿時,放元素操作阻塞,直到有空間。
- 應用場景:類似管道通信。
模擬實現基于阻塞隊列的生產消費模型
基礎實現
以單生產者、單消費者為例,使用C++ queue
實現阻塞隊列:
BlockQueue.hpp:
#pragma once
#include <iostream>
#include <pthread.h>
#include <queue>#define NUM 5template<class T>
class BlockQueue {
private:bool IsFull() { return _q.size() == _cap; }bool IsEmpty() { return _q.empty(); }
public:BlockQueue(int cap = NUM) : _cap(cap) {pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_full, nullptr);pthread_cond_init(&_empty, nullptr);}~BlockQueue() {pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_full);pthread_cond_destroy(&_empty);}void Push(const T& data) {pthread_mutex_lock(&_mutex);while (IsFull()) {pthread_cond_wait(&_full, &_mutex); // 隊列滿,等待}_q.push(data);pthread_mutex_unlock(&_mutex);pthread_cond_signal(&_empty); // 喚醒消費者}void Pop(T& data) {pthread_mutex_lock(&_mutex);while (IsEmpty()) {pthread_cond_wait(&_empty, &_mutex); // 隊列空,等待}data = _q.front();_q.pop();pthread_mutex_unlock(&_mutex);pthread_cond_signal(&_full); // 喚醒生產者}
private:std::queue<T> _q; // 阻塞隊列int _cap; // 容量上限pthread_mutex_t _mutex; // 互斥鎖pthread_cond_t _full; // 滿條件變量pthread_cond_t _empty; // 空條件變量
};
main.cpp:
#include "BlockQueue.hpp"
#include <unistd.h>void* Producer(void* arg) {BlockQueue<int>* bq = (BlockQueue<int>*)arg;while (true) {sleep(1);int data = rand() % 100 + 1;bq->Push(data);std::cout << "Producer: " << data << std::endl;}return nullptr;
}
void* Consumer(void* arg) {BlockQueue<int>* bq = (BlockQueue<int>*)arg;while (true) {sleep(1);int data;bq->Pop(data);std::cout << "Consumer: " << data << std::endl;}return nullptr;
}
int main() {srand((unsigned int)time(nullptr));pthread_t producer, consumer;BlockQueue<int>* bq = new BlockQueue<int>;pthread_create(&producer, nullptr, Producer, bq);pthread_create(&consumer, nullptr, Consumer, bq);pthread_join(producer, nullptr);pthread_join(consumer, nullptr);delete bq;return 0;
}
說明:
- 單生產者單消費者:無需維護生產者間或消費者間的互斥。
- 互斥:
_mutex
保護隊列。 - 同步:
_full
和_empty
條件變量控制生產消費順序。 - 條件判斷:用
while
防止偽喚醒。 - 運行結果:生產者與消費者步調一致,每秒交替生產消費。
生產者消費者步調調整
-
生產快,消費慢:
void* Producer(void* arg) {BlockQueue<int>* bq = (BlockQueue<int>*)arg;while (true) {int data = rand() % 100 + 1;bq->Push(data);std::cout << "Producer: " << data << std::endl;} } void* Consumer(void* arg) {BlockQueue<int>* bq = (BlockQueue<int>*)arg;while (true) {sleep(1);int data;bq->Pop(data);std::cout << "Consumer: " << data << std::endl;} }
- 生產者快速填滿隊列后等待,消費者消費一個后喚醒生產者,后續步調一致。
-
生產慢,消費快:
void* Producer(void* arg) {BlockQueue<int>* bq = (BlockQueue<int>*)arg;while (true) {sleep(1);int data = rand() % 100 + 1;bq->Push(data);std::cout << "Producer: " << data << std::endl;} } void* Consumer(void* arg) {BlockQueue<int>* bq = (BlockQueue<int>*)arg;while (true) {int data;bq->Pop(data);std::cout << "Consumer: " << data << std::endl;} }
- 消費者初始等待生產者生產,消費后繼續等待,步調隨生產者。
條件喚醒優化
調整喚醒條件,例如隊列數據量超一半時喚醒消費者,小于一半時喚醒生產者:
void Push(const T& data) {pthread_mutex_lock(&_mutex);while (IsFull()) {pthread_cond_wait(&_full, &_mutex);}_q.push(data);if (_q.size() >= _cap / 2) {pthread_cond_signal(&_empty); // 超一半喚醒消費者}pthread_mutex_unlock(&_mutex);
}
void Pop(T& data) {pthread_mutex_lock(&_mutex);while (IsEmpty()) {pthread_cond_wait(&_empty, &_mutex);}data = _q.front();_q.pop();if (_q.size() <= _cap / 2) {pthread_cond_signal(&_full); // 少于一半喚醒生產者}pthread_mutex_unlock(&_mutex);
}
- 效果:生產者快速填滿隊列后等待,消費者消費至一半以下才喚醒生產者。
基于計算任務的擴展
將隊列存儲類型改為任務類,擴展功能:
Task.hpp:
#pragma once
#include <iostream>class Task {
public:Task(int x = 0, int y = 0, char op = 0) : _x(x), _y(y), _op(op) {}void Run() {int result = 0;switch (_op) {case '+': result = _x + _y; break;case '-': result = _x - _y; break;case '*': result = _x * _y; break;case '/': if (_y == 0) { std::cout << "Warning: div zero!" << std::endl; result = -1; }else { result = _x / _y; } break;case '%': if (_y == 0) { std::cout << "Warning: mod zero!" << std::endl; result = -1; }else { result = _x % _y; } break;default: std::cout << "error operation!" << std::endl; break;}std::cout << _x << " " << _op << " " << _y << "=" << result << std::endl;}
private:int _x, _y;char _op;
};
main.cpp:
#include "BlockQueue.hpp"
#include "Task.hpp"void* Producer(void* arg) {BlockQueue<Task>* bq = (BlockQueue<Task>*)arg;const char* ops = "+-*/%";while (true) {int x = rand() % 100;int y = rand() % 100;char op = ops[rand() % 5];Task t(x, y, op);bq->Push(t);std::cout << "Producer task done" << std::endl;}return nullptr;
}
void* Consumer(void* arg) {BlockQueue<Task>* bq = (BlockQueue<Task>*)arg;while (true) {sleep(1);Task t;bq->Pop(t);t.Run();}return nullptr;
}
int main() {srand((unsigned int)time(nullptr));pthread_t producer, consumer;BlockQueue<Task>* bq = new BlockQueue<Task>;pthread_create(&producer, nullptr, Producer, bq);pthread_create(&consumer, nullptr, Consumer, bq);pthread_join(producer, nullptr);pthread_join(consumer, nullptr);delete bq;return 0;
}
- 功能:生產者生成計算任務,消費者執行計算并輸出結果。
- 擴展性:通過定義不同
Task
類實現多樣化任務處理。
總結
- 模型核心:通過容器解耦生產者與消費者,支持并發與忙閑不均。
- 實現關鍵:阻塞隊列結合互斥鎖與條件變量,確保互斥與同步。
- 靈活性:可調整步調、喚醒條件,或擴展為復雜任務處理。