Linux :線程 【生產者消費者模型與信號量】
- (一)生產消費模型
- 1、生產消費模式概念
- 2、生產者消費者之間的關系
- 3、生產者消費者模型優點
- (二)基于BlockingQueue的生產者消費者模型
- 1、基于阻塞隊列模型
- 2、模擬實現基于阻塞隊列的生產消費模型
(一)生產消費模型
1、生產消費模式概念
- 生產者消費者模式就是通過一個容器來解決生產者和消費者的強耦合問題。
- 生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當于一個緩沖區,平衡了生產者和消費者的處理能力。
- 這個阻塞隊列就是用來給生產者和消費者解耦的。
2、生產者消費者之間的關系
生產者消費者模型是多線程同步與互斥的一個經典場景,最根本特點是 321原則:
- 三種關系: 生產者和生產者(互斥關系)、消費者和消費者(互斥關系)、生產者和消費者(互斥關系、同步關系)。
- 兩種角色: 生產者和消費者。(通常由進程或線程承擔)
- 一個交易場所: 通常指的是內存中的一段緩沖區(阻塞隊列、環形隊列)。
為什么三種關系中都具有互斥關系??
因為容器(交易場所)會有多個執行流進行訪問,而我們要保持臨界資源的安全,需要加上互斥關系。
生產者和消費者之間為什么會存在同步關系?
如果讓生產者一直生產,那么當生產者生產的數據將容器塞滿后,生產者再生產數據就會生產失敗。
反之,讓消費者一直消費,那么當容器當中的數據被消費完后,消費者再進行消費就會消費失敗。
互斥關系保證的是數據的正確性,而同步關系是為了讓多線程之間協同起來。
3、生產者消費者模型優點
- 解耦
生產者、消費者、交易場所 各司其職,可以根據具體需求自由設計,很好地做到了 解耦,便于維護和擴展 - 支持并發。
- 支持忙閑不均。
生產者在生產時,無需關注消費者的狀態,只需關注交易場所中是否有空閑位置
消費者在消費時,無需關注生產者的狀態,只需關注交易場所中是否有就緒數據
(二)基于BlockingQueue的生產者消費者模型
1、基于阻塞隊列模型
在多線程編程中阻塞隊列(Blocking Queue)是一種常用于實現生產者和消費者模型的數據結構。
其與普通的隊列區別在于:
- 當隊列為空時,從隊列獲取元素的操作將會被阻塞,直到隊列中被放入了元素
- 當隊列滿時,往隊列里存放元素的操作也會被阻塞,直到有元素被從隊列中取出
(以上的操作都是基于不同的線程來說的,線程在對阻塞隊列進程操作時會被阻塞)
2、模擬實現基于阻塞隊列的生產消費模型
阻塞隊列實現代碼:
#include <iostream>
#include <queue>
#include <unistd.h>
#include <pthread.h>
#define NUM 5using namespace std;template <class T>
class BlockQueue
{
private:queue<T> _q; // 阻塞隊列pthread_mutex_t _mutex; // 互斥鎖pthread_cond_t _consumers; // 消費者條件變量pthread_cond_t _producer; // 生產者條件變量int _capacity; // 阻塞隊列容量
public://初始化鎖和條件變量BlockQueue(int capacity = NUM): _capacity(capacity){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_consumers, nullptr);pthread_cond_init(&_producer, nullptr);}//銷毀鎖和條件變量~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_consumers);pthread_cond_destroy(&_producer);}//阻塞隊列里插入數據void push(const T &in){pthread_mutex_lock(&_mutex);//當阻塞隊列為空時不能再插入變量,需要做到同步,讓線程進入到生產者的條件變量中去while (_q.size() == _capacity) //這里一定要是while ,因為喚醒可能造成偽喚醒(如果同時喚醒條件變量中的所有線程,那么可能造成偽喚醒){ pthread_cond_wait(&_producer, &_mutex);}_q.push(in);pthread_cond_signal(&_consumers); //push數據后,阻塞隊列一定有數據存在,所以喚醒消費者的條件變量。pthread_mutex_unlock(&_mutex);}//從阻塞隊列里拿出數據T pop(){pthread_mutex_lock(&_mutex);//當阻塞隊列為空時,執行該代碼的線程放入到消費者條件變量中while (_q.size() == 0){pthread_cond_wait(&_consumers, &_mutex);}T data = _q.front();_q.pop();pthread_cond_signal(&_producer); //pop數據后,阻塞隊列一定是滿的,喚醒生產者的條件變量pthread_mutex_unlock(&_mutex);return data;}
};
使用 互斥鎖 + 條件變量 實現互斥與同步
生產者和消費者模型中傳遞的數據一般都是要經過處理的,例如:
- 生產者:
生成數據 + 傳遞數據 - 消費者:
獲取數據 + 處理數據
為了方便理解我們簡單實現一個任務類,Task.hpp代碼如下:
#pragma once
#include <iostream>
#include <string>std::string opers = "+-*/%";enum // 規定錯誤碼
{DivZero = 1, // 當 除數為0時ModZero, // 當 %的時候 ,xx % 0時Unknown // 出現錯誤的符號時
};class Task
{
private:int data1_; //int data2_;char oper_;int exitcode_; // 錯誤碼,判斷結果是否合理,默認為0int result_; // 結果public:Task(int x, int y, char op) : data1_(x), data2_(y), oper_(op), result_(0), exitcode_(0){}~Task(){}void run(){switch (oper_){case '+':result_ = data1_ + data2_;break;case '-':result_ = data1_ - data2_;break;case '*':result_ = data1_ * data2_;break;case '/':{if (data2_ == 0)exitcode_ = DivZero;elseresult_ = data1_ / data2_;}break;case '%':{if (data2_ == 0)exitcode_ = ModZero;elseresult_ = data1_ % data2_;}break;default:exitcode_ = Unknown;break;}}//兩數運算 所指向的任務std::string GetTask(){std::string r = std::to_string(data1_);r += oper_;r += std::to_string(data2_);r += "= ";return r;}//兩數運算的結果std::string GetResult(){std::string r = std::to_string(result_);r += "[code: ";r += std::to_string(exitcode_);r += "]";return r;}
};
該類主要是實現兩數之間的運算,使用該類時需要我們提供具體的 兩個數字 和 運算符。
接下來準備工作已經完畢,我們看看如何運用阻塞隊列,為了方便觀察數據我這里是單生產者單消費者的模型,main.cc代碼如下:
#pragma Once
#include "BlockQueue.hpp"
#include <vector>
#include <string>
#include <ctime>
#include "Task.hpp"// std::string opers = "+-*/%";
void *Consumer(void *args)
{BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);while (true){// 接收任務Task data = bq->pop();usleep(10000);// 處理任務data.run();cout << endl;cout << "處理了任務 " << data.GetTask() << data.GetResult();}return nullptr;
}void *Producer(void *args)
{BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);while (true){// 產生任務char op = opers[rand() % opers.size()];int data1 = rand() % 20 + 1;int data2 = rand() % 10;usleep(10);Task t(data1, data2, op);// 發送任務bq->push(t);cout << endl;cout << "產生了一個任務=> " << t.GetTask() << "??";sleep(1);}return nullptr;
}int main()
{srand(time(nullptr));BlockQueue<Task> *bq = new BlockQueue<Task>(5); //阻塞隊列//創造單生產者 單消費者pthread_t c, p;pthread_create(&c, nullptr, Producer, bq);pthread_create(&p, nullptr, Consumer, bq);pthread_join(p, nullptr);pthread_join(c, nullptr);delete bq;return 0;
}
運行效果如下:
生產消費模型在代碼層的實際 就是線程的同步與互斥 。