目錄
滑動窗口算法
Sentinel
數據模型
示例
大致流程
???????entry
???????entryWithPriority
???????FlowSlot.entry
???????checkFlow
???????canPass
???????avgUsedTokens
???????passQps
???????pass
???????currentWindow
calculateTimeIdx
???????calculateWindowStart
???????values
滑動窗口算法
? ? ? ?滑動窗口算法是將時間周期分為n個小周期,分別記錄每個小周期內的訪問次數,并且根據時間滑動刪除過期的小周期。
? ? ? ?如下圖,假設時間周期為1min,將1min再分割成2個小周期,統計每個小周期的訪問數量,則可以看到,第一個時間周期內訪問數量為75,第二個時間周期內訪問數量為100,超過100的數量被限流掉了。
? ? ? ?由此可見,當滑動窗口格子劃分得越多,那么滑動窗口的滾動就越平滑,限流的統計就越精確。可以很好的解決固定窗口的流動問題。
Sentinel
? ? ? ?滑動窗口算法也是Sentinel的默認算法。
???????數據模型
英文名稱 | 中文名稱 | 備注 |
array | 窗口數組 | |
windowLengthInMs | 單個窗口時間長度 | |
sampleCount | 總窗口數量 | |
intervalInMs | 時間窗口總長度 | sampleCount * windowLengthInMs |
示例
|
大致流程
????點擊示例里的entry()方法,如下所示:
???????entry
public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
????return entryWithPriority(resourceWrapper, count, false, args);
}
? ? ? ? 點擊entryWithPriority()方法,如下所示:
???????entryWithPriority
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)?throws BlockException {
????.. ...
????Entry e = new CtEntry(resourceWrapper, chain, context);
????try {
????????chain.entry(context, resourceWrapper, null, count, prioritized, args);
????} catch (BlockException e1) {
????????e.exit(count, args);
????????throw e1;
????} catch (Throwable e1) {
????????// This should not happen, unless there are errors existing in Sentinel internal.
????????RecordLog.info("Sentinel unexpected exception", e1);
????}
????return e;
}
? ? ? ?點擊chain.entry()方法,因為我們這次探究的是限流算法,所以選擇FlowSlot類,如下所示:
???????FlowSlot.entry
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,?boolean prioritized, Object... args) throws Throwable {
????checkFlow(resourceWrapper, context, node, count, prioritized);
????fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
? ? ? ? 點擊checkFlow()方法,如下所示:
???????checkFlow
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,?Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
????????if (ruleProvider == null || resource == null) {
????????????return;
????????}
????????Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
????????if (rules != null) {
????????????for (FlowRule rule : rules) {
????????????????if (!canPassCheck(rule, context, node, count, prioritized)) {
????????????????????throw new FlowException(rule.getLimitApp(), rule);
????????????????}
????????????}
????????}
}
???????canPass
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
????int curCount = avgUsedTokens(node);
????if (curCount + acquireCount > count) {
????????... ...
????????return false;
????}
????return true;
}
? ? ? ?在這里,獲取已經使用的token數量,加上待申請的數量,如果超過流控規則里設置的最大值,則返回false。
? ? ? ?點擊avgUsedTokens()方法,如下所示:
???????avgUsedTokens
private int avgUsedTokens(Node node) {
????????if (node == null) {
????????????return DEFAULT_AVG_USED_TOKENS;
????????}
????????return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
}
? ? ? ?在這里,通過調用node.passQps()獲取已經使用的token數量。
? ? ? ?點擊passQps(),如下所示:
???????passQps
public double passQps() {
????return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();
}
? ? ? ?在這里,rollingCounterInSecond對象保存了時間窗口對象的數組。pass()方法可以獲取每個有效的時間窗口對象里已經使用的令牌數量,getWindowIntervalInSec()方法是時間窗口總長度,以秒為單位。兩者相除就可以已使用的QPS。
? ? ? ?點擊pass()方法,如下所示:
???????pass
public long pass() {
????????data.currentWindow();
????????long pass = 0;
????????List<MetricBucket> list = data.values();
????????for (MetricBucket window : list) {
????????????pass += window.pass();
????????}
????????return pass;
????}
? ? ? ?在這里,會調用currentWindow()刷新當前窗口信息,然后累加每個窗口的計數值作為當前計數周期的計數值。
? ? ? ?點擊currentWindow()方法,如下所示:
???????currentWindow
public WindowWrap<T> currentWindow(long timeMillis) {
????int idx = calculateTimeIdx(timeMillis);
????long windowStart = calculateWindowStart(timeMillis);
????while (true) {
????????WindowWrap<T> old = array.get(idx);
????????if (old == null) {
????????????// 創建新窗口
????????????WindowWrap<T> window = new WindowWrap<>(windowLengthInMs, windowStart, newEmptyBucket());
????????????if (array.compareAndSet(idx, null, window)) {
????????????????return window;
????????????}
????????} else if (windowStart == old.windowStart()) {
????????????// 命中當前有效窗口
????????????return old;
????????} else if (windowStart > old.windowStart()) {
????????????// 窗口已過期,重置并復用
????????????if (updateLock.tryLock()) {
????????????????try {
????????????????????return resetWindowTo(old, windowStart);
????????????????} finally {
????????????????????updateLock.unlock();
????????????????}
????????????}
????????}
????}
}
在這里:
- 獲取當前時間窗口的索引
- 獲取當前窗口的起始時間
- 根據當前時間窗口的索引,獲取時間窗口對象,如果時間窗口對象為空,則創建一個新時間窗口對象;如果已經存在時間窗口對象,則返回該對象;如果時間窗口對象已過期,則重置并復用。
calculateTimeIdx
public int calculateTimeIdx(long timeMillis) {
????long timeId = timeMillis / windowLengthInMs;
????return (int)(timeId % array.length());
}
? ? ? ?在這里,根據當前時間戳計算對應窗口索引。
???????calculateWindowStart
protected long calculateWindowStart(long timeMillis) {
????return timeMillis - timeMillis % windowLengthInMs;
}
? ? ? ?在這里,計算當前窗口的起始時間(對齊到窗口邊界)。
? ? ? ?點擊步驟pass的values()方法,如下所示:
???????values
public List<T> values() {
????return values(TimeUtil.currentTimeMillis());
}
public List<T> values(long timeMillis) {
????if (timeMillis < 0) {
????????return new ArrayList<T>();
????}
????int size = array.length();
????List<T> result = new ArrayList<T>(size);
????for (int i = 0; i < size; i++) {
????????WindowWrap<T> windowWrap = array.get(i);
????????if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
????????????continue;
????????}
????????result.add(windowWrap.value());
????}
????return result;
}
? ? ? ? 在這里,循環時間窗口數組,忽略已經失效的時間窗口對象,將有效的時間窗口對象保存在一個列表對象里,并作為方法返回值進行返回。
???????isWindowDeprecated
public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
????return time - windowWrap.windowStart() > intervalInMs;
}
? ? ? ? 在這里,將當前時間減去指定時間窗口對象的起始時間,如果結果大于計數周期時長,則表明指定的時間窗口對象已經失效。