一.當申請的內存大小大于256kb的處理方式
        因為256KB在我們當前的實現中恰好是32頁,而頁緩存的上限是128頁,所以思路非常清晰:當申請的內存大于256KB(即大于32頁)且不超過128頁時,我們按一頁的粒度向上對齊后計算所需頁數,然后向頁緩存申請;而超過128頁的請求我們直接向堆申請:
static void* ConcurrentAlloc(size_t size)
{if (size > MAX_BYTES){//當PageCache能滿足需要時,我們向PageCache申請,滿足不了直接向堆申請size_t pages = (AlignMoudle::AlignUpwards(size)) >> PAGE_SHIFT;PageCache::GetInstance()->_smutex.lock();Span* span = PageCache::GetInstance()->NewSpan(pages);PageCache::GetInstance()->_smutex.unlock();void* res = (void*)(span->_pageId << PAGE_SHIFT);return res;}else{//<MAX_BYTES時的邏輯if (pTLSThreadCache == nullptr){pTLSThreadCache = new ThreadCache;}return pTLSThreadCache->Allocate(size);}
}static void ConcurrentFree(void* ptr,size_t size)
{if (size > MAX_BYTES){Span* FreeSpan = PageCache::GetInstance()->MapObjectToSpan(ptr);PageCache::GetInstance()->_smutex.lock();PageCache::GetInstance()->ReleaseSpanToPageCache(FreeSpan);PageCache::GetInstance()->_smutex.unlock();}else{//<MAX_BYTES時的邏輯assert(pTLSThreadCache);pTLSThreadCache->Deallocate(ptr, size);}
}
Span* PageCache::NewSpan(size_t pos)
{if (pos >= MAX_PAGE){//此時頁緩存無法滿足需求,直接向堆進行申請void* ptr = SystemAlloc(pos);Span* BigSpan = new Span;BigSpan->_n = pos;BigSpan->_pageId = ((PAGE_ID)ptr) >> PAGE_SHIFT;_idmapspan[BigSpan->_pageId] = BigSpan;return BigSpan;}//<128頁均走原來的邏輯
...........................................................................................
void PageCache::ReleaseSpanToPageCache(Span* span)
{if (span->_n >= MAX_PAGE){void* ptr = (void*)(span->_pageId << PAGE_SHIFT);SystemFree(ptr);delete span;return;}//同上,<128頁的時候走原來的邏輯
...........................................................................................
二.使用對象池脫離使用new與Delete
? ? ? ? 因為原高并發內存池是通過某種技術脫離使用new與Delete的.而我們原來的代碼中涉及new與delete的地方一共有三個地方:PageCache.cpp申請新Span與釋放Span,comm.h中對于spanlist的頭結點的申請,以及每個線程池申請自己的pTLSThreadCache.
        而這些地方都是對對象的申請,所以我們可以使用對象池來替換它們.因為pTLSThreadCache的申請涉及多線程競爭問題,所以我們需要對對象池的申請與釋放進行額外的加鎖,以避免多線程的競爭問題.對于comm.h,應在構造函數處定義局部靜態對象池,否則如果用類靜態成員變量則需要額外的加鎖解鎖.
(這部分的完整修改可從 ly/Project-Code 倉庫的 ConCurrentMemoryPool 提交記錄中找到)
三.釋放對象時優化為不傳對象大小
        對于new與delete接口,delete釋放對象的時候是不需要顯式給出對象大小的,但是我們能夠獲取被釋放對象的地址.因此,我們可以在span中新增一個_objsize成員,用于記錄當前span管理的頁面上所切分對象的大小:
static void* ConcurrentAlloc(size_t size)
{if (size > MAX_BYTES){//當PageCache能滿足需要時,我們向PageCache申請,滿足不了直接向堆申請size_t pages = (AlignMoudle::AlignUpwards(size)) >> PAGE_SHIFT;PageCache::GetInstance()->_smutex.lock();Span* span = PageCache::GetInstance()->NewSpan(pages);span->_objsize = size;PageCache::GetInstance()->_smutex.unlock();void* res = (void*)(span->_pageId << PAGE_SHIFT);return res;}else{//<MAX_BYTES時的邏輯if (pTLSThreadCache == nullptr){static ObjectPool<ThreadCache> _threcpool;//pTLSThreadCache = new ThreadCache;pTLSThreadCache = _threcpool.New();}return pTLSThreadCache->Allocate(size);}
}static void ConcurrentFree(void* ptr)
{Span* FreeSpan = PageCache::GetInstance()->MapObjectToSpan(ptr);size_t size = FreeSpan->_objsize;if (size > MAX_BYTES){PageCache::GetInstance()->_smutex.lock();PageCache::GetInstance()->ReleaseSpanToPageCache(FreeSpan);PageCache::GetInstance()->_smutex.unlock();}else{//<MAX_BYTES時的邏輯assert(pTLSThreadCache);pTLSThreadCache->Deallocate(ptr, size);}
}//CentralCache::GetOneSpan
//走到這里說明當前中心緩存的該list哈希桶沒有空閑頁,則需要向頁緩存申請新頁,加全局鎖
PageCache::GetInstance()->_smutex.lock();
Span* span = PageCache::GetInstance()->NewSpan(AlignMoudle::NumMovePage(byte_size));
span->_isUse = true;
span->_objsize = byte_size;
PageCache::GetInstance()->_smutex.unlock();
四. 與malloc/free的性能對比
測試用例如下:
#include"ConcurrentAlloc.h"// ntimes 一輪申請和釋放內存的次數
// rounds 輪次
// Benchmarks raw malloc/free.
// @param ntimes number of alloc/free calls per round
// @param nworks number of worker threads
// @param rounds rounds executed by each thread
// Prints the clock() cost summed across all threads for the alloc side and
// the free side separately.
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
	std::vector<std::thread> vthread(nworks);
	std::atomic<size_t> malloc_costtime = 0;
	std::atomic<size_t> free_costtime = 0;

	for (size_t k = 0; k < nworks; ++k)
	{
		vthread[k] = std::thread([&, k]() {
			std::vector<void*> v;
			v.reserve(ntimes);

			for (size_t j = 0; j < rounds; ++j)
			{
				size_t begin1 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					// Vary the size so multiple size-class paths are exercised.
					//v.push_back(malloc(16));
					v.push_back(malloc((16 + i) % 8192 + 1));
				}
				size_t end1 = clock();

				size_t begin2 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					free(v[i]);
				}
				size_t end2 = clock();
				v.clear();

				malloc_costtime += (end1 - begin1);
				free_costtime += (end2 - begin2);
			}
		});
	}

	for (auto& t : vthread)
	{
		t.join();
	}

	// BUGFIX: size_t must be printed with %zu — passing size_t to %u is
	// undefined behavior on platforms where size_t is wider than unsigned int.
	printf("%zu個線程并發執行%zu輪次,每輪次malloc %zu次: 花費:%zu ms\n",
		nworks, rounds, ntimes, malloc_costtime.load());
	printf("%zu個線程并發執行%zu輪次,每輪次free %zu次: 花費:%zu ms\n",
		nworks, rounds, ntimes, free_costtime.load());
	printf("%zu個線程并發malloc&free %zu次,總計花費:%zu ms\n",
		nworks, nworks * rounds * ntimes, malloc_costtime.load() + free_costtime.load());
}
// per-round alloc/free count, thread count, rounds
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{std::vector<std::thread> vthread(nworks);std::atomic<size_t> malloc_costtime = 0;std::atomic<size_t> free_costtime = 0;for (size_t k = 0; k < nworks; ++k){vthread[k] = std::thread([&]() {std::vector<void*> v;v.reserve(ntimes);for (size_t j = 0; j < rounds; ++j){size_t begin1 = clock();for (size_t i = 0; i < ntimes; i++){//v.push_back(ConcurrentAlloc(16));v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));}size_t end1 = clock();size_t begin2 = clock();for (size_t i = 0; i < ntimes; i++){ConcurrentFree(v[i]);}size_t end2 = clock();v.clear();malloc_costtime += (end1 - begin1);free_costtime += (end2 - begin2);}});}for (auto& t : vthread){t.join();}printf("%u個線程并發執行%u輪次,每輪次concurrent alloc %u次: 花費:%u ms\n",nworks, rounds, ntimes, malloc_costtime.load());printf("%u個線程并發執行%u輪次,每輪次concurrent dealloc %u次: 花費:%u ms\n",nworks, rounds, ntimes, free_costtime.load());printf("%u個線程并發concurrent alloc&dealloc %u次,總計花費:%u ms\n",nworks, nworks * rounds * ntimes, malloc_costtime.load() + free_costtime.load());
}int main()
{size_t n = 10000;cout << "==========================================================" << endl;BenchmarkConcurrentMalloc(n, 4, 10);cout << endl << endl;BenchmarkMalloc(n, 4, 10);cout << "==========================================================" << endl;return 0;
}
五.性能瓶頸分析
? ? ? ? 使用vs2022的性能分析工具,我們可以發現較為耗時的部分為查找映射表的函數部分,尤其是加鎖解鎖操作最為耗時.所以我們可以采取基數樹的方式進行優化,也就是去掉MapObjectToSpan中的加鎖解鎖操作.
使用基數樹進行優化
先來看效果:
再來看基數樹替換原來pagecache中哈希表的代碼:
#pragma once
#include"Comm.h"// Single-level array
template <int BITS>
class TCMalloc_PageMap1 {
private:static const int LENGTH = 1 << BITS;void** array_;public:typedef uintptr_t Number;//explicit TCMalloc_PageMap1(void* (*allocator)(size_t)) {explicit TCMalloc_PageMap1() {//array_ = reinterpret_cast<void**>((*allocator)(sizeof(void*) << BITS));size_t size = sizeof(void*) << BITS;size_t alignSize = AlignMoudle::_AlignUpwards(size, 1 << PAGE_SHIFT);array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT);memset(array_, 0, sizeof(void*) << BITS);}// Return the current value for KEY. Returns NULL if not yet set,// or if k is out of range.void* get(Number k) const {if ((k >> BITS) > 0) {return NULL;}return array_[k];}// REQUIRES "k" is in range "[0,2^BITS-1]".// REQUIRES "k" has been ensured before.//// Sets the value 'v' for key 'k'.void set(Number k, void* v) {array_[k] = v;}
};// Two-level radix tree
template <int BITS>
class TCMalloc_PageMap2 {
private:// Put 32 entries in the root and (2^BITS)/32 entries in each leaf.static const int ROOT_BITS = 5;static const int ROOT_LENGTH = 1 << ROOT_BITS;static const int LEAF_BITS = BITS - ROOT_BITS;static const int LEAF_LENGTH = 1 << LEAF_BITS;// Leaf nodestruct Leaf {void* values[LEAF_LENGTH];};Leaf* root_[ROOT_LENGTH]; // Pointers to 32 child nodesvoid* (*allocator_)(size_t); // Memory allocatorpublic:typedef uintptr_t Number;//explicit TCMalloc_PageMap2(void* (*allocator)(size_t)) {explicit TCMalloc_PageMap2() {//allocator_ = allocator;memset(root_, 0, sizeof(root_));PreallocateMoreMemory();}void* get(Number k) const {const Number i1 = k >> LEAF_BITS;const Number i2 = k & (LEAF_LENGTH - 1);if ((k >> BITS) > 0 || root_[i1] == NULL) {return NULL;}return root_[i1]->values[i2];}void set(Number k, void* v) {const Number i1 = k >> LEAF_BITS;const Number i2 = k & (LEAF_LENGTH - 1);ASSERT(i1 < ROOT_LENGTH);root_[i1]->values[i2] = v;}bool Ensure(Number start, size_t n) {for (Number key = start; key <= start + n - 1;) {const Number i1 = key >> LEAF_BITS;// Check for overflowif (i1 >= ROOT_LENGTH)return false;// Make 2nd level node if necessaryif (root_[i1] == NULL) {//Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));//if (leaf == NULL) return false;static ObjectPool<Leaf> leafPool;Leaf* leaf = (Leaf*)leafPool.New();memset(leaf, 0, sizeof(*leaf));root_[i1] = leaf;}// Advance key past whatever is covered by this leaf nodekey = ((key >> LEAF_BITS) + 1) << LEAF_BITS;}return true;}void PreallocateMoreMemory() {// Allocate enough to keep track of all possible pagesEnsure(0, 1 << BITS);}
};// Three-level radix tree
template <int BITS>
class TCMalloc_PageMap3 {
private:// How many bits should we consume at each interior levelstatic const int INTERIOR_BITS = (BITS + 2) / 3; // Round-upstatic const int INTERIOR_LENGTH = 1 << INTERIOR_BITS;// How many bits should we consume at leaf levelstatic const int LEAF_BITS = BITS - 2 * INTERIOR_BITS;static const int LEAF_LENGTH = 1 << LEAF_BITS;// Interior nodestruct Node {Node* ptrs[INTERIOR_LENGTH];};// Leaf nodestruct Leaf {void* values[LEAF_LENGTH];};Node* root_; // Root of radix treevoid* (*allocator_)(size_t); // Memory allocatorNode* NewNode() {Node* result = reinterpret_cast<Node*>((*allocator_)(sizeof(Node)));if (result != NULL) {memset(result, 0, sizeof(*result));}return result;}public:typedef uintptr_t Number;explicit TCMalloc_PageMap3(void* (*allocator)(size_t)) {allocator_ = allocator;root_ = NewNode();}void* get(Number k) const {const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);const Number i3 = k & (LEAF_LENGTH - 1);if ((k >> BITS) > 0 ||root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL) {return NULL;}return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3];}void set(Number k, void* v) {ASSERT(k >> BITS == 0);const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);const Number i3 = k & (LEAF_LENGTH - 1);reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v;}bool Ensure(Number start, size_t n) {for (Number key = start; key <= start + n - 1;) {const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1);// Check for overflowif (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH)return false;// Make 2nd level node if necessaryif (root_->ptrs[i1] == NULL) {Node* n = NewNode();if (n == NULL) return false;root_->ptrs[i1] = n;}// Make leaf node if necessaryif (root_->ptrs[i1]->ptrs[i2] == NULL) {Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));if (leaf == NULL) 
return false;memset(leaf, 0, sizeof(*leaf));root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);}// Advance key past whatever is covered by this leaf nodekey = ((key >> LEAF_BITS) + 1) << LEAF_BITS;}return true;}void PreallocateMoreMemory() {}
};
基數樹優化比原來快的根本原因在于它避免了頻繁的加鎖解鎖:基數樹的結構在預先建立(或Ensure)之后不再變動,讀取映射時無需加鎖,寫入與讀取得以分離;而STL容器不保證線程安全,每次查找都必須加鎖.具體原理這里不再展開,有興趣的讀者可以參考下面鏈接的文章:
查找——圖文翔解RadixTree(基數樹) - wgwyanfs - 博客園?
完整的基數樹優化代碼如下:
ConCurrentMemoryPool · ly/Project-Code - 碼云 - 開源中國?