Utils系列之內存池(Fixed size)

內存池核心介紹

廢話不多說,show you code.

我實現了兩套內存池,一個是固定大小的內存池,一個是多重不同大小的內存池。

Fixed size memory pool

在這里插入圖片描述

設計思路:

我們一個個看,首先我們定義了一個chunk, chunk 里面包含了n個block, 每個block都存放著一個data數據和一個next的指針,這個指針指向下一個block,形成一個鏈表。我們分配的時候是以一個個chunk來分配的,我們會把空閑的block 存放在freelist 里面。所以每次分配的時候都會先從freelist 中查看是否有空閑的block, 如果沒有,再分配一個chunk。 但是呢,我們內存池考慮到thread-safe, 又要考慮到性能,所以使用了無鎖操作,也就是原子操作(atomic)。 但是實際下來還是性能比較差,所以進一步優化使用threadcache, 也就是每個thread_local。這樣就保證了每個thread都有自己的一份cache, 每次先分配的內存的時候就先從cache 分配,分配完再從freelist 中取。最后我們會把每個chunk數據push到LockFreeStack 里面管理,避免數據泄露。

LockFreeStack

無鎖棧代碼比較簡單:

  • 我們首先定義鏈表的節點Node,這個類是個模板類型的template, T就是數據的格式,next是指向下個節點的指針。Node的構造函數聲明了一個可變參數模板,允許接受任意數量和類型的參數,并使用std::forward 完美轉發到data成員的構造函數。
struct Node {T data;std::atomic<Node*> next;template<typename... Args>Node(Args&&... args): data(std::forward<Args>(args)...), next(nullptr) {}
};
std::atomic<Node*> head;
  • push 操作:

    • 使用std::move(item)將傳入的item值移動到新節點,避免不必要的復制, 并且利用了Node類中的完美轉發構造函數
    • 使用std::memory_order_relaxed原子加載當前的頭節點, 目前沒有同步需求。
    • 使用頭插法把新節點插入到當前節點的前一個,這里循環條件使用CAS(Compare-And-Swap)操作嘗試更新頭節點
      • compare_exchange_weak會比較headoldNode是否相等
      • 如果相等,則將head設為newNode并返回true,循環結束
      • 如果不相等(可能其他線程修改了head),則將oldNode更新為當前的head值并返回false,循環繼續
    void push(T item) {auto* newNode = new Node(std::move(item));Node* oldNode = head.load(std::memory_order_relaxed);do{newNode->next.store(oldNode, std::memory_order_relaxed);} while(!head.compare_exchange_weak(oldNode, newNode, std::memory_order_release, std::memory_order_relaxed));
    }
    
  • pop操作:

    • 讀取當前棧頂節點指針,使用寬松內存序
    • 如果頭節點為nullptr,表示棧為空,返回false
    • 否則:使用CAS操作原子地更新頭節點:
      • 比較當前頭節點是否仍為oldNode
      • 若相同,將頭節點更新為oldNode->next并返回true
      • 若不同(其他線程已修改頭節點),則更新oldNode為當前頭節點值并返回false
    • 成功更新頭節點后,將原頭節點的數據移動到結果變量
    • 釋放原頭節點內存
    • 返回操作成功
    bool pop(T& result) {Node* oldNode = head.load(std::memory_order_relaxed);if (oldNode) {if (head.compare_exchange_weak(oldNode, oldNode->next.load(std::memory_order_relaxed), std::memory_order_release, std::memory_order_relaxed)) {result = std::move(oldNode->data);delete oldNode;return true;}}return false;
    }
    

    這里我們為什么使用compare_exchange_weak 而不是compare_exchange_strong 呢?

    • compare_exchange_weak允許在某些平臺上"假失敗"(spurious failure),即使比較的值實際相等,但在實現上可能會返回失敗。這種設計讓它能夠:
      • 在某些CPU架構上有更高的性能
      • 避免昂貴的內存柵欄(memory barrier)操作
      • 特別在ARM等架構上實現更高效
    • compare_exchange_strong: 保證只在實際值不等于期望值時返回失敗, 如果失敗需要重試,性能開銷比較大。
    • compare_exchange_weak: 可能出現假失敗,但在循環中使用時,這種差異不影響結果正確性

    LockFreeFixedSizePool

    無鎖的固定大小內存池。

    • 首先看下我們的block的定義,這個結構體包含三個主要部分:

      1. alignas(T)指令:確保Block結構體與T類型有相同的對齊要求,防止潛在的內存對齊問題
      2. data數組
        • 類型:std::byte數組,大小為sizeof(T)
        • 作用:存儲T類型對象的原始內存
        • 布局:位于Block結構體起始位置
      3. next指針
        • 類型:std::atomic<Block*>
        • 作用:指向鏈表中的下一個Block,用于構建無鎖鏈表
        • 初始值:nullptr
      4. as_object()方法
        • 功能:將data數組轉換為T類型指針
        • 實現:使用reinterpret_cast進行類型轉換
        • 返回:指向data內存區域的T*指針
      5. 當對象被釋放時,Block不會被系統回收,而是返回到池中供后續使用。每個Block作為無鎖鏈表的節點,通過next原子指針鏈接。 將data數組放在結構體起始位置,使得T對象指針可直接轉換為Block指針。 當用戶從內存池獲取內存時,用戶拿到的是指向data數組的T*指針。因為data位于Block結構體的起始位置,并且通過alignas(T)保證了對齊要求,所以可以安全地在deallocate時,將用戶的T*指針轉回Block*指針。
      struct alignas(T) Block {std::byte data[sizeof(T)]; // 每個block的大小為Tstd::atomic<Block*> next{nullptr};T* as_object() { return reinterpret_cast<T*>(data);}
      };
      
    • chunk的結構體有兩個成員:

      1. blocks: 一個指向 Block 數組的智能指針,使用 std::unique_ptr<Block[]> 管理內存
      2. count: 表示這個 Chunk 中包含的 Block 數量.
      3. Chunk 實現了批量內存分配策略:
        • 一次性分配多個 Block 對象
        • 默認情況下,N = 4096,單個 Block 大小為 sizeof(Block)
        • 實際分配的 Block 數量由 BLOCK_PER_CHUNK = N / sizeof(Block) 決定
      4. std::unique_ptr<Block[]>Chunk 提供了自動內存管理:
        • Chunk 被銷毀時,blocks 指向的內存會被自動釋放
        • 避免了手動內存管理的復雜性和潛在的內存泄漏
      5. 構造函數接收一個 block_count 參數,并完成兩個操作:
        1. 分配指定數量的 Block 對象:std::make_unique<Block[]>(block_count)
        2. 存儲塊數量:count(block_count)
      struct Chunk { // 每個chunk包含N個blockstd::unique_ptr<Block[]> blocks;size_t count;Chunk(size_t block_count):blocks(std::make_unique<Block[]>(block_count)), count(block_count) {}
      };
      
    • 再看下我們的ThreadCache的結構體的實現。ThreadCache是一個線程局部內的緩存系統,用于優化無鎖內存池的性能。它主要為每個線程提供本地內存緩存,減少全局競爭,提高分配效率。

      • 這里為什么把緩存的block塊設置為32, 并且每次取的大小設置為16。
        • 減少競爭:每次獲取多個塊,減少對全局空閑列表的訪問頻率。
        • 適中開銷: 16個塊是內存和性能的平衡點,既不會一次占用過多內存,又能有效減少同步操作
        • 批量效率:填充緩存時批量獲取更高效,如在后面的fill_local_cache()中使用。
      • 這里我們為什么需要一個pool_instance 的指針。因為我們在內存歸還給全局freelist, 所以這里我們傳遞一個指向對象的指針。
      struct ThreadCache {Block* blocks[32];int count = 0;static constexpr size_t BATCH_SIZE = 16; // 批量獲取塊的數量LockFreeFixedSizePool<T, N>* pool_instance = nullptr;~ThreadCache() {if (count > 0 && pool_instance) {return_thread_cache();}}// 將線程本地緩存歸還全局鏈表void return_thread_cache() {if (count == 0 || !pool_instance) return;//將本地緩存鏈接成鏈表for (int i = 0; i < count - 1; ++i) {blocks[i]->next.store(blocks[i+1], std::memory_order_relaxed);}Block* old_head = pool_instance->free_list.load(std::memory_order_relaxed);Block* new_head = blocks[0];Block* new_tail = blocks[count - 1];do {new_tail->next.store(old_head, std::memory_order_relaxed);} while (!pool_instance->free_list.compare_exchange_weak(old_head, new_head, std::memory_order_release, std::memory_order_relaxed));count = 0;}
      };
      
    • 繼續看下我們的類成員的定義:

      • 我們要把ThreadCache 聲明為thread_local 類型,這樣每個線程都會存在一份緩存
      • free_list 為全局緩存,所有空閑的block塊都會鏈接到這個list 里面。
      • chunks 分配的chunk由LockFreeStack 來管理,避免內存泄露
      • allocate_count 為分配一個block的計數, deallocated_count 為歸還一個block的計數。
      static inline thread_local ThreadCache local_cache;static inline constexpr size_t BLOCK_PER_CHUNK = N / sizeof(Block);std::atomic<Block*> free_list{nullptr};
      LockFreeStack<std::unique_ptr<Chunk>> chunks; 
      std::atomic<size_t> allocate_count{0};
      std::atomic<size_t> deallocated_count{0};
      
    • 分配一個新的chunk.

      • 先使用make_unique分配一個chunk的大小,其中包含BLOCK_PER_CHUNK的block。
      • 獲取block的指針。
      • 這里我們先把block串聯起一個鏈表,然后使用一次原子操作即可。
      • 第一次分配的都是空的,所有把所有的block都放進free_list 里面去。
      • 最后把chunk放到LockFreeStack里面管理。
      void allocate_new_chunk() {auto chunk = std::make_unique<Chunk>(BLOCK_PER_CHUNK);Block* blocks = chunk->blocks.get();// 準備鏈表,只在最后一步進行一次原子操作auto* first_block = &blocks[0];for (size_t i = 0; i < chunk->count - 1; ++i) {blocks[i].next.store(&blocks[i+1], std::memory_order_relaxed);}Block* old_head = free_list.load(std::memory_order_relaxed);do {blocks[chunk->count-1].next.store(old_head, std::memory_order_relaxed);} while(!free_list.compare_exchange_weak(old_head, first_block, std::memory_order_release, std::memory_order_relaxed));//將所有塊鏈接到空閑列表chunks.push(std::move(chunk));
      }
      
    • 然后我們再看看怎么把free_list 里面的數據批量取出來放到thread_cache里面去。

      • 首先獲取這個對象的instance, 如果緩存還有剩余,那么不需要從free_list里面取。
      • 否則檢查free_list 有沒有空閑的塊了,如果沒有分配一個新的chunk.
      • 這里我們直接從free_list 里面取ThreadCache::BATCH_SIZE 大小的block的鏈表,把獲取的塊放到chche的數組里面。
    // 批量獲取塊到本地緩存
    void fill_local_cache(ThreadCache& cache) {cache.pool_instance = this;// 如果本地緩存還有剩余,則直接返回if (cache.count > 0) return;// 從全局空閑列表中獲取塊int batch_size = ThreadCache::BATCH_SIZE;Block* head = nullptr;Block* tail = nullptr;Block* new_head = nullptr;Block* old_head = free_list.load(std::memory_order_acquire);do {// 如果全局鏈表為空,分配新的塊if (!old_head) {allocate_new_chunk();old_head = free_list.load(std::memory_order_acquire);if (!old_head)  return;  // 如果還是仍然為空,返回}head = old_head;Block* current = old_head;int count = 0;while (count < batch_size - 1 && current->next.load(std::memory_order_relaxed)) {current = current->next.load(std::memory_order_relaxed);++count;}tail = current;new_head = tail->next.load(std::memory_order_relaxed);} while(!free_list.compare_exchange_weak(old_head, new_head, std::memory_order_release, std::memory_order_relaxed));// 將獲取到的塊存儲到本地緩存Block* current = head;int i = 0;while (current != new_head) {cache.blocks[i++] = current;current = current->next.load(std::memory_order_relaxed);}cache.count = i;
    }
    
    • 分配函數
      • allocate 是參數是一個可變參數,允許n個參數原封不動傳給T對象。
      • 我們首先獲取數據從cache里面獲取,如果沒有,調用fill_local_cache 緩存cache數據。
      • 如果本地緩存失敗了(一般不會出現,除非沒內存了), 從free_list 分配一個block, 如果free_list 也沒有了,則分配一個新的chunk。 最后返回指向數據的指針,也就是Block的首地址。
    template<typename... Args>
    T* allocate(Args&&... args) {if (local_cache.count == 0) {fill_local_cache(local_cache);}Block* block = nullptr;// 嘗試從本地緩存獲取塊if (local_cache.count > 0) {block = local_cache.blocks[--local_cache.count];} else {// 本地緩存填充失敗,直接從全局獲取單個塊Block* old_block = free_list.load(std::memory_order_acquire);// 先嘗試從空閑列表中獲取一個塊while(old_block) {if (free_list.compare_exchange_weak(old_block, old_block->next.load())) {block = old_block;break;}}// 如果空閑列表為空,則分配一個新的塊if (!block) {allocate_new_chunk();old_block = free_list.load(std::memory_order_acquire);while(old_block) {if (free_list.compare_exchange_weak(old_block, old_block->next.load())) {block = old_block;break;}}}}allocate_count.fetch_add(1);if (block) {// 構造新對象return new (block->as_object()) T(std::forward<Args>(args)...);}return nullptr;
    }
    
    • 最后看下deallocate的函數
      • 釋放的時候先會去調用對象的析構函數
      • 把數據的指針轉為block的指針
      • 這里我們有個策略就是如果本地緩存的數量,有可能歸還超過總的大小,所以我們會預先檢測本地緩存的是否已經大于了容量的80%, 如果已經超過的話,就先把一半的緩存還給free_list 里面,這樣確保cache里面有足夠的空間。
      • 最后再把block 放到cache里面去。
    void deallocate(T* ptr) {if (!ptr) return;ptr->~T();/*Block 結構的第一個成員就是 data 數組as_object() 方法返回的是 data 數組的起始地址通過 alignas(T) 確保 Block 的對齊要求滿足 T 的需求因此,指向 T 對象的指針實際上就是指向 Block 結構體中 data 數組的指針,而 data 數組又在 Block 的起始位置,所以可以安全地將 T 指針轉回 Block 指針。內存布局:Block 對象:+----------------------+| data[sizeof(T)]      | <-- 用戶獲得的 T* 指針指向這里+----------------------+| next                 |+----------------------+*/Block* block = reinterpret_cast<Block*>(ptr);local_cache.pool_instance = this;// 確定本地緩存容量const int cache_capacity = static_cast<int>(sizeof(local_cache.blocks) / sizeof(local_cache.blocks[0]));// 如果本地緩存已近容量的80%,預先歸還一半if (local_cache.count >= cache_capacity * 0.8) {int half_count = local_cache.count / 2;// 構建鏈表for (int i = 0; i < half_count - 1; ++i) {local_cache.blocks[i]->next.store(local_cache.blocks[i + 1], std::memory_order_relaxed);}// 歸還到全局鏈表Block* old_head = free_list.load(std::memory_order_relaxed);Block* cache_head = local_cache.blocks[0];Block* cache_tail = local_cache.blocks[half_count - 1];do {cache_tail->next.store(old_head, std::memory_order_relaxed);} while (!free_list.compare_exchange_weak(old_head, cache_head, std::memory_order_release, std::memory_order_relaxed));// 壓縮剩余緩存for (int i = 0; i < local_cache.count - half_count; ++i) {local_cache.blocks[i] = local_cache.blocks[i + half_count];}local_cache.count -= half_count;}// 現在將新塊添加到本地緩存local_cache.blocks[local_cache.count++] = block;deallocated_count.fetch_add(1, std::memory_order_relaxed);
    }
    
    • 測試結果(和標準的分配器)
      • 可以看到比標準的分配器還是快了一點。PS: 后面有時間可以繼續優化下。

    Standard allocator time: 2450 microseconds
    Fixed size pool time: 1790 microseconds

源代碼: MemoryPool

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/86787.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/86787.shtml
英文地址,請注明出處:http://en.pswp.cn/web/86787.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

ubuntu安裝docker遇到權限問題

問題現象&#xff1a; 使用snap安裝的docker&#xff0c;執行docker build命令構建景象時報錯&#xff1a; [] Building 0.1s (1/1) FINISHED docker:default > [internal] load build definition from Dockerfile 0.0s > > transferring dockerfile: 2B 0.0s ERROR:…

在Linux系統中部署Java項目

1.在Linux中啟動mysql的服務: systemctl start mysql可以采用以下代碼查看狀態: systemctl status mysql如下圖展示綠色代表啟動成功 2.之后進入mysql mysql -uroot -p輸入自己的密碼&#xff0c;這里的密碼不會顯示,直接輸入即可 3.在DG中連接Linux的數據庫 4.修改配置文件…

C++洛谷P1002 過河卒

題目 鏈接&#xff1a;https://www.luogu.com.cn/problem/P1002 解析 這道題適用于了解動態規劃的同學。 變量初始化 初始化B點坐標&#xff08;n, m&#xff09;和馬的坐標&#xff08;a, b&#xff09; 初始化方向數組和動態規劃數組 long long dp[30][30]; int dx[8] …

BlogX項目Go-gin--根據IP獲取地理位置

先定義一個函數來判斷IP地址是否為內網&#xff0c;歸為工具類 // utils/ip/enter.go package ipimport "net"func HasLocalIPAddr(ip string) bool {return HasLocalIP(net.ParseIP(ip)) }// HasLocalIP 檢測 IP 地址是否是內網地址 // 通過直接對比ip段范圍效率更…

鴻蒙系統(HarmonyOS)應用開發之實現瀑布流圖片展示效果

項目概述 科技圖庫是一款基于鴻蒙系統&#xff08;HarmonyOS&#xff09;開發的高品質圖片瀏覽應用&#xff0c;專注于展示精選科技主題圖片。應用采用現代化的瀑布流布局&#xff0c;為用戶提供流暢、直觀的瀏覽體驗&#xff0c;讓科技之美盡收眼底。 主要功能 1. 瀑布流布…

【fish-speech】新模型openaudio-s1-mini嘗鮮

一、配置 顯卡&#xff1a;v100&#xff08;測試簡短語句&#xff0c;顯存實際占用不足6G&#xff09; 二、安裝測試 1. 安裝 1.1 下載源碼 git clone https://github.com/fishaudio/fish-speech.git1.2 安裝系統組件 apt install portaudio19-dev libsox-dev ffmpeg1.3 …

介紹Windows下的由Sysinternals開發的一些小工具

Sysinternals是一個開發了很多Windows下系統工具的公司&#xff0c;這些工具能極大地提高對Windows系統的深入認知。就像它的名字Sys(tem)internals&#xff0c;深入系統里面。這些工具都放在微軟的網站上可以下載到。https://learn.microsoft.com/en-us/sysinternals/ 下載網…

云服務器環境下Linux系統epoll機制與高并發服務器優化實踐

在當今云計算時代&#xff0c;云已成為企業部署高并發服務的首選平臺。本文將深入探討Linux系統核心的epoll機制如何賦能云環境下的高并發服務器&#xff0c;解析其底層工作原理與性能優勢&#xff0c;并對比傳統IO復用模型的差異&#xff0c;幫助開發者構建更高效的云端服務架…

Java爬蟲實戰指南:按關鍵字搜索京東商品

在電商領域&#xff0c;快速獲取商品信息對于市場分析、選品上架、庫存管理和價格策略制定等方面至關重要。京東作為國內領先的電商平臺之一&#xff0c;提供了豐富的商品數據。雖然京東開放平臺提供了官方API來獲取商品信息&#xff0c;但有時使用爬蟲技術來抓取數據也是一種有…

aspose.word在IIS后端DLL中高并發運行,線程安全隔離

aspose.word在IIS后端DLL中運行,加載很慢,如何為全部用戶加載,再每個用戶訪問時在各自線程中直接可以打開WORD文件處理 Aspose.Words 在 IIS 中優化加載性能方案 針對 Aspose.Words 在 IIS 后端 DLL 中加載緩慢的問題&#xff0c;我們可以通過單例模式預加載組件并結合線程安…

鏈表題解——回文鏈表【LeetCode】

一、算法邏輯&#xff08;通順講解每一步思路&#xff09; 我們從 isPalindrome 這個主函數入手&#xff1a; 步驟 1&#xff1a;找到鏈表的中間節點 middleNode 使用 快慢指針法&#xff08;slow 和 fast&#xff09; 快指針一次走兩步&#xff0c;慢指針一次走一步。 當快…

allegro 銅皮的直角邊怎么快速變成多邊形?

像這種&#xff1a; 變成這種&#xff1a; 解決方案&#xff1a; shape edit boundary 點擊鋪銅邊緣就能裁剪

從廚房到代碼臺:用做菜思維理解iOS開發 - Swift入門篇②

從廚房到代碼臺&#xff1a;用做菜思維理解iOS開發 - Swift入門篇② 本章重點? 理解App開發的整體流程熟悉Xcode主界面結構與常用分區跟著步驟動手創建第一個App項目&#xff0c;認識模擬器掌握"打掃廚房"高頻快捷鍵&#xff0c;解決常見疑難雜癥 1、目標 像一個專…

EloqCloud for KV 初體驗:兼容redis的云原生KV數據庫

最近在做一些AI應用的時候&#xff0c;我在想嘗試利用redis的能力緩存一些信息&#xff0c;這使我想去找一個免費的redis來進行使用&#xff0c;在調研的過程中我發現了一款產品EloqCloud for KV可以提供類似的能力&#xff0c;于是嘗試使用了一下&#xff0c;本文記錄了這次體…

企業級路由器技術全解析:從基礎原理到實戰開發

簡介 在當今數字化時代,路由器作為網絡的核心設備,其技術深度與應用廣度直接影響著企業網絡的性能與安全性。本文將全面解析路由器的基礎原理、工作機制以及企業級開發技術,從網絡層尋址到路由協議算法,從安全配置到QoS實現,再到多廠商API開發實戰,旨在幫助網絡工程師和…

day041-web集群架構搭建

文章目錄 0. 老男孩思想-高薪四板斧1. web集群架構圖2. 搭建異地備份服務2.1 服務端-阿里云服務器2.1.1 查看rsync軟件包2.1.2 添加rsync配置文件2.1.3 添加虛擬用戶2.1.4 創建校驗用戶密碼文件2.1.5 創建備份目錄2.1.6 啟動服務2.1.7 開放安全組端口2.1.8 發送檢查郵件 2.2 客…

day44-Django RestFramework(drf)下

1.5 Django RestFramework(下) drf 內置了很多便捷的功能,在接下來的課程中會給大家依次講解下面的內容: 快速上手請求的封裝版本管理認證權限限流序列化視圖條件搜索分頁路由解析器10. 分頁 在查看數據列表的API中,如果 數據量 比較大,肯定不能把所有的數據都展示給用…

機器學習基礎 線性回歸與 Softmax 回歸

機器學習基礎 線性回歸與 Softmax 回歸 文章目錄 機器學習基礎 線性回歸與 Softmax 回歸1. 最小二乘法1.1 數據集定義1.2 最小二乘的矩陣推導1.3 最小二乘的幾何解釋1.4 概率視角下的最小二乘估計 2. 正則化2.1 L1 范數與 L2 范數2.2 正則化的作用2.3 Lasso 回歸的求解2.3.1 L-…

6.27_JAVA_面試(被抽到了)

1.MYSQL支持的存儲引擎有哪些, 有什么區別 ? In-no-DB&#xff08;默認&#xff09;&#xff1a;支持事務安全&#xff08;數據庫運行時&#xff0c;能保證數據的一致性、完整性&#xff09;&#xff0c;支持表行鎖&#xff0c;支持物理和邏輯外鍵。占用磁盤空間大。 MEMORY&…

YOLOv13震撼發布:超圖增強引領目標檢測新紀元

YOLOV13最近發布了&#xff0c;速速來看。 論文標題&#xff1a;YOLOv13&#xff1a;融合超圖增強的自適應視覺感知的實時目標檢測 論文鏈接&#xff1a;https://arxiv.org/pdf/2506.17733 代碼鏈接&#xff1a;https://github.com/iMoonLab/yolov13 話不多說&#xff0c;直…