并發編程——06 JUC并發同步工具類的應用實戰

0 常用并發同步工具類的真實應用場景

  • JDK 提供了比synchronized更加高級的各種同步工具,包括ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier等,可以實現更加豐富的多線程操作;

    在這里插入圖片描述

1 ReentrantLock(可重入的占用鎖)

1.1 簡介

  • ReentrantLock可重入的獨占鎖

    • “可重入”是指同一線程能多次獲取同一把鎖,不會自己阻塞自己;
    • “獨占”是說同一時間,最多只有一個線程能成功拿到鎖,其他線程得等待;
    • synchronized作用類似,都是解決多線程并發訪問共享資源時的線程安全問題;
  • 相比 synchronizedReentrantLock 多了這些靈活特性:

    • 可中斷:獲取鎖的過程中,線程能響應中斷(比如其他地方調用了 interrupt()),不用死等鎖釋放,更靈活控制執行流程;

    • 可設置超時時間:調用 tryLock(long timeout, TimeUnit unit) 時,線程在指定時間內沒拿到鎖,就會放棄嘗試,避免無限阻塞;

    • 可設置為公平鎖:默認 ReentrantLock 是 “非公平鎖”(新線程和等待隊列里的線程搶鎖,可能插隊),但它支持通過構造方法 ReentrantLock(true) 設為“公平鎖”,嚴格按線程等待順序分配鎖,減少線程饑餓(某些線程一直拿不到鎖);

  • synchronized 一樣,都支持可重入:

    • synchronizedwait/notify 實現線程通信,只能關聯一個等待隊列;
    • ReentrantLock 可通過 newCondition() 創建多個 Condition,精準控制不同線程的等待 / 喚醒,比如生產者 - 消費者模型里,能區分 “生產條件”“消費條件” 分別處理;
  • 應用場景:多線程搶共享資源時,需要獨占訪問保證數據安全,比如賣票系統(如下兩圖)、銀行賬戶轉賬;

    在這里插入圖片描述

    • 線程 A、B 搶鎖:線程 A、B 同時嘗試獲取鎖,假設線程 A 先拿到(鎖的獨占性,同一時間只有 A 能持有),此時 A 可以操作共享資源(比如修改車票庫存 ),B 因為沒搶到,進入 “等待” 狀態;

    在這里插入圖片描述

    • 線程 A 釋放鎖:A 操作完共享資源后,會釋放鎖;接著 B 再次嘗試獲取鎖,這次就能成功拿到,然后 B 開始操作共享資源(修改車票庫存)。

1.2 常用API

  • ReentrantLock 實現了Lock接口規范,常見API如下:

    方法方法聲明功能說明
    lockvoid lock()獲取鎖,調用該方法當前線程會獲取鎖,當鎖獲得后,該方法返回
    lockInterruptiblyvoid lockInterruptibly() throws InterruptedException可中斷的獲取鎖,和 lock() 方法不同之處在于該方法會響應中斷,即在鎖的獲取中可以中斷當前線程
    tryLockboolean tryLock()嘗試非阻塞的獲取鎖,調用該方法后立即返回。如果能夠獲取到鎖返回 true,否則返回 false
    tryLock(帶超時)boolean tryLock(long time, TimeUnit unit) throws InterruptedException超時獲取鎖,當前線程在以下三種情況下會返回:當前線程在超時時間內獲取了鎖;當前線程在超時時間內被中斷;超時時間結束,返回 false
    unlockvoid unlock()釋放鎖
    newConditionCondition newCondition()獲取等待通知組件,該組件和當前的鎖綁定,當前線程只有獲取了鎖,才能調用該組件的 await() 方法,而調用后,當前線程將釋放鎖
  • 基本用法:

    private final Lock lock = new ReentrantLock();public void foo()
    {// 獲取鎖lock.lock();try {// 程序執行邏輯} finally {// finally語句塊可以確保lock被正確釋放lock.unlock();}
    }// 嘗試獲取鎖,最多等待 100 毫秒  
    if (lock.tryLock(100, TimeUnit.MILLISECONDS)) {  try {  // 成功獲取到鎖,執行需要同步的代碼塊  // ... 執行一些操作 ...  } finally {  // 釋放鎖  lock.unlock();  }  
    } else {  // 超時后仍未獲取到鎖,執行備選邏輯  // ... 執行一些不需要同步的操作 ...  
    }  
    
  • 在使用時要注意以下 4 個問題:

    • 默認情況下 ReentrantLock 為非公平鎖而非公平鎖;
    • 加鎖次數和釋放鎖次數一定要保持一致,否則會導致線程阻塞或程序異常;
    • 加鎖操作一定要放在try代碼之前,這樣可以避免未加鎖成功又釋放鎖的異常;
    • 釋放鎖一定要放在finally中,否則會導致線程阻塞;
  • 工作原理:

    • 當有線程調用lock方法時,會用 CAS(Compare-And-Swap,比較并交換) 操作,嘗試把 AQS(AbstractQueuedSynchronizer,抽象隊列同步器,Java 并發包的核心基礎組件 )內部的 state 變量從 0 改成 1

      • state=0 表示“鎖沒人用”,CAS 成功 → 線程拿到鎖,開始執行臨界區代碼(操作共享資源);
      • state=1 表示“鎖被占用”,CAS 失敗 → 線程搶鎖失敗,進入阻塞隊列(CLH 隊列,按 FIFO 排隊 ) 等待;
    • 搶鎖失敗的線程,會被包裝成節點(Node),加入隊列尾部(tail),隊列頭部是 head 節點(代表 “即將拿到鎖的線程”);

      • 隊列里的線程,都在等鎖釋放,避免線程忙等(一直重試搶鎖,浪費 CPU 資源);
      • 隊列是 FIFO(先進先出) 順序,理論上保證線程公平性,但實際還受“公平鎖 / 非公平鎖”策略影響;
    • 當持有鎖的線程執行完 unlock(),會把 state 改回 0(釋放鎖),然后喚醒隊列里的線程。這時分兩種策略:

      • 公平鎖(ReentrantLock(true)):嚴格按隊列順序喚醒:釋放鎖后,優先喚醒 head 節點的下一個節點(head.next),讓隊列里“等最久”的線程拿到鎖;

        • 優點:絕對公平,避免線程 饑餓(某些線程一直搶不到鎖);
          • 缺點:頻繁喚醒 / 切換線程,性能略低(線程上下文切換有開銷);
      • 非公平鎖(默認策略,ReentrantLock()):釋放鎖后,不嚴格按隊列順序,允許新線程和隊列里被喚醒的線程重新用 CAS 搶鎖:

        • 新線程(沒進隊列的)可能直接 CAS 搶鎖成功(插隊),不用進隊列等;

        • 隊列里的線程也會被喚醒,參與競爭;

        • 優點:減少線程切換,吞吐量更高(適合競爭不激烈的場景);

        • 缺點:可能讓隊列里的線程等更久,存在小概率線程饑餓;

    在這里插入圖片描述

1.3 使用

1.3.1 獨占鎖

  • 模擬搶票場景。8張票,10個人搶,如果不加鎖,會出現什么問題?

    /*** 模擬搶票場景*/
    public class ReentrantLockDemo {// 創建 ReentrantLock 實例,默認使用非公平鎖策略private final ReentrantLock lock = new ReentrantLock();//默認非公平// 共享資源:總票數,會有多個線程同時操作這個變量private static int tickets = 8;/*** 購買車票的方法* 核心邏輯:通過加鎖保證同一時間只有一個線程能執行購票操作*/public void buyTicket() {// 1. 獲取鎖:調用 lock() 方法,當前線程會嘗試獲取鎖// 如果鎖未被占用,則當前線程獲得鎖并繼續執行// 如果鎖已被占用,則當前線程會進入阻塞隊列等待lock.lock(); // 獲取鎖// 2. try-finally 結構保證鎖一定會被釋放// 即使代碼執行過程中發生異常,finally 塊也會執行解鎖操作try {// 3. 臨界區:操作共享資源(tickets 變量)if (tickets > 0) { // 檢查是否還有剩余車票try {// 休眠 10ms,放大并發問題的可能性// 如果不加鎖,這里會出現多個線程同時進入判斷并扣減票數的情況Thread.sleep(10); // 模擬出并發效果} catch (InterruptedException e) {e.printStackTrace();}// 打印購票信息,并將票數減 1(原子操作)System.out.println(Thread.currentThread().getName() + "購買了第" + tickets-- + "張票");} else {// 票已售罄時的提示System.out.println("票已經賣完了," + Thread.currentThread().getName() + "搶票失敗");}} finally {// 4. 釋放鎖:無論是否發生異常,都必須釋放鎖// 否則會導致其他線程永遠無法獲取鎖,造成死鎖lock.unlock(); // 釋放鎖}}public static void main(String[] args) {// 創建搶票系統實例(共享同一個鎖和票數變量)ReentrantLockDemo ticketSystem = new ReentrantLockDemo();// 創建 10 個線程模擬 10 個用戶搶票(總票數只有 8 張)for (int i = 1; i <= 10; i++) {Thread thread = new Thread(() -> {// 每個線程執行搶票操作ticketSystem.buyTicket(); // 搶票}, "線程" + i); // 給線程命名,方便觀察輸出// 啟動線程,線程進入就緒狀態,等待 CPU 調度thread.start();}try {// 主線程休眠 3000ms,等待所有搶票線程執行完畢// 避免主線程提前打印剩余票數Thread.sleep(3000);} catch (InterruptedException e) {throw new RuntimeException(e);}// 打印最終剩余票數,驗證是否正確(應該為 0)System.out.println("剩余票數:" + tickets);}
    }
    
  • 不加鎖:出現超賣問題

    在這里插入圖片描述

  • 加鎖:正常,兩人搶票失敗

    在這里插入圖片描述

1.3.2 公平鎖和非公平鎖

  • ReentrantLock 支持公平鎖和非公平鎖兩種模式:

    • 公平鎖:線程在獲取鎖時,按照線程等待的先后順序獲取鎖;
    • 非公平鎖:線程在獲取鎖時,不按照等待的先后順序獲取鎖,而是隨機獲取鎖。ReentrantLock 默認是非公平鎖;
    ReentrantLock lock = new ReentrantLock(); //參數默認false,不公平鎖  
    ReentrantLock lock = new ReentrantLock(true); //公平鎖  
    
  • 比如買票的時候就有可能出現插隊的場景,允許插隊就是非公平鎖,如下圖:

    在這里插入圖片描述

1.3.3 可重入鎖

  • 可重入鎖又名遞歸鎖,是指在同一線程里,只要鎖對象相同,內層方法(或代碼塊)能直接復用已獲取的鎖,不用重新競爭。比如線程執行 方法A 時拿到鎖,方法A 里調用 方法B(也需要同一把鎖),線程進 方法B 時不用等自己釋放鎖,直接繼續用;

  • Java 中ReentrantLocksynchronized都是可重入鎖

    • synchronized:隱式的(JVM 自動管加鎖 / 釋放)、可重入的內置鎖,只要是同一線程、同一對象鎖,內層同步代碼直接重入;
    • ReentrantLock:顯式的(手動 lock() 加鎖、unlock() 釋放)可重入鎖,功能更靈活(支持公平 / 非公平、可中斷、超時獲取等),但得手動配對加鎖釋放,否則容易死鎖;
  • 可重入鎖的一個優點是可一定程度避免死鎖:

    • 要是鎖不可重入,同一線程內層方法需要鎖時,會因為自己占著鎖沒放,導致自己等自己(阻塞),最后死鎖。可重入鎖允許同一線程重復拿鎖,從設計上就避免了這種自己堵死自己的情況;
    • 注意:只是一定程度避免,要是代碼邏輯亂(比如忘記釋放鎖、不同鎖交叉嵌套不當),還是可能死鎖,只是解決了同一線程重入鎖這類場景的死鎖風險;
  • 應用場景:

    • 遞歸操作:遞歸函數里加鎖,每次遞歸調用都是內層“方法”,可重入鎖讓線程不用反復競爭鎖,比如計算階乘時用 ReentrantLock 保護共享變量,遞歸調用時直接重入;
    • 調用同一類其他方法:類里多個 synchronized 方法,線程調完一個調另一個,因為是同一對象鎖,直接重入,不用額外處理;
    • 鎖嵌套:多層代碼塊都需要同一把鎖,外層加鎖后,內層嵌套的加鎖邏輯直接復用,不用釋放外層鎖再重新加;
  • 例:

    class Counter {// 創建 ReentrantLock 對象,作為可重入鎖的實例// ReentrantLock 是顯式鎖,支持可重入、可中斷、公平/非公平等特性private final ReentrantLock lock = new ReentrantLock(); // 遞歸調用方法,演示可重入鎖的核心場景public void recursiveCall(int num) {// 1. 獲取鎖:同一線程再次調用時,可直接重入,不會阻塞自己//    可重入的關鍵體現:鎖對象識別當前持有線程,允許重復獲取lock.lock(); try {// 遞歸終止條件:num 減到 0 時停止if (num == 0) {return;}// 打印當前遞歸層級,證明方法執行(鎖已成功獲取)System.out.println("執行遞歸,num = " + num);// 2. 遞歸調用自身:再次進入方法時,會再次執行 lock.lock()//    由于是【同一線程】操作【同一把鎖】,可直接重入,不會阻塞recursiveCall(num - 1); } finally {// 3. 釋放鎖:遞歸調用多少次,就要釋放多少次//    保證鎖的獲取與釋放次數嚴格匹配,避免死鎖lock.unlock(); }}// 主方法:測試可重入鎖的遞歸場景public static void main(String[] args) throws InterruptedException {// 創建 Counter 實例,所有遞歸調用共享同一把鎖Counter counter = new Counter(); // 啟動遞歸測試:從 num=10 開始調用// 預期行為:線程安全執行遞歸,不會因鎖重入導致阻塞counter.recursiveCall(10); }
    }
    

1.3.4 Condition 詳解

  • Condition 是 Java 并發包里的線程協調工具,依賴 Lock(如 ReentrantLock)使用,比 Objectwait/notify 更靈活,解決線程間按條件等待 / 喚醒的問題。可以把它理解成:給 Lock 搭配“專屬等待隊列”,讓線程能按需等待條件、精準喚醒,而不是像 wait/notify 只能用 Object 的單一隊列;

  • 核心優勢(對比 Object.wait/notify

    • 多條件分離

      • Object 里,一個對象只有 1 個等待隊列(所有 wait() 的線程都擠在一起);
      • Condition 讓一個 Lock 可以有多個等待隊列(比如鎖 lock 可以創建 condition1condition2,不同條件的線程進不同隊列),喚醒時能精準選隊列,避免喚醒無關線程;
    • 更靈活的等待控制

      • 支持超時等待await(long time, TimeUnit unit) ),避免線程無限阻塞;
      • 喚醒時可選單個喚醒(signal())或全部喚醒(signalAll(),比 notify()(隨機喚醒一個)、notifyAll()(喚醒全部)更精準;
  • 核心方法解析

    返回值類型方法作用說明
    voidawait()讓當前線程進入等待,直到被 signal()/signalAll() 喚醒、被中斷,或意外喚醒(如假喚醒) 等待前釋放當前持有的 Lock,喚醒后重新競爭獲取鎖,再繼續執行
    booleanawait(long time, TimeUnit unit)限時等待:等待 time 時間后,若沒被喚醒就自動返回 false;被喚醒則返回 true 同樣會先釋放鎖,超時 / 喚醒后重新搶鎖。
    voidsignal()喚醒 Condition 等待隊列中一個線程(選一個喚醒,類似 notify() 但更可控) 喚醒后,線程不會直接執行,要重新競爭鎖
    voidsignalAll()喚醒 Condition 等待隊列中所有線程(類似 notifyAll()) 線程被喚醒后,重新競爭鎖,搶到鎖的繼續執行
  • 比如生產者 - 消費者模型中,想區分“隊列滿了讓生產者等”和“隊列空了讓消費者等”:

    • ReentrantLock 加鎖,然后創建兩個 ConditionnotFull(生產者等)、notEmpty(消費者等);

    • 生產者發現隊列滿了 → 調用 notFull.await() 等待;消費者取走數據后 → 調用 notFull.signal() 喚醒生產者;

    • 消費者發現隊列空了 → 調用 notEmpty.await() 等待;生產者放入數據后 → 調用 notEmpty.signal() 喚醒消費者;

    • 這樣就能 精準控制不同條件的線程等待 / 喚醒,比 wait/notify 更清晰。

1.3.5 結合 Condition 實現生產者消費者模式

案例:基于ReentrantLockCondition實現一個簡單隊列

public class ReentrantLockDemo3 {public static void main(String[] args) {// 1. 創建容量為 5 的隊列,作為生產者和消費者共享的資源Queue queue = new Queue(5);// 2. 啟動生產者線程:傳入隊列,線程執行 Producer 的 run 方法new Thread(new Producer(queue)).start();// 3. 啟動消費者線程:傳入隊列,線程執行 Customer 的 run 方法new Thread(new Customer(queue)).start();}
}/*** 隊列封裝類:* 用 ReentrantLock + Condition 實現線程安全的生產者-消費者隊列* 核心邏輯:* - 隊列滿時,生產者通過 notFull.await() 等待;* - 隊列空時,消費者通過 notEmpty.await() 等待;* - 生產/消費后,用 signal() 喚醒對應等待線程*/
class Queue {private Object[] items;      // 存儲隊列元素的數組int size = 0;                // 當前隊列中元素數量int takeIndex = 0;           // 消費者取元素的索引int putIndex = 0;            // 生產者放元素的索引private ReentrantLock lock;  // 控制并發的鎖public Condition notEmpty;   // 消費者等待條件:隊列空時阻塞,生產后喚醒public Condition notFull;    // 生產者等待條件:隊列滿時阻塞,消費后喚醒// 初始化隊列,指定容量public Queue(int capacity) {this.items = new Object[capacity];lock = new ReentrantLock();// 為同一把鎖創建兩個 Condition,分別控制“空”和“滿”的等待notEmpty = lock.newCondition();notFull = lock.newCondition();}/*** 生產者放入元素的方法* 必須在 lock 保護下調用,保證線程安全*/public void put(Object value) throws Exception {// 加鎖:同一時間只有一個線程能操作隊列lock.lock();try {// 隊列滿了(size == 數組長度),讓生產者等待// 用 while 而非 if:防止“假喚醒”后直接執行(需再次檢查條件)while (size == items.length) notFull.await();  // 釋放鎖,進入等待隊列,直到被喚醒// 隊列有空位,放入元素items[putIndex] = value;// 索引循環:如果放到數組末尾,重置為 0if (++putIndex == items.length) putIndex = 0;size++;  // 元素數量+1// 生產完成,喚醒等待的消費者(隊列非空了)notEmpty.signal(); } finally {// 測試用:打印生產日志(實際可刪除或放業務邏輯里)System.out.println("producer生產:" + value);// 必須釋放鎖:無論是否異常,保證鎖能被其他線程獲取lock.unlock();}}/*** 消費者取出元素的方法* 必須在 lock 保護下調用,保證線程安全*/public Object take() throws Exception {// 加鎖:同一時間只有一個線程能操作隊列lock.lock();try {// 隊列空了(size == 0),讓消費者等待// 用 while 而非 if:防止“假喚醒”后直接執行(需再次檢查條件)while (size == 0) notEmpty.await();  // 釋放鎖,進入等待隊列,直到被喚醒// 取出元素Object value = items[takeIndex];items[takeIndex] = null;  // 清空位置,避免內存泄漏// 索引循環:如果取到數組末尾,重置為 0if (++takeIndex == items.length) takeIndex = 0;size--;  // 元素數量-1// 消費完成,喚醒等待的生產者(隊列非滿了)notFull.signal(); return value;  // 返回取出的元素} finally {// 釋放鎖:無論是否異常,保證鎖能被其他線程獲取lock.unlock();}}
}/*** 生產者線程:* 每隔 1 秒生產一個隨機數(0~999),放入隊列*/
class Producer implements Runnable {private Queue queue;  // 共享的隊列public Producer(Queue queue) {this.queue = queue;}@Overridepublic void run() {try {while (true) {  // 無限循環生產Thread.sleep(1000);  // 每隔 1 秒生產一次// 生產隨機數,放入隊列queue.put(new Random().nextInt(1000));}} catch (Exception e) {e.printStackTrace();  // 捕獲并打印異常}}
}/*** 消費者線程:* 每隔 2 秒從隊列取出一個元素,打印消費日志*/
class Customer implements Runnable {private Queue queue;  // 共享的隊列public Customer(Queue queue) {this.queue = queue;}@Overridepublic void run() {try {while (true) {  // 無限循環消費Thread.sleep(2000);  // 每隔 2 秒消費一次// 取出元素并打印消費日志System.out.println("consumer消費:" + queue.take());}} catch (Exception e) {e.printStackTrace();  // 捕獲并打印異常}}
}
  • Condition 的作用

    • notEmpty:消費者專屬等待條件。隊列空時,消費者調用 notEmpty.await() 釋放鎖并阻塞;生產者放入元素后,用 notEmpty.signal() 喚醒。
    • notFull:生產者專屬等待條件。隊列滿時,生產者調用 notFull.await() 釋放鎖并阻塞;消費者取出元素后,用 notFull.signal() 喚醒;
    • 注意:await()signal() 都必須被 lock.lock()lock.unlock() 包裹,即都在 lock 保護范圍內;
  • 為什么用 while 而非 if 檢查條件?防止**“假喚醒”**:線程可能在未被 signal() 的情況下醒來(如系統調度)。用 while 會重新檢查條件,確保隊列狀態符合預期后再繼續執行;

  • 鎖的配對使用lock.lock()lock.unlock() 必須成對出現,且 unlock() 放在 finally 中,保證無論是否發生異常,鎖都會釋放,避免死鎖;

  • 生產者-消費者的節奏:生產者 1 秒生產一次,消費者 2 秒消費一次 → 隊列會逐漸被填滿(生產者更快),但通過 Condition 協調,不會出現“隊列滿了還生產”或“隊列空了還消費”的情況。

1.4 應用場景總結

  • ReentrantLock 最基本的作用是多線程環境下,讓共享資源只能被一個線程獨占訪問,保證操作共享資源時數據不會亂(比如多個線程同時改同一個變量,用鎖讓它們排隊改);

  • 應用場景總結:

    • 解決多線程競爭資源的問題

      • 場景描述:多個線程搶同一資源(比如同時寫同一個數據庫、操作同一個文件、修改同一個內存變量),需要保證同一時間只有一個線程能改,避免數據沖突;

      • 例子:多個線程同時往數據庫同一表插入/修改數據,用 ReentrantLock 加鎖,讓線程排隊執行寫操作,保證數據最終是正確的,不會因為并發寫入導致數據混亂(比如庫存扣減、訂單狀態修改);

    • 實現多線程任務的順序執行

      • 場景描述:希望線程 A 執行完某段邏輯后,線程 B 再執行;或者多個線程嚴格按特定順序跑任務;

      • 例子:比如線程 1 先初始化配置,線程 2 再加載數據,線程 3 最后處理業務。用 ReentrantLock 配合 Condition(條件變量 ),線程 2 等線程 1 釋放鎖并發信號后再執行,線程 3 等線程 2 發信號后執行,實現順序控制;

    • 實現多線程等待/通知機制

      • 場景描述:線程 A 完成某個關鍵步驟后,需要主動通知線程 B、C 可以繼續執行了;或者線程需要等待某個條件滿足后再干活(類似生產者 - 消費者模型);

      • 例子:生產者線程生產完數據,通過 ReentrantLockCondition 發信號,喚醒等待的消費者線程來處理數據;反之,消費者處理完,也能發信號讓生產者繼續生產。這比 Objectwait/notify 更靈活,能精準控制哪些線程被喚醒。

2 Semaphore(信號量)

2.1 簡介

  • Semaphore是多線程同步工具,核心解決控制同時訪問共享資源的線程數量,讓有限的資源(比如數據庫連接、文件句柄)在同一時間被合理數量的線程使用,避免因資源耗盡導致系統崩潰;

  • 工作原理:使用Semaphore的過程實際上是多個線程獲取訪問共享資源許可證的過程

    • Semaphore內部維護一個計數器,可以把它理解成剩余許可證數量。比如設置 Semaphore(3),就代表最多允許 3 個線程同時用資源,相當于發 3 張“訪問許可證”;

    • 線程獲取許可證(acquire()

      • 線程要訪問共享資源時,必須先調用 acquire() 拿許可證;
      • 如果計數器 > 0(還有許可證):線程拿到許可證,計數器減 1,然后繼續執行(訪問資源);
      • 如果計數器 == 0(沒許可證了):線程會被阻塞,直到有其他線程釋放許可證;
    • 線程釋放許可證(release():線程用完資源后,調用release()歸還許可證,計數器加 1,這樣阻塞的線程里就有機會拿到新的許可證,繼續執行;

    在這里插入圖片描述

  • Semaphore 專門用來限制資源的并發訪問數量,典型場景比如:

    • 數據庫連接池:假設連接池只有 10 個連接,用 Semaphore(10) 控制,避免幾百個線程同時搶連接,把數據庫壓垮;

    • 文件訪問:如果一個文件同一時間只能被 3 個線程讀寫,用 Semaphore(3) 限制,防止文件被瘋狂讀寫導致錯誤;

    • 網絡請求:控制同時發起的 HTTP 請求數量,避免把服務器或本地網絡打滿,保證系統穩定。

2.2 常用API

2.2.1 構造器

  • 構造器是用來創建 Semaphore 對象的,主要決定兩件事:

    • 允許同時訪問資源的 最大線程數(許可證數量)
    • 線程獲取許可證時,是用 公平策略 還是 非公平策略
  • Semaphore有兩個構造器:

    public Semaphore(int permits) {sync = new NonfairSync(permits);
    }
    
    • permits:設置最大并發數(比如傳 3,就代表最多允許 3 個線程同時拿許可證);

    • NonfairSync:默認用非公平策略。意思是,當許可證釋放時,新線程和等待隊列里的線程一起搶許可證,新線程可能“插隊”,不用嚴格排隊;

    • 等價寫法new Semaphore(3) 等價于 new Semaphore(3, false),因為默認是非公平的;

    public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
    
    • permits:同上,設置最大并發數;

    • fair:布爾值,決定用公平還是非公平策略:

      • fair = true:用公平策略FairSync),線程嚴格按“等待隊列順序”拿許可證,先等的線程一定先拿到,不會被插隊;
      • fair = false:用非公平策略NonfairSync),新線程和等待線程一起搶,可能插隊。

2.2.2 acquire方法

  • acquireSemaphore用于獲取許可證的核心方法,特點是獲取不到許可證時,線程會一直阻塞等待,直到拿到許可證或者被中斷。它有兩種重載形式,適配不同的許可證獲取需求;

    • void acquire() throws InterruptedException

      • 嘗試獲取 1 個許可證。如果當前有可用許可證,直接獲取(許可證計數減 1)并返回;如果沒有可用許可證,當前線程會進入阻塞狀態,直到:

        • 其他線程釋放許可證(使當前線程有可用許可證),此時當前線程會競爭獲取許可證;
        • 當前線程被其他線程中斷(觸發 InterruptedException);
      • 類比:類似去銀行辦事,只有 1 個窗口(許可證 = 1),當前窗口有人(沒許可證),你就只能排隊等著,直到窗口空出來(有人辦完業務,釋放許可證);

    • void acquire(int permits) throws InterruptedException

      • 功能:嘗試獲取指定數量(permits )的許可證。如果 Semaphore 中剩余許可證數量 ≥ permits,直接獲取(許可證計數減 permits)并返回;否則,線程進入阻塞,直到:

        • 其他線程釋放許可證,使剩余許可證 ≥ permits,當前線程競爭獲取;
        • 當前線程被中斷(觸發 InterruptedException);
      • 注意:使用該方法時,要確保最終能釋放對應數量的許可證,否則容易導致其他線程長期無法獲取足夠許可證而阻塞,引發“資源饑餓”問題;

      • 類比:如果銀行辦理大額業務需要占 2 個窗口(permits = 2),只有當至少空出 2 個窗口時,你才能開始辦理,否則就得等;

  • 例:主線程先占許可證,子線程等待,主線程釋放后子線程再獲取

    // 1. 創建 Semaphore:許可證數量=1,公平策略(true 表示嚴格按線程等待順序分配許可證)
    final Semaphore semaphore = new Semaphore(1, true);  // 2. 主線程直接搶許可證:因為初始許可證是 1,主線程能直接拿到(許可證計數變為 0)
    semaphore.acquire();  // 3. 創建子線程,嘗試獲取許可證
    Thread t = new Thread(() -> {  try {// 子線程執行到這,嘗試獲取許可證:但主線程已占用(許可證計數 0),所以子線程進入阻塞等待System.out.println("子線程等待獲取permit"); semaphore.acquire();  // 4. 子線程被喚醒(主線程釋放許可證后),執行到這,打印獲取成功System.out.println("子線程獲取到permit");  } catch (InterruptedException e) { // 子線程等待中被中斷時,會走到這e.printStackTrace();  } finally {// 5. 子線程執行完,釋放許可證(許可證計數 +1 )semaphore.release();  }
    });
    t.start(); // 啟動子線程try {// 6. 主線程休眠 5 秒:模擬做其他事情,期間子線程一直阻塞等待許可證TimeUnit.SECONDS.sleep(5);  
    } catch (InterruptedException e) {e.printStackTrace();
    }// 7. 主線程釋放許可證(許可證計數從 0 變為 1 ),子線程此時會被喚醒,競爭獲取許可證
    System.out.println("主線程釋放permit");  
    semaphore.release();  
    
    • 初始化Semaphore 許可證數量 1,公平策略;
    • 主線程搶許可證semaphore.acquire() 執行后,許可證計數 0,主線程持有許可證;
    • 子線程啟動:執行 semaphore.acquire() 時,因許可證 0,子線程進入阻塞,打印 子線程等待獲取permit
    • 主線程休眠:5 秒內,子線程一直阻塞;
    • 主線程釋放許可證semaphore.release() 執行,許可證計數 1;因為是公平策略,阻塞的子線程被喚醒,競爭拿到許可證,執行后續邏輯,打印 子線程獲取到permit
    • 子線程釋放許可證:子線程執行 semaphore.release(),許可證計數 0(子線程獲取時減 1,釋放時加 1,整體回到初始邏輯)。

2.2.3 tryAcquire方法

  • tryAcquireSemaphore 用于嘗試獲取許可證的方法,特點是:

    • 非阻塞優先:如果拿不到許可證,不會一直阻塞,而是直接返回 false(表示沒拿到);

    • 靈活控制:支持獲取 1 個許可證、獲取指定數量許可證、帶超時等待等場景,比 acquire 更靈活;

  • tryAcquire方法有三種重載形式

    • boolean tryAcquire():嘗試獲取 1 個許可證。如果當前有可用許可證,直接獲取(許可證計數 -1),返回 true;否則,直接返回 false(不阻塞);

    • boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException:嘗試獲取 1 個許可證,但增加超時等待機制。如果一開始沒許可證,線程會阻塞最多 timeout 時間:

      • 期間有許可證釋放,線程拿到許可證,返回 true
      • 超時后還沒許可證,返回 false
      • 等待中被中斷,拋出 InterruptedException
    • boolean tryAcquire(int permits):嘗試獲取 指定數量(permits)的許可證。如果 Semaphore 剩余許可證 ≥ permits,直接獲取(計數 -permits ),返回 true;否則,返回 false(不阻塞);

      • 注意:要確保最終釋放對應數量的許可證,否則會導致其他線程無法獲取足夠許可證;
    • boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException:結合指定數量和超時等待,嘗試獲取 permits 個許可證,最多等 timeout 時間,邏輯和上面類似;

  • 例:

    // 1. 創建 Semaphore:1 個許可證,公平策略
    final Semaphore semaphore = new Semaphore(1, true);  // 2. 啟動線程 t,嘗試獲取許可證
    new Thread(() -> {  // 2.1 嘗試獲取 1 個許可證:返回 true/falseboolean gotPermit = semaphore.tryAcquire(); // 2.2 如果拿到許可證if (gotPermit) { try {System.out.println(Thread.currentThread() + " 拿到許可證");TimeUnit.SECONDS.sleep(5); // 模擬占用 5 秒} catch (InterruptedException e) {e.printStackTrace();} finally {semaphore.release(); // 釋放許可證(必須!)}}
    }).start();// 3. 主線程休眠 1 秒:確保線程 t 啟動并拿到許可證
    TimeUnit.SECONDS.sleep(1); // 4. 主線程嘗試“帶超時的獲取”:最多等 3 秒
    if (semaphore.tryAcquire(3, TimeUnit.SECONDS)) { System.out.println("主線程拿到許可證");
    } else {System.out.println("主線程 3 秒內沒拿到許可證,失敗");
    }
    
    1. 線程 t 啟動tryAcquire() 拿到許可證(gotPermit = true),打印日志,休眠 5 秒;
    2. 主線程休眠 1 秒:等線程 t 啟動并占用許可證;
    3. 主線程嘗試獲取:調用 tryAcquire(3, ...),但線程 t 會占用許可證 5 秒,所以主線程等 3 秒后超時,進入 else,打印 主線程 3 秒內沒拿到許可證,失敗
  • 例:

    // 1. 創建 Semaphore:5 個許可證,公平策略
    final Semaphore semaphore = new Semaphore(5, true);  // 2. 嘗試獲取 5 個許可證:成功(因為初始有 5 個)
    assert semaphore.tryAcquire(5) : "獲取 5 個許可證失敗"; // 3. 此時許可證已耗盡(5 - 5 = 0 ),嘗試獲取 1 個許可證:失敗
    assert !semaphore.tryAcquire() : "獲取 1 個許可證失敗"; 
    
    • tryAcquire(5) 一次性拿 5 個許可證,成功后 Semaphore 剩余 0 個;

    • 后續 tryAcquire()(默認拿 1 個)返回 false,因為沒許可證了。

2.2.4 正確使用release

  • Semaphorerelease 是用來 歸還許可證 的,讓其他線程有機會獲取。它有兩種形式:

    • release():歸還 1 個許可證,內部計數器 +1;

    • release(int permits):歸還 permits 個許可證,內部計數器 +permits

    • 關鍵點:許可證數量有限,必須誰拿的誰還,否則會導致計數器混亂,破壞 Semaphore 控制并發的邏輯;

  • 錯誤用法示例(用 finally 無腦釋放)

    // 1. 創建 1 個許可證的 Semaphore(公平策略)
    final Semaphore semaphore = new Semaphore(1, true);  // 2. 線程 t1:拿到許可證后,霸占 1 小時(休眠)
    Thread t1 = new Thread(() -> {  try {semaphore.acquire(); // 拿許可證System.out.println("t1 拿到許可證");TimeUnit.HOURS.sleep(1); // 霸占 1 小時} catch (InterruptedException e) {System.out.println("t1 被中斷");} finally {semaphore.release(); // 不管怎樣,finally 里釋放}
    });
    t1.start(); 
    TimeUnit.SECONDS.sleep(1); // 等 t1 啟動// 3. 線程 t2:嘗試拿許可證,但若被中斷,會在 finally 里錯誤釋放
    Thread t2 = new Thread(() -> {  try {semaphore.acquire(); // 嘗試拿許可證(但 t1 沒釋放,所以會阻塞)System.out.println("t2 拿到許可證");} catch (InterruptedException e) {System.out.println("t2 被中斷");} finally {semaphore.release(); // 問題:t2 沒拿到許可證,卻釋放了!}
    });
    t2.start(); 
    TimeUnit.SECONDS.sleep(2); // 4. 主線程邏輯:中斷 t2,然后自己拿許可證
    t2.interrupt(); // 中斷 t2(此時 t2 還在阻塞等許可證)
    semaphore.acquire(); // 主線程嘗試拿許可證
    System.out.println("主線程拿到許可證");  
    
    • t2 根本沒拿到許可證(因為 t1 霸占著),但由于 release 寫在 finally 里,t2 被中斷時,會錯誤地歸還 1 個許可證(相當于無中生有多了 1 個許可證);

    • 原本 Semaphore 只有 1 個許可證,被 t1 占用后,計數器是 0。但 t2 錯誤釋放后,計數器變成 1,導致主線程能直接拿到許可證(而預期中 t1 要 1 小時后才釋放,主線程不該拿到);

  • 修改后的 t2 邏輯:

    Thread t2 = new Thread(() -> {  boolean acquired = false; // 標記是否成功拿到許可證try {semaphore.acquire(); // 嘗試拿許可證acquired = true; // 拿到了,標記為 trueSystem.out.println("t2 拿到許可證");} catch (InterruptedException e) {System.out.println("t2 被中斷");} finally {// 只有成功拿到許可證(acquired=true),才釋放if (acquired) { semaphore.release(); }}
    });
    
    • acquired 標記是否真的拿到許可證,只有拿到許可證的線程,才在 finally 里釋放,避免沒拿到卻釋放的問題;
  • Semaphore 的設計里,不強制檢查釋放許可證的線程是否真的拿過,而是靠開發者自己保證。官方文檔說明:“沒有要求釋放許可證的線程必須是通過 acquire 拿到許可證的,正確用法由開發者通過編程規范保證。”

2.3 使用

2.3.1 Semaphore實現商品服務接口限流

  • Semaphore可以用于實現限流功能,即限制某個操作或資源在一定時間內的訪問次數;

  • 代碼:限制同一時間,最多有 N 個線程能訪問接口(比如下面代碼中的 N=2),超過的請求要么排隊,要么直接拒絕,保證服務穩定;

    @Slf4j
    public class SemaphoreDemo {/*** 同一時刻最多只允許有兩個并發* 即許可證數量=2 → 同一時間最多允許 2 個線程訪問*/private static Semaphore semaphore = new Semaphore(2);// 創建線程池,最多 10 個線程(模擬大量請求)private static Executor executor = Executors.newFixedThreadPool(10);public static void main(String[] args) {// 循環 10 次,模擬 10 個請求for(int i = 0; i < 10; i++){ // 提交任務到線程池,執行 getProductInfo() 或 getProductInfo2()executor.execute(() -> getProductInfo());}}// 阻塞式限流public static String getProductInfo() {// 1. 嘗試獲取許可證:拿不到就阻塞,直到有許可證try {semaphore.acquire();log.info("請求服務"); // 拿到許可證,執行邏輯Thread.sleep(2000);  // 模擬接口執行耗時(2 秒)} catch (InterruptedException e) {throw new RuntimeException(e);}finally {// 2. 釋放許可證:不管是否異常,必須釋放,讓其他線程能用semaphore.release();}return "返回商品詳情信息";}// 非阻塞式限流public static String getProductInfo2() {// 1. 嘗試獲取許可證:拿不到直接返回 false,不阻塞if(!semaphore.tryAcquire()){log.error("請求被流控了"); // 沒拿到許可證,直接拒絕return "請求被流控了";}try {log.info("請求服務"); // 拿到許可證,執行邏輯Thread.sleep(2000); // 模擬接口執行耗時(2 秒)} catch (InterruptedException e) {throw new RuntimeException(e);}finally {// 2. 釋放許可證:必須釋放semaphore.release();}return "返回商品詳情信息";}
    }  
    
  • 假設運行 getProductInfo()

    • 前 2 個線程能拿到許可證,執行 log.info("請求服務"),然后 sleep 2 秒。

    • 第 3~10 個線程調用 acquire() 時,因為許可證被占滿,會阻塞等待。

    • 2 秒后,前 2 個線程 release() 歸還許可證,阻塞的線程開始競爭,每次放 2 個執行,直到所有請求處理完。

  • 如果運行 getProductInfo2():前 2 個線程拿到許可證,執行邏輯;第 3 個線程 tryAcquire() 返回 false,直接走限流邏輯(log.error)。

2.3.2 Semaphore限制同時在線的用戶數量

  • 模擬一個登錄系統,最多限制給定數量的人員同時在線,如果所能申請的許可證不足,那么將告訴用戶無法登錄,請稍后重試;

  • 主類SemaphoreDemo7(模擬多用戶登錄):

    public class SemaphoreDemo7 {public static void main(String[] args) {// 最多允許 10 個用戶同時在線final int MAX_PERMIT_LOGIN_ACCOUNT = 10; LoginService loginService = new LoginService(MAX_PERMIT_LOGIN_ACCOUNT);// 啟動 20 個線程(模擬 20 個用戶登錄)IntStream.range(0, 20).forEach(i -> new Thread(() -> {// 執行登錄boolean login = loginService.login(); if (!login) {// 登錄失敗(超過并發數)System.out.println(Thread.currentThread() + " 因超過最大在線數被拒絕");return;}try {simulateWork(); // 模擬登錄后的業務操作} finally {loginService.logout(); // 退出時釋放許可證}}, "User-" + i).start());}// 模擬登錄后的業務操作(隨機休眠,模擬用戶在線時長)private static void simulateWork() {try {TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10));} catch (InterruptedException e) { /* 忽略中斷 */ }}
    }
    
    • 創建 LoginService,傳入最大在線數 10

    • 啟動 20 個線程(用戶),調用 loginService.login() 嘗試登錄;

    • 登錄成功 → 執行 simulateWork()(模擬用戶在線操作,隨機休眠 )→ 退出時 logout() 釋放許可證;

    • 登錄失敗 → 直接提示并返回;

  • LoginService 類(控制登錄邏輯)

    private static class LoginService {private final Semaphore semaphore; public LoginService(int maxPermitLoginAccount) {// 創建 Semaphore:許可證數量=maxPermitLoginAccount,公平策略(true)this.semaphore = new Semaphore(maxPermitLoginAccount, true); }public boolean login() {// 嘗試獲取許可證:非阻塞,拿不到直接返回 falseboolean success = semaphore.tryAcquire(); if (success) {System.out.println(Thread.currentThread() + " 登錄成功");}return success;}public void logout() {// 釋放許可證:登錄成功的用戶退出時,歸還許可證semaphore.release(); System.out.println(Thread.currentThread() + " 退出成功");}
    }
    
    • Semaphore 許可證數量 = 最大在線用戶數(10),保證同一時間最多 10 個線程(用戶)能拿到許可證;

      • login()tryAcquire() 嘗試拿許可證:
        • 拿到 → 返回 true(登錄成功);
        • 拿不到 → 返回 false(登錄失敗,超過并發);
    • logout()release() 釋放許可證,讓其他用戶可登錄;

  • 運行效果:

    • 登錄成功:前 10 個線程(用戶)能拿到許可證,打印 登錄成功,執行 simulateWork() 隨機休眠;

    • 登錄失敗:后 10 個線程調用 tryAcquire() 時,許可證已耗盡,返回 false,打印 因超過最大在線數被拒絕

    • 用戶退出:休眠結束后,線程執行 logout() 釋放許可證,Semaphore 計數器 +1,后續新線程(或之前阻塞的線程)有機會拿到許可證登錄;

  • 如果把 login 里的 tryAcquire() 換成 acquire()(阻塞式獲取 ):

    public boolean login() {try {// 阻塞式獲取:拿不到就一直等,直到有許可證semaphore.acquire(); System.out.println(Thread.currentThread() + " 登錄成功");return true;} catch (InterruptedException e) {// 被中斷時返回登錄失敗return false; }
    }
    
    • 效果:超過并發數的用戶不會直接失敗,而是阻塞等待,直到有用戶退出釋放許可證,再繼續登錄。

2.4 應用場景總結

  • Semaphore(信號量)是高并發工具,核心能力是控制同時訪問共享資源的線程數量,讓有限的資源(比如連接池、文件句柄)在高并發下被合理使用,避免系統被壓垮;

  • 應用場景總結

    • 限流(流量控制):系統的某個資源(比如接口、數據庫連接)能承受的并發量有限,需要限制同時訪問的線程數,防止資源被打滿導致系統崩潰;

      • 接口限流:比如商品詳情接口,最多允許 100 個線程同時訪問,用 Semaphore(100) 控制,超過的請求排隊或拒絕;
      • 數據庫連接限流:數據庫連接池有 20 個連接,用 Semaphore(20) 控制,避免幾千個線程同時搶連接,把數據庫壓垮;
      • 側重于控制并發訪問量,保護資源不被壓垮,比如接口、網關層的流量控制;
    • 資源池(維護有限資源):系統有一組有限資源(比如數據庫連接、文件句柄、網絡端口),需要讓線程按需借用、用完歸還,保證資源被合理復用;

      • 數據庫連接池:初始化 Semaphore(連接數),線程需要連接時 acquire() 拿許可證(同時從池里取連接),用完后 release() 釋放許可證(同時把連接還回池);
      • 文件訪問池:如果有 5 個文件句柄,用 Semaphore(5) 控制,線程訪問文件時拿許可證,訪問完歸還,保證同一時間最多 5 個線程操作文件;
      • 側重于管理有限資源的借用/歸還,保證資源復用,比如連接池、句柄池的資源調度;
  • 但本質都是Semaphore 的許可證數量,限制同時使用資源的線程數

3 CountDownLatch(閉鎖)

3.1 簡介

  • CountDownLatch多線程同步工具,解決的問題是:讓一個或多個線程等待其他多個任務全部完成后,再繼續執行。比如:

    • 主線程要等 10 個子線程都跑完初始化任務,才開始處理業務;
    • 或者多個線程要等某個“總開關”任務完成,再一起執行;
  • 工作流程:

    • 初始化計數器CountDownLatch latch = new CountDownLatch(N);,這里的 N 是需要等待的任務數量(比如有 3 個子線程要執行,N=3);

    • 等待線程latch.await();,調用 await() 的線程(比如主線程)會阻塞等待,直到 N 減到 0;

    • 任務線程計數減 1:每個子任務線程執行完自己的邏輯后,調用 latch.countDown(); ,讓計數器 N-1

    • 計數器歸 0,等待線程喚醒:當所有子任務線程都調用 countDown()N 變成 0 ,之前阻塞的線程(await() 的線程)會被喚醒,繼續執行;

    在這里插入圖片描述

    TA:等待線程、T1/T2/T3:任務線程

    cnt = 3:對應 CountDownLatch latch = new CountDownLatch(3);,表示需要等待 3 個任務完成

    過程:

    • 線程 TA 調用 await()TA執行到latch.await() 時,會檢查計數器cnt:此時cnt=3≠0,所以TA進入阻塞狀態(awaiting...,暫停執行;

    • 任務線程 T1 完成T1 執行 latch.countDown() → 計數器 cnt3→2。此時 cnt≠0TA 仍阻塞;

    • 任務線程 T2 完成T2 執行 latch.countDown() → 計數器 cnt2→1。此時 cnt≠0TA 仍阻塞;

    • 任務線程 T3 完成T3 執行 latch.countDown() → 計數器 cnt1→0

    • cnt=0 時,CountDownLatch喚醒所有等待的線程(這里是 TA):TA從阻塞狀態恢復(resumed),繼續執行后續邏輯;

  • 關鍵特性:

    • 一次性:計數器 N 減到 0 后,就不能再重置或復用,只能用一次;

    • 多線程等待:可以有多個線程調用 await() ,一起等待 N 歸 0 后被喚醒;

    • 任務結束的寬泛性:子任務結束包括正常跑完或者拋異常終止,只要調用 countDown() ,就會讓計數器減 1;

  • 典型場景:

    • 并行任務匯總:比如計算一個大數組的和,拆成 10 個子數組并行計算,主線程等 10 個子線程都算完,再匯總結果;
    • 系統啟動初始化:系統啟動時,需要初始化 5 個服務(比如緩存、數據庫連接、配置加載),主線程等 5 個服務都初始化完,再對外提供服務;
    • 測試多線程并發:測試時,讓 100 個線程等信號,信號發出(countDown())后一起執行,模擬高并發場景。

3.2 常用API

3.2.1 構造器

public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);
}
  • CountDownLatch 構造時,count 必須 ≥0,否則拋 IllegalArgumentException
  • count 減到 0 后,無法重置,CountDownLatch 只能用一次。

3.2.2 常用方法

  • 總覽:

    // 1. await():調用的線程會阻塞,直到 count 減到 0
    public void await() throws InterruptedException {};  // 2. await(long timeout, TimeUnit unit):阻塞等待,但最多等 timeout 時間;
    //    若超時后 count 仍≠0,不再等待,返回 false
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {};  // 3. countDown():讓 count 減 1,直到 count=0 時喚醒所有等待的線程
    public void countDown() {};  
    
  • await():調用線程進入阻塞狀態,直到 countDownLatchcount 減到 0;

    • 如果等待中被其他線程中斷,會拋出 InterruptedException
    • count=0 時,調用 await()立即返回,不阻塞;
  • await(long timeout, TimeUnit unit):阻塞等待,但增加了超時退出機制;

    • 返回值為true → 等待中 count 減到 0(正常喚醒);
    • 返回值為false → 超時后 count 仍≠0(放棄等待);
  • countDown():讓 count 減 1。當 count1→0 時,會喚醒所有等待的線程await() 的線程);

    • count 已經是 0 時,調用 countDown() 會被忽略(count 最小為 0);
    • 只有 count1→0 時,才會觸發喚醒;count3→2 這類變化,不會喚醒線程;
  • 例:

    // 1. 初始化:count=2 → 需要 2 次 countDown() 才會喚醒 await() 的線程
    CountDownLatch latch = new CountDownLatch(2);  // 2. 第一次 countDown() → count=2→1
    latch.countDown();  // 3. 第二次 countDown() → count=1→0 → 喚醒所有 await() 的線程
    latch.countDown();  // 4. 第三次 countDown() → count 已經是 0,調用被忽略
    latch.countDown();  // 5. count=0,調用 await() 直接返回,不阻塞
    latch.await();  
    

3.3 使用

3.3.1 多任務完成后合并匯總

  • 開發中常見多個任務并行執行,必須等所有任務完成后,再統一處理結果 的需求:

    • 比如“數據詳情頁”需要同時調用 5 個接口(并行),等所有接口返回數據后,再合并結果展示;

    • 或者“多個數據操作完成后,統一做校驗(check)”;

  • 代碼:

    public class CountDownLatchDemo2 {public static void main(String[] args) throws Exception {// 1. 初始化 CountDownLatch:需要等待 5 個任務完成CountDownLatch countDownLatch = new CountDownLatch(5); // 2. 啟動 5 個線程(模擬 5 個并行任務)for (int i = 0; i < 5; i++) { final int index = i;new Thread(() -> {try {// 模擬任務執行耗時(1~3 秒隨機)Thread.sleep(1000 + ThreadLocalRandom.current().nextInt(2000)); System.out.println("任務 " + index + " 執行完成");// 3. 任務完成,計數器減 1(countDownLatch.countDown())countDownLatch.countDown(); } catch (InterruptedException e) {e.printStackTrace();}}).start();}// 4. 主線程阻塞等待:直到 5 個任務都完成(count=0)countDownLatch.await(); // 5. 所有任務完成后,主線程執行匯總邏輯System.out.println("主線程:在所有任務運行完成后,進行結果匯總"); }
    }
    
    • 初始化 CountDownLatchnew CountDownLatch(5) → 表示需要等待 5 個任務完成(計數器初始值 5);

    • 啟動并行任務:循環創建 5 個線程,模擬 5 個并行任務。每個線程:

      • Thread.sleep(...):模擬任務執行耗時(隨機 1~3 秒);

      • countDownLatch.countDown():任務完成后,計數器減 1(5→4→3→2→1→0);

    • 主線程等待countDownLatch.await() → 主線程阻塞,直到計數器減到 0(所有任務完成);

    • 匯總結果:計數器歸 0 后,主線程被喚醒,執行 System.out.println(...) 做結果匯總;

  • 運行效果

    • 5 個任務線程會隨機順序完成(因為 sleep 時間隨機),比如:

      任務 2 執行完成  
      任務 0 執行完成  
      任務 4 執行完成  
      任務 1 執行完成  
      任務 3 執行完成  
      
    • 主線程必須等所有任務打印完,才會輸出:

      主線程:在所有任務運行完成后,進行結果匯總  
      
  • 核心價值

    • 并行效率:5 個任務并行執行,不用等待前一個任務完成再執行下一個,節省時間;

    • 同步控制:主線程通過 CountDownLatch 精準等待所有任務完成,保證匯總邏輯在所有數據就緒后執行。

3.3.2 電商場景中的應用——等待所有子任務結束

  • 需求:根據商品品類 ID,獲取 10 個商品,并行計算每個商品的最終價格(需調用 ERP、CRM 等系統,計算復雜、耗時),最后匯總所有價格返回;

    ERP系統:ERP是企業資源計劃系統,它整合企業內部各部門的核心業務流程,如財務、采購、生產、銷售和人力資源等,以實現數據共享和資源優化;

    CRM系統:CRM是客戶關系管理系統,它專注于管理公司與當前及潛在客戶的交互和業務往來,旨在改善客戶服務、提升銷售效率并維護客戶關系;

    • 串行問題:如果一個一個計算(串行),總耗時 = 獲取商品時間 + 10×單個商品計算時間,商品越多越慢;

      在這里插入圖片描述

    • 并行優化:用多線程并行計算商品價格,總耗時 = 獲取商品時間 + 最長單個商品計算時間,效率更高;

      在這里插入圖片描述

  • 代碼:工具方法 & 數據類

    // 根據品類 ID 獲取商品 ID 列表(模擬返回 1~10 號商品)
    private static int[] getProductsByCategoryId() {return IntStream.rangeClosed(1, 10).toArray();
    }// 商品價格數據類:存儲商品 ID 和計算后的價格
    private static class ProductPrice {private final int prodID; // 商品 IDprivate double price;    // 計算后的價格// 構造方法、get/set、toString 略...
    }
    
  • 主邏輯:并行計算商品價格

    public static void main(String[] args) throws InterruptedException {// 1. 獲取商品 ID 列表(1~10)final int[] products = getProductsByCategoryId(); // 2. 轉換為 ProductPrice 列表(初始價格未計算)List<ProductPrice> list = Arrays.stream(products).mapToObj(ProductPrice::new) // 每個商品 ID 對應一個 ProductPrice.collect(Collectors.toList()); // 3. 初始化 CountDownLatch:需要等待 10 個商品計算完成(products.length=10)final CountDownLatch latch = new CountDownLatch(products.length); // 4. 為每個商品啟動線程,并行計算價格list.forEach(pp -> {new Thread(() -> {try {System.out.println(pp.getProdID() + " -> 開始計算商品價格.");// 模擬耗時操作(調用外部系統):隨機休眠 0~9 秒TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10)); // 5. 計算商品價格(模擬業務邏輯:奇偶商品不同折扣)if (pp.getProdID() % 2 == 0) {pp.setPrice(pp.getProdID() * 0.90); // 偶數商品 9 折} else {pp.setPrice(pp.getProdID() * 0.71); // 奇數商品 7.1 折}System.out.println(pp.getProdID() + " -> 價格計算完成.");} catch (InterruptedException e) {e.printStackTrace();} finally {// 6. 任務完成,計數器減 1latch.countDown(); }}).start(); // 啟動線程});// 7. 主線程阻塞等待:直到 10 個商品都計算完成(latch.await())latch.await(); // 8. 所有商品計算完成,匯總結果System.out.println("所有價格計算完成.");list.forEach(System.out::println); 
    }
    
    • 準備商品數據:調用 getProductsByCategoryId() 獲取 1~10 號商品 ID,轉成 ProductPrice 列表(初始價格未計算);

    • 初始化 CountDownLatchnew CountDownLatch(10) → 表示需要等待10 個商品的計算任務完成;

    • 并行計算價格:為每個商品啟動線程:

      • 模擬耗時操作(TimeUnit.SECONDS.sleep(...));
      • 根據商品 ID 奇偶,設置不同折扣價格(模擬業務邏輯);
      • 任務完成后,latch.countDown() 讓計數器減 1;
    • 主線程等待 & 匯總結果latch.await() 阻塞主線程,直到 10 個任務都完成(計數器歸 0),最后打印所有商品的計算結果。

3.4 應用場景總結

  • 并行任務同步:多個任務并行執行(比如 5 個線程同時下載文件),必須等所有任務都完成后,再執行下一步(比如合并文件);

    • CountDownLatch 讓主線程等待所有并行任務完成,保證后續操作在所有任務就緒后執行;
  • 多任務匯總:需要統計多個線程的執行結果(比如 10 個線程分別計算一部分數據,最后匯總總和);

    • 主線程等所有線程計算完,再統一匯總結果,避免部分數據未計算就開始匯總的問題;
  • 資源初始化:系統啟動時,需要初始化多個資源(比如緩存、數據庫連接、配置加載),必須等所有資源初始化完成,再對外提供服務;

    • 主線程等待所有資源初始化任務完成,保證系統啟動后資源可用。

3.5 不足

  • CountDownLatch一次性工具

    • 構造時設置的計數器(比如 new CountDownLatch(5)),一旦減到 0,就無法重置或復用

    • 如果業務需要“重復等待多個任務完成”,CountDownLatch 無法滿足,必須重新創建新的實例。

4 CyclicBarrier(回環柵欄/循環屏障)

4.1 簡介

  • CyclicBarrier多線程同步工具,解決的問題是:讓一組線程互相等待,直到所有線程都到達同一個“屏障點”,然后再一起繼續執行

  • 關鍵特點:可循環使用(屏障可以重置,重復讓線程等待、一起執行);

  • 工作流程:

    1. 初始化屏障CyclicBarrier barrier = new CyclicBarrier(N);N 是“需要等待的線程數量”(比如 5 個線程要一起執行后續邏輯);

    2. 線程到達屏障點:每個線程執行到 barrier.await(); 時,會阻塞等待,直到有 N 個線程都調用了 await()

    3. 所有線程到達,一起執行:當第 N 個線程調用 await() 后,所有阻塞的線程會被同時喚醒,繼續執行后續邏輯;

    4. 循環使用:喚醒后,屏障可以重置(通過 reset() 方法 ),再次讓新的一組線程等待、一起執行;

  • 適合把一個大任務拆成多個子任務并行執行,等所有子任務完成后,再統一做下一步的場景,且需要重復執行該流程。典型場景有:

    • 并行計算 + 合并結果:比如計算一個大數組的和,拆成 10 個子數組并行計算,等所有子數組算完,再合并總和。計算完一次后,還能再拆新的數組,重復使用屏障。

    • 多階段任務:系統升級時,先讓 5 個節點并行執行數據遷移,全部完成后,再一起執行驗證數據,驗證完還能繼續下一階段(比如啟動服務),屏障可循環用;

  • CountDownLatch的核心區別

    特性CyclicBarrierCountDownLatch
    是否可循環可循環(屏障可重置,重復用)一次性(計數器到 0 后無法重置)
    等待的目標等待“一組線程互相到達屏障點”等待“其他線程完成任務(計數減到 0)”
    典型場景多階段并行任務(可重復)單次多任務同步(不可重復)

4.2 常用API

4.2.1 構造器

  • 有兩個構造器:

    public CyclicBarrier(int parties)
    
    • parties:需要等待的線程總數。比如傳 4,表示必須有 4 個線程都調用 await(),屏障才會放行;

    • 作用:初始化一個基礎的循環屏障,所有線程到達后一起執行后續邏輯;

    public CyclicBarrier(int parties, Runnable barrierAction)
    
    • parties:同上,需要等待的線程總數;

    • barrierAction:一個 Runnable 任務。當所有線程到達屏障后,會優先執行這個任務,再讓所有線程繼續執行;

    • 作用:適合線程到達屏障后,需要先統一處理一些邏輯(比如匯總數據、初始化資源)的場景;

  • 工作原理:以CyclicBarrier barrier = new CyclicBarrier(4, new Runnable(){...});為例

    在這里插入圖片描述

    1. 初始化

      • parties=4 → 需要 4 個線程到達屏障;
      • barrierAction → 所有線程到達后執行的任務;
    2. 線程到達屏障:每個線程執行 barrier.await(); 時:

      • 計數器(count)減 1(初始4 → 線程 1 調用后變成3 → 線程 2 調用后變成2 → 線程 3 調用后變成1 → 線程 4 調用后變成0);
      • 前 3 個線程調用 await() 后,會阻塞等待
    3. 屏障放行(所有線程到達):第 4 個線程調用 await() 后,count=0,執行 barrierAction,然后喚醒所有阻塞的線程,一起繼續執行后續邏輯;

    4. 循環復用:屏障放行后,count重置為初始值(4),可以再次讓新的一組線程(4 個)等待、觸發屏障。

4.2.2 常用方法

  • 總覽:

    // 1. await():線程調用后阻塞,直到所有線程都調用 await(),屏障放行
    public int await() throws InterruptedException, BrokenBarrierException {}// 2. await(long timeout, TimeUnit unit):帶超時的 await(),超時后屏障視為“被破壞”
    public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {}// 3. reset():重置屏障,讓計數器回到初始值,可重復使用
    public void reset() {}
    
  • await():線程調用await() 后,會阻塞等待,直到有 parties 個線程都調用 await()parties 是構造器傳入的線程數);

    • InterruptedException:等待中的線程被中斷;

    • BrokenBarrierException:屏障被破壞(比如其他線程 await() 時被中斷、超時 ),導致當前線程無法繼續等待;

    • 返回值:返回當前線程在到達屏障的線程組中的索引(比如 4 個線程到達,第一個調用 await() 的線程返回 3,最后一個返回 0 ,索引從 0 開始逆序);

  • await(long timeout, TimeUnit unit):和 await() 類似,但增加超時機制。如果在 timeout 時間內,湊不齊 parties 個線程調用 await(),則觸發超時,屏障被標記為破壞。防止線程因其他線程異常,無限期阻塞;

    • 除了 InterruptedExceptionBrokenBarrierException,還可能拋出 TimeoutException(超時);
  • reset():重置CyclicBarrier,讓計數器回到初始值(parties),屏障狀態恢復到未使用;

    • 注意:重置時,若有線程正在 await(),會觸發 BrokenBarrierException(因為屏障被強制重置,這些線程的等待被打斷)。

4.3 使用

4.3.1 等待所有子任務結束

  • 需求:根據品類 ID 獲取 10 個商品,并行計算每個商品的最終價格(模擬調用外部系統,耗時隨機),等所有商品價格計算完成后,匯總結果返回;

  • 工具方法 & 數據類

    // 根據品類 ID 獲取商品 ID 列表(1~10 號商品)
    private static int[] getProductsByCategoryId() {return IntStream.rangeClosed(1, 10).toArray();
    }// 商品價格數據類:存儲商品 ID 和計算后的價格
    private static class ProductPrice {private final int prodID; // 商品 IDprivate double price;     // 計算后的價格// 構造方法、get/set、toString 略...
    }
    
  • 主邏輯:用 CyclicBarrier 同步多線程

    public static void main(String[] args) throws InterruptedException {// 1. 獲取商品 ID 列表,轉換為 ProductPrice 列表final int[] products = getProductsByCategoryId();List<ProductPrice> list = Arrays.stream(products).mapToObj(ProductPrice::new).collect(Collectors.toList()); // 2. 初始化 CyclicBarrier:需要等待 list.size()(10)個線程到達屏障final CyclicBarrier barrier = new CyclicBarrier(list.size()); // 3. 存儲線程的列表(用于后續 join 等待)final List<Thread> threadList = new ArrayList<>(); // 4. 為每個商品啟動線程,并行計算價格list.forEach(pp -> {Thread thread = new Thread(() -> {try {System.out.println(pp.getProdID() + " 開始計算商品價格.");// 模擬耗時操作(調用外部系統):隨機休眠 0~9 秒TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10)); // 5. 計算商品價格(奇偶商品不同折扣)if (pp.getProdID() % 2 == 0) {pp.setPrice(pp.getProdID() * 0.90); // 偶數商品 9 折} else {pp.setPrice(pp.getProdID() * 0.71); // 奇數商品 7.1 折}System.out.println(pp.getProdID() + " -> 價格計算完成.");// 6. 等待其他線程:調用 await(),直到所有線程都到達屏障barrier.await(); } catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}});threadList.add(thread); // 記錄線程thread.start();         // 啟動線程});// 7. 等待所有線程執行完成(通過 join 保證主線程等所有子線程跑完)threadList.forEach(t -> {try {t.join(); } catch (InterruptedException e) {e.printStackTrace();}});// 8. 所有商品價格計算完成,匯總結果System.out.println("所有價格計算完成.");list.forEach(System.out::println); 
    }
    
    • 準備商品數據:調用 getProductsByCategoryId() 獲取 1~10 號商品 ID,轉成 ProductPrice 列表(初始價格未計算);

    • 初始化 CyclicBarriernew CyclicBarrier(list.size()) → 表示需要等待 10 個線程 到達屏障(每個商品對應一個線程);

    • 并行計算價格:為每個商品啟動線程:

      • 模擬耗時操作(TimeUnit.SECONDS.sleep(...));
      • 根據商品 ID 奇偶,設置不同折扣價格;
      • 調用 barrier.await():線程到達屏障,阻塞等待其他線程;
    • 屏障放行:當第 10 個線程調用 await() 后,所有阻塞的線程會被同時喚醒,繼續執行后續邏輯;

    • 主線程等待 & 匯總結果:通過 threadList.forEach(t -> t.join()) 讓主線程等待所有子線程執行完成,最后打印所有商品的計算結果。

4.3.2 CyclicBarrier的循環特性——模擬跟團旅游

  • 需求:跟團旅游

    • 第一階段(上車屏障)

      • 導游要求:所有游客上車后,大巴才出發(對應 CyclicBarrier 的第一次 await());
      • 類比:10 個游客 + 1 個導游(主線程)= 11 個線程,湊齊后屏障放行;
    • 第二階段(下車屏障)

      • 導游要求:所有游客下車后,大巴才去下一個景點(對應 CyclicBarrier 的第二次 await());
      • 類比:同一組線程(游客 + 導游)再次湊齊,屏障放行,實現“循環復用”;

    在這里插入圖片描述

  • 游客線程邏輯(Tourist 類)

    private static class Tourist implements Runnable {private final int touristID; // 游客編號private final CyclicBarrier barrier; // 循環屏障public Tourist(int touristID, CyclicBarrier barrier) {this.touristID = touristID;this.barrier = barrier;}@Overridepublic void run() {// 1. 模擬上車(第一階段:上車同步)System.out.printf("游客:%d 乘坐旅游大巴\n", touristID);spendSeveralSeconds(); // 模擬上車耗時waitAndPrint("游客:%d 上車,等別人上車.\n"); // 調用 await(),等待湊齊 11 個線程// 2. 模擬下車(第二階段:下車同步)System.out.printf("游客:%d 到達目的地\n", touristID);spendSeveralSeconds(); // 模擬下車耗時waitAndPrint("游客:%d 下車,等別人下車.\n"); // 再次調用 await(),等待湊齊 11 個線程}// 調用 barrier.await(),并打印日志private void waitAndPrint(String message) {System.out.printf(message, touristID);try {barrier.await(); // 線程到達屏障,阻塞等待} catch (InterruptedException | BrokenBarrierException e) {// 忽略異常}}// 模擬隨機耗時(上車/下車的時間)private void spendSeveralSeconds() {try {TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10));} catch (InterruptedException e) {// 忽略異常}}
    }
    
  • 主線程邏輯(導游視角)

    public static void main(String[] args) throws BrokenBarrierException, InterruptedException {// parties=11 → 需要 11 個線程到達屏障(10 個游客線程 + 1 個主線程)final CyclicBarrier barrier = new CyclicBarrier(11);// 啟動 10 個游客線程for (int i = 0; i < 10; i++) {new Thread(new Tourist(i, barrier)).start();}// 4. 主線程(導游)參與第一階段屏障:等待所有游客上車barrier.await(); System.out.println("導游:所有的游客都上了車.");// 5. 主線程(導游)參與第二階段屏障:等待所有游客下車barrier.await(); System.out.println("導游:所有的游客都下車了.");
    }
    
  • 上車同步

    • 10 個游客線程 + 1 個主線程(導游),共 11 個線程;

    • 每個游客線程執行到 waitAndPrint("游客:%d 上車,等別人上車.\n") → 調用 barrier.await(),阻塞等待;

    • 當 11 個線程都調用 await() 后,屏障放行:

      • 打印所有“游客上車等待”的日志;

      • 主線程繼續執行,打印 導游:所有的游客都上了車.

  • 下車同步

    • 同一組 11 個線程(10 個游客 + 1 個主線程 ),再次執行到 waitAndPrint("游客:%d 下車,等別人下車.\n") → 調用 barrier.await(),阻塞等待;

    • 當 11 個線程都調用 await() 后,屏障放行:

      • 打印所有“游客下車等待”的日志;

      • 主線程繼續執行,打印 導游:所有的游客都下車了.

  • CyclicBarrier 的循環特性

    • 可重復觸發:同一 CyclicBarrier 實例,可通過多次 await() 實現“多階段同步”(上車→下車);

    • 線程組復用:同一組線程(游客 + 導游 )參與多個階段的屏障,無需重新創建實例。

4.4 應用場景總結

  • CyclicBarrier多線程同步工具,核心解決讓一組線程互相等待,全部到達同一屏障點后,再一起繼續執行的問題,且可循環使用(屏障可重置,重復同步多階段任務);

  • 應用場景:

    • 多線程任務拆分與合并:一個復雜任務(比如計算大數據集的總和)拆成多個子任務(比如 10 個線程各算一部分),必須等所有子任務完成后,再合并結果;

    • 多線程數據處理同步:多個線程并行處理不同的數據分片(比如處理 5 個文件),必須等所有線程處理完自己的數據,再統一匯總、校驗或持久化。

4.5 CyclicBarrier VS CountDownLatch

  • 可復用性

    • CountDownLatch一次性工具。構造時設置的計數器(比如 new CountDownLatch(5)),一旦減到 0,無法重置或復用;

    • CyclicBarrier可循環復用。計數器(parties)可以通過 reset() 重置,重復讓新的線程組等待、觸發屏障;

  • 等待目標

    • CountDownLatchawait() 的線程等待其他線程調用 countDown() 把計數器減到 0(主線程等子線程完成);

    • CyclicBarrierawait() 的線程等待其他線程也到達屏障點(調用 await()(線程組互相等待);

  • 計數器特性

    • CountDownLatch:計數器只能遞減(從 N→0),且無法重置;

    • CyclicBarrier:計數器可以重置(通過 reset() 回到初始值 parties),支持多輪同步。

5 Exchange(數據交換機)

5.1 簡介

  • Exchanger 專門解決兩個線程需要互相交換數據的場景,讓兩個線程在“交換點”(調用 exchange 方法時)同步,安全交換數據;

  • 工作流程

    • 線程 1 調用 exchange(object1):線程 1 會阻塞等待,直到線程 2 也調用 exchange 方法;
    • 線程 2 調用 exchange(object2):此時兩個線程都到達“交換點”,Exchanger 會將 object1 傳遞給線程 2,將 object2 傳遞給線程 1;
    • 交換后繼續執行:線程 1 拿到 object2,線程 2 拿到 object1,繼續執行后續邏輯;

    在這里插入圖片描述

5.2 常用API

  • public V exchange(V x) throws InterruptedException

    • 功能

      • 當前線程攜帶數據 x,阻塞等待另一個線程到達交換點;
      • 對方線程到達后,交換數據:當前線程接收對方數據,返回給調用方;
    • 異常:等待中若線程被中斷,拋出 InterruptedException

  • public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException

    • 同上,但增加超時機制。如果在 timeout 時間內,對方線程未到達交換點,拋出 TimeoutException
    • 適用場景:防止線程因對方異常,無限期阻塞。

5.3 使用

5.3.1 模擬交易場景

  • 模擬買賣雙方交易

    • 賣家帶“商品”(goods = "電腦"),買家帶“錢”(money = "$4000");

    • 雙方必須都到達“交易點”(調用 exchanger.exchange(...)),才能交換數據(一手交錢,一手交貨);

  • 代碼:

    public class ExchangerDemo {private static Exchanger exchanger = new Exchanger(); static String goods = "電腦";static String money = "$4000";public static void main(String[] args) throws InterruptedException {System.out.println("準備交易,一手交錢一手交貨...");// 賣家線程:攜帶 goods,等待買家new Thread(() -> {try {System.out.println("賣家到了,已經準備好貨:" + goods);// 交換數據:賣家發送 goods,接收 moneyString receivedMoney = (String) exchanger.exchange(goods); System.out.println("賣家收到錢:" + receivedMoney);} catch (Exception e) { /* 忽略異常 */ }}).start();// 主線程休眠 3 秒,模擬買家延遲到達Thread.sleep(3000); // 買家線程:攜帶 money,等待賣家new Thread(() -> {try {System.out.println("買家到了,已經準備好錢:" + money);// 交換數據:買家發送 money,接收 goodsString receivedGoods = (String) exchanger.exchange(money); System.out.println("買家收到貨:" + receivedGoods);} catch (Exception e) { /* 忽略異常 */ }}).start();}
    }
    
    • 同步交換:賣家先調用 exchange(goods) 會阻塞,直到買家調用 exchange(money),雙方交換數據;

    • 數據流向:賣家發送 goods → 接收 money;買家發送 money → 接收 goods

5.3.2 模擬對賬場景

  • 模擬數據對賬

    • 線程 1 生成數據 A,線程 2 生成數據 B
    • 雙方交換數據后,線程 2 校驗 AB 是否一致;
  • 代碼:

    public class ExchangerDemo2 {private static final Exchanger<String> exchanger = new Exchanger<>(); private static ExecutorService threadPool = Executors.newFixedThreadPool(2); public static void main(String[] args) {// 線程 1:發送數據 AthreadPool.execute(() -> {try {String A = "12379871924sfkhfksdhfks";exchanger.exchange(A); // 發送 A,等待線程 2} catch (InterruptedException e) { /* 忽略 */ }});// 線程 2:發送數據 B,接收數據 A,校驗一致性threadPool.execute(() -> {try {String B = "32423423jkmjkfsbfj";String A = exchanger.exchange(B); // 發送 B,接收 ASystem.out.println("A和B數據是否一致:" + A.equals(B));System.out.println("A= " + A);System.out.println("B= " + B);} catch (InterruptedException e) { /* 忽略 */ }});threadPool.shutdown();}
    }
    
    • 數據校驗:線程 2 接收線程 1 的數據 A 后,對比自己的 B,判斷是否一致;

    • 線程池簡化:用線程池管理兩個線程,避免手動創建 Thread

5.3.3 模擬隊列中交換數據

  • 模擬生產者 - 消費者模式,但通過 Exchanger 動態交換“滿隊列”和“空隊列”:

    • 生產者往 emptyQueue 放數據,滿了就和消費者交換隊列(拿空隊列繼續生產);

    • 消費者從 fullQueue 取數據,空了就和生產者交換隊列(拿滿隊列繼續消費);

  • 代碼:

    public class ExchangerDemo3 {// 滿隊列(消費者初始用)、空隊列(生產者初始用)private static ArrayBlockingQueue<String> fullQueue = new ArrayBlockingQueue<>(5); private static ArrayBlockingQueue<String> emptyQueue = new ArrayBlockingQueue<>(5); private static Exchanger<ArrayBlockingQueue<String>> exchanger = new Exchanger<>(); public static void main(String[] args) {new Thread(new Producer()).start(); // 啟動生產者new Thread(new Consumer()).start(); // 啟動消費者}// 生產者:往隊列放數據,滿了就交換隊列static class Producer implements Runnable {@Overridepublic void run() {ArrayBlockingQueue<String> current = emptyQueue; try {while (current != null) {String str = UUID.randomUUID().toString();try {current.add(str); // 往隊列放數據System.out.println("producer:生產了一個序列:" + str + ">>>>>加入到交換區");Thread.sleep(2000);} catch (IllegalStateException e) {// 隊列滿了,交換隊列(拿空隊列)System.out.println("producer:隊列已滿,換一個空的");current = exchanger.exchange(current); }}} catch (Exception e) { /* 忽略 */ }}}// 消費者:從隊列取數據,空了就交換隊列static class Consumer implements Runnable {@Overridepublic void run() {ArrayBlockingQueue<String> current = fullQueue; try {while (current != null) {if (!current.isEmpty()) {String str = current.poll(); // 從隊列取數據System.out.println("consumer:消耗一個序列:" + str);Thread.sleep(1000);} else {// 隊列空了,交換隊列(拿滿隊列)System.out.println("consumer:隊列空了,換個滿的");current = exchanger.exchange(current); System.out.println("consumer:換滿的成功~~~~~~~~~~~~~~~~~~~~~~");}}} catch (Exception e) { /* 忽略 */ }}}
    }
    
    • 動態隊列交換

      • 生產者隊列滿 → 用 exchanger.exchange(current) 交換出空隊列,繼續生產;
      • 消費者隊列空 → 用 exchanger.exchange(current) 交換出滿隊列,繼續消費;
    • 解耦生產和消費:通過交換隊列,避免生產者/消費者因隊列滿/空阻塞,靈活控制數據流轉。

5.4 應用場景總結

  • 數據交換:兩個線程需要安全交換數據(如交易場景的“錢 - 貨”交換);
    • 保證“交換原子性”,避免數據不一致;
  • 數據采集:采集線程(生產者)和處理線程(消費者)交換數據(如日志采集→日志處理);
    • 解耦數據生產和消費,通過交換數據緩沖,提升系統吞吐量。

6 Phaser(階段協同器)

6.1 簡介

  • Phaser 用于協調多個線程的多階段執行,支持:

    • 動態調整參與線程的數量(可增、可減);

    • 分階段同步(線程完成當前階段,再一起進入下一階段);

    • CyclicBarrier 更靈活(支持動態線程數、多階段),比 CountDownLatch 更強大(可循環、可動態調整);

  • 核心特性

    • 多階段同步:線程可以分多個階段執行(如 phase-0 → phase-1 → phase-2),每個階段都需要線程同步后再繼續;

    • 動態線程管理

      • 可通過 register() 動態增加參與線程;

      • 可通過 arriveAndDeregister() 動態減少參與線程;

    • 靈活的階段控制:每個階段完成后,可自定義邏輯(重寫 onAdvance 方法),決定是否繼續下一階段;

  • 工作流程

    1. 階段 0(phase-0
      • 多個線程執行“階段 0”的任務;
      • 線程調用 arriveAndAwaitAdvance() 表示“階段 0 完成”,等待其他線程也完成“階段 0”;
    2. 進入階段 1(phase-1
      • 所有線程都完成“階段 0”后,一起進入“階段 1”;
      • 重復“執行任務 → 同步等待”的流程;
    3. 多階段循環:支持多個階段(phase-0 → phase-1 → phase-2 → ...),直到手動終止或所有線程退出;

    在這里插入圖片描述

6.2 常用 API

  • 構造方法

    構造方法作用
    Phaser()初始化一個“參與任務數為 0”的 Phaser,后續用 register() 動態添加線程
    Phaser(int parties)指定初始參與線程數(類似 CyclicBarrierparties
    Phaser(Phaser parent)作為子階段協同器,依附于父 Phaser,適合復雜多階段場景
    Phaser(Phaser parent, int parties)結合父 Phaser 和初始線程數,更靈活的初始化
  • 增減參與線程

    方法作用
    int register()動態增加一個參與線程,返回當前階段號
    int bulkRegister(int parties)動態增加多個參與線程(批量注冊),返回當前階段號
    int arriveAndDeregister()線程完成任務后,退出參與(減少一個線程),返回當前階段號
  • 到達、等待方法

    方法作用
    int arrive()標記“當前線程完成階段任務”,但不等待其他線程,繼續執行
    int arriveAndAwaitAdvance()標記“當前線程完成階段任務”,等待其他線程也完成,再進入下一階段
    int awaitAdvance(int phase)等待進入指定階段(需當前階段匹配)
    int awaitAdvanceInterruptibly(int phase)同上,但等待中可被中斷
    int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)帶超時的等待,超時后拋出異常
  • 階段自定義邏輯

    protected boolean onAdvance(int phase, int registeredParties) 
    
    • 作用:每個階段完成后,自動調用此方法,決定是否繼續下一階段;

    • 返回值

      • true:階段結束,Phaser 不再繼續(可用于終止多階段流程);
      • false:繼續下一階段。

6.3 使用

  • 需求:模擬了公司團建的多階段活動,團建分4個階段,參與人數動態變化:

    • 階段0:所有人到公司集合 → 出發去公園

    • 階段1:所有人到公園門口 → 出發去餐廳

    • 階段2:部分人到餐廳(有人提前離開,有人新增加入)→ 開始用餐

    • 階段3:用餐結束 → 活動終止

    • 參與人數不固定(有人早退、有人中途加入),每個階段必須等人齊了再繼續

  • 代碼:

    public class PhaserDemo {public static void main(String[] args) {final Phaser phaser = new Phaser() {// 每個階段完成后自動調用下面的 onAdvance 方法,打印階段總結,并判斷是否終止(只剩主線程時終止)@Overrideprotected boolean onAdvance(int phase, int registeredParties) {// registeredParties 是當前注冊的線程數(包括主線程),減去 1 得到實際員工數// 主線程:作為協調者,全程參與并動態添加中途加入者int staffs = registeredParties - 1;// 每個階段完成后的提示信息switch (phase) {case 0:System.out.println("大家都到公司了,出發去公園,人數:" + staffs);break;case 1:System.out.println("大家都到公園門口了,出發去餐廳,人數:" + staffs);break;case 2:System.out.println("大家都到餐廳了,開始用餐,人數:" + staffs);break;}// 終止條件:只剩主線程(registeredParties == 1)return registeredParties == 1;}};// 注冊主線程————讓主線程全程參與phaser.register();final StaffTask staffTask = new StaffTask();// 全程參與者:3 人(參與所有 4 個階段)for (int i = 0; i < 3; i++) {// 添加任務數phaser.register();new Thread(() -> {try {staffTask.step1Task();//到達后等待其他任務到達phaser.arriveAndAwaitAdvance();staffTask.step2Task();phaser.arriveAndAwaitAdvance();staffTask.step3Task();phaser.arriveAndAwaitAdvance();staffTask.step4Task();// 完成了,注銷離開phaser.arriveAndDeregister();} catch (InterruptedException e) {e.printStackTrace();}}).start();}// 早退者:2 人(只參與前 2 個階段,到公園后離開)for (int i = 0; i < 2; i++) {phaser.register();new Thread(() -> {try {staffTask.step1Task();phaser.arriveAndAwaitAdvance();staffTask.step2Task();System.out.println("員工【" + Thread.currentThread().getName() + "】回家了");// 完成了,注銷離開phaser.arriveAndDeregister();} catch (InterruptedException e) {e.printStackTrace();}}).start();}// 中途加入者:4 人(從階段 2 開始參與,直接到餐廳聚餐)while (!phaser.isTerminated()) {int phase = phaser.arriveAndAwaitAdvance();if (phase == 2) {for (int i = 0; i < 4; i++) {phaser.register();new Thread(() -> {try {staffTask.step3Task();phaser.arriveAndAwaitAdvance();staffTask.step4Task();// 完成了,注銷離開phaser.arriveAndDeregister();} catch (InterruptedException e) {e.printStackTrace();}}).start();}}}}static final Random random = new Random();// 封裝了 4 個階段的具體動作(從家出發→到公司→去公園→去餐廳→用餐),每個階段用 Thread.sleep 模擬耗時static class StaffTask {public void step1Task() throws InterruptedException {// 第一階段:來公司集合String staff = "員工【" + Thread.currentThread().getName() + "】";System.out.println(staff + "從家出發了……");Thread.sleep(random.nextInt(5000));System.out.println(staff + "到達公司");}public void step2Task() throws InterruptedException {// 第二階段:出發去公園String staff = "員工【" + Thread.currentThread().getName() + "】";System.out.println(staff + "出發去公園玩");Thread.sleep(random.nextInt(5000));System.out.println(staff + "到達公園門口集合");}public void step3Task() throws InterruptedException {// 第三階段:去餐廳String staff = "員工【" + Thread.currentThread().getName() + "】";System.out.println(staff + "出發去餐廳");Thread.sleep(random.nextInt(5000));System.out.println(staff + "到達餐廳");}public void step4Task() throws InterruptedException {// 第四階段:就餐String staff = "員工【" + Thread.currentThread().getName() + "】";System.out.println(staff + "開始用餐");Thread.sleep(random.nextInt(5000));System.out.println(staff + "用餐結束,回家");}}
    }
    
    • 階段0:到公司集合
      • 主線程 + 3個全程者 + 2個早退者 → 共6個線程,調用 step1Task()(從家到公司)
      • 完成后調用 phaser.arriveAndAwaitAdvance() → 等待所有人到公司
      • 階段結束:onAdvance 觸發,打印“出發去公園,人數:5”(6-1=5)
    • 階段1:到公園門口
      • 所有線程調用 step2Task()(從公司到公園),完成后調用 phaser.arriveAndAwaitAdvance() → 等待所有人到公園
      • 2個早退者完成后調用 phaser.arriveAndDeregister() → 退出(注冊線程數變為6-2=4)
      • 階段結束:onAdvance 觸發,打印“出發去餐廳,人數:3”(4-1=3,只剩3個全程者+主線程)
    • 階段2:到餐廳集合
      • 主線程動態添加:檢測到階段2時,新增4個中途加入者 → 注冊線程數變為4+4=8
        • 3個全程者調用 step3Task()(從公園到餐廳)
        • 4個新加入者直接調用 step3Task()(到餐廳)
      • 所有人調用 phaser.arriveAndAwaitAdvance() → 等待到餐廳
      • 階段結束:onAdvance 觸發,打印“開始用餐,人數:7”(8-1=7,3+4=7個員工+主線程)
    • 階段3:用餐結束
      • 所有7人調用 step4Task()(用餐),完成后調用 phaser.arriveAndDeregister() → 所有人退出(注冊線程數逐漸減少至1,只剩主線程)
      • 終止條件:onAdvance 檢測到 registeredParties == 1 → 返回 truePhaser 終止
  • Phaser 核心特性體現

    • 多階段同步:通過 arriveAndAwaitAdvance() 實現每個階段的等待,確保人齊后再進入下一階段;

    • 動態線程管理

      • phaser.register():新增參與者(如中途加入的4人);

      • phaser.arriveAndDeregister():參與者退出(如早退者和用餐結束的人);

    • 階段自定義邏輯onAdvance 方法實現每個階段的總結,并控制流程終止條件;

    • 靈活的協同:相比 CyclicBarrier(固定線程數),Phaser 能應對“有人早退、有人中途加入”的動態場景。

6.4 應用場景總結

  • 多線程任務分配:把一個復雜任務拆成多個子任務,分配給不同線程并行執行,且需要協調子任務的進度(比如所有子任務完成后,再合并結果);

    • Phaser 分階段管理:

      • 階段 0:子任務分配,線程開始執行
      • 階段 1:所有子任務完成,合并結果
    • 支持動態調整線程數(比如某個子任務需要更多線程,用 register() 新增);

  • 多級任務流程:任務需要分多個層級/階段執行,必須等當前級所有任務完成,才能觸發下一級任務(比如“數據采集→數據清洗→數據匯總→結果輸出”);

    • 每個層級對應 Phaser 的一個階段(phase-0 采集→phase-1 清洗→phase-2 匯總);

    • 通過 arriveAndAwaitAdvance() 確保“當前級完成后,再進入下一級”,流程更清晰;

  • 模擬并行計算:模擬分布式并行計算(比如科學計算、大數據處理 ),需要協調多個線程的“計算階段”(比如矩陣計算分塊執行,所有分塊完成后再合并);

    • Phaser 同步“分塊計算階段”和“合并階段”,確保:
      • 所有分塊計算完成(階段 0 同步);
      • 合并結果后,再進入下一階段(階段 1 同步);
  • 階段性任務:任務天然是階段性的,每個階段需要所有線程同步后再繼續(比如“團隊項目”:需求評審→開發→測試→上線,每個階段必須全員完成);

    • 每個階段對應 Phaserphase,通過 arriveAndAwaitAdvance() 實現“階段同步”;

    • 支持動態調整參與線程(比如測試階段需要新增測試人員,用 register() 加入);

  • 上面所有場景都需要多階段同步 + 動態線程協作,每個階段必須等所有線程完成,再進入下一階段。Phaser 優勢:

    • CyclicBarrier 更靈活:支持動態增減線程多階段自定義邏輯onAdvance);

    • CountDownLatch 更強大:可循環分階段,而非一次性同步。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/95046.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/95046.shtml
英文地址,請注明出處:http://en.pswp.cn/web/95046.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Apple登錄接入記錄

Apple文檔——通過 Apple 登錄 使用入門 - 通過 Apple 登錄 - Apple Developer Apple文檔——設計要求——登錄通過 Apple 登錄 | Apple Developer Documentation 插件github版——apple-signin-unity&#xff08;README 中為接入步驟&#xff09; GitHub - lupidan/apple-…

【小程序-慕尚花坊04】網絡請求并發與loading

網絡請求并發與loading一&#xff0c;網絡請求并發與loading1&#xff0c;并發處理1.1&#xff0c;異步實現方式2.2&#xff0c;Promise.all異步方式封裝2&#xff0c;loading加載2.1&#xff0c;loading的基本使用2.2&#xff0c;loading與并發結合案例2.3&#xff0c;loading…

CentOS 7 升級 OpenSSH 10.0p2 完整教程(含 Telnet 備份)

&#x1f539; CentOS 7 升級 OpenSSH 10.0p2 完整教程&#xff08;含 Telnet 備份&#xff09; 注意&#xff1a;為了避免升級 SSH 時無法遠程登錄&#xff0c;建議先啟用 Telnet 服務 作為備用連接方式。 CentOS 7 默認 OpenSSH 版本是 7.x&#xff0c;升級到 10.0p2 需要 源…

aragfw9.dll aqnky-ef.dll aqua dock.dll apscon~1.dll apropdll.dll app_web_yqnqasrp.dll app_web_

在使用電腦系統時經常會出現丟失找不到某些文件的情況&#xff0c;由于很多常用軟件都是采用 Microsoft Visual Studio 編寫的&#xff0c;所以這類軟件的運行需要依賴微軟Visual C運行庫&#xff0c;比如像 QQ、迅雷、Adobe 軟件等等&#xff0c;如果沒有安裝VC運行庫或者安裝…

rabbitMQ延時隊列實現,怎么保證消息的冪等

一、RabbitMQ 延時隊列實現方式 基于 TTL&#xff08;Time-To-Live&#xff09; 死信隊列&#xff08;Dead Letter Queue&#xff09; 這是最常用的實現方式&#xff0c;核心思路是&#xff1a; (1)消息設置過期時間&#xff08;TTL&#xff09; (2)消息過期后進入綁定的死信隊…

前沿技術觀察:從AI 時代到量子計算的下一站

前沿技術觀察&#xff1a;從AI 時代到量子計算的下一站&#x1f680; 技術的浪潮一波接一波&#xff0c;從 人工智能 到 區塊鏈&#xff0c;再到 邊緣計算、元宇宙、量子計算&#xff0c;這些前沿技術正在深刻影響我們的生活與產業格局。 對于開發者和技術愛好者來說&#xff0…

通過Kubernetes安裝mysql5服務

以下是清晰、結構化的操作流程優化說明&#xff0c;按步驟梳理從部署到配置持久化、暴露服務的完整過程&#xff1a;一、基礎部署&#xff1a;快速驗證 MySQL 可用性創建有狀態工作負載進入 KubeSphere 項目 → 工作負載 → 有狀態副本集 → 創建&#xff0c;選擇 通過鏡像創建…

【mysql】SQL 中 IS 與 = 的區別:一個 NULL 值引發的思考

SQL 中 IS 與 的區別&#xff1a;一個 NULL 值引發的思考為什么查詢結果總是少一條數據&#xff1f;可能是 NULL 在搗鬼在 SQL 查詢中&#xff0c;很多開發者都曾遇到過這樣的困惑&#xff1a;明明看起來正確的查詢語句&#xff0c;返回的結果卻總是與預期不符。這往往是因為沒…

openGauss筆記

1、安裝 直接用docker安裝 2、國產化 符合國產化要求 3、客戶端 3.1 dbeaver 社區版本&#xff08;25.1.4&#xff09;即可&#xff0c;驅動建議用離線版本&#xff0c;在官網下載最新的&#xff0c;然后在驅動管理里面進行添加本地的jar 3.1.1 驅動配置3.1.2 依賴 需要java版本…

SQL語言增刪改查之C與R

本節通關要求1、掌握 SQL 語句對數據庫進行的創建 Create 和讀取 Retireve 操作的指令&#xff1b;2、多練習&#x1f3ae;說明&#xff1a;操作對象是數據表中的數據行&#xff0c;也就是表中的記錄。請明確操作對象&#xff0c;不要誤傷友軍。背景&#xff1a;create table i…

棧溢出問題

brpc 的 bthread 默認協程棧大小是 128KB&#xff08;非 pthread 模式&#xff09;。如果在一個bthread中&#xff0c;它執行的函數內定義了一個局部變量map&#xff0c;有很多個元素&#xff0c;map的大小超過了128KB&#xff0c;協程會自動申請新的棧空間嗎&#xff1f;這里要…

Android之穿山甲廣告接入

文章目錄前言一、效果圖二、實現步驟1.引入庫2.build.gradle依賴3.Application初始化3.開屏廣告4.插屏廣告5.懶人做法總結前言 項目接入廣告已經是常見的現象了&#xff0c;但是還有很多朋友或者初學者沒有接觸過&#xff0c;或者沒有接觸過穿山甲&#xff0c;今天就來看一下&…

Web開發工具一套式部署Maven/Nvm/Mysql/Redis

前言&#xff1a; 對于一個純小白且電腦沒有任何環境的計算機學生&#xff0c;如何快速跑通Java前后端項目呢&#xff1f; 先附上百度網盤 地址&#xff1a; Web開發工具 。 以下鏈接來自不同作者&#xff0c;如有侵犯&#xff0c;請聯系我刪除。 1.Jdk 部署地址&#xff1a…

Deepseek法務提示指令收集

參考網絡資料&#xff0c;收集一些法務提示指令&#xff0c;可用于Agent LLM、以及LLM法律相關開發。 https://zhuanlan.zhihu.com/p/22588251815 1 基礎指令 1) 身份認證模塊 【身份與版本聲明】 您是由DeepSeek研發的法律智能輔助系統V4.2版&#xff0c;內核經司法部《生成…

Tiptrans轉運 | 免費5國轉運地址

Tiptrans 是一家總部位于捷克的國際包裹轉運與虛擬地址服務平臺&#xff0c;主要提供全球虛擬收貨地址&#xff08;英國、德國、香港、美國等&#xff09;&#xff0c;讓用戶在當地網店購物&#xff0c;再由 Tiptrans 轉運到海外。除了物流服務&#xff0c;Tiptrans 也提供虛擬…

STM32手動移植FreeRTOS

&#x1f4e6; 準備工作 獲取FreeRTOS源碼: 訪問 FreeRTOS官網 或其 GitHub倉庫 下載最新版內核源碼。 你也可以使用Git克隆&#xff08;注意要包含子模塊&#xff09;&#xff1a;git clone https://github.com/FreeRTOS/FreeRTOS.git --recurse-submodules。 準備STM32基礎…

C5僅支持20MHZ帶寬,如果路由器5Gwifi處于40MHZ帶寬信道時,會出現配網失敗

是的&#xff0c;這會導致“怎么都連不上”。結論先說&#xff1a;如果路由器把 5 GHz 固定在 40 MHz&#xff08;或以上&#xff09;帶寬&#xff0c;而你的 C5 只支持 5 GHz 的 20 MHz 帶寬&#xff0c;那么 STA 連接一定會失敗。固件里不可能“把 40 MHz AP 連成 20 MHz”&a…

堅鵬請教DEEPSEEK:請問中國領先的AI智能體服務商有哪些?知行學

堅鵬請教DEEPSEEK&#xff1a;請問中國領先的AI智能體服務商有哪些&#xff1f;深圳知行學教育科技公司名列榜首根據2025年8月底多家權威機構發布的榜單和報告&#xff0c;比如德本咨詢&#xff08;DBC&#xff09;的“2025企業級AI Agent應用TOP50”榜單、IDC的《中國AI AGENT…

【開題答辯全過程】以 投票系統為例,包含答辯的問題和答案

個人簡介一名14年經驗的資深畢設內行人&#xff0c;語言擅長Java、php、微信小程序、Python、Golang、安卓Android等開發項目包括大數據、深度學習、網站、小程序、安卓、算法。平常會做一些項目定制化開發、代碼講解、答辯教學、文檔編寫、也懂一些降重方面的技巧。感謝大家的…

C++異常處理指南:構建健壯程序的錯誤處理機制

在程序開發的世界里&#xff0c;“錯誤” 是繞不開的話題。你可能寫過一個簡單的計算器&#xff0c;卻因為用戶輸入 “50” 而崩潰&#xff1b;也可能在操作數據庫時&#xff0c;因為權限不足導致數據讀取失敗&#xff1b;甚至在申請內存時&#xff0c;因為系統資源耗盡而無法繼…