LinkedBlockingQueue
基本的入隊出隊
初始化
public class LinkedBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {// 靜態內部類 Node,用于存儲隊列元素及維護節點間關系static class Node<E> {// 存儲的實際元素,泛型 E 類型E item;/*** next 字段的三種可能情況說明:* - 真正的后繼節點:正常隊列鏈接場景,指向下一個節點* - 自己(this):出隊操作時的特殊標記,用于輔助處理并發下的節點狀態* - null:表示當前節點是隊列的最后一個節點,沒有后繼*/Node<E> next;// 構造方法,初始化節點時設置存儲的元素Node(E x) {item = x;}}}
初始化鏈表時,last
?和?head
?都指向新建的?Node<E>(null)
?。該節點是?Dummy 節點(啞節點、哨兵節點)?,作用是占位,其?item
?字段值為?null
?,用于輔助?LinkedBlockingQueue
?鏈表結構的初始化與操作(比如方便統一處理隊列空、頭尾指針等情況 )。
入隊
當一個節點入隊last = last.next = node;
再來一個節點入隊last = last.next = node;
出隊
// 出隊核心邏輯(基于 LinkedBlockingQueue 鏈表結構)
public E dequeue() {// 以下是關鍵出隊步驟,實際源碼會結合鎖、條件變量等保證線程安全,這里聚焦節點操作邏輯Node<E> h = head; // 1. 獲取真正存儲元素的首節點(head 是 dummy 節點,其 next 指向第一個有效節點)Node<E> first = h.next; // 2. 斷開原 head 節點的鏈接,并讓其 next 指向自身,輔助 GC 回收(避免內存泄漏)h.next = h; // 3. 更新 head 為真正的首節點(first),完成出隊后隊列頭指針遷移head = first; // 4. 取出首節點存儲的元素(隊列要彈出的元素)E x = first.item; // 5. 清空首節點的元素引用(幫助 GC,斷開元素引用鏈)first.item = null; // 6. 返回彈出的元素return x;
}
保存頭節點:(h = head)
用變量?h
?暫存隊列當前的?head
(dummy 節點 )。獲取有效首節點:(first = h.next)
通過?h.next
?拿到真正存儲元素的第一個有效節點?first
。斷開舊頭節點鏈接:(
h.next = h
)
讓?h.next = h
(自己指向自己 ),切斷舊頭節點與隊列的關聯,輔助垃圾回收。更新頭節點:(head = first)
把?head
?指向?first
,完成隊列頭指針遷移,保證后續出隊邏輯正確。取出元素:
從?first
?節點中取出要彈出的元素?x
。清空元素引用:
將?first.item
?置為?null
,幫助 GC 回收元素對象,避免內存泄漏。返回結果:
返回彈出的元素?x
,完成出隊操作。
加鎖分析
加鎖設計(高明之處)
- 單鎖 vs 雙鎖:
- 單鎖:同一時刻最多允許一個線程(生產者 / 消費者二選一 )執行操作。
- 雙鎖(
putLock
?、takeLock
?):同一時刻允許一個生產者 + 一個消費者同時執行,消費者線程間、生產者線程間仍串行。
線程安全分析(按節點總數分類)
節點總數 > 2(含 dummy 節點):
putLock
?保證?last
?節點線程安全(入隊操作 )。takeLock
?保證?head
?節點線程安全(出隊操作 )。- 兩把鎖隔離入隊、出隊操作,無競爭。
節點總數 = 2(1 個 dummy 節點 + 1 個正常節點):
仍用兩把鎖鎖定不同對象(last
、head
?相關 ),無競爭。節點總數 = 1(僅 dummy 節點):
take
?線程因隊列為空,被?notEmpty
?條件阻塞,有競爭時會阻塞等待。
// 用于 put(阻塞)、offer(非阻塞) 的鎖,控制入隊操作
private final ReentrantLock putLock = new ReentrantLock();
// 用于 take(阻塞)、poll(非阻塞) 的鎖,控制出隊操作
private final ReentrantLock takeLock = new ReentrantLock();
put操作
public void put(E e) throws InterruptedException {// 1. 校驗元素非空if (e == null) throw new NullPointerException();int c = -1; // 記錄入隊前的元素數量Node<E> node = new Node<E>(e); // 創建新節點final ReentrantLock putLock = this.putLock; // 生產者鎖final AtomicInteger count = this.count; // 隊列元素數量(原子類,保證線程安全)// 2. 加鎖(可中斷鎖)putLock.lockInterruptibly();try {// 3. 隊列滿時等待(循環檢測,防止虛假喚醒)while (count.get() == capacity) {// 等待 notFull 條件(隊列非滿時被喚醒)notFull.await(); }// 4. 入隊操作enqueue(node); // 將新節點加入隊列尾部c = count.getAndIncrement(); // 元素數量 +1(先獲取舊值,再自增)// 5. 喚醒其他生產者(如果隊列仍有空位)if (c + 1 < capacity) {// 喚醒等待 notFull 的生產者線程notFull.signal(); }} finally {// 6. 釋放鎖putLock.unlock();}// 7. 喚醒消費者(如果隊列從空變為非空)if (c == 0) {// 喚醒等待 notEmpty 的消費者線程notEmpty.signal(); }
}// 輔助方法:入隊操作(私有方法,保證線程安全)
private void enqueue(Node<E> node) {// 隊列的 last 節點指向新節點,完成入隊last = last.next = node;
}
校驗元素:
元素為?null
?時拋出異常。初始化與加鎖:
創建節點,獲取生產者鎖,可中斷加鎖。隊列滿時等待:
隊列滿則循環等待?notFull
?條件,釋放鎖并阻塞。入隊與計數:
將節點加入隊列尾部,原子遞增元素數量。喚醒生產者:
隊列仍有空位時,喚醒其他等待的生產者。(自己喚醒自己)釋放鎖:
保證鎖釋放,避免死鎖。喚醒消費者:
隊列入隊前為空時,喚醒等待的消費者。
take操作
public E take() throws InterruptedException {E x; // 存儲出隊的元素int c = -1; // 記錄出隊前的元素數量final AtomicInteger count = this.count; // 隊列元素數量(原子類,保證線程安全)final ReentrantLock takeLock = this.takeLock; // 消費者鎖// 1. 加鎖(可中斷鎖)takeLock.lockInterruptibly();try {// 2. 隊列空時等待(循環檢測,防止虛假喚醒)while (count.get() == 0) {// 等待 notEmpty 條件(隊列非空時被喚醒)notEmpty.await(); }// 3. 出隊操作x = dequeue(); // 從隊列頭部取出元素c = count.getAndDecrement(); // 元素數量 -1(先獲取舊值,再自減)// 4. 喚醒其他消費者(如果隊列仍有元素)if (c > 1) {// 喚醒等待 notEmpty 的消費者線程notEmpty.signal(); }} finally {// 5. 釋放鎖takeLock.unlock();}// 6. 喚醒生產者(如果隊列從滿變為非滿)if (c == capacity) {// 喚醒等待 notFull 的生產者線程signalNotFull(); }return x;
}
加鎖:
消費者線程加鎖,支持中斷。隊列空時等待:
隊列空則循環等待?notEmpty
?條件,釋放鎖并阻塞。出隊與計數:
從隊列頭部取出元素,原子遞減元素數量。喚醒消費者:
隊列仍有元素時,喚醒其他等待的消費者。釋放鎖:
保證鎖釋放,避免死鎖。喚醒生產者:
隊列出隊前滿時,喚醒等待的生產者。返回結果:
返回出隊的元素。
LinkedBlockingQueue VS ArrayBlockingQueue
LinkedBlockingQueue | ArrayBlockingQueue | |
---|---|---|
有界性支持 | 支持有界(默認容量為?Integer.MAX_VALUE ,可手動指定有界) | 強制有界(初始化必須指定容量,且不可動態擴容) |
底層數據結構 | 鏈表(Node ?節點構成鏈表) | 數組(提前初始化固定大小的?Object ?數組) |
初始化特性 | 懶惰初始化(首次入隊時創建節點,無需提前分配內存) | 提前初始化(創建時需指定容量,數組直接分配內存) |
節點創建時機 | 每次入隊(put /offer )時動態創建?Node | 初始化時提前創建數組元素(邏輯上的 “節點” 是數組槽位) |
鎖機制 | 雙鎖分離(putLock ?控制入隊,takeLock ?控制出隊) | 單鎖(ReentrantLock ?同時控制入隊和出隊) |
并發性能(典型場景) | 生產 / 消費可并行(雙鎖減少競爭),高并發下優勢明顯 | 生產 / 消費互斥(單鎖競爭激烈),高并發下性能受限 |
內存開銷 | 動態節點導致額外內存(Node ?對象頭、指針等) | 數組連續存儲,內存更緊湊(無額外指針開銷) |
擴容能力 | 理論上可無限擴容(受限于?Integer.MAX_VALUE ) | 不可擴容(容量固定,滿了只能等待出隊) |
適用場景 | 高并發生產消費、需動態擴容、內存不敏感場景 | 低并發、容量固定、內存敏感場景 |
ConcurrentLinkedQueue
CopyOnWriteArrayList
CopyOnWriteArraySet 是它的馬甲
底層實現采用了寫入時拷貝的思想,增刪改操作會將底層數組拷貝一份,更改操作在新數組上執行,這時不影響其它線程的并發讀,讀寫分離。
以新增為例:
public boolean add(E e) {synchronized (lock) {// 獲取舊的數組Object[] es = getArray();int len = es.length;// 拷貝新的數組(這里是比較耗時的操作,但不影響其它讀線程)es = Arrays.copyOf(es, len + 1);// 添加新元素es[len] = e;// 替換舊的數組setArray(es);return true;}
}
這里的源碼版本是 Java 11,在 Java 1.8 中使用的是可重入鎖而不是 synchronized
- CopyOnWriteArraySet 等基于寫時拷貝(Copy - On - Write)思想的類,其他讀操作未加鎖,適合 “讀多寫少” 的應用場景,以?
forEach
?方法為例進行展示。 - 適用場景:讀多寫少場景,讀操作無需加鎖,能高效并發讀取;寫操作因拷貝數組相對耗時,適合低頻寫操作的場景。
public void forEach(Consumer<? super E> action) {Objects.requireNonNull(action);for (Object x : getArray()) {@SuppressWarnings("unchecked") E e = (E) x;action.accept(e);}
}
get弱一致性
時間點 | 操作 | 結果 |
---|---|---|
1 | Thread-0 ?執行?getArray() | 拿到數組快照?array = [1, 2, 3] |
2 | Thread-1 ?執行?getArray() | 拿到同一數組快照?array = [1, 2, 3] |
3 | Thread-1 ?執行?remove(0) ?→ 觸發寫時拷貝 | 新建數組?arrayCopy = [2, 3] ,替換?array ?為?arrayCopy |
4 | Thread-0 ?執行?array[index] | 訪問的是舊數組?[1, 2, 3] ?的元素 |
寫時拷貝的 “快照” 特性:
讀線程(Thread-0
)在時間點?1
?拿到的是數組的快照(舊數組?[1, 2, 3]
)。即使后續寫線程(Thread-1
)修改了數組(替換為?[2, 3]
),讀線程仍會訪問自己持有的舊快照。
→?讀操作可能訪問到 “過期數據”,這就是弱一致性的體現。
迭代器弱一致性
CopyOnWriteArrayList<Integer> list = new CopyOnWriteArrayList<>();
list.add(1);
list.add(2);
list.add(3);// 1. 獲取迭代器(此時迭代器持有數組快照:[1,2,3])
Iterator<Integer> iter = list.iterator(); // 2. 新線程修改集合(觸發寫時拷貝,數組變為 [2,3])
new Thread(() -> {list.remove(0); System.out.println(list); // 輸出: [2, 3]
}).start();// 3. 主線程繼續用迭代器遍歷
sleep1s(); // 等待子線程執行完畢
while (iter.hasNext()) {System.out.println(iter.next()); // 輸出: 1, 2, 3
}
快照機制:
迭代器創建時,會拷貝當前數組的快照(如?[1,2,3]
),后續集合的修改(如?remove
)會生成新數組([2,3]
),但迭代器仍持有舊快照。無感知遍歷:
迭代器遍歷的是創建時的數組快照,不感知后續集合的修改 → 遍歷結果與 “當前集合實際數據” 不一致。