Hystrix之ThreadLocal上下文傳播

引言

ThreadLocal

ThreadLocal這個類給線程提供了一個本地變量,這個變量是該線程本身擁有,各線程間不共享。在該線程存活和ThreadLocal實例能訪問的時候,保存了對這個變量副本的引用。當線程消失的時候,全部的本地實例都會被GC。而且建議ThreadLocal最好是使用 private static 修飾。git

InheritableThreadLocal

InheritableThreadLocal是爲了解決子線程得到父線程本地變量的需求,繼承自ThreadLocal。若是你使用它,那麼保存的全部東西都已經不在原來的threadLocals裏面,而是在一個新的叫inheritableThreadLocals變量中。意思就是說每一個線程Thread裏面還有一個Map變量,名叫inheritableThreadLocals,它保存的是須要傳遞的引用(經過InheritableThreadLocal設置的線程變量)。github

這種父子傳遞的需求仍是有些比較重要的應用場景,如上下文傳遞(用戶標識、事務等),調用日誌跟蹤等。Log4j中的MDC就是基於InheritableThreadLocal實現。但通常來講咱們用線程池比較多,線程池會緩存線程,重複使用,線程可能會執行不一樣的任務。這樣一來它的上下文傳遞就達不到正確的效果。緩存

TransmittableThreadLocal

阿里巴巴有個開源項目就是爲了解決線程池中變量傳遞,它裏面有個叫TransmittableThreadLocal,繼承於InheritableThreadLocal。它經過包裝返回Runnable的方式代理了run方法,在run以前copy裝載線程變量,run以後清除線程變量,來實現此功能。TransmittableThreadLocal除了繼承過來的線程Map,它還定義了一個名叫holder的InheritableThreadLocal靜態變量,也就是說TransmittableThreadLocal有兩套線程變量。併發

但事實上咱們不可能全部的應用均採用InheritableThreadLocal,儘管他是一個不錯的選擇,但如何讓ThreadLocal也實如今Hystrix應用場景下實現線程上下文的傳播呢。這就是本章的重點了。app

HystrixConcurrencystrategy

Hystrixconcurrencystrategy是Hystrix的線程池建立源碼所在,而且提供方法wrapCallable來裝飾線程池執行環境。因此咱們咱們能夠自定義一個併發策略,便可於Hystrix應用場景內實現線程上下文的傳播。附wrapCallable源碼ide

/**
 * Provides an opportunity to wrap/decorate a {@code Callable<T>} before execution.
 * <p>
 * This can be used to inject additional behavior such as copying of thread state (such as {@link ThreadLocal}).
 * <p>
 * <b>Default Implementation</b>
 * <p>
 * Pass-thru that does no wrapping.
 *
 * @param callable
 *            {@code Callable<T>} to be executed via a {@link ThreadPoolExecutor}
 * @return {@code Callable<T>} either as a pass-thru or wrapping the one given
 */
public <T> Callable<T> wrapCallable(Callable<T> callable) {
    return callable;
}

Ps:註釋上有一句這麼說的:This can be used to inject additional behavior such as copying of thread state (such as {@link ThreadLocal}).ui

拓展

既然這個方法能夠裝飾線程池回調,那麼咱們亦可定義一個裝飾器接口,只要這個接口的實現類,都會經過上述併發策略裝飾不一樣業務不一樣場景須要的線程變量。this

裝飾器接口定義

/**
 * Hystrix CallBack 裝飾器定義
 *
 * @author wangzhuhua
 * @date 2018/09/07 下午4:18
 **/
public interface HystrixCallableWrapper {

    /**
     * 裝飾 Callable實例
     *
     * @param callable
     *            待裝飾實例
     * @param <T>
     *            返回類型
     * @return 裝飾後的實例
     */
    <T> Callable<T> wrap(Callable<T> callable);
}

HystrixConcurrencystrategy Custom

/**
 * Hystrix併發策略
 *
 * @author wangzhuhua
 * @date 2018/09/07 下午4:06
 **/
public class HystrixConcurrencyStrategyCustom extends HystrixConcurrencyStrategy {

    /** 裝飾隊列 */
    private final List<HystrixCallableWrapper> wrappers;

    public HystrixConcurrencyStrategyCustom(List<HystrixCallableWrapper> wrappers) {
        this.wrappers = wrappers;
    }

    @Override
    public <T> Callable<T> wrapCallable(Callable<T> callable) {
        return new CallableWrapperChain(callable, this.wrappers).wrapCallable();
    }

    /**
     * callback 調用鏈
     *
     * @author wangzhuhua
     * @date 2018/09/07 下午4:38
     **/
    private static class CallableWrapperChain<T> {

        /** 回調 */
        private final Callable<T> callable;
        /** 回調包裝 */
        private final List<HystrixCallableWrapper> wrappers;

        CallableWrapperChain(Callable<T> callable, List<HystrixCallableWrapper> wrappers) {
            this.callable = callable;
            this.wrappers = wrappers;
        }

        /**
         * 裝飾線程hystrix callable
         * 
         * @return {@link Callable}
         */
        Callable<T> wrapCallable() {
            Callable<T> delegate = callable;
            for (HystrixCallableWrapper wrapper : wrappers) {
                delegate = wrapper.wrap(delegate);
            }
            return delegate;
        }
    }

}

Configuration

/**
 * Hystrix配置
 *
 * @author wangzhuhua
 * @date 2018/09/07 下午4:01
 **/
@Configuration
@ConditionalOnProperty(value = "feign.hystrix.enabled", havingValue = "true")
public class HystrixConfiguration {

    @Autowired(required = false)
    private List<HystrixCallableWrapper> wrappers = new ArrayList<>();

    @PostConstruct
    public void init() {
        HystrixPlugins.getInstance().registerConcurrencyStrategy(new HystrixConcurrencyStrategyCustom(wrappers));
    }

}

實現示例

/**
 * Token 裝飾器
 *
 * @author wangzhuhua
 * @date 2018/09/10 上午11:28
 **/
@Component
public class TokenWrapper implements HystrixCallableWrapper {
    @Override
    public <T> Callable<T> wrap(Callable<T> callable) {
        return new TokenAwareCallable(callable, TokenHandler.getToken());
    }

    /**
     * Token裝飾
     *
     * @param <T>
     */
    static class TokenAwareCallable<T> implements Callable<T> {

        /** 回調代理 */
        private final Callable<T> delegate;
        /** Token */
        private final String token;

        TokenAwareCallable(Callable<T> callable, String token) {
            this.delegate = callable;
            this.token = token;
        }

        @Override
        public T call() throws Exception {
            try {
                // 填充當前線程變量
                TokenHandler.setToken(this.token);
                return delegate.call();
            } finally {
                TokenHandler.remove();
            }
        }
    }
}

其中TokenHandler內使用線程變量存儲spa

相關文章
相關標籤/搜索