0 常用并發同步工具類的真實應用場景
-
JDK 提供了比
synchronized
更加高級的各種同步工具,包括ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier等,可以實現更加豐富的多線程操作;
1 ReentrantLock(可重入的占用鎖)
1.1 簡介
-
ReentrantLock
是可重入的獨占鎖;- “可重入”是指同一線程能多次獲取同一把鎖,不會自己阻塞自己;
- “獨占”是說同一時間,最多只有一個線程能成功拿到鎖,其他線程得等待;
- 和
synchronized
作用類似,都是解決多線程并發訪問共享資源時的線程安全問題;
-
相比
synchronized
,ReentrantLock
多了這些靈活特性:-
可中斷:獲取鎖的過程中,線程能響應中斷(比如其他地方調用了
interrupt()
),不用死等鎖釋放,更靈活控制執行流程; -
可設置超時時間:調用
tryLock(long timeout, TimeUnit unit)
時,線程在指定時間內沒拿到鎖,就會放棄嘗試,避免無限阻塞; -
可設置為公平鎖:默認
ReentrantLock
是 “非公平鎖”(新線程和等待隊列里的線程搶鎖,可能插隊),但它支持通過構造方法ReentrantLock(true)
設為“公平鎖”,嚴格按線程等待順序分配鎖,減少線程饑餓(某些線程一直拿不到鎖);
-
-
與
synchronized
一樣,都支持可重入:synchronized
靠wait/notify
實現線程通信,只能關聯一個等待隊列;ReentrantLock
可通過newCondition()
創建多個Condition
,精準控制不同線程的等待 / 喚醒,比如生產者 - 消費者模型里,能區分 “生產條件”“消費條件” 分別處理;
-
應用場景:多線程搶共享資源時,需要獨占訪問保證數據安全,比如賣票系統(如下兩圖)、銀行賬戶轉賬;
- 線程 A、B 搶鎖:線程 A、B 同時嘗試獲取鎖,假設線程 A 先拿到(鎖的獨占性,同一時間只有 A 能持有),此時 A 可以操作共享資源(比如修改車票庫存 ),B 因為沒搶到,進入 “等待” 狀態;
- 線程 A 釋放鎖:A 操作完共享資源后,會釋放鎖;接著 B 再次嘗試獲取鎖,這次就能成功拿到,然后 B 開始操作共享資源(修改車票庫存)。
1.2 常用API
-
ReentrantLock 實現了Lock接口規范,常見API如下:
方法 方法聲明 功能說明 lock
void lock()
獲取鎖,調用該方法當前線程會獲取鎖,當鎖獲得后,該方法返回 lockInterruptibly
void lockInterruptibly() throws InterruptedException
可中斷的獲取鎖,和 lock()
方法不同之處在于該方法會響應中斷,即在鎖的獲取中可以中斷當前線程tryLock
boolean tryLock()
嘗試非阻塞的獲取鎖,調用該方法后立即返回。如果能夠獲取到鎖返回 true
,否則返回false
tryLock
(帶超時)boolean tryLock(long time, TimeUnit unit) throws InterruptedException
超時獲取鎖,當前線程在以下三種情況下會返回:當前線程在超時時間內獲取了鎖;當前線程在超時時間內被中斷;超時時間結束,返回 false
unlock
void unlock()
釋放鎖 newCondition
Condition newCondition()
獲取等待通知組件,該組件和當前的鎖綁定,當前線程只有獲取了鎖,才能調用該組件的 await()
方法,而調用后,當前線程將釋放鎖 -
基本用法:
private final Lock lock = new ReentrantLock();public void foo() {// 獲取鎖lock.lock();try {// 程序執行邏輯} finally {// finally語句塊可以確保lock被正確釋放lock.unlock();} }// 嘗試獲取鎖,最多等待 100 毫秒 if (lock.tryLock(100, TimeUnit.MILLISECONDS)) { try { // 成功獲取到鎖,執行需要同步的代碼塊 // ... 執行一些操作 ... } finally { // 釋放鎖 lock.unlock(); } } else { // 超時后仍未獲取到鎖,執行備選邏輯 // ... 執行一些不需要同步的操作 ... }
-
在使用時要注意以下 4 個問題:
- 默認情況下 ReentrantLock 為非公平鎖而非公平鎖;
- 加鎖次數和釋放鎖次數一定要保持一致,否則會導致線程阻塞或程序異常;
- 加鎖操作一定要放在
try
代碼之前,這樣可以避免未加鎖成功又釋放鎖的異常; - 釋放鎖一定要放在
finally
中,否則會導致線程阻塞;
-
工作原理:
-
當有線程調用
lock
方法時,會用 CAS(Compare-And-Swap,比較并交換) 操作,嘗試把 AQS(AbstractQueuedSynchronizer,抽象隊列同步器,Java 并發包的核心基礎組件 )內部的state
變量從0
改成1
;state=0
表示“鎖沒人用”,CAS 成功 → 線程拿到鎖,開始執行臨界區代碼(操作共享資源);state=1
表示“鎖被占用”,CAS 失敗 → 線程搶鎖失敗,進入阻塞隊列(CLH 隊列,按 FIFO 排隊 ) 等待;
-
搶鎖失敗的線程,會被包裝成節點(Node),加入隊列尾部(tail),隊列頭部是
head
節點(代表 “即將拿到鎖的線程”);- 隊列里的線程,都在等鎖釋放,避免線程忙等(一直重試搶鎖,浪費 CPU 資源);
- 隊列是 FIFO(先進先出) 順序,理論上保證線程公平性,但實際還受“公平鎖 / 非公平鎖”策略影響;
-
當持有鎖的線程執行完
unlock()
,會把state
改回0
(釋放鎖),然后喚醒隊列里的線程。這時分兩種策略:-
公平鎖(
ReentrantLock(true)
):嚴格按隊列順序喚醒:釋放鎖后,優先喚醒head
節點的下一個節點(head.next
),讓隊列里“等最久”的線程拿到鎖;- 優點:絕對公平,避免線程 饑餓(某些線程一直搶不到鎖);
- 缺點:頻繁喚醒 / 切換線程,性能略低(線程上下文切換有開銷);
- 優點:絕對公平,避免線程 饑餓(某些線程一直搶不到鎖);
-
非公平鎖(默認策略,
ReentrantLock()
):釋放鎖后,不嚴格按隊列順序,允許新線程和隊列里被喚醒的線程重新用 CAS 搶鎖:-
新線程(沒進隊列的)可能直接 CAS 搶鎖成功(插隊),不用進隊列等;
-
隊列里的線程也會被喚醒,參與競爭;
-
優點:減少線程切換,吞吐量更高(適合競爭不激烈的場景);
-
缺點:可能讓隊列里的線程等更久,存在小概率線程饑餓;
-
-
-
1.3 使用
1.3.1 獨占鎖
-
模擬搶票場景。8張票,10個人搶,如果不加鎖,會出現什么問題?
/*** 模擬搶票場景*/ public class ReentrantLockDemo {// 創建 ReentrantLock 實例,默認使用非公平鎖策略private final ReentrantLock lock = new ReentrantLock();//默認非公平// 共享資源:總票數,會有多個線程同時操作這個變量private static int tickets = 8;/*** 購買車票的方法* 核心邏輯:通過加鎖保證同一時間只有一個線程能執行購票操作*/public void buyTicket() {// 1. 獲取鎖:調用 lock() 方法,當前線程會嘗試獲取鎖// 如果鎖未被占用,則當前線程獲得鎖并繼續執行// 如果鎖已被占用,則當前線程會進入阻塞隊列等待lock.lock(); // 獲取鎖// 2. try-finally 結構保證鎖一定會被釋放// 即使代碼執行過程中發生異常,finally 塊也會執行解鎖操作try {// 3. 臨界區:操作共享資源(tickets 變量)if (tickets > 0) { // 檢查是否還有剩余車票try {// 休眠 10ms,放大并發問題的可能性// 如果不加鎖,這里會出現多個線程同時進入判斷并扣減票數的情況Thread.sleep(10); // 模擬出并發效果} catch (InterruptedException e) {e.printStackTrace();}// 打印購票信息,并將票數減 1(原子操作)System.out.println(Thread.currentThread().getName() + "購買了第" + tickets-- + "張票");} else {// 票已售罄時的提示System.out.println("票已經賣完了," + Thread.currentThread().getName() + "搶票失敗");}} finally {// 4. 釋放鎖:無論是否發生異常,都必須釋放鎖// 否則會導致其他線程永遠無法獲取鎖,造成死鎖lock.unlock(); // 釋放鎖}}public static void main(String[] args) {// 創建搶票系統實例(共享同一個鎖和票數變量)ReentrantLockDemo ticketSystem = new ReentrantLockDemo();// 創建 10 個線程模擬 10 個用戶搶票(總票數只有 8 張)for (int i = 1; i <= 10; i++) {Thread thread = new Thread(() -> {// 每個線程執行搶票操作ticketSystem.buyTicket(); // 搶票}, "線程" + i); // 給線程命名,方便觀察輸出// 啟動線程,線程進入就緒狀態,等待 CPU 調度thread.start();}try {// 主線程休眠 3000ms,等待所有搶票線程執行完畢// 避免主線程提前打印剩余票數Thread.sleep(3000);} catch (InterruptedException e) {throw new RuntimeException(e);}// 打印最終剩余票數,驗證是否正確(應該為 0)System.out.println("剩余票數:" + tickets);} }
-
不加鎖:出現超賣問題
-
加鎖:正常,兩人搶票失敗
1.3.2 公平鎖和非公平鎖
-
ReentrantLock 支持公平鎖和非公平鎖兩種模式:
- 公平鎖:線程在獲取鎖時,按照線程等待的先后順序獲取鎖;
- 非公平鎖:線程在獲取鎖時,不按照等待的先后順序獲取鎖,而是隨機獲取鎖。ReentrantLock 默認是非公平鎖;
ReentrantLock lock = new ReentrantLock(); //參數默認false,不公平鎖 ReentrantLock lock = new ReentrantLock(true); //公平鎖
-
比如買票的時候就有可能出現插隊的場景,允許插隊就是非公平鎖,如下圖:
1.3.3 可重入鎖
-
可重入鎖又名遞歸鎖,是指在同一線程里,只要鎖對象相同,內層方法(或代碼塊)能直接復用已獲取的鎖,不用重新競爭。比如線程執行
方法A
時拿到鎖,方法A
里調用方法B
(也需要同一把鎖),線程進方法B
時不用等自己釋放鎖,直接繼續用; -
Java 中
ReentrantLock
和synchronized
都是可重入鎖synchronized
:隱式的(JVM 自動管加鎖 / 釋放)、可重入的內置鎖,只要是同一線程、同一對象鎖,內層同步代碼直接重入;ReentrantLock
:顯式的(手動lock()
加鎖、unlock()
釋放)可重入鎖,功能更靈活(支持公平 / 非公平、可中斷、超時獲取等),但得手動配對加鎖釋放,否則容易死鎖;
-
可重入鎖的一個優點是可一定程度避免死鎖:
- 要是鎖不可重入,同一線程內層方法需要鎖時,會因為自己占著鎖沒放,導致自己等自己(阻塞),最后死鎖。可重入鎖允許同一線程重復拿鎖,從設計上就避免了這種自己堵死自己的情況;
- 注意:只是一定程度避免,要是代碼邏輯亂(比如忘記釋放鎖、不同鎖交叉嵌套不當),還是可能死鎖,只是解決了同一線程重入鎖這類場景的死鎖風險;
-
應用場景:
- 遞歸操作:遞歸函數里加鎖,每次遞歸調用都是內層“方法”,可重入鎖讓線程不用反復競爭鎖,比如計算階乘時用
ReentrantLock
保護共享變量,遞歸調用時直接重入; - 調用同一類其他方法:類里多個
synchronized
方法,線程調完一個調另一個,因為是同一對象鎖,直接重入,不用額外處理; - 鎖嵌套:多層代碼塊都需要同一把鎖,外層加鎖后,內層嵌套的加鎖邏輯直接復用,不用釋放外層鎖再重新加;
- 遞歸操作:遞歸函數里加鎖,每次遞歸調用都是內層“方法”,可重入鎖讓線程不用反復競爭鎖,比如計算階乘時用
-
例:
class Counter {// 創建 ReentrantLock 對象,作為可重入鎖的實例// ReentrantLock 是顯式鎖,支持可重入、可中斷、公平/非公平等特性private final ReentrantLock lock = new ReentrantLock(); // 遞歸調用方法,演示可重入鎖的核心場景public void recursiveCall(int num) {// 1. 獲取鎖:同一線程再次調用時,可直接重入,不會阻塞自己// 可重入的關鍵體現:鎖對象識別當前持有線程,允許重復獲取lock.lock(); try {// 遞歸終止條件:num 減到 0 時停止if (num == 0) {return;}// 打印當前遞歸層級,證明方法執行(鎖已成功獲取)System.out.println("執行遞歸,num = " + num);// 2. 遞歸調用自身:再次進入方法時,會再次執行 lock.lock()// 由于是【同一線程】操作【同一把鎖】,可直接重入,不會阻塞recursiveCall(num - 1); } finally {// 3. 釋放鎖:遞歸調用多少次,就要釋放多少次// 保證鎖的獲取與釋放次數嚴格匹配,避免死鎖lock.unlock(); }}// 主方法:測試可重入鎖的遞歸場景public static void main(String[] args) throws InterruptedException {// 創建 Counter 實例,所有遞歸調用共享同一把鎖Counter counter = new Counter(); // 啟動遞歸測試:從 num=10 開始調用// 預期行為:線程安全執行遞歸,不會因鎖重入導致阻塞counter.recursiveCall(10); } }
1.3.4 Condition 詳解
-
Condition
是 Java 并發包里的線程協調工具,依賴Lock
(如ReentrantLock
)使用,比Object
的wait/notify
更靈活,解決線程間按條件等待 / 喚醒的問題。可以把它理解成:給Lock
搭配“專屬等待隊列”,讓線程能按需等待條件、精準喚醒,而不是像wait/notify
只能用 Object 的單一隊列; -
核心優勢(對比
Object.wait/notify
)-
多條件分離:
Object
里,一個對象只有 1 個等待隊列(所有wait()
的線程都擠在一起);Condition
讓一個Lock
可以有多個等待隊列(比如鎖lock
可以創建condition1
、condition2
,不同條件的線程進不同隊列),喚醒時能精準選隊列,避免喚醒無關線程;
-
更靈活的等待控制:
- 支持超時等待(
await(long time, TimeUnit unit)
),避免線程無限阻塞; - 喚醒時可選單個喚醒(
signal()
)或全部喚醒(signalAll()
),比notify()
(隨機喚醒一個)、notifyAll()
(喚醒全部)更精準;
- 支持超時等待(
-
-
核心方法解析
返回值類型 方法 作用說明 void
await()
讓當前線程進入等待,直到被 signal()
/signalAll()
喚醒、被中斷,或意外喚醒(如假喚醒) 等待前釋放當前持有的Lock
,喚醒后重新競爭獲取鎖,再繼續執行boolean
await(long time, TimeUnit unit)
限時等待:等待 time
時間后,若沒被喚醒就自動返回false
;被喚醒則返回true
同樣會先釋放鎖,超時 / 喚醒后重新搶鎖。void
signal()
喚醒 此 Condition
等待隊列中一個線程(選一個喚醒,類似notify()
但更可控) 喚醒后,線程不會直接執行,要重新競爭鎖void
signalAll()
喚醒 此 Condition
等待隊列中所有線程(類似notifyAll()
) 線程被喚醒后,重新競爭鎖,搶到鎖的繼續執行 -
比如生產者 - 消費者模型中,想區分“隊列滿了讓生產者等”和“隊列空了讓消費者等”:
-
用
ReentrantLock
加鎖,然后創建兩個Condition
:notFull
(生產者等)、notEmpty
(消費者等); -
生產者發現隊列滿了 → 調用
notFull.await()
等待;消費者取走數據后 → 調用notFull.signal()
喚醒生產者; -
消費者發現隊列空了 → 調用
notEmpty.await()
等待;生產者放入數據后 → 調用notEmpty.signal()
喚醒消費者; -
這樣就能 精準控制不同條件的線程等待 / 喚醒,比
wait/notify
更清晰。
-
1.3.5 結合 Condition 實現生產者消費者模式
案例:基于ReentrantLock
和Condition
實現一個簡單隊列
public class ReentrantLockDemo3 {public static void main(String[] args) {// 1. 創建容量為 5 的隊列,作為生產者和消費者共享的資源Queue queue = new Queue(5);// 2. 啟動生產者線程:傳入隊列,線程執行 Producer 的 run 方法new Thread(new Producer(queue)).start();// 3. 啟動消費者線程:傳入隊列,線程執行 Customer 的 run 方法new Thread(new Customer(queue)).start();}
}/*** 隊列封裝類:* 用 ReentrantLock + Condition 實現線程安全的生產者-消費者隊列* 核心邏輯:* - 隊列滿時,生產者通過 notFull.await() 等待;* - 隊列空時,消費者通過 notEmpty.await() 等待;* - 生產/消費后,用 signal() 喚醒對應等待線程*/
class Queue {private Object[] items; // 存儲隊列元素的數組int size = 0; // 當前隊列中元素數量int takeIndex = 0; // 消費者取元素的索引int putIndex = 0; // 生產者放元素的索引private ReentrantLock lock; // 控制并發的鎖public Condition notEmpty; // 消費者等待條件:隊列空時阻塞,生產后喚醒public Condition notFull; // 生產者等待條件:隊列滿時阻塞,消費后喚醒// 初始化隊列,指定容量public Queue(int capacity) {this.items = new Object[capacity];lock = new ReentrantLock();// 為同一把鎖創建兩個 Condition,分別控制“空”和“滿”的等待notEmpty = lock.newCondition();notFull = lock.newCondition();}/*** 生產者放入元素的方法* 必須在 lock 保護下調用,保證線程安全*/public void put(Object value) throws Exception {// 加鎖:同一時間只有一個線程能操作隊列lock.lock();try {// 隊列滿了(size == 數組長度),讓生產者等待// 用 while 而非 if:防止“假喚醒”后直接執行(需再次檢查條件)while (size == items.length) notFull.await(); // 釋放鎖,進入等待隊列,直到被喚醒// 隊列有空位,放入元素items[putIndex] = value;// 索引循環:如果放到數組末尾,重置為 0if (++putIndex == items.length) putIndex = 0;size++; // 元素數量+1// 生產完成,喚醒等待的消費者(隊列非空了)notEmpty.signal(); } finally {// 測試用:打印生產日志(實際可刪除或放業務邏輯里)System.out.println("producer生產:" + value);// 必須釋放鎖:無論是否異常,保證鎖能被其他線程獲取lock.unlock();}}/*** 消費者取出元素的方法* 必須在 lock 保護下調用,保證線程安全*/public Object take() throws Exception {// 加鎖:同一時間只有一個線程能操作隊列lock.lock();try {// 隊列空了(size == 0),讓消費者等待// 用 while 而非 if:防止“假喚醒”后直接執行(需再次檢查條件)while (size == 0) notEmpty.await(); // 釋放鎖,進入等待隊列,直到被喚醒// 取出元素Object value = items[takeIndex];items[takeIndex] = null; // 清空位置,避免內存泄漏// 索引循環:如果取到數組末尾,重置為 0if (++takeIndex == items.length) takeIndex = 0;size--; // 元素數量-1// 消費完成,喚醒等待的生產者(隊列非滿了)notFull.signal(); return value; // 返回取出的元素} finally {// 釋放鎖:無論是否異常,保證鎖能被其他線程獲取lock.unlock();}}
}/*** 生產者線程:* 每隔 1 秒生產一個隨機數(0~999),放入隊列*/
class Producer implements Runnable {private Queue queue; // 共享的隊列public Producer(Queue queue) {this.queue = queue;}@Overridepublic void run() {try {while (true) { // 無限循環生產Thread.sleep(1000); // 每隔 1 秒生產一次// 生產隨機數,放入隊列queue.put(new Random().nextInt(1000));}} catch (Exception e) {e.printStackTrace(); // 捕獲并打印異常}}
}/*** 消費者線程:* 每隔 2 秒從隊列取出一個元素,打印消費日志*/
class Customer implements Runnable {private Queue queue; // 共享的隊列public Customer(Queue queue) {this.queue = queue;}@Overridepublic void run() {try {while (true) { // 無限循環消費Thread.sleep(2000); // 每隔 2 秒消費一次// 取出元素并打印消費日志System.out.println("consumer消費:" + queue.take());}} catch (Exception e) {e.printStackTrace(); // 捕獲并打印異常}}
}
-
Condition
的作用:notEmpty
:消費者專屬等待條件。隊列空時,消費者調用notEmpty.await()
釋放鎖并阻塞;生產者放入元素后,用notEmpty.signal()
喚醒。notFull
:生產者專屬等待條件。隊列滿時,生產者調用notFull.await()
釋放鎖并阻塞;消費者取出元素后,用notFull.signal()
喚醒;- 注意:
await()
和signal()
都必須被lock.lock()
和lock.unlock()
包裹,即都在 lock 保護范圍內;
-
為什么用
while
而非if
檢查條件?防止**“假喚醒”**:線程可能在未被signal()
的情況下醒來(如系統調度)。用while
會重新檢查條件,確保隊列狀態符合預期后再繼續執行; -
鎖的配對使用:
lock.lock()
和lock.unlock()
必須成對出現,且unlock()
放在finally
中,保證無論是否發生異常,鎖都會釋放,避免死鎖; -
生產者-消費者的節奏:生產者 1 秒生產一次,消費者 2 秒消費一次 → 隊列會逐漸被填滿(生產者更快),但通過
Condition
協調,不會出現“隊列滿了還生產”或“隊列空了還消費”的情況。
1.4 應用場景總結
-
ReentrantLock
最基本的作用是多線程環境下,讓共享資源只能被一個線程獨占訪問,保證操作共享資源時數據不會亂(比如多個線程同時改同一個變量,用鎖讓它們排隊改); -
應用場景總結:
-
解決多線程競爭資源的問題
-
場景描述:多個線程搶同一資源(比如同時寫同一個數據庫、操作同一個文件、修改同一個內存變量),需要保證同一時間只有一個線程能改,避免數據沖突;
-
例子:多個線程同時往數據庫同一表插入/修改數據,用
ReentrantLock
加鎖,讓線程排隊執行寫操作,保證數據最終是正確的,不會因為并發寫入導致數據混亂(比如庫存扣減、訂單狀態修改);
-
-
實現多線程任務的順序執行
-
場景描述:希望線程 A 執行完某段邏輯后,線程 B 再執行;或者多個線程嚴格按特定順序跑任務;
-
例子:比如線程 1 先初始化配置,線程 2 再加載數據,線程 3 最后處理業務。用
ReentrantLock
配合Condition
(條件變量 ),線程 2 等線程 1 釋放鎖并發信號后再執行,線程 3 等線程 2 發信號后執行,實現順序控制;
-
-
實現多線程等待/通知機制
-
場景描述:線程 A 完成某個關鍵步驟后,需要主動通知線程 B、C 可以繼續執行了;或者線程需要等待某個條件滿足后再干活(類似生產者 - 消費者模型);
-
例子:生產者線程生產完數據,通過
ReentrantLock
的Condition
發信號,喚醒等待的消費者線程來處理數據;反之,消費者處理完,也能發信號讓生產者繼續生產。這比Object
的wait/notify
更靈活,能精準控制哪些線程被喚醒。
-
-
2 Semaphore(信號量)
2.1 簡介
-
Semaphore
是多線程同步工具,核心解決控制同時訪問共享資源的線程數量,讓有限的資源(比如數據庫連接、文件句柄)在同一時間被合理數量的線程使用,避免因資源耗盡導致系統崩潰; -
工作原理:使用
Semaphore
的過程實際上是多個線程獲取訪問共享資源許可證的過程-
Semaphore
內部維護一個計數器,可以把它理解成剩余許可證數量。比如設置Semaphore(3)
,就代表最多允許 3 個線程同時用資源,相當于發 3 張“訪問許可證”; -
線程獲取許可證(
acquire()
):- 線程要訪問共享資源時,必須先調用
acquire()
拿許可證; - 如果計數器 > 0(還有許可證):線程拿到許可證,計數器減 1,然后繼續執行(訪問資源);
- 如果計數器 == 0(沒許可證了):線程會被阻塞,直到有其他線程釋放許可證;
- 線程要訪問共享資源時,必須先調用
-
線程釋放許可證(
release()
):線程用完資源后,調用release()
歸還許可證,計數器加 1,這樣阻塞的線程里就有機會拿到新的許可證,繼續執行;
-
-
Semaphore
專門用來限制資源的并發訪問數量,典型場景比如:-
數據庫連接池:假設連接池只有 10 個連接,用
Semaphore(10)
控制,避免幾百個線程同時搶連接,把數據庫壓垮; -
文件訪問:如果一個文件同一時間只能被 3 個線程讀寫,用
Semaphore(3)
限制,防止文件被瘋狂讀寫導致錯誤; -
網絡請求:控制同時發起的 HTTP 請求數量,避免把服務器或本地網絡打滿,保證系統穩定。
-
2.2 常用API
2.2.1 構造器
-
構造器是用來創建
Semaphore
對象的,主要決定兩件事:- 允許同時訪問資源的 最大線程數(許可證數量);
- 線程獲取許可證時,是用 公平策略 還是 非公平策略;
-
Semaphore
有兩個構造器:public Semaphore(int permits) {sync = new NonfairSync(permits); }
-
permits
:設置最大并發數(比如傳3
,就代表最多允許 3 個線程同時拿許可證); -
NonfairSync
:默認用非公平策略。意思是,當許可證釋放時,新線程和等待隊列里的線程一起搶許可證,新線程可能“插隊”,不用嚴格排隊; -
等價寫法:
new Semaphore(3)
等價于new Semaphore(3, false)
,因為默認是非公平的;
public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
-
permits
:同上,設置最大并發數; -
fair
:布爾值,決定用公平還是非公平策略:fair = true
:用公平策略(FairSync
),線程嚴格按“等待隊列順序”拿許可證,先等的線程一定先拿到,不會被插隊;fair = false
:用非公平策略(NonfairSync
),新線程和等待線程一起搶,可能插隊。
-
2.2.2 acquire
方法
-
acquire
是Semaphore
用于獲取許可證的核心方法,特點是獲取不到許可證時,線程會一直阻塞等待,直到拿到許可證或者被中斷。它有兩種重載形式,適配不同的許可證獲取需求;-
void acquire() throws InterruptedException
-
嘗試獲取 1 個許可證。如果當前有可用許可證,直接獲取(許可證計數減 1)并返回;如果沒有可用許可證,當前線程會進入阻塞狀態,直到:
- 其他線程釋放許可證(使當前線程有可用許可證),此時當前線程會競爭獲取許可證;
- 當前線程被其他線程中斷(觸發
InterruptedException
);
-
類比:類似去銀行辦事,只有 1 個窗口(許可證 = 1),當前窗口有人(沒許可證),你就只能排隊等著,直到窗口空出來(有人辦完業務,釋放許可證);
-
-
void acquire(int permits) throws InterruptedException
-
功能:嘗試獲取指定數量(
permits
)的許可證。如果Semaphore
中剩余許可證數量 ≥permits
,直接獲取(許可證計數減permits
)并返回;否則,線程進入阻塞,直到:- 其他線程釋放許可證,使剩余許可證 ≥
permits
,當前線程競爭獲取; - 當前線程被中斷(觸發
InterruptedException
);
- 其他線程釋放許可證,使剩余許可證 ≥
-
注意:使用該方法時,要確保最終能釋放對應數量的許可證,否則容易導致其他線程長期無法獲取足夠許可證而阻塞,引發“資源饑餓”問題;
-
類比:如果銀行辦理大額業務需要占 2 個窗口(
permits = 2
),只有當至少空出 2 個窗口時,你才能開始辦理,否則就得等;
-
-
-
例:主線程先占許可證,子線程等待,主線程釋放后子線程再獲取
// 1. 創建 Semaphore:許可證數量=1,公平策略(true 表示嚴格按線程等待順序分配許可證) final Semaphore semaphore = new Semaphore(1, true); // 2. 主線程直接搶許可證:因為初始許可證是 1,主線程能直接拿到(許可證計數變為 0) semaphore.acquire(); // 3. 創建子線程,嘗試獲取許可證 Thread t = new Thread(() -> { try {// 子線程執行到這,嘗試獲取許可證:但主線程已占用(許可證計數 0),所以子線程進入阻塞等待System.out.println("子線程等待獲取permit"); semaphore.acquire(); // 4. 子線程被喚醒(主線程釋放許可證后),執行到這,打印獲取成功System.out.println("子線程獲取到permit"); } catch (InterruptedException e) { // 子線程等待中被中斷時,會走到這e.printStackTrace(); } finally {// 5. 子線程執行完,釋放許可證(許可證計數 +1 )semaphore.release(); } }); t.start(); // 啟動子線程try {// 6. 主線程休眠 5 秒:模擬做其他事情,期間子線程一直阻塞等待許可證TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) {e.printStackTrace(); }// 7. 主線程釋放許可證(許可證計數從 0 變為 1 ),子線程此時會被喚醒,競爭獲取許可證 System.out.println("主線程釋放permit"); semaphore.release();
- 初始化:
Semaphore
許可證數量1
,公平策略; - 主線程搶許可證:
semaphore.acquire()
執行后,許可證計數0
,主線程持有許可證; - 子線程啟動:執行
semaphore.acquire()
時,因許可證0
,子線程進入阻塞,打印子線程等待獲取permit
; - 主線程休眠:5 秒內,子線程一直阻塞;
- 主線程釋放許可證:
semaphore.release()
執行,許可證計數1
;因為是公平策略,阻塞的子線程被喚醒,競爭拿到許可證,執行后續邏輯,打印子線程獲取到permit
; - 子線程釋放許可證:子線程執行
semaphore.release()
,許可證計數0
(子線程獲取時減 1,釋放時加 1,整體回到初始邏輯)。
- 初始化:
2.2.3 tryAcquire
方法
-
tryAcquire
是Semaphore
用于嘗試獲取許可證的方法,特點是:-
非阻塞優先:如果拿不到許可證,不會一直阻塞,而是直接返回
false
(表示沒拿到); -
靈活控制:支持獲取 1 個許可證、獲取指定數量許可證、帶超時等待等場景,比
acquire
更靈活;
-
-
tryAcquire
方法有三種重載形式-
boolean tryAcquire()
:嘗試獲取 1 個許可證。如果當前有可用許可證,直接獲取(許可證計數 -1),返回true
;否則,直接返回false
(不阻塞); -
boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException
:嘗試獲取 1 個許可證,但增加超時等待機制。如果一開始沒許可證,線程會阻塞最多timeout
時間:- 期間有許可證釋放,線程拿到許可證,返回
true
; - 超時后還沒許可證,返回
false
; - 等待中被中斷,拋出
InterruptedException
;
- 期間有許可證釋放,線程拿到許可證,返回
-
boolean tryAcquire(int permits)
:嘗試獲取 指定數量(permits
)的許可證。如果Semaphore
剩余許可證 ≥permits
,直接獲取(計數 -permits
),返回true
;否則,返回false
(不阻塞);- 注意:要確保最終釋放對應數量的許可證,否則會導致其他線程無法獲取足夠許可證;
-
boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException
:結合指定數量和超時等待,嘗試獲取permits
個許可證,最多等timeout
時間,邏輯和上面類似;
-
-
例:
// 1. 創建 Semaphore:1 個許可證,公平策略 final Semaphore semaphore = new Semaphore(1, true); // 2. 啟動線程 t,嘗試獲取許可證 new Thread(() -> { // 2.1 嘗試獲取 1 個許可證:返回 true/falseboolean gotPermit = semaphore.tryAcquire(); // 2.2 如果拿到許可證if (gotPermit) { try {System.out.println(Thread.currentThread() + " 拿到許可證");TimeUnit.SECONDS.sleep(5); // 模擬占用 5 秒} catch (InterruptedException e) {e.printStackTrace();} finally {semaphore.release(); // 釋放許可證(必須!)}} }).start();// 3. 主線程休眠 1 秒:確保線程 t 啟動并拿到許可證 TimeUnit.SECONDS.sleep(1); // 4. 主線程嘗試“帶超時的獲取”:最多等 3 秒 if (semaphore.tryAcquire(3, TimeUnit.SECONDS)) { System.out.println("主線程拿到許可證"); } else {System.out.println("主線程 3 秒內沒拿到許可證,失敗"); }
- 線程 t 啟動:
tryAcquire()
拿到許可證(gotPermit = true
),打印日志,休眠 5 秒; - 主線程休眠 1 秒:等線程 t 啟動并占用許可證;
- 主線程嘗試獲取:調用
tryAcquire(3, ...)
,但線程 t 會占用許可證 5 秒,所以主線程等 3 秒后超時,進入else
,打印主線程 3 秒內沒拿到許可證,失敗
;
- 線程 t 啟動:
-
例:
// 1. 創建 Semaphore:5 個許可證,公平策略 final Semaphore semaphore = new Semaphore(5, true); // 2. 嘗試獲取 5 個許可證:成功(因為初始有 5 個) assert semaphore.tryAcquire(5) : "獲取 5 個許可證失敗"; // 3. 此時許可證已耗盡(5 - 5 = 0 ),嘗試獲取 1 個許可證:失敗 assert !semaphore.tryAcquire() : "獲取 1 個許可證失敗";
-
tryAcquire(5)
一次性拿 5 個許可證,成功后Semaphore
剩余 0 個; -
后續
tryAcquire()
(默認拿 1 個)返回false
,因為沒許可證了。
-
2.2.4 正確使用release
-
Semaphore
的release
是用來 歸還許可證 的,讓其他線程有機會獲取。它有兩種形式:-
release()
:歸還 1 個許可證,內部計數器 +1; -
release(int permits)
:歸還permits
個許可證,內部計數器 +permits
; -
關鍵點:許可證數量有限,必須誰拿的誰還,否則會導致計數器混亂,破壞
Semaphore
控制并發的邏輯;
-
-
錯誤用法示例(用
finally
無腦釋放)// 1. 創建 1 個許可證的 Semaphore(公平策略) final Semaphore semaphore = new Semaphore(1, true); // 2. 線程 t1:拿到許可證后,霸占 1 小時(休眠) Thread t1 = new Thread(() -> { try {semaphore.acquire(); // 拿許可證System.out.println("t1 拿到許可證");TimeUnit.HOURS.sleep(1); // 霸占 1 小時} catch (InterruptedException e) {System.out.println("t1 被中斷");} finally {semaphore.release(); // 不管怎樣,finally 里釋放} }); t1.start(); TimeUnit.SECONDS.sleep(1); // 等 t1 啟動// 3. 線程 t2:嘗試拿許可證,但若被中斷,會在 finally 里錯誤釋放 Thread t2 = new Thread(() -> { try {semaphore.acquire(); // 嘗試拿許可證(但 t1 沒釋放,所以會阻塞)System.out.println("t2 拿到許可證");} catch (InterruptedException e) {System.out.println("t2 被中斷");} finally {semaphore.release(); // 問題:t2 沒拿到許可證,卻釋放了!} }); t2.start(); TimeUnit.SECONDS.sleep(2); // 4. 主線程邏輯:中斷 t2,然后自己拿許可證 t2.interrupt(); // 中斷 t2(此時 t2 還在阻塞等許可證) semaphore.acquire(); // 主線程嘗試拿許可證 System.out.println("主線程拿到許可證");
-
t2
根本沒拿到許可證(因為t1
霸占著),但由于release
寫在finally
里,t2
被中斷時,會錯誤地歸還 1 個許可證(相當于無中生有多了 1 個許可證); -
原本
Semaphore
只有 1 個許可證,被t1
占用后,計數器是 0。但t2
錯誤釋放后,計數器變成 1,導致主線程能直接拿到許可證(而預期中t1
要 1 小時后才釋放,主線程不該拿到);
-
-
修改后的
t2
邏輯:Thread t2 = new Thread(() -> { boolean acquired = false; // 標記是否成功拿到許可證try {semaphore.acquire(); // 嘗試拿許可證acquired = true; // 拿到了,標記為 trueSystem.out.println("t2 拿到許可證");} catch (InterruptedException e) {System.out.println("t2 被中斷");} finally {// 只有成功拿到許可證(acquired=true),才釋放if (acquired) { semaphore.release(); }} });
- 用
acquired
標記是否真的拿到許可證,只有拿到許可證的線程,才在finally
里釋放,避免沒拿到卻釋放的問題;
- 用
-
Semaphore
的設計里,不強制檢查釋放許可證的線程是否真的拿過,而是靠開發者自己保證。官方文檔說明:“沒有要求釋放許可證的線程必須是通過acquire
拿到許可證的,正確用法由開發者通過編程規范保證。”
2.3 使用
2.3.1 Semaphore
實現商品服務接口限流
-
Semaphore
可以用于實現限流功能,即限制某個操作或資源在一定時間內的訪問次數; -
代碼:限制同一時間,最多有
N
個線程能訪問接口(比如下面代碼中的N=2
),超過的請求要么排隊,要么直接拒絕,保證服務穩定;@Slf4j public class SemaphoreDemo {/*** 同一時刻最多只允許有兩個并發* 即許可證數量=2 → 同一時間最多允許 2 個線程訪問*/private static Semaphore semaphore = new Semaphore(2);// 創建線程池,最多 10 個線程(模擬大量請求)private static Executor executor = Executors.newFixedThreadPool(10);public static void main(String[] args) {// 循環 10 次,模擬 10 個請求for(int i = 0; i < 10; i++){ // 提交任務到線程池,執行 getProductInfo() 或 getProductInfo2()executor.execute(() -> getProductInfo());}}// 阻塞式限流public static String getProductInfo() {// 1. 嘗試獲取許可證:拿不到就阻塞,直到有許可證try {semaphore.acquire();log.info("請求服務"); // 拿到許可證,執行邏輯Thread.sleep(2000); // 模擬接口執行耗時(2 秒)} catch (InterruptedException e) {throw new RuntimeException(e);}finally {// 2. 釋放許可證:不管是否異常,必須釋放,讓其他線程能用semaphore.release();}return "返回商品詳情信息";}// 非阻塞式限流public static String getProductInfo2() {// 1. 嘗試獲取許可證:拿不到直接返回 false,不阻塞if(!semaphore.tryAcquire()){log.error("請求被流控了"); // 沒拿到許可證,直接拒絕return "請求被流控了";}try {log.info("請求服務"); // 拿到許可證,執行邏輯Thread.sleep(2000); // 模擬接口執行耗時(2 秒)} catch (InterruptedException e) {throw new RuntimeException(e);}finally {// 2. 釋放許可證:必須釋放semaphore.release();}return "返回商品詳情信息";} }
-
假設運行
getProductInfo()
:-
前 2 個線程能拿到許可證,執行
log.info("請求服務")
,然后 sleep 2 秒。 -
第 3~10 個線程調用
acquire()
時,因為許可證被占滿,會阻塞等待。 -
2 秒后,前 2 個線程
release()
歸還許可證,阻塞的線程開始競爭,每次放 2 個執行,直到所有請求處理完。
-
-
如果運行
getProductInfo2()
:前 2 個線程拿到許可證,執行邏輯;第 3 個線程tryAcquire()
返回false
,直接走限流邏輯(log.error
)。
2.3.2 Semaphore
限制同時在線的用戶數量
-
模擬一個登錄系統,最多限制給定數量的人員同時在線,如果所能申請的許可證不足,那么將告訴用戶無法登錄,請稍后重試;
-
主類
SemaphoreDemo7
(模擬多用戶登錄):public class SemaphoreDemo7 {public static void main(String[] args) {// 最多允許 10 個用戶同時在線final int MAX_PERMIT_LOGIN_ACCOUNT = 10; LoginService loginService = new LoginService(MAX_PERMIT_LOGIN_ACCOUNT);// 啟動 20 個線程(模擬 20 個用戶登錄)IntStream.range(0, 20).forEach(i -> new Thread(() -> {// 執行登錄boolean login = loginService.login(); if (!login) {// 登錄失敗(超過并發數)System.out.println(Thread.currentThread() + " 因超過最大在線數被拒絕");return;}try {simulateWork(); // 模擬登錄后的業務操作} finally {loginService.logout(); // 退出時釋放許可證}}, "User-" + i).start());}// 模擬登錄后的業務操作(隨機休眠,模擬用戶在線時長)private static void simulateWork() {try {TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10));} catch (InterruptedException e) { /* 忽略中斷 */ }} }
-
創建
LoginService
,傳入最大在線數10
; -
啟動 20 個線程(用戶),調用
loginService.login()
嘗試登錄; -
登錄成功 → 執行
simulateWork()
(模擬用戶在線操作,隨機休眠 )→ 退出時logout()
釋放許可證; -
登錄失敗 → 直接提示并返回;
-
-
LoginService
類(控制登錄邏輯)private static class LoginService {private final Semaphore semaphore; public LoginService(int maxPermitLoginAccount) {// 創建 Semaphore:許可證數量=maxPermitLoginAccount,公平策略(true)this.semaphore = new Semaphore(maxPermitLoginAccount, true); }public boolean login() {// 嘗試獲取許可證:非阻塞,拿不到直接返回 falseboolean success = semaphore.tryAcquire(); if (success) {System.out.println(Thread.currentThread() + " 登錄成功");}return success;}public void logout() {// 釋放許可證:登錄成功的用戶退出時,歸還許可證semaphore.release(); System.out.println(Thread.currentThread() + " 退出成功");} }
-
Semaphore
許可證數量 = 最大在線用戶數(10
),保證同一時間最多 10 個線程(用戶)能拿到許可證;login()
用tryAcquire()
嘗試拿許可證:- 拿到 → 返回
true
(登錄成功); - 拿不到 → 返回
false
(登錄失敗,超過并發);
- 拿到 → 返回
-
logout()
用release()
釋放許可證,讓其他用戶可登錄;
-
-
運行效果:
-
登錄成功:前 10 個線程(用戶)能拿到許可證,打印
登錄成功
,執行simulateWork()
隨機休眠; -
登錄失敗:后 10 個線程調用
tryAcquire()
時,許可證已耗盡,返回false
,打印因超過最大在線數被拒絕
; -
用戶退出:休眠結束后,線程執行
logout()
釋放許可證,Semaphore
計數器 +1,后續新線程(或之前阻塞的線程)有機會拿到許可證登錄;
-
-
如果把
login
里的tryAcquire()
換成acquire()
(阻塞式獲取 ):public boolean login() {try {// 阻塞式獲取:拿不到就一直等,直到有許可證semaphore.acquire(); System.out.println(Thread.currentThread() + " 登錄成功");return true;} catch (InterruptedException e) {// 被中斷時返回登錄失敗return false; } }
- 效果:超過并發數的用戶不會直接失敗,而是阻塞等待,直到有用戶退出釋放許可證,再繼續登錄。
2.4 應用場景總結
-
Semaphore
(信號量)是高并發工具,核心能力是控制同時訪問共享資源的線程數量,讓有限的資源(比如連接池、文件句柄)在高并發下被合理使用,避免系統被壓垮; -
應用場景總結
-
限流(流量控制):系統的某個資源(比如接口、數據庫連接)能承受的并發量有限,需要限制同時訪問的線程數,防止資源被打滿導致系統崩潰;
- 接口限流:比如商品詳情接口,最多允許 100 個線程同時訪問,用
Semaphore(100)
控制,超過的請求排隊或拒絕; - 數據庫連接限流:數據庫連接池有 20 個連接,用
Semaphore(20)
控制,避免幾千個線程同時搶連接,把數據庫壓垮; - 側重于控制并發訪問量,保護資源不被壓垮,比如接口、網關層的流量控制;
- 接口限流:比如商品詳情接口,最多允許 100 個線程同時訪問,用
-
資源池(維護有限資源):系統有一組有限資源(比如數據庫連接、文件句柄、網絡端口),需要讓線程按需借用、用完歸還,保證資源被合理復用;
- 數據庫連接池:初始化
Semaphore(連接數)
,線程需要連接時acquire()
拿許可證(同時從池里取連接),用完后release()
釋放許可證(同時把連接還回池); - 文件訪問池:如果有 5 個文件句柄,用
Semaphore(5)
控制,線程訪問文件時拿許可證,訪問完歸還,保證同一時間最多 5 個線程操作文件; - 側重于管理有限資源的借用/歸還,保證資源復用,比如連接池、句柄池的資源調度;
- 數據庫連接池:初始化
-
-
但本質都是用
Semaphore
的許可證數量,限制同時使用資源的線程數。
3 CountDownLatch(閉鎖)
3.1 簡介
-
CountDownLatch
是多線程同步工具,解決的問題是:讓一個或多個線程等待其他多個任務全部完成后,再繼續執行。比如:- 主線程要等 10 個子線程都跑完初始化任務,才開始處理業務;
- 或者多個線程要等某個“總開關”任務完成,再一起執行;
-
工作流程:
-
初始化計數器:
CountDownLatch latch = new CountDownLatch(N);
,這里的N
是需要等待的任務數量(比如有 3 個子線程要執行,N=3
); -
等待線程:
latch.await();
,調用await()
的線程(比如主線程)會阻塞等待,直到N
減到 0; -
任務線程計數減 1:每個子任務線程執行完自己的邏輯后,調用
latch.countDown();
,讓計數器N-1
; -
計數器歸 0,等待線程喚醒:當所有子任務線程都調用
countDown()
,N
變成 0 ,之前阻塞的線程(await()
的線程)會被喚醒,繼續執行;
TA
:等待線程、T1/T2/T3
:任務線程cnt = 3
:對應CountDownLatch latch = new CountDownLatch(3);
,表示需要等待 3 個任務完成;過程:
-
線程
TA
調用await()
:TA
執行到latch.await()
時,會檢查計數器cnt
:此時cnt=3≠0
,所以TA
進入阻塞狀態(awaiting...
),暫停執行; -
任務線程
T1
完成:T1
執行latch.countDown()
→ 計數器cnt
從3→2
。此時cnt≠0
,TA
仍阻塞; -
任務線程
T2
完成:T2
執行latch.countDown()
→ 計數器cnt
從2→1
。此時cnt≠0
,TA
仍阻塞; -
任務線程
T3
完成:T3
執行latch.countDown()
→ 計數器cnt
從1→0
; -
當
cnt=0
時,CountDownLatch
會喚醒所有等待的線程(這里是TA
):TA
從阻塞狀態恢復(resumed
),繼續執行后續邏輯;
-
-
關鍵特性:
-
一次性:計數器
N
減到 0 后,就不能再重置或復用,只能用一次; -
多線程等待:可以有多個線程調用
await()
,一起等待N
歸 0 后被喚醒; -
任務結束的寬泛性:子任務結束包括正常跑完或者拋異常終止,只要調用
countDown()
,就會讓計數器減 1;
-
-
典型場景:
- 并行任務匯總:比如計算一個大數組的和,拆成 10 個子數組并行計算,主線程等 10 個子線程都算完,再匯總結果;
- 系統啟動初始化:系統啟動時,需要初始化 5 個服務(比如緩存、數據庫連接、配置加載),主線程等 5 個服務都初始化完,再對外提供服務;
- 測試多線程并發:測試時,讓 100 個線程等信號,信號發出(
countDown()
)后一起執行,模擬高并發場景。
3.2 常用API
3.2.1 構造器
public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);
}
CountDownLatch
構造時,count
必須 ≥0,否則拋IllegalArgumentException
;count
減到 0 后,無法重置,CountDownLatch
只能用一次。
3.2.2 常用方法
-
總覽:
// 1. await():調用的線程會阻塞,直到 count 減到 0 public void await() throws InterruptedException {}; // 2. await(long timeout, TimeUnit unit):阻塞等待,但最多等 timeout 時間; // 若超時后 count 仍≠0,不再等待,返回 false public boolean await(long timeout, TimeUnit unit) throws InterruptedException {}; // 3. countDown():讓 count 減 1,直到 count=0 時喚醒所有等待的線程 public void countDown() {};
-
await()
:調用線程進入阻塞狀態,直到countDownLatch
的count
減到 0;- 如果等待中被其他線程中斷,會拋出
InterruptedException
; count=0
時,調用await()
會立即返回,不阻塞;
- 如果等待中被其他線程中斷,會拋出
-
await(long timeout, TimeUnit unit)
:阻塞等待,但增加了超時退出機制;- 返回值為
true
→ 等待中count
減到 0(正常喚醒); - 返回值為
false
→ 超時后count
仍≠0(放棄等待);
- 返回值為
-
countDown()
:讓count
減 1。當count
從1→0
時,會喚醒所有等待的線程(await()
的線程);count
已經是 0 時,調用countDown()
會被忽略(count
最小為 0);- 只有
count
從1→0
時,才會觸發喚醒;count
從3→2
這類變化,不會喚醒線程;
-
例:
// 1. 初始化:count=2 → 需要 2 次 countDown() 才會喚醒 await() 的線程 CountDownLatch latch = new CountDownLatch(2); // 2. 第一次 countDown() → count=2→1 latch.countDown(); // 3. 第二次 countDown() → count=1→0 → 喚醒所有 await() 的線程 latch.countDown(); // 4. 第三次 countDown() → count 已經是 0,調用被忽略 latch.countDown(); // 5. count=0,調用 await() 直接返回,不阻塞 latch.await();
3.3 使用
3.3.1 多任務完成后合并匯總
-
開發中常見多個任務并行執行,必須等所有任務完成后,再統一處理結果 的需求:
-
比如“數據詳情頁”需要同時調用 5 個接口(并行),等所有接口返回數據后,再合并結果展示;
-
或者“多個數據操作完成后,統一做校驗(check)”;
-
-
代碼:
public class CountDownLatchDemo2 {public static void main(String[] args) throws Exception {// 1. 初始化 CountDownLatch:需要等待 5 個任務完成CountDownLatch countDownLatch = new CountDownLatch(5); // 2. 啟動 5 個線程(模擬 5 個并行任務)for (int i = 0; i < 5; i++) { final int index = i;new Thread(() -> {try {// 模擬任務執行耗時(1~3 秒隨機)Thread.sleep(1000 + ThreadLocalRandom.current().nextInt(2000)); System.out.println("任務 " + index + " 執行完成");// 3. 任務完成,計數器減 1(countDownLatch.countDown())countDownLatch.countDown(); } catch (InterruptedException e) {e.printStackTrace();}}).start();}// 4. 主線程阻塞等待:直到 5 個任務都完成(count=0)countDownLatch.await(); // 5. 所有任務完成后,主線程執行匯總邏輯System.out.println("主線程:在所有任務運行完成后,進行結果匯總"); } }
-
初始化
CountDownLatch
:new CountDownLatch(5)
→ 表示需要等待 5 個任務完成(計數器初始值5
); -
啟動并行任務:循環創建 5 個線程,模擬 5 個并行任務。每個線程:
-
Thread.sleep(...)
:模擬任務執行耗時(隨機 1~3 秒); -
countDownLatch.countDown()
:任務完成后,計數器減 1(5→4→3→2→1→0
);
-
-
主線程等待:
countDownLatch.await()
→ 主線程阻塞,直到計數器減到0
(所有任務完成); -
匯總結果:計數器歸
0
后,主線程被喚醒,執行System.out.println(...)
做結果匯總;
-
-
運行效果
-
5 個任務線程會隨機順序完成(因為
sleep
時間隨機),比如:任務 2 執行完成 任務 0 執行完成 任務 4 執行完成 任務 1 執行完成 任務 3 執行完成
-
主線程必須等所有任務打印完,才會輸出:
主線程:在所有任務運行完成后,進行結果匯總
-
-
核心價值
-
并行效率:5 個任務并行執行,不用等待前一個任務完成再執行下一個,節省時間;
-
同步控制:主線程通過
CountDownLatch
精準等待所有任務完成,保證匯總邏輯在所有數據就緒后執行。
-
3.3.2 電商場景中的應用——等待所有子任務結束
-
需求:根據商品品類 ID,獲取 10 個商品,并行計算每個商品的最終價格(需調用 ERP、CRM 等系統,計算復雜、耗時),最后匯總所有價格返回;
ERP系統:ERP是企業資源計劃系統,它整合企業內部各部門的核心業務流程,如財務、采購、生產、銷售和人力資源等,以實現數據共享和資源優化;
CRM系統:CRM是客戶關系管理系統,它專注于管理公司與當前及潛在客戶的交互和業務往來,旨在改善客戶服務、提升銷售效率并維護客戶關系;
-
串行問題:如果一個一個計算(串行),總耗時 = 獲取商品時間 + 10×單個商品計算時間,商品越多越慢;
-
并行優化:用多線程并行計算商品價格,總耗時 = 獲取商品時間 + 最長單個商品計算時間,效率更高;
-
-
代碼:工具方法 & 數據類
// 根據品類 ID 獲取商品 ID 列表(模擬返回 1~10 號商品) private static int[] getProductsByCategoryId() {return IntStream.rangeClosed(1, 10).toArray(); }// 商品價格數據類:存儲商品 ID 和計算后的價格 private static class ProductPrice {private final int prodID; // 商品 IDprivate double price; // 計算后的價格// 構造方法、get/set、toString 略... }
-
主邏輯:并行計算商品價格
public static void main(String[] args) throws InterruptedException {// 1. 獲取商品 ID 列表(1~10)final int[] products = getProductsByCategoryId(); // 2. 轉換為 ProductPrice 列表(初始價格未計算)List<ProductPrice> list = Arrays.stream(products).mapToObj(ProductPrice::new) // 每個商品 ID 對應一個 ProductPrice.collect(Collectors.toList()); // 3. 初始化 CountDownLatch:需要等待 10 個商品計算完成(products.length=10)final CountDownLatch latch = new CountDownLatch(products.length); // 4. 為每個商品啟動線程,并行計算價格list.forEach(pp -> {new Thread(() -> {try {System.out.println(pp.getProdID() + " -> 開始計算商品價格.");// 模擬耗時操作(調用外部系統):隨機休眠 0~9 秒TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10)); // 5. 計算商品價格(模擬業務邏輯:奇偶商品不同折扣)if (pp.getProdID() % 2 == 0) {pp.setPrice(pp.getProdID() * 0.90); // 偶數商品 9 折} else {pp.setPrice(pp.getProdID() * 0.71); // 奇數商品 7.1 折}System.out.println(pp.getProdID() + " -> 價格計算完成.");} catch (InterruptedException e) {e.printStackTrace();} finally {// 6. 任務完成,計數器減 1latch.countDown(); }}).start(); // 啟動線程});// 7. 主線程阻塞等待:直到 10 個商品都計算完成(latch.await())latch.await(); // 8. 所有商品計算完成,匯總結果System.out.println("所有價格計算完成.");list.forEach(System.out::println); }
-
準備商品數據:調用
getProductsByCategoryId()
獲取 1~10 號商品 ID,轉成ProductPrice
列表(初始價格未計算); -
初始化
CountDownLatch
:new CountDownLatch(10)
→ 表示需要等待10 個商品的計算任務完成; -
并行計算價格:為每個商品啟動線程:
- 模擬耗時操作(
TimeUnit.SECONDS.sleep(...)
); - 根據商品 ID 奇偶,設置不同折扣價格(模擬業務邏輯);
- 任務完成后,
latch.countDown()
讓計數器減 1;
- 模擬耗時操作(
-
主線程等待 & 匯總結果:
latch.await()
阻塞主線程,直到 10 個任務都完成(計數器歸 0),最后打印所有商品的計算結果。
-
3.4 應用場景總結
-
并行任務同步:多個任務并行執行(比如 5 個線程同時下載文件),必須等所有任務都完成后,再執行下一步(比如合并文件);
- 用
CountDownLatch
讓主線程等待所有并行任務完成,保證后續操作在所有任務就緒后執行;
- 用
-
多任務匯總:需要統計多個線程的執行結果(比如 10 個線程分別計算一部分數據,最后匯總總和);
- 主線程等所有線程計算完,再統一匯總結果,避免部分數據未計算就開始匯總的問題;
-
資源初始化:系統啟動時,需要初始化多個資源(比如緩存、數據庫連接、配置加載),必須等所有資源初始化完成,再對外提供服務;
- 主線程等待所有資源初始化任務完成,保證系統啟動后資源可用。
3.5 不足
-
CountDownLatch
是一次性工具:-
構造時設置的計數器(比如
new CountDownLatch(5)
),一旦減到0
,就無法重置或復用; -
如果業務需要“重復等待多個任務完成”,
CountDownLatch
無法滿足,必須重新創建新的實例。
-
4 CyclicBarrier(回環柵欄/循環屏障)
4.1 簡介
-
CyclicBarrier
是多線程同步工具,解決的問題是:讓一組線程互相等待,直到所有線程都到達同一個“屏障點”,然后再一起繼續執行; -
關鍵特點:可循環使用(屏障可以重置,重復讓線程等待、一起執行);
-
工作流程:
-
初始化屏障:
CyclicBarrier barrier = new CyclicBarrier(N);
,N
是“需要等待的線程數量”(比如 5 個線程要一起執行后續邏輯); -
線程到達屏障點:每個線程執行到
barrier.await();
時,會阻塞等待,直到有N
個線程都調用了await()
; -
所有線程到達,一起執行:當第
N
個線程調用await()
后,所有阻塞的線程會被同時喚醒,繼續執行后續邏輯; -
循環使用:喚醒后,屏障可以重置(通過
reset()
方法 ),再次讓新的一組線程等待、一起執行;
-
-
適合把一個大任務拆成多個子任務并行執行,等所有子任務完成后,再統一做下一步的場景,且需要重復執行該流程。典型場景有:
-
并行計算 + 合并結果:比如計算一個大數組的和,拆成 10 個子數組并行計算,等所有子數組算完,再合并總和。計算完一次后,還能再拆新的數組,重復使用屏障。
-
多階段任務:系統升級時,先讓 5 個節點并行執行數據遷移,全部完成后,再一起執行驗證數據,驗證完還能繼續下一階段(比如啟動服務),屏障可循環用;
-
-
與
CountDownLatch
的核心區別特性 CyclicBarrier
CountDownLatch
是否可循環 可循環(屏障可重置,重復用) 一次性(計數器到 0 后無法重置) 等待的目標 等待“一組線程互相到達屏障點” 等待“其他線程完成任務(計數減到 0)” 典型場景 多階段并行任務(可重復) 單次多任務同步(不可重復)
4.2 常用API
4.2.1 構造器
-
有兩個構造器:
public CyclicBarrier(int parties)
-
parties
:需要等待的線程總數。比如傳4
,表示必須有 4 個線程都調用await()
,屏障才會放行; -
作用:初始化一個基礎的循環屏障,所有線程到達后一起執行后續邏輯;
public CyclicBarrier(int parties, Runnable barrierAction)
-
parties
:同上,需要等待的線程總數; -
barrierAction
:一個Runnable
任務。當所有線程到達屏障后,會優先執行這個任務,再讓所有線程繼續執行; -
作用:適合線程到達屏障后,需要先統一處理一些邏輯(比如匯總數據、初始化資源)的場景;
-
-
工作原理:以
CyclicBarrier barrier = new CyclicBarrier(4, new Runnable(){...});
為例-
初始化:
parties=4
→ 需要 4 個線程到達屏障;barrierAction
→ 所有線程到達后執行的任務;
-
線程到達屏障:每個線程執行
barrier.await();
時:- 計數器(
count
)減 1(初始4
→ 線程 1 調用后變成3
→ 線程 2 調用后變成2
→ 線程 3 調用后變成1
→ 線程 4 調用后變成0
); - 前 3 個線程調用
await()
后,會阻塞等待;
- 計數器(
-
屏障放行(所有線程到達):第 4 個線程調用
await()
后,count=0
,執行barrierAction
,然后喚醒所有阻塞的線程,一起繼續執行后續邏輯; -
循環復用:屏障放行后,
count
會重置為初始值(4),可以再次讓新的一組線程(4 個)等待、觸發屏障。
-
4.2.2 常用方法
-
總覽:
// 1. await():線程調用后阻塞,直到所有線程都調用 await(),屏障放行 public int await() throws InterruptedException, BrokenBarrierException {}// 2. await(long timeout, TimeUnit unit):帶超時的 await(),超時后屏障視為“被破壞” public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {}// 3. reset():重置屏障,讓計數器回到初始值,可重復使用 public void reset() {}
-
await()
:線程調用await()
后,會阻塞等待,直到有parties
個線程都調用await()
(parties
是構造器傳入的線程數);-
InterruptedException
:等待中的線程被中斷; -
BrokenBarrierException
:屏障被破壞(比如其他線程await()
時被中斷、超時 ),導致當前線程無法繼續等待; -
返回值:返回當前線程在到達屏障的線程組中的索引(比如 4 個線程到達,第一個調用
await()
的線程返回3
,最后一個返回0
,索引從0
開始逆序);
-
-
await(long timeout, TimeUnit unit)
:和await()
類似,但增加超時機制。如果在timeout
時間內,湊不齊parties
個線程調用await()
,則觸發超時,屏障被標記為破壞。防止線程因其他線程異常,無限期阻塞;- 除了
InterruptedException
和BrokenBarrierException
,還可能拋出TimeoutException
(超時);
- 除了
-
reset()
:重置CyclicBarrier
,讓計數器回到初始值(parties
),屏障狀態恢復到未使用;- 注意:重置時,若有線程正在
await()
,會觸發BrokenBarrierException
(因為屏障被強制重置,這些線程的等待被打斷)。
- 注意:重置時,若有線程正在
4.3 使用
4.3.1 等待所有子任務結束
-
需求:根據品類 ID 獲取 10 個商品,并行計算每個商品的最終價格(模擬調用外部系統,耗時隨機),等所有商品價格計算完成后,匯總結果返回;
-
工具方法 & 數據類
// 根據品類 ID 獲取商品 ID 列表(1~10 號商品) private static int[] getProductsByCategoryId() {return IntStream.rangeClosed(1, 10).toArray(); }// 商品價格數據類:存儲商品 ID 和計算后的價格 private static class ProductPrice {private final int prodID; // 商品 IDprivate double price; // 計算后的價格// 構造方法、get/set、toString 略... }
-
主邏輯:用
CyclicBarrier
同步多線程public static void main(String[] args) throws InterruptedException {// 1. 獲取商品 ID 列表,轉換為 ProductPrice 列表final int[] products = getProductsByCategoryId();List<ProductPrice> list = Arrays.stream(products).mapToObj(ProductPrice::new).collect(Collectors.toList()); // 2. 初始化 CyclicBarrier:需要等待 list.size()(10)個線程到達屏障final CyclicBarrier barrier = new CyclicBarrier(list.size()); // 3. 存儲線程的列表(用于后續 join 等待)final List<Thread> threadList = new ArrayList<>(); // 4. 為每個商品啟動線程,并行計算價格list.forEach(pp -> {Thread thread = new Thread(() -> {try {System.out.println(pp.getProdID() + " 開始計算商品價格.");// 模擬耗時操作(調用外部系統):隨機休眠 0~9 秒TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10)); // 5. 計算商品價格(奇偶商品不同折扣)if (pp.getProdID() % 2 == 0) {pp.setPrice(pp.getProdID() * 0.90); // 偶數商品 9 折} else {pp.setPrice(pp.getProdID() * 0.71); // 奇數商品 7.1 折}System.out.println(pp.getProdID() + " -> 價格計算完成.");// 6. 等待其他線程:調用 await(),直到所有線程都到達屏障barrier.await(); } catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}});threadList.add(thread); // 記錄線程thread.start(); // 啟動線程});// 7. 等待所有線程執行完成(通過 join 保證主線程等所有子線程跑完)threadList.forEach(t -> {try {t.join(); } catch (InterruptedException e) {e.printStackTrace();}});// 8. 所有商品價格計算完成,匯總結果System.out.println("所有價格計算完成.");list.forEach(System.out::println); }
-
準備商品數據:調用
getProductsByCategoryId()
獲取 1~10 號商品 ID,轉成ProductPrice
列表(初始價格未計算); -
初始化
CyclicBarrier
:new CyclicBarrier(list.size())
→ 表示需要等待 10 個線程 到達屏障(每個商品對應一個線程); -
并行計算價格:為每個商品啟動線程:
- 模擬耗時操作(
TimeUnit.SECONDS.sleep(...)
); - 根據商品 ID 奇偶,設置不同折扣價格;
- 調用
barrier.await()
:線程到達屏障,阻塞等待其他線程;
- 模擬耗時操作(
-
屏障放行:當第 10 個線程調用
await()
后,所有阻塞的線程會被同時喚醒,繼續執行后續邏輯; -
主線程等待 & 匯總結果:通過
threadList.forEach(t -> t.join())
讓主線程等待所有子線程執行完成,最后打印所有商品的計算結果。
-
4.3.2 CyclicBarrier的循環特性——模擬跟團旅游
-
需求:跟團旅游
-
第一階段(上車屏障):
- 導游要求:所有游客上車后,大巴才出發(對應
CyclicBarrier
的第一次await()
); - 類比:10 個游客 + 1 個導游(主線程)= 11 個線程,湊齊后屏障放行;
- 導游要求:所有游客上車后,大巴才出發(對應
-
第二階段(下車屏障):
- 導游要求:所有游客下車后,大巴才去下一個景點(對應
CyclicBarrier
的第二次await()
); - 類比:同一組線程(游客 + 導游)再次湊齊,屏障放行,實現“循環復用”;
- 導游要求:所有游客下車后,大巴才去下一個景點(對應
-
-
游客線程邏輯(
Tourist
類)private static class Tourist implements Runnable {private final int touristID; // 游客編號private final CyclicBarrier barrier; // 循環屏障public Tourist(int touristID, CyclicBarrier barrier) {this.touristID = touristID;this.barrier = barrier;}@Overridepublic void run() {// 1. 模擬上車(第一階段:上車同步)System.out.printf("游客:%d 乘坐旅游大巴\n", touristID);spendSeveralSeconds(); // 模擬上車耗時waitAndPrint("游客:%d 上車,等別人上車.\n"); // 調用 await(),等待湊齊 11 個線程// 2. 模擬下車(第二階段:下車同步)System.out.printf("游客:%d 到達目的地\n", touristID);spendSeveralSeconds(); // 模擬下車耗時waitAndPrint("游客:%d 下車,等別人下車.\n"); // 再次調用 await(),等待湊齊 11 個線程}// 調用 barrier.await(),并打印日志private void waitAndPrint(String message) {System.out.printf(message, touristID);try {barrier.await(); // 線程到達屏障,阻塞等待} catch (InterruptedException | BrokenBarrierException e) {// 忽略異常}}// 模擬隨機耗時(上車/下車的時間)private void spendSeveralSeconds() {try {TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10));} catch (InterruptedException e) {// 忽略異常}} }
-
主線程邏輯(導游視角)
public static void main(String[] args) throws BrokenBarrierException, InterruptedException {// parties=11 → 需要 11 個線程到達屏障(10 個游客線程 + 1 個主線程)final CyclicBarrier barrier = new CyclicBarrier(11);// 啟動 10 個游客線程for (int i = 0; i < 10; i++) {new Thread(new Tourist(i, barrier)).start();}// 4. 主線程(導游)參與第一階段屏障:等待所有游客上車barrier.await(); System.out.println("導游:所有的游客都上了車.");// 5. 主線程(導游)參與第二階段屏障:等待所有游客下車barrier.await(); System.out.println("導游:所有的游客都下車了."); }
-
上車同步
-
10 個游客線程 + 1 個主線程(導游),共 11 個線程;
-
每個游客線程執行到
waitAndPrint("游客:%d 上車,等別人上車.\n")
→ 調用barrier.await()
,阻塞等待; -
當 11 個線程都調用
await()
后,屏障放行:-
打印所有“游客上車等待”的日志;
-
主線程繼續執行,打印
導游:所有的游客都上了車.
;
-
-
-
下車同步
-
同一組 11 個線程(10 個游客 + 1 個主線程 ),再次執行到
waitAndPrint("游客:%d 下車,等別人下車.\n")
→ 調用barrier.await()
,阻塞等待; -
當 11 個線程都調用
await()
后,屏障放行:-
打印所有“游客下車等待”的日志;
-
主線程繼續執行,打印
導游:所有的游客都下車了.
;
-
-
-
CyclicBarrier
的循環特性-
可重復觸發:同一
CyclicBarrier
實例,可通過多次await()
實現“多階段同步”(上車→下車); -
線程組復用:同一組線程(游客 + 導游 )參與多個階段的屏障,無需重新創建實例。
-
4.4 應用場景總結
-
CyclicBarrier
是多線程同步工具,核心解決讓一組線程互相等待,全部到達同一屏障點后,再一起繼續執行的問題,且可循環使用(屏障可重置,重復同步多階段任務); -
應用場景:
-
多線程任務拆分與合并:一個復雜任務(比如計算大數據集的總和)拆成多個子任務(比如 10 個線程各算一部分),必須等所有子任務完成后,再合并結果;
-
多線程數據處理同步:多個線程并行處理不同的數據分片(比如處理 5 個文件),必須等所有線程處理完自己的數據,再統一匯總、校驗或持久化。
-
4.5 CyclicBarrier VS CountDownLatch
-
可復用性
-
CountDownLatch
:一次性工具。構造時設置的計數器(比如new CountDownLatch(5)
),一旦減到0
,無法重置或復用; -
CyclicBarrier
:可循環復用。計數器(parties
)可以通過reset()
重置,重復讓新的線程組等待、觸發屏障;
-
-
等待目標
-
CountDownLatch
:await()
的線程等待其他線程調用countDown()
把計數器減到0
(主線程等子線程完成); -
CyclicBarrier
:await()
的線程等待其他線程也到達屏障點(調用await()
)(線程組互相等待);
-
-
計數器特性
-
CountDownLatch
:計數器只能遞減(從N→0
),且無法重置; -
CyclicBarrier
:計數器可以重置(通過reset()
回到初始值parties
),支持多輪同步。
-
5 Exchange(數據交換機)
5.1 簡介
-
Exchanger
專門解決兩個線程需要互相交換數據的場景,讓兩個線程在“交換點”(調用exchange
方法時)同步,安全交換數據; -
工作流程
- 線程 1 調用
exchange(object1)
:線程 1 會阻塞等待,直到線程 2 也調用exchange
方法; - 線程 2 調用
exchange(object2)
:此時兩個線程都到達“交換點”,Exchanger
會將object1
傳遞給線程 2,將object2
傳遞給線程 1; - 交換后繼續執行:線程 1 拿到
object2
,線程 2 拿到object1
,繼續執行后續邏輯;
- 線程 1 調用
5.2 常用API
-
public V exchange(V x) throws InterruptedException
-
功能:
- 當前線程攜帶數據
x
,阻塞等待另一個線程到達交換點; - 對方線程到達后,交換數據:當前線程接收對方數據,返回給調用方;
- 當前線程攜帶數據
-
異常:等待中若線程被中斷,拋出
InterruptedException
;
-
-
public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
- 同上,但增加超時機制。如果在
timeout
時間內,對方線程未到達交換點,拋出TimeoutException
; - 適用場景:防止線程因對方異常,無限期阻塞。
- 同上,但增加超時機制。如果在
5.3 使用
5.3.1 模擬交易場景
-
模擬買賣雙方交易:
-
賣家帶“商品”(
goods = "電腦"
),買家帶“錢”(money = "$4000"
); -
雙方必須都到達“交易點”(調用
exchanger.exchange(...)
),才能交換數據(一手交錢,一手交貨);
-
-
代碼:
public class ExchangerDemo {private static Exchanger exchanger = new Exchanger(); static String goods = "電腦";static String money = "$4000";public static void main(String[] args) throws InterruptedException {System.out.println("準備交易,一手交錢一手交貨...");// 賣家線程:攜帶 goods,等待買家new Thread(() -> {try {System.out.println("賣家到了,已經準備好貨:" + goods);// 交換數據:賣家發送 goods,接收 moneyString receivedMoney = (String) exchanger.exchange(goods); System.out.println("賣家收到錢:" + receivedMoney);} catch (Exception e) { /* 忽略異常 */ }}).start();// 主線程休眠 3 秒,模擬買家延遲到達Thread.sleep(3000); // 買家線程:攜帶 money,等待賣家new Thread(() -> {try {System.out.println("買家到了,已經準備好錢:" + money);// 交換數據:買家發送 money,接收 goodsString receivedGoods = (String) exchanger.exchange(money); System.out.println("買家收到貨:" + receivedGoods);} catch (Exception e) { /* 忽略異常 */ }}).start();} }
-
同步交換:賣家先調用
exchange(goods)
會阻塞,直到買家調用exchange(money)
,雙方交換數據; -
數據流向:賣家發送
goods
→ 接收money
;買家發送money
→ 接收goods
。
-
5.3.2 模擬對賬場景
-
模擬數據對賬:
- 線程 1 生成數據
A
,線程 2 生成數據B
; - 雙方交換數據后,線程 2 校驗
A
和B
是否一致;
- 線程 1 生成數據
-
代碼:
public class ExchangerDemo2 {private static final Exchanger<String> exchanger = new Exchanger<>(); private static ExecutorService threadPool = Executors.newFixedThreadPool(2); public static void main(String[] args) {// 線程 1:發送數據 AthreadPool.execute(() -> {try {String A = "12379871924sfkhfksdhfks";exchanger.exchange(A); // 發送 A,等待線程 2} catch (InterruptedException e) { /* 忽略 */ }});// 線程 2:發送數據 B,接收數據 A,校驗一致性threadPool.execute(() -> {try {String B = "32423423jkmjkfsbfj";String A = exchanger.exchange(B); // 發送 B,接收 ASystem.out.println("A和B數據是否一致:" + A.equals(B));System.out.println("A= " + A);System.out.println("B= " + B);} catch (InterruptedException e) { /* 忽略 */ }});threadPool.shutdown();} }
-
數據校驗:線程 2 接收線程 1 的數據
A
后,對比自己的B
,判斷是否一致; -
線程池簡化:用線程池管理兩個線程,避免手動創建
Thread
;
-
5.3.3 模擬隊列中交換數據
-
模擬生產者 - 消費者模式,但通過
Exchanger
動態交換“滿隊列”和“空隊列”:-
生產者往
emptyQueue
放數據,滿了就和消費者交換隊列(拿空隊列繼續生產); -
消費者從
fullQueue
取數據,空了就和生產者交換隊列(拿滿隊列繼續消費);
-
-
代碼:
public class ExchangerDemo3 {// 滿隊列(消費者初始用)、空隊列(生產者初始用)private static ArrayBlockingQueue<String> fullQueue = new ArrayBlockingQueue<>(5); private static ArrayBlockingQueue<String> emptyQueue = new ArrayBlockingQueue<>(5); private static Exchanger<ArrayBlockingQueue<String>> exchanger = new Exchanger<>(); public static void main(String[] args) {new Thread(new Producer()).start(); // 啟動生產者new Thread(new Consumer()).start(); // 啟動消費者}// 生產者:往隊列放數據,滿了就交換隊列static class Producer implements Runnable {@Overridepublic void run() {ArrayBlockingQueue<String> current = emptyQueue; try {while (current != null) {String str = UUID.randomUUID().toString();try {current.add(str); // 往隊列放數據System.out.println("producer:生產了一個序列:" + str + ">>>>>加入到交換區");Thread.sleep(2000);} catch (IllegalStateException e) {// 隊列滿了,交換隊列(拿空隊列)System.out.println("producer:隊列已滿,換一個空的");current = exchanger.exchange(current); }}} catch (Exception e) { /* 忽略 */ }}}// 消費者:從隊列取數據,空了就交換隊列static class Consumer implements Runnable {@Overridepublic void run() {ArrayBlockingQueue<String> current = fullQueue; try {while (current != null) {if (!current.isEmpty()) {String str = current.poll(); // 從隊列取數據System.out.println("consumer:消耗一個序列:" + str);Thread.sleep(1000);} else {// 隊列空了,交換隊列(拿滿隊列)System.out.println("consumer:隊列空了,換個滿的");current = exchanger.exchange(current); System.out.println("consumer:換滿的成功~~~~~~~~~~~~~~~~~~~~~~");}}} catch (Exception e) { /* 忽略 */ }}} }
-
動態隊列交換:
- 生產者隊列滿 → 用
exchanger.exchange(current)
交換出空隊列,繼續生產; - 消費者隊列空 → 用
exchanger.exchange(current)
交換出滿隊列,繼續消費;
- 生產者隊列滿 → 用
-
解耦生產和消費:通過交換隊列,避免生產者/消費者因隊列滿/空阻塞,靈活控制數據流轉。
-
5.4 應用場景總結
- 數據交換:兩個線程需要安全交換數據(如交易場景的“錢 - 貨”交換);
- 保證“交換原子性”,避免數據不一致;
- 數據采集:采集線程(生產者)和處理線程(消費者)交換數據(如日志采集→日志處理);
- 解耦數據生產和消費,通過交換數據緩沖,提升系統吞吐量。
6 Phaser(階段協同器)
6.1 簡介
-
Phaser
用于協調多個線程的多階段執行,支持:-
動態調整參與線程的數量(可增、可減);
-
分階段同步(線程完成當前階段,再一起進入下一階段);
-
比
CyclicBarrier
更靈活(支持動態線程數、多階段),比CountDownLatch
更強大(可循環、可動態調整);
-
-
核心特性
-
多階段同步:線程可以分多個階段執行(如
phase-0 → phase-1 → phase-2
),每個階段都需要線程同步后再繼續; -
動態線程管理:
-
可通過
register()
動態增加參與線程; -
可通過
arriveAndDeregister()
動態減少參與線程;
-
-
靈活的階段控制:每個階段完成后,可自定義邏輯(重寫
onAdvance
方法),決定是否繼續下一階段;
-
-
工作流程
- 階段 0(
phase-0
):- 多個線程執行“階段 0”的任務;
- 線程調用
arriveAndAwaitAdvance()
表示“階段 0 完成”,等待其他線程也完成“階段 0”;
- 進入階段 1(
phase-1
):- 所有線程都完成“階段 0”后,一起進入“階段 1”;
- 重復“執行任務 → 同步等待”的流程;
- 多階段循環:支持多個階段(
phase-0 → phase-1 → phase-2 → ...
),直到手動終止或所有線程退出;
- 階段 0(
6.2 常用 API
-
構造方法
構造方法 作用 Phaser()
初始化一個“參與任務數為 0”的 Phaser
,后續用register()
動態添加線程Phaser(int parties)
指定初始參與線程數(類似 CyclicBarrier
的parties
)Phaser(Phaser parent)
作為子階段協同器,依附于父 Phaser
,適合復雜多階段場景Phaser(Phaser parent, int parties)
結合父 Phaser
和初始線程數,更靈活的初始化 -
增減參與線程
方法 作用 int register()
動態增加一個參與線程,返回當前階段號 int bulkRegister(int parties)
動態增加多個參與線程(批量注冊),返回當前階段號 int arriveAndDeregister()
線程完成任務后,退出參與(減少一個線程),返回當前階段號 -
到達、等待方法
方法 作用 int arrive()
標記“當前線程完成階段任務”,但不等待其他線程,繼續執行 int arriveAndAwaitAdvance()
標記“當前線程完成階段任務”,等待其他線程也完成,再進入下一階段 int awaitAdvance(int phase)
等待進入指定階段(需當前階段匹配) int awaitAdvanceInterruptibly(int phase)
同上,但等待中可被中斷 int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)
帶超時的等待,超時后拋出異常 -
階段自定義邏輯
protected boolean onAdvance(int phase, int registeredParties)
-
作用:每個階段完成后,自動調用此方法,決定是否繼續下一階段;
-
返回值:
true
:階段結束,Phaser
不再繼續(可用于終止多階段流程);false
:繼續下一階段。
-
6.3 使用
-
需求:模擬了公司團建的多階段活動,團建分4個階段,參與人數動態變化:
-
階段0:所有人到公司集合 → 出發去公園
-
階段1:所有人到公園門口 → 出發去餐廳
-
階段2:部分人到餐廳(有人提前離開,有人新增加入)→ 開始用餐
-
階段3:用餐結束 → 活動終止
-
參與人數不固定(有人早退、有人中途加入),每個階段必須等人齊了再繼續
-
-
代碼:
public class PhaserDemo {public static void main(String[] args) {final Phaser phaser = new Phaser() {// 每個階段完成后自動調用下面的 onAdvance 方法,打印階段總結,并判斷是否終止(只剩主線程時終止)@Overrideprotected boolean onAdvance(int phase, int registeredParties) {// registeredParties 是當前注冊的線程數(包括主線程),減去 1 得到實際員工數// 主線程:作為協調者,全程參與并動態添加中途加入者int staffs = registeredParties - 1;// 每個階段完成后的提示信息switch (phase) {case 0:System.out.println("大家都到公司了,出發去公園,人數:" + staffs);break;case 1:System.out.println("大家都到公園門口了,出發去餐廳,人數:" + staffs);break;case 2:System.out.println("大家都到餐廳了,開始用餐,人數:" + staffs);break;}// 終止條件:只剩主線程(registeredParties == 1)return registeredParties == 1;}};// 注冊主線程————讓主線程全程參與phaser.register();final StaffTask staffTask = new StaffTask();// 全程參與者:3 人(參與所有 4 個階段)for (int i = 0; i < 3; i++) {// 添加任務數phaser.register();new Thread(() -> {try {staffTask.step1Task();//到達后等待其他任務到達phaser.arriveAndAwaitAdvance();staffTask.step2Task();phaser.arriveAndAwaitAdvance();staffTask.step3Task();phaser.arriveAndAwaitAdvance();staffTask.step4Task();// 完成了,注銷離開phaser.arriveAndDeregister();} catch (InterruptedException e) {e.printStackTrace();}}).start();}// 早退者:2 人(只參與前 2 個階段,到公園后離開)for (int i = 0; i < 2; i++) {phaser.register();new Thread(() -> {try {staffTask.step1Task();phaser.arriveAndAwaitAdvance();staffTask.step2Task();System.out.println("員工【" + Thread.currentThread().getName() + "】回家了");// 完成了,注銷離開phaser.arriveAndDeregister();} catch (InterruptedException e) {e.printStackTrace();}}).start();}// 中途加入者:4 人(從階段 2 開始參與,直接到餐廳聚餐)while (!phaser.isTerminated()) {int phase = phaser.arriveAndAwaitAdvance();if (phase == 2) {for (int i = 0; i < 4; i++) {phaser.register();new Thread(() -> {try {staffTask.step3Task();phaser.arriveAndAwaitAdvance();staffTask.step4Task();// 完成了,注銷離開phaser.arriveAndDeregister();} catch (InterruptedException e) {e.printStackTrace();}}).start();}}}}static final Random random = new Random();// 封裝了 4 個階段的具體動作(從家出發→到公司→去公園→去餐廳→用餐),每個階段用 Thread.sleep 模擬耗時static class StaffTask {public void step1Task() throws InterruptedException {// 第一階段:來公司集合String staff = "員工【" + Thread.currentThread().getName() + "】";System.out.println(staff + "從家出發了……");Thread.sleep(random.nextInt(5000));System.out.println(staff + "到達公司");}public void step2Task() throws InterruptedException {// 第二階段:出發去公園String staff = "員工【" + Thread.currentThread().getName() + "】";System.out.println(staff + "出發去公園玩");Thread.sleep(random.nextInt(5000));System.out.println(staff + "到達公園門口集合");}public void step3Task() throws InterruptedException {// 第三階段:去餐廳String staff = "員工【" + Thread.currentThread().getName() + "】";System.out.println(staff + "出發去餐廳");Thread.sleep(random.nextInt(5000));System.out.println(staff + "到達餐廳");}public void step4Task() throws InterruptedException {// 第四階段:就餐String staff = "員工【" + Thread.currentThread().getName() + "】";System.out.println(staff + "開始用餐");Thread.sleep(random.nextInt(5000));System.out.println(staff + "用餐結束,回家");}} }
-
階段0:到公司集合
- 主線程 + 3個全程者 + 2個早退者 → 共6個線程,調用
step1Task()
(從家到公司) - 完成后調用
phaser.arriveAndAwaitAdvance()
→ 等待所有人到公司 - 階段結束:
onAdvance
觸發,打印“出發去公園,人數:5”(6-1=5)
- 主線程 + 3個全程者 + 2個早退者 → 共6個線程,調用
-
階段1:到公園門口
- 所有線程調用
step2Task()
(從公司到公園),完成后調用phaser.arriveAndAwaitAdvance()
→ 等待所有人到公園 - 2個早退者完成后調用
phaser.arriveAndDeregister()
→ 退出(注冊線程數變為6-2=4) - 階段結束:
onAdvance
觸發,打印“出發去餐廳,人數:3”(4-1=3,只剩3個全程者+主線程)
- 所有線程調用
-
階段2:到餐廳集合
- 主線程動態添加:檢測到階段2時,新增4個中途加入者 → 注冊線程數變為4+4=8
- 3個全程者調用
step3Task()
(從公園到餐廳) - 4個新加入者直接調用
step3Task()
(到餐廳)
- 3個全程者調用
- 所有人調用
phaser.arriveAndAwaitAdvance()
→ 等待到餐廳 - 階段結束:
onAdvance
觸發,打印“開始用餐,人數:7”(8-1=7,3+4=7個員工+主線程)
- 主線程動態添加:檢測到階段2時,新增4個中途加入者 → 注冊線程數變為4+4=8
-
階段3:用餐結束
- 所有7人調用
step4Task()
(用餐),完成后調用phaser.arriveAndDeregister()
→ 所有人退出(注冊線程數逐漸減少至1,只剩主線程) - 終止條件:
onAdvance
檢測到registeredParties == 1
→ 返回true
,Phaser
終止
- 所有7人調用
-
-
Phaser
核心特性體現-
多階段同步:通過
arriveAndAwaitAdvance()
實現每個階段的等待,確保人齊后再進入下一階段; -
動態線程管理:
-
phaser.register()
:新增參與者(如中途加入的4人); -
phaser.arriveAndDeregister()
:參與者退出(如早退者和用餐結束的人);
-
-
階段自定義邏輯:
onAdvance
方法實現每個階段的總結,并控制流程終止條件; -
靈活的協同:相比
CyclicBarrier
(固定線程數),Phaser
能應對“有人早退、有人中途加入”的動態場景。
-
6.4 應用場景總結
-
多線程任務分配:把一個復雜任務拆成多個子任務,分配給不同線程并行執行,且需要協調子任務的進度(比如所有子任務完成后,再合并結果);
-
用
Phaser
分階段管理:- 階段 0:子任務分配,線程開始執行
- 階段 1:所有子任務完成,合并結果
-
支持動態調整線程數(比如某個子任務需要更多線程,用
register()
新增);
-
-
多級任務流程:任務需要分多個層級/階段執行,必須等當前級所有任務完成,才能觸發下一級任務(比如“數據采集→數據清洗→數據匯總→結果輸出”);
-
每個層級對應
Phaser
的一個階段(phase-0
采集→phase-1
清洗→phase-2
匯總); -
通過
arriveAndAwaitAdvance()
確保“當前級完成后,再進入下一級”,流程更清晰;
-
-
模擬并行計算:模擬分布式并行計算(比如科學計算、大數據處理 ),需要協調多個線程的“計算階段”(比如矩陣計算分塊執行,所有分塊完成后再合并);
- 用
Phaser
同步“分塊計算階段”和“合并階段”,確保:- 所有分塊計算完成(階段 0 同步);
- 合并結果后,再進入下一階段(階段 1 同步);
- 用
-
階段性任務:任務天然是階段性的,每個階段需要所有線程同步后再繼續(比如“團隊項目”:需求評審→開發→測試→上線,每個階段必須全員完成);
-
每個階段對應
Phaser
的phase
,通過arriveAndAwaitAdvance()
實現“階段同步”; -
支持動態調整參與線程(比如測試階段需要新增測試人員,用
register()
加入);
-
-
上面所有場景都需要多階段同步 + 動態線程協作,每個階段必須等所有線程完成,再進入下一階段。
Phaser
優勢:-
比
CyclicBarrier
更靈活:支持動態增減線程、多階段自定義邏輯(onAdvance
); -
比
CountDownLatch
更強大:可循環分階段,而非一次性同步。
-