優化并發
- 一、并發基礎與優化核心知識點
- 二、關鍵代碼示例與測試
- 三、關鍵優化策略總結
- 四、性能測試方法論
- 多選題
- 設計題
- 答案與詳解
- 多選題答案:
- 設計題答案示例
一、并發基礎與優化核心知識點
- 線程 vs 異步任務
- 核心區別:
std::thread
直接管理線程,std::async
由運行時決定異步策略(可能用線程池)。 - 優化點:頻繁創建線程開銷大,優先用
std::async
。
- 原子操作與內存序
- 原子類型:
std::atomic<T>
確保操作不可分割。 - 內存序:
memory_order_relaxed
(無同步)到memory_order_seq_cst
(全序同步)。
- 鎖的精細控制
- 鎖類型:
std::mutex
、std::recursive_mutex
、std::shared_mutex
。 - 優化技巧:縮短臨界區、避免嵌套鎖、用
std::lock_guard
/std::unique_lock
自動管理。
- 條件變量與生產者-消費者
- 使用模式:
wait()
搭配謂詞防止虛假喚醒,notify_one()
/notify_all()
通知。
- 無鎖數據結構
- 適用場景:高并發計數器、隊列等,減少鎖競爭。
二、關鍵代碼示例與測試
示例1:原子操作 vs 互斥鎖的性能對比
#include <iostream>
#include <atomic>
#include <mutex>
#include <thread>
#include <vector>
#include <chrono>constexpr int kIncrements = 1'000'000;// 使用互斥鎖的計數器
struct MutexCounter {int value = 0;std::mutex mtx;void increment() {std::lock_guard<std::mutex> lock(mtx);++value;}
};// 使用原子操作的計數器
struct AtomicCounter {std::atomic<int> value{0};void increment() {value.fetch_add(1, std::memory_order_relaxed);}
};template<typename Counter>
void test_performance(const std::string& name) {Counter counter;auto start = std::chrono::high_resolution_clock::now();std::vector<std::thread> threads;for (int i = 0; i < 4; ++i) {threads.emplace_back([&counter] {for (int j = 0; j < kIncrements; ++j) {counter.increment();}});}for (auto& t : threads) t.join();auto end = std::chrono::high_resolution_clock::now();auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();std::cout << name << " Result: " << counter.value << ", Time: " << duration << "ms\n";
}int main() {test_performance<MutexCounter>("Mutex Counter");test_performance<AtomicCounter>("Atomic Counter");return 0;
}
編譯命令:
g++ -std=c++11 -O2 -pthread atomic_vs_mutex.cpp -o atomic_vs_mutex
輸出示例:
Mutex Counter Result: 4000000, Time: 182ms
Atomic Counter Result: 4000000, Time: 32ms
結論:原子操作在高并發下性能顯著優于互斥鎖。
示例2:線程池實現(減少線程創建開銷)
#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>class ThreadPool {
public:ThreadPool(size_t num_threads) : stop(false) {for (size_t i = 0; i < num_threads; ++i) {workers.emplace_back([this] {while (true) {std::function<void()> task;{std::unique_lock<std::mutex> lock(queue_mutex);condition.wait(lock, [this] { return stop || !tasks.empty(); });if (stop && tasks.empty()) return;task = std::move(tasks.front());tasks.pop();}task();}});}}template<class F, class... Args>auto enqueue(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {using return_type = decltype(f(args...));auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));std::future<return_type> res = task->get_future();{std::unique_lock<std::mutex> lock(queue_mutex);tasks.emplace([task] { (*task)(); });}condition.notify_one();return res;}~ThreadPool() {{std::unique_lock<std::mutex> lock(queue_mutex);stop = true;}condition.notify_all();for (std::thread& worker : workers)worker.join();}private:std::vector<std::thread> workers;std::queue<std::function<void()>> tasks;std::mutex queue_mutex;std::condition_variable condition;bool stop;
};int main() {ThreadPool pool(4);std::vector<std::future<int>> results;for (int i = 0; i < 8; ++i) {results.emplace_back(pool.enqueue([i] {std::cout << "Task " << i << " executed\n";return i * i;}));}for (auto& result : results)std::cout << "Result: " << result.get() << std::endl;return 0;
}
編譯命令:
g++ -std=c++11 -O2 -pthread thread_pool.cpp -o thread_pool
輸出示例:
Task Result: Task 3Task 1Task 20 executedexecuted
Task 5 executed
Task 40 executed
Task 7Task 6 executedexecutedexecutedexecutedResult: 1
Result: 4
Result: 9
Result: 16
Result: 25
Result: 36
Result: 49
結論:線程池復用線程,減少頻繁創建銷毀的開銷。
示例3:使用無鎖隊列提升并發
#include <iostream>
#include <atomic>
#include <thread>template<typename T>
class LockFreeQueue {
public:struct Node {T data;Node* next;Node(const T& data) : data(data), next(nullptr) {}};LockFreeQueue() : head(new Node(T())), tail(head.load()) {}void push(const T& data) {Node* newNode = new Node(data);Node* prevTail = tail.exchange(newNode);prevTail->next = newNode;}bool pop(T& result) {Node* oldHead = head.load();if (oldHead == tail.load()) return false;head.store(oldHead->next);result = oldHead->next->data;delete oldHead;return true;}private:std::atomic<Node*> head;std::atomic<Node*> tail;
};int main() {LockFreeQueue<int> queue;// 生產者線程std::thread producer([&] {for (int i = 0; i < 10; ++i) {queue.push(i);std::this_thread::sleep_for(std::chrono::milliseconds(10));}});// 消費者線程std::thread consumer([&] {int value;while (true) {if (queue.pop(value)) {std::cout << "Consumed: " << value << std::endl;if (value == 9) break;}}});producer.join();consumer.join();return 0;
}
編譯命令:
g++ -std=c++11 -O2 -pthread lockfree_queue.cpp -o lockfree_queue
輸出示例:
Consumed: 0
Consumed: 1
Consumed: 2
Consumed: 3
Consumed: 4
Consumed: 5
Consumed: 6
Consumed: 7
Consumed: 8
Consumed: 9
結論:無鎖隊列減少鎖爭用,適合高并發場景。
三、關鍵優化策略總結
-
減少鎖競爭:
- 縮小臨界區范圍。
- 使用讀寫鎖(
std::shared_mutex
)區分讀寫操作。
-
利用原子操作:
- 簡單計數器優先用
std::atomic
。 - 適當選擇內存序(如
memory_order_relaxed
)。
- 簡單計數器優先用
-
異步與線程池:
- 避免頻繁創建線程,使用
std::async
或自定義線程池。
- 避免頻繁創建線程,使用
-
無鎖數據結構:
- 在CAS(Compare-And-Swap)安全時使用,但需注意ABA問題。
-
緩存友好設計:
- 避免false sharing(用
alignas
對齊或填充字節)。
- 避免false sharing(用
四、性能測試方法論
-
基準測試:
- 使用
std::chrono
精確測量代碼段耗時。 - 對比不同實現的吞吐量(如ops/sec)。
- 使用
-
壓力測試:
- 模擬高并發場景,觀察資源競爭情況。
-
工具輔助:
- Valgrind檢測內存問題。
- perf分析CPU緩存命中率。
多選題
-
下列哪些情況可能導致數據競爭?
A. 多個線程同時讀取同一變量
B. 一個線程寫,另一個線程讀同一變量
C. 使用互斥量保護共享變量
D. 使用原子變量操作 -
關于
std::async
和std::thread
的選擇,正確的說法是?
A.std::async
默認啟動策略是延遲執行
B.std::thread
更適合需要直接控制線程生命周期的場景
C.std::async
會自動管理線程池
D.std::thread
無法返回計算結果 -
以下哪些操作可能引發死鎖?
A. 在持有鎖時調用外部未知代碼
B. 對多個互斥量使用固定順序加鎖
C. 遞歸互斥量(std::recursive_mutex
)的嵌套加鎖
D. 未及時釋放條件變量關聯的鎖 -
關于原子操作的內存順序,正確的說法是?
A.memory_order_relaxed
不保證操作順序
B.memory_order_seq_cst
會降低性能但保證全局順序
C.memory_order_acquire
僅保證讀操作的可見性
D. 原子操作必須與volatile
關鍵字結合使用 -
優化同步的合理手段包括:
A. 將大臨界區拆分為多個小臨界區
B. 使用無鎖數據結構替代互斥量
C. 通過std::future
傳遞計算結果
D. 在單核系統上使用忙等待(busy-wait) -
關于條件變量的正確使用方式:
A. 必須在循環中檢查謂詞
B.notify_one()
比notify_all()
更高效
C. 可以脫離互斥量單獨使用
D. 虛假喚醒(spurious wakeup)是必須處理的 -
以下哪些是"鎖護送"(Lock Convoy)的表現?
A. 多個線程頻繁爭搶同一互斥量
B. 線程因鎖競爭頻繁切換上下文
C. 互斥量的持有時間極短
D. 使用讀寫鎖(std::shared_mutex
) -
減少共享數據競爭的方法包括:
A. 使用線程局部存儲(TLS)
B. 復制數據到各線程獨立處理
C. 通過消息隊列傳遞數據
D. 增加互斥量的數量 -
關于
std::promise
和std::future
的正確說法是:
A.std::promise
只能設置一次值
B.std::future
的get()
會阻塞直到結果就緒
C. 多個線程可以共享同一個std::future
對象
D.std::async
返回的std::future
可能延遲執行 -
關于原子變量與互斥量的對比,正確的說法是:
A. 原子變量適用于簡單數據類型
B. 互斥量能保護復雜操作序列
C. 原子變量的fetch_add
是原子的
D. 互斥量比原子變量性能更好
設計題
- 實現一個線程安全的無鎖(lock-free)隊列
要求:
- 使用原子操作實現
push
和pop
- 處理ABA問題
- 提供測試代碼驗證并發操作正確性
- 設計一個生產者-消費者模型
要求:
- 使用
std::condition_variable
和std::mutex
- 隊列長度限制為固定大小
- 支持多生產者和多消費者
- 提供測試代碼模擬并發場景
- 實現基于
std::async
的并行任務執行器
要求:
- 支持提交任意可調用對象
- 自動回收已完成的任務資源
- 限制最大并發線程數為CPU核心數
- 測試代碼展示并行加速效果
- 優化高競爭場景下的計數器
要求:
- 使用線程局部存儲(TLS)分散寫操作
- 定期合并局部計數到全局變量
- 對比普通原子計數器與優化版本的性能差異
- 提供測試代碼和性能統計輸出
5 實現一個讀寫鎖(Read-Write Lock)
要求:
- 支持多個讀者或單個寫者
- 避免寫者饑餓(writer starvation)
- 基于
std::mutex
和std::condition_variable
實現 - 測試代碼驗證讀寫操作的互斥性
答案與詳解
多選題答案:
-
B
解析:數據競爭的條件是至少一個線程寫且無同步措施。A只有讀不沖突,C/D有同步機制。 -
B, D
解析:std::async
默認策略非延遲(C++11起為std::launch::async|deferred
),B正確,D因std::thread
無直接返回值機制正確。 -
A, C
解析:A可能導致回調中再次加鎖;C遞歸鎖允許同一線程重復加鎖但需對應解鎖次數,誤用仍可能死鎖。 -
A, B
解析:memory_order_relaxed
無順序保證,B正確,C中acquire
保證后續讀的可見性,D錯誤(原子操作無需volatile
)。 -
A, B, C
解析:D在單核忙等待浪費CPU,其余均為有效優化手段。 -
A, B, D
解析:C錯誤,條件變量必須與互斥量配合使用。 -
A, B
解析:鎖護送表現為頻繁爭搶導致線程切換,C短持有時間反而減少競爭,D無關。 -
A, B, C
解析:D增加鎖數量可能加劇競爭,其余均為減少競爭的有效方法。 -
A, B, D
解析:C錯誤,std::future
不可多線程同時調用get()
。 -
A, B, C
解析:D錯誤,互斥量在低競爭時性能可能更差。
設計題答案示例
- 無鎖隊列實現(部分代碼)
#include <atomic>
#include <memory>template<typename T>
class LockFreeQueue {
private:struct Node {std::shared_ptr<T> data;std::atomic<Node*> next;Node() : next(nullptr) {}};std::atomic<Node*> head;std::atomic<Node*> tail;public:LockFreeQueue() : head(new Node), tail(head.load()) {}void push(T value) {Node* new_node = new Node;new_node->data = std::make_shared<T>(std::move(value));Node* old_tail = tail.load();while (!old_tail->next.compare_exchange_weak(nullptr, new_node)) {old_tail = tail.load();}tail.compare_exchange_weak(old_tail, new_node);}std::shared_ptr<T> pop() {Node* old_head = head.load();while (old_head != tail.load()) {if (head.compare_exchange_weak(old_head, old_head->next)) {std::shared_ptr<T> res = old_head->next->data;delete old_head;return res;}}return nullptr;}
};// 測試代碼
int main() {LockFreeQueue<int> queue;queue.push(42);auto val = queue.pop();if (val && *val == 42) {std::cout << "Test passed!\n";}return 0;
}
- 生產者-消費者模型
#include <queue>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <iostream>template<typename T>
class SafeQueue {
private:std::queue<T> queue;std::mutex mtx;std::condition_variable cv;size_t max_size;public:SafeQueue(size_t size) : max_size(size) {}void push(T item) {std::unique_lock<std::mutex> lock(mtx);cv.wait(lock, [this] { return queue.size() < max_size; });queue.push(std::move(item));cv.notify_all();}T pop() {std::unique_lock<std::mutex> lock(mtx);cv.wait(lock, [this] { return !queue.empty(); });T val = std::move(queue.front());queue.pop();cv.notify_all();return val;}
};// 測試代碼
int main() {SafeQueue<int> q(10);std::thread producer([&q] {for (int i = 0; i < 10; ++i) {q.push(i);}});std::thread consumer([&q] {for (int i = 0; i < 10; ++i) {int val = q.pop();std::cout << "Got: " << val << '\n';}});producer.join();consumer.join();return 0;
}
其他設計題目的答案, 稍后補充