文章目錄
- 1. 背景說明
- 2. API與方法
- 3. 示例代碼
- 3.1 基礎工具方法
- 3.2 測試任務類
- 3.3 測試和統計方法
- 3.4 測試兩種模式的限頻器
- 3.5 測試緩沖時間與等待耗時
- 4. 完整的測試代碼
- 5. 簡單小結
1. 背景說明
高并發應用場景有3大利器: 緩存、限流、熔斷。
也有說4利器的: 緩存、限流、熔斷、降級。
每一種技術都有自己的適用場景,也有很多使用細節和注意事項。
本文主要介紹 Guava 工具庫中的限頻器(RateLimiter), 也可以稱之為限流器。
限流技術可以簡單分為兩類:
- 限制TPS, 也就是每秒的業務請求數。 有時候也可以用 QPS 來表示, 即每秒請求數。
- 限制并發數, 也就是同一時刻處理的最大并發請求數。 常用的技術,包括線程池+等待隊列,或者簡單使用 信號量(Semaphore) 來進行控制。
限頻器(RateLimiter)的適用場景:
限制客戶端每秒訪問服務器的次數。
可以在單個接口使用,
也可以對多個接口使用,
甚至我們還可以使用注解與參數,通過AOP切面進行靈活的編程。(本文不介紹)
2. API與方法
guava工具庫的MAVEN依賴為:
<properties><guava.version>33.1.0-jre</guava.version>
</properties><dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>${guava.version}</version>
</dependency>
主要的類結構和方法如下所示:
package com.google.common.util.concurrent;public abstract class RateLimiter {// 1. 內部實現創建的是 SmoothBursty 模式的限頻器// permitsPerSecond 參數就是每秒允許的授權數量public static RateLimiter create(double permitsPerSecond)//...// 2. 內部實現創建的是 SmoothWarmingUp 模式的限頻器// 傳入預熱的時間: 在預熱期間內, 每秒發放的許可數比 permitsPerSecond 少// 主要用于保護服務端, 避免剛啟動就被大量的請求打死。public static RateLimiter create(double permitsPerSecond,Duration warmupPeriod) // ...public static RateLimiter create(double permitsPerSecond,long warmupPeriod, TimeUnit unit) //...// 3. 使用過程中, 支持動態修改每秒限頻次數public final void setRate(double permitsPerSecond) // ...// 4. 獲取許可; 拿不到就死等;public double acquire()// ...public double acquire(int permits)//...// 5. 嘗試獲取許可 public boolean tryAcquire()//...public boolean tryAcquire(int permits)//...// 5.1 重點在這里; 嘗試獲取許可時, 可以設置一個容許的緩沖時間;// 使用場景是: 放過短時間內的, 聚簇的, 一定數量的請求;// 比如: n毫秒內, 接連來了m個請求; // 如果這m個請求都需要放過, 就需要設置一定的緩沖時間;// 參見下文的測試代碼;public boolean tryAcquire(Duration timeout)//...public boolean tryAcquire(long timeout, TimeUnit unit)//...public boolean tryAcquire(int permits, Duration timeout)//...public boolean tryAcquire(int permits, long timeout, TimeUnit unit)//...
}// 平滑限頻器
abstract class SmoothRateLimiter extends RateLimiter {static final class SmoothBursty extends SmoothRateLimiter {}// 平滑預熱: 顧名思義, 需要一個預熱時間才能到達static final class SmoothWarmingUp extends SmoothRateLimiter {}
}
3. 示例代碼
這部分依次介紹我們的示例代碼。
3.1 基礎工具方法
下面是一些基礎工具方法:
// 睡眠一定的毫秒數private static void sleep(long millis) {try {Thread.sleep(millis);} catch (InterruptedException e) {e.printStackTrace();}}// 打印控制臺日志private static void println(String msg) {System.out.println("[" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + "]" + msg);}
3.2 測試任務類
下面是一個測試任務類, 內部使用了 RateLimiter#tryAcquire
方法。
private static class RateLimiterJob implements Runnable {//CountDownLatch latch;RateLimiter rateLimiter;// 結果StringBuilder resultBuilder = new StringBuilder();AtomicInteger passedCounter = new AtomicInteger();AtomicInteger rejectedCounter = new AtomicInteger();public RateLimiterJob(int taskCount, RateLimiter rateLimiter) {this.latch = new CountDownLatch(taskCount);this.rateLimiter = rateLimiter;}@Overridepublic void run() {//boolean passed = rateLimiter.tryAcquire(1, 5, TimeUnit.MILLISECONDS);if (passed) {passedCounter.incrementAndGet();resultBuilder.append("1");} else {rejectedCounter.incrementAndGet();resultBuilder.append("-");}//latch.countDown();}}
也加上了一些并發控制的手段和統計方法, 以方便我們進行測試:
3.3 測試和統計方法
真正的測試和統計方法為:
private static ExecutorService executorService = Executors.newFixedThreadPool(8, new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);t.setDaemon(true);t.setName("RateLimiter-1");return t;}});private static String metrics(RateLimiter rateLimiter, int taskCount) {long startMillis = System.currentTimeMillis();// 休息1SrateLimiter.tryAcquire();sleep(1_000);//RateLimiterJob job = new RateLimiterJob(taskCount, rateLimiter);for (int i = 0; i < taskCount; i++) {sleep(10);executorService.submit(job);}// 等待結果try {job.latch.await();} catch (InterruptedException e) {e.printStackTrace();}long costMillis = System.currentTimeMillis() - startMillis;//String result = job.resultBuilder.toString();result = result + "[passed=" + job.passedCounter.get() +", rejected=" + job.rejectedCounter.get() + "]"+ "[耗時=" + costMillis + "ms]";return result;}
這里創建了一個并發線程池, 用來模擬多個并發請求客戶端, 也保證了短時間內有一定的聚簇流量。
metrics 方法, 對 rateLimiter 進行一定數量的任務測試, 并返回統計結果;
3.4 測試兩種模式的限頻器
下面的代碼, 測試兩種模式的限頻器:
private static void testRateLimit() {//double permitsPerSecond = 20D;int taskCount = 100;println("========================================");// 1. SmoothBursty 模式的限頻器: 平滑分配token, 可以看代碼實現RateLimiter rateLimiter = RateLimiter.create(permitsPerSecond);// 111111111111111111111111111-1---1---1--1---1---1---1// ---1---1---1----1--1---1---1----1--1---1---1---1// [passed=46, rejected=54][耗時=2346ms]String result = metrics(rateLimiter, taskCount);println("1. SmoothBursty 模式的限頻器.result:==========" + result);println("========================================");// 2. SmoothWarmingUp 模式的限頻器: 系統需要預熱的話,最初的時候,放行的請求會比較少;rateLimiter = RateLimiter.create(permitsPerSecond, 1, TimeUnit.SECONDS);// 1-----------1----------1---------1---------1--------1// -------1------1-----1-----1----1---1---1---1---// [passed=14, rejected=86][耗時=2251ms]result = metrics(rateLimiter, taskCount);println("2. SmoothWarmingUp 模式的限頻器.result:==========" + result);println("========================================");}
我將輸出的內容放在了雙斜線注釋里面, 1
表示放行, -
表示拒絕。
可以看到:
- SmoothBursty 模式, 直接放過了前面的一定量的聚簇流量
- SmoothWarmingUp 模式, 開始時在預熱, 放過的請求較少, 預熱完成后正常放行和拒絕。
3.5 測試緩沖時間與等待耗時
下面的方法, 測試 tryAcquire
方法指定緩沖時間時, 會消耗多少時間等待。
private static void testRateLimitTimeout() {int permitsPerSecond = 500;RateLimiter rateLimiter = RateLimiter.create(permitsPerSecond);//int timeout = 50;int clusterCount = timeout * permitsPerSecond / 1000;AtomicInteger passedCount = new AtomicInteger(0);long startMillis = System.currentTimeMillis();long maxTimeoutMillis = 0;for (int i = 0; i < clusterCount; i++) {long beginMillis = System.currentTimeMillis();// 限頻時使用緩沖時間區間: 短暫放過聚集在一起的少量(并發)請求數: // 放過的數量為: timeout * permitsPerSecond/1000boolean passed = rateLimiter.tryAcquire(1, 50, TimeUnit.MILLISECONDS);if (passed) {passedCount.incrementAndGet();}long timeoutMillis = System.currentTimeMillis() - beginMillis;maxTimeoutMillis = Math.max(timeoutMillis, maxTimeoutMillis);}long costMillis = System.currentTimeMillis() - startMillis;// [2025-04-28 22:49:00]testRateLimitTimeout:// [clusterCount=25];[passedCount=25]println("testRateLimitTimeout:[clusterCount=" + clusterCount + "];[passedCount=" + passedCount.get() + "]");// [2025-04-28 22:49:00]testRateLimitTimeout:// 耗時:[costMillis=47][maxTimeoutMillis=3]println("testRateLimitTimeout:耗時:[costMillis=" +costMillis + "][maxTimeoutMillis=" + maxTimeoutMillis + "]");}
我們的測試條件為: timeout = 50; permitsPerSecond = 500
.
放過的聚簇流量公式為: timeout * permitsPerSecond/1000
可以看到, 測試結果里面的日志為:
[clusterCount=25];[passedCount=25]
符合我們的預期和計算。
等待耗時時間最大為 maxTimeoutMillis=3
, 這個等待時間還可以接受:
耗時:
[costMillis=47][maxTimeoutMillis=3]
我們使用時根據需要配置相關參數即可。
4. 完整的測試代碼
完整的測試代碼如下所示:
import com.google.common.util.concurrent.RateLimiter;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;// 測試限頻:
public class RateLimiterTimeoutTest {private static ExecutorService executorService = Executors.newFixedThreadPool(8, new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);t.setDaemon(true);t.setName("RateLimiter-1");return t;}});// 測試性能public static void main(String[] args) {testRateLimitTimeout();testRateLimit();}private static void testRateLimitTimeout() {int permitsPerSecond = 500;RateLimiter rateLimiter = RateLimiter.create(permitsPerSecond);//int timeout = 50;int clusterCount = timeout * permitsPerSecond / 1000;AtomicInteger passedCount = new AtomicInteger(0);long startMillis = System.currentTimeMillis();long maxTimeoutMillis = 0;for (int i = 0; i < clusterCount; i++) {long beginMillis = System.currentTimeMillis();// 限頻時使用緩沖時間區間: 短暫放過聚集在一起的少量(并發)請求數: // 放過的數量為: timeout * permitsPerSecond/1000boolean passed = rateLimiter.tryAcquire(1, 50, TimeUnit.MILLISECONDS);if (passed) {passedCount.incrementAndGet();}long timeoutMillis = System.currentTimeMillis() - beginMillis;maxTimeoutMillis = Math.max(timeoutMillis, maxTimeoutMillis);}long costMillis = System.currentTimeMillis() - startMillis;// [2025-04-28 22:49:00]testRateLimitTimeout:// [clusterCount=25];[passedCount=25]println("testRateLimitTimeout:[clusterCount=" + clusterCount + "];[passedCount=" + passedCount.get() + "]");// [2025-04-28 22:49:00]testRateLimitTimeout:// 耗時:[costMillis=47][maxTimeoutMillis=3]println("testRateLimitTimeout:耗時:[costMillis=" +costMillis + "][maxTimeoutMillis=" + maxTimeoutMillis + "]");}private static void testRateLimit() {//double permitsPerSecond = 20D;int taskCount = 100;println("========================================");// 1. SmoothBursty模式的限頻器: 平滑分配token, 可以看代碼實現RateLimiter rateLimiter = RateLimiter.create(permitsPerSecond);// 111111111111111111111111111-1---1---1--1---1---1---1// ---1---1---1----1--1---1---1----1--1---1---1---1// [passed=46, rejected=54][耗時=2346ms]String result = metrics(rateLimiter, taskCount);println("1. SmoothBursty 模式的限頻器.result:==========" + result);println("========================================");// 2. SmoothWarmingUp模式的限頻器: 系統需要預熱的話,最初的時候,放行的請求會比較少;rateLimiter = RateLimiter.create(permitsPerSecond, 1, TimeUnit.SECONDS);// 1-----------1----------1---------1---------1--------1// -------1------1-----1-----1----1---1---1---1---// [passed=14, rejected=86][耗時=2251ms]result = metrics(rateLimiter, taskCount);println("2. SmoothWarmingUp 模式的限頻器.result:==========" + result);println("========================================");}private static String metrics(RateLimiter rateLimiter, int taskCount) {long startMillis = System.currentTimeMillis();// 休息1SrateLimiter.tryAcquire();sleep(1_000);//RateLimiterJob job = new RateLimiterJob(taskCount, rateLimiter);for (int i = 0; i < taskCount; i++) {sleep(10);executorService.submit(job);}// 等待結果try {job.latch.await();} catch (InterruptedException e) {e.printStackTrace();}long costMillis = System.currentTimeMillis() - startMillis;//String result = job.resultBuilder.toString();result = result + "[passed=" + job.passedCounter.get() +", rejected=" + job.rejectedCounter.get() + "]"+ "[耗時=" + costMillis + "ms]";return result;}private static void sleep(long millis) {try {Thread.sleep(millis);} catch (InterruptedException e) {e.printStackTrace();}}private static void println(String msg) {System.out.println("[" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + "]" + msg);}private static class RateLimiterJob implements Runnable {//CountDownLatch latch;RateLimiter rateLimiter;// 結果StringBuilder resultBuilder = new StringBuilder();AtomicInteger passedCounter = new AtomicInteger();AtomicInteger rejectedCounter = new AtomicInteger();public RateLimiterJob(int taskCount, RateLimiter rateLimiter) {this.latch = new CountDownLatch(taskCount);this.rateLimiter = rateLimiter;}@Overridepublic void run() {//boolean passed = rateLimiter.tryAcquire(1, 5, TimeUnit.MILLISECONDS);if (passed) {passedCounter.incrementAndGet();resultBuilder.append("1");} else {rejectedCounter.incrementAndGet();resultBuilder.append("-");}//latch.countDown();}}}
測試代碼總的只有100多行, 并不是很復雜。
5. 簡單小結
本文簡單介紹了Guava限頻器(RateLimiter)的用法。
使用要點是 tryAcquire 時需要給一定量的緩沖時間, 避免聚簇的少量請求被誤攔截。
我們的測試條件為: timeout = 50; permitsPerSecond = 500
.
放過的聚簇流量公式為: timeout * permitsPerSecond/1000
。