線程系列:
Linux–線程的認識(一)
Linux–線程的分離、線程庫的地址關系的理解、線程的簡單封裝(二)
線程的互斥:臨界資源只能在同一時間被一個線程使用
生產消費模型
生產消費模型是多線程編程和分布式系統中的一個經典概念,它描述了生產者和消費者之間的交互方式。在這個模型中,生產者負責生成數據或任務,而消費者則負責處理這些數據或任務。這種模型在處理并發和異步操作時非常有用,尤其是在需要平衡生產速率和消費速率的情況下。
基本概念
生產者(Producer):負責生成數據或任務的實體。在多線程環境中,這通常是一個線程或一組線程。
消費者(Consumer):負責處理數據或任務的實體。同樣,這也可以是一個線程或一組線程。
緩沖區(Buffer):生產者和消費者之間的中間存儲區域,用于臨時存放生產者生成的數據,直到消費者準備好處理它們。
工作原理
- 生產者:當生產者生成數據時,它將數據放入緩沖區。如果緩沖區已滿,生產者可能需要等待或停止生產,直到緩沖區有足夠的空間。
- 消費者:消費者從緩沖區取出數據進行處理。如果緩沖區為空,消費者可能需要等待或暫停,直到有新的數據可用。
關鍵技術
- 同步機制:如信號量、互斥鎖、條件變量等,用于控制對共享資源的訪問,確保生產者和消費者不會同時訪問或修改緩沖區。
- 阻塞隊列:一種特殊的隊列,當嘗試添加或移除元素時,如果隊列已滿或為空,操作會被阻塞,直到條件滿足。
應用場景
- 并發編程:在多線程環境中,生產消費模型可以幫助有效地管理資源和任務分配。
- 分布式系統:在網絡服務中,如消息隊列系統,生產者可以是發送消息的服務,消費者則是接收并處理這些消息的服務。
生產消費模型是理解和實現高效并發和分布式系統的關鍵,通過合理設計和優化,可以顯著提高系統的性能和穩定性。
單生產-單消費
BlockQueue.hpp: 阻塞隊列
阻塞隊列是一種支持兩個附加操作的隊列。這兩個附加的操作是:當隊列為空時,獲取元素的線程會等待隊列變為非空;當隊列已滿時,存儲元素的線程會等待隊列可用。
#ifndef __BLOCK_QUEUE_HPP__
#define __BLOCK_QUEUE_HPP__#include <iostream>
#include <string>
#include <queue>
#include <pthread.h>
using namespace std;template <class T>
class BlockQueue
{
public:BlockQueue(int cap) :_cap(cap),_product_wait_num(0),_consum_wait_num(0){pthread_mutex_init(&_mutex,nullptr);pthread_cond_init(&_product_cond,nullptr);pthread_cond_init(&_consum_cond,nullptr);}void Enqueue(T& in)//生產者所用接口{pthread_mutex_lock(&_mutex);//對臨界資源開啟保護while(IsFull())//當隊列存滿后需要讓生產者停止生產,進入阻塞狀態{_product_wait_num++;pthread_cond_wait(&_product_cond,&_mutex);_product_wait_num--;}//開始生產_block_queue.push(move(in));//讓消費者來消費if(_consum_wait_num>0)pthread_cond_signal(&_consum_cond);pthread_mutex_unlock(&_mutex);}void Pop(T* out){pthread_mutex_lock(&_mutex);//對臨界資源開啟保護while(IsEmpty())//當隊列空缺后需要讓消費者停止消費,進入阻塞狀態{_consum_wait_num++;pthread_cond_wait(&_consum_cond,&_mutex);_consum_wait_num--;}//進行消費*out=_block_queue.front();_block_queue.pop();//通知生產者if(_product_wait_num>0)pthread_cond_signal(&_product_cond);pthread_mutex_unlock(&_mutex);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_product_cond);pthread_cond_destroy(&_consum_cond);}
private:bool IsFull(){return _block_queue.size() == _cap;}bool IsEmpty(){return _block_queue.empty();}queue<T> _block_queue;//阻塞隊列int _cap; //總上限pthread_mutex_t _mutex; //互斥鎖pthread_cond_t _product_cond; //生產者的條件變量pthread_cond_t _consum_cond; //消費者的條件變量int _product_wait_num;int _consum_wait_num;
};
#endif
代碼解釋:
main.cc: 主函數
#include"BlockQueue.hpp"
#include"Thread.hpp"
#include<string>
#include<vector>
#include<unistd.h>using namespace ThreadMdule;
int a=10;
//生產者
void Productor(BlockQueue<int>& bq)
{int cnt=1;while (true){bq.Enqueue(cnt);std::cout << "Productor product data is : " << cnt << " addr: " << &bq << std::endl;cnt++;//sleep(3);}
}
//消費者
void Consumer(BlockQueue<int>& bq)
{while (true){int data;bq.Pop(&data);std::cout << "Consumer Consum data is : " << data << " addr: " << &bq << std::endl;sleep(5);}
}
//執行創建線程的函數
void StartComm(std::vector<Thread<BlockQueue<int>>> *threads, int num, BlockQueue<int> &bq, func_t<BlockQueue<int>> func)
{for (int i = 0; i < num; i++){std::string name = "thread-" + std::to_string(i + 1);//將線程放入threads中,記錄信息threads->emplace_back(func, bq, name);threads->back().start();}
}
void StartProductor(vector<Thread<BlockQueue<int>>>* threads,int num,BlockQueue<int>& bq)
{StartComm(threads,num,bq,Productor);
}
void StartConsumer(vector<Thread<BlockQueue<int>>>* threads,int num,BlockQueue<int>& bq)
{StartComm(threads,num,bq,Consumer);
}void WaitAllThread(std::vector<Thread<BlockQueue<int>>> &threads)
{for (auto &thread : threads){thread.Join();}
}
int main()
{BlockQueue<int>* bq=new BlockQueue<int>(5);vector<Thread<BlockQueue<int>>> threads;//用threads來記錄線程的信息StartProductor(&threads,1,*bq);StartConsumer(&threads,1,*bq);WaitAllThread(threads);
}
細節:
多生產-單消費
這里在上面主函數代碼上更改生產者的數量即可。
直接驗證:
這里用任務類來作為阻塞隊列的任務,讓生產者產出對應任務,消費者來解決任務;生產出來的任務先放入阻塞隊列作為緩沖;
#include<iostream>
#include<string>
#include<functional>
using namespace std;class Task
{
public:Task(){}Task(int a,int b): _a(a),_b(b),_result(0){}void Excute(){_result=_a+_b;}string ResultToString(){return to_string(_a) + "+"+to_string(_b)+"="+to_string(_result);}string DebugToString(){return to_string(_a) + "+" + to_string(_b) + "= ?";}
private:int _a;int _b;int _result;
};
//類型
//生產者:
//消費者: