目錄
- 1. 生產者消費者模型的相關概念
- 1.1 什么是生產者消費者模型
- 1.2 生產者消費者模型的優勢作用
- 2. 多線程簡單實現生產者消費者模型
- 2.1 設計方案
- 2.2 代碼實現
- 2.2.1 線程類
- 2.2.2 BlockQueue類
- 2.2.3 任務類
- 2.2.4 主干代碼
1. 生產者消費者模型的相關概念
1.1 什么是生產者消費者模型
??生產者消費者模型是一種經典的并發編程的設計模式。其由三部分組成分別為生產者、消費者與共享資源緩沖區。
- 生產者:
生產任務、數據
的線程或進程 - 消費者:
處理任務、數據
的線程或進程 - 共享資源緩沖區: 任務與數據的暫存區,生產者向其中存儲任務與數據,消費者從中獲取任務與數據。
簡單來說,共享資源緩沖區,是一段臨時保存數據的內存空間,一般使用某種數據結構對象充當(阻塞隊列)
??生產者消費者模型中,其充當生產、消費角色的線程/進程,它們之間需要滿足特定的關系,具體如下:
角色 | 關系 |
---|---|
生產者 vs 生產者 | 互斥 || 同步(互斥,可能同步) |
消費者 vs 消費者 | 互斥 || 同步(互斥,可能同步) |
生產者 vs 消費者 | 互斥 && 同步 |
1.2 生產者消費者模型的優勢作用
??生產者消費者模型是為了協調生產者與消費者之間協作。具體設計為,生產者生產的數據不再直接交給消費者,而是直接存入共享資源緩沖區。而消費者也不再從生產者手中獲取數據,則是轉為從共享資源緩沖區中獲取存入的歷史數據。通過這樣的設計方式,讓生產與消費的操作解耦合,提高更好的并發度,并且支持生產者、消費者之間的忙先不均。
??生產者消費者模型高效與并發度好的原因為,支持生產任務與處理任務或數據的并發。當生產者競爭鎖或是生產任務、數據時,消費者可執行自己的任務,或是直接從緩沖區中獲取存儲的歷史任務、數據。消費者執行任務不影響生產者獲取、生產任務。
2. 多線程簡單實現生產者消費者模型
2.1 設計方案
1. 生產者消費者模型的實體選擇:
- 生產者:創建一批線程向共享資源緩沖區中生產任務
- 消費者:創建一批線程從共享資源緩沖區獲取任務并處理
- 共享資源緩沖區:此處使用自定義的阻塞隊列(BlockQueue)實現,保證生產者、消費者訪問其時互斥且同步
2.阻塞隊列(BlockQueue)的實現:
成員變量 | 作用 |
---|---|
queue<T> | 用于存儲任務、數據的隊列 |
int _cap | 隊列的容量大小 |
pthread_mutex_t _mutex | 訪問阻塞隊列時控制互斥的鎖 |
pthread_cond_t _productor_cond | 控制生產者同步的條件變量 |
pthread_cond_t _consumer_cond | 控制消費者同步的條件變量 |
int _productor_wait_num | 在條件變量處阻塞等待的生產者線程數量 |
int _consumer_wait_num | 在條件變量處阻塞等待的消費者線程數量 |
成員函數 | 作用 |
---|---|
void Equeue(T& data) | 將生產者生產的數據入隊列 |
void Pop(T* data) | 從隊列中獲取歷史的數據,采用輸出型參數的方式 |
bool IsFull() | 檢測隊列是否滿了 |
bool IsEmpty() | 檢測隊列是否為空 |
- 互斥: 阻塞隊列的入隊與出隊操作都必須是互斥的,即保證無論何時,無論是生產者還是消費者線程都只能有一個線程在訪問阻塞隊列。
- 同步: 除此之外,還要保證生產者與消費者之間的同步,即隊列中數據存儲已慢,則阻塞生產者,隊列中沒有數據,則阻塞消費者,確保生產與消費的整個過程可以正常進行。
- 條件變量的優化: 當沒有生產者或消費者在條件變量下阻塞等待時,就可以選擇不需要再去將對應的條件變量喚醒。
3. 程序的主干邏輯與函數
2.2 代碼實現
2.2.1 線程類
#ifndef THREAD_MODULE
#define THREAD_MODULE
#include <pthread.h>
#include <iostream>
using namespace std;
#include <functional>namespace ThreadModule
{template<typename T>using func_t = function<void(T&)>;template<typename T>class Thread{public:Thread(func_t<T> func, T& data, string name = "none-thread"):_func(func), _data(data), _name(name), _stop(true){}~Thread(){}void Execute(){_func(_data);}static void* threadroutine(void* arg){Thread<T>* ptd = static_cast<Thread<T>*>(arg);ptd->Execute();return nullptr;}bool start(){int n = pthread_create(&_tid, nullptr, threadroutine, this);if(n){return false; }_stop = false;return true;}void join(){if(!_stop){pthread_join(_tid, nullptr);}}void detach(){if(!_stop){pthread_detach(_tid);}}string name(){return _name;}void stop(){_stop = true;}private:pthread_t _tid;string _name;func_t<T> _func;T& _data;bool _stop;};}#endif
2.2.2 BlockQueue類
#ifndef BLOCK_QUEUE_HPP
#define BLOCK_QUEUE_HPP#include <queue>
#include <pthread.h>template<typename T>
class BlockQueue
{
public:BlockQueue(int cap){_cap = cap;pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_consumer_cond, nullptr);pthread_cond_init(&_productor_cond, nullptr);_consumer_wait_num = 0;_productor_wait_num = 0;}bool IsFull(){return _q.size() == _cap;}bool IsEmpty(){return _q.empty();}void Enqueue(T& data){pthread_mutex_lock(&_mutex);while(IsFull()){_productor_wait_num++;pthread_cond_wait(&_productor_cond, &_mutex);_productor_wait_num--;}_q.push(data);if(_consumer_wait_num > 0)//當有正在等待的消費者時pthread_cond_signal(&_consumer_cond);//生產了繼續消費pthread_mutex_unlock(&_mutex);}void Pop(T* data){pthread_mutex_lock(&_mutex);while(IsEmpty()){_consumer_wait_num++;pthread_cond_wait(&_consumer_cond, &_mutex);_consumer_wait_num--;}*data = _q.front();_q.pop();if(_productor_wait_num > 0)//當有正在阻塞等待的生產者時pthread_cond_signal(&_productor_cond);//消費了繼續生產pthread_mutex_unlock(&_mutex);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_consumer_cond);pthread_cond_destroy(&_productor_cond);}private:std::queue<T> _q; //隊列存儲數據int _cap; //阻塞隊列的容量pthread_mutex_t _mutex;pthread_cond_t _consumer_cond;pthread_cond_t _productor_cond;int _consumer_wait_num;int _productor_wait_num;
};#endif
??條件變量的阻塞判斷條件應設為為while
,不能設置為if
,這是因為pthread_cond_wait
可能會出錯返回導致繼續執行后續代碼。可此時喚醒條件并未滿足,條件變量并沒有被真的喚醒,此種情況被稱為偽喚醒。if
條件判斷語句并不能預防此種偽喚醒的錯誤情況,所以,一般條件變量的阻塞判斷條件會被設置為while
檢測,保證代碼的健壯性。
2.2.3 任務類
#ifndef TASK_HPP
#define TASK_HPP
#include <functional>
#include <iostream>
using namespace std;//使用仿函數
class Task
{
public://無參構造,用于消費者創建接收數據Task(){}Task(int x, int y):_x(x), _y(y){}~Task(){}void toDebugQuestion(){cout << _x << " + " << _y << " =?" << endl;}void toDebugAnswer(){cout << _x << " + " << _y << " = " << _x + _y << endl;}private:int _x;int _y;int _sum;
};#endif
2.2.4 主干代碼
#include "Thread.hpp"
#include "BlockQueue.hpp"
#include <vector>
using namespace ThreadModule;
#include <unistd.h>
#include <cstdlib>
#include <ctime>
#include "Task.hpp"using blockqueue_t = BlockQueue<Task>;void ProductorRun(blockqueue_t& bq)
{while(true){sleep(1);//生產慢Task t(rand() % 10, rand() % 10);bq.Enqueue(t);t.toDebugQuestion();}
}void ConsumerRun(blockqueue_t& bq)
{Task t;while(true){//sleep(1);//消費慢bq.Pop(&t);t.toDebugAnswer();}
}void StartComm(vector<Thread<blockqueue_t> >& threads, blockqueue_t& bq, func_t<blockqueue_t> func, int num, string who)
{for(int i = 0; i < num; i++){string name = who + '-' + to_string(i + 1);threads.emplace_back(func, bq, name);threads.back().start();cout << name << " create success..." << endl;}
}void StartProductor(vector<Thread<blockqueue_t> >& threads, blockqueue_t& bq, int num)
{StartComm(threads, bq, ProductorRun, num, "Productor");
}void StartConsumer(vector<Thread<blockqueue_t> >& threads, blockqueue_t& bq, int num)
{StartComm(threads, bq, ConsumerRun, num, "Consumer");
}void WaitAllThreads(vector<Thread<blockqueue_t> >& threads)
{for(auto& thread : threads){thread.join();}
}int main()
{srand((size_t)time(nullptr));vector<Thread<blockqueue_t> > threads;blockqueue_t bq(5);StartProductor(threads, bq, 3);StartConsumer(threads, bq, 5);WaitAllThreads(threads);return 0;
}