当前位置:   article > 正文

Hystrix实现ThreadLocal上下文的传递_requestcontextholder.currentrequestattributes().se

requestcontextholder.currentrequestattributes().setattribute(
前言
Hystrix提供了基于信号量和线程两种隔离模式,通过在Hystrix基础章节中已经验证过,通过 @HystrixCommand注解的方法体将在新的线程中执行,这样会带来些什么意想不到的意外呢,先来看一个示例:
1、定义一个webapi,通过 RequestContextHolder设定一个当前线程的上下文:
  1. @GetMapping(value = "/getServerInfo/{serviceName}")
  2. public String getServer1Info(@PathVariable(value = "serviceName") String serviceName) {
  3. LOGGER.info("当前线程ID:" + Thread.currentThread().getId() + "当前线程Name" + Thread.currentThread().getName());
  4. RequestContextHolder.currentRequestAttributes().setAttribute("context", "main-thread-context", SCOPE_REQUEST);
  5. return consumeService.getServerInfo(serviceName);
  6. }
2、在 @HystrixCommand注解的方法中再次通过RequestContextHolder获取当前上下文设定的value值:
  1. @Override
  2. @HystrixCommand(fallbackMethod = "getServerInfoFallback",
  3. commandProperties = {@HystrixProperty(name = "execution.isolation.strategy", value = "THREAD")},
  4. commandKey = "cust2GetServerInfo",
  5. threadPoolKey = "cust2ThreadPool",
  6. groupKey = "cust2")
  7. public String getServerInfo(String serviceName) {
  8. LOGGER.info(RibbonFilterContextHolder.getCurrentContext().get("TAG"));
  9. LOGGER.info(RequestContextHolder.currentRequestAttributes().getAttribute("context", SCOPE_REQUEST).toString());
  10. //如果是service1则需要添加http认证头,service1暂时添加了认证机制;反之service2不需要认证直接发出请求即可
  11. if ("service1".equals(serviceName)) {
  12. HttpEntity<String> requestEntity = new HttpEntity<String>(getHeaders());
  13. ResponseEntity<String> responseEntity = restTemplate.exchange("http://" + serviceName + "/getServerInfo?userName=shuaishuai", HttpMethod.GET, requestEntity, String.class);
  14. return responseEntity.getBody();
  15. } else
  16. return restTemplate.getForObject("http://" + serviceName + "/getServerInfo?userName=shuaishuai", String.class);
  17. }
  18. public String getServerInfoFallback(String serviceName, Throwable e) {
  19. if (e != null) {
  20. LOGGER.error(e.getMessage());
  21. }
  22. return "Maybe the server named " + serviceName + " is not normal running";
  23. }
3、启动服务请求1中定义的API:

可以看到上图中上下文的赋值与取值在不同的线程中执行,TAG信息被正常获取,而 RequestContextHolder设定的上线文信息获取失败,并进入回退方法并打印出了对应的异常信息,首先来看下为何TAG信息被正常获取,在 RibbonFilterContextHolder中定义变量如下

而在RequestContextHolder中变量定义如下


其区别在于是采用 ThreadLocalInheritableThreadLocal的差异, InheritableThreadLocal能够在子线程中继续传播父线程的上线文,而ThreadLocal只能在保存在当前线程中,但事实上我们不可能所有的应用均采用 InheritableThreadLocal,尽管他是一个不错的选择,但如何让ThreadLocal也实现在Hystrix应用场景下实现线程上下文的传播呢。这就是本章的重点了。


本章概要
1、资料搜索;
2、源码分析;
3、扩展HystrixConcurrencyStrategy解决前言中的意外;
4、提高HystrixConcurrencyStrategy包装扩展性;

资料搜索
既然遇到了问题,就到springcloud的官方文档先检索下,找到如下对应的描述

红色框部分主要意思是,我们可以声明一个定制化的HystrixConcurrencyStrategy实例,并通过HystrixPlugins注册。先找到HystrixConcurrencyStrategy类,其有下面一段类注释
For example, every {@link Callable} executed by {@link HystrixCommand} will call {@link #wrapCallable(Callable)} to give a chance for custom implementations to decorate the {@link Callable} with additional behavior.
@HystrixCommand注解的方法,其执行源 Callable可以通过 wrapCallable方法进行定制化装饰,加入附加的行为,继续来看看 wrapCallable方法的定义

其同样提供了非常详细的注释,该方法提供了在方法被执行前进行装饰的机会,可以用来复制线程状态等附加行为,这个貌似就是我们需要的,很合意。
同样在Hystrix官方文档提供了更加详细的说明( https://github.com/Netflix/Hystrix/wiki/Plugins#concurrency-strategy), Concurrency Strategy 作为了Plugin的一种类别,描述如下

可以看到红色框中的重点描述,其已经说了非常明确,可以从父线程复制线程状态至子线程。自定义的Plugin如何被 HystrixCommand应用呢,继续查看官方的描述

其提供了 HystrixPlugins帮助我们注册自定义的Plugin,除了我们本章节重点关注的 Concurrency Strategy类别plugin,还有如下类别以及对应的抽象实现
类别
抽象实现
Event Notifier
Metrics Publisher
Properties Strategy
Concurrency Strategy
Command Execution Hook


源码分析
在springcloud中还有如下一段话

既然提高了定制化的实现,不如来看看官方已经提供了哪些默认实现

首先来看看 HystrixConcurrencyStrategyDefault

很精简的一段代码,并没有任何方法重写,其作为了一个标准提供默认实现。继续来看看 SecurityContextConcurrencyStrategy实现,直接找到wrapCallable方法

其对Callabe进行了二次包装,继续跟进来看看 DelegatingSecurityContextCallable的定义

其主要实现均在call方法中,红色框中标出了重点,在调用call方法前,我们可以将当前上下文信息放入 SecurityContextHolder中,在执行完成后清空 SecurityContextHolder对应的设置。再来看看 SecurityContextConcurrencyStrategy是如何被应用的,在 HystrixSecurityAutoConfiguration中有如下代码段

在启动注册配置过程中机会通过 HystrixPlugins注册当前扩展的 HystrixConcurrencyStrategy实现。

小节:自定义扩展类实现Callable接口,并传入当前Callable变量delegate,在delegate执行call方法前后进行线程上线文的操作即可实现线程状态在父线程与子线程间的传播。

扩展HystrixConcurrencyStrategy解决前言中的意外
通过源码部分的解读,基本了解springcloud是如何实现扩展的,又是如何被应用的,照葫芦画瓢下。

1、定义一个 RequestContextHystrixConcurrencyStrategy 实现 HystrixConcurrencyStrategy 接口,并重写其 wrapCallable方法:
  1. public class RequestContextHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
  2. @Override
  3. public <T> Callable<T> wrapCallable(Callable<T> callable) {
  4. return new RequestAttributeAwareCallable<>(callable, RequestContextHolder.getRequestAttributes());
  5. }
  6. static class RequestAttributeAwareCallable<T> implements Callable<T> {
  7. private final Callable<T> delegate;
  8. private final RequestAttributes requestAttributes;
  9. public RequestAttributeAwareCallable(Callable<T> callable, RequestAttributes requestAttributes) {
  10. this.delegate = callable;
  11. this.requestAttributes = requestAttributes;
  12. }
  13. @Override
  14. public T call() throws Exception {
  15. try {
  16. RequestContextHolder.setRequestAttributes(requestAttributes);
  17. return delegate.call();
  18. } finally {
  19. RequestContextHolder.resetRequestAttributes();
  20. }
  21. }
  22. }
  23. }
其中定义 RequestAttributeAwareCallable装饰类,通过构造函数传入当前待执行 Callable代理和当前待传播的 RequestAttributes值,并在 delegatecall方法执行前对 RequestContextHolderRequestAttributes赋值,在 finally块中重置。

2、同样在任意配置类中添加如下代码段即可,通过 HystrixPlugins注册 RequestContextHystrixConcurrencyStrategy
@PostConstruct
public void init() {
    HystrixPlugins.getInstance().registerConcurrencyStrategy(new RequestContextHystrixConcurrencyStrategy());
}
3、启动服务验证,子线程取值成功:


小节:以上参考 SecurityContextConcurrencyStrategy的实现,完成了Hystrix中 RequestContextHolder上下文信息传播。

提高HystrixConcurrencyStrategy包装扩展性
上一个小节介绍了如果在Hystrix线程隔离场景下实现 ThreadLocal定义的上下文传播,根据示例,在实际应用过程中如果我们有多个类似 RequestContextHystrixConcurrencyStrategy策略,需要将每个自定义 HystrixConcurrencyStrategy示例注册至 HystrixPlugins中,这在扩展性方面显然是缺失的,借鉴spring的实践,我们可以定义对Callable的包装接口 HystrixCallableWrapper,根据实际的业务只需要对 HystrixCallableWrapper进行实现,并注册对应的实现bean即可。具体实现如下:

1、定义用于包装hystrix中Callable实例的接口:
  1. public interface HystrixCallableWrapper {
  2. /**
  3. * 包装Callable实例
  4. *
  5. * @param callable 待包装实例
  6. * @param <T> 返回类型
  7. * @return 包装后的实例
  8. */
  9. <T> Callable<T> wrap(Callable<T> callable);
  10. }

2、通过之前的源码阅读与实践,基本已经发现实现线程上线文传播的核心在于对Callable进行包装,通过多次对Callable包装即实现了一个链式包装过程,如下扩展 HystrixConcurrencyStrategy接口实现 RequestContextHystrixConcurrencyStrategy,其中定义 CallableWrapperChain类对所有注入的 HystrixCallableWrapper包装实现进行装配:
  1. public class RequestContextHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
  2. private final Collection<HystrixCallableWrapper> wrappers;
  3. public RequestContextHystrixConcurrencyStrategy(Collection<HystrixCallableWrapper> wrappers) {
  4. this.wrappers = wrappers;
  5. }
  6. @Override
  7. public <T> Callable<T> wrapCallable(Callable<T> callable) {
  8. return new CallableWrapperChain(callable, wrappers.iterator()).wrapCallable();
  9. }
  10. private static class CallableWrapperChain<T> {
  11. private final Callable<T> callable;
  12. private final Iterator<HystrixCallableWrapper> wrappers;
  13. CallableWrapperChain(Callable<T> callable, Iterator<HystrixCallableWrapper> wrappers) {
  14. this.callable = callable;
  15. this.wrappers = wrappers;
  16. }
  17. Callable<T> wrapCallable() {
  18. Callable<T> delegate = callable;
  19. while (wrappers.hasNext()) {
  20. delegate = wrappers.next().wrap(delegate);
  21. }
  22. return delegate;
  23. }
  24. }
  25. }
3、实现 HystrixCallableWrapper接口,定义一个包装 RequestContextHolder上下文处理的实现类:
  1. public final class RequestAttributeAwareCallableWrapper implements HystrixCallableWrapper {
  2. @Override
  3. public <T> Callable<T> wrap(Callable<T> callable) {
  4. return new RequestAttributeAwareCallable(callable, RequestContextHolder.getRequestAttributes());
  5. }
  6. static class RequestAttributeAwareCallable<T> implements Callable<T> {
  7. private final Callable<T> delegate;
  8. private final RequestAttributes requestAttributes;
  9. RequestAttributeAwareCallable(Callable<T> callable, RequestAttributes requestAttributes) {
  10. this.delegate = callable;
  11. this.requestAttributes = requestAttributes;
  12. }
  13. @Override
  14. public T call() throws Exception {
  15. try {
  16. RequestContextHolder.setRequestAttributes(requestAttributes);
  17. return delegate.call();
  18. } finally {
  19. RequestContextHolder.resetRequestAttributes();
  20. }
  21. }
  22. }
  23. }
4、实现 HystrixCallableWrapper接口,定义一个包装Mdc日志处理上下文的实现类:
  1. public class MdcAwareCallableWrapper implements HystrixCallableWrapper {
  2. @Override
  3. public <T> Callable<T> wrap(Callable<T> callable) {
  4. return new MdcAwareCallable<>(callable, MDC.getCopyOfContextMap());
  5. }
  6. private class MdcAwareCallable<T> implements Callable<T> {
  7. private final Callable<T> delegate;
  8. private final Map<String, String> contextMap;
  9. public MdcAwareCallable(Callable<T> callable, Map<String, String> contextMap) {
  10. this.delegate = callable;
  11. this.contextMap = contextMap != null ? contextMap : new HashMap();
  12. }
  13. @Override
  14. public T call() throws Exception {
  15. try {
  16. MDC.setContextMap(contextMap);
  17. return delegate.call();
  18. } finally {
  19. MDC.clear();
  20. }
  21. }
  22. }
  23. }
5、最后通过在Configuration配置类中注册如下 HystrixCallableWrapper 实现类的bean实例,并通过 HystrixPlugins注册扩展包装实现:
  1. @Bean
  2. public HystrixCallableWrapper requestAttributeAwareCallableWrapper() {
  3. return new RequestAttributeAwareCallableWrapper();
  4. }
  5. @Bean
  6. public HystrixCallableWrapper mdcAwareCallableWrapper(){
  7. return new MdcAwareCallableWrapper();
  8. }
  9. @Autowired(required = false)
  10. private List<HystrixCallableWrapper> wrappers = new ArrayList<>();
  11. @PostConstruct
  12. public void init() {
  13. HystrixPlugins.getInstance().registerConcurrencyStrategy(new RequestContextHystrixConcurrencyStrategy(wrappers));
  14. }

总结
本章从官方网站与源码出发,逐步实现了hystrix中如何进行线程上下文的传播。同时为了更好的扩展性,提供了基于自定义接口并注入实现的方式。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/在线问答5/article/detail/949077
推荐阅读
  

闽ICP备14008679号