AQS結構
//頭節點 當前持有鎖的線程private transient volatile Node head;/*** Tail of the wait queue, lazily initialized. Modified only via* method enq to add new wait node.*///每個進來的線程都插入到最后private transient volatile Node tail;/*** The synchronization state.*///代表當前鎖的狀態 0:表示沒有占用 大于0代表有線程持有當前鎖private volatile int state;//CAS設置值protected final boolean compareAndSetState(int expect, int update) {// See below for intrinsics setup to support thisreturn unsafe.compareAndSwapInt(this, stateOffset, expect, update);}
AQS內部包裝成一個Node節點 通過state標識線程的狀態
//等待線程被包裝成一個Node節點static final class Node {/** Marker to indicate a node is waiting in shared mode *///共享模式下static final Node SHARED = new Node();/** Marker to indicate a node is waiting in exclusive mode *///獨享模式下static final Node EXCLUSIVE = null;/** waitStatus value to indicate thread has cancelled *///線程取消爭搶這個鎖static final int CANCELLED = 1;/** waitStatus value to indicate successor's thread needs unparking *///當前node的后繼節點需要被喚醒static final int SIGNAL = -1;/** waitStatus value to indicate thread is waiting on condition */static final int CONDITION = -2;static final int PROPAGATE = -3;//強半天獲取不到鎖,就取消等待volatile int waitStatus;//前驅節點的引用volatile Node prev;//后繼節點的引用volatile Node next;//本線程volatile Thread thread;// 這個是在condition中用來構建單向鏈表Node nextWaiter;}
// 使用static,這樣每個線程拿到的是同一把鎖private static ReentrantLock reentrantLock = new ReentrantLock(true);public void createOrder() {// 比如我們同一時間,只允許一個線程創建訂單reentrantLock.lock();// 通常,lock 之后緊跟著 try 語句try {// 這塊代碼同一時間只能有一個線程進來(獲取到鎖的線程),// 其他的線程在lock()方法上阻塞,等待獲取到鎖,再進來// 執行代碼...} finally {// 釋放鎖// 釋放鎖必須要在finally里,確保鎖一定會被釋放,如果寫在try里面,發生異常,則有可能不會執行,就會發生死鎖reentrantLock.unlock();}}
Lock接口
public interface Lock {//加鎖void lock();//嘗試獲取鎖boolean tryLock(long time, TimeUnit unit) throws InterruptedException;//釋放鎖void unlock();Condition newCondition();
}
public class ReentrantLock implements Lock, java.io.Serializable public ReentrantLock() {sync = new NonfairSync();}public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();}
通過構造參數進行區分使用公平鎖還是非公平鎖。默認是非公平鎖。
//繼承AQS 正在的獲取和釋放鎖是由Sync的實現類來控制的。abstract static class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = -5179523762034025860L;abstract void lock();final boolean nonfairTryAcquire(int acquires) {//xxxxxx}protected final boolean tryRelease(int releases) {//xxxx}}
Lock 加鎖
//sync進行管理鎖//公平鎖static final class FairSync extends Sync {private static final long serialVersionUID = -3000897897090466540L;//爭搶鎖🔒final void lock() {acquire(1);}}
父類實現的acquire
//lock.lock()public final void acquire(int arg) {//tryAcquire(arg) true 獲取鎖成功直接結束//如果沒有獲取到鎖,acquireQueued 會將線程壓入隊列中//!tryAcquire(arg) 沒有獲取到鎖,將當前線程掛起//addWaiterif (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}//因為FairSync實現了protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();}
protected final boolean tryAcquire(int acquires) {//獲取當前線程final Thread current = Thread.currentThread();int c = getState();// c == 0 當前沒有線程獲取鎖if (c == 0) {//當前是公平鎖,先來后到//查看隊列中是否有等待的線程//hasQueuedPredecessors 沒有的話才可以獲取線程//compareAndSetState CAS設置 有可能同時多個線程競爭鎖if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {//表示獲取到鎖了,標記以下setExclusiveOwnerThread(current);//執行到這里 表示獲取鎖成功return true;}}//當前有線程持有鎖,先判斷下 獲取線程的鎖是不是自己 也就是重入了//state += 1;else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;//checkif (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}//到這里表示沒有獲取到鎖//1.嘗試獲取失敗//2.也不是自己的可沖入鎖return false;}
//隊列中有等待的線程 返回truepublic final boolean hasQueuedPredecessors() {// The correctness of this depends on head being initialized// before tail and on head.next being accurate if the current// thread is first in queue.Node t = tail; // Read fields in reverse initialization orderNode h = head;Node s;return h != t &&((s = h.next) == null || s.thread != Thread.currentThread());}//CAS設置值protected final boolean compareAndSetState(int expect, int update) {// See below for intrinsics setup to support thisreturn unsafe.compareAndSwapInt(this, stateOffset, expect, update);}protected final void setExclusiveOwnerThread(Thread thread) {//將當前線程設置為獲取到線程//當前擁有鎖的線程exclusiveOwnerThread = thread;}
//此方法的作用是將線程包裝成node, 同時進入隊列中//EXCLUSIVE是獨占模式private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {//自己的前驅節點為隊尾節點node.prev = pred;//CAS更新if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}//有競爭鎖,加入隊列失敗enq(node);return node;} //因為是公平鎖,所以會按照鏈表的形式將當前任務添加到隊列中final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {//獲取node的prev節點final Node p = node.predecessor();// p == head 說明當前節點雖然進到了阻塞隊列,但是是阻塞隊列的第一個,因為它的前驅是headif (p == head && tryAcquire(arg)) {// //到這里說明剛加入到等待隊列里面的node只有一個,并且此時獲取鎖成功,設置head為nodesetHead(node);p.next = null; // help GCfailed = false;return interrupted;}// // 到這里,說明上面的if分支沒有成功,要么當前node本來就不是隊頭,// // 要么就是tryAcquire(arg)沒有搶贏別人,繼續往下看if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}
//當前線程沒有搶到鎖 是否需要掛起當前線程//第一個參數是前驅節點 第二個節點是當前線程的節點private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;//前驅節點正常 -1 當前線程需要掛起if (ws == Node.SIGNAL)/** This node has already set status asking a release* to signal it, so it can safely park.*/return true;// 136 // 前驅節點 waitStatus大于0 ,之前說過,大于0 說明前驅節點取消了排隊。這里需要知道這點:
// 137 // 進入阻塞隊列排隊的線程會被掛起,而喚醒的操作是由前驅節點完成的。
// 138 // 所以下面這塊代碼說的是將當前節點的prev指向waitStatus<=0的節點,
// 139 // 簡單說,如果前驅節點取消了排隊,
// 140 // 找前驅節點的前驅節,往前循環總能找到一個waitStatus<=0的節點if (ws > 0) {/** Predecessor was cancelled. Skip over predecessors and* indicate retry.*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {/** waitStatus must be 0 or PROPAGATE. Indicate that we* need a signal, but don't park yet. Caller will need to* retry to make sure it cannot acquire before parking.*/
// // 仔細想想,如果進入到這個分支意味著什么
// 157 // 前驅節點的waitStatus不等于-1和1,那也就是只可能是0,-2,-3
// 158 // 在我們前面的源碼中,都沒有看到有設置waitStatus的,所以每個新的node入隊時,waitStatu都是0
// 159 // 用CAS將前驅節點的waitStatus設置為Node.SIGNAL(也就是-1)compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}//private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}
Lock 解鎖
線程如果沒有獲取到鎖,那么會掛起,LockSupport.park(this); 等待喚醒。
public void unlock() {sync.release(1);}
public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}
protected boolean tryRelease(int arg) {throw new UnsupportedOperationException();}
protected final boolean tryRelease(int releases) {int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();//是否完全釋放鎖boolean free = false;// c == 0 沒有嵌套鎖住了 可以釋放if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;}protected final void setExclusiveOwnerThread(Thread thread) {//將當前線程設置為獲取到線程//當前擁有鎖的線程exclusiveOwnerThread = thread;}
private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node. But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/// 喚醒后繼節點,但是有可能后繼節點取消了等待// 從隊尾往前找,找到waitStatus <= 0 的所有節點排在最前面的一個Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;//for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)//喚醒線程LockSupport.unpark(s.thread);}
這里存在并發問題:從前往后尋找不一定能找到剛剛加入隊列的后繼節點。
如果此時正有一個線程加入等待隊列的尾部,執行到上面第7行,第7行還未執行,解鎖操作如果從前面開始找 頭節點后面的第一個節點狀態為-1的節點,此時是找不到這個新加入的節點的,因為尾節點的next 還未指向新加入的node,但是從后面開始遍歷的話,那就不存在這種情況。
小結
鎖狀態,通過state標記, 0 沒有線程占用鎖,state >= 代表有線程獲取到鎖,> 1說明是可沖入鎖。 所以lock和unlock必須是配對的。
線程的阻塞和解決阻塞:lockSupport.park(thread)掛起線程,unpack()喚醒線程。
阻塞隊列: 爭搶的線程很多,所以需要將等待的線程通過一個queue進行管理這些線程。AQS是一個FIFO的隊列,