
1 前言
鎖是用來控制多個線程訪問共享資源的方式,一般來說,一個鎖能防止多個線程同時訪問共享資源(但是有些鎖可以允許多個線程并發的訪問共享資源,如讀寫鎖)。在以前,Java程序是靠synchronized來實現鎖功能的,而在Java SE 5之后,并發包中新增了Lock接口(以及相關實現類)用來實現鎖功能,他提供了與synchronized關鍵字類似的同步功能,只是在使用時需要顯式的獲取鎖和釋放鎖,雖然它缺少了synchronized提供的隱式獲取釋放鎖的便捷性,但是卻擁有了鎖獲取和釋放的可操作性、可中斷的獲取鎖以及超時獲取鎖等多種synchronized關鍵字不具備的同步特性。很多鎖都通過實現Lock接口來完成對鎖的操作,比如可重入鎖(ReentrantLock)、前一張講的Redisson分布式鎖等,而Lock接口的實現,基本是都是通過聚合了一個同步器的子類來完成線程訪問控制的,而同步器,就是我們常說的AQS(AbstractQueuedSynchronizer),也是今天要記錄的內容。
2 什么是AQS
AQS(隊列同步器AbstractQueuedSynchronizer)是用來構建鎖或者其他同步組件的基礎框架,它使用了一個int成員變量來表示同步狀態,通過內置的FIFO隊列來完成資源獲取線程的排隊工作。

如上圖所示,同步器的主要使用方式是繼承,子類通過繼承同步器并實現它的抽象方法來管理同步狀態,在抽象方法的實現過程中免不了要對同步狀態進行修改,這時就需要使用同步器提供的3個方法來進行操作:
1、getState():獲取當前同步狀態

2、setState():設置當前同步狀態

3、compareAndSetState(int expect, int update):通過CAS設置當前狀態,該方法能保證狀態設置的原子性

子類推薦被定義為自定義同步組件的靜態內部類,同步器自身沒有實現任何同步接口,它僅僅是定義了若干同步狀態獲取和釋放的方法來供自定義同步組件使用。同步器既可以支持獨占式獲取同步狀態(單個線程獲取鎖),也可以支持共享式獲取同步狀態(多個線程獲取到鎖),這樣就可以方便實現不同類型的同步組件(如上圖所示的可重入鎖:ReentrantLock、可重入讀寫鎖:ReentrantReadWriteLock、計數器:CountDownLatch等等)
3 同步器可重寫的方法
以下代碼為可重入鎖繼承同步器后重寫的方法:
- protected boolean tryAcquire(int acquires):獨占式獲取同步鎖狀態,實現該方法需要查詢當前狀態并判斷同步狀態是否符合預期,然后再進行CAS設置同步狀態。
/*** 非公平鎖*/static final class NonfairSync extends Sync {private static final long serialVersionUID = 7316153563782823691L;/*** Performs lock. Try immediate barge, backing up to normal* acquire on failure.*/final void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}protected final boolean tryAcquire(int acquires) {// nonfairTryAcquire方法和下面的公平鎖方法除了判斷是否在隊列首位之外沒有不同return nonfairTryAcquire(acquires);}}/*** 公平鎖*/static final class FairSync extends Sync {private static final long serialVersionUID = -3000897897090466540L;final void lock() {acquire(1);}/*** Fair version of tryAcquire. Don't grant access unless* recursive call or no waiters or is first.*/protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();// 獲取當前同步狀態int c = getState();// 判斷是否符合預期if (c == 0) {// 判斷是否在隊列首位并且CAS設置當前狀態成功if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}}
- protected boolean tryRelease(int acquires):獨占式釋放同步鎖狀態,等待獲取同步狀態的線程將有機會獲取同步狀態。
protected final boolean tryRelease(int releases) {int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;}
- protected boolean isHeldExclusively():當前同步器是否在獨占模式下被線程占用,一般該方法表示是否被當前線程鎖獨占。
protected final boolean isHeldExclusively() {// While we must in general read state before owner,// we don't need to do so to check if current thread is ownerreturn getExclusiveOwnerThread() == Thread.currentThread();}
以下代碼為讀寫鎖繼承同步器后重寫的方法:
- protected int tryAcquireShared(int arg):共享式獲取同步狀態,返回大于0的值,表示獲取鎖成功,反之獲取鎖失敗。
protected final int tryAcquireShared(int unused) {Thread current = Thread.currentThread();// 獲取當前同步狀態int c = getState();// 如果有線程持有寫鎖,則返回-1if (exclusiveCount(c) != 0 &&getExclusiveOwnerThread() != current)return -1;// 獲取持有鎖的線程數int r = sharedCount(c);// 判斷是否阻塞,判斷線程數量,判斷CAS設置是否成功if (!readerShouldBlock() &&r < MAX_COUNT &&compareAndSetState(c, c + SHARED_UNIT)) {// 當前線程是第一個獲取到鎖的線程if (r == 0) {firstReader = current;firstReaderHoldCount = 1;// 否則就++} else if (firstReader == current) {firstReaderHoldCount++;} else { HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))cachedHoldCounter = rh = readHolds.get();else if (rh.count == 0)readHolds.set(rh);rh.count++;}return 1;}// 死循環去獲取鎖return fullTryAcquireShared(current);}
- protected boolean tryReleaseShared(int arg):共享式釋放同步狀態。
protected final boolean tryReleaseShared(int unused) {Thread current = Thread.currentThread();// 對應上面獲取鎖來讀就好了if (firstReader == current) {// assert firstReaderHoldCount > 0;if (firstReaderHoldCount == 1)firstReader = null;elsefirstReaderHoldCount--;} else {HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))rh = readHolds.get();int count = rh.count;if (count <= 1) {readHolds.remove();if (count <= 0)throw unmatchedUnlockException();}--rh.count;}for (;;) {int c = getState();int nextc = c - SHARED_UNIT;if (compareAndSetState(c, nextc))// Releasing the read lock has no effect on readers,// but it may allow waiting writers to proceed if// both read and write locks are now free.return nextc == 0;}}
上面例子,因為讀寫鎖是共享鎖,可重入鎖是獨占鎖,而同步器對于共享鎖和獨占鎖都提供了可重寫的方法來獲取鎖或者釋放鎖,所以分了兩個例子來寫。
4 同步器提供的模版方法
- void acquire(int arg):獨占式獲取同步狀態,如果當前線程獲取同步狀態成功則返回,否則,將會進入同步隊列等待。
public final void acquire(int arg) {// 如果獲取鎖失敗,則加入同步隊列,如果加入同步隊列成功則自旋阻塞喚醒來不斷的嘗試獲取鎖,直到線程被中斷或獲取到鎖if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
- void acquireInterruptibly(int arg):與上面acquire相似,但是當前方法如果在獲取鎖的過程中線程中斷會拋出InterruptedException并返回。
public final void acquireInterruptibly(int arg) throws InterruptedException {// 線程中斷拋出異常if (Thread.interrupted())throw new InterruptedException();// 如果沒有獲取到鎖if (!tryAcquire(arg))// 不斷自旋嘗試獲取鎖doAcquireInterruptibly(arg);}
- boolean tryAcquireNanos(int arg, long nanosTimeout):在acquireInterruptibly()方法的基礎上增加了超時時間,如果在超時時間內獲取到了鎖,則返回true,否則返回false。
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {// 中斷拋異常if (Thread.interrupted())throw new InterruptedException();// 相應時間內不斷獲取鎖,超時返回falsereturn tryAcquire(arg) ||doAcquireNanos(arg, nanosTimeout);}
- void acquireShared(int arg):共享式的獲取同步狀態,如果當前線程未獲取到同步狀態,則會進入同步隊列等待,與獨占鎖的主要區別是在同一時刻可以有多少個線程獲取到同步狀態。
public final void acquireShared(int arg) {// 如果獲取鎖失敗,則不斷自旋嘗試獲取鎖,tryAcquireShared方法在上面有講if (tryAcquireShared(arg) < 0)doAcquireShared(arg);}
- void acquireSharedInterruptibly(int arg):與acquireShared方法相似,只是如果線程中斷,當前方法會拋出異常。
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {// 中斷拋出異常if (Thread.interrupted())throw new InterruptedException();// 如果獲取鎖失敗,則不斷自旋嘗試獲取鎖if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}
- boolean tryAcquireSharedNanos(int arg, long nanosTimeout):在acquireSharedInterruptibly方法的基礎上增加了超時時間。
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {// 中斷拋出異常if (Thread.interrupted())throw new InterruptedException();// 在超時時間內如果獲取鎖失敗,則不斷自旋嘗試獲取鎖return tryAcquireShared(arg) >= 0 ||doAcquireSharedNanos(arg, nanosTimeout);}
- boolean release(int arg):獨占式釋放同步狀態,該方法在釋放同步狀態之后,會將隊列中的第一個節點包含的線程喚醒。
public final boolean release(int arg) {// 如果獲取鎖成功if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)// 喚醒第一個節點的線程unparkSuccessor(h);return true;}return false;}
- boolean releaseShared(int arg):共享式釋放同步狀態。
public final boolean releaseShared(int arg) {// 如果釋放鎖成功if (tryReleaseShared(arg)) {// 喚醒線程doReleaseShared();return true;}return false;}
- Collection getQueuedThreads():獲取等待在同步隊列上的線程集合。
public final Collection<Thread> getQueuedThreads() {ArrayList<Thread> list = new ArrayList<Thread>();for (Node p = tail; p != null; p = p.prev) {Thread t = p.thread;if (t != null)list.add(t);}return list;}
同步器提供的模板方法基本是分為3類:
- 獨占式獲取與釋放同步狀態
- 共享式獲取與釋放同步狀態
- 查詢同步隊列中的等待線程集合
5 根據同步器自定義同步組件
上面介紹了一些AQS提供的可重寫方法和模板方法,接下來我們自定義一個獨占鎖(在同一時刻只有一個線程能獲取鎖,其他獲取鎖的線程只能處于同步隊列中,當獲取到鎖的線程釋放鎖之后,后面的線程才能夠獲取鎖)
public class ExclusiveLock implements Lock {/*** 自定義同步器*/private static class Sync extends AbstractQueuedSynchronizer{// 判斷是否處于占用狀態@Overrideprotected boolean isHeldExclusively(){return getState() == 1;}// 加鎖@Overrideprotected boolean tryAcquire(int arg) {// 通過CAS設置同步狀態(設置成功返回true 設置失敗返回false)if (compareAndSetState(0, 1)){setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}// 釋放鎖@SneakyThrows@Overrideprotected boolean tryRelease(int arg) {// 如果同步狀態為未獲取鎖,則拋出異常,沒有線程獲取到鎖,不能釋放鎖if (getState() == 0){throw new IllegalAccessException();}// 釋放鎖setExclusiveOwnerThread(null);setState(0);return true;}// 返回一個Condition,每個Condition都包含一個Condition隊列protected Condition newCondition() {return new ConditionObject();}}private final Sync sync = new Sync();@Overridepublic void lock() {// 獨占式加鎖sync.acquire(1);}@Overridepublic void lockInterruptibly() throws InterruptedException {// 加鎖線程中斷拋出異常,否則自旋加鎖sync.acquireInterruptibly(1);}@Overridepublic boolean tryLock() {// 加鎖成功返回true,否則設置占用排它鎖的線程是當前線程return sync.tryAcquire(1);}@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {// 加鎖線程中斷拋出異常,否則在有效時間內嘗試自旋加鎖return sync.tryAcquireSharedNanos(1, unit.toNanos(time));}@Overridepublic void unlock() {// 釋放鎖sync.release(1);}@Overridepublic Condition newCondition() {// 返回Conditionreturn sync.newCondition();}
}
如上代碼,我們自定義了一個獨占鎖,它在同一時刻只允許一個線程占有鎖。sync內部類繼承了同步器并實現了獨占式獲取和釋放同步狀態。在tryAcquire(int arg)方法中,如果通過CAS設置成功,則代表獲取了同步狀態,而在tryRelease(int arg)方法中只是將同步狀態重制為0。用戶在使用ExclusiveLock時并不會直接和內部同步器打交道,而是調用ExclusiveLock提供的方法即可,如加鎖調用lock()方法,如果獲取鎖失敗則會被加入同步隊列中,釋放鎖調用unlock()方法,如果沒有線程獲取鎖的時候釋放鎖會拋出異常,還可以按指定時間嘗試獲取鎖等等。
結尾
本來想把同步器實現原理也寫一些的,結果看了一下篇幅好想有些許長,那就分兩篇來寫把,如果看完感覺有幫助的,請幫忙點個贊,謝謝各位,有緣下篇文章再見!