本文屬于sentinel學習筆記系列。網上看到吳就業老師的專欄,寫的好值得推薦,我整理的有所刪減,推薦看原文。
https://blog.csdn.net/baidu_28523317/category_10400605.html
sentinel 實現限流降級、熔斷降級、黑白名單限流降級、系統自適應限流降級以及熱點參數限流降級都是由 ProcessorSlot、Checker、Rule、RuleManager 組合完成。ProcessorSlot 作為調用鏈路的切入點,負責調用 Checker 檢查當前請求是否可以放行;Checker 則根據資源名稱從 RuleManager 中拿到為該資源配置的 Rule(規則),取 ClusterNode 統計的實時指標數據與規則對比,如果達到規則的閾值則拋出 Block 異常,拋出 Block 異常意味著請求被拒絕,也就實現了限流或熔斷。
可以總結為以下三個步驟:
- 在 ProcessorSlot#entry 方法中調用 Checker#check 方法,并將 DefaultNode 傳遞給 Checker。
- Checker 從 DefaultNode 拿到 ClusterNode,并根據資源名稱從 RuleManager 獲取為該資源配置的規則。
- Checker 從 ClusterNode 中獲取當前時間窗口的某項指標數據(QPS、avgRt 等)與規則的閾值對比,如果達到規則的閾值則拋出 Block 異常(也有可能將 check 交給 Rule 去實現)。
限流規則與規則配置加載器
rule
規則是圍繞資源配置的,接口Rule 只定義獲取資源。
public interface Rule {/*** Get target resource of this rule.** @return target resource of this rule*/String getResource();}
public abstract class AbstractRule implements Rule {/*** rule id. */private Long id;/*** Resource name. 資源名*/private String resource;/*** 流控對應的調用來源*/private String limitApp;
Rule、AbstractRule 與其它實現類的關系如下圖所示:?
FlowRule 是限流規則配置類,FlowRule 繼承 AbstractRule 并實現 Rule 接口。FlowRule 源碼如下?
public class FlowRule extends AbstractRule {public FlowRule() {super();setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);}public FlowRule(String resourceName) {super();setResource(resourceName);setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);}/*** The threshold type of flow control (0: thread count, 1: QPS).* 限流閾值類型*/private int grade = RuleConstant.FLOW_GRADE_QPS;/*** Flow control threshold count. 限流閾值 配置的是qps類型則代表qps的值;配置的是線程數類型則代表線程數*/private double count;/*** Flow control strategy based on invocation chain.* 流控限流策略* {@link RuleConstant#STRATEGY_DIRECT} for direct flow control (by origin);* {@link RuleConstant#STRATEGY_RELATE} for relevant flow control (with relevant resource);* {@link RuleConstant#STRATEGY_CHAIN} for chain flow control (by entrance resource).*/private int strategy = RuleConstant.STRATEGY_DIRECT;/*** Reference resource in flow control with relevant resource or context.* 關聯流控的資源*/private String refResource;/*** Rate limiter control behavior. 流控效果控制* 0. default(reject directly), 1. warm up, 2. rate limiter, 3. warm up + rate limiter*/private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;//對應流控效果為Warm Up情況下,冷啟動時長(預熱時長),單位秒private int warmUpPeriodSec = 10;/*** Max queueing time in rate limiter behavior.* 對應流控效果為排隊等待情況下,出現的超時時間*/private int maxQueueingTimeMs = 500;// 對應新增流控規則頁面的是否集群private boolean clusterMode;/*** Flow rule config for cluster mode.集群流控的相關配置*/private ClusterFlowConfig clusterConfig;/*** The traffic shaping (throttling) controller. 流量整形的實現,不同流控效果有不同算法*/private TrafficShapingController controller;
字段屬性有些多,可以對比sentinel 限流保護-筆記-CSDN博客?跟官網文檔來理解
RuleManager
?Sentinel 中用來管理規則配置的類都以規則類的名稱+Manger 命名,用來加載限流規則配置以及緩存限流規則配置的類為 FlowRuleManager,其部分源碼如下:
public class FlowRuleManager {// 緩存限流規則private static volatile Map<String, List<FlowRule>> flowRules = new HashMap<>();// PropertyListener 監聽器private static final FlowPropertyListener LISTENER = new FlowPropertyListener();//SentinelProperty ,默認的 DynamicSentinelPropertyprivate static SentinelProperty<List<FlowRule>> currentProperty = new DynamicSentinelProperty<List<FlowRule>>();/** the corePool size of SCHEDULER must be set at 1, so the two task ({@link #startMetricTimerListener()} can run orderly by the SCHEDULER **/@SuppressWarnings("PMD.ThreadPoolCreationRule")private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1,new NamedThreadFactory("sentinel-metrics-record-task", true));static {//給默認的 SentinelProperty 注冊監聽器(FlowPropertyListener)currentProperty.addListener(LISTENER);startMetricTimerListener();}。。。public static List<FlowRule> getRules() {List<FlowRule> rules = new ArrayList<FlowRule>();for (Map.Entry<String, List<FlowRule>> entry : flowRules.entrySet()) {rules.addAll(entry.getValue());}return rules;}//更新規則public static void loadRules(List<FlowRule> rules) {currentProperty.updateValue(rules);}
?我們之前demo對用FlowRuleManager.loadRules()來更新規則生效,注意1.8.6版本這里面
SentinelProperty ,SentinelProperty 是 Sentinel 提供的一個接口,可注冊到 Sentinel 提供的各種規則的 Manager,例如 FlowRuleManager,并且可以給 SentinelProperty 添加監聽器,在配置改變時,你可以調用 SentinelProperty#updateValue 方法,由它負責調用監聽器去更新規則,而不需要調用 FlowRuleManager#loadRules 方法。這塊先先不展開梳理,后面結合nacos再看。
限流處理器插槽:FlowSlot
FlowSlot 是實現限流功能的切入點,它作為 ProcessorSlot 插入到 ProcessorSlotChain 鏈表中,在 entry 方法中調用 Checker 去判斷是否需要拒絕當前請求,如果需要拒絕請求則拋出 Block 異常。FlowSlot 的源碼如下:
@Spi(order = Constants.ORDER_FLOW_SLOT)
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {private final FlowRuleChecker checker;public FlowSlot() {this(new FlowRuleChecker());}/*** Package-private for test.** @param checker flow rule checker* @since 1.6.1*/FlowSlot(FlowRuleChecker checker) {AssertUtil.notNull(checker, "flow checker should not be null");this.checker = checker;}@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {checkFlow(resourceWrapper, context, node, count, prioritized);fireEntry(context, resourceWrapper, node, count, prioritized, args);}//校驗是否限流void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)throws BlockException {checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);}@Overridepublic void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {fireExit(context, resourceWrapper, count, args);}// 規則生產者private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {@Override // 參數為資源名稱public Collection<FlowRule> apply(String resource) {// Flow rule map should not be null.Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();return flowRules.get(resource);}};
}
限流規則檢查器:FlowRuleChecker
FlowRuleChecker 負責判斷是否需要拒絕當前請求,方法很多,先看看調用checkFlow
public class FlowRuleChecker {public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {if (ruleProvider == null || resource == null) {return;}// 獲取匹配的規則Collection<FlowRule> rules = ruleProvider.apply(resource.getName());if (rules != null) {//遍歷規則for (FlowRule rule : rules) { // 檢查規則能否通過if (!canPassCheck(rule, context, node, count, prioritized)) {throw new FlowException(rule.getLimitApp(), rule);}}}}
注意:遍歷限流規則,只要有一個限流規則達到限流閾值即可拋出 FlowException,使用 FlowException 目的是標志當前請求因為達到限流閾值被拒絕,FlowException 是 BlockException 的子類;
canPassCheck 方法返回 true 說明允許請求通過,反之則不允許通過。canPassCheck 方法源碼如下:
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,boolean prioritized) {String limitApp = rule.getLimitApp();//當前限流規則只對哪個調用來源生效,一般不為null,默認為“default”(不限定調用來源)if (limitApp == null) {return true;}// 集群模式下的規則檢測if (rule.isClusterMode()) {return passClusterCheck(rule, context, node, acquireCount, prioritized);}//單機模式下規則檢測return passLocalCheck(rule, context, node, acquireCount, prioritized);}
先不討論集群限流的情況,看看單機的passLocalCheck
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,boolean prioritized) {//根據調用來源和“調用關系限流策略”選擇 DefaultNode;Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);if (selectedNode == null) {return true;}return rule.getRater().canPass(selectedNode, acquireCount, prioritized);}
- selectNodeByRequesterAndStrategy返回ClusterBuilderSlot階段生成的ClusterNode
-
getRater 返回TrafficShapingController,在默認模式下返回流控效果策略DefaultController。DefaultController#canPass 完成canPassCheck。
下面分別看看這兩個方法。
流控節點選擇
selectNodeByRequesterAndStrategy 方法的實現有多種情況。原碼如下:
public class FlowRuleChecker {static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {// The limit app should not be empty.限流規則針對哪個來源生效String limitApp = rule.getLimitApp();// 基于調用關系的限流策略int strategy = rule.getStrategy();// 遠程來源String origin = context.getOrigin();if (limitApp.equals(origin) && filterOrigin(origin)) {if (strategy == RuleConstant.STRATEGY_DIRECT) {// 1 Matches limit origin, return origin statistic node.return context.getOriginNode();}//2return selectReferenceNode(rule, context, node);} else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {if (strategy == RuleConstant.STRATEGY_DIRECT) {//3 Return the cluster node.return node.getClusterNode();}//4return selectReferenceNode(rule, context, node);} else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)&& FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {if (strategy == RuleConstant.STRATEGY_DIRECT) {// 5return context.getOriginNode();}//6return selectReferenceNode(rule, context, node);}return null;}static Node selectReferenceNode(FlowRule rule, Context context, DefaultNode node) {String refResource = rule.getRefResource();int strategy = rule.getStrategy();if (StringUtil.isEmpty(refResource)) {return null;}if (strategy == RuleConstant.STRATEGY_RELATE) {return ClusterBuilderSlot.getClusterNode(refResource);}if (strategy == RuleConstant.STRATEGY_CHAIN) {if (!refResource.equals(context.getName())) {return null;}return node;}// No node.return null;}
如果當前限流規則的 limitApp 為 default,則說明該限流規則對任何調用來源都生效,針對所有調用來源限流,否則只針對指定調用來源限流。
1?如果調用來源與當前限流規則的 limitApp 相等,且 strategy 為 STRATEGY_DIRECT,則使用調用來源的 StatisticNode,實現針對調用來源限流。
2?前置條件與(1)相同,依然是針對來源限流。selectReferenceNode
- strategy 為 STRATEGY_RELATE:使用引用資源的 ClusterNode;
- strategy 為 STRATEGY_CHAIN:使用當前資源的 DefauleNode。
3當 limitApp 為 default 時,針對所有來源限流。如果 strategy 為 STRATEGY_DIRECT,則使用當前資源的 ClusterNode。
4?前置條件與(3)相同,依然是針對所有來源限流。selectReferenceNode
5?如果 limitApp 為 other,且該資源的所有限流規則都沒有針對當前的調用來源限流。如果 strategy 為 STRATEGY_DIRECT,則使用 origin 的 StatisticNode。
6??前置條件與(5)一樣。selectReferenceNode
從 selectNodeByRequesterAndStrategy 方法可以看出,Sentinel 之所以針對每個資源統計訪問來源的指標數據,也是為了實現對豐富的限流策略的支持.比如針對調用來源限流可限制并發量較高的來源服務的請求,而對并發量低的來源服務的請求可不限流,或者是對一些并沒有那么重要的來源服務限流。
TrafficShapingController
Sentinel 支持對超出限流閾值的流量采取效果控制器控制這些流量,流量效果控制支持:直接拒絕、Warm Up(冷啟動)、勻速排隊。對應 FlowRule 中的 controlBehavior 字段。在調用 FlowRuleManager#loadRules 方法時,FlowRuleManager 會將限流規則配置的 controlBehavior 轉為對應的 TrafficShapingController。
public interface TrafficShapingController {/*** Check whether given resource entry can pass with provided count.* 判斷當前請求是否能通過* @param node resource node* @param acquireCount count to acquire* @param prioritized whether the request is prioritized* @return true if the resource entry can pass; false if it should be blocked*/boolean canPass(Node node, int acquireCount, boolean prioritized);/*** Check whether given resource entry can pass with provided count.* 判斷當前請求是否能通過* @param node resource node* @param acquireCount count to acquire* @return true if the resource entry can pass; false if it should be blocked*/boolean canPass(Node node, int acquireCount);
}
DefaultController
DefaultController 是默認使用的流量效果控制器,直接拒絕超出閾值的請求。當 QPS 超過限流規則配置的閾值,新的請求就會被立即拒絕,拋出 FlowException。
@Overridepublic boolean canPass(Node node, int acquireCount, boolean prioritized) {// 獲取當前已使用的token:qps 算每秒被放行的請求數,threads 統計的當前并行占用的線程數int curCount = avgUsedTokens(node);//當前已使用token + 獲取的token 大于token數量的場景if (curCount + acquireCount > count) {//qps 且prioritized 參數的值為 true(有優先級的請求可以占用未來時間窗口的統計指標)if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {long currentTime;long waitInMs;currentTime = TimeUtil.currentTimeMillis();//當前請求需要等待的時間,單位毫秒waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {//將休眠之后對應的時間窗口的 pass(通過)這項指標數據的值加上 acquireCountnode.addWaitingRequest(currentTime + waitInMs, acquireCount);// 添加占用未來的 pass 指標的數量node.addOccupiedPass(acquireCount);// 休眠等待,當前線程阻塞sleep(waitInMs);// PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.//休眠結束后拋出 PriorityWait 異常,表示當前請求是等待了 waitInMs 之后通過的throw new PriorityWaitException(waitInMs);}}return false;}return true;}private int avgUsedTokens(Node node) {if (node == null) {return DEFAULT_AVG_USED_TOKENS;}return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());}
?一般情況下,prioritized 參數的值為 false,所以這個 canPass 方法實現的流量效果就是直接拒絕。
限于篇幅,其他限流RateLimiterController、WarmUpController、WarmUpRateLimiterController 待整理。