文章目錄
- 生產者消費者模型
- 為何要使用生產者消費者模型
- 生產者消費者模型優點
- 基于BlockingQueue的生產者消費者模型
- BlockingQueue
- C++ queue模擬阻塞隊列的生產消費模型
- 單線程生產消費模型
- 多線程生產消費模型
生產者消費者模型
consumer
/productor
321原則(便于記憶)
為何要使用生產者消費者模型
生產者消費者模式就是通過一個容器來解決生產者和消費者的強耦合問題。
生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,
所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,
消費者不找生產者要數據,而是直接從阻塞隊列里取,
阻塞隊列就相當于一個緩沖區,平衡了生產者和消費者的處理能力。
這個阻塞隊列就是用來給生產者和消費者解耦的。
生產者消費者模型優點
- 解耦
- 支持并發
- 支持忙閑不均
生產消費模型的高效問題
基于BlockingQueue的生產者消費者模型
BlockingQueue
在多線程編程中阻塞隊列(Blocking Queue
)是一種常用于實現生產者和消費者模型的數據結構。
其與普通的隊列區別在于,
當隊列為空時,從隊列獲取元素的操作將會被阻塞,直到隊列中被放入了元素;
當隊列滿時,往隊列里存放元素的操作也會被阻塞,直到有元素被從隊列中取出
(以上的操作都是基于不同的線程來說的,線程在對阻塞隊列進程操作時會被阻塞)
C++ queue模擬阻塞隊列的生產消費模型
代碼:
單線程生產消費模型
blockqueue.hpp
#pragma once#include <iostream>
#include <pthread.h>
#include <queue>using namespace std;template <class T>
class blockqueue
{static const int defaultnum = 20;public:blockqueue(int maxcap = defaultnum): maxcap_(maxcap){pthread_mutex_init(&mutex_, nullptr);pthread_cond_init(&c_cond_, nullptr);pthread_cond_init(&p_cond_, nullptr);low_water=maxcap_/3;high_water=maxcap_*2/3;}~blockqueue(){pthread_mutex_destroy(&mutex_);pthread_cond_destroy(&c_cond_);pthread_cond_destroy(&p_cond_);}T pop(){pthread_mutex_lock(&mutex_);if(q_.size()==0){pthread_cond_wait(&c_cond_,&mutex_);//1.調用的時候,自動釋放鎖}T t=q_.front(); // 你想消費,就直接能消費嗎?不一定。你得先確保消費條件滿足q_.pop();if(q_.size()<low_water)pthread_cond_signal(&p_cond_);pthread_mutex_unlock(&mutex_);return t;}void push(const T &in){pthread_mutex_lock(&mutex_);if(q_.size()==maxcap_){pthread_cond_wait(&p_cond_,&mutex_);//1.調用的時候,自動釋放鎖}q_.push(in); // 你想生產,就直接能生產嗎?不一定。你得先確保生產條件滿足if(q_.size()>high_water)pthread_cond_signal(&c_cond_);pthread_mutex_unlock(&mutex_);}private:queue<T> q_;int maxcap_;// int mincap_;pthread_mutex_t mutex_;pthread_cond_t c_cond_;pthread_cond_t p_cond_;int low_water;int high_water;
};
main.cc
#include<iostream>
#include"blockqueue.hpp"
#include<unistd.h>using namespace std;void*Consumer(void*args)
{blockqueue<int> *bq=static_cast<blockqueue<int> *>(args);while(1){int data=bq->pop();cout<<"消費了一個數據: "<<data<<endl;// sleep(1);}
}void*Productor(void*args)
{blockqueue<int> *bq=static_cast<blockqueue<int> *>(args);int data=0;while(1){data++;bq->push(data);cout<<"生產了一個數據: "<<data<<endl;sleep(1);}
}int main()
{blockqueue<int> *bq=new blockqueue<int>();pthread_t c,p;pthread_create(&c,nullptr,Consumer,bq);pthread_create(&p,nullptr,Productor,bq);pthread_join(c,nullptr);pthread_join(p,nullptr);delete bq;return 0;
}
消費者sleep
兩者都不sleep
多線程生產消費模型
補充:
- 為什么判斷條件要放到加鎖之后?
因為判斷臨界資源條件是否滿足,也是在訪問臨界資源!
判斷臨界資源是否就緒,是通過在臨界區內部判斷的!
- 如果臨界資源未就緒,那么線程就要進行等待。
等待的時候,線程是持有鎖的!所以調用wait
時,自動釋放鎖。
如果不釋放鎖,直接等待,那么等待的線程就沒有線程可以喚醒了,
因為其他線程都在鎖外,進不去臨界區。
該線程因為喚醒而返回的時候,重新持有鎖了。
- 如果線程在wait時,被誤喚醒了呢?
偽喚醒的概念
解決方法:判斷條件時用while
,不用if
blockqueue.hpp
#pragma once#include <iostream>
#include <pthread.h>
#include <queue>using namespace std;template <class T>
class blockqueue
{static const int defaultnum = 20;public:blockqueue(int maxcap = defaultnum): maxcap_(maxcap){pthread_mutex_init(&mutex_, nullptr);pthread_cond_init(&c_cond_, nullptr);pthread_cond_init(&p_cond_, nullptr);// low_water=maxcap_/3;// high_water=maxcap_*2/3;}~blockqueue(){pthread_mutex_destroy(&mutex_);pthread_cond_destroy(&c_cond_);pthread_cond_destroy(&p_cond_);}T pop(){pthread_mutex_lock(&mutex_);while(q_.size()==0)//做到防止代碼被偽喚醒!{pthread_cond_wait(&c_cond_,&mutex_);//1.調用的時候,自動釋放鎖}T t=q_.front(); // 你想消費,就直接能消費嗎?不一定。你得先確保消費條件滿足q_.pop();// if(q_.size()<low_water)pthread_cond_signal(&p_cond_);pthread_cond_signal(&p_cond_);//pthread_cond_broadcastpthread_mutex_unlock(&mutex_);return t;}void push(const T &in){pthread_mutex_lock(&mutex_);while(q_.size()==maxcap_)//做到防止代碼被偽喚醒!{pthread_cond_wait(&p_cond_,&mutex_);//1.調用的時候,自動釋放鎖}q_.push(in); // 你想生產,就直接能生產嗎?不一定。你得先確保生產條件滿足// if(q_.size()>high_water)pthread_cond_signal(&c_cond_);pthread_cond_signal(&c_cond_);pthread_mutex_unlock(&mutex_);}private:queue<T> q_;int maxcap_;// int mincap_;pthread_mutex_t mutex_;pthread_cond_t c_cond_;pthread_cond_t p_cond_;// int low_water;// int high_water;
};
Task.hpp
#pragma once
#include <iostream>
#include<string>
using namespace std;enum
{Div_zero = 1,Mod_zero,Unknown
};class Task
{
public:Task(int x, int y, char op): a(x), b(y), op_(op), ret(0), exitcode(0){}void Run(){switch (op_){case '+':ret = a + b;break;case '-':ret = a - b;break;case '*':ret = a * b;break;case '/':{if (b == 0)exitcode = Div_zero;elseret = a / b;}break;case '%':{if (b == 0)exitcode = Mod_zero;elseret = a % b;}break;default:exitcode=Unknown;break;}}string GetTask(){string r=to_string(a);r+=op_;r+=to_string(b);r+="=???";return r;}string Getret(){string r=to_string(a);r+=op_;r+=to_string(b);r+="=";r+=to_string(ret);r+=" [ exitcode: ";r+=to_string(exitcode);r+=" ]";return r;}void operator()(){Run();}~Task() {}private:int a;int b;char op_;int ret;int exitcode;
};
main.cc
#include <iostream>
#include "blockqueue.hpp"
#include <unistd.h>
#include "Task.hpp"
#include <ctime>using namespace std;string oper = "+-*/%";void *Consumer(void *args)
{blockqueue<Task> *bq = static_cast<blockqueue<Task> *>(args);while (1){Task data = bq->pop();// data.Run();// 計算// data.Run();data();cout << "處理了一個任務 , 運算結果為: " << data.Getret() << " ,thread id: " << pthread_self() << endl;// sleep(1);}
}void *Productor(void *args)
{int len = oper.size();blockqueue<Task> *bq = static_cast<blockqueue<Task> *>(args);// int x=0,y=0;// Task data(x,y);while (1){// 模擬生產者生產數據int x = rand() % 10 + 1; //[1,10]int y = rand() % 10; //[0,9];char op = oper[rand() % len];Task data(x, y, op);usleep(10);// 計算bq->push(data);cout << "生產了一個任務: " << data.GetTask() << " ,thread id: " << pthread_self() << endl;sleep(1);}
}int main()
{srand(time(nullptr));blockqueue<Task> *bq = new blockqueue<Task>();pthread_t c[3], p[5];for (int i = 0; i < 3; i++){pthread_create(c + i, nullptr, Consumer, bq);}for (int i = 0; i < 5; i++){pthread_create(p + i, nullptr, Productor, bq);}for (int i = 0; i < 3; i++){pthread_join(c[i], nullptr);}for (int i = 0; i < 5; i++){pthread_join(p[i], nullptr);}delete bq;return 0;
}