Resilience4J是Spring Cloud Gateway推薦的容錯方案,它是一個輕量級的容錯庫java
借鑑了Hystrix而設計,而且採用JDK8 這個函數式編程,即lambda表達式react
相比之下, Netflix Hystrix 對Archaius 具備編譯依賴性,Resilience4j你無需引用所有依賴,能夠根據本身須要的功能引用相關的模塊便可 Hystrix不更新了,Spring提供Netflix Hystrix的替換方案,即Resilence4Jgit
Resilience4J 提供了一系列加強微服務的可用性功能:github
官方提供的依賴包spring
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-circuitbreaker</artifactId>
<version>${resilience.version}</version>
</dependency>
複製代碼
首先在soul-admin控制檯插件管理開啓Resilience4j 編程
在soul網關添加依賴bootstrap
<dependency>
<groupId>org.dromara</groupId>
<artifactId>soul-spring-boot-starter-plugin-ratelimiter</artifactId>
<version>${project.version}</version>
</dependency>
複製代碼
啓動三個服務,分別是一個soul-admin,一個soul-bootstrap,一個soul-examples-http緩存
在soul-admin控制檯找到插件列表的Resilience4j,自定義配置,以下圖, markdown
soul官網的配置介紹app
* Resilience4j處理詳解:
* timeoutDurationRate:等待獲取令牌的超時時間,單位ms,默認值:5000。
* limitRefreshPeriod:刷新令牌的時間間隔,單位ms,默認值:500。
* limitForPeriod:每次刷新令牌的數量,默認值:50。
* circuitEnable:是否開啓熔斷,0:關閉,1:開啓,默認值:0。
* timeoutDuration:熔斷超時時間,單位ms,默認值:30000。
* fallbackUri:降級處理的uri。
* slidingWindowSize:滑動窗口大小,默認值:100。
* slidingWindowType:滑動窗口類型,0:基於計數,1:基於時間,默認值:0。
* minimumNumberOfCalls:開啓熔斷的最小請求數,超過這個請求數纔開啓熔斷統計,默認值:100。
* waitIntervalFunctionInOpenState:熔斷器開啓持續時間,單位ms,默認值:10。
* permittedNumberOfCallsInHalfOpenState:半開狀態下的環形緩衝區大小,必須達到此數量纔會計算失敗率,默認值:10。
* failureRateThreshold:錯誤率百分比,達到這個閾值,熔斷器纔會開啓,默認值50。
* automaticTransitionFromOpenToHalfOpenEnabled:是否自動從open狀態轉換爲half-open狀態,,true:是,false:否,默認值:false。
複製代碼
/**
* check filed default value.
*
* @param resilience4JHandle {@linkplain Resilience4JHandle}
* @return {@linkplain Resilience4JHandle}
*/
public Resilience4JHandle checkData(final Resilience4JHandle resilience4JHandle) {
resilience4JHandle.setTimeoutDurationRate(Math.max(resilience4JHandle.getTimeoutDurationRate(), Constants.TIMEOUT_DURATION_RATE));
//resilience4JHandle.setLimitRefreshPeriod(Math.max(resilience4JHandle.getLimitRefreshPeriod(), Constants.LIMIT_REFRESH_PERIOD));
//resilience4JHandle.setLimitForPeriod(Math.max(resilience4JHandle.getLimitForPeriod(), Constants.LIMIT_FOR_PERIOD));
//每次刷新令牌的數量爲2 ,刷新令牌的時間間隔爲1s
resilience4JHandle.setLimitRefreshPeriod(1000);
resilience4JHandle.setLimitForPeriod(2);
resilience4JHandle.setTimeoutDuration(1000);
resilience4JHandle.setCircuitEnable(Math.max(resilience4JHandle.getCircuitEnable(), Constants.CIRCUIT_ENABLE));
//resilience4JHandle.setTimeoutDuration(Math.max(resilience4JHandle.getTimeoutDuration(), Constants.TIMEOUT_DURATION));
resilience4JHandle.setFallbackUri(!"0".equals(resilience4JHandle.getFallbackUri()) ? resilience4JHandle.getFallbackUri() : "");
resilience4JHandle.setSlidingWindowSize(Math.max(resilience4JHandle.getSlidingWindowSize(), Constants.SLIDING_WINDOW_SIZE));
resilience4JHandle.setSlidingWindowType(Math.max(resilience4JHandle.getSlidingWindowType(), Constants.SLIDING_WINDOW_TYPE));
resilience4JHandle.setMinimumNumberOfCalls(Math.max(resilience4JHandle.getMinimumNumberOfCalls(), Constants.MINIMUM_NUMBER_OF_CALLS));
resilience4JHandle.setWaitIntervalFunctionInOpenState(Math.max(resilience4JHandle.getWaitIntervalFunctionInOpenState(), Constants.WAIT_INTERVAL_FUNCTION_IN_OPEN_STATE));
resilience4JHandle.setPermittedNumberOfCallsInHalfOpenState(Math.max(resilience4JHandle.getPermittedNumberOfCallsInHalfOpenState(), Constants.PERMITTED_NUMBER_OF_CALLS_IN_HALF_OPEN_STATE));
resilience4JHandle.setFailureRateThreshold(Math.max(resilience4JHandle.getFailureRateThreshold(), Constants.FAILURE_RATE_THRESHOLD));
return resilience4JHandle;
}
複製代碼
C:\Users\v-yanb07>sb -u http://localhost:9195/http/test/findByUserId?userId=1 -c 4 -N 10
Starting at 2021-03-14 15:46:28
[Press C to stop the test]
23 (RPS: 1)
---------------Finished!----------------
Finished at 2021-03-14 15:46:51 (took 00:00:23.0477097)
24 (RPS: 1) Status 200: 25
RPS: 2.2 (requests/second)
Max: 2020ms
Min: 472ms
Avg: 1677ms
50% below 1994ms
60% below 1997ms
70% below 1999ms
80% below 1999ms
90% below 2001ms
95% below 2019ms
98% below 2020ms
99% below 2020ms
99.9% below 2020ms
複製代碼
輸出日誌
2021-03-14 12:16:35.252 INFO 379336 --- [ctor-http-nio-7] o.d.s.e.h.controller.HttpTestController : 限流測試
2021-03-14 12:16:36.249 INFO 379336 --- [ctor-http-nio-4] o.d.s.e.h.controller.HttpTestController : 限流測試
2021-03-14 12:16:36.250 INFO 379336 --- [ctor-http-nio-7] o.d.s.e.h.controller.HttpTestController : 限流測試
2021-03-14 12:16:37.250 INFO 379336 --- [ctor-http-nio-7] o.d.s.e.h.controller.HttpTestController : 限流測試
2021-03-14 12:16:37.250 INFO 379336 --- [ctor-http-nio-4] o.d.s.e.h.controller.HttpTestController : 限流測試
2021-03-14 12:16:38.250 INFO 379336 --- [ctor-http-nio-7] o.d.s.e.h.controller.HttpTestController : 限流測試
2021-03-14 12:16:38.250 INFO 379336 --- [ctor-http-nio-4] o.d.s.e.h.controller.HttpTestController : 限流測試
2021-03-14 12:16:39.252 INFO 379336 --- [ctor-http-nio-7] o.d.s.e.h.controller.HttpTestController : 限流測試
2021-03-14 12:16:39.252 INFO 379336 --- [ctor-http-nio-4] o.d.s.e.h.controller.HttpTestController : 限流測試
複製代碼
控制檯日誌每秒輸出兩條,由此驗證限流生效
從配置信息咱們知道熔斷器默認是關閉,咱們須要開打
soul-examples-http調用接口處添加休眠時間
@GetMapping("/findByUserId")
public UserDTO findByUserId(@RequestParam("userId") final String userId) throws Exception{
UserDTO userDTO = new UserDTO();
userDTO.setUserId(userId);
userDTO.setUserName("hello world");
log.info("限流測試");
int i = RandomUtils.nextInt(1,3);
if(i %2==0){
//throw new Exception("異常拋出");
Thread.currentThread().sleep(2000);
}
return userDTO;
}
複製代碼
Resilience4JHandle#checkData手動設置超時時間爲1s
resilience4JHandle.setTimeoutDuration(1000);
複製代碼
屢次請求時,有的請求返回正常數據,有的請求返回以下數據,表示超時熔斷生效
{
"code": 500,
"message": "Internal Server Error",
"data": "404 NOT_FOUND"
}
複製代碼
soul網關Resilience4j插件源碼大量使用了響應式編程方式,首先須要對響應式編程瞭解
Resilience4J插件目錄結構
└─resilience4j
│ Resilience4JPlugin.java //插件處理,核心類
│
├─build
│ Resilience4JBuilder.java //構建Resilience4JConf對象
│
├─conf
│ Resilience4JConf.java
│
├─executor
│ CombinedExecutor.java //限流和熔斷執行器
│ Executor.java
│ RateLimiterExecutor.java //限流執行器
│
├─factory
│ Resilience4JRegistryFactory.java //限流和熔斷對象構建
│
└─handler
Resilience4JHandler.java
複製代碼
Resilience4JPlugn#doExecute Resilience4JPlugn其餘soul中插件同樣繼承AbstractSoulPlugin,只要開啓了,經過鏈式機制執行,都會走到核心方法doExecute
@Override
protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
assert soulContext != null;
//獲取配置信息對象
Resilience4JHandle resilience4JHandle = GsonUtils.getGson().fromJson(rule.getHandle(), Resilience4JHandle.class);
//校驗配置信息,若是小於默認值,則賦值默認值
resilience4JHandle = resilience4JHandle.checkData(resilience4JHandle);
//circuitEnable配置:1 開啓熔斷組件 ,不然走限流組件
if (resilience4JHandle.getCircuitEnable() == 1) {
return combined(exchange, chain, rule);
}
return rateLimiter(exchange, chain, rule);
}
複製代碼
private Mono<Void> rateLimiter(final ServerWebExchange exchange, final SoulPluginChain chain, final RuleData rule) {
return ratelimiterExecutor.run(
// chain.execute(exchange) 後續插件執行
chain.execute(exchange), fallback(ratelimiterExecutor, exchange, null), Resilience4JBuilder.build(rule))
.onErrorResume(throwable -> ratelimiterExecutor.withoutFallback(exchange, throwable))
//ratelimiterExecutor.run調用
@Override
public <T> Mono<T> run(final Mono<T> toRun, final Function<Throwable, Mono<T>> fallback, final Resilience4JConf conf) {
//限流器組件
RateLimiter rateLimiter = Resilience4JRegistryFactory.rateLimiter(conf.getId(), conf.getRateLimiterConfig());
//限流執行
Mono<T> to = toRun.transformDeferred(RateLimiterOperator.of(rateLimiter));
if (fallback != null) {
//回調的執行
return to.onErrorResume(fallback);
}
return to;
}
// to.onErrorResume(fallback);
default Mono<Void> fallback(ServerWebExchange exchange, String uri, Throwable t) {
if (StringUtils.isBlank(uri)) {
return withoutFallback(exchange, t);
}
DispatcherHandler dispatcherHandler = SpringBeanUtils.getInstance().getBean(DispatcherHandler.class);
ServerHttpRequest request = exchange.getRequest().mutate().uri(Objects.requireNonNull(UriUtils.createUri(uri))).build();
ServerWebExchange mutated = exchange.mutate().request(request).build();
//回調的執行地方
return dispatcherHandler.handle(mutated);
}
複製代碼
熔斷 Resilience4JPlugin#combined
private Mono<Void> combined(final ServerWebExchange exchange, final SoulPluginChain chain, final RuleData rule) {
Resilience4JConf conf = Resilience4JBuilder.build(rule);
return combinedExecutor.run(
chain.execute(exchange).doOnSuccess(v -> {
HttpStatus status = exchange.getResponse().getStatusCode();
if (status == null || !status.is2xxSuccessful()) {
exchange.getResponse().setStatusCode(null);
throw new CircuitBreakerStatusCodeException(status == null ? HttpStatus.INTERNAL_SERVER_ERROR : status);
}
}), fallback(combinedExecutor, exchange, conf.getFallBackUri()), conf);
}
//combinedExecutor#run執行的內容
public <T> Mono<T> run(final Mono<T> run, final Function<Throwable, Mono<T>> fallback, final Resilience4JConf resilience4JConf) {
RateLimiter rateLimiter = Resilience4JRegistryFactory.rateLimiter(resilience4JConf.getId(), resilience4JConf.getRateLimiterConfig());
CircuitBreaker circuitBreaker = Resilience4JRegistryFactory.circuitBreaker(resilience4JConf.getId(), resilience4JConf.getCircuitBreakerConfig());
//斷路器的操做
Mono<T> to = run.transformDeferred(CircuitBreakerOperator.of(circuitBreaker))
//限流操做
.transformDeferred(RateLimiterOperator.of(rateLimiter))
//設置超時時間
.timeout(resilience4JConf.getTimeLimiterConfig().getTimeoutDuration())
//若是超時了拋出超時異常
.doOnError(TimeoutException.class, t -> circuitBreaker.onError(
resilience4JConf.getTimeLimiterConfig().getTimeoutDuration().toMillis(),
TimeUnit.MILLISECONDS,
t));
if (fallback != null) {
to = to.onErrorResume(fallback);
}
return to;
}
複製代碼