為什么80%的碼農都做不了架構師?>>> ??
? ? ? ?令牌桶算法是網絡流量整形(Traffic Shaping)和速率限制(Rate Limiting)中最常使用的一種算法。典型情況下,令牌桶算法用來控制發送到網絡上的數據的數目,并允許突發數據的發送。
? ??大小固定的令牌桶可自行以恒定的速率源源不斷地產生令牌。如果令牌不被消耗,或者被消耗的速度小于產生的速度,令牌就會不斷地增多,直到把桶填滿。后面再產生的令牌就會從桶中溢出。最后桶中可以保存的最大令牌數永遠不會超過桶的大小。 ?? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??
? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ???——摘自百度百科
那么什么是令牌桶算法呢?
? ??? ??簡單來說就是,生產者和消費者之間的事情,生產者往一個桶(Bucket)中丟令牌(Token),消費者從里面去撿令牌,生產者以一定的速率丟令牌,直到桶裝滿了,令牌就溢出了,消費者持續從桶里面撿令牌,沒有令牌的話,就持續等待,直到有令牌出現。
?
?這里我們看下具體令牌桶算法的實現(Guava中的RateLimiter),以及在實際生產中的應用場景(限制接口訪問頻次,保護后端系統)
? ? ? ?我們在暴露對外接口的時候,對于高頻次訪問的接口(例如查詢接口),需要考慮到相關的保護措施,避免接口瞬時訪問量過大,導致服務端不可用的場景產生。因此,我們可以使用RateLimiter,來做相關的頻控。
? 下面是RateLimiter的使用Demo:
? ?1、引入相關的依賴
<dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>19.0</version></dependency>
?2、編寫相關的Demo
public class RateLimiterTest {public static void main(String[] args) throws InterruptedException {RateLimiter limiter = RateLimiter.create(10);// 代碼1Thread.currentThread().sleep(1000);//步驟1if (limiter.tryAcquire(20))//代碼2System.out.println("======== Time1:" + System.currentTimeMillis() / 1000);Thread.currentThread().sleep(1001);if (limiter.tryAcquire(1))//代碼3System.out.println("======== Time2:" + System.currentTimeMillis() / 1000);if (limiter.tryAcquire(5))System.out.println("======== Time3:" + System.currentTimeMillis() / 1000);}
}
3、運行結果如下
場景1:
======== Time1:1533114071
======== Time2:1533114072
場景2:修改代碼:去掉步驟1,運行結果如下:
======== Time1:1533114155
場景3:修改相關代碼如下:
public class RateLimiterTest {public static void main(String[] args) throws InterruptedException {RateLimiter limiter = RateLimiter.create(10);// 代碼1Thread.currentThread().sleep(2000);if (limiter.tryAcquire(21))//代碼2System.out.println("======== Time1:" + System.currentTimeMillis() / 1000);Thread.currentThread().sleep(1001);if (limiter.tryAcquire(1))//代碼3System.out.println("======== Time2:" + System.currentTimeMillis() / 1000);if (limiter.tryAcquire(5))System.out.println("======== Time3:" + System.currentTimeMillis() / 1000);}
}
? ? 結果如下:
======== Time1:1533114623
?下面我們來分析這三種情況產生的原因,順便也分析下RateLimiter中的令牌桶算法是如何實現的。
在分析之前,說明一點,我之前一直以為令牌桶算法,是定時器機制,定時往桶里面放令牌,但是有些時候并不是這樣的。先聲明一下。
我們來分析下代碼:
?????????代碼行1:
?RateLimiter limiter = RateLimiter.create(10);
????這行代碼,我們知道是創建一個每秒產生10個permit的速率器
? ? ??代碼行2:
? ? ? ? ? ?limiter.tryAcquire(20) ?//嘗試從速率器中獲取20個permit,獲取成功 true;失敗 false
? ????代碼行3:
? ? ? ? ? ? limiter.tryAcquire(1) //嘗試從速率器中獲取1個permit,獲取成功 true;失敗 false
????????為什么相同的代碼,不同的休眠時間導致不同的結果呢?
結論:
????1、RateLimiter 速率器,通過預支將來的令牌來進行限制頻控,什么意思呢?打個比方:速率器相當于工廠,獲取令牌許可的線程相當于經銷商,經銷商過來取貨,工廠每天的生產的貨品是一定的(100噸/天),A經銷商來取貨,第一天取了200噸貨,工廠沒有這么多貨,怎么辦呢?為了留住這個經銷商,廠長做了決定,給200噸,現在的100噸先給你,明天的100噸也給你,然后把200噸貨品的提取清單給了A經銷商,A很滿意的離開了。過了一會,B來了,B要10噸貨物,這個時候,廠長就沒有那么好說話了(誰讓大客戶已經到手了呢?),說10噸貨物可以,你
后天來吧,明天和今天的活已經都賣完了。這個時候通過這種方式,來限制一天只賣/生產100噸的貨物。
? 根據這個結論我們來看相關的代碼:
RateLimiter limiter = RateLimiter.create(10);
調用的是:
@VisibleForTestingstatic RateLimiter create(SleepingStopwatch stopwatch, double permitsPerSecond) {RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds 注意 這里的maxBurstSeconds指定的是1s 直接影響后面的maxPermit*/);rateLimiter.setRate(permitsPerSecond);//見下文代碼return rateLimiter;}
setRate(permitsPerSecond)如下:
public final void setRate(double permitsPerSecond) {checkArgument(permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");synchronized (mutex()) {doSetRate(permitsPerSecond, stopwatch.readMicros());//stopwatch.readMirco 獲取的是創建以來的系統時間 這里調用SmoothRateLimiter.doSetRate()}}
?SmoothRateLimiter.doSetRate()
@Overridefinal void doSetRate(double permitsPerSecond, long nowMicros) {resync(nowMicros);//你可以認為這邊是重設相關的nextFreeTicketMicros和storedPermits 這個函數是相關計算頻控的重要組成部分 ------1double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;this.stableIntervalMicros = stableIntervalMicros;doSetRate(permitsPerSecond, stableIntervalMicros);//這個函數是RateLimiter創建時候 初始化maxpermits和StorePermits的相關部分 也是一個重要的部分 ---2}
我們來看1的實現:
? ??
/*** Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time.* 基于當前的時間 計算相關的storedPermits和nextFreeTicketMicros * storedPermits:當前存儲的令牌數* nextFreeTicketMicros:下次可以獲取令牌的時間 其實這么講不太準確 應該說是,上次令牌獲取之后預支到下次可以獲取令牌的最早時間* 此處再創建的時候 nextFreeTicketMicros基本就是創建時候的系統時間*/void resync(long nowMicros) {// if nextFreeTicket is in the past, resync to nowif (nowMicros > nextFreeTicketMicros) {storedPermits = min(maxPermits,storedPermits+ (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros());nextFreeTicketMicros = nowMicros; }}
????????我們可以看到,我們這里通過計算當前時間和下次可以獲取令牌的時間,相互計算差值,然后除以一個令牌產生的時間間隔,來計算當前時段可以產生多少令牌,然后和我們的 ? ? maxPermits來取最小值,由此我們可以看到storedPermits最多只能存儲maxPermits數量的令牌,這也是令牌桶大小所限制的。
????我們再來看2代碼的實現:
@Overridevoid doSetRate(double permitsPerSecond, double stableIntervalMicros) {double oldMaxPermits = this.maxPermits;maxPermits = maxBurstSeconds * permitsPerSecond;//設置最大可存儲的令牌數 這里的maxBurstSeconds 就是之前設置的1s 所以maxPermits數值上等于我們設置的permitsPerSecondif (oldMaxPermits == Double.POSITIVE_INFINITY) {// if we don't special-case this, we would get storedPermits == NaN, belowstoredPermits = maxPermits;} else {storedPermits = (oldMaxPermits == 0.0)? 0.0 // initial state: storedPermits * maxPermits / oldMaxPermits;}}
到這里我們的初始化RateLimiter結束了。我們來明確其中的幾個概念:
????maxPermits:最大存儲的令牌數,即令牌桶的大小
????storedPermits:已存儲的令牌數<=maxPermits,當然這個是通過計算算出來的
????nextFreeTicketMicros:上次獲取令牌時預支的最早能夠再次獲取令牌的時間
????nowMicros:當前系統時間
好,我們接下來看如何獲取令牌:
? 代碼2:
limiter.tryAcquire(20)
?具體的代碼實現如下:
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {//timeout = 0 unit=MICROSECONDSlong timeoutMicros = max(unit.toMicros(timeout), 0);checkPermits(permits);//校驗參數long microsToWait;synchronized (mutex()) {//互斥量 long nowMicros = stopwatch.readMicros();if (!canAcquire(nowMicros, timeoutMicros)) {//此處判斷當前時間是否大于等于上次預支最早時間 ----1return false;} else {microsToWait = reserveAndGetWaitLength(permits, nowMicros);//當前線程獲取到permit需要等待的時間 ---2}}stopwatch.sleepMicrosUninterruptibly(microsToWait);//線程等待 獲取permitreturn true;}
我們來看1的實現部分:
private boolean canAcquire(long nowMicros, long timeoutMicros) {return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;}
@Overridefinal long queryEarliestAvailable(long nowMicros) {return nextFreeTicketMicros;}
有此可見,如果當前時間+超時時間>=預支的最早時間,那么是可以獲取許可的,反之則不能獲取許可
再看代碼2的實現:
final long reserveAndGetWaitLength(int permits, long nowMicros) {long momentAvailable = reserveEarliestAvailable(permits, nowMicros);return max(momentAvailable - nowMicros, 0);//計算需要等待的時間}
SmoothRateLimiter.reserveEarliestAvailable()
@Overridefinal long reserveEarliestAvailable(int requiredPermits, long nowMicros) {resync(nowMicros);//這里是重設相關的storedPermits和nenextFreeTicketMicros 這個在前文我們講過 需要注意的是 這邊的nextFreeTicketMicros設置的是nowMicros 可能會有人有疑問,nextFreeTicketMicros不是預支的最早獲取permit的時間嗎?怎么是nowMicros了呢?我們下面看long returnValue = nextFreeTicketMicros;//這里返回的其實就是nowMiscrosdouble storedPermitsToSpend = min(requiredPermits, this.storedPermits);//本次能消費的最多的permitdouble freshPermits = requiredPermits - storedPermitsToSpend;//需要預支的permitlong waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)+ (long) (freshPermits * stableIntervalMicros);//預支的生產的時間try {this.nextFreeTicketMicros = LongMath.checkedAdd(nextFreeTicketMicros, waitMicros);//這里就是重設了預支下次能夠獲取permit的最早時間了 這邊將waitMiscros加上了} catch (ArithmeticException e) {this.nextFreeTicketMicros = Long.MAX_VALUE;}this.storedPermits -= storedPermitsToSpend;//扣除本地消費的permitreturn returnValue;//返回當前時間}
??????這樣就完成了前后兩個permit之間獲取的的聯動性,并不是有一個定時任務往中間放permit,而是直接預支的后面消費者的時間來進行控制的,這樣有一個好處就是,第一次獲取permit的時候,其實可以獲取N多個permit,并不做限制,只是這么多的permit會導致后面消費者卡死在那邊,當然,消費者在timeOut范圍內獲取不到permit也就直接返回了。
?