Hystrix 如何解決 ThreadLocal 信息丟失

本文分享 ThreadLocal 遇到 Hystrix 時上下文信息傳遞的方案。java

1、背景

筆者在業務開發中涉及到使用 ThreadLocal 來存放上下文鏈路中一些關鍵信息,其中一些業務實現對外部接口依賴,對這些依賴接口使用了Hystrix做熔斷保護,但在使用Hystrix做熔斷保護的方法中發現了獲取 ThreadLocal 信息與預期不一致問題,本文旨在探討如何解決這一問題。git

2、ThreadLocal

在Java編程語言裏ThreadLocal是用來方便開發人員在同一線程上下文中不一樣類、不一樣方法中共享信息的,ThreadLocal變量不受其餘線程的影響,不一樣線程間相互隔離,也就是線程安全的。在實際的業務鏈路中從入口到具體的業務實現有時候須要共享某些通用信息,好比用戶惟一標識、鏈路追蹤惟一標識等,這些信息就可使用ThreadLocal來存儲實現,下面就是一個簡單的同一鏈路中共享traceId的示例代碼。github

public class ThreadLocalUtil {
 
    private static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();
 
    public static void setTraceId(String traceId) {
        TRACE_ID.set(traceId);
    }
 
    public static String getTraceId() {
        return TRACE_ID.get();
    }
 
    public static void clearTraceId() {
        TRACE_ID.remove();
    }
}

3、Hystrix

在分佈式環境中,每一個系統所依賴的外部服務不可避免的會出現失敗或超時的狀況,Hystrix 經過增長對依賴服務的延時容錯及失敗容錯邏輯,也就是所謂的「熔斷」,以幫助開發人員去靈活控制所依賴的分佈式服務。編程

Hystrix經過隔離服務間的訪問點,阻斷服務間的級聯故障,並提供降級選項,這一切都是爲了提供系統總體的健壯性,在大規模分佈式服務中,系統的健壯性尤爲重要。Hystrix詳細的介紹能夠看:Hystrix介紹安全

4、ThreadLocal趕上Hystrix

當業務鏈路中的具體實現有依賴外部服務,且做了相關熔斷保護,那麼本文的兩個主角就這麼趕上了。網絡

根據Hystrix的相關文檔介紹咱們瞭解到,Hystrix提供兩種線程隔離模式:信號量和線程池。多線程

信號量模式下執行業務邏輯時處於同一線程上下文,而線程池模式則使用Hystrix提供的線程池去執行相關業務邏輯。在平常業務開發中更多須要熔斷的是涉及到外部網絡IO調用的(如RPC調用),Hystrix存在的一個目的就是想減小外部依賴的調用對服務容器線程的消耗,信號量模式顯然不太適合,所以咱們在絕大部分場景下使用的都是線程池模式,而Hystrix默認狀況下啓用的也是線程池模式。編程語言

本文想要解決的也正是在這種默認模式下才會有的問題:分佈式

一、InheritableThreadLocal

有人可能會想到是否是能夠用InheritableThreadLocal去解決?ide

InheritableThreadLocal能夠將當前線程中的線程變量信息共享到當前線程所建立的「子線程」中,但這邊忽略了一個很重要的信息,Hystrix中的線程模式底層使用的是本身維護的一個線程池,也就是其中的線程會出現複用的狀況,那麼就會出現每一個線程所共享的信息都是以前首次獲取到的「父線程」的共享信息,這顯然不是咱們所期待的,因此InheritableThreadLocal被排除。

那麼想要在Hystrix中解決這個問題怎麼辦?

優秀的Hystrix已經幫你們提供了相關解決方案,並且是插件化,按需定製。Hystrix的插件詳細介紹請看這:Hystrix插件介紹,本文給你們介紹兩種方案。

如何讓ThreadLocal變量信息在HystrixCommand執行時能在Hystrix線程中正確的傳遞?

二、Concurrency Strategy

使用 HystrixConcurrencyStrategy插件能夠來包裝Hystrix線程所執行的方法,具體直接看示例代碼:

public class MyHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
 
    @Override
    public <T> Callable<T> wrapCallable(Callable<T> callable) {
        String traceId = ThreadLocalUtil.getTraceId();
        return () -> {
            ThreadLocalUtil.setTraceId(traceId);
            try {
                return callable.call();
            } finally {
                ThreadLocalUtil.clearTraceId();
            }
        };
    }
}
 
 
// 業務代碼中某處合適的地方註冊下當前的策略插件
HystrixPlugins.getInstance().registerConcurrencyStrategy(new MyHystrixConcurrencyStrategy());

使用這種方式很是簡單,只要開發人員將本身關注的ThreadLocal值進行「複製」便可,那是否是使用這種方式就好了?

咱們留意到這種方式本質是針對HystrixCommand的run()方法(也就是加了@HystrixCommand註解的業務方法)攔截處理,但它可能會超時或失敗,那麼就會去執行fallback方法,若是在 fallback方法中也想共享相關上下文信息,這時就沒法覆蓋到這種場景了。

若是在你的業務中fallback不須要關注上下文信息這塊的內容,那麼上述這種方案就能夠知足需求了,也很簡單。但若是在fallback方法中也須要上下文信息,那麼可使用Hystrix提供的下面這種插件方式。

三、Command Execution Hook

使用HystrixCommandExecutionHook能夠實現對Hystrix執行流程的徹底控制,你能夠覆寫它的一些關鍵節點的回調方法,以實現你的定製需求。想要更多的瞭解能夠看下這:Command Execution Hook介紹,下面列舉出HystrixCommandExecutionHook的一些經常使用的關鍵方法:

在瞭解上述這些關鍵方法後,能夠發現實現也很簡單,只要在onStart()的時候「複製」下關注的上下文信息,而後在onExecutionStart()和onFallbackStart()兩個方法開始執行前「粘貼」下關注的上下文信息,最後在做相應的清理行爲,就能夠知足需求了,示例代碼以下所示:

public class MyHystrixHook extends HystrixCommandExecutionHook {
     
    private String traceId;
 
    @Override
    public <T> void onStart(HystrixInvokable<T> commandInstance) {
        copyTraceId();
    }
 
    @Override
    public <T> void onExecutionStart(HystrixInvokable<T> commandInstance) {
        pasteTraceId();
    }
 
    @Override
    public <T> void onFallbackStart(HystrixInvokable<T> commandInstance) {
        pasteTraceId();
    }
// 下面option1和option2選擇其中一種覆寫就能夠了
//------------------------------------option1------------------------------------
    @Override
    public <T> void onExecutionSuccess(HystrixInvokable<T> commandInstance) {
        ThreadLocalUtil.clearTraceId();
        super.onExecutionSuccess(commandInstance);
    }
 
    @Override
    public <T> Exception onExecutionError(HystrixInvokable<T> commandInstance, Exception e) {
        ThreadLocalUtil.clearTraceId();
        return super.onExecutionError(commandInstance, e);
    }
 
    @Override
    public <T> void onFallbackSuccess(HystrixInvokable<T> commandInstance) {
        ThreadLocalUtil.clearTraceId();
        super.onFallbackSuccess(commandInstance);
    }
     
     @Override
    public <T> Exception onFallbackError(HystrixInvokable<T> commandInstance, Exception e) {
        ThreadLocalUtil.clearTraceId();
        return super.onFallbackError(commandInstance, e);
    }
//------------------------------------option1------------------------------------
 
//------------------------------------option2------------------------------------
        @Override
    public <T> void onSuccess(HystrixInvokable<T> commandInstance) {
        ThreadLocalUtil.clearTraceId();
        super.onSuccess(commandInstance);
    }
 
    @Override
    public <T> Exception onError(HystrixInvokable<T> commandInstance, HystrixRuntimeException.FailureType failureType, Exception e) {
        ThreadLocalUtil.clearTraceId();
        return super.onError(commandInstance, failureType, e);
    }
//------------------------------------option2------------------------------------
     
     private void copyTraceId() {
        this.traceId = ThreadLocalUtil.getTraceId();
    }
 
    private void pasteTraceId() {
        ThreadLocalUtil.setTraceId(traceId);
    }
 
}
 
// 業務代碼中某處合適的的地方註冊下Hook插件
HystrixPlugins.getInstance().registerCommandExecutionHook(new MyHystrixHook());

那是否是這樣的實現方式就解決問題了?仔細想下會不會有什麼問題?

咱們知道HystrixCommandExecutionHook插件註冊後,全部HystrixCommand在被調用執行的時候都會通過這些覆寫的方法,也就會出現多線程覆寫traceId,那麼對於這個Hook下的traceId隨時可能被改變了。假設有這樣場景:

  1. 調用者線程1上下文的traceId爲"t1",在調用其依賴的Hystrix方法時,traceId被設爲"t1"
  2. 同一時刻調用者線程2上下文的traceId爲"t2",在調用其依賴的Hystrix方法時,也會觸發更改traceId爲"t2"
  3. 在hystrix線程1開始執行具體業務方法時,其想「粘貼」的traceId已經被改爲"t2",而不是初始調用者線程1時所設置"t1"

爲了解決上面遇到的問題,Hystrix爲開發人員提供了經過HystrixRequestContext和HystrixRequestVariableDefault這兩個關鍵類解決。

HystrixRequestContext用於記錄每次Hystrix請求的上下文信息,其中有兩個關鍵信息:

static ThreadLocal<HystrixRequestContext> requestVariables: 用於記錄每次HystrixCommand執行時的上下文。

ConcurrentHashMap<HystrixRequestVariableDefault<?>, HystrixRequestVariableDefault.LazyInitializer<?>> state:用於記錄上下文真正的數據。

HystrixRequestVariableDefault的用法有點似於ThreadLocal,提供了get(),set()方法,具體能力的實現藉助於HystrixRequestContext。

HystrixCommandExecutionHook插件終極解決方式的實現的示例代碼以下:

public class MyHystrixHook extends HystrixCommandExecutionHook {
     
    private HystrixRequestVariableDefault<String> requestVariable = new HystrixRequestVariableDefault<>();
 
    public <T> void onStart(HystrixInvokable<T> commandInstance) {
        HystrixRequestContext.initializeContext();
                copyTraceId();
    }
 
    @Override
    public <T> void onExecutionStart(HystrixInvokable<T> commandInstance) {
        pasteTraceId();
    }
 
    @Override
    public <T> void onFallbackStart(HystrixInvokable<T> commandInstance) {
        pasteTraceId();
    }
 
        @Override
    public <T> void onSuccess(HystrixInvokable<T> commandInstance) {
        HystrixRequestContext.getContextForCurrentThread().shutdown();
        super.onSuccess(commandInstance);
    }
 
    @Override
    public <T> Exception onError(HystrixInvokable<T> commandInstance, HystrixRuntimeException.FailureType failureType, Exception e) {
        HystrixRequestContext.getContextForCurrentThread().shutdown();
        return super.onError(commandInstance, failureType, e);
    }
     
     private void copyTraceId() {
        requestVariable.set(ThreadLocalUtil.getTraceId());
    }
 
    private void pasteTraceId() {
        ThreadLocalUtil.setTraceId(requestVariable.get());
    }
}

在每次Hook執行onStart()方法的時候,須要先執行HystrixRequestContext的初始化操做,而後對關注的上下文信息進行「複製」,關鍵代碼以下:

public void set(T value) {
    HystrixRequestContext.getContextForCurrentThread().state.put(this, new LazyInitializer<T>(this, value));
}

把關注的信息複製到一個線程相關的ConcurrentHashMap中了,根據前面對HystrixCommandExecutionHook的介紹咱們知道,onStart()的時候當前線程爲調用者線程;

在真正開始執行HystrixCommand業務方方法的時候,此時須要進行「粘貼」上下文信息,從requestVariable.get()獲取,get操做關鍵代碼以下:

public T get() {
      if (HystrixRequestContext.getContextForCurrentThread() == null) {
          throw new IllegalStateException(HystrixRequestContext.class.getSimpleName() + ".initializeContext() must be called at the beginning of each request before RequestVariable functionality can be used.");
      }
      ConcurrentHashMap<HystrixRequestVariableDefault<?>, LazyInitializer<?>> variableMap = HystrixRequestContext.getContextForCurrentThread().state;
     
      // short-circuit the synchronized path below if we already have the value in the ConcurrentHashMap
      LazyInitializer<?> v = variableMap.get(this);
      if (v != null) {
          return (T) v.get();
      }
 
      // 省略一部分
      ....
}

從代碼能夠看出get與set操做相對應,也是從線程相關的ConcurrentHashMap獲取相應的值,從前序介紹咱們也得知當前線程是Hystrix提供的線程池線程,與調用者線程不是同一個線程,那麼這個業務關注的上下文信息還能正確的傳遞到Hystrix線程中嗎?通過測試它確實「神奇」的正確傳遞了,那究竟是怎麼作到的呢?

原來是Hystrix「默默」的幫咱們作了,經過調試咱們看到以下一段關鍵代碼:

this.actual = action;
    // 調用者線程HystrixRequestContext信息
    this.parentThreadState = HystrixRequestContext.getContextForCurrentThread();
 
    this.c = concurrencyStrategy.wrapCallable(new Callable<Void>() {
 
        @Override
        public Void call() throws Exception {
            HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread();
            try {
                // 幫咱們作了一步拷貝操做
                HystrixRequestContext.setContextOnCurrentThread(parentThreadState);
                // 開始真正的執行業務定義的方法,此時上下文信息已經一致了
                actual.call();
                return null;
            } finally {
                HystrixRequestContext.setContextOnCurrentThread(existingState);
            }
        }
    });
}

在執行業務定義的HystrixCommand方法前,Hystrix封裝的對象幫咱們把調用者線程的上下文信息「拷貝」過來了,其實這個處理的思路有點相似於咱們前一個插件HystrixConcurrencyStrategy。

5、總結

HystrixConcurrencyStrategy 和HystrixCommandExecutionHook二者插件方式你們能夠根據實際狀況去斷定,若是肯定不須要在fallback中關注上下文傳遞信息,那用前者就能夠了,也很簡便,但若是你想解決的更完全點,那麼用後一種方式就能夠了。

做者:vivo 官網商城開發團隊
相關文章
相關標籤/搜索