????????前言:本文將要介紹的高并發內存池,它的原型是Google的?個開源項?tcmalloc,全稱Thread-Caching Malloc,近一個月我將以學習為目的來模擬實現一個精簡版的高并發內存池,并對核心技術分塊進行精細剖析,分享在專欄《高并發內存池》里,期待小伙伴們的熱情支持與關注!
項目專欄:高并發內存池_敲上癮的博客-CSDN博客
目錄
一、項目介紹
二、Thread Cache實現
1.框架設計
2.自由鏈表封裝
3.對齊數計算和下映射計算
三、源碼
1.Common.h
2.ConcurrentAlloc.h
3.ThreadCache.h
3.ThreadCache.cpp
5.UnitTest.cpp?
一、項目介紹
? ? ? ? 該篇文章將要介紹的?并發的內存池,它的原型是google的?個開源項?tcmalloc,全稱Thread-Caching Malloc,即線程緩存的malloc,實現了?效的多線程內存管理,?于替代系統的內存分配相關的函數(malloc、free)。
????????在多線程環境下進行內存申請本質上屬于對公共資源的訪問,高并發場景下對內存分配鎖的競爭會異常激烈,這將嚴重拖慢程序運行效率。此外,頻繁的系統級內存申請不僅會產生額外開銷,還可能引發內存碎片問題。傳統的malloc內存分配機制由于這些固有缺陷,已難以滿足現代高性能開發中對內存管理效率的需求。
而tcmalloc在高并發場景下是如何高效、安全的分配和釋放內存呢?它的設計核心如下:
- 減少鎖競爭:傳統內存分配器使用全局鎖保護內存管理結構,高并發下鎖競爭嚴重。高并發內存池通過以下方式降低鎖沖突:
- 線程本地緩存(Thread Local Storage, TLS):每個線程維護獨立的內存緩存,大部分分配/釋放操作在本地完成,無需加鎖。
- 分層設計:將內存管理分為線程本地緩存、中央共享緩存和全局堆,逐層解耦。
- 內存預分配:預先分配大塊內存(如分頁或固定大小的塊),減少頻繁的系統調用(如
brk
或mmap
)。 - 內存分類管理:
- 按大小劃分:小對象(如<64KB)、中對象和大對象采用不同分配策略。
- 對齊與碎片優化:固定大小的內存塊減少內存碎片。
concurrent memory pool主要由以下3個部分構成:
?
thread cache:
????????線程緩存是每個線程獨有的,?于?于256KB的內存的分配,線程從這?申請內存不需要加鎖,每個線程獨享?個cache,這也是該并發線程池?效的地?。
central cache:
????????中?緩存是所有線程所共享,thread cache是按需從central cache中獲取的對象。central cache合適的時機回收thread cache中的對象,避免?個線程占?了太多的內存,?其他線程的內存吃緊,達到內存分配在多個線程中均衡調度的?的。central cache是存在競爭的,所以從這?取內存對象是需要加鎖,?先這??的是桶鎖,其次只有thread cache的沒有內存對象時才會找central cache,所以這?競爭不會很激烈。
page cache:
?????????緩存是在central cache緩存上?的?層緩存,存儲的內存是以?為單位存儲及分配的,central cache沒有內存對象時,從page cache分配出?定數量的page,并切割成定???的?塊內存,分配給central cache。當?個span的?個跨度?的對象都回收以后,page cache會回收central cache滿?條件的span對象,并且合并相鄰的?,組成更?的?,緩解內存碎?的問題。
二、Thread Cache實現
Thread Cache實現有些類似定長內存池,大家有興趣可以閱讀一下:
定長內存池原理及實現-CSDN博客
但是定長內存池有一個缺點,即它每次申請的空間只能是固定的,它的核心原理是:
????????先向系統申請一大塊空間,然后當我們再需要申請內存時就從這塊空間上獲取,我們把空間用完后不用急著釋放回系統,而是把它用一個自由鏈表連接起來,進行二次利用。也就是說我們再次申請空間時直接到自由鏈表上找,如果沒有再去大空間上找,如果還不夠再去向系統申請。
????????但我們希望從開辟的內存池申請的空間不是固定的,它可以是4字節、10字節、50字節,500字節等等。申請多少字節不是關鍵的問題,因為字節大小本來就是可以隨意申請的,問題在于這些空間被棄用后,我們如何把它們管理起來并進行二次利用。
????????定長內存池使用一個自由鏈表來維護棄用的空間,那么我們用多個自由鏈表來維護不同的字節大小空間,需要多少字節就到指定的自由鏈表去找不就行了嗎?答案是:是的!
? ? ? ? 為了高效的找到相應的自由鏈表,我們可以做一個哈希表。如下:
?
需要注意幾個問題:
- 不能直接一個任意字節就做一個對應自由鏈表,這樣數組的開銷太大了,需要引入對齊機制,比如我們需要5字節,6字節,8字節等都統一申請8字節,如果需要9字節,15字節等都統一申請16字節。當然這是會造成一定的內存浪費的,像這樣因為對齊帶來的內存浪費把它稱為內碎片。
- 因為要做成自由鏈表所以為了保證在任何環境下都能存下一個指針的大小,最少的對齊數必須是8 byte。
- 因為對齊會產生內碎片,所以對齊數的選取是很重要的,如果我們就以8的倍數來選定對齊數,它需要開辟的數組大小是256*1024/8=32768,這未免有些太大,我們可以根據字節大小采用不同分配策略。
為突出該哈希表的性質,下文就稱它為自由鏈表桶。
1.框架設計
????????首先用一個common.h文件來寫一些需要用到的公共的類,比如自由鏈表的封裝,對齊數的計算 和 哈希表下標映射的計算也封裝成類放在里面。如下:
FreeList:
- Push:把用完的內存插入鏈表中。
- Pop:從鏈表中取出內存。
- Empty:判斷該自由鏈表中是否有節點,以便判斷是否需要到central cache中取內存。
- _list_head:儲存自由鏈表的頭。
SizeClass:
- RoundUp:計算對齊數和對齊后實際需要申請的空間大小。
- Index:計數自由鏈表桶中對應的下標。
????????這里把它寫成靜態成員函數是為方便在沒有類對象情況下對函數的直接調用,因為現在并沒有成員變量所以沒必要實例化出對象。?
? ? ? ? 可以把它做為內聯函數,減少函數棧幀的開辟。
創建ConcurrentAlloc.h,在這個文件里提供一些直接能讓用戶使用的類和接口。我們封裝一個ConcurrentAlloc類,如下:
?
- ConAlloc:用來給讓用戶申請內存。
- ConFree:用來讓用戶釋放內存。
????????接下來就是創建ThreadCache.h文件,來聲明一些在thread cache這一層需要用到的類方法等。如下:
?
- Allocate:在自由鏈表桶里面申請空間。
- Deallocate:把不用的空間放到自由鏈表桶。
- FetchFromCentralCache:當自由鏈表里沒有空間時到Central Cache這一層獲取,該函數在該篇文章不進行實現,請期待下一篇文章的講解。
- _freelists[ ]:自由鏈表桶。
2.自由鏈表封裝
Push實現
? ? ? ? ?Push功能是把不用的空間放到自由鏈表中,因為頭插效率比較高時間復雜度為O(1),我們直接把它頭插到自由鏈表中。接下來就是頭插操作:
????????把這塊需要插入到鏈表的空間記為obj,首先把_list_head值儲存到obj空間的前4/8字節,因為并不確定用戶用的是32位系統還是64位系統,所以可以用這樣一個操作來解決:
*(void **)obj?= _list_head;
然后讓obj成為新的頭,即_list_head = obj,如下:
void Push(void *obj)
{*(void **)obj = _list_head;_list_head = obj;
}
Pop實現
? ? ? ? Pop是從自由鏈表中獲取到內存,我們直接取頭節點即可,如下:
void *Pop()
{void *obj = _list_head;_list_head = *(void **)obj;return obj;
}
- void *obj = _list_head:取到頭指針,并保存到obj中。
- _list_head = *(void **)obj:把_list_head更新為頭節點指向的下一個節點,也可寫為 _list_head = *(void**)_list_head;
Empty實現
? ? ? ? 直接返回 _list_head == nullptr即可
3.對齊數計算和下映射計算
在這里做這樣一個規則把內存浪費率控制在10%左右:
[1,128]? | 8byte對齊 | _freelist[0,16) |
[128+1,1024]? | 16byte對齊 | _freelist[16,72) |
[1024+1,8*1024]? | 128byte對齊 | _freelist[72,128) |
[8*1024+1,64*1024] | 1024byte對齊? | _freelist[128,184) |
[64*1024+1,256*1024]? | 8*1024byte對齊 | _freelist[184,208) |
浪費率:
? ? ? ? (浪費的字節數 / 對齊后的字節數)*100%
????????[1,128] 區間對齊數為8byte,所以最大的浪費數為7byte,而區間內的前8個字節對齊后申請的內存是8,所以浪費率 = (7/8)*100%87.50%
同理各個區間的浪費率為:
[1,128] | (7/8)*100% |
[128+1,1024] | (15/16*9)*100% |
[1024+1,8*1024] | (127/128*9)*100% |
[8*1024+1,64*1024] | (1023/1024*9)*100% |
[64*1024+1,256*1024] | ( (8*1024-1) / (8*1024*9) )*100% |
雖然[1,128]區間浪費率有點高但實際浪費的并不多,無傷大雅。
對齊后字節大小計算
????????接下來就是計算對齊數,也就是讓用戶任意扔一個數字過來,需要通過對齊規則計算它實際開辟的空間。在RoundUp中實現。
我們可以用 if else 把數據分開處理,如下:
static inline int RoundUp(size_t bytes)
{if (bytes <= 128)return _RoundUp(bytes, 8);else if (bytes <= 1024)return _RoundUp(bytes, 16);else if (bytes <= 8 * 1024)return _RoundUp(bytes, 128);else if (bytes <= 64 * 1024)return _RoundUp(bytes, 1024);else if (bytes <= 256 * 1024)return _RoundUp(bytes, 8 * 1024);else{assert(false);return 1;}
}
????????_RoundUp函數實現也很簡單,如果這個數本身就對齊直接返回即可,如果不是再做簡單的處理,如下:?
int _RoundUp(size_t size, size_t alignNum)
{if (size % alignNum == 0)return size;elsereturn (size / alignNum + 1) * alignNum;
}
但是代碼中又是取模,又是除,效率還略遜一籌,可以考慮把它改成位運算,即:
static inline int _RoundUp(size_t bytes, size_t alignNum)
{return (bytes + alignNum - 1) & ~(alignNum - 1);
}
- bytes + (alignNum - 1):把這個過程也當成為運算,作用是把bytes中不足下一個alignNum倍數的部分補上。
- &~(alignNum - 1):再把補過頭的部分消去。
這里大家可以帶進去數值來理一理。
下標映射計算
????????接下來是實現Index,假設我們已經知道用戶要申請的空間大小(size)和對齊數是2的多少次方(align_shift),那么可以做這樣的計算:
static inline int _Index(size_t bytes, size_t align_shift)
{return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
}
????????1左移align_shift位得到對齊數,再減1,加上bytes,作用是把bytes中不足下一個alignNum倍數的部分補上。然后右移align_shift相當于除以對齊數,最后再減去1因為數組下標是從0開始的。
????????以上計算出來的只是這個對齊后的字節數?對于相應 對齊數 在數組上起始位置的相對位置,還需要加上前面對齊占用的數組元素個數才能得到正確的下標。即:
static inline int _Index(size_t bytes, size_t align_shift)
{return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
}
static inline int Index(size_t bytes)
{// 每個區間有多少個鏈static int group_array[4] = { 16, 56, 56, 56 };if (bytes <= 128)return _Index(bytes, 3);else if (bytes <= 1024)return _Index(bytes, 4)+group_array[0];else if (bytes <= 8 * 1024)return _Index(bytes, 7)+group_array[0]+group_array[1];else if (bytes <= 64 * 1024)return _Index(bytes, 10)+group_array[0]+group_array[1]+group_array[2];else if (bytes <= 256 * 1024)return _Index(bytes, 13)+group_array[0]+group_array[1]+group_array[2]+group_array[3];else{assert(false);return 1;}
}
????????回顧上文框架,以上我們相當于完成了common.h文件,接下來處理ThreadCache和ConcurrentAlloc的封裝就簡單多了。
????????剛才我們設計了對齊數,接下來就可以確定自由鏈表桶的大小了,即208(從對齊規則表里得到)。我們可以把208做成一個全局的?static const size_t?類型 或者 宏定義。
單獨創建一個ThreadCache.cpp文件用來做接口的實現:
#include "ThreadCache.h"
void* ThreadCache::Allocate(size_t bytes)
{//得到對齊后的字節數int objSize = SizeClass::RoundUp(bytes);//得到自由鏈表桶的下標映射int intex = SizeClass::Index(bytes);//先去鏈表桶里面申請內存,如果沒有到CentralCache中獲取if(!_freelists[intex].Empty())return _freelists[intex].Pop();elsereturn FetchFromCentralCache(intex, objSize);
}
void ThreadCache::Deallocate(void* obj,size_t bytes)
{int intex = SizeClass::Index(bytes);_freelists[intex].Push(obj);
}
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{//... ...return nullptr;
}
????????我們希望每個線程都有自己獨立的Thread Cache對象,但又不想讓用戶在線程內自己申請。那么可以直接把它定義為全局變量 并在前面加上static thread_local關鍵字,讓各自線程在自己的線程局部儲存內都申請一份Thread Cache對象,而不是共享。
所以在ThreadCache.h文件中定義:
????????static thread_local ThreadCache* pTLSThreadCache = nullptr;
ConcurrentAlloc封裝如下:
#pragma once
#include "Common.h"
#include "ThreadCache.h"
class ConcurrentAlloc
{
public:static void* ConAlloc(size_t size){if(pTLSThreadCache == nullptr)pTLSThreadCache = new ThreadCache;return pTLSThreadCache->Allocate(size);}static void ConFree(void* ptr,size_t size){assert(pTLSThreadCache);pTLSThreadCache->Deallocate(ptr,size);}
};
三、源碼
1.Common.h
#pragma once
#include <iostream>
#include <assert.h>
#include <thread>
using namespace std;
class FreeList
{
public:void Push(void *obj){*(void **)obj = _list_head;_list_head = obj;}void *Pop(){void *obj = _list_head;_list_head = *(void **)obj;return obj;}bool Empty(){return _list_head == nullptr;}private:void *_list_head = nullptr;
};
class SizeClass
{// 整體控制在最多10%左右的內碎片浪費// [1,128] 8byte對齊 freelist[0,16)// [128+1,1024] 16byte對齊 freelist[16,72)// [1024+1,8*1024] 128byte對齊 freelist[72,128)// [8*1024+1,64*1024] 1024byte對齊 freelist[128,184)// [64*1024+1,256*1024] 8*1024byte對齊 freelist[184,208)
public:static inline int _RoundUp(size_t bytes, size_t alignNum){return (bytes + alignNum - 1) & ~(alignNum - 1);}static inline int RoundUp(size_t bytes){if (bytes <= 128)return _RoundUp(bytes, 8);else if (bytes <= 1024)return _RoundUp(bytes, 16);else if (bytes <= 8 * 1024)return _RoundUp(bytes, 128);else if (bytes <= 64 * 1024)return _RoundUp(bytes, 1024);else if (bytes <= 256 * 1024)return _RoundUp(bytes, 8 * 1024);else{assert(false);return 1;}}static inline int _Index(size_t bytes, size_t align_shift){return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;}static inline int Index(size_t bytes){static int group_array[4] = {16, 56, 56, 56};if (bytes <= 128)return _Index(bytes, 3);else if (bytes <= 1024)return _Index(bytes, 4) + group_array[0];else if (bytes <= 8 * 1024)return _Index(bytes, 7) + group_array[0] + group_array[1];else if (bytes <= 64 * 1024)return _Index(bytes, 10) + group_array[0] + group_array[1] + group_array[2];else if (bytes <= 256 * 1024)return _Index(bytes, 13) + group_array[0] + group_array[1] + group_array[2] + group_array[3];else{assert(false);return 1;}}private:
};
2.ConcurrentAlloc.h
#pragma once
#include "Common.h"
#include "ThreadCache.h"
class ConcurrentAlloc
{
public:static void* ConAlloc(size_t size){pTLSThreadCache = new ThreadCache;return pTLSThreadCache->Allocate(size);}static void ConFree(void* ptr,size_t size){assert(pTLSThreadCache);pTLSThreadCache->Deallocate(ptr,size);}
};
3.ThreadCache.h
#pragma once
#include "Common.h"
static int const FREE_LIST_SIZE = 208;
class ThreadCache
{
public://從Thread Cache中申請void* Allocate(size_t bytes);void Deallocate(void* obj,size_t bytes);//CentralCache中申請void* FetchFromCentralCache(size_t index, size_t size);
private:FreeList _freelists[FREE_LIST_SIZE];
};
static thread_local ThreadCache* pTLSThreadCache = nullptr;
3.ThreadCache.cpp
#include "ThreadCache.h"
void* ThreadCache::Allocate(size_t bytes)
{int objSize = SizeClass::RoundUp(bytes);int intex = SizeClass::Index(bytes);if(!_freelists[intex].Empty())return _freelists[intex].Pop();elsereturn FetchFromCentralCache(intex,objSize);
}
void ThreadCache::Deallocate(void* obj,size_t bytes)
{int intex = SizeClass::Index(bytes);_freelists[intex].Push(obj);
}
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{//... ...return nullptr;
}
5.UnitTest.cpp?
#include "ConcurrentAlloc.h"
void Alloc1()
{for(int i=0;i<5;i++)ConcurrentAlloc::ConAlloc(7);
}
void Alloc2()
{for(int i=0;i<5;i++)ConcurrentAlloc::ConAlloc(8);
}
int main()
{thread t1(Alloc1);thread t2(Alloc2);t1.join();t2.join();return 0;
}