理解HystrixRequestContext

前言

在學習Hystrix的請求緩存與請求合併過程當中,不由產生疑問,如何實現基於一個相似於「ThreadLocal變量」,但上下文運用範圍爲Request維度,也就是「HystrixRequestContext」。java

產生這個疑問的緣由

Contains the state and manages the lifecycle of {@link HystrixRequestVariableDefault} objects that provide request scoped (rather than only thread scoped) variables so that multiple threads within a single request can share state.緩存

以請求合併(REQUEST維度)爲例

  • 前文中,提到請求合併是經過HystrixCollapser來實現的,執行合併請求經過使用HystrixContextCallable來實現。
  • 源碼中,CollapsedTask構造器中一段註釋,即運行過程當中,使用上一級線程的上下文,例如Tomcat線程
// this gets executed from the context of a HystrixCommand parent thread (such as a Tomcat thread)
複製代碼
// HystrixContextRunnable是個Runnable,一個可用於執行的任務
public class HystrixContextRunnable implements Runnable {

    private final Callable<Void> actual;
    private final HystrixRequestContext parentThreadState;

    public HystrixContextRunnable(Runnable actual) {
        this(HystrixPlugins.getInstance().getConcurrencyStrategy(), actual);
    }
    
    public HystrixContextRunnable(HystrixConcurrencyStrategy concurrencyStrategy, final Runnable actual) {
        // 獲取當前線程的HystrixRequestContext
        this(concurrencyStrategy, HystrixRequestContext.getContextForCurrentThread(), actual);
    }

    // 關鍵的構造器
    public HystrixContextRunnable(final HystrixConcurrencyStrategy concurrencyStrategy, final HystrixRequestContext hystrixRequestContext, final Runnable actual) {
        
        // 將原始Callable裝飾, 建立了一個新的callable
        this.actual = concurrencyStrategy.wrapCallable(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                actual.run();
                return null;
            }
        });
        // 存儲當前線程的hystrixRequestContext
        this.parentThreadState = hystrixRequestContext;
    }

    @Override
    public void run() {
        // 運行實際的Runnable以前先保存當前線程已有的HystrixRequestContext
        HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread();
        try {
            // 設置當前線程的HystrixRequestContext,來自上一級線程,所以兩個線程是同一個HystrixRequestContext
            HystrixRequestContext.setContextOnCurrentThread(parentThreadState);
            try {
                actual.call();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } finally {
            // 還原當前線程的HystrixRequestContext
            HystrixRequestContext.setContextOnCurrentThread(existingState);
        }
    }
}
複製代碼
  • 今後能夠獲知,在子線程執行具體代碼前,將當前線程的HystrixRequestContext替換爲了上一級線程的HystrixRequestContext,執行後還原,實現了HystrixRequestContext的傳遞。

簡化DEMO

DemoRequestContext

public class DemoRequestContext {

    // ThreadLoacal
    private static ThreadLocal<DemoRequestContext> demo = new ThreadLocal<>();

    // 線程上下文,存儲map
    private ConcurrentHashMap<String, String> concurrentHashMap = new ConcurrentHashMap<>();

    private DemoRequestContext() {
    }

    // 初始化上下文
    public static DemoRequestContext initializeContext() {
        DemoRequestContext context = new DemoRequestContext();
        demo.set(context);
        return context;
    }

    // 設置當前線程的上下文
    public static void setContextOnCurrentThread(DemoRequestContext state) {
        demo.set(state);
    }

    // 獲取當前線程的上下文
    public static DemoRequestContext getContextForCurrentThread() {
        DemoRequestContext context = demo.get();

        if (context != null && context.concurrentHashMap != null) {
            return context;
        } else {
            return null;
        }
    }

    // 存儲當前線程須要存儲的數據 key-value
    public void set(String key, String value) {
        if (DemoRequestContext.getContextForCurrentThread() == null) {
            throw new IllegalArgumentException(DemoRequestContext.class.getSimpleName() + ".initializeContext() must be called at the beginning of each request before RequestVariable functionality can be used.");
        }
        DemoRequestContext.getContextForCurrentThread().concurrentHashMap.put(key, value);
    }

    // 根據key獲取當前線程上下文中的值
    public String get(String key) {
        if (DemoRequestContext.getContextForCurrentThread() == null) {
            throw new IllegalArgumentException(DemoRequestContext.class.getSimpleName() + ".initializeContext() must be called at the beginning of each request before RequestVariable functionality can be used.");
        }
        ConcurrentHashMap<String, String> variableMap = DemoRequestContext.getContextForCurrentThread().concurrentHashMap;
        return variableMap.get(key);
    }

}
複製代碼

DemoContextCallable

public class DemoContextCallable<V> implements Callable<V> {

    private final Callable<V> callable;
    private final DemoRequestContext parentThreadState;

    // 構造時 將上一級線程上下文注入
    public DemoContextCallable(Callable<V> callable) {
        this.callable = callable;
        this.parentThreadState = DemoRequestContext.getContextForCurrentThread();
    }

    // 替換線程上下文的操做
    @Override
    public V call() throws Exception {
        DemoRequestContext context = DemoRequestContext.getContextForCurrentThread();
        try {
            DemoRequestContext.setContextOnCurrentThread(parentThreadState);
            return callable.call();
        } finally {
            DemoRequestContext.setContextOnCurrentThread(context);
        }
    }
}
複製代碼

測試類

public class ContextTest {

    private static final ExecutorService executorService = Executors.newCachedThreadPool();
    public static void main(String[] args) {
        // 主線程的context
        DemoRequestContext.initializeContext();

        // 主線程存儲的key-value
        DemoRequestContext context = DemoRequestContext.getContextForCurrentThread();
        context.set("name", "parentThread");

        DemoContextCallable<String> contextCallable = new DemoContextCallable<String>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                // 子線程中取出上下文內容
                return DemoRequestContext.getContextForCurrentThread().get("name");
            }
        });

        // 線程池運行
        List<Future<String>> list = Lists.newArrayList();
        for (int i = 0; i < 3; i++) {
            Future<String> future= executorService.submit(contextCallable);
            list.add(future);
        }

        for (Future<String> future : list) {
            try {
                System.out.println(future.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }

    }
}
複製代碼

輸出

parentThread
parentThread
parentThread
複製代碼

總結

一直對HystrixRequestContext的實現原理很困惑,官方推薦是在Filter中初始化並shutdown,所以有如下理解。bash

HystrixRequestContext是一次HttpRequest中的上下文,一次請求中可能有有多個線程執行多個CommandHystrixRequestCacheHystrixRequestContext生命週期一致,都是一次HttpRequestide

參考文獻

相關文章
相關標籤/搜索