在并發編程中,除了CountDownLatch和Semaphore,CyclicBarrier和Phaser也是實現多線程協作的重要工具。它們在處理多階段任務同步、動態調整參與線程等場景中展現出獨特價值。本文作為并發工具類系列的第二篇,將深入解析CyclicBarrier和Phaser的核心機制、實戰案例及適用場景,幫助開發者構建更靈活的線程協作模型。
一、CyclicBarrier:循環屏障的多階段協作
CyclicBarrier(循環屏障)的設計初衷是讓一組線程在到達某個屏障點時暫停,直至所有線程都到達后再共同繼續執行。與CountDownLatch的一次性使用不同,CyclicBarrier的計數器可通過reset()方法重置,支持多輪次的線程同步,這也是其 “循環” 特性的由來。
1.1 核心原理與方法解析
CyclicBarrier基于 “屏障點 + 集體喚醒” 機制實現,核心方法如下:
方法 | 功能描述 |
CyclicBarrier(int parties) | 構造方法,指定參與同步的線程數量(parties) |
CyclicBarrier(int parties, Runnable barrierAction) | 帶屏障動作的構造方法,所有線程到達后先執行該動作 |
int await() | 線程到達屏障點后阻塞等待,返回當前線程的到達順序(0~parties-1) |
int await(long timeout, TimeUnit unit) | 帶超時的等待,超時后屏障被打破,拋出TimeoutException |
void reset() | 重置屏障至初始狀態,所有等待線程將收到BrokenBarrierException |
int getNumberWaiting() | 返回當前正在屏障點等待的線程數 |
boolean isBroken() | 判斷屏障是否被打破(如線程中斷、超時等) |
關鍵特性:CyclicBarrier的核心是 “所有線程必須同時到達屏障點”,適用于多階段任務中各階段的同步,且支持重復使用。
1.2 典型場景:分階段數據處理
在數據處理流程中,常需將任務分為多個階段(如數據采集→清洗→分析→存儲),每個階段需所有線程完成當前工作后才能進入下一階段。CyclicBarrier能完美管控這種分階段協作。
實戰案例:
public class DataProcessDemo {// 3個線程參與處理,所有線程到達后執行階段總結動作private static final CyclicBarrier barrier = new CyclicBarrier(3, () -> System.out.println("\n=== 所有線程完成當前階段,進入下一階段 ==="));public static void main(String[] args) {// 啟動3個數據處理線程for (int i = 0; i < 3; i++) {new Thread(new DataProcessor(i), "處理線程-" + i).start();}}static class DataProcessor implements Runnable {private int threadId;public DataProcessor(int threadId) {this.threadId = threadId;}@Overridepublic void run() {try {// 第一階段:數據采集System.out.println("線程" + threadId + ":開始數據采集");Thread.sleep((long) (Math.random() * 1000));System.out.println("線程" + threadId + ":數據采集完成,等待其他線程");barrier.await();// 第二階段:數據清洗System.out.println("線程" + threadId + ":開始數據清洗");Thread.sleep((long) (Math.random() * 1000));System.out.println("線程" + threadId + ":數據清洗完成,等待其他線程");barrier.await();// 第三階段:數據存儲System.out.println("線程" + threadId + ":開始數據存儲");Thread.sleep((long) (Math.random() * 1000));System.out.println("線程" + threadId + ":數據存儲完成,等待其他線程");barrier.await();System.out.println("線程" + threadId + ":所有階段處理完成");} catch (InterruptedException | BrokenBarrierException e) {System.out.println("線程" + threadId + ":屏障被打破,異常信息:" + e.getMessage());}}}
}
運行結果片段:
線程0:開始數據采集線程1:開始數據采集線程2:開始數據采集線程1:數據采集完成,等待其他線程線程0:數據采集完成,等待其他線程線程2:數據采集完成,等待其他線程=== 所有線程完成當前階段,進入下一階段 ===線程0:開始數據清洗線程1:開始數據清洗線程2:開始數據清洗...
案例解析:
- 每個線程完成階段任務后調用barrier.await(),阻塞等待其他線程;
- 當 3 個線程均到達屏障點時,先執行屏障動作(打印階段總結),再喚醒所有線程進入下一階段;
- 若任何線程在等待過程中被中斷或超時,屏障會被標記為 “打破”,所有線程將收到BrokenBarrierException,避免部分線程無限等待。
1.3 與 CountDownLatch 的核心差異
盡管兩者都能實現線程同步,但適用場景截然不同:
維度 | CyclicBarrier | CountDownLatch |
復用性 | 可通過reset()重置,支持多輪同步 | 計數器歸 0 后不可復用,一次性使用 |
同步邏輯 | 所有線程相互等待(線程→線程) | 一組線程等待另一組線程(線程組→線程組) |
核心動作 | 線程到達屏障點后阻塞,需集體喚醒 | 線程完成任務后遞減計數器,無需等待 |
典型場景 | 分階段任務的各階段同步 | 初始化等待、事件通知 |
示例對比:用CountDownLatch實現上述分階段任務需為每個階段創建新的計數器,而CyclicBarrier可通過同一實例完成所有階段同步,代碼更簡潔。
二、Phaser:動態調整的階段同步器
Phaser是 Java 7 引入的高級同步工具,兼具CountDownLatch和CyclicBarrier的功能,且支持動態調整參與線程數量(注冊 / 注銷),適用于線程數量動態變化的多階段任務。
2.1 核心原理與方法解析
Phaser通過 “階段(phase)” 和 “參與者(party)” 概念實現同步,核心方法如下:
方法 | 功能描述 |
Phaser(int parties) | 構造方法,指定初始參與者數量 |
int register() | 注冊一個參與者,返回當前階段號 |
boolean deregister() | 注銷一個參與者,返回是否為最后一個參與者 |
int arriveAndAwaitAdvance() | 當前參與者到達階段終點,等待其他參與者后進入下一階段 |
int arriveAndDeregister() | 到達階段終點并注銷,適用于完成所有任務的參與者 |
int getPhase() | 返回當前階段號(從 0 開始,溢出后重置為 0) |
int getRegisteredParties() | 返回當前注冊的參與者數量 |
關鍵特性:Phaser的階段號隨所有參與者到達而遞增,支持動態增減參與者,且可通過重寫onAdvance(int phase, int registeredParties)方法自定義階段切換邏輯。
2.2 典型場景:動態線程的多階段任務
在分布式計算或并行處理中,線程可能因任務完成而退出,或因新任務加入而新增,Phaser能靈活應對這種動態變化。
實戰案例:
public class DynamicTaskDemo {public static void main(String[] args) throws InterruptedException {// 初始3個參與者,重寫階段切換邏輯Phaser phaser = new Phaser(3) {@Overrideprotected boolean onAdvance(int phase, int registeredParties) {System.out.println("\n=== 階段" + phase + "完成,當前參與者:" + registeredParties + " ===");// 當參與者為0或完成3個階段時終止return registeredParties == 0 || phase >= 2;}};// 啟動3個初始任務線程for (int i = 0; i < 3; i++) {new Thread(new DynamicWorker(phaser, i), "初始線程-" + i).start();}// 主線程等待所有階段完成while (!phaser.isTerminated()) {Thread.sleep(100);}System.out.println("\n所有階段完成,Phaser終止");}static class DynamicWorker implements Runnable {private Phaser phaser;private int workerId;public DynamicWorker(Phaser phaser, int workerId) {this.phaser = phaser;this.workerId = workerId;}@Overridepublic void run() {try {// 階段0:數據準備System.out.println("線程" + workerId + ":階段0準備中...");Thread.sleep((long) (Math.random() * 1000));phaser.arriveAndAwaitAdvance(); // 等待進入階段1// 線程0在階段1后注冊新參與者if (workerId == 0 && phaser.getPhase() == 1) {phaser.register();new Thread(new DynamicWorker(phaser, 3), "新增線程-3").start();System.out.println("線程0:注冊新參與者,當前參與者數:" + phaser.getRegisteredParties());}// 階段1:數據處理System.out.println("線程" + workerId + ":階段1處理中...");Thread.sleep((long) (Math.random() * 1000));phaser.arriveAndAwaitAdvance(); // 等待進入階段2// 線程1在階段2后注銷if (workerId == 1) {phaser.deregister();System.out.println("線程1:已注銷,當前參與者數:" + phaser.getRegisteredParties());return; // 線程1完成任務退出}// 階段2:結果匯總System.out.println("線程" + workerId + ":階段2匯總中...");Thread.sleep((long) (Math.random() * 1000));phaser.arriveAndDeregister(); // 完成后注銷} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}
}
運行結果片段:
線程0:階段0準備中...線程1:階段0準備中...線程2:階段0準備中...=== 階段0完成,當前參與者:3 ===線程0:階段1處理中...線程1:階段1處理中...線程2:階段1處理中...線程0:注冊新參與者,當前參與者數:4新增線程-3:階段1處理中...=== 階段1完成,當前參與者:4 ===線程0:階段2匯總中...線程2:階段2匯總中...線程1:已注銷,當前參與者數:3新增線程-3:階段2匯總中...=== 階段2完成,當前參與者:3 ===所有階段完成,Phaser終止
案例解析:
- Phaser初始注冊 3 個參與者,階段 0 完成后進入階段 1;
- 線程 0 在階段 1 動態注冊新參與者(線程 3),參與者數變為 4;
- 線程 1 在階段 1 完成后注銷,參與者數減為 3;
- 所有參與者完成階段 2 后,onAdvance()返回true,Phaser終止;
- 動態注冊 / 注銷功能使Phaser能適應線程數量變化,比CyclicBarrier更靈活。
2.3 高級特性:分層 Phaser
對于復雜任務,可通過Phaser的分層機制(父 Phaser 管理子 Phaser)減少單個 Phaser 的競爭壓力。例如,將 1000 個線程分為 10 組,每組由一個子 Phaser 管理,子 Phaser 再注冊到父 Phaser,實現 “局部同步→全局同步” 的層級協作。
代碼示例:
public class HierarchicalPhaserDemo {public static void main(String[] args) {// 父Phaser,初始0個參與者Phaser root = new Phaser(0) {@Overrideprotected boolean onAdvance(int phase, int parties) {System.out.println("全局階段" + phase + "完成,參與組:" + parties);return phase >= 1; // 完成2個全局階段后終止}};// 創建3個子Phaser,父Phaser為rootPhaser[] children = new Phaser[3];for (int i = 0; i < 3; i++) {children[i] = new Phaser(root, 2); // 每個子Phaser管理2個線程}// 啟動6個線程(3組×2)for (int i = 0; i < 3; i++) {int groupId = i;for (int j = 0; j < 2; j++) {new Thread(() -> {for (int phase = 0; phase < 2; phase++) {System.out.println("組" + groupId + "線程" + Thread.currentThread().getId() + ":完成局部階段" + phase);children[groupId].arriveAndAwaitAdvance(); // 等待組內同步}children[groupId].arriveAndDeregister(); // 完成后注銷}).start();}}}
}
核心價值:分層機制降低了單個 Phaser 的競爭頻率,提升高并發場景下的性能。
三、四大工具類的綜合對比與選型
工具類 | 核心能力 | 靈活性 | 典型場景 | 適用線程數 |
CountDownLatch | 等待多線程完成 | 低(一次性) | 初始化、事件通知 | 固定 |
Semaphore | 控制資源并發數 | 中(動態許可) | 資源池、限流 | 不固定 |
CyclicBarrier | 多階段線程同步 | 中(可重置) | 分階段任務 | 固定 |
Phaser | 動態階段同步 | 高(動態注冊) | 動態線程任務、分層同步 | 動態變化 |
選型建議:
- 簡單等待場景用CountDownLatch;
- 資源限流場景用Semaphore;
- 固定線程的多階段任務用CyclicBarrier;
- 動態線程或復雜分層任務用Phaser。
總結
CyclicBarrier和Phaser為多線程協作提供了更靈活的解決方案:CyclicBarrier通過循環屏障實現固定線程的多階段同步,適合分步驟協同工作;Phaser則支持動態調整參與者,能應對線程數量變化的復雜場景。
結合上一篇的CountDownLatch和Semaphore,這四類工具類基本覆蓋了常見的線程協作需求。在實際開發中,需根據線程數量是否固定、是否多階段任務、是否需要動態調整等因素選擇合適的工具,以實現高效、可靠的并發控制。
掌握這些工具類的核心原理和適用場景,不僅能簡化并發代碼的編寫,更能提升系統在高并發場景下的穩定性和性能。