SpmcArrayQueue
?是 JCTools 中為 單生產者-多消費者(Single-Producer-Multi-Consumer) 場景設計的有界隊列。與 SPSC 模型相比,SPMC 的復雜性主要體現在消費者側,因為多個消費者線程需要以線程安全的方式競爭消費同一個隊列中的元素。
單生產者-單消費者數組隊列 分析見:JCTools Spsc:單生產者-單消費者無鎖隊列
SpmcArrayQueue
?的繼承鏈同樣是為了精確控制內存布局,但其側重點與?SpscArrayQueue
?有所不同,它需要處理多消費者對?consumerIndex
?的爭用。
-
SpmcArrayQueueL1Pad
?&?SpmcArrayQueueProducerIndexField
:- 與 SPSC 類似,這部分定義了生產者索引?
producerIndex
,并用?L1Pad
?將其與上游的“冷”字段(如?buffer
,?mask
)隔離開。 - 由于只有一個生產者,
producerIndex
?的更新邏輯相對簡單,不需要 CAS 操作,使用?putOrderedLong
?即可。
- 與 SPSC 類似,這部分定義了生產者索引?
-
SpmcArrayQueueL2Pad
?&?SpmcArrayQueueConsumerIndexField
:- 核心變化點:
SpmcArrayQueueConsumerIndexField
?中不再有?soConsumerIndex
?方法,取而代之的是?casConsumerIndex
。// ... existing code ... //$gen:ordered-fields abstract class SpmcArrayQueueConsumerIndexField<E> extends SpmcArrayQueueL2Pad<E> {protected final static long C_INDEX_OFFSET = fieldOffset(SpmcArrayQueueConsumerIndexField.class, "consumerIndex");private volatile long consumerIndex;// ... existing code ...@Overridepublic final long lvConsumerIndex(){return consumerIndex;}final boolean casConsumerIndex(long expect, long newValue){return UNSAFE.compareAndSwapLong(this, C_INDEX_OFFSET, expect, newValue);} } // ... existing code ...
- 原因:因為有多個消費者,它們必須通過 CAS(Compare-And-Swap) 操作來原子性地更新?
consumerIndex
,以確保只有一個消費者能成功獲取并消費一個元素。L2Pad
?在此的作用依然是隔離生產者和消費者的熱點字段。
- 核心變化點:
-
SpmcArrayQueueMidPad
?&?SpmcArrayQueueProducerIndexCacheField
:- SPMC 特有的優化:這里引入了一個新的字段?
producerIndexCache
。// ... existing code ... //$gen:ordered-fields abstract class SpmcArrayQueueProducerIndexCacheField<E> extends SpmcArrayQueueMidPad<E> {// This is separated from the consumerIndex which will be highly contended in the hope that this value spends most// of it's time in a cache line that is Shared(and rarely invalidated)private volatile long producerIndexCache;// ... existing code ... } // ... existing code ...
producerIndex
?會被生產者頻繁更新(每次 offer 都會更新)producerIndexCache
?只在消費者發現緩存過期時才更新,因此減少了對?producerIndex
?的 volatile 讀取,降低了緩存一致性流量的爭用MidPad
?的作用就是將這個消費者側的緩存(producerIndexCache
)與消費者側的爭用點(consumerIndex
)分離開,避免它們互相干擾。
- SPMC 特有的優化:這里引入了一個新的字段?
-
SpmcArrayQueueL3Pad
: 最后的填充,隔離?producerIndexCache
?和?SpmcArrayQueue
?自身的字段。
offer(E e)
:簡單而直接
由于只有一個生產者,offer
?的邏輯比 SPSC 還要簡單,因為它不需要“前瞻優化”(producerLimit
)。生產者只需要檢查目標槽位是否為空即可。
// ... existing code ...@Overridepublic boolean offer(final E e){if (null == e){throw new NullPointerException();}final E[] buffer = this.buffer;final long mask = this.mask;final long currProducerIndex = lvProducerIndex(); // 1. 獲取當前生產者索引final long offset = calcCircularRefElementOffset(currProducerIndex, mask);// 2. 檢查槽位是否被消費者釋放if (null != lvRefElement(buffer, offset)){// 如果槽位不為空,說明隊列滿了long size = currProducerIndex - lvConsumerIndex();if (size > mask){return false;}else{// 等待消費者釋放該槽位 (這會破壞無等待性)while (null != lvRefElement(buffer, offset)){// BURN}}}// 3. 放置元素并更新索引soRefElement(buffer, offset, e);soProducerIndex(currProducerIndex + 1); // 使用 store-ordered 更新return true;}
// ... existing code ...
SPMC 特性分析:
- 單生產者權威:生產者是唯一能推進?
producerIndex
?的線程,所以它只需?lvProducerIndex()
?讀取自己的進度,然后用?soProducerIndex()
?更新即可,無需 CAS。 - 依賴消費者:生產者通過?
lvRefElement
?檢查目標槽位是否為?null
?來判斷隊列是否已滿。這個?null
?是由消費者在消費后寫入的。 - 潛在的自旋等待:代碼中有一段?
while
?循環等待。這通常發生在消費者進度稍稍落后于生產者進度,但隊列并未完全滿的情況下。生產者會在此“自旋”等待消費者完成對該槽位的消費和清理。這是?SpmcArrayQueue
?的一個關鍵特性,它犧牲了一定的無等待性(Wait-Free)來簡化設計。
poll()
:競爭與緩存的藝術
poll
?方法是 SPMC 模型的核心,完美展現了多消費者如何通過 CAS 和本地緩存來協同工作。
// ... existing code ...@Overridepublic E poll(){long currentConsumerIndex;// 1. 讀取本地的生產者進度緩存long currProducerIndexCache = lvProducerIndexCache();do{// 2. 讀取全局的消費者進度currentConsumerIndex = lvConsumerIndex();// 3. 快路徑判斷:使用本地緩存判斷隊列是否為空if (currentConsumerIndex >= currProducerIndexCache){// 4. 慢路徑:本地緩存表明隊列為空,需同步最新的生產者進度long currProducerIndex = lvProducerIndex();if (currentConsumerIndex >= currProducerIndex){return null; // 隊列確實為空}else{// 更新本地緩存currProducerIndexCache = currProducerIndex;svProducerIndexCache(currProducerIndex);}}}// 5. CAS 競爭:嘗試原子性地將 consumerIndex 加一while (!casConsumerIndex(currentConsumerIndex, currentConsumerIndex + 1));// 6. 成功獲取元素return removeElement(buffer, currentConsumerIndex, mask);}
// ... existing code ...
SPMC 特性分析:
- 生產者進度緩存:每個消費者線程開始時都會讀取?
producerIndexCache
。這是一個本地快照,避免了每次都去訪問真正的?producerIndex
。 - 快慢路徑分離:
- 快路徑:只要?
currentConsumerIndex < currProducerIndexCache
,消費者就認為隊列中有元素,直接進入第5步的 CAS 競爭。這是絕大多數情況。 - 慢路徑:當快路徑條件不滿足時,消費者必須通過?
lvProducerIndex()
?讀取最新的生產者進度,并更新自己的本地緩存?producerIndexCache
。
- 快路徑:只要?
- CAS 爭用:
casConsumerIndex
?是多消費者協調的核心。多個消費者線程可能同時讀取到相同的?currentConsumerIndex
,但只有一個能通過 CAS 操作成功地將其加一,從而“贏得”消費該位置元素的權利。失敗的線程則會重新循環,讀取新的?consumerIndex
?再次嘗試。 - 無競爭取出:一旦一個消費者通過 CAS 成功預定了位置,它就可以安全地調用?
removeElement
?來取出元素。因為?removeElement
?操作的是一個已經被它“私有化”的索引,不會有其他消費者來干擾。
SpmcArrayQueue.peek() 方法詳細分析
peek()
?方法是隊列中的一個重要操作,它允許查看隊列頭部元素但不移除該元素。
public E peek() {final E[] buffer = this.buffer;final long mask = this.mask;long currProducerIndexCache = lvProducerIndexCache();long currentConsumerIndex;long nextConsumerIndex = lvConsumerIndex();E e;do {currentConsumerIndex = nextConsumerIndex;if (currentConsumerIndex >= currProducerIndexCache) {long currProducerIndex = lvProducerIndex();if (currentConsumerIndex >= currProducerIndex) {return null;} else {currProducerIndexCache = currProducerIndex;svProducerIndexCache(currProducerIndex);}}e = lvRefElement(buffer, calcCircularRefElementOffset(currentConsumerIndex, mask));// sandwich the element load between 2 consumer index loadsnextConsumerIndex = lvConsumerIndex();} while (null == e || nextConsumerIndex != currentConsumerIndex);return e;
}
方法開始首先獲取幾個關鍵變量:
buffer
: 隊列的底層數組,存儲實際元素mask
: 用于計算循環隊列位置的掩碼值,通常是 capacity-1currProducerIndexCache
: 生產者索引的緩存值,這是一個重要的優化變量currentConsumerIndex
?和?nextConsumerIndex
: 消費者索引的當前值和下一個值
- 在?
peek()
?方法中,消費者首先檢查?producerIndexCache
- 只有當?
currentConsumerIndex >= currProducerIndexCache
?時才需要讀取真實的?producerIndex
- 大多數情況下,隊列不為空時,消費者可以直接使用緩存值,避免讀取主?
producerIndex
-
性能影響:
- 直接讀取?
producerIndex
:每次都要從主內存讀取,可能觸發緩存失效 - 使用?
producerIndexCache
:大部分時間從本地緩存讀取,減少內存屏障和緩存一致性流量
- 直接讀取?
-
正確性保證:
- 雖然使用了緩存,但通過?
lvProducerIndex()
?的檢查確保了最終一致性 - 當緩存可能過期時(
currentConsumerIndex >= currProducerIndexCache
),會重新讀取真實值
- 雖然使用了緩存,但通過?
緩存更新邏輯
if (currentConsumerIndex >= currProducerIndexCache) {long currProducerIndex = lvProducerIndex();if (currentConsumerIndex >= currProducerIndex) {return null; // 隊列為空} else {currProducerIndexCache = currProducerIndex;svProducerIndexCache(currProducerIndex); // 更新緩存}
}
這段代碼處理了兩種情況:
- 隊列為空:當消費者索引已經趕上或超過生產者索引時,返回 null
- 緩存過期:當緩存值小于實際生產者索引時,更新緩存值
元素加載
e = lvRefElement(buffer, calcCircularRefElementOffset(currentConsumerIndex, mask));
這行代碼從緩沖區中加載元素:
calcCircularRefElementOffset
?計算循環隊列中的實際位置lvRefElement
?是一個 volatile 加載操作,確保內存可見性
一致性檢查
// sandwich the element load between 2 consumer index loads
nextConsumerIndex = lvConsumerIndex();
} while (null == e || nextConsumerIndex != currentConsumerIndex);
這是一個重要的并發控制機制,被稱為"三明治加載":
- 在加載元素前獲取消費者索引(
currentConsumerIndex
) - 加載元素本身
- 再次獲取消費者索引(
nextConsumerIndex
)
通過比較兩次獲取的消費者索引是否相同,可以確保在加載元素過程中沒有其他消費者修改了隊列狀態。如果不同,說明有其他消費者已經修改了隊列,需要重試。
循環條件分析
while (null == e || nextConsumerIndex != currentConsumerIndex)
循環繼續的條件有兩個:
-
null == e
: 加載的元素為 null,可能是因為:- 隊列確實為空
- 生產者正在寫入元素但尚未完成
- 其他消費者已經取走了該元素
-
nextConsumerIndex != currentConsumerIndex
: 消費者索引在加載元素過程中被修改,說明有并發操作干擾
內存訪問順序
方法中的內存訪問遵循特定順序,確保正確的并發語義:
- 首先讀取生產者索引緩存(
lvProducerIndexCache()
) - 讀取消費者索引(
lvConsumerIndex()
) - 如果需要,讀取實際生產者索引(
lvProducerIndex()
) - 讀取隊列元素(
lvRefElement()
) - 再次讀取消費者索引(
lvConsumerIndex()
)
這種順序確保了在多線程環境下能正確檢測隊列狀態變化。
volatile 操作的作用
lvProducerIndexCache()
: volatile 讀取,確保獲取最新的緩存值lvConsumerIndex()
: volatile 讀取,確保獲取最新的消費者位置lvProducerIndex()
: volatile 讀取,確保獲取最新的生產者位置svProducerIndexCache()
: volatile 寫入,確保緩存更新對所有線程可見
這些 volatile 操作確保了多線程間的內存可見性,防止出現不一致的視圖。
循環優化
雖然方法中包含一個 do-while 循環,但在正常情況下(隊列不為空且沒有并發干擾),循環只會執行一次。只有在以下情況下才會多次循環:
- 隊列為空
- 有并發消費者干擾
- 生產者正在寫入元素但尚未完成
與其他方法的比較
peek() vs poll()
peek()
?只查看元素但不移除poll()
?查看并移除元素peek()
?不需要修改消費者索引,而?poll()
?需要通過 CAS 操作更新消費者索引
peek() vs relaxedPeek()
@Override
public E relaxedPeek() {final E[] buffer = this.buffer;final long mask = this.mask;long currentConsumerIndex;long nextConsumerIndex = lvConsumerIndex();E e;do {currentConsumerIndex = nextConsumerIndex;e = lvRefElement(buffer, calcCircularRefElementOffset(currentConsumerIndex, mask));// sandwich the element load between 2 consumer index loadsnextConsumerIndex = lvConsumerIndex();}while (nextConsumerIndex != currentConsumerIndex);return e;
}
relaxedPeek()
?是?peek()
?的"寬松"版本:
- 不檢查隊列是否為空
- 不使用生產者索引緩存
- 只確保在加載元素過程中消費者索引沒有變化
這使得?relaxedPeek()
?性能更好,但可能在某些邊界情況下行為不同(例如當隊列為空時)。
適用場景
peek()
?方法特別適用于以下場景:
- 檢查隊列內容:在不修改隊列狀態的情況下查看頭部元素
- 多消費者環境:在有多個消費者線程的情況下,確保正確處理并發訪問
- 性能敏感場景:通過緩存機制減少對共享變量的訪問,提高性能
- 需要強一致性保證:相比?
relaxedPeek()
,提供更強的一致性保證
?
總結?
SpmcArrayQueue
?相比?SpscArrayQueue
?的核心特性和設計權衡在于:
- 多消費者協調:引入了?CAS 操作?(
casConsumerIndex
) 來解決多個消費者對?consumerIndex
?的爭用問題,這是從 SPSC 到 SPMC 的根本性變化。 - 消費者側緩存:增加了?
producerIndexCache
?字段,讓每個消費者可以緩存生產者進度,大大減少了對?producerIndex
?的?volatile
?讀,降低了緩存一致性流量,提升了消費者側的性能。 - 內存布局的進一步細分:通過?
MidPad
?將?consumerIndex
(極熱爭用點)和?producerIndexCache
(消費者本地緩存)進行隔離,進一步優化了緩存性能。 - 性能權衡:在?
offer
?方法中,允許了短暫的自旋等待,犧牲了嚴格的無等待性,以換取更簡單的生產者邏輯。
總而言之,SpmcArrayQueue
?是一個通過精巧的內存布局、CAS競爭和消費者側緩存機制,高效解決了單生產者、多消費者并發難題的優秀實現。