專欄系列文章:SpringCloud系列專欄java
系列文章:web
SpringCloud 源碼系列(1)— 註冊中心Eureka 之 啓動初始化spring
SpringCloud 源碼系列(2)— 註冊中心Eureka 之 服務註冊、續約安全
SpringCloud 源碼系列(3)— 註冊中心Eureka 之 抓取註冊表markdown
SpringCloud 源碼系列(4)— 註冊中心Eureka 之 服務下線、故障、自我保護機制併發
SpringCloud 源碼系列(5)— 註冊中心Eureka 之 EurekaServer集羣app
SpringCloud 源碼系列(6)— 註冊中心Eureka 之 總結篇負載均衡
SpringCloud 源碼系列(7)— 負載均衡Ribbon 之 RestTemplateide
SpringCloud 源碼系列(8)— 負載均衡Ribbon 之 核心原理微服務
SpringCloud 源碼系列(9)— 負載均衡Ribbon 之 核心組件與配置
SpringCloud 源碼系列(10)— 負載均衡Ribbon 之 HTTP客戶端組件
SpringCloud 源碼系列(11)— 負載均衡Ribbon 之 重試與總結篇
SpringCloud 源碼系列(12)— 服務調用Feign 之 基礎使用篇
SpringCloud 源碼系列(13)— 服務調用Feign 之 掃描@FeignClient註解接口
SpringCloud 源碼系列(14)— 服務調用Feign 之 構建@FeignClient接口動態代理
SpringCloud 源碼系列(15)— 服務調用Feign 之 結合Ribbon進行負載均衡請求
SpringCloud 源碼系列(16)— 熔斷器Hystrix 之 基礎入門篇
SpringCloud 源碼系列(17)— 熔斷器Hystrix 之 獲取執行訂閱對象Observable
SpringCloud 源碼系列(18)— 熔斷器Hystrix 之 執行核心原理
前面的文章 SpringCloud 源碼系列(14)— 服務調用Feign 之 構建@FeignClient接口動態代理 已經詳細分析了 @FeignClient
接口生成動態代理並進行遠程調用的原理,下面把文章中的原理圖貼出來,便於回顧和理解。
圖種和 Hystrix 相關的就是 HystrixTargeter
,Targeter
就是用於建立 FeignClient 動態代理對象的,在這一步,feign 若是啓用了 hystrix,就會使用 HystrixTargeter 來建立動態代理對象,不然就會使用 DefaultTargeter
建立代理對象。那麼分析 Hystrix 與 Feign 的整合就能夠從 HystrixTargeter
這個入口來分析。
在配置類 FeignAutoConfiguration
中,能夠看到有以下配置來配置 Targeter 的具體實現類。在引入 feign.hystrix.HystrixFeign
的狀況下,Targeter 的實現類爲 HystrixTargeter
,不然就是默認的 DefaultTargeter
。
feign.hystrix.HystrixFeign
這個類屬於 feign-hystrix
依賴包,也就是說 feign 要開啓 hystrix 的功能,須要先加入 feign-hystrix
的組件包。不過 spring-cloud-starter-openfeign
已經幫咱們引入 feign-hystrix
的依賴了,不須要咱們單獨引入。
@ConditionalOnClass(name = "feign.hystrix.HystrixFeign")
protected static class HystrixFeignTargeterConfiguration {
@Bean
@ConditionalOnMissingBean
public Targeter feignTargeter() {
return new HystrixTargeter();
}
}
@ConditionalOnMissingClass("feign.hystrix.HystrixFeign")
protected static class DefaultFeignTargeterConfiguration {
@Bean
@ConditionalOnMissingBean
public Targeter feignTargeter() {
return new DefaultTargeter();
}
}
複製代碼
在 FeignClientsConfiguration
配置類中,能夠看到有以下配置決定 Feign.Builder
的具體類型,Feign.Builder
是 Targeter 用來構建 Feign
對象的構造器。能夠看到,默認狀況下 Feign.Builder 是 Feign.Builder
。若是引入了 Hystrix 且 feign.hystrix.enabled=true
的狀況下,Feign.Builder 的實際類型就是 HystrixFeign.Builder
,這塊後面會分析。
也就是說,Feign 要啓用 Hystrix,不只須要加入 feign-hystrix
的依賴,還須要配置 feign.hystrix.enabled=true
纔會生效。
@Bean
@Scope("prototype")
@ConditionalOnMissingBean
public Feign.Builder feignBuilder(Retryer retryer) {
return Feign.builder().retryer(retryer);
}
@ConditionalOnClass({ HystrixCommand.class, HystrixFeign.class })
protected static class HystrixFeignConfiguration {
@Bean
@Scope("prototype")
@ConditionalOnMissingBean
@ConditionalOnProperty(name = "feign.hystrix.enabled")
public Feign.Builder feignHystrixBuilder() {
return HystrixFeign.builder();
}
}
複製代碼
來看一下 HystrixTargeter 的 target
方法,fein 啓用 hystrix 後,Feign.Builder 是 HystrixFeign.Builder
,因此會走 if
以後的邏輯。
groupName
。@Override
public <T> T target(FeignClientFactoryBean factory, Feign.Builder feign, FeignContext context, Target.HardCodedTarget<T> target) {
if (!(feign instanceof feign.hystrix.HystrixFeign.Builder)) {
return feign.target(target);
}
feign.hystrix.HystrixFeign.Builder builder = (feign.hystrix.HystrixFeign.Builder) feign;
// HystrixCommand 的名稱,取自 Feign 定義的服務名稱
String name = StringUtils.isEmpty(factory.getContextId()) ? factory.getName()
: factory.getContextId();
SetterFactory setterFactory = getOptional(name, context, SetterFactory.class);
if (setterFactory != null) {
builder.setterFactory(setterFactory);
}
// 回調類
Class<?> fallback = factory.getFallback();
if (fallback != void.class) {
return targetWithFallback(name, context, target, builder, fallback);
}
// 回調工廠
Class<?> fallbackFactory = factory.getFallbackFactory();
if (fallbackFactory != void.class) {
return targetWithFallbackFactory(name, context, target, builder, fallbackFactory);
}
return feign.target(target);
}
複製代碼
不管是回調類仍是回調工廠,最後異曲同工,都是走到 build(fallbackFactory)
。
public <T> T target(Target<T> target, T fallback) {
return build(fallback != null ? new FallbackFactory.Default<T>(fallback) : null)
.newInstance(target);
}
public <T> T target(Target<T> target, FallbackFactory<? extends T> fallbackFactory) {
return build(fallbackFactory).newInstance(target);
}
複製代碼
繼續看 HystrixFeign.Builder
的 build()
方法,這下就明白了,與未啓用 Hystrix 時的最大區別,就在於建立的 InvocationHandler
不一樣。啓用 hystrix 後,設置的匿名 InvocationHandlerFactory 建立的 InvocationHandler 是 HystrixInvocationHandler
。
同時,用於處理接口註解的接口協議組件 Contract 也設置爲了 HystrixDelegatingContract
,默認爲 SpringMvcContract
,HystrixDelegatingContract 其實是代理了 SpringMvcContract。
Feign build(final FallbackFactory<?> nullableFallbackFactory) {
super.invocationHandlerFactory(new InvocationHandlerFactory() {
@Override
public InvocationHandler create(Target target, Map<Method, MethodHandler> dispatch) {
return new HystrixInvocationHandler(target, dispatch, setterFactory,
nullableFallbackFactory);
}
});
super.contract(new HystrixDelegatingContract(contract));
return super.build();
}
複製代碼
未啓用 hystrix 時,默認建立 InvocationHandler 是 ReflectiveFeign.FeignInvocationHandler
。
public interface InvocationHandlerFactory {
//...
static final class Default implements InvocationHandlerFactory {
@Override
public InvocationHandler create(Target target, Map<Method, MethodHandler> dispatch) {
return new ReflectiveFeign.FeignInvocationHandler(target, dispatch);
}
}
}
複製代碼
HystrixDelegatingContract 其實是一個裝飾器,能夠看到,若是 FeignClient 返回類型是 HystrixCommand、Observable、Single、Completable、CompletableFuture
,它會去設置 MethodMetadata 的返回類型爲實際類型。
public final class HystrixDelegatingContract implements Contract {
private final Contract delegate;
public HystrixDelegatingContract(Contract delegate) {
this.delegate = delegate;
}
@Override
public List<MethodMetadata> parseAndValidateMetadata(Class<?> targetType) {
List<MethodMetadata> metadatas = this.delegate.parseAndValidateMetadata(targetType);
for (MethodMetadata metadata : metadatas) {
Type type = metadata.returnType();
if (type instanceof ParameterizedType && ((ParameterizedType) type).getRawType().equals(HystrixCommand.class)) {
Type actualType = resolveLastTypeParameter(type, HystrixCommand.class);
metadata.returnType(actualType);
} else if (type instanceof ParameterizedType && ((ParameterizedType) type).getRawType().equals(Observable.class)) {
Type actualType = resolveLastTypeParameter(type, Observable.class);
metadata.returnType(actualType);
} else if (type instanceof ParameterizedType && ((ParameterizedType) type).getRawType().equals(Single.class)) {
Type actualType = resolveLastTypeParameter(type, Single.class);
metadata.returnType(actualType);
} else if (type instanceof ParameterizedType && ((ParameterizedType) type).getRawType().equals(Completable.class)) {
metadata.returnType(void.class);
} else if (type instanceof ParameterizedType && ((ParameterizedType) type).getRawType().equals(CompletableFuture.class)) {
metadata.returnType(resolveLastTypeParameter(type, CompletableFuture.class));
}
}
return metadatas;
}
}
複製代碼
看 HystrixInvocationHandler
的 invoke
方法,調用 FeignClient 接口時,會進入代理對象並調用這個 invoke 方法來執行。
到這裏其實就弄清楚 hystrix 與 feign 的整合了,這個代理對象反射調用時,會將本來方法的反射調用封裝到 HystrixCommand
的 run()
方法中,而後根據不一樣的返回類型調用不一樣的方法執行,默認就是調用 HystrixCommand 的 execute()
方法。
@Override
public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
//...
// 構造 HystrixCommand
HystrixCommand<Object> hystrixCommand =
new HystrixCommand<Object>(setterMethodMap.get(method)) {
@Override
protected Object run() throws Exception {
try {
// run() 方法中就是反射調用原始方法
return HystrixInvocationHandler.this.dispatch.get(method).invoke(args);
} catch (Exception e) {
throw e;
} catch (Throwable t) {
throw (Error) t;
}
}
// 重寫了獲取回調的方法
@Override
protected Object getFallback() {
// 執行回調
}
};
if (Util.isDefault(method)) {
return hystrixCommand.execute();
} else if (isReturnsHystrixCommand(method)) {
return hystrixCommand;
} else if (isReturnsObservable(method)) {
// Create a cold Observable
return hystrixCommand.toObservable();
} else if (isReturnsSingle(method)) {
// Create a cold Observable as a Single
return hystrixCommand.toObservable().toSingle();
} else if (isReturnsCompletable(method)) {
return hystrixCommand.toObservable().toCompletable();
} else if (isReturnsCompletableFuture(method)) {
return new ObservableCompletableFuture<>(hystrixCommand);
}
return hystrixCommand.execute();
}
複製代碼
RestTemplate 遠程調用也須要考慮到熔斷、降級等,避免服務級聯故障,那 RestTemplate 如何與 Hystrix 整合呢?
RestTemplate 的遠程調用,咱們能夠封裝到一個方法中,而後用 @HystrixCommand
註解標註在調用方法上,並配置 grouopKey、回調方法等參數,通常 groupKey 配置遠程調用的服務名或提供接口的第三方便可。
其原理想一想其實也很簡單,hystrix 確定是增長了一個切面來攔截帶有 @HystrixCommand
註解的方法的執行,而後相似 feign 整合 hystrix,將方法的調用封裝到 HystrixCommand 中,而後執行命令。
@Service
public class ProducerWithHystrixService {
@Autowired
private RestTemplate restTemplate;
@HystrixCommand( groupKey = "demo-producer", fallbackMethod = "queryId_fallback" )
public String queryId() {
ResponseEntity<String> result = restTemplate.postForEntity("http://demo-producer/v1/uuid",
new LinkedMultiValueMap<String, Object>(), String.class);
return result.getBody();
}
public String queryId_fallback() {
return "error";
}
}
複製代碼
Hystrix 有線程池隔離和信號量隔離兩種資源隔離模式,若是是線程池隔離的狀況下,有個問題須要思考下,那就是若是業務中使用了 ThreadLocal
來存儲線程本地變量,而 HystrixCommand 的執行是在子線程中執行的,要知道 ThreadLocal 中的本地變量是不會傳遞到子線程中的,那業務邏輯在子線程中執行時,就沒法獲取到 ThreadLocal 中的本地變量了。那這個時候不就會影響本來業務邏輯的執行了嗎?又該怎麼處理呢?
最簡單的解決辦法,就是修改隔離策略,使用信號量隔離模式,可是 Hystrix 默認是線程池隔離模式,並且從實際的場景來講也是使用線程池隔離,這個辦法就不行了。
其次是 Hystrix 官方推薦的使用 HystrixConcurrencyStrategy
,實現 wrapCallable
方法,在裏面複製線程的狀態。
在前面分析 HystrixContextSchedulerWorker
的調度時,有個東西還沒分析,看 schedule 方法的代碼,worker.schedule
調度的 Action0 其實是上下文調度Action HystrixContexSchedulerAction
。它是對原始 Action0 的再一次封裝,建立 HystrixContexSchedulerAction 時傳入了 HystrixConcurrencyStrategy 對象和原始的 Action0。
@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
if (threadPool != null) {
if (!threadPool.isQueueSpaceAvailable()) {
throw new RejectedExecutionException("...");
}
}
return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action), delayTime, unit);
}
複製代碼
接着看 HystrixContexSchedulerAction
,在構造方法中,原始的 Action0 被封裝到了一個 Callable 中,主要就是將主線程中的 Hystrix請求上下文 HystrixRequestContext
設置到子線程中,而後再去調用原始的 Action0,執行結束後,又重置了子線程的 Hystrix請求上下文。
重要的是,再次封裝的 Callable 傳入了 concurrencyStrategy 的 wrapCallable
方法中,所以,這也是 Hystrix 給咱們提供的一個可擴展的口子,讓咱們能夠在 Hystrix 請求執行前注入一些自定義的動做,好比複製線程的狀態。
public class HystrixContexSchedulerAction implements Action0 {
private final Action0 actual;
private final HystrixRequestContext parentThreadState;
private final Callable<Void> c;
public HystrixContexSchedulerAction(Action0 action) {
this(HystrixPlugins.getInstance().getConcurrencyStrategy(), action);
}
public HystrixContexSchedulerAction(final HystrixConcurrencyStrategy concurrencyStrategy, Action0 action) {
this.actual = action;
// 主線程狀態
this.parentThreadState = HystrixRequestContext.getContextForCurrentThread();
this.c = concurrencyStrategy.wrapCallable(new Callable<Void>() {
@Override
public Void call() throws Exception {
// 子線程狀態
HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread();
try {
// 主線程狀態設置到子線程
HystrixRequestContext.setContextOnCurrentThread(parentThreadState);
// 在主線程狀態下執行原始的 Action0
actual.call();
return null;
} finally {
// 重置子線程狀態
HystrixRequestContext.setContextOnCurrentThread(existingState);
}
}
});
}
@Override
public void call() {
try {
c.call();
} catch (Exception e) {
throw new RuntimeException("Failed executing wrapped Action0", e);
}
}
}
複製代碼
Spring Security 中,會在 ThreadLocal 中存儲安全上下文 SecurityContext
,爲了能讓 SecurityContext 傳遞到子線程,spring-cloud-netflix-core
模塊中就自定義了 SecurityContextConcurrencyStrategy
安全上下文併發策略類。下面咱們就經過 SecurityContextConcurrencyStrategy 來看看如何自定義 HystrixConcurrencyStrategy
達到複製線程狀態的目的。
開發步驟以下:
call()
方法中,首先暫存子線程中的狀態,再將主線程中的狀態設置到子線程中。這一步就達到了複製主線程狀態的目的。finally
中,重置子線程的狀態爲以前的狀態。public final class DelegatingSecurityContextCallable<V> implements Callable<V> {
// 要代理的 Callable
private final Callable<V> delegate;
// 主線程中的 SecurityContext
private final SecurityContext delegateSecurityContext;
// 子線程中的 SecurityContext
private SecurityContext originalSecurityContext;
// 構造方法傳入要代理的 Callable 和主線程中的 SecurityContext
public DelegatingSecurityContextCallable(Callable<V> delegate, SecurityContext securityContext) {
this.delegate = delegate;
this.delegateSecurityContext = securityContext;
}
// 構造方法傳入要代理的 Callable 和主線程中的 SecurityContext
public DelegatingSecurityContextCallable(Callable<V> delegate) {
// 保存主線程的 SecurityContext
this(delegate, SecurityContextHolder.getContext());
}
@Override
public V call() throws Exception {
// 暫存子線程的 SecurityContext
this.originalSecurityContext = SecurityContextHolder.getContext();
try {
// 將主線程的 SecurityContext 設置到子線程中
SecurityContextHolder.setContext(delegateSecurityContext);
// 調用原始的 Callable
return delegate.call();
} finally {
// 重置爲原子線程的 SecurityContext
SecurityContextHolder.setContext(originalSecurityContext);
}
}
}
複製代碼
接着開發自定義的Hystrix併發策略類繼承自 HystrixConcurrencyStrategy
,有以下的要點:
wrapCallable
方法中,建立自定義的 DelegatingSecurityContextCallable,這樣就包裝了咱們自定義的動做。public class SecurityContextConcurrencyStrategy extends HystrixConcurrencyStrategy {
private HystrixConcurrencyStrategy existingConcurrencyStrategy;
public SecurityContextConcurrencyStrategy(HystrixConcurrencyStrategy existingConcurrencyStrategy) {
this.existingConcurrencyStrategy = existingConcurrencyStrategy;
}
@Override
public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
return existingConcurrencyStrategy != null
? existingConcurrencyStrategy.getBlockingQueue(maxQueueSize)
: super.getBlockingQueue(maxQueueSize);
}
@Override
public <T> HystrixRequestVariable<T> getRequestVariable(HystrixRequestVariableLifecycle<T> rv) {
return existingConcurrencyStrategy != null
? existingConcurrencyStrategy.getRequestVariable(rv)
: super.getRequestVariable(rv);
}
@Override
public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize, HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
return existingConcurrencyStrategy != null
? existingConcurrencyStrategy.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue)
: super.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
return existingConcurrencyStrategy != null
? existingConcurrencyStrategy.getThreadPool(threadPoolKey, threadPoolProperties)
: super.getThreadPool(threadPoolKey, threadPoolProperties);
}
@Override
public <T> Callable<T> wrapCallable(Callable<T> callable) {
return existingConcurrencyStrategy != null
? existingConcurrencyStrategy.wrapCallable(new DelegatingSecurityContextCallable<T>(callable))
: super.wrapCallable(new DelegatingSecurityContextCallable<T>(callable));
}
}
複製代碼
自定義併發策略類開發完成後,就須要註冊到 Hystrix 中,在 HystrixSecurityAutoConfiguration
中可看看如何註冊:
@Configuration
@Conditional(HystrixSecurityCondition.class)
@ConditionalOnClass({Hystrix.class, SecurityContext.class})
public class HystrixSecurityAutoConfiguration {
// 注入已存在的併發策略類
@Autowired(required = false)
private HystrixConcurrencyStrategy existingConcurrencyStrategy;
@PostConstruct
public void init() {
// 保存 Hystrix 插件本來的引用
HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance().getMetricsPublisher();
HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy();
HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins.getInstance().getCommandExecutionHook();
// 重置
HystrixPlugins.reset();
// 註冊新的併發策略
HystrixPlugins.getInstance().registerConcurrencyStrategy(new SecurityContextConcurrencyStrategy(existingConcurrencyStrategy));
// 註冊其它的插件
HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
HystrixPlugins.getInstance().registerCommandExecutionHook(commandExecutionHook);
}
static class HystrixSecurityCondition extends AllNestedConditions {
public HystrixSecurityCondition() {
super(ConfigurationPhase.REGISTER_BEAN);
}
// 啓用條件
@ConditionalOnProperty(name = "hystrix.shareSecurityContext")
static class ShareSecurityContext {
}
}
}
複製代碼
爲何要重置以後再從新註冊呢?看這些註冊方法,能夠知道,這些註冊方法只能被調用一次,不然將拋出異常。所以爲了不已經註冊過了,因此須要重置以後再從新註冊。
public void registerConcurrencyStrategy(HystrixConcurrencyStrategy impl) {
if (!concurrencyStrategy.compareAndSet(null, impl)) {
throw new IllegalStateException("Another strategy was already registered.");
}
}
public void registerEventNotifier(HystrixEventNotifier impl) {
if (!notifier.compareAndSet(null, impl)) {
throw new IllegalStateException("Another strategy was already registered.");
}
}
複製代碼
在上面的代碼中還能夠看到,HystrixSecurityAutoConfiguration
配置類生效的前提是 hystrix.shareSecurityContext=true
,所以,若是想要在 spring security + hystrix 的環境中,可以在 hystrix 子線程獲取 SecurityContext,須要配置 hystrix.shareSecurityContext=true
。
Hystrix 有不少配置,咱們能夠從他們的 Properties 配置類中找到有哪些配置以及默認的配置值。
熔斷器相關配置和默認值能夠在 HystrixCommandProperties
中找到。
隔離策略相關配置和默認值能夠在 HystrixCommandProperties
中找到。
Hystrix線程池相關配置和默認值能夠在 HystrixThreadPoolProperties
中找到。
在微服務中,通常是經過 yml 文件來配置的,不會使用 HystrixCommandProperties.Setter().withCircuitBreakerEnabled(true)
這種形式,那如何配置 Hystrix 全局默認值和不一樣組的呢?
全局默認配置使用 default
做爲 key:
hystrix:
threadpool:
# default 做爲 key
default:
coreSize: 10
maximumSize: 20
maxQueueSize: 10
command:
# default 做爲 key
default:
execution:
isolation:
strategy: THREAD
thread:
timeoutInMilliseconds: 5000
複製代碼
針對特定客戶端則使用命令的名稱做爲 key:
hystrix:
threadpool:
# default
default:
coreSize: 10
maximumSize: 10
maxQueueSize: -1
# demo-consumer
demo-consumer:
coreSize: 5
maximumSize: 5
maxQueueSize: 10
command:
## default
default:
execution:
isolation:
strategy: THREAD
thread:
timeoutInMilliseconds: 5000
# demo-producer
demo-producer:
execution:
isolation:
strategy: SEMAPHORE
thread:
timeoutInMilliseconds: 2000
複製代碼
針對特定客戶端的某個方法,用#
分隔客戶端名稱和方法名稱:
hystrix:
threadpool:
# demo-consumer
demo-consumer#sayHello(Long,String,Integer):
coreSize: 5
maximumSize: 5
maxQueueSize: 10
複製代碼
最後,用一張圖將前面的Hystrix源碼分析原理作個彙總。