Sharing Variables Across Threads with Hystrix

1. The problem of sharing variables between the Servlet container thread and the Hystrix thread

        Today I ran into a problem: while using zuul, I wanted to read data from com.netflix.zuul.context.RequestContext on the thread where Hystrix executes. However, RequestContext is based on ThreadLocal, so calling com.netflix.zuul.context.RequestContext#getCurrentContext on the Hystrix thread only returns an empty RequestContext object with no data in it. Hystrix executes on a separate thread by default, whereas zuul filters run on the servlet container thread, so the ThreadLocal-based RequestContext cannot work across the two.
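
        The effect is easy to reproduce with plain Java, without zuul or Hystrix on the classpath. The sketch below is only an illustration: CURRENT is a hypothetical stand-in for the ThreadLocal behind RequestContext, and a single-thread executor plays the role of the Hystrix thread pool.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadLocalVisibilityDemo {

    // Hypothetical stand-in for the ThreadLocal that backs RequestContext.
    static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    /** Sets a value on the calling thread, then reads the same ThreadLocal from a pool thread. */
    static String readFromPoolThread(String value) {
        CURRENT.set(value);
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // The worker thread has its own slot for CURRENT, so it cannot see the caller's value.
            Callable<String> read = CURRENT::get;
            return pool.submit(read).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
            CURRENT.remove();
        }
    }

    public static void main(String[] args) {
        // prints "seen by worker: null"
        System.out.println("seen by worker: " + readFromPoolThread("serviceId=orders"));
    }
}
```

        The worker reads null here because a plain ThreadLocal has no initial value. RequestContext behaves slightly differently in that it hands back a fresh, empty context, but the cause is the same.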

 

2. How to share variables between the Servlet container thread and the Hystrix thread

2.1 Key classes

        The designers of Hystrix anticipated this problem long ago and provided a solution. The key classes are:

com.netflix.hystrix.strategy.concurrency.HystrixRequestContext and com.netflix.hystrix.strategy.concurrency.HystrixRequestVariableDefault. The following explains how each of them works.

        HystrixRequestContext contains a ThreadLocal and a ConcurrentHashMap. The ThreadLocal holds the HystrixRequestContext object for each thread, which means that whichever thread holds the context also holds the ConcurrentHashMap inside it. Source:

/*
 * ThreadLocal on each thread will hold the HystrixRequestVariableState.
 * 
 * Shutdown will clear the state inside HystrixRequestContext but not nullify the ThreadLocal on all
 * child threads as these threads will not be known by the parent when cleanupAfterRequest() is called.
 * 
 * However, the only thing held by those child threads until they are re-used and re-initialized is an empty
 * HystrixRequestContext object with the ConcurrentHashMap within it nulled out since once it is nullified
 * from the parent thread it is shared across all child threads.
 */
private static ThreadLocal<HystrixRequestContext> requestVariables = new ThreadLocal<HystrixRequestContext>();


/*
 * This ConcurrentHashMap should not be made publicly accessible. It is the state of RequestVariables for a given RequestContext.
 * 
 * Only HystrixRequestVariable has a reason to be accessing this field.
 */
/* package */ConcurrentHashMap<HystrixRequestVariableDefault<?>, HystrixRequestVariableDefault.LazyInitializer<?>> state = new ConcurrentHashMap<HystrixRequestVariableDefault<?>, HystrixRequestVariableDefault.LazyInitializer<?>>();

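        Source of com.netflix.hystrix.strategy.concurrency.HystrixRequestContext#setContextOnCurrentThread: it stores the HystrixRequestContext instance passed as the argument in the current thread's ThreadLocal. Seeing this function, we can already guess that it is probably used to carry the data over when execution switches threads. Note that it passes the same object reference along rather than making a deep copy.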

public static void setContextOnCurrentThread(HystrixRequestContext state) {
    requestVariables.set(state);
}

        The key type of the ConcurrentHashMap is HystrixRequestVariableDefault. Its set(T value) function puts the value into the ConcurrentHashMap of the current thread's HystrixRequestContext object, and the HystrixRequestVariableDefault object itself serves as the key.

public void set(T value) {
    HystrixRequestContext.getContextForCurrentThread().state.put(this, new LazyInitializer<T>(this, value));
}
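
        To make the relationship between the two classes concrete, here is a stripped-down model in plain Java. It is not the Hystrix implementation: Context and Variable are hypothetical names standing in for HystrixRequestContext and HystrixRequestVariableDefault, and the LazyInitializer wrapper and shutdown hooks are left out. It only shows the "context per thread, variable object as map key" layout.

```java
import java.util.concurrent.ConcurrentHashMap;

final class MiniContextDemo {

    /** Simplified stand-in for HystrixRequestContext (hypothetical). */
    static final class Context {
        // One context per thread, looked up through a ThreadLocal.
        private static final ThreadLocal<Context> CURRENT = new ThreadLocal<>();

        // Per-request state, keyed by the variable object itself.
        final ConcurrentHashMap<Variable<?>, Object> state = new ConcurrentHashMap<>();

        static Context initialize() {
            Context ctx = new Context();
            CURRENT.set(ctx);
            return ctx;
        }

        static Context current() {
            Context ctx = CURRENT.get();
            if (ctx == null) {
                throw new IllegalStateException("initialize() must be called first");
            }
            return ctx;
        }

        static void clear() {
            CURRENT.remove();
        }
    }

    /** Simplified stand-in for HystrixRequestVariableDefault (hypothetical). */
    static final class Variable<T> {
        // ConcurrentHashMap rejects null values, so this model does too.
        void set(T value) {
            Context.current().state.put(this, value);
        }

        @SuppressWarnings("unchecked")
        T get() {
            return (T) Context.current().state.get(this);
        }
    }

    /** Stores two values under two different variables and reads them back. */
    static String roundTrip() {
        Variable<String> serviceId = new Variable<>();
        Variable<String> routePath = new Variable<>();
        Context.initialize();
        try {
            serviceId.set("orders");
            routePath.set("/api/orders");
            return serviceId.get() + " " + routePath.get();
        } finally {
            Context.clear();
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip()); // prints "orders /api/orders"
    }
}
```

        Because each Variable instance is its own key, two variables never collide inside the same context, which is the same trick ThreadLocal uses with ThreadLocalMap.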

        

2.2 Copying HystrixRequestContext between threads

        Source of com.netflix.hystrix.strategy.concurrency.HystrixContextRunnable:

public class HystrixContextRunnable implements Runnable {

    private final Callable<Void> actual;
    private final HystrixRequestContext parentThreadState;

    public HystrixContextRunnable(Runnable actual) {
        this(HystrixPlugins.getInstance().getConcurrencyStrategy(), actual);
    }
    
    public HystrixContextRunnable(HystrixConcurrencyStrategy concurrencyStrategy, final Runnable actual) {
        this(concurrencyStrategy, HystrixRequestContext.getContextForCurrentThread(), actual);
    }

    public HystrixContextRunnable(final HystrixConcurrencyStrategy concurrencyStrategy, final HystrixRequestContext hystrixRequestContext, final Runnable actual) {
        this.actual = concurrencyStrategy.wrapCallable(new Callable<Void>() {

            @Override
            public Void call() throws Exception {
                actual.run();
                return null;
            }

        });
        this.parentThreadState = hystrixRequestContext;
    }

    @Override
    public void run() {
        HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread();
        try {
            // set the state of this thread to that of its parent
            HystrixRequestContext.setContextOnCurrentThread(parentThreadState);
            // execute actual Callable with the state of the parent
            try {
                actual.call();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } finally {
            // restore this thread back to its original state
            HystrixRequestContext.setContextOnCurrentThread(existingState);
        }
    }

}

        Source of com.netflix.hystrix.strategy.concurrency.HystrixContextCallable:

public class HystrixContextCallable<K> implements Callable<K> {

    private final Callable<K> actual;
    private final HystrixRequestContext parentThreadState;

    public HystrixContextCallable(Callable<K> actual) {
        this(HystrixPlugins.getInstance().getConcurrencyStrategy(), actual);
    }

    public HystrixContextCallable(HystrixConcurrencyStrategy concurrencyStrategy, Callable<K> actual) {
        this.actual = concurrencyStrategy.wrapCallable(actual);
        this.parentThreadState = HystrixRequestContext.getContextForCurrentThread();
    }

    @Override
    public K call() throws Exception {
        HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread();
        try {
            // set the state of this thread to that of its parent
            HystrixRequestContext.setContextOnCurrentThread(parentThreadState);
            // execute actual Callable with the state of the parent
            return actual.call();
        } finally {
            // restore this thread back to its original state
            HystrixRequestContext.setContextOnCurrentThread(existingState);
        }
    }

}

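        By the time run() or call() is entered, execution is already on the new thread. On that thread, HystrixRequestContext.setContextOnCurrentThread(parentThreadState) installs the original thread's HystrixRequestContext as the current thread's context, and the finally block restores whatever context the thread had before. The value of parentThreadState is obtained on the original thread when the HystrixContextRunnable or HystrixContextCallable is constructed.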
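
        The same capture-then-install pattern can be tried out in plain Java. The sketch below is a simplified model and not Hystrix code: a ThreadLocal<String> named CONTEXT stands in for HystrixRequestContext, ContextCallable is a hypothetical name, and the wrapCallable step of the concurrency strategy is omitted.

```java
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ContextPropagationDemo {

    // Hypothetical stand-in for the per-thread HystrixRequestContext.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    /** Captures the submitting thread's context at construction and installs it around call(). */
    static final class ContextCallable<V> implements Callable<V> {
        private final Callable<V> actual;
        private final String parentState;

        ContextCallable(Callable<V> actual) {
            this.actual = actual;
            this.parentState = CONTEXT.get(); // runs on the submitting thread
        }

        @Override
        public V call() throws Exception {
            String existing = CONTEXT.get(); // runs on the worker thread
            try {
                CONTEXT.set(parentState);
                return actual.call();
            } finally {
                // restore the worker thread to the state it had before
                CONTEXT.set(existing);
            }
        }
    }

    /** Returns what the worker sees: unwrapped, wrapped, and unwrapped again afterwards. */
    static String[] run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CONTEXT.set("request-42");
        try {
            Callable<String> read = CONTEXT::get;
            String plain = pool.submit(read).get();
            String wrapped = pool.submit(new ContextCallable<>(read)).get();
            String after = pool.submit(read).get();
            return new String[] {plain, wrapped, after};
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
            CONTEXT.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints "[null, request-42, null]"
    }
}
```

        Only the wrapped task sees the submitting thread's value, and the third read shows that the worker thread is left clean once the wrapped task has finished.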

 

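3. Complete usage example

        Goal: I want to use the data stored in RequestContext on the Hystrix thread. My approach is to define a ZuulFilter that saves the RequestContext into the HystrixRequestContext. Do not forget to call com.netflix.hystrix.strategy.concurrency.HystrixRequestContext#shutdown to clean up the data once it is no longer needed; otherwise there is a risk of a memory leak. It is the same as with ThreadLocal, where you have to remember to call remove when you are done.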

package org.hepeng.commons.spring.cloud.netflix.zuul.filter;

import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
import com.netflix.zuul.ZuulFilter;
import com.netflix.zuul.context.RequestContext;
import com.netflix.zuul.exception.ZuulException;
import org.hepeng.commons.spring.cloud.netflix.zuul.RequestContextHelper;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.BeanDefinitionHolder;
import org.springframework.beans.factory.support.BeanDefinitionReaderUtils;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.cloud.netflix.zuul.filters.support.FilterConstants;
import org.springframework.context.ApplicationContext;


/**
 * @author he peng
 */
public class HystrixRequestContextFilter extends ZuulFilter {

    private ApplicationContext context;

    public HystrixRequestContextFilter(ApplicationContext context) {
        this.context = context;
        registerHystrixRequestContextPostFilter();
    }

    private void registerHystrixRequestContextPostFilter() {
        BeanDefinition beanDefinition =
                new RootBeanDefinition(HystrixRequestContextPostFilter.class);
        beanDefinition.setScope("singleton");
        BeanDefinitionHolder beanDefinitionHolder =
                new BeanDefinitionHolder(beanDefinition , "hystrixRequestContextPostFilter" );
        BeanDefinitionReaderUtils.registerBeanDefinition(beanDefinitionHolder , (BeanDefinitionRegistry) this.context);
    }

    @Override
    public String filterType() {
        return FilterConstants.PRE_TYPE;
    }

    @Override
    public int filterOrder() {
        return FilterConstants.PRE_DECORATION_FILTER_ORDER + 1;
    }

    @Override
    public boolean shouldFilter() {
        return true;
    }

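    @Override
    public Object run() throws ZuulException {
        // Create a HystrixRequestContext for this request on the servlet thread,
        // then store the current RequestContext in it so Hystrix threads can read it.
        HystrixRequestContext.initializeContext();
        RequestContextHelper.set(RequestContext.getCurrentContext());
        return null;
    }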

    private static class HystrixRequestContextPostFilter extends ZuulFilter {
        @Override
        public String filterType() {
            return FilterConstants.POST_TYPE;
        }

        @Override
        public int filterOrder() {
            return -100;
        }

        @Override
        public boolean shouldFilter() {
            return true;
        }

        @Override
        public Object run() throws ZuulException {
            // Release the per-request state once the request has been handled.
            if (HystrixRequestContext.isCurrentThreadInitialized()) {
                HystrixRequestContext.getContextForCurrentThread().shutdown();
            }
            return null;
        }
    }
}
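
        Why the cleanup in the post filter matters is easiest to see on a pooled thread, because servlet containers reuse their threads from one request to the next. The following plain-Java sketch is only a rough analogue: STATE is a hypothetical ThreadLocal standing in for the per-request state, and remove() plays the part that shutdown() plays for HystrixRequestContext.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class StaleStateDemo {

    // Hypothetical stand-in for per-request state kept in a ThreadLocal.
    static final ThreadLocal<String> STATE = new ThreadLocal<>();

    /** Runs two "requests" on the same pooled thread and returns what the second one sees. */
    static String secondRequestSees(boolean cleanUp) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Runnable firstRequest = () -> {
                STATE.set("data of request 1");
                if (cleanUp) {
                    STATE.remove(); // rough analogue of calling shutdown() in the post filter
                }
            };
            pool.submit(firstRequest).get();

            // The second request is served by the same worker thread.
            Callable<String> secondRequest = STATE::get;
            return pool.submit(secondRequest).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("without cleanup: " + secondRequestSees(false)); // data of request 1
        System.out.println("with cleanup:    " + secondRequestSees(true));  // null
    }
}
```

        Without the cleanup, the second request inherits data that belongs to the first one, and the object stays reachable for as long as the thread lives.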

 

Using HystrixRequestVariableDefault is really just like using ThreadLocal. If you are familiar with ThreadLocal, you will know that the ThreadLocal object is likewise stored as a key in a ThreadLocalMap.

package org.hepeng.commons.spring.cloud.netflix.zuul;

import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariableDefault;
import com.netflix.zuul.context.RequestContext;

import java.util.Objects;

import static org.springframework.cloud.netflix.zuul.filters.support.FilterConstants.*;

/**
 * @author he peng
 */
public class RequestContextHelper {

    private static final HystrixRequestVariableDefault<RequestContext> HYSTRIX_REQUEST_VARIABLE = new HystrixRequestVariableDefault<>();

    public static void set(RequestContext context) {
        HYSTRIX_REQUEST_VARIABLE.set(context);
    }

    public static RequestContext remove() {
        RequestContext context = HYSTRIX_REQUEST_VARIABLE.get();
        HYSTRIX_REQUEST_VARIABLE.remove();
        return context;
    }

    public static RequestContext get() {
        RequestContext context = RequestContext.getCurrentContext();
        if (context.isEmpty()) {
            RequestContext context1 = HYSTRIX_REQUEST_VARIABLE.get();
            if (Objects.nonNull(context1) && ! context1.isEmpty()) {
                context = context1;
            }
        }
        return context;
    }

    public static Object getServiceId() {
        RequestContext ctx = get();
        Object serviceId = ctx.get(SERVICE_ID_KEY);
        if (Objects.isNull(serviceId)) {
            serviceId = ctx.get(PROXY_KEY);
        }
        return serviceId;
    }

    public static Object getLoadBalancer() {
        return get().get(LOAD_BALANCER_KEY);
    }

    public static Object getRoutePath() {
        return get().get(REQUEST_URI_KEY);
    }
}

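        With the code above in place, whenever I need the RequestContext of the earlier thread on a Hystrix thread, I only have to call RequestContextHelper.get().

Full code: https://gitee.com/kernelHP/hp-java-commons

If you want this functionality without writing the code yourself, you can use the library I have already published: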

gradle : implementation 'org.hepeng:hp-java-commons:1.1.5'

maven :   

<dependency>
  <groupId>org.hepeng</groupId>
  <artifactId>hp-java-commons</artifactId>
  <version>1.1.5</version>
</dependency>

Usage example: all you need to do is have the Spring container initialize HystrixRequestContextFilter. After that, use RequestContextHelper to obtain the RequestContext.

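import com.netflix.zuul.ZuulFilter;
import org.hepeng.commons.spring.cloud.netflix.zuul.filter.HystrixRequestContextFilter;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class Config {

    // Registering the pre filter is enough; its constructor registers the matching post filter.
    @Bean
    public ZuulFilter hystrixRequestContextFilter(ApplicationContext context) {
        return new HystrixRequestContextFilter(context);
    }

}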