主流熔斷方案選型
1. Netflix Hystrix (經典但已停止維護)
適用場景:傳統Spring Cloud項目,需要快速集成熔斷功能
優點:
成熟穩定,社區資源豐富
與Spring Cloud Netflix套件無縫集成
提供熔斷、降級、隔離等完整功能
缺點:
已停止維護
配置相對復雜
2. Resilience4j (推薦替代方案)
適用場景:新項目或替代Hystrix的項目
優點:
輕量級,函數式編程風格
模塊化設計(熔斷、限流、重試、降級等可單獨使用)
持續維護更新
缺點:
社區生態相對Hystrix較小
Spring Cloud OpenFeign 默認整合的是 Hystrix(已廢棄) 或 Resilience4j,取決于 Spring Cloud 版本。
Spring Cloud 2020.x 之后
Hystrix 移除
推薦使用 Resilience4j 作為 Feign 的容錯機制
3. Sentinel (阿里開源)
適用場景:需要豐富流量控制功能的分布式系統(例如流控、熔斷、熱點、授權、系統規則)
優點:
豐富的流量控制手段(熔斷、限流、系統保護)
實時監控和控制臺
阿里大規模生產驗證
缺點:
學習曲線較陡
文檔主要為中文
?熔斷選型概覽
選型 | 技術/框架 | 特點 | 適用場景 |
---|---|---|---|
Hystrix | Netflix Hystrix(已維護停止) | 經典、成熟、線程隔離,配置復雜,Spring Cloud 2020+ 已移除 | 老項目維護 |
Resilience4j | Java8+ 函數式,輕量無線程池隔離 | 新項目推薦,性能優 | Spring Boot/Spring Cloud 2020+ |
Sentinel | Alibaba Sentinel | 規則豐富,適合流控+熔斷,適配 Dubbo、Spring Cloud | 微服務高并發治理 |
Spring Cloud CircuitBreaker | Spring Cloud 封裝,默認整合 Resilience4j 或 Sentinel | 標準方案 | 與 Spring Cloud 配合使用 |
選型建議
傳統項目維護:如果已經是Spring Cloud Netflix技術棧,繼續使用Hystrix
新項目:推薦使用Resilience4j,它更現代且維護良好
高流量復雜場景:考慮Sentinel,特別是有豐富流量控制需求的系統
一、hystrix實現
1.添加配置文件application.yml
# application.yml
hystrix:command:GlobalCircuitBreaker:execution:isolation:thread:timeoutInMilliseconds: 5000circuitBreaker:requestVolumeThreshold: 20sleepWindowInMilliseconds: 10000errorThresholdPercentage: 50threadpool:default:coreSize: 50maxQueueSize: 1000queueSizeRejectionThreshold: 500
命令屬性 (hystrix.command.GlobalCircuitBreaker)
execution.isolation.thread.timeoutInMilliseconds: 5000
設置命令執行的超時時間為5000毫秒(5秒)
如果命令執行超過這個時間,將被標記為超時失敗
circuitBreaker.requestVolumeThreshold: 20
熔斷器觸發的最小請求數量閾值(滾動窗口內)
只有在20個請求之后,熔斷器才會根據錯誤率判斷是否開啟
circuitBreaker.sleepWindowInMilliseconds: 10000
熔斷器開啟后的休眠時間(10秒)
熔斷開啟后,經過這段時間會允許一個測試請求嘗試訪問服務
circuitBreaker.errorThresholdPercentage: 50
錯誤百分比閾值(50%)
當滾動窗口內請求的錯誤率超過這個百分比時,熔斷器將開啟
線程池屬性 (hystrix.threadpool.default)
coreSize: 50
線程池核心線程數
表示可以同時處理的最大并發請求數
maxQueueSize: 1000
線程池隊列最大容量
當所有核心線程都忙時,請求會被放入隊列,直到達到此上限
queueSizeRejectionThreshold: 500
隊列拒絕閾值
即使maxQueueSize是1000,當隊列達到500時就會開始拒絕新請求
這是為防止隊列過滿而設置的安全閥值
2.添加Application上開啟注解
@EnableCircuitBreaker // 啟用Hystrix斷路器
@EnableHystrixDashboard // 可選:啟用Hystrix儀表盤
public class AuthApplication {@BeanRestTemplate restTemplate() {return new RestTemplate();}public static void main(String[] args) {log.info("啟動授權模塊...............................................");SpringApplication.run(AuthApplication.class, args);}
}
3. 自定義注解,方便使用
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface CustomHystrix {String groupKey() default "DefaultGroup";String commandKey() default "";String fallbackMethod() default "";/*** 設置命令執行的超時時間為3000毫秒(5秒)* 如果命令執行超過這個時間,將被標記為超時失敗* @return*/int timeout() default 3000;boolean circuitBreaker() default true;/*** 熔斷器觸發的最小請求數量閾值(滾動窗口內)** 只有在20個請求之后,熔斷器才會根據錯誤率判斷是否開啟* @return*/int requestVolumeThreshold() default 20;/*** 熔斷器開啟后的休眠時間(10秒)** 熔斷開啟后,經過這段時間會允許一個測試請求嘗試訪問服務* @return*/int sleepWindow() default 5000;/*** 錯誤百分比閾值(50%)** 當滾動窗口內請求的錯誤率超過這個百分比時,熔斷器將開啟* @return*/int errorThresholdPercentage() default 50;boolean semaphore() default false;
}
?4, 創建Hystrix切面
import com.cbim.auth.annotion.CustomHystrix;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.exception.HystrixRuntimeException;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.stereotype.Component;import java.lang.reflect.Method;@Aspect
@Component
@ConditionalOnClass(HystrixCommand.class)
public class HystrixAspect {// 默認配置private static final HystrixCommand.Setter defaultSetter = HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("DefaultGroup")).andCommandKey(HystrixCommandKey.Factory.asKey("DefaultCommand")).andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(3000).withCircuitBreakerRequestVolumeThreshold(20).withCircuitBreakerSleepWindowInMilliseconds(5000));@Around("@annotation(com.cbim.auth.annotion.CustomHystrix)")public Object hystrixCommand(ProceedingJoinPoint joinPoint) throws Throwable {Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();CustomHystrix annotation = method.getAnnotation(CustomHystrix.class);// 動態配置HystrixCommand.Setter setter = buildSetter(annotation);return new HystrixCommand<Object>(setter) {@Overrideprotected Object run() throws Exception {try {return joinPoint.proceed();} catch (Throwable throwable) {throw new Exception(throwable);}}@Overrideprotected Object getFallback() {try {if (!annotation.fallbackMethod().isEmpty()) {Method fallbackMethod = joinPoint.getTarget().getClass().getMethod(annotation.fallbackMethod(), method.getParameterTypes());return fallbackMethod.invoke(joinPoint.getTarget(), joinPoint.getArgs());}return null;} catch (Exception e) {throw new HystrixRuntimeException(HystrixRuntimeException.FailureType.COMMAND_EXCEPTION,this.getClass(),"Fallback failure",e,null);}}}.execute();}private HystrixCommand.Setter buildSetter(CustomHystrix annotation) {return HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(annotation.groupKey())).andCommandKey(HystrixCommandKey.Factory.asKey(annotation.commandKey())).andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(annotation.timeout()).withCircuitBreakerEnabled(annotation.circuitBreaker()).withCircuitBreakerRequestVolumeThreshold(annotation.requestVolumeThreshold()).withCircuitBreakerSleepWindowInMilliseconds(annotation.sleepWindow()).withCircuitBreakerErrorThresholdPercentage(annotation.errorThresholdPercentage()).withExecutionIsolationStrategy(annotation.semaphore() ?HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE :HystrixCommandProperties.ExecutionIsolationStrategy.THREAD));}
}
5. 在userServiceImpl中
@CustomHystrix(groupKey = "OrderService",fallbackMethod = "getOrderFallback",timeout = 2000)public String health() {try {Thread.sleep(5000);} catch (InterruptedException e) {}log.info("health check");return "success";}public String getOrderFallback() {return "被熔斷了,請稍后重試";}
6, 進行測試
GET http://localhost:9999/health
二、?Resilience4j
核心模塊
resilience4j-circuitbreaker?- 斷路器模式
防止應用程序在遠程服務/資源故障時不斷嘗試執行可能失敗的操作
提供三種狀態:關閉、打開和半開
支持自定義失敗率閾值、等待時間和環形緩沖區大小
resilience4j-ratelimiter?- 速率限制
控制對某些操作的調用頻率
支持自定義限制刷新周期、限制持續時間和超時設置
resilience4j-bulkhead?- 艙壁隔離
限制并發執行的數量
提供兩種實現:
信號量隔離(SemaphoreBulkhead)
固定線程池隔離(ThreadPoolBulkhead)
resilience4j-retry?- 自動重試
為失敗的操作提供自動重試機制
支持自定義重試次數、等待間隔和異常條件
resilience4j-timelimiter?- 超時控制(僅僅只能在異步任務中使用)
為異步操作設置時間限制
防止長時間運行的操作阻塞調用者
執行順序:Retry → CircuitBreaker → RateLimiter → Bulkhead
思考
一、?ratelimiter?和Bulkhead是否有沖突?
不沖突
模式 | 作用 | 控制維度 |
---|---|---|
Bulkhead (艙壁隔離) | 限制并發數量,并隔離資源 | 并發調用數(同一時刻允許的最大請求數) |
RateLimiter (限流) | 限制單位時間的請求次數,保護服務免受過載 | QPS 或 TPS(單位時間允許的請求數) |
RateLimiter:限流保護,避免流量超標
Bulkhead:并發隔離,防止某服務獨占資源
二、Bulkhead 和數據庫連接池是否重復
Bulkhead 和數據庫連接池的區別與聯系
維度 | Bulkhead | 數據庫連接池 |
---|---|---|
本質作用 | 應用層的并發調用隔離,限制方法級別同時執行的調用數 | 連接資源的復用與管理,控制最大數據庫連接數 |
控制范圍 | 微服務線程(或者調用)層面 | 數據庫連接資源層面 |
工作目標 | 防止某個接口/服務過載導致線程池或業務資源耗盡 | 限制數據庫最大連接數,防止數據庫過載 |
觸發點 | 請求進入服務時,根據配置限制并發數量 | 請求申請數據庫連接時,根據連接池大小控制 |
重疊部分 | 可能重疊限制調用并發數 | 控制實際連接數量 |
是否重復 | 不算重復,是應用層和資源層的多層保護 | - |
為什么 Bulkhead 不等于數據庫連接池,且二者都必不可少?
數據庫連接池 限制最大連接數,保障數據庫端資源不被耗盡。
Bulkhead 在業務調用層限制并發,防止線程池被過度占用,避免線程等待數據庫連接時線程積壓,造成系統阻塞甚至雪崩。
舉例:
假設數據庫連接池大小是 10,若沒有 Bulkhead 限制,系統可能會有 100 個線程同時請求數據庫,導致大量線程在等待連接,線程池耗盡。
加上 Bulkhead 限制為 20,業務接口最多允許 20 個并發調用訪問數據庫,從而保護線程池和數據庫連接池都不被耗盡。
Bulkhead 與數據庫連接池是不同層次的保護機制,彼此互補,不沖突不重復
設計時需要合理設置兩者參數,協同保護微服務的線程資源和數據庫資源
在微服務中防止某個業務占用完整的數據庫連接池。
三、retry 重試和 ribbon是否重復?
1. 兩者的區別
Resilience4j Retry:
應用層面:在業務邏輯層或服務調用層實現
功能全面:支持自定義重試策略(固定間隔、指數退避等)
異常過濾:可配置針對特定異常才重試
上下文感知:可以獲取重試次數等上下文信息
組合彈性:可與熔斷、限流等其他彈性模式組合使用
Ribbon Retry:
網絡層面:在HTTP客戶端層實現
功能基礎:主要處理網絡層面的瞬時故障
自動重試:對連接失敗、超時等自動重試
負載均衡:與Ribbon的負載均衡策略結合
2. 是否重復?
不完全是重復,而是不同層次的容錯機制:
Ribbon重試解決的是"網絡層面"的瞬時故障(如TCP連接失敗、請求超時)
Resilience4j Retry解決的是"業務層面"的臨時故障(如依賴服務返回5xx錯誤)
四、RateLimiter限流與Gateway限流的關系
1. 兩者的區別
Resilience4j RateLimiter:
應用級別:保護單個服務實例
精細控制:可針對不同方法/接口設置不同限流規則
快速失敗:超出限制立即拒絕或等待
組合彈性:可與熔斷、重試等組合使用
Gateway限流(如Spring Cloud Gateway RedisRateLimiter):
全局級別:在API網關層實現
集群限流:基于Redis等實現分布式限流
入口控制:保護整個系統入口
網關特性:與路由、鑒權等網關功能集成
2. 是否重復?
不是重復,而是不同層次的流量控制:
Gateway限流是"全局第一道防線",防止流量洪峰沖擊系統
Resilience4j限流是"服務內部精細控制",防止單個服務過載
Ribbon重試:快速重試(ms級),解決網絡抖動
Resilience4j重試:帶退避策略(s級),解決服務暫時不可用
實現
1,添加配置文件application.yml
resilience4j:circuitbreaker:instances:userService:registerHealthIndicator: true # 是否注冊健康指標(供Actuator監控)failureRateThreshold: 50 # 失敗率閾值百分比,達到此值將觸發熔斷minimumNumberOfCalls: 2 # 計算失敗率前所需的最小調用次數automaticTransitionFromOpenToHalfOpenEnabled: true # 是否自動從OPEN轉為HALF_OPENwaitDurationInOpenState: 5s # OPEN狀態持續時間,之后嘗試轉為HALF_OPENpermittedNumberOfCallsInHalfOpenState: 3 # HALF_OPEN狀態下允許的調用次數slidingWindowType: TIME_BASED # 滑動窗口類型(COUNT_BASED基于調用次數,TIME_BASED基于時間)slidingWindowSize: 4 # 滑動窗口大小(基于次數則為調用次數,基于時間則為秒數)recordExceptions: # 記錄為失敗的異常列表- org.springframework.web.client.HttpServerErrorException- java.io.IOException- java.util.concurrent.TimeoutException- java.lang.RuntimeExceptionignoreExceptions: # 忽略不計為失敗的異常列表- com.cbim.exception.CbimExceptionretry:instances:userServiceRetry:maxAttempts: 3 # 最大重試次數(包括初始調用)waitDuration: 500ms # 重試之間的等待時間retryExceptions: # 觸發重試的異常列表- org.springframework.web.client.ResourceAccessException- java.lang.RuntimeExceptionignoreExceptions:- java.lang.IllegalArgumentExceptionratelimiter:instances:userServiceRateLimiter:limitForPeriod: 10 # 限制每個刷新周期內的調用數量limitRefreshPeriod: 1s # 限制刷新周期timeoutDuration: 100ms # 等待獲取許可的最大時間bulkhead:instances:userServiceBulkhead:maxConcurrentCalls: 2 # 最大并發調用數maxWaitDuration: 0 # 嘗試進入艙壁時的最大等待時間timelimiter:instances:userServiceTimeLimiter:timeoutDuration: 1s # 方法調用超時時間cancelRunningFuture: true # 是否取消正在執行的Future
management:endpoint:health:show-details: always # 顯示詳細信息group:circuitbreakers:include: circuitBreakers # 創建專門的分組endpoints:web:exposure:include: health,circuitbreakers # 暴露所需端點health:circuitbreakers:enabled: true # 啟用熔斷器健康指示器
logging:level:io.github.resilience4j: DEBUGorg.springframework.aop: DEBUG
2.在pom文件中添加依賴
<!-- Resilience4j核心依賴 --><dependency><groupId>io.github.resilience4j</groupId><artifactId>resilience4j-spring-boot2</artifactId><version>1.7.1</version></dependency><dependency><groupId>io.github.resilience4j</groupId><artifactId>resilience4j-circuitbreaker</artifactId><version>1.7.1</version></dependency><dependency><groupId>io.github.resilience4j</groupId><artifactId>resilience4j-retry</artifactId><version>1.7.1</version></dependency><dependency><groupId>io.github.resilience4j</groupId><artifactId>resilience4j-ratelimiter</artifactId><version>1.7.1</version></dependency><dependency><groupId>io.github.resilience4j</groupId><artifactId>resilience4j-bulkhead</artifactId><version>1.7.1</version></dependency><dependency><groupId>io.github.resilience4j</groupId><artifactId>resilience4j-timelimiter</artifactId><version>1.7.1</version></dependency><!-- Spring AOP支持 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-aop</artifactId></dependency>
3.編寫controller代碼
package com.cbim.order.controller;import com.cbim.auth.service.UserService;
import com.cbim.order.annotion.Resilient;
import io.github.resilience4j.bulkhead.BulkheadFullException;
import io.github.resilience4j.bulkhead.annotation.Bulkhead;
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
import io.github.resilience4j.ratelimiter.annotation.RateLimiter;
import io.github.resilience4j.retry.annotation.Retry;
import io.github.resilience4j.timelimiter.annotation.TimeLimiter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import com.cbim.order.entity.User;
import org.springframework.web.client.ResourceAccessException;import javax.annotation.Resource;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;@RestController
@RequestMapping("/users")
@Slf4j
public class UserController {@Resourceprivate UserService userService;/*** for i in {1..20}; do curl http://172.19.80.1:9997/users/3; done* @param id* @return*/@GetMapping("/{id}")
// @Resilient(fallbackMethod = "getUserFallback")@CircuitBreaker(name = "userService", fallbackMethod = "getUserFallback")@RateLimiter(name = "userServiceRateLimiter")@Bulkhead(name = "userServiceBulkhead")public ResponseEntity<User> getUser(@PathVariable Long id){User user = userService.getUserById(id);return ResponseEntity.ok(user);}// 多個fallback方法可以根據異常類型區分處理private ResponseEntity<User> getUserFallback(Long id, Exception e) {log.warn("Fallback triggered for user id: {}, exception: {}", id, e.getClass().getSimpleName());User fallbackUser = new User(id, "Fallback User", "fallback@example.com");return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS).body(fallbackUser);}private ResponseEntity<User> getUserFallback(Long id, BulkheadFullException e) {log.warn("Bulkhead full for user id: {}", id);User fallbackUser = new User(id, "System Busy", "try.again.later@example.com");return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).body(fallbackUser);}public ResponseEntity<User> getUserFallback(Long id, Throwable t){log.warn("Fallback called for user id {} due to {}", id, t.toString());return ResponseEntity.ok(new User(23L, "Fallback User", "fallback@example.com"));}/*** TimeLimiter 僅僅只能使用異步函數,同步函數無法使用* @param id* @return*/@TimeLimiter(name = "userServiceTimeLimiter", fallbackMethod = "getUserFallback2")@GetMapping("/TL/{id}")public CompletableFuture<ResponseEntity<User>> getUser2(@PathVariable Long id){return CompletableFuture.supplyAsync(() -> {if (id == 0) {throw new RuntimeException("Simulated failure");}User user = userService.getUserById(id);return ResponseEntity.ok(user);});}public CompletableFuture<ResponseEntity<User>> getUserFallback2(Long id, Throwable t){log.warn("Fallback called due to {}", t.toString());return CompletableFuture.completedFuture(ResponseEntity.ok(new User(0L, "Fallback User", "fallback@example.com")));}}
4. service 和其實現代碼
public interface UserService {User getUserById(Long id);
}
@Slf4j
@RefreshScope
@Service
public class UserServiceImpl implements UserService {private AtomicInteger counter = new AtomicInteger(0);@Override@Retry(name = "userServiceRetry")public User getUserById(Long id) {int attempt = counter.incrementAndGet();System.out.println("Attempt: " + attempt);try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}if (id % 3 == 0) { // 模擬30%失敗率throw new RuntimeException("Simulated error");}return new User(id, "qinqingqing01", "123456");}
}
5.進行測試驗證
for i in {1..11}; do curl http://172.19.80.1:9997/users/3; done
for i in {1..11}; do curl http://172.19.80.1:9997/users/TL/1; done
在測試之前需要確定斷路器是否生效