消息隊列的實現

8.8 消息隊列

隊列是一種先進先出的結構,消息隊列是進程(線程)常用的一種方法,實現消息隊列常用的方法:

(1)阻塞隊列 (2)無鎖隊列 (3)環形隊列

值得注意的是:在pop 和push 一定要使用while循環,避免虛假喚醒

8.8.1 阻塞隊列

(1)pthread

實現阻塞隊列使用生產者-消費者模式,使用步驟:

#include <pthread.h>
#include <queue>
//第一步,添加成員變量
std::queue<T> queue_;
int max_size_;
pthread_mutex_t mutex_;
pthread_cond_t condition_var_;
//第二步,實現push函數
void Push(const T& item) {pthread_mutex_lock(&mutex_);while (queue_.size() >= max_size_) {pthread_cond_wait(&condition_var_, &mutex_);}queue_.push(item);pthread_cond_signal(&condition_var_);pthread_mutex_unlock(&mutex_);
}
//第三步,實現pop函數T Pop() {pthread_mutex_lock(&mutex_);while (queue_.empty()) {pthread_cond_wait(&condition_var_, &mutex_);}T item = queue_.front();queue_.pop();pthread_cond_signal(&condition_var_);pthread_mutex_unlock(&mutex_);return item;}

示例代碼:

#include <pthread.h>
#include <queue>template <typename T>
class BlockingQueue {
public:BlockingQueue() : max_size_(100) {pthread_mutex_init(&mutex_, nullptr);pthread_cond_init(&condition_var_, nullptr);}~BlockingQueue() {pthread_mutex_destroy(&mutex_);pthread_cond_destroy(&condition_var_);}void Push(const T& item) {pthread_mutex_lock(&mutex_);while (queue_.size() >= max_size_) {pthread_cond_wait(&condition_var_, &mutex_);}queue_.push(item);pthread_cond_signal(&condition_var_);pthread_mutex_unlock(&mutex_);}T Pop() {pthread_mutex_lock(&mutex_);while (queue_.empty()) {pthread_cond_wait(&condition_var_, &mutex_);}T item = queue_.front();queue_.pop();pthread_cond_signal(&condition_var_);pthread_mutex_unlock(&mutex_);return item;}private:std::queue<T> queue_;int max_size_;pthread_mutex_t mutex_;pthread_cond_t condition_var_;
};

測試代碼:

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <vector>BlockingQueue<int> queue;void* ProducerThread(void* arg) {int thread_id = *static_cast<int*>(arg);for (int i = 1; i <= 5; ++i) {int item = thread_id * 10 + i;queue.Push(item);std::cout << "Producer " << thread_id << " produced: " << item << std::endl;sleep(1);}pthread_exit(nullptr);
}void* ConsumerThread(void* arg) {int thread_id = *static_cast<int*>(arg);for (int i = 1; i <= 5; ++i) {int item = queue.Pop();std::cout << "Consumer " << thread_id << " consumed: " << item << std::endl;sleep(2);}pthread_exit(nullptr);
}int main() {const int num_producers = 3;const int num_consumers = 2;std::vector<pthread_t> producer_threads(num_producers);std::vector<pthread_t> consumer_threads(num_consumers);for (int i = 0; i < num_producers; ++i) {int* thread_id = new int(i);pthread_create(&producer_threads[i], nullptr, ProducerThread, static_cast<void*>(thread_id));}for (int i = 0; i < num_consumers; ++i) {int* thread_id = new int(i);pthread_create(&consumer_threads[i], nullptr, ConsumerThread, static_cast<void*>(thread_id));}for (int i = 0; i < num_producers; ++i) {pthread_join(producer_threads[i], nullptr);}for (int i = 0; i < num_consumers; ++i) {pthread_join(consumer_threads[i], nullptr);}return 0;
}

**(2)STD **

示例代碼:

#include <queue>
#include <mutex>
#include <condition_variable>template <typename T>
class BlockingQueue {
public:BlockingQueue() = default;void Push(const T& item) {std::unique_lock<std::mutex> lock(mutex_);while (queue_.size() >= max_size_) {condition_var_.wait(lock);}queue_.push(item);condition_var_.notify_one();}T Pop() {std::unique_lock<std::mutex> lock(mutex_);while (queue_.empty()) {condition_var_.wait(lock);}T item = queue_.front();queue_.pop();condition_var_.notify_one();return item;}private:std::queue<T> queue_;std::mutex mutex_;std::condition_variable condition_var_;const size_t max_size_ = 100; // 最大容量
};
8.8.2 環形隊列

環形隊列的特點:(1)固定容量 (2)前后指針 (3)循環存儲

(1)pthread

示例代碼:

#include <pthread.h>
#include <iostream>
#include <vector>template <typename T>
class CircularQueue {
public:CircularQueue(int capacity) : capacity_(capacity), front_(0), rear_(0), size_(0) {queue_.resize(capacity_);pthread_mutex_init(&mutex_, nullptr);pthread_cond_init(&condition_var_, nullptr);}~CircularQueue() {pthread_mutex_destroy(&mutex_);pthread_cond_destroy(&condition_var_);}void Push(const T& item) {pthread_mutex_lock(&mutex_);while (size_ >= capacity_) {pthread_cond_wait(&condition_var_, &mutex_);}queue_[rear_] = item;rear_ = (rear_ + 1) % capacity_;++size_;pthread_cond_signal(&condition_var_);pthread_mutex_unlock(&mutex_);}T Pop() {pthread_mutex_lock(&mutex_);while (size_ <= 0) {pthread_cond_wait(&condition_var_, &mutex_);}T item = queue_[front_];front_ = (front_ + 1) % capacity_;--size_;pthread_cond_signal(&condition_var_);pthread_mutex_unlock(&mutex_);return item;}private:std::vector<T> queue_;int capacity_;int front_;int rear_;int size_;pthread_mutex_t mutex_;pthread_cond_t condition_var_;
};void* ProducerThread(void* arg) {CircularQueue<int>* queue = static_cast<CircularQueue<int>*>(arg);for (int i = 1; i <= 10; ++i) {queue->Push(i);std::cout << "Produced: " << i << std::endl;sleep(1);}return nullptr;
}void* ConsumerThread(void* arg) {CircularQueue<int>* queue = static_cast<CircularQueue<int>*>(arg);for (int i = 1; i <= 10; ++i) {int item = queue->Pop();std::cout << "Consumed: " << item << std::endl;sleep(2);}return nullptr;
}int main() {CircularQueue<int> queue(5);pthread_t producer_thread;pthread_create(&producer_thread, nullptr, ProducerThread, static_cast<void*>(&queue));pthread_t consumer_thread;pthread_create(&consumer_thread, nullptr, ConsumerThread, static_cast<void*>(&queue));pthread_join(producer_thread, nullptr);pthread_join(consumer_thread, nullptr);return 0;
}

(2)STD

示例代碼:

#include <vector>
#include <mutex>
#include <condition_variable>template <typename T>
class CircularQueue {
public:CircularQueue(int capacity) : capacity_(capacity), front_(0), rear_(0), size_(0), queue_(capacity_) {}void Push(const T& item) {std::unique_lock<std::mutex> lock(mutex_);while (IsFull()) {condition_var_.wait(lock);}queue_[rear_] = item;rear_ = (rear_ + 1) % capacity_;++size_;condition_var_.notify_one();}T Pop() {std::unique_lock<std::mutex> lock(mutex_);while (IsEmpty()) {condition_var_.wait(lock);}T item = queue_[front_];front_ = (front_ + 1) % capacity_;--size_;condition_var_.notify_one();return item;}bool IsEmpty() const {return size_ == 0;}bool IsFull() const {return size_ == capacity_;}private:int capacity_;int front_;int rear_;int size_;std::vector<T> queue_;std::mutex mutex_;std::condition_variable condition_var_;
};

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/717969.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/717969.shtml
英文地址,請注明出處:http://en.pswp.cn/news/717969.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

藍橋ACM培訓-實戰1

前言&#xff1a; 今天老師沒講課&#xff0c;只讓我們做了一下幾道題目。 正文&#xff1a; Problem:A 小藍與操作序列&#xff1a; #include<bits/stdc.h> using namespace std; stack<int> a; int main(){int n,flag1,ans;string cz;cin>>n;for(int i1;…

訪問修飾符、Object(方法,使用、equals)、查看equals底層、final--學習JavaEE的day15

day15 一、訪問修飾符 含義&#xff1a; 修飾類、方法、屬性&#xff0c;定義使用的范圍 理解&#xff1a;給類、方法、屬性定義訪問權限的關鍵字 注意&#xff1a; ? 1.修飾類只能使用public和默認的訪問權限 ? 2.修飾方法和屬性可以使用所有的訪問權限 訪問修飾符本類本包…

JetCache源碼解析——API實現(持續更新中……)

在JetCache中不僅可以通過在類和接口的函數上使用注解Cached、CacheUpdate和CacheInvalidate等實現緩存加載、更新和刪除操作&#xff0c;也支持通過調用API接口的形式來實現緩存的加載、更新和刪除操作。 緩存接口 緩存接口的定義如下&#xff1a; /*** 緩存接口&#xff0…

【計算機網絡】HTTPS 協議原理

https 一、HTTPS 是什么二、加密1. 加密概念2. 加密的原因3. 常見的加密方式&#xff08;1&#xff09;對稱加密&#xff08;2&#xff09;非對稱加密 三、數據摘要(數據指紋)四、HTTPS 的工作原理探究1. 只使用對稱加密2. 只使用非對稱加密3. 雙方都使用非對稱加密4. 非對稱加…

Linux:kubernetes(k8s)部署CNI網絡插件(4)

在上一章進行了node加入master Linux&#xff1a;kubernetes&#xff08;k8s&#xff09;node節點加入master主節點&#xff08;3&#xff09;-CSDN博客https://blog.csdn.net/w14768855/article/details/136420447?spm1001.2014.3001.5501 但是他們顯示還是沒準備好 看一下…

面試筆記系列五之MySql+Mybaits基礎知識點整理及常見面試題

目錄 Myibatis myibatis執行過程 mybatis的優缺點有哪些&#xff1f; mybatis和hibernate有什么區別&#xff1f; mybatis中#{}和${}的區別是什么&#xff1f; 簡述一下mybatis插件運行原理及開發流程&#xff1f;&#xff08;插件四大天王&#xff09; mybatis的mapper沒…

2.模擬問題——5.星期幾與字符串對應

輸入輸出示例 輸入&#xff1a; 9 October 2001 14 October 2001 輸出&#xff1a; Tuesday Sunday 【原題鏈接】 字符串處理 C風格的字符串 字符數組&#xff0c;以’\0‘結尾建議在輸入輸出語句中使用 C風格的字符串 #include <string> using namespace std;初始化…

「優選算法刷題」:最長回文子串

一、題目 給你一個字符串 s&#xff0c;找到 s 中最長的回文子串。 如果字符串的反序與原始字符串相同&#xff0c;則該字符串稱為回文字符串。 示例 1&#xff1a; 輸入&#xff1a;s "babad" 輸出&#xff1a;"bab" 解釋&#xff1a;"aba"…

【字符串】馬拉車(Manacher)算法

本篇文章參考&#xff1a;比較易懂的 Manacher&#xff08;馬拉車&#xff09;算法配圖詳解 馬拉車算法可以求出一個字符串中的最長回文子串&#xff0c;時間復雜度 O ( n ) O(n) O(n) 因為字符串長度的奇偶性&#xff0c;回文子串的中心可能是一個字符&#xff0c;也可能是…

uniapp聊天記錄本地存儲(詳細易懂)

目錄 目錄 1、通過websocket拿取數據 2、獲取聊天數據 3、聊天信息存儲 、更新 4、讀取聊天記錄 5、發送信息&#xff0c;信息獲取 6、最終效果 1.聊天信息的存儲格式 2、樣式效果 寫聊天項目&#xff0c;使用到了本地存儲。需要把聊天信息保存在本地&#xff0c;實時獲…

GPT對話知識庫——ARM-Cortex架構分為哪幾個系列?每個系列有幾種工作模式?各種工作模式之間的定義和區別?每種架構不同的特點和應用需求?

提問模型&#xff1a;GPT-4-TURBO-PREVIEW 提問時間&#xff1a;2024.03.02 1&#xff0c;問&#xff1a; Cortex-M系列有幾種工作模式 1&#xff0c;答&#xff1a; Cortex-M系列微控制器是ARM公司開發的一類低功耗、高性能的32位微處理器&#xff0c;廣泛應用于嵌入式系統中…

Centos7使用man查找命令時,報錯No manual entry for xxxx

Centos7使用man查找命令時&#xff0c;報錯No manual entry for xxxx 在Linux中使用man指令查找指令信息時&#xff0c;報No manual entry for xxxx。 比如使用man指令查找sleep3號手冊時&#xff0c;出現以下錯誤&#xff1a; 這是由于沒有安裝man-pages這個rpm包導致的&#…

掌握基本排序算法:冒泡、選擇、插入和快速排序

在計算機科學的世界里&#xff0c;排序是一項基本而重要的操作。無論是數據庫管理、搜索引擎&#xff0c;還是日常編程&#xff0c;高效的排序算法都是提高性能的關鍵。本文將介紹四種基本的排序算法&#xff1a;冒泡排序、選擇排序、插入排序和快速排序&#xff0c;并探討它們…

從0開始學習NEON(1)

1、前言 在上個博客中對NEON有了基礎的了解&#xff0c;本文將針對一個圖像下采樣的例子對NEON進行學習。 學習鏈接:CPU優化技術 - NEON 開發進階 上文鏈接:https://blog.csdn.net/weixin_42108183/article/details/136412104 2、第一個例子 現在有一張圖片&#xff0c;需…

獲取 Windows 通知中心彈窗通知內容(含工具漢化)

目錄 前言 技術原理概述 測試代碼和程序下載連接 本文出處鏈接&#xff1a;https://blog.csdn.net/qq_59075481/article/details/136440280。 前言 從 Windows 8.1 開始&#xff0c;Windows 通知現在以 Toast 而非 Balloon 形式顯示&#xff08; Bollon 通知其實現在是應用…

在ubuntu上安裝hadoop完分布式

準備工作 Xshell安裝包 Xftp7安裝包 虛擬機安裝包 Ubuntu鏡像源文件 Hadoop包 Java包 一、安裝虛擬機 創建ubuntu系統 完成之后會彈出一個新的窗口 跑完之后會重啟一下 按住首先用ctrlaltf3進入命令界面&#xff0c;輸入root&#xff0c;密碼登錄管理員賬號 按Esc 然后輸入 …

數據結構常用的字符串函數(中英雙釋)

頭文件&#xff1a;string.h 1.strchr const char * strchr ( const char * str, int character ); Locate first occurrence of character in string str C string. character Character to be located. Return Value A pointer to the first occurrence of character in s…

適用于恢復iOS數據的 10 款免費 iPhone 恢復軟件

現在&#xff0c;您可以獲得的 iPhone 的存儲容量比大多數人的筆記本電腦和臺式電腦的存儲容量還要大。雖然能夠存儲數千張高分辨率照片和視頻文件、安裝數百個應用程序并隨身攜帶大量音樂庫以供離線收聽固然很棒&#xff0c;但在一個地方擁有如此多的數據可能會帶來毀滅性的后…

2.2_5 調度算法

文章目錄 2.2_5 調度算法一、適用于早期的批處理系統&#xff08;一&#xff09;先來先服務&#xff08;FCFS&#xff0c;First Come First Serve&#xff09;&#xff08;二&#xff09;短作業優先&#xff08;SJF&#xff0c;Shortest Job First&#xff09;&#xff08;三&a…

SpringMVC總結

SpringMVC SpringMVC是隸屬于Spring框架的一部分&#xff0c;主要是用來進行Web開發&#xff0c;是對Servlet進行了封裝。 對于SpringMVC我們主要學習如下內容: SpringMVC簡介 請求與響應 REST風格 SSM整合(注解版) 攔截器 SpringMVC是處理Web層/表現層的框架&#xff…