Soul網關學習Resilience4j插件原理解析

目標

  • 什麼是Resilience4J
  • soul的Resilience4j體驗
    • 限流
    • 熔斷
  • Resilience4J插件源碼解讀

什麼是Resilience4j

  • Resilience4J是Spring Cloud Gateway推薦的容錯方案,它是一個輕量級的容錯庫java

  • 借鑑了Hystrix而設計,而且採用JDK8 這個函數式編程,即lambda表達式react

  • 相比之下, Netflix Hystrix 對Archaius 具備編譯依賴性,Resilience4j你無需引用所有依賴,能夠根據本身須要的功能引用相關的模塊便可 Hystrix不更新了,Spring提供Netflix Hystrix的替換方案,即Resilence4Jgit

  • Resilience4J 提供了一系列加強微服務的可用性功能:github

    • 斷路器 CircuitBreaker
    • 限流 RateLimiter
    • 基於信號量的隔離
    • 緩存
    • 限時 Timelimiter
    • 請求重啓 Retry
  • 官方提供的依賴包spring

    <dependency>
            <groupId>io.github.resilience4j</groupId>
            <artifactId>resilience4j-circuitbreaker</artifactId>
            <version>${resilience.version}</version>
     </dependency>
    
    複製代碼

    soul的Resilience4j體驗

    • 首先在soul-admin控制檯插件管理開啓Resilience4j 在這裏插入圖片描述編程

    • 在soul網關添加依賴bootstrap

      <dependency>
          <groupId>org.dromara</groupId>
          <artifactId>soul-spring-boot-starter-plugin-ratelimiter</artifactId>
          <version>${project.version}</version>
      </dependency>
      
      複製代碼
  • 啓動三個服務,分別是一個soul-admin,一個soul-bootstrap,一個soul-examples-http緩存

  • 在soul-admin控制檯找到插件列表的Resilience4j,自定義配置,以下圖, 在這裏插入圖片描述markdown

  • soul官網的配置介紹app

    * Resilience4j處理詳解:
    
        * timeoutDurationRate:等待獲取令牌的超時時間,單位ms,默認值:5000。
    
        * limitRefreshPeriod:刷新令牌的時間間隔,單位ms,默認值:500。
    
        * limitForPeriod:每次刷新令牌的數量,默認值:50。
    
        * circuitEnable:是否開啓熔斷,0:關閉,1:開啓,默認值:0。
    
        * timeoutDuration:熔斷超時時間,單位ms,默認值:30000。
    
        * fallbackUri:降級處理的uri。
    
        * slidingWindowSize:滑動窗口大小,默認值:100。
    
        * slidingWindowType:滑動窗口類型,0:基於計數,1:基於時間,默認值:0。
    
        * minimumNumberOfCalls:開啓熔斷的最小請求數,超過這個請求數纔開啓熔斷統計,默認值:100。
    
        * waitIntervalFunctionInOpenState:熔斷器開啓持續時間,單位ms,默認值:10。
    
        * permittedNumberOfCallsInHalfOpenState:半開狀態下的環形緩衝區大小,必須達到此數量纔會計算失敗率,默認值:10。
    
        * failureRateThreshold:錯誤率百分比,達到這個閾值,熔斷器纔會開啓,默認值50。
    
        * automaticTransitionFromOpenToHalfOpenEnabled:是否自動從open狀態轉換爲half-open狀態,,true:是,false:否,默認值:false。
    
    複製代碼

限流

  • 參數配置 以下是參數配置校驗,參數值小於默認值,會直接賦值默認值,所以方便測試效果直接修改源碼的配置 : 每次刷新令牌的數量爲2 ,刷新令牌的時間間隔爲1s,超時時間爲1s
/**
     * check filed default value.
     *
     * @param resilience4JHandle {@linkplain Resilience4JHandle}
     * @return {@linkplain Resilience4JHandle}
     */
    public Resilience4JHandle checkData(final Resilience4JHandle resilience4JHandle) {
        resilience4JHandle.setTimeoutDurationRate(Math.max(resilience4JHandle.getTimeoutDurationRate(), Constants.TIMEOUT_DURATION_RATE));
           //resilience4JHandle.setLimitRefreshPeriod(Math.max(resilience4JHandle.getLimitRefreshPeriod(), Constants.LIMIT_REFRESH_PERIOD));
           //resilience4JHandle.setLimitForPeriod(Math.max(resilience4JHandle.getLimitForPeriod(), Constants.LIMIT_FOR_PERIOD));
        //每次刷新令牌的數量爲2 ,刷新令牌的時間間隔爲1s
        resilience4JHandle.setLimitRefreshPeriod(1000);
        resilience4JHandle.setLimitForPeriod(2);
        resilience4JHandle.setTimeoutDuration(1000);
        resilience4JHandle.setCircuitEnable(Math.max(resilience4JHandle.getCircuitEnable(), Constants.CIRCUIT_ENABLE));
           //resilience4JHandle.setTimeoutDuration(Math.max(resilience4JHandle.getTimeoutDuration(), Constants.TIMEOUT_DURATION));
        resilience4JHandle.setFallbackUri(!"0".equals(resilience4JHandle.getFallbackUri()) ? resilience4JHandle.getFallbackUri() : "");
        resilience4JHandle.setSlidingWindowSize(Math.max(resilience4JHandle.getSlidingWindowSize(), Constants.SLIDING_WINDOW_SIZE));
        resilience4JHandle.setSlidingWindowType(Math.max(resilience4JHandle.getSlidingWindowType(), Constants.SLIDING_WINDOW_TYPE));
        resilience4JHandle.setMinimumNumberOfCalls(Math.max(resilience4JHandle.getMinimumNumberOfCalls(), Constants.MINIMUM_NUMBER_OF_CALLS));
        resilience4JHandle.setWaitIntervalFunctionInOpenState(Math.max(resilience4JHandle.getWaitIntervalFunctionInOpenState(), Constants.WAIT_INTERVAL_FUNCTION_IN_OPEN_STATE));
        resilience4JHandle.setPermittedNumberOfCallsInHalfOpenState(Math.max(resilience4JHandle.getPermittedNumberOfCallsInHalfOpenState(), Constants.PERMITTED_NUMBER_OF_CALLS_IN_HALF_OPEN_STATE));
        resilience4JHandle.setFailureRateThreshold(Math.max(resilience4JHandle.getFailureRateThreshold(), Constants.FAILURE_RATE_THRESHOLD));
        return resilience4JHandle;
    }

複製代碼
  • 使用SuperBenchmarker工具,4個線程,執行10s
C:\Users\v-yanb07>sb -u http://localhost:9195/http/test/findByUserId?userId=1 -c 4 -N 10
Starting at 2021-03-14 15:46:28
[Press C to stop the test]
23      (RPS: 1)
---------------Finished!----------------
Finished at 2021-03-14 15:46:51 (took 00:00:23.0477097)
24      (RPS: 1)                        Status 200:    25

RPS: 2.2 (requests/second)
Max: 2020ms
Min: 472ms
Avg: 1677ms

  50%   below 1994ms
  60%   below 1997ms
  70%   below 1999ms
  80%   below 1999ms
  90%   below 2001ms
  95%   below 2019ms
  98%   below 2020ms
  99%   below 2020ms
99.9%   below 2020ms

複製代碼
  • 輸出日誌

    2021-03-14 12:16:35.252  INFO 379336 --- [ctor-http-nio-7] o.d.s.e.h.controller.HttpTestController  : 限流測試
    2021-03-14 12:16:36.249  INFO 379336 --- [ctor-http-nio-4] o.d.s.e.h.controller.HttpTestController  : 限流測試
    2021-03-14 12:16:36.250  INFO 379336 --- [ctor-http-nio-7] o.d.s.e.h.controller.HttpTestController  : 限流測試
    2021-03-14 12:16:37.250  INFO 379336 --- [ctor-http-nio-7] o.d.s.e.h.controller.HttpTestController  : 限流測試
    2021-03-14 12:16:37.250  INFO 379336 --- [ctor-http-nio-4] o.d.s.e.h.controller.HttpTestController  : 限流測試
    2021-03-14 12:16:38.250  INFO 379336 --- [ctor-http-nio-7] o.d.s.e.h.controller.HttpTestController  : 限流測試
    2021-03-14 12:16:38.250  INFO 379336 --- [ctor-http-nio-4] o.d.s.e.h.controller.HttpTestController  : 限流測試
    2021-03-14 12:16:39.252  INFO 379336 --- [ctor-http-nio-7] o.d.s.e.h.controller.HttpTestController  : 限流測試
    2021-03-14 12:16:39.252  INFO 379336 --- [ctor-http-nio-4] o.d.s.e.h.controller.HttpTestController  : 限流測試
    
    複製代碼

    控制檯日誌每秒輸出兩條,由此驗證限流生效

    熔斷

    • 從配置信息咱們知道熔斷器默認是關閉,咱們須要開打

    • soul-examples-http調用接口處添加休眠時間

      @GetMapping("/findByUserId")
      public UserDTO findByUserId(@RequestParam("userId") final String userId) throws Exception{
      UserDTO userDTO = new UserDTO();
      userDTO.setUserId(userId);
      userDTO.setUserName("hello world");
      log.info("限流測試");
      
      int i = RandomUtils.nextInt(1,3);
      if(i %2==0){
              //throw new Exception("異常拋出");
      Thread.currentThread().sleep(2000);
      }
      return userDTO;
      }
      
      複製代碼
  • Resilience4JHandle#checkData手動設置超時時間爲1s

    resilience4JHandle.setTimeoutDuration(1000);
    
    複製代碼

    屢次請求時,有的請求返回正常數據,有的請求返回以下數據,表示超時熔斷生效

    {
    "code": 500,
    "message": "Internal Server Error",
    "data": "404 NOT_FOUND"
    }
    
    複製代碼

Resilience4J插件源碼解讀

soul網關Resilience4j插件源碼大量使用了響應式編程方式,首先須要對響應式編程瞭解

  • Resilience4J插件目錄結構

    └─resilience4j
    │  Resilience4JPlugin.java                   //插件處理,核心類
    │  
    ├─build
    │      Resilience4JBuilder.java              //構建Resilience4JConf對象
    │      
    ├─conf
    │      Resilience4JConf.java
    │      
    ├─executor
    │      CombinedExecutor.java                 //限流和熔斷執行器
    │      Executor.java
    │      RateLimiterExecutor.java              //限流執行器
    │      
    ├─factory
    │      Resilience4JRegistryFactory.java      //限流和熔斷對象構建
    │      
    └─handler
            Resilience4JHandler.java
    
    複製代碼
  • Resilience4JPlugn#doExecute Resilience4JPlugn其餘soul中插件同樣繼承AbstractSoulPlugin,只要開啓了,經過鏈式機制執行,都會走到核心方法doExecute

    @Override
    protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
        final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
        assert soulContext != null;
        //獲取配置信息對象
        Resilience4JHandle resilience4JHandle = GsonUtils.getGson().fromJson(rule.getHandle(), Resilience4JHandle.class);
        //校驗配置信息,若是小於默認值,則賦值默認值
        resilience4JHandle = resilience4JHandle.checkData(resilience4JHandle);
        //circuitEnable配置:1 開啓熔斷組件 ,不然走限流組件
        if (resilience4JHandle.getCircuitEnable() == 1) {
            return combined(exchange, chain, rule);
        }
    
        return rateLimiter(exchange, chain, rule);
    }
    
    複製代碼
    • 限流 Resilience4JPlugin#rateLimiter
    private Mono<Void> rateLimiter(final ServerWebExchange exchange, final SoulPluginChain chain, final RuleData rule) {
    return ratelimiterExecutor.run(
            // chain.execute(exchange)  後續插件執行
            chain.execute(exchange), fallback(ratelimiterExecutor, exchange, null), Resilience4JBuilder.build(rule))
            .onErrorResume(throwable -> ratelimiterExecutor.withoutFallback(exchange, throwable))  
        
    
    //ratelimiterExecutor.run調用
    @Override
    public <T> Mono<T> run(final Mono<T> toRun, final Function<Throwable, Mono<T>> fallback, final Resilience4JConf conf) {
    //限流器組件
    RateLimiter rateLimiter = Resilience4JRegistryFactory.rateLimiter(conf.getId(), conf.getRateLimiterConfig());
    //限流執行
    Mono<T> to = toRun.transformDeferred(RateLimiterOperator.of(rateLimiter));
    if (fallback != null) {
    //回調的執行
        return to.onErrorResume(fallback);
    }
    return to;
    }
    
    
    // to.onErrorResume(fallback);
    default Mono<Void> fallback(ServerWebExchange exchange, String uri, Throwable t) {
    if (StringUtils.isBlank(uri)) {
        return withoutFallback(exchange, t);
    }
    DispatcherHandler dispatcherHandler = SpringBeanUtils.getInstance().getBean(DispatcherHandler.class);
    ServerHttpRequest request = exchange.getRequest().mutate().uri(Objects.requireNonNull(UriUtils.createUri(uri))).build();
    ServerWebExchange mutated = exchange.mutate().request(request).build();
    //回調的執行地方
    return dispatcherHandler.handle(mutated);
    }    
    
    複製代碼
  • 熔斷 Resilience4JPlugin#combined

private Mono<Void> combined(final ServerWebExchange exchange, final SoulPluginChain chain, final RuleData rule) {
        Resilience4JConf conf = Resilience4JBuilder.build(rule);
        return combinedExecutor.run(
                chain.execute(exchange).doOnSuccess(v -> {
                    HttpStatus status = exchange.getResponse().getStatusCode();
                    if (status == null || !status.is2xxSuccessful()) {
                        exchange.getResponse().setStatusCode(null);
                        throw new CircuitBreakerStatusCodeException(status == null ? HttpStatus.INTERNAL_SERVER_ERROR : status);
                    }
                }), fallback(combinedExecutor, exchange, conf.getFallBackUri()), conf);
    }


   //combinedExecutor#run執行的內容
    public <T> Mono<T> run(final Mono<T> run, final Function<Throwable, Mono<T>> fallback, final Resilience4JConf resilience4JConf) {
        RateLimiter rateLimiter = Resilience4JRegistryFactory.rateLimiter(resilience4JConf.getId(), resilience4JConf.getRateLimiterConfig());
        CircuitBreaker circuitBreaker = Resilience4JRegistryFactory.circuitBreaker(resilience4JConf.getId(), resilience4JConf.getCircuitBreakerConfig());
                     //斷路器的操做
        Mono<T> to = run.transformDeferred(CircuitBreakerOperator.of(circuitBreaker))
                //限流操做
                .transformDeferred(RateLimiterOperator.of(rateLimiter))
                //設置超時時間
                .timeout(resilience4JConf.getTimeLimiterConfig().getTimeoutDuration())
                //若是超時了拋出超時異常
                .doOnError(TimeoutException.class, t -> circuitBreaker.onError(
                        resilience4JConf.getTimeLimiterConfig().getTimeoutDuration().toMillis(),
                        TimeUnit.MILLISECONDS,
                        t));
        if (fallback != null) {
            to = to.onErrorResume(fallback);
        }
        return to;
    }

複製代碼

總結

  • soul網關提供限流和熔斷,熔斷默認是關閉的
  • 參數值小於默認值,會直接使用默認值
相關文章
相關標籤/搜索