文章目錄
- 第3章 共享數據
- 本章主要內容
- 共享數據的問題
- 使用互斥保護數據
- 保護數據的替代方案
- 3.1 共享數據的問題
- 共享數據的核心問題
- 不變量的重要性
- 示例:刪除雙鏈表中的節點
- 多線程環境中的問題
- 條件競爭的后果
- 總結
- 3.1.1 條件競爭
- 3.1.2 避免惡性條件競爭
- 3.2 使用互斥量
- 3.2.1 互斥量
- 例子 (多線程插入容器會導致容器失效)
- **3.2.2 保護共享數據**
- **代碼 3.2 無意中傳遞了保護數據的引用**
- 1. `SomeData` 類
- 2. 全局指針 `unprotected`
- 3. `DataWrapper` 類
- 4. `malicious_function` 函數
- 5. `foo_protected` 和 `foo_unprotected` 函數
- 6. `main` 函數
- 總結
- 輸出解析
- 1. Protected Access (安全訪問)
- 2. Unprotected Access (不安全訪問)
- 代碼執行流程總結
- 不當的指針或引用傳遞而導致競爭條件(代碼例子)
- 示例代碼
- 問題分析
- 解決方案
- 改進版代碼
- 使用改進版代碼
- 總結
- 3.2.3 接口間的條件競爭
- 代碼 3.3 `std::stack` 容器的實現
- 如何解決接口設計中的條件競爭問題?
- **直接解決方案:在 `top()` 中拋出異常**
- **潛在的條件競爭問題**
- 解決條件競爭的幾種選項
- **選項 1:傳入一個引用**
- **選項 2:無異常拋出的拷貝構造函數或移動構造函數**
- **選項 3:返回指向彈出值的指針**
- **選項 4:“選項 1 + 選項 2” 或 “選項 1 + 選項 3”**
- 總結
- 示例:定義線程安全的堆棧
- 代碼 3.4 線程安全的堆棧類定義(概述)
- 代碼 3.5 擴展(線程安全)堆棧
- 鎖粒度的討論
- 全局互斥量的問題
- 細粒度鎖的問題
- 死鎖問題
- 3.2.4 死鎖:問題描述及解決方案
- **問題描述**
- **避免死鎖的建議**
- **解決方案:使用 `std::lock` 和 `std::scoped_lock`**
- **1. 使用 `std::lock`**
- **2. C++17 中的 `std::scoped_lock`**
- **總結**
- 3.2.5 避免死鎖的進階指導
- **問題背景**
- **1. 避免嵌套鎖**
- **2. 避免在持有鎖時調用外部代碼**
- **3. 使用固定順序獲取鎖**
- **4. 使用層次鎖結構**
- **5. 超越鎖的延伸擴展**
- **總結**
- **完整代碼實現**
- **代碼說明**
- **1. `hierarchical_mutex` 類**
- **2. 示例函數**
- **3. 主函數**
- **運行結果**
- 3.2.6 `std::unique_lock` —— 靈活的鎖
- **靈活性的特點**
- **代碼示例:交換操作中 `std::lock()` 和 `std::unique_lock` 的使用**
- **關鍵點解析**
- **總結**
- **完整代碼示例**
- **代碼說明**
- **1. `SharedResource` 類**
- **2. `swapValues()` 函數**
- **3. `modifyResource()` 函數**
- **4. 主函數**
- **運行結果示例**
- **關鍵點解析**
- 3.2.7 不同域中互斥量的傳遞
- **所有權傳遞機制**
- **應用場景:函數返回鎖**
- **網關類模式**
- **提前釋放鎖**
- **總結**
- **完整代碼示例**
- **代碼說明**
- **1. 全局互斥量**
- **2. 函數 `get_lock()`**
- **3. 網關類 `Gateway`**
- **4. 主函數**
- **運行結果示例**
- **關鍵點解析**
- 3.2.8 鎖的粒度
- **鎖粒度的概念**
- **鎖粒度的實際意義**
- **使用 `std::unique_lock` 減少鎖持有時間**
- **粗粒度鎖的問題**
- **細粒度鎖的應用示例**
- **語義問題與潛在風險**
- **總結**
- **完整代碼示例:銀行賬戶系統**
- **代碼說明**
- **1. `BankAccount` 類**
- **2. 線程函數**
- **3. 主函數**
- **運行結果示例**
- **關鍵點解析**
- 3.3 保護共享數據的方式
- **隱式同步的需求**
- 3.3.1 保護共享數據的初始化過程
- **多線程環境下的問題**
- **雙重檢查鎖模式的問題**
- **C++ 標準庫的解決方案:`std::call_once` 和 `std::once_flag`**
- **示例代碼**
- **作為類成員的延遲初始化**
- **靜態局部變量的線程安全初始化**
- **總結**
- **完整代碼示例**
- **代碼說明**
- **1. `SomeResource` 類**
- **2. `ResourceManager` 類**
- **3. 靜態局部變量初始化**
- **4. 測試函數**
- **5. 主函數**
- **運行結果示例**
- **關鍵點解析**
- 3.3.2 保護不常更新的數據結構
- **場景描述**
- **問題分析**
- **C++ 標準庫中的解決方案**
- **代碼示例**
- **代碼說明**
- **關鍵點解析**
- **完整代碼示例**
- **代碼說明**
- **1. `dns_entry` 類**
- **2. `dns_cache` 類**
- **3. 測試函數**
- **4. 主函數**
- **運行結果示例**
- **關鍵點解析**
- 3.3.3 嵌套鎖
- **概述**
- **完整代碼示例**
- **代碼說明**
- **1. `RecursiveMutexExample` 類**
- **2. `ImprovedDesignExample` 類**
- **3. 測試函數**
- **運行結果示例**
- **關鍵點解析**
第3章 共享數據
本章主要內容
- 共享數據的問題
- 使用互斥保護數據
- 保護數據的替代方案
在上一章中,我們已經對線程管理有了一定的了解。現在,讓我們來探討一下“共享數據的那些事兒”。
共享數據的問題
想象一下,你和朋友合租一個公寓,公寓中只有一個廚房和一個衛生間。當你的朋友在使用衛生間時,你就無法使用它了。同樣的問題也會出現在廚房。例如,如果廚房里有一個烤箱,你在烤香腸的同時,也在做蛋糕,那么你可能會得到不想要的食物(比如香腸味的蛋糕)。此外,在公共空間進行某項任務時,如果發現某些需要的東西被別人拿走,或者在你離開的一段時間內有些東西被移動了位置,這都會讓你感到不爽。
類似的問題也困擾著線程。當多個線程訪問共享數據時,必須制定一些規則來限定哪些數據可以被哪些線程訪問。如果一個線程更新了共享數據,它需要通知其他線程。從易用性的角度來看,同一進程中的多個線程共享數據有利有弊,但錯誤的共享數據使用是導致bug的主要原因。
使用互斥保護數據
為了避免上述問題,我們可以使用互斥鎖(mutex)來保護共享數據。互斥鎖確保在同一時間只有一個線程可以訪問共享數據,從而防止數據競爭和不一致的狀態。
保護數據的替代方案
除了互斥鎖,還有其他一些方法可以保護共享數據,例如:
- 讀寫鎖:允許多個線程同時讀取數據,但只允許一個線程寫入數據。
- 原子操作:通過硬件支持的原子操作來確保數據的一致性。
- 無鎖編程:通過復雜的算法和數據結構來實現線程安全,而不使用鎖。
本章將以數據共享為主題,探討如何避免上述及潛在問題的發生,同時最大化共享數據的優勢。
通過以上內容,我們希望能夠幫助你更好地理解共享數據的問題及其解決方案。在接下來的章節中,我們將深入探討這些技術的具體實現和應用。
3.1 共享數據的問題
在多線程編程中,共享數據是一個強大但危險的工具。當多個線程同時訪問和修改共享數據時,如果沒有妥善的管理,就會引發一系列復雜的問題。本節將深入探討共享數據修改帶來的挑戰,以及如何通過理解“不變量”來避免潛在的錯誤。
共享數據的核心問題
共享數據的問題主要源于數據的修改。如果共享數據是只讀的,那么所有線程都能安全地訪問它,因為數據不會被改變。然而,當一個或多個線程試圖修改共享數據時,情況就會變得復雜。修改操作可能會破壞數據的一致性,導致其他線程讀取到錯誤或無效的數據。
這種問題的根源在于條件競爭(Race Condition):當多個線程同時訪問共享數據,且至少有一個線程試圖修改數據時,程序的執行結果可能依賴于線程調度的順序,從而導致不可預測的行為。
不變量的重要性
為了理解共享數據修改帶來的問題,我們需要引入**不變量(Invariants)**的概念。不變量是描述數據結構在特定條件下必須保持的穩定狀態。例如,對于一個雙鏈表,不變量可能是“每個節點的前向指針和后向指針都正確指向相鄰節點”。
在修改共享數據時,尤其是復雜的數據結構,更新操作通常會暫時破壞不變量。例如,在刪除雙鏈表中的一個節點時,需要更新其相鄰節點的指針。在這個過程中,不變量會被暫時破壞,直到所有指針更新完成。
示例:刪除雙鏈表中的節點
![[Pasted image 20250209160942.png]]
讓我們以雙鏈表的節點刪除為例,具體說明共享數據修改可能引發的問題。假設我們有一個雙鏈表,每個節點包含兩個指針:一個指向下一個節點,另一個指向前一個節點。刪除一個節點的步驟如下:
- 找到要刪除的節點N。
- 更新前一個節點的指針,使其指向節點N的下一個節點。
- 更新后一個節點的指針,使其指向節點N的前一個節點。
- 刪除節點N。
在這個過程中,步驟2和步驟3會暫時破壞不變量。例如,在步驟2完成后,前一個節點的指針已經指向了節點N的下一個節點,但后一個節點的指針還未更新。此時,如果有其他線程訪問鏈表,可能會讀取到不一致的數據。
多線程環境中的問題
在多線程環境中,這種臨時的不變量破壞會引發嚴重的問題。例如:
- 如果一個線程在刪除節點的過程中被中斷,其他線程可能會訪問到一個部分更新的鏈表,導致讀取到無效的數據。
- 如果多個線程同時嘗試刪除相鄰的節點,可能會導致鏈表結構的永久性損壞,甚至引發程序崩潰。
這種問題被稱為條件競爭(Race Condition),是多線程編程中最常見的錯誤之一。它的根本原因在于多個線程對共享數據的訪問和修改缺乏協調。
條件競爭的后果
條件競爭的后果可能是災難性的。例如:
- 數據損壞:鏈表、樹等復雜數據結構可能會被破壞,導致程序無法正常運行。
- 不可預測的行為:程序的執行結果可能依賴于線程調度的順序,導致難以復現和調試的bug。
- 程序崩潰:在極端情況下,條件競爭可能導致程序崩潰或數據丟失。
總結
共享數據的修改是多線程編程中的一個核心挑戰。為了確保程序的正確性,我們必須理解不變量在數據結構中的作用,并采取措施避免條件競爭的發生。在接下來的章節中,我們將探討如何使用互斥鎖、原子操作等技術來保護共享數據,確保多線程程序的穩定性和可靠性。
通過以上分析,我們希望你能更清晰地認識到共享數據問題的本質,并為解決這些問題打下堅實的基礎。
3.1.1 條件競爭
想象一下你在一個大型電影院買票,售票窗口很多,大家都在同時購票。當你和其他人都在競爭購買同一場電影的票時,你的座位選擇就取決于之前的座位預定情況。如果剩余的座位不多了,那么就會出現一場“搶票大戰”,看誰能搶到最后一張票。這就是一個典型的條件競爭的例子:你的座位(或者電影票)是否能成功購買,取決于購票的先后順序。
在并發編程中,競爭條件指的是多個線程的執行順序會影響到程序的結果。每個線程都試圖盡快完成自己的任務。多數情況下,即使執行順序發生變化,也是良性競爭,結果仍然可以接受。例如,兩個線程同時向一個處理隊列中添加任務,由于隊列的特性,誰先添加任務并不會影響最終的結果。
只有當不變量(invariant)遭到破壞時,才會出現惡性競爭,比如雙向鏈表的例子。并發訪問共享數據時,如果多個線程的執行順序不當,導致數據狀態與預期的不變量不符,就會產生惡性競爭。C++ 標準中定義了數據競爭這個術語,它是一種特殊的條件競爭:當多個線程并發地修改同一個獨立對象時,就會發生數據競爭。數據競爭會導致未定義行為,這是并發編程中非常嚴重的問題。
惡性條件競爭通常發生在對多個數據塊進行修改的場景,例如修改兩個相連的指針(如圖 3.1 所示)。當操作需要訪問兩個獨立的數據塊時,不同的指令可能會交錯執行,一個線程可能正在修改數據塊,而另一個線程同時訪問了該數據塊。由于這種交錯執行的概率較低,因此這類問題通常難以發現和復現。即使 CPU 指令連續執行完成,并且數據結構可以被其他并發線程訪問,問題再次復現的幾率仍然很低。但是,隨著系統負載的增加,執行次數也隨之增加,問題復現的概率也會增大。因此,條件競爭問題可能會在系統高負載的情況下才會顯現出來。此外,條件競爭通常對時間非常敏感,因此在調試模式下運行程序時,錯誤可能會完全消失,因為調試模式會影響程序的執行時間(即使影響很小)。
對于并發編程人員來說,條件競爭是一個噩夢。在編寫多線程程序時,我們需要使用各種復雜的技術來避免惡性條件競爭。
3.1.2 避免惡性條件競爭
解決惡性條件競爭最直接的方法是對共享數據結構采用某種保護機制,以確保只有修改線程才能看到不變量的中間狀態。從其他訪問線程的角度來看,修改要么已經完成,要么尚未開始。C++ 標準庫提供了許多類似的機制,我們將在后面逐一介紹。
另一種選擇是修改數據結構和不變量的設計,使其能夠完成一系列不可分割的變化,從而保證每個不變量的狀態都是一致的。這種方法稱為無鎖編程(lock-free programming)。然而,無鎖編程非常復雜,很難保證其正確性。在這種層面上,無論是內存模型的細微差別,還是線程訪問數據的能力,都會增加編程的難度。
還有一種處理條件競爭的方法是使用事務(transaction)的方式來更新數據結構(就像更新數據庫一樣)。所需的讀取和寫入數據都存儲在事務日志中,然后將之前的操作合并并提交。當數據結構被另一個線程修改或者處理重啟時,提交操作將無法進行。這種方法稱為軟件事務內存(software transactional memory,STM),是一個熱門的研究領域。本書不會對 STM 進行詳細介紹,因為 C++ 目前沒有直接支持 STM(盡管 C++ 有事務性內存擴展的技術規范 [1])。
最基本的保護共享數據結構的方法是使用 C++ 標準庫提供的互斥量(mutex)。
好的,我來幫你優化這段關于互斥量的敘述:
3.2 使用互斥量
在并發編程中,我們不希望共享數據出現競爭條件,導致數據不變量遭到破壞。一種簡單的想法是將所有訪問共享數據的代碼都標記為互斥的,即同一時刻只允許一個線程訪問共享數據。這樣,任何線程在執行時,其他線程都必須等待,除非該線程正在修改共享數據,否則任何線程都不可能看到不變量的中間狀態。
實現這一想法的關鍵在于鎖機制。線程在訪問共享數據之前,先將數據“鎖住”,訪問結束后再將數據“解鎖”。線程庫需要保證,當一個線程使用互斥量鎖住共享數據時,其他線程必須等到該線程解鎖后才能訪問數據。
互斥量(mutex)是 C++ 中保護數據的最通用機制。然而,正確使用互斥量需要仔細的代碼編排,以確保數據的正確性(見 3.2.2 節),并避免接口間的競爭條件(見 3.2.3 節)。此外,互斥量也可能導致死鎖(見 3.2.4 節),或者對數據的保護過多或過少(見 3.2.8 節)。
3.2.1 互斥量
我們可以通過實例化 std::mutex
來創建互斥量實例。lock()
成員函數用于對互斥量上鎖,unlock()
用于解鎖。但是,不建議直接調用這些成員函數,因為這意味著必須在每個函數出口(包括異常情況)都調用 unlock()
。C++ 標準庫為互斥量提供了 RAII(Resource Acquisition Is Initialization)模板類 std::lock_guard
,它在構造時提供一個已鎖定的互斥量,并在析構時自動解鎖,從而保證互斥量始終能被正確解鎖。
以下代碼展示了如何在多線程應用中使用 std::mutex
和 std::lock_guard
來保護列表的訪問:
#include <list>
#include <mutex>
#include <algorithm>std::list<int> some_list; // 1
std::mutex some_mutex; // 2void add_to_list(int new_value) {std::lock_guard<std::mutex> guard(some_mutex); // 3some_list.push_back(new_value);
}bool list_contains(int value_to_find) {std::lock_guard<std::mutex> guard(some_mutex); // 4return std::find(some_list.begin(), some_list.end(), value_to_find) != some_list.end();
}
在上述代碼中,some_list
是一個全局變量 ①,它被一個全局互斥量 some_mutex
保護 ②。add_to_list()
③ 和 list_contains()
④ 函數使用 std::lock_guard<std::mutex>
來確保對數據的訪問是互斥的:list_contains()
不可能看到正在被 add_to_list()
修改的列表。
C++17 添加了一個新特性:模板類參數推導。對于像 std::lock_guard
這樣簡單的模板類型,我們可以省略模板參數列表。因此,③ 和 ④ 的代碼可以簡化為:
C++
std::lock_guard guard(some_mutex);
具體的模板參數類型推導則交給 C++17 的編譯器完成。在 3.2.4 節中,我們將介紹 C++17 中的一種增強版數據保護機制——std::scoped_lock
。因此,在 C++17 環境下,上面的代碼也可以寫成:
C++
std::scoped_lock guard(some_mutex);
為了保持代碼清晰,并兼容只支持 C++11 標準的編譯器,我們繼續使用 std::lock_guard
,并在代碼中明確寫出模板參數的類型。
在某些情況下,使用全局變量沒有問題。但大多數情況下,互斥量通常會與需要保護的數據放在同一個類中,而不是定義為全局變量。這是面向對象設計的原則:將它們放在一個類中,可以使它們聯系在一起,也可以對類的功能進行封裝和數據保護。在這種情況下,add_to_list
和 list_contains
函數可以作為這個類的成員函數。互斥量和需要保護的數據都在類中定義為私有成員,這使得代碼更清晰,也方便了解何時對互斥量上鎖。所有成員函數都會在調用時對數據上鎖,結束時對數據解鎖,這就保證了訪問時數據不變量的狀態穩定。
例子 (多線程插入容器會導致容器失效)
#include"baseinclude.h"// 共享資源(列表)
std::list<int> shared_list;
std::mutex list_mutex;// 不受保護的函數:多個線程同時訪問和修改共享列表
void unprotected_add_to_list(int new_value) {shared_list.push_back(new_value);
}// 受保護的函數:使用互斥鎖保護共享列表
void protected_add_to_list(int new_value) {std::lock_guard<std::mutex> guard(list_mutex);shared_list.push_back(new_value);
}// 模擬線程執行的函數
void thread_function(int start_value, int count, bool use_protection) {for (int i = 0; i < count; ++i) {int value = start_value + i;if (use_protection) {protected_add_to_list(value);}else {unprotected_add_to_list(value);}}
}int main() {const int num_threads = 20;const int values_per_thread = 100*100;// 受保護的情況shared_list.clear(); // 清空列表std::vector<std::thread> protected_threads;for (int i = 0; i < num_threads; ++i) {protected_threads.push_back(std::thread(thread_function, i * values_per_thread, values_per_thread, true));}for (auto& thread : protected_threads) {thread.join();}std::cout << "Protected list size: " << shared_list.size() << std::endl; // 預期結果為 num_threads * values_per_thread// 不受保護的情況shared_list.clear(); // 第一次清空列表不會崩潰,因為容器是在受保護的情況下插入數據的,容器結構不會被破壞std::vector<std::thread> unprotected_threads;for (int i = 0; i < num_threads; ++i) {unprotected_threads.push_back(std::thread(thread_function, i * values_per_thread, values_per_thread, false));}for (auto& thread : unprotected_threads) {thread.join();}std::cout << "Unprotected list size: " << shared_list.size() << std::endl; // 預期結果可能小于 num_threads * values_per_thread# shared_list.clear(); // 第二次清空列表會導致容器崩潰return 0;
}
unprotected_add_to_list 代碼不僅會導致 shared_list的size() 不一致,更會 對 shared_list 的結構 進行破壞
當然,情況并非總是如此理想:當其中一個成員函數返回的是受保護數據的指針或引用時,也會破壞數據。具有訪問能力的指針或引用可以訪問(并可能修改)受保護的數據,而不會受到互斥鎖的限制。這就需要謹慎設計接口,確保互斥量能夠鎖住數據訪問,并且不留后門。
3.2.2 保護共享數據
使用互斥量保護共享數據并非簡單地在每個成員函數中添加 std::lock_guard
就能萬事大吉。通過指針或引用“泄露”受保護數據,同樣會使保護形同虛設。雖然檢查指針和引用相對容易——只需確保成員函數不通過返回值或輸出參數返回指向受保護數據的指針或引用——但更重要的是要全面考慮:
- 防止成員函數“泄露”: 仔細檢查所有成員函數,確保它們不會將指向受保護數據的指針或引用傳遞給調用者。
- 防范外部訪問: 除了自己編寫的成員函數,還要注意是否有其他代碼(尤其是你無法控制的代碼)可能通過指針或引用的方式訪問你的數據。即使函數本身沒有在互斥量保護區域內存儲指針或引用,也可能存在風險。
- 避免將保護數據作為運行時參數傳遞: 像代碼3.2那樣,將受保護數據作為參數傳遞給用戶提供的函數,會留下可乘之機。
代碼 3.2 無意中傳遞了保護數據的引用
C++
#include <iostream>
#include <string>
#include <thread>
#include <chrono>
#include <mutex>
#include <vector>class SomeData {int a;std::string b;public:SomeData() : a(0), b("") {}void do_something(const std::string& threadName) {std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模擬耗時操作a++;b += "1";std::cout << threadName << " - Thread ID: " << std::this_thread::get_id() << ", a: " << a << ", b: " << b << std::endl;}void print_data() const {std::cout << "Current Data - a: " << a << ", b: " << b << std::endl;}
};SomeData* unprotected = nullptr;class DataWrapper {
private:SomeData data;std::mutex m;public:template<typename Function>void process_data(Function func, const std::string& threadName) {std::lock_guard<std::mutex> lock(m); // 確保在作用域內持有鎖func(data, threadName);}void access_unprotected_data(const std::string& threadName) {std::lock_guard<std::mutex> lock(m); // 在持有鎖的情況下訪問if (unprotected) {unprotected->do_something(threadName + " - Unsafe Access");}}
};void malicious_function(SomeData& protected_data, const std::string& threadName) {unprotected = &protected_data; // 將受保護的數據暴露給全局指針std::cout << threadName << " - Exposing protected data to global pointer." << std::endl;
}void foo_protected(DataWrapper& x, const std::string& threadName) {x.process_data(malicious_function, threadName + " - Protected"); // 調用惡意函數,但保持鎖的持有x.access_unprotected_data(threadName + " - Protected"); // 安全地訪問數據
}void foo_unprotected(DataWrapper& x, const std::string& threadName) {x.process_data(malicious_function, threadName + " - Unprotected"); // 調用惡意函數if (unprotected) {unprotected->do_something(threadName + " - Unprotected"); // 不安全地訪問數據}
}int main() {DataWrapper x;std::cout << "--- Protected Access ---" << std::endl;std::vector<std::thread> protected_threads;for (int i = 0; i < 2; ++i) {protected_threads.emplace_back(foo_protected, std::ref(x), std::to_string(i));}for (auto& t : protected_threads) {t.join();}x.process_data([](SomeData& data, const std::string&) { data.print_data(); }, "Final");std::cout << "\n--- Unprotected Access (Data Race) ---" << std::endl;std::vector<std::thread> unprotected_threads;for (int i = 0; i < 2; ++i) {unprotected_threads.emplace_back(foo_unprotected, std::ref(x), std::to_string(i));}for (auto& t : unprotected_threads) {t.join();}x.process_data([](SomeData& data, const std::string&) { data.print_data(); }, "Final");return 0;
}
這段代碼看似使用了 std::lock_guard
進行了保護,但問題在于 process_data
函數將受保護的 data
傳遞給了用戶提供的函數 func
①。這就導致 foo
函數可以繞過保護機制,將惡意函數 malicious_function
傳遞進去 ②,從而在沒有鎖定互斥量的情況下訪問 do_something()
③。
核心問題: 這段代碼的問題在于,它只是“表面上”保護了數據結構,而沒有真正限制對數據的訪問。foo()
函數中調用 unprotected->do_something()
的代碼本質上是在無保護的狀態下訪問共享數據。
解決之道: 為了真正保護共享數據,務必遵守以下原則:永遠不要將受保護數據的指針或引用傳遞到互斥鎖作用域之外!
1. SomeData
類
這個類代表共享的數據資源。它包含兩個成員變量:一個整數 a
和一個字符串 b
。
- 構造函數:初始化
a
為0,b
為空字符串。 do_something
方法:模擬耗時操作(通過線程休眠),然后對數據進行修改,并輸出當前線程ID和修改后的數據狀態。print_data
方法:打印當前的數據狀態。
class SomeData {int a;std::string b;public:SomeData() : a(0), b("") {}void do_something(const std::string& threadName) {std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模擬耗時操作a++;b += "1";std::cout << threadName << " - Thread ID: " << std::this_thread::get_id() << ", a: " << a << ", b: " << b << std::endl;}void print_data() const {std::cout << "Current Data - a: " << a << ", b: " << b << std::endl;}
};
2. 全局指針 unprotected
這是一個全局指針,指向 SomeData
類型的對象。用于演示不安全的數據訪問。
SomeData* unprotected = nullptr;
3. DataWrapper
類
這個類封裝了 SomeData
對象,并使用互斥量來保護共享數據,防止并發訪問導致的數據競爭。
process_data
方法:接受一個函數對象作為參數,并在持有鎖的情況下執行該函數,確保對共享數據的操作是線程安全的。access_unprotected_data
方法:在持有鎖的情況下訪問全局指針指向的數據,以展示如何安全地訪問共享數據。
class DataWrapper {
private:SomeData data;std::mutex m;public:template<typename Function>void process_data(Function func, const std::string& threadName) {std::lock_guard<std::mutex> lock(m); // 確保在作用域內持有鎖func(data, threadName);}void access_unprotected_data(const std::string& threadName) {std::lock_guard<std::mutex> lock(m); // 在持有鎖的情況下訪問if (unprotected) {unprotected->do_something(threadName + " - Unsafe Access");}}
};
4. malicious_function
函數
這個函數將受保護的數據暴露給全局指針 unprotected
,模擬惡意行為。
void malicious_function(SomeData& protected_data, const std::string& threadName) {unprotected = &protected_data; // 將受保護的數據暴露給全局指針std::cout << threadName << " - Exposing protected data to global pointer." << std::endl;
}
5. foo_protected
和 foo_unprotected
函數
這兩個函數分別展示了安全和不安全的訪問方式。
foo_protected
:調用malicious_function
并在持有鎖的情況下訪問數據,確保操作是線程安全的。foo_unprotected
:同樣調用malicious_function
,但在不持有鎖的情況下訪問數據,展示數據競爭的風險。
void foo_protected(DataWrapper& x, const std::string& threadName) {x.process_data(malicious_function, threadName + " - Protected"); // 調用惡意函數,但保持鎖的持有x.access_unprotected_data(threadName + " - Protected"); // 安全地訪問數據
}void foo_unprotected(DataWrapper& x, const std::string& threadName) {x.process_data(malicious_function, threadName + " - Unprotected"); // 調用惡意函數if (unprotected) {unprotected->do_something(threadName + " - Unprotected"); // 不安全地訪問數據}
}
6. main
函數
主函數中創建多個線程來測試安全和不安全的訪問方式,并輸出最終的數據狀態。
int main() {DataWrapper x;std::cout << "--- Protected Access ---" << std::endl;std::vector<std::thread> protected_threads;for (int i = 0; i < 2; ++i) {protected_threads.emplace_back(foo_protected, std::ref(x), std::to_string(i));}for (auto& t : protected_threads) {t.join();}x.process_data([](SomeData& data, const std::string&) { data.print_data(); }, "Final");std::cout << "\n--- Unprotected Access (Data Race) ---" << std::endl;std::vector<std::thread> unprotected_threads;for (int i = 0; i < 2; ++i) {unprotected_threads.emplace_back(foo_unprotected, std::ref(x), std::to_string(i));}for (auto& t : unprotected_threads) {t.join();}return 0;
}
總結
這段代碼通過創建多個線程并使用不同的方法訪問共享數據,展示了多線程編程中的同步問題。具體來說,它演示了如何使用互斥量保護共享資源,以及不使用互斥量可能導致的數據競爭問題。通過這種方式,可以幫助理解線程安全的重要性及其實際應用場景。
根據你提供的輸出,我們可以詳細解釋每個部分的執行過程和結果。以下是結合輸出對代碼執行流程的詳細解釋:
輸出解析
1. Protected Access (安全訪問)
--- Protected Access ---
1 - Protected - Exposing protected data to global pointer.
1 - Protected - Unsafe Access - Thread ID: 2364, a: 1, b: 1
0 - Protected - Exposing protected data to global pointer.
0 - Protected - Unsafe Access - Thread ID: 12884, a: 2, b: 11
Current Data - a: 2, b: 11
-
線程 1:
- 調用
foo_protected
函數。 - 在
process_data
中調用了malicious_function
,將SomeData
對象暴露給全局指針unprotected
,并打印出"1 - Protected - Exposing protected data to global pointer."
。 - 然后在持有鎖的情況下通過
access_unprotected_data
訪問數據,并調用do_something
方法,打印出"1 - Protected - Unsafe Access - Thread ID: 2364, a: 1, b: 1"
。
- 調用
-
線程 0:
- 類似地,調用
foo_protected
函數。 - 在
process_data
中調用了malicious_function
,將SomeData
對象暴露給全局指針unprotected
,并打印出"0 - Protected - Exposing protected data to global pointer."
。 - 然后在持有鎖的情況下通過
access_unprotected_data
訪問數據,并調用do_something
方法,打印出"0 - Protected - Unsafe Access - Thread ID: 12884, a: 2, b: 11"
。
- 類似地,調用
-
最終狀態:
- 最后,通過
x.process_data([](SomeData& data, const std::string&) { data.print_data(); }, "Final");
打印出當前的數據狀態:"Current Data - a: 2, b: 11"
。
- 最后,通過
2. Unprotected Access (不安全訪問)
--- Unprotected Access (Data Race) ---
0 - Unprotected - Exposing protected data to global pointer.
1 - Unprotected - Exposing protected data to global pointer.
1 - Unprotected - Thread ID: 3944, a: 0 - Unprotected - Thread ID: 4, b: 111112844, a: 4, b: 1111
-
線程 0 和線程 1:
- 調用
foo_unprotected
函數。 - 在
process_data
中調用了malicious_function
,將SomeData
對象暴露給全局指針unprotected
,并分別打印出"0 - Unprotected - Exposing protected data to global pointer."
和"1 - Unprotected - Exposing protected data to global pointer."
。
- 調用
-
并發問題:
- 這里沒有使用互斥量來保護對
unprotected
的訪問,導致了數據競爭。 - 線程 1 和線程 0 同時嘗試修改
a
和b
,但由于缺乏同步機制,導致輸出混亂。具體表現為:"1 - Unprotected - Thread ID: 3944, a: 0 - Unprotected - Thread ID: 4, b: 111112844, a: 4, b: 1111"
顯示了兩個線程同時修改數據的結果,但輸出格式混亂且不一致,表明發生了數據競爭。
- 這里沒有使用互斥量來保護對
代碼執行流程總結
-
Protected Access(安全訪問):
- 使用互斥量 (
std::mutex
) 來確保對共享數據的操作是線程安全的。 - 每個線程在訪問共享數據之前都會獲取鎖,確保在同一時間只有一個線程可以修改數據。
- 最終的數據狀態是一致的,
a
和b
的值正確反映了所有線程的操作。
- 使用互斥量 (
-
Unprotected Access(不安全訪問):
- 不使用互斥量來保護共享數據,導致多個線程同時訪問和修改同一塊數據。
- 數據競爭發生,導致輸出混亂且數據狀態不可預測。
- 由于沒有同步機制,兩個線程同時修改
a
和b
,導致最終的數據狀態可能是錯誤的或不一致的。
不當的指針或引用傳遞而導致競爭條件(代碼例子)
以下是一個更清晰的例子,展示如何通過不正確的互斥量使用導致數據保護失效的問題。這個例子涉及一個線程安全的計數器類,展示了如何因為不當的指針或引用傳遞而導致競爭條件。
示例代碼
#include <iostream>
#include <thread>
#include <vector>
#include <mutex>class Counter {
private:int count = 0;std::mutex m;public:void increment() {std::lock_guard<std::mutex> lock(m);++count;}int getCount() const {std::lock_guard<std::mutex> lock(m);return count;}// 危險的函數:返回指向受保護數據的指針int* getUnsafePointer() {return &count; // 錯誤:將受保護的數據暴露給外部}
};void incrementCounter(Counter& counter) {for (int i = 0; i < 1000; ++i) {counter.increment();}
}void accessUnsafePointer(int* ptr) {for (int i = 0; i < 1000; ++i) {(*ptr)++; // 直接修改未加鎖的共享數據}
}int main() {Counter counter;std::vector<std::thread> threads;// 正確地通過線程安全接口訪問計數器for (int i = 0; i < 10; ++i) {threads.emplace_back(incrementCounter, std::ref(counter));}// 錯誤地通過指針直接訪問受保護數據int* unsafePtr = counter.getUnsafePointer();threads.emplace_back(accessUnsafePointer, unsafePtr);for (auto& t : threads) {t.join();}std::cout << "Final Count: " << counter.getCount() << std::endl;return 0;
}
問題分析
-
正確部分:
increment()
方法通過std::lock_guard
確保了對count
的線程安全操作。getCount()
方法同樣在讀取時加鎖,確保線程安全。
-
錯誤部分:
getUnsafePointer()
方法直接返回了指向count
的指針,這使得外部可以直接訪問和修改count
,而無需通過互斥鎖保護。- 在
main()
函數中,accessUnsafePointer
函數通過該指針直接修改了count
的值,繞過了互斥鎖機制。
-
結果:
- 由于多個線程同時修改
count
,且部分修改未加鎖,最終輸出的計數值可能小于預期(10000),甚至出現未定義行為。
- 由于多個線程同時修改
解決方案
避免返回指向受保護數據的指針或引用。如果需要提供對數據的訪問,可以通過以下方式改進:
改進版代碼
class SafeCounter {
private:int count = 0;mutable std::mutex m;public:void increment() {std::lock_guard<std::mutex> lock(m);++count;}int getCount() const {std::lock_guard<std::mutex> lock(m);return count;}// 提供安全的方式訪問數據,而不是返回指針或引用void applyFunctionToCount(std::function<void(int&)> func) {std::lock_guard<std::mutex> lock(m);func(count); // 在鎖保護下調用用戶提供的函數}
};
使用改進版代碼
void safeAccess(SafeCounter& counter) {counter.applyFunctionToCount([](int& value) {value += 1; // 安全地修改計數器});
}int main() {SafeCounter counter;std::vector<std::thread> threads;for (int i = 0; i < 10; ++i) {threads.emplace_back(safeAccess, std::ref(counter));}for (auto& t : threads) {t.join();}std::cout << "Final Count: " << counter.getCount() << std::endl;return 0;
}
總結
- 關鍵點:不要將受保護數據的指針或引用傳遞到互斥鎖作用域之外。
- 最佳實踐:通過線程安全的接口操作共享數據,避免直接暴露底層數據結構。
- 擴展思考:即使使用了互斥鎖,仍需注意條件競爭和其他潛在的并發問題。
以下是經過排版后的文本內容,使其更加清晰易讀:
3.2.3 接口間的條件競爭
即使使用了互斥量或其他機制保護了共享數據,也不能完全避免條件競爭。我們仍然需要確保數據是否受到了充分保護。
回想之前雙鏈表的例子:為了實現線程安全地刪除一個節點,我們需要確保防止對三個節點(待刪除的節點及其前后相鄰的節點)的并發訪問。如果僅僅對指向每個節點的指針進行訪問保護,這與沒有使用互斥量一樣,條件競爭仍然可能發生。除了指針之外,整個數據結構和整個刪除操作都需要保護。在這種情況下,最簡單的解決方案是使用互斥量來保護整個鏈表,如代碼 3.1 所示。
盡管鏈表的個別操作可能是安全的,但條件競爭仍可能存在于其他接口中。例如,構建一個類似于 std::stack
的棧(代碼 3.3),除了構造函數和 swap()
以外,需要為 std::stack
提供五個操作:
push()
:將一個新元素壓入棧。pop()
:彈出棧頂元素。top()
:查看棧頂元素。empty()
:判斷棧是否為空。size()
:獲取棧中元素的數量。
即使修改了 top()
方法,返回的是一個拷貝而非引用(即遵循了 3.2.2 節的準則),這個接口仍然可能存在條件競爭問題。這個問題不僅存在于基于互斥量實現的接口中,在無鎖實現的接口中也可能產生條件競爭。這是接口本身的問題,與實現方式無關。
代碼 3.3 std::stack
容器的實現
template<typename T, typename Container = std::deque<T>>
class stack {
public:explicit stack(const Container&);explicit stack(Container&& = Container());template <class Alloc> explicit stack(const Alloc&);template <class Alloc> stack(const Container&, const Alloc&);template <class Alloc> stack(Container&&, const Alloc&);template <class Alloc> stack(stack&&, const Alloc&);bool empty() const;size_t size() const;T& top();const T& top() const;void push(const T&);void push(T&&);void pop();void swap(stack&&);template <class... Args> void emplace(Args&&... args); // C++14的新特性
};
雖然 empty()
和 size()
在返回時可能是正確的,但這些結果并不可靠。在返回之后,其他線程可以自由地訪問棧,并可能通過 push()
向棧中添加多個新元素,或者通過 pop()
刪除一些已有的元素。這樣一來,之前從 empty()
和 size()
得到的數值就可能變得無效。
對于非共享的棧對象,如果棧非空,使用 empty()
檢查后再調用 top()
訪問棧頂元素是安全的。如下代碼所示:
stack<int> s;
if (!s.empty()) { // 1int const value = s.top(); // 2s.pop(); // 3do_something(value);
}
這段代碼不僅在單線程環境中是安全的,而且在空堆棧上調用 top()
是未定義的行為也符合預期。然而,對于共享的棧對象,這樣的調用順序不再安全。因為在調用 empty()
(①)和調用 top()
(②)之間,可能有來自另一個線程的 pop()
調用并刪除了最后一個元素。這是一個經典的條件競爭問題。
即使使用互斥量對棧內部數據進行了保護,這種條件競爭仍然可能發生。這是接口固有的問題,無法僅通過互斥量解決。
以下是經過排版后的文本內容,使其更加清晰易讀:
如何解決接口設計中的條件競爭問題?
問題的根源在于接口設計本身,因此解決方案需要從變更接口設計入手。以下是對問題及其解決方案的詳細分析。
直接解決方案:在 top()
中拋出異常
一種簡單的解決方案是,在調用 top()
時,如果發現棧已經是空的,則拋出異常。這種方法可以避免未定義行為的發生,但存在以下缺點:
- 即使
empty()
返回false
,也需要進行異常捕獲,增加了代碼復雜性。 - 本質上,這會讓
empty()
函數變得多余,因為它無法完全保證后續操作的安全性。
盡管這種方案能夠直接解決問題,但它并不是最優解。
潛在的條件競爭問題
仔細觀察之前的代碼段:
if (!s.empty()) { // ①int const value = s.top(); // ②s.pop(); // ③do_something(value);
}
在調用 top()
(②)和 pop()
(③)之間仍然存在一個潛在的條件競爭。假設兩個線程運行相同的代碼,并且共享同一個棧對象。例如:
- 棧中最初只有兩個元素。
- 每個線程都執行
empty()
和top()
操作。
在這種情況下,即使內部互斥量保護了棧的操作,只有一個線程可以調用棧的成員函數,但由于 do_something()
是可以并發運行的,可能會出現以下執行順序(如表 3.1 所示):
Thread A | Thread B |
---|---|
if (!s.empty()); | if (!s.empty()); |
int const value = s.top(); | |
int const value = s.top(); | |
s.pop(); | |
do_something(value); | s.pop(); |
do_something(value); |
在這種執行順序下:
- 每個線程都調用了兩次
top()
,但沒有修改棧,因此每個線程可能得到相同的值。 - 在
top()
的兩次調用過程中,沒有任何線程調用pop()
,導致某個值被處理了兩次。
這種條件競爭比未定義的 empty()
/ top()
競爭更為嚴重,因為結果表面上看起來沒有錯誤,但實際上隱藏了一個難以定位的 Bug。
以下是經過排版后的文本內容,以及對每種方案可行性的說明:
解決條件競爭的幾種選項
不幸的是,std::stack
的設計將 top()
和 pop()
分割為兩個獨立的操作,反而引入了原本想要避免的條件競爭。幸運的是,我們還有其他選項可供選擇,但每種選項都有相應的代價。
選項 1:傳入一個引用
第一個選項是將變量的引用作為參數,傳入 pop()
函數中獲取“彈出值”:
std::vector<int> result;
some_stack.pop(result);
優點:
- 簡單直觀,能夠直接將棧頂元素賦值給用戶提供的變量。
- 避免了返回值拷貝或移動時可能引發的異常問題。
缺點:
- 需要構造出一個棧中類型的實例,用于接收目標值。對于某些類型,這在時間和資源上可能不劃算。
- 對于需要復雜構造函數參數的類型,這種方式可能不可行。
- 需要可賦值的存儲類型,這是一個重大限制。即使支持移動構造或拷貝構造(從而允許返回一個值),許多用戶自定義類型可能仍不支持賦值操作。
可行性說明:
- 這種方法適用于簡單的數據類型或支持賦值操作的類型。
- 它通過直接傳遞引用的方式,避免了條件競爭,確保了線程安全性。
選項 2:無異常拋出的拷貝構造函數或移動構造函數
對于有返回值的 pop()
函數來說,唯一的問題在于返回值時可能會拋出異常。然而,許多類型的拷貝構造函數不會拋出異常,并且隨著 C++ 新標準對“右值引用”的支持,許多類型還具有移動構造函數,即使它們與拷貝構造函數功能相同,也不會拋出異常。
可以通過以下方式限制線程安全棧的使用:
std::is_nothrow_copy_constructible<T>::value || std::is_nothrow_move_constructible<T>::value
優點:
- 提供了一種“異常安全”的解決方案,確保在返回值時不會因拷貝或移動操作拋出異常。
- 能夠安全地返回所需的值,而無需擔心異常導致的數據丟失。
缺點:
4. 局限性較強:并非所有用戶自定義類型都具有不拋出異常的拷貝構造函數或移動構造函數。
5. 如果某些類型無法滿足這一要求,則無法存儲在線程安全的棧中,這會限制其適用范圍。
可行性說明:
- 這種方法適用于具有不拋出異常的拷貝構造函數或移動構造函數的類型。
- 它通過限制棧中存儲的類型,確保了操作的安全性和可靠性。
選項 3:返回指向彈出值的指針
第三個選擇是返回一個指向彈出元素的指針,而不是直接返回值。指針的優勢在于可以自由拷貝,并且不會產生異常,從而避免了 Cargill 提到的異常問題。
std::shared_ptr<int> result = some_stack.pop();
優點:
- 指針可以自由拷貝,不會引發異常。
- 使用
std::shared_ptr
可以避免內存泄漏問題,因為當最后一個指針銷毀時,對象也會自動銷毀。 - 標準庫完全控制內存分配方案,無需顯式的
new
和delete
操作。
缺點:
6. 返回指針需要對對象的內存分配進行管理,對于簡單數據類型(如 int
),內存管理的開銷遠大于直接返回值。
7. 相較于非線程安全版本,這種方案的開銷較大,因為堆棧中的每個對象都需要用 new
進行獨立的內存分配。
可行性說明:
- 這種方法適用于需要動態內存分配的復雜數據類型。
- 它通過使用智能指針(如
std::shared_ptr
)管理內存,確保了線程安全性和資源管理的可靠性。
選項 4:“選項 1 + 選項 2” 或 “選項 1 + 選項 3”
對于通用代碼來說,靈活性不應忽視。可以選擇結合選項 2 或選項 3 來補充選項 1,提供多種實現方式,讓用戶根據具體需求選擇最合適、最經濟的方案。
優點:
- 提供了多種實現方式,增強了接口的靈活性。
- 用戶可以根據實際需求選擇最適合的方案,從而在性能和安全性之間找到平衡。
缺點:
- 增加了接口的復雜性,可能需要更多的文檔說明和示例代碼來幫助用戶理解如何正確使用。
可行性說明:
- 這種方法適用于需要高度靈活性的場景。
- 它通過組合多種方案,滿足了不同用戶的需求,同時保留了線程安全性和異常安全性。
總結
選項 | 優點 | 缺點 |
---|---|---|
選項 1 | 簡單直觀,避免條件競爭 | 不適用于復雜類型或不可賦值的類型 |
選項 2 | 異常安全,確保返回值可靠 | 局限性強,適用范圍有限 |
選項 3 | 自由拷貝,避免異常 | 內存管理開銷大,不適合簡單類型 |
選項 4 | 靈活性高,滿足多樣需求 | 接口復雜性增加 |
每種方案都有其適用場景和局限性。在實際應用中,應根據具體的類型特性、性能需求和線程安全要求,選擇最適合的方案。
以下是優化后的敘述,使其更加流暢且易于理解:
示例:定義線程安全的堆棧
代碼 3.4 線程安全的堆棧類定義(概述)
以下是一個設計為無條件競爭問題的線程安全堆棧類定義。該類實現了選項 1 和選項 3:通過重載 pop()
方法,使用局部引用存儲彈出值,并返回一個 std::shared_ptr<>
對象。接口非常簡潔,僅包含兩個核心函數:push()
和 pop()
。
#include <exception>
#include <memory> // For std::shared_ptr<>struct empty_stack : std::exception {const char* what() const throw();
};template<typename T>
class threadsafe_stack {
public:threadsafe_stack();threadsafe_stack(const threadsafe_stack&);threadsafe_stack& operator=(const threadsafe_stack&) = delete; // ① 禁用賦值操作void push(T new_value);std::shared_ptr<T> pop();void pop(T& value);bool empty() const;
};
為了提高安全性,我們削減了接口功能:
- 堆棧不支持直接賦值操作,因此賦值操作已被禁用(見①,詳見附錄 A,A.2 節)。
- 沒有提供
swap()
函數。 - 當堆棧為空時,
pop()
函數會拋出empty_stack
異常,確保即使調用了empty()
函數,其他部分仍能正常運行。
通過使用 std::shared_ptr
,我們可以避免內存分配和管理的問題,同時減少頻繁使用 new
和 delete
的需求。原本堆棧中的五個操作(push()
、pop()
、top()
、empty()
和 size()
),現在簡化為三個:push()
、pop()
和 empty()
(其中 empty()
已經顯得多余)。這種簡化不僅增強了數據控制能力,還確保互斥量能夠完全保護所有操作。
代碼 3.5 擴展(線程安全)堆棧
以下是一個簡單的實現,封裝了 std::stack<>
的線程安全堆棧:
#include <exception>
#include <memory>
#include <mutex>
#include <stack>struct empty_stack : std::exception {const char* what() const throw() {return "empty stack!";}
};template<typename T>
class threadsafe_stack {
private:std::stack<T> data;mutable std::mutex m;public:threadsafe_stack() : data(std::stack<T>()) {}threadsafe_stack(const threadsafe_stack& other) {std::lock_guard<std::mutex> lock(other.m);data = other.data; // 在構造函數體中執行拷貝}threadsafe_stack& operator=(const threadsafe_stack&) = delete;void push(T new_value) {std::lock_guard<std::mutex> lock(m);data.push(new_value);}std::shared_ptr<T> pop() {std::lock_guard<std::mutex> lock(m);if (data.empty()) throw empty_stack(); // 檢查棧是否為空std::shared_ptr<T> res = std::make_shared<T>(data.top()); // 分配返回值data.pop();return res;}void pop(T& value) {std::lock_guard<std::mutex> lock(m);if (data.empty()) throw empty_stack();value = data.top();data.pop();}bool empty() const {std::lock_guard<std::mutex> lock(m);return data.empty();}
};
說明:
- 堆棧支持拷貝操作:拷貝構造函數會對互斥量上鎖,然后安全地拷貝堆棧內容。
- 構造函數體中的拷貝操作(見注釋①)通過互斥量確保復制結果的正確性,這種方式比成員初始化列表更靈活且安全。
鎖粒度的討論
在之前的 top()
和 pop()
函數討論中,由于鎖的粒度過小,惡性條件競爭問題已經顯現——需要保護的操作未能完全覆蓋。然而,鎖粒度過大同樣會導致性能下降。
全局互斥量的問題
當使用全局互斥量保護所有共享數據時,在系統存在大量共享數據的情況下,線程可能會強制運行,甚至訪問不同位置的數據,從而抵消并發帶來的性能優勢。例如:
- 第一版為多處理器系統設計的 Linux 內核中,使用了一個全局內核鎖。盡管這個鎖能正常工作,但在雙核處理系統上的性能卻遠不如兩個單核系統的總和,四核系統的表現更是令人失望。
- 后續修正的 Linux 內核引入了細粒度鎖方案,顯著減少了內核競爭,此時四核處理系統的性能接近單核處理系統的四倍。
細粒度鎖的問題
使用多個互斥量保護所有數據時,雖然可以減少鎖的競爭,但也可能帶來新的問題。例如:
- 如果增大互斥量覆蓋數據的粒度,則只需要鎖住一個互斥量即可完成操作。但這種方案并不適用于所有場景:
- 若互斥量保護的是一個獨立類的實例,則鎖的狀態可能無法滿足下一階段的需求。
- 或者需要為該類的所有實例分別創建獨立的互斥量,這可能導致額外的復雜性和開銷。
死鎖問題
當某個操作需要同時獲取兩個或多個互斥量時,死鎖問題便會浮現。這種情況與條件競爭完全相反——不同的線程會互相等待對方釋放鎖,最終導致沒有任何線程能夠繼續執行。
以下是經過優化后的敘述,使其更加簡潔、流暢且易于理解:
3.2.4 死鎖:問題描述及解決方案
問題描述
想象一個玩具由兩部分組成(例如鼓和鼓錘),必須同時擁有這兩部分才能玩。如果有兩個孩子都想玩這個玩具,當其中一個孩子拿到了鼓和鼓錘時,他可以盡情玩耍;但如果另一個孩子也想玩,則必須等待前者完成。
現在假設鼓和鼓錘被分別放在不同的玩具箱里,兩個孩子同時想去敲鼓,于是分別到各自的玩具箱尋找。結果,一個孩子拿到了鼓,另一個拿到了鼓錘。此時問題出現了:除非其中一個孩子決定讓步,將自己手中的部分交給對方,否則誰也無法玩鼓。如果雙方都緊握著自己的部分不放,最終誰也無法繼續游戲。
在多線程編程中,類似的場景經常發生。線程對鎖的競爭可能導致死鎖:兩個或多個線程各自持有某個互斥量,并試圖獲取另一個互斥量,但由于彼此都在等待對方釋放鎖,導致所有線程都無法繼續執行。這種情況即為死鎖。
避免死鎖的建議
為了避免死鎖,通常建議以相同的順序鎖定多個互斥量。例如,總是先鎖定互斥量 A,再鎖定互斥量 B,這樣可以有效避免死鎖。然而,在某些復雜場景下,這種方法可能并不適用。例如:
- 當多個互斥量保護同一個類的獨立實例時,情況變得更加復雜。
- 如果一個操作需要對同一類的兩個不同實例進行數據交換,為了確保數據交換的正確性,必須避免并發修改數據,并確保每個實例上的互斥量都能正確保護其區域。
- 如果簡單地選擇一個固定的鎖定順序(如按照實例提供的第一個互斥量作為第一個參數),可能會適得其反:當兩個線程嘗試在相同的兩個實例間進行數據交換時,程序仍可能陷入死鎖。
解決方案:使用 std::lock
和 std::scoped_lock
幸運的是,C++ 標準庫提供了工具來解決死鎖問題。
1. 使用 std::lock
std::lock
可以一次性鎖定多個互斥量,且不會引入死鎖風險。以下是一個簡單的交換操作示例,展示了如何使用 std::lock
:
#include <mutex>class some_big_object;void swap(some_big_object& lhs, some_big_object& rhs);class X {
private:some_big_object some_detail;std::mutex m;public:X(const some_big_object& sd) : some_detail(sd) {}friend void swap(X& lhs, X& rhs) {if (&lhs == &rhs)return;// 鎖定兩個互斥量std::lock(lhs.m, rhs.m);// 使用 std::lock_guard 管理鎖std::lock_guard<std::mutex> lock_a(lhs.m, std::adopt_lock);std::lock_guard<std::mutex> lock_b(rhs.m, std::adopt_lock);// 執行數據交換swap(lhs.some_detail, rhs.some_detail);}
};
關鍵點:
- 調用
std::lock
同時鎖定兩個互斥量。 - 使用
std::lock_guard
管理鎖,通過傳遞std::adopt_lock
參數表示這些鎖已經被std::lock
獲取,無需重新構建鎖。 - 這種方式可以保證函數退出時,互斥量能夠自動解鎖,即使發生異常也不會導致資源泄漏。
2. C++17 中的 std::scoped_lock
從 C++17 開始,標準庫引入了 std::scoped_lock
,這是一種更簡潔的 RAII 工具,功能類似于 std::lock_guard
,但支持接受不定數量的互斥量作為模板參數和構造參數。上述代碼可以重寫如下:
#include <mutex>void swap(X& lhs, X& rhs) {if (&lhs == &rhs)return;// 使用 std::scoped_lock 替代 std::lock 和 std::lock_guardstd::scoped_lock guard(lhs.m, rhs.m);// 執行數據交換swap(lhs.some_detail, rhs.some_detail);
}
優勢:
std::scoped_lock
在構造時鎖定所有傳入的互斥量,解鎖在析構時完成。- 它通過隱式模板參數推導機制,根據傳遞的對象類型自動構造實例,簡化了代碼。
- 相較于
std::lock
和std::lock_guard
的組合,std::scoped_lock
更加簡潔且不易出錯。
總結
雖然 std::lock
和 std::scoped_lock
可以在鎖定多個互斥量時避免死鎖,但它們無法幫助你單獨獲取其中一個鎖。這需要開發者的經驗和紀律性,確保程序邏輯不會導致死鎖。
死鎖是多線程編程中常見的難題,因為它往往不可預見,且在大多數情況下程序運行正常。然而,遵循一些簡單的規則可以幫助我們編寫“無死鎖”的代碼。例如:
4. 始終以相同的順序鎖定多個互斥量。
5. 使用 std::lock
或 std::scoped_lock
來避免死鎖。
6. 盡量減少鎖的粒度,避免不必要的鎖定。
通過合理設計和工具支持,我們可以有效降低死鎖發生的可能性。
以下是經過優化排版后的敘述,使其更加清晰、條理分明且易于理解:
3.2.5 避免死鎖的進階指導
問題背景
死鎖通常是由于對鎖的使用不當造成的。即使在無鎖的情況下,僅需兩個線程通過互相調用 join()
即可引發死鎖。這種情況下,沒有線程能夠繼續運行,因為它們正在相互等待。這種情況非常常見:一個線程等待另一個線程結束,而其他線程同時也在等待第一個線程結束,因此三個或更多線程的互相等待也可能導致死鎖。
為了避免死鎖,以下提供一些實用建議和進階指導。
1. 避免嵌套鎖
核心思想: 每個線程只持有一個鎖,不要嘗試獲取第二個鎖。
- 如果需要獲取多個鎖,可以使用
std::lock
來一次性鎖定多個互斥量,從而避免死鎖。 - 嵌套鎖是死鎖的主要原因之一,應盡量避免。
2. 避免在持有鎖時調用外部代碼
核心思想: 在持有鎖的情況下,盡量避免調用外部代碼,因為外部代碼可能執行未知操作(包括獲取鎖),這會違反“避免嵌套鎖”的原則,并可能導致死鎖。
- 當編寫通用代碼(如第 3.2.3 節中的棧)時,每個操作的參數類型通常由外部定義。在這種情況下,需要額外注意避免死鎖。
3. 使用固定順序獲取鎖
核心思想: 當必須獲取兩個或多個鎖時,應以固定的順序獲取它們。
- 在某些場景中,這種方式相對簡單。例如,在第 3.2.3 節的棧中,每個棧實例都有一個內置互斥量,棧的操作可以添加約束,確保對數據項的處理僅限于棧本身。這樣可以減少通用棧的復雜性。
- 然而在其他情況下,比如鏈表操作(第 3.1 節中的例子),情況可能更復雜。鏈表中的每個節點都有一個互斥量保護。為了訪問鏈表,線程必須依次獲取感興趣節點上的互斥鎖。
- 刪除一個節點時,線程需要獲取三個節點的鎖:即將刪除的節點及其兩個鄰接節點。
- 遍歷鏈表時,線程必須在獲取當前節點鎖的前提下獲取下一個節點的鎖,以確保指針不會被同時修改。
- 使用“手遞手”模式允許多個線程訪問鏈表的不同部分,但必須以固定順序上鎖,否則可能導致死鎖。
示例:
假設節點 A 和 C 在鏈表中相鄰,當前線程試圖同時獲取 A 和 B 的鎖,而另一個線程已經獲取了 B 的鎖并試圖獲取 A 的鎖。這種經典的死鎖場景如圖 3.2 所示。
4. 使用層次鎖結構
核心思想: 定義鎖的層級結構,確保線程只能按層級順序獲取鎖。
- 層次鎖的意義在于運行時檢查鎖的合法性。將應用分層,并識別每層上的所有互斥量。
- 如果代碼試圖對某個互斥量上鎖,而該線程已持有更高層級的鎖,則不允許繼續鎖定。
- 可以通過為每個互斥量分配一個層級值,并在運行時檢查鎖的合法性來實現。
示例代碼:
hierarchical_mutex high_level_mutex(10000); // ① 高層級鎖
hierarchical_mutex low_level_mutex(5000); // ② 低層級鎖
hierarchical_mutex other_mutex(6000); // ③ 中層級鎖int do_low_level_stuff();
int low_level_func()
{std::lock_guard<hierarchical_mutex> lk(low_level_mutex); // ④ 鎖住低層級鎖return do_low_level_stuff();
}void high_level_stuff(int some_param);
void high_level_func()
{std::lock_guard<hierarchical_mutex> lk(high_level_mutex); // ⑥ 鎖住高層級鎖high_level_stuff(low_level_func()); // ⑤ 調用低層級函數
}void thread_a() // ⑦ 合法線程
{high_level_func();
}void do_other_stuff();
void other_stuff()
{high_level_func(); // ⑩ 違反層級規則do_other_stuff();
}void thread_b() // ⑧ 非法線程
{std::lock_guard<hierarchical_mutex> lk(other_mutex); // ⑨ 鎖住中層級鎖other_stuff();
}
說明:
thread_a()
遵守層級規則,因此可以正常運行。thread_b()
違反規則,因為在調用high_level_func()
時,試圖獲取比當前層級更高的鎖,導致運行時錯誤。
實現細節:
hierarchical_mutex
是一種用戶自定義的互斥量類型,可以通過簡單的實現支持層級檢查(見代碼 3.8)。try_lock()
方法允許線程嘗試獲取鎖,但如果無法獲取,則不會阻塞線程。
代碼 3.8:簡單的層級互斥量實現
class hierarchical_mutex {
private:std::mutex internal_mutex;unsigned long const hierarchy_value;unsigned long previous_hierarchy_value;static thread_local unsigned long this_thread_hierarchy_value; // ① 線程局部變量void check_for_hierarchy_violation(){if (this_thread_hierarchy_value <= hierarchy_value) // ② 檢查層級沖突throw std::logic_error("mutex hierarchy violated");}void update_hierarchy_value(){previous_hierarchy_value = this_thread_hierarchy_value; // ③ 更新之前的層級值this_thread_hierarchy_value = hierarchy_value;}public:explicit hierarchical_mutex(unsigned long value): hierarchy_value(value), previous_hierarchy_value(0) {}void lock(){check_for_hierarchy_violation(); // ② 檢查層級沖突internal_mutex.lock(); // ④ 鎖住內部互斥量update_hierarchy_value(); // ⑤ 更新層級值}void unlock(){if (this_thread_hierarchy_value != hierarchy_value)throw std::logic_error("mutex hierarchy violated"); // ⑨ 檢查層級沖突this_thread_hierarchy_value = previous_hierarchy_value; // ⑥ 恢復之前的層級值internal_mutex.unlock();}bool try_lock(){check_for_hierarchy_violation(); // ② 檢查層級沖突if (!internal_mutex.try_lock()) // ⑦ 嘗試獲取鎖return false;update_hierarchy_value(); // 更新層級值return true;}
};thread_local unsigned long hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX); // ⑧ 初始化為最大值
關鍵點:
- 使用
thread_local
存儲當前線程的層級值,確保每個線程的狀態獨立。 - 初始值設置為
ULONG_MAX
,以便任何鎖都可以被首次獲取。 check_for_hierarchy_violation()
方法確保線程只能按層級順序獲取鎖。
5. 超越鎖的延伸擴展
死鎖不僅發生在鎖之間,還可能出現在同步構造中(形成等待循環)。因此,以下幾點指導意見尤為重要:
- 避免嵌套鎖。
- 避免等待持有鎖的線程。
- 如果需要等待線程結束,應確保線程只等待比其層級低的線程。
標準庫工具:
std::lock()
和std::lock_guard
可以覆蓋大多數場景。- 對于更復雜的場景,可以使用
std::unique_lock
提供更大的靈活性。
總結
通過遵循上述建議,可以有效避免死鎖的發生:
- 避免嵌套鎖。
- 避免在持有鎖時調用外部代碼。
- 使用固定順序獲取鎖。
- 使用層次鎖結構。
- 超越鎖的延伸擴展。
這些方法不僅可以幫助我們在設計階段規避死鎖,還可以在運行時檢測潛在問題,從而提高程序的健壯性和可靠性。
以下是完整的代碼實現,基于您提供的描述和需求。代碼包括 hierarchical_mutex
的完整實現以及相關的線程函數示例。
完整代碼實現
#include <iostream>
#include <thread>
#include <mutex>
#include <stdexcept>// 定義 hierarchical_mutex 類
class hierarchical_mutex {
private:std::mutex internal_mutex; // 內部互斥量unsigned long const hierarchy_value; // 當前鎖的層級值unsigned long previous_hierarchy_value; // 保存之前的層級值// 線程局部變量:存儲當前線程的層級值static thread_local unsigned long this_thread_hierarchy_value;// 檢查層級沖突void check_for_hierarchy_violation() {if (this_thread_hierarchy_value <= hierarchy_value) {throw std::logic_error("Mutex hierarchy violated");}}// 更新當前線程的層級值void update_hierarchy_value() {previous_hierarchy_value = this_thread_hierarchy_value;this_thread_hierarchy_value = hierarchy_value;}public:// 構造函數:初始化層級值explicit hierarchical_mutex(unsigned long value): hierarchy_value(value), previous_hierarchy_value(0) {}// 加鎖操作void lock() {check_for_hierarchy_violation(); // 檢查層級沖突internal_mutex.lock(); // 鎖住內部互斥量update_hierarchy_value(); // 更新層級值}// 解鎖操作void unlock() {if (this_thread_hierarchy_value != hierarchy_value) {throw std::logic_error("Mutex hierarchy violated");}this_thread_hierarchy_value = previous_hierarchy_value; // 恢復之前的層級值internal_mutex.unlock();}// 嘗試加鎖操作bool try_lock() {check_for_hierarchy_violation(); // 檢查層級沖突if (!internal_mutex.try_lock()) { // 嘗試獲取鎖return false;}update_hierarchy_value(); // 更新層級值return true;}
};// 初始化線程局部變量為最大值
thread_local unsigned long hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX);// 模擬低層操作
int do_low_level_stuff() {std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模擬耗時操作return 42;
}// 低層函數:鎖定低層級互斥量
int low_level_func(hierarchical_mutex& low_level_mutex) {std::lock_guard<hierarchical_mutex> lk(low_level_mutex); // 鎖住低層級鎖return do_low_level_stuff();
}// 高層函數:鎖定高層級互斥量并調用低層函數
void high_level_func(hierarchical_mutex& high_level_mutex, hierarchical_mutex& low_level_mutex) {std::lock_guard<hierarchical_mutex> lk(high_level_mutex); // 鎖住高層級鎖int result = low_level_func(low_level_mutex); // 調用低層函數std::cout << "High-level function result: " << result << std::endl;
}// 線程 A:合法線程
void thread_a(hierarchical_mutex& high_level_mutex, hierarchical_mutex& low_level_mutex) {try {high_level_func(high_level_mutex, low_level_mutex); // 調用高層函數} catch (const std::exception& e) {std::cerr << "Thread A error: " << e.what() << std::endl;}
}// 線程 B:非法線程(違反層級規則)
void thread_b(hierarchical_mutex& other_mutex, hierarchical_mutex& high_level_mutex) {try {std::lock_guard<hierarchical_mutex> lk(other_mutex); // 鎖住中層級鎖high_level_func(high_level_mutex, other_mutex); // 違反層級規則} catch (const std::exception& e) {std::cerr << "Thread B error: " << e.what() << std::endl;}
}int main() {// 創建三個 hierarchical_mutex 實例hierarchical_mutex high_level_mutex(10000); // 高層級鎖hierarchical_mutex low_level_mutex(5000); // 低層級鎖hierarchical_mutex other_mutex(6000); // 中層級鎖// 啟動線程 A 和線程 Bstd::thread t1(thread_a, std::ref(high_level_mutex), std::ref(low_level_mutex));std::thread t2(thread_b, std::ref(other_mutex), std::ref(high_level_mutex));// 等待線程結束t1.join();t2.join();std::cout << "All threads completed." << std::endl;return 0;
}
代碼說明
1. hierarchical_mutex
類
-
成員變量:
internal_mutex
:實際的互斥量。hierarchy_value
:當前鎖的層級值。previous_hierarchy_value
:保存之前的層級值。this_thread_hierarchy_value
:線程局部變量,存儲當前線程的層級值。
-
方法:
check_for_hierarchy_violation()
:檢查是否違反層級規則。update_hierarchy_value()
:更新當前線程的層級值。lock()
:加鎖操作。unlock()
:解鎖操作。try_lock()
:嘗試加鎖操作。
2. 示例函數
do_low_level_stuff()
:模擬低層操作。low_level_func()
:鎖定低層級互斥量并執行低層操作。high_level_func()
:鎖定高層級互斥量并調用低層函數。thread_a()
:合法線程,遵守層級規則。thread_b()
:非法線程,違反層級規則。
3. 主函數
- 創建三個
hierarchical_mutex
實例:high_level_mutex
、low_level_mutex
和other_mutex
。 - 啟動兩個線程:
thread_a
和thread_b
。 - 使用
join()
等待線程結束。
運行結果
-
線程 A:
- 遵守層級規則,正常運行。
- 輸出:
High-level function result: 42
-
線程 B:
- 違反層級規則,拋出異常。
- 輸出:
Thread B error: Mutex hierarchy violated
-
最終輸出:
All threads completed.
通過上述實現,您可以驗證 hierarchical_mutex
的正確性和有效性,同時避免死鎖的發生。
以下是經過優化排版后的敘述,使其更加清晰、簡潔且易于理解:
3.2.6 std::unique_lock
—— 靈活的鎖
std::unique_lock
提供了比 std::lock_guard
更加靈活的鎖管理方式。與 std::lock_guard
不同,std::unique_lock
實例不必始終綁定到互斥量的數據類型上,這使得它的使用場景更加廣泛。
靈活性的特點
-
構造函數參數:
- 可以將
std::adopt_lock
作為第二個參數傳入構造函數,用于管理已經鎖定的互斥量。 - 也可以將
std::defer_lock
作為第二個參數傳遞,表明互斥量應保持解鎖狀態。這種方式允許通過調用lock()
方法手動鎖定互斥量,或者將std::unique_lock
對象傳遞給std::lock()
進行統一鎖定。
- 可以將
-
與
std::lock_guard
的對比:- 使用
std::unique_lock
和std::defer_lock
替代std::lock_guard
和std::adopt_lock
,可以輕松實現代碼轉換(如代碼 3.9 所示)。 - 雖然代碼長度相同且功能幾乎等價,但
std::unique_lock
占用更多內存,并且性能略遜于std::lock_guard
。 - 這種靈活性的代價是:
std::unique_lock
實例可以不綁定到互斥量上,而僅存儲和更新相關標志。
- 使用
代碼示例:交換操作中 std::lock()
和 std::unique_lock
的使用
以下代碼展示了如何在交換操作中使用 std::unique_lock
和 std::defer_lock
:
class some_big_object;
void swap(some_big_object& lhs, some_big_object& rhs);class X {
private:some_big_object some_detail;std::mutex m;public:X(const some_big_object& sd) : some_detail(sd) {}friend void swap(X& lhs, X& rhs) {if (&lhs == &rhs)return;// 創建兩個 std::unique_lock 實例,初始狀態為未鎖定std::unique_lock<std::mutex> lock_a(lhs.m, std::defer_lock); // ①std::unique_lock<std::mutex> lock_b(rhs.m, std::defer_lock); // ①// 使用 std::lock 同時鎖定兩個互斥量std::lock(lock_a, lock_b); // ②// 執行數據交換swap(lhs.some_detail, rhs.some_detail);}
};
關鍵點解析
-
std::defer_lock
的作用:- 在構造
std::unique_lock
時,std::defer_lock
表明互斥量應保持解鎖狀態。 - 這種方式允許我們延遲鎖定操作,直到需要時再調用
lock()
或將其傳遞給std::lock()
。
- 在構造
-
std::lock
的作用:std::lock
可以同時鎖定多個互斥量,避免死鎖風險。- 在代碼中,
std::lock(lock_a, lock_b)
實現了對兩個互斥量的安全鎖定。
-
標志位的作用:
std::unique_lock
內部維護一個標志位,用于記錄該實例是否擁有特定的互斥量。- 如果實例擁有互斥量,則析構函數會自動調用
unlock()
;否則不會調用。 - 可以通過
owns_lock()
成員函數查詢該標志。
-
性能與適用性:
- 由于
std::unique_lock
存儲了額外的標志位信息,其實例體積通常比std::lock_guard
大。 - 使用
std::unique_lock
時會有輕微的性能開銷,因此在簡單場景下建議優先使用std::lock_guard
。 - 當需要更靈活的鎖管理時(如遞延鎖或鎖所有權轉移),
std::unique_lock
是更好的選擇。
- 由于
總結
std::unique_lock
提供了比 std::lock_guard
更加靈活的鎖管理能力,適用于復雜的多線程場景。盡管它占用更多資源并帶來輕微的性能開銷,但在需要遞延鎖或鎖所有權轉移的情況下,它是不可或缺的工具。對于簡單的鎖需求,仍然推薦使用 std::lock_guard
;而對于更復雜的需求,std::unique_lock
是更合適的選擇。
以下是一個完整的代碼示例,展示了如何使用 std::unique_lock
和 std::defer_lock
來實現多線程環境下的安全數據交換操作。該示例模擬了兩個線程對共享資源的訪問,并通過 std::unique_lock
管理鎖。
完整代碼示例
#include <iostream>
#include <thread>
#include <mutex>// 定義一個簡單的類,包含一個互斥量和一個共享資源
class SharedResource {
private:int value; // 共享資源std::mutex mtx; // 保護共享資源的互斥量public:SharedResource(int initialValue = 0) : value(initialValue) {}// 安全地獲取值int getValue() const {std::lock_guard<std::mutex> lock(mtx);return value;}// 安全地設置值void setValue(int newValue) {std::lock_guard<std::mutex> lock(mtx);value = newValue;}// 交換兩個共享資源的值friend void swapValues(SharedResource& lhs, SharedResource& rhs) {// 使用 std::unique_lock 和 std::defer_lock 管理鎖std::unique_lock<std::mutex> lock_a(lhs.mtx, std::defer_lock); // 延遲鎖定 lhs 的互斥量std::unique_lock<std::mutex> lock_b(rhs.mtx, std::defer_lock); // 延遲鎖定 rhs 的互斥量// 使用 std::lock 同時鎖定兩個互斥量,避免死鎖std::lock(lock_a, lock_b);// 執行交換操作std::swap(lhs.value, rhs.value);}
};// 線程函數:修改共享資源的值
void modifyResource(SharedResource& resource, int newValue, const std::string& threadName) {std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模擬耗時操作resource.setValue(newValue);std::cout << threadName << " modified the value to " << resource.getValue() << std::endl;
}int main() {// 創建兩個共享資源實例SharedResource resource1(10);SharedResource resource2(20);// 輸出初始值std::cout << "Initial values: resource1 = " << resource1.getValue()<< ", resource2 = " << resource2.getValue() << std::endl;// 啟動兩個線程,分別修改 resource1 和 resource2 的值std::thread t1(modifyResource, std::ref(resource1), 100, "Thread 1");std::thread t2(modifyResource, std::ref(resource2), 200, "Thread 2");// 等待線程結束t1.join();t2.join();// 輸出修改后的值std::cout << "After modification: resource1 = " << resource1.getValue()<< ", resource2 = " << resource2.getValue() << std::endl;// 交換 resource1 和 resource2 的值swapValues(resource1, resource2);// 輸出交換后的值std::cout << "After swapping: resource1 = " << resource1.getValue()<< ", resource2 = " << resource2.getValue() << std::endl;return 0;
}
代碼說明
1. SharedResource
類
- 包含一個整型變量
value
作為共享資源。 - 使用
std::mutex
保護共享資源的訪問。 - 提供
getValue()
和setValue()
方法,用于安全地讀取和修改共享資源。 - 友元函數
swapValues()
實現兩個共享資源之間的值交換。
2. swapValues()
函數
- 使用
std::unique_lock
和std::defer_lock
延遲鎖定兩個互斥量。 - 使用
std::lock()
同時鎖定兩個互斥量,避免死鎖。 - 調用
std::swap()
完成值的交換。
3. modifyResource()
函數
- 模擬線程對共享資源的修改操作。
- 使用
std::this_thread::sleep_for()
模擬耗時操作。
4. 主函數
- 創建兩個
SharedResource
實例:resource1
和resource2
。 - 啟動兩個線程分別修改
resource1
和resource2
的值。 - 調用
swapValues()
交換兩個資源的值。 - 輸出各個階段的結果。
運行結果示例
假設程序運行時線程調度正常,可能的輸出如下:
Initial values: resource1 = 10, resource2 = 20
Thread 1 modified the value to 100
Thread 2 modified the value to 200
After modification: resource1 = 100, resource2 = 200
After swapping: resource1 = 200, resource2 = 100
關鍵點解析
-
std::unique_lock
的靈活性:- 使用
std::defer_lock
延遲鎖定互斥量,避免在構造時立即鎖定。 - 使用
std::lock()
同時鎖定多個互斥量,防止死鎖。
- 使用
-
線程安全:
- 通過互斥量保護共享資源的訪問,確保多線程環境下的安全性。
-
性能與適用性:
- 在需要遞延鎖或鎖所有權轉移的情況下,
std::unique_lock
是更好的選擇。 - 對于簡單的鎖需求,可以繼續使用
std::lock_guard
。
- 在需要遞延鎖或鎖所有權轉移的情況下,
通過這個示例,您可以清楚地了解 std::unique_lock
的使用方式及其在多線程編程中的實際應用場景。
以下是經過優化排版后的敘述,使其更加清晰、簡潔且易于理解:
3.2.7 不同域中互斥量的傳遞
std::unique_lock
的靈活性不僅體現在其延遲鎖定和多鎖管理能力上,還在于它可以將互斥量的所有權通過移動操作在不同的實例之間傳遞。這種特性在某些場景下非常有用,例如需要將鎖從一個函數傳遞到另一個函數。
所有權傳遞機制
-
自動發生的情況:
- 在某些情況下,所有權的轉移是自動發生的。例如,當函數返回一個
std::unique_lock
實例時,編譯器會自動調用移動構造函數,將鎖的所有權轉移到調用者。
- 在某些情況下,所有權的轉移是自動發生的。例如,當函數返回一個
-
顯式調用的情況:
- 如果源值是一個左值(實際值或引用),則需要顯式調用
std::move()
來執行移動操作。 - 如果源值是一個右值(臨時類型),則無需顯式調用
std::move()
,因為編譯器會自動處理。
- 如果源值是一個左值(實際值或引用),則需要顯式調用
-
不可賦值性:
std::unique_lock
是可移動但不可賦值的類型。這意味著一旦鎖的所有權被轉移,原始對象將不再擁有該鎖。
應用場景:函數返回鎖
以下代碼片段展示了如何通過函數返回鎖,并將其所有權傳遞給調用者:
std::unique_lock<std::mutex> get_lock() {extern std::mutex some_mutex;std::unique_lock<std::mutex> lk(some_mutex); // 鎖住互斥量prepare_data(); // 準備數據return lk; // 返回鎖(編譯器負責調用移動構造函數)①
}void process_data() {std::unique_lock<std::mutex> lk(get_lock()); // 轉移鎖的所有權②do_something(); // 使用鎖保護的數據
}
關鍵點:
- 在
get_lock()
函數中,lk
被聲明為自動變量,不需要顯式調用std::move()
,直接返回即可。 - 在
process_data()
函數中,lk
接收了從get_lock()
返回的鎖,確保do_something()
可以安全地訪問受保護的數據(數據不會被其他線程修改)。
網關類模式
這種模式通常用于依賴當前程序狀態的互斥量,或者依賴于返回類型為 std::unique_lock
的函數。在這種情況下,可以設計一個“網關類”來管理對保護數據的訪問權限。
工作原理:
4. 網關類的數據成員確認是否已經對保護數據進行了鎖定。
5. 所有對保護數據的訪問都必須通過網關類。
6. 當需要訪問數據時,獲取網關類的實例(例如通過調用類似 get_lock()
的函數)。
7. 通過網關類的成員函數對數據進行訪問。
8. 訪問完成后銷毀網關類對象,釋放鎖,允許其他線程訪問保護數據。
示例:
class Gateway {
private:std::unique_lock<std::mutex> lock;public:explicit Gateway(std::mutex& mtx) : lock(mtx) {} // 構造函數鎖定互斥量~Gateway() = default; // 析構函數自動釋放鎖void accessData() {// 對數據進行訪問do_something();}
};void process_data_with_gateway() {extern std::mutex some_mutex;Gateway gateway(some_mutex); // 創建網關類實例并鎖定互斥量gateway.accessData(); // 通過網關類訪問數據
} // 離開作用域時自動釋放鎖
提前釋放鎖
std::unique_lock
的靈活性還體現在它允許在銷毀之前放棄擁有的鎖。可以通過調用 unlock()
方法來實現這一點。
優點:
- 提前釋放鎖可以減少持有鎖的時間,從而提高應用程序的性能。
- 其他線程無需等待鎖的釋放,避免了不必要的阻塞。
示例:
void selective_unlock() {std::mutex mtx;std::unique_lock<std::mutex> lock(mtx);if (some_condition()) {lock.unlock(); // 提前釋放鎖}// 在某些分支中可能不需要持有鎖do_something();
}
總結
std::unique_lock
的靈活性使得它在多線程編程中具有廣泛的應用場景:
9. 可以通過移動操作在不同實例之間傳遞鎖的所有權。
10. 支持函數返回鎖,方便調用者在鎖保護的范圍內執行額外操作。
11. 提供了提前釋放鎖的能力,有助于優化應用程序的性能。
盡管 std::unique_lock
占用更多資源并帶來輕微的性能開銷,但在需要靈活鎖管理的情況下,它是不可或缺的工具。
以下是一個完整的代碼示例,展示了如何通過 std::unique_lock
實現鎖的所有權傳遞,并結合“網關類”模式來管理對保護數據的訪問。
完整代碼示例
#include <iostream>
#include <thread>
#include <mutex>// 全局互斥量
std::mutex some_mutex;// 模擬準備數據的操作
void prepare_data() {std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模擬耗時操作std::cout << "Data preparation completed." << std::endl;
}// 模擬處理數據的操作
void do_something() {std::this_thread::sleep_for(std::chrono::milliseconds(50)); // 模擬耗時操作std::cout << "Processing data..." << std::endl;
}// 函數返回鎖:鎖住互斥量并準備數據
std::unique_lock<std::mutex> get_lock() {std::unique_lock<std::mutex> lk(some_mutex); // 鎖住互斥量prepare_data(); // 準備數據return lk; // 返回鎖(編譯器負責調用移動構造函數)
}// 網關類:管理對保護數據的訪問
class Gateway {
private:std::unique_lock<std::mutex> lock;public:// 構造函數:鎖定互斥量explicit Gateway(std::mutex& mtx) : lock(mtx) {}// 成員函數:訪問數據void accessData() {do_something(); // 處理數據}
};// 主函數
int main() {// 使用函數返回鎖的方式std::cout << "Using function return lock:" << std::endl;{std::unique_lock<std::mutex> lk(get_lock()); // 轉移鎖的所有權do_something(); // 處理數據} // 鎖自動釋放// 使用網關類的方式std::cout << "\nUsing gateway class:" << std::endl;{Gateway gateway(some_mutex); // 創建網關類實例并鎖定互斥量gateway.accessData(); // 通過網關類訪問數據} // 鎖自動釋放return 0;
}
代碼說明
1. 全局互斥量
- 定義了一個全局互斥量
some_mutex
,用于保護共享資源。
2. 函數 get_lock()
- 鎖住互斥量
some_mutex
。 - 調用
prepare_data()
準備數據。 - 返回
std::unique_lock<std::mutex>
實例,將鎖的所有權轉移到調用者。
3. 網關類 Gateway
- 在構造函數中鎖定互斥量。
- 提供成員函數
accessData()
,用于安全地訪問受保護的數據。
4. 主函數
-
第一部分:使用函數返回鎖
- 調用
get_lock()
獲取鎖的所有權。 - 調用
do_something()
處理數據。 - 離開作用域時,鎖自動釋放。
- 調用
-
第二部分:使用網關類
- 創建
Gateway
實例,自動鎖定互斥量。 - 調用
gateway.accessData()
訪問數據。 - 離開作用域時,鎖自動釋放。
- 創建
運行結果示例
假設程序運行時線程調度正常,可能的輸出如下:
Using function return lock:
Data preparation completed.
Processing data...Using gateway class:
Data preparation completed.
Processing data...
關鍵點解析
-
鎖的所有權傳遞:
get_lock()
函數返回一個std::unique_lock<std::mutex>
實例,將鎖的所有權轉移到調用者。- 編譯器會自動調用移動構造函數完成所有權轉移。
-
網關類模式:
- 網關類
Gateway
封裝了對互斥量的鎖定和解鎖邏輯。 - 所有對保護數據的訪問都必須通過網關類,確保線程安全。
- 網關類
-
RAII 原則:
- 使用
std::unique_lock
和網關類實現了 RAII(Resource Acquisition Is Initialization)原則。 - 鎖在進入作用域時自動獲取,在離開作用域時自動釋放。
- 使用
-
性能優化:
- 通過提前釋放鎖或減少鎖持有時間,可以提高多線程應用程序的性能。
通過這個示例,您可以清楚地了解 std::unique_lock
的靈活性及其在不同場景下的應用方式。
以下是經過優化排版后的敘述,使其更加清晰、簡潔且易于理解:
3.2.8 鎖的粒度
鎖粒度的概念
在 3.2.3 節中,我們已經了解了鎖的粒度這一概念。鎖的粒度是一個描述通過一個鎖保護的數據量大小的術語:
- 細粒度鎖(Fine-grained Lock):保護較小的數據量。
- 粗粒度鎖(Coarse-grained Lock):保護較大的數據量。
鎖的粒度對性能至關重要。為了保護對應的數據,確保鎖能夠有效保護這些數據同樣重要。
鎖粒度的實際意義
想象一下在超市結賬時的情景:
- 如果正在結賬的顧客突然意識到忘了拿蔓越莓醬,然后離開柜臺去拿,并讓其他人等待他回來,這會導致其他顧客感到無奈。
- 或者當收銀員準備收錢時,顧客才開始翻錢包找錢,這樣的情況也會增加等待時間。
與此類似,在多線程環境中:
- 如果多個線程正在等待同一個資源(例如,等待互斥量解鎖),而某個線程持有鎖的時間過長,就會顯著增加其他線程的等待時間。
- 這種情況尤其發生在對文件進行輸入/輸出操作時。文件 I/O 操作通常比從內存中讀寫相同長度的數據慢成百上千倍。因此,除非鎖明確用于保護對文件的訪問,否則將 I/O 操作包含在鎖內會顯著延遲其他線程的執行,抵消多線程帶來的性能優勢。
使用 std::unique_lock
減少鎖持有時間
std::unique_lock
提供了一種靈活的方式來減少鎖的持有時間。如果代碼中某些部分不需要訪問共享數據,可以手動釋放鎖,并在需要時重新獲取。
以下是一個示例代碼:
void get_and_process_data() {std::unique_lock<std::mutex> my_lock(the_mutex);some_class data_to_process = get_next_data_chunk();my_lock.unlock(); // ① 在調用 process() 前手動釋放鎖result_type result = process(data_to_process);my_lock.lock(); // ② 在寫入數據前重新獲取鎖write_result(data_to_process, result);
}
關鍵點:
- 在調用耗時的
process()
函數之前(①),手動釋放鎖。 - 在需要寫入數據時(②),重新獲取鎖。
這種做法可以顯著減少鎖的持有時間,從而提高并發性能。
粗粒度鎖的問題
當只有一個互斥量保護整個數據結構時:
- 更多的操作需要競爭同一個鎖。
- 持有鎖的時間會更長。
這兩方面都會導致性能下降,因此向細粒度鎖轉移是合理的。
細粒度鎖的應用示例
假設我們需要比較兩個對象是否相等。如果對象中的數據類型很簡單(例如 int
),可以直接復制數據并分別加鎖進行比較。
以下是一個示例代碼:
class Y {
private:int some_detail;mutable std::mutex m;// 獲取受保護的數據int get_detail() const {std::lock_guard<std::mutex> lock_a(m); // ① 加鎖保護數據訪問return some_detail;}public:Y(int sd) : some_detail(sd) {}// 比較操作符friend bool operator==(Y const& lhs, Y const& rhs) {if (&lhs == &rhs)return true;int const lhs_value = lhs.get_detail(); // ② 獲取 lhs 的值int const rhs_value = rhs.get_detail(); // ③ 獲取 rhs 的值return lhs_value == rhs_value; // ④ 比較兩個值}
};
關鍵點:
- 每次調用
get_detail()
時(①),只短暫地持有鎖以讀取數據。 - 比較操作符分別獲取兩個對象的值(② 和 ③),并在之后進行比較(④)。
- 這種方式減少了鎖的持有時間,避免了死鎖的可能性。
語義問題與潛在風險
盡管上述方法減少了鎖的持有時間,但也引入了一些語義問題:
- 如果
lhs.some_detail
和rhs.some_detail
在讀取后被修改,可能會導致比較結果不準確。 - 比較操作符返回
true
只能說明在某一時間點上兩個值相等,但這并不意味著它們在整個操作期間都保持相等。
因此,在設計鎖機制時,必須仔細權衡性能和語義一致性。
總結
鎖的粒度直接影響多線程程序的性能:
- 粗粒度鎖:簡單易實現,但可能導致鎖競爭和持有時間過長。
- 細粒度鎖:減少鎖的競爭和持有時間,但實現復雜性較高。
在實際應用中,應根據具體需求選擇合適的鎖粒度。此外,std::unique_lock
提供了靈活的鎖管理能力,可以幫助減少不必要的鎖持有時間,從而提高程序的并發性能。
以下是一個完整的代碼示例,展示了如何通過細粒度鎖來優化多線程程序的性能。該示例模擬了一個簡單的銀行賬戶系統,其中多個線程對賬戶余額進行操作。
完整代碼示例:銀行賬戶系統
#include <iostream>
#include <thread>
#include <vector>
#include <mutex>// 銀行賬戶類
class BankAccount {
private:double balance; // 賬戶余額mutable std::mutex mtx; // 保護余額的互斥量public:// 構造函數explicit BankAccount(double initialBalance = 0.0) : balance(initialBalance) {}// 獲取余額(線程安全)double getBalance() const {std::lock_guard<std::mutex> lock(mtx); // 加鎖保護訪問return balance;}// 存款(線程安全)void deposit(double amount) {std::lock_guard<std::mutex> lock(mtx); // 加鎖保護修改if (amount > 0) {balance += amount;std::cout << "Deposited: " << amount << ", New Balance: " << balance << std::endl;}}// 取款(線程安全)bool withdraw(double amount) {std::lock_guard<std::mutex> lock(mtx); // 加鎖保護修改if (amount > 0 && balance >= amount) {balance -= amount;std::cout << "Withdrew: " << amount << ", New Balance: " << balance << std::endl;return true;}return false;}// 比較兩個賬戶余額是否相等friend bool operator==(const BankAccount& lhs, const BankAccount& rhs) {// 使用細粒度鎖分別獲取兩個賬戶的余額std::lock(lhs.mtx, rhs.mtx); // 同時鎖定兩個互斥量,避免死鎖std::lock_guard<std::mutex> lock_a(lhs.mtx, std::adopt_lock);std::lock_guard<std::mutex> lock_b(rhs.mtx, std::adopt_lock);return lhs.balance == rhs.balance;}
};// 線程函數:模擬存款操作
void depositMoney(BankAccount& account, double amount, int threadId) {std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模擬延遲account.deposit(amount);std::cout << "Thread " << threadId << " deposited " << amount << std::endl;
}// 線程函數:模擬取款操作
void withdrawMoney(BankAccount& account, double amount, int threadId) {std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模擬延遲if (account.withdraw(amount)) {std::cout << "Thread " << threadId << " withdrew " << amount << std::endl;} else {std::cout << "Thread " << threadId << " failed to withdraw " << amount << std::endl;}
}int main() {// 創建一個銀行賬戶BankAccount account(100.0); // 初始余額為 100.0// 創建多個線程進行存款和取款操作std::vector<std::thread> threads;threads.emplace_back(depositMoney, std::ref(account), 50.0, 1); // 線程 1 存款 50.0threads.emplace_back(withdrawMoney, std::ref(account), 30.0, 2); // 線程 2 取款 30.0threads.emplace_back(depositMoney, std::ref(account), 20.0, 3); // 線程 3 存款 20.0threads.emplace_back(withdrawMoney, std::ref(account), 80.0, 4); // 線程 4 取款 80.0// 等待所有線程完成for (auto& t : threads) {t.join();}// 輸出最終余額std::cout << "Final Balance: " << account.getBalance() << std::endl;// 比較兩個賬戶余額是否相等BankAccount anotherAccount(90.0);if (account == anotherAccount) {std::cout << "The two accounts have the same balance." << std::endl;} else {std::cout << "The two accounts have different balances." << std::endl;}return 0;
}
代碼說明
1. BankAccount
類
- 包含一個
double
類型的成員變量balance
,用于存儲賬戶余額。 - 使用
std::mutex
保護對余額的訪問。 - 提供線程安全的
getBalance()
、deposit()
和withdraw()
方法。 - 實現了
operator==
操作符,用于比較兩個賬戶余額是否相等。通過細粒度鎖分別獲取兩個賬戶的余額,避免死鎖。
2. 線程函數
depositMoney()
:模擬存款操作。withdrawMoney()
:模擬取款操作。
3. 主函數
- 創建一個初始余額為 100.0 的銀行賬戶。
- 啟動多個線程進行存款和取款操作。
- 等待所有線程完成后,輸出最終余額。
- 比較兩個賬戶余額是否相等。
運行結果示例
假設程序運行時線程調度正常,可能的輸出如下:
Deposited: 50, New Balance: 150
Thread 1 deposited 50
Withdrew: 30, New Balance: 120
Thread 2 withdrew 30
Deposited: 20, New Balance: 140
Thread 3 deposited 20
Failed to withdraw 80, New Balance: 140
Thread 4 failed to withdraw 80
Final Balance: 140
The two accounts have different balances.
關鍵點解析
-
細粒度鎖的應用:
- 在
operator==
中,使用std::lock()
同時鎖定兩個互斥量,避免死鎖。 - 分別獲取兩個賬戶的余額,減少鎖的持有時間。
- 在
-
線程安全的操作:
- 所有對賬戶余額的訪問和修改都通過加鎖保護,確保線程安全。
-
性能優化:
- 通過減少鎖的持有時間,提高并發性能。
-
語義一致性:
- 在比較兩個賬戶余額時,注意語義問題:即使返回
true
,也只是表示在某一時間點上兩個余額相等。
- 在比較兩個賬戶余額時,注意語義問題:即使返回
通過這個示例,您可以清楚地了解如何通過細粒度鎖優化多線程程序的性能,并確保線程安全和語義一致性。
以下是經過優化排版后的敘述,使其更加清晰、簡潔且易于理解:
3.3 保護共享數據的方式
互斥量是一種通用的機制,但并非保護共享數據的唯一方式。在特定情況下,還有許多其他方法可以提供合適的保護。
隱式同步的需求
在某些極端情況下,共享數據可能只需要在初始化時進行保護,而后續的訪問是只讀的,因此不需要同步。例如:
- 數據作為只讀方式創建后,不再需要額外的保護。
- 初始化過程中的保護可能會對性能造成不必要的影響。
為此,C++標準庫提供了一種專門用于保護共享數據初始化過程的機制。
3.3.1 保護共享數據的初始化過程
假設有一個代價昂貴的共享資源(如打開數據庫連接或分配大量內存),延遲初始化(Lazy Initialization)在這種場景中非常常見。在單線程代碼中,延遲初始化的實現如下:
std::shared_ptr<some_resource> resource_ptr;void foo() {if (!resource_ptr) { // 檢查是否已初始化resource_ptr.reset(new some_resource); // 初始化}resource_ptr->do_something(); // 使用資源
}
多線程環境下的問題
將上述代碼轉換為多線程版本時,只有初始化部分需要保護,但以下實現會導致不必要的線程序列化:
std::shared_ptr<some_resource> resource_ptr;
std::mutex resource_mutex;void foo() {std::unique_lock<std::mutex> lk(resource_mutex); // 所有線程在此序列化if (!resource_ptr) {resource_ptr.reset(new some_resource); // 只有初始化過程需要保護}lk.unlock();resource_ptr->do_something();
}
盡管這種實現可以確保線程安全,但它會讓所有線程在檢查初始化狀態時等待互斥量,從而降低性能。
雙重檢查鎖模式的問題
為了解決上述問題,許多人嘗試使用“雙重檢查鎖模式”:
void undefined_behaviour_with_double_checked_locking() {if (!resource_ptr) { // 第一次檢查:未加鎖std::lock_guard<std::mutex> lk(resource_mutex);if (!resource_ptr) { // 第二次檢查:加鎖后再次檢查resource_ptr.reset(new some_resource); // 初始化}}resource_ptr->do_something(); // 使用資源
}
潛在問題:
- 第一次未加鎖的讀取操作與另一線程中加鎖的寫入操作之間存在條件競爭(Data Race)。
- 即使一個線程看到指針已被寫入,它可能無法看到新創建的對象實例,導致調用
do_something()
后出現未定義行為。
C++ 標準庫的解決方案:std::call_once
和 std::once_flag
為了消除條件競爭,C++ 標準庫提供了 std::once_flag
和 std::call_once
,用于安全地執行一次性初始化操作。
示例代碼
std::shared_ptr<some_resource> resource_ptr;
std::once_flag resource_flag;void init_resource() {resource_ptr.reset(new some_resource); // 初始化資源
}void foo() {std::call_once(resource_flag, init_resource); // 確保初始化只執行一次resource_ptr->do_something(); // 使用資源
}
特點:
std::call_once
確保初始化函數只被調用一次。- 相比顯式使用互斥量,
std::call_once
的開銷更小,特別是在初始化完成后。
作為類成員的延遲初始化
以下示例展示了如何在類中使用 std::call_once
實現線程安全的延遲初始化:
class X {
private:connection_info connection_details;connection_handle connection;std::once_flag connection_init_flag;void open_connection() {connection = connection_manager.open(connection_details); // 初始化連接}public:X(const connection_info& connection_details_) : connection_details(connection_details_) {}void send_data(const data_packet& data) {std::call_once(connection_init_flag, &X::open_connection, this); // 初始化連接connection.send_data(data); // 發送數據}data_packet receive_data() {std::call_once(connection_init_flag, &X::open_connection, this); // 初始化連接return connection.receive_data(); // 接收數據}
};
關鍵點:
- 第一次調用
send_data()
或receive_data()
的線程會完成初始化。 - 需要將
this
指針傳遞給std::call_once
,以便調用類的成員函數。
靜態局部變量的線程安全初始化
在 C++11 標準中,靜態局部變量的初始化是線程安全的。例如:
class my_class;my_class& get_my_class_instance() {static my_class instance; // 線程安全的初始化過程return instance;
}
優點:
- 不需要顯式使用
std::call_once
或互斥量。 - 初始化和定義完全在一個線程中完成,避免了條件競爭。
總結
對于需要保護的共享數據,C++ 提供了多種機制:
- 互斥量:適用于通用場景,但可能導致性能開銷。
- 雙重檢查鎖模式:存在條件競爭風險,不推薦使用。
std::call_once
和std::once_flag
:專為一次性初始化設計,性能更優。- 靜態局部變量:在 C++11 中提供線程安全的初始化機制,適合全局實例。
選擇合適的保護機制可以顯著提高程序的性能和安全性。
以下是一個完整的代碼示例,展示了如何使用 std::call_once
和 std::once_flag
來實現線程安全的延遲初始化。同時,還演示了靜態局部變量的線程安全初始化。
完整代碼示例
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>// 模擬一個昂貴的資源類
class SomeResource {
public:SomeResource() {std::cout << "SomeResource initialized." << std::endl;}void doSomething() const {std::cout << "SomeResource is doing something." << std::endl;}
};// 使用 std::call_once 實現線程安全的延遲初始化
class ResourceManager {
private:std::shared_ptr<SomeResource> resource;std::once_flag init_flag;void initializeResource() {resource.reset(new SomeResource); // 初始化資源}public:void useResource() {std::call_once(init_flag, &ResourceManager::initializeResource, this); // 確保只初始化一次if (resource) {resource->doSomething(); // 使用資源} else {std::cout << "Resource initialization failed." << std::endl;}}
};// 使用靜態局部變量實現線程安全的延遲初始化
SomeResource& getGlobalResource() {static SomeResource instance; // 線程安全的初始化過程return instance;
}// 測試函數
void testResourceManager(ResourceManager& manager) {manager.useResource();
}void testStaticLocalVariable() {SomeResource& resource = getGlobalResource();resource.doSomething();
}int main() {// 創建多個線程測試 ResourceManagerstd::cout << "Testing ResourceManager with std::call_once:" << std::endl;ResourceManager manager;std::vector<std::thread> threads;for (int i = 0; i < 5; ++i) {threads.emplace_back(testResourceManager, std::ref(manager));}for (auto& t : threads) {t.join();}// 測試靜態局部變量的線程安全初始化std::cout << "\nTesting static local variable initialization:" << std::endl;std::vector<std::thread> staticThreads;for (int i = 0; i < 5; ++i) {staticThreads.emplace_back(testStaticLocalVariable);}for (auto& t : staticThreads) {t.join();}return 0;
}
代碼說明
1. SomeResource
類
- 模擬一個代價昂貴的資源。
- 構造函數輸出初始化信息。
- 提供
doSomething()
方法模擬資源的操作。
2. ResourceManager
類
- 使用
std::call_once
和std::once_flag
實現線程安全的延遲初始化。 initializeResource()
方法負責初始化資源。useResource()
方法確保資源只被初始化一次,并調用其操作方法。
3. 靜態局部變量初始化
getGlobalResource()
函數返回一個全局SomeResource
實例。- C++11 標準保證靜態局部變量的初始化是線程安全的。
4. 測試函數
testResourceManager()
:測試ResourceManager
的線程安全性。testStaticLocalVariable()
:測試靜態局部變量的線程安全性。
5. 主函數
- 創建多個線程測試
ResourceManager
的延遲初始化。 - 創建多個線程測試靜態局部變量的線程安全初始化。
運行結果示例
假設程序運行時線程調度正常,可能的輸出如下:
Testing ResourceManager with std::call_once:
SomeResource initialized.
SomeResource is doing something.
SomeResource is doing something.
SomeResource is doing something.
SomeResource is doing something.
SomeResource is doing something.Testing static local variable initialization:
SomeResource initialized.
SomeResource is doing something.
SomeResource is doing something.
SomeResource is doing something.
SomeResource is doing something.
SomeResource is doing something.
關鍵點解析
-
std::call_once
的作用:- 確保初始化函數只執行一次。
- 避免了雙重檢查鎖模式中的條件競爭問題。
-
靜態局部變量的線程安全性:
- 在 C++11 中,靜態局部變量的初始化是線程安全的。
- 適合需要全局實例且初始化代價較高的場景。
-
性能優化:
std::call_once
的開銷比顯式使用互斥量更低。- 靜態局部變量的初始化機制由編譯器自動優化,性能更優。
通過這個示例,您可以清楚地了解如何使用 std::call_once
和靜態局部變量來實現線程安全的延遲初始化。
以下是經過優化排版后的敘述,使其更加清晰、簡潔且易于理解:
3.3.2 保護不常更新的數據結構
場景描述
假設需要將域名解析為對應的 IP 地址,并將其存儲在一個 DNS 緩存表中。通常情況下,DNS 條目在較長時間內保持不變。然而,當用戶訪問不同的網站時,可能會有新的條目被添加到緩存中。盡管這些條目可能在其生命周期內很少發生變化,但定期檢查緩存條目的有效性仍然是必要的。
此外,緩存可能會偶爾進行更新(例如對某些條目進行修改)。雖然更新頻率較低,但在多線程環境中,仍然需要保護更新過程的狀態,以確保每個線程讀取到的數據是有效的。
問題分析
如果使用普通的互斥量(如 std::mutex
)來保護數據結構,可能會導致性能下降,因為在沒有發生修改的情況下,它會削減并發讀取的可能性。因此,我們需要一種更適合這種場景的鎖機制。
在這種情況下,“讀者-寫者鎖”(Reader-Writer Lock)是一種更合適的選擇。它允許以下兩種訪問方式:
- 獨占訪問:一個“寫者”線程可以獨占訪問數據結構。
- 共享訪問:多個“讀者”線程可以同時訪問數據結構。
C++ 標準庫中的解決方案
從 C++17 開始,標準庫提供了兩種適合讀者-寫者鎖場景的互斥量:
std::shared_mutex
:適用于簡單的讀者-寫者鎖場景,性能較高,但功能較少。std::shared_timed_mutex
:支持更多操作(如超時鎖定),但性能略遜于std::shared_mutex
。
在 C++14 中,僅提供了 std::shared_timed_mutex
。而在 C++11 中,標準庫并未提供任何讀者-寫者鎖類型。如果使用的是舊版本編譯器,可以考慮使用 Boost 庫中的互斥量。
需要注意的是,讀者-寫者鎖的性能取決于系統中的處理器數量以及讀者和寫者線程的負載情況。因此,在實際應用中,需要根據目標系統的具體情況進行性能測試,以確保引入復雜性后仍能獲得性能收益。
代碼示例
以下是一個簡單的 DNS 緩存實現,使用 std::map
存儲緩存數據,并通過 std::shared_mutex
進行保護。
#include <map>
#include <string>
#include <mutex>
#include <shared_mutex>class dns_entry; // 假設這是一個表示 DNS 條目的類class dns_cache {
private:std::map<std::string, dns_entry> entries; // 存儲 DNS 緩存條目mutable std::shared_mutex entry_mutex; // 保護數據結構的互斥量public:// 查找 DNS 條目(只讀操作)dns_entry find_entry(const std::string& domain) const {std::shared_lock<std::shared_mutex> lk(entry_mutex); // ① 獲取共享鎖,允許多個線程并發讀取auto it = entries.find(domain);return (it == entries.end()) ? dns_entry() : it->second; // 如果未找到條目,返回空條目}// 更新或添加 DNS 條目(寫操作)void update_or_add_entry(const std::string& domain, const dns_entry& dns_details) {std::lock_guard<std::shared_mutex> lk(entry_mutex); // ② 獲取獨占鎖,確保只有一個線程可以修改數據entries[domain] = dns_details; // 更新或添加條目}
};
代碼說明
-
find_entry()
方法- 使用
std::shared_lock<std::shared_mutex>
獲取共享鎖(①),允許多個線程同時讀取緩存數據。 - 如果找到指定的域名條目,則返回該條目;否則返回一個空條目。
- 使用
-
update_or_add_entry()
方法- 使用
std::lock_guard<std::shared_mutex>
獲取獨占鎖(②),確保在更新或添加條目時,其他線程無法訪問數據結構。 - 將指定的域名和 DNS 條目插入或更新到緩存中。
- 使用
關鍵點解析
-
共享鎖與獨占鎖的區別
- 共享鎖(
std::shared_lock
)允許多個線程同時讀取數據,提高并發性能。 - 獨占鎖(
std::lock_guard
或std::unique_lock
)確保只有一個線程可以修改數據,避免數據競爭。
- 共享鎖(
-
性能優化
- 在大多數情況下,DNS 緩存的讀取操作遠多于寫入操作。因此,使用讀者-寫者鎖可以顯著提高并發性能。
- 需要注意的是,當有線程持有共享鎖時,嘗試獲取獨占鎖的線程會被阻塞,直到所有共享鎖釋放。同樣地,當有線程持有獨占鎖時,其他線程無法獲取任何類型的鎖。
-
適用場景
- 讀者-寫者鎖適用于讀多寫少的場景,例如緩存、日志記錄等。
通過上述代碼示例和分析,您可以清楚地了解如何使用 std::shared_mutex
和相關鎖機制來保護不常更新的數據結構,從而在多線程環境中實現高效的并發訪問。
以下是一個完整的代碼示例,展示了如何使用 std::shared_mutex
來保護一個 DNS 緩存數據結構。該示例包括了讀取和更新緩存的操作,并演示了多線程環境下的并發訪問。
完整代碼示例
#include <iostream>
#include <map>
#include <string>
#include <shared_mutex>
#include <thread>
#include <vector>
#include <chrono>// 模擬 DNS 條目類
class dns_entry {
private:std::string ip_address;public:explicit dns_entry(const std::string& ip = "") : ip_address(ip) {}void set_ip(const std::string& ip) {ip_address = ip;}std::string get_ip() const {return ip_address;}friend std::ostream& operator<<(std::ostream& os, const dns_entry& entry) {return os << "IP: " << entry.ip_address;}
};// DNS 緩存類
class dns_cache {
private:std::map<std::string, dns_entry> entries; // 存儲 DNS 緩存條目mutable std::shared_mutex entry_mutex; // 保護數據結構的互斥量public:// 查找 DNS 條目(只讀操作)dns_entry find_entry(const std::string& domain) const {std::shared_lock<std::shared_mutex> lk(entry_mutex); // 獲取共享鎖auto it = entries.find(domain);if (it == entries.end()) {return dns_entry(); // 如果未找到條目,返回空條目}return it->second;}// 更新或添加 DNS 條目(寫操作)void update_or_add_entry(const std::string& domain, const dns_entry& dns_details) {std::lock_guard<std::shared_mutex> lk(entry_mutex); // 獲取獨占鎖entries[domain] = dns_details; // 更新或添加條目std::cout << "Updated cache for domain: " << domain << ", Entry: " << dns_details << std::endl;}// 模擬定期檢查緩存有效性(寫操作)void check_and_clean_cache() {std::lock_guard<std::shared_mutex> lk(entry_mutex); // 獲取獨占鎖std::cout << "Cleaning cache..." << std::endl;entries.clear(); // 清空緩存}
};// 測試函數:模擬讀取操作
void test_find_entry(dns_cache& cache, const std::string& domain) {dns_entry entry = cache.find_entry(domain);if (!entry.get_ip().empty()) {std::cout << "Found entry for domain: " << domain << ", Entry: " << entry << std::endl;} else {std::cout << "No entry found for domain: " << domain << std::endl;}
}// 測試函數:模擬更新操作
void test_update_or_add_entry(dns_cache& cache, const std::string& domain, const std::string& ip) {dns_entry new_entry(ip);cache.update_or_add_entry(domain, new_entry);
}int main() {// 創建 DNS 緩存實例dns_cache cache;// 創建多個線程測試緩存std::vector<std::thread> threads;// 啟動多個讀取線程for (int i = 0; i < 5; ++i) {threads.emplace_back(test_find_entry, std::ref(cache), "example.com");}// 啟動多個更新線程threads.emplace_back(test_update_or_add_entry, std::ref(cache), "example.com", "192.168.1.1");threads.emplace_back(test_update_or_add_entry, std::ref(cache), "google.com", "8.8.8.8");// 啟動緩存清理線程threads.emplace_back([&cache]() {std::this_thread::sleep_for(std::chrono::seconds(2)); // 模擬延遲cache.check_and_clean_cache();});// 等待所有線程完成for (auto& t : threads) {t.join();}return 0;
}
代碼說明
1. dns_entry
類
- 模擬一個 DNS 條目,包含 IP 地址。
- 提供
set_ip()
和get_ip()
方法來設置和獲取 IP 地址。 - 重載
<<
運算符以便于輸出。
2. dns_cache
類
- 使用
std::map
存儲 DNS 緩存條目。 - 使用
std::shared_mutex
保護數據結構。 - 提供以下方法:
find_entry()
:查找指定域名的 DNS 條目,使用共享鎖允許多個線程并發讀取。update_or_add_entry()
:更新或添加 DNS 條目,使用獨占鎖確保只有一個線程可以修改數據。check_and_clean_cache()
:模擬定期清理緩存,使用獨占鎖防止其他線程訪問數據。
3. 測試函數
test_find_entry()
:模擬讀取操作,查找指定域名的 DNS 條目。test_update_or_add_entry()
:模擬更新操作,添加或更新指定域名的 DNS 條目。
4. 主函數
- 創建一個
dns_cache
實例。 - 啟動多個線程進行讀取、更新和清理緩存操作。
- 等待所有線程完成。
運行結果示例
假設程序運行時線程調度正常,可能的輸出如下:
No entry found for domain: example.com
No entry found for domain: example.com
No entry found for domain: example.com
No entry found for domain: example.com
No entry found for domain: example.com
Updated cache for domain: example.com, Entry: IP: 192.168.1.1
Updated cache for domain: google.com, Entry: IP: 8.8.8.8
Cleaning cache...
關鍵點解析
-
共享鎖與獨占鎖的配合
find_entry()
使用共享鎖,允許多個線程同時讀取緩存數據。update_or_add_entry()
和check_and_clean_cache()
使用獨占鎖,確保只有一個線程可以修改數據。
-
性能優化
- 在大多數情況下,DNS 緩存的讀取操作遠多于寫入操作。通過使用讀者-寫者鎖機制,可以顯著提高并發性能。
-
線程安全
- 使用
std::shared_mutex
確保在多線程環境中對共享數據結構的訪問是安全的。
- 使用
通過這個示例,您可以清楚地了解如何使用 std::shared_mutex
來保護不常更新的數據結構,并實現高效的并發訪問。
3.3.3 嵌套鎖
概述
線程對已經獲取的 std::mutex
再次上鎖是錯誤的,這種行為會導致未定義結果。然而,在某些情況下,一個線程可能會嘗試在釋放互斥量之前多次獲取鎖。為了解決這一問題,C++ 標準庫提供了 std::recursive_mutex
類。std::recursive_mutex
的功能與 std::mutex
類似,但允許同一個線程對其多次加鎖。當前線程必須在其他線程獲取鎖之前,解鎖所有已持有的鎖。例如,如果調用了 lock()
三次,則需要調用 unlock()
三次才能完全釋放鎖。
使用嵌套鎖時,代碼設計需要進行調整。通常,嵌套鎖用于保護可并發訪問的類的成員數據。每個公共成員函數都會對互斥量加鎖,并在操作完成后解鎖。然而,當一個成員函數調用另一個成員函數時,第二個函數也會試圖加鎖,這可能導致未定義行為。一種“變通”的解決方案是將普通互斥量替換為嵌套鎖,但這并不是推薦的做法,因為它可能會破壞類的不變量。
更好的方式是提取出一個私有成員函數,該函數不負責加鎖(調用前必須已持有鎖)。通過這種方式,可以確保在調用新函數時數據的狀態是明確且一致的。
完整代碼示例
以下是一個完整的代碼示例,展示了如何使用 std::recursive_mutex
和改進后的設計方法。
#include <iostream>
#include <mutex>
#include <thread>// 使用 std::recursive_mutex 的類
class RecursiveMutexExample {
private:int value;mutable std::recursive_mutex mtx;public:RecursiveMutexExample(int initialValue = 0) : value(initialValue) {}// 公共成員函數:增加值并打印void incrementAndPrint() {std::lock_guard<std::recursive_mutex> lock(mtx); // 加鎖++value;std::cout << "Incremented value: " << value << std::endl;// 調用另一個成員函數doubleValueAndPrint();}// 公共成員函數:加倍值并打印void doubleValueAndPrint() {std::lock_guard<std::recursive_mutex> lock(mtx); // 加鎖value *= 2;std::cout << "Doubled value: " << value << std::endl;}
};// 改進后的設計:避免嵌套鎖
class ImprovedDesignExample {
private:int value;mutable std::mutex mtx;// 私有成員函數:不負責加鎖void modifyValue(int modifier) const {value += modifier; // 修改值}public:ImprovedDesignExample(int initialValue = 0) : value(initialValue) {}// 公共成員函數:增加值并打印void incrementAndPrint() {std::lock_guard<std::mutex> lock(mtx); // 加鎖modifyValue(1); // 調用私有函數修改值std::cout << "Incremented value: " << value << std::endl;// 調用另一個成員函數doubleValueAndPrint();}// 公共成員函數:加倍值并打印void doubleValueAndPrint() {std::lock_guard<std::mutex> lock(mtx); // 加鎖modifyValue(value); // 調用私有函數修改值std::cout << "Doubled value: " << value << std::endl;}
};// 測試函數
void testRecursiveMutexExample(RecursiveMutexExample& example) {example.incrementAndPrint();
}void testImprovedDesignExample(ImprovedDesignExample& example) {example.incrementAndPrint();
}int main() {// 測試 RecursiveMutexExamplestd::cout << "Testing RecursiveMutexExample:" << std::endl;RecursiveMutexExample recursiveExample;std::thread t1(testRecursiveMutexExample, std::ref(recursiveExample));std::thread t2(testRecursiveMutexExample, std::ref(recursiveExample));t1.join();t2.join();// 測試 ImprovedDesignExamplestd::cout << "\nTesting ImprovedDesignExample:" << std::endl;ImprovedDesignExample improvedExample;std::thread t3(testImprovedDesignExample, std::ref(improvedExample));std::thread t4(testImprovedDesignExample, std::ref(improvedExample));t3.join();t4.join();return 0;
}
代碼說明
1. RecursiveMutexExample
類
- 使用
std::recursive_mutex
來允許嵌套鎖。 - 每個公共成員函數都對互斥量加鎖。
- 當一個成員函數調用另一個成員函數時,嵌套鎖不會導致死鎖。
2. ImprovedDesignExample
類
- 使用
std::mutex
并避免嵌套鎖。 - 提取了一個私有成員函數
modifyValue()
,該函數不負責加鎖。 - 在公共成員函數中,確保在調用私有函數前已持有鎖。
3. 測試函數
testRecursiveMutexExample()
和testImprovedDesignExample()
分別測試兩個類的功能。- 創建多個線程來驗證線程安全性。
運行結果示例
假設程序運行時線程調度正常,可能的輸出如下:
Testing RecursiveMutexExample:
Incremented value: 1
Doubled value: 2
Incremented value: 3
Doubled value: 6Testing ImprovedDesignExample:
Incremented value: 1
Doubled value: 2
Incremented value: 3
Doubled value: 6
關鍵點解析
-
嵌套鎖的使用場景
- 當一個線程需要多次加鎖時,
std::recursive_mutex
是一種解決方案。 - 然而,嵌套鎖可能會掩蓋潛在的設計問題,因此應謹慎使用。
- 當一個線程需要多次加鎖時,
-
改進設計的優點
- 提取私有成員函數避免了嵌套鎖。
- 明確了調用新函數時數據的狀態,確保類的不變量不被破壞。
-
性能考慮
std::recursive_mutex
的性能通常低于std::mutex
,因為需要額外的計數機制來跟蹤鎖的次數。- 如果可以避免嵌套鎖,建議優先使用普通的
std::mutex
。
通過這個示例,您可以清楚地了解如何使用 std::recursive_mutex
以及如何改進設計以避免嵌套鎖的潛在問題。