赞
踩
最近遇到了一些生产问题涉及到了hystrix,想要了解下底层的原理。hystrix中大量运用了Rxjava的响应式编程,不懂Rxjava,理解起来有点费劲。
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <spring-boot.version>2.3.12.RELEASE</spring-boot.version> <spring-cloud.version>Hoxton.SR11</spring-cloud.version> </properties> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-hystrix</artifactId> </dependency> <dependencyManagement> <dependencies> <!--导入依赖的pom文件--> <!--顺序靠前,先被引入--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>${spring-boot.version}</version> <type>pom</type> <scope>provided</scope> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> </project>
//在主类上添加 @EnableHystrix 我们看下源码 @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited @EnableCircuitBreaker public @interface EnableHystrix { } 最终@EnableHystrix继承了@EnableCircuitBreaker @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited @Import(EnableCircuitBreakerImportSelector.class) public @interface EnableCircuitBreaker { }
按照之前的模式,我们一定是去分析EnableCircuitBreakerImportSelector,至于如何执行@Import这里不做过多说明,之前已经讲过,感兴趣的看下这里:具体代码在ConfigurationClassParser.processImports里的String[] importClassNames = selector.selectImports(currentSourceClass.getMetadata());
//优先级不是特别高,毕竟是运行时的切面
@Order(Ordered.LOWEST_PRECEDENCE - 100)
public class EnableCircuitBreakerImportSelector
extends SpringFactoryImportSelector<EnableCircuitBreaker> {
@Override
protected boolean isEnabled() {
//默认是开启的
return getEnvironment().getProperty("spring.cloud.circuit.breaker.enabled",
Boolean.class, Boolean.TRUE);
}
}
Import导入后,会通过selectImports从配置文件里加载对应的类 org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker
public abstract class SpringFactoryImportSelector<T>
implements DeferredImportSelector, BeanClassLoaderAware, EnvironmentAware {
//将泛型T解析为this.annotationClass
protected SpringFactoryImportSelector() {
this.annotationClass = (Class<T>) GenericTypeResolver.resolveTypeArgument(this.getClass(), SpringFactoryImportSelector.class);
}
@Override
public String[] selectImports(AnnotationMetadata metadata) {
//通过SpringFactoriesLoader.loadFactoryNames从spring.factories中获取org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker对应的配置
List<String> factories = new ArrayList<>(new LinkedHashSet<>(SpringFactoriesLoader.loadFactoryNames(this.annotationClass, this.beanClassLoader)));
}
}
具体到最后对应的是2.2.8.RELEASE/spring-cloud-netflix-hystrix-2.2.8.RELEASE.jar!/META-INF/spring.factories
spring.factories文件的内容
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.hystrix.HystrixAutoConfiguration,\
org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerAutoConfiguration,\
org.springframework.cloud.netflix.hystrix.ReactiveHystrixCircuitBreakerAutoConfiguration,\
org.springframework.cloud.netflix.hystrix.security.HystrixSecurityAutoConfiguration
org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker=\
org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerConfiguration
在@SpringBootApplication中,对@EnableAutoConfiguration有引用,所以一启动,就会加载HystrixAutoConfiguration这些类
@SpringBootConfiguration
@EnableAutoConfiguration
@ComponentScan(excludeFilters = { @Filter(type = FilterType.CUSTOM, classes = TypeExcludeFilter.class),
@Filter(type = FilterType.CUSTOM, classes = AutoConfigurationExcludeFilter.class) })
public @interface SpringBootApplication {}
@Configuration(proxyBeanMethods = false)
public class HystrixCircuitBreakerConfiguration {
//核心是这个切面
@Bean
public HystrixCommandAspect hystrixCommandAspect() {
return new HystrixCommandAspect();
}
....
}
在HystrixCommandAspect中主要是对HystrixCommand和HystrixCollapser这两个注解进行处理
@Aspect public class HystrixCommandAspect { static { //通过静态方法将两个注解的两个工厂实例化 META_HOLDER_FACTORY_MAP = ImmutableMap.<HystrixPointcutType, MetaHolderFactory>builder() .put(HystrixPointcutType.COMMAND, new CommandMetaHolderFactory()) .put(HystrixPointcutType.COLLAPSER, new CollapserMetaHolderFactory()) .build(); } //定义切入点注解HystrixCommand @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)") public void hystrixCommandAnnotationPointcut() { } //定义切入点注解HystrixCollapser @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)") public void hystrixCollapserAnnotationPointcut() { } @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()") public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable { //获取目标方法 Method method = getMethodFromTarget(joinPoint); Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint); //只处理这两种注解标注的方法 if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) { throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " + "annotations at the same time"); } //获取MetaHolderFactory不同实现的factory MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method)); //获取目标方法的的元数据,方法签名,参数等 MetaHolder metaHolder = metaHolderFactory.create(joinPoint); /** * 创建处理器CommandCollapser 或 GenericCommand (同步) 或GenericObservableCommand(异步) * GenericCommand里有很多super,最终通过HystrixCommandBuilderFactory.getInstance().create(metaHolder) 构建了一个HystrixCommandBuilder作为GenericCommad的参数 * new GenericCommand 通过super到AbstractHystrixCommand, * AbstractHystrixCommand 通过super到HystrixCommand, * HystrixCommand最终到了AbstractCommand 一路传递 * 一会在AbstractCommand中分析下 */ HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder); //根据返回值推断执行类型 ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ? metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType(); //返回结果 Object result; try { //不使用observable模式 if (!metaHolder.isObservable()) { //execute执行 result = CommandExecutor.execute(invokable, executionType, metaHolder); } else { result = executeObservable(invokable, executionType, metaHolder); } } catch (HystrixBadRequestException e) { throw e.getCause(); } catch (HystrixRuntimeException e) { throw hystrixRuntimeExceptionToThrowable(metaHolder, e); } return result; } //HystrixCommand的时候MetaHolder的创建 private static class CommandMetaHolderFactory extends MetaHolderFactory { @Override public MetaHolder create(Object proxy, Method method, Object obj, Object[] args, final ProceedingJoinPoint joinPoint) { //获取注解HystrixCommand HystrixCommand hystrixCommand = method.getAnnotation(HystrixCommand.class); //根据返回结果推断任务类型,可以知道以哪种方式执行 ExecutionType executionType = ExecutionType.getExecutionType(method.getReturnType()); MetaHolder.Builder builder = metaHolderBuilder(proxy, method, obj, args, joinPoint); if (isCompileWeaving()) { builder.ajcMethod(getAjcMethodFromTarget(joinPoint)); } //这里没有多少参数,最重要的一个hystrixCommand,你在注解里加了什么 return builder.defaultCommandKey(method.getName()) .hystrixCommand(hystrixCommand) .observableExecutionMode(hystrixCommand.observableExecutionMode()) //执行模式 .executionType(executionType) //执行方式 .observable(ExecutionType.OBSERVABLE == executionType) .build(); } } } //在枚举ExecutionType类里 public static ExecutionType getExecutionType(Class<?> type) { if (Future.class.isAssignableFrom(type)) { return ExecutionType.ASYNCHRONOUS; } else if (Observable.class.isAssignableFrom(type)) { return ExecutionType.OBSERVABLE; } else { return ExecutionType.SYNCHRONOUS; } }
我们重点分析下同步处理,通过代码我们可以看到HystrixInvokable 是 GenericCommand,我们同步里的看下 CommandExecutor.execute(invokable, executionType, metaHolder)
public class CommandExecutor { public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException { Validate.notNull(invokable); Validate.notNull(metaHolder); switch (executionType) { case SYNCHRONOUS: { //重点看同步处理这个,先把GenericCommand 转成HystrixExecutable 再执行execute return castToExecutable(invokable, executionType).execute(); } case ASYNCHRONOUS: { HystrixExecutable executable = castToExecutable(invokable, executionType); if (metaHolder.hasFallbackMethodCommand() && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) { return new FutureDecorator(executable.queue()); } return executable.queue(); } case OBSERVABLE: { HystrixObservable observable = castToObservable(invokable); return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable(); } default: throw new RuntimeException("unsupported execution type: " + executionType); } } }
我们先看下类关系
通过GenericCommand一层层的往上翻,最终定位到HystrixCommand有个execute()
public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R> { //同步执行 public R execute() { try { //通过queue().get()来同步执行 return queue().get(); } catch (Exception e) { throw Exceptions.sneakyThrow(decomposeException(e)); } } //异步执行,什么时候get(),由调用者决定,get()的时候会阻塞 public Future<R> queue() { //核心处理,最终定位到了AbstractCommand里的toObservable()里 final Future<R> delegate = toObservable().toBlocking().toFuture(); final Future<R> f = new Future<R>() { ..... @Override public R get() throws InterruptedException, ExecutionException { return delegate.get(); } @Override public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return delegate.get(timeout, unit); } }; //特殊处理了下,已经执行完了,get()也不会阻塞了 if (f.isDone()) { try { f.get(); return f; } catch (Exception e) { ... } } return f; } }
我们看下官方的流程图:(来源地址:https://github.com/Netflix/Hystrix/wiki/How-it-Works)通过代码,我们也可以看到确实到了这里
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> { // 一个 new GenericCommand 最终走到了这里,参数都包装到了这里,只是从GenericCommand 来说 protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool, HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults, HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore, HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) { //设置group this.commandGroup = initGroupKey(group); //正常为方法名,不设置,就是类 this.commandKey = initCommandKey(key, getClass()); //这里做了优化,将组装好的CommandProperties放入了ConcurrentHashMap缓存了起来(如果是动态生成方法溢出?) this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults); //初始化线程池的key,如果为null,则用groupKey.name() this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get()); //初始化指标,以commandKey缓存 this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties); //初始化断路器,也是以commandKey缓存 this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics); //初始化线程池,传入有就用,没有就创建一个缓存,还是commandKey维度,最终是通过concurrencyStrategy.getThreadPool(threadPoolKey, properties)创建 this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults); //Strategies from plugins this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier(); this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy(); HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup, this.metrics, this.circuitBreaker, this.properties); this.executionHook = initExecutionHook(executionHook); this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy); this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy); /* fallback semaphore override if applicable */ this.fallbackSemaphoreOverride = fallbackSemaphore; /* execution semaphore override if applicable */ this.executionSemaphoreOverride = executionSemaphore; } //核心处理逻辑 public Observable<R> toObservable() { //拿到当前的对象 final AbstractCommand<R> _cmd = this; /** * terminateCommandCleanup 用于中断命令清理的回调函数 * unsubscribeCommandCleanup 取消订阅命令的清理回调函数 * applyHystrixSemantics 创建带执行逻辑的Observable,用于订阅后回调 * wrapWithAllOnNextHooks onNext执行的钩子 * fireOnCompletedHook 完成后触发的钩子 */ final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() { @Override public Observable<R> call() { //没有订阅不执行 if (commandState.get().equals(CommandState.UNSUBSCRIBED)) { return Observable.never(); } //执行 return applyHystrixSemantics(_cmd); } }; //onNext执行后的钩子 final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() { @Override public R call(R r) { R afterFirstApplication = r; try { afterFirstApplication = executionHook.onComplete(_cmd, r); } catch (Throwable hookEx) { logger.warn("Error calling HystrixCommandExecutionHook.onComplete", hookEx); } try { return executionHook.onEmit(_cmd, afterFirstApplication); } catch (Throwable hookEx) { logger.warn("Error calling HystrixCommandExecutionHook.onEmit", hookEx); return afterFirstApplication; } } }; //通过defer创建Observable return Observable.defer(new Func0<Observable<R>>() { //创建观察者,观察者的执行,得由subscribe()触发 @Override public Observable<R> call() { //通过cas来判断命令状态,初始为NOT_STARTED,如果进来不是NOT_STARTED说明已经有别线程执行了,就抛异常 if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) { IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance."); throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null); } //命令开始时间 commandStartTimestamp = System.currentTimeMillis(); if (properties.requestLogEnabled().get()) { // 不管发生什么都要用日志记录下来 if (currentRequestLog != null) { currentRequestLog.addExecutedCommand(_cmd); } } //是否允许请求缓存, final boolean requestCacheEnabled = isRequestCachingEnabled(); //cacheKey是需要设置@CacheKey 在HystrixCacheKeyGenerator中会把方法和指定的key的value进行拼接 final String cacheKey = getCacheKey(); //允许请求缓存的情况下,先从缓存中获取,最终缓存用的是ConcurrentHashMap if (requestCacheEnabled) { HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey); if (fromCache != null) { isResponseFromCache = true; return handleRequestCacheHitAndEmitValues(fromCache, _cmd); } } //通过defer创建Observable Observable<R> hystrixObservable = Observable.defer(applyHystrixSemantics).map(wrapWithAllOnNextHooks); Observable<R> afterCache; // 开启缓存的情况下,重新放入缓存 if (requestCacheEnabled && cacheKey != null) { // 对hystrixObservable进行包装后放入缓存 HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd); HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache); } else { afterCache = hystrixObservable; } return afterCache .doOnTerminate(terminateCommandCleanup) //中断命令清理 .doOnUnsubscribe(unsubscribeCommandCleanup) // 不订阅也清理 .doOnCompleted(fireOnCompletedHook);//完成触发的钩子 } }); } private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) { //开始执行 executionHook.onStart(_cmd); //断路器允许执行,里面有好多逻辑,具体看下面的解说 if (circuitBreaker.allowRequest()) { //信号量处理(前提是你配置的是信号量处理机制,如果使用线程池则使用TryableSemaphoreNoOp,申请信号量为true) final TryableSemaphore executionSemaphore = getExecutionSemaphore(); final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false); //执行完释放信号量 final Action0 singleSemaphoreRelease = new Action0() { @Override public void call() { if (semaphoreHasBeenReleased.compareAndSet(false, true)) { executionSemaphore.release(); } } }; final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() { @Override public void call(Throwable t) { eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey); } }; //申请信号量 if (executionSemaphore.tryAcquire()) { try { //设置调用时间 executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis()); return executeCommandAndObserve(_cmd) //执行命令,这块是核心 .doOnError(markExceptionThrown) //异常处理 .doOnTerminate(singleSemaphoreRelease)//中断时,信号量释放 .doOnUnsubscribe(singleSemaphoreRelease);//不订阅,也释放信号量 } catch (RuntimeException e) { return Observable.error(e); } } else { //申请不到信号量就直接执行Fallback return handleSemaphoreRejectionViaFallback(); } } else { //断路器不允许执行,就直接执行Fallback方法 return handleShortCircuitViaFallback(); } } private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) { //HystrixRequestContext hystrix线程的上下文,可以通过它传递一些内容,防止异步过程中数据丢失 final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread(); /** * 又是定义一堆Action和Func * markEmits 用于onNext回调 * markOnCompleted 用于完成后处理,会采样一个SUCCESS事件 * handleFallback,有返回结果 用于处理Fallback, * setRequestContext 设置请求上下文 * markEmits、markOnCompleted、handleFallback都会产生一个事件 */ Observable<R> execution; //执行命令 if (properties.executionTimeoutEnabled().get()) { //配置超时时间的情况下会加一个HystrixObservableTimeoutOperator 这里会发送超时事件 execution = executeCommandWithSpecifiedIsolation(_cmd).lift(new HystrixObservableTimeoutOperator<R>(_cmd)); } else { execution = executeCommandWithSpecifiedIsolation(_cmd); } return execution.doOnNext(markEmits) .doOnCompleted(markOnCompleted) .onErrorResumeNext(handleFallback) .doOnEach(setRequestContext); } private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) { //真正执行逻辑,区分了线程池和信号量 if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) { // return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { executionResult = executionResult.setExecutionOccurred(); if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) { return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name())); } metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD); if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) { return Observable.error(new RuntimeException("timed out before executing run()")); } //设置线程状态,计数,并执行 if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) { //计数 HystrixCounters.incrementGlobalConcurrentThreads(); threadPool.markThreadExecution(); endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey()); executionResult = executionResult.setExecutedInThread(); try { executionHook.onThreadStart(_cmd); executionHook.onRunStart(_cmd); executionHook.onExecutionStart(_cmd); return getUserExecutionObservable(_cmd); } catch (Throwable ex) { return Observable.error(ex); } } else { return Observable.error(new RuntimeException("unsubscribed before executing run()")); } } }).doOnTerminate(new Action0() { @Override public void call() { //异常处理 if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) { handleThreadEnd(_cmd); } } }).doOnUnsubscribe(new Action0() { @Override public void call() { //未订阅处理 if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) { handleThreadEnd(_cmd); } } }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() { @Override public Boolean call() { return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT; } })); } else { //信号量的执行 return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { executionResult = executionResult.setExecutionOccurred(); if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) { return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name())); } metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE); // semaphore isolated // store the command that is being run endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey()); try { executionHook.onRunStart(_cmd); executionHook.onExecutionStart(_cmd); return getUserExecutionObservable(_cmd); //the getUserExecutionObservable method already wraps sync exceptions, so this shouldn't throw } catch (Throwable ex) { //If the above hooks throw, then use that as the result of the run method return Observable.error(ex); } } }); } } }
public interface HystrixCircuitBreaker { //初始化实现 public static class Factory { public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) { // 缓存中有,就直接返回 HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name()); if (previouslyCached != null) { return previouslyCached; } // 第一次进来,没有就初始化并缓存到map里 HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreakerImpl(key, group, properties, metrics)); if (cbForCommand == null) { // this means the putIfAbsent step just created a new one so let's retrieve and return it return circuitBreakersByCommand.get(key.name()); } else { return cbForCommand; } } } //断路器实现 static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker { //hystrix 的配置都在HystrixCommandProperties @Override public boolean allowRequest() { if (properties.circuitBreakerForceOpen().get()) { //强制打开断路器,就直接返回,这个时候就熔断了 return false; } //断路器关闭的情况下,再次验证 if (properties.circuitBreakerForceClosed().get()) { isOpen(); //不管本次什么样的结果,至少我进来的是开着的,设置为关闭以后,我还得继续执行 return true; } return !isOpen() || allowSingleTest(); } @Override public boolean isOpen() { //如果是open的,拦截,直接返回true if (circuitOpen.get()) { return true; } /** * HealthCounts里存储的是一个滑动窗口期间的请求数。 * totalCount 总请求数 包含:数(失败+成功+超时+ threadPoolRejected +信号量拒绝) * errorCount 异常数据 刨除成功就是失败 * errorPercentage 异常百分比= errorCount/totalCount *100; */ HealthCounts health = metrics.getHealthCounts(); //如果总请求数小于配置的值,不拦截,配置值是hystrix.对应的comandKey.circuitBreaker.requestVolumeThreshold 默认值是20 if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) { return false; } //如果异常率小于配置的值,也不拦截,配置值是hystrix.对应的comandKey.circuitBreaker.errorThresholdPercentage 默认值是50,也就是有50%异常,就熔断 if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) { return false; } else { // 异常比率较高,就设置断路器为open if (circuitOpen.compareAndSet(false, true)) { // 设置断路器器打开的时间 circuitOpenedOrLastTestedTime.set(System.currentTimeMillis()); return true; } else { //走到这里,说明本线程没有设置成功,别的线程已经打开了断路器,直接返回 return true; } } } } } }
断路器的三种状态:转化关系如下:
断路器初始是Closed状态,如果调用持续出错或者超时,断路器会进入Open状态,熔断请求,后续的一段时间内所有的调用都会触发fallback
Open 状态:请求不再进行调用当前服务,内部设置时钟一般为MTTR(平均故障处理时间),当打开时长达到所设时钟则进入半熔断状态
Closed 关闭:路器器关闭不会对服务进行熔断部分请求
Half Open 半开:根据规则调用当前服务,如果请求成功且符合规则则认为当前服务恢复正常,关闭熔断
看下官方的流程图 在官方:https://github.com/Netflix/Hystrix/wiki/How-it-Works 有图有说明
断路器是以commandKey为维度
断路器是打开状态,直接熔断
在窗口期内,如果请求的量大于设置的值,熔断(默认是10秒超过20个请求)
如果异常率大于配置的值,熔断(默认是10秒超过50%的失败)
一段时间之后,这个断路器是掰开状态,会允许一个请求进来,如果成功,断路器会关闭
ps: HystrixCommand 的配置属性在类:HystrixCommandProperties
最后上下逻辑图
同时,在hystrix-core的jar包com.netflix.hystrix.metric.consumer下,有很多的HystrixEvent的消费流,这些根据配置执行不同的限流手段,有滑动窗口,有令牌桶,这些流都会在HystrixCommandMetrics实例化的时候启动。如果想了解这一块可以看下HystrixConfigurationStream和HystrixCommandMetrics。后期也会针对这块分析下。
流程是理顺了,但是整起来磕磕绊绊的,里面一层套一层的,想要真正的了解它的底层运作,需要好好的了解下Rxjava,后续研究下再分享。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。