赞
踩
在上一篇文章中大致的了解了Hystrix的基本原理,但是Hystrix的内部是如何实现的呢?为何通过简单的run方法和getFallback方法就可以具备熔断降级的能力。本文将主要讲述:
我们知道Hystrix把业务接口封装为了Command,要实现熔断功能需要继承HystrixCommand抽象类,在使用的时候新创建一个XXCommand对象,那么在构造函数中到底做了哪些操作?进入源码发现大部分工作是在AbstractCommand完成。下面是此类的构造函数:
protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool, HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults, HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore, HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) { //初始化group,group主要是用来对不同的command key进行统一管理 this.commandGroup = initGroupKey(group); // 初始化command key,用来标识降级逻辑 this.commandKey = initCommandKey(key, getClass()); // 初始化自定义的降级策略 this.properties = initCommandProperties(this.commandKey, propertiesStrategy,commandPropertiesDefaults); // 初始化线程池key,相同的线程池key将公用线程池 this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get()); // 初始化监控器 this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties); // 初始化断路器 this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics); 初始化线程池 this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults); //省略部分代码 }
AbstractCommand构造函数中包括了线程池、缓存、降级策略、断路器的初始化等工作。每次请求都需要新创建Command,这么多的初始化工作是不是每次new一个Command都要进行,答案肯定是否。因为并发量如果大的情况下,大部分资源都被用来初始化了。其实在上面的大部分初始化工作只会在创建第一个Command时做。后面同一个Command的对象会在静态的HashMap中取出来进行赋值。比如同一个Command的Key和线程池key一般情况是一样的。
保存断路器的HashMap声明在HystrixCircuitBreaker.Factory中,key是Command key。
private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();
保存线程池的HashMap,声明在HystrixThreadPool接口中:
final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();
下面分析Hystrix是如何管理线程池,缓存和断路器原理类似。HystrixThreadPool.Factory源码如下:
static class Factory { final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>(); static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) { String key = threadPoolKey.name(); HystrixThreadPool previouslyCached = threadPools.get(key); if (previouslyCached != null) { return previouslyCached; } synchronized (HystrixThreadPool.class) { if (!threadPools.containsKey(key)) { threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder)); } } return threadPools.get(key); } }
在Factory静态类中只有一个静态常量threadPools,和一个静态getinstace方法。threadPools保存了Hystrix所有的线程池,getinstace方法也比较直观,通过key获取对应的线程池,如果没有则调用new HystrixThreadPoolDefault新建,并且添加到threadPools中。
HystrixThreadPoolDefault构造方法如下:
public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
this.queueSize = properties.maxQueueSize().get();
this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
concurrencyStrategy.getThreadPool(threadPoolKey, properties),
properties);
this.threadPool = this.metrics.getThreadPool();
this.queue = this.threadPool.getQueue();
/* strategy: HystrixMetricsPublisherThreadPool */
HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
}
可以看出这个线程池是对Java原生ThreadPoolExecutor的一层封装,在此构造函数中对Hystrix的线程池的属性、指标信息(metrics)以及任务队列的初始化工作。真正创建Java原生线程池是concurrencyStrategy.getThreadPool(threadPoolKey, properties)
:
public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
//省略部分代码
if (allowMaximumSizeToDivergeFromCoreSize) {
final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
if (dynamicCoreSize > dynamicMaximumSize) {
return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
} else {
return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
}
} else {
return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
}
}
在这里终于看到看到了创建ThreadPoolExecutor,主要是通过线程核心数量、最大的数量、空闲线程的存活时间等参数构建原生线程池。具体原生线程池>>https://blog.csdn.net/TheLudlows/article/details/76973414
一般我们通过execute、queue、observer、toObservable方法开始执行一次Command请求,execute是同步调用调用后直接block住,直到依赖服务返回结果,或者抛出异常。queue():异步调用,返回一个Future对象,后面可以通过Future获取结果。
其实同步方式是通过直接执行Future的get方法进行实现的,这里需要知道这个返回的Future对象到底是什么?通过实现可以发现,返回的Futrue对象只是对toObservable返回结果的封装代理,直接进入toObservable方法。
public Observable<R> toObservable() { final AbstractCommand<R> _cmd = this; //省略部分代码 return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { if (!commandState.compareAndSet(AbstractCommand.CommandState.NOT_STARTED, AbstractCommand.CommandState.OBSERVABLE_CHAIN_CREATED)) { IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance."); } //记录开始请求时间 commandStartTimestamp = System.currentTimeMillis(); final boolean requestCacheEnabled = isRequestCachingEnabled(); final String cacheKey = getCacheKey(); //如果开启缓存 if (requestCacheEnabled) { HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey); //如果缓存不为空,取出值,直接返回 if (fromCache != null) { isResponseFromCache = true; return handleRequestCacheHitAndEmitValues(fromCache, _cmd); } } //如果缓存为空,正常请求逻辑 Observable<R> hystrixObservable =Observable.defer(applyHystrixSemantics).map(wrapWithAllOnNextHooks); Observable<R> afterCache; //将结果分装至缓存 if (requestCacheEnabled && cacheKey != null) { HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd); HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache); if (fromCache != null) { toCache.unsubscribe(); isResponseFromCache = true; return handleRequestCacheHitAndEmitValues(fromCache, _cmd); } else { afterCache = toCache.toObservable(); } } else { afterCache = hystrixObservable; } return afterCache .doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line)) .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once .doOnCompleted(fireOnCompletedHook); } }); }
上面的代码如果熟悉RxJava,读起来也比较直观,如果缓存中存在,则返回结果,如果不存在执行正常请求逻辑,其请求逻辑封装在了applyHystrixSemantics方法中:
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) { executionHook.onStart(_cmd); /* determine if we're allowed to execute */ if (circuitBreaker.attemptExecution()) { final TryableSemaphore executionSemaphore = getExecutionSemaphore(); final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false); final Action0 singleSemaphoreRelease = new Action0() { @Override public void call() { if (semaphoreHasBeenReleased.compareAndSet(false, true)) { executionSemaphore.release(); } } }; final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() { @Override public void call(Throwable t) { eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey); } }; if (executionSemaphore.tryAcquire()) { try { /* used to track userThreadExecutionTime */ executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis()); return executeCommandAndObserve(_cmd) .doOnError(markExceptionThrown) .doOnTerminate(singleSemaphoreRelease) .doOnUnsubscribe(singleSemaphoreRelease); } catch (RuntimeException e) { return Observable.error(e); } } else { return handleSemaphoreRejectionViaFallback(); } } else { return handleShortCircuitViaFallback(); } }
正如源码中注释所言,determine if we’re allowed to execute,circuitBreaker.attemptExecution()该方法判断断路器的状态,决定是否通过请求。关于断路器的attemptExecution方法在熔断器原理分析中详细介绍。如果通过,也就是断路器没有打开那么继续执行。Hystrix提供了一个信号量限流器,限制进入熔断器最大并发数,可以控制请求下游的并发量,如果超过这个阈值,会被降级处理。如果通过限流,继续调用executeCommandAndObserve方法
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
//省略Action定义
Observable<R> execution;
if (properties.executionTimeoutEnabled().get()) {
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}
Hystrix内部提供了超时检查的机制,如果参数executionTimeoutEnabled开启,则每次请求都会提交一个任务到线程池中延迟执行。进入executeCommandWithSpecifiedIsolation方法:
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) { //判断隔离策略,如果是Semaphore 信号量则在当前线程上执行,否则进入线程分配逻辑 if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) { return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { executionResult = executionResult.setExecutionOccurred(); //更改HystrixCommand的状态 USER_CODE_EXECUTED if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) { return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name())); } metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD); //判断HystrixCommand超时状态,如果已经超时则抛出异常 if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) { return Observable.error(new RuntimeException("timed out before executing run()")); } //更改当前command的线程执行状态为 STARTED if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) { HystrixCounters.incrementGlobalConcurrentThreads(); threadPool.markThreadExecution(); endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey()); executionResult = executionResult.setExecutedInThread(); //省略。。。 //调用 getUserExecutionObservable 执行具体逻辑 return getUserExecutionObservable(_cmd); } else { //command has already been unsubscribed, so return immediately return Observable.error(new RuntimeException("unsubscribed before executing run()")); } } }).doOnTerminate(new Action0() {//当Observale执行完毕后(HystrixCommand可能失败也可能执行成功),此时的线程状态可能有两种分别是 STARTED 和 NOT_USING_THREAD , 然后更改线程状态为 TERMINAL @Override public void call() { if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) { handleThreadEnd(_cmd); } if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) { //if it was never started and received terminal, then no need to clean up (I don't think this is possible) } //if it was unsubscribed, then other cleanup handled it } }).doOnUnsubscribe(new Action0() {//当Observable被取消订阅,更改线程状态为 TERMINAL @Override public void call() { if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) { handleThreadEnd(_cmd); } if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) { //if it was never started and was cancelled, then no need to clean up } //if it was terminal, then other cleanup handled it } }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {//subscribeOn 指定scheduler,这里Hystrix实现了自己的scheduler,在scheduler的worker指定线程池,在配置线程之前会重新加载线程池配置 @Override public Boolean call() { return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT; } })); } else {//如果是信号量隔离 ... } }
getUserExecutionObservable执行业务代码的地方:
private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
Observable<R> userObservable;
try {
userObservable = getExecutionObservable();
} catch (Throwable ex) {
// the run() method is a user provided implementation so can throw instead of using Observable.onError
// so we catch it here and turn it into Observable.error
userObservable = Observable.error(ex);
}
return userObservable
.lift(new ExecutionHookApplication(_cmd))
.lift(new DeprecatedOnRunHookApplication(_cmd));
}
最终回到了HystrixCommand的中,run方法终于执行了。run方法执行之后,如果正常返回、抛出异常、或者其它情况,都需要对应的后续处理,这时之前executeCommandAndObserve方法中定义的Action,就开始起作用了。
final protected Observable<R> getExecutionObservable() { return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { try { return Observable.just(run()); } catch (Throwable ex) { return Observable.error(ex); } } }).doOnSubscribe(new Action0() { @Override public void call() { // Save thread on which we get subscribed so that we can interrupt it later if needed executionThread.set(Thread.currentThread()); } }); }
出栈回到executeCommandAndObserve
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
markEmits回调:run方法正常返回时执行,主要记录执行耗时;触发执行成功的通知事件,可以通过扩展插件做更多事情;如果当前是熔断状态,则关闭熔断。handleFallback:run方法发生异常时执行,最终执行降级逻辑。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。