引入
之前我們探究了常見的阻塞隊列的特點,在本文我們就以 ArrayBlockingQueue 為例,首先分析
BlockingQueue ,也就是阻塞隊列的線程安全原理,然后再看看它的兄弟——非阻塞隊列的并發安全原理。
ArrayBlockingQueue 源碼分析
我們首先看一下 ArrayBlockingQueue 的源碼,ArrayBlockingQueue 有以下幾個重要的屬性:
/*** 用于存儲隊列元素的數組。該數組是固定大小的,一旦創建,其容量就不能再改變。* 數組中的元素類型為 Object,因為隊列可以存儲任意類型的元素。*/final Object[] items;/*** 下一次執行 take、poll、peek 或 remove 操作時,從數組中獲取元素的索引位置。* 這是一個循環數組,當 takeIndex 達到數組的末尾時,會重新回到數組的起始位置。*/int takeIndex;/*** 下一次執行 put、offer 或 add 操作時,將元素插入到數組中的索引位置。* 同樣,這是一個循環數組,當 putIndex 達到數組的末尾時,會重新回到數組的起始位置。*/int putIndex;/*** 當前隊列中元素的數量。* 該值始終小于或等于數組的容量,用于跟蹤隊列中實際存儲的元素數量。*/int count;
第一個就是最核心的、用于存儲元素的 Object 類型的數組;然后它還會有兩個位置變量,分別是takeIndex 和 putIndex,這兩個變量就是用來標明下一次讀取和寫入位置的;另外還有一個 count 用來計數,它所記錄的就是隊列中的元素個數。
另外,我們再來看下面這三個變量:
/*** 主鎖,用于保護對隊列的所有訪問操作。* 所有對隊列的讀寫操作都需要先獲取這個鎖,以確保線程安全。*/final ReentrantLock lock;/*** 用于等待 take 操作的條件對象。* 當隊列中沒有元素時,嘗試從隊列中取元素的線程會在此條件上等待。* 當有新元素被添加到隊列中時,會通過這個條件喚醒等待的線程。*/private final Condition notEmpty;/*** 用于等待 put 操作的條件對象。* 當隊列已滿時,嘗試向隊列中添加元素的線程會在此條件上等待。* 當有元素從隊列中被移除時,會通過這個條件喚醒等待的線程。*/private final Condition notFull;
這三個變量也非常關鍵,第一個就是一個 ReentrantLock,而下面兩個 Condition 分別是由ReentrantLock 產生出來的,這三個變量就是我們實現線程安全最核心的工具。
ArrayBlockingQueue 實現并發同步的原理就是利用 ReentrantLock 和它的兩個 Condition,讀操作和寫操作都需要先獲取到 ReentrantLock 獨占鎖才能進行下一步操作。進行讀操作時如果隊列為空,線程就會進入到讀線程專屬的 notEmpty 的 Condition 的隊列中去排隊,等待寫線程寫入新的元素;同理,如果隊列已滿,這個時候寫操作的線程會進入到寫線程專屬的 notFull 隊列中去排隊,等待讀線程將隊列元素移除并騰出空間。
下面,我們來分析一下最重要的 put 方法:
/*** 將指定元素插入此隊列的尾部,如果隊列已滿,則等待空間可用。** @param e 要插入的元素* @throws InterruptedException 如果在等待過程中當前線程被中斷* @throws NullPointerException 如果指定的元素為 null*/public void put(E e) throws InterruptedException {// 檢查傳入的元素是否為 null,若為 null 則拋出空指針異常checkNotNull(e);// 獲取用于控制隊列訪問的可重入鎖final ReentrantLock lock = this.lock;// 以可中斷的方式獲取鎖,允許線程在等待鎖的過程中被中斷lock.lockInterruptibly();try {// 當隊列中的元素數量達到數組容量時,線程進入等待狀態// 等待其他線程從隊列中取出元素,釋放空間while (count == items.length)notFull.await();// 當隊列有空間時,將元素插入隊列尾部enqueue(e);} finally {// 無論插入操作是否成功,最后都要釋放鎖lock.unlock();}}
在 put 方法中,首先用 checkNotNull 方法去檢查插入的元素是不是 null。如果不是 null,我們會用ReentrantLock 上鎖,并且上鎖方法是 lock.lockInterruptibly()。在獲取鎖的同時是可以響應中斷的,這也正是我們的阻塞隊列在調用 put 方法時,在嘗試獲取鎖但還沒拿到鎖的期間可以響應中斷的底層原因。
緊接著 ,是一個非常經典的 try ?finally 代碼塊,finally 中會去解鎖,try 中會有一個 while 循環,它會檢查當前隊列是不是已經滿了,也就是 count 是否等于數組的長度。如果等于就代表已經滿了,于是我們便會進行等待,直到有空余的時候,我們才會執行下一步操作,調用 enqueue 方法讓元素進入隊列,最后用 unlock 方法解鎖。
和 ArrayBlockingQueue 類似,其他各種阻塞隊列如 LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue、DelayedWorkQueue 等一系列 BlockingQueue 的內部也是利用了 ReentrantLock 來保證線程安全,只不過細節有差異,比如 LinkedBlockingQueue 的內部有兩把鎖,分別鎖住隊列的頭和尾,比共用同一把鎖的效率更高,不過總體思想都是類似的。
非阻塞隊列ConcurrentLinkedQueue
看完阻塞隊列之后,我們就來看看非阻塞隊列 ConcurrentLinkedQueue。
我們先看看它的源碼注釋:
An unbounded thread-safe queue based on linked nodes. This queue orders elements FIFO (first-in-first-out). The head of the queue is that element that has been on the queue the longest time. The tail of the queue is that element that has been on the queue the shortest time. New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue. A ConcurrentLinkedQueue is an appropriate choice when many threads will share access to a common collection. Like most other concurrent collection implementations, this class does not permit the use of null elements.
This implementation employs an efficient non-blocking algorithm based on one described in Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms ? by Maged M. Michael and Michael L. Scott.
Iterators are weakly consistent, returning elements reflecting the state of the queue at some point at or since the creation of the iterator. They do not throw java. util. ConcurrentModificationException, and may proceed concurrently with other operations. Elements contained in the queue since the creation of the iterator will be returned exactly once.
Beware that, unlike in most collections, the size method is NOT a constant-time operation. Because of the asynchronous nature of these queues, determining the current number of elements requires a traversal of the elements, and so may report inaccurate results if this collection is modified during traversal. Additionally, the bulk operations addAll, removeAll, retainAll, containsAll, equals, and toArray are not guaranteed to be performed atomically. For example, an iterator operating concurrently with an addAll operation might view only some of the added elements.
This class and its iterator implement all of the optional methods of the Queue and Iterator interfaces.
Memory consistency effects: As with other concurrent collections, actions in a thread prior to placing an object into a ConcurrentLinkedQueue happen-before actions subsequent to the access or removal of that element from the ConcurrentLinkedQueue in another thread.
This class is a member of the Java Collections Framework.翻譯:
這是一個基于鏈表節點的無界線程安全隊列。該隊列按照先進先出(FIFO)的順序對元素進行排序。隊列頭部的元素是在隊列中存在時間最長的元素,隊列尾部的元素是在隊列中存在時間最短的元素。新元素會被插入到隊列尾部,而隊列的檢索操作則從隊列頭部獲取元素。當有多個線程需要共享訪問一個公共集合時,ConcurrentLinkedQueue 是一個合適的選擇。和大多數其他并發集合實現一樣,此類不允許使用 null 元素。
此實現采用了一種高效的非阻塞算法,該算法基于 Maged M. Michael 和 Michael L. Scott 所著的《Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms》(簡單、快速且實用的非阻塞和阻塞并發隊列算法)中描述的算法。
迭代器具有弱一致性,它返回的元素反映了迭代器創建時或創建之后某個時刻隊列的狀態。迭代器不會拋出 java.util.ConcurrentModificationException 異常,并且可以與其他操作并發進行。自迭代器創建以來,隊列中包含的元素將恰好被返回一次。
請注意,與大多數集合不同,size 方法并非常量時間操作。由于這些隊列具有異步特性,確定當前元素的數量需要遍歷元素,因此如果在遍歷過程中集合被修改,可能會報告不準確的結果。此外,批量操作(如 addAll、removeAll、retainAll、containsAll、equals 和 toArray)不能保證以原子方式執行。例如,與 addAll 操作并發執行的迭代器可能只會看到部分被添加的元素。
此類及其迭代器實現了 Queue 接口和 Iterator 接口的所有可選方法。
內存一致性效果:與其他并發集合一樣,一個線程將對象放入 ConcurrentLinkedQueue 之前的操作,先行發生于另一個線程從該 ConcurrentLinkedQueue 中訪問或移除該元素之后的操作。
此類是 Java 集合框架的成員之一。
顧名思義,ConcurrentLinkedQueue 是使用鏈表作為其數據結構的,我們來看一下關鍵方法 offer 的源碼:
/*** 將指定元素插入此隊列的尾部。由于隊列是無界的,此方法將永遠不會返回 {@code false}。** @param e 要插入的元素* @return {@code true}(由 {@link Queue#offer} 指定)* @throws NullPointerException 如果指定的元素為 null*/public boolean offer(E e) {// 檢查插入的元素是否為 null,如果為 null 則拋出 NullPointerExceptioncheckNotNull(e);// 創建一個新的節點,用于存儲要插入的元素final Node<E> newNode = new Node<E>(e);// 從尾節點開始,嘗試將新節點插入到隊列中for (Node<E> t = tail, p = t;;) {// 獲取當前節點的下一個節點Node<E> q = p.next;// 如果下一個節點為 null,說明當前節點是隊列的最后一個節點if (q == null) {// p 是最后一個節點// 嘗試使用 CAS 操作將新節點設置為當前節點的下一個節點if (p.casNext(null, newNode)) {// 成功的 CAS 操作是元素 e 成為此隊列元素的線性化點,// 也是新節點 newNode 成為“活躍”節點的線性化點// 如果當前節點不是尾節點,嘗試更新尾節點if (p != t) // 一次跳躍兩個節點// 嘗試更新尾節點為新節點,失敗也沒關系casTail(t, newNode); return true;}// 與其他線程的 CAS 競爭失敗;重新讀取下一個節點} // 如果當前節點的下一個節點是自身,說明我們已經脫離了鏈表else if (p == q)// 我們已經脫離了鏈表。如果尾節點沒有改變,// 它也會脫離鏈表,在這種情況下,我們需要跳轉到頭節點,// 因為所有活躍節點總是可以從頭節點到達。否則,新的尾節點是更好的選擇。p = (t != (t = tail)) ? t : head;else// 經過兩次跳躍后檢查尾節點的更新情況p = (p != t && t != (t = tail)) ? t : q;}}
在這里我們不去一行一行分析具體的內容,而是把目光放到整體的代碼結構上,在檢查完空判斷之后,可以看到它整個是一個大的 for 循環,而且是一個非常明顯的死循環。在這個循環中有一個非常亮眼的 p.casNext 方法,這個方法正是利用了 CAS 來操作的,而且這個死循環去配合 CAS 也就是典型的樂觀鎖的思想。
我們就來看一下 p.casNext 方法的具體實現,其方法代碼如下:
/*** 嘗試使用CAS(比較并交換)操作將當前節點的next引用從cmp更新為val。** @param cmp 期望的當前next引用值* @param val 要設置的新的next引用值* @return 如果CAS操作成功,則返回true;否則返回false*/boolean casNext(Node<E> cmp, Node<E> val) {return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);}
可以看出這里運用了 UNSAFE.compareAndSwapObject 方法來完成 CAS 操作,而compareAndSwapObject 是一個 native 方法,最終會利用 CPU 的 CAS 指令保證其不可中斷。
可以看出,非阻塞隊列 ConcurrentLinkedQueue 使用 CAS 非阻塞算法 + 不停重試,來實現線程安全,適合用在不需要阻塞功能,且并發不是特別劇烈的場景。
總結
我們最后來做一下總結。通過我們對阻塞隊列和非阻塞隊列的并發安全原理的分析,可以知道,其中阻塞隊列最主要是利用了 ReentrantLock 以及它的 Condition 來實現,而非阻塞隊列則是利用 CAS 方法實現線程安全。