簡介
JUC的核心是AQS,大部分鎖都是基于AQS擴展出來的,這里先結合可重入鎖和AQS,做一個講解,其它的鎖的實現方式也幾乎類似
ReentrantLock和AQS
AQS的基本結構
AQS,AbstractQueuedSynchronizer,抽象隊列同步器,JUC中的基礎組件,基于AQS,JUC實現了多種鎖和同步工具。
AQS在設計模式是采用了模板方法設計模式,要想基于AQS實現一個同步工具,需要繼承AQS,同時實現所有protected權限的方法,這些方法定義了如何獲取鎖(獨占鎖、共享鎖),AQS負責整體流程的編排,同時維護阻塞隊列、線程的阻塞和喚醒。
AQS用于實現依賴單個原子值去表示狀態的同步器
1、AQS的繼承體系:
// AQS:AQS繼承了AbstractOwnableSynchronizer,并且AQS本身是一個抽象類
public abstract class AbstractQueuedSynchronizerextends AbstractOwnableSynchronizerimplements java.io.Serializable { }// AQS的父類,AbstractOwnableSynchronizer,只有一個成員變量,用于存放持有獨占鎖的線程。
// 一個線程可以獨占的同步器。本類為創建可能包含所有權概念的鎖和相關同步器提供了基礎
public abstract class AbstractOwnableSynchronizerimplements java.io.Serializable {protected AbstractOwnableSynchronizer() { }// 持有排他鎖的線程private transient Thread exclusiveOwnerThread;protected final void setExclusiveOwnerThread(Thread thread) {exclusiveOwnerThread = thread;}protected final Thread getExclusiveOwnerThread() {return exclusiveOwnerThread;}
}
2、AQS的體系結構
public abstract class AbstractQueuedSynchronizerextends AbstractOwnableSynchronizerimplements java.io.Serializable {// protected權限的構造方法,只允許子類繼承和調用protected AbstractQueuedSynchronizer() { }// 定義了隊列中的節點,節點中封裝了線程對象和指向前后節點的指針。AQS中沒有隊列實例,// 而是通過包裝線程為Node類,用前、后節點指針來實現一個虛擬的雙向隊列。static final class Node {/* 兩個常量,定義了鎖的模式 */// 鎖是共享模式static final Node SHARED = new Node();// 鎖是獨占模式static final Node EXCLUSIVE = null;Node nextWaiter; // 鎖的模式,為null,獨占模式,SHARED,共享模式;同時,Condition使用它構建條件隊列// 節點間組成一個雙向鏈表volatile Node prev;volatile Node next;// 節點中封裝的線程volatile Thread thread;}// 雙向鏈表的頭結點private transient volatile Node head;// 雙向鏈表的尾結點private transient volatile Node tail;// 鎖的狀態,這個狀態由子類來操作private volatile int state;/* 模板方法:交給子類實現的方法 */// 嘗試獲取獨占鎖protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();}// 嘗試釋放獨占鎖protected boolean tryRelease(int arg) {throw new UnsupportedOperationException();}// 嘗試獲取共享鎖protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException();}// 嘗試釋放共享鎖protected boolean tryReleaseShared(int arg) {throw new UnsupportedOperationException();}// 判斷當前鎖是否是獨占模式protected boolean isHeldExclusively() {throw new UnsupportedOperationException();}/* 父類中負責流程編排 */// 獲取獨占鎖,先調用交給子類實現的tryAcquire方法,如果獲取鎖失敗,進入隊列,然后等待public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}// 釋放獨占鎖,先調用交給子類實現的tryRelease方法,如果釋放成功,當前節點移出隊列,然后喚醒隊列中的第一個節點public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}// 獲取共享鎖public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)doAcquireShared(arg);}// 釋放共享鎖public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}
}
從AQS的基本結構中可以看到:
- AQS的隊列,實際是一個雙向鏈表。
- AQS在設計上使用了模板方法模式,父類中定義好流程,然后再定義好交給子類實現的模板方法。在當前案例中,交給子類實現的是獲取鎖、釋放鎖的方式,父類來維護阻塞隊列、線程的阻塞和喚醒。通過這種方式,子類指定鎖的獲取和釋放的方式,子類可以實現多種類型的鎖,比如公平鎖、非公平鎖、讀寫鎖、并發度控制(信號量)等。
AQS中提供了兩套模式:獨占模式和共享模式
- 獨占模式:exclusive, 同一時間只有一個線程能拿到鎖執行
- 共享模式:shared, 同一時間有多個線程可以拿到鎖協同工作
子類可以選擇獨占模式、也可以選擇共享模式,例如,讀寫鎖中,讀鎖就是共享模式,寫鎖就是獨占模式,可重入鎖也是獨占模式。
ReentrantLock的基本結構
1、ReentrantLock的繼承體系:ReentrantLock實現了Lock接口,并且支持序列化
public class ReentrantLock implements Lock, java.io.Serializable {
Lock接口:定義了一個鎖應該具備的功能
public interface Lock {/* 獲取鎖 */// 獲取鎖void lock();// 可打斷地獲取鎖void lockInterruptibly() throws InterruptedException;// 不阻塞地獲取鎖,如果獲取不到,立刻返回boolean tryLock();// 獲取鎖,指定超時時長boolean tryLock(long time, TimeUnit unit) throws InterruptedException;// 釋放鎖void unlock();// 獲取條件變量,條件變量是指調用wait方法、notify方法的鎖對象,ReentrantLock可以實現在多個條件變量上等待和喚醒Condition newCondition();
}
2、ReentrantLock的類結構
public class ReentrantLock implements Lock, java.io.Serializable {// 同步器:抽象靜態內部類,定義了獲取鎖、釋放鎖的功能。同步器繼承了AQS,AQS是juc的核心。abstract static class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = -5179523762034025860L;// 獲取鎖的功能,交給子類擴展abstract void lock();// 釋放鎖protected final boolean tryRelease(int releases) { } // 這里暫時不關心它的實現}// 同步器的實例private final Sync sync;// 非公平鎖static final class NonfairSync extends Sync { }// 公平鎖static final class FairSync extends Sync { }// 構造方法public ReentrantLock() {sync = new NonfairSync(); // 默認使用非公平鎖}// 獲取鎖public void lock() {sync.lock();}// 釋放鎖public void unlock() {sync.release(1);}
}
總結:
- ReentrantLock內部定義了一個同步器,并且它的所有功能實際上都是委托給同步器來完成。
- 同步器把獲取鎖的方法定義為抽象方法,同時實現了釋放鎖的方法,在同步器的基礎上,擴展出了兩個子類,公平鎖、非公平鎖,公平鎖和非公平鎖只是獲取鎖的方式不同,其它都相同。ReentrantLock默認使用非公平鎖
- 同步器繼承了AQS
ReentrantLock是如何獲取鎖的?
ReentrantLock內部實現了公平鎖和非公平鎖兩種獲取鎖的模式,默認使用非公平鎖,這里分別講解它們的工作機制。
非公平鎖是如何獲取鎖的?
源碼:
1、整體流程
// 這是同步器的子類NonfairSync中獲取鎖的方法
final void lock() {// 獲取鎖:嘗試使用cas算法改變狀態變量,把它的值從0改為1,改變成功,表示獲取到鎖if (compareAndSetState(0, 1))// 獲取到鎖后,把當前線程設置為獨占線程setExclusiveOwnerThread(Thread.currentThread());else// 沒有獲取到鎖:調用AQS中的方法,進入阻塞隊列acquire(1);
}
這個方法中調用的方法都是來自AQS。從方法的實現步驟來看,非公平鎖在獲取鎖時,先嘗試直接獲取鎖,獲取不到,再進入阻塞隊列,符合之前提到的非公平鎖的原理。
2、AQS中acquire方法:它在AQS中定義了獲取獨占鎖的流程
// 整體流程
public final void acquire(int arg) {// 嘗試獲取鎖:首先調用交給子類實現的的tryAcquire方法,嘗試獲取鎖,如果獲取成功,直接進入同步代碼塊,if (!tryAcquire(arg) && // 如果沒有獲取到鎖,進入阻塞隊列:如果獲取鎖失敗,線程進入隊列,阻塞,入隊之前還會嘗試再次獲取鎖,// 成功則不入隊acquireQueued(addWaiter(Node.EXCLUSIVE), arg))// 為線程打上中斷狀態,因為acquireQueued方法中會清除線程的中斷標識,所以這里需要重新為線程打上中斷標識selfInterrupt();
}
2.1、交給子類實現的tryAcquire方法:嘗試獲取鎖
// 定義在NonfairSync中
protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);
}// 定義在Sync中
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) { // 表示無鎖// 嘗試獲取鎖,獲取鎖就是通過cas算法改變state變量的狀態,改變成功,就是獲取到鎖if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) { // 如果獲取鎖的線程是當前線程,重入int nextc = c + acquires; // 狀態加1if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc); // 更新狀態return true;}return false;
}
2.2、獲取鎖失敗后進入阻塞隊列的邏輯:
// 1、addWaiter方法:創建節點并且把節點加入到隊列中
private Node addWaiter(Node mode) { // 這里的參數mode表示節點的模式,這里是獨占模式// 將當前線程封裝成一個node節點Node node = new Node(Thread.currentThread(), mode);/* 下面是操作雙向鏈表的代碼,將新節點加入到隊列的尾部,隊列中的頭結點是一個虛擬節點 */Node pred = tail;if (pred != null) { // 如果當前隊列中有節點node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node); // 當前隊列中沒有節點,那么需要創建虛擬頭結點,然后新節點入隊return node;
}// 節點入隊的邏輯
private Node enq(final Node node) {for (;;) {// 自旋// 獲取當前隊列中的尾節點Node t = tail;if (t == null) { // Must initialize // 初始化一個空節點 new Node(),作為隊列中的哨兵節點if (compareAndSetHead(new Node())) // cas操作,設置頭節點為空節點tail = head;// 初始化空節點后,因為此時隊列中只有一個節點,所以head和tail都指向這一個節點} else {// 第二次循環,將節點掛載到空節點上node.prev = t;// 將用戶傳入的node節點設置為尾節點if (compareAndSetTail(t, node)) {t.next = node;return t; // 返回前一個節點}}}
}
2.3、線程進入阻塞的邏輯:
// 2、acquireQueued,線程進入阻塞
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) { // 自旋// 獲取當前節點的上一個節點final Node p = node.predecessor();// 判斷前一個節點是不是頭節點(隊列中使用一個虛擬節點作為頭結點,所以當前節點的前一個節點是頭結點,// 那么當前節點就是事實上的頭結點),如果是,嘗試搶占鎖資源if (p == head && tryAcquire(arg)) {// 搶到鎖資源后,這里可以理解為當前節點出隊,將當前節點設置為隊列的虛擬頭結點,// 所以線程獲取到鎖之后,就會被移出阻塞隊列setHead(node);p.next = null; // help GC,將原先的頭結點清除failed = false;return interrupted;}// 搶鎖失敗if (shouldParkAfterFailedAcquire(p, node) && // 判斷是否應該阻塞當前線程parkAndCheckInterrupt()) // 調用LockSupport中的park方法,阻塞當前線程interrupted = true;}} finally {if (failed)cancelAcquire(node); // 發生異常,當前節點出隊}
}// 判斷是否應該阻塞當前線程:shouldParkAfterFailedAcquire
// 第一個參數是當前節點的上一個節點,第二個參數是當前節點,
// 總結:如果上一個節點的狀態是SIGNAL,那么當前節點應該阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {// 這個方法中需要判斷節點的狀態。表示狀態的常量:// static final int CANCELLED = 1;:表示當前節點的線程已經被取消// static final int SIGNAL = -1;:表示當前節點的后繼節點的線程正在等待喚醒// static final int CONDITION = -2;:節點在條件隊列中,節點線程等待在Condition上,// 不過當其他的線程對Condition調用了signal()方法后,該節點就會從等待隊列// 轉移到同步隊列中,然后開始嘗試對同步狀態的獲取// static final int PROPAGATE = -3;:表示下一個共享狀態應該被無條件傳播int ws = pred.waitStatus;// 判斷上一個節點的狀態if (ws == Node.SIGNAL)return true; // 如果前驅節點狀態為SIGNAL,當前節點需要被阻塞if (ws > 0) { // 如果前驅節點的狀態是CANCELLED,需要把CANCELLED狀態的節點移除出隊列do {node.prev = pred = pred.prev; // 前一個節點前移,同時當前節點指向前前一個節點} while (pred.waitStatus > 0);pred.next = node; // 將當前節點連接到新的前驅節點} else {// 將前驅節點狀態設置為SIGNAL,當下一次執行這個方法時,// 因為ws == SIGNAL狀態成立,所以下一次會執行返回truecompareAndSetWaitStatus(pred, ws, Node.SIGNAL); }return false;
}// parkAndCheckInterrupt:阻塞當前線程
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted(); // 這個方法會清除中斷標記
}
2.4、發生異常,節點出隊的方法:
private void cancelAcquire(Node node) {// Ignore if node doesn't existif (node == null)return;node.thread = null;// Skip cancelled predecessorsNode pred = node.prev;while (pred.waitStatus > 0) // 如果前一個節點的狀態是CANCELLED,需要移除前一個節點,重新構建隊列node.prev = pred = pred.prev; // 前一個節點前移,同時當前節點指向前前一個節點Node predNext = pred.next; // 循環過后前面的節點的下一個節點node.waitStatus = Node.CANCELLED;// 如果當前節點是尾結點,將尾結點更新為前一個節點if (node == tail && compareAndSetTail(node, pred)) {// 同時前一個節點的next指針指向nullcompareAndSetNext(pred, predNext, null);} else {// 如果后序節點需要被喚醒int ws;if (pred != head &&((ws = pred.waitStatus) == Node.SIGNAL ||(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&pred.thread != null) {// 進入if分支,證明:前驅節點不是頭節點、前驅節點的狀態是SIGNAL或可以被設置為SIGNAL、// 前驅節點仍然有線程關聯,移除當前節點即可Node next = node.next;if (next != null && next.waitStatus <= 0)compareAndSetNext(pred, predNext, next); // 從隊列中移除當前節點} else {// 進入else分支,證明當前節點的前驅節點無法正常處理后續節點的喚醒邏輯,因此在這里喚醒后續節點unparkSuccessor(node); // 喚醒后續節點}node.next = node; // help GC}
}// 喚醒后續節點
private void unparkSuccessor(Node node) {int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);// 遍歷,從尾結點開始,找到當前節點之后第一個狀態不為CANCELLED的節點Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread); // 喚醒該節點
}
總結:用一張流程圖來總結代碼中的流程
1、lock方法:嘗試獲取鎖的流程
2、acquire方法的流程
公平鎖是如何獲取鎖的?
源碼:
// FairSync:這是同步器的子類FairSync中獲取鎖的方法
final void lock() {// 直接進入阻塞隊列,也符合公平鎖的原理,acquire中會調用tryAcquire方法,// 公平鎖和非公平鎖都分別實現了這個方法,定制自己獲取鎖的邏輯acquire(1);
}// FairSync 公平鎖中獲取鎖的流程
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) { // 如果無鎖if (!hasQueuedPredecessors() && // 等待隊列中沒有節點compareAndSetState(0, acquires)) { // 獲取鎖成功setExclusiveOwnerThread(current); // 設置當前線程為擁有獨占鎖的線程return true;}}else if (current == getExclusiveOwnerThread()) { // 鎖的可重入邏輯int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;
}
公平鎖不像非公平鎖那樣,先嘗試獲取鎖,而是按照流程,判斷阻塞隊列中有沒有節點,如果有,進入阻塞隊列
ReentrantLock是如何釋放鎖的?
公平鎖和非公平鎖釋放鎖的原理是一樣的,這里統一講解。
調用AQS中的release方法,來完成釋放鎖的邏輯
public final boolean release(int arg) {// tryRelease,交給子類實現的模板方法,執行釋放鎖的邏輯,如果返回true,表示鎖釋放成功if (tryRelease(arg)) {// 處理等待隊列,喚醒后繼節點Node h = head;if (h != null && h.waitStatus != 0)// 喚醒后繼節點,這里是喚醒隊列中的第一個節點unparkSuccessor(h); // 這個方法在前面有提到return true;}return false;
}// ReentrantLock中釋放鎖的邏輯,同樣的,改變狀態,將獨占鎖標識改為null
protected final boolean tryRelease(int releases) {int c = getState() - releases; // 改變狀態,這里包含可重入的邏輯if (Thread.currentThread() != getExclusiveOwnerThread()) // 判斷,如果當前線程沒有持有鎖throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {// 確認是釋放鎖,將獨占鎖的標識置為null,此前它是當前線程free = true;setExclusiveOwnerThread(null);}setState(c); // 設置狀態return free;
}
Condition 源碼
1、ReentrantLock中獲取Condition的方法:
// ReentrantLock中的方法:
public Condition newCondition() {return sync.newCondition();
}
// 同步器中的方法:
final ConditionObject newCondition() {return new ConditionObject();
}
可以看到,最終是獲取了一個ConditionObject的實例
2、ConditionObject:它被定義在AQS中,實現了Condition接口。
Condition接口:條件對象,條件對象從Object類中提取出了wait、notify、notifyAll方法的功能,使得一個鎖對象上可以支持多個等待隊列。
public interface Condition {// 使當前線程進入等待狀態,直到被喚醒或調用interrupt方法void await() throws InterruptedException;// 等待,直到被喚醒void awaitUninterruptibly();// 等待,并且指定時長long awaitNanos(long nanosTimeout) throws InterruptedException;boolean await(long time, TimeUnit unit) throws InterruptedException;boolean awaitUntil(Date deadline) throws InterruptedException;// 喚醒單個線程void signal();// 喚醒全部線程void signalAll();
}
2、ConditionObject的整體結構:它是AQS的成員內部類
public class ConditionObject implements Condition, java.io.Serializable {private static final long serialVersionUID = 1173984872572414699L;/* 維護一個等待隊列 */// 頭節點private transient Node firstWaiter;// 尾節點private transient Node lastWaiter;
}
3、阻塞邏輯的實現:
// await方法:線程入隊,然后調用park方法,進入等待狀態
public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 向條件隊列添加一個節點Node node = addConditionWaiter();int savedState = fullyRelease(node); // 完全釋放當前線程持有的鎖int interruptMode = 0;// 判斷當前節點是否在同步隊列while (!isOnSyncQueue(node)) {// 如果不在,進入阻塞狀態LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}// 喚醒后,沒有進入上面的while,證明節點在同步隊列中,嘗試重新獲取鎖if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) // 判斷節點是否仍在條件隊列中unlinkCancelledWaiters(); // 清理隊列中所有被取消的節點,確保隊列的正確性if (interruptMode != 0)reportInterruptAfterWait(interruptMode); // 處理中斷邏輯,被外部中斷后,是拋異常還是繼續進行
}// 清理隊列中失效的節點:這里實際上是處理單向鏈表的邏輯,等待隊列中,使用nextWaiter指向下一個節點
private void unlinkCancelledWaiters() {Node t = firstWaiter;Node trail = null;while (t != null) { // 從頭結點開始遍歷Node next = t.nextWaiter; // 下一個節點if (t.waitStatus != Node.CONDITION) {t.nextWaiter = null;if (trail == null)firstWaiter = next;elsetrail.nextWaiter = next;if (next == null)lastWaiter = trail;}elsetrail = t; // 用作下一次循環時記錄前一個節點t = next; // 頭結點后移}
}
3.1、添加線程到條件隊列,條件隊列是一個單向鏈表
private Node addConditionWaiter() {Node t = lastWaiter;// 這里先判斷尾結點的狀態,如果它不是CONDITION狀態if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters(); // 清理隊列中失效的節點t = lastWaiter;}// 節點入隊Node node = new Node(Thread.currentThread(), Node.CONDITION);if (t == null)firstWaiter = node;elset.nextWaiter = node;lastWaiter = node;return node;
}
3.2、釋放鎖的邏輯
final int fullyRelease(Node node) {boolean failed = true;try {int savedState = getState();if (release(savedState)) { // 釋放鎖failed = false;return savedState;} else {throw new IllegalMonitorStateException();}} finally {if (failed)node.waitStatus = Node.CANCELLED; // 如果釋放失敗,將節點狀態改為CANCELLED}
}
4、喚醒邏輯的實現:
// signal方法:喚醒線程
public final void signal() {if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignal(first); // 喚醒隊列中的第一個節點,方法中會調用LockSupport.unpark方法
}private void doSignal(Node first) {do {if ( (firstWaiter = first.nextWaiter) == null) // 頭結點后移lastWaiter = null; // 如果等待隊列中只有一個節點,尾結點置為空first.nextWaiter = null; // 原先的頭結點出隊} while (!transferForSignal(first) && // 喚醒頭結點所在的線程,如果喚醒成功,退出循環(first = firstWaiter) != null);
}// 喚醒頭結點所在的線程
final boolean transferForSignal(Node node) {// 狀態轉換,由CONDITION變為初始狀態if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;Node p = enq(node); // 節點加入同步隊列,這里返回的是當前節點的前一個節點int ws = p.waitStatus;if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // 如果前一個節點的狀態無法被轉換為SIGNALLockSupport.unpark(node.thread); // 手動喚醒當前節點return true;
}
總結:Condition的工作機制
- Condition的實現類ConditionObject,是AQS的內部類,所以它會持有AQS的引用,在當前案例中,AQS是通過子類ReentrantLock實例化的,所以Condition實際上持有ReentrantLock的實例。成員內部類的實例依賴于外部類的實例,通過ReentrantLock創建ConditionObject的實例。
- Condition內部會維護一個自己的等待隊列,它又持有ReentrantLock的實例,所以它可以操作ReentrantLock的同步隊列
- 用戶通過Condition實例調用await方法,condition會把用戶線程加入到自己的條件隊列中,然后阻塞
- 用戶通過Condition實例調用signal方法,condition會把用戶線程從自己的條件隊列中移除,然后加入到ReentrantLock的同步隊列中,然后喚醒用戶線程
- 用戶線程被喚醒后,判斷自己是不是在同步隊列中,如果在,搶鎖,如果不在,繼續阻塞。搶到鎖之后,會額外判斷,如果當前線程還在條件隊列中,會清理條件隊列中失效的節點
總結
這里介紹了可重入鎖和AQS是如何配合在一起工作的,它們的設計模式,哪些功能被定義在可重入鎖中,哪些功能被定義在AQS中,后面介紹的幾個工具也是這么實現的,模式基本相同。
ReentrantReadWriteLock 源碼
讀寫鎖,讀鎖是共享鎖,寫鎖是獨占鎖,和之前的ReentrantLock類似的一點,ReentrantLock本身可以被理解為是一個寫鎖
ReentrantReadWriteLock的基本結構
1、ReentrantReadWriteLock的繼承體系:ReentrantReadWriteLock實現了讀寫鎖(ReadWriteLock)接口,并且支持序列化
public class ReentrantReadWriteLockimplements ReadWriteLock, java.io.Serializable {
ReadWriteLock接口:定義了讀鎖和寫鎖
public interface ReadWriteLock {// 讀鎖Lock readLock();// 寫鎖Lock writeLock();
}
2、ReentrantReadWriteLock的類結構
public class ReentrantReadWriteLockimplements ReadWriteLock, java.io.Serializable {// 同步器abstract static class Sync extends AbstractQueuedSynchronizer {/* 同步器中定義的抽象方法 */// 判斷讀是否應該阻塞abstract boolean readerShouldBlock();// 判斷寫是否應該阻塞abstract boolean writerShouldBlock();}// 非公平鎖static final class NonfairSync extends Sync {final boolean writerShouldBlock() {return false; // 非公平鎖,寫鎖可以競爭}final boolean readerShouldBlock() {return apparentlyFirstQueuedIsExclusive();}}// 公平鎖static final class FairSync extends Sync {final boolean writerShouldBlock() {return hasQueuedPredecessors();}final boolean readerShouldBlock() {return hasQueuedPredecessors();}}// 讀鎖:內部操作的是共享鎖public static class ReadLock implements Lock, java.io.Serializable {// 同步器,鎖的內部,所有的功能都是委托同步器實現的private final Sync sync;protected ReadLock(ReentrantReadWriteLock lock) {sync = lock.sync;}public void lock() { // 獲取鎖sync.acquireShared(1);}public void unlock() { // 釋放鎖sync.releaseShared(1);}}// 寫鎖,內部操作的是獨占鎖public static class WriteLock implements Lock, java.io.Serializable {// 同步器private final Sync sync;public void lock() {sync.acquire(1);}public void unlock() {sync.release(1);}}// 構造方法,讀鎖和寫鎖默認都是非公平鎖public ReentrantReadWriteLock() {this(false);}public ReentrantReadWriteLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();readerLock = new ReadLock(this);writerLock = new WriteLock(this);}
}
ReentrantReadWriteLock和ReentrantLock類似,只是它的內部擴展出了讀鎖和寫鎖,讀鎖和寫鎖依賴的是同一個同步器,讀鎖是共享鎖,寫鎖是排它鎖。
讀寫鎖內置的同步器
讀寫鎖需要在讀鎖和寫鎖之間同步,這些功能都依賴同步器
同步器的結構:
abstract static class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 6317671515068378041L;/* 鎖的狀態:用state的高16位作為讀鎖的數量,低16位表示寫鎖的數量 */static final int SHARED_SHIFT = 16; // 計算常量static final int SHARED_UNIT = (1 << SHARED_SHIFT);static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;// 讀鎖的次數,狀態字段無符號右位移16位static int sharedCount(int c) { return c >>> SHARED_SHIFT; }// 寫鎖的次數,狀態字段取后16位static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }// 靜態內部類:封裝了線程id和該線程持有讀鎖的次數static final class HoldCounter {int count = 0;// Use id, not reference, to avoid garbage retentionfinal long tid = getThreadId(Thread.currentThread()); // 線程id,在創建實例時初始化}private transient HoldCounter cachedHoldCounter;// 靜態內部類:繼承了ThreadLocal,用于記錄每個線程持有讀鎖的次數static final class ThreadLocalHoldCounterextends ThreadLocal<HoldCounter> {public HoldCounter initialValue() {return new HoldCounter();}}private transient ThreadLocalHoldCounter readHolds;// 第一個持有讀鎖的線程private transient Thread firstReader = null;// 第一個持有讀鎖的線程持有幾次讀鎖,這里是可重入鎖的邏輯private transient int firstReaderHoldCount;Sync() {readHolds = new ThreadLocalHoldCounter();setState(getState()); // ensures visibility of readHolds}
}
獲取讀鎖的邏輯
讀鎖直接調用了AQS中獲取共享鎖的邏輯,并且在某些步驟做了自己的定制,這就是模板方法設計模式。
源碼:
1、基本步驟
public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0) // 嘗試獲取共享鎖doAcquireShared(arg); // 獲取失敗,進入阻塞隊列
}
2、嘗試獲取共享鎖
protected final int tryAcquireShared(int unused) {Thread current = Thread.currentThread();// 獲取 state 的值int c = getState();// 判斷是否有線程獲取了寫鎖并且不是當前線程,exclusiveCount(c),計算寫鎖的數量if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)return -1; // 如果有線程獲取了寫鎖,直接返回// 計算讀鎖的數量int r = sharedCount(c);if (!readerShouldBlock() && // 讀是否應阻塞,這個實現在子類中r < MAX_COUNT &&compareAndSetState(c, c + SHARED_UNIT)) { // cas算法修改狀態,修改成功,表示獲取到讀鎖/* 下面這一大段邏輯都是用來記錄當前線程獲取了幾次共享鎖 */if (r == 0) {// 表示是第一個獲取讀鎖的線程firstReader = current;firstReaderHoldCount = 1;} else if (firstReader == current) {// 如果是當前獲取讀鎖的線程重入,再次獲取鎖firstReaderHoldCount++;} else {// 如果不是第一個獲取讀鎖的線程來獲取讀鎖,使用cachedHoldCounter和readHolds。// cachedHoldCounter記錄線程id和該線程對于讀鎖的持有次數,readHolds將該數據// 存儲到ThreadLocal中HoldCounter rh = cachedHoldCounter;// 這個if else是處理readHolds和cachedHoldCounter之間的關系if (rh == null || rh.tid != getThreadId(current)){// 這里的get方法,會調用初始化數據的方法initialValue() // 參考ThreadLocalcachedHoldCounter = rh = readHolds.get();} else if (rh.count == 0)readHolds.set(rh);rh.count++; // 線程id在創建實例時初始化,這里記錄該線程對于讀鎖的持有次數}return 1; // 表示獲取鎖成功}// 需要被阻塞獲取讀鎖失敗,那么需要進入下面完整版的獲取鎖的過程return fullTryAcquireShared(current);
}// 讀是否需要被阻塞:這個邏輯定義在同步器中。具體實現是,如果隊列中的第一個節點是一個獨占模式,讀線程需要被阻塞,否則不需要。
// 因為即使當前沒有寫線程持有鎖,隊列中也可能會有寫請求,并且隊列中的寫請求也應該被優先處理,在非公平鎖中,讀線
// 程和寫線程的請求順序可能不嚴格按照先進先出處理,但寫線程的請求仍然需要被優先處理。
final boolean readerShouldBlock() {return apparentlyFirstQueuedIsExclusive();
}
final boolean apparentlyFirstQueuedIsExclusive() {Node h, s;return (h = head) != null &&(s = h.next) != null &&!s.isShared() && // 隊列中的頭結點是獨占模式s.thread != null;
}
獲取鎖失敗后,需要調用完整版的獲取鎖的流程:
// 完整版的獲取鎖的過程:用于在快速路徑失敗后,提供一種更全面的嘗試獲取共享鎖的機制。
// 它處理了快速路徑未能處理的復雜情況,例如鎖降級、讀鎖數量達到上限、線程是否需要阻塞等。
final int fullTryAcquireShared(Thread current) {HoldCounter rh = null;for (;;) { // 自旋int c = getState();// 判斷是否有線程獲取了寫鎖并且不是當前線程if (exclusiveCount(c) != 0) {if (getExclusiveOwnerThread() != current)return -1; // 如果有,直接返回} else if (readerShouldBlock()) { // 判斷讀線程是否應該被阻塞if (firstReader == current) {// assert firstReaderHoldCount > 0;} else {if (rh == null) {rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current)) {rh = readHolds.get();if (rh.count == 0)readHolds.remove(); // 移除當前線程持有鎖的記錄}}if (rh.count == 0)return -1;}}// 如果當前線程已經持有寫鎖,繼續嘗試獲取讀鎖(鎖降級),if (sharedCount(c) == MAX_COUNT) // 檢查讀鎖數量是否達到上限throw new Error("Maximum lock count exceeded");// 嘗試獲取共享鎖if (compareAndSetState(c, c + SHARED_UNIT)) {if (sharedCount(c) == 0) {firstReader = current;firstReaderHoldCount = 1;} else if (firstReader == current) {firstReaderHoldCount++;} else {if (rh == null)rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))rh = readHolds.get();else if (rh.count == 0)readHolds.set(rh);rh.count++;cachedHoldCounter = rh; // cache for release}return 1;}}
}
3、獲取鎖失敗后,進入阻塞隊列
private void doAcquireShared(int arg) {// 向隊列尾部添加一個節點final Node node = addWaiter(Node.SHARED);boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head) {// 如果當前節點是頭結點,再次嘗試獲取鎖int r = tryAcquireShared(arg);if (r >= 0) {// 獲取鎖成功,頭結點出隊并且喚醒后續節點setHeadAndPropagate(node, r); p.next = null; // help GCif (interrupted)selfInterrupt();failed = false;return;}}// 阻塞if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node); // 處理異常情況}
}private void setHeadAndPropagate(Node node, int propagate) {Node h = head; // Record old head for check belowsetHead(node); // 將當前節點設置為一個虛擬頭結點if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {// 如果下一個節點是共享模式或者null,喚醒后續節點Node s = node.next;if (s == null || s.isShared())doReleaseShared();}
}
釋放讀鎖的邏輯
整體流程:
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) { // 嘗試釋放共享鎖doReleaseShared(); // 釋放共享鎖return true;}return false;
}
1、tryReleaseShared:負責更新狀態
protected final boolean tryReleaseShared(int unused) {Thread current = Thread.currentThread();// 更新當前線程持有共享鎖的次數if (firstReader == current) {// assert firstReaderHoldCount > 0;if (firstReaderHoldCount == 1)firstReader = null;elsefirstReaderHoldCount--;} else {HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))rh = readHolds.get();int count = rh.count;if (count <= 1) {readHolds.remove();if (count <= 0)throw unmatchedUnlockException();}--rh.count;}// 更新state變量for (;;) {int c = getState();int nextc = c - SHARED_UNIT;if (compareAndSetState(c, nextc))return nextc == 0;}
}
2、doReleaseShared:維護阻塞隊列
private void doReleaseShared() {for (;;) { // 無限循環,直到釋放操作完成Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 嘗試將狀態從SIGNAL改為0continue; // 如果CAS失敗,重新循環檢查unparkSuccessor(h); // 喚醒頭節點的后繼節點}// 如果頭節點的狀態為0,說明當前沒有線程需要被喚醒,但需要確保釋放操作能夠繼續傳播。else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}if (h == head) // loop if head changed// 在每次循環結束時,檢查頭節點是否發生變化。如果頭節點沒有變化,說明當前的釋放操作已經完成,可以退出循環。break; }
}
獲取和釋放寫鎖的邏輯
和ReentrantLock類似,只是操作state字段的方式不同。
獲取寫鎖的邏輯:
protected final boolean tryAcquire(int acquires) {Thread current = Thread.currentThread();int c = getState();int w = exclusiveCount(c);if (c != 0) { // 如果鎖已經被占用// (Note: if c != 0 and w == 0 then shared count != 0),如果c != 0 && w == 0,// 證明共享鎖不為0,有線程持有共享鎖if (w == 0 || current != getExclusiveOwnerThread()) // 不是當前線程獲取的寫鎖return false; // 退出// 如果當前線程已經持有寫鎖,檢查是否超過最大次數if (w + exclusiveCount(acquires) > MAX_COUNT)throw new Error("Maximum lock count exceeded");// 重入setState(c + acquires);return true;}// 如果鎖未被占用(c == 0)if (writerShouldBlock() || // 如果寫線程應該阻塞!compareAndSetState(c, c + acquires)) // 或者 CAS 更新狀態失敗return false;setExclusiveOwnerThread(current);return true;
}
釋放寫鎖的邏輯:
protected final boolean tryRelease(int releases) {if (!isHeldExclusively())throw new IllegalMonitorStateException();int nextc = getState() - releases;boolean free = exclusiveCount(nextc) == 0;if (free)setExclusiveOwnerThread(null);setState(nextc);return free;
}
總結
讀寫鎖使用同一個狀態字段、同一個阻塞隊列,內部基于同一個同步器,彼此之間互相影響,只是獲取鎖的方式不同。
Semaphore 源碼
Semaphore底層是基于共享鎖,也可以理解為讀鎖,加鎖和釋放鎖,操作的都是讀鎖。它允許多個線程同時執行同步代碼塊,但是它又會限制線程數量,從而達到控制并發量的目的
Semaphore的基本結構:
public class Semaphore implements java.io.Serializable {// 同步器的實例private final Sync sync;// 同步器的定義abstract static class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 1192457210091910933L;// 這里重點關注構造方法,它調用AQS中的setState方法,state字段表示鎖的狀態,如果是共享鎖,// state的值0是到n,如果是排它鎖,state的值是0到1,0表示無鎖,1表示獲取到鎖。值為n,表示// 允許n個線程同時獲取共享鎖Sync(int permits) {setState(permits);} }// 構造方法,參數premits代表許可證數量,也就是同時允許多少個線程進行并發操作public Semaphore(int permits) {sync = new NonfairSync(permits); // 默認使用非公平鎖}// 非公平鎖static final class NonfairSync extends Sync {}// 公平鎖static final class FairSync extends Sync { }// Semaphore方法將獲取鎖和釋放鎖的功能都委托給同步器,同步器又調用了AQS中的方法,所以核心實現在AQSpublic void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);}public void release() {sync.releaseShared(1);}}
和ReentrantLock類似,Semaphore內部定義了同步器Sync,同步器繼承了AQS,基于同步器擴展出了公平鎖、非公平鎖,Semaphore默認使用非公平鎖。
原理:Semaphore將狀態設置為n,acquire方法,n - 1,代表獲取到鎖、release方法, n + 1,代表釋放鎖
CountDownLatch 源碼
CountDownLatch的內部是一個共享鎖,同樣的,它的內部定義了同步器,但是沒有根據同步器擴展出非公平鎖、公平鎖
public class CountDownLatch {// 同步器,繼承了AQSprivate static final class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 4982264981922014374L;// 構造方法Sync(int count) {setState(count); // 調用了AQS中的setState方法}}// 同步器實例private final Sync sync;// 構造方法,參數count,指定了調用countDown方法的次數,調用夠指定次數的countDown方法后,// await方法才會結束阻塞public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);}// 等待,直到倒計時鎖內部值為0public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);}// 倒計時鎖減1,如果減1后值為0,釋放等待的線程public void countDown() {sync.releaseShared(1);}
}
和Semaphore類似,將state設置為n,調用countDown方法, n - 1,調用await方法,判斷 n 是否等于 0 ,如果是,取消阻塞
Q&A
讀鎖、寫鎖,公平鎖、非公平鎖,是怎么配合在一起工作的?
讀鎖、寫鎖,公平鎖、非公平鎖,是從不同的角度描寫了一個鎖的特性,讀鎖可以是公平鎖、也可以是非公平鎖,它們是一個實體的兩個屬性
線程什么時候從阻塞隊列中移出?
獲取到鎖之后,所以線程釋放鎖資源時只需要喚醒隊列中的第一個節點即可
鎖超時怎么實現?tryLock方法
進入阻塞狀態時加上超時時長。
源碼:
tryAcquireNanos:嘗試獲取鎖,在用戶指定的時長過后,如果沒有獲取鎖,結束阻塞,是ReentrantLock中tryLock等方法的基礎
public final boolean tryAcquireNanos(int arg, long nanosTimeout)throws InterruptedException {// 判斷打斷狀態if (Thread.interrupted())throw new InterruptedException();return tryAcquire(arg) || // 嘗試獲取鎖doAcquireNanos(arg, nanosTimeout); // 獲取鎖,或者阻塞
}// doAcquireNanos:真正執行獲取鎖或阻塞的邏輯
private boolean doAcquireNanos(int arg, long nanosTimeout)throws InterruptedException {if (nanosTimeout <= 0L)return false;// 結束阻塞的時間,當前時間加上用戶傳入的時間final long deadline = System.nanoTime() + nanosTimeout;final Node node = addWaiter(Node.EXCLUSIVE);boolean failed = true;try {for (;;) { // 自旋final Node p = node.predecessor();// 如果上一個節點是頭節點,嘗試獲取鎖,成功后直接返回,不進入阻塞if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return true;}// 計算出應該阻塞的時長nanosTimeout = deadline - System.nanoTime();if (nanosTimeout <= 0L)return false;// 判斷是否應該阻塞if (shouldParkAfterFailedAcquire(p, node) &&// 這是一項優化,如果時長大于自旋閾值才進行阻塞,否則進行自旋,// 因為如果阻塞時間特別短,相較于自旋,阻塞比較耗費性能nanosTimeout > spinForTimeoutThreshold)// 阻塞LockSupport.parkNanos(this, nanosTimeout);if (Thread.interrupted())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}
}
可打斷怎么實現?tryInterruptibly方法
AQS中會判斷當前線程是否調用了interrupt方法,可打斷的情況下,如果調用了interrupt方法,拋異常,同時用戶需要處理這個異常,不可打斷的情況下,也就是通過lock()方法獲取鎖時,即使檢測到interrupt方法被調用,也會繼續向下執行。