📢博客主頁:https://blog.csdn.net/2301_779549673
📢博客倉庫:https://gitee.com/JohnKingW/linux_test/tree/master/lesson
📢歡迎點贊 👍 收藏 ?留言 📝 如有錯誤敬請指正!
📢本文由 JohnKi 原創,首發于 CSDN🙉
📢未來很長,值得我們全力奔赴更美好的生活?
文章目錄
- 📢前言
- 🏳??🌈一、線程同步概念
- 🏳??🌈二、為什么需要線程同步?
- 2.1 防止數據競爭(Data Race)?
- 2.2 保證操作的原子性
- 2.?3 協調線程間的執行順序
- 2.?4 避免資源爭用(如文件、網絡連接)?
- 🏳??🌈線程同步的常見手段
- 🏳??🌈三、什么是生產消費者模型
- 3.1 三種關系
- 3.2 兩個角色
- 3.3 一個場景
- 🏳??🌈四、以阻塞隊列模擬多生產消費者模型
- 4.1 成員名
- 4.2 構造函數和析構函數
- 4.3 模擬的生產者和消費者
- 4.4 模擬生產、消費者過程
- 🏳??🌈五、整體代碼
- 5.1 BlockQueue.hpp
- 5.2 Main.cc
- 5.3 Makefile
- 👥總結
📢前言
緊接上一回的 從互斥原理到C++ RAII封裝實踐
筆者這回介紹一下線程中幾乎與互斥一樣重要的同步原理,
還有一點,筆者之后的封裝都會使用之前博客中封裝好的容器,需要的可以去倉庫或者前面的博客中自取。
所需所用的都放在了這個倉庫中
🏳??🌈一、線程同步概念
條件變量
:當一個線程互斥地訪問某個變量時,它可能發現在其它線程改變狀態之前,它什么也做不了。例如一個線程訪問隊列時,發現隊列為空,它只能等待,只到其它線程將一個節點添加到隊列中。這種情況就需要用到條件變量。
線程同步
指在多線程編程中,通過特定機制協調多個線程的執行順序,確保它們對共享資源?(如內存、文件、硬件等)的訪問安全有序。核心目標是防止并發訪問導致的數據混亂、邏輯錯誤或資源沖突。
🏳??🌈二、為什么需要線程同步?
2.1 防止數據競爭(Data Race)?
?問題:多個線程同時讀寫共享數據時,執行順序不確定,可能導致數據不一致。
int balance = 100; // 共享變量// 線程A執行:存入200
balance += 200; // 線程B執行:取出150
balance -= 150;
- 未同步時:若線程A和B同時讀取balance=100,最終結果可能是100+200-150=150(正確應為150)或100-150+200=150,但若操作交叉執行(如A讀后B寫),可能得到錯誤值(如-50)。
2.2 保證操作的原子性
?問題:單個操作(如i++)在底層可能對應多條機器指令,線程切換會導致操作未完成就被中斷
; x86的i++實際步驟:
mov eax, [i] ; 讀取i到寄存器
inc eax ; 寄存器加1
mov [i], eax ; 寫回內存
- 若線程A執行到inc eax后被切換,線程B修改了i,線程A恢復后會將舊值寫回,導致結果錯誤。
2.?3 協調線程間的執行順序
?場景:某些任務需要線程按特定順序執行。
?生產者-消費者模型:消費者線程需等待生產者生成數據后再讀取。
?任務依賴:線程B必須在線程A完成初始化后才能執行。
2.?4 避免資源爭用(如文件、網絡連接)?
?問題:多個線程同時寫入同一文件或占用同一網絡端口,會導致數據錯亂或程序崩潰。
🏳??🌈線程同步的常見手段
同步問題的嚴重后果
?數據不一致
:程序輸出錯誤,如銀行賬戶余額異常。
?程序崩潰
:多線程同時釋放內存導致雙重釋放(Double Free)。
?死鎖(Deadlock)
?:線程互相等待對方釋放鎖,導致永久阻塞。
說白了,線程同步就是一種為了統一管理生產消費者模型的一種機制
🏳??🌈三、什么是生產消費者模型
3.1 三種關系
3.2 兩個角色
生產者
,模擬是同數據的那方
消費者
,取走數據的那方
3.3 一個場景
所有生產消費所用的數據都是在中間的 “超市”
中進行
🏳??🌈四、以阻塞隊列模擬多生產消費者模型
下圖是以阻塞隊列模擬多生產消費者模型的基本過程
也就是有兩類線程(生產者和消費者),從同一個場景(blockqueue)中放入和拿出數據的過程
4.1 成員名
我們利用現有的庫函數對環境進行一下封裝,再利用一個隊列模擬臨界資源
private:std::queue<T> _q; // 保存數據的容器,臨界資源int _cap; // bq最大容量pthread_mutex_t _mutex; // 互斥pthread_cond_t _productor_cond; // 生產者條件變量pthread_cond_t _consumer_cond; // 消費者條件變量int _cwait_num; // 當前等待的消費者數量int _pwait_num; // 當前等待的生產者數量};
4.2 構造函數和析構函數
BlockQueue(int cap = gcap) : _cap(cap), _cwait_num(0), _pwait_num(0){pthread_mutex_init(&_mutex, nullptr); // 創建互斥鎖pthread_cond_init(&_productor_cond, nullptr); // 生產者條件變量pthread_cond_init(&_consumer_cond, nullptr); // 消費者條件變量}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_productor_cond);pthread_cond_destroy(&_consumer_cond);}
4.3 模擬的生產者和消費者
void Equeue(const T &in) // 生產者{pthread_mutex_lock(&_mutex);// 你想放數據,就能放數據嗎??生產數據是有條件的!// 結論1: 在臨界區中等待是必然的(目前)while (IsFull()) // 5. 對條件進行判斷,為了防止偽喚醒,我們通常使用while進行判斷!{std::cout << "生產者進入等待..." << std::endl;// 2. 等是,釋放_mutex_pwait_num++;pthread_cond_wait(&_productor_cond, &_mutex); // wait的時候,必定是持有鎖的!!是有問題的!_pwait_num--;// 3. 返回,線程被喚醒&&重新申請并持有鎖(它會在臨界區內醒來!)std::cout << "生產者被喚醒..." << std::endl;}// 4. if(IsFull())不滿足 || 線程被喚醒_q.push(in); // 生產// 肯定有數據!if(_cwait_num){std::cout << "叫醒消費者" << std::endl;pthread_cond_signal(&_consumer_cond);}pthread_mutex_unlock(&_mutex);}void Pop(T *out) // 消費者{pthread_mutex_lock(&_mutex);while(IsEmpty()){std::cout << "消費者進入等待..." << std::endl;_cwait_num++;pthread_cond_wait(&_consumer_cond, &_mutex); // 偽喚醒_cwait_num--;std::cout << "消費者被喚醒..." << std::endl;}// 4. if(IsEmpty())不滿足 || 線程被喚醒*out = _q.front();_q.pop();// 肯定有空間if(_pwait_num){std::cout << "叫醒生產者" << std::endl;pthread_cond_signal(&_productor_cond);}pthread_mutex_unlock(&_mutex);}
4.4 模擬生產、消費者過程
我們假設生產速度小于消費速度,相當于我們沒生產一個對象后需要花費一定的時間,但是消費者一直就緒,就要等生產者生產出來
void *Consumer(void *args)
{BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);while(true){int data;// 1. 從bq拿到數據bq->Pop(&data);// 2.做處理printf("Consumer, 消費了一個數據: %d\n", data);}
}void *Productor(void *args)
{BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);int data = 10;while (true){sleep(2);// 1. 從外部獲取數據// data = 10; // 有數據???// 2. 生產到bq中bq->Equeue(data);printf("producter 生產了一個數據: %d\n", data);data++;}
}
🏳??🌈五、整體代碼
5.1 BlockQueue.hpp
#pragma once#include <iostream>
#include <queue>
#include <pthread.h>namespace BlockQueueModule
{static const int gcap = 10;template <typename T>class BlockQueue{private:bool IsFull() { return _q.size() == _cap; }bool IsEmpty() { return _q.empty(); }public:BlockQueue(int cap = gcap) : _cap(cap), _cwait_num(0), _pwait_num(0){pthread_mutex_init(&_mutex, nullptr); // 創建互斥鎖pthread_cond_init(&_productor_cond, nullptr); // 生產者條件變量pthread_cond_init(&_consumer_cond, nullptr); // 消費者條件變量}void Equeue(const T &in) // 生產者{pthread_mutex_lock(&_mutex);// 你想放數據,就能放數據嗎??生產數據是有條件的!// 結論1: 在臨界區中等待是必然的(目前)while (IsFull()) // 5. 對條件進行判斷,為了防止偽喚醒,我們通常使用while進行判斷!{std::cout << "生產者進入等待..." << std::endl;// 2. 等是,釋放_mutex_pwait_num++;pthread_cond_wait(&_productor_cond, &_mutex); // wait的時候,必定是持有鎖的!!是有問題的!_pwait_num--;// 3. 返回,線程被喚醒&&重新申請并持有鎖(它會在臨界區內醒來!)std::cout << "生產者被喚醒..." << std::endl;}// 4. if(IsFull())不滿足 || 線程被喚醒_q.push(in); // 生產// 肯定有數據!if(_cwait_num){std::cout << "叫醒消費者" << std::endl;pthread_cond_signal(&_consumer_cond);}pthread_mutex_unlock(&_mutex);}void Pop(T *out) // 消費者{pthread_mutex_lock(&_mutex);while(IsEmpty()){std::cout << "消費者進入等待..." << std::endl;_cwait_num++;pthread_cond_wait(&_consumer_cond, &_mutex); // 偽喚醒_cwait_num--;std::cout << "消費者被喚醒..." << std::endl;}// 4. if(IsEmpty())不滿足 || 線程被喚醒*out = _q.front();_q.pop();// 肯定有空間if(_pwait_num){std::cout << "叫醒生產者" << std::endl;pthread_cond_signal(&_productor_cond);}pthread_mutex_unlock(&_mutex);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_productor_cond);pthread_cond_destroy(&_consumer_cond);}private:std::queue<T> _q; // 保存數據的容器,臨界資源int _cap; // bq最大容量pthread_mutex_t _mutex; // 互斥pthread_cond_t _productor_cond; // 生產者條件變量pthread_cond_t _consumer_cond; // 消費者條件變量int _cwait_num; // 當前等待的消費者數量int _pwait_num; // 當前等待的生產者數量};
}
5.2 Main.cc
#include "BlockQueue.hpp"
#include <pthread.h>
#include <unistd.h>using namespace BlockQueueModule;void *Consumer(void *args)
{BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);while(true){int data;// 1. 從bq拿到數據bq->Pop(&data);// 2.做處理printf("Consumer, 消費了一個數據: %d\n", data);}
}void *Productor(void *args)
{BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);int data = 10;while (true){sleep(2);// 1. 從外部獲取數據// data = 10; // 有數據???// 2. 生產到bq中bq->Equeue(data);printf("producter 生產了一個數據: %d\n", data);data++;}
}int main()
{// 交易場所,不僅僅可以用來進行傳遞數據// 傳遞任務!!!v1: 對象 v2BlockQueue<int> *bq = new BlockQueue<int>(5); // 共享資源 -> 臨界資源// 單生產,單消費pthread_t c1, p1, c2, p2, p3;pthread_create(&c1, nullptr, Consumer, bq);pthread_create(&c2, nullptr, Consumer, bq);pthread_create(&p1, nullptr, Productor, bq);pthread_create(&p2, nullptr, Productor, bq);pthread_create(&p3, nullptr, Productor, bq);pthread_join(c1, nullptr);pthread_join(c2, nullptr);pthread_join(p1, nullptr);pthread_join(p2, nullptr);pthread_join(p3, nullptr);delete bq;return 0;
}
5.3 Makefile
bin=bq
cc=g++
src=$(wildcard *.cc)
obj=$(src:.cc=.o)$(bin):$(obj)$(cc) -o $@ $^ -lpthread
%.o:%.cc$(cc) -c $< -std=c++17.PHONY:clean
clean:rm -f $(bin) $(obj).PHONY:test
test:echo $(src)echo $(obj)
👥總結
本篇博文對 同步原理剖析及模擬多消費者模型 做了一個較為詳細的介紹,不知道對你有沒有幫助呢
覺得博主寫得還不錯的三連支持下吧!會繼續努力的~