ConcurrentCircularArrayQueue?
ConcurrentCircularArrayQueue
是一個抽象類,它為基于數組的并發循環隊列提供了基礎功能。從其命名可以看出幾個關鍵特性:
- ??Concurrent??:常指無鎖并發。
- ??Circular Array??:內部使用循環數組作為其數據結構。這意味著它是一個有界隊列,容量是固定的。
- ??Queue??:遵循隊列的先進先出(FIFO)原則。
這個類本身是抽象的,它定義了所有基于循環數組的并發隊列的通用字段和方法,但將生產者和消費者的索引管理等具體并發策略留給子類實現(例如 SpscArrayQueue
、MpscArrayQueue
等)。
在 ConcurrentCircularArrayQueue.java
中,我們可以看到兩個核心字段:
// ... existing code ...
abstract class
ConcurrentCircularArrayQueue<E> extends
ConcurrentCircularArrayQueueL0Pad<E>implements MessagePassingQueue<E>, IndexedQueue, QueueProgressIndicators, SupportsIterator
{protected final long mask;protected final E[] buffer;ConcurrentCircularArrayQueue(int capacity){// ... existing code ...
??
protected final E[] buffer;
??
這是存儲隊列元素的數組。它被聲明為final
,一旦初始化后,其引用不可更改。數組的長度總是2的冪,這是為了通過位運算(&
)來高效地計算索引,從而實現循環數組。??
protected final long mask;
??
掩碼,它的值是capacity - 1
。通過index & mask
操作,可以將一個不斷增長的索引值(如生產者或消費者索引)快速映射到buffer
數組的有效范圍內,這比取模運算 (%
) 效率高得多。
構造函數 ConcurrentCircularArrayQueue(int capacity)
會將用戶請求的 capacity
向上取整到最接近的2的冪次方,以滿足循環數組的設計要求。
此外,ConcurrentCircularArrayQueueL0Pad
這個父類通過填充緩存行(Cache Line Padding)來防止偽共享(False Sharing),這是并發編程中一個重要的性能優化手段。
ConcurrentCircularArrayQueue
實現了一系列接口,這些接口定義了它的核心行為和能力。
MessagePassingQueue<E>
?
接口定義了隊列作為消息傳遞工具的核心 API。它擴展了標準的 java.util.Queue
,并增加了一些針對高性能并發場景的方法,例如:
relaxedOffer(E e)
:一個寬松的入隊操作,可能不會立即讓其他線程看到新元素,但性能更高。relaxedPoll()
:一個寬松的出隊操作。drain(Consumer<E> c, int limit)
:從隊列中批量取出元素并由指定的消費者處理。
這個接口表明 ConcurrentCircularArrayQueue
不僅僅是一個普通的數據集合,更是一個為高性能線程間通信設計的工具。
IndexedQueue
這個接口提供了訪問隊列內部索引的能力,主要用于計算隊列大小和容量。
lvProducerIndex()
:獲取生產者索引的最新值(volatile read)。lvConsumerIndex()
:獲取消費者索引的最新值(volatile read)。capacity()
:返回隊列的容量。
通過這兩個索引,IndexedQueueSizeUtil.size(this)
可以計算出當前隊列中的元素數量。這種設計將索引的訪問方式標準化,使得大小計算邏輯可以被重用。
QueueProgressIndicators
這個接口提供了對隊列生產者和消費者進度的可見性,主要用于監控和調試。
currentProducerIndex()
:返回當前生產者索引。currentConsumerIndex()
:返回當前消費者索引。
這與 IndexedQueue
中的方法類似,但語義上更側重于“進度”而非“索引”,通常用于外部觀察者了解隊列的活動狀態。
SupportsIterator
這個接口表明該隊列支持迭代器。ConcurrentCircularArrayQueue
提供了一個內部實現的 WeakIterator
。
// ... existing code ...@Overridepublic Iterator<E> iterator() {final long cIndex = lvConsumerIndex();final long pIndex = lvProducerIndex();return new WeakIterator(cIndex, pIndex, mask, buffer);}private static class WeakIterator<E> implements Iterator<E> {// ... existing code ...
這個迭代器是“弱一致性”的:
- 它提供了隊列在創建迭代器那一刻的一個“快照”。
- 它不保證返回的元素嚴格按照隊列順序,并且可能會遺漏在迭代期間入隊或出隊的元素。
- 它不會拋出
ConcurrentModificationException
,是線程安全的。
這種設計是為了在不加鎖的情況下提供一個盡力而為的迭代功能,適用于調試或監控等不要求強一致性的場景。
內部類?WeakIterator
private static class WeakIterator<E> implements Iterator<E>
首先,從類的聲明來看:
private
: 這是一個私有內部類,意味著它只能被外部類?ConcurrentCircularArrayQueue
?訪問。這是良好的封裝,表明它的實現細節與外部類緊密相關,不希望被外部直接使用。static
: 這是一個靜態內部類。靜態內部類不持有外部類實例的隱式引用。這意味著?WeakIterator
?對象本身是一個獨立的對象,不會意外地阻止?ConcurrentCircularArrayQueue
?實例被垃圾回收。它需要的所有信息(如數組緩沖區、索引等)都必須通過構造函數顯式傳入,這使得它的狀態更加清晰和獨立。implements Iterator<E>
: 它實現了標準的?java.util.Iterator
?接口,提供了?hasNext()
、next()
?和?remove()
?方法,可以用于標準的?for-each
?循環中。
核心定位:“弱一致性” (Weakly Consistent) 迭代器
這個迭代器最核心的特性是“弱一致性”或“盡力而為”(best-effort)。在類的注釋中也明確提到了這一點:
The iterator provides a best-effort snapshot of the elements in the queue. The returned iterator is not guaranteed to return elements in queue order, and races with the consumer thread may cause gaps in the sequence of returned elements.
譯:迭代器提供了隊列中元素的一個盡力而為的快照。返回的迭代器不保證按隊列順序返回元素,并且與消費者線程的競爭可能會導致返回的元素序列中出現間隙。
這意味著:
- 快照性 (Snapshot): 迭代器在創建時會“拍下”當時隊列的消費者索引 (
cIndex
) 和生產者索引 (pIndex
)。它只會嘗試遍歷這個索引范圍內的元素,不會看到在它創建之后入隊的任何新元素。 - 非阻塞/無鎖 (Non-Blocking/Lock-Free): 迭代器在遍歷時不會對隊列加鎖。生產者和消費者線程可以完全并發地對隊列進行操作,這保證了高吞吐量。
- 可能存在間隙 (Gaps): 由于無鎖,當迭代器正在遍歷時,消費者線程可能已經取走(消費)了某個元素。當迭代器訪問到那個位置時,會發現是?
null
,于是它會跳過這個元素,繼續向后尋找。這就造成了迭代結果中可能出現“間隙”。
成員變量
// ... existing code ...private static class WeakIterator<E> implements Iterator<E> {private final long pIndex;private final long mask;private final E[] buffer;private long nextIndex;private E nextElement;
// ... existing code ...
private final long pIndex;
: 迭代器創建時刻的生產者索引。final
?關鍵字確保它在迭代器生命周期內不會改變,定義了遍歷的上限。private final long mask;
: 用于將長整型的索引(會無限增長)映射到數組的實際下標,計算方式通常是?offset = index & mask
。private final E[] buffer;
: 對隊列底層數組的引用。private long nextIndex;
: 迭代器當前檢查的索引位置。它從?cIndex
?開始,逐步增加到?pIndex
。private E nextElement;
:?預取/前瞻的下一個元素。這是一種常見的迭代器實現模式,hasNext()
?只需檢查這個字段是否為?null
,而?next()
?返回這個字段并再次預取下一個,簡化了邏輯。
構造函數
// ... existing code ...WeakIterator(long cIndex, long pIndex, long mask, E[] buffer) {this.nextIndex = cIndex;this.pIndex = pIndex;this.mask = mask;this.buffer = buffer;nextElement = getNext();}
// ... existing code ...
- 構造函數接收創建迭代器那一刻的消費者索引 (
cIndex
)、生產者索引 (pIndex
)、掩碼和緩沖區。 - 它將?
cIndex
?初始化為?nextIndex
,將?pIndex
?存入?final
?字段。 - 最關鍵的一步是?
nextElement = getNext();
。在構造函數里,它就立即調用?getNext()
?嘗試獲取第一個有效的元素。這使得第一次調用?hasNext()
?時就能立刻知道結果,而無需做任何計算。
hasNext()
?和?next()
// ... existing code ...@Overridepublic boolean hasNext() {return nextElement != null;}@Overridepublic E next() {final E e = nextElement;if (e == null)throw new NoSuchElementException();nextElement = getNext();return e;}
// ... existing code ...
hasNext()
: 邏輯非常簡單,直接判斷預取的?nextElement
?是否為空。next()
:- 保存當前的?
nextElement
。 - 如果它為?
null
(意味著?hasNext()
?返回了?false
),則拋出?NoSuchElementException
,符合?Iterator
?接口規范。 - 調用?
getNext()
?去預取下一個元素,為下一次調用?hasNext()
?或?next()
?做準備。 - 返回之前保存的元素。
- 保存當前的?
getNext()
?- 迭代器的引擎
// ... existing code ...private E getNext() {while (nextIndex < pIndex) {long offset = calcCircularRefElementOffset(nextIndex++, mask);E e = lvRefElement(buffer, offset);if (e != null) {return e;}}return null;}
// ... existing code ...
這是迭代器最核心的邏輯所在。
while (nextIndex < pIndex)
: 循環的條件保證了迭代器只在創建時確定的索引范圍內查找。nextIndex++
: 在計算偏移量后,nextIndex
?會自增,為下一次循環做準備。E e = lvRefElement(buffer, offset);
: 這是關鍵的并發操作。lvRefElement
?是?Unsafe
?類操作的封裝,代表?Load Volatile,即以 volatile 內存語義讀取數組中的元素。這確保了生產者線程對元素的寫入對迭代器線程是可見的,避免了讀到舊的、未初始化的值。if (e != null)
: 如果讀取到的元素不為?null
,說明找到了一個有效元素,立即返回它。- 如果?
e
?是?null
,循環會繼續。這正是處理“間隙”的地方:迭代器看到了一個空槽,它會認為這個元素已經被消費者線程取走,于是跳過它,繼續尋找下一個。 return null;
: 如果?while
?循環結束(即?nextIndex
?到達了?pIndex
),仍然沒有找到非空元素,就返回?null
,表示迭代結束。
remove()
// ... existing code ...@Overridepublic void remove() {throw new UnsupportedOperationException("remove");}
// ... existing code ...
- 直接拋出?
UnsupportedOperationException
。在無鎖的并發數據結構上安全地實現?remove()
?操作非常復雜,且容易引入競爭條件。因此,JCTools 和很多其他并發庫(如?ConcurrentLinkedQueue
?的迭代器)一樣,選擇不支持此操作。在?RELEASE-NOTES.md
?中也提到了早期版本不支持?iterator()
?方法,后來雖然支持了,但?remove()
?依然是被禁止的,這是一種安全且常見的設計決策。
總結
WeakIterator
?是一個精心設計的、用于并發環境的輕量級迭代器。它的核心特點可以概括為:
- 線程安全與高性能: 通過無鎖設計和 volatile 讀,實現了不阻塞生產者/消費者的線程安全遍歷。
- 弱一致性快照: 它提供的是一個“盡力而為”的快照,不保證反映隊列的精確狀態或元素順序,但足以用于調試、監控或某些特定場景下的批量讀取。
- 實現簡潔優雅: 采用“預取”(
lookahead
)模式,使得?hasNext()
?和?next()
?的邏輯非常清晰簡單,將復雜性集中在?getNext()
?方法中。 - 內存友好: 作為靜態內部類,它不持有外部類的引用,避免了潛在的內存泄漏問題。
總而言之,WeakIterator
?是在追求極致性能的并發隊列中,為“遍歷”這一功能所做出的一個典型權衡:犧牲了強一致性,換取了高并發性和無鎖的性能優勢。
核心方法實現
??
size()
和isEmpty()
??
這兩個方法都委托給IndexedQueueSizeUtil
工具類,通過比較生產者和消費者索引來計算,避免了在并發環境下維護一個單獨的size
變量所帶來的開銷和競爭。??
clear()
??
通過不斷調用poll()
方法直到返回null
來清空隊列。這是一個簡單但有效的方法。
總結
ConcurrentCircularArrayQueue
是 JCTools 中各種有界并發數組隊列的基石。它通過以下設計實現了高性能和線程安全:
- ??循環數組與2的冪容量??:使用位運算替代取模運算,提升索引計算效率。
- ??緩存行填充??:避免偽共享,減少多核 CPU 環境下的緩存爭用。
- ??分離索引管理??:將生產者和消費者的索引管理邏輯分離到子類中,以支持不同的并發模型(SPSC、MPSC、SPMC、MPMC)。
- ??弱一致性迭代器??:提供一個輕量級、無鎖的迭代器,適用于監控和調試。
- ??標準化的接口??:通過實現
MessagePassingQueue
、IndexedQueue
等接口,提供了豐富且明確的 API,使其不僅僅是一個集合,更是一個強大的消息傳遞工具。
ConcurrentSequencedCircularArrayQueue
這是一個在 JCTools 中非常核心的抽象基類,理解它對于理解 MPMC(多生產者多消費者)等高級隊列的實現至關重要。
public abstract class ConcurrentSequencedCircularArrayQueue<E> extends ConcurrentCircularArrayQueue<E>
{
//...
}
extends ConcurrentCircularArrayQueue<E>
: 它繼承自我們之前分析過的?ConcurrentCircularArrayQueue<E>
。這意味著它天然就擁有了父類的所有特性,包括:- 一個用于存儲元素的環形數組?
protected final E[] buffer;
。 - 一個用于計算數組索引的掩碼?
protected final long mask;
。 - 基本的隊列屬性和方法,如容量?
capacity()
、清空?clear()
?等。
- 一個用于存儲元素的環形數組?
sequenceBuffer
這個類最核心的擴展是增加了一個新的成員變量:
// ... existing code ...
public abstract class ConcurrentSequencedCircularArrayQueue<E> extends ConcurrentCircularArrayQueue<E>
{protected final long[] sequenceBuffer;public ConcurrentSequencedCircularArrayQueue(int capacity)
// ... existing code ...
protected final long[] sequenceBuffer;
: 這是一個?long
?類型的數組,名為“序列緩沖區”。它的長度與存儲元素的?buffer
?數組相同。這個數組是實現高級并發控制算法的關鍵。
sequenceBuffer
?為隊列中的每一個槽(slot)都提供了一個對應的“序列號”或“票據”。生產者和消費者通過檢查和更新這個序列號來協調對共享數據(即?buffer
?數組中的元素)的訪問,從而避免使用鎖。
構造函數與初始化
構造函數的實現揭示了?sequenceBuffer
?的工作原理。
// ... existing code ...public ConcurrentSequencedCircularArrayQueue(int capacity){super(capacity);int actualCapacity = (int) (this.mask + 1);// pad data on either end with some empty slots. Note that actualCapacity is <= MAX_POW2_INTsequenceBuffer = allocateLongArray(actualCapacity);for (long i = 0; i < actualCapacity; i++){soLongElement(sequenceBuffer, calcCircularLongElementOffset(i, mask), i);}}
}
super(capacity);
: 調用父類構造函數,初始化?buffer
?數組和?mask
。sequenceBuffer = allocateLongArray(actualCapacity);
: 分配?sequenceBuffer
?數組。allocateLongArray
?只是簡單分配一個數組。for
?循環初始化: 這是最關鍵的部分。循環遍歷?sequenceBuffer
?的每一個槽位,并執行?soLongElement(..., i)
。calcCircularLongElementOffset(i, mask)
: 計算第?i
?個槽在內存中的偏移量。soLongElement(..., i)
:?so
?是?Store Ordered
?的縮寫,對應于?Unsafe.putOrderedLong
。這是一個有內存屏障效果的寫操作,它比普通的寫要強,但比?volatile
?寫要弱。它保證了在此操作之前的內存寫入不會被重排序到它之后。- 初始化邏輯:?
sequenceBuffer
?的第?i
?個槽被初始化為值?i
。即?sequenceBuffer[i] = i
。
?“序列號”并發控制機制
通過這個初始化,ConcurrentSequencedCircularArrayQueue
?為子類(如?MpmcArrayQueue
)奠定了并發算法的基礎。這個算法大致如下:
- 初始狀態: 隊列為空。
buffer
?數組里都是?null
。sequenceBuffer
?的第?i
?個槽位的值是?i
。 - 生產者入隊 (offer):
- 一個生產者想要在邏輯索引?
p
?處放入一個元素。 - 它首先計算出該索引對應的數組位置?
offset = p & mask
。 - 它會去檢查?
sequenceBuffer[offset]
?的值。 - 如果?
sequenceBuffer[offset]
?的值正好等于?p
,這就像一張“票據”,表明這個槽位是空閑的,并且輪到索引為?p
?的這次操作來使用它。 - 生產者獲得許可,將元素寫入?
buffer[offset]
。 - 寫入元素后,生產者會更新?
sequenceBuffer[offset]
?的值為?p + 1
。這個新值?p + 1
?成為了給消費者的信號,表示“索引為?p
?的槽位已經準備好了,你可以來消費了”。
- 一個生產者想要在邏輯索引?
- 消費者出隊 (poll):
- 一個消費者想要消費邏輯索引?
c
?處的元素。 - 它計算出數組位置?
offset = c & mask
。 - 它去檢查?
sequenceBuffer[offset]
?的值。 - 如果?
sequenceBuffer[offset]
?的值等于?c + 1
,這就表明生產者已經完成了在?c
?位置的寫入,數據可供消費。 - 消費者獲得許可,從?
buffer[offset]
?讀取元素。 - 讀取之后,消費者會更新?
sequenceBuffer[offset]
?的值為?c + capacity
?(即?c + mask + 1
)。這個值將會是下一輪生產者在同一個物理槽位?offset
?上期望看到的序列號,從而完成一個循環。
- 一個消費者想要消費邏輯索引?
總結
ConcurrentSequencedCircularArrayQueue
?是一個巧妙的抽象,它在?ConcurrentCircularArrayQueue
?的基礎上,通過增加一個并行的?sequenceBuffer
,為無鎖(Lock-Free)并發隊列算法提供了核心機制。
- 關注點分離: 它將數據存儲 (
buffer
) 和并發控制 (sequenceBuffer
) 分離開來。 - 奠定基礎: 它不實現具體的?
offer
?和?poll
?邏輯,而是將?sequenceBuffer
?初始化好,交由?MpmcArrayQueue
、MpscSequencedArrayQueue
?等子類去實現具體的、基于序列號檢查的入隊和出隊操作。 - 高性能設計: 使用?
Unsafe
?和?putOrderedLong
?等底層技術,旨在最大化吞吐量和最小化延遲。
因此,這個類是 JCTools 中實現高性能多線程隊列(尤其是多生產者場景)的關鍵基石。
sequenceBuffer
:每個槽位的“狀態指示器”
可以把?sequenceBuffer
?想象成一個與主數據數組?buffer
?并行存在的、專門用來標記狀態的記分板。buffer
?里的每個槽位(slot),都在?sequenceBuffer
?的相同位置有一個對應的 long 類型數字,我們稱之為?seq
。
這個?seq
?的值不是隨便寫的,它遵循一個嚴格的協議,用來表示其對應槽位的狀態。對于一個位于?i
?位置的槽位,它的?seq
?值有以下幾種含義:
seq == i
: 槽位?i
?是空的,可以被生產者用來存放一個生產者序號(producerIndex)為?i
?的元素。seq == i + 1
: 槽位?i
?已被寫入數據,可以被消費者消費。生產者在放入元素后,會將?seq
?更新為?i + 1
,作為“已發布”的信號。seq == i + capacity
: 槽位?i
?已被消費,并且消費者已經將其標記為“可回收”。消費者取出元素后,會將?seq
?更新為?i + capacity
。這個?+ capacity
?的操作,相當于把這個槽位的“版本號”推進了一輪,為下一圈的生產者做好了準備。
舉個例子:?假設隊列容量?capacity
?是 8。
- 生產者想在?
producerIndex = 0
?的位置放東西。它會檢查?sequenceBuffer[0]
?的值是不是?0
。 - 如果是,它就把元素放入?
buffer[0]
,然后把?sequenceBuffer[0]
?的值更新為?1
。 - 消費者想從?
consumerIndex = 0
?的位置取東西。它會檢查?sequenceBuffer[0]
?的值是不是?0 + 1 = 1
。 - 如果是,它就從?
buffer[0]
?取出元素,然后把?sequenceBuffer[0]
?的值更新為?0 + 8 = 8
。 - 現在,當生產者跑完一整圈,輪到?
producerIndex = 8
?時,它會檢查?sequenceBuffer[0]
?(因為?8 % 8 == 0
) 的值是不是?8
。正好是消費者更新后的值!于是生產者可以安全地重用這個槽位。
這個?seq
?就是生產者和消費者之間不見面就能溝通的“信物”。