?個人主頁:?熬夜學編程的小林
💗系列專欄:?【C語言詳解】?【數據結構詳解】【C++詳解】【Linux系統編程】【Linux網絡編程】【項目詳解】
目錄
1、pagecache
1.1、整體設計
1.2、核心實現
1.3、獲取Span
1.3.1、獲取一個非空的Span
1.3.2、獲取一個K頁的span
1.3.3、加鎖版獲取非空span?
1、pagecache
1.1、整體設計
page cache 與 central cache結構的相同之處
central cache的結構與thread cache 是相同的,都是哈希桶結構,并且page cache的每個哈希桶中掛的也是一個個的span,這些span也是按照雙鏈表的結構鏈接起來的。
page cache 與 central cache結構的不同之處
1、page cache與 central cache的映射規則不同(包括thread cache)。page cache的哈希桶映射規則采用的是直接定址法,比如1號桶掛的都是1頁的span,2號桶掛的都是2頁的span,一次類推。
2、central cache每個桶中的span被切成一個個對應大小的對象,以供thread cache申請。而page cache當中的span是沒有被進一步切小的,因為page cache服務的是central cache,當central cache沒有span時,向page cache申請的是某一固定頁數的span,而如何切分申請到的這個span由central cache自己來決定。
page cache當中究竟有多少個桶,這就要看你最大想掛多大頁的span了,我們這里就最大掛128頁的span,為了讓桶號與頁號對應起來,我們可以將第0號桶空出來,因此我們需要將哈希桶的個數設置成129。
// page cache中哈希桶的個數
static const size_t NPAGES = 129;
為什么這里最大掛128頁的span呢?
因為線程申請單個對象最大是256KB,而128頁可以被切成4個256KB的對象,因此是足夠的。當然,如果你想在page cache中掛更大的span也是可以的,根據具體的需求進行設置即可。
在page cache中獲取一個n頁的span
1、如果central cache要獲取一個n頁的span,那我們就可以在page cache的第n號桶中取出一個span返回給central cache即可。
2、如果第n號桶中沒有span,這時我們并不是直接向堆申請一個n頁的span,而是繼續在后面的桶中尋找span。
- 當第n號桶中沒有span時,我們可以找第n+1號桶,因為我們可以將第n+1頁的span分成一個n頁的span和一個1頁的span,這時我們就可以將n頁的span返回,而將切分后的1頁掛接到1號桶中。
- 當后面的桶當中都沒有span,這時我們就只能向堆申請一個128頁的內存塊,并將其用一個span結構管理起來,然后將128頁的span切分成n頁的span和128-n頁的span,其中n頁的span返回給central cache,而128-n頁的span掛到第128-n號桶中。
- 注意:當我們直接向堆申請以頁為單位的內存時,我們應該盡量申請大塊一點的內存塊,因為此時申請到的內存是連續的,當線程需要內存時我們可以將其切小后分配給線程,當線程將內存釋放后我們又可以將其合并成大塊連續的內存。?
實際上,我們每次向堆申請的都是128頁大小的內存塊,central cache要的這些span實際都是由128頁的span切分出來的。
1.2、核心實現
1、當每個線程的thread cache沒有內存時都會向central cache申請,此時多個線程的thread cache如果訪問的不是central cache的同一個桶,那么這些線程是可以同時進行訪問的。這時central cache的多個桶可能同時向page cache申請內存,索引page cache也是存在線程安全問題的,因此在訪問page cache時也必須要加鎖。
- 但是page cache這里不能使用桶鎖,因為當central cache向page cache申請內存時,page cache可能會將其他桶當中大頁的span切小再給central cache。此外,當central cache將某個span歸還給page cache時,page cache也會嘗試將該span與其他桶當中的span進行合并。
- 換句話說,在訪問page cache時,我們可能需要訪問page cache中的多個桶,如果page cache用桶鎖就會出現大量頻繁的加鎖和解鎖,導致程序的效率低下。因此我們在訪問page cache時沒有使用桶鎖,而是用一把大鎖將整個page cache鎖住。
2、page cache在整個進程中只能存在一個,因此我們也需要將其設置為單例模式。
// 單例模式
class PageCache
{
public:// 提供一個全局訪問點static PageCache* GetInstance(){return &_sInst;}std::mutex _pageMtx; // 大鎖,設置公有方便調用
private:SpanList _spanLists[NPAGES];// C++98,構造函數私有PageCache(){}// C++11,禁止拷貝PageCache(const PageCache&) = delete;static PageCache _sInst;
};
當程序運行起來后我們就立馬創建該對象即可。?
// 類外初始化靜態成員
PageCache PageCache::_sInst;
1.3、獲取Span
1.3.1、獲取一個非空的Span
thread cache向central cache申請對象時,central cache需要先從對應的哈希桶中獲取到一個非空的Span,然后從這個非空的Span中取出指定大小的對象返回給thread cache。那central cache到底是如何從對應的哈希桶中獲取到一個非空的Span的呢?
1、遍歷central cache對應哈希桶中的雙鏈表,如果該雙鏈表中有非空的Span,那么直接將Span返回即可。為了方便遍歷這個雙鏈表,我們可以模擬迭代器的方式,給SpanList類提供Begin和End成員函數,分別獲取雙鏈表的第一個Span結點和最后一個Span結點的下一個結點,也就是頭結點。
// 管理Span的鏈表結構
class SpanList
{
public:Span* Begin(){return _head->_next;}Span* End(){return _head;}
private:Span* _head; // 雙向鏈表頭結點指針
public: std::mutex _mtx; // 桶鎖,方便類外訪問
};
2、但如果遍歷雙鏈表后發現雙鏈表中沒有span,或該雙鏈表中的span都為空,那么此時central cache就需要向page cache申請內存塊了。
那具體是向page cache申請多大的內存塊呢?
我們可以根據具體所需對象的大小來決定,就像之前我們根據對象的大小計算出,thread cache一次向central cache申請對象的個數上限,現在我們是根據對象的大小計算出,central cache一次應該向page cache申請幾頁的內存塊。
我們可以先根據對象的大小計算出,thread cache一次向central cache申請對象的個數上限,然后將這個上限值乘以單個對象的大小,就算出了具體需要多少字節,最后再將這個算出來的字節數轉換為頁數,如果轉換后不夠一頁,那么我們就申請一頁,否則轉換出來是幾頁就申請幾頁。也就是說,central cache向page cache申請內存時,要求申請到的內存盡量能夠滿足thread cache向central cache申請時的上限。
class SizeClass
{
public: // central cache一次向page cache獲取多少頁static size_t NumMovePage(size_t size){// 計算出thread cache一次向central cache申請對象的個數上限size_t num = NumMoveSize(size); // num 個size 大小的對象所需的字節數size_t nPage = num * size;nPage >>= PAGE_SHIFT; // 將字節數轉換為頁數if (nPage == 0)nPage = 1; // 至少給一頁return nPage;}
};
- PAGE_SHIFT代表頁大小轉換偏移,我們這里以頁的大小為8K為例,PAGE_SHIFT的值就是13。
// 頁大小轉換偏移,即一頁定義為2^13,也就是8KB
static const size_t PAGE_SHIFT = 13;
- 注意:當central cache申請到若干頁的span后,還需要將這個span切成一個個對應大小的對象掛到該span的自由鏈表當中。
如何找到一個span所管理的內存塊呢?
首先需要計算出該span的起始地址,我們可以用這個span的起始頁號乘以一頁的大小即可得到這個span的起始地址,然后用這個span的頁數乘以一頁的大小就可以得到這個span所管理的內存塊的大小,用起始地址加上內存塊的大小即可得到這塊內存塊的結束位置。
明確了這塊內存的起始和結束位置后,我們就可以進行切分了。根據所需對象的大小,每次從大塊內存切出一塊固定大小的內存塊尾插到span的自由鏈表中即可。
為什么是尾插呢?
因為我們如果是將切好的對象尾插到自由鏈表,這些對象看起來是按照鏈式結構鏈接起來的,而實際它們在物理上是連續的,這時當我們把這些連續內存分配給某個線程使用時,可以提高該線程的CPU緩存利用率。
// 獲取一個非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{// 1.先在list中尋找非空的SpanSpan* it = list.Begin();while (it != list.End()){if (it->_freeList != nullptr){return it;}else{it = it->_next;}}// 2.list中沒有非空的Span,只能向page cache申請Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));// 3.計算span大塊內存的其實地址和大塊內存的大小(字節數)char* start = (char*)(span->_pageID << PAGE_SHIFT); // 起始地址size_t bytes = span->_n << PAGE_SHIFT; // 內存大小char* end = start + bytes; // 結束地址// 4.把大塊內存切成自由鏈表鏈接起來// a.先切一小塊做頭,方便尾插(定義一個尾結點指針)span->_freeList = start;start += size;void* tail = span->_freeList;int i = 1; // 用于調試while (start < end){++i;Next(tail) = start;tail = Next(tail); // tail = startstart += size;}Next(tail) = nullptr; // 將尾結點指向空// b.切好Span以后,將切好的Span掛到桶里面(頭插)list.PushFront(span);return span;//return nullptr;
}
1.在list中尋找非空的Span?
2.申請內存和計算起始地址
3.把大塊內存切成自由鏈表鏈接起來?
注意:當我們把span切分好之后,需要將這個切分好的span掛接到central cache對應的哈希桶中。因此SpanList類需要提供一個將span插入到雙鏈表的接口。此處我們選擇頭插,這樣當central cache下一次從該雙鏈表獲取非空span時,一來就能找到。
由于SpanList類已經實現了Insert和Begin函數,這里實現雙鏈表的頭插直接在雙鏈表的Begin位置進行Insert(在頭結點插入數據)即可。
// 管理Span的鏈表結構
class SpanList
{
public:// 頭插Spanvoid PushFront(Span* span){Insert(Begin(), span); // Begin之前插入span}
private:Span* _head; // 雙向鏈表頭結點指針
public: std::mutex _mtx; // 桶鎖,方便類外訪問
};
1.3.2、獲取一個K頁的span
當我們調用上述的GetOneSpan從central cache的某個哈希桶獲取一個非空的span時,如果遍歷哈希桶中的雙鏈表后發現雙鏈表中沒有span,或該雙鏈表中的span都為空,那么此時central cache就需要向page cache申請若干頁的span了,下面我們來分析如何從page cache獲取一個k頁的span。
1、因為page cache是直接按照頁數進行映射的,因此我們要從page cache獲取一個k頁的span,就應該 直接先去找page cache的第k號桶, 如果第k號桶中有span,那我們直接頭刪一個span返回給central cache就行了。所以我們這里需要再給 SpanList類添加對應的Empty和PopFront函數。
// 管理Span的鏈表結構
class SpanList
{
public:// 判斷是否為空bool Empty(){return _head == _head->_next;}// 頭刪SpanSpan* PopFront(){Span* front = _head->_next;Erase(front);return front;}
private:Span* _head; // 雙向鏈表頭結點指針
public: std::mutex _mtx; // 桶鎖,方便類外訪問
};
?2、如果page cache的第k號桶中沒有span,我們就應該繼續找后面的桶,只要后面任意一個桶中有一個n頁的span,我們就可以將其切分成一個k頁的span和一個n-k頁的span,然后將切出來k頁span返回給central cache,再將n-k頁的span掛接到page cache的第n-k號桶中。
3、如果后面的桶中也都沒有span,此時我們需要向堆申請一個128頁的span了,在向堆申請內存時,直接調用我們封裝的SystemAlloc函數即可。
注意:向堆申請內存后得到的是這塊內存的起始地址,此時我們需要將該地址轉換為頁號。由于我們向堆申請內存時都是按照頁進行申請的,因此我們直接將該地址除以一頁的大小即可得到對應的頁號。
// 獲取一個K頁的span
Span* PageCache::NewSpan(size_t k)
{assert(k > 0 && k < NPAGES); // 保證在桶的范圍內// 1.檢查第k個桶里面有沒有Span,有則返回if (!_spanLists[k].Empty()){return _spanLists[k].PopFront();}// 2.檢查一下后面的桶里面有沒有span,有則將其切分for (size_t i = k + 1; i < NPAGES; i++){if (!_spanLists[i].Empty()){Span* nSpan = _spanLists[i].PopFront();Span* kSpan = new Span;// 在nSpan的頭部切一個k頁下來kSpan->_pageID = nSpan->_pageID;kSpan->_n = k;// 更新nSpan位置nSpan->_pageID += k;nSpan->_n -= k;// 將剩下的掛到對應映射的位置_spanLists[nSpan->_n].PushFront(nSpan);return kSpan;}}// 3.沒有大頁的span,找堆申請128頁的spanSpan* bigSpan = new Span;void* ptr = SystemAlloc(NPAGES - 1); // 申請128頁內存bigSpan->_pageID = (PAGE_ID)ptr >> PAGE_SHIFT;bigSpan->_n = NPAGES - 1; // 頁數_spanLists[bigSpan->_n].PushFront(bigSpan);// 遞歸調用自己(避免代碼重復)return NewSpan(k);
}
1.第k個桶中有Span
2.第k個桶后面有Span?
3.沒有大頁的span,找堆申請128頁的span?
說明:當我們向堆申請到128頁的span后,需要將其切分成k頁的span和128-k頁的span,但是為了盡量避免出現重復的代碼,此處最好不要再編寫對應的切分代碼。我們可以先將申請到的128頁的span掛接到page cache對應的哈希桶中,然后再遞歸調用該函數即可,此時在往后找span時就一定會在第128號桶找到該span,然后進行切分。
有一個問題:當central cache向page cache申請內存時,central cache對應的哈希桶是處于加鎖的狀態的,那在訪問page cache之前我們應不應該把central cache對應的桶鎖解掉呢?
這里建議在訪問page cache前,先把central cache對應的桶鎖解掉。雖然此時central cache的這個桶當中是沒有內存供其他thread cache申請的,但thread cache除了申請內存還會釋放內存,如果在訪問page cache前將central cache對應的桶鎖解掉,那么此時當其他thread cache想要歸還內存到central cache的這個桶時就不會被阻塞。
因此在調用NewSpan函數之前,我們需要先將central cache對應的桶鎖解掉,然后再將page cache的大鎖加上,當申請到k頁的span后,我們需要將page cache的大鎖解掉,但此時我們不需要立刻獲取到central cache中對應的桶鎖。因為central cache拿到k頁的span后還會對其進行切分操作,因此我們可以在span切好后需要將其掛到central cache對應的桶上時,再獲取對應的桶鎖。
1.3.3、加鎖版獲取非空span?
// 獲取一個非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{// 1.先在list中尋找非空的SpanSpan* it = list.Begin();while (it != list.End()){if (it->_freeList != nullptr){return it;}else{it = it->_next;}}// 先把central cache的桶鎖解掉,這樣如果其他線程釋放內存對象回來,不會阻塞list._mtx.unlock();// 2.list中沒有非空的Span,只能向page cache申請PageCache::GetInstance()->_pageMtx.lock();Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));PageCache::GetInstance()->_pageMtx.unlock();// 3.計算span大塊內存的其實地址和大塊內存的大小(字節數)char* start = (char*)(span->_pageID << PAGE_SHIFT); // 起始地址size_t bytes = span->_n << PAGE_SHIFT; // 內存大小char* end = start + bytes; // 結束地址// 4.把大塊內存切成自由鏈表鏈接起來// a.先切一小塊做頭,方便尾插(定義一個尾結點指針)span->_freeList = start;start += size;void* tail = span->_freeList;int i = 1; // 用于調試while (start < end){++i;Next(tail) = start;tail = Next(tail); // tail = startstart += size;}Next(tail) = nullptr; // 將尾結點指向空// b.切好Span以后,將切好的Span掛到桶里面(頭插),再加鎖list._mtx.lock();list.PushFront(span);return span;//return nullptr;
}