文章目錄
- 核心思想:組隊出游,人到齊了才出發 🚌
- 最簡單易懂的代碼示例
- 代碼解析
- 運行效果分析
- `CyclicBarrier` vs `CountDownLatch` 的關鍵區別
- CyclicBarrier在業務系統里面通常有什么常用的應用場景
- 核心應用模式
- 1. 數據并行處理與ETL(最經典)
- 2. 模擬高并發測試
- 3. 多線程數據計算與合并
- 4. 游戲服務器中的多玩家同步
- 總結
核心思想:組隊出游,人到齊了才出發 🚌
想象一下,您和幾個朋友約好一起去旅游。大家從各自的家里出發,約定在火車站門口集合。CyclicBarrier
就好比是這個“集合點”。
- 規則:必須所有人都到達火車站門口后,大家才能一起進站上車。先到的人必須在門口等著,直到最后一個人到達。
CyclicBarrier barrier = new CyclicBarrier(3);
- 這等于約定了這次出游的小隊有 3個人。這個
3
就是需要到達“柵欄”(Barrier)的線程數量。
- 這等于約定了這次出游的小隊有 3個人。這個
barrier.await();
(等待)- 每個朋友(線程)到達火車站門口時,就調用一次這個方法。
- 這個方法的意思是:“我到了,我開始等待其他人。” 然后這個線程就會被阻塞。
- 觸發開柵:
- 當第3個朋友(最后一個線程)也到達并調用
await()
時,“集結”條件達成! - 柵欄會“打開”,所有之前在
await()
處等待的線程會被同時喚醒,然后大家一起繼續執行后面的任務(比如進站)。
- 當第3個朋友(最后一個線程)也到達并調用
- 循環使用 (Cyclic):
- 最關鍵的是,這個柵欄是可以重復使用的。比如大家進站后,又可以約定在“檢票口”作為下一個集合點,再次使用同一個
CyclicBarrier
等待所有人檢票后一起上車。
- 最關鍵的是,這個柵欄是可以重復使用的。比如大家進站后,又可以約定在“檢票口”作為下一個集合點,再次使用同一個
最簡單易懂的代碼示例
下面我們就用代碼來模擬 3個朋友約定集合 的場景。我們還會用到 CyclicBarrier
的一個高級功能:當最后一個人到達時,可以指定一個額外的任務(比如由最后一個人高喊“人齊了,出發!”)。
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class SimpleCyclicBarrierDemo {public static void main(String[] args) {// 1. 創建一個 CyclicBarrier// 參數1: 參與的線程數量(小隊人數為3)// 參數2: 當最后一個線程到達柵欄時,要執行的任務(可選)final int teamSize = 3;final CyclicBarrier barrier = new CyclicBarrier(teamSize, () -> {// 這個任務會由最后一個到達柵欄的線程來執行System.out.println("\n*** 所有人都到齊了!由我(最后到達者)來宣布:出發! ***\n");});// 創建一個線程池來管理我們的朋友線程ExecutorService executor = Executors.newFixedThreadPool(teamSize);System.out.println("--- 3個朋友各自從家里出發 ---");// 2. 模擬3個朋友出發for (int i = 0; i < teamSize; i++) {final String friendName = "朋友-" + (i + 1);executor.submit(() -> {try {// 模擬從家里到集合點的耗時int travelTime = new Random().nextInt(5000) + 1000; // 隨機1-6秒System.out.println("[" + friendName + "] 出發了,大概需要 " + travelTime / 1000 + " 秒...");Thread.sleep(travelTime);System.out.println(">>> [" + friendName + "] 到達集合點,開始等待其他人...");// 3. 關鍵!調用 await() 表示自己已到達,并開始等待// 線程會在這里被阻塞,直到所有3個線程都調用了 await()barrier.await();// --- 當所有人都到達后,線程會從 await() 返回,繼續執行 ---System.out.println("<<< [" + friendName + "] 跟隨大部隊一起出發!");} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}});}// 關閉線程池executor.shutdown();}
}
代碼解析
new CyclicBarrier(3, ...)
: 創建了一個需要3個線程“報到”的柵欄。同時還指定了一個“柵欄任務”,這個任務會在第3個線程報到時,由那個線程順便完成。barrier.await()
: 這是每個線程的“報到點”。先到的線程會在這里停下(阻塞),耐心等待后到的同伴。barrierAction
(第二個參數): 當最后一個(第3個)線程調用await()
時,它不會立刻阻塞,而是先去執行這個我們預設好的任務(打印“所有人都到齊了…”),執行完之后,柵欄才會打開,所有線程(包括它自己)才能繼續前進。
運行效果分析
當你運行程序時,你會看到:
- 3個朋友會不按順序地、在不同時間點打印“到達集合點…”。
- 先到的1、2個朋友在打印完這句話后就不會有任何輸出了,因為它們被
await()
阻塞了。 - 直到最后一個朋友也打印了“到達集合點…”,并調用了
await()
后,你會立刻看到:
a. 首先打印出*** 所有人都到齊了!... ***
這條消息(由最后到達的線程執行)。
b. 緊接著,3個線程會幾乎同時打印出<<< [朋友-X] 跟隨大部隊一起出發!
的信息。
CyclicBarrier
vs CountDownLatch
的關鍵區別
特性 | CyclicBarrier (循環柵欄) | CountDownLatch (倒數門閂) |
---|---|---|
作用對象 | 一組線程相互等待 | 一個或多個線程等待其他線程 |
關系 | 朋友之間,地位平等 | 老板和員工,一個等多個 |
可重用性 | 可循環使用。所有線程通過后,計數器重置,可用于下一輪集合。 | 一次性。計數減到0后就失效了,無法重置。 |
功能 | 功能更強大,可以附加一個barrierAction 任務。 | 功能相對簡單。 |
簡單總結:CountDownLatch
是一個線程等其他所有線程完事,而 CyclicBarrier
是所有線程互相等,等齊了再一起干下一件事。
CyclicBarrier在業務系統里面通常有什么常用的應用場景
好的,這個問題非常好!從學習一個工具的API到理解它在真實業務系統中的價值,是成為一個優秀工程師的關鍵一步。
CyclicBarrier
的核心是“讓一組線程相互等待,直到所有線程都到達一個公共的屏障點,然后再一起繼續執行”。基于這個特性,它在業務系統中有幾個非常經典和常用的應用場景。
核心應用模式
所有場景都可以歸結為一個模式:將一個大任務拆分成多個獨立的子任務,讓多個線程并行處理,但在某些關鍵的“同步點”(屏障),這些線程必須等待彼此,直到所有線程都到達這個點,然后才能一起進入下一個階段。
下面是幾個具體的業務場景:
1. 數據并行處理與ETL(最經典)
這是 CyclicBarrier
最常見的用途,尤其是在大數據和數據倉庫領域。想象一下,你需要處理一個巨大的數據文件(例如,幾千萬條用戶數據),整個過程分為三個階段:
- 數據讀取(Read): 從文件中讀取數據到內存。
- 數據處理(Process): 對內存中的數據進行清洗、轉換、計算。
- 數據寫入(Write): 將處理好的數據寫入到數據庫。
為了提高效率,你可以啟動多個線程,每個線程負責處理文件的一部分。但這里的關鍵是,必須等所有線程都完成了上一個階段,才能一起開始下一個階段。
- 場景: 必須等所有線程都讀取完畢,數據才算完整,才能開始處理。
- 場景: 必須等所有線程都處理完畢,才能開始寫入,以保證數據的一致性。
CyclicBarrier
在這里的作用:
就像一個階段性的流水線。
- 創建一個
CyclicBarrier
,參與方數量就是你的線程數。 - 每個線程完成自己的數據讀取后,調用
barrier.await()
。 - 當最后一個線程也完成讀取并調用
await()
后,所有線程被喚醒,一起進入數據處理階段。 - 處理完后,再次調用
barrier.await()
,等待所有同伴,然后一起進入數據寫入階段。 CyclicBarrier
的“可循環使用”特性在這里得到了完美體現。
// 偽代碼
void processData() {// 階段1:讀取數據readMyChunk();barrier.await(); // 等待所有線程讀完// 階段2:處理數據processMyChunk();barrier.await(); // 等待所有線程處理完// 階段3:寫入數據writeMyChunk();
}
2. 模擬高并發測試
在做性能壓測時,我們常常需要模擬“在同一瞬間,有大量用戶請求同時到達服務器”的場景,以測試系統的瞬時承載能力。
如果只是簡單地用一個 for 循環啟動幾百個線程,那么這些線程的啟動時間會有先后,無法做到真正的“同時并發”。
CyclicBarrier
在這里的作用:
就像一個賽跑的起跑線。
- 創建一個
CyclicBarrier
,參與方數量就是并發用戶數(比如500)。 - 啟動500個線程,每個線程模擬一個用戶。
- 每個線程在真正發起HTTP請求之前,先執行各自的準備工作(比如準備參數、建立連接等)。
- 準備工作完成后,每個線程都調用
barrier.await()
。 - 此時,所有500個線程都會在“起跑線”上被阻塞,等待發令槍。
- 當最后一個(第500個)線程也準備好并調用
await()
時,柵欄打開,所有500個線程會幾乎在同一時刻被喚醒,然后同時向服務器發起請求,從而達到了模擬瞬時高并發的目的。
3. 多線程數據計算與合并
在科學計算或金融分析等領域,一個復雜的計算任務可以被分解。例如,計算一個大矩陣,可以把矩陣切成好幾塊,分給不同線程計算。
- 場景:每個線程計算完自己的那一小塊后,需要用同伴們計算出的結果來計算下一輪的迭代值。
CyclicBarrier
的作用:確保所有線程都完成了當前輪次的計算,并將結果保存在一個共享位置后,大家才能一起進入下一輪迭代。這保證了每一輪迭代的初始數據都是完整和同步的。
4. 游戲服務器中的多玩家同步
在網絡游戲中,CyclicBarrier
也非常有用。
- 場景1:游戲開始前。一個游戲房間需要湊齊比如5個玩家才能開始。服務器可以為這個房間創建一個5個參與方的
CyclicBarrier
。每個玩家客戶端加載完地圖資源后,就向服務器報到(相當于調用await()
)。當第5個玩家也準備好后,服務器的柵欄打開,向所有5個客戶端同時發送“游戲開始”的指令。 - 場景2:回合制游戲。在一回合結束后,需要等待所有玩家都確認“回合結束”,才能一起進入下一回合的準備階段。
總結
應用場景 | 核心問題 | CyclicBarrier的作用 |
---|---|---|
數據并行處理 | 需要分階段、按步驟地處理數據,且每一階段都依賴上一階段的全部結果。 | 作為階段同步點,確保所有線程同步進入下一階段。 |
高并發測試 | 需要模擬大量線程在同一時刻觸發某個動作。 | 作為起跑線,將所有線程“壓”在同一點,然后同時釋放。 |
并行算法 | 算法需要多輪迭代,且每一輪的開始都依賴上一輪所有線程的計算結果。 | 作為迭代同步點,確保所有線程同步進入下一輪迭代。 |
游戲同步 | 需要等待所有參與者都達到某個狀態(如加載完成、回合結束)后才能繼續。 | 作為玩家狀態同步點,確保游戲邏輯同步進行。 |
總而言之,當你遇到一個需要多個對等的線程相互協作,步調一致地完成一個分階段任務時,CyclicBarrier
就是一個非常理想的選擇。