Spring WebFlux運用中的思考與對比

本文基於Spring Cloud Finchley SR4mysql

本文經過幾個問題,解析下Spring WebFlux用法最佳實踐,並與另外一框架Vertx做對比web

1. 是否必定要用默認的Web容器,用本身的Web容器是否能夠,同時是否能夠有web和webflux

是能夠的,這樣的依賴是可行的(容器用tomcat和undertow或者其餘均可以,這裏使用undertow):spring

2. 怎樣實現真正的異步背壓的Reactor模型呢?

這個問題,除此運用像WebFlux和Vertx的框架的人,都會對這個有誤解。認爲僅僅簡單的把webFlux的依賴添加進來,以後接口返回Mono就實現了異步背壓的Reactor模型。實際上並非這樣的。 咱們來舉幾個例子,分步驟深刻了解下。 首先爲了測試方便,咱們將web容器的處理http請求線程池的大小改爲惟一一個,對於Tomcat,配置:sql

server.thread.max-thread=1

對於UnderTow(咱們這裏用的是underTow):數據庫

# 設置IO線程數, 它主要執行非阻塞的任務,它們會負責多個鏈接, 默認設置每一個CPU核心一個線程
server.undertow.io-threads=1
# 阻塞任務線程池, 當執行相似servlet請求阻塞IO操做, undertow會從這個線程池中取得線程
# 它的值設置取決於系統線程執行任務的阻塞係數,默認值是IO線程數*8
server.undertow.worker-threads=1

以後,配置Log4J2日誌格式爲:api

<Property name="springAppName">test</Property>
    <Property name="LOG_ROOT">log</Property>
    <Property name="LOG_DATEFORMAT_PATTERN">yyyy-MM-dd HH:mm:ss.SSS</Property>
    <Property name="LOG_EXCEPTION_CONVERSION_WORD">%xwEx</Property>
    <Property name="LOG_LEVEL_PATTERN">%5p</Property>
    <Property name="logFormat">
        %d{${LOG_DATEFORMAT_PATTERN}} ${LOG_LEVEL_PATTERN} [${springAppName},%X{X-B3-TraceId},%X{X-B3-SpanId}] [${sys:PID}] [%t][%C:%L]: %m%n${sys:LOG_EXCEPTION_CONVERSION_WORD}
    </Property>

這樣的格式可使咱們看到線程號,還有sleuth的traceId和spanId(咱們的項目依賴了sleuth)。 首先編寫測試代碼,看看直接簡單調用並just是否實現了異步背壓:緩存

@Log4j2
@RestController
public class TestController {
    @Autowired
    private TestService testService;
    
    @RequestMapping("/test")
    public Mono<String> test() {
        log.info("test started");
        return Mono.just(testService.simulateIOTest());
    }

    @Service
    public static class TestService {

        public String simulateIOTest() {
            try {
                //simulate io
                log.info("simulate start");
                TimeUnit.SECONDS.sleep(5);
                log.info("simulate end");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            return "hello";
        }
    }
}

併發調用接口,查看日誌,發現:tomcat

2019-11-12 09:05:41.595  INFO [test,26bf995af305ad34,26bf995af305ad34] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started
2019-11-12 09:05:41.596  INFO [test,26bf995af305ad34,26bf995af305ad34] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController$TestService:38]:simulate start
2019-11-12 09:05:46.598  INFO [test,26bf995af305ad34,26bf995af305ad34] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController$TestService:40]:simulate end
2019-11-12 09:05:46.635  INFO [test,620bd553b1e55dcd,620bd553b1e55dcd] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started
2019-11-12 09:05:46.635  INFO [test,620bd553b1e55dcd,620bd553b1e55dcd] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController$TestService:38]:simulate start
2019-11-12 09:05:51.636  INFO [test,620bd553b1e55dcd,620bd553b1e55dcd] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController$TestService:40]:simulate end
2019-11-12 09:05:51.643  INFO [test,bc17d60861ba1a2a,bc17d60861ba1a2a] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started
2019-11-12 09:05:51.643  INFO [test,bc17d60861ba1a2a,bc17d60861ba1a2a] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController$TestService:38]:simulate start
2019-11-12 09:05:56.644  INFO [test,bc17d60861ba1a2a,bc17d60861ba1a2a] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController$TestService:40]:simulate end

能夠明顯看出,請求是串行處理的,由於只有一個線程,而且這個線程還在等待請求處理完。這就不符合Reactor模型,處理http請求的線程XNIO-2 task-1應該不等待請求處理完而直接處理下一個請求才對。 Mono.just(testService.simulateIOTest())替換成Mono.fromCallable(() -> testService.simulateIOTest())等等相似的是同樣的效果,這裏必須本身用其餘的線程池,去處理實際請求,處理結束的時候,將結果填寫到最外層的Mono裏面。這樣的話,考慮到代碼整潔性不採用純回調寫法,要求每個調用方法返回的都是Future類型的。這裏咱們返回CompletableFuture。併發

@Log4j2
@RestController
public class TestController {
    @Autowired
    private TestService testService;

    @RequestMapping("/test")
    public Mono<String> test() {
        log.info("test started");
        return Mono.create(stringMonoSink -> testService.simulateIOTest().thenApply(s -> {
            log.info("apply");
            //填寫成功結果
            stringMonoSink.success(s);
            return s;
        }));
    }

    @Service
    public static class TestService {

        public CompletableFuture<String> simulateIOTest() {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    //simulate io
                    log.info("simulate start");
                    TimeUnit.SECONDS.sleep(5);
                    log.info("simulate end");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "hello";
            });
        }
    }
}

結果是:app

2019-11-12 09:18:03.457  INFO [test,8d6eddc9cc80612f,8d6eddc9cc80612f] [22892] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started
2019-11-12 09:18:03.458  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-3][com.hopegaming.syringe.api.frontend.order.TestController$TestService:44]:simulate start
2019-11-12 09:18:04.155  INFO [test,c654462e159fd43e,c654462e159fd43e] [22892] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started
2019-11-12 09:18:04.156  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-5][com.hopegaming.syringe.api.frontend.order.TestController$TestService:44]:simulate start
2019-11-12 09:18:04.962  INFO [test,8366a95d002ca25a,8366a95d002ca25a] [22892] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started
2019-11-12 09:18:04.963  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-7][com.hopegaming.syringe.api.frontend.order.TestController$TestService:44]:simulate start
2019-11-12 09:18:05.756  INFO [test,5f851d9e2ef49f14,5f851d9e2ef49f14] [22892] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started
2019-11-12 09:18:05.757  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-9][com.hopegaming.syringe.api.frontend.order.TestController$TestService:44]:simulate start
2019-11-12 09:18:08.459  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-3][com.hopegaming.syringe.api.frontend.order.TestController$TestService:46]:simulate end
2019-11-12 09:18:08.459  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-3][com.hopegaming.syringe.api.frontend.order.TestController:28]:apply
2019-11-12 09:18:09.156  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-5][com.hopegaming.syringe.api.frontend.order.TestController$TestService:46]:simulate end
2019-11-12 09:18:09.156  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-5][com.hopegaming.syringe.api.frontend.order.TestController:28]:apply
2019-11-12 09:18:09.964  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-7][com.hopegaming.syringe.api.frontend.order.TestController$TestService:46]:simulate end
2019-11-12 09:18:09.964  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-7][com.hopegaming.syringe.api.frontend.order.TestController:28]:apply
2019-11-12 09:18:10.757  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-9][com.hopegaming.syringe.api.frontend.order.TestController$TestService:46]:simulate end
2019-11-12 09:18:10.757  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-9][com.hopegaming.syringe.api.frontend.order.TestController:28]:apply

這樣,才真正實現了Reactor模型。

3. CompletableFuture線程池管理還有日誌追蹤

CompletableFuture能夠指定線程池,亦能夠不指定。若是像上面不指定的話,那麼使用的線程池就是Java8以後會默認啓動一個大小爲CPU核數減一的CommonForkJoinPool去執行。須要指定的話,基本上每一個方法均可以額外傳入一個線程池做爲參數。

最佳實踐是,只要涉及到IO的,就交給不一樣的線程池去作,不一樣種類的IO的線程池不一樣。例如,用於數據庫IO的線程池,用於RPC的線程池,用於緩存訪問的線程池等等。

這裏還有一個問題存在,就是異步調用,致使spanId和traceId丟失了,例如上面的例子:

2019-11-12 09:18:03.457  INFO [test,8d6eddc9cc80612f,8d6eddc9cc80612f] [22892] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started
2019-11-12 09:18:03.458  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-3][com.hopegaming.syringe.api.frontend.order.TestController$TestService:44]:simulate start

8d6eddc9cc80612f這個丟失了,致使微服務調用鏈日誌追蹤變得不可行,因此,這裏咱們對於異步的代碼,也須要在異步調用前強制設置下spanId和traceId。

綜上以後,修改的代碼是:

@Log4j2
@RestController
public class TestController {
    @Autowired
    private TestService testService;

    @RequestMapping("/test")
    public Mono<String> test() {
        log.info("test started");
        return Mono.fromFuture(testService.simulateIOTest());
    }

    @Service
    public static class TestService {
        @Autowired
        private Tracer tracer;
        ThreadFactory build = (new ThreadFactoryBuilder()).setNameFormat("test_service_executor-%d").build();
        private ExecutorService executorService = new ThreadPoolExecutor(50, 50, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(131072), build, new ThreadPoolExecutor.AbortPolicy());

        public CompletableFuture<String> simulateIOTest() {
            Span span = tracer.currentSpan();
            return CompletableFuture.supplyAsync(() -> {
                try (Tracer.SpanInScope cleared = tracer.withSpanInScope(span)) {
                    //simulate io
                    log.info("simulate start");
                    TimeUnit.SECONDS.sleep(5);
                    log.info("simulate end");
                    return "hello";
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }, executorService);
        }
    }
}

結果是:

2019-11-12 09:44:30.953  INFO [test,bc1ba4169e037577,bc1ba4169e037577] [2796] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:28]:test started
2019-11-12 09:44:30.991  INFO [test,bc1ba4169e037577,bc1ba4169e037577] [2796] [test_service_executor-0][com.hopegaming.syringe.api.frontend.order.TestController$TestService:44]:simulate start
2019-11-12 09:44:35.991  INFO [test,bc1ba4169e037577,bc1ba4169e037577] [2796] [test_service_executor-0][com.hopegaming.syringe.api.frontend.order.TestController$TestService:46]:simulate end

3. 與Vertx對比,有哪些異同?

實際上,從設計上看,基本思路是同樣的。對於任意一個IO操做,若是有原生的異步客戶端(返回是一個Future),則運用Future封裝交給其餘線程池處理,不影響http請求線程接受其餘請求。

主要區別在於:

  1. WebFlux框架並無涉及到線程池,Vertx連異步線程池也封裝成爲Vertx的WorkerExecutor類。
  2. WebFlux異步Future使用的仍是Java原生的,Vetx框架本身封裝了Future。
  3. WebFlux與Spring在結合方面更完善,可是Spring生態裏面並無提供原生的NIO客戶端,例如實現了MySQL協議棧的NIO mysql客戶端,這個Vertx是有的,可是是否健壯還有待考證。這些進一步限制了WebFlux的性能。
  4. Vertx是一個跨語言的框架
相關文章
相關標籤/搜索