大綱
1.漏桶算法的實現對比
(1)普通思路的漏桶算法實現
(2)節省線程的漏桶算法實現
(3)Sentinel中的漏桶算法實現
(4)Sentinel中的漏桶算法與普通漏桶算法的區別
(5)Sentinel中的漏桶算法存在的問題
2.令牌桶算法的實現對比
(1)普通思路的令牌桶算法實現
(2)節省線程的令牌桶算法實現
(3)Guava中的令牌桶算法實現
(4)Sentinel中的令牌桶算法實現
(5)Sentinel中的令牌桶算法總結
1.漏桶算法的實現對比
(1)普通思路的漏桶算法實現
(2)節省線程的漏桶算法實現
(3)Sentinel中的漏桶算法實現
(4)Sentinel中的漏桶算法與普通漏桶算法的區別
(5)Sentinel中的漏桶算法存在的問題
(1)普通思路的漏桶算法實現
一.漏桶算法的處理流程
二.漏桶算法的主要特點
三.漏桶算法的優缺點
一.漏桶算法的處理流程
漏桶算法的核心思想是以固定速率流出。
步驟一:當新的請求到達時,會將新的請求放入緩沖區(請求隊列)中,類似于往水桶里注水。
步驟二:系統會以固定的速度處理緩沖區中的請求,類似于水從窟窿中以固定的速度流出,比如開啟一個后臺線程定時以固定的速度從緩沖區中取出請求然后進行分發處理。
步驟三:如果緩沖區已滿,則新的請求將被拒絕或丟棄,類似于水溢出。
二.漏桶算法的主要特點
特點一:固定速率
水從桶底的孔洞中以固定速率流出,類似于網絡中以固定速率發送數據包。但寫入速度不固定,也就是請求不是勻速產生的。相當于生產者生產消息不固定,消費者消費消息是勻速消費的。
特點二:有限容量
桶的容量有限,當桶滿時,新到達的水會溢出,即拒絕超過容量的請求。
特點三:先進先出(FIFO)
水按照先進先出的順序從桶中流出,類似于請求的處理順序。
這種算法的一個重要特性是:無論請求的接收速率如何變化,請求的處理速率始終是穩定的,這就確保了系統的負載不會超過預設的閾值。但是由于請求的處理速率是固定的,所以無法處理突發流量。此外如果入口流量過大,漏桶可能會溢出,導致請求丟失。
三.漏桶算法的優缺點
優點一:平滑流量
由于以固定的速率處理請求,所以可以有效地平滑和整形流量,避免流量的突發和波動,類似于消息隊列的削峰填谷的作用。
優點二:防止過載
當流入的請求超過桶的容量時,可以直接丟棄請求,防止系統過載。
缺點一:無法處理突發流量
由于漏桶的出口速度是固定的,無法處理突發流量。例如,即使在流量較小的時候,也無法以更快的速度處理請求。
缺點二:可能會丟失數據
如果入口流量過大,超過了桶的容量,那么就需要丟棄部分請求。在一些不能接受丟失請求的場景中,這可能是一個問題。
缺點三:不適合處理速率變化大的場景
如果處理速率變化大,或需要動態調整處理速率,則無法滿足。
漏桶算法適用于需要以固定速率處理請求的場景。在多數業務場景中,其實并不需要按照嚴格的速率進行請求處理。而且多數業務場景都需要應對突發流量的能力,所以會使用令牌桶算法。
(2)節省線程的漏桶算法實現
漏桶算法可以通過延遲計算的方式來實現。延遲計算指的是不需要單獨的線程來定時生成令牌或從漏桶中定時取請求,而是由調用限流器的線程自己計算是否有足夠的令牌以及需要sleep的時間。延遲計算的方式可以節省一個線程資源。
public class LeakyBucketLimiter {//桶的最大容量public static long threshold = 10;//當前桶內的水量public static long count = 0;//漏水速率(每秒5次)public static long leakRate = 5;//上次漏水時間public static long lastLeakTime = System.currentTimeMillis();//限流方法,返回true表示通過public boolean canPass() {//調用漏水方法this.leak();//判斷是否超過最大請求數量if (count < threshold) {count++;return true;}return false;}//漏水方法,計算并更新這段時間內漏水量private void leak() {//獲取系統當前時間long currentTime = System.currentTimeMillis();//計算這段時間內,需要流出的水量long leakWater = (currentTime - lastLeakTime) * leakRate / 1000;count = Math.max(count - leakWater, 0);//更新最近一次的漏水時間lastLeakTime = currentTime;}
}
(3)Sentinel中的漏桶算法實現
在RateLimiterController的canPass()方法中,為了判斷是否超出QPS閾值,通過原子類變量latestPassedTime簡化成單線程讓請求先后通過的處理模型。為了盡量讓業務不受Sentinel影響,采用預估請求的被處理時間點的方式。也就是無需等前面的請求完全被處理完,才確定后面的請求被處理的時間。因為在普通的漏桶算法中,是處理完一個請求,才從漏桶取出水滴。而RateLimiterController的漏桶算法,則是假設請求已經被通過了。
具體的判斷邏輯如下:首先獲取系統的當前時間currentTime。然后計算在滿足流控規則中限制的QPS閾值count的情況下,先后的兩個請求被允許通過時的最小時間間隔costTime。接著計算當前請求最早的預期通過時間expectedTime,也就是此次請求預計會在幾時幾分幾秒內通過。最后比較expectedTime和currentTime就可知當前請求是否允許通過了。
一.如果expectedTime小于等于currentTime
也就是當前請求最早的預期通過時間比系統當前時間小。如果在此時(currentTime)通過當前請求,則當前請求的通過時間就比它最早的預期通過時間(expectedTime)要晚,即當前請求和最近通過的請求的時間間隔變大了,所以此時不會超QPS閾值。于是返回true允許通過,同時更新最近允許請求通過的時間戳為當前時間。
二.如果expectedTime大于currentTime
也就是當前請求最早的預期通過時間比系統當前時間大。如果在此時(currentTime)通過當前請求,則當前請求的通過時間就比它最早的預期通過時間(expectedTime)要早,即當前請求和最近通過的請求的時間間隔變小了,比最小間隔時間costTime還小,所以此時必然會超QPS閾值。因此返回進行等待或者返回false不允許通過,等待的最小時間就是:最近通過請求的時間 + 先后兩個請求允許通過時的最小間隔時間 - 當前時間。
需要注意:Sentinel流量控制的漏桶算法,只能限制在costTime內的流量激增,限制不了costTime外的流量激增。比如系統啟動完一瞬間就涌入大量并發請求,此時的流量激增限制不了。又比如系統處理完正常流量的最后一個請求,隔了costTime+的時間后,突然涌入超QPS閾值的并發請求,此時也限制不了這種情況的流量激增。但如果系統處理完正常流量的最后一個請求,隔了costTime-的時間后,突然涌入超QPS閾值的并發請求,此時則可以限制這種情況的流量激增。
同時,為了避免等待的各個并發線程被同時喚醒,可以利用原子變量的addAndGet()方法 + 假設等待請求已被通過的方式,實現需要等待的并發請求進行睡眠等待的時間都不一樣,從而實現并發請求排隊等待的效果。
實現排隊等待效果的核心邏輯:由于latestPassedTime的原子性,每個線程都會獲得不一樣的oldTime。接著根據oldTime - 當前時間,就可以得到每個線程需要睡眠等待的時間waitTime。此時的waitTime都將會不一樣,從而避免并發線程同時被喚醒的情況。將latestPassedTime按costTime進行自增,其實相當于假設當前請求在不超過QPS閾值的情況下,被允許通過了。
public class RateLimiterController implements TrafficShapingController {//排隊等待的意思是超出閾值后等待一段時間,maxQueueingTimeMs就是請求在隊列中的最大等待時間private final int maxQueueingTimeMs;//流控規則中限制QPS的閾值,也就是QPS超出多少后會進行限制private final double count;//最近允許一個請求通過的時間,每次請求通過后就會更新此時間,可以根據該時間計算出當前請求最早的預期通過時間//注意:Sentinel是在業務前面的,盡量不要讓業務受到Sentinel的影響,所以不需要等請求完全被處理完,才確定請求被通過的時間private final AtomicLong latestPassedTime = new AtomicLong(-1);public RateLimiterController(int timeOut, double count) {this.maxQueueingTimeMs = timeOut;this.count = count;}@Overridepublic boolean canPass(Node node, int acquireCount) {return canPass(node, acquireCount, false);}@Overridepublic boolean canPass(Node node, int acquireCount, boolean prioritized) {//acquireCount代表每次從桶底流出多少個請求//如果acquireCount小于等于0,則表示無需限流直接通過,不過acquireCount一般默認是1if (acquireCount <= 0) {return true;}//如果限流規則的count(即限制QPS的閾值)小于等于0,則直接拒絕,相當于一個請求也不能放行if (count <= 0) {return false;}//1.首先獲取系統的當前時間long currentTime = TimeUtil.currentTimeMillis();//2.然后計算,在滿足流控規則中限制的QPS閾值count的情況下,先后的兩個請求被允許通過時的最小間隔時間(假設請求是單線程處理的)long costTime = Math.round(1.0 * (acquireCount) / count * 1000);//3.接著計算當前請求最早的預期通過時間 = 滿足QPS閾值下的兩個請求的最小時間間隔 + 上次請求的通過時間long expectedTime = costTime + latestPassedTime.get();//4.最后判斷當前請求最早的預期通過時間是否比系統當前時間小if (expectedTime <= currentTime) {//等價于沒有超出QPS閾值//當前請求最早的預期通過時間比系統當前時間小//如果在此時(currentTime)通過當前請求,那么當前請求的實際通過時間就比它最早的預期通過時間(expectedTime)要晚//也就是當前請求和最近通過的請求的時間間隔變大了,所以此時不會超QPS閾值,返回true允許通過//由這里可知,latestPassedTime并不會影響costTime,也就是說,多個線程可以并發執行到這里而不受閾值的影響//這意味著,Sentinel流量控制的漏桶算法,只能限制在costTime時間內的流量激增,限制不了costTime時間外的流量激增//比如系統啟動完的那一瞬間就涌入超出QPS閾值的并發請求,此時的這種流量激增是限制不了的;//又比如系統正常運行時處理完了正常流量的最后一個請求,隔了costTime+的時間后,突然涌入超出QPS閾值的并發請求,此時也限制不了;//只能限制住這樣的一種情況:系統正常運行處理完正常流量的最后一個請求,隔了costTime-的時間,突然涌入超出QPS閾值的并發請求latestPassedTime.set(currentTime);return true;} else {//如果不是,即當前請求最早的預期通過時間比系統當前時間大//則說明latestPassedTime.get()大了,也就是上一個可能由于QPS超出閾值的原因導致請求處理慢了,所以需要進行等待//計算當前請求的等待時間,用于判斷是否超出流控規則設置的最大等待時間long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();if (waitTime > maxQueueingTimeMs) {//如果超出最大等待時間,則直接返回falsereturn false;} else {//等價于超出了QPS閾值//當前請求最早的預期通過時間比系統當前時間大//如果在此時(currentTime)通過當前請求,那么當前請求的實際通過時間就比它最早的預期通過時間(expectedTime)要早//也就是當前請求和最近通過的請求的時間間隔變小了,比最小間隔時間costTime還小//所以此時必然會超QPS閾值,因此返回進行等待或者返回false不允許通過//而等待的最小時間,就是最近通過請求的時間 + 先后兩個請求允許通過時的最小間隔時間 - 當前時間//首先通過latestPassedTime這個原子變量的addAndGet()方法//將最近通過請求的時間latestPassedTime,加上先后兩次請求需要的最小間隔時間costTime,得到當前請求本來預期的通過時間//注意://當多個并發線程執行到此處時,由于latestPassedTime的原子性,每個線程都會獲得不一樣的oldTime//接著根據oldTime - 當前時間,就可以得到每個線程需要睡眠等待的時間waitTime//此時的waitTime都將會不一樣,從而避免并發線程同時被喚醒的情況//將latestPassedTime進行自增,其實相當于假設當前請求在不超過QPS閾值的情況下,被允許通過了long oldTime = latestPassedTime.addAndGet(costTime);try {//然后計算當前請求需要等待多久 = 當前請求最早的預期通過時間 - 當前系統時間waitTime = oldTime - TimeUtil.currentTimeMillis();//如果等待時間大于流控規則設置的最大等待時間,則需要回滾剛才更新的最近通過請求的時間//也就是將latestPassedTime減去costTime,然后返回false表示請求無法通過if (waitTime > maxQueueingTimeMs) {//如果發現新計算的等待時間 大于 最大等待時間,則需要回滾latestPassedTimelatestPassedTime.addAndGet(-costTime);return false;}//in race condition waitTime may <= 0if (waitTime > 0) {//當前請求需要進行等待Thread.sleep(waitTime);}return true;} catch (InterruptedException e) {}}}return false;}
}
(4)Sentinel中的漏桶算法與普通漏桶算法的區別
區別一:普通漏桶算法使用的是真實隊列
節省線程的漏桶算法會有一個單獨的字段去記錄當前桶內的水量,也就是請求量。每通過一個請求,則該字段值-1。反之,每新進一個請求,此字段值+1。
區別二:Sentinel漏桶算法使用的是虛擬隊列
它沒有單獨的字段去記錄當前桶內的請求量,而是根據最近通過請求的時間得出當前請求最早的預期通過時間來實現。其本質就是先假設當前請求可以通過,然后再按照先后請求在QPS閾值下可以允許通過時的最大時間間隔,來計算出當前請求最早的預期通過時間,再對比是否和當前發生沖突。
區別三:普通漏桶算法使用的策略是直接拒絕
如果流入速度大于流出速度,則直接拒絕。
區別四:Sentinel漏桶算法使用的策略是排隊等待
如果超出了閾值,則不會直接拒絕請求,而是會等待一段時間,只要在這段時間內能處理到這個請求就不會拒絕掉。
(5)Sentinel中的漏桶算法存在的問題
問題一:在costTime時間內出現流量激增才能限流
如果在costTime時間外,即最后一次請求通過的時間已經好久了,突然流量激增以及并發進入系統,那么此時是無法限制住的。
問題二:Sentinel排隊等待流控效果支持的QPS閾值不能超過1000
如果超過1000,且costTime計大于等于0.5,則會認為間隔時間都是1ms。如果costTime小于0.5,則認為配置失效,相當于沒有配置此條流控規則。
long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
long costTime = Math.round(1.0 * (1) / 1100 * 1000)約等于0.9ms;
默認情況下,acquireCount的值是1,那么:如果QPS閾值count在1000~2000,此時costTime = 1,限流不受閾值影響。如果QPS閾值count大于2000,此時costTime = 0,限流配置失效。
2.令牌桶算法的實現對比
(1)普通思路的令牌桶算法實現
(2)節省線程的令牌桶算法實現
(3)Guava中的令牌桶算法實現
(4)Sentinel中的令牌桶算法實現
(5)Sentinel中的令牌桶算法總結
(1)普通思路的令牌桶算法實現
一.令牌桶算法的處理流程
二.令牌桶算法的特點
三.令牌桶算法的優缺點
四.漏桶算法和令牌桶算法的對比
五.令牌桶算法和漏桶算法的核心區別
一.令牌桶算法的處理流程
令牌桶算法的核心思想是以固定速率流入。
步驟一:初始化令牌桶,設置其容量和生成速率。
步驟二:當有新請求到來時,檢查令牌桶中是否有足夠的令牌。如果有足夠的令牌,則允許請求通過,并從令牌桶中扣除相應數量令牌。如果沒有足夠的令牌,則拒絕請求。
步驟三:系統會以固定的速度添加令牌,直到達到令牌桶容量,比如開啟一個后臺線程以固定的速度向令牌桶中添加令牌。
二.令牌桶算法的特點
特點一:支持突發流量
令牌桶算法允許在限流內應對突發流量,有助于提高系統的響應能力。
特點二:平滑處理速率
和漏桶算法一樣,令牌桶算法也可以平滑處理流量,避免處理速率突變。
令牌桶算法的一個重要特性是,它能夠處理突發流量。當桶中有足夠的令牌時,可以一次性處理多個請求。這對于需要處理突發流量的應用場景非常有用,但是又不會無限制的增加處理速率導致壓垮服務器,因為桶內令牌數量是有限制的。
三.令牌桶算法的優缺點
優點一:可以處理突發流量
令牌桶算法可以處理突發流量。當桶滿時,能夠以最大速度處理請求。這對于需要處理突發流量的應用場景非常有用。
優點二:限制請求處理的平均速率
在長期運行中,請求的處理速率會被限制在預定義的平均速率下,也就是生成令牌的速率。
優點三:靈活性
與漏桶算法相比,令牌桶算法提供了更大的靈活性。例如,可以動態地調整生成令牌的速率。
缺點一:可能導致過載
如果令牌產生速度過快,可能會導致大量突發流量,使網絡或服務過載。
缺點二:需要存儲空間
令牌桶需要一定的存儲空間來保存令牌,可能會導致內存資源的浪費。
四.漏桶算法和令牌桶算法的對比
漏桶算法是突然往桶里注水,但是漏水的窟窿是固定大小的,因此流出水的速度是固定的,也就是"生產不限速,消費限速"。
令牌桶算法是突然從桶中抽水,也就是固定大小的窟窿變成了入水口,而沒桶蓋的桶口變成了出水口。相當于入水速度變得固定了,而出水速度不做限制了,也就是"生產限速,消費不限速"。
五.令牌桶算法和漏桶算法的核心區別
令牌桶算法允許用戶在短時間內發起更多請求,從而支持突發流量。漏桶算法只能支持每秒固定處理一定數量的請求,從而不支持突發流量。這就是令牌桶和漏桶的核心區別。
(2)節省線程的令牌桶算法實現
令牌桶算法可以通過延遲計算的方式來實現。延遲計算指的是不需要單獨的線程來定時生成令牌或從漏桶中定時取請求,而是由調用限流器的線程自己計算是否有足夠的令牌以及需要sleep的時間。延遲計算的方式可以節省一個線程資源。
public class TokenBucketLimiter {//桶的最大容量public static long threshold = 10;//當前桶內的令牌數public static long count = 0;//令牌生成速率(每秒5次)public static long tokenRate = 5;//上次生成令牌的時間public static long lastRefillTime = System.currentTimeMillis();//限流方法,返回true表示通過public boolean canPass() {//調用生成令牌方法this.refillTokens();//判斷桶內是否還有令牌if (count > 0) {count--;return true;}return false;}//生成令牌方法,計算并更新這段時間內生成的令牌數量private void refillTokens() {long currentTime = System.currentTimeMillis();//計算這段時間內,需要生成的令牌數量long refillTokens = (currentTime - lastRefillTime) * tokenRate / 1000;//更新桶內的令牌數count = Math.min(count + refillTokens, threshold);//更新令牌生成時間lastRefillTime = currentTime;}
}
(3)Guava中的令牌桶算法實現
一.SmoothBursty的初始化
二.SmoothBursty的acquire()方法
三.SmoothWarmingUp的初始化
四.SmoothWarmingUp的acquire()方法
SmoothBursty和SmoothWarmingUp這兩種限流器都使用了預支令牌的思路來實現,就是當前線程獲取令牌的代價(阻塞時間)需要由下一個線程來支付。這樣可以減少線程阻塞的概率,因為下一個請求不確定什么時候才來。如果下一個請求很久才來,那么這段時間產生的新令牌已滿足下一個線程的需求,這樣就不用阻塞了。
一.SmoothBursty的初始化
RateLimiter不保存上一次請求的時間,但是它保存下一次請求期望到達的時間。如果下一個請求的預期到達時間實際上已經過去了,并且假設下次請求期望到達的時間點是past,現在的時間點是now。那么now - past的這段時間表示RateLimiter沒有被使用,所以在這段空閑的時間內RateLimiter就會增加storedPermits的數量。
@Beta
@GwtIncompatible
@SuppressWarnings("GoodTime")
public abstract class RateLimiter {...//Creates a RateLimiter with the specified stable throughput, //given as "permits per second" (commonly referred to as QPS, queries per second).//The returned RateLimiter ensures that on average no more than permitsPerSecond are issued during any given second, //with sustained requests being smoothly spread over each second.//When the incoming request rate exceeds permitsPerSecond the rate limiter will release one permit every (1.0 / permitsPerSecond) seconds. //When the rate limiter is unused, bursts of up to permitsPerSecond permits will be allowed, //with subsequent requests being smoothly limited at the stable rate of permitsPerSecond.//創建一個具有指定穩定吞吐量的RateLimiter,傳入的"permits per second"通常稱為QPS、每秒查詢量;//返回的RateLimiter確保在任何給定的秒期間平均不超過permitsPerSecond的令牌被發出,持續的請求將在每一秒內被平穩地通過;//當傳入請求的速率超過permitsPerSecond時,速率限制器將每隔(1.0/permitsPerSecond)秒釋放一個令牌;//當速率限制器未被使用時,將允許突發式的高達permitsPerSecond的令牌,而隨后的請求將以permitsPerSecond的穩定速率被平滑地限制;//對外暴露的創建方法//@param permitsPerSecond the rate of the returned RateLimiter, measured in how many permits become available per second.public static RateLimiter create(double permitsPerSecond) {//The default RateLimiter configuration can save the unused permits of up to one second. //This is to avoid unnecessary stalls in situations like this: //A RateLimiter of 1qps, and 4 threads, all calling acquire() at these moments://T0 at 0 seconds、T1 at 1.05 seconds、T2 at 2 seconds、T3 at 3 seconds//Due to the slight delay of T1, T2 would have to sleep till 2.05 seconds, and T3 would also have to sleep till 3.05 seconds.//默認的RateLimiter配置可以保存長達一秒鐘的未被使用的令牌;//這是為了避免在這種情況下出現不必要的停頓://一個由1qps和4個線程組成的RateLimiter,所有線程都在如下這些時刻調用acquired()://Thread0在0秒、Thread1在1.05秒、Thread2在2秒、Thread3在3秒//由于Thread1的輕微延遲,Thread2必須睡眠到2.05秒,Thread3也必須睡眠到3.05秒//內部調用一個qps設定 + 起始時間StopWatch的構建函數.//這里傳入的SleepingStopwatch是一個以系統啟動時間的一個相對時間的計量.//后面的讀時間偏移是以這個開始的時間偏移為起始的.return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());}@VisibleForTestingstatic RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {//指定了令牌桶中最多存儲1秒的令牌數RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);//調用RateLimiter.setRate()方法rateLimiter.setRate(permitsPerSecond);return rateLimiter;}//Updates the stable rate of this RateLimiter, //that is, the permitsPerSecond argument provided in the factory method that constructed the RateLimiter. //Currently throttled threads will not be awakened as a result of this invocation, //thus they do not observe the new rate; only subsequent requests will.//Note though that, since each request repays (by waiting, if necessary) the cost of the previous request, //this means that the very next request after an invocation to setRate() will not be affected by the new rate; //it will pay the cost of the previous request, which is in terms of the previous rate.//The behavior of the RateLimiter is not modified in any other way, //e.g. if the RateLimiter was configured with a warmup period of 20 seconds, //it still has a warmup period of 20 seconds after this method invocation.//更新該RateLimiter的穩定速率,即在構造RateLimiter的工廠方法中提供permitsPerSecond參數;//當前被限流的線程將不會由于這個調用而被喚醒,因此它們沒有觀察到新的速率;只有隨后的請求才會;//但是要注意的是,由于每個請求(如果需要,通過等待)會償還先前請求的成本,//這意味著調用setRate()方法后的下一個請求將不會受到新速率的影響,//它將按照先前的速率處理先前請求的成本;//RateLimiter的行為不會以任何其他方式修改,//例如:如果RateLimiter被配置為具有20秒的預熱周期,在該方法調用之后,它仍然有20秒的預熱期;//@param permitsPerSecond the new stable rate of this {@code RateLimiter}public final void setRate(double permitsPerSecond) {checkArgument(permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");//在同步代碼塊中設定速率synchronized (mutex()) {//調用SmoothRateLimiter.doSetRate()方法doSetRate(permitsPerSecond, stopwatch.readMicros());}}...
}@GwtIncompatible
abstract class SmoothRateLimiter extends RateLimiter {//The currently stored permits. //令牌桶中當前緩存的未消耗的令牌數double storedPermits;//The maximum number of stored permits. //令牌桶中允許存放的最大令牌數double maxPermits;//The interval between two unit requests, at our stable rate.//E.g., a stable rate of 5 permits per second has a stable interval of 200ms.//按照我們穩定的速率,兩個單位請求之間的時間間隔;例如,每秒5個令牌的穩定速率具有200ms的穩定間隔double stableIntervalMicros;//The time when the next request (no matter its size) will be granted. //After granting a request, this is pushed further in the future. Large requests push this further than small requests.//下一個請求(無論大小)將被批準的時間.//在批準請求后,這將在未來進一步推進,大請求比小請求更能推動這一進程。private long nextFreeTicketMicros = 0L;//could be either in the past or future...//這是一個可以重復調用的函數.//第一次調用和非第一次調用的過程有些不一樣,目的是設定設定最大令牌數maxPermits和已存儲的令牌數storedPermits@Overridefinal void doSetRate(double permitsPerSecond, long nowMicros) {//調用SmoothRateLimiter.resync()方法,重試計算和同步存儲的預分配的令牌.resync(nowMicros);//計算穩定的發放令牌的時間間隔. 單位us, 比如qps為5, 則為200ms即20萬us的間隔進行令牌發放. double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;this.stableIntervalMicros = stableIntervalMicros;//調用SmoothBursty.doSetRate()設定最大令牌數maxPermits和已存儲的令牌數storedPermitsdoSetRate(permitsPerSecond, stableIntervalMicros);}//Updates storedPermits and nextFreeTicketMicros based on the current time.//根據當前時間,更新storedPermits和nextFreeTicketMicros變量//注意: 在初始化SmoothBursty時會第一次調用resync()方法,此時各值的情況如下://coolDownIntervalMicros = 0、nextFreeTicketMicros = 0、newPermits = 無窮大.//maxPermits = 0(初始值,還沒有重新計算)、最后得到的: storedPermits = 0;//同時,nextFreeTicketMicros = "起始時間"void resync(long nowMicros) {//if nextFreeTicket is in the past, resync to nowif (nowMicros > nextFreeTicketMicros) {double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();storedPermits = min(maxPermits, storedPermits + newPermits);nextFreeTicketMicros = nowMicros;}}abstract void doSetRate(double permitsPerSecond, double stableIntervalMicros);...//This implements a "bursty" RateLimiter, where storedPermits are translated to zero throttling.//The maximum number of permits that can be saved (when the RateLimiter is unused) is defined in terms of time, //in this sense: if a RateLimiter is 2qps, and this time is specified as 10 seconds, we can save up to 2 * 10 = 20 permits.//SmoothBursty實現了一個"突發式"的速率限制器RateLimiter,其中的storedPermits會被轉換為0;//它可以保存的最大令牌數量(當RateLimiter未使用時)是根據時間定義的,//從這個意義上說:如果RateLimiter是2qps,并且這個時間被指定為10秒,那么我們最多可以保存2*10=20個令牌;static final class SmoothBursty extends SmoothRateLimiter {//The work (permits) of how many seconds can be saved up if this RateLimiter is unused?//如果這個速率限制器RateLimiter沒有被使用,那么可以節省多少秒的工作(令牌)?final double maxBurstSeconds;SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {super(stopwatch);this.maxBurstSeconds = maxBurstSeconds;}@Overridevoid doSetRate(double permitsPerSecond, double stableIntervalMicros) {//初次設定的時候,oldMaxPermits = 0.0double oldMaxPermits = this.maxPermits;//新的(當前的)maxPermits為burst的時間周期(1秒) * 每周期的令牌數.maxPermits = maxBurstSeconds * permitsPerSecond;if (oldMaxPermits == Double.POSITIVE_INFINITY) {//if we don't special-case this, we would get storedPermits == NaN, belowstoredPermits = maxPermits;} else {//初始化SmoothBursty,執行到此處時,storedPermits為0storedPermits = (oldMaxPermits == 0.0) ? 0.0 : storedPermits * maxPermits / oldMaxPermits;}}@Overridelong storedPermitsToWaitTime(double storedPermits, double permitsToTake) {return 0L;}@Overridedouble coolDownIntervalMicros() {return stableIntervalMicros;}}...
}
二.SmoothBursty的acquire()方法
@Beta
@GwtIncompatible
@SuppressWarnings("GoodTime")
public abstract class RateLimiter {...//無限等待的獲取//Acquires the given number of permits from this RateLimiter, //blocking until the request can be granted. //Tells the amount of time slept, if any.//@param permits the number of permits to acquire,獲取的令牌數量//@return time spent sleeping to enforce rate, in seconds; 0.0 if not rate-limited@CanIgnoreReturnValuepublic double acquire(int permits) {//調用RateLimiter.reserve()方法//預支令牌并獲取需要阻塞的時間:即預定數量為permits的令牌數,并返回需要等待的時間long microsToWait = reserve(permits);//將需要等待的時間補齊, 從而滿足限流的需求,即根據microsToWait來讓線程sleep(共性)stopwatch.sleepMicrosUninterruptibly(microsToWait);//返回這次調用使用了多少時間給調用者return 1.0 * microsToWait / SECONDS.toMicros(1L);}//Reserves the given number of permits from this RateLimiter for future use, //returning the number of microseconds until the reservation can be consumed.//從這個RateLimiter限速器中保留給定數量的令牌,以備將來使用,返回可以使用保留前的微秒數//@return time in microseconds to wait until the resource can be acquired, never negativefinal long reserve(int permits) {checkPermits(permits);//由于涉及并發操作,所以必須使用synchronized進行互斥處理synchronized (mutex()) {//調用RateLimiter.reserveAndGetWaitLength()方法return reserveAndGetWaitLength(permits, stopwatch.readMicros());}}//Reserves next ticket and returns the wait time that the caller must wait for.//預定下一個ticket,并且返回需要等待的時間final long reserveAndGetWaitLength(int permits, long nowMicros) {//調用SmoothRateLimiter.reserveEarliestAvailable()方法long momentAvailable = reserveEarliestAvailable(permits, nowMicros);return max(momentAvailable - nowMicros, 0);}//Reserves the requested number of permits and returns the time that those permits can be used (with one caveat).//保留請求數量的令牌,并返回可以使用這些令牌的時間(有一個警告)//生產令牌、獲取令牌、計算阻塞時間的具體細節由子類來實現//@return the time that the permits may be used, or, if the permits may be used immediately, an arbitrary past or present timeabstract long reserveEarliestAvailable(int permits, long nowMicros);...
}@GwtIncompatible
abstract class SmoothRateLimiter extends RateLimiter {//The currently stored permits. //令牌桶中當前緩存的未消耗的令牌數double storedPermits;//The maximum number of stored permits. //令牌桶中允許存放的最大令牌數double maxPermits;//The interval between two unit requests, at our stable rate.//E.g., a stable rate of 5 permits per second has a stable interval of 200ms.//按照我們穩定的速率,兩個單位請求之間的時間間隔;例如,每秒5個令牌的穩定速率具有200ms的穩定間隔double stableIntervalMicros;//The time when the next request (no matter its size) will be granted. //After granting a request, this is pushed further in the future. Large requests push this further than small requests.//下一個請求(無論大小)將被批準的時間. 在批準請求后,這將在未來進一步推進,大請求比小請求更能推動這一進程.private long nextFreeTicketMicros = 0L;//could be either in the past or future ...@Overridefinal long reserveEarliestAvailable(int requiredPermits, long nowMicros) {//1.根據nextFreeTicketMicros計算新產生的令牌數,更新當前未使用的令牌數storedPermits//獲取令牌時調用SmoothRateLimiter.resync()方法與初始化時的調用不一樣.//此時會把"還沒有使用"的令牌存儲起來.//但是如果計數時間nextFreeTicketMicros是在未來. 那就不做任何處理.resync(nowMicros);//下一個請求(無論大小)將被批準的時間long returnValue = nextFreeTicketMicros;//2.計算需要阻塞等待的時間//2.1.先從桶中取未消耗的令牌,如果桶中令牌數不足,看最多能取多少個//存儲的令牌可供消費的數量double storedPermitsToSpend = min(requiredPermits, this.storedPermits);//2.2.計算是否需要等待新鮮的令牌(當桶中現有的令牌數不足時就需要等待新鮮的令牌),如果需要,則計算需要等待的令牌數//需要等待的令牌:新鮮的令牌double freshPermits = requiredPermits - storedPermitsToSpend;//計算需要等待的時間//分兩部分計算:waitMicros = 從桶中獲取storedPermitsToSpend個現有令牌的代價 + 等待生成freshPermits個新鮮令牌的代價//從桶中取storedPermitsToSpend個現有令牌也是有代價的,storedPermitsToWaitTime()方法是個抽象方法,會由SmoothBursty和SmoothWarmingUp實現//對于SmoothBursty來說,storedPermitsToWaitTime()會返回0,表示已經存儲的令牌不需要等待.//而生成新鮮令牌需要等待的代價是:新鮮令牌的個數freshPermits * 每個令牌的耗時stableIntervalMicroslong waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) + (long) (freshPermits * stableIntervalMicros);//3.更新nextFreeTicketMicros//由于新鮮的令牌可能已被預消費,所以nextFreeTicketMicros就得往后移,以表示這段時間被預消費了this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);//4.扣減令牌數,更新桶內剩余令牌//最后把上面計算的可扣減的令牌數量從存儲的令牌里減掉this.storedPermits -= storedPermitsToSpend;//返回請求需要等待的時間//需要注意returnValue被賦值的是上次的nextFreeTicketMicros,說明當前這次請求獲取令牌的代價由下一個請求去支付return returnValue;}//Updates storedPermits and nextFreeTicketMicros based on the current time.//根據當前時間,更新storedPermits和nextFreeTicketMicros變量//計算nextFreeTicketMicros到當前時間內新產生的令牌數,這個就是延遲計算void resync(long nowMicros) {//if nextFreeTicket is in the past, resync to now//一般當前的時間是大于下個請求被批準的時間//此時:會把過去的時間換成令牌數存儲起來,注意存儲的令牌數不能大于最大的令牌數//當RateLimiter初始化好后,可能剛開始沒有流量,或者是一段時間沒有流量后突然來了流量//此時可以往"后"預存儲一秒時間的令牌數. 也就是這里所說的burst能力//如果nextFreeTicketMicros在未來的一個時間點,那這個if判斷便不滿足//此時,不需要進行更新storedPermits和nextFreeTicketMicros變量//此種情況發生在:"預借"了令牌的時候if (nowMicros > nextFreeTicketMicros) {//時間差除以生成一個新鮮令牌的耗時,coolDownIntervalMicros()是抽象方法,由子類實現double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();//更新令牌桶內已存儲的令牌個數,注意不超過最大限制storedPermits = min(maxPermits, storedPermits + newPermits);//更新nextFreeTicketMicros為當前時間nextFreeTicketMicros = nowMicros;}}//Translates a specified portion of our currently stored permits which we want to spend/acquire, into a throttling time.//Conceptually, this evaluates the integral of the underlying function we use, for the range of [(storedPermits - permitsToTake), storedPermits].//This always holds: 0 <= permitsToTake <= storedPermits//從桶中取出已存儲的令牌的代價,由子類實現//這是一個抽象函數,SmoothBursty中的實現會直接返回0,可以認為已經預分配的令牌,在獲取時不需要待待時間abstract long storedPermitsToWaitTime(double storedPermits, double permitsToTake);//Returns the number of microseconds during cool down that we have to wait to get a new permit.//每生成一個新鮮令牌的耗時,由子類實現abstract double coolDownIntervalMicros();...static final class SmoothBursty extends SmoothRateLimiter {...@Overridelong storedPermitsToWaitTime(double storedPermits, double permitsToTake) {return 0L;}@Overridedouble coolDownIntervalMicros() {return stableIntervalMicros;}}...
}