AQS是J.U.C的核心
AQS(AbstractQueuedSynchronizer
)隊列同步器,AQS是JDK下提供的一套用于實現基于FIFO等待隊列的阻塞鎖和相關的同步器的一個同步框架。
同步器面向的是鎖的實現者,它簡化了鎖的實現方式,屏蔽了同步狀態管理、線程的排隊、等待和喚醒等底層操作。
同步隊列中的節點用來保存獲取同步狀態失敗的線程引用、等待狀態以及前驅和后繼節點。

總結:在獲取同步狀態時,同步器維護這一個同步隊列,并持有對頭節點和尾節點的引用。獲取狀態失敗的線程會被包裝成節點加入到尾節點后面稱為新的尾節點,在進入同步隊列后開始自旋,停止自旋的條件就是前驅節點為頭節點并且成功獲取到同步狀態。在釋放同步狀態時,同步器調用tryRelease方法釋放同步狀態,然后喚醒頭節點的后繼節點。
共享式同步狀態獲取
設計原理
-
使用Node實現FIFO隊列
-
維護了一個volatile int state(代表共享資源)
-
使用方法是繼承,基于模板方法
-
子類通過繼承同步器并實現它的抽象方法來管理同步狀態
-
可以實現排它鎖和共享鎖的模式(獨占、共享)
具體實現的思路
1.首先 AQS內部維護了一個CLH隊列,多線程爭用資源被阻塞時會進入此隊列。同時AQS管理一個關于共享資源狀態信息的單一整數volatile int state,該整數可以表現任何狀態,同時配合Unsafe工具對其原子性的操作來實現對當前鎖的狀態進行修改。。比如,?Semaphore
?用它來表現剩余的許可數,ReentrantLock
?用它來表現擁有它的線程已經請求了多少次鎖;FutureTask
?用它來表現任務的狀態(尚未開始、運行、完成和取消)
2.線程嘗試獲取鎖,如果獲取失敗,則將等待信息等包裝成一個Node結點,加入到同步隊列Sync queue里
3.不斷重新嘗試獲取鎖(當前結點為head的直接后繼才會 嘗試),如果獲取失敗,則會阻塞自己,直到被喚醒
4.當持有鎖的線程釋放鎖的時候,會喚醒隊列中的后繼線程
AQS定義兩種資源共享方式:Exclusive(獨占,只有一個線程能執行,如ReentrantLock)和Share(共享,多個線程可同時執行,如Semaphore/CountDownLatch),獨占式或者共享式獲取同步狀態state。
以ReentrantLock為例,state初始化為0,表示未鎖定狀態。A線程lock()時,會調用tryAcquire()獨占該鎖并將state+1。此后,其他線程再tryAcquire()時就會失敗,直到A線程unlock()到state=0(即釋放鎖)為止,其它線程才有機會獲取該鎖。當然,釋放鎖之前,A線程自己是可以重復獲取此鎖的(state會累加),這就是可重入的概念。但要注意,獲取多少次就要釋放多么次,這樣才能保證state是能回到零態的。
再以CountDownLatch以例,任務分為N個子線程去執行,state也初始化為N(注意N要與線程個數一致)。這N個子線程是并行執行的,每個子線程執行完后countDown()一次,state會CAS減1。等到所有子線程都執行完后(即state=0),會unpark()主調用線程,然后主調用線程就會從await()函數返回,繼續后余動作。
AQS同步組件
- CountDownLatch
- Semaphore
- CyclicBarrier
- ReentrantLock
- Condition
- FutureTask
獨占鎖:ReentrantLock
共享鎖:CountDownLatch, CyclicBarrier, Semaphore
共享和獨占:ReentrantReadWriteLock
CountDownLatch
同步阻塞類,可以完成阻塞線程的功能
CountDownLatch是通過一個計數器來實現的,計數器的初始值為線程的數量。每當一個線程完成了自己的任務后,計數器的值就會減1。當計數器值到達0時,它表示所有的線程已經完成了任務,然后在閉鎖上等待的線程就可以恢復執行任務。
構造器中的計數值(count)實際上就是閉鎖需要等待的線程數量。這個值只能被設置一次,而且CountDownLatch沒有提供任何機制去重新設置這個計數值。
與CountDownLatch的第一次交互是主線程等待其他線程。主線程必須在啟動其他線程后立即調用CountDownLatch.await()方法。這樣主線程的操作就會在這個方法上阻塞,直到其他線程完成各自的任務。
?
使用場景
1.程序執行需要等待某個條件完成后,才能進行后面的操作。比如父任務等待所有子任務都完成的時候,在繼續往下進行
實例1:基本用法


@Slf4j public class CountDownLatchExample1 {private final static int threadCount = 200;public static void main(String[] args) throws Exception {ExecutorService exec = Executors.newCachedThreadPool();final CountDownLatch countDownLatch = new CountDownLatch(threadCount);for (int i = 0; i < threadCount; i++) {final int threadNum = i;exec.execute(() -> {try {test(threadNum);} catch (Exception e) {log.error("exception", e);} finally {// 為防止出現異常,放在finally更保險一些 countDownLatch.countDown();}});}countDownLatch.await();log.info("finish");exec.shutdown();}private static void test(int threadNum) throws Exception {Thread.sleep(100);log.info("{}", threadNum);Thread.sleep(100);} }
2.比如有多個線程完成一個任務,但是這個任務只想給他一個指定的時間,超過這個任務就不繼續等待了。完成多少算多少


@Slf4j public class CountDownLatchExample2 {private final static int threadCount = 200;public static void main(String[] args) throws Exception {ExecutorService exec = Executors.newCachedThreadPool();final CountDownLatch countDownLatch = new CountDownLatch(threadCount);for (int i = 0; i < threadCount; i++) {final int threadNum = i;// 放在這里沒有用的,因為這時候還是在主線程中阻塞,阻塞完以后才開始執行下面的await// Thread.sleep(1);exec.execute(() -> {try {test(threadNum);} catch (Exception e) {log.error("exception", e);} finally {countDownLatch.countDown();}});}// 等待指定的時間 參數1:等待時間 參數2:時間單位countDownLatch.await(10, TimeUnit.MILLISECONDS);log.info("finish");// 并不是第一時間內銷毀掉所有線程,而是先讓正在執行線程執行完 exec.shutdown();}private static void test(int threadNum) throws Exception {Thread.sleep(100);log.info("{}", threadNum);} }
Semaphore
控制某個資源能被并發訪問的次數
使用場景
1.僅能提供有限訪問的資源:比如數據庫的連接數最大只有20,而上層的并發數遠遠大于20,這時候如果不做限制,可能會由于無法獲取連接而導致并發異常,這時候可以使用Semaphore來進行控制,當信號量設置為1的時候,就和單線程很相似了
實例1:每次獲取1個許可


@Slf4j public class SemaphoreExample1 {private final static int threadCount = 20;public static void main(String[] args) throws Exception {ExecutorService exec = Executors.newCachedThreadPool();final Semaphore semaphore = new Semaphore(3);for (int i = 0; i < threadCount; i++) {final int threadNum = i;exec.execute(() -> {try {semaphore.acquire(); // 獲取一個許可 test(threadNum);semaphore.release(); // 釋放一個許可} catch (Exception e) {log.error("exception", e);}});}exec.shutdown();}private static void test(int threadNum) throws Exception {log.info("{}", threadNum);Thread.sleep(1000);} }
實例2:一次性獲取多個許可


@Slf4j public class SemaphoreExample2 {private final static int threadCount = 20;public static void main(String[] args) throws Exception {ExecutorService exec = Executors.newCachedThreadPool();final Semaphore semaphore = new Semaphore(3);for (int i = 0; i < threadCount; i++) {final int threadNum = i;exec.execute(() -> {try {semaphore.acquire(3); // 獲取多個許可 test(threadNum);semaphore.release(3); // 釋放多個許可} catch (Exception e) {log.error("exception", e);}});}exec.shutdown();}private static void test(int threadNum) throws Exception {log.info("{}", threadNum);Thread.sleep(1000);} }
2.并發很高,想要超過允許的并發數之后,就拋棄


@Slf4j public class SemaphoreExample3 {private final static int threadCount = 20;public static void main(String[] args) throws Exception {ExecutorService exec = Executors.newCachedThreadPool();final Semaphore semaphore = new Semaphore(3);for (int i = 0; i < threadCount; i++) {final int threadNum = i;exec.execute(() -> {try{if (semaphore.tryAcquire()) { // 嘗試獲取一個許可// 本例中只有一個三個線程可以執行到這里 test(threadNum);semaphore.release(); // 釋放一個許可 }} catch (Exception e) {log.error("exception", e);}});}exec.shutdown();}private static void test(int threadNum) throws Exception {log.info("{}", threadNum);Thread.sleep(1000);} }
3.嘗試獲取獲取許可的次數以及超時時間都可以設置


@Slf4j public class SemaphoreExample4 {private final static int threadCount = 20;public static void main(String[] args) throws Exception {ExecutorService exec = Executors.newCachedThreadPool();final Semaphore semaphore = new Semaphore(3);for (int i = 0; i < threadCount; i++) {final int threadNum = i;exec.execute(() -> {try {if (semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS)) { // 嘗試獲取一個許可 test(threadNum);semaphore.release(); // 釋放一個許可 }} catch (Exception e) {log.error("exception", e);}});}exec.shutdown();}private static void test(int threadNum) throws Exception {log.info("{}", threadNum);Thread.sleep(1000);} }
CyclicBarrier
同步輔助類,允許一組線程相互等待,知道所有線程都準備就緒后,才能繼續操作,當某個線程調用了await方法之后,就會進入等待狀態,并將計數器-1,直到所有線程調用await方法使計數器為0,才可以繼續執行,由于計數器可以重復使用,所以我們又叫他循環屏障。
CyclicBarrier與CountDownLatch區別
1.CyclicBarrier可以重復使用(使用reset方法),CountDownLatch只能用一次
2.CountDownLatch主要用于實現一個或n個線程需要等待其他線程完成某項操作之后,才能繼續往下執行,描述的是一個或n個線程等待其他線程的關系,而CyclicBarrier是多個線程相互等待,知道滿足條件以后再一起往下執行。描述的是多個線程相互等待的場景
可以設置等待時間


@Slf4j public class CyclicBarrierExample1 {// 1.給定一個值,說明有多少個線程同步等待private static CyclicBarrier barrier = new CyclicBarrier(5);public static void main(String[] args) throws Exception {ExecutorService executor = Executors.newCachedThreadPool();for (int i = 0; i < 10; i++) {final int threadNum = i;// 延遲1秒,方便觀察Thread.sleep(1000);executor.execute(() -> {try {race(threadNum);} catch (Exception e) {log.error("exception", e);}});}executor.shutdown();}private static void race(int threadNum) throws Exception {Thread.sleep(1000);log.info("{} is ready", threadNum);// 2.使用await方法進行等待barrier.await();log.info("{} continue", threadNum);} }


@Slf4j public class CyclicBarrierExample2 {private static CyclicBarrier barrier = new CyclicBarrier(5);public static void main(String[] args) throws Exception {ExecutorService executor = Executors.newCachedThreadPool();for (int i = 0; i < 10; i++) {final int threadNum = i;Thread.sleep(1000);executor.execute(() -> {try {race(threadNum);} catch (Exception e) {log.error("exception", e);}});}executor.shutdown();}private static void race(int threadNum) throws Exception {Thread.sleep(1000);log.info("{} is ready", threadNum);try {// 由于狀態可能會改變,所以會拋出BarrierException異常,如果想繼續往下執行,需要加上try-catchbarrier.await(2000, TimeUnit.MILLISECONDS);} catch (Exception e) {log.warn("BarrierException", e);}log.info("{} continue", threadNum);} }


@Slf4j public class CyclicBarrierExample3 {private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {// 當線程全部到達屏障時,優先執行這里的runablelog.info("callback is running");});public static void main(String[] args) throws Exception {ExecutorService executor = Executors.newCachedThreadPool();for (int i = 0; i < 10; i++) {final int threadNum = i;Thread.sleep(1000);executor.execute(() -> {try {race(threadNum);} catch (Exception e) {log.error("exception", e);}});}executor.shutdown();}private static void race(int threadNum) throws Exception {Thread.sleep(1000);log.info("{} is ready", threadNum);barrier.await();log.info("{} continue", threadNum);} }
Lock
ReentrantLock與Condition
java一共分為兩類鎖,一類是由synchornized修飾的鎖,還有一種是JUC里提供的鎖,核心就是ReentrantLock
synchornized與ReentrantLock的區別對比:
?
對比維度 | synchornized | ReentrantLock |
---|---|---|
可重入性(進入鎖的時候計數器自增1) | 可重入 | 可重入 |
鎖的實現 | JVM實現,很難操作源碼,得到實現 | JDK實現 |
性能 | 在引入輕量級鎖后性能大大提升,建議都可以選擇的時候選擇synchornized | - |
功能區別 | 方便簡潔,由編譯器負責加鎖和釋放鎖 | 手工操作 |
粒度、靈活度 | 粗粒度,不靈活 | |
可否指定公平所 | 不可以 | 可以 |
可否放棄鎖 | 不可以 | 可以 |
基本使用


@Slf4j @ThreadSafe public class LockExample2 {// 請求總數public static int clientTotal = 5000;// 同時并發執行的線程數public static int threadTotal = 200;public static int count = 0;private final static Lock lock = new ReentrantLock();public static void main(String[] args) throws Exception {ExecutorService executorService = Executors.newCachedThreadPool();final Semaphore semaphore = new Semaphore(threadTotal);final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);for (int i = 0; i < clientTotal ; i++) {executorService.execute(() -> {try {semaphore.acquire();add();semaphore.release();} catch (Exception e) {log.error("exception", e);}countDownLatch.countDown();});}countDownLatch.await();executorService.shutdown();log.info("count:{}", count);}private static void add() {lock.lock();try {count++;} finally {lock.unlock();}} }
公平鎖非公平鎖以及可重入的理解
輕松學習java可重入鎖(ReentrantLock)的實現原理
Condition
Condition的特性:
1.Condition中的await()方法相當于Object的wait()方法,Condition中的signal()方法相當于Object的notify()方法,Condition中的signalAll()相當于Object的notifyAll()方法。不同的是,Object中的這些方法是和同步鎖捆綁使用的;而Condition是需要與互斥鎖/共享鎖捆綁使用的。
2.Condition它更強大的地方在于:能夠更加精細的控制多線程的休眠與喚醒。對于同一個鎖,我們可以創建多個Condition,在不同的情況下使用不同的Condition。
例如,假如多線程讀/寫同一個緩沖區:當向緩沖區中寫入數據之后,喚醒"讀線程";當從緩沖區讀出數據之后,喚醒"寫線程";并且當緩沖區滿的時候,"寫線程"需要等待;當緩沖區為空時,"讀線程"需要等待。? ? ??
?如果采用Object類中的wait(), notify(), notifyAll()實現該緩沖區,當向緩沖區寫入數據之后需要喚醒"讀線程"時,不可能通過notify()或notifyAll()明確的指定喚醒"讀線程",而只能通過notifyAll喚醒所有線程(但是notifyAll無法區分喚醒的線程是讀線程,還是寫線程)。 ?但是,通過Condition,就能明確的指定喚醒讀線程。


public class Task {private final Lock lock = new ReentrantLock();private final Condition addCondition = lock.newCondition();private final Condition subCondition = lock.newCondition();private static int num = 0;private List<String> lists = new LinkedList<String>();public void add() {lock.lock();try {while(lists.size() == 10) {//當集合已滿,則"添加"線程等待addCondition.await();}num++;lists.add("add Banana" + num);System.out.println("The Lists Size is " + lists.size());System.out.println("The Current Thread is " + Thread.currentThread().getName());System.out.println("==============================");this.subCondition.signal();} catch (InterruptedException e) {e.printStackTrace();} finally {//釋放鎖lock.unlock();}}public void sub() {lock.lock();try {while(lists.size() == 0) {//當集合為空時,"減少"線程等待subCondition.await();}String str = lists.get(0);lists.remove(0);System.out.println("The Token Banana is [" + str + "]");System.out.println("The Current Thread is " + Thread.currentThread().getName());System.out.println("==============================");num--;addCondition.signal();} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}}
Condition的實現分析
?ConditionObject類是AQS的內部類,實現了Condition接口。每個Condition對象都包含著一個等待隊列,這個隊列是實現等待/通知功能的關鍵。在Object的監視器模型上,一個對象擁有一個同步隊列和等待隊列,而并非包中的Lock(同步器)擁有一個同步隊列和多個等待隊列。等待隊列和同步隊列一樣,使用的都是同步器AQS中的節點類Node。?同樣擁有首節點和尾節點,?每個Condition對象都包含著一個FIFO隊列。
?等待:
如果一個線程調用了Condition.await()方法,那么該線程就會釋放鎖,構成節點加入等待隊列并進入等待狀態。相當于同步隊列的首節點(獲取了鎖的節點)移動到Condition的等待隊列中。
?
通知:
調用Condition.signal()方法,將會喚醒在等待隊列中等待時間最長的節點(首節點),在喚醒節點之前,會將節點移到同步隊列中,加入到獲取同步狀態的競爭中,成功獲取同步狀態(或者說鎖之后),被喚醒的線程將從先前調用的await()方法返回,此時該線程已經成功獲取了鎖。
?
=============================================================================
ReentrantReadWriteLock
ReadWriteLock,顧名思義,是讀寫鎖。它維護了一對相關的鎖 — — “讀取鎖”和“寫入鎖”,一個用于讀取操作,另一個用于寫入操作。
“讀取鎖”用于只讀操作,它是“共享鎖”,能同時被多個線程獲取。
“寫入鎖”用于寫入操作,它是“獨占鎖”,寫入鎖只能被一個線程鎖獲取。


@Slf4j public class LockExample3 {private final Map<String, Data> map = new TreeMap<>();private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();private final Lock readLock = lock.readLock();private final Lock writeLock = lock.writeLock();public Data get(String key) {readLock.lock();try {return map.get(key);} finally {readLock.unlock();}}public Set<String> getAllKeys() {readLock.lock();try {return map.keySet();} finally {readLock.unlock();}}// 在沒有任何讀寫鎖的時候才可以進行寫入操作public Data put(String key, Data value) {writeLock.lock();try {return map.put(key, value);} finally {readLock.unlock();}}class Data {} }
特性:
- 公平性選擇:支持公平和非公平(默認)兩種獲取鎖的方式,非公平鎖的吞吐量優于公平鎖;
- 可重入:支持可重入,讀線程在獲取讀鎖之后能夠再次獲取讀鎖,寫線程在獲取了寫鎖之后能夠再次獲取寫鎖,同時也可以獲取讀鎖(同一線程);
- 鎖降級:線程獲取鎖的順序遵循獲取寫鎖,獲取讀鎖,釋放寫鎖,寫鎖可以降級成為讀鎖。
?優點:
- 通過分離讀鎖和寫鎖,能夠提供比排它鎖更好的并發性和吞吐量。
- 讀寫鎖能夠簡化讀寫交互場景的編程。
針對第二點,比如說一個共享的用作緩存數據結構,大部分時間提供讀服務,而寫操作占有的時間較少,但是寫操作完成后的更新需要對后序的讀服務可見。
在沒有讀寫鎖支持的時候,如果需要完成上述工作就要使用Java的等待通知機制,就是當寫操作開始時,所有晚于寫操作的讀操作均會進入等待狀態,只有寫操作完成并進行 通知之后,所有等待的讀操作才能繼續執行(寫操作之間依靠synchronized關鍵字進行同步),這樣做的目的是使讀操作都能讀取到正確的數據,而不會出現臟讀。
改用讀寫鎖實現上述功能,只需要在讀操作時獲取讀鎖,而寫操作時獲取寫鎖即可,當寫鎖被獲取到時,后續(非當前寫操作線程)的讀寫操作都會被 阻塞,寫鎖釋放之后,所有操作繼續執行,編程方式相對于使用等待通知機制的實現方式而言,變得簡單明了。
用讀寫鎖實現簡單的Cache:


public class Cache { static Map<String, Object> map = new HashMap<String, Object>(); static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); static Lock r = rwl.readLock(); static Lock w = rwl.writeLock(); // 獲取一個key對應的value public static final Object get(String key) { r.lock(); try { return map.get(key); } finally { r.unlock(); } } // 設置key對應的value,并返回舊有的value public static final Object put(String key, Object value) { w.lock(); try { return map.put(key, value); } finally { w.unlock(); } } // 清空所有的內容 public static final void clear() { w.lock(); try { map.clear(); } finally { w.unlock(); } } }
Cache使用讀寫鎖提升讀操作并發性,也保證每次寫操作對所有的讀寫操作的可見性,同時簡化了編程方式。
讀寫鎖的實現分析:
1.讀寫狀態設計
讀寫鎖同樣依賴自定義同步器來實現同步功能,而讀寫狀態就是其同步器的同步狀態。回想ReentrantLock中自定義同步器的實現,同步狀態 表示鎖被一個線程重復獲取的次數,而讀寫鎖的自定義同步器需要在同步狀態(一個整型變量)上維護多個讀線程和一個寫線程的狀態,使得該狀態的設計成為讀寫 鎖實現的關鍵。
如果在一個整型變量上維護多種狀態,就一定需要“按位切割使用”這個變量,讀寫鎖是將變量切分成了兩個部分,高16位表示讀,低16位表示寫,劃分方式如圖1所示。
2.鎖降級
鎖降級指的是寫鎖降級成為讀鎖。如果當前線程擁有寫鎖,然后將其釋放,最后再獲取讀鎖,這種分段完成的過程不能稱之為鎖降級。鎖降級是指把持住(當前擁有的)寫鎖,再獲取到讀鎖,隨后釋放(先前擁有的)寫鎖的過程。
class CachedData {Object data;volatile boolean cacheValid;final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();void processCachedData() {rwl.readLock().lock();if (!cacheValid) {// Must release read lock before acquiring write lock rwl.readLock().unlock();rwl.writeLock().lock();try {// Recheck state because another thread might have// acquired write lock and changed state before we did.if (!cacheValid) {data = ...cacheValid = true;}// Downgrade by acquiring read lock before releasing write lockrwl.readLock().lock();} finally {rwl.writeLock().unlock(); // Unlock write, still hold read }}try {use(data);} finally {rwl.readLock().unlock();}}}
鎖降級中讀鎖的獲取是否必要呢?答案是必要的。主要是為了保證數據的可見性。就例子的代碼來說,是需要鎖降級獲取讀鎖的。如果不這樣,在釋放完寫鎖后,別的線程(假設B)可能在當前線程(假設A)還沒有執行到user(data)
時獲取到寫鎖,然后修改data的值,當線程A恢復運行后,由于可見性問題,此時線程A的data已經不是正確的data了,當前線程無法感知線程B的數據更新。使用讀鎖可以保證在線程A獲取讀鎖時別的線程無法修改data
。
這里要著重講一講“無法感知”是什么意思:
也就是說,在另一個線程(假設叫線程1)修改數據的那一個瞬間,當前線程(線程2)是不知道數據此時已經變化了,但是并不意味著之后線程2使用的數據就是舊的數據,相反線程2使用還是被線程1更新之后的數據。也就是說,就算我不使用鎖降級,程序的運行結果也是正確的(這是因為鎖的機制和volatile關鍵字相似)。“感知”其實是想強調讀的實時連續性,但是卻容易讓人誤導為強調數據操作。
RentrantReadWriteLock不支持鎖升級(把持讀鎖、獲取寫鎖,最后釋放讀鎖的過程)。原因也是保證數據可見性,如果讀鎖已被多個線程獲取,其中任意線程成功獲取了寫鎖并更新了數據,則其更新對其他獲取到讀鎖的線程不可見。
使用java的ReentrantReadWriteLock讀寫鎖時,鎖降級是必須的么?
不是必須的。
在這個問題里,如果不想使用鎖降級
- 可以繼續持有寫鎖,完成后續的操作。
- 也可以先把寫鎖釋放,再獲取讀鎖。
但問題是
- 如果繼續持有寫鎖,如果 use 函數耗時較長,那么就不必要的阻塞了可能的讀流程
- ?如果先把寫鎖釋放,再獲取讀鎖。在有些邏輯里,這個 cache 值可能被修改也可能被移除,這個取決于能不能接受了。
- 另外,降級鎖比釋放寫再獲取讀性能要好,因為當前只有一個寫鎖,可以直接不競爭的降級。而釋放寫鎖,獲取讀鎖的過程就面對著其他讀鎖請求的競爭,引入額外不必要的開銷。
downgrading 只是提供了一個手段,這個手段可以讓流程不被中斷的降低到低級別鎖,并且相對同樣滿足業務要求的其他手段性能更為良好。
https://www.zhihu.com/question/265909728/answer/301363927
https://segmentfault.com/q/1010000009659039
?
StampLock
該類是一個讀寫鎖的改進,它的思想是讀寫鎖中讀不僅不阻塞讀,同時也不應該阻塞寫。
讀不阻塞寫的實現思路:在讀的時候如果發生了寫,則應當重讀而不是在讀的時候直接阻塞寫!因為在讀線程非常多而寫線程比較少的情況下,寫線程可能發生饑餓現象,也就是因為大量的讀線程存在并且讀線程都阻塞寫線程,因此寫線程可能幾乎很少被調度成功!當讀執行的時候另一個線程執行了寫,則讀線程發現數據不一致則執行重讀即可。
所以讀寫都存在的情況下,使用StampedLock就可以實現一種無障礙操作,即讀寫之間不會阻塞對方,但是寫和寫之間還是阻塞的!
??StampedLock有三種模式的鎖,用于控制讀取/寫入訪問。StampedLock的狀態由版本和模式組成。鎖獲取操作返回一個用于展示和訪問鎖狀態的票據(stamp)變量,它用相應的鎖狀態表示并控制訪問,數字0表示沒有寫鎖被授權訪問。在讀鎖上分為悲觀鎖和樂觀鎖。鎖釋放以及其他相關方法需要使用郵編(stamps)變量作為參數,如果他們和當前鎖狀態不符則失敗,這三種模式為:
? ? ? ?? 寫入:方法writeLock可能為了獲取獨占訪問而阻塞當前線程,返回一個stamp變量,能夠在unlockWrite方法中使用從而釋放鎖。也提供了tryWriteLock。當鎖被寫模式所占有,沒有讀或者樂觀的讀操作能夠成功。
? ? ? ?? 讀取:方法readLock可能為了獲取非獨占訪問而阻塞當前線程,返回一個stamp變量,能夠在unlockRead方法中用于釋放鎖。也提供了tryReadLock。
? ? ? ?? 樂觀讀取:方法tryOptimisticRead返回一個非0郵編變量,僅在當前鎖沒有以寫入模式被持有。如果在獲得stamp變量之后沒有被寫模式持有,方法validate將返回true。這種模式可以被看做一種弱版本的讀鎖,可以被一個寫入者在任何時間打斷。樂觀讀取模式僅用于短時間讀取操作時經常能夠降低競爭和提高吞吐量。
程序舉例:


public class Point {//一個點的x,y坐標private double x, y;/*** Stamped類似一個時間戳的作用,每次寫的時候對其+1來改變被操作對象的Stamped值* 這樣其它線程讀的時候發現目標對象的Stamped改變,則執行重讀*/private final StampedLock stampedLock = new StampedLock();// an exclusively locked methodvoid move(doubledeltaX, doubledeltaY) {/**stampedLock調用writeLock和unlockWrite時候都會導致stampedLock的stamp值的變化* 即每次+1,直到加到最大值,然后從0重新開始 */long stamp = stampedLock.writeLock(); //寫鎖try {x += deltaX;y += deltaY;} finally {stampedLock.unlockWrite(stamp);//釋放寫鎖 }}double distanceFromOrigin() { // A read-only method/**tryOptimisticRead是一個樂觀的讀,使用這種鎖的讀不阻塞寫* 每次讀的時候得到一個當前的stamp值(類似時間戳的作用)*/long stamp = stampedLock.tryOptimisticRead();//這里就是讀操作,讀取x和y,因為讀取x時,y可能被寫了新的值,所以下面需要判斷double currentX = x, currentY = y;/**如果讀取的時候發生了寫,則stampedLock的stamp屬性值會變化,此時需要重讀,* 再重讀的時候需要加讀鎖(并且重讀時使用的應當是悲觀的讀鎖,即阻塞寫的讀鎖)* 當然重讀的時候還可以使用tryOptimisticRead,此時需要結合循環了,即類似CAS方式* 讀鎖又重新返回一個stampe值*/if (!stampedLock.validate(stamp)) {stamp = stampedLock.readLock(); //讀鎖try {currentX = x;currentY = y;} finally {stampedLock.unlockRead(stamp);//釋放讀鎖 }}//讀鎖驗證成功后才執行計算,即讀的時候沒有發生寫return Math.sqrt(currentX * currentX + currentY * currentY);} }
==================================================================
Synchronize和ReentrantLock的比較:
功能比較:
便利性:很明顯Synchronized的使用比較方便簡潔,并且由編譯器去保證鎖的加鎖和釋放,而ReenTrantLock需要手工聲明來加鎖和釋放鎖,為了避免忘記手工釋放鎖造成死鎖,所以最好在finally中聲明釋放鎖。
鎖的細粒度和靈活度:很明顯ReenTrantLock優于Synchronized
ReenTrantLock獨有的能力:
1.??????ReenTrantLock可以指定是公平鎖還是非公平鎖。而synchronized只能是非公平鎖。所謂的公平鎖就是先等待的線程先獲得鎖。


public class TestReentrantLock2 {private static Lock lock = new ReentrantLock(true); //lock為公平鎖public static void main(String[] args) {Thread t1 = new Thread(new Runnable() {@Overridepublic void run() {lock.lock();try {System.out.println("線程1啟動...");} finally {lock.unlock();}}});Thread t2 = new Thread(new Runnable() {@Overridepublic void run() {lock.lock();try {System.out.println("線程2啟動...");} finally {lock.unlock();}}});Thread t3 = new Thread(new Runnable() {@Overridepublic void run() {lock.lock();try {System.out.println("線程3啟動...");} finally {lock.unlock();}}});t1.start();t3.start();t2.start();} } 運行結果: 線程1啟動... 線程3啟動... 線程2啟動...
在ReentrantLock中的構造函數中,提供了一個參數,指定是否為公平鎖。
公平鎖:線程將按照它們發出的請求順序來獲得鎖?
非公鎖:當一個線程請求非公平鎖的時候,如果發出請求時,獲得鎖的線程剛好釋放鎖,則該線程將會獲得鎖而跳過在該鎖上等待的線程。
2.??????ReenTrantLock提供了一個Condition(條件)類,用來實現分組喚醒需要喚醒的線程們,而不是像synchronized要么隨機喚醒一個線程要么喚醒全部線程。
3.??????ReenTrantLock提供了一種能夠中斷等待鎖的線程的機制,通過lock.lockInterruptibly()來實現這個機制。可以讓它中斷自己或者在別的線程中中斷它,中斷后可以放棄等待,去? 處理其他事,而不可中斷鎖不會響應中斷,將一直等待,synchronized就是不可中斷。
4.? ? ??ReenTrantLock的tryLock(long time, TimeUnit unit)起到了定時鎖的作用,如果在指定時間內沒有獲取到鎖,將會返回false。應用:具有時間限制的操作時使用?


public class TestReentrantLock {public static void main(String[] args) {Lock r = new ReentrantLock();//線程1Thread thread1 = new Thread(new Runnable() {@Overridepublic void run() {//獲得鎖r.lock();try {System.out.println("線程1獲得了鎖");//睡眠5秒Thread.currentThread().sleep(5000);} catch (InterruptedException e) {e.printStackTrace();} finally {r.unlock();}}});thread1.start();//線程2Thread thread2 = new Thread(new Runnable() {@Overridepublic void run() {try {if (r.tryLock(1000, TimeUnit.MILLISECONDS)) {System.out.println("線程2獲得了鎖");} else {System.out.println("獲取鎖失敗了");}} catch (InterruptedException e) {e.printStackTrace();}}});thread2.start();} }
?
什么情況下使用ReenTrantLock:
答案是,如果你需要實現ReenTrantLock的四個獨有功能時。因為對于?java.util.concurrent.lock
?中的鎖定類來說,synchronized 仍然有一些優勢。比如,在使用 synchronized 的時候,不可能忘記釋放鎖;在退出?synchronized
?塊時,JVM 會為您做這件事。您很容易忘記用?finally
?塊釋放鎖,這對程序非常有害。
?
性能比較:
synchronized:?
在資源競爭不是很激烈的情況下,偶爾會有同步的情形下,synchronized是很合適的。原因在于,編譯程序通常會盡可能的進行優化synchronize,另外可讀性非常好,不管用沒用過5.0多線程包的程序員都能理解。?
ReentrantLock:?
ReentrantLock提供了多樣化的同步,比如有時間限制的同步,可以被Interrupt的同步(synchronized的同步是不能Interrupt的)等。在資源競爭不激烈的情形下,性能稍微比synchronized差點點。但是當同步非常激烈的時候,synchronized的性能一下子能下降好幾十倍。而ReentrantLock確還能維持常態。?
Atomic:?
和上面的類似,不激烈情況下,性能比synchronized略遜,而激烈的時候,也能維持常態。激烈的時候,Atomic的性能會優于ReentrantLock一倍左右。但是其有一個缺點,就是只能同步一個值,一段代碼中只能出現一個Atomic的變量,多于一個同步無效。因為他不能在多個Atomic之間同步。?
所以,我們寫同步的時候,優先考慮synchronized,如果有特殊需要,再進一步優化。ReentrantLock和Atomic如果用的不好,不僅不能提高性能,還可能帶來災難。?
?
?