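Overview
Spring ships a retry mechanism that is simple and practical. Spring Retry was split out of Spring Batch into a project of its own and is now widely used by Spring Batch, Spring Integration, Spring for Apache Hadoop, and other Spring projects. This article covers how to use Spring Retry and how it works under the hood.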
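Background
Retrying is needed more often than one might expect, for the sake of fault tolerance, availability, and consistency. It is typically used to cope with unpredictable responses and exceptions from external systems, especially network latency and dropped connections. Popular microservice frameworks usually have their own retry and timeout settings; in Dubbo, for example, setting retries=1 and timeout=500 means a failed call is retried only once, and a call that has not returned within 500 ms counts as failed. Without such support, adding retries to one particular operation means hard-coding them: write a loop, count failures based on the return value or exception, and define an exit condition. Besides having to repeat this boilerplate for every operation, the retry logic ends up tangled with the business logic, which makes maintenance and extension harder. From an object-oriented point of view, the retry code should be factored out on its own.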
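The hand-rolled approach described above tends to look something like the following sketch. The names HandRolledRetry, fetchWithRetry, and sleepQuietly are made up for illustration and do not come from any library; the point is only to show how the counting, sleeping, and exit condition crowd around a single business call.

```java
import java.util.function.Supplier;

class HandRolledRetry {

    /**
     * Hypothetical helper: calls remoteCall up to maxAttempts times,
     * sleeping delayMillis between failed attempts.
     */
    static String fetchWithRetry(Supplier<String> remoteCall, int maxAttempts, long delayMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return remoteCall.get();           // the actual business call
            } catch (RuntimeException e) {
                last = e;                          // retry bookkeeping starts here
                if (attempt < maxAttempts) {
                    sleepQuietly(delayMillis);
                }
            }
        }
        throw last;                                // exit condition: attempts used up
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting to retry", e);
        }
    }
}
```

Every operation that needs retries would have to repeat or call into code like this, which is exactly the coupling a dedicated retry library is meant to remove.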
Usage
Let's start with an example:
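// Application.java
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.annotation.EnableRetry;

@Configuration
@EnableRetry
public class Application {

    @Bean
    public RetryService retryService() {
        return new RetryService();
    }

    public static void main(String[] args) throws Exception {
        // Scans the "springretry" package for annotated components.
        ApplicationContext applicationContext = new AnnotationConfigApplicationContext("springretry");
        // Looks up the bean registered by @Service("service") through component scanning.
        RetryService service1 = applicationContext.getBean("service", RetryService.class);
        service1.service();
    }
}

// RetryService.java
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Recover;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;

@Service("service")
public class RetryService {

    @Retryable(value = IllegalAccessException.class, maxAttempts = 5,
            backoff = @Backoff(value = 1500, maxDelay = 100000, multiplier = 1.2))
    public void service() throws IllegalAccessException {
        System.out.println("service method...");
        throw new IllegalAccessException("manual exception");
    }

    @Recover
    public void recover(IllegalAccessException e) {
        System.out.println("service retry after Recover => " + e.getMessage());
    }
}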
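@EnableRetry - turns the retry mechanism on
@Retryable - marks a method as retryable; its many attributes cover most retry requirements
@Backoff - the backoff policy to apply between retries
@Recover - the fallback method, executed when the call still fails after all retries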
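The richness of Spring Retry lies in its retry policies and backoff policies, together with fallbacks (recovery), listeners, and similar features.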
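Retry policies
Here are the retry policies that ship with Spring Retry. Their main job is to decide whether to retry when a method call throws. (The implementation is examined in detail further below.)
- SimpleRetryPolicy: at most 3 attempts by default (the initial call included)
- TimeoutRetryPolicy: keeps retrying failures within a 1-second window by default
- ExpressionRetryPolicy: retries when an expression evaluates to true
- CircuitBreakerRetryPolicy: adds circuit breaking; retries are allowed as long as the circuit is not open
- CompositeRetryPolicy: combines several retry policies
- NeverRetryPolicy: never retries (still a retry policy, technically)
- AlwaysRetryPolicy: always retries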
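One way to picture these policies is as small predicates over "how many attempts have failed" and "how much time has passed". The sketch below is a deliberately simplified model, not Spring Retry's API: the Policy interface, its two-argument canRetry, and the factory methods are all assumptions made for illustration, and the composite here requires every delegate to agree.

```java
import java.util.Arrays;
import java.util.List;

class RetryPolicySketch {

    /** Hypothetical, simplified policy: decides from failure count and elapsed time only. */
    interface Policy {
        boolean canRetry(int failedAttempts, long elapsedMillis);
    }

    /** Allows attempts until maxAttempts calls have failed. */
    static Policy maxAttempts(int max) {
        return (failed, elapsed) -> failed < max;
    }

    /** Allows attempts while the time budget has not been used up. */
    static Policy timeout(long timeoutMillis) {
        return (failed, elapsed) -> elapsed < timeoutMillis;
    }

    /** Allows the first call only. */
    static final Policy NEVER = (failed, elapsed) -> failed == 0;

    /** Allows every attempt. */
    static final Policy ALWAYS = (failed, elapsed) -> true;

    /** Composite that allows an attempt only if all delegates allow it. */
    static Policy allOf(Policy... policies) {
        List<Policy> delegates = Arrays.asList(policies);
        return (failed, elapsed) -> delegates.stream().allMatch(p -> p.canRetry(failed, elapsed));
    }
}
```

For instance, allOf(maxAttempts(3), timeout(1000)) stops as soon as either three calls have failed or one second has elapsed, whichever comes first.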
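Backoff policies
Now the backoff policies. Backoff determines how the next retry is carried out, which here boils down to how long to wait. (The implementation is examined in detail further below.)
- FixedBackOffPolicy: waits a fixed delay before the next retry, 1 second by default
- ExponentialBackOffPolicy: exponentially increasing delay; the defaults are an initial delay of 0.1 s and a multiplier of 2, so the following delays are 0.2 s, 0.4 s, and so on, up to a maximum of 30 s
- ExponentialRandomBackOffPolicy: the policy above with randomness added
- UniformRandomBackOffPolicy: unlike the exponential policies, whose delay keeps growing, this one picks a random delay within a fixed range
- StatelessBackOffPolicy: stateless, meaning it has no knowledge of earlier backoffs, as its subclasses also show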
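The exponential schedule is easy to compute by hand. The following sketch reproduces the arithmetic only; ExponentialDelays and delays are hypothetical names, and the code is a model of the idea rather than the library's implementation.

```java
import java.util.ArrayList;
import java.util.List;

class ExponentialDelays {

    /**
     * Returns the first count delays of an exponential schedule:
     * start at initialMillis, multiply after each use, never exceed maxMillis.
     */
    static List<Long> delays(long initialMillis, double multiplier, long maxMillis, int count) {
        List<Long> result = new ArrayList<>();
        long interval = initialMillis;
        for (int i = 0; i < count; i++) {
            result.add(Math.min(interval, maxMillis));   // cap the delay actually used
            if (interval < maxMillis) {
                interval = (long) (interval * multiplier);
            }
        }
        return result;
    }
}
```

With the defaults quoted above, delays(100, 2.0, 30000, 10) gives 100, 200, 400, 800, 1600, 3200, 6400, 12800, 25600 and then 30000 once the cap is reached.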
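How it works
This part comes in two halves. The first is the entry point: how the mechanism gives your code its retry capability. The second covers the details of the mechanism, including the retry logic and the implementation of the retry and backoff policies.
Entry point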
@EnableRetry
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@EnableAspectJAutoProxy(proxyTargetClass = false)
@Import(RetryConfiguration.class)
@Documented
public @interface EnableRetry {

    /**
     * Indicate whether subclass-based (CGLIB) proxies are to be created as opposed
     * to standard Java interface-based proxies. The default is {@code false}.
     *
     * @return whether to proxy or not to proxy the class
     */
    boolean proxyTargetClass() default false;

}
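Here we see
@EnableAspectJAutoProxy(proxyTargetClass = false)
which should look familiar: it switches on Spring AOP. The line to focus on is
@Import(RetryConfiguration.class)
@Import effectively registers that class as a bean.
So what is RetryConfiguration?
It is an AbstractPointcutAdvisor, holding a pointcut and an advice.
During container initialization, Spring uses each PointcutAdvisor's pointcut to filter beans,
generates an AOP proxy for every match, and applies the advice to enhance it.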
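To make the pointcut-plus-advice idea concrete, here is a minimal sketch that uses a plain JDK dynamic proxy instead of Spring AOP. The @Retry annotation, the Greeter interface, and withRetry are hypothetical names invented for this illustration; Spring Retry's real advisor is considerably more elaborate.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;

class RetryProxySketch {

    /** Hypothetical marker annotation standing in for @Retryable. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Retry {
        int maxAttempts() default 3;
    }

    /** Hypothetical business interface with one retryable method. */
    interface Greeter {
        @Retry(maxAttempts = 3)
        String greet(String name);
    }

    /** Wraps target in a JDK proxy: annotated methods are retried, others are called once. */
    @SuppressWarnings("unchecked")
    static <T> T withRetry(T target, Class<T> iface) {
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface},
            (proxy, method, args) -> {
                Retry retry = method.getAnnotation(Retry.class);      // the "pointcut" check
                int attempts = retry == null ? 1 : Math.max(1, retry.maxAttempts());
                Throwable last = null;
                for (int i = 0; i < attempts; i++) {                  // the "advice"
                    try {
                        return method.invoke(target, args);
                    } catch (InvocationTargetException e) {
                        last = e.getCause();                          // unwrap the real failure
                    }
                }
                throw last;                                           // all attempts failed
            });
    }
}
```

The caller only ever sees the Greeter interface; whether a call is retried is decided entirely by the annotation and the proxy, which is the same division of labor the advisor sets up.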
Here is how RetryConfiguration is initialized:
@PostConstruct
public void init() {
    Set<Class<? extends Annotation>> retryableAnnotationTypes = new LinkedHashSet<Class<? extends Annotation>>(1);
    retryableAnnotationTypes.add(Retryable.class);
    // create the pointcut
    this.pointcut = buildPointcut(retryableAnnotationTypes);
    // create the advice
    this.advice = buildAdvice();
    if (this.advice instanceof BeanFactoryAware) {
        ((BeanFactoryAware) this.advice).setBeanFactory(beanFactory);
    }
}

protected Pointcut buildPointcut(Set<Class<? extends Annotation>> retryAnnotationTypes) {
    ComposablePointcut result = null;
    for (Class<? extends Annotation> retryAnnotationType : retryAnnotationTypes) {
        Pointcut filter = new AnnotationClassOrMethodPointcut(retryAnnotationType);
        if (result == null) {
            result = new ComposablePointcut(filter);
        }
        else {
            result.union(filter);
        }
    }
    return result;
}
The code above uses AnnotationClassOrMethodPointcut,
which in the end relies on AnnotationMethodMatcher to filter join points by annotation.
The annotation in question is @Retryable.
// create the advice object, i.e. the interceptor
protected Advice buildAdvice() {
    // this is the object to watch below
    AnnotationAwareRetryOperationsInterceptor interceptor = new AnnotationAwareRetryOperationsInterceptor();
    if (retryContextCache != null) {
        interceptor.setRetryContextCache(retryContextCache);
    }
    if (retryListeners != null) {
        interceptor.setListeners(retryListeners);
    }
    if (methodArgumentsKeyGenerator != null) {
        interceptor.setKeyGenerator(methodArgumentsKeyGenerator);
    }
    if (newMethodArgumentsIdentifier != null) {
        interceptor.setNewItemIdentifier(newMethodArgumentsIdentifier);
    }
    if (sleeper != null) {
        interceptor.setSleeper(sleeper);
    }
    return interceptor;
}
AnnotationAwareRetryOperationsInterceptor
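Class hierarchy
As its class hierarchy shows, AnnotationAwareRetryOperationsInterceptor is a MethodInterceptor.
When an AOP proxy is created and the target method matches the pointcut, the interceptor is added to the interceptor list
and applies its enhancement. Let's see what the invoke method does.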
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
    MethodInterceptor delegate = getDelegate(invocation.getThis(), invocation.getMethod());
    if (delegate != null) {
        return delegate.invoke(invocation);
    }
    else {
        return invocation.proceed();
    }
}
Delegation is used here: depending on the configuration, the call is handed to either a "stateful" or a "stateless" interceptor.
private MethodInterceptor getDelegate(Object target, Method method) {
    if (!this.delegates.containsKey(target) || !this.delegates.get(target).containsKey(method)) {
        synchronized (this.delegates) {
            if (!this.delegates.containsKey(target)) {
                this.delegates.put(target, new HashMap<Method, MethodInterceptor>());
            }
            Map<Method, MethodInterceptor> delegatesForTarget = this.delegates.get(target);
            if (!delegatesForTarget.containsKey(method)) {
                Retryable retryable = AnnotationUtils.findAnnotation(method, Retryable.class);
                if (retryable == null) {
                    retryable = AnnotationUtils.findAnnotation(method.getDeclaringClass(), Retryable.class);
                }
                if (retryable == null) {
                    retryable = findAnnotationOnTarget(target, method);
                }
                if (retryable == null) {
                    return delegatesForTarget.put(method, null);
                }
                MethodInterceptor delegate;
                // a custom MethodInterceptor is supported and takes top priority
                if (StringUtils.hasText(retryable.interceptor())) {
                    delegate = this.beanFactory.getBean(retryable.interceptor(), MethodInterceptor.class);
                }
                else if (retryable.stateful()) {
                    // obtain the "stateful" interceptor
                    delegate = getStatefulInterceptor(target, method, retryable);
                }
                else {
                    // obtain the "stateless" interceptor
                    delegate = getStatelessInterceptor(target, method, retryable);
                }
                delegatesForTarget.put(method, delegate);
            }
        }
    }
    return this.delegates.get(target).get(method);
}
getStatefulInterceptor and getStatelessInterceptor are much alike,
so let's look at the simpler one, getStatelessInterceptor, first.
getStatelessInterceptor
private MethodInterceptor getStatelessInterceptor(Object target, Method method, Retryable retryable) {
    // create a RetryTemplate
    RetryTemplate template = createTemplate(retryable.listeners());
    // build the retryPolicy
    template.setRetryPolicy(getRetryPolicy(retryable));
    // build the backoffPolicy
    template.setBackOffPolicy(getBackoffPolicy(retryable.backoff()));
    return RetryInterceptorBuilder.stateless()
            .retryOperations(template)
            .label(retryable.label())
            .recoverer(getRecoverer(target, method))
            .build();
}
We will come back to the rules for building the retryPolicy and backoffPolicy later.
RetryInterceptorBuilder exists simply to produce a RetryOperationsInterceptor.
RetryOperationsInterceptor is also a MethodInterceptor,
so let's look at its invoke method.
public Object invoke(final MethodInvocation invocation) throws Throwable {

    String name;
    if (StringUtils.hasText(label)) {
        name = label;
    } else {
        name = invocation.getMethod().toGenericString();
    }
    final String label = name;

    // Define a RetryCallback. Its doWithRetry method calls proceed() on the invocation,
    // which should look familiar: it is the AOP interceptor chain call. If there are no
    // further interceptors in the chain, it is a call to the original method.
    RetryCallback<Object, Throwable> retryCallback = new RetryCallback<Object, Throwable>() {

        public Object doWithRetry(RetryContext context) throws Exception {

            context.setAttribute(RetryContext.NAME, label);

            /*
             * If we don't copy the invocation carefully it won't keep a reference to
             * the other interceptors in the chain. We don't have a choice here but to
             * specialise to ReflectiveMethodInvocation (but how often would another
             * implementation come along?).
             */
            if (invocation instanceof ProxyMethodInvocation) {
                try {
                    return ((ProxyMethodInvocation) invocation).invocableClone().proceed();
                }
                catch (Exception e) {
                    throw e;
                }
                catch (Error e) {
                    throw e;
                }
                catch (Throwable e) {
                    throw new IllegalStateException(e);
                }
            }
            else {
                throw new IllegalStateException(
                        "MethodInvocation of the wrong type detected - this should not happen with Spring AOP, " +
                                "so please raise an issue if you see this exception");
            }
        }

    };

    if (recoverer != null) {
        ItemRecovererCallback recoveryCallback = new ItemRecovererCallback(invocation.getArguments(), recoverer);
        return this.retryOperations.execute(retryCallback, recoveryCallback);
    }

    // Either way we end up in retryOperations.execute(); retryOperations is the
    // RetryTemplate that the builder set earlier.
    return this.retryOperations.execute(retryCallback);

}
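Whether the interceptor is a RetryOperationsInterceptor or a StatefulRetryOperationsInterceptor,
the interception logic ultimately calls RetryTemplate's execute method.
As the name suggests, RetryTemplate is a template class that holds the common retry logic.
That said, RetryTemplate does not strike me as much of a "template", since it offers few extension points.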
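Retry logic and policy implementation
The previous part showed how Spring Retry uses AOP proxies to weave the retry mechanism into business code. Now let's see what the retry logic itself does, starting with RetryTemplate's doExecute method.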
protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback,
        RecoveryCallback<T> recoveryCallback, RetryState state)
        throws E, ExhaustedRetryException {

    RetryPolicy retryPolicy = this.retryPolicy;
    BackOffPolicy backOffPolicy = this.backOffPolicy;

    // create a RetryContext to hold the state of this round of retries
    RetryContext context = open(retryPolicy, state);
    if (this.logger.isTraceEnabled()) {
        this.logger.trace("RetryContext retrieved: " + context);
    }

    // Make sure the context is available globally for clients who need
    // it...
    RetrySynchronizationManager.register(context);

    Throwable lastException = null;

    boolean exhausted = false;
    try {

        // if a RetryListener is registered, call its open method to notify the caller
        boolean running = doOpenInterceptors(retryCallback, context);

        if (!running) {
            throw new TerminatedRetryException(
                    "Retry terminated abnormally by interceptor before first attempt");
        }

        // Get or Start the backoff context...
        BackOffContext backOffContext = null;
        Object resource = context.getAttribute("backOffContext");

        if (resource instanceof BackOffContext) {
            backOffContext = (BackOffContext) resource;
        }

        if (backOffContext == null) {
            backOffContext = backOffPolicy.start(context);
            if (backOffContext != null) {
                context.setAttribute("backOffContext", backOffContext);
            }
        }

        // Decide whether an attempt is allowed by calling RetryPolicy's canRetry method.
        // The loop runs until the original method stops throwing or no further retry is allowed.
        while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {

            try {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Retry: count=" + context.getRetryCount());
                }
                // clear the exception recorded by the previous attempt
                lastException = null;
                // doWithRetry is normally the original method
                return retryCallback.doWithRetry(context);
            }
            catch (Throwable e) {
                // the original method threw an exception
                lastException = e;

                try {
                    // record the exception
                    registerThrowable(retryPolicy, state, context, e);
                }
                catch (Exception ex) {
                    throw new TerminatedRetryException("Could not register throwable", ex);
                }
                finally {
                    // call the RetryListener's onError method
                    doOnErrorInterceptors(retryCallback, context, e);
                }

                // check again whether a retry is allowed
                if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
                    try {
                        // if so, apply the backoff policy
                        backOffPolicy.backOff(backOffContext);
                    }
                    catch (BackOffInterruptedException ex) {
                        lastException = e;
                        // back off was prevented by another thread - fail the retry
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug("Abort retry because interrupted: count="
                                    + context.getRetryCount());
                        }
                        throw ex;
                    }
                }

                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Checking for rethrow: count=" + context.getRetryCount());
                }

                if (shouldRethrow(retryPolicy, context, state)) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Rethrow in retry for policy: count="
                                + context.getRetryCount());
                    }
                    throw RetryTemplate.<E>wrapIfNecessary(e);
                }

            }

            /*
             * A stateful attempt that can retry may rethrow the exception before now,
             * but if we get this far in a stateful retry there's a reason for it,
             * like a circuit breaker or a rollback classifier.
             */
            if (state != null && context.hasAttribute(GLOBAL_STATE)) {
                break;
            }
        }

        if (state == null && this.logger.isDebugEnabled()) {
            this.logger.debug("Retry failed last attempt: count=" + context.getRetryCount());
        }

        exhausted = true;
        // once retrying is over, run the recovery fallback if there is one; otherwise throw
        return handleRetryExhausted(recoveryCallback, context, state);

    }
    catch (Throwable e) {
        throw RetryTemplate.<E>wrapIfNecessary(e);
    }
    finally {
        // handle the close logic
        close(retryPolicy, context, state, lastException == null || exhausted);
        // call the RetryListener's close method
        doCloseInterceptors(retryCallback, context, lastException);
        RetrySynchronizationManager.clear();
    }

}
That is the core retry logic, and it is fairly simple to follow.
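Stripped of listeners, state handling, and logging, the control flow reduces to a short loop. The sketch below is a simplified model under stated assumptions: MiniRetryTemplate is a hypothetical class, the retry policy is reduced to a maximum attempt count, the backoff to a fixed sleep, and only unchecked exceptions are handled. It is not the library's RetryTemplate.

```java
import java.util.function.Function;
import java.util.function.Supplier;

class MiniRetryTemplate {

    private final int maxAttempts;
    private final long backOffMillis;

    MiniRetryTemplate(int maxAttempts, long backOffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        this.maxAttempts = maxAttempts;
        this.backOffMillis = backOffMillis;
    }

    /** Runs callback with retries; recovery may be null, in which case the last failure is rethrown. */
    <T> T execute(Supplier<T> callback, Function<RuntimeException, T> recovery) {
        int failures = 0;
        RuntimeException last = null;
        while (failures < maxAttempts) {               // "canRetry"
            try {
                return callback.get();                 // "doWithRetry": the original method
            } catch (RuntimeException e) {
                last = e;                              // "registerThrowable"
                failures++;
                if (failures < maxAttempts) {          // "canRetry" again, then back off
                    sleep(backOffMillis);
                }
            }
        }
        if (recovery != null) {                        // retries exhausted: fall back
            return recovery.apply(last);
        }
        throw last;                                    // no fallback: surface the last failure
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("backoff interrupted", e);
        }
    }
}
```

The comments in quotes name the step of doExecute that each line stands in for, which may help when reading the full method again.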
So far we have skipped RetryPolicy's canRetry method and BackOffPolicy's backOff method,
as well as where the two policies come from.
Let's go back to the getRetryPolicy and getBackoffPolicy methods called from getStatelessInterceptor.
private RetryPolicy getRetryPolicy(Annotation retryable) {
    Map<String, Object> attrs = AnnotationUtils.getAnnotationAttributes(retryable);
    @SuppressWarnings("unchecked")
    Class<? extends Throwable>[] includes = (Class<? extends Throwable>[]) attrs.get("value");
    String exceptionExpression = (String) attrs.get("exceptionExpression");
    boolean hasExpression = StringUtils.hasText(exceptionExpression);
    if (includes.length == 0) {
        @SuppressWarnings("unchecked")
        Class<? extends Throwable>[] value = (Class<? extends Throwable>[]) attrs.get("include");
        includes = value;
    }
    @SuppressWarnings("unchecked")
    Class<? extends Throwable>[] excludes = (Class<? extends Throwable>[]) attrs.get("exclude");
    Integer maxAttempts = (Integer) attrs.get("maxAttempts");
    String maxAttemptsExpression = (String) attrs.get("maxAttemptsExpression");
    if (StringUtils.hasText(maxAttemptsExpression)) {
        maxAttempts = PARSER.parseExpression(resolve(maxAttemptsExpression), PARSER_CONTEXT)
                .getValue(this.evaluationContext, Integer.class);
    }
    if (includes.length == 0 && excludes.length == 0) {
        SimpleRetryPolicy simple = hasExpression
                ? new ExpressionRetryPolicy(resolve(exceptionExpression)).withBeanFactory(this.beanFactory)
                : new SimpleRetryPolicy();
        simple.setMaxAttempts(maxAttempts);
        return simple;
    }
    Map<Class<? extends Throwable>, Boolean> policyMap = new HashMap<Class<? extends Throwable>, Boolean>();
    for (Class<? extends Throwable> type : includes) {
        policyMap.put(type, true);
    }
    for (Class<? extends Throwable> type : excludes) {
        policyMap.put(type, false);
    }
    boolean retryNotExcluded = includes.length == 0;
    if (hasExpression) {
        return new ExpressionRetryPolicy(maxAttempts, policyMap, true, exceptionExpression, retryNotExcluded)
                .withBeanFactory(this.beanFactory);
    }
    else {
        return new SimpleRetryPolicy(maxAttempts, policyMap, true, retryNotExcluded);
    }
}
To summarize briefly: the attributes of the @Retryable annotation determine which of the retry policies listed earlier is used, for example SimpleRetryPolicy or ExpressionRetryPolicy.
private BackOffPolicy getBackoffPolicy(Backoff backoff) {
    long min = backoff.delay() == 0 ? backoff.value() : backoff.delay();
    if (StringUtils.hasText(backoff.delayExpression())) {
        min = PARSER.parseExpression(resolve(backoff.delayExpression()), PARSER_CONTEXT)
                .getValue(this.evaluationContext, Long.class);
    }
    long max = backoff.maxDelay();
    if (StringUtils.hasText(backoff.maxDelayExpression())) {
        max = PARSER.parseExpression(resolve(backoff.maxDelayExpression()), PARSER_CONTEXT)
                .getValue(this.evaluationContext, Long.class);
    }
    double multiplier = backoff.multiplier();
    if (StringUtils.hasText(backoff.multiplierExpression())) {
        multiplier = PARSER.parseExpression(resolve(backoff.multiplierExpression()), PARSER_CONTEXT)
                .getValue(this.evaluationContext, Double.class);
    }
    if (multiplier > 0) {
        ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
        if (backoff.random()) {
            policy = new ExponentialRandomBackOffPolicy();
        }
        policy.setInitialInterval(min);
        policy.setMultiplier(multiplier);
        policy.setMaxInterval(max > min ? max : ExponentialBackOffPolicy.DEFAULT_MAX_INTERVAL);
        if (this.sleeper != null) {
            policy.setSleeper(this.sleeper);
        }
        return policy;
    }
    if (max > min) {
        UniformRandomBackOffPolicy policy = new UniformRandomBackOffPolicy();
        policy.setMinBackOffPeriod(min);
        policy.setMaxBackOffPeriod(max);
        if (this.sleeper != null) {
            policy.setSleeper(this.sleeper);
        }
        return policy;
    }
    FixedBackOffPolicy policy = new FixedBackOffPolicy();
    policy.setBackOffPeriod(min);
    if (this.sleeper != null) {
        policy.setSleeper(this.sleeper);
    }
    return policy;
}
Same flavor here: the attributes of the @Backoff annotation determine which of the backoff policies listed earlier is used, for example FixedBackOffPolicy or UniformRandomBackOffPolicy.
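The selection rule in getBackoffPolicy can be boiled down to three checks. The sketch below mirrors only that branching; BackoffSelection, Kind, and choose are hypothetical names, and the expression-based attributes are left out.

```java
class BackoffSelection {

    /** Labels for the four outcomes of the selection logic. */
    enum Kind { EXPONENTIAL, EXPONENTIAL_RANDOM, UNIFORM_RANDOM, FIXED }

    /** Mirrors the order of the checks: multiplier first, then maxDelay, then the fixed fallback. */
    static Kind choose(long delay, long maxDelay, double multiplier, boolean random) {
        if (multiplier > 0) {
            return random ? Kind.EXPONENTIAL_RANDOM : Kind.EXPONENTIAL;
        }
        if (maxDelay > delay) {
            return Kind.UNIFORM_RANDOM;
        }
        return Kind.FIXED;
    }
}
```

For the @Backoff(value = 1500, maxDelay = 100000, multiplier = 1.2) used in the usage example, choose(1500, 100000, 1.2, false) returns EXPONENTIAL, so the waits grow from 1.5 s by a factor of 1.2 each time.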
Every RetryPolicy implements canRetry, which RetryTemplate calls to decide whether to retry. Here is SimpleRetryPolicy's version:
@Override
public boolean canRetry(RetryContext context) {
    Throwable t = context.getLastThrowable();
    // check whether the thrown exception is one we retry on,
    // and whether the attempt limit has been reached
    return (t == null || retryForException(t)) && context.getRetryCount() < maxAttempts;
}
Likewise, here is FixedBackOffPolicy's backoff method.
protected void doBackOff() throws BackOffInterruptedException {
    try {
        // simply sleep for the fixed period
        sleeper.sleep(backOffPeriod);
    }
    catch (InterruptedException e) {
        throw new BackOffInterruptedException("Thread interrupted while sleeping", e);
    }
}
That covers the main principles and logic of retrying.
RetryContext
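RetryContext deserves a mention. Start with its class hierarchy:
every policy has a matching Context type.
In Spring Retry a policy object holds no per-call state, so a single instance can be shared much like a singleton.
My first instinct was that each retryable method would need a freshly created policy so that retries would not interfere with each other,
but on reflection that could multiply policy objects and burden the user, which is not a good design.
Spring Retry takes a lighter approach: it creates only a context object for each retried operation,
passes that context to the policy on every attempt, and lets the policy decide based on it.
Spring Retry also caches these contexts (RetryContextCache, used by stateful retry), which amounts to an optimization of how retry state is kept.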
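The split between a shared policy and a per-call context can be sketched in a few lines. Everything here (SharedPolicySketch, Context, MaxAttemptsPolicy) is a hypothetical simplification meant to show the shape of the design, not the library's classes.

```java
class SharedPolicySketch {

    /** Mutable per-invocation state; one instance per retried call. */
    static final class Context {
        int failures;
        Throwable lastThrowable;
    }

    /** Holds configuration only, so one instance can serve any number of contexts. */
    static final class MaxAttemptsPolicy {
        private final int maxAttempts;

        MaxAttemptsPolicy(int maxAttempts) {
            this.maxAttempts = maxAttempts;
        }

        /** Creates the state object for a new retried call. */
        Context open() {
            return new Context();
        }

        /** Decides purely from the context passed in. */
        boolean canRetry(Context ctx) {
            return ctx.failures < maxAttempts;
        }

        /** Records a failure in the context, never in the policy itself. */
        void registerThrowable(Context ctx, Throwable t) {
            ctx.lastThrowable = t;
            ctx.failures++;
        }
    }
}
```

Two calls that share one MaxAttemptsPolicy but hold separate Context objects cannot exhaust each other's attempts, which is the property the design is after.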
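Summary
Spring Retry uses AOP to weave retry behavior into business code. RetryTemplate contains the core retry logic, and the library also provides a rich set of retry policies and backoff policies.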