在 Java 并發編程領域,JDK 提供的工具類是簡化多線程協作的重要武器。這些工具類基于 AQS(AbstractQueuedSynchronizer)框架實現,封裝了復雜的同步邏輯,讓開發者無需深入底層即可實現高效的線程協作。本文作為并發工具類系列的第一篇,將重點解析CountDownLatch和Semaphore的核心原理、典型使用場景及實戰案例,幫助開發者掌握其在多線程協作中的應用技巧。
一、CountDownLatch:等待多線程完成的計數器
CountDownLatch(倒計時門閂)是一種經典的線程同步工具,其核心功能是讓一個或多個線程等待其他線程完成指定操作后再繼續執行。它通過一個遞減的計數器實現線程間的協調,計數器歸零時,所有等待的線程將被喚醒。
1.1 核心原理與方法解析
CountDownLatch的設計基于 “計數器 + 等待喚醒” 機制,核心方法如下:
方法 | 功能描述 |
CountDownLatch(int count) | 構造方法,初始化計數器值(count為需要等待的線程操作數,必須≥0) |
void await() | 調用線程進入阻塞狀態,直至計數器歸 0 或被中斷 |
boolean await(long timeout, TimeUnit unit) | 帶超時時間的等待,超時后無論計數器是否歸 0,線程都會喚醒并返回false |
void countDown() | 將計數器值減 1,當值為 0 時,喚醒所有因await()阻塞的線程 |
關鍵特性:CountDownLatch的計數器是一次性的,一旦歸 0,后續調用countDown()不會再改變其狀態,因此無法重復使用。
1.2 典型場景:主線程等待子線程初始化完成
在大型應用啟動過程中,主線程往往需要等待多個初始化任務(如加載配置文件、初始化數據庫連接、預熱緩存等)完成后才能啟動核心業務。CountDownLatch完美適配這種 “等待多任務完成” 的場景。
實戰案例:
public class SystemInitDemo {// 初始化計數器,需等待3個核心任務完成private static final CountDownLatch initLatch = new CountDownLatch(3);public static void main(String[] args) throws InterruptedException {System.out.println("系統啟動:開始等待初始化任務...");// 啟動配置加載任務new Thread(() -> {try {System.out.println("配置加載任務:開始加載系統配置...");Thread.sleep(1500); // 模擬配置加載耗時System.out.println("配置加載任務:完成加載");} catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {initLatch.countDown(); // 任務完成,計數器減1}}, "配置線程").start();// 啟動數據庫連接任務new Thread(() -> {try {System.out.println("數據庫任務:開始建立連接池...");Thread.sleep(2000); // 模擬連接池初始化耗時System.out.println("數據庫任務:連接池初始化完成");} catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {initLatch.countDown();}}, "數據庫線程").start();// 啟動緩存預熱任務new Thread(() -> {try {System.out.println("緩存任務:開始預熱熱點數據...");Thread.sleep(1000); // 模擬緩存預熱耗時System.out.println("緩存任務:熱點數據預熱完成");} catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {initLatch.countDown();}}, "緩存線程").start();// 主線程等待所有初始化任務完成initLatch.await();System.out.println("系統啟動:所有初始化任務完成,啟動核心服務...");}
}
運行結果:
系統啟動:開始等待初始化任務...配置加載任務:開始加載系統配置...數據庫任務:開始建立連接池...緩存任務:開始預熱熱點數據...緩存任務:熱點數據預熱完成配置加載任務:完成加載數據庫任務:連接池初始化完成系統啟動:所有初始化任務完成,啟動核心服務...
案例解析:
- 主線程通過initLatch.await()阻塞等待,直到 3 個初始化線程都調用countDown()使計數器歸 0;
- 即使各任務執行時間不同(數據庫任務耗時最長),主線程也會等待所有任務完成后再繼續,確保系統啟動的完整性;
- finally塊中調用countDown()保證即使任務異常,計數器也能正確遞減,避免主線程無限等待。
1.3 反向應用:子線程等待主線程指令
CountDownLatch不僅能讓主線程等待子線程,還能通過反向設計實現 “子線程等待主線程信號”。例如在并發測試中,讓所有測試線程準備就緒后,等待主線程發出 “開始” 指令,確保所有線程同時執行測試代碼,消除啟動順序帶來的誤差。
示例代碼:
public class ConcurrentTestDemo {// 計數器初始化為1,代表主線程的"開始"信號private static final CountDownLatch startSignal = new CountDownLatch(1);// 記錄并發執行結果private static final AtomicInteger result = new AtomicInteger(0);public static void main(String[] args) throws InterruptedException {int threadCount = 5; // 并發線程數// 啟動5個測試線程for (int i = 0; i < threadCount; i++) {new Thread(() -> {try {System.out.println(Thread.currentThread().getName() + ":準備就緒,等待開始信號");startSignal.await(); // 等待主線程指令// 收到信號后執行并發操作result.incrementAndGet();System.out.println(Thread.currentThread().getName() + ":執行完成");} catch (InterruptedException e) {Thread.currentThread().interrupt();}}, "測試線程-" + i).start();}// 主線程準備3秒后發出開始信號Thread.sleep(3000);System.out.println("主線程:發出開始信號");startSignal.countDown(); // 計數器歸0,喚醒所有測試線程// 等待所有測試線程完成(實際場景可再用一個CountDownLatch)Thread.sleep(1000);System.out.println("所有線程執行完成,最終結果:" + result.get()); // 預期結果為5}
}
核心價值:通過startSignal確保所有子線程在同一時間點開始執行,真實模擬高并發場景,提升測試準確性。
二、Semaphore:控制資源并發訪問的信號量
Semaphore(信號量)是用于控制資源并發訪問數量的工具類,它通過維護一組 “許可”(permit)實現對資源的限流。線程需要先獲取許可才能訪問資源,訪問結束后釋放許可,供其他線程使用。
2.1 核心原理與方法解析
Semaphore的核心是 “許可管理”,通過控制許可數量限制并發線程數,核心方法如下:
方法 | 功能描述 |
Semaphore(int permits) | 構造方法,初始化許可數量(permits為允許同時訪問的線程數) |
Semaphore(int permits, boolean fair) | 帶公平性參數的構造方法,fair=true時按線程請求順序分配許可 |
void acquire() | 獲取 1 個許可,若暫時無可用許可,線程會阻塞等待 |
boolean tryAcquire() | 嘗試獲取 1 個許可,立即返回結果(成功true/ 失敗false),不阻塞 |
boolean tryAcquire(long timeout, TimeUnit unit) | 超時嘗試獲取許可,超時未獲取則返回false |
void release() | 釋放 1 個許可,將其歸還給信號量 |
int availablePermits() | 返回當前可用的許可數量 |
關鍵特性:Semaphore的許可數量可以動態調整,release()方法可在未獲取許可的情況下釋放,從而增加總許可數(需謹慎使用)。
2.2 典型場景:資源池的并發訪問控制
數據庫連接池、線程池等資源池場景中,資源數量有限,Semaphore可用于限制同時訪問資源的線程數,防止因資源耗盡導致的系統異常。
實戰案例:
public class ConnectionPoolDemo {// 數據庫連接池(模擬10個連接)private static final int POOL_SIZE = 10;private static final List<Connection> connectionPool = new ArrayList<>(POOL_SIZE);// 信號量控制并發訪問,許可數等于連接池大小private static final Semaphore semaphore = new Semaphore(POOL_SIZE, true); // 公平模式// 初始化連接池static {for (int i = 0; i < POOL_SIZE; i++) {connectionPool.add(new MockConnection("連接-" + (i + 1)));}}// 獲取數據庫連接public static Connection getConnection() throws InterruptedException {semaphore.acquire(); // 獲取許可(若連接池滿則等待)synchronized (connectionPool) {return connectionPool.remove(0); // 從池內取出連接}}// 釋放數據庫連接public static void releaseConnection(Connection connection) {if (connection != null) {synchronized (connectionPool) {connectionPool.add(connection); // 連接放回池內}semaphore.release(); // 釋放許可}}// 模擬數據庫連接類static class MockConnection {private String name;MockConnection(String name) { this.name = name; }@Overridepublic String toString() { return name; }}public static void main(String[] args) {// 模擬20個線程并發請求連接for (int i = 0; i < 20; i++) {new Thread(() -> {Connection conn = null;try {conn = getConnection();System.out.println(Thread.currentThread().getName() + "獲取到" + conn + ",當前可用許可:" + semaphore.availablePermits());Thread.sleep(1000); // 模擬數據庫操作耗時} catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {releaseConnection(conn);if (conn != null) {System.out.println(Thread.currentThread().getName() + "釋放了" + conn + ",當前可用許可:" + semaphore.availablePermits());}}}, "業務線程-" + i).start();}}
}
運行結果片段:
業務線程-0獲取到連接-1,當前可用許可:9業務線程-1獲取到連接-2,當前可用許可:8...業務線程-9獲取到連接-10,當前可用許可:0// 此時許可耗盡,線程10-19進入等待業務線程-0釋放了連接-1,當前可用許可:1業務線程-10獲取到連接-1,當前可用許可:0
...
案例解析:
- Semaphore通過 10 個許可限制同時使用連接的線程數,與連接池容量匹配,避免資源過度占用;
- 公平模式(fair=true)確保線程按請求順序獲取許可,減少饑餓現象;
- getConnection()和releaseConnection()通過同步塊保證連接池操作的線程安全,結合信號量實現完整的資源管控。
2.3 擴展場景:接口限流與流量控制
Semaphore可用于接口限流,通過控制單位時間內的請求數保護系統穩定。例如限制某 API 每秒最多處理 100 個請求,超出部分直接拒絕或排隊等待。
示例代碼:
public class ApiRateLimiter {private final Semaphore semaphore;private final int maxRequestsPerSecond; // 每秒最大請求數public ApiRateLimiter(int maxRequestsPerSecond) {this.maxRequestsPerSecond = maxRequestsPerSecond;this.semaphore = new Semaphore(maxRequestsPerSecond);// 定時任務:每秒重置許可數量ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);scheduler.scheduleAtFixedRate(() -> {int permitsToRelease = maxRequestsPerSecond - semaphore.availablePermits();if (permitsToRelease > 0) {semaphore.release(permitsToRelease); // 補充許可至上限}}, 1, 1, TimeUnit.SECONDS);}// 嘗試訪問APIpublic boolean tryAccess() {return semaphore.tryAcquire();}public static void main(String[] args) {// 限制每秒最多5個請求ApiRateLimiter limiter = new ApiRateLimiter(5);// 模擬10個并發請求for (int i = 0; i < 10; i++) {new Thread(() -> {if (limiter.tryAccess()) {System.out.println(Thread.currentThread().getName() + ":API訪問成功");} else {System.out.println(Thread.currentThread().getName() + ":API訪問被限流");}}, "請求線程-" + i).start();}}
}
運行結果:
請求線程-0:API訪問成功
請求線程-1:API訪問成功
請求線程-2:API訪問成功
請求線程-3:API訪問成功
請求線程-4:API訪問成功
請求線程-5:API訪問被限流
請求線程-6:API訪問被限流
...
限流原理:通過定時任務每秒補充許可,使Semaphore的許可數始終維持在maxRequestsPerSecond,從而實現固定速率的流量控制。
三、CountDownLatch 與 Semaphore 的對比與協同
特性 | CountDownLatch | Semaphore |
核心功能 | 等待多個線程完成操作 | 控制并發訪問資源的線程數 |
計數器特性 | 一次性遞減,歸 0 后不可重置 | 可重復獲取和釋放,動態調整 |
典型場景 | 初始化協調、并發測試同步 | 資源池控制、接口限流 |
線程協作方向 | 多線程→主線程(或反之) | 線程間競爭資源 |
協同案例:在分布式任務調度中,可用CountDownLatch等待所有任務節點準備就緒,再用Semaphore控制同時執行任務的節點數,實現 “先同步準備,再限流執行” 的流程。
總結
CountDownLatch和Semaphore是解決多線程協作問題的利器:CountDownLatch通過計數器實現線程間的等待協調,適合初始化、測試同步等場景;Semaphore通過許可管理控制資源并發訪問,適合資源池、限流等場景。掌握這兩個工具類的核心原理和使用技巧,能顯著提升并發編程的效率和可靠性。
下一篇將介紹CyclicBarrier、Phaser等其他常用工具類,敬請期待。