緒論
并發編程紛繁復雜,其中用于線程同步的主要工具——條件變量,雖然精悍,但是要想正確靈活的運用卻并不容易。
對于條件變量的理解有三個難點:
- 為什么
wait
函數需要將解鎖和阻塞、喚醒和上鎖這兩對操作編程原子的? - 為什么
wait
函數需要配合while
進行使用? - 通知線程是應該先
notify
再unlock
還是先unlock
后notify
?
希望大家看完下面的介紹能夠得到想要的答案。想要了解更多關于C++并發編程信息可以移步的我倉庫:C++并發編程
條件變量
C++提供了兩種條件變量的實現:std::condition_variable
和std::condition_variable_any
。前者只能和std::mutex
配合使用,后者只需要符合互斥的標準即可。因為std::condition_variable_any
更通用,所以可能產生額外的開銷,如果沒什么特殊需要,盡可能使用std::condition_variable
條件變量是非常重要的線程同步的手段(目前我認為是最重要的),因此對其的深入理解至關重要。
-
條件變量總是和互斥一起配合使用,互斥用于保護共享數據,條件變量用于
- 通知(通知線程)
- 判斷該共享數據是否滿足條件(等待線程)
-
通知線程往往先通過互斥保護共享數據,對數據進行一定的修改后再發送通知(
notify_one()、notify_all()
)。需要注意的是我們應盡可能在臨界區內發送通知,從而避免可能出現的優先級翻轉和條件變量失效問題。雖然臨界區外通知可以讓等待線程一旦被喚醒就能立即解鎖互斥查看是否滿足情況,但是在Pthread進行wait morphint后基本上兩者沒有性能上的差距。詳細的分析可以參考博客:條件變量用例–解鎖與signal的順序問題。notify_one()
理論上只會喚醒一個等待線程,適用于共享變量數量發生變化的情況,例如通知消息隊列中的消息個數增加。notify_all()
會喚醒所有等待該條件變量的線程,適用于共享變量狀態發生變化的情況,例如通知所有工作線程開始計算。
-
等待線程先獲得互斥,然后將鎖和判定條件傳遞給
wait
函數等待返回。-
wait
函數首先會根據判斷條件判斷是否滿足條件(返回true
)-
如果滿足條件,則直接返回(互斥依舊上鎖)
-
如果不滿足條件,則阻塞等待,并解鎖互斥(讓其他線程得以修改共享數據的狀態)。直到被
notify
函數喚醒,再次上鎖,判斷條件是否滿足。這里的阻塞和解鎖、喚醒和上鎖都是原子的,就是為了避免兩個動作分別執行出現的條件競態。- 解鎖和阻塞是原子的:lock → !pred() → unlock → sleep;如果變量的改變以及喚醒事件發生在unlock和sleep中間,那么你不會檢測到,也就是錯過了這次喚醒。假如下次喚醒依賴于此次喚醒的成功(也就是說不會主動喚醒第二次),那么將發生死鎖。
- 喚醒和上鎖是原子的:wakeup → lock → !pred :如果條件在wakeup和lock之間從滿足變成了不滿足(不是因為其他等待線程修改,而是因為負責喚醒的線程自己再次修改了條件),那么此次喚醒將失敗。假如后面條件的再次滿足依賴于此次條件滿足成功(也就是說條件不會再主動滿足),那么將發生死鎖。
需要理解的是上面的死鎖的出現是有限定條件的(例如喚醒之間的依賴、條件滿足的依賴),雖然大多數情況下沒有這么嚴格的條件,但是工具本身需要避免這種危險的情況。
原子操作保證了重要的喚醒和條件滿足都能夠至少被一個等待線程看到。
-
可以看到
wait
函數內部需要解鎖互斥,所以就不能使用不提供unlock
函數的lock_guard
,而應該使用和互斥有相同接口的unique_lock
。
-
-
其實C++的線程庫是對pthread庫的封裝,因此也可以像pthread庫一樣只傳入互斥,解鎖并等待通知,一旦接收到通知后再上鎖,然后在一個
while
循環中進行判斷。while (!pred()) {cond_.wait(lk); //調用pthread_cond_wait }
對于傳入判定條件的版本,其實內部也是這樣的一個封裝罷了。
-
-
之所以說
notify_one()
理論上只會喚醒一個等待線程是因為存在調用一次notify_one()
卻喚醒了多個線程的可能性,甚至有時候沒有調用notify
等待線程都被喚醒,稱這種意外喚醒等待線程的情況為偽喚醒。按照C++標準的規定,這種偽喚醒出現的數量和頻率都不確定,因此要求等待線程的判定函數不能有副作用(可重用),并且需要在喚醒后再次判斷條件是否滿足,如果不滿足則需要重新等待。這也是為什么上面的代碼使用while
進行條件判斷而不是if
的原因。
消息隊列
//
// Created by edward on 22-11-16.
// use condtion_variable to genenrate a thread safe message queue
//#include "utils.h"
#include <mutex>
#include <queue>
#include <condition_variable>
#include <iostream>
#include <thread>
#include <string>template<typename T>
class MessageQueue {
public:void push(T t) {std::lock_guard lk(mtx_); //互斥保護數據queue_.push(std::move(t));cond_.notify_one(); //臨界區內發送通知,避免優先級反轉和條件變量失效}T pop() {T frnt;std::unique_lock lk(mtx_);cond_.wait(lk, [&](){return !queue_.empty();});frnt = std::move(queue_.front());queue_.pop();return frnt;}
private:mutable std::mutex mtx_;mutable std::condition_variable cond_;std::queue<T> queue_;
};using namespace std;template<typename T>
void data_prepare(MessageQueue<T> &messageQueue) {T t;while (cin >> t) {messageQueue.push(std::move(t));}
}template<typename T>
void data_process(MessageQueue<T> &messageQueue) {T t;int idx = 0;while (true) {t = messageQueue.pop(); //數據的處理在臨界區外edward::print("[", idx++, "]:", t);}
}int main() {MessageQueue<string> messageQueue;edward::print("test begin:");thread preparer(data_prepare<string>, ref(messageQueue));thread processer(data_process<string>, ref(messageQueue));preparer.join();//不用等待processer,如果preparer結束,則直接推出進程return 0;
}
運行結果
其中用到了我自己寫的庫函數頭文件utils
,如果想要了解更多信息可以移步C++ 工具函數庫