概述
Java中可以通過加鎖,來保證多個線程訪問某一個公共資源時,資源的訪問安全性。Java提出了兩種方式來加鎖
- 第一種是我們上文提到的通過關鍵字synchronized加鎖,synchronized底層托管給JVM執行的,并且在java 1.6 以后做了很多優化(偏向鎖、自旋、輕量級鎖),使用很方便且性能也很好,所以在非必要的情況下,建議使用synchronized做同步操作;
- 第二種是本文將要介紹的通過java.util.concurrent包下的Lock來加鎖(lock大量使用CAS+自旋。因此根據CAS特性建議在低鎖沖突的情況下使用lock)
AQS
概述
- AQS全稱AbstractQueuedSynchronizer,譯為抽象隊列同步器
- AQS底層數據結構是被volatile修飾state和一個Node雙向隊列
- Lock下的實現類包括ReentrantLock、ReadLock、WriteLock底層都是基于AQS實現鎖資源獲取或釋放
內部結構

state是一個由volatile修飾的int型互斥變量,state=0表示沒有任務線程使用該資源,而state>=1表示已經有線程正在持有鎖資源。CLH隊列是由內部類Node來維護的FIFO隊列
實現原理
當一個線程獲取鎖資源時首先會判斷state是否等于0(無鎖狀態),如果是0則把這個state更新為1,此時該鎖資源被占用。在這個過程中,如果多個線程同時進行state更新操作,就會導致線程的安全性問題。因此AQS底層采用了CAS機制,來保證互斥變量state更新的原子性。未獲得鎖的線程通過Unsafe類中的park方法去進行阻塞,把阻塞的線程按照先進先出的原則放到CLH雙向鏈表中,當獲得鎖的線程釋放鎖后,會從這個雙向鏈表的頭部去喚醒下一個等待的線程再去競爭鎖。
公平鎖和非公平鎖
在競爭鎖資源時,公平鎖要判斷雙向鏈表中是否有阻塞的線程,如果有則需要去排隊等待。而非公平鎖的處理方式是,不管雙向鏈表中是否有阻塞的線程在排隊等待,它都會去嘗試修改state變量去競爭鎖,這對鏈表中排隊的線程來說是非公平的。
Lock接口
Lock實現類
- JDK8中,除了StampedLock為不可重入鎖,其他包括ReentrantLock、ReentrantReadWriteLock以及Synchronized關鍵字都是可重入鎖
- 可重入鎖是指一個線程搶占到了互斥鎖資源且在鎖釋放之前可以重復獲取該鎖資源,只需要記錄重入次數state遞增1即可
- lock實際上是通過更新AQS中的state來控制鎖的持有情況
Lock方法
// 嘗試獲取鎖,獲取成功則返回,否則阻塞當前線程
void lock();
// 嘗試獲取鎖,線程在成功獲取鎖之前被中斷,則放棄獲取鎖,拋出異常
void lockInterruptibly() throws InterruptedException;
// 嘗試獲取鎖,獲取鎖成功則返回true,否則返回false
boolean tryLock();
// 嘗試獲取鎖,若在規定時間內獲取到鎖,則返回true,否則返回false,未獲取鎖之前被中斷,則拋出異常
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 釋放鎖
void unlock();
// 返回當前鎖的條件變量,通過條件變量可以實現類似notify和wait的功能,一個鎖可以有多個條件變量
Condition newCondition();
ReentrantLock
- 根據源碼可以看到實現鎖功能的關鍵成員變量Sync類型的sync繼承AQS
- Sync在ReentrantLock中有兩個實現類NonfairSync公平鎖類型和FairSync非公平鎖類型
- ReentrantLock默認是非公平鎖實現,在實例化時可以指定選擇公平鎖或者非公平鎖
ReentrantLock獲取鎖流程
//.lock()調用的是AQS的acquire()
public void lock() {sync.acquire(1);
}public final void acquire(int arg) {//tryAcquire:會嘗試通過CAS獲取一次鎖。//addWaiter:將當前線程加入雙向鏈表(等待隊列)中//acquireQueued:通過自旋,判斷當前隊列節點是否可以獲取鎖if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();
}//---------------------非公平鎖嘗試獲取鎖的過程---------------------
protected final boolean tryAcquire(int acquires) {// AQS的nonfairTryAcquire()方法return nonfairTryAcquire(acquires);
}final boolean nonfairTryAcquire(int acquires) {// 獲取當前線程final Thread current = Thread.currentThread();// 獲取stateint c = getState();if (c == 0) {// 目前沒有線程獲取鎖,通過CAS(樂觀鎖)去修改state的值if (compareAndSetState(0, acquires)) {// 設置持有鎖的線程為當前線程setExclusiveOwnerThread(current);return true;}}// 鎖的持有者是當前線程(重入鎖)else if (current == getExclusiveOwnerThread()) {// state + 1int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;
}//---------------------當前線程加入雙向鏈表的過程---------------------
private Node addWaiter(Node mode) {Node node = new Node(mode);for (;;) {// 獲取末位節點Node oldTail = tail;if (oldTail != null) {// 當前節點的prev設置為原末位節點node.setPrevRelaxed(oldTail);// CAS確保在線程安全的情況下,將當前線程加入到鏈表的尾部if (compareAndSetTail(oldTail, node)) {// 原末位節點的next設置為當前節點oldTail.next = node;return node;}} else {// 鏈表為空則初始化initializeSyncQueue();}}
}//---------------------首節點自旋過程---------------------
final boolean acquireQueued(final Node node, int arg) {boolean interrupted = false;try {for (;;) {final Node p = node.predecessor();// 首節點線程去嘗試競爭鎖if (p == head && tryAcquire(arg)) {// 成功獲取到鎖,從首節點移出(FIFO)setHead(node);p.next = null; // help GCreturn interrupted;}if (shouldParkAfterFailedAcquire(p, node))interrupted |= parkAndCheckInterrupt();}} catch (Throwable t) {cancelAcquire(node);if (interrupted)selfInterrupt();throw t;}
}
ReentrantLock釋放鎖流程
釋放鎖本質就是對AQS中的狀態值State進行逐步遞減操作
//.unlock()調用AQS的release()方法釋放鎖資源
public void unlock() {sync.release(1);
}public final boolean release(int arg) {// Sync的tryRelease()方法if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;
}protected final boolean tryRelease(int releases) {// 獲取狀態int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;// 修改鎖的持有者為nullif (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;
}
ReentrantReadWriteLock
- ReentrantReadWriteLock讀寫鎖可以分別獲取讀鎖或寫鎖,即將數據的讀寫操作分開;
- writeLock():獲取寫鎖
- writeLock().lock():為寫鎖上鎖
- writeLock().unlock():為寫鎖釋放鎖
- readLock():獲取讀鎖
- readLock().lock():為讀鎖上鎖
- readLock().unlock():為讀鎖釋放鎖
- 讀鎖使用共享模式,寫鎖使用獨占模式。即不存在寫鎖時,讀鎖可以被多個線程同時持有;存在寫鎖時,除了獲得寫鎖的這個線程可以獲得讀鎖外,其他線程不能獲得讀鎖;而當有讀鎖時,寫鎖就不能獲得
- 適用于讀多寫少應用場景,如緩存
Condition
概述?
- Condition也是一種線程通信的機制,通過await和singalAll()實現線程阻塞和喚醒
- 底層數據結構是復用AQS的Node類,由不帶頭結點的鏈表實現的隊列
- await實現原理:通過LockSupport.park將當前線程置于Waiting阻塞狀態,直到其他線程調用signal或signalAll將等待隊列的隊頭結點移入到同步隊列中,使其有機會通過自旋獲取到鎖
- signal/signalAll:將等待隊列的隊頭結點移入到同步隊列中,并通過LockSupport.unpark喚醒該線程
- 與Object的wait/notify機制對比
- Condition支持不響應中斷,而object不能
- Lock可以支持多個condition等待隊列,object只能支持一個
- Condition能夠對await設置超時時間,而object不能
- 可以通過Lock+Condition實現生產者-消費者問題(在后文并發實踐篇會有相關示例)
Condition實踐
package com.bierce;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/*** Lock + Condition: 實現線程按序打印* 案例:開啟3個線程,id分別為A、B、C,并打印10次,而且按順序交替打印如:ABCABCABC...*/
public class TestCondition {public static void main(String[] args) {PrintByOrderDemo print = new PrintByOrderDemo();new Thread(() -> {for (int i = 1; i <= 10; i++) {// 打印10次print.loopA(i);}},"A").start();new Thread(() -> {for (int i = 1; i <= 10; i++) {print.loopB(i);}},"B").start();new Thread(() -> {for (int i = 1; i <= 10; i++) {print.loopC(i);}},"C").start();}
}
class PrintByOrderDemo{private int number = 1; //當前正在執行線程的標記private Lock lock = new ReentrantLock();private Condition ConditionA = lock.newCondition();private Condition ConditionB = lock.newCondition();private Condition ConditionC = lock.newCondition();public void loopA(int totalLoop){lock.lock();try {if ( number != 1){ //判斷當前是否打印AConditionA.await();}for (int i = 1; i <= 1; i++) {System.out.print(Thread.currentThread().getName()); //打印A}//喚醒其他線程number = 2;ConditionB.signal();}catch (Exception e){e.printStackTrace();} finally {lock.unlock();}}public void loopB(int totalLoop){lock.lock();try {if ( number != 2){ //判斷當前是否打印BConditionB.await();}for (int i = 1; i <= 1; i++) {System.out.print(Thread.currentThread().getName()); //打印B}//喚醒其他線程number = 3;ConditionC.signal();}catch (Exception e){e.printStackTrace();} finally {lock.unlock();}}public void loopC(int totalLoop){lock.lock();try {if ( number != 3){ //判斷當前是否打印CConditionC.await();}for (int i = 1; i <= 1; i++) {System.out.print(Thread.currentThread().getName()); //打印C}//喚醒其他線程number = 1;ConditionA.signal();}catch (Exception e){e.printStackTrace();} finally {lock.unlock();}}
}