一、前言
接著前面的分析,接下來分析ConcurrentLinkedQueue,ConcurerntLinkedQueue一個基于鏈接節點的無界線程安全隊列。此隊列按照 FIFO(先進先出)原則對元素進行排序。隊列的頭部是隊列中時間最長的元素。隊列的尾部 是隊列中時間最短的元素。新的元素插入到隊列的尾部,隊列獲取操作從隊列頭部獲得元素。當多個線程共享訪問一個公共 collection 時,ConcurrentLinkedQueue是一個恰當的選擇。此隊列不允許使用null元素。
二、ConcurrentLinkedQueue數據結構
通過源碼分析可知,ConcurrentLinkedQueue的數據結構與LinkedBlockingQueue的數據結構相同,都是使用的鏈表結構。ConcurrentLinkedQueue的數據結構如下
說明:ConcurrentLinkedQueue采用的鏈表結構,并且包含有一個頭結點和一個尾結點。
三、ConcurrentLinkedQueue源碼分析
3.1 類的繼承關系
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>implements Queue<E>, java.io.Serializable {}
說明:ConcurrentLinkedQueue繼承了抽象類AbstractQueue,AbstractQueue定義了對隊列的基本操作;同時實現了Queue接口,Queue定義了對隊列的基本操作,同時,還實現了Serializable接口,表示可以被序列化。
3.2 類的內部類


private static class Node<E> {// 元素volatile E item;// next域volatile Node<E> next;/*** Constructs a new node. Uses relaxed write because item can* only be seen after publication via casNext.*/// 構造函數 Node(E item) {// 設置item的值UNSAFE.putObject(this, itemOffset, item);}// 比較并替換item值boolean casItem(E cmp, E val) {return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);}void lazySetNext(Node<E> val) {// 設置next域的值,并不會保證修改對其他線程立即可見UNSAFE.putOrderedObject(this, nextOffset, val);}// 比較并替換next域的值boolean casNext(Node<E> cmp, Node<E> val) {return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);}// Unsafe mechanics// 反射機制private static final sun.misc.Unsafe UNSAFE;// item域的偏移量private static final long itemOffset;// next域的偏移量private static final long nextOffset;static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> k = Node.class;itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item"));nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));} catch (Exception e) {throw new Error(e);}}}
說明:Node類表示鏈表結點,用于存放元素,包含item域和next域,item域表示元素,next域表示下一個結點,其利用反射機制和CAS機制來更新item域和next域,保證原子性。
3.3 類的屬性


public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>implements Queue<E>, java.io.Serializable {// 版本序列號 private static final long serialVersionUID = 196745693267521676L;// 反射機制private static final sun.misc.Unsafe UNSAFE;// head域的偏移量private static final long headOffset;// tail域的偏移量private static final long tailOffset;static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> k = ConcurrentLinkedQueue.class;headOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("head"));tailOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("tail"));} catch (Exception e) {throw new Error(e);}}// 頭結點private transient volatile Node<E> head;// 尾結點private transient volatile Node<E> tail; }
說明:屬性中包含了head域和tail域,表示鏈表的頭結點和尾結點,同時,ConcurrentLinkedQueue也使用了反射機制和CAS機制來更新頭結點和尾結點,保證原子性。
3.4 類的構造函數
1. ConcurrentLinkedQueue()型構造函數


public ConcurrentLinkedQueue() {// 初始化頭結點與尾結點head = tail = new Node<E>(null);}
說明:該構造函數用于創建一個最初為空的 ConcurrentLinkedQueue,頭結點與尾結點指向同一個結點,該結點的item域為null,next域也為null。
2. ConcurrentLinkedQueue(Collection<? extends E>)型構造函數


public ConcurrentLinkedQueue(Collection<? extends E> c) {Node<E> h = null, t = null;for (E e : c) { // 遍歷c集合// 保證元素不為空 checkNotNull(e);// 新生一個結點Node<E> newNode = new Node<E>(e);if (h == null) // 頭結點為null// 賦值頭結點與尾結點h = t = newNode;else {// 直接頭結點的next域 t.lazySetNext(newNode);// 重新賦值頭結點t = newNode;}}if (h == null) // 頭結點為null// 新生頭結點與尾結點h = t = new Node<E>(null);// 賦值頭結點head = h;// 賦值尾結點tail = t;}
說明:該構造函數用于創建一個最初包含給定 collection 元素的 ConcurrentLinkedQueue,按照此 collection 迭代器的遍歷順序來添加元素。
3.5 核心函數分析
1. offer函數


public boolean offer(E e) {// 元素不為null checkNotNull(e);// 新生一個結點final Node<E> newNode = new Node<E>(e);for (Node<E> t = tail, p = t;;) { // 無限循環// q為p結點的下一個結點Node<E> q = p.next;if (q == null) { // q結點為null// p is last nodeif (p.casNext(null, newNode)) { // 比較并進行替換p結點的next域// Successful CAS is the linearization point// for e to become an element of this queue,// and for newNode to become "live".if (p != t) // p不等于t結點,不一致 // hop two nodes at a time// 比較并替換尾結點casTail(t, newNode); // Failure is OK.// 返回return true;}// Lost CAS race to another thread; re-read next }else if (p == q) // p結點等于q結點// We have fallen off list. If tail is unchanged, it// will also be off-list, in which case we need to// jump to head, from which all live nodes are always// reachable. Else the new tail is a better bet.// 原來的尾結點與現在的尾結點是否相等,若相等,則p賦值為head,否則,賦值為現在的尾結點p = (t != (t = tail)) ? t : head;else// Check for tail updates after two hops.// 重新賦值p結點p = (p != t && t != (t = tail)) ? t : q;}}
說明:offer函數用于將指定元素插入此隊列的尾部。下面模擬offer函數的操作,隊列狀態的變化(假設單線程添加元素,連續添加10、20兩個元素)。
① 若ConcurrentLinkedQueue的初始狀態如上圖所示,即隊列為空。單線程添加元素,此時,添加元素10,則狀態如下所示
② 如上圖所示,添加元素10后,tail沒有變化,還是指向之前的結點,繼續添加元素20,則狀態如下所示
③ 如上圖所示,添加元素20后,tail指向了最新添加的結點。
2. poll函數


public E poll() {restartFromHead:for (;;) { // 無限循環for (Node<E> h = head, p = h, q;;) { // 保存頭結點// item項E item = p.item;if (item != null && p.casItem(item, null)) { // item不為null并且比較并替換item成功// Successful CAS is the linearization point// for item to be removed from this queue.if (p != h) // p不等于h // hop two nodes at a time// 更新頭結點updateHead(h, ((q = p.next) != null) ? q : p); // 返回itemreturn item;}else if ((q = p.next) == null) { // q結點為null// 更新頭結點 updateHead(h, p);return null;}else if (p == q) // p等于q// 繼續循環continue restartFromHead;else// p賦值為qp = q;}}}
說明:此函數用于獲取并移除此隊列的頭,如果此隊列為空,則返回null。下面模擬poll函數的操作,隊列狀態的變化(假設單線程操作,狀態為之前offer10、20后的狀態,poll兩次)。
① 隊列初始狀態如上圖所示,在poll操作后,隊列的狀態如下圖所示
② 如上圖可知,poll操作后,head改變了,并且head所指向的結點的item變為了null。再進行一次poll操作,隊列的狀態如下圖所示。
③ 如上圖可知,poll操作后,head結點沒有變化,只是指示的結點的item域變成了null。
3. remove函數


public boolean remove(Object o) {// 元素為null,返回if (o == null) return false;Node<E> pred = null;for (Node<E> p = first(); p != null; p = succ(p)) { // 獲取第一個存活的結點// 第一個存活結點的item值E item = p.item;if (item != null &&o.equals(item) &&p.casItem(item, null)) { // 找到item相等的結點,并且將該結點的item設置為null// p的后繼結點Node<E> next = succ(p);if (pred != null && next != null) // pred不為null并且next不為null// 比較并替換next域 pred.casNext(p, next);return true;}// pred賦值為ppred = p;}return false;}
說明:此函數用于從隊列中移除指定元素的單個實例(如果存在)。其中,會調用到first函數和succ函數,first函數的源碼如下


Node<E> first() {restartFromHead:for (;;) { // 無限循環,確保成功for (Node<E> h = head, p = h, q;;) {// p結點的item域是否為nullboolean hasItem = (p.item != null);if (hasItem || (q = p.next) == null) { // item不為null或者next域為null// 更新頭結點 updateHead(h, p);// 返回結點return hasItem ? p : null;}else if (p == q) // p等于q// 繼續從頭結點開始continue restartFromHead;else// p賦值為qp = q;}}}
說明:first函數用于找到鏈表中第一個存活的結點。succ函數源碼如下


final Node<E> succ(Node<E> p) {// p結點的next域Node<E> next = p.next;// 如果next域為自身,則返回頭結點,否則,返回nextreturn (p == next) ? head : next;}
說明:succ用于獲取結點的下一個結點。如果結點的next域指向自身,則返回head頭結點,否則,返回next結點。下面模擬remove函數的操作,隊列狀態的變化(假設單線程操作,狀態為之前offer10、20后的狀態,執行remove(10)、remove(20)操作)。
① 如上圖所示,為ConcurrentLinkedQueue的初始狀態,remove(10)后的狀態如下圖所示
② 如上圖所示,當執行remove(10)后,head指向了head結點之前指向的結點的下一個結點,并且head結點的item域置為null。繼續執行remove(20),狀態如下圖所示
③ 如上圖所示,執行remove(20)后,head與tail指向同一個結點,item域為null。
4. size函數


public int size() {// 計數int count = 0;for (Node<E> p = first(); p != null; p = succ(p)) // 從第一個存活的結點開始往后遍歷if (p.item != null) // 結點的item域不為null// Collection.size() spec says to max outif (++count == Integer.MAX_VALUE) // 增加計數,若達到最大值,則跳出循環break;// 返回大小return count;}
說明:此函數用于返回ConcurrenLinkedQueue的大小,從第一個存活的結點(first)開始,往后遍歷鏈表,當結點的item域不為null時,增加計數,之后返回大小。
五、示例
下面通過一個示例來了解ConcurrentLinkedQueue的使用


package com.hust.grid.leesf.collections;import java.util.concurrent.ConcurrentLinkedQueue;class PutThread extends Thread {private ConcurrentLinkedQueue<Integer> clq;public PutThread(ConcurrentLinkedQueue<Integer> clq) {this.clq = clq;}public void run() {for (int i = 0; i < 10; i++) {try {System.out.println("add " + i);clq.add(i);Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}}} }class GetThread extends Thread {private ConcurrentLinkedQueue<Integer> clq;public GetThread(ConcurrentLinkedQueue<Integer> clq) {this.clq = clq;}public void run() {for (int i = 0; i < 10; i++) {try {System.out.println("poll " + clq.poll());Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}}} }public class ConcurrentLinkedQueueDemo {public static void main(String[] args) {ConcurrentLinkedQueue<Integer> clq = new ConcurrentLinkedQueue<Integer>();PutThread p1 = new PutThread(clq);GetThread g1 = new GetThread(clq);p1.start();g1.start();} }
運行結果(某一次):


add 0 poll null add 1 poll 0 add 2 poll 1 add 3 poll 2 add 4 poll 3 add 5 poll 4 poll 5 add 6 add 7 poll 6 poll 7 add 8 add 9 poll 8
說明:GetThread線程不會因為ConcurrentLinkedQueue隊列為空而等待,而是直接返回null,所以當實現隊列不空時,等待時,則需要用戶自己實現等待邏輯。
六、總結
ConcurrentLinkedQueue的源碼也相對簡單,其實對于并發集合而言,分析源碼時首先理解單線程情況,然后再考慮在多線程并發時的情況,這樣會使得分析源碼容易得多,ConcurrentLinkedQueue和LinkedBlockingQueue的區別還是很明顯的(前者在取元素時,若隊列為空,則返回null;后者會進行等待)。謝謝各位園友的觀看~