目錄
8. thread cache回收內存
9. central cache回收內存
10. page cache回收內存
11. 大于256KB的內存申請和釋放
11.1 申請
11.2 釋放
12. 使用定長內存池脫離使用new
13. 釋放對象時優化成不傳對象大小
14.?多線程環境下對比malloc測試
15. 調試和復雜問題的調試技巧
16. 性能瓶頸分析
17. 使用基數樹進行優化
8. thread cache回收內存
當某個線程申請的對象不用了,可以將其釋放給thread cache,然后thread cache將該對象插入到對應哈希桶的自由鏈表當中即可。
但是隨著線程不斷的釋放,對應自由鏈表的長度也會越來越長,這些內存堆積在一個thread cache中就是一種浪費,應該將這些內存還給central cache,這樣一來,這些內存對其他線程來說也是可申請的,因此當thread cache某個桶當中的自由鏈表太長時可以進行一些處理。
如果thread cache某個桶當中自由鏈表的長度超過它一次批量向central cache申請的對象個數,那么此時就要把該自由鏈表當中的這些對象還給central cache。
void ThreadCache::Deallocate(void* ptr, size_t size) // 釋放內存對象
{assert(ptr);assert(size <= MAX_BYTES);// 找對映射的自由鏈表桶,對象插入進入size_t index = SizeClass::Index(size);_freeLists[index].Push(ptr);// 當鏈表長度大于一次批量申請的內存時就開始還一段list給central cacheif (_freeLists[index].Size() >= _freeLists[index].MaxSize()) // 要增加Size接口{ListTooLong(_freeLists[index], size);}
}void ThreadCache::ListTooLong(FreeList& list, size_t size)
{void* start = nullptr;void* end = nullptr;list.PopRange(start, end, list.MaxSize()); // 取批量的內存,要增加PopRange接口CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}
要提供Size接口,并維護_size,再增加PopRange接口
調用的地方增加傳入參數(倒數第三行):
當thread cache的某個自由鏈表過長時,實際就是把這個自由鏈表當中全部的對象都還給central cache了,但這里在設計PopRange接口時還是設計的是取出指定個數的對象,因為在某些情況下當自由鏈表過長時,我們可能并不一定想把鏈表中全部的對象都取出來還給central cache,這樣設計就是為了增加代碼的可修改性。
當判斷thread cache是否應該還對象給central cache時,還可以綜合考慮每個thread cache整體的大小。比如當某個thread cache的總占用大小超過一定閾值時,就將該thread cache當中的對象還一些給central cache,這樣就盡量避免了某個線程的thread cache占用太多的內存。對于這一點,在tcmalloc當中就是考慮到了的,但這里就簡化了。
9. central cache回收內存
當thread cache中某個自由鏈表太長時,會將自由鏈表當中的這些對象還給central cache中的span。需要注意的是,還給central cache的這些對象不一定都是屬于同一個span的。central cache中的每個哈希桶當中可能都不止一個span,因此計算出還回來的對象應該還給central cache的哪一個桶后,還需要知道還回來的對象到底應該還給這個桶當中的哪一個span。
如何根據對象的地址得到對象所在的頁號?
某個頁當中的所有地址除以頁的大小都等該頁的頁號。比如假設一頁的大小是100,那么地址0~99都屬于第0頁,它們除以100都等于0,而地址100~199都屬于第1頁,它們除以100都等于1。
如何找到一個對象對應的span?
雖然現在可以通過對象的地址得到其所在的頁號,但是還是不能知道這個對象到底屬于哪一個span。因為一個span管理的可能是多個頁。
為了解決這個問題,可以建立頁號和span之間的映射。由于這個映射關系在page cache進行span的合并時也需要用到,因此直接將其存放到page cache里面。這時就需要在PageCache類當中添加一個映射關系了,這里可以用STL當中的unordered_map進行實現,并且添加一個函數接口,用于讓central cache獲取這里的映射關系。(下面代碼中只展示了PageCache
類當中新增的成員)
class PageCache
{
public:// 獲取從對象到span的映射Span* MapObjectToSpan(void* obj);
private:std::unordered_map<PAGE_ID, Span*> _idSpanMap;
};
每當page cache分配span給central cache時,都需要記錄一下頁號和span之間的映射關系。此后當thread cache還對象給central cache時,才知道應該具體還給哪一個span。
因此當central cache在調用NewSpan接口向page cache申請k頁的span時,page cache在返回這個k頁的span給central cache之前,應該建立這k個頁號與該span之間的映射關系,這里改一下之前寫的NewSpan(多了兩個for循環):
Span* PageCache::NewSpan(size_t k) // 獲取一個K頁的span
{assert(k > 0 && k < NPAGES);if (!_spanLists[k].Empty()) // 先檢查第k個桶里面有沒有span{//return _spanLists[k].opFront(); // 有就直接返回Span* kSpan = _spanLists[k].PopFront();//建立頁號與span的映射,方便central cache回收小塊內存時查找對應的spanfor (PAGE_ID i = 0; i < kSpan->_n; i++){_idSpanMap[kSpan->_pageId + i] = kSpan;}return kSpan;}// 檢查一下后面的桶里面有沒有span,如果有->進行切分for (size_t i = k + 1; i < NPAGES; ++i){if (!_spanLists[i].Empty()){Span* nSpan = _spanLists[i].PopFront();Span* kSpan = new Span;// 在nSpan的頭部切一個k頁span下來返回,nSpan再掛到對應映射的位置kSpan->_pageId = nSpan->_pageId; // _pageId類似地址kSpan->_n = k;nSpan->_pageId += k; // 起始頁的頁號往后走nSpan->_n -= k; // 頁數減等k_spanLists[nSpan->_n].PushFront(nSpan);//建立頁號與span的映射,方便central cache回收小塊內存時查找對應的spanfor (PAGE_ID i = 0; i < kSpan->_n; i++){_idSpanMap[kSpan->_pageId + i] = kSpan;}return kSpan;}}// 走到這說明后面沒有大頁的span了->去找堆要一個128頁的spanSpan* bigSpan = new Span;void* ptr = SystemAlloc(NPAGES - 1);bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT; // 轉換為頁號bigSpan->_n = NPAGES - 1;_spanLists[bigSpan->_n].PushFront(bigSpan);return NewSpan(k); // 把新申請的span插入后再遞歸調用一次自己(避免代碼重復)
}
此時就可以通過對象的地址找到該對象對應的span了,直接將該對象的地址除以頁的大小得到頁號,然后在unordered_map當中找到其對應的span即可。
Span* PageCache::MapObjectToSpan(void* obj)
{PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT);auto ret = _idSpanMap.find(id);if (ret != _idSpanMap.end()){return ret->second;}else{assert(false);return nullptr;}
}
通過某個頁號查找其對應的span時,該頁號與其span之間的映射一定是建立過的,如果此時沒有在unordered_map當中找到,則說明之前的代碼邏輯有問題,因此當沒有找到對應的span時可以直接用斷言結束程序,以表明程序邏輯出錯。
此時就能寫central cache的回收內存了,當thread cache還對象給central cache時,就可以依次遍歷這些對象,將這些對象插入到其對應span的自由鏈表當中,并且及時更新該span的_usseCount計數即可。
在thread cache還對象給central cache的過程中,如果central cache中某個span的_useCount減到0時,說明這個span分配出去的對象全部都還回來了,那么此時就可以將這個span再進一步還給page cache。
void CentralCache::ReleaseListToSpans(void* start, size_t size)
{size_t index = SizeClass::Index(size); // 先算出是哪個桶_spanLists[index]._mtx.lock();while (start){void* next = NextObj(start);Span* span = PageCache::GetInstance()->MapObjectToSpan(start);NextObj(start) = span->_freeList;span->_freeList = start;span->_useCount--;// 說明span的切分出去的所有小塊內存都回來了// 這個span就可以再回收給page cache,pagecache可以再嘗試去做前后頁的合并if (span->_useCount == 0){_spanLists[index].Erase(span);span->_freeList = nullptr;span->_next = nullptr;span->_prev = nullptr;// 釋放span給page cache時,使用page cache的鎖就可以了_spanLists[index]._mtx.unlock(); // 這時把桶鎖解掉PageCache::GetInstance()->_pageMtx.lock();PageCache::GetInstance()->ReleaseSpanToPageCache(span);PageCache::GetInstance()->_pageMtx.unlock();_spanLists[index]._mtx.lock();}start = next;}_spanLists[index]._mtx.unlock();
}
需要注意,如果要把某個span還給page cache,需要先將這個span從central cache對應的雙鏈表中移除,然后再將該span的自由鏈表置空,因為page cache中的span是不需要切分成一個個的小對象的,以及該span的前后指針也都應該置空,因為之后要將其插入到page cache對應的雙鏈表中。但span當中記錄的起始頁號以及它管理的頁數是不能清除的,否則對應內存塊就找不到了。
并且在central cache還span給page cache時也存在鎖的問題,此時需要先將central cache中對應的桶鎖解掉,然后再加上page cache的大鎖之后才能進入page cache進行相關操作,當處理完畢回到central cache時,除了將page cache的大鎖解掉,還需要立刻獲得central cache對應的桶鎖,然后將還未還完對象繼續還給central cache中對應的span。
10. page cache回收內存
如果central cache中有某個span的_useCount減到0了,那么central cache就需要將這個span還給page cache了。
這個過程看似是非常簡單的,page cache只需將還回來的span掛到對應的哈希桶上就行了。但實際為了緩解內存碎片的問題,page cache還需要嘗試將還回來的span與其他空閑的span進行合并。
合并的過程可以分為向前合并和向后合并。如果還回來的span的起始頁號是num,該span所管理的頁數是n。那么在向前合并時,就需要判斷第num-1頁對應span是否空閑,如果空閑則可以將其進行合并,并且合并后還需要繼續向前嘗試進行合并,直到不能進行合并為止。而在向后合并時,就需要判斷第num+n頁對應的span是否空閑,如果空閑則可以將其進行合并,并且合并后還需要繼續向后嘗試進行合并,直到不能進行合并為止。
因此page cache在合并span時,是需要通過頁號獲取到對應的span的,這就是要把頁號與span之間的映射關系存儲到page cache的原因。
需要注意的是,當通過頁號找到其對應的span時,這個span此時可能掛在page cache,也可能掛在central cache。而在合并時只能合并掛在page cache的span,因為掛在central cache的span當中的對象正在被其他線程使用。
可是我們不能通過span結構當中的_useCount成員,來判斷某個span到底是在central cache還是在page cache。因為當central cache剛向page cache申請到一個span時,這個span的_useCount就是等于0的,這時可能當我們正在對該span進行切分的時候,page cache就把這個span拿去進行合并了,這顯然是不合理的。
所以可以在span結構中再增加一個_isUse成員,用于標記這個span是否正在被使用,而當一個span結構被創建時我們默認該span是沒有被使用的。
struct Span // 管理多個連續頁大塊內存跨度的結構
{PAGE_ID _pageId = 0; // 大塊內存起始頁的頁號size_t _n = 0; // 頁的數量Span* _next = nullptr; // 雙向鏈表的結構Span* _prev = nullptr;size_t _useCount = 0; // 切好小塊內存,被分配給thread cache的計數void* _freeList = nullptr; // 切好的小塊內存的自由鏈表bool _isUse = false; // 是否在被使用
};
當central cache向page cache申請到一個span時,需要立即將該span的_isUse改為true。
當central cache將某個span還給page cache時,也就需要將該span的_isUse改成false。
在合并page cache當中的span時,需要通過頁號找到其對應的span,而一個span是在被分配給central cache時,才建立的各個頁號與span之間的映射關系,因此page cache當中的span也需要建立頁號與span之間的映射關系。
與central cache中的span不同的是,在page cache中,只需建立一個span的首尾頁號與該span之間的映射關系。因為當一個span在嘗試進行合并時,如果是往前合并,那么只需要通過一個span的尾頁找到這個span,如果是向后合并,那么只需要通過一個span的首頁找到這個span。也就是說,在進行合并時我們只需要用到span與其首尾頁之間的映射關系就夠了。
因此申請k頁的span時,如果是將n頁的span切成了一個k頁的span和一個n-k頁的span,除了需要建立k頁span中每個頁與該span之間的映射關系之外,還需要建立剩下的n-k頁的span與其首尾頁之間的映射關系。
Span* PageCache::NewSpan(size_t k) // 獲取一個K頁的span
{assert(k > 0 && k < NPAGES);if (!_spanLists[k].Empty()) // 先檢查第k個桶里面有沒有span{//return _spanLists[k].PopFront(); // 有就直接返回Span* kSpan = _spanLists[k].PopFront();//建立頁號與span的映射,方便central cache回收小塊內存時查找對應的spanfor (PAGE_ID i = 0; i < kSpan->_n; i++){_idSpanMap[kSpan->_pageId + i] = kSpan;}return kSpan;}// 檢查一下后面的桶里面有沒有span,如果有->進行切分for (size_t i = k + 1; i < NPAGES; ++i){if (!_spanLists[i].Empty()){Span* nSpan = _spanLists[i].PopFront();Span* kSpan = new Span;// 在nSpan的頭部切一個k頁span下來返回,nSpan再掛到對應映射的位置kSpan->_pageId = nSpan->_pageId; // _pageId類似地址kSpan->_n = k;nSpan->_pageId += k; // 起始頁的頁號往后走nSpan->_n -= k; // 頁數減等k_spanLists[nSpan->_n].PushFront(nSpan); // 將剩下的掛到對應映射的位置//存儲nSpan的首尾頁號與nSpan之間的映射,方便page cache合并span時進行前后頁的查找_idSpanMap[nSpan->_pageId] = nSpan;_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;//建立頁號與span的映射,方便central cache回收小塊內存時查找對應的spanfor (PAGE_ID i = 0; i < kSpan->_n; i++){_idSpanMap[kSpan->_pageId + i] = kSpan;}return kSpan;}}// 走到這說明后面沒有大頁的span了->去找堆要一個128頁的spanSpan* bigSpan = new Span;void* ptr = SystemAlloc(NPAGES - 1);bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT; // 轉換為頁號bigSpan->_n = NPAGES - 1;_spanLists[bigSpan->_n].PushFront(bigSpan);return NewSpan(k); // 把新申請的span插入后再遞歸調用一次自己(避免代碼重復)
}
此時page cache當中的span就都與其首尾頁之間建立了映射關系,現在就可以進行span的合并了,需要注意的是,在向前或向后進行合并的過程中:
- 如果沒有通過頁號獲取到其對應的span,說明對應到該頁的內存塊還未申請,此時需要停止合并。
- 如果通過頁號獲取到了其對應的span,但該span處于被使用的狀態,那我們也必須停止合并。
- 如果合并后大于128頁則不能進行本次合并,因為page cache無法對大于128頁的span進行管理。
void PageCache::ReleaseSpanToPageCache(Span* span)
{while (true) // 對span前后的頁,嘗試進行合并,緩解內存碎片問題{PAGE_ID prevId = span->_pageId - 1;auto ret = _idSpanMap.find(prevId);if (ret == _idSpanMap.end()) // 前面的頁號沒有,不合并{break;}Span* prevSpan = ret->second; // 前面相鄰頁的span在使用,不合并if (prevSpan->_isUse == true){break;}if (prevSpan->_n + span->_n > NPAGES - 1) // 合并出超過128頁的span沒辦法管理,不合并{break;}span->_pageId = prevSpan->_pageId;span->_n += prevSpan->_n;_spanLists[prevSpan->_n].Erase(prevSpan);delete prevSpan;}while (true) // 向后合并{PAGE_ID nextId = span->_pageId + span->_n;auto ret = _idSpanMap.find(nextId);if (ret == _idSpanMap.end()){break;}Span* nextSpan = ret->second;if (nextSpan->_isUse == true){break;}if (nextSpan->_n + span->_n > NPAGES - 1){break;}span->_n += nextSpan->_n;_spanLists[nextSpan->_n].Erase(nextSpan);delete nextSpan;}_spanLists[span->_n].PushFront(span);span->_isUse = false;_idSpanMap[span->_pageId] = span;_idSpanMap[span->_pageId + span->_n - 1] = span;
}
在合并span時,由于這個span是在page cache的某個哈希桶的雙鏈表當中的,因此在合并后需要將其從對應的雙鏈表中移除,然后再將這個被合并了的span結構進行delete。
除此之外,在合并結束后,除了將合并后的span掛到page cache對應哈希桶的雙鏈表當中,還需要建立該span與其首位頁之間的映射關系,便于此后合并出更大的span。
測試代碼:
void TestConcurrentAlloc3()
{void* p1 = ConcurrentAlloc(6);void* p2 = ConcurrentAlloc(8);void* p3 = ConcurrentAlloc(1);void* p4 = ConcurrentAlloc(7);void* p5 = ConcurrentAlloc(8);cout << p1 << endl; // 申請的小塊內存正好是連續的cout << p2 << endl;cout << p3 << endl;cout << p4 << endl;cout << p5 << endl;ConcurrentFree(p1, 6);ConcurrentFree(p2, 8);ConcurrentFree(p3, 1);ConcurrentFree(p4, 7);ConcurrentFree(p5 ,8);cout << endl;cout << p1 << endl;cout << p2 << endl;cout << p3 << endl;cout << p4 << endl;cout << p5 << endl;
}void MultiThreadAlloc1()
{std::vector<void*> v;for (size_t i = 0; i < 7; ++i){void* ptr = ConcurrentAlloc(6);v.push_back(ptr);}for (auto e : v){ConcurrentFree(e, 6);}
}
void MultiThreadAlloc2()
{std::vector<void*> v;for (size_t i = 0; i < 7; ++i){void* ptr = ConcurrentAlloc(16);v.push_back(ptr);}for (auto e : v){ConcurrentFree(e, 16);}
}
void TestMultiThread()
{std::thread t1(MultiThreadAlloc1);std::thread t2(MultiThreadAlloc2);t1.join();t2.join();
}
至此我們便完成了這三個對象的申請和釋放流程。
11. 大于256KB的內存申請和釋放
11.1 申請
每個線程的thread cache是用于申請小于等于256KB的內存的,而對于大于256KB的內存,我們可以考慮直接向page cache申請,但page cache中最大的頁也就只有128頁,因此如果是大于128頁的內存申請,就只能直接向堆申請了。
當申請的內存大于256KB時,雖然不是從thread cache進行獲取,但在分配內存時也是需要進行向上對齊的,對于大于256KB的內存我們可以直接按頁進行對齊。
之前實現RoundUp函數時,對傳入字節數大于256KB的情況直接做了斷言處理,因此這里需要對RoundUp函數稍作修改。
static inline size_t RoundUp(size_t size) // 返回size對齊以后的值{if (size <= 128) {return _RoundUp(size, 8);}else if (size <= 1024) {return _RoundUp(size, 16);}else if (size <= 8 * 1024) {return _RoundUp(size, 128);}else if (size <= 64 * 1024) {return _RoundUp(size, 1024);}else if (size <= 256 * 1024) {return _RoundUp(size, 8 * 1024);}else{ //大于256KB的按頁對齊return _RoundUp(size, 1 << PAGE_SHIFT);}}
現在對于之前的申請邏輯就需要進行修改了,當申請對象的大小大于256KB時,就不用向thread cache申請了,這時先計算出按頁對齊后實際需要申請的頁數,然后通過調用NewSpan申請指定頁數的span即可。
static void* ConcurrentAlloc(size_t size)
{if (size > MAX_BYTES) //大于256KB的內存申請{//計算出對齊后需要申請的頁數size_t alignSize = SizeClass::RoundUp(size);size_t kPage = alignSize >> PAGE_SHIFT;//向page cache申請kPage頁的spanPageCache::GetInstance()->_pageMtx.lock();Span* span = PageCache::GetInstance()->NewSpan(kPage);span->_objSize = size;PageCache::GetInstance()->_pageMtx.unlock();void* ptr = (void*)(span->_pageId << PAGE_SHIFT);return ptr;}else{// 通過TLS,每個線程無鎖地獲取自己專屬的ThreadCache對象if (pTLSThreadCache == nullptr){//pTLSThreadCache = new ThreadCache;static std::mutex tcMtx;static ObjectPool<ThreadCache> tcPool;tcMtx.lock();pTLSThreadCache = tcPool.New();tcMtx.unlock();}//cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;return pTLSThreadCache->Allocate(size);}
}
也就是說,申請大于256KB的內存時,會直接調用page cache當中的NewSpan函數進行申請,因此這里要再對NewSpan函數進行改造,當需要申請的內存頁數大于128頁時,就直接向堆申請對應頁數的內存塊。而如果申請的內存頁數是小于128頁的,那就在page cache中進行申請,因此當申請大于256KB的內存調用NewSpan函數時也是需要加鎖的,因為可能是在page cache中進行申請的。
11.2 釋放
當釋放對象時,我們需要判斷釋放對象的大小:
當釋放對象時,需要先找到該對象對應的span,但是在釋放對象時只知道該對象的起始地址。這也就是在申請大于256KB的內存時,也要給申請到的內存建立span結構,并建立起始頁號與該span之間的映射關系的原因。此時就可以通過釋放對象的起始地址計算出起始頁號,進而通過頁號找到該對象對應的span。
static void ConcurrentFree(void* ptr, size_t size)
{if (size > MAX_BYTES) //大于256KB的內存釋放{Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);PageCache::GetInstance()->_pageMtx.lock();PageCache::GetInstance()->ReleaseSpanToPageCache(span);PageCache::GetInstance()->_pageMtx.unlock();}else{assert(pTLSThreadCache);pTLSThreadCache->Deallocate(ptr, size);}
}
因此page cache在回收span時也需要進行判斷,如果該span的大小是小于等于128頁的,那么直接還給page cache進行了,page cache會嘗試對其進行合并。而如果該span的大小是大于128頁的,那么說明該span是直接向堆申請的,我們直接將這塊內存釋放給堆,然后將這個span結構進行delete就行了。
直接向堆申請內存時我們調用的接口是VirtualAlloc,與之對應的將內存釋放給堆的接口叫做VirtualFree,而Linux下的brk和mmap對應的釋放接口叫做sbrk和unmmap。此時也可以將這些釋放接口封裝成一個叫做SystemFree的接口,需要將內存釋放給堆時直接調用SystemFree即可。
inline static void SystemFree(void* ptr) // 直接將內存還給堆
{
#ifdef _WIN32VirtualFree(ptr, 0, MEM_RELEASE);
#else//linux下sbrk unmmap等
#endif
}
12. 使用定長內存池脫離使用new
tcmalloc是要在高并發場景下替代malloc進行內存申請的,因此tcmalloc在實現的時,其內部是不能調用malloc函數的,我們當前的代碼中存在通過new獲取到的內存,而new在底層實際上就是封裝了malloc。
為了完全脫離掉malloc函數,此時之前實現的定長內存池就起作用了,代碼中使用new時基本都是為Span結構的對象申請空間,而span對象基本都是在page cache層創建的,因此可以在PageCache類當中定義一個_spanPool,用于span對象的申請和釋放。
class PageCache
{
public://...
private://...ObjectPool<Span> _spanPool;
};
然后將代碼中使用new的地方替換為調用定長內存池當中的New函數,將代碼中使用delete的地方替換為調用定長內存池當中的Delete函數。
Span* span = _spanPool.New(); //申請span對象_spanPool.Delete(span); //釋放span對象
注意,當使用定長內存池當中的New函數申請Span對象時,New函數通過定位new也是對Span對象進行了初始化的。ConcurrentAlloc接口改一下:
// 通過TLS,每個線程無鎖地獲取自己專屬的ThreadCache對象if (pTLSThreadCache == nullptr){//pTLSThreadCache = new ThreadCache;static std::mutex tcMtx;static ObjectPool<ThreadCache> tcPool;tcMtx.lock();pTLSThreadCache = tcPool.New();tcMtx.unlock();}cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;return pTLSThreadCache->Allocate(size);
這里我們將用于申請ThreadCache類對象的定長內存池定義為靜態的,保持全局只有一個,讓所有線程創建自己的thread cache時,都在個定長內存池中申請內存就行了。
但注意在從該定長內存池中申請內存時需要加鎖,防止多個線程同時申請自己的ThreadCache對象而導致線程安全問題。
最后在SpanList的構造函數中也用到了new,因為SpanList是帶頭循環雙向鏈表,所以在構造期間我們需要申請一個span對象作為雙鏈表的頭結點。
class SpanList // 帶頭雙向循環鏈表
{
public:SpanList(){_head = _spanPool.New();_head->_next = _head;_head->_prev = _head;}
private:Span* _head;static ObjectPool<Span> _spanPool;
};
由于每個span雙鏈表只需要一個頭結點,因此將這個定長內存池定義為靜態的,保持全局只有一個,讓所有span雙鏈表在申請頭結點時,都在一個定長內存池中申請內存就行了。
13. 釋放對象時優化成不傳對象大小
使用malloc函數申請內存時,需要指明申請內存的大小;而使用free函數釋放內存時,只需要傳入指向這塊內存的指針即可。
目前實現的內存池,在釋放對象時除了需要傳入指向該對象的指針,還需要傳入該對象的大小。
原因如下:
- 如果釋放的是大于256KB的對象,需要根據對象的大小來判斷這塊內存到底應該還給page cache,還是應該直接還給堆。
- 如果釋放的是小于等于256KB的對象,需要根據對象的大小計算出應該還給thread cache的哪一個哈希桶。
如果我們也想做到,在釋放對象時不用傳入對象的大小,那么就需要建立對象地址與對象大小之間的映射。由于現在可以通過對象的地址找到其對應的span,而span的自由鏈表中掛的都是相同大小的對象。
因此可以在Span結構中再增加一個_objSize成員,該成員代表著這個span管理的內存塊被切成的一個個對象的大小。
struct Span // 管理多個連續頁大塊內存跨度的結構
{PAGE_ID _pageId = 0; // 大塊內存起始頁的頁號size_t _n = 0; // 頁的數量Span* _next = nullptr; // 雙向鏈表的結構Span* _prev = nullptr;size_t _useCount = 0; // 切好小塊內存,被分配給thread cache的計數void* _freeList = nullptr; // 切好的小塊內存的自由鏈表bool _isUse = false; // 是否在被使用size_t _objSize = 0; //切好的小對象的大小
};
而所有的span都是從page cache中拿出來的,因此每當我們調用NewSpan獲取到一個k頁的span時,就應該將這個span的_objSize保存下來。
代碼中有兩處,一處是在central cache中獲取非空span時,如果central cache對應的桶中沒有非空的span,此時會調用NewSpan獲取一個k頁的span;另一處是當申請大于256KB內存時,會直接調用NewSpan獲取一個k頁的span。
此時當我們釋放對象時,就可以直接從對象的span中獲取到該對象的大小,準確來說獲取到的是對齊以后的大小。
static void ConcurrentFree(void* ptr)
{Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);size_t size = span->_objSize;if (size > MAX_BYTES) //大于256KB的內存釋放{Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);PageCache::GetInstance()->_pageMtx.lock();PageCache::GetInstance()->ReleaseSpanToPageCache(span);PageCache::GetInstance()->_pageMtx.unlock();}else{assert(pTLSThreadCache);pTLSThreadCache->Deallocate(ptr, size);}
}
讀取映射關系時的加鎖問題
我們將頁號與span之間的映射關系是存儲在PageCache類當中的,當我們訪問這個映射關系時是需要加鎖的,因為STL容器是不保證線程安全的。
對于當前代碼來說,如果我們此時正在page cache進行相關操作,那么訪問這個映射關系是安全的,因為當進入page cache之前是需要加鎖的,因此可以保證此時只有一個線程在進行訪問。
但如果我們是在central cache訪問這個映射關系,或是在調用ConcurrentFree函數釋放內存時訪問這個映射關系,那么就存在線程安全的問題。因為此時可能其他線程正在page cache當中進行某些操作,并且該線程此時可能也在訪問這個映射關系,因此在page cache外部訪問這個映射關系時是需要加鎖的。
實際就是在調用page cache對外提供訪問映射關系的函數時需要加鎖,這里可以考慮使用C++當中的unique_lock,當然也可以用普通的鎖。
Span* PageCache::MapObjectToSpan(void* obj)
{PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT);std::unique_lock<std::mutex> lock(_pageMtx); // 構造時加鎖,析構時自動解鎖auto ret = _idSpanMap.find(id);if (ret != _idSpanMap.end()){return ret->second;}else{assert(false);return nullptr;}
}
14.?多線程環境下對比malloc測試
之前只是對代碼進行了一些基礎的單元測試,下面在多線程場景下對比malloc進行測試。
// ntimes 一輪申請和釋放內存的次數 || nworks 線程數 || rounds 輪次
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{std::vector<std::thread> vthread(nworks);std::atomic<size_t> malloc_costtime = 0;std::atomic<size_t> free_costtime = 0;for (size_t k = 0; k < nworks; ++k){vthread[k] = std::thread([&, k]() {std::vector<void*> v;v.reserve(ntimes);for (size_t j = 0; j < rounds; ++j){size_t begin1 = clock();for (size_t i = 0; i < ntimes; i++){v.push_back(malloc(16));//v.push_back(malloc((16 + i) % 8192 + 1));}size_t end1 = clock();size_t begin2 = clock();for (size_t i = 0; i < ntimes; i++){free(v[i]);}size_t end2 = clock();v.clear();malloc_costtime += (end1 - begin1);free_costtime += (end2 - begin2);}});}for (auto& t : vthread){t.join();}printf("%u個線程并發執行%u輪次,每輪次malloc %u次: 花費:%u ms\n",nworks, rounds, ntimes, (size_t)malloc_costtime);printf("%u個線程并發執行%u輪次,每輪次free %u次: 花費:%u ms\n",nworks, rounds, ntimes, (size_t)free_costtime);printf("%u個線程并發malloc&free %u次,總計花費:%u ms\n",nworks, nworks * rounds * ntimes, malloc_costtime + free_costtime);
}// ntimes 一輪申請和釋放內存的次數 || nworks 線程數 || rounds 輪次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{std::vector<std::thread> vthread(nworks);std::atomic<size_t> malloc_costtime = 0;std::atomic<size_t> free_costtime = 0;for (size_t k = 0; k < nworks; ++k){vthread[k] = std::thread([&]() {std::vector<void*> v;v.reserve(ntimes);for (size_t j = 0; j < rounds; ++j){size_t begin1 = clock();for (size_t i = 0; i < ntimes; i++){v.push_back(ConcurrentAlloc(16));//v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));}size_t end1 = clock();size_t begin2 = clock();for (size_t i = 0; i < ntimes; i++){ConcurrentFree(v[i]);}size_t end2 = clock();v.clear();malloc_costtime += (end1 - begin1);free_costtime += (end2 - begin2);}});}for (auto& t : vthread){t.join();}printf("%u個線程并發執行%u輪次,每輪次concurrent alloc %u次: 花費:%u ms\n",nworks, rounds, ntimes, (size_t)malloc_costtime);printf("%u個線程并發執行%u輪次,每輪次concurrent dealloc %u次: 花費:%u ms\n",nworks, rounds, ntimes, (size_t)free_costtime);printf("%u個線程并發concurrent alloc&dealloc %u次,總計花費:%u ms\n",nworks, nworks * rounds * ntimes, malloc_costtime + free_costtime);
}int main()
{size_t n = 1000;cout << "=========================================================" << endl;BenchmarkConcurrentMalloc(n, 5, 10);cout << endl << endl;BenchmarkMalloc(n, 5, 10);cout << "=========================================================" << endl;return 0;
}
在測試函數中,通過clock函數分別獲取到每輪次申請和釋放所花費的時間,然后將其對應累加到malloc_costtime和free_costtime上。最后就得到了,nworks個線程跑rounds輪,每輪申請和釋放ntimes次,這個過程申請所消耗的時間、釋放所消耗的時間、申請和釋放總共消耗的時間。
創建線程時讓線程執行的是lambda表達式,而這里在使用lambda表達式時,以值傳遞的方式捕捉了變量k,以引用傳遞的方式捕捉了其他父作用域中的變量,因此可以將各個線程消耗的時間累加到一起。
將所有線程申請內存消耗的時間都累加到malloc_costtime上, 將釋放內存消耗的時間都累加到free_costtime上,此時malloc_costtime和free_costtime可能被多個線程同時進行累加操作的,所以存在線程安全的問題。鑒于此,在定義這兩個變量時使用了atomic類模板,這時對它們的操作就是原子操作了。
先來測試一下固定大小內存的申請和釋放:(如果運行失敗可以先看下面的調試技巧,這里調試完換到Release測試)
運行后可以看到,malloc的效率還是更高的。
由于此時申請釋放的都是固定大小的對象,每個線程申請釋放時訪問的都是各自thread cache的同一個桶,當thread cache的這個桶中沒有對象或對象太多要歸還時,也都會訪問central cache的同一個桶。此時central cache中的桶鎖就不起作用了,因為我們讓central cache使用桶鎖的目的就是為了,讓多個thread cache可以同時訪問central cache的不同桶,而此時每個thread cache訪問的卻都是central cache中的同一個桶。
下面再來測試一下不同大小內存的申請和釋放:(兩個地方都要改)
在Release下已經超過malloc。
Debug運行后可以看到,相比malloc來說還是差一點。
15. 調試和復雜問題的調試技巧
多線程調試比單線程調試要復雜得多,調試時各個線程之間會相互切換,并且每次調試切換的時機也是不固定的,這就使得調試過程變得非常難以控制。
本人在此項目也在調試花了很多時間,下面舉幾個例子來記錄一下調試過程和技巧。
下面是在上面多線程測試出現一些問題(問題已被本人調試找出,現改回有問題之前):
運行一下:(斷言報錯了)
自己寫的斷言報錯還好,可以定位到報錯的地方,而且這個問題是必現的。
現在根據報錯找到Common.hpp的339行:
此時并不能找到上一步的原因,所以要用到條件斷點來幫助調試:
1、條件斷點
一般情況下我們可以直接運行程序,通過報錯來查找問題。如果此時報的是斷言錯誤,那么我們可以直接定位到報錯的位置,然后將此處的斷言改為與斷言條件相反的if判斷,在if語句里面打上一個斷點,但注意空語句是無法打斷點的,這時隨便在if里面加上一句代碼就可以打斷點了。
運行到條件斷點處后,我們就可以對當前程序進行進一步分析,找出斷言錯誤的被觸發原因。
2、查看函數棧幀
當程序運行到斷點處時,需要對當前位置進行分析,如果檢查后發現當前函數是沒有問題的,這時可能需要回到調用該函數的地方進行進一步分析,此時依次點擊“調試→窗口→調用堆棧”。
點擊調用堆棧后:
雙擊函數棧幀中的其他函數,就可以跳轉到該函數對應的棧幀。
需要注意的是,監視窗口只能查看當前棧幀中的變量。如果要查看此時其他函數棧幀中變量的情況,就可以通過函數棧幀跳轉來查看。
此時知道有函數調用了PopFront,把鼠標移到各個變量發現_head的next和prev都等于_head,所以此時_head是空的,說明邏輯錯了。
此時再跳到上一層棧幀:
此時就知道是手誤了,正確邏輯應該是這樣的:
上面錯誤的代碼就相當于訪問了第0個桶,第0個肯定是空的。
把條件斷點換回斷言再調試運行一下:
又崩了,還是跳到上幾層棧幀:
此時還是找不到問題就要分析了,PopRange出問題了,大概率是Push或者PushRange有問題:
Push單個對象就是一個頭插,怎么看都沒問題,那應該就是PushRange有問題了:
此時再看看ThreadCache里調用PushRange的地方:
此時對比就應該想到是不是第三個參數有問題,所以在PushRange里改一下代碼:
還是驗證+條件斷點:
void PushRange(void* start, void* end, size_t n) // 頭插一段范圍的內存給_freeList{NextObj(end) = _freeList; // 賦值給end的頭4/8字節_freeList = start;// 測試驗證+條件斷點int i = 0;void* cur = start;while (cur){cur = NextObj(cur);++i;}if (n != i){int test = 0;}_size += n; // 這里在參數增加一個個數n}
此時鼠標移到 n 和 i 或者通過自動窗口就能看到 n 和 i 并不能對應上,你說給我的是94個,但我驗證了才有51個,所以還是上一層的問題,應該就是FetchRangeObj的問題,但是是同級的函數跳不到上一層棧幀,此時就把PushRange的調試代碼注釋掉,在FetchRangeObj再打類似的調試代碼:
此時發現還是對不上,問題就在附近了,此時就找了很久就發現span有問題:
最下面的_objSize一個對象的大小是16,一頁的大小是8 * 1024 =?8192,_useCount最大才512,但此時已經到533了。
所以此時就是span切錯了,是GetOneSpan的問題,再往上一層打條件斷點:
如果不看之前的代碼能找到哪里出錯嗎?
此時運行結果是疑似死循環了,再看一個技巧:
3、疑似死循環時中斷程序
當你在某個地方設置斷點后,如果遲遲沒有運行到斷點處,而程序也沒有崩潰,這時有可能是程序進入到某個死循環了。這時我們可以依次點擊“調試→全部中斷”。
這時程序就會在當前運行的地方停下來。
再按F10就發現死循環的原因是 j 出問題了。
此時就發現是少了一句代碼,最后忘記把tail的next置空了:
現在終于找出了兩個問題,前面都是固定申請16字節的大小,現讓申請隨機一點,把n調大一點:
找到斷言錯誤的地方打個斷點然后查看棧幀:
跳到上一層棧幀發現指針錯了:
此時就看_idSpanMap有沒有錯,然后把unordered_map改成map一個個看桶里的數據。
最后就發現了這個問題(第一個問題的時候可能也看出來了):
在NewSpan這直接返回的也要建立和ID的映射:
調試技巧就先講到這了,調試還是在于平時的積累和動手。
16. 性能瓶頸分析
下面分析當前項目的瓶頸在哪里,但這不能簡單的憑感覺,應該用性能分析的工具來進行分析。
VS編譯器中就帶有性能分析的工具的,依次點擊“調試→性能查看器”進行性能分析,注意該操作要在Debug模式下進行。
將代碼中n的值由10000調成了1000,否則該分析過程可能會花費較多時間,并且將malloc的測試代碼進行了屏蔽,因為要分析的是我們實現的高并發內存池。
點擊后再選擇檢測選項查看各個函數的用時時間,然后點擊開始就行了。
17. 使用基數樹進行優化
tcmalloc源碼中還有很多優化,其中之一就實現了基數樹進行優化。
基數樹實際上就是一個分層的哈希表,根據所分層數不同可分為單層基數樹、二層基數樹、三層基數樹等。
單層基數樹實際采用的就是直接定址法,每一個頁號對應span的地址就存儲數組中在以該頁號為下標的位置。
最壞的情況下需要建立所有頁號與其span之間的映射關系,因此這個數組中元素個數應該與頁號的數目相同,數組中每個位置存儲的就是對應span的指針。
下面對tcmalloc源碼中的單層基數樹進行簡單修改:
// Single-level array
template <int BITS>
class TCMalloc_PageMap1
{
private:static const int LENGTH = 1 << BITS;void** array_;public:typedef uintptr_t Number;//explicit TCMalloc_PageMap1(void* (*allocator)(size_t)) {explicit TCMalloc_PageMap1(){//array_ = reinterpret_cast<void**>((*allocator)(sizeof(void*) << BITS)); // 改成下面三行size_t size = sizeof(void*) << BITS; // 需要開辟數組的大小size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT); // 按頁對齊后的大小array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT); // 向堆申請空間(頁數memset(array_, 0, sizeof(void*) << BITS); // 對申請到的內存進行清理}// Return the current value for KEY. Returns NULL if not yet set,// or if k is out of range.void* get(Number k) const{if ((k >> BITS) > 0) // k的范圍不在[0, 2^BITS-1]{return NULL;}return array_[k]; // 返回該頁號對應的span}// REQUIRES "k" is in range "[0,2^BITS-1]".// REQUIRES "k" has been ensured before.//// Sets the value 'v' for key 'k'.void set(Number k, void* v) {array_[k] = v; // 建立映射}
};
此時當我們需要建立映射時就調用set函數,需要讀取映射關系時,就調用get函數就行了。
代碼中的非類型模板參數BITS表示存儲頁號最多需要比特位的個數。在32位下我們傳入的是32-PAGE_SHIFT,在64位下傳入的是64-PAGE_SHIFT。而其中的LENGTH成員代表的就是頁號的數目,即2^BITS。
比如32位平臺下,以一頁大小為8K為例,此時頁的數目就是2^32 / 2^13 = 2^19。因此存儲頁號最多需要19個比特位,此時傳入非類型模板參數的值就是32 ? 13 = 19。由于32位平臺下指針的大小是4字節,因此該數組的大小就是2^19 * 4 = 2M,內存消耗不大,是可行的。但如果是在64位平臺下,此時該數組的大小是2^51 * 8 = 2^54 = 2^24 G,這顯然是不可行的,實際上對于64位的平臺,我們需要使用三層基數樹。
這里還是以32位平臺下,一頁的大小為8K為例來說明,此時存儲頁號最多需要19個比特位。而二層基數樹實際上就是把這19個比特位分為兩次進行映射。
比如用前5個比特位在基數樹的第一層進行映射,映射后得到對應的第二層,然后用剩下的比特位在基數樹的第二層進行映射,映射后最終得到該頁號對應的span指針。
在二層基數樹中,第一層的數組占用2^5 * 4 = 2^7 Byte空間,第二層的數組最多占用2^5 * 2 ^14 * 4 = 2M,二層基數樹相比一層基數樹的好處就是,一層基數樹必須一開始就把2M的數組開辟出來,而二層基數樹一開始時只需將第一層的數組開辟出來,當需要進行某一頁號映射時再開辟對應的第二層的數組就行了。
// Single-level array
template <int BITS>
class TCMalloc_PageMap1
{
private:static const int LENGTH = 1 << BITS;void** array_;public:typedef uintptr_t Number;//explicit TCMalloc_PageMap1(void* (*allocator)(size_t)) {explicit TCMalloc_PageMap1(){//array_ = reinterpret_cast<void**>((*allocator)(sizeof(void*) << BITS)); // 改成下面三行size_t size = sizeof(void*) << BITS; // 需要開辟數組的大小size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT); // 按頁對齊后的大小array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT); // 向堆申請空間(頁數memset(array_, 0, sizeof(void*) << BITS); // 對申請到的內存進行清理}// Return the current value for KEY. Returns NULL if not yet set,// or if k is out of range.void* get(Number k) const{if ((k >> BITS) > 0) // k的范圍不在[0, 2^BITS-1]{return NULL;}return array_[k]; // 返回該頁號對應的span}// REQUIRES "k" is in range "[0,2^BITS-1]".// REQUIRES "k" has been ensured before.//// Sets the value 'v' for key 'k'.void set(Number k, void* v) {array_[k] = v; // 建立映射}
};
因此在二層基數樹中有一個Ensure函數,當需要建立某一頁號與其span之間的映射關系時,需要先調用該Ensure函數確保用于映射該頁號的空間是開辟了的,如果沒有開辟則會立即開辟。
在32位平臺下,就算將二層基數樹第二層的數組全部開辟出來也就消耗了2M的空間,內存消耗也不算太多,因此可以在構造二層基數樹時就把第二層的數組全部開辟出來。
上面一層基數樹和二層基數樹都適用于32位平臺,而對于64位的平臺就需要用三層基數樹了。三層基數樹與二層基數樹類似,三層基數樹實際上就是把存儲頁號的若干比特位分為三次進行映射。
此時只有當要建立某一頁號的映射關系時,再開辟對應的數組空間,而沒有建立映射的頁號就可以不用開辟其對應的數組空間,此時就能在一定程度上節省內存空間。
因此當我們要建立某一頁號的映射關系時,需要先確保存儲該頁映射的數組空間是開辟好了的,也就是調用代碼中的Ensure函數,如果對應數組空間未開辟則會立馬開辟對應的空間。