并發編程常用工具類(下):CyclicBarrier 與 Phaser 的協同應用

在并發編程中,除了CountDownLatch和Semaphore,CyclicBarrier和Phaser也是實現多線程協作的重要工具。它們在處理多階段任務同步、動態調整參與線程等場景中展現出獨特價值。本文作為并發工具類系列的第二篇,將深入解析CyclicBarrier和Phaser的核心機制、實戰案例及適用場景,幫助開發者構建更靈活的線程協作模型。

一、CyclicBarrier:循環屏障的多階段協作

CyclicBarrier(循環屏障)的設計初衷是讓一組線程在到達某個屏障點時暫停,直至所有線程都到達后再共同繼續執行。與CountDownLatch的一次性使用不同,CyclicBarrier的計數器可通過reset()方法重置,支持多輪次的線程同步,這也是其 “循環” 特性的由來。

1.1 核心原理與方法解析

CyclicBarrier基于 “屏障點 + 集體喚醒” 機制實現,核心方法如下:

方法

功能描述

CyclicBarrier(int parties)

構造方法,指定參與同步的線程數量(parties)

CyclicBarrier(int parties, Runnable barrierAction)

帶屏障動作的構造方法,所有線程到達后先執行該動作

int await()

線程到達屏障點后阻塞等待,返回當前線程的到達順序(0~parties-1)

int await(long timeout, TimeUnit unit)

帶超時的等待,超時后屏障被打破,拋出TimeoutException

void reset()

重置屏障至初始狀態,所有等待線程將收到BrokenBarrierException

int getNumberWaiting()

返回當前正在屏障點等待的線程數

boolean isBroken()

判斷屏障是否被打破(如線程中斷、超時等)

關鍵特性:CyclicBarrier的核心是 “所有線程必須同時到達屏障點”,適用于多階段任務中各階段的同步,且支持重復使用。

1.2 典型場景:分階段數據處理

在數據處理流程中,常需將任務分為多個階段(如數據采集→清洗→分析→存儲),每個階段需所有線程完成當前工作后才能進入下一階段。CyclicBarrier能完美管控這種分階段協作。

實戰案例

public class DataProcessDemo {// 3個線程參與處理,所有線程到達后執行階段總結動作private static final CyclicBarrier barrier = new CyclicBarrier(3, () -> System.out.println("\n=== 所有線程完成當前階段,進入下一階段 ==="));public static void main(String[] args) {// 啟動3個數據處理線程for (int i = 0; i < 3; i++) {new Thread(new DataProcessor(i), "處理線程-" + i).start();}}static class DataProcessor implements Runnable {private int threadId;public DataProcessor(int threadId) {this.threadId = threadId;}@Overridepublic void run() {try {// 第一階段:數據采集System.out.println("線程" + threadId + ":開始數據采集");Thread.sleep((long) (Math.random() * 1000));System.out.println("線程" + threadId + ":數據采集完成,等待其他線程");barrier.await();// 第二階段:數據清洗System.out.println("線程" + threadId + ":開始數據清洗");Thread.sleep((long) (Math.random() * 1000));System.out.println("線程" + threadId + ":數據清洗完成,等待其他線程");barrier.await();// 第三階段:數據存儲System.out.println("線程" + threadId + ":開始數據存儲");Thread.sleep((long) (Math.random() * 1000));System.out.println("線程" + threadId + ":數據存儲完成,等待其他線程");barrier.await();System.out.println("線程" + threadId + ":所有階段處理完成");} catch (InterruptedException | BrokenBarrierException e) {System.out.println("線程" + threadId + ":屏障被打破,異常信息:" + e.getMessage());}}}
}

運行結果片段


線程0:開始數據采集線程1:開始數據采集線程2:開始數據采集線程1:數據采集完成,等待其他線程線程0:數據采集完成,等待其他線程線程2:數據采集完成,等待其他線程=== 所有線程完成當前階段,進入下一階段 ===線程0:開始數據清洗線程1:開始數據清洗線程2:開始數據清洗...

案例解析

  • 每個線程完成階段任務后調用barrier.await(),阻塞等待其他線程;
  • 當 3 個線程均到達屏障點時,先執行屏障動作(打印階段總結),再喚醒所有線程進入下一階段;
  • 若任何線程在等待過程中被中斷或超時,屏障會被標記為 “打破”,所有線程將收到BrokenBarrierException,避免部分線程無限等待。

1.3 與 CountDownLatch 的核心差異

盡管兩者都能實現線程同步,但適用場景截然不同:

維度

CyclicBarrier

CountDownLatch

復用性

可通過reset()重置,支持多輪同步

計數器歸 0 后不可復用,一次性使用

同步邏輯

所有線程相互等待(線程→線程)

一組線程等待另一組線程(線程組→線程組)

核心動作

線程到達屏障點后阻塞,需集體喚醒

線程完成任務后遞減計數器,無需等待

典型場景

分階段任務的各階段同步

初始化等待、事件通知

示例對比:用CountDownLatch實現上述分階段任務需為每個階段創建新的計數器,而CyclicBarrier可通過同一實例完成所有階段同步,代碼更簡潔。

二、Phaser:動態調整的階段同步器

Phaser是 Java 7 引入的高級同步工具,兼具CountDownLatch和CyclicBarrier的功能,且支持動態調整參與線程數量(注冊 / 注銷),適用于線程數量動態變化的多階段任務。

2.1 核心原理與方法解析

Phaser通過 “階段(phase)” 和 “參與者(party)” 概念實現同步,核心方法如下:

方法

功能描述

Phaser(int parties)

構造方法,指定初始參與者數量

int register()

注冊一個參與者,返回當前階段號

boolean deregister()

注銷一個參與者,返回是否為最后一個參與者

int arriveAndAwaitAdvance()

當前參與者到達階段終點,等待其他參與者后進入下一階段

int arriveAndDeregister()

到達階段終點并注銷,適用于完成所有任務的參與者

int getPhase()

返回當前階段號(從 0 開始,溢出后重置為 0)

int getRegisteredParties()

返回當前注冊的參與者數量

關鍵特性:Phaser的階段號隨所有參與者到達而遞增,支持動態增減參與者,且可通過重寫onAdvance(int phase, int registeredParties)方法自定義階段切換邏輯。

2.2 典型場景:動態線程的多階段任務

在分布式計算或并行處理中,線程可能因任務完成而退出,或因新任務加入而新增,Phaser能靈活應對這種動態變化。

實戰案例

public class DynamicTaskDemo {public static void main(String[] args) throws InterruptedException {// 初始3個參與者,重寫階段切換邏輯Phaser phaser = new Phaser(3) {@Overrideprotected boolean onAdvance(int phase, int registeredParties) {System.out.println("\n=== 階段" + phase + "完成,當前參與者:" + registeredParties + " ===");// 當參與者為0或完成3個階段時終止return registeredParties == 0 || phase >= 2;}};// 啟動3個初始任務線程for (int i = 0; i < 3; i++) {new Thread(new DynamicWorker(phaser, i), "初始線程-" + i).start();}// 主線程等待所有階段完成while (!phaser.isTerminated()) {Thread.sleep(100);}System.out.println("\n所有階段完成,Phaser終止");}static class DynamicWorker implements Runnable {private Phaser phaser;private int workerId;public DynamicWorker(Phaser phaser, int workerId) {this.phaser = phaser;this.workerId = workerId;}@Overridepublic void run() {try {// 階段0:數據準備System.out.println("線程" + workerId + ":階段0準備中...");Thread.sleep((long) (Math.random() * 1000));phaser.arriveAndAwaitAdvance(); // 等待進入階段1// 線程0在階段1后注冊新參與者if (workerId == 0 && phaser.getPhase() == 1) {phaser.register();new Thread(new DynamicWorker(phaser, 3), "新增線程-3").start();System.out.println("線程0:注冊新參與者,當前參與者數:" + phaser.getRegisteredParties());}// 階段1:數據處理System.out.println("線程" + workerId + ":階段1處理中...");Thread.sleep((long) (Math.random() * 1000));phaser.arriveAndAwaitAdvance(); // 等待進入階段2// 線程1在階段2后注銷if (workerId == 1) {phaser.deregister();System.out.println("線程1:已注銷,當前參與者數:" + phaser.getRegisteredParties());return; // 線程1完成任務退出}// 階段2:結果匯總System.out.println("線程" + workerId + ":階段2匯總中...");Thread.sleep((long) (Math.random() * 1000));phaser.arriveAndDeregister(); // 完成后注銷} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}
}

運行結果片段

線程0:階段0準備中...線程1:階段0準備中...線程2:階段0準備中...=== 階段0完成,當前參與者:3 ===線程0:階段1處理中...線程1:階段1處理中...線程2:階段1處理中...線程0:注冊新參與者,當前參與者數:4新增線程-3:階段1處理中...=== 階段1完成,當前參與者:4 ===線程0:階段2匯總中...線程2:階段2匯總中...線程1:已注銷,當前參與者數:3新增線程-3:階段2匯總中...=== 階段2完成,當前參與者:3 ===所有階段完成,Phaser終止

案例解析

  • Phaser初始注冊 3 個參與者,階段 0 完成后進入階段 1;
  • 線程 0 在階段 1 動態注冊新參與者(線程 3),參與者數變為 4;
  • 線程 1 在階段 1 完成后注銷,參與者數減為 3;
  • 所有參與者完成階段 2 后,onAdvance()返回true,Phaser終止;
  • 動態注冊 / 注銷功能使Phaser能適應線程數量變化,比CyclicBarrier更靈活。

2.3 高級特性:分層 Phaser

對于復雜任務,可通過Phaser的分層機制(父 Phaser 管理子 Phaser)減少單個 Phaser 的競爭壓力。例如,將 1000 個線程分為 10 組,每組由一個子 Phaser 管理,子 Phaser 再注冊到父 Phaser,實現 “局部同步→全局同步” 的層級協作。

代碼示例

public class HierarchicalPhaserDemo {public static void main(String[] args) {// 父Phaser,初始0個參與者Phaser root = new Phaser(0) {@Overrideprotected boolean onAdvance(int phase, int parties) {System.out.println("全局階段" + phase + "完成,參與組:" + parties);return phase >= 1; // 完成2個全局階段后終止}};// 創建3個子Phaser,父Phaser為rootPhaser[] children = new Phaser[3];for (int i = 0; i < 3; i++) {children[i] = new Phaser(root, 2); // 每個子Phaser管理2個線程}// 啟動6個線程(3組×2)for (int i = 0; i < 3; i++) {int groupId = i;for (int j = 0; j < 2; j++) {new Thread(() -> {for (int phase = 0; phase < 2; phase++) {System.out.println("組" + groupId + "線程" + Thread.currentThread().getId() + ":完成局部階段" + phase);children[groupId].arriveAndAwaitAdvance(); // 等待組內同步}children[groupId].arriveAndDeregister(); // 完成后注銷}).start();}}}
}

核心價值:分層機制降低了單個 Phaser 的競爭頻率,提升高并發場景下的性能。

三、四大工具類的綜合對比與選型

工具類

核心能力

靈活性

典型場景

適用線程數

CountDownLatch

等待多線程完成

低(一次性)

初始化、事件通知

固定

Semaphore

控制資源并發數

中(動態許可)

資源池、限流

不固定

CyclicBarrier

多階段線程同步

中(可重置)

分階段任務

固定

Phaser

動態階段同步

高(動態注冊)

動態線程任務、分層同步

動態變化

選型建議

  • 簡單等待場景用CountDownLatch;
  • 資源限流場景用Semaphore;
  • 固定線程的多階段任務用CyclicBarrier;
  • 動態線程或復雜分層任務用Phaser。

總結

CyclicBarrier和Phaser為多線程協作提供了更靈活的解決方案:CyclicBarrier通過循環屏障實現固定線程的多階段同步,適合分步驟協同工作;Phaser則支持動態調整參與者,能應對線程數量變化的復雜場景。

結合上一篇的CountDownLatch和Semaphore,這四類工具類基本覆蓋了常見的線程協作需求。在實際開發中,需根據線程數量是否固定、是否多階段任務、是否需要動態調整等因素選擇合適的工具,以實現高效、可靠的并發控制。

掌握這些工具類的核心原理和適用場景,不僅能簡化并發代碼的編寫,更能提升系統在高并發場景下的穩定性和性能。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/94334.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/94334.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/94334.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

機器人焊接節氣裝置

在摩托車制造過程中&#xff0c;精密部件的焊接質量直接影響整車的安全性和操控性能。以發動機缸體焊接為例&#xff0c;傳統手工焊接容易出現焊縫不均勻的問題&#xff0c;而采用六軸弧焊機器人后&#xff0c;焊接精度能控制在0.1毫米以內。日本川崎重工的生產數據顯示&#x…

使用yolo11訓練食物浪費檢測數據集VOC+YOLO格式6734張32類別步驟和流程

【數據集介紹】數據集格式&#xff1a;Pascal VOC格式YOLO格式(不包含分割路徑的txt文件&#xff0c;僅僅包含jpg圖片以及對應的VOC格式xml文件和yolo格式txt文件)圖片數量(jpg文件個數)&#xff1a;6734標注數量(xml文件個數)&#xff1a;6734標注數量(txt文件個數)&#xff1…

掌握PowerPC架構與編程技巧:技術資料詳解

本文還有配套的精品資源&#xff0c;點擊獲取 簡介&#xff1a;PowerPC是一種高性能的RISC架構&#xff0c;最初由IBM、Motorola和Apple聯合開發&#xff0c;被設計用于高端工作站和服務器&#xff0c;同時廣泛應用于嵌入式系統、航空電子設備、游戲主機和超級計算機等領域。…

VR 企業展廳:開啟數字化展示新時代

在當今數字化浪潮席卷各行各業的時代&#xff0c;企業的展示與宣傳方式也在不斷革新。VR&#xff08;虛擬現實&#xff09;技術的出現&#xff0c;為企業展廳帶來了全新的變革&#xff0c;使其從傳統的實體展示空間&#xff0c;轉變為具有無限可能的數字化虛擬空間。一、VR 企業…

測試用例顆粒度全解析

引言&#xff1a;為什么顆粒度是測試團隊的“隱形門檻”&#xff1f;在軟件測試領域&#xff0c;測試用例顆粒度&#xff08;即測試用例的詳細程度&#xff09;看似是一個基礎問題&#xff0c;卻常常成為團隊協作的“隱形門檻”。某電商平臺測試團隊曾出現過這樣的窘境&#xf…

分布式鎖的基本原理和基于lua腳本的實現(Redisson)

為了確保分布式鎖可用&#xff0c;我們要確保鎖的實現同時滿足以下四個條件&#xff1a;- 互斥性。在任意時刻&#xff0c;只有一個客戶端能持有鎖。- 不會發生死鎖。即使有一個客戶端在持有鎖的期間崩潰而沒有主動解鎖&#xff0c;也能保證后續其他客戶端能加鎖。- 解鈴還須系…

智慧園區數字孿生全生命周期交付體系:從虛擬建模到全域智聯的快速交付新常態

在數字經濟與綠色低碳轉型的雙重驅動下&#xff0c;智慧園區建設正經歷從“物理空間堆砌”到“數據智能驅動”的范式革命。數字孿生技術作為核心引擎&#xff0c;通過構建物理園區的虛擬鏡像&#xff0c;實現虛實空間的毫秒級同步與智能協同&#xff0c;推動園區管理向全要素感…

電腦忘記開機密碼怎么辦?【圖文詳解】5種方法重置/更改/取消/設置開機密碼?

一、問題背景誰都有馬虎的時候&#xff0c;要是突然忘了電腦開機密碼&#xff0c;就只能對著登錄界面干著急&#xff0c;沒法打開電腦處理工作、查看文件&#xff0c;太影響效率了。別慌&#xff0c;其實有不少簡單實用的辦法能解除或重置密碼&#xff0c;下面就來一一介紹&…

Go語言select

select是什么select是Go語言層面提供的一種多路復用機制&#xff0c;用于檢測當前goroutine連接的多個channel是否有數據準備完畢&#xff0c;可用于讀或寫。Go語言的select語句&#xff0c;是用來起一個goroutine監聽多個Channel的讀寫事件&#xff0c;提高從多個Channel獲取信…

VUE+SPRINGBOOT從0-1打造前后端-前后臺系統-整體示例

一、注冊、登錄、密碼找回二、VUE前臺系統三、VUE后臺系統

深入解析SmolVLA:VLM與動作專家間的注意力機制交互

在機器人學習領域&#xff0c;如何有效地將視覺語言模型&#xff08;VLM&#xff09;的強大感知能力與低級動作控制相結合&#xff0c;是實現通用機器人智能的關鍵挑戰。SmolVLA&#xff08;Small Vision-Language-Action&#xff09;架構正是在這一背景下應運而生&#xff0c;…

Spring Security 認證與授權實現機制

Spring Security 是一個功能強大且高度可定制的身份驗證和訪問控制框架&#xff0c;其認證和授權實現機制如下&#xff1a;一、認證(Authentication)實現 1. 核心組件 AuthenticationManager&#xff1a;認證入口點&#xff0c;委托給AuthenticationProviderAuthenticationProv…

開源的時間跟蹤工具TimeTagger

簡介 什么是 TimeTagger &#xff1f; TimeTagger 是一個開源的時間跟蹤工具&#xff0c;旨在幫助用戶記錄和分析他們的時間使用情況。它提供了一個互動的用戶界面和強大的報告功能&#xff0c;適合個人和自由職業者使用。 主要特點 直觀的用戶界面&#xff1a;基于互動時間線…

學習游戲制作記錄(角色屬性和狀態腳本)8.4

1.實現簡單的角色屬性創建CharactorState腳本&#xff1a;掛載在敵人和玩家身上public float damage;//角色傷害public float maxHp;//最大生命[SerializeField] private float currentHealth;//當前生命void Start(){currentHealth maxHp;//初始化將當前生命設置為最大生命}p…

04-Chapter02-Example01

文章介紹 1、完善項目結構 1.1 新建第二章對應模塊Chapter021.2 新建模塊Chapter02對應包com.itheima1.3 在包com.itheima下新建class類 &#xff0c;類名稱Example01.java項目結構如下&#xff1a;2、編寫Example01.java代碼 P38 package com.itheima;public class Example01…

【達夢MPP(帶主備)集群搭建】

達夢MPP&#xff08;帶主備&#xff09;集群搭建 為了提高MPP系統可靠性&#xff0c;克服由于單節點故障導致整個系統不能繼續正常工作的問題&#xff0c;DM在普通的MPP系統基礎上&#xff0c;引入數據守護主備機制&#xff0c;為每一個MPP節點配置一個實時備庫作為備份節點&a…

Java基礎學習(一):類名規范、返回值、注釋、數據類型

目錄 一、類名規范二、返回值三、注釋四、數據類型 1. 基本類型2. 引用類型3. 強制數據類型轉換4. 進制5. 進制的轉換6. 超范圍運算 相關文章 Java基礎學習&#xff08;二&#xff09;&#xff1a;Java中的變量和常量、final&#xff08;重點&#xff09;、運算、字符串 了…

Nginx相關實驗(2)

nginx的一些高級配置 nginx狀態頁 基于nginx 模塊 ngx_http_stub_status_module 實現&#xff0c; 在編譯安裝nginx的時候需要添加編譯參數 --with-http_stub_status_module 否則配置完成之后監測會是提示語法錯誤 #檢查模塊是否配置 如果不存在需要添加模塊重新編譯 nginx …

2.Java和C++有什么區別

2.Java和C有什么區別 1.在C中&#xff0c;支持操作符重載&#xff0c;而在Java里是不允許的 2.C支持多繼承&#xff0c;而Java不可以&#xff0c;但Java可以單繼承多實現 3.Java內置多線程機制&#xff0c;而C沒有 4.Java里面自帶了垃圾回收機制&#xff0c;可以自動清理緩存等…

MLIR Introduction

簡介 MLIR 項目是一個構建可重用和可擴展編譯器基礎設施創新項目&#xff0c;旨在解決軟件碎片化問題&#xff0c;顯著降低構建特定領域編譯器的成本。 基于MLIR&#xff0c;可以實現&#xff1a; 數據流圖表達&#xff08;如TensorFlow&#xff09;&#xff0c;包含dynamic…