充分了解 AbstractQueuedSynchronizer 對于深入理解并發編程是有益處的,它是用來構建鎖或者其他同步組件的基礎框架,我們常用的同步工具類如 CountDownLatch、Semaphore、ThreadPoolExecutor、ReentrantLock 和 ReentrantReadWriteLock 內部都用到了它。
以上提到的同步工具類,都是用靜態內部類繼承了 AQS。因此,平時在使用這些同步工具類時,我們感覺不到 AQS 的存在。
AbstractQueuedSynchronizer 中有一個很重要的變量 state:
/*** The synchronization state.* 當 state 為 0 時,表示鎖沒有被占用。*/private volatile int state;
不論是 JDK 還是我們自定義的鎖/工具類,都是在圍繞這個 state 做文章。比如上鎖時要將 state 置為 1,而解鎖時將其置為 0,只不過并不是由自定義的鎖直接操作,而是通過 AQS 去直接操作 state。
可以這樣理解鎖和 AQS 之間的關系:
- 鎖是面向使用者的,它定義了使用者與鎖交互的接口(比如可以允許兩個線程并行訪問),隱藏了實現細節。
- AQS 面向的是鎖的實現者,它簡化了鎖的實現方式,屏蔽了同步狀態管理、線程的排隊、等待與喚醒等底層操作。
鎖和 AQS 很好地隔離了使用者和實現者所需關注的領域。
1、AQS 的使用方法
1.1 模板方法
AQS 使用了模板方法設計模式,要實現自定義的同步工具類,推薦使用靜態內部類繼承 AQS,并根據需求重寫對應的模板方法:
/*** 嘗試以獨占模式獲取同步狀態。該方法應該查詢對象的狀態是否允許在獨占模式下被獲取,* 如果允許就去獲取它。** 這個方法總是被執行獲取的線程調用。如果這個方法返回 false,* 獲取方法可能會將尚未排隊的線程排隊,直到其他線程發出釋放信號。* 這個可以用來實現 {@link Lock#tryLock()} 方法。* * @param arg 獲取參數。這個值總是傳遞給獲取方法,或者是在進入條件等待時保存。* 否則,該值是未解釋的,可以表示你喜歡的任何值。* @return true 表示成功。一旦成功,這個對象就被獲得了。* @throws IllegalMonitorStateException 如果獲取動作會將此同步器置于非法狀態就拋出這個異常。* 必須以一致的方式拋出此異常,才能使同步正常工作。* @throws UnsupportedOperationException 如果不支持獨占模式拋出此異常。**/protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();}/*** 在獨占模式下,嘗試設置 state 狀態變量來反映釋放操作。** 這個方法總是被執行釋放操作的線程調用。** @param arg 釋放參數. 這個值總是傳遞給一個釋放方法,或者是進入等待條件時的當前狀態值。* 否則,該值是未解釋的,可以表示您喜歡的任何值。* @return 如果這個對象當前處于完全釋放的狀態,就返回 true,這樣在等待中的線程就可以嘗試* 去獲取。否則,返回 false。* * @throws IllegalMonitorStateException,UnsupportedOperationException* 同 tryAcquire() 方法*/protected boolean tryRelease(int arg) {throw new UnsupportedOperationException();}/*** 嘗試在共享模式下獲取。該方法應該查詢對象的狀態是否允許在共享模式下被獲取,* 如果允許就去獲取它。** 參數、返回值、拋出的異常同 tryAcquire(),只不過本方法是在共享模式下。*/protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException();}/*** 共享模式下的 tryRelease()。*/protected boolean tryReleaseShared(int arg) {throw new UnsupportedOperationException();}/*** 如果同步是被當前(調用)線程獨占的,就返回 true。* 每次調用 ConditionObject 類中非等待的方法時,都會調用此方法。(等待的方法會調用 release())* 這個方法只在 ConditionObject 的方法內部調用,因此如果沒用到 Condition 就不用定義這個方法。*/protected boolean isHeldExclusively() {throw new UnsupportedOperationException();}
除此之外,還有一些模板方法可能會在實現自定義鎖的過程中被調用到:
這些模板方法負責獨占式&共享式獲取與釋放同步狀態,同步狀態和查詢同步隊列中的等待線程情況。
此外,跟 state 相關的還有三個方法 getState()、setState()、compareAndSetState() 分別用來獲取同步狀態、設置同步狀態、使用 CAS 設置同步狀態。
1.2 舉例
如果要實現一個獨占鎖,就要重寫 tryAcquire()、tryRelease() 和 isHeldExclusively(),而實現共享鎖就要重寫 tryAcquireShared()、tryReleaseShared() 。
比如我要自定義一個不可重入的顯式獨占鎖,那么就要實現 Lock 接口,定義靜態內部類繼承 AQS,實現獲取鎖、釋放鎖等方法:
public class MyLock implements Lock {/* MyLock 有關鎖的操作僅需要將操作代理到 Sync 上即可*/private final Sync sync = new Sync();private final static class Sync extends AbstractQueuedSynchronizer {// 判斷處于獨占狀態@Overrideprotected boolean isHeldExclusively() {return getState() == 1;}// 獲得鎖@Overrideprotected boolean tryAcquire(int i) {// CAS 操作設置 state 保證同步if (compareAndSetState(0, 1)) {// 設置占有獨占鎖的線程setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}// 釋放鎖@Overrideprotected boolean tryRelease(int i) {if (getState() == 0) {throw new IllegalMonitorStateException();}setExclusiveOwnerThread(null);setState(0);return true;}// 返回一個Condition,每個condition都包含了一個condition隊列public Condition newCondition() {return new ConditionObject();}}@Overridepublic void lock() {System.out.println(Thread.currentThread().getName() + " ready get lock");sync.acquire(1);System.out.println(Thread.currentThread().getName() + " already got lock");}@Overridepublic void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}@Overridepublic boolean tryLock() {return sync.tryAcquire(1);}@Overridepublic boolean tryLock(long timeout, TimeUnit timeUnit) throws InterruptedException {return sync.tryAcquireNanos(1, timeUnit.toNanos(timeout));}@Overridepublic void unlock() {System.out.println(Thread.currentThread().getName() + " ready release lock");sync.release(1);System.out.println(Thread.currentThread().getName() + " already released lock");}@Overridepublic Condition newCondition() {return sync.newCondition();}
}
Lock 接口中的方法實現都是借助于靜態內部類 Sync 的實例,調用相應的方法即可。在 lock() 和 unlock() 中分別調用 Sync 對象的 acquire() 和 release() 方法,其內部調用了 Sync 內部重寫的 tryAcquire() 和 tryRelease():
AbstractQueuedSynchronizer.java:public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}
2、AQS 的基本思想 CLH 隊列鎖
AQS 以 CLH(Craig、Landin、Hagersten 三人名字首字母)隊列鎖為基本思想,而 CLH 隊列鎖是一種基于鏈表的可擴展、高性能、公平的 FIFO 自旋鎖。申請鎖的線程僅僅在本地變量上自旋,它不斷輪詢前驅的狀態,假設發現前驅釋放了鎖就結束自旋。
先把申請鎖的線程打包成一個 QNode,myPred 指向前驅節點,locked 表示當前線程是否需要鎖:
QNode 入隊要按照先后順序排列,當線程 A 需要獲取鎖時,要把包裝該線程的 QNode 節點加入到 CLH 隊列尾部,并且把 locked 設為 true:
隨后線程 B 也需要獲取鎖被加入到 CLH 隊列尾部:
加入隊列的 QNode 節點要自旋檢測前一個節點的 locked 是否變為了 false,變成 false 就認為前一個節點已經釋放了鎖,該輪到當前節點獲取鎖了:
QNode_A 拿到鎖后停止自旋。在線程執行完任務釋放鎖后,也要把 locked 置為 false。
AQS 在實現時是基于 CLH 算法但是又做了很多改進,例如在 QNode 中加入 next 域實現雙向隊列,自旋檢測鎖狀態時有次數限制等。
3、AQS 鎖流程源碼分析
AQS 使用頭指針(head)與尾指針(tail)參與管理這個雙向同步隊列。其一般工作流程為:
- 初始狀態下,head 與 tail 都為 null。當有線程獲取同步器失敗后,需要打包成 Node 節點進入同步隊列,此時先通過 new Node() 創建一個空節點作為頭節點,并且讓尾節點也指向這個初始頭節點。然后把入隊節點添加到尾節點之后,由于可能會有多個競爭失敗的線程所在的節點都要做入隊操作,因此添加到隊尾的這個動作要使用 CAS 原子操作,CAS 失敗的 Node 要自旋直到成功添加到隊尾為止。(addWaiter() 和 acquireQueued() 方法)
- 在同步隊列中等待獲取同步器的節點,也會進行自旋操作,嘗試再次獲取同步器。規則是只有當前節點的前驅節點是頭節點才有獲取同步器的資格,如果該節點成功獲取到,那么就釋放掉原來的頭節點,并把當前節點設置為新的頭節點(也就是說頭節點一般是占有鎖且正在執行線程任務的節點)。
- 隊列中的線程如果一直無法獲得同步器,那么在一定時間后就要先中斷,一定時間是通過前驅節點的等待狀態變化決定的。所有入隊節點的等待狀態 waitStatus 初始值都為 0,在上一步的自旋過程中,如果當前節點沒有獲取到同步器,那么就要給前驅節點的 waitStatus 設置為 SIGNAL 表示將要把后繼節點(前驅結點的后繼節點就是當前節點)阻塞。待自旋進入第二次循環時,當前節點如果還沒拿到同步器,并且檢查到它的前置節點已經是 SIGNAL 狀態了,就要使用 LockSupport.park() 暫停當前線程,直到它的前置節點執行完任務釋放同步器時再喚醒它。
3.1 acquire()
在自定義鎖時,需要調用 AQS 的 acquire() 來獲取鎖:
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))// 其實就是調用 Thread.currentThread().interrupt()selfInterrupt();}
如果 tryAcquire() 返回 true 說明拿到了鎖,拿鎖過程直接結束,后續代碼就不會執行。否則,要先后執行 addWaiter() 和 acquireQueued()。
3.2 addWaiter()
addWaiter() 會在線程獲取鎖失敗后,將該線程封裝進 Node 并將其入隊:
/*** 等待隊列的隊尾,延遲初始化。只有在隊尾添加新的等待 Node 時* 才由 enq 方法修改。*/private transient volatile Node tail;/*** Creates and enqueues node for current thread and given mode.** @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared* @return the new node*/private Node addWaiter(Node mode) {// 把當前線程包裝成一個 NodeNode node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {// 如果尾節點不為空,就讓 node 的 prev 字段指向這個尾節點 pred。node.prev = pred;// CAS 操作把 node 設置為隊列尾節點if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}// 如果隊列沒有初始化,則執行初始化操作,否則 CAS 把 node 加入到隊列。enq(node);return node;}/*** Inserts node into queue, initializing if necessary. See picture above.* @param node the node to insert* @return node's predecessor*/private Node enq(final Node node) {// 自旋,直到成功把 node 添加為隊列尾節點。for (;;) {Node t = tail;if (t == null) { // Must initialize// 原子操作設置頭節點if (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;// 原子操作設置尾節點if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}
addWaiter() 內部在添加一個 Node 到隊尾時,會先直接用 compareAndSetTail() 嘗試將 Node 加到隊尾。如果嘗試失敗,才會在 enq() 內自旋,直到成功的將 Node 加到隊尾。
3.3 acquireQueued()
acquireQueued() 負責在獨占且不可中斷模式下,為已經在隊列中的線程獲取同步器。該方法會返回一個布爾類型值 interrupted 表示當前線程是否應該被中斷:
/*** Acquires in exclusive uninterruptible mode for thread already in* queue. Used by condition wait methods as well as acquire.** @param node the node* @param arg the acquire argument* @return {@code true} if interrupted while waiting*/final boolean acquireQueued(final Node node, int arg) {// 因為 acquire() 的 if 語句中 tryAcquire() 失敗了,因此這里的 failed 初始化為 true。boolean failed = true;try {boolean interrupted = false;// 自旋,再次用 tryAcquire() 嘗試拿鎖for (;;) {final Node p = node.predecessor();// 如果當前節點的前驅結點是頭節點,并且當前節點拿鎖成功if (p == head && tryAcquire(arg)) {// 把當前節點設為頭節點,并把原來的頭節點從隊列中刪除并返回 interrupted。setHead(node);p.next = null; // help GCfailed = false;return interrupted;}// 檢查前驅節點狀態,并執行中斷操作if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {// 如果失敗,就取消當前節點獲取鎖的操作。if (failed)cancelAcquire(node);}}
shouldParkAfterFailedAcquire() 會根據前驅節點的 waitStatus 狀態判斷是否應該在當前節點獲取鎖失敗之后中斷當前線程,返回 true 表示應該中斷:
/*** Checks and updates status for a node that failed to acquire.* Returns true if thread should block. This is the main signal* control in all acquire loops. Requires that pred == node.prev.** @param pred node's predecessor holding status* @param node the node* @return {@code true} if thread should block*/private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {// 前驅節點狀態如果已經是 SIGNAL,就應該中斷當前線程int ws = pred.waitStatus;if (ws == Node.SIGNAL)/** This node has already set status asking a release* to signal it, so it can safely park.*/return true;// 前驅節點狀態值大于 0 說明其取消獲取線程,從隊列中移除這些節點if (ws > 0) {/** Predecessor was cancelled. Skip over predecessors and* indicate retry.*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {/** waitStatus must be 0 or PROPAGATE. Indicate that we* need a signal, but don't park yet. Caller will need to* retry to make sure it cannot acquire before parking.*/compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}
pred 是 node 的前驅結點,根據前驅結點的狀態做不同的操作:
- 如果是 Node.SIGNAL,那么就返回 true 表示前驅節點的后繼節點,也就是當前節點需要被 park;
- 如果狀態數值大于 0,說明前驅節點處于取消狀態,那么就要向前找,一直找到一個不是取消狀態的節點,讓這個節點作為 node 的前驅節點;
- 如果是 0 或者 PROPAGATE(-3),就用 CAS 操作把這個前驅節點的狀態值設置為 Node.SIGNAL。
這里注意,每個節點的 waitStatus 初始值都為 0,在 acquireQueued() 的 for 循環中,第一次循環假設仍沒有拿到鎖,那么進入到 shouldParkAfterFailedAcquire(),把前驅結點的 waitStatus 置為 Node.SIGNAL 并返回 false;然后執行第二次 for 循環,假設也沒拿到鎖,那么 shouldParkAfterFailedAcquire() 就會返回 true,使得 parkAndCheckInterrupt() 得以執行:
/*** Convenience method to park and then check if interrupted** @return {@code true} if interrupted*/private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}
其實就是使用 LockSupport.park() 暫停當前線程,并返回線程狀態。如果線程已經中斷,那么設置 interrupted 為 true。
3.4 release()
釋放鎖的操作就很簡單了:
public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}/*** Wakes up node's successor, if one exists.** @param node the node*/private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node. But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/// 傳入的 node 是頭節點,要找到頭節點后第一個不是取消狀態的節點,喚醒其中的線程。Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}
tryRelease() 為 true 表示成功釋放了同步器,那么就要用 LockSupport.unpark() 把當前節點(即頭節點)后第一個不是 CANCELLED 狀態的節點(如果有的話)中的線程喚醒。
4、總結
AQS 是公平鎖(因為新來的節點都是加到隊尾)。公平鎖與非公平鎖的實現幾乎一模一樣,在 ReentrantLock 中,提供了 FairSync 和 NonfairSync 兩個內部類,分別實現公平鎖和非公平鎖。比較這兩個內部類的代碼,幾乎一模一樣,只不過在 tryAcquire() 拿鎖時,公平鎖要先判斷隊列中是否有前驅節點已經在等待中(hasQueuedPredecessors())。
不可重入鎖在同一線程做遞歸調用時會發生死鎖,需要在自定義鎖時對 state 做特殊處理(即實現可重入鎖,一般都是要實現可重入鎖的)。
AQS 參考資料:
AbstractQueuedSynchronizer的介紹和原理分析
深入理解AbstractQueuedSynchronizer(AQS)
[官方文檔AbstractQueuedSynchronizer](