項目介紹
項目來源
本項目實現了一個高并發內存池,參考了Google的開源項目tcmalloc實現的簡易版;其功能就是實現高效的多線程內存管理。由功能可知,高并發指的是高效的多線程,而內存池則是實現內存管理的。
tcmalloc源碼
項目源碼
高并發內存池項目源碼
內存池相關知識
1、池化技術
池化技術就是程序先向系統申請過量的資源,并將這些資源管理起來,避免頻繁的申請和釋放資源導致的開銷。
內存池可以使用池化技術來維護可用內存塊的鏈表。當程序需要分配內存時,內存池會從鏈表中分配一個可用的內存塊。如果沒有可用的內存塊,內存池會從操作系統申請更多的內存,并將新分配的內存塊添加到鏈表中。當程序釋放內存時,內存池會將內存塊添加回鏈表中,以便將來使用。
池化技術可以有效地減少內存碎片,因為它可以將多個小內存塊組合成更大的內存塊,這樣就可以分配更大的連續內存空間,并減少碎片。此外,池化技術還可以提高內存使用效率,因為它可以快速分配和釋放內存,而無需每次都調用操作系統的內存分配和釋放函數。
2、內存池
內存池指的是程序預先向操作系統申請足夠大的一塊內存空間;此后,程序中需要申請內存時,不需要直接向操作系統申請,而是直接從內存池中獲取;同理,程序釋放內存時,也不是將內存直接還給操作系統,而是將內存歸還給內存池。當程序退出(或者特定時間)時,內存池才將之前申請的內存真正釋放。
3、內存池主要解決的問題
由上可知,內存池首要解決的是效率問題,其次從系統的內存分配器角度出發,還需要解決內存碎片的問題。那么什么是內存碎片問題呢?
內存碎片分為外碎片和內碎片。
-
外碎片由下圖所示:對于程序員申請的內存,可能因為頻繁的申請和釋放內存導致內存空間不連續,那么就會出現明明由足夠大的內存空間,但程序員卻申請不出連續的空間出來,這便是外碎片問題了。
-
內碎片則是由于一些對齊的需求,導致分配出去的內存空間無法被利用,比如本項目中的Round(Size)對size進行的對齊。
4、malloc
C語言中動態申請內存是通過malloc函數去申請內存的,但是實際上malloc并不是直接向堆申請內存的,而malloc也可以使用內存池來管理內存分配,在某些情況下,操作系統或C語言標準庫可能會使用內存池來管理堆內存,以提高內存分配效率。當程序將malloc管理的內存池中內存全部申請完時,malloc函數就會繼續向操作系統申請空間。
設計思路
第一階段–設計一個定長的內存池
我們知道malloc函數申請內存空間是通用的,即任何場景下都可以使用,但是各方面都通用就意味著各方面都不頂尖,那么我們可以設計一個定長內存池來保證特定場景下的內存申請效率要高于malloc函數。
?
適應平臺的指針方案
在這里,我們想取出一塊對象內存中的前4個字節(32位系統)或者8個字節(64位系統)的內存來存儲一個指針指向下一塊釋放回來的自由對象內存,那么在這里為了不作平臺系統的判斷,可以使用一個小技巧,即將對象內存強轉成void**的類型,那么再對這個二級指針類型解引用就可以取出當前對象的前4個字節(32位系統)或8個字節(64位系統)。
由于這個操作之后會頻繁使用,因此定義為內斂函數放在common.h頭文件中方便調用:
static inline void*& NextObj(void* obj)
{return *(void**)obj;
}
由此,我們就可以設計出定長內存池的對象:
定長內存池池的基本思想是在程序運行時預先分配一大塊內存,然后在需要使用某個對象時,從這塊內存中分配給它。當對象不再使用時,將它歸還給對象池,供其他對象使用。這樣做的好處在于減少了內存分配和釋放的次數,從而減少了內存碎片的產生,并降低了內存分配的開銷。
在這段代碼中,ObjectPool 類的主要功能包括:
-
New() 函數:用于分配一個新的對象,如果有自由鏈表中有空閑的對象,則直接從自由鏈表中取出;否則,如果當前剩余內存塊大小不夠一個對象的大小,則重新申請一個內存塊。申請到內存后,調用對象的構造函數來進行初始化。
-
Delete() 函數:用于釋放一個對象,調用對象的析構函數進行清理,然后將其加入自由鏈表中。
在這段代碼中,ObjectPool類的成員變量包括:
-
_memory:指向當前申請的內存塊的指針。
-
_remainBytes:當前內存塊剩余的字節數。
-
_freeList:自由鏈表的頭指針,用于保存當前有哪些對象可以被重復利用。
在這段代碼中,還有一個函數 SystemAlloc(),這是為了避免使用malloc而使用的,它的作用是申請一個新的內存塊。如果申請失敗,則拋出 std::bad_alloc 異常。
相關視頻推薦
面對內存再不發怵,手把手帶你實現內存池(自行準備linux環境)
200行代碼實現slab,開啟內存池的內存管理(準備linux環境)
5種內存泄漏檢測方式,讓你重新理解C++內存管理
免費學習地址:c/c++ linux服務器開發/后臺架構師
需要C/C++ Linux服務器架構師學習資料加qun579733396獲取(資料包括C/C++,Linux,golang技術,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒體,CDN,P2P,K8S,Docker,TCP/IP,協程,DPDK,ffmpeg等),免費分享
?總的來說,這段代碼實現了一個簡單的對象池,可以有效地管理類型為 T 的對象的內存分配和釋放,從而減少了內存碎片的產生,并降低了內存分配的開銷。
template<class T>
class ObjectPool
{
public:T* New(){T* obj = nullptr;// 如果自由鏈表非空,以“頭刪”的方式從自由鏈表取走內存塊,重復利用if (_freeList){// 技巧:(void**)強轉方便32位下獲取前4字節,64位下獲取前8字節void* next = *((void**)_freeList); obj = (T*)_freeList;_freeList = next;}else{// 剩余內存_remainBytes不夠一個對象大小時,重新開一塊大空間if (_remainBytes < sizeof(T)){_remainBytes = 128 * 1024;// 分配了 _remainBytes 個字節的空間,即(128 *1024字節,128KB)// memory = (char*)malloc(_remainBytes); // >>13 其實就是一頁8KB的大小,可以得到具體多少頁_memory = (char*)SystemAlloc(_remainBytes >> 13);if (_memory == nullptr){throw std::bad_alloc();}}obj = (T*)_memory;// 保證一次分配的空間夠存放下當前平臺的指針size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);// 大塊內存塊往后走,前面objSize大小的內存該分配出去了_memory += objSize; _remainBytes -= objSize;}// 定位new顯式調用T類型構造函數:在內存地址obj處創建一個新的T類型的對象,并調用該對象的構造函數。// 與普通的new運算符不同的是,它不會使用動態內存分配器來分配內存,而是使用指定的內存地址。new(obj)T;return obj;}//將obj這塊內存鏈接到_freeList中void Delete(T* obj){//顯式調用obj對象的析構函數,清理空間obj->~T();//將obj內存塊頭插*(void**)obj = _freeList;_freeList = obj;}
private:char* _memory = nullptr; // 指向大塊內存的指針size_t _remainBytes = 0; // 大塊內存在切分過程中的剩余字節數void* _freeList = nullptr; // 自由鏈表的頭指針,用于保存當前有哪些對象可以被重復利用。
};
對于我們設計的定長內存池,可以通過下面的測試代碼來比較一下malloc與定長內存池的效率:
struct TreeNode
{int _val;TreeNode* _left;TreeNode* _right;TreeNode():_val(0), _left(NULL),_right(NULL){}TreeNode(int x) : _val(x), _left(nullptr), _right(nullptr) {}
};void TestObjectPool()
{// 申請釋放的輪次const size_t Rounds = 5;// 每輪申請釋放多少次const size_t N = 1000000;size_t begin1 = clock();std::vector<TreeNode*> v1;v1.reserve(N);for (size_t j = 0; j < Rounds; ++j){for (int i = 0; i < N; ++i){v1.push_back(new TreeNode);}for (int i = 0; i < N; ++i){delete v1[i];}v1.clear();}size_t end1 = clock();ObjectPool<TreeNode> TNPool;size_t begin2 = clock();std::vector<TreeNode*> v2;v2.reserve(N);for (size_t j = 0; j < Rounds; ++j){for (int i = 0; i < N; ++i){v2.push_back(TNPool.New());}for (int i = 0; i < 100000; ++i){TNPool.Delete(v2[i]);}v2.clear();}size_t end2 = clock();cout << "new cost time:" << end1 - begin1 << endl;cout << "object pool cost time:" << end2 - begin2 << endl;
}
?可以明顯的看出,定長內存池的開銷是要低于malloc的,由此可見,在特定場景下,定長內存池的效率高于malloc函數。
第二階段–高并發內存池整體框架設計
現代開發環境大多都是多核多線程,那么在申請內存的場景下,必然存在激烈的鎖競爭問題。其實,malloc本身就已經足夠優秀了,但本項目的原型tcmalloc將在多線程高并發的場景下更勝一籌。
而本項目實現的內存池將考慮以下幾方面的問題:
-
1.性能問題
-
2.多線程場景下的鎖競爭問題
-
3.內存碎片問題
concurrent memory pool(并發內存池),主要有以下3個部分組成:
1.線程緩存(thread cache)
線程緩存是每個線程獨有的,用于小于256kb內存的分配。那么對于每一個線程從thread cache申請資源,就無需考慮加鎖問題,每個線程獨享一個緩存(cache),這也是并發線程池高效的地方。
2.中心緩存(central cache)
中心緩存有所有線程所共享,thread cache 按需從central cache處獲取對象,而central cache在合適的時機從thread cache處回收對象從而避免一個線程占用太多資源,導致其他線程資源吃緊,進而實現內存分配在多個線程更加均衡的按需調度。由于所有thread cache都從一個central cache中取內存對象,故central cache是存在競爭的,也就是說從central cache中取內存對象需要加鎖,但我們在central cache這里用的是桶鎖,且只有thread cache中沒有對象后才會來central cache處取對象,因此鎖的競爭不會很激烈。
3.頁緩存(page cache)
頁緩存是中心緩存上一級的緩存,存儲并分配以頁為單位的內存,central cache中沒有內存對象時,會從page cache中分配出一定數量的page,并切割成定長大小的小塊內存,給central cache。當page cache中一個span的幾個跨度頁都回收以后,page cache會回收central cache中滿足條件的span對象,并且合并相鄰的頁,組成更大的頁,從而緩解內存碎片(外碎片)的問題。
?
第三階段–三級緩存的具體實現
1.Thread Cache框架構建及核心實現
thread cache是哈希桶結構,每個桶是一個根據桶位置映射的掛接內存塊的自由鏈表,每個線程都會有一個thread cache對象,這樣就可以保證線程在申請和釋放對象時是無鎖訪問的。
申請與釋放內存的規則及無鎖訪問
申請內存
-
當內存申請大小size不超過256KB,則先獲取到線程本地存儲的thread cache對象,計算size映射的哈希桶自由鏈表下標i。
-
如果自由鏈表_freeLists[i]中有對象,則直接Pop一個內存對象返回。
-
如果_freeLists[i]中沒有對象時,則批量從central cache中獲取一定數量的對象,插入到自由鏈表并返回一個對象。
釋放內存
1.當釋放內存小于256kb時將內存釋放回thread cache,計算size映射自由鏈表桶位置i,將對象Push到_freeLists[i]。
2.當鏈表的長度過長,則回收一部分內存對象到central cache。
tls - thread local storage
線程局部存儲(tls),是一種變量的存儲方法,這個變量在它所在的線程內是全局可訪問的,但是不能被其他線程訪問到,這樣就保持了數據的線程獨立性。而熟知的全局變量,是所有線程都可以訪問的,這樣就不可避免需要鎖來控制,增加了控制成本和代碼復雜度。
//TLS: thread local storage,實現線程的無鎖訪問
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
管理內存對齊和映射等關系
計算對齊大小映射的規則
thread cache中的哈希桶映射比例比非均勻的,如果將內存大小均勻劃分的話,則會劃分出大量的哈希桶,比如256kb如果按照8byte劃分,則會創建32768個哈希桶,這就有較大的內存開銷;而如果按照更大的字節劃分,那么內存開銷雖然減少了,但照顧到的場景也少了,且會產生內碎片問題。
那么參考tcmalloc項目,為了保證內碎片的浪費整體控制在10%左右進行的區間映射,同時沒有那么大的開銷。使用RoundUp 函數的將輸入的 size 對齊到一個固定的對齊值。對齊值是根據 size 的大小而定的,它分成了五個區間:
-
如果 size 位于 [1,128] 之間,那么 size 將被對齊到 8 字節。
-
如果 size 位于 [128+1,1024] 之間,那么 size 將被對齊到 16 字節。
-
如果 size 位于 [1024+1,8*1024] 之間,那么 size 將被對齊到 128 字節。
-
如果 size 位于 [8*1024+1,64*1024] 之間,那么 size 將被對齊到 1024 字節。
-
如果 size 位于 [64*1024+1,256*1024] 之間,那么 size 將被對齊到 8192 字節。
這個函數內部使用了另外一個靜態函數_RoundUp來實際計算對齊后的值。
也就是說,對于1byte到128byte的內存對象,按照8byte對齊,劃分為下標0-15號的哈希桶,而129byte到1kb的內存對象,按照16byte對齊,劃分下標16-71號的哈希桶,以此類推,最終劃分為0-207號總共208個哈希桶,這樣就保證了內存較小的開銷,同時各個對齊關系中內碎片浪費控制在10%左右,比如129byte到144byte區間,取144byte的內存對象,浪費率為(144 - 129) / 144 = 10.42%,當然對于最開始的1byte申請8byte內存對象,雖然浪費高達87.5%,但考慮到最終內碎片浪費了7byte,對比后續內碎片一次浪費7kb來說可以忽略不計了。
這便是申請的內存對象大小對齊的映射關系,這個關系在后續central cache及page cache中仍有應用,因此可以將其定義在頭文件common.h中,以后內存大小對齊的管理。
計算相應內存映射在哪一個哈希桶中
這里使用Index 函數計算將輸入的 size 映射到哪個自由鏈表桶(freelist)。和RoundUp函數一樣,這個函數也根據size的大小將它分成了五個區間,但是它返回的是一個數組下標。數組的大小和每個區間內的自由鏈表桶數量是固定的。
這個函數內部使用了另一個靜態函數_Index來計算桶的下標。在代碼中,size表示要被對齊的內存塊的大小,alignNum表示對齊的值,align_shift表示對齊的值的二進制位數。
關于 _RoundUp和 _Index:
對于 _RoundUp 函數,它使用位運算將 size 對齊到最接近它的大于等于它的 alignNum 的倍數。這里有一個簡單的例子:假設我們有一個值 size=11,我們想將它對齊到 alignNum=8 的倍數。那么 _RoundUp 函數會返回 16,因為 16 是最接近 11 且大于等于 11 的 alignNum 的倍數。
對于 _Index 函數,它計算的是將 size 映射到桶鏈的下標。它的計算方法是將 size 向上對齊到最接近它的大于等于它的 2^align_shift 的倍數,然后再減去 1。這個函數的作用和 _RoundUp 函數類似,但是它返回的是下標而不是對齊后的值。
//計算對齊數
size_t _RoundUp(size_t size, size_t alignNum)
{size_t alignSize;if (size % alignNum != 0){alignSize = (size / alignNum + 1) * alignNum;}else{alignSize = size;}return alignSize;
}//計算對應鏈桶的下標
static inline size_t _Index(size_t bytes, size_t alignNum)
{if (bytes % alignNum == 0){return bytes / alignNum - 1;}else{return bytes / alignNum;}
}
但是參考tcmalloc源碼,考慮到位移運算更加接近底層,效率更高,而實際應用中映射對應關系的計算是相當頻繁的,因此使用位運算來改進算法。
static inline size_t _RoundUp(size_t bytes, size_t alignNum)
{return ((bytes + alignNum - 1) & ~(alignNum - 1));
}
static inline size_t _Index(size_t bytes, size_t align_shift)
{return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
}
代碼實現
// 計算對象大小的對齊映射規則
class SizeClass
{
public:// 整體控制在最多10%左右的內碎片浪費// [1,128] 8byte對齊 freelist[0,16)// [128+1,1024] 16byte對齊 freelist[16,72)// [1024+1,8*1024] 128byte對齊 freelist[72,128)// [8*1024+1,64*1024] 1024byte對齊 freelist[128,184)// [64*1024+1,256*1024] 8*1024byte對齊 freelist[184,208)// 使用位運算將 size 對齊到最接近它的大于等于它的 alignNum 的倍數// 比如size = 11對齊到16static inline size_t _RoundUp(size_t bytes, size_t alignNum){return ((bytes + alignNum - 1) & ~(alignNum - 1));}static inline size_t RoundUp(size_t size){if (size <= 128){return _RoundUp(size, 8);}else if (size <= 1024){return _RoundUp(size, 16);}else if (size <= 8 * 1024){return _RoundUp(size, 128);}else if (size <= 64 * 1024){return _RoundUp(size, 1024);}else if (size <= 256 * 1024){return _RoundUp(size, 8 * 1024);}else{assert(false);return -1;}}// 將 size 映射到桶鏈的下標:// 這個函數的作用和 _RoundUp 函數類似,但是它返回的是下標而不是對齊后的值。// 比如size = 11映射下標到(2 - 1 = 1) static inline size_t _Index(size_t bytes, size_t align_shift){return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;}// 計算映射的哪一個自由鏈表桶static inline size_t Index(size_t bytes){assert(bytes <= MAX_BYTES);// 每個區間有多少個鏈static int group_array[4] = { 16, 56, 56, 56 };// 打表if (bytes <= 128) {return _Index(bytes, 3);}else if (bytes <= 1024){return _Index(bytes - 128, 4) + group_array[0];}else if (bytes <= 8 * 1024){return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];}else if (bytes <= 64 * 1024) {return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];}else if (bytes <= 256 * 1024){return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];}else {assert(false);}return -1;}// 計算ThreadCache一次從中心緩存CentralCache獲取多少個小對象,總的大小就是MAX_BYTES = 256KBstatic size_t NumMoveSize(size_t size){assert(size > 0);// [2, 512],一次批量移動多少個對象的(慢啟動)上限值// 小對象一次批量上限高// 小對象一次批量上限低int num = MAX_BYTES / size;if (num < 2)num = 2;if (num > 512)num = 512;return num;}// 計算中心緩存CentralCache一次向PageCache獲取多少頁// 單個對象 8byte// ...// 單個對象 256KBstatic size_t NumMovePage(size_t size){// 計算一次從中心緩存獲取的對象個數numsize_t num = NumMoveSize(size); // 單個對象大小與對象個數相乘,獲得一次需要向PageCache申請的內存大小size_t npage = num * size;npage >>= PAGE_SHIFT;if (npage == 0){npage = 1;}return npage;}
};
NumMoveSize 函數的作用是計算一次從中心緩存獲取多少個對象。它的計算方法是首先將單個對象大小除以總的緩存大小 MAX_BYTES,得到的結果即為一次從中心緩存獲取的對象個數。為了避免數量太少或太多,可以設置一個范圍,在 [2, 512] 之間。如果計算出的對象數量不在這個范圍內,就取邊界值。
NumMovePage函數的作用是計算中心緩存CentralCache一次向PageCache獲取多少頁。一頁的大小是由PAGE_SHIFT指定的。本項目中一個頁大小是8KB,即2的13次方,所以PAGE_SHIFT = 13。NumMovePage函數先調用NumMoveSize函數計算出一次從CentralCache獲取多少個對象,然后乘上對象大小,就獲得需要向PageCache申請的內存大小,然后除以單個頁的大小(左移PAGE_SHIFT)即可獲得向PageCache申請的總頁數。
突擊檢查:static inline 函數和 inline函數有什么區別呢?
inline內聯函數:為了減少因函數調用而引起的系統開銷,內聯函數實際上是以空間換效率的一種做法。編譯器會盡量將 inline 函數的代碼插入到調用函數的代碼中,從而減少函數調用的開銷。inline 函數的主要優點是可以提高程序的執行效率,因為省去了函數調用的開銷。但是,inline 函數的缺點是會降低程序的可讀性,代碼會變得復雜。
static inline 函數是一種特殊的函數,它同時具有 inline 函數的優點和 static 函數的優點。static 函數是指在編譯期間就將函數體內的代碼插入到調用函數的代碼中,并且只在本文件中可見。static 函數的主要優點是可以隱藏函數的實現細節,只提供接口。所以在頭文件中務必要加上static inline,否則和普通函數一樣,當多個CPP文件包含是就會重復定義。所以加入static提高代碼健壯性。
因此,static inline 函數既可以提高程序的執行效率,又可以隱藏函數的實現細節,是一種很好的函數聲明方式。
自由鏈表的設計
在有了上面的基礎之后,我們來設計自由鏈表,其實就是實現一個單鏈表,方便插入刪除,同時標識鏈表長度 _size以方便后續釋放流程,以及定義 _maxSize來保住thread cache一次申請對象批次的下限。
// 返回“obj前4或8字節內存”強轉得來的指針,指向的是下一個結點
static void*& NextObj(void* obj)
{return *(void**)obj;
}
class FreeList
{
public:void Push(void* obj){// 將歸還的內存塊對象頭插進自由鏈表NextObj(obj) = _freeList;_freeList = obj;++_size;} void PushRange(void* start, void* end, size_t size){NextObj(end) = _freeList;_freeList = start;_size += size;}void* Pop(){assert(_freeList);//將自由鏈表中的內存塊頭刪出去void* obj = _freeList;_freeList = NextObj(obj);--_size;return obj;}void PopRange(void*& start, void*& end, size_t n){assert(n >= _size);start = _freeList;end = start;for (size_t i = 0; i < n - 1; i++){end = NextObj(end);}_freeList = NextObj(end);_size -= n;NextObj(end) = nullptr;}bool Empty(){return _freeList == nullptr;}size_t& MaxSize()// 傳引用{return _maxSize;}size_t& Size(){return _size;}
private:void* _freeList = nullptr;size_t _maxSize = 1;//慢增長用于保住申請批次下限size_t _size = 0;//計算鏈表長度
};
thread cache框架構建
在有了上述基礎后,我們來搭建thread cache的框架,其實就是一個哈希桶,每個桶中掛接著自由鏈表對象。
_declspec(thread)是一個Windows平臺專用的關鍵字,用于聲明線程局部存儲(TLS)變量。在這里,它聲明了一個指向ThreadCache對象的指針變量pTLSThreadCache,該變量的值對于每個線程來說都是獨立的,可以使線程在向thread cache申請內存對象的時候實現無鎖訪問。
class ThreadCache
{
public:// 申請和釋放內存對象void* Allocate(size_t size);void Deallocate(void* ptr, size_t size);// 從中心緩存獲取對象void* FetchFromCentralCache(size_t index, size_t size);// 釋放內存時,如果自由鏈表過長,回收內存到CentralCache中心緩存void ListTooLong(FreeList& list, size_t size);
private:// 哈希桶,每個桶中掛接著自由鏈表對象FreeList _freeLists[NFREELIST];
};// pTLSThreadCache是一個指向ThreadCache對象的指針,每個線程都有一個獨立的pTLSThreadCache
// 可以使線程在向thread cache申請內存對象的時候實現無鎖訪問
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
thread cache核心實現
1.FetchFromCentralCache(size_t index, size_t size)
從中央緩存(CentralCache)獲取內存塊。接受兩個參數:ThreadCache自由鏈表對應的桶索引和想獲取的內存塊大小。
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{// 慢開始反饋調節算法// 1、最開始不會一次向central cache一次批量要太多,因為要太多了可能用不完// 2、如果你不要這個size大小內存需求,那么batchNum就會不斷增長,直到上限// 3、size越大,一次向central cache要的batchNum就越小// 4、size越小,一次向central cache要的batchNum就越大size_t batchNum = min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));if (_freeLists[index].MaxSize() == batchNum){_freeLists[index].MaxSize() += 1;}void* start = nullptr;void* end = nullptr;size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);// 至少要獲得一塊assert(actualNum > 0);if (actualNum == 1)// 只有一個內存塊{assert(start == end);return start;}else// 除了起始地址外的其他內存塊插入當前線程的緩存的自由鏈表中{_freeLists[index].PushRange(NextObj(start), end, actualNum - 1);return start;}
}
2.Allocate(size_t size)
線程內分配內存
void* ThreadCache::Allocate(size_t size)
{assert(size <= MAX_BYTES);// 計算出內存塊的對齊大小 alignSize 和內存塊所在的自由鏈表的下標 indexsize_t alignSize = SizeClass::RoundUp(size);size_t index = SizeClass::Index(size);// _freeLists[index] 如果不為空,就從 _freeLists[index] 中彈出一個內存塊并返回。if (!_freeLists[index].Empty()){return _freeLists[index].Pop();}// 如果為空,就調用 FetchFromCentralCache 函數從中央緩存獲取內存塊;else{FetchFromCentralCache(index, alignSize);}
}
3.Deallocate(void* ptr, size_t size)
線程內回收內存,傳入內存塊的指針: ptr 和內存塊的大小: size
void ThreadCache::Deallocate(void* ptr, size_t size)
{assert(ptr);assert(size <= MAX_BYTES);// 計算出映射的自由鏈表桶index,并將 ptr 插入到 _freeLists[index] 中size_t index = SizeClass::Index(size);_freeLists[index].Push(ptr);// 當鏈表長度大于一次批量申請的內存時,就開始還一段list給CentralCacheif (_freeLists[index].Size() >= _freeLists[index].MaxSize()){ListTooLong(_freeLists[index], size);}
}
4.ListTooLong(FreeList& list, size_t size)
處理線程內過長自由鏈表,還一部分給中心緩存的span
void ThreadCache::ListTooLong(FreeList& list, size_t size)
{void* start = nullptr;void* end = nullptr;// MaxSize就是歸還的list的長度,自由鏈表結點個數list.PopRange(start, end, list.MaxSize()); CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}
2.central cache框架構建及核心實現
central cache也是一個哈希表結構,其映射關系與thread cache是一樣的,不同的是central cache的哈希桶位置所掛接的是SpanList鏈表結構,不過每個桶下的span對象被切分成了一塊塊小內存掛接在span對象的自由鏈表freeList中。
?
圖稍微有點不對,sapn鏈是帶頭雙向循環鏈表,最后不該指向NULL,應該指向頭。
申請與釋放內存規則
申請內存
1.當thread cache中沒有內存后,就會向central cache中申請大塊內存。這里的申請過程采用的是類似網絡tcp協議擁塞控制的慢開始算法,而central cache中哈希映射的spanlist下掛著的span則向thread cache提供大塊內存,而從span中取出對象給thread cache是需要加鎖的,這里為了保證效率,提供的是桶鎖。
慢開始算法
線程緩存從中央緩存獲取內存塊的數量是按照“慢開始反饋調節算法”遞增的:
1、最開始不會一次向central cache一次批量要太多,因為要太多了可能用不完
2、如果你不要這個size大小內存需求,那么batchNum就會不斷增長,直到上限
3、size越大,一次向central cache要的batchNum就越小
4、size越小,一次向central cache要的batchNum就越大
// 預計獲取的批次數量
size_t batchNum = min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));
if (_freeLists[index].MaxSize() == batchNum)_freeLists[index].MaxSize() += 1;
舉個例子,線程申請7塊大小相同的內存,第一次申請的批次數量為1塊,第二次再來申請時,此時thread cache的自由鏈表下已經沒有空閑的內存了,則又需要繼續向central cache申請內存,申請的批次數量為2塊,第3次直接從thread cache的自由鏈表中獲取內存塊;第4次再申請時,則需要向central cache申請內存,此時申請的批次數量為3塊,掛接在thread cache的自由鏈表下,直到第7次來申請內存時,向central cache申請的內存批次數量為4,這時線程取走一塊內存,則掛接在thread cache的自由鏈表下還有3塊空閑的內存。
2.當central cache中映射的spanlist下所掛接的所有span對象都沒有內存后,則需要向page cache申請一塊新的span對象,central cache拿到這塊span對象后會按照所管理內存的大小將span對象劃分成多塊,再掛接到central cache的審判list中;然后再從這塊新申請的span對象中去內存分配給thread cache。
3.在這里,為了方便后續的回收,span對象每分配出一塊內存,其成員變量_useCount就++;相反thread cache每釋放歸還一塊內存后,_useCount就–。
釋放內存
當thread_cache過長或者線程銷毀,則會將內存釋放回central cache中的,釋放回來時–_useCount。當_useCount變為0后,說明所有分配出去的內存都歸還回來了,那么就將這個span對象歸還給page cache,page cache會對歸還回來的span進行前后頁合并。
管理多個大塊內存的跨度結構Span及SpanList定義
在上面我們知道central cache的哈希桶下掛接著的是跨度結構Span對象,其實Span對象是管理以頁為單位的大塊內存的結構體。而為了方便后續回收span對象到page cache,需要將任意位置span對象從spanlist中刪除,那么將spanlist定義為一個雙向鏈表更好一些。
由此,span及spanlist的定義如下:
// 管理多個連續頁大塊內存跨度結構
struct Span
{PAGE_ID _pageId = 0; // 大塊內存起始頁的頁號size_t _n = 0; // 頁的數量Span* _next = nullptr; // 指向下一個內存塊的指針Span* _prev = nullptr; // 指向上一個內存塊的指針size_t _objSize = 0; // 切好的小對象大小size_t _useCount = 0; // 已分配給ThreadCache的小塊內存的數量void* _freeList = nullptr; // 已分配給ThreadCache的小塊內存的自由鏈表bool _isUse = false; // 標記當前span內存跨度是否在被使用
};// 帶頭雙向循環鏈表
class SpanList
{
public:// 構造函數,創建帶頭雙向循環鏈表SpanList(){_head = new Span;_head->_next = _head;_head->_prev = _head;}Span* Begin(){return _head->_next;}Span* End(){return _head;}bool Empty(){return _head->_next == _head;}// 頭插void PushFront(Span* span){Insert(Begin(), span);}// 頭刪,并返回刪除的結點指針Span* PopFront(){Span* front = _head->_next;Erase(front);return front;}// 在鏈表的指定位置插入新的內存塊void Insert(Span* pos, Span* newSpan){assert(pos);assert(newSpan);Span* prev = pos->_prev;prev->_next = newSpan;newSpan->_next = pos;newSpan->_prev = prev;pos->_prev = newSpan;}// 從鏈表中刪除指定的內存塊void Erase(Span* pos){assert(pos);// 不能指向鏈表的頭,這是帶頭雙向循環鏈表,頭結點的意義就如同“刷題”里的啞結點,是虛擬的,只是為了操作方便。assert(pos != _head);Span* prev = pos->_prev;Span* next = pos->_next;prev->_next = next;next->_prev = prev;}
private:Span* _head;// 鏈表的頭指針
public:std::mutex _mtx;// 桶鎖
};
central cache框架構建
在明確了span與spanlist的定義描述后,也不能設計出central cache的框架結構,central cache是一個哈希表結構,其映射的是spanlist與哈希桶位置(內存大小)的關系。
其次,在這里我們將central cache設計為餓漢式單例模式,類的唯一實例在程序啟動時就已經被創建出來,并且在整個程序的生命周期內都只有這一個實例。餓漢式優點是線程安全,因為實例在程序啟動時就已經被創建,在整個程序的生命周期內都只有這一個實例,不會存在多線程競爭的情況。
class CentralCache
{
public:// 單例模式的定義,作用:獲取唯一實例的靜態方法static CentralCache* GetInstance(){// &_sInst 返回 _sInst 對象的地址,因為 _sInst 是一個靜態變量// 所以它的地址是固定的,其他代碼也可以通過該地址訪問 _sInst 對象return &_sInst;}// 獲取一個非空的spanSpan* GetOneSpan(SpanList& list, size_t byte_size);// 從中心緩存獲取一定數量的對象給ThreadCache線程緩存size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);// 將一定數量的對象釋放到中心緩存的span跨度void ReleaseListToSpans(void* start, size_t byte_size);
private:SpanList _spanLists[NFREELIST];
private:// 構造函數和一個拷貝構造函數私有化CentralCache(){}CentralCache(const CentralCache&) = delete;// 定義一個靜態的變量 _sInst,該變量保存著 CentralCache 類的唯一實例static CentralCache _sInst;
};
central cache核心實現
1.GetOneSpan(SpanList& list, size_t size)
從中心緩存獲取一個空閑的Span對象,如果當前中心緩存的對應大小類別的桶中沒有空閑的Span對象,則會從頁緩存中獲取一個新的Span對象并將其添加到中心緩存的桶中。
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{// 查看當前的spanlist中是否有還有未分配對象的spanSpan* it = list.Begin();while (it != list.End()){if (it->_freeList != nullptr){return it;}else{it = it->_next;}}// 先把central cache的桶鎖解掉,這樣如果其他線程釋放內存對象回來,不會阻塞list._mtx.unlock();// 走到這里說沒有空閑span了,只能找page cache要PageCache::GetInstance()->_pageMtx.lock();Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));span->_isUse = true;span->_objSize = size;PageCache::GetInstance()->_pageMtx.unlock();// 對獲取span進行切分,不需要加鎖,因為這時候這個span是當前進程單例創建的,其他線程訪問不到這個span// 計算span的大塊內存的起始地址和大塊內存的大小(字節數)char* start = (char*)(span->_pageId << PAGE_SHIFT);size_t bytes = span->_n << PAGE_SHIFT;char* end = start + bytes;// 把大塊內存切成自由鏈表鏈接起來// 先切一塊下來去做頭,方便尾插span->_freeList = start;start += size;void* tail = span->_freeList;int i = 1;while (start < end){i++;NextObj(tail) = start;tail = NextObj(tail);start += size;}NextObj(tail) = nullptr; // 記得置空// 切好span以后,需要把span掛到中心緩存對應的哈希桶里面去的時候,再加鎖list._mtx.lock();list.PushFront(span);return span;
}
2.FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
從中心緩存獲取一定數量的對象給thread cache
值得注意void *& start 和 void *& end 都是傳址的形式傳入的參數,也就是所謂的輸入輸出型參數
void *& start:輸出參數,返回獲取到的內存塊的起始地址。
void *& end:輸出參數,返回獲取到的內存塊的結束地址。
size_t batchNum:輸入參數,指定從中心緩存獲取的內存塊的數量。
size_t size:輸入參數,指定要獲取的內存塊的大小
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{// 中央緩存CentralCache哈希桶的映射規則和線程緩存ThreadCache哈希桶映射規則一樣size_t index = SizeClass::Index(size);_spanLists[index]._mtx.lock();Span* span = GetOneSpan(_spanLists[index], size);assert(span);// 檢查獲取的span是否為空assert(span->_freeList);// 檢查獲取的span的自由鏈表是否為空// 從span中獲取batchNum個對象// 如果不夠batchNum個,有多少拿多少start = span->_freeList;end = start;size_t i = 0;size_t actualNum = 1;while (i < batchNum - 1 && NextObj(end) != nullptr){end = NextObj(end);i++;actualNum++;}span->_freeList = NextObj(end);// span的[start, end]被取走了NextObj(end) = nullptr;// 置空span->_useCount += actualNum;// 調試:條件斷點int j = 0;void* cur = start;while (cur){cur = NextObj(cur);++j;}if (j != actualNum){int x = 0;}_spanLists[index]._mtx.unlock();return actualNum;
}
3.ReleaseListToSpans(void* start, size_t size)
將一段線程緩存的自由鏈表還給中心緩存的span。
void CentralCache::ReleaseListToSpans(void* start, size_t size)
{size_t index = SizeClass::Index(size);_spanLists[index]._mtx.lock();while (start){void* next = NextObj(start);// 把start開頭的這一串自由鏈表內存還給他屬于的span,一次循環還一個,一直還Span* span = PageCache::GetInstance()->MapObjectToSpan(start);NextObj(start) = span->_freeList;span->_freeList = start;span->_useCount--;// 說明span的切分出去的所有小塊內存都回來了,那就清理一下span,然后把完整的span交給page// 這個span就可以再回收給page cache,pagecache可以再嘗試去做前后頁的合并if (span->_useCount == 0){_spanLists[index].Erase(span);span->_freeList = nullptr;span->_next = nullptr;span->_prev = nullptr;// 釋放span給page cache時,span已經從_spanLists[index]刪除了,不需要再加桶鎖了// 這時把桶鎖解掉,使用page cache的鎖就可以了,方便其他線程申請/釋放內存_spanLists[index]._mtx.unlock();PageCache::GetInstance()->_pageMtx.lock();PageCache::GetInstance()->ReleaseSpanToPageCache(span);PageCache::GetInstance()->_pageMtx.unlock();_spanLists[index]._mtx.lock();}start = next;}_spanLists[index]._mtx.unlock();
}
3.page cache框架構建及核心實現
page cache與前兩級緩存略有不同,其映射關系不再是哈希桶位置與自由鏈表或spanlist的映射,而是頁號與spanlist的映射,這里我們設計的是128頁的page cache。
申請與釋放內存
申請內存
1.當central cache向page cache申請內存時,page cache先檢查對應位置有沒有span,如果沒有則向更大頁尋找一個span,如果找到則分裂成兩個。比如:申請的是1頁page,1頁page后面沒有掛span,則向后面尋找更大的span,假設在100頁page位置找到一個span,則將100頁page的span分裂為一個1頁page span和一個99頁page span。
2.如果找到_spanList[128]都沒有合適的span,則向系統使用mmap、brk或者是VirtualAlloc等方式申請128頁page span掛在自由鏈表中,再重復1中的過程。
3.需要注意的是central cache和page cache 的核心結構都是spanlist的哈希桶,但是他們是有本質區別的,central cache中哈希桶,是按跟thread cache一樣的大小對齊關系映射的,他的spanlist中掛的span中的內存都被按映射關系切好鏈接成小塊內存的自由鏈表。而page cache 中的spanlist則是按下標桶號映射的,也就是說第i號桶中掛的span都是i頁內存。
釋放內存
如果central cache釋放回一個span,則依次尋找span的前后page id的沒有在使用的空閑span,看是否可以合并,如果合并繼續向前尋找。這樣就可以將切小的內存合并收縮成大的span,減少內存碎片。
直接向堆申請或釋放以頁為單位的大塊內存
這里我們為了避免使用malloc及free函數接口去向堆申請和釋放內存,因此使用系統調用接口直接向堆申請和釋放內存。
這里的系統調用接口在window下為VirtualAlloc與VirtualFree系統調用接口;在Linux系統下為mmap與munmap,brk與sbrk兩對系統調用接口。
inline static void* SystemAlloc(size_t kPage)
{
#ifdef _WIN32void* ptr = VirtualAlloc(0, kPage << PAGE_SHIFT, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else//Linux下brk mmap等
#endif // _WIN32//拋異常if (ptr == nullptr)throw std::bad_alloc();return ptr;
}
Span跨度結構以頁為單位管理從堆申請的內存
我們向堆申請內存后會返回這塊內存的起始地址,那么我們將這個地址看作一個無符號整型,將其除以8*1024作為Span結構的_pageId,再將申請內存時用的頁號賦給 _n,這里為了方便后續回收分配出去的Span跨度結構,我們使用STL的unordered_map來構建 _pageId與Span對象的映射關系。
page cache框架構建
與central cache類似的是,page cache也是單例模式;不過page cache加的不是桶鎖,而是整級加的一把大鎖,即每次central cache向page cache申請內存時,page cache都要加鎖防止出現安全問題。
class PageCache
{
public:static PageCache* GetInstance(){return &_sInst;}// 獲取從對象到span的映射Span* MapObjectToSpan(void* obj);// 釋放空閑span回到Pagecache,并合并相鄰的spanvoid ReleaseSpanToPageCache(Span* span);// 獲取一個k頁的spanSpan* NewSpan(size_t k);std::mutex _pageMtx;
private:SpanList _spanLists[NPAGES];// PageCache自己的雙鏈表哈希桶,映射方式是按照頁數直接映射ObjectPool<Span> _spanPool;// std::unordered_map<PAGE_ID, Span*> _idSpanMap;TCMalloc_PageMap1<32 - PAGE_SHIFT> _idSpanMap;PageCache(){}PageCache(const PageCache&) = delete;static PageCache _sInst;
};
page cache核心實現
1.NewSpan(size_t k)
獲取一個K頁的span
首先會檢查第k個桶里面是否有span,如果有就直接返回;如果沒有,則檢查后面的桶里面是否有更大的span,如果有就可以將它進行切分,切出一個k頁的span返回,剩下的頁數的span放到對應的桶里;如果后面的桶里也沒有span,就去系統堆申請一個大小為128頁的span,并把它放到對應的桶里。然后再遞歸調用自己,直到獲取到一個k頁的span為止。
Span* PageCache::NewSpan(size_t k)
{assert(k > 0);// 大于128 page的直接向堆申請,這里的128頁相當于128*8*1024 = 1Mif (k > NPAGES - 1){void* ptr = SystemAlloc(k);//Span* span = new Span;Span* span = _spanPool.New();span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;// 頁號:地址右移PAGE_SHIFT獲得span->_n = k; // 頁數// _idSpanMap[span->_pageId] = span;_idSpanMap.set(span->_pageId, span);return span;}// 先檢查第k個桶里面有沒有spanif (!_spanLists[k].Empty()){Span* kSpan = _spanLists[k].PopFront();// 建立id和span的映射,方便central cache回收小塊內存時,查找對應的spanfor (PAGE_ID i = 0; i < kSpan->_n; i++){// _idSpanMap[kSpan->_pageId + i] = kSpan;_idSpanMap.set(kSpan->_pageId + i, kSpan);}return kSpan;}// 檢查一下后面的桶里面有沒有span,如果有可以把他它進行切分for (size_t i = k + 1; i < NPAGES; i++){if (!_spanLists[i].Empty()){Span* nSpan = _spanLists[i].PopFront();// Span* kSpan = new Span;Span* kSpan = _spanPool.New();// 在nSpan的頭部切一個k頁下來// k頁span返回// nSpan再掛到對應映射的位置kSpan->_pageId = nSpan->_pageId;// 標記起始頁kSpan->_n = k;// 標記頁數nSpan->_pageId += k;nSpan->_n -= k;_spanLists[nSpan->_n].PushFront(nSpan); // 被切分掉的另一塊放入對應哈希桶// 存儲nSpan的首尾頁號跟nSpan映射,方便page cache回收內存時進行的合并查找// 因為沒被中心緩存拿走,所以只標記了首尾就夠了// _idSpanMap[nSpan->_pageId] = nSpan;// _idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;_idSpanMap.set(nSpan->_pageId, nSpan);_idSpanMap.set(nSpan->_pageId + nSpan->_n - 1, nSpan);// 建立id和span的映射,方便central cache回收小塊內存時,查找對應的spanfor (PAGE_ID i = 0; i < kSpan->_n; i++){// _idSpanMap[kSpan->_pageId + i] = kSpan;_idSpanMap.set(kSpan->_pageId + i, kSpan);}return kSpan;}}// 走到這個位置就說明后面沒有大頁的span了// 這時就去找堆要一個128頁的spanSpan* bigSpan = new Span;void* ptr = SystemAlloc(NPAGES - 1);// 通過將 ptr 地址強制轉換為 PAGE_ID 類型,再使用二進制位運算符 >> 將指針的地址右移 PAGE_SHIFT 位// 最終得到的結果就是這個指針所在的“頁的編號”bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;bigSpan->_n = NPAGES - 1;_spanLists[bigSpan->_n].PushFront(bigSpan);return NewSpan(k);// 遞歸調用自己,這一次一定能成功!
}
2.MapObjectToSpan(void* obj)
建立內存地址和span的映射。前期映射方式是哈希或者紅黑樹,后期性能優化成基數樹。
Span* PageCache::MapObjectToSpan(void* obj)
{PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT;/* std::unique_lock<std::mutex> lock(_pageMtx);// 可以自動解鎖auto ret = _idSpanMap.find(id);if (ret != _idSpanMap.end()){return ret->second;}else{assert(false);return nullptr;} */// 基數樹優化后不需要加鎖了auto ret = (Span*)_idSpanMap.get(id);assert(ret != nullptr);return ret;
}
3.ReleaseSpanToPageCache(Span* span)
緩解外碎片問題,對span前后的頁,嘗試進行合并,緩解內存碎片問題
void PageCache::ReleaseSpanToPageCache(Span* span)
{// 大于128 page的直接還給堆,這里的128頁相當于128*8*1024 = 1Mif (span->_n > NPAGES - 1){void* ptr = (void*)(span->_pageId << PAGE_SHIFT);//delete span;SystemFree(ptr); // span結構釋放,內存還給堆,類似free_spanPool.Delete(span);// 放入定長內存池的自由鏈表,以便下次申請return;}// 向前合并while (1){PAGE_ID prevId = span->_pageId - 1;/*auto ret = _idSpanMap.find(prevId);if (ret == _idSpanMap.end()){break;}*/auto ret = (Span*)_idSpanMap.get(prevId);if (ret == nullptr){break;}// 前面相鄰頁的span在使用,不合并了// Span* prevSpan = ret->second;Span* prevSpan = ret;if (prevSpan->_isUse == true){break;}// 合并出超過128頁的span沒辦法管理,不合并了if (prevSpan->_n + span->_n > NPAGES - 1){break;}span->_pageId = prevSpan->_pageId;span->_n += prevSpan->_n;_spanLists[prevSpan->_n].Erase(prevSpan);// 將prevSpan從頁緩存對應的哈希桶的鏈表中刪掉// delete prevSpan;// 為什么delete? _spanPool.Delete(prevSpan);}// 向后合并while (1){PAGE_ID nextId = span->_pageId + span->_n;/*auto ret = _idSpanMap.find(nextId);if (ret == _idSpanMap.end()){break;}*/auto ret = (Span*)_idSpanMap.get(nextId);if (ret == nullptr){break;}// Span* nextSpan = ret->second;Span* nextSpan = ret;if (nextSpan->_isUse == true){break;}if (nextSpan->_n + span->_n > NPAGES - 1) {break;}span->_n += nextSpan->_n;_spanLists[nextSpan->_n].Erase(nextSpan);// delete nextSpan;_spanPool.Delete(nextSpan);}_spanLists[span->_n].PushFront(span);// 將合并完的span掛到頁緩存的對應的哈希桶里面。span->_isUse = false;//_idSpanMap[span->_pageId] = span;// 首尾存起來,方便被合并//_idSpanMap[span->_pageId + span->_n - 1] = span;_idSpanMap.set(span->_pageId, span);_idSpanMap.set(span->_pageId + span->_n - 1, span);
}
上述代碼delete的作用:這里的delete操作是用來釋放prevSpan和nextSpan這兩個Span結構體的內存的。這兩個Span結構體可能是之前由PageCache單例創建的,也可能是之前從中心緩存移動過來的。無論是哪一種情況,它們都不再被使用了,因為已經被合并到了當前的span中。所以可以直接釋放掉它們的內存。
這里的delete操作并不會影響prevSpan和nextSpan管理的內存。這些內存依然存在,只是沒有了管理它們的Span結構體。在進行合并的時候,這些內存就被合并到了當前的span中,當前的span繼續管理這些內存。因此,這里的delete操作僅僅是釋放了prevSpan和nextSpan這兩個Span結構體的內存,這個span管理的內存并不受影響。
delete釋放掉span結構體本身,不會同時釋放掉它管理的內存。舉個例子,假如你有一個對象A,它管理了一個數組arr,那么你調用delete A時,只會釋放掉A對象本身占用的內存,而arr數組的內存依然存在。
細節與性能優化
使用定長內存池配合脫離使用new
我們定義一個Span結構體時是new一個對象,但new的本質是malloc,而本項目是不依賴malloc的,因此我們可以運用我們自己實現的定長內存池來脫離new的使用。
對于Page Cache,由于要多次定義Span結構,因此我們定義一個特化Span對象的定長內存池:
//定義定長的span內存池以脫離使用new
ObjectPool<Span> _spanPool;
而對于Thread Cache,由于要保證對于線程而言,全局只有唯一一個Thread Cache對象,故在頭文件內定義為靜態變量的定長內存池:
//靜態成員,保證全局只有一個對象
static ObjectPool<ThreadCache> tcPool;
//pTLSThreadCache = new ThreadCache;
pTLSThreadCache = tcPool.New();
解決內存大于256kb的申請釋放問題
1.ConcurrentAlloc() 時,對于線程申請大于256kb內存的情況, 直接向頁緩存申請即可:
if (size > MAX_BYTES) // 大于256kb的超大內存
{size_t alignSize = SizeClass::RoundUp(size);// size對齊size_t kPage = alignSize >> PAGE_SHIFT;// 獲取頁數PageCache::GetInstance()->_pageMtx.lock();Span* span = PageCache::GetInstance()->NewSpan(kPage);// 找頁緩存要kpage頁spanspan->_objSize = size;// 會有一點內碎片,標記成alignSize也行PageCache::GetInstance()->_pageMtx.unlock();void* ptr = (void*)(span->_pageId << PAGE_SHIFT);// 獲取對應地址return ptr;
}
2.當然了頁緩存的NewSpan()正常分配內存的能力也有上限,大于128 page的選擇直接向堆申請,這里的128頁相當于128 * 8KB = 1M。
if (k > NPAGES - 1)
{void* ptr = SystemAlloc(k);//Span* span = new Span;Span* span = _spanPool.New();span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;// 頁號:地址右移PAGE_SHIFT獲得span->_n = k; // 頁數// _idSpanMap[span->_pageId] = span;_idSpanMap.set(span->_pageId, span);return span;
}
3.同樣的,ConcurrentFree()時,大于256kb的內存的釋放就直接釋放給頁緩存即可:
if (size > MAX_BYTE)
{//找到ptr對應的那塊spanPageCache::GetInstance()->_pageMtx.lock();PageCache::GetInstance()->RealeaseSpanToPageCache(span);PageCache::GetInstance()->_pageMtx.unlock();
}
4.ReleaseSpanToPageCache(Span* span)合并頁時,若釋放的span大于128頁,即span的頁數大于NPAGES - 1,則直接將內存釋放到堆。
if (span->_n > NPAGES - 1)
{void* ptr = (void*)(span->_pageId << PAGE_SHIFT);SystemFree(ptr);//delete span;_spanPool.Delete(span);return;
}
使用基數樹進行性能優化
如果我們在Page Cache中使用STL的unordered_map容器來構建_pageId與span的映射關系,那么通過測試發現,當前項目的運行效率是要滿于malloc的。
接下來分析下項目的性能瓶頸:
?
分析得到項目在unordered_map<PAGE_ID, Span*> _idSpanMap;中的鎖競爭上浪費了大量性能,這主要是unordered_map是線程不安全的,因此多線程下使用時需要加鎖,而大量加鎖導致資源的消耗。
因此,這里參考tcmalloc,使用基數樹來進行性能的優化。tcmalloc設計了三種基數樹,即一層、二層與三層的基數樹,其中一層和二層的基數樹是適配32位系統下的,而三層基數樹則是適配64位系統。
這里簡單介紹以下一層和二層基數樹,三層基數樹類似于二層:
32位系統下,一個頁大小213,進程地址空間大小232,所以一共有219個頁,所以一層基數樹需要開辟219個元素的數組,每個位置存一個指針,需要的內存是4*2^19 = 2M。
32位系統下,兩層基數樹的結構是第一層一個25個元素,第二層每個結點又有214個元素,這樣也就構成了2^19個的數量。這樣的話拿到一個頁號,(這個頁號二進制下有32位,忽略高13位)這個頁號高13位之后的高5位決定了他在第一層的哪個位置,這個頁號高13位之后的高6位~高19位決定了他在第二層的哪個位置。
多層相較于1層還有個好處,多層不需要一次性開辟所有空間,可以到具體需要時再開辟空間。
?基數樹相較于哈希桶的優勢在于如果要寫入_pageId和span的映射關系的話,并不會像哈希桶可能有結構上的改變(紅黑樹翻轉、哈希桶擴容等)(一個線程在讀的時候,另一個線程在寫),而是一旦基數樹構建好映射關系后,就不會改變其結構,之后只會有讀的操作,因此多線程環境下無需加鎖,從而減少了資源的消耗,優化了性能。
?只有NewSpan和ReleaseSpanToPageCache的時候,會去建立id和 span的映射,進行所謂的“寫”操作,但是這倆都加了鎖,絕對安全。事實上,即便不加鎖也沒事,因為我們不可能在同一個位置進行寫,不可能同時創建一個span和釋放一個span。且基數樹寫之前已經開好空間了,“寫”的過程不會改變基數樹的結構。
采用基數樹不需要加鎖的原因:
-
因為往基數樹建立映射的時候span沒有在central cache不會給外層使用,并且建立好一次映射關系,后續不需要再建立了,后續都是讀了。讀寫分離了。
//單層基數樹
template <size_t BITS>
class TCMalloc_PageMap1
{
private:static const int LENGTH = 1 << BITS;// 32 - 13 = 19void** _array;public:typedef uintptr_t Number;//存儲指針的一個無符號整型類型explicit TCMalloc_PageMap1()//一次將數組所需空間開好{//計算數組開辟空間所需的大小size_t size = sizeof(void*) << BITS;size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT);//由于要開辟的空間是2M,已經很大了,故直接想堆申請_array = (void**)SystemAlloc(alignSize >> PAGE_SHIFT);memset(_array, 0, size);}void Set(Number key, void* v)//key的范圍是[0, 2^BITS - 1],_pageId{_array[key] = v;}void* Get(Number key) const{if ((key >> BITS) > 0)//若key超出范圍或還未被設置,則返回空{return nullptr;}return _array[key];}
};// Two-level radix tree
template <int BITS>
class TCMalloc_PageMap2 {
private:// Put 32 entries in the root and (2^BITS)/32 entries in each leaf.static const int ROOT_BITS = 5;static const int ROOT_LENGTH = 1 << ROOT_BITS;static const int LEAF_BITS = BITS - ROOT_BITS;// 19 - 5 = 14static const int LEAF_LENGTH = 1 << LEAF_BITS;// 1左移14位// Leaf nodestruct Leaf {void* values[LEAF_LENGTH];};Leaf* root_[ROOT_LENGTH]; // Pointers to 32 child nodesvoid* (*allocator_)(size_t); // Memory allocatorpublic:typedef uintptr_t Number;//explicit TCMalloc_PageMap2(void* (*allocator)(size_t)) {explicit TCMalloc_PageMap2() {//allocator_ = allocator;memset(root_, 0, sizeof(root_));PreallocateMoreMemory();}void* get(Number k) const {const Number i1 = k >> LEAF_BITS;const Number i2 = k & (LEAF_LENGTH - 1);// 獲取k低14位if ((k >> BITS) > 0 || root_[i1] == NULL) {return NULL;}return root_[i1]->values[i2];}void set(Number k, void* v) {const Number i1 = k >> LEAF_BITS;const Number i2 = k & (LEAF_LENGTH - 1);ASSERT(i1 < ROOT_LENGTH);root_[i1]->values[i2] = v;}// 確保從start頁開始,往后n頁的基數樹位置都給你開好bool Ensure(Number start, size_t n) {for (Number key = start; key <= start + n - 1;) {const Number i1 = key >> LEAF_BITS;// Check for overflowif (i1 >= ROOT_LENGTH)return false;// Make 2nd level node if necessaryif (root_[i1] == NULL) {//Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));//if (leaf == NULL) return false;static ObjectPool<Leaf> leafPool;Leaf* leaf = (Leaf*)leafPool.New();memset(leaf, 0, sizeof(*leaf));root_[i1] = leaf;}// Advance key past whatever is covered by this leaf nodekey = ((key >> LEAF_BITS) + 1) << LEAF_BITS;}return true;}void PreallocateMoreMemory() {// Allocate enough to keep track of all possible pagesEnsure(0, 1 << BITS);}
};// Three-level radix tree
template <int BITS>
class TCMalloc_PageMap3 {
private:// How many bits should we consume at each interior levelstatic const int INTERIOR_BITS = (BITS + 2) / 3; // Round-upstatic const int INTERIOR_LENGTH = 1 << INTERIOR_BITS;// How many bits should we consume at leaf levelstatic const int LEAF_BITS = BITS - 2 * INTERIOR_BITS;static const int LEAF_LENGTH = 1 << LEAF_BITS;// Interior nodestruct Node {Node* ptrs[INTERIOR_LENGTH];};// Leaf nodestruct Leaf {void* values[LEAF_LENGTH];};Node* root_; // Root of radix treevoid* (*allocator_)(size_t); // Memory allocatorNode* NewNode() {Node* result = reinterpret_cast<Node*>((*allocator_)(sizeof(Node)));if (result != NULL) {memset(result, 0, sizeof(*result));}return result;}public:typedef uintptr_t Number;explicit TCMalloc_PageMap3(void* (*allocator)(size_t)) {allocator_ = allocator;root_ = NewNode();}void* get(Number k) const {const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);const Number i3 = k & (LEAF_LENGTH - 1);if ((k >> BITS) > 0 ||root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL) {return NULL;}return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3];}void set(Number k, void* v) {ASSERT(k >> BITS == 0);const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);const Number i3 = k & (LEAF_LENGTH - 1);reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v;}bool Ensure(Number start, size_t n) {for (Number key = start; key <= start + n - 1;) {const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1);// Check for overflowif (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH)return false;// Make 2nd level node if necessaryif (root_->ptrs[i1] == NULL) {Node* n = NewNode();if (n == NULL) return false;root_->ptrs[i1] = n;}// Make leaf node if necessaryif (root_->ptrs[i1]->ptrs[i2] == NULL) {Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));if (leaf == NULL) return false;memset(leaf, 0, sizeof(*leaf));root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);}// Advance key past whatever is covered by this leaf nodekey = ((key >> LEAF_BITS) + 1) << LEAF_BITS;}return true;}void PreallocateMoreMemory() {}
};
項目總結
結果演示
可以看到通過基數樹優化后的高并發內存池在性能上是要優于malloc函數的。
項目對比malloc性能高的原因
malloc底層是采用邊界標記法將內存劃分成很多塊,從而對內存的分配與回收進行管理。簡單來說,malloc分配內存時會先獲取分配區的鎖,然后根據申請內存的大小一級一級的去獲取內存空間,最后返回。
所以在高并發的場景下,malloc在申請內存時需要加鎖,以避免多個線程同時修改內存分配信息,這會導致性能下降。而內存池可以通過維護自由鏈表來分配內存,避免了加鎖的開銷。
總結出本項目效率相對較高的3點原因:
-
1.第一級thread cache通過tls技術實現了無鎖訪問。
-
2.第二級central cache加的是桶鎖,可以更好的實現多線程的并行。
-
3.第三級page cache通過基數樹優化,有效減少了鎖的競爭。
項目擴展及缺陷
1.實際上在釋放內存時由thread cache將自由鏈表對象歸還給central cache只使用了鏈表過長這一個條件,但是實際中這個條件大概率不能恰好達成,那么就會出現thread cache中自由鏈表掛著許多未被使用的內存塊,從而出現了線程結束時可能導致內存泄露的問題。
解決方法就是使用動態tls或者通過回調函數來回收這部分的內存,并且通過申請批次統計內存占有量,保證線程不會過多占有資源。
2.可以將這個項目打成靜態庫或動態庫替換調用系統調用malloc,不同平臺替換方式不同。 基于unix的系統上的glibc,使用了weak alias的方式替換。具體來說是因為這些入口函數都被定義成了weak symbols,再加上gcc支持 alias attribute,所以替換就變成了這種通用形式:
void* malloc(size_t size) THROW attribute__ ((alias (tc_malloc)))
因此所有malloc的調用都跳轉到了tc_malloc的實現。有些平臺不支持這樣的東西,需要使用hook的鉤子技術來做。參考:hook
收獲與總結
1.鍛煉debug能力;
2.了解了池化技術;
3.學習了三級緩存自頂向下的設計方案;
4.單例設計模式在具體項目的應用、慢增長算法以及基數樹等。