在上一章節中,我們使用互斥量之后,確實解決了數據競爭問題,但出現了新的問題:只有一個線程(thread 1)在處理所有售票任務。這展示了互斥量的一個局限性:它確保了線程安全,但不保證公平性。
1. 條件變量
根據這個問題,我們可以引入條件變量(Condition Variable)?。條件變量允許線程在特定條件不滿足時主動等待,而不是忙等待或不公平地競爭鎖。
為什么會出現只有一個線程工作的情況?
在輸出中,只有thread 1在處理所有售票,這是因為:
鎖競爭的不公平性:當一個線程釋放鎖后,它可能立即又重新獲取鎖,而其他線程沒有機會獲取
調度策略:操作系統的線程調度可能優先調度剛剛釋放鎖的線程
沒有等待機制:線程在無法獲取票時沒有等待,而是繼續競爭鎖
1.1 條件變量的基本概念
條件變量是一種同步機制,允許線程在某個條件不滿足時掛起等待,直到其他線程改變條件并通知它。
條件變量的主要操作:
等待:
pthread_cond_wait(cond, mutex)
原子性地釋放互斥鎖并進入等待狀態
被喚醒后重新獲取互斥鎖
信號:
pthread_cond_signal(cond)
喚醒一個等待該條件變量的線程
廣播:
pthread_cond_broadcast(cond)
喚醒所有等待該條件變量的線程
1.2 條件變量函數
初始化條件變量
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
參數說明:
cond
:指向要初始化的條件變量的指針attr
:條件變量屬性,通常為NULL表示使用默認屬性
返回值:成功返回0,失敗返回錯誤碼
使用方式:
// 動態初始化
pthread_cond_t cond;
if (pthread_cond_init(&cond, NULL) != 0) {// 處理錯誤perror("Failed to initialize condition variable");exit(EXIT_FAILURE);
}// 靜態初始化
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
銷毀條件變量
int pthread_cond_destroy(pthread_cond_t *cond);
參數說明:
cond
:要銷毀的條件變量
返回值:成功返回0,失敗返回錯誤碼
注意事項:
只有在沒有線程等待該條件變量時才能安全銷毀
靜態初始化的條件變量不需要銷毀
銷毀后不應再使用該條件變量
等待條件滿足
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
參數說明:
cond
:要等待的條件變量mutex
:與條件變量關聯的互斥鎖
返回值:成功返回0,失敗返回錯誤碼
關鍵特性:
原子操作:
pthread_cond_wait
?會原子性地執行以下操作:釋放互斥鎖?
mutex
將線程添加到條件變量的等待隊列中
使線程進入等待狀態
喚醒后的操作:當線程被喚醒時,它會:
重新獲取互斥鎖?
mutex
從?
pthread_cond_wait
?返回
虛假喚醒:線程可能會在沒有收到明確信號的情況下被喚醒,因此必須在循環中檢查條件
喚醒等待的線程
喚醒單個線程
int pthread_cond_signal(pthread_cond_t *cond);
功能:喚醒至少一個等待該條件變量的線程(具體喚醒哪個線程取決于調度策略)
使用場景:當只有一個線程需要被喚醒時使用,效率較高
喚醒所有線程
int pthread_cond_broadcast(pthread_cond_t *cond);
功能:喚醒所有等待該條件變量的線程
使用場景:
當多個線程需要被喚醒時
當不確定哪個線程應該被喚醒時
當條件的變化可能影響多個等待線程時
示例:
#include <iostream>
#include <vector>
#include <string>
#include <pthread.h>
#include <unistd.h>#define NUM 5int cnt = 100;pthread_mutex_t glock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t gcond = PTHREAD_COND_INITIALIZER;void* threadrun(void* args)
{std::string name = static_cast<const char*>(args);while(true){pthread_mutex_lock(&glock); // 獲取鎖pthread_cond_wait(&gcond, &glock); // 等待條件變量(會自動釋放鎖!)std::cout << name << "計算: " << cnt++ << std::endl; // 被喚醒后執行任務pthread_mutex_unlock(&glock); // 釋放鎖}
}int main()
{std::vector<pthread_t> threads;for(int i = 0; i < NUM; i++){pthread_t tid;char* name = new char[64];snprintf(name, 64, "thread-%d", i+1);int n = pthread_create(&tid, nullptr, threadrun, name);if(n != 0) continue;threads.push_back(tid);sleep(1);}sleep(3);while(true){std::cout << "喚醒一個線程..." << std::endl;pthread_cond_signal(&gcond);sleep(1);}for(auto& id : threads){int n = pthread_join(id, nullptr);(void)n;// 返回值不做判斷,基本都不會失敗}return 0;
}
1. 初始化階段
靜態初始化了一個互斥鎖?
glock
?和一個條件變量?gcond
2. 線程創建階段
創建5個線程,每個線程間隔1秒啟動
每個線程執行?
threadrun
?函數,并傳遞線程名稱作為參數
3. 線程執行階段
這是最關鍵的部分,每個線程的執行流程:
獲取互斥鎖 (
pthread_mutex_lock
)調用?
pthread_cond_wait
?- 這個函數會:原子性地釋放互斥鎖(讓其他線程可以獲取鎖)
使線程進入等待狀態(休眠,不消耗CPU)
等待被?
pthread_cond_signal
?喚醒
被喚醒后,自動重新獲取互斥鎖,然后執行任務
釋放互斥鎖,然后循環回到步驟1
4. 主線程喚醒階段
主線程每隔1秒調用?
pthread_cond_signal
每次調用會喚醒一個等待在條件變量上的線程
條件變量函數詳解
1.?pthread_cond_wait(&gcond, &glock)
這是條件變量的核心函數,它的工作原理很精妙:
原子性操作:
釋放互斥鎖?
glock
(讓其他線程可以獲取鎖)將當前線程加入到?
gcond
?的等待隊列中使線程進入等待狀態(休眠)
當被喚醒時:
重新獲取互斥鎖?
glock
(可能會阻塞,直到獲取到鎖)從?
pthread_cond_wait
?返回,繼續執行后續代碼
2.?pthread_cond_signal(&gcond)
喚醒一個等待在條件變量?
gcond
?上的線程如果有多個線程在等待,具體喚醒哪個取決于調度策略
不會立即讓被喚醒的線程運行,只是將其從等待狀態變為可運行狀態
3. 為什么需要互斥鎖配合?
條件變量必須與互斥鎖配合使用,因為:
保護共享數據:
cnt++
?操作需要互斥保護避免競態條件:確保檢查條件和進入等待是原子操作
防止虛假喚醒:在重新檢查條件前保持鎖的保護
運行結果:
ltx@iv-ye1i2elts0wh2yp1ahah:~/gitLinux/Linux_system/lesson_thread/Cond/TestCond$ ./test
喚醒一個線程...
thread-1計算: 100
喚醒一個線程...
thread-2計算: 101
喚醒一個線程...
thread-3計算: 102
喚醒一個線程...
thread-4計算: 103
喚醒一個線程...
thread-5計算: 104
喚醒一個線程...
thread-1計算: 105
喚醒一個線程...
thread-2計算: 106
喚醒一個線程...
thread-3計算: 107
喚醒一個線程...
thread-4計算: 108
喚醒一個線程...
thread-5計算: 109
喚醒一個線程...
thread-1計算: 110
喚醒一個線程...
thread-2計算: 111
^C
可以看到按序輸出,這是因為:
線程按創建順序依次進入等待狀態
pthread_cond_signal
?通常按隊列順序喚醒線程(FIFO)每次只喚醒一個線程,所以執行順序是確定性的
2. 生產者消費者模型
2.1?超市購物比喻:理解生產者消費者模型
讓我們用一個超市購物的生動例子來解釋生產者消費者模型:
想象一個超市系統:
生產者?= 商品供應商(如牛奶廠、面包房)
消費者?= 購物顧客
交易場所?= 超市貨架和倉庫
商品?= 數據
工作流程詳解
正常運營流程(緩沖區平衡狀態)
供應商送貨?→ 商品放入貨架 →?顧客購買?→ 商品從貨架取出
生產速度?≈?消費速度?→ 系統平穩運行
兩種阻塞場景詳解
1. 當貨架滿時:生產者阻塞
現實場景:
送貨卡車到達超市倉庫
倉庫管理員:"對不起,倉庫滿了,請在門外稍等"
卡車司機停車等待,不消耗燃油(不占用CPU)
當有顧客買走商品,空出位置后:"卡車先生,現在可以卸貨了!"
卡車開始卸貨
2. 當貨架空時:消費者阻塞
現實場景:
顧客來到超市貨架前
貨架空空如也:"唉,沒貨了,等等吧"
顧客去喝咖啡休息,不浪費時間徘徊(不忙等待)
當供應商補貨后:"顧客們,新貨到了!"
顧客開始選購商品
🎯 三種關鍵關系
1. 生產者與生產者之間的關系:競爭關系
超市例子:多個牛奶供應商都想把產品放到有限的冷藏柜中
技術實現:需要互斥鎖保護共享資源(貨架空間)
關系本質:互斥?- 生產者之間需要競爭有限的緩沖區空間
2. 消費者與消費者之間的關系:競爭關系
超市例子:多個顧客都想購買最后一瓶牛奶
技術實現:需要互斥鎖保護共享資源(商品)
關系本質:互斥?- 消費者之間需要競爭有限的數據/商品
3. 生產者與消費者之間的關系:同步與協作關系
超市例子:顧客買走商品后,需要通知供應商補貨;貨架滿時,供應商需要等待空位
技術實現:使用條件變量進行線程間通信和同步
關系本質:同步?- 生產者和消費者需要協調工作節奏
👥 兩種角色
1. 生產者
職責:產生數據/商品并放入緩沖區
特點:通常有固定的生產節奏
關注點:緩沖區是否有空位
2. 消費者
職責:從緩沖區取出數據/商品并進行處理
特點:消費速度可能波動
關注點:緩沖區是否有數據可消費
🏪 一個交易場所:緩沖區
本質:一塊特定結構的內存空間(通常是隊列)
功能:
解耦生產者和消費者
平衡生產和消費速度差異
提供臨時存儲
2.2 為何使用生產者消費者模型?
1. 解耦(Decoupling)
超市例子:牛奶廠不需要知道誰買了牛奶,顧客也不需要知道牛奶是哪家廠生產的。他們只關心超市這個中間平臺。
技術優勢:
生產者和消費者可以獨立開發和修改
系統更容易維護和擴展
降低系統復雜度
2. 支持并發(Concurrency Support)
超市例子:多個供應商可以同時往不同區域補貨,多個顧客可以同時在不同區域購物。
技術優勢:
生產者線程和消費者線程可以并發執行
提高系統吞吐量和資源利用率
充分利用多核CPU性能
3. 支持忙閑不均(Handling Speed Mismatches)
超市例子:牛奶廠每天生產1000瓶奶,但顧客有時買得多有時買得少。超市倉庫可以緩沖這種不平衡。
技術優勢:
緩沖區可以平衡生產和消費的速度差異
防止快速生產者淹沒慢速消費者
防止消費者等待造成資源浪費
2.3?基于BlockingQueue的生產者消費者模型
BlockingQueue
在多線程編程中,阻塞隊列(Blocking Queue)是一種線程安全的、常用于實現生產者-消費者模型的高級數據結構。與普通隊列相比,阻塞隊列具有以下關鍵特性:
- 阻塞特性:
當隊列為空時:消費者線程嘗試從隊列中獲取元素會被阻塞,直到隊列中有新元素
當隊列已滿時:生產者線程嘗試向隊列中添加元素會被阻塞,直到隊列中有空位
阻塞隊列的實現原理
阻塞隊列通常使用以下組件實現:
一個普通隊列:存儲元素的數據結構(數組或鏈表)
一個互斥鎖:保護對隊列的并發訪問
兩個條件變量:
not_empty
:當隊列為空時,消費者線程等待此條件not_full
:當隊列已滿時,生產者線程等待此條件
阻塞隊列的工作流程
生產者線程的工作流程
獲取互斥鎖
檢查隊列是否已滿
如果已滿,等待
not_full
條件變量否則,將元素加入隊列
釋放互斥鎖
通知消費者線程(通過
not_empty
條件變量)
消費者線程的工作流程
獲取互斥鎖
檢查隊列是否為空
如果為空,等待
not_empty
條件變量否則,從隊列取出元素
釋放互斥鎖
通知生產者線程(通過
not_full
條件變量)
模擬阻塞隊列的生產消費模型
注意:
為便于理解,我們先以單生產者-單消費者模型為例進行講解。初始階段采用原生接口實現,后面再將我們之前封裝好的互斥量等進行復用,
首先實現單生產-單消費模型,之后擴展為多生產-多消費模式(其實代碼邏輯仍然保持不變)。
封裝阻塞隊列
上文已經提到了阻塞隊列的原理,那么我們可以通過數據結構隊列來實現,代碼如下:
#include <iostream>
#include <queue>
#include <pthread.h>const int defaultcap = 5;template <class T>
class BlockQueue
{
public:BlockQueue(int cap = defaultcap):_cap(cap){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_full_cond, nullptr);pthread_cond_init(&_empty_cond, nullptr);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_full_cond);pthread_cond_destroy(&_empty_cond);}
private:std::queue<T> _q;size_t _cap; // 隊列容量大小pthread_mutex_t _mutex;pthread_cond_t _full_cond;pthread_cond_t _empty_cond;
};
生產者生產資源,消費者消費資源,本質都是對隊列的增刪查改等操作,也就是訪問臨界資源,所以互斥量是需要的,由于阻塞特性,所以條件變量也是必不可少的。
我們知道,隊列為空時,消費者從隊列獲取數據會被阻塞,隊列為滿時,生產者生產數據入隊列時也會被阻塞,那么我們就需要,判斷隊列的狀態是否為空,還是滿。當然這兩個函數我們只需要在內部判斷,不需要暴露給外部使用,可以私有
private:bool IsFull() { return _q.size() >= _cap; }bool IsEmpty() { return _q.empty(); }
生產者刪除數據入隊,在上文中我們已經知道了流程,不過我們在實現時引入了兩個成員變量
private:std::queue<T> _q;size_t _cap; // 隊列容量大小pthread_mutex_t _mutex;pthread_cond_t _full_cond;pthread_cond_t _empty_cond;int _csleep_num; // 消費者休眠的個數int _psleep_num; // 生產者休眠的個數
};
通過這兩個成員變量判斷是否有生產者或者消費者在wait阻塞休眠(隊列滿了或者空了),有的話我們就喚醒生產者或者消費者?
代碼如下:
// 生產者生產數據入隊列void Enqueue(const T& in){pthread_mutex_lock(&_mutex);// 不能使用if判斷,會虛假喚醒while(IsFull()) {_psleep_num++;std::cout << "隊列已滿, 生產者進入休眠, 生產者休眠個數: " << _psleep_num << std::endl;pthread_cond_wait(&_full_cond, &_mutex);_psleep_num--;}// 此時隊列必定有空間_q.push(in);// 只有隊列為空時,消費者才會阻塞休眠,此時隊列肯定不為空// 那么就判斷是否有消費者休眠,有就喚醒if(_csleep_num > 0){pthread_cond_signal(&_empty_cond);std::cout << "喚醒消費者..." << std::endl;}// 直接喚醒其實也可以,為什么?//pthread_cond_signal(&_empty_cond);pthread_mutex_unlock(&_mutex);// 最后直接喚醒也行,為什么?//pthread_cond_signal(&_empty_cond);}
代碼實現很簡單,但是需要注意幾個問題:
如果不使用這兩個新增的條件變量,直接喚醒也行,或者在解鎖之后直接喚醒也可以。為什么呢?
1. 為什么直接喚醒也可以?(即不管有沒有消費者等待,都發送信號)
直接喚醒(無條件調用pthread_cond_signal
)是可以的,但可能有性能影響
為什么可以?
pthread_cond_signal
是一個輕量級操作如果沒有線程在條件變量上等待,這個調用實際上什么都不做
從功能上講,不會造成任何錯誤
為什么不總是這樣做?
雖然單次調用開銷很小,但在高并發場景下,大量不必要的信號調用會累積成可觀的性能開銷
代碼中的條件判斷(
if(_csleep_num > 0)
)是一種優化,避免了不必要的系統調用
注意:在使用條件變量阻塞等待時,會釋放鎖,喚醒之后會重新申請鎖,但是此時也有可能鎖被別人申請了,那么這個時候在申請鎖時被阻塞等待。
2. 為什么在解鎖后發送信號也可以?
在解鎖后發送信號是完全可行且有時是更好的做法
為什么可以?
POSIX允許在持有或不持有互斥鎖的情況下調用
pthread_cond_signal
條件變量的信號操作本身是線程安全的
為什么有時更好?
減少鎖持有時間:先解鎖再發信號,減少了互斥鎖的持有時間
避免立即競爭:如果先發信號再解鎖,被喚醒的線程會立即嘗試獲取鎖,導致鎖競爭
提高性能:被喚醒的線程可以立即獲取到CPU時間片,而不是等待當前線程釋放鎖
潛在風險:
如果在解鎖后發送信號,需要確保狀態的一致性不會被破壞
在我們的例子中,由于隊列操作已經完成,解鎖后發送信號是安全的
3. 為什么使用?if
?會造成虛假喚醒問題
問題:
pthread_cond_wait
是函數調用,那么函數就有可能調用失敗,萬一失敗,那此時隊列為滿并沒有進行等待阻塞,而是直接push,把數據入隊列,那不就出問題了嗎?或者如果是多生產單消費,消費者消費完一個數據,然后廣播喚醒了所有生產者,那所有生產者都會push數據,不也會出問題嗎?
首先什么是虛假喚醒?
虛假喚醒是指線程在沒有收到明確的信號或廣播的情況下,從?pthread_cond_wait
?中返回的現象。這不是 bug,而是 POSIX 標準允許的行為,原因包括:
性能優化:某些實現可能為了性能而允許虛假喚醒
信號中斷:線程可能被系統信號中斷
硬件因素:多處理器環境下的內存一致性模型
但是如果使用while循環判斷,就不會出現這些問題,而是會重新檢查?IsFull()
,發現隊列又滿了,會再次進入等待。
消費者消費數據出隊列,邏輯和生產者生產數據入隊列一樣,代碼如下:
// 消費者消費數據出隊列T Pop(){pthread_mutex_lock(&_mutex);while(IsEmpty()) {_csleep_num++;std::cout << "隊列為空, 消費者進入休眠, 消費者休眠個數: " << _csleep_num << std::endl;pthread_cond_wait(&_empty_cond, &_mutex);_csleep_num--;}// 此時隊列必定有空間T data = _q.front();_q.pop();// 只有隊列為空時,消費者才會阻塞休眠,此時隊列肯定不為空// 那么就判斷是否有消費者休眠,有就喚醒if(_psleep_num > 0){pthread_cond_signal(&_full_cond);std::cout << "喚醒生產者..." << std::endl;}pthread_mutex_unlock(&_mutex);return data;}
主程序
阻塞隊列已經封裝好了,接下來就需要在主程序中編寫,測試單生產單消費模型
#include "BlockQueue.hpp"
#include <unistd.h>void* consumer(void* args)
{BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);while(true){int data = bq->Pop();std::cout << "消費了一個數據: " << data << std::endl;}
}void* producer(void* args)
{int data = 1;BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);while(true){sleep(1);std::cout << "生產了一個數據: " << data << std::endl;bq->Enqueue(data);data++;}
}int main()
{BlockQueue<int>* bq = new BlockQueue<int>; pthread_t c, p;pthread_create(&c, nullptr, consumer, bq);pthread_create(&p, nullptr, producer, bq);pthread_join(c, nullptr);pthread_join(p, nullptr);return 0;
}
運行結果:
ltx@iv-ye1i2elts0wh2yp1ahah:~/gitLinux/Linux_system/lesson_thread/Cond/PC$ ./pc
隊列為空, 消費者進入休眠, 消費者休眠個數: 1
生產了一個數據: 1
喚醒消費者...
消費了一個數據: 1
隊列為空, 消費者進入休眠, 消費者休眠個數: 1
生產了一個數據: 2
喚醒消費者...
消費了一個數據: 2
隊列為空, 消費者進入休眠, 消費者休眠個數: 1
生產了一個數據: 3
喚醒消費者...
消費了一個數據: 3
隊列為空, 消費者進入休眠, 消費者休眠個數: 1
生產了一個數據: 4
喚醒消費者...
消費了一個數據: 4
隊列為空, 消費者進入休眠, 消費者休眠個數: 1
生產了一個數據: 5
喚醒消費者...
消費了一個數據: 5
隊列為空, 消費者進入休眠, 消費者休眠個數: 1
生產了一個數據: 6
喚醒消費者...
消費了一個數據: 6
隊列為空, 消費者進入休眠, 消費者休眠個數: 1
生產了一個數據: 7
喚醒消費者...
消費了一個數據: 7
隊列為空, 消費者進入休眠, 消費者休眠個數: 1
生產了一個數據: 8
喚醒消費者...
消費了一個數據: 8
隊列為空, 消費者進入休眠, 消費者休眠個數: 1
^C
我們也可以來試一下隊列為滿的情況,其他代碼不變,先讓消費者sleep上10秒鐘,讓生產者把隊列push滿
void* consumer(void* args)
{BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);while(true){sleep(10);int data = bq->Pop();std::cout << "消費了一個數據: " << data << std::endl;}
}void* producer(void* args)
{int data = 1;BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);while(true){//sleep(1);std::cout << "生產了一個數據: " << data << std::endl;bq->Enqueue(data);data++;}
}
運行結果:
ltx@iv-ye1i2elts0wh2yp1ahah:~/gitLinux/Linux_system/lesson_thread/Cond/PC$ ./pc
生產了一個數據: 1
生產了一個數據: 2
生產了一個數據: 3
生產了一個數據: 4
生產了一個數據: 5
生產了一個數據: 6
隊列已滿, 生產者進入休眠, 生產者休眠個數: 1
喚醒生產者...
消費了一個數據: 1
生產了一個數據: 7
隊列已滿, 生產者進入休眠, 生產者休眠個數: 1
喚醒生產者...
消費了一個數據: 2
生產了一個數據: 8
隊列已滿, 生產者進入休眠, 生產者休眠個數: 1
^C
對于多生產多消費模型,我們的阻塞隊列代碼并不需要改變,其實原理都是一樣的,因為不管是誰訪問隊列,都需要互斥訪問。
注意:這里使用模板是為了說明隊列中不僅可以存放內置類型(如int),對象同樣可以作為任務參與生產消費流程。
3.?為什么 pthread_cond_wait 需要互斥量?
基本原理
條件等待是多線程編程中實現線程同步的重要手段。它的核心邏輯是:當一個線程發現某個條件不滿足時,主動進入等待狀態,直到其他線程修改了共享變量使得條件滿足,并通過信號喚醒等待線程。這種機制必須滿足以下兩個基本要素:
- 共享變量的修改:必須有至少一個線程能夠修改影響條件的共享變量
- 互斥保護:所有對共享變量的訪問和修改都必須通過互斥鎖進行保護
錯誤實現示例分析
考慮以下看似合理的錯誤實現:
// 錯誤的設計
pthread_mutex_lock(&mutex);
while (condition_is_false) {pthread_mutex_unlock(&mutex);//解鎖之后,等待之前,條件可能已經滿?,信號已經發出,但是該信號可能被錯過pthread_cond_wait(&cond, &mutex);pthread_mutex_lock(&mutex);
}
pthread_mutex_unlock(&mutex);
這個設計存在嚴重的競態條件問題:
- 在解鎖后到調用
pthread_cond_wait
之前存在時間窗口 - 其他線程可能在此期間獲取鎖、修改條件并發送信號
- 這會導致信號丟失,等待線程可能永遠阻塞
假設有兩個線程:消費者線程C和生產者線程P
時間 | 消費者線程C | 生產者線程P | 問題描述 |
---|---|---|---|
t1 | pthread_mutex_lock(&mutex) | 等待鎖 | C獲取鎖 |
t2 | while (condition_is_false) ?→ true | 等待鎖 | 條件不滿足 |
t3 | pthread_mutex_unlock(&mutex) | 等待鎖 | C釋放鎖 |
t4 | 時間窗口開始 | pthread_mutex_lock(&mutex) | P獲取鎖 |
t5 | 準備調用?pthread_cond_wait | 修改條件為true | P改變條件 |
t6 | pthread_cond_signal(&cond) | P發送信號 | |
t7 | pthread_mutex_unlock(&mutex) | P釋放鎖 | |
t8 | 調用?pthread_cond_wait(&cond, &mutex) | 信號已錯過! | |
t9 | 永久阻塞... | 線程死鎖 |
正確的原子性操作
正確的實現要求解鎖和等待必須是原子操作,這正是pthread_cond_wait
的設計目的:
- 函數原型:
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
- 內部機制:
- 檢查條件量是否為0
- 將互斥量置為1(解鎖)
- 進入等待狀態
- 被喚醒后,將條件量置為1
- 恢復互斥量原狀態
4. 封裝條件變量
和封裝互斥量一樣非常簡單,代碼如下:
#pragma once
#include <cstring>
#include "Mutex.hpp"using namespace MutexModule;namespace CondModule
{class Cond{public:Cond(){int n = pthread_cond_init(&_cond, nullptr);if (n != 0){std::cerr << "cond init failed: " << strerror(n) << std::endl;}}void Wait(Mutex& mutex){int n = pthread_cond_wait(&_cond, mutex.Get());if (n != 0){std::cerr << "cond wait failed: " << strerror(n) << std::endl;}}void Signal(){int n = pthread_cond_signal(&_cond);if (n != 0){std::cerr << "cond signal failed: " << strerror(n) << std::endl;}}void Broadcast(){int n = pthread_cond_broadcast(&_cond);if (n != 0){std::cerr << "cond broadcast failed: " << strerror(n) << std::endl;}}~Cond(){int n = pthread_cond_destroy(&_cond);if (n != 0){std::cerr << "cond destroy failed: " << strerror(n) << std::endl;}}private:pthread_cond_t _cond;};
}
為了提高條件變量的通用性,建議在封裝Cond類時避免直接引用內部的互斥量。這樣可以在后續組合使用時避免因代碼耦合導致的初始化困難,因為Mutex和Cond通常需要同時創建。
我們給互斥量新增一個接口,用于條件變量中需要wait獲得鎖的情況:
pthread_mutex_t* Get(){return &_mutex;}
下面我們也可以將阻塞隊列修改一下,將封裝的互斥量和條件變量復用起來
#include <iostream>
#include <queue>
#include <pthread.h>
#include "Cond.hpp"
#include "Mutex.hpp"using namespace MutexModule;
using namespace CondModule;const int defaultcap = 5;template <class T>
class BlockQueue
{
private:bool IsFull() { return _q.size() >= _cap; }bool IsEmpty() { return _q.empty(); }
public:BlockQueue(int cap = defaultcap):_cap(cap), _csleep_num(0), _psleep_num(0){}// 生產者生產數據入隊列void Enqueue(const T& in){LockGuard lockguard(_mutex);// 不能使用if判斷,會虛假喚醒while(IsFull()) {_psleep_num++;std::cout << "隊列已滿, 生產者進入休眠, 生產者休眠個數: " << _psleep_num << std::endl;_full_cond.Wait(_mutex);_psleep_num--;}// 此時隊列必定有空間_q.push(in);// 只有隊列為空時,消費者才會阻塞休眠,此時隊列肯定不為空// 那么就判斷是否有消費者休眠,有就喚醒if(_csleep_num > 0){_empty_cond.Signal();std::cout << "喚醒消費者..." << std::endl;}}// 消費者消費數據出隊列T Pop(){LockGuard lockguard(_mutex);while(IsEmpty()) {_csleep_num++;std::cout << "隊列為空, 消費者進入休眠, 消費者休眠個數: " << _csleep_num << std::endl;_empty_cond.Wait(_mutex);_csleep_num--;}// 此時隊列必定有空間T data = _q.front();_q.pop();// 只有隊列為空時,消費者才會阻塞休眠,此時隊列肯定不為空// 那么就判斷是否有消費者休眠,有就喚醒if(_psleep_num > 0){_full_cond.Signal();std::cout << "喚醒生產者..." << std::endl;}return data;}~BlockQueue() {}
private:std::queue<T> _q;size_t _cap; // 隊列容量大小Mutex _mutex;Cond _full_cond;Cond _empty_cond;int _csleep_num; // 消費者休眠的個數int _psleep_num; // 生產者休眠的個數
};