Hystrix: Analysis of Target Execution and Circuit-Breaker Fallback
Tags: Hystrix target execution and circuit-breaker analysis, Blog, 51CTO Blog
2023-04-03 18:23:45
Dependency:
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-core</artifactId>
<version>1.5.18</version>
</dependency>
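What is Hystrix
The last version was released in November 2018; the project is now in maintenance mode and no new versions are planned.
- Purpose:
- Stop cascading failures: fallbacks and graceful degradation, fail fast and recover quickly
- Real-time monitoring and real-time configuration changes
- Resource isolation: when one part of the system becomes unavailable, the system as a whole stays available
- Scenario: a product-list endpoint needs to fetch coupons (red packets), prices, tags and similar data. You can give this work its own thread pool. If that thread pool is saturated, the service's other endpoints (those unrelated to the product list) are not affected.
- Framework used: Hystrix is built mainly on RxJava; for a getting-started guide, see https://www.jianshu.com/p/5e93c9101dc5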
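To make the resource-isolation scenario concrete, here is a minimal bulkhead sketch in plain Java. It is not Hystrix's API: the class `Bulkhead`, its `submit(work, fallback)` method and the pool sizes in `demo()` are hypothetical, and rejection is detected with the JDK's `ThreadPoolExecutor.AbortPolicy`. It only illustrates the idea that a dedicated, bounded pool for the product-list dependencies rejects extra work (and falls back) instead of letting it pile up and starve the rest of the service.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
 * Hypothetical bulkhead (plain JDK, not Hystrix's API): each dependency gets its own
 * small bounded pool, so saturating it only affects callers of that dependency.
 */
final class Bulkhead {
    private final ThreadPoolExecutor pool;

    Bulkhead(int threads, int queueCapacity) {
        // AbortPolicy: a saturated pool throws RejectedExecutionException instead of blocking
        this.pool = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity), new ThreadPoolExecutor.AbortPolicy());
    }

    /** Runs work on the isolated pool; if the pool rejects it, returns the fallback immediately. */
    <T> Future<T> submit(Callable<T> work, Supplier<T> fallback) {
        try {
            return pool.submit(work);
        } catch (RejectedExecutionException rejected) {
            return CompletableFuture.completedFuture(fallback.get());
        }
    }

    void shutdown() {
        pool.shutdownNow();
    }

    /** One thread plus one queue slot: the third concurrent call is rejected and falls back. */
    static List<String> demo() {
        Bulkhead productList = new Bulkhead(1, 1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            Future<String> price = productList.submit(() -> { release.await(); return "price"; }, () -> "fallback");
            Future<String> tags = productList.submit(() -> "tags", () -> "fallback");     // waits in the queue
            Future<String> coupon = productList.submit(() -> "coupon", () -> "fallback"); // rejected
            List<String> results = new ArrayList<>();
            results.add(coupon.get()); // already completed with the fallback value
            release.countDown();       // let the first task finish so the queued one can run
            results.add(price.get());
            results.add(tags.get());
            return results;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            productList.shutdown();
        }
    }
}
```

`Bulkhead.demo()` returns `[fallback, price, tags]`: the third call is rejected on the spot while the first two complete normally, which is the "a full pool does not drag everything else down" behavior described above.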
执行入口
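Hystrix uses a command as the entry point for execution. AbstractCommand implements almost all of the command logic and has two subclasses, HystrixCommand and HystrixObservableCommand.
HystrixCommand covers roughly 99% of use cases, so only this command class is discussed below. It provides two methods: execute() for synchronous execution and queue() for asynchronous execution.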
public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R> {
// ....... all constructors omitted
// ....... static configuration inner class Setter omitted
// the thread executing the command
private final AtomicReference<Thread> executionThread = new AtomicReference<Thread>();
private final AtomicBoolean interruptOnFutureCancel = new AtomicBoolean(false);
/** execute() and queue() both end up invoking this run() method **/
protected abstract R run() throws Exception;
/**
* If execute() or queue() fails, Hystrix falls back to getFallback(). If this method is not overridden, it throws UnsupportedOperationException by default
* **/
protected R getFallback() { throw new UnsupportedOperationException("No fallback available."); }
/**
* Whether the subclass declares its own getFallback(); the answer (true or false) is cached per commandKey in the commandContainsFallback map
*/
@Override
protected boolean isFallbackUserDefined() {
Boolean containsFromMap = commandContainsFallback.get(commandKey);
if (containsFromMap != null) {
return containsFromMap;
} else {
Boolean toInsertIntoMap;
try {
getClass().getDeclaredMethod("getFallback");
toInsertIntoMap = true;
} catch (NoSuchMethodException nsme) {
toInsertIntoMap = false;
}
commandContainsFallback.put(commandKey, toInsertIntoMap);
return toInsertIntoMap;
}
}
/**
* Whether the command is scalar (emits a single value)
* When commandIsScalar() == true, markEmits calls circuitBreaker.markSuccess()
* - HystrixObservableCommand returns false
*/
@Override
protected boolean commandIsScalar() { return true; }
/**
* Executes the command synchronously
*/
public R execute() {
try {
// queue() returns a Future, and get() blocks until the result is available, which makes execute() synchronous
// so execute() simply delegates to queue()
return queue().get();
} catch (Exception e) { throw Exceptions.sneakyThrow(decomposeException(e)); }
}
/**
* Executes the command asynchronously
* The command is queued on the thread pool, and the returned Future yields the result once it completes
*/
public Future<R> queue() {
// toObservable(): executes the command asynchronously with callbacks by subscribing to the {@link Observable}
final Future<R> delegate = toObservable().toBlocking().toFuture();
final Future<R> f = new Future<R>() {
/** Cancels the running command **/
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (delegate.isCancelled()) { return false; }
if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) {
interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning);
}
final boolean res = delegate.cancel(interruptOnFutureCancel.get());
if (!isExecutionComplete() && interruptOnFutureCancel.get()) {
final Thread t = executionThread.get();
if (t != null && !t.equals(Thread.currentThread())) {
// in the end, interrupt() is what actually interrupts the executing thread
t.interrupt();
}
}
return res;
}
@Override
public boolean isCancelled() { return delegate.isCancelled(); }
@Override
public boolean isDone() { return delegate.isDone(); }
@Override
public R get() throws InterruptedException, ExecutionException { return delegate.get(); }
@Override
public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return delegate.get(timeout, unit);
}
};
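/** Special handling for error states that throw immediately: toFuture() subscribes right away, so paths such as short-circuiting or rejection can complete the Future before queue() returns **/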
if (f.isDone()) {
try {
f.get();
return f;
} catch (Exception e) {
// extract the exception that should be thrown
Throwable t = decomposeException(e);
if (t instanceof HystrixBadRequestException) {
return f;
} else if (t instanceof HystrixRuntimeException) {
HystrixRuntimeException hre = (HystrixRuntimeException) t;
switch (hre.getFailureType()) {
case COMMAND_EXCEPTION:
case TIMEOUT:
// we don't throw these types from queue() only from queue().get() as they are execution errors
return f;
default:
// these are errors we throw from queue() as they are rejection type errors
throw hre;
}
} else {
throw Exceptions.sneakyThrow(t);
}
}
}
return f;
}
}
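The relationship between `run()`, `getFallback()`, `queue()` and `execute()` can be mimicked with plain JDK types. This is a minimal sketch built on a hypothetical `MiniCommand` base class (not part of Hystrix): `queue()` runs `run()` on an executor and switches to `getFallback()` on failure, `execute()` is just `queue().get()`, and `isFallbackUserDefined()` reuses the `getDeclaredMethod("getFallback")` reflection check, cached per class here instead of per commandKey.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Hypothetical mini-command mirroring HystrixCommand's shape (plain JDK, not Hystrix):
 * queue() runs run() asynchronously, execute() is simply queue().get().
 */
abstract class MiniCommand<R> {
    // "does this class declare getFallback()?" cached per class (Hystrix caches per commandKey)
    private static final Map<Class<?>, Boolean> FALLBACK_DEFINED = new ConcurrentHashMap<>();
    // daemon threads so the demo pool never keeps the JVM alive
    private static final ExecutorService POOL = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "mini-command");
        t.setDaemon(true);
        return t;
    });

    /** The user's business logic. */
    protected abstract R run() throws Exception;

    /** Default fallback: none, like the getFallback() shown above. */
    protected R getFallback() {
        throw new UnsupportedOperationException("No fallback available.");
    }

    /** Same reflection check as isFallbackUserDefined(): only methods declared on the concrete class count. */
    boolean isFallbackUserDefined() {
        return FALLBACK_DEFINED.computeIfAbsent(getClass(), c -> {
            try {
                c.getDeclaredMethod("getFallback");
                return true;
            } catch (NoSuchMethodException e) {
                return false;
            }
        });
    }

    /** Asynchronous: run() on a pool thread, switching to getFallback() if it throws. */
    Future<R> queue() {
        return POOL.submit(() -> {
            try {
                return run();
            } catch (Exception e) {
                return getFallback(); // throws UnsupportedOperationException when not overridden
            }
        });
    }

    /** Synchronous: block on the Future, just as execute() does with queue().get(). */
    R execute() {
        try {
            return queue().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }
}
```

As with the original check, `getDeclaredMethod` only sees methods declared on the concrete class itself, so a `getFallback()` inherited from an intermediate superclass would not be detected.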
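The toObservable() method
As shown above, HystrixCommand provides execute() and queue(); execute() delegates to queue(), and queue() calls toObservable(), so both end up in toObservable(). Let's walk through toObservable().
- Steps (the core method is applyHystrixSemantics()):
1. Check that the command state is NOT_STARTED, otherwise throw HystrixRuntimeException; the CAS guarantees that a command instance executes only once
2. Record the execution in HystrixRequestLog (set requestLogEnabled = false to disable request logging)
3. If request caching is enabled, take the result from the cache
3.1 Caching takes effect only when requestCacheEnabled = true && getCacheKey() != null (so when overriding getCacheKey(), do not return null, or caching will not work)
4. If caching is disabled or the cache misses, execute the target command to obtain the result
4.1 With Observable.defer(), the target method does not run immediately; it runs only once subscribed
4.2 applyHystrixSemantics() contains the core logic for executing the target method
5. If caching is enabled, put the result into the cache
6. Return the result, with the related cleanup actions registered on it
6.1 terminateCommandCleanup: marks the command state as TERMINAL
6.1.1 If the target code was not executed (for example, the result came from the cache): clear the timeout timer listener, record the execution latency, call HystrixCommandMetrics#markCommandDone(), and fire the post-execution callback (if endCurrentThreadExecutingCommand is not null)
6.1.2 If the target code was executed: mark it with markCommandDone(true)
6.2 unsubscribeCommandCleanup: marks the command state as UNSUBSCRIBED and fires executionHook.onUnsubscribe
6.3 fireOnCompletedHook: only fires executionHook.onSuccess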
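Steps 1 and 6 rely on the same trick: a compare-and-set on an enum state, so that a command starts only once and its cleanup runs only once even though both the terminate and the unsubscribe callbacks may fire. The following is a hypothetical plain-Java model of that pattern; the class `CommandLifecycle` and its methods are illustrative names, and only the state names follow the source below.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical model of the CAS-guarded command lifecycle: start once, clean up once. */
final class CommandLifecycle {
    enum State { NOT_STARTED, OBSERVABLE_CHAIN_CREATED, USER_CODE_EXECUTED, TERMINAL, UNSUBSCRIBED }

    private final AtomicReference<State> state = new AtomicReference<>(State.NOT_STARTED);
    private final AtomicInteger cleanups = new AtomicInteger();
    private volatile boolean cleanedUpAfterUserCode;

    /** Step 1: only the first caller wins the CAS; later calls fail. */
    void start() {
        if (!state.compareAndSet(State.NOT_STARTED, State.OBSERVABLE_CHAIN_CREATED)) {
            throw new IllegalStateException("This instance can only be executed once.");
        }
    }

    /** User code may run only from OBSERVABLE_CHAIN_CREATED (a cache hit never calls this). */
    void runUserCode() {
        if (!state.compareAndSet(State.OBSERVABLE_CHAIN_CREATED, State.USER_CODE_EXECUTED)) {
            throw new IllegalStateException("execution attempted while in state : " + state.get());
        }
    }

    /** Plays the role of terminateCommandCleanup. */
    void onTerminate() { endOnce(State.TERMINAL); }

    /** Plays the role of unsubscribeCommandCleanup. */
    void onUnsubscribe() { endOnce(State.UNSUBSCRIBED); }

    private void endOnce(State end) {
        if (state.compareAndSet(State.OBSERVABLE_CHAIN_CREATED, end)) {
            cleanup(false); // 6.1.1: user code never ran (e.g. the result came from the cache)
        } else if (state.compareAndSet(State.USER_CODE_EXECUTED, end)) {
            cleanup(true);  // 6.1.2: user code ran
        }
        // otherwise the other end signal already won the CAS and did the cleanup
    }

    private void cleanup(boolean userCodeRan) {
        cleanups.incrementAndGet();
        cleanedUpAfterUserCode = userCodeRan;
    }

    int cleanupCount() { return cleanups.get(); }
    boolean cleanedUpAfterUserCode() { return cleanedUpAfterUserCode; }
    State state() { return state.get(); }
}
```

Whichever end signal arrives first wins the CAS; the second one finds the state already TERMINAL or UNSUBSCRIBED and does nothing, and the boolean passed to `cleanup` mirrors the distinction between 6.1.1 and 6.1.2.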
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
public Observable<R> toObservable() {
// the Action and Func definitions above are omitted
// create the Observable via Observable.defer()
return Observable.defer(() -> {
/* this is a stateful object so can only be used once (the CAS lets only the first call through) */
if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
//TODO make a new error type for this
throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
}
// record the command start time
commandStartTimestamp = System.currentTimeMillis();
// check whether request logging is enabled
if (properties.requestLogEnabled().get()) {
// log this command execution regardless of what happened
if (currentRequestLog != null) {
currentRequestLog.addExecutedCommand(_cmd);
}
}
// whether request caching is enabled
final boolean requestCacheEnabled = isRequestCachingEnabled();
// -- cacheKey is null by default, so caching stays off until a cache key is provided
final String cacheKey = getCacheKey();
// with caching enabled, try the cache first
/* try from cache first */
if (requestCacheEnabled) {
// look the result up in the cache
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
if (fromCache != null) {
// cache hit: set isResponseFromCache = true and return the cached data
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
}
}
// wrap the command execution in an Observable (hystrixObservable)
Observable<R> hystrixObservable =
Observable.defer(applyHystrixSemantics)
.map(wrapWithAllOnNextHooks);
// the Observable returned after the caching step
Observable<R> afterCache;
// put the Observable into the cache (only when cacheKey is not null; it is null by default and must be provided)
// put in cache
if (requestCacheEnabled && cacheKey != null) {
// wrap it for caching
HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
if (fromCache != null) {
// another thread beat us so we'll use the cached value instead
toCache.unsubscribe();
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
} else {
// we just created an ObservableCommand so we cast and return it
afterCache = toCache.toObservable();
}
} else {
afterCache = hystrixObservable;
}
return afterCache
// listener invoked when the subscription is about to terminate, either normally or with an error
.doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
// listener invoked on unsubscribe
.doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
// listener invoked when the Observable completes normally
.doOnCompleted(fireOnCompletedHook);
});
}
}
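The caching part of toObservable() (look up first, then `putIfAbsent`, and reuse the other entry if another thread won the race) can be sketched with a `ConcurrentHashMap`. This is a simplified, hypothetical stand-in for the request cache, not Hystrix's `HystrixRequestCache`: the class `RequestCache` is illustrative, entries are `CompletableFuture`s, the work runs synchronously on the caller, and a `null` cache key disables caching, as when `getCacheKey()` returns null.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

/**
 * Hypothetical request-scoped cache (plain JDK): try get(), otherwise putIfAbsent();
 * the loser of a race reuses the winner's entry, so the work runs once per key.
 */
final class RequestCache<R> {
    private final ConcurrentMap<String, CompletableFuture<R>> cache = new ConcurrentHashMap<>();

    CompletableFuture<R> execute(String cacheKey, Supplier<R> work) {
        if (cacheKey == null) {
            return run(work); // a null key disables caching
        }
        CompletableFuture<R> fromCache = cache.get(cacheKey);
        if (fromCache != null) {
            return fromCache; // cache hit: the work is not executed again
        }
        CompletableFuture<R> toCache = new CompletableFuture<>();
        fromCache = cache.putIfAbsent(cacheKey, toCache);
        if (fromCache != null) {
            return fromCache; // another thread beat us, so use its entry instead
        }
        complete(toCache, work); // we own the entry: execute the work exactly once
        return toCache;
    }

    private static <T> CompletableFuture<T> run(Supplier<T> work) {
        CompletableFuture<T> future = new CompletableFuture<>();
        complete(future, work);
        return future;
    }

    private static <T> void complete(CompletableFuture<T> target, Supplier<T> work) {
        try {
            target.complete(work.get());
        } catch (RuntimeException e) {
            target.completeExceptionally(e); // failures are cached too, like a failed Observable
        }
    }
}
```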
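The applyHystrixSemantics() method
Executing the target method ultimately goes through applyHystrixSemantics().
The steps are:
- Check whether the circuit breaker allows the request via circuitBreaker.allowRequest(). If it does not, run the short-circuit fallback directly
- Try to acquire the execution semaphore. In thread-pool isolation mode, TryableSemaphoreNoOp is used and tryAcquire() simply returns true
- Execute the target method via executeCommandAndObserve(). If the semaphore cannot be acquired, handleSemaphoreRejectionViaFallback() runs the fallback instead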
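The first step, asking the circuit breaker whether the request may proceed, can be illustrated with a small three-state breaker. This is a hypothetical sketch, not `HystrixCircuitBreakerImpl`: it swaps Hystrix's rolling-window error-percentage statistics for a simple consecutive-failure counter, and the class name, threshold, sleep window and injectable clock are illustrative. After the sleep window, exactly one caller is let through as a trial request; a success closes the circuit, and a failure reopens it.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;

/** Hypothetical closed/open/half-open breaker; thresholds are illustrative. */
final class SimpleCircuitBreaker {
    enum Status { CLOSED, OPEN, HALF_OPEN }

    private final int failureThreshold; // consecutive failures that open the circuit
    private final long sleepWindowMs;   // how long to stay open before a trial request
    private final LongSupplier clock;   // injectable clock so the sketch is testable
    private final AtomicReference<Status> status = new AtomicReference<>(Status.CLOSED);
    private final AtomicInteger consecutiveFailures = new AtomicInteger();
    private final AtomicLong openedAt = new AtomicLong();

    SimpleCircuitBreaker(int failureThreshold, long sleepWindowMs, LongSupplier clock) {
        this.failureThreshold = failureThreshold;
        this.sleepWindowMs = sleepWindowMs;
        this.clock = clock;
    }

    /** Decides whether a request may run; false means "short-circuit to the fallback". */
    boolean allowRequest() {
        switch (status.get()) {
            case CLOSED:
                return true;
            case OPEN:
                // once the sleep window has passed, exactly one caller wins the CAS as the trial request
                return clock.getAsLong() - openedAt.get() >= sleepWindowMs
                        && status.compareAndSet(Status.OPEN, Status.HALF_OPEN);
            default:
                return false; // HALF_OPEN: a trial request is already in flight
        }
    }

    void markSuccess() {
        consecutiveFailures.set(0);
        status.set(Status.CLOSED); // a successful request closes the circuit
    }

    void markFailure() {
        if (status.get() == Status.HALF_OPEN || consecutiveFailures.incrementAndGet() >= failureThreshold) {
            openedAt.set(clock.getAsLong());
            status.set(Status.OPEN); // failed trial request, or too many failures: (re)open
        }
    }

    Status status() { return status.get(); }
}
```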
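abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
// ......... everything else omitted
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
// mark the start before executing
// The source is full of executionHook and eventNotifier calls, an example of Hystrix's extensibility. By default they do nothing; they are hooks that developers can extend
executionHook.onStart(_cmd);
// decide whether the circuit breaker allows execution
// -- circuit breaker enabled (withCircuitBreakerEnabled(true)): HystrixCircuitBreakerImpl
// -- circuit breaker disabled (withCircuitBreakerEnabled(false)): NoOpCircuitBreaker, which always returns true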
if (circuitBreaker.allowRequest()) {
// get the execution semaphore; if semaphore isolation is not configured, TryableSemaphoreNoOp.DEFAULT is returned
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
final Action0 singleSemaphoreRelease = () -> {
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
// for TryableSemaphoreNoOp.DEFAULT this is a no-op
executionSemaphore.release();
}
};
final Action1<Throwable> markExceptionThrown = t -> eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
// check whether the semaphore rejects the request
// in thread-pool mode TryableSemaphoreNoOp is used, so this always returns true
if (executionSemaphore.tryAcquire()) {
try {
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
// Key point: executeCommandAndObserve() handles the isolation strategy and the various fallbacks, and ultimately executes the target method
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
return handleSemaphoreRejectionViaFallback();
}
} else {
// reaching this point means the circuit breaker rejected the request, so the short-circuit fallback runs
return handleShortCircuitViaFallback();
}
}
}
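The semaphore part of applyHystrixSemantics() comes down to a try-acquire that never blocks, a no-op variant for thread-pool mode, and a release guarded by an `AtomicBoolean` so that it happens exactly once even though both `doOnTerminate` and `doOnUnsubscribe` call it. A minimal plain-Java sketch; `TrySemaphore`, `NO_OP` and `SemaphoreGuard` are hypothetical names modeled loosely on `TryableSemaphore` and `TryableSemaphoreNoOp`, not Hystrix classes.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/** Hypothetical non-blocking semaphore abstraction. */
interface TrySemaphore {
    boolean tryAcquire();
    void release();

    /** Thread-pool mode: the pool bounds concurrency, so this always admits and releases nothing. */
    TrySemaphore NO_OP = new TrySemaphore() {
        public boolean tryAcquire() { return true; }
        public void release() { }
    };

    static TrySemaphore bounded(int permits) {
        Semaphore s = new Semaphore(permits);
        return new TrySemaphore() {
            public boolean tryAcquire() { return s.tryAcquire(); } // never blocks
            public void release() { s.release(); }
        };
    }
}

final class SemaphoreGuard {
    static <R> R execute(TrySemaphore semaphore, Supplier<R> run, Supplier<R> rejectionFallback) {
        if (!semaphore.tryAcquire()) {
            return rejectionFallback.get(); // compare handleSemaphoreRejectionViaFallback()
        }
        AtomicBoolean released = new AtomicBoolean(false);
        Runnable releaseOnce = () -> {
            if (released.compareAndSet(false, true)) {
                semaphore.release();
            }
        };
        try {
            return run.get();
        } finally {
            // both the terminate and unsubscribe callbacks invoke the release action;
            // calling it twice here shows that the CAS lets only the first call through
            releaseOnce.run();
            releaseOnce.run();
        }
    }
}
```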
Executing the target method
The core method for executing the target method is executeCommandAndObserve()
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
/**
* Handles the isolation strategy and the various fallbacks, and ultimately executes the target method
* This decorates "Hystrix" functionality around the run() Observable.
*/
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
// the request context, so that code running in the thread pool can still read values set on the calling thread
final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
/****************************************************
* 1. record the SUCCESS event for the result
* 2. close the circuit breaker (ignored if it is already closed)
***************************************************/
final Action1<R> markEmits = r -> {
// whether data should be reported at the onNext step
// HystrixCommand -> false
// HystrixObservableCommand -> true
if (shouldOutputOnNextEvents()) {
executionResult = executionResult.addEvent(HystrixEventType.EMIT);
eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);
}
if (commandIsScalar()) {
// whether the command is scalar
// HystrixCommand -> true
// HystrixObservableCommand -> false
long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
// these lines are the key part:
// record the result as SUCCESS
// and, crucially, call circuitBreaker.markSuccess() (if the circuit breaker is open, it gets closed here)
eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
circuitBreaker.markSuccess();
}
};
/****************************************************
* 1. ensure that non-scalar results also close the circuit breaker and mark SUCCESS
***************************************************/
final Action0 markOnCompleted = () -> {
if (!commandIsScalar()) {
long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
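// key lines: same effect as markEmits above, but they only run when commandIsScalar() == false, i.e. for HystrixObservableCommand
// record the result as SUCCESS
// and, crucially, call circuitBreaker.markSuccess() (if the circuit breaker is open, it gets closed here)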
eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
circuitBreaker.markSuccess();
}
};
/****************************************************
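* When the target method throws during execution (a bug, a timeout, and so on), the error is handled here. The cases fall into two groups:
*
* 1. trigger the fallback
* 2. do not trigger the fallback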
***************************************************/
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
@Override
public Observable<R> call(Throwable t) {
// convert Throwable t to Exception e (wrapping it in an Exception if it is not one)
// e.g. if t is an NPE, t and e are the same object
// t and e differ only when t is not an Exception, such as an Error
Exception e = getExceptionFromThrowable(t);
// record the exception e raised during execution
executionResult = executionResult.setExecutionException(e);
if (e instanceof RejectedExecutionException) {
// the thread pool rejected the task
return handleThreadPoolRejectionViaFallback(e);
} else if (t instanceof HystrixTimeoutException) {
// the target method timed out
return handleTimeoutViaFallback();
} else if (t instanceof HystrixBadRequestException) {
// bad request: emit the error without a fallback (see the HystrixBadRequestException note below)
return handleBadRequestByEmittingError(e);
} else {
/*
* Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
*/
// catch-all check: this branch can only be reached when a subclass overrides getExceptionFromThrowable()
if (e instanceof HystrixBadRequestException) {
eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
return Observable.error(e);
}
return handleFailureViaFallback(e);
}
}
};
final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
@Override
public void call(Notification<? super R> rNotification) {
setRequestContextIfNeeded(currentRequestContext);
}
};
Observable<R> execution;
// the key method is executeCommandWithSpecifiedIsolation()
if (properties.executionTimeoutEnabled().get()) {
// timeout enabled: adds a .lift(new HystrixObservableTimeoutOperator<R>(_cmd)) call
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
// timeout disabled
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
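// once execution is obtained, register some basic events and observers
// -- doOnNext(): called before the observer is notified (the data has already been emitted, i.e. the target method has already run)
return execution.doOnNext(markEmits)
// -- doOnCompleted(): called on normal completion
.doOnCompleted(markOnCompleted)
// -- onErrorResumeNext(): called when execution fails
.onErrorResumeNext(handleFallback)
// -- doOnEach(): called for every notification; sets the request context on the child thread so it is shared across threads
.doOnEach(setRequestContext);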
}
/**
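* The actual execution of the target method (handled according to the configured isolation type: THREAD or SEMAPHORE)
* -- THREAD: thread-pool isolation (default)
* -- SEMAPHORE: semaphore isolation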
*/
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
// thread-pool isolation (default)
// mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
// record up front that execution occurred (in thread-pool isolation)
executionResult = executionResult.setExecutionOccurred();
// execution may proceed only while the command state is OBSERVABLE_CHAIN_CREATED
// that state is set by toObservable()
if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
}
// collect metrics: execution started
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
// run() has not executed yet: if the command already timed out during the thread switch, return an error immediately
if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
// the command timed out in the wrapping thread so we will return immediately
// and not increment any of the counters below or other such logic
return Observable.error(new RuntimeException("timed out before executing run()"));
}
// CAS the thread state to ThreadState.STARTED
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
//we have not been unsubscribed, so should proceed
// increment the global concurrent-thread counter (semaphore mode does not need this counter)
HystrixCounters.incrementGlobalConcurrentThreads();
// mark that the thread is about to start executing
threadPool.markThreadExecution();
// store the command that is being run
// this is stored in a ThreadLocal<ConcurrentStack<HystrixCommandKey>> bound to the current thread
// keeping the record per thread makes it thread-safe while the command runs
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
executionResult = executionResult.setExecutedInThread();
/**
* If any of these hooks throw an exception, then it appears as if the actual execution threw an error
*/
try {
// run the hooks, then the target run() method
executionHook.onThreadStart(_cmd);
executionHook.onRunStart(_cmd);
executionHook.onExecutionStart(_cmd);
// getUserExecutionObservable: obtains the target method via the abstract getExecutionObservable() method
return getUserExecutionObservable(_cmd);
} catch (Throwable ex) {
return Observable.error(ex);
}
} else {
//command has already been unsubscribed, so return immediately
return Observable.error(new RuntimeException("unsubscribed before executing run()"));
}
}
}).doOnTerminate(() -> {
if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
handleThreadEnd(_cmd);
}
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
//if it was never started and received terminal, then no need to clean up (I don't think this is possible)
}
//if it was unsubscribed, then other cleanup handled it
}).doOnUnsubscribe(() -> {
if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
handleThreadEnd(_cmd);
}
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
//if it was never started and was cancelled, then no need to clean up
}
//if it was terminal, then other cleanup handled it
}).subscribeOn(threadPool.getScheduler(() -> properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT));
} else {
return Observable.defer(() -> {
executionResult = executionResult.setExecutionOccurred();
if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
}
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
// semaphore isolated
// store the command that is being run
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
try {
executionHook.onRunStart(_cmd);
executionHook.onExecutionStart(_cmd);
return getUserExecutionObservable(_cmd); //the getUserExecutionObservable method already wraps sync exceptions, so this shouldn't throw
} catch (Throwable ex) {
//If the above hooks throw, then use that as the result of the run method
return Observable.error(ex);
}
});
}
}
}
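The difference between the two branches of executeCommandWithSpecifiedIsolation(), plus the timeout added in executeCommandAndObserve(), can be approximated with an `ExecutorService`. This is a hypothetical plain-JDK sketch, not Hystrix's implementation: THREAD mode submits `run` to a pool and waits with `Future.get(timeout)`, cancelling with interruption on timeout; SEMAPHORE mode runs `run` on the caller's own thread. For brevity the sketch applies the timeout only in THREAD mode, and the reason strings passed to the fallback are illustrative.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

/** Hypothetical sketch of THREAD vs SEMAPHORE execution with a timeout (plain JDK). */
final class IsolatedExecution {
    enum Isolation { THREAD, SEMAPHORE }

    static <R> R execute(Isolation isolation, ExecutorService pool, long timeoutMs,
                         Callable<R> run, Function<String, R> fallback) {
        if (isolation == Isolation.SEMAPHORE) {
            // SEMAPHORE: the caller's own thread runs the user code (no timeout in this sketch)
            try {
                return run.call();
            } catch (Exception e) {
                return fallback.apply("FAILURE");
            }
        }
        // THREAD: hand the user code to the isolated pool
        Future<R> future;
        try {
            future = pool.submit(run);
        } catch (RejectedExecutionException e) {
            return fallback.apply("THREAD_POOL_REJECTED");
        }
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the worker thread that is still running run()
            return fallback.apply("TIMEOUT");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return fallback.apply("FAILURE");
        } catch (ExecutionException e) {
            return fallback.apply("FAILURE"); // run() itself threw
        }
    }
}
```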
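Fallback (degradation)
The sections above showed how the target method is invoked; next, let's look at how fallback is handled.
When fallback happens
Hystrix triggers the fallback logic in 5 situations:
- short-circuited: the circuit breaker is open
- threadpool-rejected: the thread pool rejected the task
- semaphore-rejected: the semaphore rejected the request
- time-out: execution timed out
- failed: execution failed
Apart from these, a HystrixBadRequestException does not trigger the fallback mechanism: it neither triggers a fallback nor counts toward failure metrics. It is meant for cases such as HTTP 400 (bad input) errors.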
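The five fallback triggers and the special treatment of HystrixBadRequestException can be summarized as a routing function. This is a hypothetical sketch that mirrors the order of the checks in the code above (breaker, semaphore, then the exception type seen in handleFallback); `FallbackRouting`, `Outcome` and `BadRequestException` are illustrative names, and the JDK's `TimeoutException` stands in for `HystrixTimeoutException`.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;

/** Hypothetical routing of an execution attempt to an outcome (not Hystrix's code). */
final class FallbackRouting {
    enum Outcome { SUCCESS, SHORT_CIRCUITED, SEMAPHORE_REJECTED, THREAD_POOL_REJECTED, TIMEOUT, FAILURE, BAD_REQUEST }

    /** Stand-in for HystrixBadRequestException: the caller sent bad input (e.g. HTTP 400). */
    static final class BadRequestException extends RuntimeException {
        BadRequestException(String message) { super(message); }
    }

    /**
     * @param breakerAllows     result of the circuit-breaker check
     * @param semaphoreAcquired result of the semaphore tryAcquire()
     * @param error             what run() failed with, or null on success
     */
    static Outcome route(boolean breakerAllows, boolean semaphoreAcquired, Throwable error) {
        if (!breakerAllows) return Outcome.SHORT_CIRCUITED;
        if (!semaphoreAcquired) return Outcome.SEMAPHORE_REJECTED;
        if (error == null) return Outcome.SUCCESS;
        if (error instanceof RejectedExecutionException) return Outcome.THREAD_POOL_REJECTED;
        if (error instanceof TimeoutException) return Outcome.TIMEOUT;
        if (error instanceof BadRequestException) return Outcome.BAD_REQUEST;
        return Outcome.FAILURE;
    }

    /** Every outcome except success and bad request goes to the fallback. */
    static boolean triggersFallback(Outcome outcome) {
        return outcome != Outcome.SUCCESS && outcome != Outcome.BAD_REQUEST;
    }
}
```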
[Figure: Hystrix fallback flow chart]