hystrix是一個微服務容錯組件,提供了資源隔離、服務降級、服務熔斷的功能。這一章重點分析hystrix的實現原理
1、服務降級
CAP原則是分布式系統的一個理論基礎,它的三個關鍵屬性分別是一致性、可用性和容錯性。當服務實例所在服務器承受過大的壓力或者受到網絡因素影響沒法及時響應請求時,整個任務將處于阻塞狀態,這樣的系統容錯性不高,稍有不慎就會陷入癱瘓,hystrix為此提供了一種容錯機制:當服務實例沒法及時響應請求,可以采用服務降級的方式快速失敗,維持系統的穩定性
服務降級和@HystrixCommand注解綁定,查看它的源碼
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface HystrixCommand {...String fallbackMethod() default "";}
源碼提供的信息很少,想要分析注解的功能,還得找到處理注解信息的類:HystrixCommandAspect
@Aspect
public class HystrixCommandAspect {...// 環繞通知@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")public Object methodsAnnotatedWithHystrixCommand(ProceedingJoinPoint joinPoint) throws Throwable {Method method = AopUtils.getMethodFromTarget(joinPoint);Validate.notNull(method, "failed to get method from joinPoint: %s", new Object[]{joinPoint});if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser annotations at the same time");} else {MetaHolderFactory metaHolderFactory = (MetaHolderFactory)META_HOLDER_FACTORY_MAP.get(HystrixCommandAspect.HystrixPointcutType.of(method));MetaHolder metaHolder = metaHolderFactory.create(joinPoint);HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ? metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();try {Object result;if (!metaHolder.isObservable()) {// 代理執行方法result = CommandExecutor.execute(invokable, executionType, metaHolder);} else {result = this.executeObservable(invokable, executionType, metaHolder);}return result;} catch (HystrixBadRequestException var9) {throw var9.getCause();} catch (HystrixRuntimeException var10) {throw this.hystrixRuntimeExceptionToThrowable(metaHolder, var10);}}}
}
從命名上我們能看出這是一個切面,說明服務降級是通過aop代理實現的,跟蹤CommandExecutor的execute方法
調用鏈:
-> CommandExecutor.execute
-> castToExecutable(invokable, executionType).execute()
-> HystrixCommand.execute
-> this.queue().get()
public Future<R> queue() {// 獲取Future對象final Future<R> delegate = this.toObservable().toBlocking().toFuture();Future<R> f = new Future<R>() {...public R get() throws InterruptedException, ExecutionException {return delegate.get();}public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {return delegate.get(timeout, unit);}};...
}
HystrixCommand類的queue方法返回了一個Future對象,在線程任務中常用Future對象來獲取任務執行的結果。這里的Future對象是通過this.toObservable().toBlocking().toFuture()創建的,點擊查看toObservable方法,它返回一個Observable對象
public Observable<R> toObservable() {...final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {public Observable<R> call() {return
((CommandState)AbstractCommand.this.commandState.get()).equals(AbstractCommand.CommandState.UNSUBSCRIBED) ? Observable.never() : // 傳入指令執行任務AbstractCommand.this.applyHystrixSemantics(AbstractCommand.this);}};...return Observable.defer(new Func0<Observable<R>>() {public Observable<R> call() {...// 有訂閱者訂閱了才創建Observable對象Observable<R> hystrixObservable = Observable.defer(applyHystrixSemantics).map(wrapWithAllOnNextHooks);Observable afterCache;if (requestCacheEnabled && cacheKey != null) {HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, AbstractCommand.this);HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache)AbstractCommand.this.requestCache.putIfAbsent(cacheKey, toCache);if (fromCache != null) {toCache.unsubscribe();AbstractCommand.this.isResponseFromCache = true;return AbstractCommand.this.handleRequestCacheHitAndEmitValues(fromCache, AbstractCommand.this);}afterCache = toCache.toObservable();} else {afterCache = hystrixObservable;}return afterCache.doOnTerminate(terminateCommandCleanup).doOnUnsubscribe(unsubscribeCommandCleanup).doOnCompleted(fireOnCompletedHook);...}});
}
Observable對象的創建任務委托了給了AbstractCommand.this.applyHystrixSemantics方法
private Observable<R> applyHystrixSemantics(AbstractCommand<R> _cmd) {this.executionHook.onStart(_cmd);// 是否允許請求,判斷熔斷狀態if (this.circuitBreaker.allowRequest()) {final TryableSemaphore executionSemaphore = this.getExecutionSemaphore();final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);Action0 singleSemaphoreRelease = new Action0() {public void call() {if (semaphoreHasBeenReleased.compareAndSet(false, true)) {executionSemaphore.release();}}};Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {public void call(Throwable t) {AbstractCommand.this.eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, AbstractCommand.this.commandKey);}};if (executionSemaphore.tryAcquire()) {try {this.executionResult = this.executionResult.setInvocationStartTime(System.currentTimeMillis());// 執行任務return this.executeCommandAndObserve(_cmd).doOnError(markExceptionThrown).doOnTerminate(singleSemaphoreRelease).doOnUnsubscribe(singleSemaphoreRelease);} catch (RuntimeException var7) {return Observable.error(var7);}} else {return this.handleSemaphoreRejectionViaFallback();}} else {// 處于熔斷狀態,執行備用任務return this.handleShortCircuitViaFallback();}
}
this.circuitBreaker.allowReques返回true表示沒有熔斷,走executeCommandAndObserve方法
private Observable<R> executeCommandAndObserve(AbstractCommand<R> _cmd) {...Observable execution;if ((Boolean)this.properties.executionTimeoutEnabled().get()) {// 添加了超時監控execution = this.executeCommandWithSpecifiedIsolation(_cmd).lift(new HystrixObservableTimeoutOperator(_cmd));} else {execution = this.executeCommandWithSpecifiedIsolation(_cmd);}...// handleFallback:不同異常狀況下使用不同的處理方法Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {public Observable<R> call(Throwable t) {Exception e = AbstractCommand.this.getExceptionFromThrowable(t);AbstractCommand.this.executionResult = AbstractCommand.this.executionResult.setExecutionException(e);if (e instanceof RejectedExecutionException) {return AbstractCommand.this.handleThreadPoolRejectionViaFallback(e);} else if (t instanceof HystrixTimeoutException) {// 拋出超時異常時,做超時處理return AbstractCommand.this.handleTimeoutViaFallback();} else if (t instanceof HystrixBadRequestException) {return AbstractCommand.this.handleBadRequestByEmittingError(e);} else if (e instanceof HystrixBadRequestException) {AbstractCommand.this.eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, AbstractCommand.this.commandKey);return Observable.error(e);} else {return AbstractCommand.this.handleFailureViaFallback(e);}}};...return execution.doOnNext(markEmits).doOnCompleted(markOnCompleted)// 調用handleFallback處理異常.onErrorResumeNext(handleFallback).doOnEach(setRequestContext);
}
private static class HystrixObservableTimeoutOperator<R> implements Observable.Operator<R, R> {final AbstractCommand<R> originalCommand;public HystrixObservableTimeoutOperator(AbstractCommand<R> originalCommand) {this.originalCommand = originalCommand;}public Subscriber<? super R> call(final Subscriber<? super R> child) {final CompositeSubscription s = new CompositeSubscription();child.add(s);final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(this.originalCommand.concurrencyStrategy, new Runnable() {public void run() {// 3.拋出超時異常child.onError(new HystrixTimeoutException());}});HystrixTimer.TimerListener listener = new HystrixTimer.TimerListener() {// 1.判斷是否超時public void tick() {if (HystrixObservableTimeoutOperator.this.originalCommand.isCommandTimedOut.compareAndSet(AbstractCommand.TimedOutStatus.NOT_EXECUTED, AbstractCommand.TimedOutStatus.TIMED_OUT)) {HystrixObservableTimeoutOperator.this.originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, HystrixObservableTimeoutOperator.this.originalCommand.commandKey);s.unsubscribe();// 2.執行超時任務timeoutRunnable.run();}}};}}
executeCommandAndObserve方法添加超時監控,如果任務執行超出限制時間會拋出超時異常,由handleTimeoutViaFallback方法處理異常
private Observable<R> handleTimeoutViaFallback() {// 1.根據異常類型處理異常return this.getFallbackOrThrowException(this, HystrixEventType.TIMEOUT, FailureType.TIMEOUT, "timed-out", new TimeoutException());
}private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, HystrixEventType eventType, final HystrixRuntimeException.FailureType failureType, final String message, final Exception originalException) {...// 獲取回調觀察者fallbackExecutionChain = this.getFallbackObservable();...
} protected final Observable<R> getFallbackObservable() {return Observable.defer(new Func0<Observable<R>>() {public Observable<R> call() {try {// 執行備用方法return Observable.just(HystrixCommand.this.getFallback());} catch (Throwable var2) {return Observable.error(var2);}}});
}
到這里終于看到了getFallback方法,它會調用注解中fallback指向的方法,快速失敗返回響應結果
protected Object getFallback() {// 獲取注解中的備用方法信息final CommandAction commandAction = this.getFallbackAction();if (commandAction != null) {try {return this.process(new AbstractHystrixCommand<Object>.Action() {Object execute() {MetaHolder metaHolder = commandAction.getMetaHolder();Object[] args = CommonUtils.createArgsForFallback(metaHolder, GenericCommand.this.getExecutionException());return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args);}});} catch (Throwable var3) {LOGGER.error(FallbackErrorMessageBuilder.create().append(commandAction, var3).build());throw new FallbackInvocationException(ExceptionUtils.unwrapCause(var3));}} else {return super.getFallback();}
}
回到AbstractCommand.this.applyHystrixSemantics方法,當this.circuitBreaker.allowReques返回true是請求正常往下走,當它返回false時表示服務進入熔斷狀態,會走else分支,同樣會進入getFallback方法
調用鏈
-> AbstractCommand.handleShortCircuitViaFallback
-> getFallbackOrThrowException
-> this.getFallbackObservable
-> GenericCommand.getFallback
2、服務熔斷
服務熔斷是hystrix提供的一種保護機制,當一段時間內服務響應的異常的次數過多,hystrix會讓服務降級快速返回失敗信息,避免累積壓力造成服務崩潰。
聯系上文找到circuitBreaker.allowRequest方法,該方法判斷是否允許請求往下走
public boolean allowRequest() {// 是否強制打開熔斷if ((Boolean)this.properties.circuitBreakerForceOpen().get()) {return false;// 是否強制關閉熔斷} else if ((Boolean)this.properties.circuitBreakerForceClosed().get()) {this.isOpen();return true;} else {return !this.isOpen() || this.allowSingleTest();}
}public boolean isOpen() {if (this.circuitOpen.get()) {return true;} else {HystrixCommandMetrics.HealthCounts health = this.metrics.getHealthCounts();// 請求次數是否超過單位時間內請求數閾值if (health.getTotalRequests() < (long)(Integer)this.properties.circuitBreakerRequestVolumeThreshold().get()) {return false;// 請求異常次數占比} else if (health.getErrorPercentage() < (Integer)this.properties.circuitBreakerErrorThresholdPercentage().get()) {return false;} else if (this.circuitOpen.compareAndSet(false, true)) {this.circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());return true;} else {return true;}}
}
isOpen方法內有針對請求的各種量化計算,當請求異常情況過多,就會觸發熔斷,走服務降級
3、總結
hystrix組件會根據請求狀態判斷是否執行請求,當請求超時或者存在其他異常會走備用方法,當異常次數過多會進入熔斷狀態快速失敗,避免服務累積過多壓力