第一章:線程基礎
1.1 線程與進程
進程:系統資源分配的基本單位,擁有獨立的內存空間
線程:CPU調度的基本單位,共享進程內存空間
關系:一個進程可包含多個線程,線程切換成本遠低于進程
1.2 線程的創建方式
1.2.1 繼承Thread類
public class MyThread extends Thread {@Overridepublic void run() {System.out.println("Thread running: " + Thread.currentThread().getName());}public static void main(String[] args) {MyThread thread = new MyThread();thread.start(); // 啟動線程,不能直接調用run()}
}
1.2.2 實現Runnable接口
public class MyRunnable implements Runnable {@Overridepublic void run() {System.out.println("Runnable running: " + Thread.currentThread().getName());}public static void main(String[] args) {Thread thread = new Thread(new MyRunnable());thread.start();}
}
1.2.3 使用Callable和Future
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;public class MyCallable implements Callable<Integer> {@Overridepublic Integer call() throws Exception {return 1 + 1;}public static void main(String[] args) throws Exception {FutureTask<Integer> futureTask = new FutureTask<>(new MyCallable());new Thread(futureTask).start();System.out.println("Result: " + futureTask.get()); // 獲取返回值}
}
1.3 線程生命周期與狀態轉換
1.3.1 六種狀態定義
- NEW:線程創建但未啟動
- RUNNABLE:可運行狀態(包含就緒和運行中)
- BLOCKED:等待監視器鎖的阻塞狀態
- WAITING:無限期等待狀態
- TIMED_WAITING:限期等待狀態
- TERMINATED:終止狀態
1.3.2 狀態轉換完整圖示
┌───────────┐ start() ┌───────────┐
│ NEW ├────────────────?│ RUNNABLE │
└───────────┘ └─────┬─────┘│┌───────────────────────────┼───────────────────────────┐│ │ │▼ ▼ ▼
┌───────────┐ ┌───────────┐ ┌───────────────────┐
│ BLOCKED │ │ WAITING │ │ TIMED_WAITING │
└─────┬─────┘ └─────┬─────┘ └─────────┬─────────┘│ │ │└───────────────────────┼───────────────────────────┘│▼┌───────────┐│TERMINATED │└───────────┘
1.3.3 狀態轉換觸發條件
狀態轉換 | 觸發條件 |
---|---|
NEW→RUNNABLE | 調用start()方法 |
RUNNABLE→BLOCKED | 進入synchronized塊/方法未獲取到鎖 |
RUNNABLE→WAITING | 調用Object.wait()、Thread.join()、LockSupport.park() |
RUNNABLE→TIMED_WAITING | 調用Thread.sleep(long)、Object.wait(long)、Thread.join(long) |
BLOCKED/WAITING/TIMED_WAITING→RUNNABLE | 獲取到鎖、被喚醒、超時 |
所有狀態→TERMINATED | run()執行完畢或拋出未捕獲異常 |
1.4 并發帶來的問題
不可見性,亂序性,非原子性 ,這里先介紹一個概念
1.4.1 JMM
Java 內存模型(Java Memory Model,JMM)規范了Java 虛擬機與計算機內存是如何協同工作的。Java 虛擬機是一個完整的計算機的一個模型,因此這個模型自然也包含一個內存模型——又稱為 Java 內存模型。
JVM 主內存與工作內存
Java 內存模型中規定了所有的變量都存儲在主內存中,每條線程還有自己的工作內存,線程對變量的所有操作都必須在工作內存中進行,而不能直接讀寫主內存中的變量。
這里的工作內存是 JMM 的一個抽象概念,也叫本地內存,其存儲了該線程讀/寫共享變量的副本。
1.4.2 并發帶來的問題
非原子性問題
原子性指一個操作是不可分割的,要么全部執行完成,要么完全不執行。若操作被拆分,多線程并發時會導致中間狀態被干擾。
public class AtomicityDemo {private static int count = 0;public static void main(String[] args) throws InterruptedException {// 1000個線程,每個線程對count自增1000次Thread[] threads = new Thread[1000];for (int i = 0; i < 1000; i++) {threads[i] = new Thread(() -> {for (int j = 0; j < 1000; j++) {count++; // 非原子操作}});threads[i].start();}// 等待所有線程執行完畢for (int i = 0; i < 1000; i++) {threads[i].join();}System.out.println("最終count值:" + count); // 預期1000000,實際遠小于該值}
}
現象:最終輸出的 count 值遠小于預期的 1000000。
原因:count++ 并非原子操作,實際分為三步:
讀取 count 的當前值;
將值 + 1;
將結果寫回 count。
多線程并發時,多個線程可能同時讀取到同一個值,導致 “丟失更新”(例如兩個線程同時讀取到 100,都 + 1 后寫回 101,實際應是 102)。
因為JVM 底層執行的時候是指令執行的,我們寫的Java代碼會被翻譯為指令,說白了就是 你 在沒有鎖的情況下 寫 ++ 這種可拆的操作會有風險,結果可能會和預期不一致
不可見性問題
可見性指一個線程對共享變量的修改,其他線程能立即看到。若缺乏可見性,線程可能一直使用舊數據,導致邏輯錯誤。
public class VisibilityDemo {private static boolean flag = false; // 未加volatilepublic static void main(String[] args) throws InterruptedException {// 線程1:循環等待flag變為trueThread t1 = new Thread(() -> {while (!flag) {// 空循環,等待flag更新}System.out.println("線程1:檢測到flag變為true,退出循環");});t1.start();// 主線程:1秒后將flag改為trueThread.sleep(1000);flag = true;System.out.println("主線程:已將flag設為true");}
}
現象:線程 1 可能永遠不會退出循環,始終打印 “主線程:已將 flag 設為 true”,但線程 1 無輸出。
原因:線程 1 運行時,會將 flag 從主內存加載到自己的工作內存中;
由于 flag 沒有volatile修飾,主線程對 flag 的修改可能僅更新自己的工作內存,未及時同步到主內存;
線程 1 始終讀取自己工作內存中的舊值(false),導致循環無法退出。
有序性問題
有序性指程序執行順序與代碼順序一致。編譯器或 CPU 為優化性能,可能對指令重排序,若重排序破壞了邏輯依賴,會導致并發問題。
public class Singleton {private static Singleton instance; // 未加volatileprivate Singleton() {}public static Singleton getInstance() {if (instance == null) { // 第一次檢查synchronized (Singleton.class) {if (instance == null) { // 第二次檢查instance = new Singleton(); // 可能發生指令重排序}}}return instance;}
}
現象:多線程環境下,可能獲取到未初始化完成的 Singleton 實例,導致空指針異常。
原因:instance = new Singleton() 可拆分為 3 步:
分配內存空間(memory = allocate ());
初始化對象(ctorInstance (memory));
將 instance 指向內存地址(instance = memory)。編譯器可能重排序為 1→3→2:線程 A 執行到 3 時,instance 已非 null(指向未初始化的內存);
此時線程 B 進入getInstance(),第一次檢查發現 instance≠null,直接返回未初始化的實例,導致使用時出錯。
第二章:同步機制與鎖
2.1 synchronized關鍵字詳解
2.1.1 鎖對象類型及區別
鎖類型 | 語法形式 | 鎖對象 | 作用范圍 | 示例 |
---|---|---|---|---|
實例方法鎖 | synchronized void method(){} | 當前實例(this) | 整個方法 | public synchronized void add() |
靜態方法鎖 | static synchronized void method(){} | 類的Class對象 | 整個靜態方法 | public static synchronized void increment() |
代碼塊鎖-實例 | synchronized(obj){} | 指定實例對象 | 代碼塊 | synchronized(this){ ... } |
代碼塊鎖-類 | synchronized(Class.class){} | 類的Class對象 | 代碼塊 | synchronized(MyClass.class){ ... } |
實例鎖與類鎖的獨立性:
public class SyncDemo {// 實例鎖public synchronized void instanceLock() {}// 類鎖public static synchronized void classLock() {}public static void main(String[] args) {SyncDemo demo = new SyncDemo();// 可以同時獲取實例鎖和類鎖,兩者相互獨立new Thread(demo::instanceLock).start();new Thread(SyncDemo::classLock).start();}
}
2.1.2 底層實現:對象頭與Monitor
對象頭結構(64位JVM):
┌─────────────────────────────────────────────────────────────┐
│ Object Header (128 bits) │
├───────────────────────┬───────────────────────────────────┤
│ Mark Word (64 bits) │ Class Metadata (64 bits)│
└───────────────────────┴───────────────────────────────────┘
在 Hotspot 虛擬機中,對象在內存中的布局分為三塊區域:對象頭、實例數據和對齊填充;Java 對象頭是實現 synchronized 的鎖對象的基礎,一般而言,synchronized 使用的鎖對象是存儲在 Java 對象頭里。它是輕量級鎖和偏向鎖的關鍵。
對象頭中有一塊區域稱為 Mark Word,用于存儲對象自身的運行時數據,如哈希碼(HashCode)、GC 分代年齡、鎖狀態標志、線程持有的鎖、偏向線程ID等等。
32 位操作系統 Mark Word 為 32bit 為,64 位操作系統Mark Word為64bit. 下面就是對象頭的一些信息:
Mark Word狀態變化:
鎖狀態 | 結構 | 鎖標志位 | 偏向鎖標志 |
---|---|---|---|
無鎖 | hashcode(31) + 分代年齡(4) + 0 + 001 | 001 | 0 |
偏向鎖 | 線程ID(54) + epoch(2) + 分代年齡(4) + 1 + 001 | 001 | 1 |
輕量級鎖 | 指向棧中鎖記錄的指針(62) + 00 | 00 | - |
重量級鎖 | 指向Monitor的指針(62) + 10 | 10 | - |
GC標記 | 空 + 11 | 11 | - |
Monitor機制:
- 每個對象關聯一個Monitor(管程)
- 包含WaitSet(等待隊列)、EntryList(阻塞隊列)和Owner(持有線程)
- 執行synchronized時,線程通過CAS競爭Monitor的Owner
2.1.3 鎖升級完整流程
下面先介紹一些常見鎖的概念
樂觀鎖/悲觀鎖
樂觀鎖與悲觀鎖不是指具體的什么類型的鎖,而是指看待并發同步的角度。樂觀鎖則認為對于同一個數據的并發操作,是不會發生修改的。在更新數據的時候,會采用嘗試更新,不斷重新的方式更新數據。樂觀的認為,不加鎖的并發操作是沒有事情的。
悲觀鎖認為對于同一個數據的并發操作,一定是會發生修改的,哪怕沒有修改,也會認為修改。因此對于同一個數據的并發操作,悲觀鎖采取加鎖的形式。悲觀的認為,不加鎖的并發操作一定會出問題。
從上面的描述我們可以看出,悲觀鎖適合寫操作非常多的場景,樂觀鎖適合讀操作非常多的場景,不加鎖會帶來大量的性能提升。
悲觀鎖在 Java 中的使用,就是利用各種鎖。
樂觀鎖在 Java 中的使用,是無鎖編程,常常采用的是CAS 算法,典型的例子就是原子類,通過 CAS 自旋實現原子操作的更新。
可重入鎖
可重入鎖又名遞歸鎖,是指在同一個線程在外層方法獲取鎖的時候,在進入內層方法會自動獲取鎖。對于 ReentrantLock 而言, 他的名字就可以看出是一個可重入鎖,其名字是 ReentrantLock 重新進入鎖。
對于 Synchronized 而言,也是一個可重入鎖。可重入鎖的一個好處是可一定程度避免死鎖。
CAS
CAS(Compare-And-Swap) :比較并交換,該算法是硬件對于并發操作的支持。CAS 是樂觀鎖的一種實現,他采用的是自旋的思想,是一種輕量級的鎖機制。即***每次判斷我的預期值和內存中的值是不是相同,如果不相同則說明該內存值已經被其他線程更新過了,因此需要拿到該最新值作為預期值,重新判斷。而該線程不斷的循環判斷是否該內存值已經被其他線程更新過了,這就是自旋的思想。***底層是通過 Unsafe 類中的 compareAndSwapInt 等方法實現.
讀寫鎖
讀寫鎖特點: 讀讀不互斥,讀寫互斥,寫寫互斥
加讀鎖是防止在另外的線程在此時寫入數據,防止讀取臟數據提高讀的效率
分段鎖
分段鎖并非一種實際的鎖,而是一種思想,用于將數據分段,并在每個分段上都會單獨加鎖,把鎖進一步細粒度化,以提高并發效率。
自旋鎖
所謂自旋其實指的就是自己重試,當線程搶鎖失敗后,重試幾次,要是搶到鎖了就繼續,要是搶不到就持續循環嘗試(自旋轉)。說白了還是為了盡量不要阻塞線程。由此可見,自旋鎖是是比較消耗 CPU 的,因為要不斷的循環重試,不會釋放CPU資源。另外,加鎖時間普遍較短的場景非常適合自旋鎖,可以極大提高鎖的效率。
共享鎖/獨占鎖
共享鎖是指該鎖可被多個線程所持有,并發訪問共享資源。
獨占鎖也叫互斥鎖,是指該鎖一次只能被一個線程所持有。
對于 Java ReentrantLock,Synchronized 而言,都是獨享鎖。ReadWriteLock 是接口,其實現類(如 ReentrantReadWriteLock)支持讀寫分離鎖。讀鎖的共享鎖可保證并發讀是非常高效的,讀寫,寫讀,寫寫的過程是互斥的。
1. 偏向鎖
- 適用場景:單線程重復獲取鎖
- 實現:在Mark Word中存儲線程ID,后續獲取只需比對ID
- 撤銷條件:當第二個線程嘗試獲取鎖時,觸發偏向鎖撤銷
2. 輕量級鎖
- 適用場景:多線程交替執行同步塊
- 實現:
- 線程在棧幀中創建Lock Record
- CAS將Mark Word復制到Lock Record并替換為指針
- 競爭時通過自旋嘗試獲取鎖
3. 重量級鎖
- 適用場景:多線程激烈競爭
- 實現:向操作系統申請互斥量(Mutex),線程阻塞等待
- 性能影響:涉及內核態切換,開銷較大
升級觸發條件:
無鎖 → 偏向鎖(首次獲取)→ 輕量級鎖(第二個線程競爭)→ 重量級鎖(自旋失敗或第三個線程競爭)
鎖的狀態升級是只針對 synchronized 來說的
代碼演示鎖升級:
import org.openjdk.jol.info.ClassLayout;public class LockUpgradeDemo {public static void main(String[] args) throws InterruptedException {Object lock = new Object();System.out.println("無鎖狀態:");System.out.println(ClassLayout.parseInstance(lock).toPrintable());// 偏向鎖synchronized (lock) {System.out.println("\n偏向鎖狀態:");System.out.println(ClassLayout.parseInstance(lock).toPrintable());}// 輕量級鎖(另一個線程競爭)Thread t1 = new Thread(() -> {synchronized (lock) {try { Thread.sleep(100); } catch (InterruptedException e) {}}});t1.start();t1.join();System.out.println("\n輕量級鎖狀態:");System.out.println(ClassLayout.parseInstance(lock).toPrintable());// 重量級鎖(多線程競爭)Thread t2 = new Thread(() -> synchronized (lock) {});Thread t3 = new Thread(() -> synchronized (lock) {});t2.start();t3.start();t2.join();t3.join();System.out.println("\n重量級鎖狀態:");System.out.println(ClassLayout.parseInstance(lock).toPrintable());}
}
2.2 ReentrantLock深入解析
ReentrantLock 是 java.util.concurrent.locks 包下的類, 實現Lock接口,Lock 的意義在于提供區別于 synchronized 的另一種具有更多廣泛操作的同步方式,它能支持更多靈活的結構
ReentrantLock 基于 AQS,在并發編程中它可以實現公平鎖和非公平鎖來對共享資源進行同步,同時和 synchronized 一樣,ReentrantLock 支持可重入,除此之外,ReentrantLock 在調度上更靈活,支持更多豐富的功能。
ReentrantLock 總共有三個內部類,并且三個內部類是緊密相關的
public class ReentrantLock implements Lock {// 1. 抽象同步器:繼承自 AQS,定義鎖的基本操作abstract static class Sync extends AbstractQueuedSynchronizer { ... }// 2. 非公平鎖實現:繼承自 Syncstatic final class NonfairSync extends Sync { ... }// 3. 公平鎖實現:繼承自 Syncstatic final class FairSync extends Sync { ... }// 4. 條件對象(非內部類,但與鎖密切相關)public class ConditionObject implements Condition { ... }
}
2.2.1 類結構與繼承關系
虛線表示實現接口,實線表示繼承
AQS 在 ReentrantLock 中的關鍵屬性
public class ReentrantLock implements Lock {private final Sync sync; // 鎖的核心實現,繼承自 AQS// 抽象同步器,繼承自 AQSabstract static class Sync extends AbstractQueuedSynchronizer {abstract void lock(); // 由子類實現的加鎖方法// 其他方法...}// 非公平鎖實現static final class NonfairSync extends Sync { ... }// 公平鎖實現static final class FairSync extends Sync { ... }
}
2.2.2 公平鎖與非公平鎖實現對比
1. 加鎖流程差異
- 非公平鎖加鎖流程
static final class NonfairSync extends Sync {final void lock() {// 關鍵區別:直接嘗試 CAS 搶占鎖,不管隊列中是否有等待線程if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1); // 失敗則進入 AQS 的正常獲取流程}// 重寫 AQS 的 tryAcquire 方法protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}
}// Sync 類中的非公平獲取鎖方法
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {// 關鍵區別:不檢查隊列,直接嘗試 CASif (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;
}
搶占式獲取:線程進入 lock() 時直接嘗試 CAS 修改 state,即使隊列中已有等待線程。
失敗后入隊:如果 CAS 失敗(鎖已被占用),才進入 AQS 隊列等待。
公平鎖加鎖流程
static final class FairSync extends Sync {final void lock() {acquire(1); // 調用 AQS 的 acquire 方法}// 重寫 AQS 的 tryAcquire 方法protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState(); // 獲取 AQS 的同步狀態if (c == 0) {// 關鍵區別:先檢查隊列中是否有前驅節點,再嘗試 CASif (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current); // 設置當前線程為鎖的持有者return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires; // 重入鎖計數增加if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}
}
hasQueuedPredecessors():檢查隊列中是否有其他線程在等待(判斷當前線程是否是隊列的第一個節點)。
如果有前驅節點,則當前線程必須入隊等待,保證公平性。
CAS 操作:僅當隊列中沒有前驅節點時,才嘗試通過 CAS 修改 state。
2. tryAcquire方法核心區別
-
非公平鎖
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {// 不檢查隊列,直接CASif (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}// 重入邏輯...return false; }
-
公平鎖:
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {// 檢查隊列中是否有前驅線程if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}// 重入邏輯...return false; }
3. 性能對比
特性 | 公平鎖 | 非公平鎖 |
---|---|---|
吞吐量 | 較低 | 較高 |
響應時間 | 穩定(FIFO) | 可能波動 |
上下文切換 | 頻繁 | 較少 |
饑餓風險 | 無 | 可能有 |
適用場景 | 對順序敏感場景 | 高并發吞吐量優先 |
2.3 CAS與原子操作
2.3.1 CAS原理深度解析
CAS(Compare-And-Swap) 是樂觀鎖的核心實現,包含三個操作數:
- V:要更新的內存位置
- A:預期值
- B:新值
CPU指令實現:
- x86架構:
lock cmpxchg
指令 - ARM架構:
ldrex/strex
指令組合
Java實現:
// Unsafe類的CAS方法
public final native boolean compareAndSwapInt(Object o, long offset, int expected, int update);
AtomicInteger自增實現:
public class AtomicInteger extends Number implements java.io.Serializable {private static final Unsafe unsafe = Unsafe.getUnsafe();private static final long valueOffset;static {try {valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));} catch (Exception ex) { throw new Error(ex); }}private volatile int value;public final int incrementAndGet() {return unsafe.getAndAddInt(this, valueOffset, 1) + 1;}
}
2.3.2 ABA問題及解決方案
ABA問題:當變量從A變為B再變回A時,CAS無法檢測中間變化
解決方案:
- 版本號機制:每次更新增加版本號
- AtomicStampedReference:Java提供的帶版本戳的原子引用
代碼示例:
import java.util.concurrent.atomic.AtomicStampedReference;public class ABADemo {public static void main(String[] args) {// 初始值100,版本號1AtomicStampedReference<Integer> asr = new AtomicStampedReference<>(100, 1);// 線程1執行ABA操作new Thread(() -> {int stamp = asr.getStamp();System.out.println("線程1初始版本號: " + stamp);// A→Basr.compareAndSet(100, 101, stamp, stamp + 1);System.out.println("線程1修改后版本號: " + asr.getStamp());// B→Aasr.compareAndSet(101, 100, asr.getStamp(), asr.getStamp() + 1);System.out.println("線程1再次修改后版本號: " + asr.getStamp());}, "線程1").start();// 線程2嘗試修改new Thread(() -> {int stamp = asr.getStamp();System.out.println("線程2初始版本號: " + stamp);try { Thread.sleep(1000); } catch (InterruptedException e) {}// 版本號已變化,修改失敗boolean success = asr.compareAndSet(100, 200, stamp, stamp + 1);System.out.println("線程2修改是否成功: " + success);System.out.println("當前值: " + asr.getReference());System.out.println("當前版本號: " + asr.getStamp());}, "線程2").start();}
}
輸出結果:
線程1初始版本號: 1
線程1修改后版本號: 2
線程1再次修改后版本號: 3
線程2初始版本號: 1
線程2修改是否成功: false
當前值: 100
當前版本號: 3
第三章:AQS框架詳解
AQS 的 全 稱 為 ( AbstractQueuedSynchronizer ),這個類在java.util.concurrent.locks 包下面。
AQS 是一個用來構建鎖和同步器的框架,使用 AQS 能簡單且高效地構造出同步 器 , 是 JUC 中 核 心 的 組 件 , 比如我們提到的ReentrantLock,CountDownLatch 等等都是基于 AQS 來實現。只要搞懂了 AQS,那么 JUC 中絕大部分的 api 都能掌握。
3.1 AQS核心結構
AbstractQueuedSynchronizer是Java并發工具的基礎框架,核心組成:
- 狀態變量:
volatile int state
,表示同步狀態 - 同步隊列:CLH雙向鏈表,存儲等待線程
- 條件隊列:單向鏈表,用于Condition等待/通知
Node節點結構:
static final class Node {volatile int waitStatus; // 等待狀態volatile Node prev; // 前驅節點volatile Node next; // 后繼節點volatile Thread thread; // 等待線程Node nextWaiter; // 條件隊列后繼節點
}
3.2 獨占鎖獲取流程(acquire)
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();
}
流程分解:
- tryAcquire:嘗試獲取鎖(子類實現)
- addWaiter:將線程封裝為Node加入隊列尾部
- acquireQueued:在隊列中自旋等待獲取鎖
- selfInterrupt:處理中斷狀態
3.3 Condition條件變量
Condition提供比Object.wait/notify更靈活的線程協作機制:
- 每個Lock可以創建多個Condition
- 支持精確的線程喚醒控制
實現原理:
- 等待隊列:每個Condition維護一個單向等待隊列
- await():釋放鎖并加入等待隊列
- signal():將等待隊列節點轉移到同步隊列
代碼示例:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;public class ConditionDemo {private static final Lock lock = new ReentrantLock();private static final Condition condition = lock.newCondition();private static boolean flag = false;public static void main(String[] args) {// 等待線程new Thread(() -> {lock.lock();try {while (!flag) {System.out.println("條件不滿足,進入等待");condition.await(); // 釋放鎖并等待}System.out.println("條件滿足,繼續執行");} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}, "等待線程").start();// 通知線程new Thread(() -> {lock.lock();try {flag = true;System.out.println("條件已滿足,發出通知");condition.signal(); // 喚醒等待線程} finally {lock.unlock();}}, "通知線程").start();}
}
第四章:并發工具與實戰
4.1 線程池原理與使用
核心參數:
public ThreadPoolExecutor(int corePoolSize, // 核心線程數int maximumPoolSize, // 最大線程數long keepAliveTime, // 非核心線程空閑時間TimeUnit unit, // 時間單位BlockingQueue<Runnable> workQueue, // 任務隊列ThreadFactory threadFactory, // 線程工廠RejectedExecutionHandler handler // 拒絕策略
)
工作流程:
- 提交任務時,若核心線程未滿則創建核心線程
- 核心線程滿則加入任務隊列
- 隊列滿則創建非核心線程
- 總線程數達最大值則執行拒絕策略
常用線程池:
- FixedThreadPool:固定大小線程池
- CachedThreadPool:可緩存線程池
- ScheduledThreadPool:定時任務線程池
- SingleThreadExecutor:單線程線程池
4.2 并發容器
容器類 | 特點 | 適用場景 |
---|---|---|
ConcurrentHashMap | 分段鎖/CAS實現,支持高并發讀寫 | 高頻讀寫場景 |
CopyOnWriteArrayList | 寫時復制,讀無鎖 | 讀多寫少場景 |
ConcurrentLinkedQueue | 無鎖隊列,CAS操作 | 高并發生產者消費者 |
BlockingQueue | 支持阻塞的隊列 | 線程間通信 |
4.3 synchronized與ReentrantLock對比
特性 | synchronized | ReentrantLock |
---|---|---|
實現層面 | JVM內置鎖 | JDK API |
鎖類型 | 可重入、非公平 | 可重入、可公平/非公平 |
靈活性 | 低(自動釋放) | 高(可中斷、超時、嘗試獲取) |
性能 | JDK1.6后優化,與Lock接近 | 高并發下更優 |
條件變量 | 一個對象一個條件隊列 | 可創建多個Condition |
底層實現 | 對象頭+Monitor | AQS框架+CAS |
代碼示例對比:
// synchronized實現
public class SyncExample {private final Object lock = new Object();public void doSync() {synchronized (lock) {// 臨界區}}
}// ReentrantLock實現
public class LockExample {private final Lock lock = new ReentrantLock();public void doLock() {lock.lock();try {// 臨界區} finally {lock.unlock(); // 必須手動釋放}}
}
第五章:總結與最佳實踐
5.1 核心知識點總結
- 線程模型:掌握線程狀態轉換及生命周期管理
- 鎖機制:理解synchronized鎖升級過程和ReentrantLock實現原理
- AQS框架:熟悉同步隊列和條件隊列的工作機制
- 并發工具:合理選擇線程池參數和并發容器
- 性能優化:根據場景選擇合適的鎖策略和同步機制
5.2 并發編程最佳實踐
- 減少鎖持有時間:同步塊盡可能小
- 降低鎖粒度:如ConcurrentHashMap的分段鎖思想
- 避免死鎖:固定加鎖順序、使用tryLock超時機制
- 優先使用并發容器:減少手動同步
- 合理設置線程池參數:根據CPU核心數和任務類型調整
- 使用原子類:無鎖情況下替代鎖機制
- 避免過度同步:不需要同步的代碼不要加鎖
5.3 常見問題排查
-
死鎖檢測:使用jstack命令分析線程狀態
-
鎖競爭:通過JConsole觀察線程阻塞情況
-
內存可見性:正確使用volatile和final
-
線程泄漏:確保線程池正確關閉
-
CPU過高:檢查是否存在無限循環或過度自旋
通過深入理解Java并發編程的底層原理和框架實現,能夠編寫出高效、安全的多線程程序,在面對高并發場景時能夠做出合理的技術選型和性能優化。
一、高頻場景:輕量級同步
1. synchronized 關鍵字
優勢:語法簡單,自動釋放鎖,JVM 對其進行了多種優化(偏向鎖、輕量級鎖)。
適用場景:簡單同步塊、方法級同步。
public class Counter {private int count = 0;public synchronized void increment() {count++;}
}
2. ReentrantLock(非公平模式)
優勢:比 synchronized 更靈活(可中斷、超時、條件變量),競爭激烈時性能更優。
適用場景:復雜同步邏輯,如需要嘗試鎖、可中斷鎖的場景。
private final ReentrantLock lock = new ReentrantLock();public void doCriticalWork() {lock.lock();try {// 臨界區代碼} finally {lock.unlock();}
}
二、讀多寫少場景:讀寫鎖
1. ReentrantReadWriteLock
特性:讀鎖共享,寫鎖獨占,讀寫互斥。
適用場景:緩存更新、數據庫讀寫分離等讀多寫少的場景。
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
private final Lock writeLock = rwLock.writeLock();
private Map<String, Object> cache = new HashMap<>();public Object get(String key) {readLock.lock();try {return cache.get(key);} finally {readLock.unlock();}
}public void put(String key, Object value) {writeLock.lock();try {cache.put(key, value);} finally {writeLock.unlock();}
}
2. StampedLock(Java 8+)
特性:支持樂觀讀鎖(無鎖機制),讀性能更高。
適用場景:讀極多、寫極少且寫操作耗時短的場景(如緩存)。
private final StampedLock lock = new StampedLock();
private double x, y;public void move(double deltaX, double deltaY) {long stamp = lock.writeLock();try {x += deltaX;y += deltaY;} finally {lock.unlockWrite(stamp);}
}public double distanceFromOrigin() {long stamp = lock.tryOptimisticRead();double currentX = x, currentY = y;if (!lock.validate(stamp)) {stamp = lock.readLock();try {currentX = x;currentY = y;} finally {lock.unlockRead(stamp);}}return Math.hypot(currentX, currentY);
}
第6章 JUC常用類底層實現原理
JUC 是 java.util.concurrent 包的縮寫,是 Java 并發編程的核心工具類庫,主要用于解決多線程環境下的線程同步、并發控制、線程安全等問題。
6.1 ConcurrentHashMap
6.1.1 實現演進
JDK7 vs JDK8+對比
版本 | 核心技術 | 數據結構 | 并發度 | 鎖粒度 |
---|---|---|---|---|
JDK7 | 分段鎖(Segment) | Segment數組 + HashEntry數組 + 鏈表 | 等于Segment數量(默認16) | Segment級 |
JDK8+ | CAS + synchronized | Node數組 + 鏈表 + 紅黑樹 | 等于CPU核心數 | 桶頭節點級 |
JDK8+核心改進
- 取消分段鎖:減少鎖競爭,提高并發度
- 紅黑樹優化:鏈表長度>8時轉為紅黑樹,查詢復雜度從O(n)→O(log n)
- CAS無鎖操作:空桶插入時使用CAS避免加鎖
- synchronized細粒度鎖:僅鎖定當前桶的頭節點
6.1.2 核心源碼分析
1. 節點結構
// 普通節點
static class Node<K,V> implements Map.Entry<K,V> {final int hash;final K key;volatile V val;volatile Node<K,V> next;// ...
}// 紅黑樹節點容器
static final class TreeBin<K,V> extends Node<K,V> {TreeNode<K,V> root;volatile TreeNode<K,V> first;volatile Thread waiter;volatile int lockState;// ...
}
2. putVal方法核心邏輯
final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();int hash = spread(key.hashCode()); // 哈希擾動int binCount = 0;for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;if (tab == null || (n = tab.length) == 0)tab = initTable(); // 延遲初始化else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {// CAS無鎖插入新節點if (casTabAt(tab, i, null, new Node<>(hash, key, value)))break;}else if ((fh = f.hash) == MOVED)tab = helpTransfer(tab, f); // 協助擴容else {V oldVal = null;// 鎖定桶頭節點synchronized (f) {if (tabAt(tab, i) == f) { // 二次檢查if (fh >= 0) { // 鏈表節點binCount = 1;for (Node<K,V> e = f;; ++binCount) {K ek;if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}Node<K,V> pred = e;if ((e = e.next) == null) {pred.next = new Node<>(hash, key, value);break;}}}else if (f instanceof TreeBin) { // 紅黑樹節點Node<K,V> p;binCount = 2;if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}if (binCount != 0) {if (binCount >= TREEIFY_THRESHOLD)treeifyBin(tab, i); // 鏈表轉紅黑樹if (oldVal != null)return oldVal;break;}}}addCount(1L, binCount); // 更新元素計數return null;
}
6.1.3 高效原因分析
- 細粒度鎖:僅鎖定當前操作的桶,不影響其他桶并發訪問
- 無鎖讀操作:volatile數組保證讀可見性,無需加鎖
- 紅黑樹優化:解決長鏈表查詢性能問題
- 分散擴容:擴容時多個線程可協助遷移數據,提高效率
- 計數優化:通過baseCount + CounterCell數組實現高效并發計數
6.2 LongAdder
6.2.1 分段累加設計
核心原理
- 繼承關系:
LongAdder extends Striped64
- 核心字段:
transient volatile long base; // 基礎值,低競爭時使用 transient volatile Cell[] cells; // 分段數組,高競爭時使用 transient volatile int cellsBusy; // 初始化/擴容鎖(0-無鎖,1-鎖定)
Cell類(避免偽共享)
@sun.misc.Contended // 緩存行填充,避免偽共享
static final class Cell {volatile long value;Cell(long x) { value = x; }final boolean cas(long cmp, long val) {return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);}
}
6.2.2 累加流程
- 低競爭:直接CAS更新base值
- 高競爭:根據線程哈希定位到cells數組元素進行CAS更新
- 動態擴容:當cells元素競爭激烈時,數組容量翻倍(最大2^24)
add方法源碼
public void add(long x) {Cell[] cs; long b, v; int m; Cell c;// 1. 嘗試直接更新baseif ((cs = cells) == null || !casBase(b = base, b + x)) {boolean uncontended = true;int h = getProbe(); // 線程哈希值// 2. 檢查cells狀態并嘗試更新if (cs == null || (m = cs.length - 1) < 0 ||(c = cs[m & h]) == null ||!(uncontended = c.cas(v = c.value, v + x)))// 3. 復雜情況處理(初始化/擴容/重試)longAccumulate(x, null, uncontended, h);}
}
6.2.3 與AtomicLong對比
特性 | AtomicLong | LongAdder |
---|---|---|
實現 | 單一變量CAS | 分段CAS + 匯總 |
低并發性能 | 相近 | 相近 |
高并發性能 | 較差(CAS沖突嚴重) | 優異(分散競爭) |
內存占用 | 低 | 高(Cell數組) |
功能完整性 | 支持多種原子操作 | 僅支持加減操作 |
適用場景 | 功能需求多,并發低 | 計數場景,高并發 |
6.3 ThreadPoolExecutor
6.3.1 核心參數詳解
構造函數
public ThreadPoolExecutor(int corePoolSize, // 核心線程數int maximumPoolSize, // 最大線程數long keepAliveTime, // 非核心線程空閑時間TimeUnit unit, // 時間單位BlockingQueue<Runnable> workQueue, // 任務隊列ThreadFactory threadFactory, // 線程工廠RejectedExecutionHandler handler // 拒絕策略
)
工作隊列類型
隊列類型 | 特點 | 適用場景 |
---|---|---|
ArrayBlockingQueue | 有界,數組結構 | 資源有限,需控制隊列大小 |
LinkedBlockingQueue | 無界/有界,鏈表結構 | 任務量大,無界時需防止OOM |
SynchronousQueue | 無緩沖,直接移交 | 任務需立即處理,配合無界線程池 |
PriorityBlockingQueue | 優先級排序 | 需按優先級處理任務 |
6.3.2 拒絕策略深度解析
內置策略對比
策略類 | 處理方式 | 適用場景 |
---|---|---|
AbortPolicy | 拋出RejectedExecutionException | 關鍵任務,需明確失敗 |
CallerRunsPolicy | 由提交任務的線程執行 | 非核心任務,限流保護 |
DiscardPolicy | 靜默丟棄任務 | 可丟失任務(如日志收集) |
DiscardOldestPolicy | 丟棄隊列頭部任務 | 允許任務過期(如緩存更新) |
自定義拒絕策略示例
// 保存被拒絕任務到數據庫重試
RejectedExecutionHandler dbRetryHandler = (r, executor) -> {if (!executor.isShutdown()) {try {// 保存任務到數據庫retryTaskService.save(new RetryTask(r));} catch (Exception e) {log.error("保存重試任務失敗", e);// 保存失敗時可降級為CallerRunsPolicynew ThreadPoolExecutor.CallerRunsPolicy().rejectedExecution(r, executor);}}
};
6.3.3 線程池狀態管理
- 狀態定義:
private static final int RUNNING = -1 << COUNT_BITS; // 運行中 private static final int SHUTDOWN = 0 << COUNT_BITS; // 已關閉 private static final int STOP = 1 << COUNT_BITS; // 已停止 private static final int TIDYING = 2 << COUNT_BITS; // 整理中 private static final int TERMINATED = 3 << COUNT_BITS; // 已終止
- 狀態轉換:
RUNNING → SHUTDOWN → TIDYING → TERMINATED
或RUNNING → STOP → TIDYING → TERMINATED
6.4 CompletableFuture
6.4.1 異步編程模型
核心接口
Future
:基礎異步結果接口CompletionStage
:提供鏈式異步操作能力
常用方法分類
方法類型 | 作用 | 示例 |
---|---|---|
創建異步任務 | 啟動異步計算 | supplyAsync(Supplier) runAsync(Runnable) |
結果處理 | 轉換/消費異步結果 | thenApply(Function) thenAccept(Consumer) |
任務組合 | 組合多個異步任務 | thenCombine(...) allOf(...) anyOf(...) |
異常處理 | 捕獲異步異常 | exceptionally(Function) handle(BiFunction) |
6.4.2 核心實現原理
1. 異步執行機制
- 默認使用
ForkJoinPool.commonPool()
- 可指定自定義線程池:
supplyAsync(Supplier, Executor)
2. 鏈式調用實現
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {return uniApplyStage(null, fn);
}private <V> CompletableFuture<V> uniApplyStage(Executor e, Function<? super T, ? extends V> f) {if (f == null) throw new NullPointerException();CompletableFuture<V> d = new CompletableFuture<V>();if (e != null || !d.uniApply(this, f, null)) {UniApply<T,V> c = new UniApply<T,V>(e, this, d, f);push(c); // 添加到完成隊列c.tryFire(SYNC); // 嘗試執行}return d;
}
3. 異常傳播機制
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) {return uniExceptionallyStage(fn);
}private CompletableFuture<T> uniExceptionallyStage(Function<Throwable, ? extends T> fn) {if (fn == null) throw new NullPointerException();CompletableFuture<T> d = new CompletableFuture<T>();if (!d.uniExceptionally(this, fn)) {UniExceptionally<T> c = new UniExceptionally<T>(this, d, fn);push(c);c.tryFire(SYNC);}return d;
}
6.4.3 高級應用示例
1. 多任務組合
// 兩個獨立任務組合
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");CompletableFuture<String> combined = future1.thenCombine(future2, (s1, s2) -> s1 + " " + s2);System.out.println(combined.join()); // 輸出: Hello World
2. 超時處理
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try { Thread.sleep(3000); } catch (InterruptedException e) {}return "Result";
}).orTimeout(2, TimeUnit.SECONDS) // 超時設置.exceptionally(ex -> "Timeout fallback");System.out.println(future.join()); // 輸出: Timeout fallback
6.5 同步工具類
6.5.1 Semaphore(信號量)
基于AQS共享模式實現
- 狀態表示:AQS的state表示可用許可數
- 獲取許可:
acquire()
→tryAcquireShared(int)
→ CAS減少state - 釋放許可:
release()
→tryReleaseShared(int)
→ CAS增加state
核心源碼
// 非公平模式獲取
final int nonfairTryAcquireShared(int acquires) {for (;;) {int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}
}// 釋放許可
protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next))return true;}
}
應用場景:連接池限流
public class ConnectionPool {private final Semaphore semaphore;private final List<Connection> connections;public ConnectionPool(int size) {this.semaphore = new Semaphore(size);this.connections = new ArrayList<>(size);// 初始化連接...}public Connection getConnection() throws InterruptedException {semaphore.acquire(); // 獲取許可synchronized (connections) {return connections.remove(0);}}public void releaseConnection(Connection conn) {synchronized (connections) {connections.add(conn);}semaphore.release(); // 釋放許可}
}
6.5.2 CountDownLatch與CyclicBarrier對比
特性 | CountDownLatch | CyclicBarrier |
---|---|---|
核心功能 | 一個線程等待其他線程完成 | 多個線程互相等待 |
計數器 | 一次性,減至0后不可重置 | 可通過reset()重置 |
阻塞線程 | 僅等待線程阻塞 | 所有參與線程阻塞 |
觸發動作 | 等待線程繼續執行 | 所有線程到達后執行屏障任務 |
底層實現 | AQS共享模式 | ReentrantLock + Condition |
CountDownLatch示例(等待子任務)
CountDownLatch latch = new CountDownLatch(3);// 啟動3個子任務
for (int i = 0; i < 3; i++) {new Thread(() -> {try {// 子任務執行Thread.sleep(1000);System.out.println("子任務完成");} catch (InterruptedException e) {e.printStackTrace();} finally {latch.countDown(); // 計數器減1}}).start();
}latch.await(); // 等待計數器為0
System.out.println("所有子任務完成,繼續執行主線程");
CyclicBarrier示例(多線程同步)
// 3個線程到達后執行屏障任務
CyclicBarrier barrier = new CyclicBarrier(3, () -> System.out.println("所有線程到達屏障,執行后續任務")
);for (int i = 0; i < 3; i++) {new Thread(() -> {try {// 線程任務Thread.sleep(new Random().nextInt(1000));System.out.println(Thread.currentThread().getName() + "到達屏障");barrier.await(); // 等待其他線程System.out.println(Thread.currentThread().getName() + "繼續執行");} catch (Exception e) {e.printStackTrace();}}).start();
}
6.6 CopyOnWriteArrayList
6.6.1 寫時復制機制
核心原理
- 讀操作:直接訪問volatile數組,無需加鎖
- 寫操作:復制新數組 → 修改新數組 → 替換舊數組(加鎖保證原子性)
核心源碼
// 讀操作(無鎖)
public E get(int index) {return get(getArray(), index);
}// 寫操作(加鎖復制)
public boolean add(E e) {final ReentrantLock lock = this.lock;lock.lock();try {Object[] elements = getArray();int len = elements.length;Object[] newElements = Arrays.copyOf(elements, len + 1);newElements[len] = e;setArray(newElements);return true;} finally {lock.unlock();}
}
6.6.2 適用場景與局限性
適用場景
- 讀多寫少:如配置列表、監聽器集合
- 迭代安全:不希望迭代時拋出ConcurrentModificationException
局限性
- 內存開銷大:每次寫操作復制整個數組
- 數據一致性弱:讀操作可能獲取舊數據
- 寫性能差:不適合頻繁修改的場景
與Collections.synchronizedList對比
特性 | CopyOnWriteArrayList | Collections.synchronizedList |
---|---|---|
讀性能 | 極高(無鎖) | 一般(需加鎖) |
寫性能 | 差(數組復制) | 一般(加鎖) |
內存占用 | 高 | 低 |
迭代安全性 | 安全(快照迭代) | 不安全(可能拋異常) |
適用場景 | 讀多寫極少 | 讀寫均衡 |
6.7 JUC類高效設計總結
JUC包各類的高效設計可歸納為以下核心思想:
- 分而治之:將競爭分散到多個單元(如LongAdder的Cell數組、ConcurrentHashMap的桶)
- 讀寫分離:讀操作無鎖化(如CopyOnWriteArrayList的寫時復制、ConcurrentHashMap的volatile數組)
- CAS優化:使用無鎖CAS操作減少鎖開銷(如Atomic系列、AQS核心操作)
- 鎖粒度細化:降低鎖競爭范圍(如ConcurrentHashMap的桶級鎖、ReentrantLock的對象級鎖)
- 隊列化管理:使用FIFO隊列有序管理競爭線程(AQS同步隊列)
- 懶加載:延遲初始化資源,減少初始開銷(如ThreadPoolExecutor的核心線程創建)
- 自適應策略:根據競爭情況動態調整策略(如LongAdder的動態擴容、synchronized的鎖升級)
理解這些設計思想,不僅能更好地使用JUC類,還能在自定義并發組件時借鑒這些高效模式。