AQS詳解
- 前言
- AQS幾個重要的內部屬性
- 字段
- 內部類 Node
- 同步隊列 | 阻塞隊列
- 等待隊列 | 條件隊列
- 重要方法執行鏈
- 同步隊列的獲取、阻塞、喚醒
- 加鎖代碼流程
- 解鎖
- 條件隊列的獲取、阻塞、喚醒
- 大體流程
- 調用await()方法
- 1. 將節點加入到條件隊列
- 2. 完全釋放獨占鎖
- 3. 等待進入阻塞隊列
- 4. signal 喚醒線程,轉移到阻塞隊列
- 喚醒后檢查中斷狀態
- 6. 獲取獨占鎖
- 7. 處理中斷狀態
- * 帶超時機制的 await
- * 不拋出 InterruptedException 的 await
前言
在分析 Java 并發包 java.util.concurrent 源碼的時候,少不了需要了解 AbstractQueuedSynchronizer(以下簡寫AQS)這個抽象類,因為它是 Java 并發包的基礎工具類,是實現 ReentrantLock、CountDownLatch、Semaphore、FutureTask 等類的基礎。
AQS幾個重要的內部屬性
字段
//共享變量,使用volatile修飾保證線程可見性
private volatile int state;// 頭結點,你直接把它當做 當前持有鎖的線程 可能是最好理解的
private transient volatile Node head;// 阻塞的尾節點,每個新的節點進來,都插入到最后,也就形成了一個鏈表
private transient volatile Node tail;// 代表當前持有獨占鎖的線程,舉個最重要的使用例子,因為鎖可以重入
// reentrantLock.lock()可以嵌套調用多次,所以每次用這個來判斷當前線程是否已經擁有了鎖
// if (currentThread == getExclusiveOwnerThread()) {state++}
private transient Thread exclusiveOwnerThread; //繼承自AbstractOwnableSynchronizer
state
是用來記錄同步狀態的一個重要屬性。在不同的AQS實現類上,往往有不同的含義。但是,總體有那么兩種作用。
共享鎖
:state的值代表著該臨界資源的數量。如 :打印機的數量是2,state =2,那么最多允許同時打印兩份文件。換算到線程中,就意味著,最多有 state 個線程同時進入。獨占鎖
:對于獨占鎖而言,以state的初始值并不是1,而是0。這是因為在獨占鎖中,這個值的含義代表著重入次數,每重入一次加一,但值為0時,意味著這個這個臨界資源并未被搶占。
內部類 Node
static final class Node {// 標識節點當前在共享模式下static final Node SHARED = new Node();// 標識節點當前在獨占模式下static final Node EXCLUSIVE = null;// ======== 下面的幾個int常量是給waitStatus用的 ===========/** waitStatus value to indicate thread has cancelled */// 代碼此線程取消了爭搶這個鎖static final int CANCELLED = 1;/** waitStatus value to indicate successor's thread needs unparking */// 官方的描述是,其表示當前node的后繼節點對應的線程需要被喚醒或者說可以被喚醒static final int SIGNAL = -1;/** waitStatus value to indicate thread is waiting on condition */// 條件隊列標識static final int CONDITION = -2;/*** waitStatus value to indicate the next acquireShared should* unconditionally propagate*/// 同樣的不分析,略過吧static final int PROPAGATE = -3;// =====================================================// 取值為上面的1、-1、-2、-3,或者0(以后會講到)// 這么理解,暫時只需要知道如果這個值 大于0 代表此線程取消了等待,// ps: 半天搶不到鎖,不搶了,ReentrantLock是可以指定timeouot的。。。volatile int waitStatus;// 前驅節點的引用volatile Node prev;// 后繼節點的引用volatile Node next;// 這個就是線程本尊volatile Thread thread;//鏈接到等待條件或特殊值SHARED的下一個節點。即為,用于鏈接條件隊列的指針Node nextWaiter;}
Node 的數據結構其實也挺簡單的,就是 thread + waitStatus + pre + next + nextWaiter 五個屬性而已,大家先要有這個概念在心里。
同步隊列 | 阻塞隊列
在多個線程競爭有限資源的情況下,一定會出現部分線程 得不到資源,即陷入阻塞狀態
。那么對于這些阻塞的線程,AQS會使用一個隊列組織起來,用于后續線程的喚醒。這個隊列就是CLH隊列(Craig,Landin,and Hagersten),一個虛擬的雙向鏈表
(這個為什么形容為虛擬的有興趣可以自己去了解一下),而隊列的每一個節點就是一個Node節點,記錄了阻塞線程的信息。
阻塞隊列不包含 head 節點
, head這個節點在邏輯上代表著 占有鎖的這個節點
等待隊列 | 條件隊列
如何說同步隊列是線程想要獲取鎖失敗而入隊的,那么條件隊列就是已經獲取到鎖,但是沒有滿足某種條件而主動阻塞的。而這種情況的阻塞不是因為競爭鎖而導致的,那么放在同步隊列就不合適了。于是,引申出了條件隊列,條件隊列是一個單向鏈表
Condition 經常可以用在生產者-消費者的場景中,這里以 ReentrantLock
舉例.
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;class BoundedBuffer {final Lock lock = new ReentrantLock();// condition 依賴于 lock 來產生final Condition notFull = lock.newCondition();final Condition notEmpty = lock.newCondition();final Object[] items = new Object[100];int putptr, takeptr, count;// 生產public void put(Object x) throws InterruptedException {lock.lock();try {while (count == items.length)notFull.await(); // 隊列已滿,等待,直到 not full 才能繼續生產items[putptr] = x;if (++putptr == items.length) putptr = 0;++count;notEmpty.signal(); // 生產成功,隊列已經 not empty 了,發個通知出去} finally {lock.unlock();}}// 消費public Object take() throws InterruptedException {lock.lock();try {while (count == 0)notEmpty.await(); // 隊列為空,等待,直到隊列 not empty,才能繼續消費Object x = items[takeptr];if (++takeptr == items.length) takeptr = 0;--count;notFull.signal(); // 被我消費掉一個,隊列 not full 了,發個通知出去return x;} finally {lock.unlock();}}
}
await() 方法會釋放鎖
是的,有一條線指向了同步隊列,這是因為,當條件隊列的條件滿足時,線程理論上就可以繼續執行了,但是需要重新獲取鎖。
condition 是依賴于 ReentrantLock 的,不管是調用 await 進入等待還是 signal 喚醒,都必須獲取到鎖才能進行操作。
重要方法執行鏈
同步隊列的獲取、阻塞、喚醒
ReentrantLock
在內部用了內部類 Sync
來管理鎖,所以真正的獲取鎖和釋放鎖是由 Sync
的實現類來控制的。Sync 有兩個實現,分別為 NonfairSync
(非公平鎖)和 FairSync
(公平鎖),我們看 FairSync
部分。
加鎖代碼流程
static final class FairSync extends Sync {private static final long serialVersionUID = -3000897897090466540L;// 爭鎖final void lock() {acquire(1);}// 來自父類AQS,我直接貼過來這邊,下面分析的時候同樣會這樣做,不會給讀者帶來閱讀壓力// 我們看到,這個方法,如果tryAcquire(arg) 返回true, 也就結束了。// 否則,acquireQueued方法會將線程壓到隊列中public final void acquire(int arg) { // 此時 arg == 1// 首先調用tryAcquire(1)一下,名字上就知道,這個只是試一試// 因為有可能直接就成功了呢,也就不需要進隊列排隊了,// 對于公平鎖的語義就是:本來就沒人持有鎖,根本沒必要進隊列等待(又是掛起,又是等待被喚醒的)if (!tryAcquire(arg) &&// tryAcquire(arg)沒有成功,這個時候需要把當前線程掛起,放到阻塞隊列中。acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {selfInterrupt();}}/*** Fair version of tryAcquire. Don't grant access unless* recursive call or no waiters or is first.*/// 嘗試直接獲取鎖,返回值是boolean,代表是否獲取到鎖// 返回true:1.沒有線程在等待鎖;2.重入鎖,線程本來就持有鎖,也就可以理所當然可以直接獲取protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();// state == 0 此時此刻沒有線程持有鎖if (c == 0) {// 雖然此時此刻鎖是可以用的,但是這是公平鎖,既然是公平,就得講究先來后到,// 看看有沒有別人在隊列中等了半天了if (!hasQueuedPredecessors() &&// 如果沒有線程在等待,那就用CAS嘗試一下,成功了就獲取到鎖了,// 不成功的話,只能說明一個問題,就在剛剛幾乎同一時刻有個線程搶先了 =_=// 因為剛剛還沒人的,我判斷過了compareAndSetState(0, acquires)) {// 到這里就是獲取到鎖了,標記一下,告訴大家,現在是我占用了鎖setExclusiveOwnerThread(current);return true;}}// 會進入這個else if分支,說明是重入了,需要操作:state=state+1// 這里不存在并發問題else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}// 如果到這里,說明前面的if和else if都沒有返回true,說明沒有獲取到鎖// 回到上面一個外層調用方法繼續看:// if (!tryAcquire(arg) // && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // selfInterrupt();return false;}// 假設tryAcquire(arg) 返回false,那么代碼將執行:// acquireQueued(addWaiter(Node.EXCLUSIVE), arg),// 這個方法,首先需要執行:addWaiter(Node.EXCLUSIVE)/*** Creates and enqueues node for current thread and given mode.** @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared* @return the new node*/// 此方法的作用是把線程包裝成node,同時進入到隊列中// 參數mode此時是Node.EXCLUSIVE,代表獨占模式private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failure// 以下幾行代碼想把當前node加到鏈表的最后面去,也就是進到阻塞隊列的最后Node pred = tail;// tail!=null => 隊列不為空(tail==head的時候,其實隊列是空的,不過不管這個吧)if (pred != null) { // 將當前的隊尾節點,設置為自己的前驅 node.prev = pred; // 用CAS把自己設置為隊尾, 如果成功后,tail == node 了,這個節點成為阻塞隊列新的尾巴if (compareAndSetTail(pred, node)) { // 進到這里說明設置成功,當前node==tail, 將自己與之前的隊尾相連,// 上面已經有 node.prev = pred,加上下面這句,也就實現了和之前的尾節點雙向連接了pred.next = node;// 線程入隊了,可以返回了return node;}}// 仔細看看上面的代碼,如果會到這里,// 說明 pred==null(隊列是空的) 或者 CAS失敗(有線程在競爭入隊)// 讀者一定要跟上思路,如果沒有跟上,建議先不要往下讀了,往回仔細看,否則會浪費時間的enq(node);return node;}/*** Inserts node into queue, initializing if necessary. See picture above.* @param node the node to insert* @return node's predecessor*/// 采用自旋的方式入隊// 之前說過,到這個方法只有兩種可能:等待隊列為空,或者有線程競爭入隊,// 自旋在這邊的語義是:CAS設置tail過程中,競爭一次競爭不到,我就多次競爭,總會排到的private Node enq(final Node node) {for (;;) {Node t = tail;// 之前說過,隊列為空也會進來這里if (t == null) { // Must initialize// 初始化head節點// 細心的讀者會知道原來 head 和 tail 初始化的時候都是 null 的// 還是一步CAS,你懂的,現在可能是很多線程同時進來呢if (compareAndSetHead(new Node()))// 給后面用:這個時候head節點的waitStatus==0, 看new Node()構造方法就知道了// 這個時候有了head,但是tail還是null,設置一下,// 把tail指向head,放心,馬上就有線程要來了,到時候tail就要被搶了// 注意:這里只是設置了tail=head,這里可沒return哦,沒有return,沒有return// 所以,設置完了以后,繼續for循環,下次就到下面的else分支了tail = head;} else {// 下面幾行,和上一個方法 addWaiter 是一樣的,// 只是這個套在無限循環里,反正就是將當前線程排到隊尾,有線程競爭的話排不上重復排node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}// 現在,又回到這段代碼了// if (!tryAcquire(arg) // && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // selfInterrupt();// 下面這個方法,參數node,經過addWaiter(Node.EXCLUSIVE),此時已經進入阻塞隊列// 注意一下:如果acquireQueued(addWaiter(Node.EXCLUSIVE), arg))返回true的話,// 意味著上面這段代碼將進入selfInterrupt(),所以正常情況下,下面應該返回false// 這個方法非常重要,應該說真正的線程掛起,然后被喚醒后去獲取鎖,都在這個方法里了final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();// p == head 說明當前節點雖然進到了阻塞隊列,但是是阻塞隊列的第一個,因為它的前驅是head// 注意,阻塞隊列不包含head節點,head一般指的是占有鎖的線程,head后面的才稱為阻塞隊列// 所以當前節點可以去試搶一下鎖// 這里我們說一下,為什么可以去試試:// 首先,它是隊頭,這個是第一個條件,其次,當前的head有可能是剛剛初始化的node,// enq(node) 方法里面有提到,head是延時初始化的,而且new Node()的時候沒有設置任何線程// 也就是說,當前的head不屬于任何一個線程,所以作為隊頭,可以去試一試,// tryAcquire已經分析過了, 忘記了請往前看一下,就是簡單用CAS試操作一下stateif (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}// 到這里,說明上面的if分支沒有成功,要么當前node本來就不是隊頭,// 要么就是tryAcquire(arg)沒有搶贏別人,繼續往下看if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {// 什么時候 failed 會為 true???// tryAcquire() 方法拋異常的情況if (failed)cancelAcquire(node);}}/*** Checks and updates status for a node that failed to acquire.* Returns true if thread should block. This is the main signal* control in all acquire loops. Requires that pred == node.prev** @param pred node's predecessor holding status* @param node the node* @return {@code true} if thread should block*/// 剛剛說過,會到這里就是沒有搶到鎖唄,這個方法說的是:"當前線程沒有搶到鎖,是否需要掛起當前線程?"// 第一個參數是前驅節點,第二個參數才是代表當前線程的節點private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;// 前驅節點的 waitStatus == -1 ,說明前驅節點狀態正常,當前線程需要掛起,直接可以返回trueif (ws == Node.SIGNAL)/** This node has already set status asking a release* to signal it, so it can safely park.*/return true;// 前驅節點 waitStatus大于0 ,之前說過,大于0 說明前驅節點取消了排隊。// 這里需要知道這點:進入阻塞隊列排隊的線程會被掛起,而喚醒的操作是由前驅節點完成的。// 所以下面這塊代碼說的是將當前節點的prev指向waitStatus<=0的節點,// 簡單說,就是為了找個好爹,因為你還得依賴它來喚醒呢,如果前驅節點取消了排隊,// 找前驅節點的前驅節點做爹,往前遍歷總能找到一個好爹的if (ws > 0) {/** Predecessor was cancelled. Skip over predecessors and* indicate retry.*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {/** waitStatus must be 0 or PROPAGATE. Indicate that we* need a signal, but don't park yet. Caller will need to* retry to make sure it cannot acquire before parking.*/// 仔細想想,如果進入到這個分支意味著什么// 前驅節點的waitStatus不等于-1和1,那也就是只可能是0,-2,-3// 在我們前面的源碼中,都沒有看到有設置waitStatus的,所以每個新的node入隊時,waitStatu都是0// 正常情況下,前驅節點是之前的 tail,那么它的 waitStatus 應該是 0// 用CAS將前驅節點的waitStatus設置為Node.SIGNAL(也就是-1)compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}// 這個方法返回 false,那么會再走一次 for 循序,// 然后再次進來此方法,此時會從第一個分支返回 truereturn false;}// private static boolean shouldParkAfterFailedAcquire(Node pred, Node node)// 這個方法結束根據返回值我們簡單分析下:// 如果返回true, 說明前驅節點的waitStatus==-1,是正常情況,那么當前線程需要被掛起,等待以后被喚醒// 我們也說過,以后是被前驅節點喚醒,就等著前驅節點拿到鎖,然后釋放鎖的時候叫你好了// 如果返回false, 說明當前不需要被掛起,為什么呢?往后看// 跳回到前面是這個方法// if (shouldParkAfterFailedAcquire(p, node) &&// parkAndCheckInterrupt())// interrupted = true;// 1. 如果shouldParkAfterFailedAcquire(p, node)返回true,// 那么需要執行parkAndCheckInterrupt():// 這個方法很簡單,因為前面返回true,所以需要掛起線程,這個方法就是負責掛起線程的// 這里用了LockSupport.park(this)來掛起線程,然后就停在這里了,等待被喚醒=======private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}// 2. 接下來說說如果shouldParkAfterFailedAcquire(p, node)返回false的情況// 仔細看shouldParkAfterFailedAcquire(p, node),我們可以發現,其實第一次進來的時候,一般都不會返回true的,原因很簡單,前驅節點的waitStatus=-1是依賴于后繼節點設置的。也就是說,我都還沒給前驅設置-1呢,怎么可能是true呢,但是要看到,這個方法是套在循環里的,所以第二次進來的時候狀態就是-1了。// 解釋下為什么shouldParkAfterFailedAcquire(p, node)返回false的時候不直接掛起線程:// => 是為了應對在經過這個方法后,node已經是head的直接后繼節點了。剩下的讀者自己想想吧。
}
我們可以看到 FairSync extends SYnc extends AbstractQueuedSynchronize
,并對 tryAcquire(int acquires)
和 tryRelease(int releases)
進行了重寫,而其他操作都是通過 AbstractQueuedSynchronizer
這個抽象類的共性操作來實現的。
解鎖
最后,就是還需要介紹下喚醒的動作了。我們知道,正常情況下,如果線程沒獲取到鎖,線程會被 LockSupport.park(this);
掛起停止,等待被喚醒。
// 喚醒的代碼還是比較簡單的,你如果上面加鎖的都看懂了,下面都不需要看就知道怎么回事了
public void unlock() {sync.release(1);
}public final boolean release(int arg) {// 往后看吧if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;
}// 回到ReentrantLock看tryRelease方法
protected final boolean tryRelease(int releases) {int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();// 是否完全釋放鎖boolean free = false;// 其實就是重入的問題,如果c==0,也就是說沒有嵌套鎖了,可以釋放了,否則還不能釋放掉if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;
}/*** Wakes up node's successor, if one exists.** @param node the node*/
// 喚醒后繼節點
// 從上面調用處知道,參數node是head頭結點
private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;// 如果head節點當前waitStatus<0, 將其修改為0if (ws < 0)compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node. But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/// 下面的代碼就是喚醒后繼節點,但是有可能后繼節點取消了等待(waitStatus==1)// 從隊尾往前找,找到waitStatus<=0的所有節點中排在最前面的Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;// 從后往前找,仔細看代碼,不必擔心中間有節點取消(waitStatus==1)的情況for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)// 喚醒線程LockSupport.unpark(s.thread);
}
喚醒線程以后,被喚醒的線程將從以下代碼中繼續往前走:
private final boolean parkAndCheckInterrupt() {LockSupport.park(this); // 剛剛線程被掛起在這里了return Thread.interrupted();
}
// 又回到這個方法了:acquireQueued(final Node node, int arg),這個時候,node的前驅是head了
條件隊列的獲取、阻塞、喚醒
大體流程
我們首先來看下我們關注的 Condition 的實現類 AbstractQueuedSynchronizer
類中的 ConditionObject
。
public class ConditionObject implements Condition, java.io.Serializable {private static final long serialVersionUID = 1173984872572414699L;// 條件隊列的第一個節點// 不要管這里的關鍵字 transient,是不參與序列化的意思private transient Node firstWaiter;// 條件隊列的最后一個節點private transient Node lastWaiter;......
prev 和 next
用于實現阻塞隊列的雙向鏈表
,這里的nextWaiter
用于實現條件隊列的單向鏈表
-
條件隊列和阻塞隊列的節點,都是 Node 的實例,因為條件隊列的節點是需要轉移到阻塞隊列中去的;
-
我們知道一個 ReentrantLock 實例可以通過多次調用 newCondition() 來產生多個 Condition 實例,這里對應 condition1 和 condition2。注意,ConditionObject 只有兩個屬性 firstWaiter 和 lastWaiter;
-
每個 condition 有一個關聯的條件隊列,如線程 1 調用 condition1.await() 方法即可將當前線程 1 包裝成 Node 后加入到條件隊列中,然后阻塞在這里,不繼續往下執行,條件隊列是一個單向鏈表;
-
調用condition1.signal() 觸發一次喚醒,此時喚醒的是隊頭,會將condition1 對應的條件隊列的 firstWaiter(隊頭) 移到阻塞隊列的隊尾,等待獲取鎖,獲取鎖后 await 方法才能返回,繼續往下執行。
這個圖看懂后,下面的代碼分析就簡單了。
接下來,我們一步步按照流程來走代碼分析。
調用await()方法
// 首先,這個方法是可被中斷的,不可被中斷的是另一個方法 awaitUninterruptibly()
// 這個方法會阻塞,直到調用 signal 方法(指 signal() 和 signalAll(),下同),或被中斷
public final void await() throws InterruptedException {// 老規矩,既然該方法要響應中斷,那么在最開始就判斷中斷狀態if (Thread.interrupted())throw new InterruptedException();// 添加到 condition 的條件隊列中Node node = addConditionWaiter();// 釋放鎖,返回值是釋放鎖之前的 state 值// await() 之前,當前線程是必須持有鎖的,這里肯定要釋放掉int savedState = fullyRelease(node);int interruptMode = 0;// 這里退出循環有兩種情況,之后再仔細分析// 1. isOnSyncQueue(node) 返回 true,即當前 node 已經轉移到阻塞隊列了// 2. checkInterruptWhileWaiting(node) != 0 會到 break,然后退出循環,代表的是線程中斷while (!isOnSyncQueue(node)) {LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}// 被喚醒后,將進入阻塞隊列,等待獲取鎖if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);
}
這個就是await 整體過程了,下面我們分步把上面的幾個點用源碼說清楚。
1. 將節點加入到條件隊列
addConditionWaiter() 是將當前節點加入到條件隊列,這種條件隊列內的操作是線程安全的。
// 將當前線程對應的節點入隊,插入隊尾
private Node addConditionWaiter() {Node t = lastWaiter;// 如果條件隊列的最后一個節點取消了,將其清除出去// 為什么這里把 waitStatus 不等于 Node.CONDITION,就判定為該節點發生了取消排隊?if (t != null && t.waitStatus != Node.CONDITION) {// 這個方法會遍歷整個條件隊列,然后會將已取消的所有節點清除出隊列unlinkCancelledWaiters();t = lastWaiter;}// node 在初始化的時候,指定 waitStatus 為 Node.CONDITIONNode node = new Node(Thread.currentThread(), Node.CONDITION);// t 此時是 lastWaiter,隊尾// 如果隊列為空if (t == null)firstWaiter = node;elset.nextWaiter = node;lastWaiter = node;return node;
}
上面的這塊代碼很簡單,就是將當前線程進入到條件隊列的隊尾。
在addWaiter 方法中,有一個 unlinkCancelledWaiters() 方法,該方法用于清除隊列中已經取消等待的節點。
當 await 的時候如果發生了取消操作(這點之后會說),或者是在節點入隊的時候,發現最后一個節點是被取消的,會調用一次這個方法。
// 等待隊列是一個單向鏈表,遍歷鏈表將已經取消等待的節點清除出去
// 純屬鏈表操作,很好理解,看不懂多看幾遍就可以了
private void unlinkCancelledWaiters() {Node t = firstWaiter;Node trail = null;while (t != null) {Node next = t.nextWaiter;// 如果節點的狀態不是 Node.CONDITION 的話,這個節點就是被取消的if (t.waitStatus != Node.CONDITION) {t.nextWaiter = null;if (trail == null)firstWaiter = next;elsetrail.nextWaiter = next;if (next == null)lastWaiter = trail;}elsetrail = t;t = next;}
}
2. 完全釋放獨占鎖
回到 wait 方法,節點入隊了以后,會調用 int savedState = fullyRelease(node);
方法釋放鎖,注意,這里是完全釋放獨占鎖(fully release),因為 ReentrantLock 是可以重入的。
考慮一下這里的state 的值。如果在 condition1.await() 之前,假設線程先執行了 2 次 lock() 操作,那么 state 為 2,我們理解為該線程持有 2 把鎖,這里 await() 方法必須將 state 設置為 0,然后再進入掛起狀態,這樣其他線程才能持有鎖。當它被喚醒的時候,它需要重新持有 2 把鎖,才能繼續下去。
// 首先,我們要先觀察到返回值 savedState 代表 release 之前的 state 值
// 對于最簡單的操作:先 lock.lock(),然后 condition1.await()。
// 那么 state 經過這個方法由 1 變為 0,鎖釋放,此方法返回 1
// 相應的,如果 lock 重入了 n 次,savedState == n
// 如果這個方法失敗,會將節點設置為"取消"狀態,并拋出異常 IllegalMonitorStateException
final int fullyRelease(Node node) {boolean failed = true;try {int savedState = getState();// 這里使用了當前的 state 作為 release 的參數,也就是完全釋放掉鎖,將 state 置為 0if (release(savedState)) {failed = false;return savedState;} else {throw new IllegalMonitorStateException();}} finally {if (failed)node.waitStatus = Node.CANCELLED;}
}
這里注意,考慮一下,如果一個線程在不持有 lock 的基礎上,就去調用 condition1.await() 方法,它能進入條件隊列,但是在上面的這個方法中,由于它不持有鎖,
release(savedState) 這個方法肯定要返回 false
,進入到異常分支,然后進入 finally 塊設置node.waitStatus = Node.CANCELLED
,這個已經入隊的節點之后會被后繼的節點”請出去“。
3. 等待進入阻塞隊列
釋放掉鎖以后,接下來是這段,這邊會自旋,如果發現自己還沒到阻塞隊列,那么掛起,等待被轉移到阻塞隊列。
int interruptMode = 0;
// 如果不在阻塞隊列中,注意了,是阻塞隊列
while (!isOnSyncQueue(node)) {// 線程掛起LockSupport.park(this);// 這里可以先不用看了,等看到它什么時候被 unpark 再說if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;
}
isOnSyncQueue(Node node) 用于判斷節點是否已經轉移到阻塞隊列了:
// 在節點入條件隊列的時候,初始化時設置了 waitStatus = Node.CONDITION
// 前面我提到,signal 的時候需要將節點從條件隊列移到阻塞隊列,
// 這個方法就是判斷 node 是否已經移動到阻塞隊列了
final boolean isOnSyncQueue(Node node) {// 移動過去的時候,node 的 waitStatus 會置為 0,這個之后在說 signal 方法的時候會說到// 如果 waitStatus 還是 Node.CONDITION,也就是 -2,那肯定就是還在條件隊列中// 如果 node 的前驅 prev 指向還是 null,說明肯定沒有在 阻塞隊列(prev是阻塞隊列鏈表中使用的)if (node.waitStatus == Node.CONDITION || node.prev == null)return false;// 如果 node 已經有后繼節點 next 的時候,那肯定是在阻塞(同步)隊列了。條件隊列是使用 nextWaiter 作為后繼指針!!if (node.next != null) return true;// 下面這個方法從阻塞隊列的隊尾開始從后往前遍歷找,如果找到相等的,說明在阻塞隊列,否則就是不在阻塞隊列// 可以通過判斷 node.prev() != null 來推斷出 node 在阻塞隊列嗎?答案是:不能。// 這個可以看上篇 AQS 的入隊方法,首先設置的是 node.prev 指向 tail,// 然后是 CAS 操作將自己設置為新的 tail,可是這次的 CAS 是可能失敗的。return findNodeFromTail(node);
}// 從阻塞隊列的隊尾往前遍歷,如果找到,返回 true
private boolean findNodeFromTail(Node node) {Node t = tail;for (;;) {if (t == node)return true;if (t == null)return false;t = t.prev;}
}
回到前面的循環,isOnSyncQueue(node) 返回 false 的話,那么進到 LockSupport.park(this);
這里線程掛起。
4. signal 喚醒線程,轉移到阻塞隊列
為了大家理解,這里我們先看喚醒操作,因為剛剛到 LockSupport.park(this); 把線程掛起了,等待喚醒。
喚醒操作通常由另一個線程來操作,就像生產者-消費者模式中,如果線程因為等待消費而掛起,那么當生產者生產了一個東西后,會調用 signal 喚醒正在等待的線程來消費。
// 喚醒等待了最久的線程
// 其實就是,將這個線程對應的 node 從條件隊列轉移到阻塞隊列
public final void signal() {// 調用 signal 方法的線程必須持有當前的獨占鎖if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignal(first);
}// 從條件隊列隊頭往后遍歷,找出第一個需要轉移的 node
// 因為前面我們說過,有些線程會取消排隊,但是可能還在隊列中
private void doSignal(Node first) {do {// 將 firstWaiter 指向 first 節點后面的第一個,因為 first 節點馬上要離開了// 如果將 first 移除后,后面沒有節點在等待了,那么需要將 lastWaiter 置為 nullif ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;// 因為 first 馬上要被移到阻塞隊列了,和條件隊列的鏈接關系在這里斷掉first.nextWaiter = null;} while (!transferForSignal(first) &&(first = firstWaiter) != null);// 這里 while 循環,如果 first 轉移不成功,那么選擇 first 后面的第一個節點進行轉移,依此類推
}// 將節點從條件隊列轉移到阻塞隊列
// true 代表成功轉移
// false 代表在 signal 之前,節點已經取消了
final boolean transferForSignal(Node node) {// CAS 如果失敗,說明此 node 的 waitStatus 已不是 Node.CONDITION,說明節點已經取消,// 既然已經取消,也就不需要轉移了,方法返回,轉移后面一個節點// 否則,將 waitStatus 置為 0if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;// enq(node): 自旋進入阻塞隊列的隊尾// 注意,這里的返回值 p 是 node 在阻塞隊列的前驅節點Node p = enq(node);int ws = p.waitStatus;// ws > 0 說明 node 在阻塞隊列中的前驅節點取消了等待鎖,直接喚醒 node 對應的線程。喚醒之后會怎么樣,后面再解釋// 如果 ws <= 0, 那么 compareAndSetWaitStatus 將會被調用,上篇介紹的時候說過,節點入隊后,需要把前驅節點的狀態設為 Node.SIGNAL(-1)if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))// 如果前驅節點取消或者 CAS 失敗,會進到這里喚醒線程,之后的操作看下一節LockSupport.unpark(node.thread);return true;
}
正常情況下,ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)
這句中,ws <= 0,而且 compareAndSetWaitStatus(p, ws, Node.SIGNAL)
會返回 true,所以一般也不會進去 if 語句塊中喚醒 node 對應的線程。然后這個方法返回 true,也就意味著 signal 方法結束了,節點進入了阻塞隊列。
假設發生了阻塞隊列中的前驅節點取消等待,或者 CAS 失敗,只要喚醒線程,讓其進到下一步即可。
喚醒后檢查中斷狀態
上一步 signal 之后,我們的線程由條件隊列轉移到了阻塞隊列,之后就準備獲取鎖了。只要重新獲取到鎖了以后,繼續往下執行。
等線程從掛起中恢復過來,繼續往下看
int interruptMode = 0;
while (!isOnSyncQueue(node)) {// 線程掛起LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;
}
先解釋下 interruptMode。interruptMode 可以取值為 REINTERRUPT(1),THROW_IE(-1),0
- REINTERRUPT: 代表 await 返回的時候,需要重新設置中斷狀態
- THROW_IE: 代表 await 返回的時候,需要拋出 InterruptedException 異常
- 0 :說明在 await 期間,沒有發生中斷
有以下三種情況會讓 LockSupport.park(this); 這句返回繼續往下執行:
- 常規路徑。signal -> 轉移節點到阻塞隊列 -> 獲取了鎖(unpark)
- 線程中斷。在 park 的時候,另外一個線程對這個線程進行了中斷
- signal 的時候我們說過,轉移以后的前驅節點取消了,或者對前驅節點的CAS操作失敗了
- 假喚醒。這個也是存在的,和 Object.wait() 類似,都有這個問題
線程喚醒后第一步是調用 checkInterruptWhileWaiting(node) 這個方法,此方法用于判斷是否在線程掛起期間發生了中斷,如果發生了中斷,是 signal 調用之前中斷的,還是 signal 之后發生的中斷。
// 1. 如果在 signal 之前已經中斷,返回 THROW_IE
// 2. 如果是 signal 之后中斷,返回 REINTERRUPT
// 3. 沒有發生中斷,返回 0
private int checkInterruptWhileWaiting(Node node) {return Thread.interrupted() ?(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :0;
}
Thread.interrupted()
:如果當前線程已經處于中斷狀態,那么該方法返回 true,同時將中斷狀態重置為 false,所以,才有后續的 重新中斷(REINTERRUPT) 的使用。
看看怎么判斷是 signal 之前還是之后發生的中斷:
// 只有線程處于中斷狀態,才會調用此方法
// 如果需要的話,將這個已經取消等待的節點轉移到阻塞隊列
// 返回 true:如果此線程在 signal 之前被取消,
final boolean transferAfterCancelledWait(Node node) {// 用 CAS 將節點狀態設置為 0 // 如果這步 CAS 成功,說明是 signal 方法之前發生的中斷,因為如果 signal 先發生的話,signal 中會將 waitStatus 設置為 0if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {// 將節點放入阻塞隊列// 這里我們看到,即使中斷了,依然會轉移到阻塞隊列enq(node);return true;}// 到這里是因為 CAS 失敗,肯定是因為 signal 方法已經將 waitStatus 設置為了 0// signal 方法會將節點轉移到阻塞隊列,但是可能還沒完成,這邊自旋等待其完成// 當然,這種事情還是比較少的吧:signal 調用之后,沒完成轉移之前,發生了中斷while (!isOnSyncQueue(node))Thread.yield();return false;
}
這里再說一遍,即使發生了中斷,節點依然會轉移到阻塞隊列。
到這里,大家應該都知道這個 while 循環怎么退出了吧。要么中斷,要么轉移成功。
這里描繪了一個場景,本來有個線程,它是排在條件隊列的后面的,但是因為它被中斷了,那么它會被喚醒,然后它發現自己不是被 signal 的那個,但是它會自己主動去進入到阻塞隊列。
6. 獲取獨占鎖
while 循環出來以后,下面是這段代碼:
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;
由于 while 出來后,我們確定節點已經進入了阻塞隊列,準備獲取鎖。
這里的 acquireQueued(node, savedState) 的第一個參數 node 之前已經經過 enq(node) 進入了隊列,參數 savedState 是之前釋放鎖前的 state,這個方法返回的時候,代表當前線程獲取了鎖,而且 state == savedState了。
注意,前面我們說過,不管有沒有發生中斷,都會進入到阻塞隊列,而 acquireQueued(node, savedState) 的返回值就是代表線程是否被中斷。如果返回 true,說明被中斷了,而且 interruptMode != THROW_IE,說明在 signal 之前就發生中斷了,這里將 interruptMode 設置為 REINTERRUPT,用于待會重新中斷。
繼續往下:
if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();
if (interruptMode != 0)reportInterruptAfterWait(interruptMode);
本著一絲不茍的精神,這邊說說 node.nextWaiter != null
怎么滿足。我前面也說了 signal 的時候會將節點轉移到阻塞隊列,有一步是 node.nextWaiter = null
,將斷開節點和條件隊列的聯系。
可是,在判斷發生中斷的情況下,是 signal 之前還是之后發生的? 這部分的時候,我也介紹了,如果 signal 之前就中斷了,也需要將節點進行轉移到阻塞隊列,這部分轉移的時候,是沒有設置 node.nextWaiter = null
的。
之前我們說過,如果有節點取消,也會調用 unlinkCancelledWaiters
這個方法,就是這里了。
7. 處理中斷狀態
到這里,我們終于可以好好說下這個 interruptMode 干嘛用了。
- 0:什么都不做,沒有被中斷過;
- THROW_IE:await 方法拋出 InterruptedException 異常,因為它代表在 await() 期間發生了中斷;
- REINTERRUPT:重新中斷當前線程,因為它代表 await() 期間沒有被中斷,而是 signal() 以后發生的中斷
private void reportInterruptAfterWait(int interruptMode)throws InterruptedException {if (interruptMode == THROW_IE)throw new InterruptedException();else if (interruptMode == REINTERRUPT)selfInterrupt();
}
* 帶超時機制的 await
經過前面的 7 步,整個 ConditionObject 類基本上都分析完了,接下來簡單分析下帶超時機制的 await 方法。
public final long awaitNanos(long nanosTimeout) throws InterruptedException
public final boolean awaitUntil(Date deadline)throws InterruptedException
public final boolean await(long time, TimeUnit unit)throws InterruptedException
這三個方法都差不多,我們就挑一個出來看看吧:
public final boolean await(long time, TimeUnit unit)throws InterruptedException {// 等待這么多納秒long nanosTimeout = unit.toNanos(time);if (Thread.interrupted())throw new InterruptedException();Node node = addConditionWaiter();int savedState = fullyRelease(node);// 當前時間 + 等待時長 = 過期時間final long deadline = System.nanoTime() + nanosTimeout;// 用于返回 await 是否超時boolean timedout = false;int interruptMode = 0;while (!isOnSyncQueue(node)) {// 時間到啦if (nanosTimeout <= 0L) {// 這里因為要 break 取消等待了。取消等待的話一定要調用 transferAfterCancelledWait(node) 這個方法// 如果這個方法返回 true,在這個方法內,將節點轉移到阻塞隊列成功// 返回 false 的話,說明 signal 已經發生,signal 方法將節點轉移了。也就是說沒有超時嘛timedout = transferAfterCancelledWait(node);break;}// spinForTimeoutThreshold 的值是 1000 納秒,也就是 1 毫秒// 也就是說,如果不到 1 毫秒了,那就不要選擇 parkNanos 了,自旋的性能反而更好if (nanosTimeout >= spinForTimeoutThreshold)LockSupport.parkNanos(this, nanosTimeout);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;// 得到剩余時間nanosTimeout = deadline - System.nanoTime();}if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null)unlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);return !timedout;
}
超時的思路還是很簡單的,不帶超時參數的 await 是 park,然后等待別人喚醒。而現在就是調用 parkNanos 方法來休眠指定的時間,醒來后判斷是否 signal 調用了,調用了就是沒有超時,否則就是超時了。超時的話,自己來進行轉移到阻塞隊列,然后搶鎖。
* 不拋出 InterruptedException 的 await
public final void awaitUninterruptibly() {Node node = addConditionWaiter();int savedState = fullyRelease(node);boolean interrupted = false;while (!isOnSyncQueue(node)) {LockSupport.park(this);if (Thread.interrupted())interrupted = true;}if (acquireQueued(node, savedState) || interrupted)selfInterrupt();
}