文章目錄
- 背景與動機
- 30.1 條件變量的定義與基本操作 (Definition and Routines)
- 30.2 生產者/消費者問題 (Bounded Buffer Problem)
- 30.3 覆蓋條件 (Covering Conditions) 與 `pthread_cond_broadcast`
- 30.4 總結
背景與動機
到目前為止,我們已經學習了鎖 (Locks) 作為并發原語,它允許線程互斥地訪問臨界區,防止數據競爭。然而,僅有鎖是不夠的。在許多并發場景中,一個線程可能需要等待某個條件 (condition) 變為真之后才能繼續執行。
例子:等待子線程完成 (Thread Join)
一個父線程創建了一個子線程。父線程可能希望等待子線程執行完畢后,再繼續執行自己的后續任務。這種等待子線程完成的操作通常稱為 join()
。
簡單但不高效的嘗試:自旋等待 (Spin-waiting using a shared variable - Figure 30.2)
// Figure 30.2: Parent Waiting For Child: Spin-based Approach
volatile int done = 0; // 共享變量,標記子線程是否完成void *child(void *arg) {printf("child\n");done = 1; // 子線程完成,設置標志return NULL;
}int main(int argc, char *argv[]) {printf("parent: begin\n");pthread_t c;pthread_create(&c, NULL, child, NULL); // 創建子線程while (done == 0) // 父線程在此自旋等待; // spinprintf("parent: end\n");return 0;
}
- 問題: 父線程會持續地檢查
done
變量,這是一種忙等待 (busy-waiting) 或自旋 (spinning)。這種方式極度浪費 CPU 時間,因為父線程在等待期間本可以被調度出去,讓其他有用的工作執行。
核心問題 (THE CRUX): 如何優雅地等待一個條件?
在多線程程序中,線程經常需要等待某個條件成立才能繼續。簡單地自旋等待不僅效率低下,浪費CPU,有時甚至可能不正確。那么,線程應該如何有效地等待一個條件呢?
30.1 條件變量的定義與基本操作 (Definition and Routines)
條件變量 (Condition Variable - CV) 是一種顯式的隊列,線程可以在某個條件不滿足時,將自己放入這個隊列中并進入休眠狀態(通過等待該條件)。當其他線程改變了可能影響該條件的狀態時,它可以通知 (signal) 一個或多個正在等待該條件的休眠線程,喚醒它們以重新檢查條件并繼續執行。
- 歷史淵源: 條件變量的思想可以追溯到 Dijkstra 的“私有信號量 (private semaphores)”和 Hoare 在其監視器 (Monitors) 工作中提出的“條件變量”。
- 聲明 (POSIX Pthreads):
pthread_cond_t c;
(還需要正確初始化) - 核心操作 (POSIX Pthreads):
pthread_cond_wait(pthread_cond_t *cv, pthread_mutex_t *mutex)
:- 原子地 (atomically):
- 釋放傳入的
mutex
。 - 將調用線程置于休眠狀態,并將其加入與
cv
關聯的等待隊列。
- 釋放傳入的
- 當線程被喚醒時(通常由其他線程調用
pthread_cond_signal
或pthread_cond_broadcast
),pthread_cond_wait
會在返回之前重新獲取 (re-acquire) 傳入的mutex
。 - 前提條件: 調用
pthread_cond_wait
時,當前線程必須已經持有mutex
。
- 原子地 (atomically):
pthread_cond_signal(pthread_cond_t *cv)
:- 喚醒至少一個(通常是一個)正在
cv
上等待的線程(如果存在的話)。 - 如果沒有任何線程在
cv
上等待,signal
操作通常無效(即信號丟失)。
- 喚醒至少一個(通常是一個)正在
pthread_cond_broadcast(pthread_cond_t *cv)
:- 喚醒所有正在
cv
上等待的線程。
- 喚醒所有正在
為什么 wait()
需要一個互斥鎖參數?
這是條件變量設計的核心,為了防止競爭條件和“丟失的喚醒 (lost wakeup)”問題。
- 保護條件檢查: 線程在決定是否需要等待之前,必須檢查某個共享狀態(即條件)。這個檢查本身以及后續可能的
wait
操作必須是原子的。互斥鎖確保了在檢查條件和進入等待狀態之間,條件本身不會被其他線程修改。 - 原子釋放鎖和休眠:
pthread_cond_wait
的關鍵在于它能原子地釋放鎖并將線程置于休眠。如果這兩步不是原子的(例如,先釋放鎖,再嘗試休眠),那么在釋放鎖之后、線程實際休眠之前,另一個線程可能已經修改了條件并發送了信號。這個信號就會因為沒有線程在等待而被丟失,導致調用wait
的線程永久休眠。
使用條件變量解決父線程等待子線程 (Figure 30.3):
// Figure 30.3: Parent Waiting For Child: Use A Condition Variable
int done = 0; // 共享狀態變量,表示子線程是否完成
pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER; // 互斥鎖,保護 done
pthread_cond_t c = PTHREAD_COND_INITIALIZER; // 條件變量// 子線程退出時調用
void thr_exit() {pthread_mutex_lock(&m); // 獲取鎖,保護 donedone = 1; // 修改共享狀態pthread_cond_signal(&c); // 通知等待在 c 上的線程pthread_mutex_unlock(&m); // 釋放鎖
}void *child(void *arg) {printf("child\n");thr_exit(); // 調用封裝好的退出函數return NULL;
}// 父線程等待子線程完成時調用
void thr_join() {pthread_mutex_lock(&m); // 獲取鎖,保護 done 的檢查while (done == 0) { // 關鍵:使用 while 循環檢查條件// 在調用 wait 時,鎖 m 被持有// wait 會原子地釋放 m 并使父線程休眠pthread_cond_wait(&c, &m);// 當被喚醒時,wait 會重新獲取 m,然后返回}// 此處,父線程重新持有了鎖 m,并且知道 done == 1pthread_mutex_unlock(&m); // 釋放鎖
}int main(int argc, char *argv[]) {printf("parent: begin\n");pthread_t p_child; // 將變量名從 c 改為 p_child 以示區分pthread_create(&p_child, NULL, child, NULL);thr_join(); // 父線程等待子線程printf("parent: end\n");return 0;
}
執行流程分析 (兩種情況):
- 父線程先運行到
thr_join()
:- 父線程獲取鎖
m
。 - 檢查
done
(為0),進入while
循環。 - 調用
pthread_cond_wait(&c, &m)
。父線程原子地釋放鎖m
并進入休眠狀態,等待條件變量c
。 - 子線程運行,打印 “child”。
- 子線程調用
thr_exit()
:- 獲取鎖
m
。 - 設置
done = 1
。 - 調用
pthread_cond_signal(&c)
,喚醒正在c
上等待的父線程。 - 釋放鎖
m
。
- 獲取鎖
- 父線程從
pthread_cond_wait
返回,此時它已重新獲取了鎖m
。 - 父線程再次檢查
while (done == 0)
,此時done
為 1,循環終止。 - 父線程釋放鎖
m
,打印 “parent: end”。
- 父線程獲取鎖
- 子線程先運行并完成:
- 子線程運行,打印 “child”。
- 子線程調用
thr_exit()
:- 獲取鎖
m
。 - 設置
done = 1
。 - 調用
pthread_cond_signal(&c)
。此時可能沒有線程在c
上等待(如果父線程還沒調用thr_join
),信號“丟失”(這是正常的,信號不是計數器)。 - 釋放鎖
m
。子線程結束。
- 獲取鎖
- 父線程運行,調用
thr_join()
:- 獲取鎖
m
。 - 檢查
done
(為1),while (done == 0)
條件不滿足,循環不執行。
- 獲取鎖
- 父線程釋放鎖
m
,打印 “parent: end”。
關鍵點:while
循環 vs. if
語句
- 必須使用
while
循環來重新檢查條件:while (condition_is_false) { pthread_cond_wait(...); }
- 原因 (Mesa Semantics & Spurious Wakeups):
- Mesa 語義 (Mesa Semantics): 當一個線程被
pthread_cond_signal
喚醒時,它僅僅是一個提示 (hint),表明條件可能已經為真。在被喚醒的線程重新獲取鎖并實際運行之前,其他線程可能已經運行并再次改變了條件,使得條件又變回假。因此,被喚醒的線程必須重新檢查條件。 - 虛假喚醒 (Spurious Wakeups): 在某些操作系統或線程庫的實現中,線程有時可能會在沒有被顯式
signal
或broadcast
的情況下從wait
中“虛假地”喚醒。雖然不常見,但為了代碼的健壯性,必須重新檢查條件。 if
語句只檢查一次條件,如果條件在被喚醒后到實際執行前又變假,或者發生了虛假喚醒,程序邏輯就會出錯。
- Mesa 語義 (Mesa Semantics): 當一個線程被
錯誤嘗試分析 (加深理解):
-
沒有狀態變量
done
(Figure 30.4):// thr_exit() // Pthread_mutex_lock(&m); // Pthread_cond_signal(&c); // 只有信號,沒有狀態 // Pthread_mutex_unlock(&m);// thr_join() // Pthread_mutex_lock(&m); // Pthread_cond_wait(&c, &m); // 直接等待,不檢查狀態 // Pthread_mutex_unlock(&m); ```* **問題:** 如果子線程先運行并調用 `thr_exit()` 發送信號,此時父線程可能還沒調用 `thr_join()` 進入等待。信號會丟失。當父線程稍后調用 `thr_join()` 并進入 `wait` 時,它將永遠等待,因為已經沒有信號會再來喚醒它了。 * **教訓:** 條件變量通常需要與一個**顯式的狀態變量**一起使用,該狀態變量記錄了線程感興趣的實際條件。鎖、等待和信號都是圍繞這個狀態變量進行的。
-
沒有鎖的
wait
和signal
(Figure 30.5 - 概念性錯誤,實際 Pthreads API 不允許):// thr_exit() // done = 1; // Pthread_cond_signal(&c); // 沒有鎖// thr_join() // if (done == 0) // Pthread_cond_wait(&c); // 沒有鎖
- 問題 (細微的競爭條件 - Lost Wakeup):
- 父線程調用
thr_join()
。 - 檢查
done
(為0)。 - 在父線程調用
Pthread_cond_wait
使自己休眠之前,父線程被中斷。 - 子線程運行,設置
done = 1
,并調用Pthread_cond_signal
。由于父線程還沒進入等待,信號丟失。 - 父線程恢復運行,執行
Pthread_cond_wait
,進入永久休眠。
- 父線程調用
- 教訓:
pthread_cond_wait
必須與互斥鎖一起使用,并且互斥鎖在調用wait
時必須被持有,以原子地完成條件檢查、釋放鎖和進入休眠的過程。
- 問題 (細微的競爭條件 - Lost Wakeup):
TIP: ALWAYS HOLD THE LOCK WHILE SIGNALING (通常情況下)
- 雖然在某些特定情況下,不持有鎖進行
signal
可能是安全的(例如,如果確信條件狀態的改變和signal
操作本身是原子的,并且之后不會再訪問受鎖保護的數據),但最簡單和最安全的做法是在調用pthread_cond_signal
或pthread_cond_broadcast
時持有相關的互斥鎖。這可以確保在修改共享狀態和發出信號之間沒有競爭。 - 對于
pthread_cond_wait
,持有鎖是強制的語義要求。
30.2 生產者/消費者問題 (Bounded Buffer Problem)
這是一個經典的并發同步問題,用于演示條件變量的強大功能。
- 場景:
- 生產者 (Producer) 線程: 生成數據項并將其放入一個共享的、有限大小的緩沖區 (bounded buffer) 中。
- 消費者 (Consumer) 線程: 從該緩沖區中取出數據項并進行處理。
- 同步要求:
- 生產者不能在緩沖區已滿時放入數據(需要等待緩沖區有空位)。
- 消費者不能在緩沖區為空時取出數據(需要等待緩沖區有數據)。
- 對共享緩沖區的訪問(放入和取出操作)必須是互斥的。
簡單版本:單元素緩沖區 (Single Buffer - Figure 30.6, 30.7)
buffer
: 一個共享的整數變量,用于存放數據。count
: 一個共享的整數變量,0表示緩沖區為空,1表示緩沖區已滿。put(value)
: 假設緩沖區為空 (count==0
),放入數據,設置count=1
。get()
: 假設緩沖區為滿 (count==1
),取出數據,設置count=0
。
首次嘗試:單個條件變量和 if
語句 (Figure 30.8 - Broken)
// 共享變量
// int loops;
// cond_t cond; // 單個條件變量
// mutex_t mutex; // 單個互斥鎖
// int count = 0; // 緩沖區狀態 (0:空, 1:滿)
// int buffer; // 單元素緩沖區void *producer(void *arg) {for (int i = 0; i < loops; i++) {pthread_mutex_lock(&mutex); // p1if (count == 1) // p2: 檢查緩沖區是否已滿pthread_cond_wait(&cond, &mutex); // p3: 如果滿了,等待put(i); // p4: 放入數據 (內部會設置 count=1)pthread_cond_signal(&cond); // p5: 通知消費者有數據了pthread_mutex_unlock(&mutex); // p6}return NULL;
}void *consumer(void *arg) {// for (int i = 0; i < loops; i++) { // 書中是 while(1)while(1) {pthread_mutex_lock(&mutex); // c1if (count == 0) // c2: 檢查緩沖區是否為空pthread_cond_wait(&cond, &mutex); // c3: 如果空了,等待int tmp = get(); // c4: 取出數據 (內部會設置 count=0)pthread_cond_signal(&cond); // c5: 通知生產者有空位了pthread_mutex_unlock(&mutex); // c6printf("%d\n", tmp);}return NULL;
}
-
問題1 (使用
if
而不是while
- Mesa Semantics):- 如 Figure 30.9 的線程軌跡所示:
- 消費者 Tc1 運行,發現
count == 0
,調用wait
并休眠。 - 生產者 Tp 運行,
put()
數據,count
變為 1,signal()
喚醒 Tc1。Tc1 進入就緒隊列,但尚未運行。 - 關鍵: 另一個消費者 Tc2 “潛入”,獲取鎖,發現
count == 1
(因為 Tp 剛生產完),于是 Tc2 執行get()
,count
變為 0。Tc2 消耗了數據。 - 現在 Tc1 終于運行,它從
wait
返回 (因為之前被 Tp 喚醒了)。由于用的是if
,它不會重新檢查count
。 - Tc1 直接調用
get()
,但此時count
已經是 0 了!get()
內部的assert(count == 1)
會失敗。
- 消費者 Tc1 運行,發現
- 修復: 將
if (condition)
改為while (condition)
,如 Figure 30.10。這樣,Tc1 被喚醒后,會重新檢查while (count == 0)
,發現條件仍然為真 (因為 Tc2 已經消費了),于是 Tc1 會再次進入wait
休眠。
- 如 Figure 30.9 的線程軌跡所示:
-
問題2 (單個條件變量的混淆 - “Everyone asleep…” bug):
- 即使使用了
while
循環,單個條件變量仍然存在問題,如 Figure 30.11 的線程軌跡所示:- 兩個消費者 Tc1 和 Tc2 先后運行,都發現緩沖區為空,都在
cond
上調用wait
休眠。 - 生產者 Tp 運行,
put()
數據,count
變為 1。Tp 調用signal()
喚醒一個等待者(假設是 Tc1)。 - Tp 嘗試再次生產,發現
count == 1
(緩沖區已滿),Tp 也在cond
上調用wait
休眠。- 現在狀態:Tc1 就緒 (剛被喚醒),Tc2 休眠 (等待數據),Tp 休眠 (等待空位)。
- Tc1 運行,從
wait
返回,重新檢查while (count == 0)
,條件為假 (因為count == 1
)。 - Tc1 調用
get()
,count
變為 0。 - 關鍵: Tc1 調用
pthread_cond_signal(&cond)
。此時,等待在cond
上的有兩個線程:Tc2 (消費者,等待數據) 和 Tp (生產者,等待空位)。 - 假設
signal
喚醒了 Tc2 (另一個消費者)。 - Tc2 運行,從
wait
返回,重新檢查while (count == 0)
。由于 Tc1 剛消費完,count
是 0,所以條件為真,Tc2 再次進入wait
休眠。 - 災難發生: Tp (唯一能生產數據的線程) 仍然在休眠,等待空位。Tc1 已經消費完退出了循環(或準備下一次消費)。Tc2 也在休眠,等待數據。所有相關的線程都可能陷入休眠,沒有人能打破僵局。
- 兩個消費者 Tc1 和 Tc2 先后運行,都發現緩沖區為空,都在
- 原因: 消費者 Tc1 消耗完數據后,它應該喚醒一個生產者(因為現在有空位了)。但是由于只有一個條件變量,它無法區分應該喚醒誰,它錯誤地喚醒了另一個消費者 Tc2。
- 即使使用了
正確解決方案:使用兩個條件變量 (Figure 30.12)
cond_t empty;
// 當緩沖區從滿變為空時,生產者在此等待,消費者發出信號cond_t fill;
// 當緩沖區從空變為有時,消費者在此等待,生產者發出信號
// 共享變量
// int loops;
// cond_t empty, fill; // 兩個條件變量
// mutex_t mutex;
// int count = 0;
// int buffer;void *producer(void *arg) {for (int i = 0; i < loops; i++) {pthread_mutex_lock(&mutex);while (count == 1) // 緩沖區滿了?pthread_cond_wait(&empty, &mutex); // 等待 empty 條件 (由消費者發出)put(i);pthread_cond_signal(&fill); // 通知消費者有數據了 (發信號給 fill 條件)pthread_mutex_unlock(&mutex);}return NULL;
}void *consumer(void *arg) {while (1) {pthread_mutex_lock(&mutex);while (count == 0) // 緩沖區空了?pthread_cond_wait(&fill, &mutex); // 等待 fill 條件 (由生產者發出)int tmp = get();pthread_cond_signal(&empty); // 通知生產者有空位了 (發信號給 empty 條件)pthread_mutex_unlock(&mutex);printf("%d\n", tmp);}return NULL;
}
- 邏輯:
- 生產者在緩沖區滿時,等待在
empty
條件變量上(期望消費者消費后發出empty
信號)。生產數據后,它向fill
條件變量發信號,通知消費者有數據可取。 - 消費者在緩沖區空時,等待在
fill
條件變量上(期望生產者生產后發出fill
信號)。消費數據后,它向empty
條件變量發信號,通知生產者有空位可用。
- 生產者在緩沖區滿時,等待在
- 優點: 信號被導向了正確的類型的等待線程,避免了之前消費者喚醒消費者或生產者喚醒生產者的混淆問題。
最終的生產者/消費者解決方案 (多個緩沖區槽位 - Figure 30.13, 30.14)
- 將單元素緩沖區擴展為一個真正的循環隊列(數組實現),包含
fill_ptr
(下一個填充位置),use_ptr
(下一個使用位置), 和count
(當前緩沖區中的元素數量)。 put()
和get()
函數相應修改以操作這個循環隊列。- 條件檢查的改變:
- 生產者:
while (count == MAX)
(緩沖區是否已滿,MAX 是緩沖區總容量) - 消費者:
while (count == 0)
(緩沖區是否為空)
- 生產者:
- 條件變量和鎖的邏輯與雙條件變量的單槽版本相同:
- 生產者等待
empty
,通知fill
。 - 消費者等待
fill
,通知empty
。
- 生產者等待
- 好處:
- 更高的并發性和效率: 允許多個數據項在被消費前先生產出來,減少了生產者和消費者的直接同步等待,尤其是在生產和消費速率不均時,可以起到緩沖作用。
- 減少上下文切換: 如果緩沖區足夠大,生產者可以連續生產多個條目而無需等待消費者,反之亦然。
30.3 覆蓋條件 (Covering Conditions) 與 pthread_cond_broadcast
有時,一個線程發出信號時,它可能不清楚究竟應該喚醒哪一個或哪些等待的線程,或者多個等待線程的條件都可能因為這次狀態改變而變為真。
例子:內存分配器 (Figure 30.15)
allocate(size)
: 線程請求分配size
大小的內存。如果當前剩余內存bytesLeft < size
,則線程需要等待。free(ptr, size)
: 線程釋放size
大小的內存,bytesLeft += size
。此時,應該喚醒等待分配內存的線程。
// 共享變量
// int bytesLeft = MAX_HEAP_SIZE;
// cond_t c;
// mutex_t m;void *allocate(int size) {pthread_mutex_lock(&m);while (bytesLeft < size) { // 內存不足,等待pthread_cond_wait(&c, &m);}// 從堆中獲取內存 ...void *ptr = ...; // 假設從某處獲取了內存bytesLeft -= size;pthread_mutex_unlock(&m);return ptr;
}void free(void *ptr, int size) {pthread_mutex_lock(&m);bytesLeft += size;// 問題:應該喚醒誰?pthread_cond_signal(&c); // 只喚醒一個,可能是錯誤的那個pthread_mutex_unlock(&m);
}
- 問題:
- 線程 Ta 調用
allocate(100)
,bytesLeft
為 0,Ta 休眠。 - 線程 Tb 調用
allocate(10)
,bytesLeft
仍為 0,Tb 也休眠。 - 線程 Tc 調用
free(ptr, 50)
,bytesLeft
變為 50。Tc 調用pthread_cond_signal(&c)
。 - 如果
signal
喚醒了 Ta (需要100字節),Ta 重新檢查條件bytesLeft < 100
,發現仍然為真 (50 < 100),Ta 再次休眠。 - 而 Tb (只需要10字節) 本應該被喚醒,因為現在有足夠的內存 (50 > 10),但它沒有被喚醒,仍在休眠。
- 線程 Ta 調用
- 原因:
free
操作無法知道哪個等待的分配請求現在可以被滿足。簡單地signal
一個線程可能喚醒了錯誤的線程。
解決方案:pthread_cond_broadcast()
- 當一個條件可能滿足多個等待者,或者發出信號的線程不確定哪個等待者最適合被喚醒時,可以使用
pthread_cond_broadcast(&c)
。 broadcast
會喚醒所有正在該條件變量c
上等待的線程。- 每個被喚醒的線程都會重新獲取鎖,并重新檢查其等待的特定條件 (在
while
循環中)。- 那些條件仍然不滿足的線程會再次調用
wait
休眠。 - 那些條件已經滿足的線程會跳出
while
循環并繼續執行。
- 那些條件仍然不滿足的線程會再次調用
- 在內存分配器的例子中: 當
free
調用pthread_cond_broadcast
時,Ta 和 Tb 都會被喚醒。- Ta 檢查
bytesLeft < 100
(50 < 100),仍然休眠。 - Tb 檢查
bytesLeft < 10
(50 < 10 不成立),跳出循環,成功分配內存。
- Ta 檢查
- 這種條件被稱為“覆蓋條件 (Covering Condition)”: 一個信號(或狀態改變)覆蓋了所有可能需要被喚醒的情況。代價是可能會喚醒過多不必要的線程,這些線程醒來后檢查條件發現不滿足又會立即睡去,帶來一些性能開銷。但在難以精確判斷喚醒對象時,這是一種保守且正確的做法。
TIP: USE WHILE (NOT IF) FOR CONDITIONS
- 再次強調: 即使不考慮
broadcast
,也始終使用while
循環來檢查條件變量相關的條件。這是因為:- Mesa Semantics: 信號只是一個提示。
- Spurious Wakeups: 線程可能在沒有信號的情況下被喚醒。
broadcast
的情況: 多個線程被喚醒,只有一個或少數幾個的條件真正滿足。
while
循環確保了線程在繼續執行前,其等待的條件確實為真。
30.4 總結
- 條件變量是鎖之外的另一種重要的同步原語。
- 它們允許線程在某個程序狀態(條件)不滿足期望時進入休眠,并在狀態改變時被其他線程喚醒。
- 關鍵組件:
- 一個互斥鎖,用于保護共享的狀態變量和條件檢查。
- 一個或多個條件變量,線程在上面等待。
- 一個共享的狀態變量,代表線程實際關心的條件。
- 核心操作:
wait()
(原子釋放鎖并休眠,喚醒后重新獲取鎖) 和signal()
/broadcast()
(喚醒等待者)。 - 重要實踐:
- 始終在
while
循環中調用wait()
并重新檢查條件。 - 通常在持有相關互斥鎖的情況下調用
signal()
或broadcast()
。 - 仔細設計條件和信號邏輯,考慮是否需要多個條件變量或使用
broadcast
。
- 始終在
- 條件變量優雅地解決了許多重要的同步問題,包括生產者/消費者問題和覆蓋條件問題。