Linux學習:生產者消費者模型

目錄

  • 1. 生產者消費者模型的相關概念
    • 1.1 什么是生產者消費者模型
    • 1.2 生產者消費者模型的優勢作用
  • 2. 多線程簡單實現生產者消費者模型
    • 2.1 設計方案
    • 2.2 代碼實現
      • 2.2.1 線程類
      • 2.2.2 BlockQueue類
      • 2.2.3 任務類
      • 2.2.4 主干代碼

1. 生產者消費者模型的相關概念

1.1 什么是生產者消費者模型

在這里插入圖片描述
??生產者消費者模型是一種經典的并發編程的設計模式。其由三部分組成分別為生產者消費者共享資源緩沖區

  • 生產者: 生產任務、數據的線程或進程
  • 消費者: 處理任務、數據的線程或進程
  • 共享資源緩沖區: 任務與數據的暫存區,生產者向其中存儲任務與數據,消費者從中獲取任務與數據。簡單來說,共享資源緩沖區,是一段臨時保存數據的內存空間,一般使用某種數據結構對象充當(阻塞隊列)

??生產者消費者模型中,其充當生產、消費角色的線程/進程,它們之間需要滿足特定的關系,具體如下:

角色關系
生產者 vs 生產者互斥 || 同步(互斥,可能同步)
消費者 vs 消費者互斥 || 同步(互斥,可能同步)
生產者 vs 消費者互斥 && 同步

1.2 生產者消費者模型的優勢作用

??生產者消費者模型是為了協調生產者與消費者之間協作。具體設計為,生產者生產的數據不再直接交給消費者,而是直接存入共享資源緩沖區。而消費者也不再從生產者手中獲取數據,則是轉為從共享資源緩沖區中獲取存入的歷史數據。通過這樣的設計方式,讓生產與消費的操作解耦合,提高更好的并發度,并且支持生產者、消費者之間的忙先不均
??生產者消費者模型高效與并發度好的原因為,支持生產任務與處理任務或數據的并發。當生產者競爭鎖或是生產任務、數據時,消費者可執行自己的任務,或是直接從緩沖區中獲取存儲的歷史任務、數據。消費者執行任務不影響生產者獲取、生產任務。

2. 多線程簡單實現生產者消費者模型

2.1 設計方案

1. 生產者消費者模型的實體選擇:
在這里插入圖片描述

  • 生產者:創建一批線程向共享資源緩沖區中生產任務
  • 消費者:創建一批線程從共享資源緩沖區獲取任務并處理
  • 共享資源緩沖區:此處使用自定義的阻塞隊列(BlockQueue)實現,保證生產者、消費者訪問其時互斥且同步

2.阻塞隊列(BlockQueue)的實現:

成員變量作用
queue<T>用于存儲任務、數據的隊列
int _cap隊列的容量大小
pthread_mutex_t _mutex訪問阻塞隊列時控制互斥的鎖
pthread_cond_t _productor_cond控制生產者同步的條件變量
pthread_cond_t _consumer_cond控制消費者同步的條件變量
int _productor_wait_num在條件變量處阻塞等待的生產者線程數量
int _consumer_wait_num在條件變量處阻塞等待的消費者線程數量
成員函數作用
void Equeue(T& data)將生產者生產的數據入隊列
void Pop(T* data)從隊列中獲取歷史的數據,采用輸出型參數的方式
bool IsFull()檢測隊列是否滿了
bool IsEmpty()檢測隊列是否為空
  • 互斥: 阻塞隊列的入隊與出隊操作都必須是互斥的,即保證無論何時,無論是生產者還是消費者線程都只能有一個線程在訪問阻塞隊列。
  • 同步: 除此之外,還要保證生產者與消費者之間的同步,即隊列中數據存儲已慢,則阻塞生產者,隊列中沒有數據,則阻塞消費者,確保生產與消費的整個過程可以正常進行。
  • 條件變量的優化: 當沒有生產者或消費者在條件變量下阻塞等待時,就可以選擇不需要再去將對應的條件變量喚醒。

3. 程序的主干邏輯與函數

在這里插入圖片描述

2.2 代碼實現

2.2.1 線程類

#ifndef THREAD_MODULE
#define THREAD_MODULE
#include <pthread.h>
#include <iostream>
using namespace std;
#include <functional>namespace ThreadModule
{template<typename T>using func_t = function<void(T&)>;template<typename T>class Thread{public:Thread(func_t<T> func, T& data, string name = "none-thread"):_func(func), _data(data), _name(name), _stop(true){}~Thread(){}void Execute(){_func(_data);}static void* threadroutine(void* arg){Thread<T>* ptd = static_cast<Thread<T>*>(arg);ptd->Execute();return nullptr;}bool start(){int n = pthread_create(&_tid, nullptr, threadroutine, this);if(n){return false; }_stop = false;return true;}void join(){if(!_stop){pthread_join(_tid, nullptr);}}void detach(){if(!_stop){pthread_detach(_tid);}}string name(){return _name;}void stop(){_stop = true;}private:pthread_t _tid;string _name;func_t<T> _func;T& _data;bool _stop;};}#endif

2.2.2 BlockQueue類

#ifndef BLOCK_QUEUE_HPP
#define BLOCK_QUEUE_HPP#include <queue>
#include <pthread.h>template<typename T>
class BlockQueue
{
public:BlockQueue(int cap){_cap = cap;pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_consumer_cond, nullptr);pthread_cond_init(&_productor_cond, nullptr);_consumer_wait_num = 0;_productor_wait_num = 0;}bool IsFull(){return _q.size() == _cap;}bool IsEmpty(){return _q.empty();}void Enqueue(T& data){pthread_mutex_lock(&_mutex);while(IsFull()){_productor_wait_num++;pthread_cond_wait(&_productor_cond, &_mutex);_productor_wait_num--;}_q.push(data);if(_consumer_wait_num > 0)//當有正在等待的消費者時pthread_cond_signal(&_consumer_cond);//生產了繼續消費pthread_mutex_unlock(&_mutex);}void Pop(T* data){pthread_mutex_lock(&_mutex);while(IsEmpty()){_consumer_wait_num++;pthread_cond_wait(&_consumer_cond, &_mutex);_consumer_wait_num--;}*data = _q.front();_q.pop();if(_productor_wait_num > 0)//當有正在阻塞等待的生產者時pthread_cond_signal(&_productor_cond);//消費了繼續生產pthread_mutex_unlock(&_mutex);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_consumer_cond);pthread_cond_destroy(&_productor_cond);}private:std::queue<T> _q;    //隊列存儲數據int _cap;            //阻塞隊列的容量pthread_mutex_t _mutex;pthread_cond_t _consumer_cond;pthread_cond_t _productor_cond;int _consumer_wait_num;int _productor_wait_num;
};#endif

??條件變量的阻塞判斷條件應設為為while,不能設置為if,這是因為pthread_cond_wait可能會出錯返回導致繼續執行后續代碼。可此時喚醒條件并未滿足,條件變量并沒有被真的喚醒,此種情況被稱為偽喚醒if條件判斷語句并不能預防此種偽喚醒的錯誤情況,所以,一般條件變量的阻塞判斷條件會被設置為while檢測,保證代碼的健壯性

2.2.3 任務類

#ifndef TASK_HPP
#define TASK_HPP
#include <functional>
#include <iostream>
using namespace std;//使用仿函數
class Task
{
public://無參構造,用于消費者創建接收數據Task(){}Task(int x, int y):_x(x), _y(y){}~Task(){}void toDebugQuestion(){cout << _x << " + " << _y << " =?" << endl;}void toDebugAnswer(){cout << _x << " + " << _y << " = " << _x + _y << endl;}private:int _x;int _y;int _sum;
};#endif

2.2.4 主干代碼

#include "Thread.hpp"
#include "BlockQueue.hpp"
#include <vector>
using namespace ThreadModule;
#include <unistd.h>
#include <cstdlib>
#include <ctime>
#include "Task.hpp"using blockqueue_t = BlockQueue<Task>;void ProductorRun(blockqueue_t& bq)
{while(true){sleep(1);//生產慢Task t(rand() % 10, rand() % 10);bq.Enqueue(t);t.toDebugQuestion();}
}void ConsumerRun(blockqueue_t& bq)
{Task t;while(true){//sleep(1);//消費慢bq.Pop(&t);t.toDebugAnswer();}
}void StartComm(vector<Thread<blockqueue_t> >& threads, blockqueue_t& bq, func_t<blockqueue_t> func, int num, string who)
{for(int i = 0; i < num; i++){string name = who + '-' + to_string(i + 1);threads.emplace_back(func, bq, name);threads.back().start();cout << name << " create success..." << endl;}
}void StartProductor(vector<Thread<blockqueue_t> >& threads, blockqueue_t& bq, int num)
{StartComm(threads, bq, ProductorRun, num, "Productor");
}void StartConsumer(vector<Thread<blockqueue_t> >& threads, blockqueue_t& bq, int num)
{StartComm(threads, bq, ConsumerRun, num, "Consumer");
}void WaitAllThreads(vector<Thread<blockqueue_t> >& threads)
{for(auto& thread : threads){thread.join();}
}int main()
{srand((size_t)time(nullptr));vector<Thread<blockqueue_t> > threads;blockqueue_t bq(5);StartProductor(threads, bq, 3);StartConsumer(threads, bq, 5);WaitAllThreads(threads);return 0;
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/97860.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/97860.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/97860.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

《深度學習》卷積神經網絡:數據增強與保存最優模型解析及實現

目錄 一、數據增強 1. 核心概念 2. 核心目的 3. 常用方法 4. 實現示例&#xff08;基于 PyTorch&#xff09; 5. 自定義數據集加載 二、保存最優模型 1. 核心概念 2. 實現步驟 &#xff08;1&#xff09;定義 CNN 模型 &#xff08;2&#xff09;定義訓練與測試函數…

tcpdump用法

tcpdump用法tcpdump一、什么是tcpdump二、命令格式與參數三、參數列表四、過濾規則組合邏輯運算符過濾器關鍵字理解 Flag 標識符五、常用例子tcpdump 一、什么是tcpdump 二、命令格式與參數 option 可選參數&#xff1a;將在后邊一一解釋。 proto 類過濾器&#xff1a;根據協…

平衡車 - 電機調速

&#x1f308;個人主頁&#xff1a;羽晨同學 &#x1f4ab;個人格言:“成為自己未來的主人~” 在我們的這篇文章當中&#xff0c;我們主要想要實現的功能的是電機調速功能。在我們的這篇文章中&#xff0c;主要實現的是開環的功能&#xff0c;而非閉環&#xff0c;也就是不加…

從利潤率看價值:哪些公司值得長期持有?

&#x1f4a1; 為什么盯緊利潤率&#xff1f; 投資者常常盯著營收增長&#xff0c;卻忽略了一個更關鍵的指標——利潤率。 收入可以靠規模“堆”出來&#xff0c;但利潤率卻是企業護城河的真實體現。心理學研究表明&#xff1a;當一個產品或服務被消費者認定為“不可替代”&a…

小迪web自用筆記25

傳統文件上傳&#xff1a;上傳至服務器本身硬盤。云存儲&#xff1a;借助云存儲oss對象存儲&#xff08;只能被訪問&#xff0c;不可解析&#xff09;Oss云存儲Access key與Access ID&#xff1a;有了這兩個東西之后就可以操作云存儲&#xff0c;可以向里面發數據了。這玩意兒泄…

分發餅干——很好的解釋模板

好的&#xff0c;孩子&#xff0c;我們來玩一個“喂餅干”的游戲。 0. 問題的本質是什么&#xff1f; 想象一下&#xff0c;你就是個超棒的家長&#xff0c;手里有幾塊大小不一的餅干&#xff0c;而面前有幾個餓著肚子的小朋友。每個小朋友都有一個最小的“胃口”值&#xff0c…

場景題:如果一個大型項目,某一個時間所有的CPU的已經被占用了,導致服務不可用,我們開發人員應該如何使服務器盡快恢復正常

問&#xff1a;如果一個大型項目,某一個時間所有的CPU的 已經被占用了&#xff0c;導致服務不可用&#xff0c;我們開發人員 應該如何使服務器盡快恢復正常答&#xff1a;應對CPU 100%導致服務不可用的緊急恢復流程面試官&#xff0c;如果遇到這種情況&#xff0c;我會立即按照…

Docker 安裝 RAGFlow保姆教程

前提條件 Ubuntu 服務器(20.04 或 22.04 LTS 推薦) 已安裝 Docker 和 Docker Compose 如果尚未安裝,請先運行以下命令:# 安裝 Docker curl -fsSL https://get.docker.com -o get-docker.sh sudo sh get-docker.sh # 將當前用戶加入 docker 組,避免每次都要 sudo sudo user…

為什么實際工程里 C++ 部署深度學習模型更常見?為什么大家更愛用 TensorRT?

很多人剛接觸深度學習模型部署的時候&#xff0c;都會習慣用 Python&#xff0c;因為訓練的時候就是 PyTorch、TensorFlow 啊&#xff0c;寫起來方便。但一到 實際工程&#xff0c;特別是工業設備、醫療影像、上位機系統這種場景&#xff0c;你會發現大多數人都轉向了 C 部署。…

深入理解 Java 集合框架:底層原理與實戰應用

在日常開發中&#xff0c;集合是 Java 中使用頻率最高的工具之一。從最常見的 ArrayList、HashMap 到更復雜的并發集合&#xff0c;幾乎每一個 Java 程序員都離不開集合框架。集合框架不僅提供了豐富的數據結構實現&#xff0c;還封裝了底層復雜的邏輯&#xff0c;讓開發者能夠…

爬取m3u8視頻完整教程

爬取步驟&#xff1a;1.先找到網頁源代碼2.從網頁源代碼中拿到m3u83.下載m3u84.讀取m3u8文件&#xff0c;下載視頻5.合并視頻首先我們來爬取一個星辰影院的電影&#xff1a;下面我以這個為例&#xff1a;我們需要在源代碼中找到m3u8這個url&#xff1a;緊接著我們利用下面的方法…

Python爬蟲實戰: 基于Scrapy的Amazon跨境電商選品數據爬蟲方案

概述與設計思路 利用Python的Scrapy框架進行大規模頁面抓取和結構化數據提取,配合aiohttp實現高并發請求,從而高效獲取Amazon平臺上的商品列表、詳情、評論等公開信息。通過對這些數據進行清洗與分析,可以識別出有潛力的商品,評估市場競爭程度,并跟蹤競爭對手的動態,為跨…

穩定版IM即時通訊 仿默往APP即時通訊im源碼聊天社交源碼支持二開原生開發獨立部署 含搭建教程

內容目錄一、詳細介紹二、效果展示1.部分代碼2.效果圖展示三、學習資料下載一、詳細介紹 技術開發語言&#xff1a; 后臺管理端&#xff1a;Java GO Mysql數據庫 安卓端&#xff1a;Java iOS端&#xff1a;ob PC端&#xff1a;c 功能簡單介紹&#xff1a; 單聊&#xff…

封裝一個redis獲取并解析數據的工具類

redis獲取并解析數據工具類實現代碼使用示例實現代碼 import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.TypeReference; import lom…

23種設計模式——策略模式 (Strategy Pattern)?詳解

?作者簡介&#xff1a;大家好&#xff0c;我是 Meteors., 向往著更加簡潔高效的代碼寫法與編程方式&#xff0c;持續分享Java技術內容。 &#x1f34e;個人主頁&#xff1a;Meteors.的博客 &#x1f49e;當前專欄&#xff1a;設計模式 ?特色專欄&#xff1a;知識分享 &#x…

CI(持續集成)、CD(持續交付/部署)、CT(持續測試)、CICD、CICT

目錄 **CI、CD、CT 詳解與關系** **1. CI(Continuous Integration,持續集成)** **2. CD(Continuous Delivery/Deployment,持續交付/部署)** **持續交付(Continuous Delivery)** **持續部署(Continuous Deployment)** **3. CT(Continuous Testing,持續測試)** **4.…

【音視頻】WebRTC ICE 模塊深度剖析

原文鏈接&#xff1a; https://mp.weixin.qq.com/s?__bizMzIzMjY3MjYyOA&mid2247498075&idx2&sn6021a2f60b1e7c71ce4d7af6df0b9b89&chksme893e540dfe46c56323322e780d41aec1f851925cfce8b76b3f4d5cfddaa9c7cbb03a7ae4c25&scene178&cur_album_id314699…

linux0.12 head.s代碼解析

重新設置IDT和GDT&#xff0c;為256個中斷門設置默認的中斷處理函數檢查A20地址線是否啟用設置數學協處理器將main函數相關的參數壓棧設置分頁機制&#xff0c;將頁表映射到0~16MB的物理內存上返回main函數執行 源碼詳細注釋如下: /** linux/boot/head.s** (C) 1991 Linus T…

Maven動態控制版本號秘籍:高效發包部署,版本管理不再頭疼!

作者&#xff1a;唐叔在學習 專欄&#xff1a;唐叔的Java實踐 關鍵詞&#xff1a;Maven版本控制、versions插件、動態版本號、持續集成、自動化部署、Java項目管理 摘要&#xff1a;本文介紹如何使用Maven Versions插件動態控制項目版本號和依賴組件版本號&#xff0c;實現無需…

簡述:普瑞時空數據建庫軟件(國土變更建庫)之一(變更預檢查部分規則)

簡述&#xff1a;普瑞時空數據建庫軟件&#xff08;國土變更建庫&#xff09;之一(變更預檢查部分規則) 主要包括三種類型&#xff1a;常規檢查、行政區范圍檢查、20X異常滅失檢查 本blog地址&#xff1a;https://blog.csdn.net/hsg77