文章目錄
- 一、讀寫鎖介紹
- 二、ReentrantReadWriteLock底層原理
- 1. 讀寫鎖的設計
一、讀寫鎖介紹
現實中有這樣一種場景:對共享資源有讀和寫的操作,且寫操作沒有讀操作那么頻繁(讀多寫少)。在沒有寫操作的時候,多個線程同時讀一個資源沒有任何問題,所以應該允許多個線程同時讀取共享資源(讀讀可以并發);但是如果一個線程想去寫這些共享資源,就不應該允許其他線程對該資源進行讀和寫操作了(讀寫,寫讀,寫寫互斥)。在讀多于寫的情況下,讀寫鎖能夠提供比排它鎖更好的并發性和吞吐量。
針對這種場景,JAVA的并發包提供了讀寫鎖ReentrantReadWriteLock,它內部,維護了 一對相關的鎖,一個用于只讀操作,稱為讀鎖;一個用于寫入操作,稱為寫鎖。
線程進入讀鎖的前提條件:
- 沒有其它線程的寫鎖
- 沒有寫請求或者有寫請求,但是調用線程和持有鎖的線程是同一個
線程進入寫鎖的前提條件:
- 沒有其他線程的讀鎖
- 沒有其他線程的寫鎖
讀寫鎖有以下三個重要的特性:
- 公平選擇性:支持非公平和公平的鎖獲取方式,吞吐量還是非公平優于公平
- 可重入:讀鎖和寫鎖都支持線程重入,以讀寫線程為例,讀鎖獲取讀鎖后,能夠再次獲取讀鎖,寫線程在獲取寫鎖之后能夠再次獲取寫鎖,同時也可以獲取讀鎖。
- 鎖降級:遵循獲取寫鎖,再獲取讀鎖最后釋放寫鎖的次序,寫鎖能夠降級為讀鎖。
二、ReentrantReadWriteLock底層原理
看源碼需要了解三個核心問題:
讀寫鎖是怎樣實現分別記錄讀寫狀態的?
寫鎖時怎么獲取和釋放的?
讀鎖時怎么獲取和釋放的?
1. 讀寫鎖的設計
首先看它的類信息:
public class ReentrantReadWriteLockimplements ReadWriteLock, java.io.Serializable {}
可以發現該類實現了ReadWriteLock這個接口
public interface ReadWriteLock {Lock readLock();Lock writeLock();
}
該接口就實現了讀鎖和寫鎖的規范,以下就是相關的類圖
下面看看ReentrantReadWriteLock的讀寫鎖的實現邏輯,首先看寫鎖:
public static class ReadLock implements Lock, java.io.Serializable {private static final long serialVersionUID = -5992448646407690164L;private final Sync sync;protected ReadLock(ReentrantReadWriteLock lock) {sync = lock.sync;}public void lock() {sync.acquireShared(1);}public void lockInterruptibly() throws InterruptedException {sync.acquireSharedInterruptibly(1);}public boolean tryLock() {return sync.tryReadLock();}public boolean tryLock(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));}public void unlock() {sync.releaseShared(1);}public Condition newCondition() {throw new UnsupportedOperationException();}public String toString() {int r = sync.getReadLockCount();return super.toString() +"[Read locks = " + r + "]";}}
- ReadLock:是一個ReetrantReadWriteLock的靜態內部類
- Sync:和ReentrantLock一樣,Sync也是ReetrantReadWriteLock的一個靜態內部抽象類,它繼承了AbstractQueuedSynchronizer,實現了AQS的邏輯,然后它會有兩種實現,分別是FairSync公平鎖,和NonfairSync非公平鎖
- lock:可以發現加讀鎖加的是AQS的共享鎖
- tryLock:嘗試獲取讀鎖
然后看看寫鎖是怎么實現的
public static class WriteLock implements Lock, java.io.Serializable {private static final long serialVersionUID = -4992448646407690164L;private final Sync sync;protected WriteLock(ReentrantReadWriteLock lock) {sync = lock.sync;}public void lock() {sync.acquire(1);}public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}public boolean tryLock( ) {return sync.tryWriteLock();}public boolean tryLock(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(timeout));}public void unlock() {sync.release(1);}public Condition newCondition() {return sync.newCondition();}public String toString() {Thread o = sync.getOwner();return super.toString() + ((o == null) ?"[Unlocked]" :"[Locked by thread " + o.getName() + "]");}public boolean isHeldByCurrentThread() {return sync.isHeldExclusively();}public int getHoldCount() {return sync.getWriteHoldCount();}}
下面我們需要思考一個核心問題,讀寫鎖的狀態是怎么用AQS底層的state狀態來維護的。其實這里的核心問題就是,如何用一個變量維護多種狀態。在 ReentrantLock 中,使用 Sync ( 實際是 AQS )的 int 類型的 state 來表示同步狀態,表示鎖被一個線程重復獲取的次數。但是,讀寫鎖 ReentrantReadWriteLock 內部維護著一對讀寫鎖,如果要用一個變量維護多種狀態,需要采用“按位切割使用”的方式來維護這個變量,將其切分為兩部分:高16為表示讀,低16為表示寫。分割之后,讀寫鎖是如何迅速確定讀鎖和寫鎖的狀態呢? 其實底層是通過位運算來實現的。假如當前同步狀態為S, 那么寫狀態,等于 S & 0x0000FFFF(將高 16 位全部抹去)。 當寫狀態加1,等于S+1。讀狀態,等于 S >>> 16 (無符號補 0 右移 16 位)。當讀狀態加1,等于 S+(1<<16),也就是S+0x00010000。根據狀態的劃分能得出一個推論:S不等于0時,當寫狀態(S&0x0000FFFF)等于0時,則讀狀態(S>>>16)大于0,即讀鎖已被獲取。
源碼如下:
//該部分在Sync類中static final int SHARED_SHIFT = 16;static final int SHARED_UNIT = (1 << SHARED_SHIFT);static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;/** Returns the number of shared holds represented in count */static int sharedCount(int c) { return c >>> SHARED_SHIFT; }/** Returns the number of exclusive holds represented in count */static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
- exclusiveCount:獲得持有寫狀態的鎖的次數
- sharedCount:獲得持有讀狀態的鎖的線程數量,不同于寫鎖,讀鎖利用同時被多個線程持有,而每個線程持有的讀鎖支持重入的特性,所以需要每個線程持有的讀鎖數量單獨計數,這就需要HoldCounter計數器。
HoldCounter計數器
讀鎖的內在機制其實就是一個共享鎖。一次共享鎖的操作就相當于對HoldCounter 計數器的操作。獲取共享鎖,則該計數器 + 1,釋放共享鎖,該計數器 - 1。只有當線程獲取共享鎖后才能對共享鎖進行釋放、重入操作。
static final class HoldCounter {int count = 0;// Use id, not reference, to avoid garbage retentionfinal long tid = getThreadId(Thread.currentThread());}static final class ThreadLocalHoldCounterextends ThreadLocal<HoldCounter> {public HoldCounter initialValue() {return new HoldCounter();}}
寫鎖的獲取
寫鎖是一個支持重進入的排它鎖。如果當前線程已經獲取了寫鎖,則增加寫狀態。如果當前線程在獲取寫鎖時,讀鎖已經被獲取(讀狀態不為0)或者該線程不是已經獲取寫鎖的線程, 則當前線程進入等待狀態。
protected final boolean tryAcquire(int acquires) {
//獲取當前線程Thread current = Thread.currentThread();//獲取當前state的值int c = getState();//獲取寫鎖的重入次數int w = exclusiveCount(c);//state!=0則表示當前有寫鎖或讀鎖if (c != 0) {//判斷是否是重入if (w == 0 || current != getExclusiveOwnerThread())return false;if (w + exclusiveCount(acquires) > MAX_COUNT)throw new Error("Maximum lock count exceeded");//獲取重入鎖setState(c + acquires);return true;}// writerShouldBlock有公平與非公平的實現, 非公平返回false,會嘗試通過cas加鎖,c==0 寫鎖未被任何線程獲取,當前線程是否阻塞或者cas嘗試獲取鎖if (writerShouldBlock() ||!compareAndSetState(c, c + acquires))return false;//設置鎖由當前線程獨占setExclusiveOwnerThread(current);return true;}
上面簡單的代碼就實現了下面的邏輯:
- 讀寫互斥
- 寫寫互斥
- 寫鎖支持同一個線程重入
- writeShouldBlock寫鎖是否阻塞實現取決公平與非公平的策略
寫鎖的釋放
protected final boolean tryRelease(int releases) {if (!isHeldExclusively())throw new IllegalMonitorStateException();//設置狀態int nextc = getState() - releases;boolean free = exclusiveCount(nextc) == 0;//如果state為0,表示釋放寫鎖if (free)setExclusiveOwnerThread(null);setState(nextc);return free;}
讀鎖的獲取
protected final int tryAcquireShared(int unused) {//獲取當前線程Thread current = Thread.currentThread();//獲取stateint c = getState();//exclusiveCount(c) != 0 判斷是否有寫鎖//getExclusiveOwnerThread() != current),判斷當前線程是否是寫鎖的持有者if (exclusiveCount(c) != 0 &&getExclusiveOwnerThread() != current)return -1;int r = sharedCount(c);if (!readerShouldBlock() &&r < MAX_COUNT &&//cas加讀鎖compareAndSetState(c, c + SHARED_UNIT)) {//r==0表示第一次獲取讀鎖if (r == 0) {//設置第一個讀為當前線程firstReader = current;//設置當前讀鎖的重入次數firstReaderHoldCount = 1;} else if (firstReader == current) { //第一個讀的重入firstReaderHoldCount++;} else {//若不是第一個讀,則用HoldCounter記錄HoldCounter rh = cachedHoldCounter;//第一次讀鎖獲取失敗,再次嘗試if (rh == null || rh.tid != getThreadId(current))cachedHoldCounter = rh = readHolds.get();else if (rh.count == 0)readHolds.set(rh);rh.count++;}return 1;}return fullTryAcquireShared(current);//第一次讀鎖獲取失敗,再次嘗試(fullTryAcquireShared)}
上面代碼的邏輯就實現了:
- 讀鎖共享,讀讀不互斥
- 讀鎖可重入,每個獲取讀鎖的線程都會記錄對應的重入數
- 讀寫互斥,鎖降級場景除外
- 支持鎖降級,持有寫鎖的線程,可以獲取讀鎖,但是后續要記得把讀鎖和寫鎖讀釋放
- readerShouldBlock讀鎖是否阻塞實現取決公平與非公平的策略(FairSync和NonfairSync)
讀鎖的釋放
protected final boolean tryReleaseShared(int unused) {Thread current = Thread.currentThread();if (firstReader == current) {// assert firstReaderHoldCount > 0;if (firstReaderHoldCount == 1)firstReader = null;elsefirstReaderHoldCount--;} else {HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))rh = readHolds.get();int count = rh.count;if (count <= 1) {readHolds.remove();if (count <= 0)throw unmatchedUnlockException();}--rh.count;}for (;;) {int c = getState();int nextc = c - SHARED_UNIT;if (compareAndSetState(c, nextc))return nextc == 0;}}