springcloud微服務中, 服務間傳輸全局類參數,如session信息等。git
Hystrix有2個隔離策略:THREAD以及SEMAPHORE,當隔離策略爲 THREAD 時,是沒辦法拿到 ThreadLocal 中的值的。github
Hystrix提供了基於信號量和線程兩種隔離模式,經過在Hystrix基礎章節中已經驗證過,經過@HystrixCommand註解的方法體將在新的線程中執行,這樣會帶來些什麼意想不到的意外呢,先來看一個示例:
一、定義一個webapi,經過RequestContextHolder設定一個當前線程的上下文:web
@GetMapping(value = "/getServerInfo/{serviceName}") public String getServer1Info(@PathVariable(value = "serviceName") String serviceName) { LOGGER.info("當前線程ID:" + Thread.currentThread().getId() + "當前線程Name" + Thread.currentThread().getName()); RequestContextHolder.currentRequestAttributes().setAttribute("context", "main-thread-context", SCOPE_REQUEST); return consumeService.getServerInfo(serviceName); }
二、在@HystrixCommand註解的方法中再次經過RequestContextHolder獲取當前上下文設定的value值:spring
@Override @HystrixCommand(fallbackMethod = "getServerInfoFallback", commandProperties = {@HystrixProperty(name = "execution.isolation.strategy", value = "THREAD")}, commandKey = "cust2GetServerInfo", threadPoolKey = "cust2ThreadPool", groupKey = "cust2") public String getServerInfo(String serviceName) { LOGGER.info(RibbonFilterContextHolder.getCurrentContext().get("TAG")); LOGGER.info(RequestContextHolder.currentRequestAttributes().getAttribute("context", SCOPE_REQUEST).toString()); //若是是service1則須要添加http認證頭,service1暫時添加了認證機制;反之service2不須要認證直接發出請求便可 if ("service1".equals(serviceName)) { HttpEntity<String> requestEntity = new HttpEntity<String>(getHeaders()); ResponseEntity<String> responseEntity = restTemplate.exchange("http://" + serviceName + "/getServerInfo?userName=shuaishuai", HttpMethod.GET, requestEntity, String.class); return responseEntity.getBody(); } else return restTemplate.getForObject("http://" + serviceName + "/getServerInfo?userName=shuaishuai", String.class); } public String getServerInfoFallback(String serviceName, Throwable e) { if (e != null) { LOGGER.error(e.getMessage()); } return "Maybe the server named " + serviceName + " is not normal running"; }
三、啓動服務請求1中定義的API:
能夠看到上圖中上下文的賦值與取值在不一樣的線程中執行,TAG信息被正常獲取,而RequestContextHolder設定的上線文信息獲取失敗,並進入回退方法並打印出了對應的異常信息,首先來看下爲什麼TAG信息被正常獲取,在RibbonFilterContextHolder中定義變量以下
而在RequestContextHolder中變量定義以下
其區別在因而採用ThreadLocal與InheritableThreadLocal的差別,InheritableThreadLocal可以在子線程中繼續傳播父線程的上線文,而ThreadLocal只能在保存在當前線程中,但事實上咱們不可能全部的應用均採用InheritableThreadLocal,儘管他是一個不錯的選擇,但如何讓ThreadLocal也實如今Hystrix應用場景下實現線程上下文的傳播呢。這就是本章的重點了。api
hystrix.command.default.execution.isolation.strategy: SEMAPHORE
這樣配置後,Feign能夠正常工做。session
但該方案不是特別好。緣由是Hystrix官方強烈建議使用THREAD做爲隔離策略! 能夠參考官方文檔說明。併發
記得以前在研究zipkin日誌追蹤的時候,看到過Sleuth有本身的熔斷機制,用來在thread之間傳遞Trace信息,Sleuth是能夠拿到本身上下文信息的,查看源碼找到了app
既然遇到了問題,就到springcloud的官方文檔先檢索下,找到以下對應的描述
ide
您還能夠選擇設置hystrix。shareSecurityContext屬性爲真。這樣作將自動配置一個Hystrix併發策略插件鉤子,它將把SecurityContext從主線程轉移到Hystrix命令使用的線程。Hystrix不容許註冊多個Hystrix併發策略,所以能夠經過將本身的HystrixConcurrencyStrategy聲明爲Spring bean來使用擴展機制。Spring Cloud將在Spring上下文中查找您的實現,並將其封裝在本身的插件中。函數
紅色框部分主要意思是,咱們能夠聲明一個定製化的HystrixConcurrencyStrategy實例,並經過HystrixPlugins註冊。先找到HystrixConcurrencyStrategy類,其有下面一段類註釋
/** * Abstract class for defining different behavior or implementations for concurrency related aspects of the system with default implementations. * <p> * For example, every {@link Callable} executed by {@link HystrixCommand} will call {@link #wrapCallable(Callable)} to give a chance for custom implementations to decorate the {@link Callable} with * additional behavior. * <p> * When you implement a concrete {@link HystrixConcurrencyStrategy}, you should make the strategy idempotent w.r.t ThreadLocals. * Since the usage of threads by Hystrix is internal, Hystrix does not attempt to apply the strategy in an idempotent way. * Instead, you should write your strategy to work idempotently. See https://github.com/Netflix/Hystrix/issues/351 for a more detailed discussion. * <p> * See {@link HystrixPlugins} or the Hystrix GitHub Wiki for information on configuring plugins: <a * href="https://github.com/Netflix/Hystrix/wiki/Plugins">https://github.com/Netflix/Hystrix/wiki/Plugins</a>. */ public abstract class HystrixConcurrencyStrategy {
被@HystrixCommand註解的方法,其執行源Callable能夠經過wrapCallable方法進行定製化裝飾,加入附加的行爲,繼續來看看wrapCallable方法的定義
/** * Provides an opportunity to wrap/decorate a {@code Callable<T>} before execution. * <p> * This can be used to inject additional behavior such as copying of thread state (such as {@link ThreadLocal}). * <p> * <b>Default Implementation</b> * <p> * Pass-thru that does no wrapping. * * @param callable * {@code Callable<T>} to be executed via a {@link ThreadPoolExecutor} * @return {@code Callable<T>} either as a pass-thru or wrapping the one given */ public <T> Callable<T> wrapCallable(Callable<T> callable) { return callable; }
其一樣提供了很是詳細的註釋,該方法提供了在方法被執行前進行裝飾的機會,能夠用來複制線程狀態等附加行爲,這個貌似就是咱們須要的,很合意。
一樣在Hystrix官方文檔提供了更加詳細的說明(https://github.com/Netflix/Hystrix/wiki/Plugins#concurrency-strategy),Concurrency Strategy做爲了Plugin的一種類別,描述以下
能夠看到紅色框中的重點描述,其已經說了很是明確,能夠從父線程複製線程狀態至子線程。自定義的Plugin如何被HystrixCommand應用呢,繼續查看官方的描述
其提供了HystrixPlugins幫助咱們註冊自定義的Plugin,除了咱們本章節重點關注的Concurrency Strategy類別plugin,還有以下類別以及對應的抽象實現
在springcloud中還有以下一段話
既然提升了定製化的實現,不如來看看官方已經提供了哪些默認實現
首先來看看HystrixConcurrencyStrategyDefault,
public class HystrixConcurrencyStrategyDefault extends HystrixConcurrencyStrategy { private static HystrixConcurrencyStrategyDefault INSTANCE = new HystrixConcurrencyStrategyDefault(); public static HystrixConcurrencyStrategy getInstance() { return INSTANCE; } private HystrixConcurrencyStrategyDefault() { } }
很精簡的一段代碼,並無任何方法重寫,其做爲了一個標準提供默認實現。繼續來看看SecurityContextConcurrencyStrategy實現,直接找到wrapCallable方法
@Override public <T> Callable<T> wrapCallable(Callable<T> callable) { return existingConcurrencyStrategy != null ? existingConcurrencyStrategy .wrapCallable(new DelegatingSecurityContextCallable<T>(callable)) : super.wrapCallable(new DelegatingSecurityContextCallable<T>(callable)); }
其對Callabe進行了二次包裝,繼續跟進來看看DelegatingSecurityContextCallable的定義
其主要實現均在call方法中,紅色框中標出了重點,在調用call方法前,咱們能夠將當前上下文信息放入SecurityContextHolder中,在執行完成後清空SecurityContextHolder對應的設置。再來看看SecurityContextConcurrencyStrategy是如何被應用的,在HystrixSecurityAutoConfiguration中有以下代碼段
@Configuration @Conditional(HystrixSecurityCondition.class) @ConditionalOnClass({ Hystrix.class, SecurityContext.class }) public class HystrixSecurityAutoConfiguration { @Autowired(required = false) private HystrixConcurrencyStrategy existingConcurrencyStrategy; @PostConstruct public void init() { // Keeps references of existing Hystrix plugins. HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance() .getEventNotifier(); HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance() .getMetricsPublisher(); HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance() .getPropertiesStrategy(); HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins.getInstance() .getCommandExecutionHook(); HystrixPlugins.reset(); // Registers existing plugins excepts the Concurrent Strategy plugin. HystrixPlugins.getInstance().registerConcurrencyStrategy( new SecurityContextConcurrencyStrategy(existingConcurrencyStrategy)); HystrixPlugins.getInstance().registerEventNotifier(eventNotifier); HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher); HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy); HystrixPlugins.getInstance().registerCommandExecutionHook(commandExecutionHook); } static class HystrixSecurityCondition extends AllNestedConditions { public HystrixSecurityCondition() { super(ConfigurationPhase.REGISTER_BEAN); } @ConditionalOnProperty(name = "hystrix.shareSecurityContext") static class ShareSecurityContext { } } }
在啓動註冊配置過程當中機會經過HystrixPlugins註冊當前擴展的HystrixConcurrencyStrategy實現。
小節:自定義擴展類實現Callable接口,並傳入當前Callable變量delegate,在delegate執行call方法先後進行線程上線文的操做便可實現線程狀態在父線程與子線程間的傳播。
擴展HystrixConcurrencyStrategy解決前言中的意外
經過源碼部分的解讀,基本瞭解springcloud是如何實現擴展的,又是如何被應用的,照葫蘆畫瓢下。
一、定義一個RequestContextHystrixConcurrencyStrategy實現HystrixConcurrencyStrategy接口,並重寫其wrapCallable方法:
public class RequestContextHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy { @Override public <T> Callable<T> wrapCallable(Callable<T> callable) { return new RequestAttributeAwareCallable<>(callable, RequestContextHolder.getRequestAttributes()); } static class RequestAttributeAwareCallable<T> implements Callable<T> { private final Callable<T> delegate; private final RequestAttributes requestAttributes; public RequestAttributeAwareCallable(Callable<T> callable, RequestAttributes requestAttributes) { this.delegate = callable; this.requestAttributes = requestAttributes; } @Override public T call() throws Exception { try { RequestContextHolder.setRequestAttributes(requestAttributes); return delegate.call(); } finally { RequestContextHolder.resetRequestAttributes(); } } } }
其中定義RequestAttributeAwareCallable裝飾類,經過構造函數傳入當前待執行Callable代理和當前待傳播的RequestAttributes值,並在delegate的call方法執行前對RequestContextHolder的RequestAttributes賦值,在finally塊中重置。
二、一樣在任意配置類中添加以下代碼段便可,經過HystrixPlugins註冊RequestContextHystrixConcurrencyStrategy:
@PostConstruct public void init() { HystrixPlugins.getInstance().registerConcurrencyStrategy(new RequestContextHystrixConcurrencyStrategy()); }
三、啓動服務驗證,子線程取值成功:
小節:以上參考SecurityContextConcurrencyStrategy的實現,完成了Hystrix中RequestContextHolder上下文信息傳播。
提升HystrixConcurrencyStrategy包裝擴展性
上一個小節介紹了若是在Hystrix線程隔離場景下實現ThreadLocal定義的上下文傳播,根據示例,在實際應用過程當中若是咱們有多個相似RequestContextHystrixConcurrencyStrategy策略,須要將每一個自定義HystrixConcurrencyStrategy示例註冊至HystrixPlugins中,這在擴展性方面顯然是缺失的,借鑑spring的實踐,咱們能夠定義對Callable的包裝接口HystrixCallableWrapper,根據實際的業務只須要對HystrixCallableWrapper進行實現,並註冊對應的實現bean便可。具體實現以下:
一、定義用於包裝hystrix中Callable實例的接口:
public interface HystrixCallableWrapper { /** * 包裝Callable實例 * * @param callable 待包裝實例 * @param <T> 返回類型 * @return 包裝後的實例 */ <T> Callable<T> wrap(Callable<T> callable); }
二、經過以前的源碼閱讀與實踐,基本已經發現實現線程上線文傳播的核心在於對Callable進行包裝,經過屢次對Callable包裝即實現了一個鏈式包裝過程,以下擴展HystrixConcurrencyStrategy接口實現RequestContextHystrixConcurrencyStrategy,其中定義CallableWrapperChain類對全部注入的HystrixCallableWrapper包裝實現進行裝配:
public class RequestContextHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy { private final Collection<HystrixCallableWrapper> wrappers; public RequestContextHystrixConcurrencyStrategy(Collection<HystrixCallableWrapper> wrappers) { this.wrappers = wrappers; } @Override public <T> Callable<T> wrapCallable(Callable<T> callable) { return new CallableWrapperChain(callable, wrappers.iterator()).wrapCallable(); } private static class CallableWrapperChain<T> { private final Callable<T> callable; private final Iterator<HystrixCallableWrapper> wrappers; CallableWrapperChain(Callable<T> callable, Iterator<HystrixCallableWrapper> wrappers) { this.callable = callable; this.wrappers = wrappers; } Callable<T> wrapCallable() { Callable<T> delegate = callable; while (wrappers.hasNext()) { delegate = wrappers.next().wrap(delegate); } return delegate; } } }
三、實現HystrixCallableWrapper接口,定義一個包裝RequestContextHolder上下文處理的實現類:
public final class RequestAttributeAwareCallableWrapper implements HystrixCallableWrapper { @Override public <T> Callable<T> wrap(Callable<T> callable) { return new RequestAttributeAwareCallable(callable, RequestContextHolder.getRequestAttributes()); } static class RequestAttributeAwareCallable<T> implements Callable<T> { private final Callable<T> delegate; private final RequestAttributes requestAttributes; RequestAttributeAwareCallable(Callable<T> callable, RequestAttributes requestAttributes) { this.delegate = callable; this.requestAttributes = requestAttributes; } @Override public T call() throws Exception { try { RequestContextHolder.setRequestAttributes(requestAttributes); return delegate.call(); } finally { RequestContextHolder.resetRequestAttributes(); } } } }
四、實現HystrixCallableWrapper接口,定義一個包裝Mdc日誌處理上下文的實現類:
public class MdcAwareCallableWrapper implements HystrixCallableWrapper { @Override public <T> Callable<T> wrap(Callable<T> callable) { return new MdcAwareCallable<>(callable, MDC.getCopyOfContextMap()); } private class MdcAwareCallable<T> implements Callable<T> { private final Callable<T> delegate; private final Map<String, String> contextMap; public MdcAwareCallable(Callable<T> callable, Map<String, String> contextMap) { this.delegate = callable; this.contextMap = contextMap != null ? contextMap : new HashMap(); } @Override public T call() throws Exception { try { MDC.setContextMap(contextMap); return delegate.call(); } finally { MDC.clear(); } } } }
五、最後經過在Configuration配置類中註冊以下HystrixCallableWrapper 實現類的bean實例,並經過HystrixPlugins註冊擴展包裝實現:
@Bean public HystrixCallableWrapper requestAttributeAwareCallableWrapper() { return new RequestAttributeAwareCallableWrapper(); } @Bean public HystrixCallableWrapper mdcAwareCallableWrapper(){ return new MdcAwareCallableWrapper(); } @Autowired(required = false) private List<HystrixCallableWrapper> wrappers = new ArrayList<>(); @PostConstruct public void init() { HystrixPlugins.getInstance().registerConcurrencyStrategy(new RequestContextHystrixConcurrencyStrategy(wrappers)); }
總結本章從官方網站與源碼出發,逐步實現了hystrix中如何進行線程上下文的傳播。同時爲了更好的擴展性,提供了基於自定義接口並注入實現的方式。--------------------- 做者:帥天下 來源:CSDN 原文:https://blog.csdn.net/songhaifengshuaige/article/details/80345012 版權聲明:本文爲博主原創文章,轉載請附上博文連接!