先放結論:主要是實現AbstractQueuedSynchronizer中進入和退出函數,控制不同的進入和退出條件,實現適用于各種場景下的鎖。
JAVA中對于線程的同步提供了多種鎖機制,比較著名的有可重入鎖ReentrantLock,信號量機制Semaphore,隊列等待機制CountDownLatch,
通過查看源代碼可以,他們都是基于AbstractQueuedSynchronizer實現了自身的功能。
對于AbstractQueuedSynchronizer的講解,可以看上一篇文章,這里就講解下,如何通過集成AbstractQueuedSynchronizer實現上述的鎖機制。
所謂的鎖實現,就是對AbstractQueuedSynchronizer中state變量的搶占,誰先搶占并且修改了這個變量值不為0,誰就獲得了鎖。
AbstractQueuedSynchronizer保留了幾個未實現的接口供子類實現。分別是
protected boolean tryAcquire(intarg) {throw newUnsupportedOperationException();
}protected boolean tryRelease(intarg) {throw newUnsupportedOperationException();
}protected int tryAcquireShared(intarg) {throw newUnsupportedOperationException();
}protected boolean tryReleaseShared(intarg) {throw newUnsupportedOperationException();
}
tryAcquire和tryRelease是用于獨占鎖的獲取和釋放,tryAcquireShared和tryReleaseShared是共享鎖的獲取和釋放,下面看下他們分別是什么地方被調用。
/*** Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link#tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link* #tryAcquire} until success. This method can be used
* to implement method {@linkLock#lock}.
*
*@paramarg the acquire argument. This value is conveyed to
* {@link#tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.*/
public final void acquire(intarg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}/*** Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link#tryRelease} returns true.
* This method can be used to implement method {@linkLock#unlock}.
*
*@paramarg the release argument. This value is conveyed to
* {@link#tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
*@returnthe value returned from {@link#tryRelease}*/
public final boolean release(intarg) {if(tryRelease(arg)) {
Node h=head;if (h != null && h.waitStatus != 0)
unparkSuccessor(h);return true;
}return false;
}/*** Acquires in shared mode, ignoring interrupts. Implemented by
* first invoking at least once {@link#tryAcquireShared},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link* #tryAcquireShared} until success.
*
*@paramarg the acquire argument. This value is conveyed to
* {@link#tryAcquireShared} but is otherwise uninterpreted
* and can represent anything you like.*/
public final void acquireShared(intarg) {if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}/*** Releases in shared mode. Implemented by unblocking one or more
* threads if {@link#tryReleaseShared} returns true.
*
*@paramarg the release argument. This value is conveyed to
* {@link#tryReleaseShared} but is otherwise uninterpreted
* and can represent anything you like.
*@returnthe value returned from {@link#tryReleaseShared}*/
public final boolean releaseShared(intarg) {if(tryReleaseShared(arg)) {
doReleaseShared();return true;
}return false;
}
在acquire中,如果tryAcquire失敗,那么就去等待隊列中排隊,release中如果tryRelease成功,那么就喚醒下一個等待隊列中的線程。
acquireShared中,如果tryAcquireShared失敗,那么再次進入循環獲取過程,releaseShared中,如果tryReleaseShared成功,那么就喚醒下一個等待隊列中的線程。
現在的主要問題是,如何判定上述的tryAcquire、tryRelease、tryAcquireShared和tryReleaseShared的成功和失敗,
通過閱讀源代碼可知:
ReentrantLock是一個線程執行,其他線程等待。
ReentrantLock的實現機制就是:線程lock()時,獲取時state變量的值不為0,那么tryAcquire就失敗,tryRelease執行完state變量的值==0,表示成功,喚醒等待的線程,否則就是失敗。
Semaphore是先分配一定數量的許可證,然后多個線程來搶許可證,搶到就可以執行。
Semaphore的實現機制就是:如果獲取時當前state減去申請的信號量數目acquires>0,那么就表示成功,此時 state=state-acquires, 否則失敗,釋放時,當前釋放的信號量不為負數,那么就表示成功,喚醒等待的線程,釋放后state=state+acquires.
CountDownLatch是一個線程執行,其他線程等待。
CountDownLatch的實現機制是: 線程如果lock()時,獲取時state變量的值不為0,那么tryAcquire就失敗,tryRelease執行完state變量的值等于0或者state-1后值等于0,表示成功,喚醒等待的線程,否則就是失敗。
ReentrantLock的代碼如下:
final boolean nonfairTryAcquire(intacquires) {final Thread current =Thread.currentThread();int c =getState();if (c == 0) {if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);return true;
}
}else if (current ==getExclusiveOwnerThread()) {int nextc = c +acquires;if (nextc < 0) //overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);return true;
}return false;
}protected final boolean tryRelease(intreleases) {int c = getState() -releases;if (Thread.currentThread() !=getExclusiveOwnerThread())throw newIllegalMonitorStateException();boolean free = false;if (c == 0) {
free= true;
setExclusiveOwnerThread(null);
}
setState(c);returnfree;
}
Semaphore代碼如下:
final int nonfairTryAcquireShared(intacquires) {for(;;) {int available =getState();int remaining = available -acquires;if (remaining < 0 ||compareAndSetState(available, remaining))returnremaining;
}
}protected final boolean tryReleaseShared(intreleases) {for(;;) {int current =getState();int next = current +releases;if (next < current) //overflow
throw new Error("Maximum permit count exceeded");if(compareAndSetState(current, next))return true;
}
}
CountDownLatch代碼如下:
protected int tryAcquireShared(intacquires) {return (getState() == 0) ? 1 : -1;
}protected boolean tryReleaseShared(intreleases) {//Decrement count; signal when transition to zero
for(;;) {int c =getState();if (c == 0)return false;int nextc = c-1;if(compareAndSetState(c, nextc))return nextc == 0;
}
}