在學習Hystrix
的請求緩存與請求合併過程當中,不由產生疑問,如何實現基於一個相似於「ThreadLocal
變量」,但上下文運用範圍爲Request
維度,也就是「HystrixRequestContext
」。java
一個Request
,即Command
能夠由多個線程執行,舉個例子,咱們使用線程A和線程B處理TestCommand
,應該屬於兩次TestCommand
執行,A線程和B線程是怎麼共用的緩存,減小了額外的邏輯執行?git
結合Hystrix請求合併與請求緩存(一):請求緩存、Hystrix請求合併與請求緩存(二):請求合併 中的分析,HystrixRequestContext
怎麼如註釋所說,實現request scoped
?github
Contains the state and manages the lifecycle of {@link HystrixRequestVariableDefault} objects that provide request scoped (rather than only thread scoped) variables so that multiple threads within a single request can share state.緩存
HystrixCollapser
來實現的,執行合併請求經過使用HystrixContextCallable
來實現。CollapsedTask
構造器中一段註釋,即運行過程當中,使用上一級線程的上下文,例如Tomcat線程。// this gets executed from the context of a HystrixCommand parent thread (such as a Tomcat thread)
複製代碼
// HystrixContextRunnable是個Runnable,一個可用於執行的任務
public class HystrixContextRunnable implements Runnable {
private final Callable<Void> actual;
private final HystrixRequestContext parentThreadState;
public HystrixContextRunnable(Runnable actual) {
this(HystrixPlugins.getInstance().getConcurrencyStrategy(), actual);
}
public HystrixContextRunnable(HystrixConcurrencyStrategy concurrencyStrategy, final Runnable actual) {
// 獲取當前線程的HystrixRequestContext
this(concurrencyStrategy, HystrixRequestContext.getContextForCurrentThread(), actual);
}
// 關鍵的構造器
public HystrixContextRunnable(final HystrixConcurrencyStrategy concurrencyStrategy, final HystrixRequestContext hystrixRequestContext, final Runnable actual) {
// 將原始Callable裝飾, 建立了一個新的callable
this.actual = concurrencyStrategy.wrapCallable(new Callable<Void>() {
@Override
public Void call() throws Exception {
actual.run();
return null;
}
});
// 存儲當前線程的hystrixRequestContext
this.parentThreadState = hystrixRequestContext;
}
@Override
public void run() {
// 運行實際的Runnable以前先保存當前線程已有的HystrixRequestContext
HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread();
try {
// 設置當前線程的HystrixRequestContext,來自上一級線程,所以兩個線程是同一個HystrixRequestContext
HystrixRequestContext.setContextOnCurrentThread(parentThreadState);
try {
actual.call();
} catch (Exception e) {
throw new RuntimeException(e);
}
} finally {
// 還原當前線程的HystrixRequestContext
HystrixRequestContext.setContextOnCurrentThread(existingState);
}
}
}
複製代碼
HystrixRequestContext
替換爲了上一級線程的HystrixRequestContext
,執行後還原,實現了HystrixRequestContext
的傳遞。public class DemoRequestContext {
// ThreadLoacal
private static ThreadLocal<DemoRequestContext> demo = new ThreadLocal<>();
// 線程上下文,存儲map
private ConcurrentHashMap<String, String> concurrentHashMap = new ConcurrentHashMap<>();
private DemoRequestContext() {
}
// 初始化上下文
public static DemoRequestContext initializeContext() {
DemoRequestContext context = new DemoRequestContext();
demo.set(context);
return context;
}
// 設置當前線程的上下文
public static void setContextOnCurrentThread(DemoRequestContext state) {
demo.set(state);
}
// 獲取當前線程的上下文
public static DemoRequestContext getContextForCurrentThread() {
DemoRequestContext context = demo.get();
if (context != null && context.concurrentHashMap != null) {
return context;
} else {
return null;
}
}
// 存儲當前線程須要存儲的數據 key-value
public void set(String key, String value) {
if (DemoRequestContext.getContextForCurrentThread() == null) {
throw new IllegalArgumentException(DemoRequestContext.class.getSimpleName() + ".initializeContext() must be called at the beginning of each request before RequestVariable functionality can be used.");
}
DemoRequestContext.getContextForCurrentThread().concurrentHashMap.put(key, value);
}
// 根據key獲取當前線程上下文中的值
public String get(String key) {
if (DemoRequestContext.getContextForCurrentThread() == null) {
throw new IllegalArgumentException(DemoRequestContext.class.getSimpleName() + ".initializeContext() must be called at the beginning of each request before RequestVariable functionality can be used.");
}
ConcurrentHashMap<String, String> variableMap = DemoRequestContext.getContextForCurrentThread().concurrentHashMap;
return variableMap.get(key);
}
}
複製代碼
public class DemoContextCallable<V> implements Callable<V> {
private final Callable<V> callable;
private final DemoRequestContext parentThreadState;
// 構造時 將上一級線程上下文注入
public DemoContextCallable(Callable<V> callable) {
this.callable = callable;
this.parentThreadState = DemoRequestContext.getContextForCurrentThread();
}
// 替換線程上下文的操做
@Override
public V call() throws Exception {
DemoRequestContext context = DemoRequestContext.getContextForCurrentThread();
try {
DemoRequestContext.setContextOnCurrentThread(parentThreadState);
return callable.call();
} finally {
DemoRequestContext.setContextOnCurrentThread(context);
}
}
}
複製代碼
public class ContextTest {
private static final ExecutorService executorService = Executors.newCachedThreadPool();
public static void main(String[] args) {
// 主線程的context
DemoRequestContext.initializeContext();
// 主線程存儲的key-value
DemoRequestContext context = DemoRequestContext.getContextForCurrentThread();
context.set("name", "parentThread");
DemoContextCallable<String> contextCallable = new DemoContextCallable<String>(new Callable<String>() {
@Override
public String call() throws Exception {
// 子線程中取出上下文內容
return DemoRequestContext.getContextForCurrentThread().get("name");
}
});
// 線程池運行
List<Future<String>> list = Lists.newArrayList();
for (int i = 0; i < 3; i++) {
Future<String> future= executorService.submit(contextCallable);
list.add(future);
}
for (Future<String> future : list) {
try {
System.out.println(future.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
}
複製代碼
parentThread
parentThread
parentThread
複製代碼
一直對HystrixRequestContext
的實現原理很困惑,官方推薦是在Filter
中初始化並shutdown
,所以有如下理解。bash
HystrixRequestContext
是一次HttpRequest
中的上下文,一次請求中可能有有多個線程執行多個Command
。HystrixRequestCache
和HystrixRequestContext
生命週期一致,都是一次HttpRequest
。ide