sentinel的限流和熔斷
- 前言
- 源碼分析
- 滑動窗口源碼
- 限流源碼
- 熔斷源碼
- 完結撒花,sentinel源碼還是挺簡單的,如有需要收藏的看官,順便也用發財的小手點點贊哈,如有錯漏,也歡迎各位在評論區評論!
前言
平時發起一個請求,在系統內部微服務之間調用就會形成一條調用鏈路,那對于每個服務要起到保護的作用,如何保護呢?本文就來做一個介紹,筆者平時是使用sentinel居多,是通過其提供的限流和熔斷的措施來保護的,于是文本來剖析sentinel的限流和熔斷;
先介紹雪崩問題:
例如a1服務——》b服務——》c服務;
a2服務——》b服務——》c服務;
a3服務——》b服務;
d服務——》a3服務;
在a1服務調用b時,往下調用c服務,因服務c故障,會占用著b服務連接至超時時間,此時又會有服務a2調b服務,再調c服務,會因服務c故障,繼續堆積連接在b服務上至連接超時,堆積到服務b性能瓶頸也會導致服務b故障,此時a3服務調用b服務也會繼續導致a3服務堆積連接,依次導致所有服務故障,這就是雪崩;
市面上解決雪崩的方案:
流量控制是限流的,用于保護下游服務,然后其余三種是下游發生了故障的前提下,保護上游服務的方案,其中艙壁模式,也是線程隔離,下游發生故障時,是為調上游的多個服務,建立屬于它們的線程池,仍然會調下游,這樣就會占用著線程;而熔斷降級則不會,下游有問題,例如上游根據調用下游的失敗比例、失敗數來決定觸發熔斷;
所以說流量控制避免雪崩,有了流量控制就不會發生雪崩,而其余三種則解決雪崩問題;
再介紹限流與熔斷的概念:
限流是保護當前服務,提前對服務做了qps限流保護;
熔斷則是某個服務因沒有限流發生了故障,然后上游調用該服務時,不會再調用了,而是在上游定義了關于該服務的降級邏輯,起到保護上游服務的作用,避免因為該服務的故障,導致雪崩問題;
那限流、熔斷、雪崩之間的關系?
更像是因為沒有限流容易導致服務提供方問題,然后服務消費方用熔斷來保護自己,從而避免雪崩;
源碼分析
在 Sentinel 里面,所有的資源都對應一個資源名稱以及一個 Entry。Entry 可以通過對主流框架的適配自動創建,也可以通過注解的方式或調用 API 顯式創建;每一個 Entry 創建的時候,同時也會創建一系列功能插槽(slot chain);
例如服務a——》服務b——》服務c,每個服務就是一個保護資源,默認是一個controller一個保護資源,在一個服務里其實可以有多個保護資源,用SentinelResource注解標識controller往后調的一個個service層的方法即可,sentinel對于保護資源的調用鏈路是如下圖:
調用鏈路就是這8個slot,簡單來說就是統計訪問的qps,判斷是否達到觸發限流(FlowSlot)或熔斷(DegradeSlot)閾值,每個slot拋出blockExeception,就代表訪問停止了,就不會觸發往后的slot調用,它是使用責任鏈模式調用的。
SentinelResource的切面,主要是調SphU.entry方法
//該方法是查找slot鏈,上面說的slot責任鏈就是指這個鏈
com.alibaba.csp.sentinel.CtSph#entryWithPriority
ProcessorSlot<Object> chain = this.lookProcessChain(resourceWrapper);//lookProcessChain方法:第一次訪問資源時,使用copyOnWrite的方式添加到map中,后面就從map獲取即可
ProcessorSlotChain chain = (ProcessorSlotChain)chainMap.get(resourceWrapper);if (chain == null) {synchronized(LOCK) {chain = (ProcessorSlotChain)chainMap.get(resourceWrapper);if (chain == null) {if (chainMap.size() >= 6000) {return null;}chain = SlotChainProvider.newSlotChain();Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap(chainMap.size() + 1);newMap.putAll(chainMap);newMap.put(resourceWrapper, chain);chainMap = newMap;}}}//創建的slotChain是通過spi的方式從METAINFO目錄獲取所有的slotpublic ProcessorSlotChain build() {ProcessorSlotChain chain = new DefaultProcessorSlotChain();List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();Iterator var3 = sortedSlotList.iterator();while(var3.hasNext()) {ProcessorSlot slot = (ProcessorSlot)var3.next();if (!(slot instanceof AbstractLinkedProcessorSlot)) {RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain", new Object[0]);} else {//獲取的每個slot形成一個AbstractLinkedProcessorSlot鏈表,因為每個slot繼承了該類chain.addLast((AbstractLinkedProcessorSlot)slot);}}return chain;}
//創建完slotChain后,就開始執行每個slot
chain.entry(context, resourceWrapper, (Object)null, count, prioritized, args);public class DefaultProcessorSlotChain extends ProcessorSlotChain {AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {//這里是第一個slot,從這里開始調用public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args) throws Throwable {super.fireEntry(context, resourceWrapper, t, count, prioritized, args);}public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {super.fireExit(context, resourceWrapper, count, args);}};
}//調用下一個slot是通過當前slot的next屬性(就是下一個slot元素)public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable {if (this.next != null) {this.next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);}}
這里詳細介紹了slot調用鏈路的過程,本文主要是關注限流和熔斷,所以只需看FlowSlot和DegradeSlot即可;
滑動窗口源碼
是用StatisticSlot統計訪問資源的線程數、qps。
滑動窗口的原理:時間窗口設定qps閾值,先看看固定時間窗的缺點,一個時間窗口內是不會超出閾值,但一個時間窗的后半個時間窗和下一個時間窗的前半個時間窗加起來會超過閾值。于是將一個時間窗劃分為多個時間樣本,例如1s的時間窗劃分為5個樣本時間窗(每個200ms),即使統計到一個時間窗的后半個時間窗和后一個時間窗的前半個時間窗時,會統計每個樣本時間窗來是否達到閾值,從而解決該問題;
注意:樣本時間窗劃分得越小,統計得越準確;
com.alibaba.csp.sentinel.slots.statistic.StatisticSlot#entry//訪問鏈表所有的slotthis.fireEntry(context, resourceWrapper, node, count, prioritized, args);//統計線程數node.increaseThreadNum();//統計qpsnode.addPassRequest(count);
這里有覺得奇怪嗎?先是訪問所有的slot是否通過后(任何一個slot都不拋出BlockException),再統計資源的線程數和qps,其實是類似于elastic search寫文檔時,先寫內存,然后再寫transaction log文件,這樣是為了寫內存成功后再寫入transaction log文件,不至于寫入文件后,再回滾內存。同樣是為了校驗通過后,再統計,不至于統計完才校驗失敗。
基礎知識:
這里是時間窗口類
時間窗口類包含的屬性,以及包含的時間樣本窗口類WindowWrap
public void addPass(int count) {//獲取當前時間點所在的樣本窗口WindowWrap<MetricBucket> wrap = this.data.currentWindow();//將當前請求的計數量添加到當前樣本窗口的統計數據中((MetricBucket)wrap.value()).addPass(count);
}
就不往里跟了,主要是根據當前時間點統計是在當前時間窗的哪個樣本時間窗里,然后往里添加
限流源碼
對應的就是這些規則
是在FlowSlot中,根據staticFlow統計的qps、線程數,來判斷是否達到閾值,從而觸發限流
com.alibaba.csp.sentinel.slots.block.flow.FlowSlot#entry
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {//檢測并應用流控規則this.checkFlow(resourceWrapper, context, node, count, prioritized);//觸發下一個slotthis.fireEntry(context, resourceWrapper, node, count, prioritized, args);
}public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {if (ruleProvider != null && resource != null) {//獲取到指定資源的所有流控規則Collection<FlowRule> rules = (Collection)ruleProvider.apply(resource.getName());if (rules != null) {Iterator var8 = rules.iterator();//這個應用流控規則。若是無法通過則拋出異常,后續規則不再應用while(var8.hasNext()) {FlowRule rule = (FlowRule)var8.next();if (!this.canPassCheck(rule, context, node, count, prioritized)) {throw new FlowException(rule.getLimitApp(), rule);}}}}}
熔斷源碼
public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {this.performChecking(context, resourceWrapper);this.fireEntry(context, resourceWrapper, node, count, prioritized, args);}void performChecking(Context context, ResourceWrapper r) throws BlockException {//獲取當前資源所有熔斷器List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());if (circuitBreakers != null && !circuitBreakers.isEmpty()) {Iterator var4 = circuitBreakers.iterator();CircuitBreaker cb;//逐個嘗試所有熔斷器do {if (!var4.hasNext()) {return;}cb = (CircuitBreaker)var4.next();//若沒有通過當前熔斷器,則直接拋出異常} while(cb.tryPass(context));throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());}}
}
一般來說,熔斷降級其實是對于服務的調用方來說的。在項目中會經常調用其它服務或者是第三方接口,而對于這些接口,一旦它們出現不穩定,就有可能導致自身服務長時間等待,從而出現響應延遲等等問題。此時服務調用方就可基于熔斷降級方式解決。一旦第三方接口響應時間過長,那么就可以使用慢調用比例規則,當出現大量長時間響應的情況,那么就直接熔斷,不去請求。雖然說熔斷降級是針對服務的調用方來說,但是Sentinel本身并沒有限制熔斷降級一定是調用其它的服務。