JCTools Spmc 單生產者-多消費者的無鎖并發有界隊列

SpmcArrayQueue?是 JCTools 中為 單生產者-多消費者(Single-Producer-Multi-Consumer) 場景設計的有界隊列。與 SPSC 模型相比,SPMC 的復雜性主要體現在消費者側,因為多個消費者線程需要以線程安全的方式競爭消費同一個隊列中的元素。

單生產者-單消費者數組隊列 分析見:JCTools Spsc:單生產者-單消費者無鎖隊列

SpmcArrayQueue?的繼承鏈同樣是為了精確控制內存布局,但其側重點與?SpscArrayQueue?有所不同,它需要處理多消費者對?consumerIndex?的爭用。

  1. SpmcArrayQueueL1Pad?&?SpmcArrayQueueProducerIndexField:

    • 與 SPSC 類似,這部分定義了生產者索引?producerIndex,并用?L1Pad?將其與上游的“冷”字段(如?buffer,?mask)隔離開。
    • 由于只有一個生產者,producerIndex?的更新邏輯相對簡單,不需要 CAS 操作,使用?putOrderedLong?即可。
  2. SpmcArrayQueueL2Pad?&?SpmcArrayQueueConsumerIndexField:

    • 核心變化點SpmcArrayQueueConsumerIndexField?中不再有?soConsumerIndex?方法,取而代之的是?casConsumerIndex
       
      // ... existing code ...
      //$gen:ordered-fields
      abstract class SpmcArrayQueueConsumerIndexField<E> extends SpmcArrayQueueL2Pad<E>
      {protected final static long C_INDEX_OFFSET = fieldOffset(SpmcArrayQueueConsumerIndexField.class, "consumerIndex");private volatile long consumerIndex;// ... existing code ...@Overridepublic final long lvConsumerIndex(){return consumerIndex;}final boolean casConsumerIndex(long expect, long newValue){return UNSAFE.compareAndSwapLong(this, C_INDEX_OFFSET, expect, newValue);}
      }
      // ... existing code ...
      
    • 原因:因為有多個消費者,它們必須通過 CAS(Compare-And-Swap) 操作來原子性地更新?consumerIndex,以確保只有一個消費者能成功獲取并消費一個元素。L2Pad?在此的作用依然是隔離生產者和消費者的熱點字段。
  3. SpmcArrayQueueMidPad?&?SpmcArrayQueueProducerIndexCacheField:

    • SPMC 特有的優化:這里引入了一個新的字段?producerIndexCache
      // ... existing code ...
      //$gen:ordered-fields
      abstract class SpmcArrayQueueProducerIndexCacheField<E> extends SpmcArrayQueueMidPad<E>
      {// This is separated from the consumerIndex which will be highly contended in the hope that this value spends most// of it's time in a cache line that is Shared(and rarely invalidated)private volatile long producerIndexCache;// ... existing code ...
      }
      // ... existing code ...
      
    • producerIndex?會被生產者頻繁更新(每次 offer 都會更新)
    • producerIndexCache?只在消費者發現緩存過期時才更新,因此減少了對?producerIndex?的 volatile 讀取,降低了緩存一致性流量的爭用
    • MidPad?的作用就是將這個消費者側的緩存producerIndexCache)與消費者側的爭用點consumerIndex)分離開,避免它們互相干擾。
  4. SpmcArrayQueueL3Pad: 最后的填充,隔離?producerIndexCache?和?SpmcArrayQueue?自身的字段。


offer(E e):簡單而直接

由于只有一個生產者,offer?的邏輯比 SPSC 還要簡單,因為它不需要“前瞻優化”(producerLimit)。生產者只需要檢查目標槽位是否為空即可。

// ... existing code ...@Overridepublic boolean offer(final E e){if (null == e){throw new NullPointerException();}final E[] buffer = this.buffer;final long mask = this.mask;final long currProducerIndex = lvProducerIndex(); // 1. 獲取當前生產者索引final long offset = calcCircularRefElementOffset(currProducerIndex, mask);// 2. 檢查槽位是否被消費者釋放if (null != lvRefElement(buffer, offset)){// 如果槽位不為空,說明隊列滿了long size = currProducerIndex - lvConsumerIndex();if (size > mask){return false;}else{// 等待消費者釋放該槽位 (這會破壞無等待性)while (null != lvRefElement(buffer, offset)){// BURN}}}// 3. 放置元素并更新索引soRefElement(buffer, offset, e);soProducerIndex(currProducerIndex + 1); // 使用 store-ordered 更新return true;}
// ... existing code ...

SPMC 特性分析

  1. 單生產者權威:生產者是唯一能推進?producerIndex?的線程,所以它只需?lvProducerIndex()?讀取自己的進度,然后用?soProducerIndex()?更新即可,無需 CAS。
  2. 依賴消費者:生產者通過?lvRefElement?檢查目標槽位是否為?null?來判斷隊列是否已滿。這個?null?是由消費者在消費后寫入的。
  3. 潛在的自旋等待:代碼中有一段?while?循環等待。這通常發生在消費者進度稍稍落后于生產者進度,但隊列并未完全滿的情況下。生產者會在此“自旋”等待消費者完成對該槽位的消費和清理。這是?SpmcArrayQueue?的一個關鍵特性,它犧牲了一定的無等待性(Wait-Free)來簡化設計

poll():競爭與緩存的藝術

poll?方法是 SPMC 模型的核心,完美展現了多消費者如何通過 CAS 和本地緩存來協同工作。

// ... existing code ...@Overridepublic E poll(){long currentConsumerIndex;// 1. 讀取本地的生產者進度緩存long currProducerIndexCache = lvProducerIndexCache();do{// 2. 讀取全局的消費者進度currentConsumerIndex = lvConsumerIndex();// 3. 快路徑判斷:使用本地緩存判斷隊列是否為空if (currentConsumerIndex >= currProducerIndexCache){// 4. 慢路徑:本地緩存表明隊列為空,需同步最新的生產者進度long currProducerIndex = lvProducerIndex();if (currentConsumerIndex >= currProducerIndex){return null; // 隊列確實為空}else{// 更新本地緩存currProducerIndexCache = currProducerIndex;svProducerIndexCache(currProducerIndex);}}}// 5. CAS 競爭:嘗試原子性地將 consumerIndex 加一while (!casConsumerIndex(currentConsumerIndex, currentConsumerIndex + 1));// 6. 成功獲取元素return removeElement(buffer, currentConsumerIndex, mask);}
// ... existing code ...

SPMC 特性分析

  1. 生產者進度緩存:每個消費者線程開始時都會讀取?producerIndexCache。這是一個本地快照,避免了每次都去訪問真正的?producerIndex
  2. 快慢路徑分離
    • 快路徑:只要?currentConsumerIndex < currProducerIndexCache,消費者就認為隊列中有元素,直接進入第5步的 CAS 競爭。這是絕大多數情況。
    • 慢路徑:當快路徑條件不滿足時,消費者必須通過?lvProducerIndex()?讀取最新的生產者進度,并更新自己的本地緩存?producerIndexCache
  3. CAS 爭用casConsumerIndex?是多消費者協調的核心。多個消費者線程可能同時讀取到相同的?currentConsumerIndex,但只有一個能通過 CAS 操作成功地將其加一,從而“贏得”消費該位置元素的權利。失敗的線程則會重新循環,讀取新的?consumerIndex?再次嘗試。
  4. 無競爭取出:一旦一個消費者通過 CAS 成功預定了位置,它就可以安全地調用?removeElement?來取出元素。因為?removeElement?操作的是一個已經被它“私有化”的索引,不會有其他消費者來干擾。

SpmcArrayQueue.peek() 方法詳細分析

peek()?方法是隊列中的一個重要操作,它允許查看隊列頭部元素但不移除該元素。

public E peek() {final E[] buffer = this.buffer;final long mask = this.mask;long currProducerIndexCache = lvProducerIndexCache();long currentConsumerIndex;long nextConsumerIndex = lvConsumerIndex();E e;do {currentConsumerIndex = nextConsumerIndex;if (currentConsumerIndex >= currProducerIndexCache) {long currProducerIndex = lvProducerIndex();if (currentConsumerIndex >= currProducerIndex) {return null;} else {currProducerIndexCache = currProducerIndex;svProducerIndexCache(currProducerIndex);}}e = lvRefElement(buffer, calcCircularRefElementOffset(currentConsumerIndex, mask));// sandwich the element load between 2 consumer index loadsnextConsumerIndex = lvConsumerIndex();} while (null == e || nextConsumerIndex != currentConsumerIndex);return e;
}

方法開始首先獲取幾個關鍵變量:

  • buffer: 隊列的底層數組,存儲實際元素
  • mask: 用于計算循環隊列位置的掩碼值,通常是 capacity-1
  • currProducerIndexCache: 生產者索引的緩存值,這是一個重要的優化變量
  • currentConsumerIndex?和?nextConsumerIndex: 消費者索引的當前值和下一個值

  1. 在?peek()?方法中,消費者首先檢查?producerIndexCache
  2. 只有當?currentConsumerIndex >= currProducerIndexCache?時才需要讀取真實的?producerIndex
  3. 大多數情況下,隊列不為空時,消費者可以直接使用緩存值,避免讀取主?producerIndex
  4. 性能影響

    • 直接讀取?producerIndex:每次都要從主內存讀取,可能觸發緩存失效
    • 使用?producerIndexCache:大部分時間從本地緩存讀取,減少內存屏障和緩存一致性流量
  5. 正確性保證

    • 雖然使用了緩存,但通過?lvProducerIndex()?的檢查確保了最終一致性
    • 當緩存可能過期時(currentConsumerIndex >= currProducerIndexCache),會重新讀取真實值

緩存更新邏輯

if (currentConsumerIndex >= currProducerIndexCache) {long currProducerIndex = lvProducerIndex();if (currentConsumerIndex >= currProducerIndex) {return null; // 隊列為空} else {currProducerIndexCache = currProducerIndex;svProducerIndexCache(currProducerIndex); // 更新緩存}
}

這段代碼處理了兩種情況:

  1. 隊列為空:當消費者索引已經趕上或超過生產者索引時,返回 null
  2. 緩存過期:當緩存值小于實際生產者索引時,更新緩存值

元素加載

e = lvRefElement(buffer, calcCircularRefElementOffset(currentConsumerIndex, mask));

這行代碼從緩沖區中加載元素:

  • calcCircularRefElementOffset?計算循環隊列中的實際位置
  • lvRefElement?是一個 volatile 加載操作,確保內存可見性

一致性檢查

// sandwich the element load between 2 consumer index loads
nextConsumerIndex = lvConsumerIndex();
} while (null == e || nextConsumerIndex != currentConsumerIndex);

這是一個重要的并發控制機制,被稱為"三明治加載":

  1. 在加載元素前獲取消費者索引(currentConsumerIndex
  2. 加載元素本身
  3. 再次獲取消費者索引(nextConsumerIndex

通過比較兩次獲取的消費者索引是否相同,可以確保在加載元素過程中沒有其他消費者修改了隊列狀態。如果不同,說明有其他消費者已經修改了隊列,需要重試。

循環條件分析

while (null == e || nextConsumerIndex != currentConsumerIndex)

循環繼續的條件有兩個:

  1. null == e: 加載的元素為 null,可能是因為:

    • 隊列確實為空
    • 生產者正在寫入元素但尚未完成
    • 其他消費者已經取走了該元素
  2. nextConsumerIndex != currentConsumerIndex: 消費者索引在加載元素過程中被修改,說明有并發操作干擾

內存訪問順序

方法中的內存訪問遵循特定順序,確保正確的并發語義:

  1. 首先讀取生產者索引緩存(lvProducerIndexCache()
  2. 讀取消費者索引(lvConsumerIndex()
  3. 如果需要,讀取實際生產者索引(lvProducerIndex()
  4. 讀取隊列元素(lvRefElement()
  5. 再次讀取消費者索引(lvConsumerIndex()

這種順序確保了在多線程環境下能正確檢測隊列狀態變化。

volatile 操作的作用

  • lvProducerIndexCache(): volatile 讀取,確保獲取最新的緩存值
  • lvConsumerIndex(): volatile 讀取,確保獲取最新的消費者位置
  • lvProducerIndex(): volatile 讀取,確保獲取最新的生產者位置
  • svProducerIndexCache(): volatile 寫入,確保緩存更新對所有線程可見

這些 volatile 操作確保了多線程間的內存可見性,防止出現不一致的視圖。

循環優化

雖然方法中包含一個 do-while 循環,但在正常情況下(隊列不為空且沒有并發干擾),循環只會執行一次。只有在以下情況下才會多次循環:

  1. 隊列為空
  2. 有并發消費者干擾
  3. 生產者正在寫入元素但尚未完成

與其他方法的比較

peek() vs poll()

  • peek()?只查看元素但不移除
  • poll()?查看并移除元素
  • peek()?不需要修改消費者索引,而?poll()?需要通過 CAS 操作更新消費者索引

peek() vs relaxedPeek()

@Override
public E relaxedPeek() {final E[] buffer = this.buffer;final long mask = this.mask;long currentConsumerIndex;long nextConsumerIndex = lvConsumerIndex();E e;do {currentConsumerIndex = nextConsumerIndex;e = lvRefElement(buffer, calcCircularRefElementOffset(currentConsumerIndex, mask));// sandwich the element load between 2 consumer index loadsnextConsumerIndex = lvConsumerIndex();}while (nextConsumerIndex != currentConsumerIndex);return e;
}

relaxedPeek()?是?peek()?的"寬松"版本:

  • 不檢查隊列是否為空
  • 不使用生產者索引緩存
  • 只確保在加載元素過程中消費者索引沒有變化

這使得?relaxedPeek()?性能更好,但可能在某些邊界情況下行為不同(例如當隊列為空時)。

適用場景

peek()?方法特別適用于以下場景:

  1. 檢查隊列內容:在不修改隊列狀態的情況下查看頭部元素
  2. 多消費者環境:在有多個消費者線程的情況下,確保正確處理并發訪問
  3. 性能敏感場景:通過緩存機制減少對共享變量的訪問,提高性能
  4. 需要強一致性保證:相比?relaxedPeek(),提供更強的一致性保證

?

總結?

SpmcArrayQueue?相比?SpscArrayQueue?的核心特性和設計權衡在于:

  • 多消費者協調:引入了?CAS 操作?(casConsumerIndex) 來解決多個消費者對?consumerIndex?的爭用問題,這是從 SPSC 到 SPMC 的根本性變化。
  • 消費者側緩存:增加了?producerIndexCache?字段,讓每個消費者可以緩存生產者進度,大大減少了對?producerIndex?的?volatile?讀,降低了緩存一致性流量,提升了消費者側的性能。
  • 內存布局的進一步細分:通過?MidPad?將?consumerIndex(極熱爭用點)和?producerIndexCache(消費者本地緩存)進行隔離,進一步優化了緩存性能。
  • 性能權衡:在?offer?方法中,允許了短暫的自旋等待,犧牲了嚴格的無等待性,以換取更簡單的生產者邏輯。

總而言之,SpmcArrayQueue?是一個通過精巧的內存布局、CAS競爭和消費者側緩存機制,高效解決了單生產者、多消費者并發難題的優秀實現。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/94078.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/94078.shtml
英文地址,請注明出處:http://en.pswp.cn/web/94078.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

SpringAI1.0.1實戰教程:避坑指南25年8月最新版

Spring AI 1.0.1 使用教程 項目簡介 作為一個Java的開發者 聽到Java也有ai框架了 很高興~~~ 本來想學一下SpringAI但是網上賣課的一大堆&#xff0c;并且大部分課程都是五月的&#xff0c;到2025年的8月份&#xff0c;SpringAI的版本做了很多更新&#xff0c;所以我本人參考…

Maven架構的依賴管理和項目構建

??????什么是依賴管理對第三方依賴包的管理&#xff0c;可以連接互聯網下載項目所需第三方jar包。對自己開發的模塊的管理&#xff0c;可以像引用第三方依賴包一樣引用自己項目的依賴包。Maven的依賴管理方式和傳統方式有什么區別傳統方式&#xff1a;從官網手動下載jar包…

微信小程序開發(一):使用開發者工具創建天氣預報項目

Hi&#xff0c;我是前端人類學&#xff08;之前叫布蘭妮甜&#xff09;&#xff01; 從今天開始&#xff0c;我將開啟一個全新的微信小程序開發系列教程&#xff0c;通過實際項目帶大家系統學習小程序開發。作為系列的第一篇文章&#xff0c;我們將從最基礎的環境搭建開始&…

【鏈表 - LeetCode】24. 兩兩交換鏈表中的節點

24. 兩兩交換鏈表中的節點 - 力扣&#xff08;LeetCode&#xff09; 題解&#xff1a; - 迭代 首先是直接遍歷的做法&#xff0c;這里注意調整指針指向的順序。 /*** Definition for singly-linked list.* struct ListNode {* int val;* ListNode *next;* List…

爬蟲基礎學習-鏈接協議分析,熟悉相關函數

1、urlparse&#xff1a;&#xff08;python標準庫中的一個模塊&#xff0c;解析和操作url&#xff09;標準的url鏈接格式&#xff1a;scheme://netloc/path;params?query#fragmentscheme&#xff08;協議&#xff09; http or https netloc&#xff08;網絡位置&#xff09; …

kkfileview預覽Excel文件去掉左上角的跳轉HTML預覽、打印按鈕

上篇說了使用nginx代理kkfile預覽文件&#xff0c;但是又發現個新問題&#xff0c;預覽其他文件時都正常&#xff0c;但是預覽.xlsx格式的時候&#xff0c;在左上角會有【跳轉HTML預覽】【打印】兩個按鈕&#xff0c;如下所示&#xff1a;這篇就來說一下如何去掉。首先這個跟kk…

阿里開源新AI code工具:qoder功能介紹

下載地址&#xff1a; https://qoder.com/ 文檔地址&#xff1a; https://docs.qoder.com/ 文章目錄1. AI 編碼發展趨勢2. 真實世界軟件開發的挑戰3. 我們的方法3.1. 透明度3.1.1. 知識可見性3.1.2. 執行透明度3.2. 增強上下文工程3.3. 規范驅動與任務委托3.3.1. 聊天模式&…

什么是短視頻矩陣系統企業立項功能源碼開發,支持OEM

短視頻矩陣系統企業立項功能源碼開發解析在短視頻行業蓬勃發展的當下&#xff0c;企業紛紛布局短視頻矩陣&#xff0c;以實現多平臺、多賬號的協同運營。而企業立項作為短視頻矩陣項目啟動的關鍵環節&#xff0c;其高效、規范的管理直接影響項目的推進效率與成果。為此&#xf…

當GitHub宕機時,我們如何協作?

問題背景與影響 GitHub作為主流代碼托管平臺的依賴現狀宕機對分布式團隊、CI/CD流水線、緊急修復的影響案例其他類似平臺&#xff08;GitLab、Bitbucket&#xff09;的潛在連帶風險 本地與離線協作方案 利用Git分布式特性&#xff1a;本地倉庫繼續提交&#xff0c;恢復后同步搭…

【會議跟蹤】Model-Based Systems Engineering (MBSE) in Practice 2025

會議主旨與議題 會議宣傳鏈接:https://www.sei.cmu.edu/events/mbse-in-practice/ 本次會議將于2025年8月21日位美國弗吉尼亞州阿靈頓(五角大樓所在地)舉行。本次會議主旨為 MBSE in Practice: Bridging the Gap Between Theory and Success(2025)。隨著軟件定義系統日趨…

瀏覽器的渲染流程:從 HTML 到屏幕顯示

在我們日常使用瀏覽器瀏覽網頁時&#xff0c;往往忽略了瀏覽器背后復雜的渲染過程。從輸入 URL 到頁面最終顯示在屏幕上&#xff0c;瀏覽器需要經過一系列精心設計的步驟。 瀏覽器渲染的整體流程瀏覽器的渲染流程可以大致分為兩個主要部分&#xff1a;網絡 和 渲染。當用戶在地…

FastMCP 客戶端服務器通信示例:從入門到實戰(STDIO 傳輸)

引言 在現代分布式系統和AI應用中&#xff0c;模型上下文協議&#xff08;MCP&#xff09;扮演著重要角色&#xff0c;它負責協調客戶端與服務器之間的通信&#xff0c;尤其是在需要頻繁交互的場景中。本文將介紹如何使用FastMCP庫快速實現客戶端與服務器之間的通信&#xff0c…

寶可夢肉鴿 PC/手機雙端 多種存檔 全閃光 無限金幣 全寶可夢解鎖 免安裝中文版

網盤鏈接&#xff1a; 寶可夢肉鴿 免安裝中文版 名稱&#xff1a;寶可夢肉鴿 PC/手機雙端 多種存檔 全閃光 無限金幣 全寶可夢解鎖 免安裝中文版 描述&#xff1a;寶可夢肉鴿修改版是一款非常受歡迎的口袋妖怪系列&#xff0c;游戲擁有許多獨特的妖怪和玩法。在游戲中&#…

Linux 下的網絡編程

1、目的實現不同主機上進程間的通信。2、問題主機與主機之間在物理層面必須互聯互通。進程與進程在軟件層面必須互聯互通。IP地址&#xff1a;計算機的軟件地址&#xff0c;用來標識計算機設備。MAC地址&#xff1a;計算機的硬件地址&#xff08;固定&#xff09;。網絡的端口號…

Go語言在邊緣計算中的網絡編程實踐:從入門到精通

一、引言 在數字化浪潮席卷全球的今天&#xff0c;邊緣計算如同一股清流&#xff0c;正在重新定義我們對網絡架構的理解。想象一下&#xff0c;當你在自動駕駛汽車中需要毫秒級響應&#xff0c;或者在偏遠工廠中需要實時處理傳感器數據時&#xff0c;傳統的云計算模式就像是&qu…

ASPICE過程能力確定——度量框架

&#x1f697;【汽車人必看】ASPICE能力評估核心&#xff1a;度量框架全解析&#xff5c;90%工程師都搞不懂的評分規則&#xff01;&#x1f50d; 為什么你的ASPICE評估總卡在L2&#xff1f;——揭秘6大能力等級背后的評分邏輯&#xff0c;附提升秘籍&#xff01;&#x1f525;…

機器學習在量化中的應用

一、核心應用場景在因子研究中&#xff0c;scikit-learn 主要解決以下幾類問題&#xff1a;因子預處理與標準化&#xff1a;StandardScaler, RobustScaler因子有效性分析&#xff1a;LinearRegression (IC分析)降維與因子合成&#xff1a;PCA, FactorAnalysis機器學習預測模型&…

RabbitMQ:消息轉化器

目錄一、基本概述二、如何處理一、基本概述 在RabbitMQ中&#xff0c;一般情況下傳遞字符串會被正常解析&#xff0c;如果傳遞的是一個Object類型或者是一個對象類型的時候&#xff0c;RabbitMQ會將其自動轉化為字節碼發送&#xff0c;這不利于我們的讀取個解析。 二、如何處…

【Protues仿真】基于AT89C52單片機的LCD液晶顯示屏顯示控制

目錄 1 LM016L液晶顯示模塊 1.1 基本參數 1.2 引腳定義 1.3硬件連接示例&#xff08;AT89C52&#xff09; 1.4 常用指令集&#xff08;HD44780 子集&#xff09; 1.5 常見問題與注意事項 1.8 結論 2 LM016L液晶顯示模塊控制電路原理圖 3 LM016L液晶顯示模塊控制程序 …

孤獨傷感視頻素材哪里找?分享熱門傷感短視頻素材資源網站

你是不是也經常在抖音上刷到很火的傷感視頻&#xff0c;那么傷感視頻素材都在哪里可以下載呢&#xff1f;作為一名從業多年的視頻剪輯師&#xff0c;今天就跟大家聊聊那些可以下載傷感素材高清無水印的網站&#xff0c;如果你也在苦苦找尋傷感素材&#xff0c;快來看看吧&#…