自己一個人隨便看看源碼學習的心得,分享一下啦,不過我覺得還是建議去買本Java并發編程的書來看會比較好點,畢竟個人的理解有限嘛。
獨占鎖和共享鎖
首先先引入這兩個鎖的概念:
獨占鎖即同一時刻只有一個線程才能獲取到鎖,Lock的實現類中ReentrantLock和WriteLock就是獨占鎖,所以獨占鎖也叫排它鎖;
共享鎖是同一時刻多線程能獲取到的鎖叫共享鎖,即ReadLock;在讀寫鎖共有的情況下,會出現共存的情況即讀-讀共存,讀-寫、寫-寫不能共存,他們會產生互斥。
數據結構
打開源碼首先就得看下它的所有方法和變量,就會發現,AQS其實用的設計模式是模板模式,先講一下啥是模板模式吧,直接上代碼吧,用代碼看會比較直觀一點:
public abstract class UseTemplate{//這個定義的三個抽象接口方法public abstract void function1();public abstract void function2();public abstract void function3();//...還有其他屬性或方法//這個方法就是模板方法,在繼承的子類中實現好抽象方法,然后在調用父類的模板方法public final void runTemplate(){//在父類中寫好功能執行的方法即可,也可設置權限,禁止重新覆蓋function1();function2();function3();}}
AQS是采用的是一個雙向鏈表隊列,在該類中包含了一個Node類,雙向鏈表就是右這個實現的
先了解一下這個靜態內部類,以下為源代碼(每個屬性在下面都有解釋):
static final class Node{/** Marker to indicate a node is waiting in shared mode *///用于共享鎖的節點屬性static final Node SHARED = new Node();/** Marker to indicate a node is waiting in exclusive mode *///用于獨占鎖使用屬性static final Node EXCLUSIVE = null;/** waitStatus value to indicate thread has cancelled */static final int CANCELLED = 1;/** waitStatus value to indicate successor's thread needs unparking */static final int SIGNAL = -1;/** waitStatus value to indicate thread is waiting on condition */static final int CONDITION = -2;/*** waitStatus value to indicate the next acquireShared should* unconditionally propagate*/static final int PROPAGATE = -3;/*** Status field, taking on only the values:* SIGNAL: The successor of this node is (or will soon be)* blocked (via park), so the current node must* unpark its successor when it releases or* cancels. To avoid races, acquire methods must* first indicate they need a signal,* then retry the atomic acquire, and then,* on failure, block.* CANCELLED: This node is cancelled due to timeout or interrupt.* Nodes never leave this state. In particular,* a thread with cancelled node never again blocks.* CONDITION: This node is currently on a condition queue.* It will not be used as a sync queue node* until transferred, at which time the status* will be set to 0. (Use of this value here has* nothing to do with the other uses of the* field, but simplifies mechanics.)* PROPAGATE: A releaseShared should be propagated to other* nodes. This is set (for head node only) in* doReleaseShared to ensure propagation* continues, even if other operations have* since intervened.* 0: None of the above** The values are arranged numerically to simplify use.* Non-negative values mean that a node doesn't need to* signal. So, most code doesn't need to check for particular* values, just for sign.** The field is initialized to 0 for normal sync nodes, and* CONDITION for condition nodes. It is modified using CAS* (or when possible, unconditional volatile writes).*///當前節點的狀態,狀態值即上面幾個狀態值,有相對應的英文解釋,就不翻譯了volatile int waitStatus;/*** Link to predecessor node that current node/thread relies on* for checking waitStatus. Assigned during enqueuing, and nulled* out (for sake of GC) only upon dequeuing. Also, upon* cancellation of a predecessor, we short-circuit while* finding a non-cancelled one, which will always exist* because the head node is never cancelled: A node becomes* head only as a result of successful acquire. A* cancelled thread never succeeds in acquiring, and a thread only* cancels itself, not any other node.*/volatile Node prev;//前一個節點指向/*** Link to the successor node that the current node/thread* unparks upon release. Assigned during enqueuing, adjusted* when bypassing cancelled predecessors, and nulled out (for* sake of GC) when dequeued. The enq operation does not* assign next field of a predecessor until after attachment,* so seeing a null next field does not necessarily mean that* node is at end of queue. However, if a next field appears* to be null, we can scan prev's from the tail to* double-check. The next field of cancelled nodes is set to* point to the node itself instead of null, to make life* easier for isOnSyncQueue.*/volatile Node next;//下一個節點指向/*** The thread that enqueued this node. Initialized on* construction and nulled out after use.*/volatile Thread thread;//當前節點的線程/*** Link to next node waiting on condition, or the special* value SHARED. Because condition queues are accessed only* when holding in exclusive mode, we just need a simple* linked queue to hold nodes while they are waiting on* conditions. They are then transferred to the queue to* re-acquire. And because conditions can only be exclusive,* we save a field by using special value to indicate shared* mode.*/Node nextWaiter;//使用在Condition上面,即下一個等待節點/*** Returns true if node is waiting in shared mode.*/final boolean isShared() {return nextWaiter == SHARED;}/*** Returns previous node, or throws NullPointerException if null.* Use when predecessor cannot be null. The null check could* be elided, but is present to help the VM.** @return the predecessor of this node*///獲取前一個節點final Node predecessor() throws NullPointerException {Node p = prev;if (p == null)throw new NullPointerException();elsereturn p;}Node() { // Used to establish initial head or SHARED marker}Node(Thread thread, Node mode) { // Used by addWaiterthis.nextWaiter = mode;this.thread = thread;}Node(Thread thread, int waitStatus) { // Used by Conditionthis.waitStatus = waitStatus;this.thread = thread;}
}
了解完這個之后,在AQS類中有三個變量:
/*** Head of the wait queue, lazily initialized. Except for* initialization, it is modified only via method setHead. Note:* If head exists, its waitStatus is guaranteed not to be* CANCELLED.*/private transient volatile Node head;//當前頭部節點/*** Tail of the wait queue, lazily initialized. Modified only via* method enq to add new wait node.*/private transient volatile Node tail;//當前尾節點/*** The synchronization state.*/private volatile int state;//拿鎖的狀態
AQS中還有一個內部類ConditionObject,這個類繼承了Condition接口,這個類也有一個隊列,不過不是雙向隊列,單向隊列,也是通過Node實現的:
/** First node of condition queue. */private transient Node firstWaiter;/** Last node of condition queue. */private transient Node lastWaiter;
AQS方法
獨占式獲取:? accquire()、? acquireInterruptibly()、? tryAcquireNanos()
共享式獲取:? acquireShared()、? acquireSharedInterruptibly()、? tryAcquireSharedNanos()
獨占式釋放鎖:? release()
共享式釋放鎖:? releaseShared()
需要子類覆蓋的流程方法:
?獨占式獲取: tryAcquire()
?獨占式釋放: tryRelease()
?共享式獲取: tryAcquireShared()
共享式釋放: tryReleaseShared()
這個同步器是否處于獨占模式 isHeldExclusively()
同步狀態state:1)、? getState():獲取當前的同步狀態;2)、? setState(int):設置當前同步狀態;3)、? compareAndSetState(int,int) 使用CAS設置狀態,保證狀態設置的原子性.
AQS使用方法及流程
需要繼承AQS這個類并實現他的方法,我們先自己寫一個獨占鎖的實現方式:
/*** state 當為0的時候表示還沒有拿到鎖 為1的時候為拿到鎖了* @author 26225**/static final class Sync extends AbstractQueuedSynchronizer{/*** */private static final long serialVersionUID = 1L;@Overrideprotected boolean tryAcquire(int arg) {if(getState() == 1)return false;else {/*** 需使用compareAndSetState() CAS原子操作 而不是使用setState(1),因為使用了volatile修飾了state 雖然保證了可見性,但是修改還是得保證原子操作*/if(compareAndSetState(getState(), arg)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}}@Overrideprotected boolean tryRelease(int arg) {if(getState() == 0 )throw new UnsupportedOperationException();setExclusiveOwnerThread(null);setState(arg);return true;}@Overrideprotected boolean isHeldExclusively() {return getState() == 1;}Condition newCondition() {return new ConditionObject();}}private final Sync sync = new Sync();public void lock() {sync.acquire(1);}public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}public boolean tryLock() {return sync.tryAcquire(1);}public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {// TODO Auto-generated method stubreturn sync.tryAcquireNanos(1, unit.toNanos(time));}public void unlock() {sync.release(0);}public Condition newCondition() {return sync.newCondition();}
先簡單的講解一下該段代碼:
在調用Lock的unlock()方法時,需調用AQS的acquire()方法,我們先看一下這個方法調用:
/*** Acquires in exclusive mode, ignoring interrupts. Implemented* by invoking at least once {@link #tryAcquire},* returning on success. Otherwise the thread is queued, possibly* repeatedly blocking and unblocking, invoking {@link* #tryAcquire} until success. This method can be used* to implement method {@link Lock#lock}.** @param arg the acquire argument. This value is conveyed to* {@link #tryAcquire} but is otherwise uninterpreted and* can represent anything you like.*/public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
其實就是先調用我們自己實現的方法,判斷拿鎖的狀態,并且設置這個狀態;如果拿到了鎖的話,就不用管它,沒有拿到的話,就得加入到等待隊列中,我們看一下加入等待隊列的方法:
/*** Creates and enqueues node for current thread and given mode.** @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared* @return the new node*/private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;}/*** Inserts node into queue, initializing if necessary. See picture above.* @param node the node to insert* @return node's predecessor*/private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}
?判斷頭部節點是否存在,如果不存在的話就調用enq()方法,判斷當前尾節點存不存在,存在的話,則將當前節點通過CAS操作設置成尾節點,如果不存在則將當前線程包裝成一個Node對象設置成頭節點,并且將頭部和尾部設置成同一個
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}/*** Convenience method to park and then check if interrupted** @return {@code true} if interrupted*/private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}
?這個方法是加入到隊列,就是設置當前Node對象的前一個節點指向以及后一個節點指向,并且調用parkAndCheckInterrupt()通過LockSupport將當前線程進入到等待狀態。
以上就是調用lock()方法基本流程,現在看一下代用unlock()的基本流程,執行的是AQS的release()
/*** Releases in exclusive mode. Implemented by unblocking one or* more threads if {@link #tryRelease} returns true.* This method can be used to implement method {@link Lock#unlock}.** @param arg the release argument. This value is conveyed to* {@link #tryRelease} but is otherwise uninterpreted and* can represent anything you like.* @return the value returned from {@link #tryRelease}*/public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}/*** Wakes up node's successor, if one exists.** @param node the node*/private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node. But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}
?在釋放鎖的過程中,首先還原之前的拿鎖狀態,還原之后將隊列頭部節點的下一個節點通過LockSupport.unpark()進行喚醒。
AbstractOwnableSynchronizer
在JDK1.6之后AQS繼承了AbstractOwnableSynchronizer這個類,其實這個類主要是用來記錄當前訪問的線程:
/*** The current owner of exclusive mode synchronization.*/private transient Thread exclusiveOwnerThread;/*** Sets the thread that currently owns exclusive access.* A {@code null} argument indicates that no thread owns access.* This method does not otherwise impose any synchronization or* {@code volatile} field accesses.* @param thread the owner thread*/protected final void setExclusiveOwnerThread(Thread thread) {exclusiveOwnerThread = thread;}/*** Returns the thread last set by {@code setExclusiveOwnerThread},* or {@code null} if never set. This method does not otherwise* impose any synchronization or {@code volatile} field accesses.* @return the owner thread*/protected final Thread getExclusiveOwnerThread() {return exclusiveOwnerThread;}
AQS的基本流程差不多分析完了,講的有問題的話,還請大佬指正!!!