CyclicBarrier 基本用法
簡介
CyclicBarrier 是 Java 并發包(java.util.concurrent)中的一個同步輔助類。它允許一組線程相互等待,直到到達某個公共屏障點(common barrier point)。只有當所有參與的線程都到達屏障點時,這些線程才能繼續執行。這與CountDownLatch不同,后者是一次性的,而CyclicBarrier可以在等待的線程被釋放后重新使用。
使用場景
多線程任務中,讓多個線程在某個特定的點上同步,確保完成所有線程都完成某個階段后,再一起進入下一階段。
比如:一個計算任務被分成了多個部分,每個部分由不同的線程進行處理,處理完成后需要匯總結果,這時候需要所有線程都完成計算后,才能進行匯總。
基本用法
創建 CyclicBarrier
使用 new CyclicBarrier(int parties) 創建一個新的 CyclicBarrier,其中 parties 參數指定了必須調用 await() 方法以使所有線程都能通過屏障的線程數量。
也可以提供第二個參數,這是一個 Runnable,當最后一個線程到達屏障時會執行這個Runnable,這可以用于在所有線程到達屏障之前執行一些特定的操作。
線程中使用 await() 方法
每個線程在需要等待其他線程的地方調用 await() 方法。該方法會掛起當前線程,直到所有線程都調用了 await() 方法。
如果當前線程不是最后一個到達屏障的線程,await() 將阻塞直到所有線程都到達。如果當前線程是最后一個到達的,則所有線程都會被喚醒繼續執行。
重置屏障
可以通過調用 reset() 方法來重置 CyclicBarrier,這將導致所有等待的線程收到一個 BrokenBarrierException 異常,并且屏障計數會被重置為其初始狀態。
示例
說明:
使用休眠函數模擬實際業務執行的耗時,使用線程池多線程的執行任務,所有任務完成后,匯總計算結果。
公共代碼
給出模擬函數,以及線程池定義等前置條件代碼。
package com.ysx.utils.concurrent;import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.security.SecureRandom;
import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;public class CyclicBarrierTest {private static final Logger LOGGER = LoggerFactory.getLogger(CyclicBarrierTest.class);// 使用map模擬所有的數據,每個線程占據一個keyprivate final Map<Integer, Integer> countMap = new ConcurrentHashMap<>();// 核心線程數private static final int CORE_POOL_SIZE = 5;// 線程池private static final ExecutorService THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(CORE_POOL_SIZE, CORE_POOL_SIZE * 2,10, TimeUnit.SECONDS, new LinkedBlockingDeque<>(1000));/*** 使用一個休眠函數,模擬實際業務的計算,并且等待其他線程的結束** @param cyclicBarrier 屏障獨顯* @param index 數據的key,這里用index表示*/private void calculate(CyclicBarrier cyclicBarrier, int index) {SecureRandom secureRandom = new SecureRandom();// 隨機休眠時間,并作為數據值int randomSleepSeconds = secureRandom.nextInt(10);LOGGER.info("<{}> index: {}, randomSleepSeconds: {}.", Thread.currentThread().getName(), index, randomSleepSeconds);// 休眠,模擬線程計算耗時try {Thread.sleep(randomSleepSeconds * 1000);} catch (InterruptedException e) {throw new RuntimeException(e);}// 記錄計算結果countMap.put(index, randomSleepSeconds);// 等待其他線程到達這個點LOGGER.info("<{}> index: {}, waiting other thread.", Thread.currentThread().getName(), index);try {cyclicBarrier.await();} catch (InterruptedException e) {throw new RuntimeException(e);} catch (BrokenBarrierException e) {throw new RuntimeException(e);}// 通過了這個屏障LOGGER.info("<{}> index: {}, has passed barrier.", Thread.currentThread().getName(), index);}}
場景1 基本用法
任務數量不超過線程池的核心線程數
@Test
@DisplayName("基本用法:任務數量不超過線程池的核心線程數")
public void test1() {int taskNum = 3;int threadNum = Math.min(CORE_POOL_SIZE, taskNum);// 這里 CyclicBarrier 的構造函數的參數 parties 表示需要攔截的線程的數量,示例中有3個子線程,1個主線程CyclicBarrier cyclicBarrier = new CyclicBarrier(threadNum + 1);for (int i = 0; i < threadNum; i++) {final int index = i;THREAD_POOL_EXECUTOR.execute(() -> calculate(cyclicBarrier, index));}try {// 等待所有子線程任務執行完成,超時時間為1分鐘LOGGER.info("waiting all thread.");cyclicBarrier.await(1, TimeUnit.MINUTES);LOGGER.info("<{}> has passed barrier.", Thread.currentThread().getName());} catch (InterruptedException e) {throw new RuntimeException(e);} catch (BrokenBarrierException e) {throw new RuntimeException(e);} catch (TimeoutException e) {throw new RuntimeException(e);}LOGGER.info("finish calculate.");int sum = countMap.values().stream().mapToInt(Integer::intValue).sum();LOGGER.info("sum: {}.", sum);
}
運行后日志打印參考:(日志打印忽略時間戳)
[main] INFO com.ysx.utils.concurrent.CyclicBarrierTest:105 - waiting all thread.
[pool-2-thread-3] INFO com.ysx.utils.concurrent.CyclicBarrierTest:67 - <pool-2-thread-3> index: 2, randomSleepSeconds: 7.
[pool-2-thread-1] INFO com.ysx.utils.concurrent.CyclicBarrierTest:67 - <pool-2-thread-1> index: 0, randomSleepSeconds: 7.
[pool-2-thread-2] INFO com.ysx.utils.concurrent.CyclicBarrierTest:67 - <pool-2-thread-2> index: 1, randomSleepSeconds: 7.
[pool-2-thread-2] INFO com.ysx.utils.concurrent.CyclicBarrierTest:79 - <pool-2-thread-2> index: 1, waiting other thread.
[pool-2-thread-3] INFO com.ysx.utils.concurrent.CyclicBarrierTest:79 - <pool-2-thread-3> index: 2, waiting other thread.
[pool-2-thread-1] INFO com.ysx.utils.concurrent.CyclicBarrierTest:79 - <pool-2-thread-1> index: 0, waiting other thread.
[pool-2-thread-3] INFO com.ysx.utils.concurrent.CyclicBarrierTest:88 - <pool-2-thread-3> index: 2, has passed barrier.
[main] INFO com.ysx.utils.concurrent.CyclicBarrierTest:108 - <main> has passed barrier.
[pool-2-thread-1] INFO com.ysx.utils.concurrent.CyclicBarrierTest:88 - <pool-2-thread-1> index: 0, has passed barrier.
[main] INFO com.ysx.utils.concurrent.CyclicBarrierTest:116 - finish calculate.
[pool-2-thread-2] INFO com.ysx.utils.concurrent.CyclicBarrierTest:88 - <pool-2-thread-2> index: 1, has passed barrier.
[main] INFO com.ysx.utils.concurrent.CyclicBarrierTest:118 - sum: 21.
日志說明:
- 有3個任務,分別用3個子線程執行,主線程調用
cyclicBarrier.await(1, TimeUnit.MINUTES);
方法,表示等待子線程任務執行完成,之后匯總結果。
場景2 構造函數中指定Runnable
構造函數Runnable的用法:該Runnable優先級高
@Test
@DisplayName("構造函數Runnable的用法:該Runnable優先級高")
public void test2() {int taskNum = 3;int threadNum = Math.min(CORE_POOL_SIZE, taskNum);// 這里 CyclicBarrier 的構造函數的參數 parties 表示需要攔截的線程的數量,示例中有3個子線程,1個主線程CyclicBarrier cyclicBarrier = new CyclicBarrier(threadNum + 1, new Runnable() {@Overridepublic void run() {LOGGER.info("<{}> is first priority.", Thread.currentThread().getName());}});for (int i = 0; i < threadNum; i++) {final int index = i;THREAD_POOL_EXECUTOR.execute(() -> calculate(cyclicBarrier, index));}try {// 等待所有子線程任務執行完成,超時時間為1分鐘LOGGER.info("waiting all thread.");cyclicBarrier.await(1, TimeUnit.MINUTES);LOGGER.info("<{}> has passed barrier.", Thread.currentThread().getName());} catch (InterruptedException e) {throw new RuntimeException(e);} catch (BrokenBarrierException e) {throw new RuntimeException(e);} catch (TimeoutException e) {throw new RuntimeException(e);}LOGGER.info("finish calculate.");int sum = countMap.values().stream().mapToInt(Integer::intValue).sum();LOGGER.info("sum: {}.", sum);
}
打印日志參考:
[main] INFO com.ysx.utils.concurrent.CyclicBarrierTest:139 - waiting all thread.
[pool-2-thread-3] INFO com.ysx.utils.concurrent.CyclicBarrierTest:67 - <pool-2-thread-3> index: 2, randomSleepSeconds: 9.
[pool-2-thread-2] INFO com.ysx.utils.concurrent.CyclicBarrierTest:67 - <pool-2-thread-2> index: 1, randomSleepSeconds: 7.
[pool-2-thread-1] INFO com.ysx.utils.concurrent.CyclicBarrierTest:67 - <pool-2-thread-1> index: 0, randomSleepSeconds: 6.
[pool-2-thread-1] INFO com.ysx.utils.concurrent.CyclicBarrierTest:79 - <pool-2-thread-1> index: 0, waiting other thread.
[pool-2-thread-2] INFO com.ysx.utils.concurrent.CyclicBarrierTest:79 - <pool-2-thread-2> index: 1, waiting other thread.
[pool-2-thread-3] INFO com.ysx.utils.concurrent.CyclicBarrierTest:79 - <pool-2-thread-3> index: 2, waiting other thread.
[pool-2-thread-3] INFO com.ysx.utils.concurrent.CyclicBarrierTest:130 - <pool-2-thread-3> is first priority.
[pool-2-thread-1] INFO com.ysx.utils.concurrent.CyclicBarrierTest:88 - <pool-2-thread-1> index: 0, has passed barrier.
[pool-2-thread-2] INFO com.ysx.utils.concurrent.CyclicBarrierTest:88 - <pool-2-thread-2> index: 1, has passed barrier.
[pool-2-thread-3] INFO com.ysx.utils.concurrent.CyclicBarrierTest:88 - <pool-2-thread-3> index: 2, has passed barrier.
[main] INFO com.ysx.utils.concurrent.CyclicBarrierTest:142 - <main> has passed barrier.
[main] INFO com.ysx.utils.concurrent.CyclicBarrierTest:150 - finish calculate.
[main] INFO com.ysx.utils.concurrent.CyclicBarrierTest:152 - sum: 22.
日志說明:
- 即構造函數的Runnable的優先級最高,在其他所有到達屏障點之后,這個線程的代碼最先執行。
場景3 通用場景:分批處理
使用線程池處理的場景,由于線程池數量是有限的,所以一次并發執行的線程數是有上限的,比如為5,當總任務數為13時,就需要把任務分批處理,以線程池核心線程數量分批,13個任務和分為3批,每個批次分別為5,5,3個任務。在所有批次執行完成之后,再匯總所有的數據。
分批場景需要考慮最后一批的執行,最后一批的數量可能存在小于核心線程數量的情況,要以實際的數量為準。
參考代碼如下:
@Test
@DisplayName("通用用法:任務數量存在超過線程池的核心線程數的場景,需要分批處理")
public void test3() {int taskNum = 13;// 計算分批計算的批次:任務數/核心線程數,并向上取整int batchNum = (int) Math.ceil((double) taskNum / CORE_POOL_SIZE);for (int batchIndex = 0; batchIndex < batchNum; batchIndex++) {int threadNum = CORE_POOL_SIZE;if (batchIndex == batchNum - 1) {// 如果是最后一批,需要以實際剩余的任務數作為線程數threadNum = taskNum - batchIndex * CORE_POOL_SIZE;}LOGGER.info("batch calculate, batchIndex: {}, threadNum: {}.", batchIndex, threadNum);// 這里 CyclicBarrier 的構造函數的參數 parties 表示需要攔截的線程的數量,這里有threadNum個子線程,1個主線程CyclicBarrier cyclicBarrier = new CyclicBarrier(threadNum + 1);for (int threadIndex = 0; threadIndex < threadNum; threadIndex++) {// 當前數據的index需要單獨計數final int index = threadIndex + batchIndex * CORE_POOL_SIZE;THREAD_POOL_EXECUTOR.execute(() -> calculate(cyclicBarrier, index));}try {// 當前批次,等待所有子線程任務執行完成,超時時間為1分鐘LOGGER.info("waiting all thread in this batch, batchIndex: {}.", batchIndex);cyclicBarrier.await(1, TimeUnit.MINUTES);LOGGER.info("<{}> has passed barrier in this batch, batchIndex: {}.", Thread.currentThread().getName(), batchIndex);} catch (InterruptedException e) {throw new RuntimeException(e);} catch (BrokenBarrierException e) {throw new RuntimeException(e);} catch (TimeoutException e) {throw new RuntimeException(e);}}LOGGER.info("finish calculate.");int sum = countMap.values().stream().mapToInt(Integer::intValue).sum();LOGGER.info("sum: {}.", sum);
}
參考日志:
[main] INFO com.ysx.utils.concurrent.CyclicBarrierTest:168 - batch calculate, batchIndex: 0, threadNum: 5.
[main] INFO com.ysx.utils.concurrent.CyclicBarrierTest:178 - waiting all thread in this batch, batchIndex: 0.
[pool-2-thread-1] INFO com.ysx.utils.concurrent.CyclicBarrierTest:67 - <pool-2-thread-1> index: 0, randomSleepSeconds: 3.
[pool-2-thread-3] INFO com.ysx.utils.concurrent.CyclicBarrierTest:67 - <pool-2-thread-3> index: 2, randomSleepSeconds: 8.
[pool-2-thread-5] INFO com.ysx.utils.concurrent.CyclicBarrierTest:67 - <pool-2-thread-5> index: 4, randomSleepSeconds: 8.
[pool-2-thread-4] INFO com.ysx.utils.concurrent.CyclicBarrierTest:67 - <pool-2-thread-4> index: 3, randomSleepSeconds: 8.
[pool-2-thread-2] INFO com.ysx.utils.concurrent.CyclicBarrierTest:67 - <pool-2-thread-2> index: 1, randomSleepSeconds: 7.
[pool-2-thread-1] INFO com.ysx.utils.concurrent.CyclicBarrierTest:79 - <pool-2-thread-1> index: 0, waiting other thread.
[pool-2-thread-2] INFO com.ysx.utils.concurrent.CyclicBarrierTest:79 - <pool-2-thread-2> index: 1, waiting other thread.
[pool-2-thread-3] INFO com.ysx.utils.concurrent.CyclicBarrierTest:79 - <pool-2-thread-3> index: 2, waiting other thread.
[pool-2-thread-5] INFO com.ysx.utils.concurrent.CyclicBarrierTest:79 - <pool-2-thread-5> index: 4, waiting other thread.
[pool-2-thread-4] INFO com.ysx.utils.concurrent.CyclicBarrierTest:79 - <pool-2-thread-4> index: 3, waiting other thread.
[main] INFO com.ysx.utils.concurrent.CyclicBarrierTest:181 - <main> has passed barrier in this batch, batchIndex: 0.
[main] INFO com.ysx.utils.concurrent.CyclicBarrierTest:168 - batch calculate, batchIndex: 1, threadNum: 5.
[main] INFO com.ysx.utils.concurrent.CyclicBarrierTest:178 - waiting all thread in this batch, batchIndex: 1.
[pool-2-thread-1] INFO com.ysx.utils.concurrent.CyclicBarrierTest:88 - <pool-2-thread-1> index: 0, has passed barrier.
[pool-2-thread-1] INFO com.ysx.utils.concurrent.CyclicBarrierTest:67 - <pool-2-thread-1> index: 5, randomSleepSeconds: 8.
[pool-2-thread-2] INFO com.ysx.utils.concurrent.CyclicBarrierTest:88 - <pool-2-thread-2> index: 1, has passed barrier.
[pool-2-thread-3] INFO com.ysx.utils.concurrent.CyclicBarrierTest:88 - <pool-2-thread-3> index: 2, has passed barrier.
[pool-2-thread-3] INFO com.ysx.utils.concurrent.CyclicBarrierTest:67 - <pool-2-thread-3> index: 7, randomSleepSeconds: 8.
[pool-2-thread-2] INFO com.ysx.utils.concurrent.CyclicBarrierTest:67 - <pool-2-thread-2> index: 6, randomSleepSeconds: 8.
[pool-2-thread-5] INFO com.ysx.utils.concurrent.CyclicBarrierTest:88 - <pool-2-thread-5> index: 4, has passed barrier.
[pool-2-thread-5] INFO com.ysx.utils.concurrent.CyclicBarrierTest:67 - <pool-2-thread-5> index: 8, randomSleepSeconds: 5.
[pool-2-thread-4] INFO com.ysx.utils.concurrent.CyclicBarrierTest:88 - <pool-2-thread-4> index: 3, has passed barrier.
[pool-2-thread-4] INFO com.ysx.utils.concurrent.CyclicBarrierTest:67 - <pool-2-thread-4> index: 9, randomSleepSeconds: 3.
[pool-2-thread-4] INFO com.ysx.utils.concurrent.CyclicBarrierTest:79 - <pool-2-thread-4> index: 9, waiting other thread.
[pool-2-thread-5] INFO com.ysx.utils.concurrent.CyclicBarrierTest:79 - <pool-2-thread-5> index: 8, waiting other thread.
[pool-2-thread-2] INFO com.ysx.utils.concurrent.CyclicBarrierTest:79 - <pool-2-thread-2> index: 6, waiting other thread.
[pool-2-thread-3] INFO com.ysx.utils.concurrent.CyclicBarrierTest:79 - <pool-2-thread-3> index: 7, waiting other thread.
[pool-2-thread-1] INFO com.ysx.utils.concurrent.CyclicBarrierTest:79 - <pool-2-thread-1> index: 5, waiting other thread.
[main] INFO com.ysx.utils.concurrent.CyclicBarrierTest:181 - <main> has passed barrier in this batch, batchIndex: 1.
[pool-2-thread-4] INFO com.ysx.utils.concurrent.CyclicBarrierTest:88 - <pool-2-thread-4> index: 9, has passed barrier.
[main] INFO com.ysx.utils.concurrent.CyclicBarrierTest:168 - batch calculate, batchIndex: 2, threadNum: 3.
[main] INFO com.ysx.utils.concurrent.CyclicBarrierTest:178 - waiting all thread in this batch, batchIndex: 2.
[pool-2-thread-5] INFO com.ysx.utils.concurrent.CyclicBarrierTest:88 - <pool-2-thread-5> index: 8, has passed barrier.
[pool-2-thread-5] INFO com.ysx.utils.concurrent.CyclicBarrierTest:67 - <pool-2-thread-5> index: 10, randomSleepSeconds: 2.
[pool-2-thread-3] INFO com.ysx.utils.concurrent.CyclicBarrierTest:88 - <pool-2-thread-3> index: 7, has passed barrier.
[pool-2-thread-2] INFO com.ysx.utils.concurrent.CyclicBarrierTest:88 - <pool-2-thread-2> index: 6, has passed barrier.
[pool-2-thread-3] INFO com.ysx.utils.concurrent.CyclicBarrierTest:67 - <pool-2-thread-3> index: 12, randomSleepSeconds: 5.
[pool-2-thread-4] INFO com.ysx.utils.concurrent.CyclicBarrierTest:67 - <pool-2-thread-4> index: 11, randomSleepSeconds: 9.
[pool-2-thread-1] INFO com.ysx.utils.concurrent.CyclicBarrierTest:88 - <pool-2-thread-1> index: 5, has passed barrier.
[pool-2-thread-5] INFO com.ysx.utils.concurrent.CyclicBarrierTest:79 - <pool-2-thread-5> index: 10, waiting other thread.
[pool-2-thread-3] INFO com.ysx.utils.concurrent.CyclicBarrierTest:79 - <pool-2-thread-3> index: 12, waiting other thread.
[pool-2-thread-4] INFO com.ysx.utils.concurrent.CyclicBarrierTest:79 - <pool-2-thread-4> index: 11, waiting other thread.
[pool-2-thread-5] INFO com.ysx.utils.concurrent.CyclicBarrierTest:88 - <pool-2-thread-5> index: 10, has passed barrier.
[pool-2-thread-3] INFO com.ysx.utils.concurrent.CyclicBarrierTest:88 - <pool-2-thread-3> index: 12, has passed barrier.
[main] INFO com.ysx.utils.concurrent.CyclicBarrierTest:181 - <main> has passed barrier in this batch, batchIndex: 2.
[main] INFO com.ysx.utils.concurrent.CyclicBarrierTest:191 - finish calculate.
[pool-2-thread-4] INFO com.ysx.utils.concurrent.CyclicBarrierTest:88 - <pool-2-thread-4> index: 11, has passed barrier.
[main] INFO com.ysx.utils.concurrent.CyclicBarrierTest:193 - sum: 82.
日志說明:
- 分批處理,每個批次內部是并發執行的。
- 在所有批次執行完成之后,再匯總計算結果。
源代碼地址
- github
- gitee