無鎖隊列實現與內存回收機制:Hazard Pointer 深度解析
在并發系統中,為了提升性能和避免鎖競爭,我們常常追求 lock-free 數據結構。但當你實現完一個無鎖隊列后,會發現一個嚴重問題:
內存什么時候釋放?怎樣避免訪問已經被回收的節點?
Hazard Pointer(危險指針)機制,就是為了解決這一痛點。
本篇將帶你深入理解:
- 什么是無鎖隊列(MS Queue)
- 為什么需要 Hazard Pointer
- 如何用 C++ 實現完整的無鎖隊列 + 安全回收機制
- 避免 ABA 問題的技巧
一、無鎖隊列背景:MS 隊列簡介
Michael-Scott 隊列(MS Queue)是經典的無鎖并發隊列,具備以下特性:
- 使用
std::atomic
實現 - 支持多生產者、多消費者
- 基于 CAS(Compare-And-Swap)進行并發控制
隊列結構:
head --> [dummy] --> [node1] --> [node2] --> nullptr^tail
- 使用 dummy 節點初始化隊列,避免空指針特判。
- enqueue 修改 tail,dequeue 修改 head。
二、C++ 無鎖隊列實現(簡化版)
template <typename T>
struct Node {std::atomic<Node*> next;T value;Node(const T& val) : next(nullptr), value(val) {}
};template <typename T>
class LockFreeQueue {
private:std::atomic<Node<T>*> head;std::atomic<Node<T>*> tail;public:LockFreeQueue() {Node<T>* dummy = new Node<T>(T{});head.store(dummy);tail.store(dummy);}void enqueue(const T& val) {Node<T>* new_node = new Node<T>(val);while (true) {Node<T>* last = tail.load();Node<T>* next = last->next.load();if (last == tail.load()) {if (!next) {if (last->next.compare_exchange_weak(next, new_node)) {tail.compare_exchange_weak(last, new_node);return;}} else {tail.compare_exchange_weak(last, next);}}}}bool dequeue(T& result) {while (true) {Node<T>* first = head.load();Node<T>* last = tail.load();Node<T>* next = first->next.load();if (first == head.load()) {if (first == last) {if (!next) return false; // emptytail.compare_exchange_weak(last, next);} else {result = next->value;if (head.compare_exchange_weak(first, next)) {// ?問題:何時 delete first ?return true;}}}}}
};
問題:節點釋放時機?
在并發環境下 dequeue
之后,其他線程可能仍在訪問那個被刪除的節點,不能立即 delete。
三、為什么需要 Hazard Pointer?
常見方法(失敗):
- 延遲 delete? 不知道什么時候沒人訪問。
- GC? C++ 沒有自動垃圾回收。
- 引用計數? 有性能開銷,難以 lock-free。
- 線程局部鏈表回收? 難以同步他人狀態。
Hazard Pointer 的核心思想:
每個線程維護一個“我正在訪問的指針”集合,回收前先檢查是否有人正在訪問。
四、Hazard Pointer 機制原理
數據結構:
// 線程局部
thread_local Node* my_hazard_ptr;// 全局列表
std::vector<std::atomic<Node*>> hazard_pointers;
工作流程:
-
訪問某節點前,將其地址寫入 hazard pointer。
-
刪除時,把要刪除的節點放入“回收候選集合”。
-
定期掃描所有線程的 hazard pointer:
- 如果節點不在任何 hazard 中,則可以安全 delete。
回收偽碼:
void retire_node(Node* node) {retired_list.push_back(node);if (retired_list.size() >= threshold) {scan_and_reclaim();}
}void scan_and_reclaim() {std::unordered_set<Node*> hp_set;for (auto& hp : hazard_pointers) {Node* p = hp.load();if (p) hp_set.insert(p);}auto it = retired_list.begin();while (it != retired_list.end()) {if (hp_set.count(*it) == 0) {delete *it;it = retired_list.erase(it);} else {++it;}}
}
五、完整工程結構(建議)
lockfree_queue/
├── include/
│ ├── lockfree_queue.h // MS隊列
│ └── hazard_pointer.h // Hazard機制封裝
├── src/
│ ├── hazard_pointer.cpp
│ └── main.cpp
├── test/
│ └── test_queue.cpp // GTest 測試用例
├── CMakeLists.txt
示例:Hazard 指針封裝類接口
class HazardPointerManager {
public:static void* get_slot(); // 獲取當前線程 hazard pointer 槽static void retire(void* ptr);static void reclaim(); // 回收所有未被引用的節點
};
六、ABA 問題與 Hazard Pointer 的結合防護
雖然 Hazard Pointer 可以避免野指針訪問,但不能防止 ABA 問題。
因此,通常會在無鎖隊列中配合使用 Tagged Pointer(帶版本號的指針)。
struct TaggedPtr {Node* ptr;uint64_t tag;
};
std::atomic<TaggedPtr> head;
通過不斷增加 tag
可以有效防止 ABA。
七、總結與建議
問題 | 是否解決 | 說明 |
---|---|---|
并發入隊 | ? | CAS 更新尾指針 |
并發出隊 | ? | CAS 更新頭指針 |
節點回收 | ? | Hazard Pointer 掃描后安全 delete |
ABA 問題 | ?(需 TaggedPtr) | 防止假象“未變化” |
八、參考資料
- Michael & Scott: “Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms”
- Herb Sutter: “Lock-Free Programming (Hazard Pointers)”
- C++ Concurrency in Action 第二版
- Facebook Folly Hazard Pointer 實現
下期預告
《設計一個可擴展的 Lock-Free 環:支持動態擴容、線程綁定與優先級任務》