緩存之美:萬文詳解 Caffeine 實現原理(上)

由于社區最大字數限制,本文章將分為兩篇,第二篇文章為緩存之美:萬文詳解 Caffeine 實現原理(下)


大家好,我是 方圓。文章將采用“總-分-總”的結構對配置固定大小元素驅逐策略的 Caffeine 緩存進行介紹,首先會講解它的實現原理,在大家對它有一個概念之后再深入具體源碼的細節之中,理解它的設計理念,從中能學習到用于統計元素訪問頻率的 Count-Min Sketch 數據結構、理解內存屏障和如何避免緩存偽共享問題、MPSC 多線程設計模式、高性能緩存的設計思想和多線程間的協調方案等等,文章最后會對全文內容進行總結,希望大家能有所收獲的同時在未來對本地緩存選型時提供完整的理論依據。

Caffeine 緩存原理圖如下:

在這里插入圖片描述

它使用 ConcurrentHashMap 保存數據,并在該數據結構的基礎上創建了窗口區、試用區和保護區,用于管理元素的生命周期,各個區的數據結構是使用了 LRU 算法的雙端隊列,隨著緩存的命中率變化,窗口區和保護區大小會自動調節以適應當前訪問模式。在對元素進行驅逐時,使用了 TinyLFU 算法,會優先將頻率低的元素驅逐,訪問頻率使用 Count-Min Sketch 數據結構記錄,它能在保證較高準確率(93.75%)的情況下占用較少內存空間。讀、寫操作分別會向 ReadBufferWriteBuffer 中添加“讀/寫后任務”,這兩個緩沖區的設計均采用了 MPSC 多生產者單消費者的多線程設計模式。緩沖區中任務的消費由維護方法 maintenancedrainReadBufferdrainWriteBuffer 實現,維護方法通過添加同步鎖,保證任務只由單線程執行,這種設計參考了 WAL(Write-Ahead Logging)思想,即:先寫日志,再執行操作,先把操作記錄在緩沖區,然后在合適的時機異步、批量地執行緩沖區中的任務。維護方法除了這些作用外,還負責元素在各個分區的移動、頻率的更新、元素的驅逐等操作。

接下來的源碼分析以如下測試用例為例:先分析構造方法,了解緩存初始化過程中創建的重要數據結構和關鍵字段,然后再深入添加元素的方法(put),該方法相對復雜,也是 Caffeine 緩存的核心,理解了這部分內容,文章剩余的內容理解起來會非常容易,接著分析獲取元素的方法(getIfPresent),最后再回到核心的維護方法 maintenance 中,這樣便基本理解了 Caffeine 緩存的運行原理,需要注意的是,因為我們并未指定緩存元素的過期時間,所以與此相關的內容如時間過期策略和時間輪等內容不會專門介紹。

public class TestReadSourceCode {@Testpublic void doRead() {// read constructorCache<String, String> cache = Caffeine.newBuilder().maximumSize(10_000).build();// read putcache.put("key", "value");// read getcache.getIfPresent("key");}}

constructor

Caffeine 的實現類區分了 BoundedLocalManualCacheUnboundedLocalManualCache,見名知意它們分別為“有邊界”的和“無邊界”的緩存。Caffeine#isBounded 方法詮釋了“邊界”的含義:

public final class Caffeine<K, V> {static final int UNSET_INT = -1;public <K1 extends K, V1 extends V> Cache<K1, V1> build() {// 校驗參數requireWeightWithWeigher();requireNonLoadingCache();@SuppressWarnings("unchecked")Caffeine<K1, V1> self = (Caffeine<K1, V1>) this;return isBounded()? new BoundedLocalCache.BoundedLocalManualCache<>(self): new UnboundedLocalCache.UnboundedLocalManualCache<>(self);}boolean isBounded() {// 指定了最大大小;指定了最大權重return (maximumSize != UNSET_INT) || (maximumWeight != UNSET_INT)// 指定了訪問后過期策略;指定了寫后過期策略|| (expireAfterAccessNanos != UNSET_INT) || (expireAfterWriteNanos != UNSET_INT)// 指定了自定義過期策略;指定了 key 或 value 的引用級別|| (expiry != null) || (keyStrength != null) || (valueStrength != null);}
}

也就是說,當為緩存指定了上述的驅逐或過期策略會定義為有邊界的 BoundedLocalManualCache 緩存,它會限制緩存的大小,防止內存溢出,否則為無邊界的 UnboundedLocalManualCache 類型,它沒有大小限制,直到內存耗盡。我們以創建配置了固定大小的緩存為例,它對應的類型便是 BoundedLocalManualCache,在執行構造方法時,有以下邏輯:

abstract class BoundedLocalCache<K, V> extends BLCHeader.DrainStatusRefimplements LocalCache<K, V> {// ...static class BoundedLocalManualCache<K, V> implements LocalManualCache<K, V>, Serializable {private static final long serialVersionUID = 1;final BoundedLocalCache<K, V> cache;BoundedLocalManualCache(Caffeine<K, V> builder) {this(builder, null);}BoundedLocalManualCache(Caffeine<K, V> builder, @Nullable CacheLoader<? super K, V> loader) {cache = LocalCacheFactory.newBoundedLocalCache(builder, loader, /* async */ false);}}
}

BoundedLocalCache 為抽象類,緩存對象的實際類型都是它的子類。它在創建時使用了反射并遵循簡單工廠的編碼風格:

interface LocalCacheFactory {static <K, V> BoundedLocalCache<K, V> newBoundedLocalCache(Caffeine<K, V> builder,@Nullable AsyncCacheLoader<? super K, V> cacheLoader, boolean async) {var className = getClassName(builder);var factory = loadFactory(className);try {return factory.newInstance(builder, cacheLoader, async);} catch (RuntimeException | Error e) {throw e;} catch (Throwable t) {throw new IllegalStateException(className, t);}}
}

getClassName 方法非常有意思,它會根據緩存配置的屬性動態拼接出實際緩存類名:

interface LocalCacheFactory {static String getClassName(Caffeine<?, ?> builder) {var className = new StringBuilder();// key 是強引用或弱引用if (builder.isStrongKeys()) {className.append('S');} else {className.append('W');}// value 是強引用或弱引用if (builder.isStrongValues()) {className.append('S');} else {className.append('I');}// 配置了移除監聽器if (builder.removalListener != null) {className.append('L');}// 配置了統計功能if (builder.isRecordingStats()) {className.append('S');}// 不同的驅逐策略if (builder.evicts()) {// 基于最大值限制,可能是最大權重W,也可能是最大容量SclassName.append('M');// 基于權重或非權重if (builder.isWeighted()) {className.append('W');} else {className.append('S');}}// 配置了訪問過期或可變過期策略if (builder.expiresAfterAccess() || builder.expiresVariable()) {className.append('A');}// 配置了寫入過期策略if (builder.expiresAfterWrite()) {className.append('W');}// 配置了刷新策略if (builder.refreshAfterWrite()) {className.append('R');}return className.toString();}
}

這也就是為什么能在 com.github.benmanes.caffeine.cache 包路徑下能發現很多類似 SSMS 只有簡稱命名的類的原因(下圖只截取部分,實際上有很多):

在這里插入圖片描述

根據代碼邏輯,它的命名遵循如下格式 S|W S|I [L] [S] [MW|MS] [A] [W] [R] 其中 [] 表示選填,| 表示某配置不同選擇的分隔符,結合注釋能清楚的了解各個位置字母簡稱表達的含義。如此定義實現類使用了 多級繼承,盡可能多地復用代碼。

以我們測試用例中創建的緩存類型為例,它對應的實現類為 SSMS,表示 key 和 value 均為強引用,并配置了非權重的最大緩存大小限制,類圖關系如下:

在這里插入圖片描述

雖然在一些軟件設計相關的書籍中強調“多用組合,少用繼承”,但是這里使用多級繼承我覺得并沒有增加開發者的理解難度,反而了解了它的命名規則后,能更清晰的理解各個緩存所表示的含義,更好地實現代碼復用。

執行 SSMS 的構造方法會有以下邏輯:

// 1
abstract class BoundedLocalCache<K, V> extends BLCHeader.DrainStatusRefimplements LocalCache<K, V> {static final int WRITE_BUFFER_MIN = 4;static final int WRITE_BUFFER_MAX = 128 * ceilingPowerOfTwo(NCPU);static final long MAXIMUM_CAPACITY = Long.MAX_VALUE - Integer.MAX_VALUE;static final double PERCENT_MAIN = 0.99d;static final double PERCENT_MAIN_PROTECTED = 0.80d;static final double HILL_CLIMBER_STEP_PERCENT = 0.0625d;final @Nullable RemovalListener<K, V> evictionListener;final @Nullable AsyncCacheLoader<K, V> cacheLoader;final MpscGrowableArrayQueue<Runnable> writeBuffer;final ConcurrentHashMap<Object, Node<K, V>> data;final PerformCleanupTask drainBuffersTask;final Consumer<Node<K, V>> accessPolicy;final Buffer<Node<K, V>> readBuffer;final NodeFactory<K, V> nodeFactory;final ReentrantLock evictionLock;final Weigher<K, V> weigher;final Executor executor;final boolean isAsync;final boolean isWeighted;protected BoundedLocalCache(Caffeine<K, V> builder,@Nullable AsyncCacheLoader<K, V> cacheLoader, boolean isAsync) {// 標記同步或異步this.isAsync = isAsync;// 指定 cacheLoader this.cacheLoader = cacheLoader;// 指定用于執行驅逐元素、刷新緩存等任務的線程池,不指定默認為 ForkJoinPool.commonPool()executor = builder.getExecutor();// 標記是否定義了節點計算權重的 Weigher 對象isWeighted = builder.isWeighted();// 同步鎖,在接下來的內容中會看到很多標記了 @GuardedBy("evictionLock") 注解的方法,表示這行這些方法時都會獲取這把同步鎖// 根據該鎖的命名,eviction 表示驅逐的意思,也就是說關注驅逐策略執行的方法都要獲取該鎖,這一點需要在后文中注意evictionLock = new ReentrantLock();// 計算元素權重的對象,不指定為 SingletonWeigher.INSTANCEweigher = builder.getWeigher(isAsync);// 執行緩存 maintenance 方法的任務,在后文中具體介紹drainBuffersTask = new PerformCleanupTask(this);// 創建節點的工廠nodeFactory = NodeFactory.newFactory(builder, isAsync);// 驅逐監聽器,有元素被驅逐時會回調evictionListener = builder.getEvictionListener(isAsync);// 用于保存所有數據的 ConcurrentHashMapdata = new ConcurrentHashMap<>(builder.getInitialCapacity());// 如果指定驅逐策略 或 key為弱引用 或 value為弱引用或軟引用 或 訪問后過期則創建 readBuffer,否則它為不可用狀態// readBuffer 用于記錄某些被訪問過的節點readBuffer = evicts() || collectKeys() || collectValues() || expiresAfterAccess()? new BoundedBuffer<>() : Buffer.disabled();// 如果指定了驅逐策略 或 訪問后過期策略則會定義訪問策略,執行 onAccess 方法,后文詳細介紹accessPolicy = (evicts() || expiresAfterAccess()) ? this::onAccess : e -> {};// 初始化最大值和最小值的雙端隊列作為 writeBuffer,用于記錄一些寫后操作任務 writeBuffer = new MpscGrowableArrayQueue<>(WRITE_BUFFER_MIN, WRITE_BUFFER_MAX);// 執行了驅逐策略則更新最大容量限制if (evicts()) {setMaximumSize(builder.getMaximum());}}@GuardedBy("evictionLock")void setMaximumSize(long maximum) {requireArgument(maximum >= 0, "maximum must not be negative");if (maximum == maximum()) {return;}// 不能超過最大容量long max = Math.min(maximum, MAXIMUM_CAPACITY);// 計算窗口區大小long window = max - (long) (PERCENT_MAIN * max);// 計算保護區大小long mainProtected = (long) (PERCENT_MAIN_PROTECTED * (max - window));// 記錄這些值setMaximum(max);setWindowMaximum(window);setMainProtectedMaximum(mainProtected);// 標記命中量、非命中量并初始化步長值,這三個值用于后續動態調整保護區和窗口區大小setHitsInSample(0);setMissesInSample(0);setStepSize(-HILL_CLIMBER_STEP_PERCENT * max);// 直到當前緩存的權重(大小)接近最大值一半時才初始化頻率草圖if ((frequencySketch() != null) && !isWeighted() && (weightedSize() >= (max >>> 1))) {frequencySketch().ensureCapacity(max);}}
}// 2
class SS<K, V> extends BoundedLocalCache<K, V> {static final LocalCacheFactory FACTORY = SS::new;// key value 強引用無需特殊操作SS(Caffeine<K, V> var1, @Nullable AsyncCacheLoader<? super K, V> var2, boolean var3) {super(var1, var2, var3);}
}// 3
class SSMS<K, V> extends SS<K, V> {// 頻率草圖,后文具體介紹final FrequencySketch<K> sketch = new FrequencySketch();final AccessOrderDeque<Node<K, V>> accessOrderWindowDeque;final AccessOrderDeque<Node<K, V>> accessOrderProbationDeque;final AccessOrderDeque<Node<K, V>> accessOrderProtectedDeque;SSMS(Caffeine<K, V> var1, @Nullable AsyncCacheLoader<? super K, V> var2, boolean var3) {super(var1, var2, var3);// 如果 Caffeine 初始化了容量則確定頻率草圖的容量if (var1.hasInitialCapacity()) {long var4 = Math.min(var1.getMaximum(), (long) var1.getInitialCapacity());this.sketch.ensureCapacity(var4);}// 初始化窗口區、試用區和保護區,它們都是雙端隊列(鏈表實現)this.accessOrderWindowDeque = !var1.evicts() && !var1.expiresAfterAccess() ? null : new AccessOrderDeque();this.accessOrderProbationDeque = new AccessOrderDeque();this.accessOrderProtectedDeque = new AccessOrderDeque();}
}

在步驟 1 中定義了三個區的初始化大小為 1%|19%|80%,這樣配置的性能相對較好。此外,我們還需要解釋一下 weightedSize() 方法,它用于訪問 long weightedSize 變量。根據其命名有“權重大小”的含義,在默認不指定權重計算對象 Weigher 的情況下,Weigher 默認為 SingletonWeigher.INSTANCE 表示每個元素的權重大小為 1,如下:

enum SingletonWeigher implements Weigher<Object, Object> {INSTANCE;@Overridepublic int weigh(Object key, Object value) {return 1;}
}

這樣 weightedSize 表示的便是當前緩存中元素數量。如果自定義了 Weigher 那么 weightedSize 表示的便是緩存中總權重大小,每個元素的權重則可能會不同。因為在示例中我們并沒有指定 Weigher,所以在此處可以將 weightedSize 理解為當前緩存大小。

上文中我們提到緩存的定義遵循大寫字母縮寫的命名規則,實際上節點類的定義也采用了這種方式,在創建節點工廠 NodeFactory.newFactory(builder, isAsync)
的邏輯中,它會執行如下邏輯,根據緩存的類型來確定它的節點類型,命名遵循 P|F S|W|D A|AW|W| [R] [MW|MS] 的規則,同樣使用了反射機制和簡單工廠的編碼風格,如下:

interface NodeFactory<K, V> {// ...static <K, V> NodeFactory<K, V> newFactory(Caffeine<K, V> builder, boolean isAsync) {if (builder.interner) {return (NodeFactory<K, V>) Interned.FACTORY;}var className = getClassName(builder, isAsync);return loadFactory(className);}static String getClassName(Caffeine<?, ?> builder, boolean isAsync) {var className = new StringBuilder();// key 強引用或弱引用if (builder.isStrongKeys()) {className.append('P');} else {className.append('F');}// value 強引用或弱引用或軟引用if (builder.isStrongValues()) {className.append('S');} else if (builder.isWeakValues()) {className.append('W');} else {className.append('D');}// 過期策略if (builder.expiresVariable()) {if (builder.refreshAfterWrite()) {// 訪問后過期className.append('A');if (builder.evicts()) {// 寫入后過期className.append('W');}} else {className.append('W');}} else {// 訪問后過期if (builder.expiresAfterAccess()) {className.append('A');}// 寫入后過期if (builder.expiresAfterWrite()) {className.append('W');}}// 寫入后刷新if (builder.refreshAfterWrite()) {className.append('R');}// 驅逐策略if (builder.evicts()) {// 默認最大大小限制className.append('M');// 加權if (isAsync || (builder.isWeighted() && (builder.weigher != Weigher.singletonWeigher()))) {className.append('W');} else {// 非加權className.append('S');}}return className.toString();}}

SSMS 類型緩存對應的節點類型為 PSMS

FrequencySketch

接下來,我們需要具體介紹下 FrequencySketch,它在上述構造方法的步驟 3 中被創建。這個類的實現采用了 Count-Min Sketch 數據結構,它維護了一個 long[] table 一維數組,每個元素有 64 位,每 4 位作為一個計數器(這也就限定了最大頻率為 15),那么數組中每個槽位便是 16 個計數器。通過哈希函數取 4 個獨立的計數值,將其中的最小值作為元素的訪問頻率。table 的初始大小為緩存最大容量最接近的 2 的 n 次冪,并在計算哈希值時使用 blockMask 掩碼來使哈希結果均勻分布,保證了獲取元素訪問頻率的正確率為 93.75%,達到空間與時間的平衡。它的實現原理和布隆過濾器類似,犧牲了部分準確性,但減少了占用內存的大小。如下圖所示為計算元素 e 的訪問頻率:

在這里插入圖片描述

以下為 FrequencySketch 的源碼,關注注釋即可,并不復雜:

final class FrequencySketch<E> {static final long RESET_MASK = 0x7777777777777777L;static final long ONE_MASK = 0x1111111111111111L;// 采樣大小,用于控制 resetint sampleSize;// 掩碼,用于均勻分散哈希結果int blockMask;long[] table;int size;public FrequencySketch() {}public void ensureCapacity(@NonNegative long maximumSize) {requireArgument(maximumSize >= 0);// 取緩存最大容量和 Integer.MAX_VALUE >>> 1 中的小值 int maximum = (int) Math.min(maximumSize, Integer.MAX_VALUE >>> 1);// 如果已經被初始化過并且 table 長度大于等于最大容量,那么不進行操作if ((table != null) && (table.length >= maximum)) {return;}// 初始化 table,長度為最接近 maximum 的 2的n次冪 和 8 中的大值table = new long[Math.max(Caffeine.ceilingPowerOfTwo(maximum), 8)];// 計算采樣大小sampleSize = (maximumSize == 0) ? 10 : (10 * maximum);// 計算掩碼blockMask = (table.length >>> 3) - 1;// 特殊判斷if (sampleSize <= 0) {sampleSize = Integer.MAX_VALUE;}// 計數器總數size = 0;}@NonNegativepublic int frequency(E e) {// 如果緩存沒有被初始化則返回頻率為 0if (isNotInitialized()) {return 0;}// 創建 4 個元素的數組 count 用于保存 4 次 hash 計算出的頻率值int[] count = new int[4];// hash 擾動,使結果均勻分布int blockHash = spread(e.hashCode());// 重 hash,進一步分散結果int counterHash = rehash(blockHash);// 根據掩碼計算對應的塊索引int block = (blockHash & blockMask) << 3;// 循環 4 次計算 4 個計數器的結果for (int i = 0; i < 4; i++) {// 位運算變更 hash 值int h = counterHash >>> (i << 3);int index = (h >>> 1) & 15;// 計算計數器的偏移量int offset = h & 1;// 定位到 table 中某個槽位后右移并進行位與運算得到最低的 4 位的值(0xfL 為二進制的 1111)count[i] = (int) ((table[block + offset + (i << 1)] >>> (index << 2)) & 0xfL);}// 取其中的較小值return Math.min(Math.min(count[0], count[1]), Math.min(count[2], count[3]));}public void increment(E e) {if (isNotInitialized()) {return;}// 長度為 8 的數組記錄該元素對應的位置,每個計數器需要兩個值來定位int[] index = new int[8];int blockHash = spread(e.hashCode());int counterHash = rehash(blockHash);int block = (blockHash & blockMask) << 3;for (int i = 0; i < 4; i++) {int h = counterHash >>> (i << 3);// i 記錄定位到 table 中某元素的位偏移量index[i] = (h >>> 1) & 15;int offset = h & 1;// i + 4 記錄元素所在 table 中的索引index[i + 4] = block + offset + (i << 1);}// 四個對應的計數器都需要累加boolean added =incrementAt(index[4], index[0])| incrementAt(index[5], index[1])| incrementAt(index[6], index[2])| incrementAt(index[7], index[3]);// 累加成功且達到采樣大小需要進行重置if (added && (++size == sampleSize)) {reset();}}boolean incrementAt(int i, int j) {int offset = j << 2;long mask = (0xfL << offset);if ((table[i] & mask) != mask) {table[i] += (1L << offset);return true;}return false;}// 重置機制防止計數器溢出void reset() {int count = 0;for (int i = 0; i < table.length; i++) {// 累加 table 中每個元素的 2 進制表示的 1 的個數,結果為計數器個數的 4 倍count += Long.bitCount(table[i] & ONE_MASK);// 右移一位將計數值減半并將高位清零table[i] = (table[i] >>> 1) & RESET_MASK;}// count >>> 2 表示計數器個數,計算重置后的 sizesize = (size - (count >>> 2)) >>> 1;}static int spread(int x) {x ^= x >>> 17;x *= 0xed5ad4bb;x ^= x >>> 11;x *= 0xac4c1b51;x ^= x >>> 15;return x;}static int rehash(int x) {x *= 0x31848bab;x ^= x >>> 14;return x;}}

到這里,Caffeine 緩存的基本數據結構全貌已經展現出來了,如下所示,在后文中我們再具體關注它們之間是如何協同的。

在這里插入圖片描述

put

接下來繼續了解向緩存中添加元素的流程,本節內容比較多,理解起來也相對復雜,結合文章內容的同時,也需要多去深入查看 Caffeine 源碼才能有更好的理解,以下為 put 方法的源碼:

abstract class BoundedLocalCache<K, V> extends BLCHeader.DrainStatusRef implements LocalCache<K, V> {// 默認入參 onlyIfAbsent 為 false,表示向緩存中添加相同的 key 會對 value 進行替換 @Overridepublic @Nullable V put(K key, V value) {return put(key, value, expiry(), /* onlyIfAbsent */ false);}
}

它會執行到如下具體邏輯中,關注注釋信息:

abstract class BoundedLocalCache<K, V> extends BLCHeader.DrainStatusRef implements LocalCache<K, V> {static final int WRITE_BUFFER_RETRIES = 100;final MpscGrowableArrayQueue<Runnable> writeBuffer;final ConcurrentHashMap<Object, Node<K, V>> data;final ReentrantLock evictionLock;final NodeFactory<K, V> nodeFactory;@NullableV put(K key, V value, Expiry<K, V> expiry, boolean onlyIfAbsent) {// 不允許添加 nullrequireNonNull(key);requireNonNull(value);Node<K, V> node = null;// 獲取當前時間戳long now = expirationTicker().read();// 計算緩存權重,如果沒有指定 weigher 的話,默認權重為 1int newWeight = weigher.weigh(key, value);// 創建用于查找的鍵對象Object lookupKey = nodeFactory.newLookupKey(key);for (int attempts = 1; ; attempts++) {// 嘗試獲取節點;prior 譯為先前的;較早的Node<K, V> prior = data.get(lookupKey);// 處理不存在的節點if (prior == null) {// 如果 node 在循環執行中還未被創建if (node == null) {// NodeFactory 創建對應類型節點node = nodeFactory.newNode(key, keyReferenceQueue(), value, valueReferenceQueue(), newWeight, now);// 設置節點的過期時間setVariableTime(node, expireAfterCreate(key, value, expiry, now));}// 嘗試添加新節點到緩存中,如果鍵已存在則返回現有節點prior = data.putIfAbsent(node.getKeyReference(), node);// 返回 null 表示插入成功if (prior == null) {// 寫后操作:添加 AddTask 并調度執行任務afterWrite(new AddTask(node, newWeight));return null;}// onlyIfAbsent 形參在默認的 put 方法中為 false,以下邏輯簡單介紹// 如果此時有其他線程添加了相同 key 的元素else if (onlyIfAbsent) {// 獲取到當前值,嘗試判斷讀后失效策略,更新訪問時間,并執行讀后操作 afterRead 方法V currentValue = prior.getValue();if ((currentValue != null) && !hasExpired(prior, now)) {if (!isComputingAsync(prior)) {tryExpireAfterRead(prior, key, currentValue, expiry(), now);setAccessTime(prior, now);}// 讀后操作,該方法在 getIfPresent 中進行講解afterRead(prior, now, /* recordHit */ false);return currentValue;}}} else if (onlyIfAbsent) {// 同樣的邏輯V currentValue = prior.getValue();if ((currentValue != null) && !hasExpired(prior, now)) {if (!isComputingAsync(prior)) {tryExpireAfterRead(prior, key, currentValue, expiry(), now);setAccessTime(prior, now);}afterRead(prior, now, /* recordHit */ false);return currentValue;}}}// ...}
}

注意添加節點成功的邏輯,它會執行 afterWrite 寫后操作方法,添加 AddTask 任務到 writeBuffer 中:

abstract class BoundedLocalCache<K, V> extends BLCHeader.DrainStatusRef implements LocalCache<K, V> {// 寫重試最多 100 次static final int WRITE_BUFFER_RETRIES = 100;static final int WRITE_BUFFER_MIN = 4;static final int WRITE_BUFFER_MAX = 128 * ceilingPowerOfTwo(NCPU);final MpscGrowableArrayQueue<Runnable> writeBuffer = new MpscGrowableArrayQueue<>(WRITE_BUFFER_MIN, WRITE_BUFFER_MAX);// 添加寫后 Task 到 writeBuffer 中并在合適的時機調度執行任務void afterWrite(Runnable task) {// 最多重試添加 100 次for (int i = 0; i < WRITE_BUFFER_RETRIES; i++) {if (writeBuffer.offer(task)) {// 寫后調度scheduleAfterWrite();return;}// 向 writeBuffer 中添加任務失敗會調度任務執行scheduleDrainBuffers();// 自旋等待,讓出 CPU 控制權Thread.onSpinWait();}// ...}
}

writeBuffer 的類型為 MpscGrowableArrayQueue,在這里我們詳細的介紹下它。

WriteBuffer

根據它的命名 GrowableArrayQueue 可知它是一個容量可以增長的雙端隊列,前綴 MPSC 表達的含義是“多生產者,單消費者”,也就是說可以有多個線程向其中添加元素,但只有一個線程能從其中獲取元素。那么它是如何實現 MPSC 的呢?接下來我們就根據源碼詳細了解一下。首先先來看一下它的類繼承關系圖及簡要說明:

在這里插入圖片描述

圖中灰色的表示抽象類,藍色為實現類,java.util.AbstractQueue 就不再多解釋了。我們先看看其中標記紅框的類,討論到底什么是“避免內存偽共享問題”?

BaseMpscLinkedArrayQueuePad1 為例:

abstract class BaseMpscLinkedArrayQueuePad1<E> extends AbstractQueue<E> {byte p000, p001, p002, p003, p004, p005, p006, p007;byte p008, p009, p010, p011, p012, p013, p014, p015;byte p016, p017, p018, p019, p020, p021, p022, p023;byte p024, p025, p026, p027, p028, p029, p030, p031;byte p032, p033, p034, p035, p036, p037, p038, p039;byte p040, p041, p042, p043, p044, p045, p046, p047;byte p048, p049, p050, p051, p052, p053, p054, p055;byte p056, p057, p058, p059, p060, p061, p062, p063;byte p064, p065, p066, p067, p068, p069, p070, p071;byte p072, p073, p074, p075, p076, p077, p078, p079;byte p080, p081, p082, p083, p084, p085, p086, p087;byte p088, p089, p090, p091, p092, p093, p094, p095;byte p096, p097, p098, p099, p100, p101, p102, p103;byte p104, p105, p106, p107, p108, p109, p110, p111;byte p112, p113, p114, p115, p116, p117, p118, p119;
}

這個類除了定義了 120 字節的字段外,看上去沒有做其他任何事情,實際上它為 性能提升 默默做出了貢獻,避免了內存偽共享。CPU 中緩存行(Cache Line)的大小通常是 64 字節,在類中定義 120 字節來占位,這樣便能將上下繼承關系間的字段間隔開,保證被多個線程訪問的關鍵字段距離至少跨越一個緩存行,分布在不同的緩存行中。這樣在不同的線程訪問 BaseMpscLinkedArrayQueueProducerFieldsBaseMpscLinkedArrayQueueConsumerFields 中字段時互不影響,詳細了解原理可參考博客園 - CPU Cache與緩存行。

接下來我們看看其他抽象類的作用。BaseMpscLinkedArrayQueueProducerFields 定義生產者相關字段:

abstract class BaseMpscLinkedArrayQueueProducerFields<E> extends BaseMpscLinkedArrayQueuePad1<E> {// 生產者操作索引(并不對應緩沖區 producerBuffer 中索引位置)protected long producerIndex;
}

BaseMpscLinkedArrayQueueConsumerFields 負責定義消費者相關字段:

abstract class BaseMpscLinkedArrayQueueConsumerFields<E> extends BaseMpscLinkedArrayQueuePad2<E> {// 掩碼值,用于計算消費者實際的索引位置protected long consumerMask;// 消費者訪問這個緩沖區來獲取元素消費protected E[] consumerBuffer;// 消費者操作索引(并不對應緩沖區 consumerBuffer 中索引位置)protected long consumerIndex;
}

BaseMpscLinkedArrayQueueColdProducerFields 中定義字段如下,該類的命名包含 Cold,表示其中字段被修改的次數會比較少:

abstract class BaseMpscLinkedArrayQueueColdProducerFields<E> extends BaseMpscLinkedArrayQueuePad3<E> {// 生產者可以操作的最大索引上限protected volatile long producerLimit;// 掩碼值,用于計算生產者在數組中實際的索引protected long producerMask;// 存儲生產者生產的元素protected E[] producerBuffer;
}

現在關鍵字段我們已經介紹完了,接下來看一下創建 MpscGrowableArrayQueue 的邏輯,執行它的構造方法時會為我們剛剛提到的字段進行賦值:

class MpscGrowableArrayQueue<E> extends MpscChunkedArrayQueue<E> {MpscGrowableArrayQueue(int initialCapacity, int maxCapacity) {// 調用父類的構造方法super(initialCapacity, maxCapacity);}
}abstract class MpscChunkedArrayQueue<E> extends MpscChunkedArrayQueueColdProducerFields<E> {// 省略字節占位字段...byte p119;MpscChunkedArrayQueue(int initialCapacity, int maxCapacity) {// 調用父類的構造方法super(initialCapacity, maxCapacity);}}abstract class MpscChunkedArrayQueueColdProducerFields<E> extends BaseMpscLinkedArrayQueue<E> {protected final long maxQueueCapacity;MpscChunkedArrayQueueColdProducerFields(int initialCapacity, int maxCapacity) {// 調用父類的構造方法super(initialCapacity);if (maxCapacity < 4) {throw new IllegalArgumentException("Max capacity must be 4 or more");}// 保證了最大值最少比初始值大 2 倍if (ceilingPowerOfTwo(initialCapacity) >= ceilingPowerOfTwo(maxCapacity)) {throw new IllegalArgumentException("Initial capacity cannot exceed maximum capacity(both rounded up to a power of 2)");}// 最大容量也為 2的n次冪maxQueueCapacity = ((long) ceilingPowerOfTwo(maxCapacity)) << 1;}
}abstract class BaseMpscLinkedArrayQueue<E> extends BaseMpscLinkedArrayQueueColdProducerFields<E> {BaseMpscLinkedArrayQueue(final int initialCapacity) {if (initialCapacity < 2) {throw new IllegalArgumentException("Initial capacity must be 2 or more");}// 初始化緩沖區大小為數值最接近的 2 的 n 次冪int p2capacity = ceilingPowerOfTwo(initialCapacity);// 掩碼值,-1L 使其低位均為 1,左移 1 位則最低位為 0,eg: 00000110,注意該值會被生產者和消費者掩碼值共同賦值long mask = (p2capacity - 1L) << 1;// 創建一個大小為 2的n次冪 +1 大小的緩沖區,注意這個 buffer 分別被 producerBuffer 和 consumerBuffer 共同引用E[] buffer = allocate(p2capacity + 1);// BaseMpscLinkedArrayQueueColdProducerFields 類中相關字段賦值producerBuffer = buffer;producerMask = mask;// 將 producerLimit 值賦為 掩碼值soProducerLimit(this, mask);// BaseMpscLinkedArrayQueueConsumerFields 類中相關字段賦值consumerBuffer = buffer;consumerMask = mask;}
}

現在 MpscGrowableArrayQueue 的構建已經看完了,了解了其中關鍵字段的賦值,現在我們就需要看它是如何實現 MPSC 的。“多生產者”也就意味著會有多個線程向其中添加元素,既然是多線程就需要重點關注它是如何在多線程間完成協同的。添加操作對應了 BaseMpscLinkedArrayQueue#offer 方法,它的實現如下:

abstract class BaseMpscLinkedArrayQueue<E> extends BaseMpscLinkedArrayQueueColdProducerFields<E> {private static final Object JUMP = new Object();@Override@SuppressWarnings("MissingDefault")public boolean offer(final E e) {if (e == null) {throw new NullPointerException();}long mask;E[] buffer;long pIndex;while (true) {// 生產者最大索引(生產者掩碼值),獲取 BaseMpscLinkedArrayQueueColdProducerFields 中定義的該字段long producerLimit = lvProducerLimit();// 生產者當前索引,初始值為 0,BaseMpscLinkedArrayQueueProducerFields 中字段 pIndex = lvProducerIndex(this);// producerIndex 最低位用來表示擴容(索引生產者索引 producerIndex 并不對應緩沖區中實際的索引)// 低位為 1 表示正在擴容,自旋等待直到擴容完成(表示只有一個線程操作擴容)if ((pIndex & 1) == 1) {continue;}// 掩碼值和buffer可能在擴容中被改變,每次循環使用最新值mask = this.producerMask;buffer = this.producerBuffer;// 檢查是否需要擴容if (producerLimit <= pIndex) {int result = offerSlowPath(mask, pIndex, producerLimit);switch (result) {case 0:break;case 1:continue;case 2:return false;case 3:resize(mask, buffer, pIndex, e);return true;}}// CAS 操作更新生產者索引,注意這里是 +2,更新成功結束循環if (casProducerIndex(this, pIndex, pIndex + 2)) {break;}}// 計算該元素在 buffer 中的實際偏移量,并將其添加到緩沖區中final long offset = modifiedCalcElementOffset(pIndex, mask);soElement(buffer, offset, e);return true;}// 沒有將 resize 邏輯封裝在該方法中,而是由該方法判斷是否需要擴容private int offerSlowPath(long mask, long pIndex, long producerLimit) {int result;// 獲取消費者索引 BaseMpscLinkedArrayQueueConsumerFields 類中final long cIndex = lvConsumerIndex(this);// 通過掩碼值計算當前緩沖區容量long bufferCapacity = getCurrentBufferCapacity(mask);result = 0;// 如果隊列還有空間if (cIndex + bufferCapacity > pIndex) {// 嘗試更新生產者最大限制,更新失敗則返回 1 重試if (!casProducerLimit(this, producerLimit, cIndex + bufferCapacity)) {result = 1;}}// 如果隊列已滿且無法擴展else if (availableInQueue(pIndex, cIndex) <= 0) {result = 2;}// 更新 producerIndex 最低位為 1,成功則進行擴容,否則重試else if (casProducerIndex(this, pIndex, pIndex + 1)) {result = 3;} else {result = 1;}return result;}private void resize(long oldMask, E[] oldBuffer, long pIndex, final E e) {// 計算新緩沖區大小并創建,2 * (buffer.length - 1) + 1int newBufferLength = getNextBufferSize(oldBuffer);final E[] newBuffer = allocate(newBufferLength);// 更新緩沖區引用為新的緩沖區producerBuffer = newBuffer;// 更新新的掩碼final int newMask = (newBufferLength - 2) << 1;producerMask = newMask;// 計算元素在新舊緩沖區中的偏移量final long offsetInOld = modifiedCalcElementOffset(pIndex, oldMask);final long offsetInNew = modifiedCalcElementOffset(pIndex, newMask);// 將元素放到新緩沖區中soElement(newBuffer, offsetInNew, e);// 將新緩沖區連接到舊緩沖區中soElement(oldBuffer, nextArrayOffset(oldMask), newBuffer);// 校驗可用空間final long cIndex = lvConsumerIndex(this);final long availableInQueue = availableInQueue(pIndex, cIndex);if (availableInQueue <= 0) {throw new IllegalStateException();}// 更新生產者限制大小和生產者索引soProducerLimit(this, pIndex + Math.min(newMask, availableInQueue));soProducerIndex(this, pIndex + 2);// 將舊緩沖區中該位置的元素更新為 JUMP 標志位,這樣在被消費時就知道去新的緩沖區獲取了soElement(oldBuffer, offsetInOld, JUMP);}private long nextArrayOffset(final long mask) {return modifiedCalcElementOffset(mask + 2, Long.MAX_VALUE);}// 因為最低位用來表示是否在擴容,所以 producerIndex 和 consumerIndex 并不表示實際的索引// 注意生產者(消費者)操作索引值會隨著元素的增加不斷變大,因為有它們和掩碼值的位與運算才保證了索引值一直在索引值的有效范圍內static long modifiedCalcElementOffset(long index, long mask) {return (index & mask) >> 1;}
}

可見,在這個過程中它并沒有限制操作線程數量,也沒有使用加鎖的同步機制。它通過保證 可見性,并使用 自旋鎖結合 CAS 操作 更新生產者索引值,因為該操作是原子的,同時只有一個線程能更新獲取索引值成功,更新失敗的線程會自旋重試,這樣便允許多線程同時添加元素,可見性保證和CAS操作源碼如下:

abstract class BaseMpscLinkedArrayQueue<E> extends BaseMpscLinkedArrayQueueColdProducerFields<E> {static final VarHandle P_INDEX = pIndexLookup.findVarHandle(BaseMpscLinkedArrayQueueProducerFields.class, "producerIndex", long.class);// volatile 可見性保證static long lvProducerIndex(BaseMpscLinkedArrayQueue<?> self) {return (long) P_INDEX.getVolatile(self);}// CAS 操作static boolean casProducerIndex(BaseMpscLinkedArrayQueue<?> self, long expect, long newValue) {return P_INDEX.compareAndSet(self, expect, newValue);}
}

保證可見性(內存操作對其他線程可見)的原理是 內存屏障,除了保證可見性以外,內存屏障還能夠 防止重排序(確保在內存屏障前后的內存操作不會被重排序,從而保證程序的正確性)。到這里,生產者添加元素的邏輯我們已經分析完了,接下來我們需要繼續看一下消費者獲取元素的邏輯,它對應了 BaseMpscLinkedArrayQueue#poll 方法,同樣地,在這過程中需要關注“在這個方法中有沒有限制單一線程執行”,以此實現單消費者呢:

abstract class BaseMpscLinkedArrayQueue<E> extends BaseMpscLinkedArrayQueueColdProducerFields<E> {private static final Object JUMP = new Object();public E poll() {// 讀取消費者相關字段 BaseMpscLinkedArrayQueueConsumerFields 類final E[] buffer = consumerBuffer;final long index = consumerIndex;final long mask = consumerMask;// 根據消費索引,計算出元素在消費者緩沖區中實際的位置final long offset = modifiedCalcElementOffset(index, mask);// 讀取該元素(volatile 可見性讀取)Object e = lvElement(buffer, offset);// 如果為空if (e == null) {// 比較生產者索引,如果兩個索引不相等,那么證明兩索引間存在距離表示還有元素能夠被消費if (index != lvProducerIndex(this)) {// 自旋讀取元素,直到讀到元素do {e = lvElement(buffer, offset);} while (e == null);} else {// 索引相等證明確實是空隊列return null;}}if (e == JUMP) {// 獲取到新緩沖區final E[] nextBuffer = getNextBuffer(buffer, mask);// 在新緩沖區中獲取到對應元素return newBufferPoll(nextBuffer, index);}// 清除當前索引的元素,表示該元素已經被消費soElement(buffer, offset, null);// 更新消費者索引,這里也是 +2,它并不表示實際的在緩沖區的索引soConsumerIndex(this, index + 2);return (E) e;}private E[] getNextBuffer(final E[] buffer, final long mask) {// 如果已經發生擴容,此時 consumerMask 仍然對應的是擴容前的 mask// 此處與生產者操作擴容時拼接新舊緩沖區調用的是一樣的方法,這樣便能夠獲取到新緩沖區的偏移量final long nextArrayOffset = nextArrayOffset(mask);// 獲取到新緩沖區,因為在擴容操作時已經將新緩沖區鏈接到舊緩沖區上了final E[] nextBuffer = (E[]) lvElement(buffer, nextArrayOffset);// 將舊緩沖區中新緩沖區位置設置為 null 表示舊緩沖區中已經沒有任何元素需要被消費了,也不再需要被引用了(能被垃圾回收了)soElement(buffer, nextArrayOffset, null);return nextBuffer;}private long nextArrayOffset(final long mask) {return modifiedCalcElementOffset(mask + 2, Long.MAX_VALUE);}private E newBufferPoll(E[] nextBuffer, final long index) {// 計算出消費者操作索引在新緩沖區中對應的實際位置final long offsetInNew = newBufferAndOffset(nextBuffer, index);// 在新緩沖區中獲取到對應元素final E n = lvElement(nextBuffer, offsetInNew);if (n == null) {throw new IllegalStateException("new buffer must have at least one element");}// 清除當前索引的元素,表示該元素已經被消費soElement(nextBuffer, offsetInNew, null);// 更新消費者索引soConsumerIndex(this, index + 2);return n;}private long newBufferAndOffset(E[] nextBuffer, final long index) {// 將消費者緩沖區引用和掩碼值更新consumerBuffer = nextBuffer;consumerMask = (nextBuffer.length - 2L) << 1;return modifiedCalcElementOffset(index, consumerMask);}static long modifiedCalcElementOffset(long index, long mask) {return (index & mask) >> 1;}static <E> E lvElement(E[] buffer, long offset) {return (E) REF_ARRAY.getVolatile(buffer, (int) offset);}
}

可以發現在該方法中并沒有限制單一線程執行,所以理論上這個方法可能被多個線程調用,那么它又為什么被稱為 MPSC 呢?在這個方法中的一段注釋值得細心體會:

This implementation is correct for single consumer thread use only.
此實現僅適用于單消費者線程使用

所以調用該方法時開發者本身需要保證單線程調用而并不是在實現中控制。

到這里 MpscGrowableArrayQueue 中核心的邏輯已經講解完了,現在我們回過頭來再看一下隊列擴容前后生產者和消費者是如何協同的?在擴容前,consumerBufferproducerBuffer 引用的是同一個緩沖區對象。如果發生擴容,那么生產者會創建一個新的緩沖區,并將 producerBuffer 引用指向它,此時它做了一個 非常巧妙 的操作,將 新緩沖區依然鏈接到舊緩沖區 上,并將觸發擴容的元素對應的舊緩沖區的索引處標記為 JUMP,表示這及之后的元素已經都在新緩沖區中。此時,消費者依然會在舊緩沖區中慢慢地消費,直到遇到 JUMP 標志位,消費者就知道需要到新緩沖區中取獲取元素了。因為之前生產者在擴容時對新舊緩沖區進行鏈接,所以消費者能夠通過舊緩沖區獲取到新緩沖區的引用,并變更 consumerBuffer 的引用和 consumerMask 掩碼值,接下來的消費過程便和擴容前沒有差別了。

scheduleAfterWrite

現在我們再回到 put 方法的邏輯中,如果向 WriterBuffer 中添加元素成功,則會調用 scheduleAfterWrite 方法,調度任務的執行:

abstract class BoundedLocalCache<K, V> extends BLCHeader.DrainStatusRef implements LocalCache<K, V> {final ReentrantLock evictionLock = new ReentrantLock();// 默認為 ForkJoinPool.commonPool()final Executor executor;// 該任務在創建緩存時已經完成初始化final PerformCleanupTask drainBuffersTask;// 根據狀態的變化來調度執行任務void scheduleAfterWrite() {// 獲取當前 drainStatus,drain 譯為排空,耗盡int drainStatus = drainStatusOpaque();for (; ; ) {// 這里的狀態機變更需要關注下switch (drainStatus) {// IDLE 表示當前無任務可做case IDLE:// CAS 更新狀態為 REQUIREDcasDrainStatus(IDLE, REQUIRED);// 調度任務執行scheduleDrainBuffers();return;// REQUIRED 表示當前有任務需要執行case REQUIRED:// 調度任務執行scheduleDrainBuffers();return;// PROCESSING_TO_IDLE 表示當前任務處理完成后會變成 IDLE 狀態case PROCESSING_TO_IDLE:// 又來了新的任務,則 CAS 操作將它更新為 PROCESSING_TO_REQUIRED 狀態if (casDrainStatus(PROCESSING_TO_IDLE, PROCESSING_TO_REQUIRED)) {return;}drainStatus = drainStatusAcquire();continue;// PROCESSING_TO_REQUIRED 表示正在處理任務,處理完任務后還有任務需要處理case PROCESSING_TO_REQUIRED:return;default:throw new IllegalStateException("Invalid drain status: " + drainStatus);}}}// 調度執行緩沖區中的任務void scheduleDrainBuffers() {// 如果狀態表示正在有任務處理則返回if (drainStatusOpaque() >= PROCESSING_TO_IDLE) {return;}// 注意這里要獲取同步鎖 evictionLockif (evictionLock.tryLock()) {try {// 獲取鎖后再次校驗當前處理狀態int drainStatus = drainStatusOpaque();if (drainStatus >= PROCESSING_TO_IDLE) {return;}// 更新狀態為 PROCESSING_TO_IDLEsetDrainStatusRelease(PROCESSING_TO_IDLE);// 同步機制保證任何時刻只能有一個線程能夠提交任務executor.execute(drainBuffersTask);} catch (Throwable t) {logger.log(Level.WARNING, "Exception thrown when submitting maintenance task", t);maintenance(/* ignored */ null);} finally {evictionLock.unlock();}}}}

寫后調度處理任務(scheduleAfterWrite)會根據狀態選擇性執行 scheduleDrainBuffers 方法,執行該方法時通過同步鎖 evictionLock 保證同時只有一個線程能提交 PerformCleanupTask 任務。這個任務在創建緩存時已經被初始化完成了,每次提交任務都會被復用,接下來我們看一下這個任務的具體實現:

abstract class BoundedLocalCache<K, V> extends BLCHeader.DrainStatusRef implements LocalCache<K, V> {// 可重用的任務,用于執行 maintenance 方法,避免了使用 ForkJoinPool 來包裝static final class PerformCleanupTask extends ForkJoinTask<Void> implements Runnable {private static final long serialVersionUID = 1L;final WeakReference<BoundedLocalCache<?, ?>> reference;PerformCleanupTask(BoundedLocalCache<?, ?> cache) {reference = new WeakReference<BoundedLocalCache<?, ?>>(cache);}@Overridepublic boolean exec() {try {run();} catch (Throwable t) {logger.log(Level.ERROR, "Exception thrown when performing the maintenance task", t);}// Indicates that the task has not completed to allow subsequent submissions to executereturn false;}@Overridepublic void run() {// 獲取到緩存對象BoundedLocalCache<?, ?> cache = reference.get();if (cache != null) {cache.performCleanUp(null);}}// ...}
}

它的實現非常簡單,其中 reference 字段在調用構造方法時被賦值,引用的是緩存對象本身。當任務被執行時,調用的是 BoundedLocalCache#performCleanUp 方法:

abstract class BoundedLocalCache<K, V> extends BLCHeader.DrainStatusRef implements LocalCache<K, V> {final ReentrantLock evictionLock = new ReentrantLock();// 執行該任務時,也要獲取同步鎖,表示任務只能由一個線程來執行void performCleanUp(@Nullable Runnable task) {evictionLock.lock();try {// 執行維護任務maintenance(task);} finally {evictionLock.unlock();}rescheduleCleanUpIfIncomplete();}@GuardedBy("evictionLock")void maintenance(@Nullable Runnable task) {// 更新狀態為執行中setDrainStatusRelease(PROCESSING_TO_IDLE);try {// 處理讀緩沖區中的任務drainReadBuffer();// 處理寫緩沖區中的任務drainWriteBuffer();if (task != null) {task.run();}// 處理 key 和 value 的引用drainKeyReferences();drainValueReferences();// 過期和驅逐策略expireEntries();evictEntries();// “增值” 操作,后續重點講climb();} finally {// 狀態不是 PROCESSING_TO_IDLE 或者無法 CAS 更新為 IDLE 狀態的話,需要更新狀態為 REQUIRED,該狀態會再次執行維護任務if ((drainStatusOpaque() != PROCESSING_TO_IDLE) || !casDrainStatus(PROCESSING_TO_IDLE, IDLE)) {setDrainStatusOpaque(REQUIRED);}}}
}

注意在執行 performCleanUp 方法時,也需要獲取到同步鎖 evictionLock,那么任務的提交和任務的執行也是互斥的。這個執行的核心邏輯在 maintenance “維護”方法中,注意這個方法被標記了注解 @GuardedBy("evictionLock"),源碼中還有多個方法也標記了該注解,執行這些方法時都要獲取同步鎖,這也是在提醒我們這些方法同時只有由一條線程被執行。因為目前關注的是 put 方法,所以重點先看維護方法中 drainWriteBuffer 方法處理寫緩沖區中任務的邏輯:

abstract class BoundedLocalCache<K, V> extends BLCHeader.DrainStatusRef implements LocalCache<K, V> {static final int NCPU = Runtime.getRuntime().availableProcessors();static final int WRITE_BUFFER_MAX = 128 * ceilingPowerOfTwo(NCPU);final MpscGrowableArrayQueue<Runnable> writeBuffer;@GuardedBy("evictionLock")void drainWriteBuffer() {// 最大循環次數為 writeBuffer 最大容量,直至彈出元素為 nullfor (int i = 0; i <= WRITE_BUFFER_MAX; i++) {Runnable task = writeBuffer.poll();if (task == null) {return;}task.run();}// 更新狀態為 PROCESSING_TO_REQUIREDsetDrainStatusOpaque(PROCESSING_TO_REQUIRED);}
}

執行邏輯非常簡單,在獲取到同步鎖之后,在 WriteBuffer 中獲取要被執行的任務并執行。在這里我們能發現“SC 單消費者”的實現使用 同步鎖的機制保證同時只能有一個消費者消費緩沖區中的任務。在上文中我們已經知道,調用 put 方法時向緩沖區 WriteBuffer 中添加的任務為 AddTask,下面我們看一下該任務的實現:

abstract class BoundedLocalCache<K, V> extends BLCHeader.DrainStatusRef implements LocalCache<K, V> {static final long MAXIMUM_CAPACITY = Long.MAX_VALUE - Integer.MAX_VALUE;final class AddTask implements Runnable {final Node<K, V> node;// 節點權重final int weight;AddTask(Node<K, V> node, int weight) {this.weight = weight;this.node = node;}@Override@GuardedBy("evictionLock")@SuppressWarnings("FutureReturnValueIgnored")public void run() {// 是否指定了驅逐策略if (evicts()) {// 更新緩存權重和窗口區權重setWeightedSize(weightedSize() + weight);setWindowWeightedSize(windowWeightedSize() + weight);// 更新節點的 policyWeight,該字段只有在自定了權重計算規則時才有效// 否則像只定義了固定容量的驅逐策略,使用默認元素權重為 1 是不需要關注該字段的node.setPolicyWeight(node.getPolicyWeight() + weight);// 檢測當前總權重是否超過一半的最大容量long maximum = maximum();if (weightedSize() >= (maximum >>> 1)) {// 如果超過最大容量if (weightedSize() > MAXIMUM_CAPACITY) {// 執行驅逐操作evictEntries();} else {// 延遲加載頻率草圖 frequencySketch 數據結構,用于統計元素訪問頻率long capacity = isWeighted() ? data.mappingCount() : maximum;frequencySketch().ensureCapacity(capacity);}}// 更新頻率統計信息K key = node.getKey();if (key != null) {// 因為頻率草圖數據結構具有延遲加載機制(權重超過半數)// 所以實際上在元素權重還未過半未完成初始化時,調用 increment 是沒有作用的frequencySketch().increment(key);}// 增加未命中樣本數setMissesInSample(missesInSample() + 1);}// 同步檢測節點是否還有效boolean isAlive;synchronized (node) {isAlive = node.isAlive();}if (isAlive) {// 寫后過期策略if (expiresAfterWrite()) {writeOrderDeque().offerLast(node);}// 過期策略if (expiresVariable()) {timerWheel().schedule(node);}// 驅逐策略if (evicts()) {// 如果權重比配置的最大權重大if (weight > maximum()) {// 執行固定權重(RemovalCause.SIZE)的驅逐策略evictEntry(node, RemovalCause.SIZE, expirationTicker().read());}// 如果權重超過窗口區最大權重,則將其放在窗口區頭節點else if (weight > windowMaximum()) {accessOrderWindowDeque().offerFirst(node);}// 否則放在窗口區尾節點else {accessOrderWindowDeque().offerLast(node);}}// 訪問后過期策略else if (expiresAfterAccess()) {accessOrderWindowDeque().offerLast(node);}}// 處理異步計算if (isComputingAsync(node)) {synchronized (node) {if (!Async.isReady((CompletableFuture<?>) node.getValue())) {long expirationTime = expirationTicker().read() + ASYNC_EXPIRY;setVariableTime(node, expirationTime);setAccessTime(node, expirationTime);setWriteTime(node, expirationTime);}}}}}
}

根據注釋很容易理解該方法的作用,因為我們目前對緩存只定義了固定容量的驅逐策略,所以我們需要在看一下 evictEntry 方法:

abstract class BoundedLocalCache<K, V> extends BLCHeader.DrainStatusRef implements LocalCache<K, V> {final ConcurrentHashMap<Object, Node<K, V>> data;@GuardedBy("evictionLock")@SuppressWarnings({"GuardedByChecker", "NullAway", "PMD.CollapsibleIfStatements"})boolean evictEntry(Node<K, V> node, RemovalCause cause, long now) {K key = node.getKey();@SuppressWarnings("unchecked")V[] value = (V[]) new Object[1];boolean[] removed = new boolean[1];boolean[] resurrect = new boolean[1];Object keyReference = node.getKeyReference();RemovalCause[] actualCause = new RemovalCause[1];data.computeIfPresent(keyReference, (k, n) -> {if (n != node) {return n;}synchronized (n) {value[0] = n.getValue();// key 或 value 為 null,這種情況下可能使用了 Caffeine.weakKeys, Caffeine.weakValues, or Caffeine.softValues// 導致被垃圾回收了if ((key == null) || (value[0] == null)) {// 標記實際失效原因為垃圾回收 actualCause[0] = RemovalCause.COLLECTED;}// 如果原因為垃圾回收,記錄 resurrect 復活標記為 trueelse if (cause == RemovalCause.COLLECTED) {resurrect[0] = true;return n;}// 否則記錄入參中的原因else {actualCause[0] = cause;}// 過期驅逐策略判斷if (actualCause[0] == RemovalCause.EXPIRED) {boolean expired = false;if (expiresAfterAccess()) {expired |= ((now - n.getAccessTime()) >= expiresAfterAccessNanos());}if (expiresAfterWrite()) {expired |= ((now - n.getWriteTime()) >= expiresAfterWriteNanos());}if (expiresVariable()) {expired |= (n.getVariableTime() <= now);}if (!expired) {resurrect[0] = true;return n;}}// 固定容量驅逐策略else if (actualCause[0] == RemovalCause.SIZE) {int weight = node.getWeight();if (weight == 0) {resurrect[0] = true;return n;}}// 通知驅逐策略監聽器,調用它的方法notifyEviction(key, value[0], actualCause[0]);// 將該 key 對應的刷新策略失效discardRefresh(keyReference);// 標記該節點被驅逐removed[0] = true;// 退休準備被垃圾回收node.retire();}return null;});// 如果復活標記為 true 那么不被移除if (resurrect[0]) {return false;}// 節點已經要被驅逐// 如果在窗口區,那么直接從窗口區移除if (node.inWindow() && (evicts() || expiresAfterAccess())) {accessOrderWindowDeque().remove(node);}// 如果沒在窗口區else if (evicts()) {// 在試用區直接在試用區移除if (node.inMainProbation()) {accessOrderProbationDeque().remove(node);}// 在保護區則直接從保護區移除else {accessOrderProtectedDeque().remove(node);}}// 將寫后失效和時間輪中關于該節點的元素移除if (expiresAfterWrite()) {writeOrderDeque().remove(node);} else if (expiresVariable()) {timerWheel().deschedule(node);}// 同步機制將 node 置為 deadsynchronized (node) {logIfAlive(node);makeDead(node);}if (removed[0]) {// 節點被移除監控計數和節點移除通知回調statsCounter().recordEviction(node.getWeight(), actualCause[0]);notifyRemoval(key, value[0], actualCause[0]);}return true;}
}

該方法比較簡單,是將節點進行驅逐的邏輯,在后文中它會被多次復用,需要留一個印象。回到 AddTask 任務的邏輯中,當被添加的元素權重超過最大權重限制時會被直接移除。這種特殊情況試用于指定了權重計算策略的緩存,如果只指定了固定容量,元素權重默認為 1,所以不會直接超過最大緩存數量限制。

現在我們已經將 put 方法中向緩存中添加元素的邏輯介紹完了,接下來需要關注 put 方法中對已存在的相同 key 值元素的處理邏輯:

abstract class BoundedLocalCache<K, V> extends BLCHeader.DrainStatusRef implements LocalCache<K, V> {static final int MAX_PUT_SPIN_WAIT_ATTEMPTS = 1024 - 1;static final long EXPIRE_WRITE_TOLERANCE = TimeUnit.SECONDS.toNanos(1);final ConcurrentHashMap<Object, Node<K, V>> data;@NullableV put(K key, V value, Expiry<K, V> expiry, boolean onlyIfAbsent) {requireNonNull(key);requireNonNull(value);Node<K, V> node = null;long now = expirationTicker().read();int newWeight = weigher.weigh(key, value);Object lookupKey = nodeFactory.newLookupKey(key);for (int attempts = 1; ; attempts++) {Node<K, V> prior = data.get(lookupKey);if (prior == null) {// ... }// 元素被讀到之后可能已經被驅逐了if (!prior.isAlive()) {// 自旋嘗試重新從 ConcurrentHashMap 中獲取,再獲取時如果為 null 則執行新增邏輯if ((attempts & MAX_PUT_SPIN_WAIT_ATTEMPTS) != 0) {Thread.onSpinWait();continue;}// 如果自旋嘗試后元素仍未被刪除,校驗元素是否處于存活狀態// 如果處于非存活狀態,那么可能這個元素已經被破壞,無法被移除,拋出異常data.computeIfPresent(lookupKey, (k, n) -> {requireIsAlive(key, n);return n;});continue;}V oldValue;// 新的過期時間long varTime;int oldWeight;boolean expired = false;boolean mayUpdate = true;boolean exceedsTolerance = false;// 為元素加同步鎖synchronized (prior) {// 如果此時元素已經失效了,那么需要重新循環if (!prior.isAlive()) {continue;}oldValue = prior.getValue();oldWeight = prior.getWeight();// oldValue 為 null 證明它被垃圾回收器回收了if (oldValue == null) {// 記錄元素創建后的過期時間varTime = expireAfterCreate(key, value, expiry, now);// 驅逐監聽器回調notifyEviction(key, null, RemovalCause.COLLECTED);}// 如果元素已經過期了else if (hasExpired(prior, now)) {// 標記過期標志為 trueexpired = true;// 記錄元素創建后的過期時間并回調驅逐監聽器varTime = expireAftexpireAfterCreateerCreate(key, value, expiry, now);notifyEviction(key, oldValue, RemovalCause.EXPIRED);}// onlyInAbsent 為 true 時不會對已存在 key 的值進行修改else if (onlyIfAbsent) {mayUpdate = false;// 記錄元素讀后過期時間varTime = expireAfterRead(prior, key, value, expiry, now);} else {// 記錄元素修改后過期時間varTime = expireAfterUpdate(prior, key, value, expiry, now);}// 需要修改原有 key 的 value 值if (mayUpdate) {exceedsTolerance =// 配置了寫后過期策略且已經超過寫后時間的容忍范圍(expiresAfterWrite() && (now - prior.getWriteTime()) > EXPIRE_WRITE_TOLERANCE)// 或者配置了可變時間過期策略同樣判斷是否超過時間的容忍范圍|| (expiresVariable() && Math.abs(varTime - prior.getVariableTime()) > EXPIRE_WRITE_TOLERANCE);// 更新值,更新權重,更新寫時間prior.setValue(value, valueReferenceQueue());prior.setWeight(newWeight);setWriteTime(prior, now);// 寫后刷新策略失效discardRefresh(prior.getKeyReference());}// 更新過期時間setVariableTime(prior, varTime);// 更新訪問時間setAccessTime(prior, now);}// 根據不同的情況回調不同的監聽器if (expired) {notifyRemoval(key, oldValue, RemovalCause.EXPIRED);} else if (oldValue == null) {notifyRemoval(key, /* oldValue */ null, RemovalCause.COLLECTED);} else if (mayUpdate) {notifyOnReplace(key, oldValue, value);}// 計算寫后權重變化int weightedDifference = mayUpdate ? (newWeight - oldWeight) : 0;// 如果 oldValue 已經被回收 或 權重修改前后發生變更 或 已經過期,添加更新任務if ((oldValue == null) || (weightedDifference != 0) || expired) {afterWrite(new UpdateTask(prior, weightedDifference));}// 如果超過了時間容忍范圍,添加更新任務else if (!onlyIfAbsent && exceedsTolerance) {afterWrite(new UpdateTask(prior, weightedDifference));} else {// 沒有超過時間容忍范圍,更新寫時間if (mayUpdate) {setWriteTime(prior, now);}// 處理讀后操作afterRead(prior, now, /* recordHit */ false);}return expired ? null : oldValue;}}
}

對于已有元素的變更,會對節點添加同步鎖,更新它的權重等一系列變量,如果超過 1s 的時間容忍范圍,則會添加 UpdateTask 更新任務,至于處理讀后操作 afterRead 在讀方法中再去介紹。接下來我們需要重新再看一下 afterWrite 方法,其中有部分我們在上文中沒有介紹的邏輯:

abstract class BoundedLocalCache<K, V> extends BLCHeader.DrainStatusRef implements LocalCache<K, V> {final ReentrantLock evictionLock;void afterWrite(Runnable task) {// 這段邏輯我們在看 AddTask 的邏輯時已經看過了,所以略過for (int i = 0; i < WRITE_BUFFER_RETRIES; i++) {if (writeBuffer.offer(task)) {scheduleAfterWrite();return;}scheduleDrainBuffers();Thread.onSpinWait();}// 以下邏輯用于解決在重試了 100 次后仍然寫入失敗的問題,它會嘗試獲取 evictionLock 同步鎖// 直接同步執行“維護”方法并執行當前任務,但是它并無法解決某個寫入操作執行時間很長的問題// 發生這種情況的原因可能是由于執行器的所有線程都很忙(可能是寫入此緩存),寫入速率大大超過了消耗速率,優先級反轉,或者執行器默默地丟棄了維護任務lock();try {maintenance(task);} catch (RuntimeException e) {logger.log(Level.ERROR, "Exception thrown when performing the maintenance task", e);} finally {evictionLock.unlock();}// 重新調度異步維護任務,確保維護操作能及時執行rescheduleCleanUpIfIncomplete();}void lock() {long remainingNanos = WARN_AFTER_LOCK_WAIT_NANOS;long end = System.nanoTime() + remainingNanos;boolean interrupted = false;try {for (;;) {try {if (evictionLock.tryLock(remainingNanos, TimeUnit.NANOSECONDS)) {return;}logger.log(Level.WARNING, "The cache is experiencing excessive wait times for acquiring "+ "the eviction lock. This may indicate that a long-running computation has halted "+ "eviction when trying to remove the victim entry. Consider using AsyncCache to "+ "decouple the computation from the map operation.", new TimeoutException());evictionLock.lock();return;} catch (InterruptedException e) {remainingNanos = end - System.nanoTime();interrupted = true;}}} finally {if (interrupted) {Thread.currentThread().interrupt();}}}// 調用同步的維護方法時,可能發生獲取鎖超時,那么再重新開啟一個異步維護調度void rescheduleCleanUpIfIncomplete() {// 校驗是否有任務需要被執行if (drainStatusOpaque() != REQUIRED) {return;}// 默認線程池調度任務執行,這個方法我們在上文中已經詳細介紹過if (executor == ForkJoinPool.commonPool()) {scheduleDrainBuffers();return;}// 如果自定義了線程池,那么會使用自定義的線程池進行處理var pacer = pacer();if ((pacer != null) && !pacer.isScheduled() && evictionLock.tryLock()) {try {if ((drainStatusOpaque() == REQUIRED) && !pacer.isScheduled()) {pacer.schedule(executor, drainBuffersTask, expirationTicker().read(), Pacer.TOLERANCE);}} finally {evictionLock.unlock();}}}
}

寫后操作除了在添加任務到緩沖區成功后會執行維護方法,添加失敗(證明寫入操作非常頻繁)依然會嘗試同步執行維護方法和發起異步維護,用于保證緩存中的任務能夠被及時執行,使緩存中元素都處于“預期”狀態中。接下來我們在看一下 UpdateTask 更新任務的邏輯:

abstract class BoundedLocalCache<K, V> extends BLCHeader.DrainStatusRef implements LocalCache<K, V> {final class UpdateTask implements Runnable {final int weightDifference;final Node<K, V> node;public UpdateTask(Node<K, V> node, int weightDifference) {this.weightDifference = weightDifference;this.node = node;}@Override@GuardedBy("evictionLock")public void run() {// 寫后過期和自定義過期邏輯if (expiresAfterWrite()) {reorder(writeOrderDeque(), node);} else if (expiresVariable()) {timerWheel().reschedule(node);}// 指定了驅逐策略if (evicts()) {// 變更節點權重int oldWeightedSize = node.getPolicyWeight();node.setPolicyWeight(oldWeightedSize + weightDifference);// 如果是窗口區節點if (node.inWindow()) {// 更新窗口區權重setWindowWeightedSize(windowWeightedSize() + weightDifference);// 節點權重超過最大權重限制,直接驅逐if (node.getPolicyWeight() > maximum()) {evictEntry(node, RemovalCause.SIZE, expirationTicker().read());}// 節點權重比窗口區最大值小else if (node.getPolicyWeight() <= windowMaximum()) {onAccess(node);}// 窗口區包含該節點且該節點的權重大于窗口最大權重,則放到頭節點else if (accessOrderWindowDeque().contains(node)) {accessOrderWindowDeque().moveToFront(node);}}// 如果是試用區節點else if (node.inMainProbation()) {// 節點權重比最大權重限制小if (node.getPolicyWeight() <= maximum()) {onAccess(node);}// 否則將該節點驅逐else {evictEntry(node, RemovalCause.SIZE, expirationTicker().read());}}// 如果是保護區節點else if (node.inMainProtected()) {// 更新保護區權重setMainProtectedWeightedSize(mainProtectedWeightedSize() + weightDifference);// 同樣的邏輯if (node.getPolicyWeight() <= maximum()) {onAccess(node);} else {evictEntry(node, RemovalCause.SIZE, expirationTicker().read());}}// 更新緩存權重大小setWeightedSize(weightedSize() + weightDifference);// 更新完成后超過最大權重限制執行驅逐操作if (weightedSize() > MAXIMUM_CAPACITY) {evictEntries();}}// 配置了訪問后過期else if (expiresAfterAccess()) {onAccess(node);}}}@GuardedBy("evictionLock")void onAccess(Node<K, V> node) {if (evicts()) {K key = node.getKey();if (key == null) {return;}// 更新訪問頻率frequencySketch().increment(key);// 如果節點在窗口區,則將其移動到尾節點if (node.inWindow()) {reorder(accessOrderWindowDeque(), node);}// 在試用區的節點執行 reorderProbation 方法,可能會將該節點從試用區晉升到保護區else if (node.inMainProbation()) {reorderProbation(node);}// 否則移動到保護區的尾結點else {reorder(accessOrderProtectedDeque(), node);}// 更新命中量setHitsInSample(hitsInSample() + 1);}// 配置了訪問過期策略else if (expiresAfterAccess()) {reorder(accessOrderWindowDeque(), node);}// 配置了自定義時間過期策略if (expiresVariable()) {timerWheel().reschedule(node);}}static <K, V> void reorder(LinkedDeque<Node<K, V>> deque, Node<K, V> node) {// 如果節點存在,將其移動到尾結點if (deque.contains(node)) {deque.moveToBack(node);}}@GuardedBy("evictionLock")void reorderProbation(Node<K, V> node) {// 檢查試用區是否包含該節點,不包含則證明已經被移除,則不處理if (!accessOrderProbationDeque().contains(node)) {return;}// 檢查節點的權重是否超過保護區最大值else if (node.getPolicyWeight() > mainProtectedMaximum()) {// 如果超過,將該節點移動到 試用區 尾巴節點,保證超重的節點不會被移動到保護區reorder(accessOrderProbationDeque(), node);return;}// 更新保護區權重大小setMainProtectedWeightedSize(mainProtectedWeightedSize() + node.getPolicyWeight());// 在試用區中移除該節點accessOrderProbationDeque().remove(node);// 在保護區尾節點中添加accessOrderProtectedDeque().offerLast(node);// 將該節點標記為保護區節點node.makeMainProtected();}
}

UpdateTask 修改任務負責變更權重值,并更新節點所在隊列的順序和訪問頻率,這里我們也能發現,這三個區域的隊列采用了 LRU 算法,一般情況下,最新被訪問的元素會被移動到尾節點。到現在,向有固定容量限制的緩存中調用 put 方法添加元素的邏輯基本已經介紹完了,目前對 Caffeine 緩存的了解程度如下所示:

在這里插入圖片描述

put 添加元素時會先直接添加到 ConcurrentHashMap 中,并在 WriteBuffer 中添加 AddTask/UpdateTask 任務,WriteBuffer 是一個 MPSC 的緩沖區,添加成功后會有加鎖的同步機制在默認的 ForkJoinPool.commonPool() 線程池中提交 PerformCleanupTask 任務,PerformCleanupTask 任務的主要作用是執行 maintenance 維護方法,該方法執行前需要先獲取同步鎖,單線程消費 WriteBuffer 中的任務。執行 AddTask 任務時會將元素先添加到窗口區,如果是 UpdateTask,它會修改三個不同區域的雙端隊列,這些隊列采用LRU算法,最新被訪問的元素會被放在尾節點處,并且試用區的元素被訪問后會被晉升到保護區尾節點,元素對應的訪問頻率也會在頻率草圖中更新,如果被添加的節點權重超過緩存最大權重會直接被驅逐。(目前維護方法中除了 drainWriteBuffer 方法外,其他步驟還未詳細解釋,之后會在后文中不斷完善)


點擊繼續閱讀 緩存之美:萬文詳解 Caffeine 實現原理(下)

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/66779.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/66779.shtml
英文地址,請注明出處:http://en.pswp.cn/web/66779.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Qt實踐:一個簡單的絲滑側滑欄實現

Qt實踐&#xff1a;一個簡單的絲滑側滑欄實現 筆者前段時間突然看到了側滑欄&#xff0c;覺得這個抽屜式的側滑欄非常的有趣&#xff0c;打算這里首先嘗試實現一個簡單的絲滑側滑欄。 首先是上效果圖 &#xff08;C&#xff0c;GIF幀率砍到毛都不剩了&#xff09; QProperty…

工作流引擎Camunda與LiteFlow核心組件對比

以下為 Camunda 7 和 LiteFlow 詳細的介紹&#xff0c;包括它們的核心組件和用途。 1. Camunda 7 詳細介紹 Camunda 7 是一個基于 BPMN 2.0 標準的企業級工作流和決策自動化平臺。它被廣泛應用于復雜業務流程的管理和執行&#xff0c;其核心目標是通過流程自動化來提升企業效…

css動畫水球圖

由于echarts水球圖動畫會導致ios卡頓&#xff0c;所以純css模擬 展示效果 組件 <template><div class"water-box"><div class"water"><div class"progress" :style"{ --newProgress: newProgress % }"><…

iOS 權限管理:同時請求相機和麥克風權限的最佳實踐

引言 在開發視頻類應用時&#xff0c;我們常常會遇到需要同時請求相機和麥克風權限的場景。比如&#xff0c;在用戶發布視頻動態時&#xff0c;相機用于捕捉畫面&#xff0c;麥克風用于錄制聲音&#xff1b;又或者在直播功能中&#xff0c;只有獲得這兩項權限&#xff0c;用戶…

Java 泛型上下限詳解:以 Info 泛型類和方法實現為例

本文將通過一個實際示例&#xff0c;來深入講解 Java 泛型中的上下限及其應用場景。在這個示例中&#xff0c;我們會實現一個泛型類 Info 和兩個泛型方法 upperLimit 和 lowerLimit&#xff0c;并解釋其工作機制。 1. 什么是 Java 泛型上下限&#xff1f; Java 泛型的上下限是…

客戶服務創新:數字化時代的策略與實踐

在數字化時代背景下&#xff0c;客戶服務已成為企業競爭的關鍵領域。隨著消費者需求的日益多樣化和個性化&#xff0c;傳統的客戶服務模式已難以滿足市場的要求。因此&#xff0c;企業需要不斷探索和創新客戶服務策略&#xff0c;以適應數字化時代的變化。 一、數字化時代客戶服…

【PyCharm】遠程連接Linux服務器

【PyCharm】相關鏈接 【PyCharm】連接Jupyter Notebook【PyCharm】快捷鍵使用【PyCharm】遠程連接Linux服務器【PyCharm】設置為中文界面 【PyCharm】遠程連接Linux服務器 PyCharm 提供了遠程開發的功能&#xff0c;使得開發者可以在本地編輯代碼或使用服務器資源。 下面將詳…

十三、數據的的輸入與輸出(3)

數據的輸出 writeClipboard&#xff08;&#xff09;函數 writeClipboard&#xff08;&#xff09;函數可以將數據輸出至剪貼板。 例如&#xff0c;將R的內置數據集iris輸出到剪貼板&#xff0c;在進入Excel中點擊"粘貼"。 head(iris) #查看數據集Sepal.L…

PyQt5之QDialog

1.描述 QDialog是對話窗口的基類&#xff0c;對話窗口是頂級窗口&#xff0c;主要用于短期任務和與用戶的簡短通信。 可分為模態對話框和非模態對話框。 模態對話框又可以分為應用程序級別和窗口級別。 ? 應用程序級別&#xff1a;當該種模態的對話框出現時&#xff0c;用…

Next.js:構建大模型智能體GPT研究者應用的 Web開發框架

Next.js&#xff1a;構建大模型智能體GPT研究者應用的 Web開發框架 Next.js 基礎知識 Next.js 是由 Vercel 公司開發維護的框架&#xff0c;極大地簡化了 React 應用的開發流程。其核心特性包括&#xff1a; 服務器端渲染&#xff08;SSR&#xff09;與靜態站點生成&#xff…

車載軟件架構 --- CP和AP作為中央計算平臺的軟件架構雙核心

我是穿拖鞋的漢子&#xff0c;魔都中堅持長期主義的汽車電子工程師。 老規矩&#xff0c;分享一段喜歡的文字&#xff0c;避免自己成為高知識低文化的工程師&#xff1a; 簡單&#xff0c;單純&#xff0c;喜歡獨處&#xff0c;獨來獨往&#xff0c;不易合同頻過著接地氣的生活…

華為EC6110T-海思Hi3798MV310_安卓9.0_通刷-強刷固件包

華為EC6110T-海思Hi3798MV310_安卓9.0_通刷-強刷固件包 刷機教程說明&#xff1a; 適用機型&#xff1a;華為EC6110-T、華為EC6110-U、華為EC6110-M 破解總分為兩個部分&#xff1a;拆機短接破解&#xff08;保留IPTV&#xff09;和OTT卡刷&#xff08;不保留IPTV&#xff09…

Element使用表單重置如果不使用prop,重置無法生效

文章目錄 為什么需要 prop&#xff1f;示例&#xff1a;使用 prop 的正確方式關鍵點總結 在 element-ui 的 el-form 組件中&#xff0c; prop 屬性是與表單驗證和表單字段綁定密切相關的&#xff0c;尤其在使用 resetFields() 重置表單數據時。 如果不使用 prop&#xff0…

使用pyboard、micropython和tja1050進行can通信

單片機和can收發器之間tx、rx不需要交叉接線&#xff01;&#xff01;&#xff01; tja1050的rx接Y3、tx接Y4 from pyb import CANcan CAN(1) can.init(modecan.NORMAL, prescaler6, sjw1, bs14, bs22, auto_restartTrue) # 1Mbps的配置&#xff0c;本文使用的micropython1.…

【信息系統項目管理師】高分論文:論信息系統項目的干系人管理(社保信息管理系統)

更多內容請見: 備考信息系統項目管理師-專欄介紹和目錄 文章目錄 論文1、識別干系人2、規劃干系人參與3、管理干系人4、監督干系人論文 2016年3月,我作為項目經理參與了XX市社保信息管理系統項目的建設,該項目投資共450萬元人民幣,建設工期為1年,通過該項目的實施,在XX市…

JavaScript系列(39)-- Web Workers技術詳解

JavaScript Web Workers技術詳解 &#x1f504; 今天&#xff0c;讓我們深入了解Web Workers技術&#xff0c;這是一種能夠在后臺線程中運行腳本的強大特性&#xff0c;可以避免阻塞主線程&#xff0c;提升Web應用的性能和響應性。 Web Workers基礎概念 &#x1f31f; &#…

26、正則表達式

目錄 一. 匹配字符 .&#xff1a;匹配除換行符外的任意單個字符。 二. 位置錨點 ^&#xff1a;匹配輸入字符串的開始位置。 $&#xff1a;匹配輸入字符串的結束位置。 \b&#xff1a;匹配單詞邊界。 \B&#xff1a;匹配非單詞邊界。 三. 重復限定符 *&#xff1a;匹配…

Chrome遠程桌面無法連接怎么解決?

Chrome遠程桌面連接已停止工作 Chrome遠程桌面是一款極為便捷的瀏覽器插件&#xff0c;能夠幫助用戶將自己的計算機連接到其他設備&#xff0c;無論是手機、平板電腦還是其他電腦。然而&#xff0c;在實際使用中&#xff0c;許多用戶可能會面臨各種各樣的問題&#xff0c;比如…

備賽藍橋杯之第十五屆職業院校組省賽第一題:智能停車系統

提示&#xff1a;本篇文章僅僅是作者自己目前在備賽藍橋杯中&#xff0c;自己學習與刷題的學習筆記&#xff0c;寫的不好&#xff0c;歡迎大家批評與建議 由于個別題目代碼量與題目量偏大&#xff0c;請大家自己去藍橋杯官網【連接高校和企業 - 藍橋云課】去尋找原題&#xff0…

基于AutoDL云計算平臺+LLaMA-Factory訓練平臺微調本地大模型

1. 注冊與認證 訪問AutoDL官網&#xff1a;前往 AutoDL官網。 注冊賬號&#xff1a;完成注冊流程。 實名認證&#xff1a;按照要求完成實名認證&#xff0c;以確保賬號的合規性。 2. 選擇GPU資源 進入算力市場&#xff1a;在官網首頁點擊“算力市場”菜單。 挑選GPU&#x…