在咱們的項目中,比較普遍地使用了ThreadLocal,好比,在filter層,根據token,取到用戶信息後,就會放到一個ThreadLocal變量中;在後續的業務處理中,就會直接從當前線程,來獲取該ThreadLocal變量,而後獲取到其中的用戶信息,很是的方便。java
可是,hystrix 這個組件一旦引入的話,若是使用線程隔離的方式,咱們的業務邏輯就被分紅了兩部分,以下:git
public class SimpleHystrixCommand extends HystrixCommand<String> { private TestService testService; public SimpleHystrixCommand(TestService testService) { super(setter()); this.testService = testService; } @Override protected String run() throws Exception { .... } ... }
首先,咱們定義了一個Command,這個Command,最終就會丟給hystrix的線程池中去運行。那,咱們的controller層,會怎麼寫呢?github
@RequestMapping("/") public String hystrixOrder () { SessionUtils.getSessionVOFromRedisAndPut2ThreadLocal(); // 1 SimpleHystrixCommand simpleHystrixCommand = new SimpleHystrixCommand(testService); // 2 String res = simpleHystrixCommand.execute(); return res; }
因此,這中間,是有線程切換的,執行1時,當前線程裏的ThreadLocal數據,在執行業務方法的時候,線程變了,也就取不到ThreadLocal數據了。緩存
若是沒時間,能夠直接看源碼:app
https://gitee.com/ckl111/all-simple-demo-in-work-1/tree/master/hystrix-thread-local-demodom
一開始,個人思路是,看看能不能把hystrix的默認線程池給換掉,由於構建HystrixCommand時,支持使用Setter的方式去配置。ide
以下:函數
com.netflix.hystrix.HystrixCommand.Setter final public static class Setter { // 1 protected final HystrixCommandGroupKey groupKey; // 2 protected HystrixCommandKey commandKey; // 3 protected HystrixThreadPoolKey threadPoolKey; // 4 protected HystrixCommandProperties.Setter commandPropertiesDefaults; // 5 protected HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults; }
1處,設置命令組ui
2處,設置命令的keythis
3處,設置線程池的key;hystrix會根據這個key,在一個map中,來查找對應的線程池,若是找不到,則建立一個,並放到map中。
com.netflix.hystrix.HystrixThreadPool.Factory final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();
4處,命令的相關屬性,包括是否降級,是否熔斷,是否容許請求合併,命令執行的最大超時時長,以及metric等實時統計信息
5處,線程池的相關屬性,好比核心線程數,最大線程數,隊列長度等
怎麼樣,能夠設置的屬性不少,是吧,可是,並無讓咱們控制線程池的建立相關的,也沒辦法替換其默認線程池。
ok,那不用setter的方式,行不行呢?
HystrixCommand 的構造函數,看看能不能傳入自定義的線程池呢?
通過我一開始不仔細的觀察,發現有一個構造函數能夠傳入HystrixThreadPool,ok,就是它了。可是,後面仔細一看,居然是 package權限,個人子類,和HystrixCommand固然不是一個package下的,因此,訪問不了這個構造器。
雖然,可使用反射,可是,我們仍是守規矩點好了,再看看有沒有其餘入口。
仔細觀察下,看看線程池何時建立的?
入口在下圖,每次new一個HystrixCommand,最終都會調用父類的構造函數:
上圖所示處,initThreadPool裏面,會去建立線程池,須要注意的是,這裏的第一個實參,threadPool,是構造函數的第5個形參,目前來看,傳進來的都是null。爲啥說這個,咱們接着看:
private static HystrixThreadPool initThreadPool(HystrixThreadPool fromConstructor, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults) { if (fromConstructor == null) { //1 get the default implementation of HystrixThreadPool return HystrixThreadPool.Factory.getInstance(threadPoolKey, threadPoolPropertiesDefaults); } else { return fromConstructor; } }
上面咱們說了,第一個實參,老是null,因此,會走這裏的1處。
com.netflix.hystrix.HystrixThreadPool.Factory#getInstance static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) { String key = threadPoolKey.name(); //1 this should find it for all but the first time HystrixThreadPool previouslyCached = threadPools.get(key); if (previouslyCached != null) { return previouslyCached; } //2 if we get here this is the first time so we need to initialize synchronized (HystrixThreadPool.class) { if (!threadPools.containsKey(key)) { // 3 threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder)); } } return threadPools.get(key); }
咱們接着看3處:
public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) { // 1 this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults); // 2 HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy(); // 3 this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey, concurrencyStrategy.getThreadPool(threadPoolKey, properties), properties); // 4 this.threadPool = this.metrics.getThreadPool(); ... }
1處,獲取線程池的默認配置,這個就和咱們前面說的那個Setter裏的相似
2處,從HystrixPlugins.getInstance()獲取一個HystrixConcurrencyStrategy類型的對象,保存到局部變量 concurrencyStrategy
3處,初始化metrics,這裏的第二個參數,是concurrencyStrategy.getThreadPool來獲取的,這個操做,實際上就會去建立線程池。
com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy#getThreadPool public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) { final ThreadFactory threadFactory = getThreadFactory(threadPoolKey); ... final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get(); final int maxQueueSize = threadPoolProperties.maxQueueSize().get(); ... // 1 return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory); } }
上面的1處,會去建立線程池。可是,這裏直接就是要了 jdk 的默認線程池類來建立,這還怎麼搞?類型都定死了。無法擴展了。。。
可是,回過頭來,又仔細看了看,這個getThreadPool 是 HystrixConcurrencyStrategy類的一個方法,這個方法也是個實例方法。
方法不能改,那,實例能換嗎?再看看前面的代碼:
ok,那接着分析:
public HystrixConcurrencyStrategy getConcurrencyStrategy() { if (concurrencyStrategy.get() == null) { //1 check for an implementation from Archaius first Object impl = getPluginImplementation(HystrixConcurrencyStrategy.class); concurrencyStrategy.compareAndSet(null, (HystrixConcurrencyStrategy) impl); } return concurrencyStrategy.get(); }
1處,根據這個類,獲取實現,感受有點戲。
private <T> T getPluginImplementation(Class<T> pluginClass) { // 1 T p = getPluginImplementationViaProperties(pluginClass, dynamicProperties); if (p != null) return p; // 2 return findService(pluginClass, classLoader); }
1處,從一個動態屬性中獲取,後來經查,發現是若是集成了Netflix Archaius就能夠動態獲取屬性,相似於一個配置中心
2處,若是前面沒找到,就是要 JDK 的SPI機制。
private static <T> T findService( Class<T> spi, ClassLoader classLoader) throws ServiceConfigurationError { ServiceLoader<T> sl = ServiceLoader.load(spi, classLoader); for (T s : sl) { if (s != null) return s; } return null; }
那就好說了。SPI ,咱們自定義一個實現,就能夠替換掉默認的了,hystrix作的仍是不錯,擴展性能夠。
如今知道能夠自定義HystrixConcurrencyStrategy了,那要怎麼自定義呢?
這個類,是個抽象類,大致有以下幾個方法:
getThreadPool getBlockingQueue(int maxQueueSize) Callable<T> wrapCallable(Callable<T> callable) getRequestVariable(final HystrixRequestVariableLifecycle<T> rv)
說是抽象類,但其實並無須要咱們實現的方法,全部方法都有默認實現,咱們只須要重寫須要覆蓋的方法便可。
我這裏,看重了第三個方法:
/** * Provides an opportunity to wrap/decorate a {@code Callable<T>} before execution. * <p> * This can be used to inject additional behavior such as copying of thread state (such as {@link ThreadLocal}). * <p> * <b>Default Implementation</b> * <p> * Pass-thru that does no wrapping. * * @param callable * {@code Callable<T>} to be executed via a {@link ThreadPoolExecutor} * @return {@code Callable<T>} either as a pass-thru or wrapping the one given */ public <T> Callable<T> wrapCallable(Callable<T> callable) { return callable; }
方法註釋如上,我簡單說下,在執行前,提供一個機會,讓你去wrap這個callable,即最終要丟到線程池執行的那個callable。
咱們能夠wrap一下原有的callable,在執行前,把當前線程的threadlocal變量存下來,即爲A,而後設置到callable裏面去;在callable執行的時候,就可使用咱們的A中的threadlocal來替換掉worker線程中的。
多說無益,這裏直接看代碼:
// 0 public class MyHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy { @Override public <T> Callable<T> wrapCallable(Callable<T> callable) { /** * 1 獲取當前線程的threadlocalmap */ Object currentThreadlocalMap = getCurrentThreadlocalMap(); Callable<T> finalCallable = new Callable<T>() { // 2 private Object callerThreadlocalMap = currentThreadlocalMap; // 3 private Callable<T> targetCallable = callable; @Override public T call() throws Exception { /** * 4 將工做線程的原有線程變量保存起來 */ Object oldThreadlocalMapOfWorkThread = getCurrentThreadlocalMap(); /** *5 將本線程的線程變量,設置爲caller的線程變量 */ setCurrentThreadlocalMap(callerThreadlocalMap); try { // 6 return targetCallable.call(); }finally { // 7 setCurrentThreadlocalMap(oldThreadlocalMapOfWorkThread); log.info("restore work thread's threadlocal"); } } }; return finalCallable; }
獲取線程的threadlocal的代碼:
private Object getCurrentThreadlocalMap() { Thread thread = Thread.currentThread(); try { Field field = Thread.class.getDeclaredField("threadLocals"); field.setAccessible(true); Object o = field.get(thread); return o; } catch (NoSuchFieldException | IllegalAccessException e) { log.error("{}",e); } return null; }
設置線程的threadlocal的代碼:
private void setCurrentThreadlocalMap(Object newThreadLocalMap) { Thread thread = Thread.currentThread(); try { Field field = Thread.class.getDeclaredField("threadLocals"); field.setAccessible(true); field.set(thread,newThreadLocalMap); } catch (NoSuchFieldException | IllegalAccessException e) { log.error("{}",e); } }
https://github.com/Netflix/Hystrix/wiki/Plugins
@RequestMapping("/") public String hystrixOrder () { // 1 SessionUtils.getSessionVOFromRedisAndPut2ThreadLocal(); // 2 SimpleHystrixCommand simpleHystrixCommand = new SimpleHystrixCommand(testService); String res = simpleHystrixCommand.execute(); return res; }
1處,設置ThreadLocal變量
public static UserVO getSessionVOFromRedisAndPut2ThreadLocal() { UserVO userVO = new UserVO(); userVO.setUserName("test user"); RequestContextHolder.set(userVO); log.info("set thread local:{} to context",userVO); return userVO; }
2處,new了一個HystrixCommand,而後execute執行
public class SimpleHystrixCommand extends HystrixCommand<String> { private TestService testService; public SimpleHystrixCommand(TestService testService) { super(setter()); this.testService = testService; } @Override protected String run() throws Exception { // 1 String s = testService.getResult(); log.info("get thread local:{}",s); /** * 若是睡眠時間,超過2s,會降級 * {@link #getFallback()} */ int millis = new Random().nextInt(3000); log.info("will sleep {} millis",millis); Thread.sleep(millis); return s; }
重點看1處代碼:
public String getResult() { UserVO userVO = RequestContextHolder.get(); log.info("I am hystrix pool thread,try to get threadlocal:{}",userVO); return userVO.toString(); }
如上所示,會去獲取ThreadLocal變量,並打印。
在resources\META-INF\services目錄下,建立文件:
com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy
內容爲下面一行:
com.learn.hystrix.utils.MyHystrixConcurrencyStrategy
2020-05-09 17:26:11.134 INFO 7452 --- [nio-8080-exec-2] com.learn.hystrix.utils.SessionUtils : set thread local:UserVO(userName=test user) to context 2020-05-09 17:26:11.143 INFO 7452 --- [x-member-pool-2] com.learn.hystrix.service.TestService : I am hystrix pool thread,try to get threadlocal:UserVO(userName=test user) 2020-05-09 17:26:11.143 INFO 7452 --- [x-member-pool-2] c.l.h.command.SimpleHystrixCommand : get thread local:UserVO(userName=test user) 2020-05-09 17:26:11.144 INFO 7452 --- [x-member-pool-2] c.l.h.command.SimpleHystrixCommand : will sleep 126 millis 2020-05-09 17:26:11.281 INFO 7452 --- [x-member-pool-2] c.l.h.u.MyHystrixConcurrencyStrategy : restore work thread's threadlocal
能夠看到,已經發生了線程切換,在worker線程也取到了。
你們若是發現日誌中出現了[ HystrixTimer-1] 線程的身影,不用擔憂,那只是由於咱們的線程超時了,因此timer線程檢測到了以後,去執行一個callable任務,那個runnable就是前面被咱們包裝過的那個callable。(這塊超時的機制,todo吧,下次再講)
hystrix的插件機制,不止能夠擴展上面這一個類,還有幾個別的類也是能夠的。你們直接參考:
https://github.com/Netflix/Hystrix/wiki/Plugins
代碼demo,我放在了:
https://gitee.com/ckl111/all-simple-demo-in-work-1/tree/master/hystrix-thread-local-demo