目錄
前言:
一,thread cache線程局部存儲的實現
問題引入
概念說明
基本使用
thread cache TLS的實現
?二,Central Cache整體的結構框架
大致結構
?span結構
span結構的實現
三,Central Cache大致結構的實現
單例模式
thread cache向Central Cache申請空間的接口
前言:
在上篇文章中,我們完成了thread chche整體結構的設計。以及項目的整體框架也已經有所了解了。
對于該項目,高并發內存池:主要分為三層結構,thread cache,Central Cache以及Page Cache。對于 thread cache,每個線程獨享一個thread cache,申請資源時,優先找對應的thread cache,其中涉及到內存對齊規則的映射。
本篇會用到的知識:TLS線程局部存儲,單例模式,慢開始反饋調節算法。?
一,thread cache線程局部存儲的實現
現在我們已經實現了thread cache的大致結構:申請空間,釋放空間。
問題引入
但是現在還面臨一個問題:
在多線程環境下,如何讓當前的前程 只看到其對應的thread cache??其他線程的無法看到。也就是如何實現每個線程獨享一個 thread cache對象???
這時就需要使用到Thread Local storage(線程局部存儲),簡稱TLS。
概念說明
線程局部存儲(TLS),是一種變量的存儲方法,這個變量在它所在的線程內是全局可以訪問的,但是不能被其他線程訪問到,這樣就保證了數據的線程獨立性。而熟知的全局變量是所有線程都可以訪問的,這樣就不可避免需要鎖來控制,從而增加了控制成本和代碼復雜度。
基本使用
使用到的函數:TlsAlloc
,TlsSetValue
,TlsGetValue
,TlsFree
當然,在使用線程局部存儲時,除了使用上述Windows提供的API函數,還可以使用 Microsoft VC++ 編譯器提供的如下方法定義一個線程局部變量:
__declspec(thread) int g_mydata =1
?示例:
#include <iostream>
#include <Windows.h>
#include <thread>
__declspec(thread) int g_mydata = 1;
void task1()
{while (true){++g_mydata;Sleep(1000);}
}void task2()
{int n = 10;while (n--){std::cout << "g_mydata=" << g_mydata <<",線程ID為:" << std::this_thread::get_id() << std::endl;}
}//TLS線程局部存儲的使用示例
void testTLS()
{std::thread t1(task1);std::thread t2(task2);t1.join();t2.join();
}int main()
{//TestFiedMemoryPool();testTLS();return 0;
}
可以看到,一個線程 在對該數據進行修改時,另一個線程看到的數據不變。這就是線程局部存儲,每個線程只能看到自己對應的數據,不能看到其他線程的。?
thread cache TLS的實現
現在通過TLS,就可以實現 每個線程獨享一個thread cache,并且其他線程 無法獲取到。
//線程局部存儲::TLS機制
//每個線程只能看到自己的thread cahce
__declspec(thread) threadCache* pTLSThreadCache = nullptr;
剛開始,每個線程啟動時,我們都是通過 thread cache對象來進行申請空間,同時釋放空間的。所以,我們可以再增加兩個接口,申請空間,會先找到對應 的thread cache對象,再調用其申請空間的接口。同樣,釋放空間也是如此。代碼如下:
//相當于對thread cache做了一層封裝
//申請size大小的空間
static void* ConsurrentAlloc(size_t size)
{if (pTLSThreadCache == nullptr){pTLSThreadCache = new threadCache;}return pTLSThreadCache->Allocate(size);
}//釋放空間接口
static void ConcurrentDealloc(void* ptr, size_t size)
{assert(pTLSThreadCache);pTLSThreadCache->Deallocate(ptr, size);
}
?二,Central Cache整體的結構框架
Central Cache做為該項目第二層的結構,它起到均衡調度的作用。
大致結構
Central Cache的結構和thread cache的結構相似,也使用哈希桶的設計結構 。
如上圖,Central Cache設計的時候,和thread cache的內存對齊規則是一樣的。
為什么要這樣設計???
- 假設thread cache中下標為n的桶為空時,在向下一層申請的時候,由于Central Cache采用相同的規則,所以此時直接去Cental Cache的下標為n的桶的申請。
- Central? Cache為thread cache分配內存空間,如果同時有多個線程來訪問,由于Central Cache是屬于所有線程,所以每個線程在申請內存空間的時候,就會存在線程安全問題,是需要加鎖的。
- 如果兩個線程訪問的是同一個桶,那么就會存在鎖競爭,一個線程申請完了,才能讓另一個線程申請。
- 但是如果兩個線程訪問的是不同的桶,那么就不會存在鎖競爭,可以認為是這兩個線程是并行申請的,效率就會大大提高。
- 所以,Central Cache是需要加鎖訪問的,但是不是整體進行加鎖的。而每個桶擁有一把鎖,訪問同一個桶時才會 存在鎖的競爭。
?span結構
與thread cache結構不同的是:
- thread cahce中,每個桶的后面掛的是一個個的小內存塊。比如按照4Byte對齊,對應桶中都是一個一個4Byte的內存塊(的地址)。
- 而Central Cache是為每一個thread cache分配空間的,所以他所管理的內存塊更大。每個哈希桶中掛的是一個一個的span。所謂span,就是管理以頁為單位的大塊空間。這里一頁的大小按照8KB計算。
- 一個span,可能包含多個頁,也可能包含一個頁。
span如何管理這大塊內存??
自由鏈表!!!沒錯,仍然是按照自由鏈表的方式。將這一大塊內存,切分成很多個小塊內存,然后使用鏈表的形式組織起來!!!如下圖:
?每個span按照對應的對齊規則,將大塊內存切分成對應的小塊內存,并使用自由鏈表組織起來。
所以,對于一個span,可能包含多個內存塊,也可能分配出去了一部分,剩余一部分,也可能全部都分配出去了,剩余為空 。
那么我們如何可以知道某個span中,分配出去多少內存???
- 所以,在 span結構中,我們需要增加一個變量usecount,來記錄有多少內存塊分配出去了。記錄這個變量的目的是,當span這個結構完全被還回來的時候,我們就可以將它還給下一層了。
- 所以,當上層thread cache申請內存塊的時候,就讓對應span的usecount++。當上層thread cache歸還內存塊的時候,就讓對應span的usecount--。
- 當usecount=0時,說明這個span的內存塊已經全部還回來了。那么此時就可以將該內存塊 返回給下一層 了。
- 當將span向一層返回的時候,在Central Cache中,就需要將對應哈希桶中對應的span刪除。如果哈希桶中的span按照單鏈表的形式存儲,刪除操縱會比較麻煩。所以我們可以設置成雙向鏈表的結構,刪除操作的時間復雜度是O(1)
span結構的實現
通過上述部分,了解到Central Cache的大致框架后,接下來,就是各部分的代碼實現。
要實現Central Cache的結構,首先就是對span結構的實現。
span——管理以頁為單位的大塊內存,每個span包含頁的個數不同,我們需要記錄一個 span有多少頁,變相的就記錄了這大塊內存的大小。同時還需要記錄起始頁號。
這里一頁按照8KB來計算 。
- 如果是在32位環境下,內存大小為2^32,也就是4GB。總頁數=4GB/8KB=2^32/2^13=2^19,大約一共有50多萬頁,使用int 可以存儲。
- 如果是在64位環境 下,內存大小為2^64,總頁數=2^64/2^13=2^51,這時候使用int就存不下了。?
- 為了解決這種問題,可以使用條件編譯,如果是32位環境,使用int。如果是64位,使用long long。
- 但是需要注意的是,在WIN32配置下,_WIN32有定義,_WIN64沒有定義。在_WIN64配置下,_WIN32和_WIN64的定義都有。
//管理以頁為單位的大塊空間
struct span
{size_t _pageID;//該大塊空間的起始頁號size_t _n = 0;//頁的數量span* _next = nullptr;//雙向鏈表的結構span* _prev = nullptr;size_t _usecount = 0;//切好的小塊內存,分配給thread cache的個數void* _freelist = nullptr;//管理切分好的小對象
};//Central Cache的每個哈希桶中保存的是span組成的鏈表
class SpanList
{
public:SpanList(){_head = new span;_head->_next = _head;_head->_prev = _head;}//在指定span前插入一個void Insert(span* pos, span* newspan){assert(pos);assert(newspan);span* prev = pos->_prev;//prev newspan posnewspan->_prev = prev;newspan->_next = pos;pos->_prev = newspan;}//從鏈表中刪除指定的某個spanvoid Erase(span* pos){assert(pos);assert(pos != _head);span* prev = pos->_prev;span* next = pos->_next;prev->_next = next;next->_prev = prev;}
private:span* _head=nullptr;//鏈表的頭指針
public:std::mutex _mtx;//桶鎖
};
三,Central Cache大致結構的實現
Central Cache也是一個哈希桶結構,和thread cache采用一樣的內存對齊規則。每個桶下面掛的是一個一個的span,而每個span內部也有一個 鏈表,掛的是切分好的小塊內存。
單例模式
對于thread cache,它是每個線程獨享的,每個線程只能看到自己的thread cache對象。
對于Central Cache,它是所有線程共享的。我們不希望未來有多個Central Cache,保證整個進程中只有一個Central Cache。所以我們可以通過單例模式來實現。
單例模式是一種設計模式,確保一個類只有一個實例,并提供一個全局訪問點。
//Central Cache的結構和Thread Cache的結構相似
//Central Cache的哈希桶中掛的是一個個的span
//實現成單例模式
class CentralCache
{
public://獲取單例對象static CentralCache* GetInstance(){return &_sInst;}//從 Central Cache獲取一定數量的對象給thread cache//start,end為輸出型參數,n表示希望獲得的內存塊個數,byte_size表示對應的內存塊的大小size_t FetchRangeObj(void*& start, void*& end, size_t n, size_t byte_size);//禁用構造,拷貝構造,賦值重載CentralCache() = delete;CentralCache(const CentralCache&) = delete;CentralCache operator=(const CentralCache&) = delete;
private:SpanList _spanlists[NFREELISTS];static CentralCache _sInst;
};
thread cache向Central Cache申請空間的接口
當某個線程申請內存空間,當對應的桶為空時,需要向Central Cache申請。
比如一個線程來向Central Cache申請8字節的內存,Central Cache一定會分配多個8字節的內存塊。
多余的會讓thread cache保存,下次再申請時,就直接找thread cache,因為訪問thread cache是無所的,申請內存能更快。那么Central Cache應該給返回對少個內存塊???
方法:慢開始反饋調節算法
1,按照申請的內存大小來決定返回多少個內存塊。
如果申請的內存比較小,比如5字節,我們可以多給幾個,比如分配給50字節,返回10個 內u才能塊。如果申請的內存空間比較大,比如256KB,就不能返回的太多,返回2個或者3個內存塊。
所以,當申請的內存塊大小為n時,我們需要知道最多給它分配多少個,也就是它的上限。
2,按照使用內存的是否頻繁,決定返回多少個內存塊
如果給的太多,可能很多都用不上。如果給的太少,可能會導致該線程頻繁的找Central Cache申請內存。
- 線程之所以會找Central Cache申請空間,無疑是thread cache對應桶的內存用完了。
- thread cahce有很多的桶,當頻繁的為某個桶申請內存時,說明這個桶用的很頻繁,我們就一次多給,比如給2倍或者3倍。
- 但是,如何知道一個桶使用的是否頻繁呢???我們可以對每個桶,也就是每個自由鏈表,在自由鏈表中增加一個變量maxSize=1,表示是否頻繁申請。當這個桶第一次向Central Cache申請內存時,就給一塊內存,然后讓這個桶的maxSize+1,下次申請的時候 ,就給2塊,依次類推......也可以將+1換成+2或者+3,這樣增長的速度就會變快。當然,這里不能一直+,會有上限的。
?結合這兩種情況,計算出的結果,取一個最小值,就是最后應該分配的內存塊的個數。
//自由鏈表中頭插一段區間
//start,end
void pushRange(void* start, void* end)
{NextObj(end) = _freelist;_freelist = start;
}
//向Central Cache申請內存
//index表示對應的哈希桶的下標
void* threadCache::FetchMemoryFromCental(size_t index, size_t size)
{//首先計算需要獲取多少個內存塊//慢開始反饋調節算法size_t batchNum = min(SizeClass::NumMoveSize(size), _freelists[index].MaxSize());//保證batchNum不超過上限if (_freelists[index].MaxSize() == batchNum){_freelists[index].MaxSize()++;}void* start = nullptr;void* end = nullptr;//調用Central Cache接口,返回獲取到的內存塊的個數//start和end是輸出型參數,表示 得到的內存塊的起始地址和結束地址//這里actual表示實際得到的內存塊的個數// 因為Central Cache的內存塊可能不夠batchNum個,只是將所有的都返回了size_t actual = CentralCache::GetInstance()->FetchRangeObj(start,end,batchNum,size);assert(actual > 1);//如果只返回了一個內存塊,將該內存塊直接返回給上層使用if (actual == 1){assert(start == nullptr);return start;}else{//先將start+1到end范圍的內存塊,保存在對應的哈希桶中//再將start返回給上層使用_freelists[index].pushRange(NextObj(start), end);return start;}
}
接下來就是要完成 Central Cache給thread cache分配內存的接口了。也就是FetchRangeObj(start,end,batchNum,size)的接口了。
Central Cache的結構如下圖 :每個span管理的是以頁為單位的大塊內存。一頁的大小是8KB。同時每個span內部是切分好的小塊內存,以鏈表的形式管理起來。
?我們現在已經計算出:thread cache找Central Cache申請內存時,Central Cache應該分配batchNum個內存塊給thread cache。
也就是從對應的哈希桶的某個span中切出batchNum個內存塊。但是由于可能之前有多個線程來申請,導致現在有的span為空,有的span有內存塊,但是可能不夠batchNum個。所以我們實際給的個數可能小于期望獲得的個數的。
實現思路:首先找到對應的哈希桶,遍歷spanlist鏈表,找到一個 非空的span。spanlist鏈表是雙向循環帶頭鏈表,為了方便遍歷,我們可以使用類似于迭代器的實際思路,封裝一層。代碼如下:
注意:在查找spanlist獲取一個非空的span時,可能整個spanlist都為空,此時就需要向下一層Page Cache申請。(這部分代碼先不實現,Page Cache實現之后完成該部分)。
span* Begin(){return _head->_next;}span* End(){return _head;}
獲取到span之后,就可以遍歷span中的_freelist,從中申請batchNum個內存塊,如果不夠,有多少申請多少。如下圖所示:
?上述情況是span中內存塊的個數足夠,可能存在不夠的情況,所以在end向后移動 的時候,需要判斷end不能為空。
//從對應的哈希桶,也就是spanlist中,獲取一個非空的span
span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{//...return nullptr;
}//從 Central Cache獲取一定數量的對象給thread cache
//start,end為輸出型參數,n表示希望獲得的內存塊個數,size表示對應的內存塊的大小
//返回值表示實際獲得的內存塊的個數
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t n, size_t size)
{//申請的內存塊大小為size,先找到對應的哈希桶size_t index = SizeClass::Index(size);//多線程可能會訪問同一個桶,需要加鎖_spanlists[index]._mtx.lock();//在對應的桶中找到一個非空的spanspan* sp = CentralCache::GetInstance()->GetOneSpan(_spanlists[index], size);assert(sp);assert(sp->_freelist);//從sp中獲取n個內存塊//start指向第一個內存塊,end指向最后一個內存塊start = sp->_freelist;end = start;//end向后走n-1步,執行最后一個內存塊,但是可能不夠n個,需要判空size_t i = 0;size_t actualNum = 1;//記錄實際獲取到的內存塊的個數while (end!=nullptr&&i < n - 1){end = NextObj(end);i++;}//_freelist指向end的下一個內存塊sp->_freelist = NextObj(end);//將end與下一個內存塊斷開連接NextObj(end) = nullptr;_spanlists[index]._mtx.unlock();//返回實際獲得的內存塊的個數return actualNum;
}
源碼:
ConcurrentMemoryPool · 小鬼/高并發內存池 - 碼云 - 開源中國?
本節完!!!