PoolThreadCache
?在 Netty 的內存池中扮演著線程本地緩存的角色。它的主要目的是減少線程在分配內存時對全局?PoolArena
?的競爭,通過緩存一部分最近釋放的內存塊,使得同一線程后續申請相同規格的內存時能夠快速獲取,從而提高分配效率。
下面我們詳細分析其源碼:
主要成員變量
// ... existing code ...
final class PoolThreadCache {private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);private static final int INTEGER_SIZE_MINUS_ONE = Integer.SIZE - 1;final PoolArena<byte[]> heapArena; // 關聯的堆內存Arenafinal PoolArena<ByteBuffer> directArena; // 關聯的直接內存Arena// 針對不同大小規格 (Small/Normal) 和類型 (Heap/Direct) 的內存區域緩存// Small類型的內存通常來自PoolSubpageprivate final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;// Normal類型的內存通常直接來自PoolChunk的Pageprivate final MemoryRegionCache<byte[]>[] normalHeapCaches;private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;// 分配次數閾值,達到此閾值時觸發trim操作,清理緩存private final int freeSweepAllocationThreshold;// 標記緩存是否已被釋放,防止重復釋放private final AtomicBoolean freed = new AtomicBoolean();@SuppressWarnings("unused") // Field is only here for the finalizer.// 用于在對象被GC回收前,通過finalizer機制嘗試釋放緩存中的資源private final FreeOnFinalize freeOnFinalize;// 當前線程緩存的分配次數,用于配合freeSweepAllocationThresholdprivate int allocations;// TODO: Test if adding padding helps under contention//private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;// ... existing code ...
}
heapArena
?和?directArena
: 分別指向該線程緩存關聯的堆內存池和直接內存池。線程在緩存未命中時,會向這兩個 Arena 申請內存。smallSubPageHeapCaches
,?smallSubPageDirectCaches
,?normalHeapCaches
,?normalDirectCaches
: 這四個數組是核心的緩存存儲結構。它們是?MemoryRegionCache
?類型的數組,MemoryRegionCache
?內部維護了一個隊列,用于存儲緩存的內存塊信息。smallSubPage...Caches
: 用于緩存 "Small" 類型的內存塊。這類內存塊通常小于一個 Page,由?PoolSubpage
?管理。數組的索引對應不同的?elemSize
。normal...Caches
: 用于緩存 "Normal" 類型的內存塊。這類內存塊通常大于等于一個 Page,直接從?PoolChunk
?中分配。數組的索引對應不同的規格大小。
freeSweepAllocationThreshold
: 這是一個重要的參數。當?PoolThreadCache
?的?allocations
?計數達到這個閾值時,會觸發?trim()
?方法,嘗試回收一部分緩存的內存,以避免緩存過多導致內存浪費。freed
: 一個原子布爾值,確保?free()
?方法只被執行一次,防止資源被多次釋放。freeOnFinalize
: 一個內部類實例,如果啟用了?useFinalizer
,當?PoolThreadCache
?對象被垃圾回收時,其?finalize
?方法會被調用,進而調用?PoolThreadCache.free(true)
?來釋放緩存的資源。這是一種兜底機制。allocations
: 記錄從該線程緩存成功分配出去的次數。
構造函數?PoolThreadCache(...)
PoolThreadCache.java
// ... existing code ...PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,int smallCacheSize, int normalCacheSize, int maxCachedBufferCapacity,int freeSweepAllocationThreshold, boolean useFinalizer) {checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;this.heapArena = heapArena;this.directArena = directArena;if (directArena != null) {// 創建直接內存的Small和Normal類型的緩存數組smallSubPageDirectCaches = createSubPageCaches(smallCacheSize, directArena.sizeClass.nSubpages);normalDirectCaches = createNormalCaches(normalCacheSize, maxCachedBufferCapacity, directArena);// 增加Arena中線程緩存的計數directArena.numThreadCaches.getAndIncrement();} else {// No directArea is configured so just null out all cachessmallSubPageDirectCaches = null;normalDirectCaches = null;}if (heapArena != null) {// 創建堆內存的Small和Normal類型的緩存數組smallSubPageHeapCaches = createSubPageCaches(smallCacheSize, heapArena.sizeClass.nSubpages);normalHeapCaches = createNormalCaches(normalCacheSize, maxCachedBufferCapacity, heapArena);// 增加Arena中線程緩存的計數heapArena.numThreadCaches.getAndIncrement();} else {// No heapArea is configured so just null out all cachessmallSubPageHeapCaches = null;normalHeapCaches = null;}// Only check if there are caches in use.// 如果配置了任何緩存,則freeSweepAllocationThreshold必須大于0if ((smallSubPageDirectCaches != null || normalDirectCaches != null|| smallSubPageHeapCaches != null || normalHeapCaches != null)&& freeSweepAllocationThreshold < 1) {throw new IllegalArgumentException("freeSweepAllocationThreshold: "+ freeSweepAllocationThreshold + " (expected: > 0)");}// 根據useFinalizer參數決定是否創建FreeOnFinalize實例freeOnFinalize = useFinalizer ? new FreeOnFinalize(this) : null;}// ... existing code ...
構造函數的主要工作是初始化各個成員變量,特別是根據傳入的參數創建不同類型的?MemoryRegionCache
?數組。
smallCacheSize
,?normalCacheSize
: 分別定義了 Small 類型和 Normal 類型緩存區域中?MemoryRegionCache
?隊列的大小。maxCachedBufferCapacity
: 定義了可以被緩存的 Buffer 的最大容量。超過這個容量的 Buffer 不會被緩存。directArena.sizeClass.nSubpages
: 這個值決定了?smallSubPageDirectCaches
?數組的大小,即支持多少種不同規格的 Small 類型直接內存緩存。directArena.numThreadCaches.getAndIncrement()
: 每當一個?PoolThreadCache
?關聯到一個?PoolArena
?時,會增加?PoolArena
?內部的線程緩存計數器。
createSubPageCaches
PoolThreadCache.java
// ... existing code ...
private static <T> MemoryRegionCache<T>[] createSubPageCaches(int cacheSize, int numCaches) {if (cacheSize > 0 && numCaches > 0) {@SuppressWarnings("unchecked")MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];for (int i = 0; i < cache.length; i++) {// TODO: maybe use cacheSize / cache.length// 顯式類型實參 T 可被替換為 <>cache[i] = new SubPageMemoryRegionCache<>(cacheSize);}return cache;} else {return null;}
}
這個方法用于創建?SubPageMemoryRegionCache
?數組。
numCaches
?通常是?arena.sizeClass.nSubpages
,表示支持的 Small 類型規格數量。每個?SubPageMemoryRegionCache
?實例的內部隊列大小由?cacheSize
?決定。
createNormalCaches
// ... existing code ...
@SuppressWarnings("unchecked")
private static <T> MemoryRegionCache<T>[] createNormalCaches(int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {if (cacheSize > 0 && maxCachedBufferCapacity > 0) {int max = Math.min(area.sizeClass.chunkSize, maxCachedBufferCapacity);// Create as many normal caches as we support based on how many sizeIdx we have and what the upper// bound is that we want to cache in general.List<MemoryRegionCache<T>> cache = new ArrayList<MemoryRegionCache<T>>() ;// 從nSubpages開始,因為之前的sizeIdx是為Small類型保留的// area.sizeClass.sizeIdx2size(idx) <= max 確保只為不超過maxCachedBufferCapacity的規格創建緩存for (int idx = area.sizeClass.nSubpages; idx < area.sizeClass.nSizes &&area.sizeClass.sizeIdx2size(idx) <= max; idx++) {// 顯式類型實參 T 可被替換為 <>cache.add(new NormalMemoryRegionCache<>(cacheSize));}return cache.toArray(new MemoryRegionCache[0]);} else {return null;}
}
這個方法用于創建?NormalMemoryRegionCache
?數組。
它會遍歷?PoolArena
?的?SizeClasses
?中定義的 Normal 類型的規格,但只為那些大小不超過?maxCachedBufferCapacity
?(且不超過?chunkSize
) 的規格創建緩存。
內存分配方法
-
allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int sizeIdx)
-
allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int sizeIdx)
?這兩個方法分別用于分配 Small 和 Normal 類型的內存。它們首先通過?cacheForSmall
?或?cacheForNormal
?找到對應的?MemoryRegionCache
,然后調用通用的?allocate
?方法。 -
allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity)
PoolThreadCache.java
// ... existing code ... @SuppressWarnings({ "unchecked", "rawtypes" }) private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {if (cache == null) {// no cache found so just return false herereturn false;}// 嘗試從指定的MemoryRegionCache分配boolean allocated = cache.allocate(buf, reqCapacity, this);// 如果分配成功,并且總分配次數達到閾值if (++ allocations >= freeSweepAllocationThreshold) {allocations = 0; // 重置計數器trim(); // 執行trim操作,清理緩存}return allocated; }
這是實際執行從緩存分配的邏輯:
1. 如果找不到對應的?MemoryRegionCache
?(例如,該規格的緩存未啟用或請求的?sizeIdx
?超出范圍),則返回?false
。
2. 調用?cache.allocate(buf, reqCapacity, this)
?嘗試從該?MemoryRegionCache
?的隊列中取出一個緩存的?Entry
?并用它初始化?buf
。
3. 如果分配成功 (allocated
?為?true
),則?allocations
?計數器加1。
4. 檢查?allocations
?是否達到?freeSweepAllocationThreshold
。如果是,則將?allocations
?重置為0,并調用?trim()
?方法來清理所有緩存區域中不活躍的條目。
添加到緩存方法?add(...)
PoolThreadCache.java
// ... existing code ...@SuppressWarnings({ "unchecked", "rawtypes" })boolean add(PoolArena<?> area, PoolChunk chunk, ByteBuffer nioBuffer,long handle, int normCapacity, SizeClass sizeClass) {// 根據normCapacity計算sizeIdxint sizeIdx = area.sizeClass.size2SizeIdx(normCapacity);// 根據area類型(Heap/Direct)、sizeIdx和sizeClass(Small/Normal)獲取對應的MemoryRegionCacheMemoryRegionCache<?> cache = cache(area, sizeIdx, sizeClass);if (cache == null) {return false; // 找不到合適的緩存區域}if (freed.get()) { // 如果緩存已被標記為釋放,則不再添加return false;}// 調用MemoryRegionCache的add方法return cache.add(chunk, nioBuffer, handle, normCapacity);}// ... existing code ...
當一個?PooledByteBuf
?被釋放時,如果滿足一定條件(例如,它的大小適合緩存,且其來源的?PoolArena
?允許緩存),PoolArena
?會嘗試調用此方法將其對應的內存塊信息(chunk
,?handle
,?normCapacity
?等)添加到當前線程的?PoolThreadCache
?中。
- 計算?
sizeIdx
。 - 通過?
cache(area, sizeIdx, sizeClass)
?方法定位到具體的?MemoryRegionCache
。 - 如果緩存已被釋放 (
freed.get()
?為?true
),則不添加。 - 調用?
MemoryRegionCache.add(...)
?將內存塊信息封裝成?Entry
?對象并嘗試放入其內部隊列。
緩存檢索方法
cache(PoolArena<?> area, int sizeIdx, SizeClass sizeClass)
: 根據?sizeClass
?(Normal 或 Small) 調用?cacheForNormal
?或?cacheForSmall
。cacheForSmall(PoolArena<?> area, int sizeIdx)
: 判斷?area
?是堆內存還是直接內存,然后從?smallSubPageHeapCaches
?或?smallSubPageDirectCaches
?中獲取緩存。cacheForNormal(PoolArena<?> area, int sizeIdx)
: 類似?cacheForSmall
,但操作的是?normalHeapCaches
?和?normalDirectCaches
。注意這里?idx = sizeIdx - area.sizeClass.nSubpages
,因為?sizeIdx
?是全局的,而 Normal 類型的緩存在數組中的索引需要減去 Small 類型的規格數量。cache(MemoryRegionCache<T>[] cache, int sizeIdx)
: 簡單的數組訪問,并進行邊界檢查。
這些方法共同構成了從緩存數組中定位特定?MemoryRegionCache
?的邏輯。
釋放資源方法?free(boolean finalizer)
PoolThreadCache.java
// ... existing code ...void free(boolean finalizer) {// As free() may be called either by the finalizer or by FastThreadLocal.onRemoval(...) we need to ensure// we only call this one time.// 使用AtomicBoolean確保free操作只執行一次if (freed.compareAndSet(false, true)) {if (freeOnFinalize != null) {// Help GC: this can race with a finalizer thread, but will be null out regardlessfreeOnFinalize.cache = null; // 解除FreeOnFinalize對PoolThreadCache的引用}// 依次釋放所有類型的緩存區域int numFreed = free(smallSubPageDirectCaches, finalizer) +free(normalDirectCaches, finalizer) +free(smallSubPageHeapCaches, finalizer) +free(normalHeapCaches, finalizer);if (numFreed > 0 && logger.isDebugEnabled()) {logger.debug("Freed {} thread-local buffer(s) from thread: {}", numFreed,Thread.currentThread().getName());}// 遞減Arena中的線程緩存計數if (directArena != null) {directArena.numThreadCaches.getAndDecrement();}if (heapArena != null) {heapArena.numThreadCaches.getAndDecrement();}}}private static int free(MemoryRegionCache<?>[] caches, boolean finalizer) {if (caches == null) {return 0;}int numFreed = 0;for (MemoryRegionCache<?> c: caches) {numFreed += free(c, finalizer); // 遍歷釋放數組中的每個MemoryRegionCache}return numFreed;}private static int free(MemoryRegionCache<?> cache, boolean finalizer) {if (cache == null) {return 0;}return cache.free(finalizer); // 調用MemoryRegionCache的free方法}// ... existing code ...
當線程結束或者?PooledByteBufAllocator
?關閉時,會調用此方法來釋放?PoolThreadCache
?中緩存的所有內存塊。
freed.compareAndSet(false, true)
?保證了此方法體內的邏輯只執行一次。finalizer
?參數指示這次釋放是否由 finalizer 機制觸發。如果是,MemoryRegionCache.freeEntry
?的行為會有所不同(主要是為了避免在 finalizer 線程中執行可能導致死鎖或復雜狀態的操作)。- 它會遍歷所有四種緩存數組,并調用每個?
MemoryRegionCache
?實例的?free(finalizer)
?方法,該方法會清空其內部隊列并將所有緩存的?Entry
?代表的內存塊歸還給?PoolArena
。 - 最后,遞減關聯?
PoolArena
?中的?numThreadCaches
?計數。
整理緩存方法?trim()
PoolThreadCache.java
// ... existing code ...void trim() {trim(smallSubPageDirectCaches);trim(normalDirectCaches);trim(smallSubPageHeapCaches);trim(normalHeapCaches);}private static void trim(MemoryRegionCache<?>[] caches) {if (caches == null) {return;}for (MemoryRegionCache<?> c: caches) {trim(c);}}private static void trim(MemoryRegionCache<?> cache) {if (cache == null) {return;}cache.trim(); // 調用MemoryRegionCache的trim方法}// ... existing code ...
當?PoolThreadCache
?的?allocations
?達到?freeSweepAllocationThreshold
?時被調用。它會遍歷所有緩存數組,并調用每個?MemoryRegionCache
?實例的?trim()
?方法。MemoryRegionCache.trim()
?會根據其自身的分配情況和隊列大小,決定是否釋放一部分緩存的?Entry
。
SubPageMemoryRegionCache<T>
?和?NormalMemoryRegionCache<T>
?
這兩個類都繼承自抽象的?MemoryRegionCache<T>
。它們的主要區別在于構造時傳入的?SizeClass
?(Small 或 Normal) 以及它們如何實現?initBuf
?方法:
SubPageMemoryRegionCache.initBuf(...)
?調用?chunk.initBufWithSubpage(...)
,用于從?PoolSubpage
?初始化?PooledByteBuf
。NormalMemoryRegionCache.initBuf(...)
?調用?chunk.initBuf(...)
,用于從?PoolChunk
?的 Page 初始化?PooledByteBuf
。
MemoryRegionCache<T>
?
abstract static class
這是線程緩存的核心數據結構之一,代表特定規格內存的緩存區域。
size
: 緩存隊列的容量,是2的冪次方。queue
:?PlatformDependent.newFixedMpscUnpaddedQueue(this.size)
?創建的一個多生產者單消費者隊列 (MPSC),用于存儲緩存的?Entry<T>
?對象。由于?PoolThreadCache
?是線程本地的,這里的“多生產者”實際上是指其他線程釋放內存并嘗試將內存塊添加到這個線程的緩存中(雖然 Netty 的設計主要是當前線程釋放的內存回到當前線程的緩存),而“單消費者”就是當前線程自己從緩存中分配內存。sizeClass
: 標記這個緩存區域是用于?Small
?還是?Normal
?類型的內存。allocations
: 記錄從這個特定?MemoryRegionCache
?分配出去的次數,用于其自身的?trim()
?邏輯。add(PoolChunk<T> chunk, ...)
: 創建一個新的?Entry
?對象(通過?RECYCLER
?獲取),設置好?chunk
、handle
?等信息,然后嘗試將其加入?queue
。如果隊列已滿 (offer
?返回?false
),則立即回收這個?Entry
?對象。allocate(PooledByteBuf<T> buf, ...)
: 從?queue
?中取出一個?Entry
?(poll
)。如果隊列為空,返回?false
。否則,調用抽象方法?initBuf
?用取出的?Entry
?中的信息來初始化?buf
,然后回收?Entry
?對象,并增加?allocations
?計數。free(int max, boolean finalizer)
: 從隊列中移除最多?max
?個?Entry
,并對每個?Entry
?調用?freeEntry
。trim()
: 計算當前隊列中可以釋放的?Entry
?數量(基于?size - allocations
),然后調用?free(free, false)
?來釋放它們。allocations
?在這里代表了近期從該緩存區域成功分配的次數,如果這個數字遠小于隊列的容量?size
,說明緩存利用率不高,可以進行清理。freeEntry(Entry entry, boolean finalizer)
: 這是將緩存的內存塊真正歸還給?PoolArena
?的地方。它獲取?Entry
?中的?chunk
,?handle
,?nioBuffer
,?normCapacity
。如果不是由 finalizer 觸發,它會先回收?Entry
?對象本身,然后調用?chunk.arena.free(chunk, entry.nioBuffer, handle, normCapacity, this)
?將內存塊歸還給 Arena。如果是 finalizer 觸發,則只歸還內存塊,不立即回收?Entry
?(避免在 finalizer 中操作 Recycler 可能引發的問題)。
Entry<T>
?
一個簡單的 POJO,用于封裝緩存的內存塊信息 (PoolChunk
,?ByteBuffer nioBuffer
,?long handle
,?int normCapacity
)。它通過 Netty 的?Recycler
?進行對象池管理,以減少?Entry
?對象自身的創建和銷毀開銷。
// ... existing code ...private static final ObjectPool<Entry<?>> RECYCLER = ObjectPool.newPool(new ObjectCreator<Entry<?>>() {@SuppressWarnings("unchecked")@Overridepublic Entry<?> newObject(Handle<Entry<?>> handle) {return new Entry(handle);}});static final class Entry<T> {final EnhancedHandle<Entry<?>> recyclerHandle; // Recycler的句柄PoolChunk<T> chunk;ByteBuffer nioBuffer; // 緩存的NIO ByteBuffer,主要用于Direct Bufferlong handle = -1; // 內存塊在PoolChunk中的句柄int normCapacity; // 規格化容量Entry(Handle<Entry<?>> recyclerHandle) {this.recyclerHandle = (EnhancedHandle<Entry<?>>) recyclerHandle;}void recycle() {chunk = null;nioBuffer = null;handle = -1;recyclerHandle.recycle(this);}// "Unguarded" version of recycle() that must only be used when we are sure that the Entry is not double-recycled.// This is the case when we obtained the Entry from the queue and add it to the cache again.void unguardedRecycle() {chunk = null;nioBuffer = null;handle = -1;recyclerHandle.unguardedRecycle(this);}}@SuppressWarnings("rawtypes")private static Entry newEntry(PoolChunk<?> chunk, ByteBuffer nioBuffer, long handle, int normCapacity) {Entry entry = RECYCLER.get(); // 從Recycler獲取Entry對象entry.chunk = chunk;entry.nioBuffer = nioBuffer;entry.handle = handle;entry.normCapacity = normCapacity;return entry;}
// ... existing code ...
FreeOnFinalize
?
一個簡單的包裝類,其?finalize()
?方法會調用?PoolThreadCache.free(true)
。這是為了在?PoolThreadCache
?對象本身被 GC 回收時,能夠嘗試釋放其占用的緩存資源,作為一種安全保障。
完全移動到 Java9+ 后, 會使用? java.lang.ref.Cleaner
// ... existing code ...
// Used to free the cache via a finalizer. This is just a best effort and should only be used if the
// ThreadLocal is not removed via FastThreadLocal.onRemoval(...) as this is the preferred way to free the cache.
private static final class FreeOnFinalize {private PoolThreadCache cache;FreeOnFinalize(PoolThreadCache cache) {this.cache = cache;}@Overrideprotected void finalize() throws Throwable {try {super.finalize();} finally {PoolThreadCache cache = this.cache;// this can race with a non-finalizer thread calling free: regardless who wins, the cache will be// null outthis.cache = null;if (cache != null) {// We must only call free if the cache was not null before, which means it was not freed before// by an explicit call to PoolThreadCache.free().//// We must use true as parameter which indicates that we were called from a finalizer.cache.free(true);}}}
}
}
總結
PoolThreadCache
?通過精心設計的緩存結構和回收策略,有效地提升了 Netty 內存分配的性能。它利用線程本地性避免了鎖競爭,并通過?MemoryRegionCache
?對不同規格的內存進行細粒度管理。
freeSweepAllocationThreshold
?和?trim
?機制確保了緩存在提供性能優勢的同時,不會無限制地消耗內存。內部類如?MemoryRegionCache
?和?Entry
?的設計,以及?Recycler
?的使用,都體現了 Netty 對性能和資源管理的極致追求。