文章目錄
- 前言
- 一、threadcache回收內存
- 二、centralcache回收內存
- 三、pagecache回收內存
- 總結
前言
??Hello,我們繼續乘勝追擊
??本篇難度較大,大家要好好學一下
一、threadcache回收內存
- 當某個線程申請的對象不用了,可以將其釋放給 thread cache ,然后 thread cache 將該對象插入到對應哈希桶的自由鏈表當中即可。
- 但是隨著線程不斷的釋放,對應自由鏈表的長度也會越來越長,這些內存堆積在一個 thread cache 中就是一種浪費,我們應該將這些內存還給 central cache ,這樣一來,這些內存對其他線程來說也是可申請的,因此當 thread cache 某個桶當中的自由鏈表太長時我們可以進行一些處理。
- 如果 thread cache 某個桶當中自由鏈表的長度超過它一次批量向 central cache 申請的對象個數,那么此時我們就要把該自由鏈表當中的這些對象還給 central cache 。
// 釋放內存對象
void ThreadCache::Deallocate(void* ptr, size_t size)
{assert(ptr);assert(size <= MAX_BYTES);// 找出對應的自由鏈表桶將對象插入size_t index = SizeClass::Index(size);_freeLists[index].Push(ptr);// 當自由鏈表長度大于一次批量申請的對象個數時就開始還一段list給central cacheif (_freeLists[index].Size() >= _freeLists[index].MaxSize()){ListTooLong(_freeLists[index], size);}
}
??當自由鏈表的長度大于一次批量申請的對象時,我們具體的做法就是,從該自由鏈表中取出一次批量個數的對象,然后將取出的這些對象還給 central cache 中對應的 span 即可
// 釋放對象時,鏈表過長時,回收內存回到中心緩存
void ThreadCache::ListTooLong(FreeList& list, size_t size)
{void* start = nullptr;void* end = nullptr;// 從list取出批量個對象list.PopRange(start, end, list.MaxSize());// 將取出的對象還給central cache對應的spanCentralCache::GetInstance()->ReleaseListToSpans(start, size);
}
??從上述代碼可以看出, FreeList類 需要支持用 Size函數 獲取自由鏈表中對象的個數,還需要支持用 PopRange 函數從自由鏈表中取出指定個數的對象。因此我們需要給 FreeList類 增加一個對應的 PopRange函數 ,然后再增加一個 _size 成員變量,該成員變量用于記錄當前自由鏈表中對象的個數,當我們向自由鏈表插入或刪除對象時,都應該更新 _size 的值。
// 管理切分好的小對象的自由鏈表
class FreeList
{
public:// 將釋放的對象頭插到自由鏈表void Push(void* obj){assert(obj);//頭插NextObj(obj) = _freeList;_freeList = obj;_size++;}// 從自由鏈表頭部獲取一個對象void* Pop(){assert(_freeList);//頭刪void* obj = _freeList;_freeList = NextObj(_freeList);_size--;return obj;}// 插入一段范圍的對象到自由鏈表void PushRange(void* start, void* end, size_t n){assert(start);assert(end);//頭插NextObj(end) = _freeList;_freeList = start; _size += n;}// 從自由鏈表獲取一段范圍的對象void PopRange(void*& start, void*& end, size_t n){assert(n <= _size);//頭刪start = _freeList;end = start;for (size_t i = 0; i < n - 1;i++){end = NextObj(end);}_freeList = NextObj(end); // 自由鏈表指向end的下一個對象NextObj(end) = nullptr; // 取出的一段鏈表的表尾置空_size -= n;}bool Empty(){return _freeList == nullptr;}size_t& MaxSize(){return _maxSize;}size_t Size(){return _size;}
private:void* _freeList = nullptr; // 自由鏈表size_t _maxSize = 1;size_t _size = 0;
};
??對于 FreeList類 當中的 PushRange 成員函數,我們最好也像 PopRange 一樣給它增加一個參數,表示插入對象的個數,不然我們這時還需要通過遍歷統計插入對象的個數
??因此之前在調用 PushRange 的地方就需要修改一下,而我們實際就在一個地方調用過 PushRange 函數,并且此時插入對象的個數也是很容易知道的。當時 thread cache 從 central cache 獲取了 actualNum 個對象,將其中的一個返回給了申請對象的線程,剩下的 actualNum - 1 個掛到了 thread cache 對應的桶當中,所以這里插入對象的個數就是 actualNum - 1。
// 從中心緩存獲取對象
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{size_t batchNum = min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));if (_freeLists[index].MaxSize() == batchNum){_freeLists[index].MaxSize() += 1;}void* start = nullptr;void* end = nullptr;size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);assert(actualNum >= 1); if (actualNum == 1){assert(start == end);return start;}else{_freeLists[index].PushRange(Next(start),end, actualNum - 1); // 修改部分return start;}
}
-
當 thread cache 的某個自由鏈表過長時,我們實際就是把這個自由鏈表當中全部的對象都還給 central cache 了,但這里在設計 PopRange 接口時還是設計的是取出指定個數的對象,因為在某些情況下當自由鏈表過長時,我們可能并不一定想把鏈表中全部的對象都取出來還給 central cache ,這樣設計就是為了增加代碼的可修改性。
-
當我們判斷 thread cache 是否應該還對象給 central cache 時,還可以綜合考慮每個 thread cache 整體的大小。比如當某個 thread cache 的總占用大小超過一定閾值時,我們就將該 thread cache 當中的對象還一些給 central cache ,這樣就盡量避免了某個線程的 thread cache 占用太多的內存。對于這一點,在 tcmalloc 當中就是考慮到了的。(但是對我們來說還是太難了,至少我實在是不會)
二、centralcache回收內存
??當 thread cache 中某個自由鏈表太長時,會將自由鏈表當中的這些對象還給 central cache 中的 span
??還給 central cache 的這些對象不一定都是屬于同一個 span 的。 central cache 中的每個哈希桶當中可能都不止一個 span ,因此當我們計算出還回來的對象應該還給 central cache 的哪一個桶后,還需要知道這些對象到底應該還給這個桶當中的哪一個 span
至于說為什么還給 central cache 的這些對象不一定都是屬于同一個 span 的,答案在這里
至于說為什么需要區分 Span,原因在這里
那么如何根據對象的地址得到對象所在的頁號?
??某個頁當中的所有地址除以頁的大小都等該頁的頁號。比如我們這里假設一頁的大小是100,那么地址0 ~ 99都屬于第0頁,它們除以100都等于0,而地址100 ~ 199都屬于第1頁,它們除以100都等于1
如何找到一個對象對應的span?
??雖然我們現在可以通過對象的地址得到其所在的頁號,但是我們還是不能知道這個對象到底屬于哪一個 span 。因為一個 span 管理的可能是多個頁
??為了解決這個問題,我們可以建立 頁號 和 span 之間的映射。由于這個映射關系在 page cache 進行 span 的合并時也需要用到,因此我們直接將其存放到 page cache 里面。這時我們就需要在 PageCache類 當中添加一個映射關系了,這里可以用 C++ 當中的 unordered_map 進行實現,并且添加一個函數接口,用于讓 central cache 獲取這里的映射關系。
PageCache類當中新增的成員變量及其成員函數
// 單例模式
class PageCache
{
public:// 獲取從對象到span的映射Span* MapObjectToSpan(void* obj);
private:std::unordered_map<PAGE_ID, Span*> _idSpanMap;
};
??每當 page cache 分配 span 給 central cache 時,都需要記錄一下 頁號 和 span 之間的映射關系。此后當 thread cache 還對象給 central cache 時,才知道應該具體還給哪一個 span
??因此當 central cache 在調用 NewSpan 接口向 page cache 申請 k頁 的 span 時, page cache 在返回這個 k頁 的 span 給 central cache 之前,應該建立這 k個頁號 與該 span 之間的映射關系
// 獲取一個K頁的span(加映射版本)
Span* PageCache::NewSpan(size_t k)
{assert(k > 0 && k < NPAGES); // 保證在桶的范圍內// 1.檢查第k個桶里面有沒有Span,有則返回if (!_spanLists[k].Empty()){Span* kSpan = _spanLists[k].PopFront();// 建立頁號與span的映射,方便central cache回收小塊內存查找對應的spanfor (PAGE_ID i = 0; i < kSpan->_n; i++){_idSpanMap[kSpan->_pageID + i] = kSpan;}return kSpan;}// 2.檢查一下后面的桶里面有沒有span,有則將其切分for (size_t i = k + 1; i < NPAGES; i++){if (!_spanLists[i].Empty()){Span* nSpan = _spanLists[i].PopFront();Span* kSpan = new Span;// 在nSpan的頭部切一個k頁下來kSpan->_pageID = nSpan->_pageID;kSpan->_n = k;// 更新nSpan位置nSpan->_pageID += k;nSpan->_n -= k;// 將剩下的掛到對應映射的位置_spanLists[nSpan->_n].PushFront(nSpan);for (PAGE_ID i = 0; i < kSpan->_n; i++){_idSpanMap[kSpan->_pageID + i] = kSpan;}return kSpan;}}// 3.沒有大頁的span,找堆申請128頁的spanSpan* bigSpan = new Span;void* ptr = SystemAlloc(NPAGES - 1); // 申請128頁內存bigSpan->_pageID = (PAGE_ID)ptr >> PAGE_SHIFT;bigSpan->_n = NPAGES - 1; // 頁數// 將128頁span掛接到128號桶上_spanLists[bigSpan->_n].PushFront(bigSpan);// 遞歸調用自己(避免代碼重復)return NewSpan(k);
}
??此時我們就可以通過對象的地址找到該對象對應的 span 了,直接將該對象的地址除以頁的大小得到頁號,然后在 unordered_map 當中找到其對應的 span 即可
// 獲取從對象到span的映射
Span* PageCache::MapObjectToSpan(void* obj)
{PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT; // 計算頁號auto ret = _idSpanMap.find(id);if (ret != _idSpanMap.end()){return ret->second;}else{assert(false);return nullptr;}
}
??注意:當我們要通過某個頁號查找其對應的 span 時,該頁號與其 span 之間的映射一定是建立過的,如果此時我們沒有在 unordered_map 當中找到,則說明我們之前的代碼邏輯有問題,因此當沒有找到對應的 span 時可以直接用斷言結束程序,以表明程序邏輯出錯
??這時當 thread cache 還對象給 central cache 時,就可以依次遍歷這些對象,將這些對象插入到其對應 span 的自由鏈表當中,并且及時更新該 span 的 _useCount 計數即可
??在 thread cache 還對象給 central cache 的過程中,如果 central cache 中某個 span 的 _useCount 減到 0 時,說明這個 span 分配出去的對象全部都還回來了,那么此時就可以將這個 span 再進一步還給 page cache
//將一定數量的對象還給對應的span
void CentralCache::ReleaseListToSpans(void* start, size_t size)
{size_t index = SizeClass::Index(size);_spanLists[index]._mtx.lock(); // 加鎖while (start){void* next = NextObj(start); // 記錄下一個Span* span = PageCache::GetInstance()->MapObjectToSpan(start);// 將對象頭插到span的自由鏈表NextObj(start) = span->_freeList;span->_freeList = start;span->_useCount--; // 更新被分配給thread cache的計數if (span->_useCount == 0) // 說明這個span分配出去的對象全部都回來了{// 此時這個span就可以再回收給page cache,page cache可以再嘗試去做前后頁的合并_spanLists[index].Erase(span);span->_freeList = nullptr; // 自由鏈表置空span->_next = nullptr;span->_prev = nullptr;// 釋放span給page cache時,使用page cache的鎖就可以了,這時把桶鎖解掉_spanLists[index]._mtx.unlock(); // 解桶鎖PageCache::GetInstance()->_pageMtx.lock(); // 加大鎖PageCache::GetInstance()->ReleaseSpanToPageCache(span);PageCache::GetInstance()->_pageMtx.unlock(); // 解大鎖_spanLists[index]._mtx.lock(); // 加桶鎖}start = next;}_spanLists[index]._mtx.unlock(); // 解鎖
}
??如果要把某個 span 還給 page cache ,我們需要先將這個 span 從 central cache 對應的雙鏈表中移除,然后再將該 span 的自由鏈表置空,因為 page cache 中的 span 是不需要切分成一個個的小對象的,以及該 span 的前后指針也都應該置空,因為之后要將其插入到 page cache 對應的雙鏈表中。但 span 當中記錄的起始頁號以及它管理的頁數是不能清除的,否則對應內存塊就找不到了
??在 central cache 還 span 給 page cache 時也存在鎖的問題,此時需要先將 central cache 中對應的桶鎖解掉,然后再加上 page cache 的大鎖之后才能進入 page cache 進行相關操作,當處理完畢回到 central cache 時,除了將 page cache 的大鎖解掉,還需要立刻加上 central cache 對應的桶鎖,然后將還未還完對象繼續還給 central cache 中對應的 span
三、pagecache回收內存
- 如果 central cache 中有某個 span 的 _useCount 減到 0 了,那么 central cache 就需要將這個 span 還給 page cache 了。
- 這個過程看似是非常簡單的, page cache 只需將還回來的 span 掛到對應的哈希桶上就行了。但實際為了緩解內存碎片的問題, page cache 還需要嘗試將還回來的 span 與其他空閑的 span 進行合并。
??合并的過程可以分為 向前合并 和 向后合并 。如果還回來的 span 的起始頁號是 num ,該 span 所管理的頁數是 n 。那么在向前合并時,就需要判斷第 num-1 頁對應 span 是否空閑,如果空閑則可以將其進行合并,并且合并后還需要繼續向前嘗試進行合并,直到不能進行合并為止。而在向后合并時,就需要判斷第 num+n 頁對應的 span 是否空閑,如果空閑則可以將其進行合并,并且合并后還需要繼續向后嘗試進行合并,直到不能進行合并為止。
??因此 page cache 在合并 span 時,是需要通過頁號獲取到對應的 span 的,這就是我們要把頁號與 span 之間的映射關系存儲到 page cache 的原因
??當我們通過頁號找到其對應的 span 時,這個 span 此時可能掛在 page cache ,也可能掛在 central cache 。而在合并時我們只能合并掛在 page cache 的 span ,因為掛在 central cache 的 span 當中的對象正在被其他線程使用。
??可是我們不能通過 span 結構當中的 _useCount 成員,來判斷某個 span 到底是在 central cache 還是在 page cache 。因為當 central cache 剛向 page cache 申請到一個 span 時,這個 span 的 _useCount 就是等于 0 的,這時可能當我們正在對該 span 進行切分的時候, page cache 就把這個 span 拿去進行合并了,這顯然是不合理的。
??基于上面的分析,我們可以在 span 結構中再增加一個 _isUse 成員,用于標記這個 span 是否正在被使用,而當一個 span 結構被創建時我們默認該 span 是沒有被使用的。
// 管理多個連續頁大塊內存跨度結構
struct Span
{PAGE_ID _pageID = 0; // 大塊內存起始頁的頁號size_t _n = 0; // 頁的數量Span* _next = nullptr; // 雙向鏈表結構Span* _prev = nullptr;size_t _useCount = 0; // 切好的小內存塊,被分配給thread cache的計數void* _freeList = nullptr; // 切好的小塊內存的自由鏈表bool _isUse = false; // 是否在被使用
};
??當 central cache 向 page cache 申請到一個 span 時 (CentralCache::GetOneSpan函數) ,需要立即將該 span 的 _isUse 改為 true
// 獲取一個非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{//...Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));span->_isUse = true; // 設置為truePageCache::GetInstance()->_pageMtx.unlock();//...
}
??當 central cache 將某個 span 還給 page cache 時 (PageCache::ReleaseSpanToPageCache函數) ,也就需要將該 span 的 _isUse 改成 false
// 釋放空閑的span回到PageCache,并合并相鄰的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{//...span->_isUse = false;
}
??由于在合并 page cache 當中的 span 時,需要通過頁號找到其對應的 span ,而一個 span 是在被分配給 central cache 時,才建立的各個頁號與 span 之間的映射關系,因此 page cache 當中的 span 也需要建立頁號與 span 之間的映射關系。
??與 central cache 中的 span 不同的是,在 page cache 中,只需建立一個 span 的首尾頁號與該 span 之間的映射關系。因為當一個 span 在嘗試進行合并時,如果是往前合并,那么只需要通過一個 span 的尾頁找到這個 span ,如果是向后合并,那么只需要通過一個 span 的首頁找到這個 span 。也就是說,在進行合并時我們只需要用到 span 與其首尾頁之間的映射關系就夠了。
??因此當我們申請 k頁 的 span 時,如果是將 n頁 的 span 切成了一個 k頁 的 span 和一個 n-k頁 的 span ,我們除了需要建立 k頁 span 中每個頁與該 span 之間的映射關系之外,還需要建立剩下的 n-k頁 的 span 與其首尾頁之間的映射關系。
// 獲取一個K頁的span(加映射版本)
Span* PageCache::NewSpan(size_t k)
{assert(k > 0 && k < NPAGES); // 保證在桶的范圍內// 1.檢查第k個桶里面有沒有Span,有則返回if (!_spanLists[k].Empty()){Span* kSpan = _spanLists[k].PopFront();// 建立頁號與span的映射,方便central cache回收小塊內存查找對應的spanfor (PAGE_ID i = 0; i < kSpan->_n; i++){_idSpanMap[kSpan->_pageID + i] = kSpan;}return kSpan;}// 2.檢查一下后面的桶里面有沒有span,有則將其切分for (size_t i = k + 1; i < NPAGES; i++){if (!_spanLists[i].Empty()){Span* nSpan = _spanLists[i].PopFront();Span* kSpan = new Span;// 在nSpan的頭部切一個k頁下來kSpan->_pageID = nSpan->_pageID;kSpan->_n = k;// 更新nSpan位置nSpan->_pageID += k;nSpan->_n -= k;// 將剩下的掛到對應映射的位置_spanLists[nSpan->_n].PushFront(nSpan);for (PAGE_ID i = 0; i < kSpan->_n; i++){_idSpanMap[kSpan->_pageID + i] = kSpan;}return kSpan;}}// 3.沒有大頁的span,找堆申請128頁的spanSpan* bigSpan = new Span;void* ptr = SystemAlloc(NPAGES - 1); // 申請128頁內存bigSpan->_pageID = (PAGE_ID)ptr >> PAGE_SHIFT;bigSpan->_n = NPAGES - 1; // 頁數// 將128頁span掛接到128號桶上_spanLists[bigSpan->_n].PushFront(bigSpan);// 遞歸調用自己(避免代碼重復)return NewSpan(k);
}
總結
??本篇難度相當大,后面可能還會有類似難度的文章,大家要好好消化!!