限流盡可能在滿足需求的情況下越簡單越好!
分布式限流是指在分布式系統中對請求進行限制,以防止系統過載或濫用資源。以下是常見的分布式限流策略及其實現方式:
1、基于 Redis 的固定窗口限流
原理:
- 設定一個時間窗口(如 1 秒)
- 使用 Redis 維護一個計數器,存儲當前窗口的請求數
- 當請求到來時,
INCR
計數器,如果超過閾值則拒絕 - 過期后自動刪除鍵,進入下一個窗口
優缺點: ? 簡單易實現
? 在窗口交界處可能會出現短時間的突發流量("臨界突增")
public class RedisRateLimiter {private final StringRedisTemplate redisTemplate;// 命令前綴private final String key;private final int rate;private final int window;public RedisRateLimiter(StringRedisTemplate redisTemplate, String key, int rate, int window) {this.redisTemplate = redisTemplate;this.key = key;this.rate = rate;this.window = window;}// 檢查并獲取令牌public boolean acquire() {String currentKey = key + "_" + (getCurrentSeconds() / window);Long currentCount = redisTemplate.opsForValue().increment(currentKey);redisTemplate.expire(currentKey, window, TimeUnit.SECONDS);log.info("當前獲取到的令牌數 key {} count {} result {} ",currentKey,currentCount,currentCount > rate);if (currentCount > rate){return false;}return true;}private long getCurrentSeconds() {return System.currentTimeMillis()/1000;}public void acquireSleep() {int count = 0;while (!acquire()){sleep(1);count++;}}private void sleep(int second) {try {TimeUnit.SECONDS.sleep(second);} catch (InterruptedException e) {e.printStackTrace();}}public boolean acquireSleep(int waitSecond) {int count = 0;while (!acquire()){if (count >= waitSecond){return false;}sleep(1);count++;log.info("RedisRateLimiter[{}] try acquire sleep {}",key,count);}return true;}public static void main(String[] args) throws InterruptedException {ch.qos.logback.classic.Logger logger=(ch.qos.logback.classic.Logger)LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);logger.setLevel(Level.OFF);StringRedisTemplate stringRedisTemplate=getStringRedisTemplate();RedisRateLimiter redisRateLimiter = new RedisRateLimiter(stringRedisTemplate,"request_interface",16,10);// 模擬 50 個并發線程,每個線程嘗試獲取 10 次令牌final int threadCount = 50;ExecutorService executor = Executors.newFixedThreadPool(threadCount);CountDownLatch latch = new CountDownLatch(threadCount);for (int i = 0; i < threadCount; i++) {executor.submit(() -> {// 每個線程嘗試多次調用限流方法for (int j = 0; j < 10; j++) {redisRateLimiter.acquireSleep();System.out.println("當前線程:"+Thread.currentThread().getName()+",獲取到令牌,時間"+ DateFormatUtils.format(new Date(),"yyyy-MM-dd HH:mm:ss"));// 模擬每次請求間隔 100 毫秒redisRateLimiter.milliseconds(100);}latch.countDown();});}latch.await();executor.shutdown();}private static StringRedisTemplate getStringRedisTemplate() {// 1. 創建單機模式的配置RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration();redisStandaloneConfiguration.setHostName("127.0.0.1");redisStandaloneConfiguration.setPort(6379);// 2. 構造 LettuceConnectionFactory,并初始化LettuceConnectionFactory factory = new LettuceConnectionFactory(redisStandaloneConfiguration);factory.afterPropertiesSet(); // 初始化連接工廠// 3. 創建 StringRedisTemplate 并設置連接工廠StringRedisTemplate stringRedisTemplate = new StringRedisTemplate();stringRedisTemplate.setConnectionFactory(factory);stringRedisTemplate.afterPropertiesSet(); // 初始化模板return stringRedisTemplate;}private void milliseconds(long millis) {try {Thread.sleep(millis);} catch (InterruptedException e) {e.printStackTrace();}}}
main方法中的計算結果可以看到在并發環境下嚴格的執行10s16次請求(也就是1分鐘96次請求),這個就有個弊端,在并發環境下他們一拿到令牌同一秒就執行請求了。這個就是突發流量。
我的業務就是1分鐘允許請求100次對方接口,像這種雖然嚴格按照1分鐘不超過100次請求但是有突發流量對方還是返回了頻率過高,可能對方計算頻率方式不一樣吧。所以這種方式不太可取。
當前線程:pool-1-thread-30,獲取到令牌,時間2025-03-15 00:17:11
當前線程:pool-1-thread-6,獲取到令牌,時間2025-03-15 00:17:11
當前線程:pool-1-thread-18,獲取到令牌,時間2025-03-15 00:17:11
當前線程:pool-1-thread-35,獲取到令牌,時間2025-03-15 00:17:11
當前線程:pool-1-thread-38,獲取到令牌,時間2025-03-15 00:17:11
當前線程:pool-1-thread-37,獲取到令牌,時間2025-03-15 00:17:11
當前線程:pool-1-thread-33,獲取到令牌,時間2025-03-15 00:17:11
當前線程:pool-1-thread-44,獲取到令牌,時間2025-03-15 00:17:11
當前線程:pool-1-thread-3,獲取到令牌,時間2025-03-15 00:17:11
當前線程:pool-1-thread-45,獲取到令牌,時間2025-03-15 00:17:11
當前線程:pool-1-thread-5,獲取到令牌,時間2025-03-15 00:17:11
當前線程:pool-1-thread-20,獲取到令牌,時間2025-03-15 00:17:11
當前線程:pool-1-thread-11,獲取到令牌,時間2025-03-15 00:17:11
當前線程:pool-1-thread-43,獲取到令牌,時間2025-03-15 00:17:11
當前線程:pool-1-thread-15,獲取到令牌,時間2025-03-15 00:17:11
當前線程:pool-1-thread-29,獲取到令牌,時間2025-03-15 00:17:11
當前線程:pool-1-thread-4,獲取到令牌,時間2025-03-15 00:17:20
當前線程:pool-1-thread-16,獲取到令牌,時間2025-03-15 00:17:20
當前線程:pool-1-thread-12,獲取到令牌,時間2025-03-15 00:17:20
當前線程:pool-1-thread-24,獲取到令牌,時間2025-03-15 00:17:20
當前線程:pool-1-thread-26,獲取到令牌,時間2025-03-15 00:17:20
當前線程:pool-1-thread-8,獲取到令牌,時間2025-03-15 00:17:20
當前線程:pool-1-thread-14,獲取到令牌,時間2025-03-15 00:17:20
當前線程:pool-1-thread-49,獲取到令牌,時間2025-03-15 00:17:20
當前線程:pool-1-thread-42,獲取到令牌,時間2025-03-15 00:17:20
當前線程:pool-1-thread-21,獲取到令牌,時間2025-03-15 00:17:20
當前線程:pool-1-thread-1,獲取到令牌,時間2025-03-15 00:17:20
當前線程:pool-1-thread-10,獲取到令牌,時間2025-03-15 00:17:20
當前線程:pool-1-thread-31,獲取到令牌,時間2025-03-15 00:17:20
當前線程:pool-1-thread-50,獲取到令牌,時間2025-03-15 00:17:20
當前線程:pool-1-thread-36,獲取到令牌,時間2025-03-15 00:17:20
當前線程:pool-1-thread-48,獲取到令牌,時間2025-03-15 00:17:20
當前線程:pool-1-thread-9,獲取到令牌,時間2025-03-15 00:17:30
當前線程:pool-1-thread-19,獲取到令牌,時間2025-03-15 00:17:30
當前線程:pool-1-thread-47,獲取到令牌,時間2025-03-15 00:17:30
當前線程:pool-1-thread-2,獲取到令牌,時間2025-03-15 00:17:30
當前線程:pool-1-thread-34,獲取到令牌,時間2025-03-15 00:17:30
當前線程:pool-1-thread-46,獲取到令牌,時間2025-03-15 00:17:30
當前線程:pool-1-thread-41,獲取到令牌,時間2025-03-15 00:17:30
當前線程:pool-1-thread-22,獲取到令牌,時間2025-03-15 00:17:30
當前線程:pool-1-thread-17,獲取到令牌,時間2025-03-15 00:17:30
當前線程:pool-1-thread-27,獲取到令牌,時間2025-03-15 00:17:30
當前線程:pool-1-thread-28,獲取到令牌,時間2025-03-15 00:17:30
當前線程:pool-1-thread-32,獲取到令牌,時間2025-03-15 00:17:30
當前線程:pool-1-thread-25,獲取到令牌,時間2025-03-15 00:17:30
當前線程:pool-1-thread-13,獲取到令牌,時間2025-03-15 00:17:30
當前線程:pool-1-thread-40,獲取到令牌,時間2025-03-15 00:17:30
當前線程:pool-1-thread-23,獲取到令牌,時間2025-03-15 00:17:30
當前線程:pool-1-thread-39,獲取到令牌,時間2025-03-15 00:17:40
當前線程:pool-1-thread-7,獲取到令牌,時間2025-03-15 00:17:40
當前線程:pool-1-thread-34,獲取到令牌,時間2025-03-15 00:17:40
當前線程:pool-1-thread-13,獲取到令牌,時間2025-03-15 00:17:40
當前線程:pool-1-thread-2,獲取到令牌,時間2025-03-15 00:17:40
當前線程:pool-1-thread-22,獲取到令牌,時間2025-03-15 00:17:40
當前線程:pool-1-thread-28,獲取到令牌,時間2025-03-15 00:17:40
當前線程:pool-1-thread-46,獲取到令牌,時間2025-03-15 00:17:40
當前線程:pool-1-thread-25,獲取到令牌,時間2025-03-15 00:17:40
當前線程:pool-1-thread-19,獲取到令牌,時間2025-03-15 00:17:40
當前線程:pool-1-thread-9,獲取到令牌,時間2025-03-15 00:17:40
當前線程:pool-1-thread-32,獲取到令牌,時間2025-03-15 00:17:40
當前線程:pool-1-thread-39,獲取到令牌,時間2025-03-15 00:17:40
當前線程:pool-1-thread-17,獲取到令牌,時間2025-03-15 00:17:40
當前線程:pool-1-thread-47,獲取到令牌,時間2025-03-15 00:17:40
當前線程:pool-1-thread-41,獲取到令牌,時間2025-03-15 00:17:40
2. 基于 Redis 的滑動窗口限流
原理:
- 維護一個基于時間的列表(ZSET,有序集合)
- 每次請求時,記錄當前時間戳到 ZSET
- 刪除超出窗口時間范圍的請求
- 統計 ZSET 中當前窗口內的請求數,超出閾值則拒絕
優缺點: ? 解決了固定窗口的臨界突增問題
? 存儲和計算成本比固定窗口稍高
原理說明
- 利用 Redis 的有序集合(ZSet),以請求的時間戳作為 score,每個請求入隊一個唯一的 member(例如時間戳+UUID)。
- 每次請求時,先移除時間窗口外的記錄(score 小于當前時間減去窗口長度)。
- 統計當前窗口內的請求數量,若數量超過設定閾值,則拒絕請求。
@Slf4j
public class RedisSlidingWindowRateLimiter {private final StringRedisTemplate redisTemplate;private final String key;private final int rate;private final int window; // 窗口長度,單位秒public RedisSlidingWindowRateLimiter(StringRedisTemplate redisTemplate, String key, int rate, int window) {this.redisTemplate = redisTemplate;this.key = key;this.rate = rate;// 限制窗口長度在 1 分鐘以內Assert.isTrue(window > 0 && window <= 60, "窗口只支持一分鐘內");this.window = window;}// 檢查并獲取令牌public boolean acquire() {long now = System.currentTimeMillis();// 計算窗口起始時間(單位毫秒)long windowStart = now - window * 1000;// 移除過期的請求記錄redisTemplate.opsForZSet().removeRangeByScore(key, 0, windowStart);// 添加當前請求記錄,member 用當前時間戳加 UUID 保證唯一性,score 為當前時間String member = now + "_" + UUID.randomUUID().toString();redisTemplate.opsForZSet().add(key, member, now);// 統計當前窗口內的請求數量Long count = redisTemplate.opsForZSet().count(key, windowStart, now);// 為了避免 key 永不過期,設置一個過期時間(窗口長度)redisTemplate.expire(key, window, TimeUnit.SECONDS);if (count != null && count > rate) {return false;}return true;}// 采用輪詢方式等待獲取令牌public void acquireSleep() {int count = 0;while (!acquire()){ThreadUtil.sleep(1, TimeUnit.SECONDS);count++;log.info("RedisSlidingWindowRateLimiter[{}] try acquire sleep {}", key, count);}}public boolean acquireSleep(int waitSecond) {int count = 0;while (!acquire()){if (count >= waitSecond){return false;}ThreadUtil.sleep(1, TimeUnit.SECONDS);count++;log.info("RedisSlidingWindowRateLimiter[{}] try acquire sleep {}", key, count);}return true;}
}
代碼說明
- 移除過期記錄:調用
removeRangeByScore
清理掉窗口外的請求數據。 - 添加當前請求:將當前請求的時間戳與 UUID 組合后添加到 ZSet 中,score 為當前時間,確保在滑動窗口內計數。
- 統計計數:通過
count
方法統計當前窗口內的請求數,如果超出限制則返回 false。
3. 基于 Redis 的令牌桶限流
原理:
- 設定一個容量為
max_tokens
的令牌桶,初始裝滿 - 以固定速率向桶中添加令牌(如每秒 10 個)
- 每次請求需要消耗一個令牌,沒有令牌時拒絕請求
- 通常使用 Redis 的
Lua
腳本實現原子操作
優缺點: ? 更加平滑,支持突發流量
? 需要額外的定時任務或后臺線程補充令牌
原理說明
- 令牌桶算法中,設定一個桶最大容量
capacity
,同時以一定速率refillRate
補充令牌。 - 每次請求需要消耗一個令牌,若當前桶內令牌不足,則拒絕請求。
- 為保證原子性,利用 Redis 的 Lua 腳本將令牌獲取和補充過程封裝為原子操作。
@Slf4j
public class RedisTokenBucketRateLimiter {private final StringRedisTemplate redisTemplate;private final String key;// 桶的容量(最大令牌數)private final int capacity;// 令牌補充速率,單位:個/秒private final double refillRate;// Lua 腳本,用于原子化處理令牌桶邏輯private static final String LUA_SCRIPT = "local tokens_key = KEYS[1] .. ':tokens' \n" +"local timestamp_key = KEYS[1] .. ':ts' \n" +"local capacity = tonumber(ARGV[1]) \n" +"local refill_rate = tonumber(ARGV[2]) \n" +"local current_time = tonumber(ARGV[3]) \n" +"local requested = tonumber(ARGV[4]) \n" +"local tokens = tonumber(redis.call('get', tokens_key) or capacity) \n" +"local last_refill = tonumber(redis.call('get', timestamp_key) or current_time) \n" +"local delta = current_time - last_refill \n" +"local tokens_to_add = delta * refill_rate \n" +"tokens = math.min(capacity, tokens + tokens_to_add) \n" +"if tokens < requested then \n" +" return 0 \n" +"else \n" +" tokens = tokens - requested \n" +" redis.call('set', tokens_key, tokens) \n" +" redis.call('set', timestamp_key, current_time) \n" +" return 1 \n" +"end";public RedisTokenBucketRateLimiter(StringRedisTemplate redisTemplate, String key, int capacity, double refillRate) {this.redisTemplate = redisTemplate;this.key = key;this.capacity = capacity;this.refillRate = refillRate;}// 檢查并獲取令牌public boolean acquire() {// 當前時間(單位秒)long currentTime = System.currentTimeMillis() / 1000;// 請求消耗 1 個令牌Long result = redisTemplate.execute((RedisCallback<Long>) connection -> {List<byte[]> keys = Collections.singletonList(key.getBytes());List<byte[]> args = Arrays.asList(String.valueOf(capacity).getBytes(),String.valueOf(refillRate).getBytes(),String.valueOf(currentTime).getBytes(),"1".getBytes());return connection.eval(LUA_SCRIPT.getBytes(), ReturnType.INTEGER, keys.size(), keys.toArray(new byte[0][]), args.toArray(new byte[0][]));});return result != null && result == 1;}
}
代碼說明
- Lua 腳本邏輯:
- 獲取當前桶中剩余令牌數和上次補充時間,若不存在則默認初始化為滿桶狀態。
- 根據當前時間與上次更新時間的差值計算應補充的令牌數,并更新桶內令牌。
- 判斷是否有足夠令牌供本次請求(默認請求 1 個令牌),若不足返回 0,否則扣減令牌并更新上次補充時間,返回 1。
- 原子執行:通過 redisTemplate 的
eval
方法保證 Lua 腳本的原子性,避免并發問題。
4. 基于 Redis 的漏桶限流
原理:
- 設定一個隊列模擬漏桶
- 按固定速率從隊列取出請求執行
- 請求過多時,超出隊列長度的請求被丟棄
優缺點: ? 輸出速率穩定,不受突發流量影響
? 可能會丟棄部分流量
原理說明
- 漏桶算法中,將請求看作向桶中注入的“水”,桶以固定速率漏水(處理請求)。
- 當桶中水量超過預設容量時,則拒絕新請求。
- 同樣利用 Lua 腳本保證原子操作。
@Slf4j
public class RedisLeakyBucketRateLimiter {private final StringRedisTemplate redisTemplate;private final String key;// 桶的容量(允許的最大突發請求數)private final int capacity;// 漏水速率,單位:個/秒,表示每秒可處理的請求數private final double leakRate;// Lua 腳本,用于原子化處理漏桶邏輯private static final String LUA_SCRIPT = "local level_key = KEYS[1] .. ':level' \n" +"local timestamp_key = KEYS[1] .. ':ts' \n" +"local capacity = tonumber(ARGV[1]) \n" +"local leak_rate = tonumber(ARGV[2]) \n" +"local current_time = tonumber(ARGV[3]) \n" +"local level = tonumber(redis.call('get', level_key) or '0') \n" +"local last_time = tonumber(redis.call('get', timestamp_key) or current_time) \n" +"local delta = current_time - last_time \n" +"local leaked = delta * leak_rate \n" +// 計算漏水后桶內水量,不能低于 0"level = math.max(0, level - leaked) \n" +"if level + 1 > capacity then \n" +" return 0 \n" +"else \n" +" level = level + 1 \n" +" redis.call('set', level_key, level) \n" +" redis.call('set', timestamp_key, current_time) \n" +" return 1 \n" +"end";public RedisLeakyBucketRateLimiter(StringRedisTemplate redisTemplate, String key, int capacity, double leakRate) {this.redisTemplate = redisTemplate;this.key = key;this.capacity = capacity;this.leakRate = leakRate;}// 檢查并獲取請求處理資格public boolean acquire() {// 當前時間(單位秒)long currentTime = System.currentTimeMillis() / 1000;Long result = redisTemplate.execute((RedisCallback<Long>) connection -> {List<byte[]> keys = Collections.singletonList(key.getBytes());List<byte[]> args = Arrays.asList(String.valueOf(capacity).getBytes(),String.valueOf(leakRate).getBytes(),String.valueOf(currentTime).getBytes());return connection.eval(LUA_SCRIPT.getBytes(), ReturnType.INTEGER, keys.size(), keys.toArray(new byte[0][]), args.toArray(new byte[0][]));});return result != null && result == 1;}
}
代碼說明
- Lua 腳本邏輯:
- 從 Redis 中獲取當前桶內水量(即請求數量)和上次更新的時間。
- 根據當前時間與上次更新時間的差值和設定的漏水速率計算“漏掉”的水量,并更新桶內水量(不能低于 0)。
- 判斷加入當前請求后是否超過桶的容量,超過則返回 0(拒絕),否則將水量加 1 并更新記錄,返回 1 表示允許。
- 原子執行:同樣通過
eval
方法保證操作原子性,避免并發修改問題。
總結
- 滑動窗口:使用 Redis ZSet 記錄請求時間戳,動態統計窗口內請求數,平滑控制突發流量。
- 令牌桶:通過 Lua 腳本實現令牌的自動補充和扣減,支持一定的突發請求。
- 漏桶:用固定漏水速率保證請求以均勻的速率被處理,避免瞬間大量請求。