內存池核心介紹
廢話不多說,show you code.
我實現了兩套內存池,一個是固定大小的內存池,一個是多重不同大小的內存池。
Fixed size memory pool
設計思路:
我們一個個看,首先我們定義了一個chunk, chunk 里面包含了n個block, 每個block都存放著一個data數據和一個next的指針,這個指針指向下一個block,形成一個鏈表。我們分配的時候是以一個個chunk來分配的,我們會把空閑的block 存放在freelist 里面。所以每次分配的時候都會先從freelist 中查看是否有空閑的block, 如果沒有,再分配一個chunk。 但是呢,我們內存池考慮到thread-safe, 又要考慮到性能,所以使用了無鎖操作,也就是原子操作(atomic)。 但是實際下來還是性能比較差,所以進一步優化使用threadcache, 也就是每個thread_local。這樣就保證了每個thread都有自己的一份cache, 每次先分配的內存的時候就先從cache 分配,分配完再從freelist 中取。最后我們會把每個chunk數據push到LockFreeStack 里面管理,避免數據泄露。
LockFreeStack
無鎖棧代碼比較簡單:
- 我們首先定義鏈表的節點Node,這個類是個模板類型的template, T就是數據的格式,next是指向下個節點的指針。Node的構造函數聲明了一個可變參數模板,允許接受任意數量和類型的參數,并使用std::forward 完美轉發到data成員的構造函數。
struct Node {T data;std::atomic<Node*> next;template<typename... Args>Node(Args&&... args): data(std::forward<Args>(args)...), next(nullptr) {}
};
std::atomic<Node*> head;
-
push 操作:
- 使用
std::move(item)
將傳入的item值移動到新節點,避免不必要的復制, 并且利用了Node
類中的完美轉發構造函數 - 使用
std::memory_order_relaxed
原子加載當前的頭節點, 目前沒有同步需求。 - 使用頭插法把新節點插入到當前節點的前一個,這里循環條件使用CAS(Compare-And-Swap)操作嘗試更新頭節點
compare_exchange_weak
會比較head
與oldNode
是否相等- 如果相等,則將
head
設為newNode
并返回true
,循環結束 - 如果不相等(可能其他線程修改了
head
),則將oldNode
更新為當前的head
值并返回false
,循環繼續
void push(T item) {auto* newNode = new Node(std::move(item));Node* oldNode = head.load(std::memory_order_relaxed);do{newNode->next.store(oldNode, std::memory_order_relaxed);} while(!head.compare_exchange_weak(oldNode, newNode, std::memory_order_release, std::memory_order_relaxed)); }
- 使用
-
pop操作:
- 讀取當前棧頂節點指針,使用寬松內存序
- 如果頭節點為
nullptr
,表示棧為空,返回false
- 否則:使用CAS操作原子地更新頭節點:
- 比較當前頭節點是否仍為
oldNode
- 若相同,將頭節點更新為
oldNode->next
并返回true
- 若不同(其他線程已修改頭節點),則更新
oldNode
為當前頭節點值并返回false
- 比較當前頭節點是否仍為
- 成功更新頭節點后,將原頭節點的數據移動到結果變量
- 釋放原頭節點內存
- 返回操作成功
bool pop(T& result) {Node* oldNode = head.load(std::memory_order_relaxed);if (oldNode) {if (head.compare_exchange_weak(oldNode, oldNode->next.load(std::memory_order_relaxed), std::memory_order_release, std::memory_order_relaxed)) {result = std::move(oldNode->data);delete oldNode;return true;}}return false; }
這里我們為什么使用
compare_exchange_weak
而不是compare_exchange_strong
呢?compare_exchange_weak
允許在某些平臺上"假失敗"(spurious failure),即使比較的值實際相等,但在實現上可能會返回失敗。這種設計讓它能夠:- 在某些CPU架構上有更高的性能
- 避免昂貴的內存柵欄(memory barrier)操作
- 特別在ARM等架構上實現更高效
- compare_exchange_strong: 保證只在實際值不等于期望值時返回失敗, 如果失敗需要重試,性能開銷比較大。
- compare_exchange_weak: 可能出現假失敗,但在循環中使用時,這種差異不影響結果正確性
LockFreeFixedSizePool
無鎖的固定大小內存池。
-
首先看下我們的block的定義,這個結構體包含三個主要部分:
alignas(T)
指令:確保Block結構體與T類型有相同的對齊要求,防止潛在的內存對齊問題data
數組:- 類型:
std::byte
數組,大小為sizeof(T)
- 作用:存儲T類型對象的原始內存
- 布局:位于Block結構體起始位置
- 類型:
next
指針:- 類型:
std::atomic<Block*>
- 作用:指向鏈表中的下一個Block,用于構建無鎖鏈表
- 初始值:nullptr
- 類型:
as_object()
方法:- 功能:將
data
數組轉換為T類型指針 - 實現:使用
reinterpret_cast
進行類型轉換 - 返回:指向
data
內存區域的T*
指針
- 功能:將
- 當對象被釋放時,Block不會被系統回收,而是返回到池中供后續使用。每個Block作為無鎖鏈表的節點,通過
next
原子指針鏈接。 將data
數組放在結構體起始位置,使得T對象指針可直接轉換為Block指針。 當用戶從內存池獲取內存時,用戶拿到的是指向data
數組的T*
指針。因為data
位于Block結構體的起始位置,并且通過alignas(T)
保證了對齊要求,所以可以安全地在deallocate
時,將用戶的T*
指針轉回Block*
指針。
struct alignas(T) Block {std::byte data[sizeof(T)]; // 每個block的大小為Tstd::atomic<Block*> next{nullptr};T* as_object() { return reinterpret_cast<T*>(data);} };
-
chunk的結構體有兩個成員:
blocks
: 一個指向Block
數組的智能指針,使用std::unique_ptr<Block[]>
管理內存count
: 表示這個Chunk
中包含的Block
數量.Chunk
實現了批量內存分配策略:- 一次性分配多個
Block
對象 - 默認情況下,
N = 4096
,單個Block
大小為sizeof(Block)
- 實際分配的
Block
數量由BLOCK_PER_CHUNK = N / sizeof(Block)
決定
- 一次性分配多個
std::unique_ptr<Block[]>
為Chunk
提供了自動內存管理:- 當
Chunk
被銷毀時,blocks
指向的內存會被自動釋放 - 避免了手動內存管理的復雜性和潛在的內存泄漏
- 當
- 構造函數接收一個
block_count
參數,并完成兩個操作:- 分配指定數量的
Block
對象:std::make_unique<Block[]>(block_count)
- 存儲塊數量:
count(block_count)
- 分配指定數量的
struct Chunk { // 每個chunk包含N個blockstd::unique_ptr<Block[]> blocks;size_t count;Chunk(size_t block_count):blocks(std::make_unique<Block[]>(block_count)), count(block_count) {} };
-
再看下我們的ThreadCache的結構體的實現。ThreadCache是一個線程局部內的緩存系統,用于優化無鎖內存池的性能。它主要為每個線程提供本地內存緩存,減少全局競爭,提高分配效率。
- 這里為什么把緩存的block塊設置為32, 并且每次取的大小設置為16。
- 減少競爭:每次獲取多個塊,減少對全局空閑列表的訪問頻率。
- 適中開銷: 16個塊是內存和性能的平衡點,既不會一次占用過多內存,又能有效減少同步操作
- 批量效率:填充緩存時批量獲取更高效,如在后面的
fill_local_cache()
中使用。
- 這里我們為什么需要一個
pool_instance
的指針。因為我們在內存歸還給全局freelist, 所以這里我們傳遞一個指向對象的指針。
struct ThreadCache {Block* blocks[32];int count = 0;static constexpr size_t BATCH_SIZE = 16; // 批量獲取塊的數量LockFreeFixedSizePool<T, N>* pool_instance = nullptr;~ThreadCache() {if (count > 0 && pool_instance) {return_thread_cache();}}// 將線程本地緩存歸還全局鏈表void return_thread_cache() {if (count == 0 || !pool_instance) return;//將本地緩存鏈接成鏈表for (int i = 0; i < count - 1; ++i) {blocks[i]->next.store(blocks[i+1], std::memory_order_relaxed);}Block* old_head = pool_instance->free_list.load(std::memory_order_relaxed);Block* new_head = blocks[0];Block* new_tail = blocks[count - 1];do {new_tail->next.store(old_head, std::memory_order_relaxed);} while (!pool_instance->free_list.compare_exchange_weak(old_head, new_head, std::memory_order_release, std::memory_order_relaxed));count = 0;} };
- 這里為什么把緩存的block塊設置為32, 并且每次取的大小設置為16。
-
繼續看下我們的類成員的定義:
- 我們要把ThreadCache 聲明為thread_local 類型,這樣每個線程都會存在一份緩存
- free_list 為全局緩存,所有空閑的block塊都會鏈接到這個list 里面。
- chunks 分配的chunk由LockFreeStack 來管理,避免內存泄露
- allocate_count 為分配一個block的計數, deallocated_count 為歸還一個block的計數。
static inline thread_local ThreadCache local_cache;static inline constexpr size_t BLOCK_PER_CHUNK = N / sizeof(Block);std::atomic<Block*> free_list{nullptr}; LockFreeStack<std::unique_ptr<Chunk>> chunks; std::atomic<size_t> allocate_count{0}; std::atomic<size_t> deallocated_count{0};
-
分配一個新的chunk.
- 先使用make_unique分配一個chunk的大小,其中包含BLOCK_PER_CHUNK的block。
- 獲取block的指針。
- 這里我們先把block串聯起一個鏈表,然后使用一次原子操作即可。
- 第一次分配的都是空的,所有把所有的block都放進free_list 里面去。
- 最后把chunk放到LockFreeStack里面管理。
void allocate_new_chunk() {auto chunk = std::make_unique<Chunk>(BLOCK_PER_CHUNK);Block* blocks = chunk->blocks.get();// 準備鏈表,只在最后一步進行一次原子操作auto* first_block = &blocks[0];for (size_t i = 0; i < chunk->count - 1; ++i) {blocks[i].next.store(&blocks[i+1], std::memory_order_relaxed);}Block* old_head = free_list.load(std::memory_order_relaxed);do {blocks[chunk->count-1].next.store(old_head, std::memory_order_relaxed);} while(!free_list.compare_exchange_weak(old_head, first_block, std::memory_order_release, std::memory_order_relaxed));//將所有塊鏈接到空閑列表chunks.push(std::move(chunk)); }
-
然后我們再看看怎么把free_list 里面的數據批量取出來放到thread_cache里面去。
- 首先獲取這個對象的instance, 如果緩存還有剩余,那么不需要從free_list里面取。
- 否則檢查free_list 有沒有空閑的塊了,如果沒有分配一個新的chunk.
- 這里我們直接從free_list 里面取ThreadCache::BATCH_SIZE 大小的block的鏈表,把獲取的塊放到chche的數組里面。
// 批量獲取塊到本地緩存 void fill_local_cache(ThreadCache& cache) {cache.pool_instance = this;// 如果本地緩存還有剩余,則直接返回if (cache.count > 0) return;// 從全局空閑列表中獲取塊int batch_size = ThreadCache::BATCH_SIZE;Block* head = nullptr;Block* tail = nullptr;Block* new_head = nullptr;Block* old_head = free_list.load(std::memory_order_acquire);do {// 如果全局鏈表為空,分配新的塊if (!old_head) {allocate_new_chunk();old_head = free_list.load(std::memory_order_acquire);if (!old_head) return; // 如果還是仍然為空,返回}head = old_head;Block* current = old_head;int count = 0;while (count < batch_size - 1 && current->next.load(std::memory_order_relaxed)) {current = current->next.load(std::memory_order_relaxed);++count;}tail = current;new_head = tail->next.load(std::memory_order_relaxed);} while(!free_list.compare_exchange_weak(old_head, new_head, std::memory_order_release, std::memory_order_relaxed));// 將獲取到的塊存儲到本地緩存Block* current = head;int i = 0;while (current != new_head) {cache.blocks[i++] = current;current = current->next.load(std::memory_order_relaxed);}cache.count = i; }
- 分配函數
- allocate 是參數是一個可變參數,允許n個參數原封不動傳給T對象。
- 我們首先獲取數據從cache里面獲取,如果沒有,調用fill_local_cache 緩存cache數據。
- 如果本地緩存失敗了(一般不會出現,除非沒內存了), 從free_list 分配一個block, 如果free_list 也沒有了,則分配一個新的chunk。 最后返回指向數據的指針,也就是Block的首地址。
template<typename... Args> T* allocate(Args&&... args) {if (local_cache.count == 0) {fill_local_cache(local_cache);}Block* block = nullptr;// 嘗試從本地緩存獲取塊if (local_cache.count > 0) {block = local_cache.blocks[--local_cache.count];} else {// 本地緩存填充失敗,直接從全局獲取單個塊Block* old_block = free_list.load(std::memory_order_acquire);// 先嘗試從空閑列表中獲取一個塊while(old_block) {if (free_list.compare_exchange_weak(old_block, old_block->next.load())) {block = old_block;break;}}// 如果空閑列表為空,則分配一個新的塊if (!block) {allocate_new_chunk();old_block = free_list.load(std::memory_order_acquire);while(old_block) {if (free_list.compare_exchange_weak(old_block, old_block->next.load())) {block = old_block;break;}}}}allocate_count.fetch_add(1);if (block) {// 構造新對象return new (block->as_object()) T(std::forward<Args>(args)...);}return nullptr; }
- 最后看下deallocate的函數
- 釋放的時候先會去調用對象的析構函數
- 把數據的指針轉為block的指針
- 這里我們有個策略就是如果本地緩存的數量,有可能歸還超過總的大小,所以我們會預先檢測本地緩存的是否已經大于了容量的80%, 如果已經超過的話,就先把一半的緩存還給free_list 里面,這樣確保cache里面有足夠的空間。
- 最后再把block 放到cache里面去。
void deallocate(T* ptr) {if (!ptr) return;ptr->~T();/*Block 結構的第一個成員就是 data 數組as_object() 方法返回的是 data 數組的起始地址通過 alignas(T) 確保 Block 的對齊要求滿足 T 的需求因此,指向 T 對象的指針實際上就是指向 Block 結構體中 data 數組的指針,而 data 數組又在 Block 的起始位置,所以可以安全地將 T 指針轉回 Block 指針。內存布局:Block 對象:+----------------------+| data[sizeof(T)] | <-- 用戶獲得的 T* 指針指向這里+----------------------+| next |+----------------------+*/Block* block = reinterpret_cast<Block*>(ptr);local_cache.pool_instance = this;// 確定本地緩存容量const int cache_capacity = static_cast<int>(sizeof(local_cache.blocks) / sizeof(local_cache.blocks[0]));// 如果本地緩存已近容量的80%,預先歸還一半if (local_cache.count >= cache_capacity * 0.8) {int half_count = local_cache.count / 2;// 構建鏈表for (int i = 0; i < half_count - 1; ++i) {local_cache.blocks[i]->next.store(local_cache.blocks[i + 1], std::memory_order_relaxed);}// 歸還到全局鏈表Block* old_head = free_list.load(std::memory_order_relaxed);Block* cache_head = local_cache.blocks[0];Block* cache_tail = local_cache.blocks[half_count - 1];do {cache_tail->next.store(old_head, std::memory_order_relaxed);} while (!free_list.compare_exchange_weak(old_head, cache_head, std::memory_order_release, std::memory_order_relaxed));// 壓縮剩余緩存for (int i = 0; i < local_cache.count - half_count; ++i) {local_cache.blocks[i] = local_cache.blocks[i + half_count];}local_cache.count -= half_count;}// 現在將新塊添加到本地緩存local_cache.blocks[local_cache.count++] = block;deallocated_count.fetch_add(1, std::memory_order_relaxed); }
- 測試結果(和標準的分配器)
- 可以看到比標準的分配器還是快了一點。PS: 后面有時間可以繼續優化下。
Standard allocator time: 2450 microseconds
Fixed size pool time: 1790 microseconds
源代碼: MemoryPool