朱曄和你聊Spring系列S1E5:Spring WebFlux小探

本文會來作一些應用對比Spring MVC和Spring WebFlux,觀察線程模型的區別,而後作一下簡單的壓力測試。前端

建立一個傳統的Spring MVC應用

先來建立一個新的webflux-mvc的模塊:java

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>me.josephzhu</groupId>
    <artifactId>spring101-webflux-mvc</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>spring101-webflux-mvc</name>
    <description></description>

    <parent>
        <groupId>me.josephzhu</groupId>
        <artifactId>spring101</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
複製代碼

而後在項目裏定義一個咱們會使用到的POJO:react

package me.josephzhu.spring101webfluxmvc;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

@Data
@AllArgsConstructor
@NoArgsConstructor
@Document(collection = "mydata")
public class MyData {
    @Id
    private String id;
    private String payload;
    private long time;
}
複製代碼

這裏的@Document和@Id是爲Mongodb服務的,咱們定義了MyData將會以mydata做爲Collection的名字,而後id字段是Document的Id列。 而後咱們來建立Controller,在這個Controller裏面咱們嘗試三種不一樣的操做:web

  1. Sleep 100ms的純獲取數據的方法。從請求中得到length參數做爲payload字符串的長度,從請求中得到size參數做爲MyData的個數。咱們在以後的測試過程當中能夠隨意調節這兩個參數來調整咱們的數據量。
  2. 從Mongodb獲取數據的方法,獲取到數據後直接返回。
  3. 複合邏輯。先走HTTP請求從data方法獲取數據,而後把數據保存進入Mongodb,最後返回這些數據。
package me.josephzhu.spring101webfluxmvc;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.util.UriComponentsBuilder;

import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

@RestController
public class MyController {
    @Autowired
    private RestTemplate restTemplate;
    @Autowired
    private MyRepository myRepository;

    @GetMapping("/data")
    public List<MyData> getData(@RequestParam(value = "size", defaultValue = "10") int size,@RequestParam(value = "length", defaultValue = "100") int length) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {

        }
        String payload = IntStream.rangeClosed(1,length).mapToObj(i->"a").collect(Collectors.joining());
        return IntStream.rangeClosed(1, size)
                .mapToObj(i->new MyData(UUID.randomUUID().toString(), payload, System.currentTimeMillis()))
                .collect(Collectors.toList());
    }

    @GetMapping("/dbData")
    public List<MyData> getDbData() {
        return myRepository.findAll();
    }

    @GetMapping("/saveData")
    public List<MyData> saveData(@RequestParam(value = "size", defaultValue = "10") int size,@RequestParam(value = "length", defaultValue = "100") int length){
        UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl("http://localhost:8080/data")
                .queryParam("size", size)
                .queryParam("length", length);
        ResponseEntity<List<MyData>> responseEntity =
                restTemplate.exchange(builder.toUriString(),
                        HttpMethod.GET, null, new ParameterizedTypeReference<List<MyData>>() {});
        return responseEntity.getBody().stream().map(myRepository::save).collect(Collectors.toList());
    }
}
複製代碼

注意,在這裏咱們使用了Java 8的Steam來作一些操做避免使用for循環:spring

  1. 經過length參數構建payload(payload由length個字符a構成)。
  2. 經過size參數構建MyData的List。
  3. 在RestTemplate獲取到MyData的List後,把每個對象交由myRepository的save方法來處理,而後統一收集返回結果。

這些Stream的代碼都是同步處理,也不涉及外部IO,和非阻塞沒有任何關係,只是方便代碼編寫。爲了讓代碼能夠運行,咱們還須要繼續來配置下Mongodb的Repository:mongodb

package me.josephzhu.spring101webfluxmvc;

import org.springframework.data.mongodb.repository.MongoRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface MyRepository extends MongoRepository<MyData, String> { }
複製代碼

由於咱們沒有用到複雜的查詢,在代碼裏只是用到了findAll方法,因此這裏咱們無需定義額外的方法,只是聲明接口便可。 最後,咱們建立主應用程序,順便配置一下Mongodb和RestTemplate:數據庫

package me.josephzhu.spring101webfluxmvc;

import com.mongodb.MongoClientOptions;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestTemplate;

@SpringBootApplication
@Configuration
public class Spring101WebfluxMvcApplication {

   @Bean
   MongoClientOptions mongoClientOptions(){
       return MongoClientOptions.builder().connectionsPerHost(1000).build();
   }

    @Bean
    public RestTemplate restTemplate(RestTemplateBuilder builder) {
        return builder.build();
    }

    public static void main(String[] args) {
        SpringApplication.run(Spring101WebfluxMvcApplication.class, args);
    }
}
複製代碼

這裏咱們配置了Mongodb客戶端使得以後在進行壓力測試的時候能有超過100個鏈接鏈接到Mongodb,不然會出現沒法獲取鏈接的問題。apache

建立WebFlux版本的應用

如今咱們再來新建一個webflux模塊:編程

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>me.josephzhu</groupId>
    <artifactId>spring101-webflux</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>spring101-webflux</name>
    <description></description>

    <parent>
        <groupId>me.josephzhu</groupId>
        <artifactId>spring101</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
複製代碼

這裏能夠注意到,咱們引入了webflux這個starter以及data-mongodb-reactive這個starter。在以前的Spring MVC項目中,咱們引入的是mvc和data-mongodb兩個starter。 而後,咱們一樣須要建立一下MyData類(代碼和以前如出一轍,這裏省略)。 最關鍵的一步,咱們來建立三個Controller方法的定義:json

package me.josephzhu.spring101webflux;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.springframework.web.reactive.function.server.ServerResponse.ok;

@Component
public class MyHandler {
    @Autowired
    private MyReactiveRepository myReactiveRepository;

    public Mono<ServerResponse> getData(ServerRequest serverRequest) {
        int size = Integer.parseInt(serverRequest.queryParam("size").orElse("10"));
        int length = Integer.parseInt(serverRequest.queryParam("length").orElse("100"));

        String payload = IntStream.rangeClosed(1,length).mapToObj(i->"a").collect(Collectors.joining());
        Flux<MyData> data = Flux.fromStream(IntStream.rangeClosed(1, size)
                .mapToObj(i->new MyData(UUID.randomUUID().toString(), payload, System.currentTimeMillis()))).delaySequence(Duration.ofMillis(100));

        return ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(data, MyData.class);
    }

    public Mono<ServerResponse> getDbData(ServerRequest serverRequest) {
        Flux<MyData> data = myReactiveRepository.findAll();
        return ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(data, MyData.class);
    }

    public Mono<ServerResponse> saveData(ServerRequest serverRequest) {
        int size = Integer.parseInt(serverRequest.queryParam("size").orElse("10"));
        int length = Integer.parseInt(serverRequest.queryParam("length").orElse("100"));

        Flux<MyData> data = WebClient.create().get()
                .uri(builder -> builder
                        .scheme("http")
                        .host("localhost")
                        .port(8080)
                        .path("data")
                        .queryParam("size", size)
                        .queryParam("length", length)
                        .build())
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .bodyToFlux(MyData.class)
                .flatMap(myReactiveRepository::save);

        return ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(data, MyData.class);
    }

}
複製代碼

這裏要說明幾點:

  1. 在WebFlux中,咱們能夠採用傳統的@Controller方式來定義Controller,也能夠採用函數式方式來聲明對外的Endpoint,也就是聲明Handler+Router。咱們這裏採用的是更有特點的後者來演示。
  2. 請你比較一下三個方法的實現對於兩個版本的區別。最主要的區別,咱們返回的實際數據是Mono<>和Flux<>,分別表明0~1個對象和0~N對象的響應式流。
  3. 在saveData方法中,對於Spring MVC咱們使用的是阻塞的RestTemplate來從遠端獲取數據,對於Spring WebFlux咱們使用的是非阻塞的WebClient來獲取數據。獲取數據後,咱們直接使用flatMap獲取到了全部的MyData轉給咱們的響應式的Mongodb Repository來處理數據。
  4. 對於saveData方法中插入Mongodb的操做,這裏和MVC的例子有很大的不一樣須要注意。在MVC中,咱們把遠程服務返回的結果轉爲Stream數據流,同步依次調用save方法,整個過程只會有佔用一個Mongodb的鏈接。而在這裏,直接對Flux流進行了Map,整個過程至關於併發進行了Mongodb的調用。在以後作壓測的時候,咱們會再次提到這點。

剛纔有提到,採用函數式聲明對外的Endpoint的話除了定義Handler,還須要配置Router來和Handler關聯,配置以下:

package me.josephzhu.spring101webflux;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;

import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;

@Configuration
public class RouterConfig {
    @Autowired
    private MyHandler myHandler;

    @Bean
    public RouterFunction<ServerResponse> config() {
        return route(GET("/data"), myHandler::getData)
                .andRoute(GET("/dbData"), myHandler::getDbData)
                .andRoute(GET("/saveData"), myHandler::saveData);
    }
}
複製代碼

這段代碼沒有太多須要說明,這裏咱們定義了三個GET請求(至關於MVC的@GetMapping),而後對應到注入的myHandler的三個方法上。 而後咱們還須要建立Mongodb的Repository:

package me.josephzhu.spring101webflux;

import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface MyReactiveRepository extends ReactiveMongoRepository<MyData, String> { }
複製代碼

以及配置和啓動類:

package me.josephzhu.spring101webflux;

import com.mongodb.ConnectionString;
import com.mongodb.async.client.MongoClientSettings;
import com.mongodb.connection.ClusterSettings;
import com.mongodb.connection.ConnectionPoolSettings;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@SpringBootApplication
@Configuration
public class Spring101WebfluxApplication {

    @Bean
    MongoClient mongoClient(){
        return MongoClients.create(mongoClientSettings());
    }

    @Bean
    MongoClientSettings mongoClientSettings(){
        return MongoClientSettings.builder()
                .clusterSettings(ClusterSettings.builder().applyConnectionString(new ConnectionString("mongodb://localhost")).build())
                .connectionPoolSettings(ConnectionPoolSettings.builder().minSize(200).maxSize(1000).maxWaitQueueSize(1000000).build())
                .build();
    }

    public static void main(String[] args) {
        SpringApplication.run(Spring101WebfluxApplication.class, args);
    }
}
複製代碼

這裏對Mongodb作了一些配置,主要也是但願放大鏈接池這塊的默認限制,爲從此的壓測服務。注意,在這裏配置的Bean是com.mongodb.reactivestream.client下的MongoClient,以下圖所示,還有其它兩個MongoClient,若是修改了不匹配的MongoClient的話是不會有做用的,我在這個坑裏躺了兩小時。

完成後能夠打開瀏覽器測試一下接口:

Spring MVC仍是WebFlux?

下圖是官網的一個圖說明了二者的關係,而後官網也給出了一些建議:

  1. 若是你如今的Spring MVC運行的沒啥問題的話就別改了,有大量的類庫可使用,實現簡單易於理解。
  2. 若是你但願實現輕量級的,函數式Web框架,那麼能夠考慮WebFlux的函數Web端點。
  3. 若是你依賴阻塞的持久化API好比JPA和JDBC那麼也就只能選擇Spring MVC了。目前對於非阻塞的JDBC實現有一些早期的項目在探索,可是沒有到能夠上生產的成熟度。
  4. 在Spring MVC應用程序中進行遠程調用也是可使用響應式的WebClient的。Spring MVC也可使用其它的響應式組件。每次調用延遲越厲害受益越大。
  5. 對於大型應用程序要考慮到非阻塞方式實現的學習曲線。最簡單的起步方式就是使用WebClient,徹底切換到非阻塞須要花時間熟悉函數式聲明式的編程API。
    官方的意思也是能夠在一些小引用上嘗試WebFlux,對於大型應用不建議冒然轉到WebFlux。

觀察線程模型

咱們知道對於阻塞的實現方式,咱們採用線程池來服務請求(線程池中的會維護一組普通的線程,線程池只是節省線程建立的時間),對於每個請求的處理,至始至終都是在一個線程中進行,若是處理的過程當中咱們須要訪問外部的網絡或數據庫,那麼線程就處於阻塞狀態,這個線程沒法服務其它請求,若是當時還有更多的併發的話,就須要建立更多的線程來服務其它請求。這種實現方式是很是簡單的,應對壓力的增加擴容方式也是粗暴的,那就是增長更多線程。

對於非阻塞的方式,採用的是EventLoop的方式,IO操做的時候是不佔用工做線程的,所以只會建立一組和CPU核數至關的工做線程用於工做處理(NodeJS甚至是單線程的,這種就更危險了,就那麼一個工做線程,一旦被長時間佔用其它請求都沒法處理)。因爲整個處理過程當中IO請求不佔用線程時間,線程不會阻塞等待,再增長超過CPU核數的工做線程也是沒有意義的(只會白白增長線程切換的開銷)。對於這種方式在壓力增加後,由於咱們不須要增長額外的線程,也就沒有了絕對的瓶頸。

試想一下在阻塞模型下,對於5000的併發,並且每個併發阻塞的時間很是長,那麼咱們其實須要5000個線程來服務(這麼多線程99%其實都是在等待,屬於空耗系統資源),建立5000的線程不談其它的,若是線程棧大小是1M的話就須要5GB的內存。對於非阻塞的線程模型在8核機器上仍是8個工做線程,內存佔用仍是這麼小,能夠以最小的開銷應對大併發,系統的損耗不多。非阻塞的Reactive模式是內耗很是小的模式,可是這是有代價的,在實現上咱們須要確保處理過程當中沒有阻塞產生,不然就會浪費寶貴的數目固定的工做線程,也就是說咱們須要依賴配套的非阻塞IO類庫來使用。 在默認狀況下tomcat的工做線程池初始化爲10,最大200,咱們經過啓動本文建立的Spring101WebfluxMvcApplication程序,用jvisualvm工具來看下初始的狀況(35個線程) :

在項目的application.properties文件中咱們配置tomcat的最大線程數: server.tomcat.max-threads=250 在壓力的狀況下,咱們再來觀察一下線程的狀況(272個線程):
的確是建立多達250個工做線程。這裏看到大部分線程都在休眠,由於咱們這裏運行的是剛纔的data()方法,在方法內咱們休眠了100毫秒。對於一樣的壓力,咱們再來看一下Spring101WebfluxApplication程序的線程狀況(44個線程):
能夠看到用於處理HTTP的Reactor線程只有8個,和本機CPU核數量一致(下面有十個Thread打頭的線程是處理和Mongodb交互的,忽略),只須要這8個線程處理HTTP請求足以,由於HTTP請求的IO處理不會佔用線程。

使用Gatling進行壓力測試

咱們可使用Gatling類庫進行壓力測試,我我的感受比Jmeter方便。配置很簡單,首先咱們要安裝Scala的SDK,而後咱們新建一個模塊:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>me.josephzhu</groupId>
    <artifactId>spring101-webstresstest</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>spring101-webstresstest</name>
    <description></description>

    <dependencies>
        <dependency>
            <groupId>io.gatling.highcharts</groupId>
            <artifactId>gatling-charts-highcharts</artifactId>
            <version>2.3.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>io.gatling</groupId>
                <artifactId>gatling-maven-plugin</artifactId>
                <version>2.2.4</version>
                <configuration>
                    <simulationClass>me.josephzhu.spring101.webstresstest.StressTest</simulationClass>
                    <resultsFolder>/Users/zyhome/gatling</resultsFolder>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
複製代碼

引入了garling的maven插件,在這裏配置了測試結果輸出路徑以及壓測的類。接下去建立一下這個Scala測試類:

package me.josephzhu.spring101.webstresstest

import io.gatling.core.Predef._
import io.gatling.core.scenario.Simulation
import io.gatling.http.Predef._

class StressTest extends Simulation {

  val scn = scenario("data").repeat(1000) {
    exec(
      http("data")
        .get("http://localhost:8080/data?size=10&length=1000")
        .header("Content-Type", "application/json")
        .check(status.is(200)).check(substring("payload")))
  }

  setUp(scn.inject(atOnceUsers(200)))
}
複製代碼

這段代碼定義了以下的測試行爲:

  1. 聲明一個data測試場景,重複進行1000次測試,發起一個遠程調用,驗證調用結果的響應狀態碼是200而且返回的結果包含字符串payload。
  2. 測試啓動的時候直接壓上去200個用戶,每個用戶運行完這1000次測試後結束了,因此這種方式一開始會是200用戶到測試最後階段用戶數會慢慢減小。固然還有其它一些測試方式(好比慢慢遞增用戶的方式),詳見官網:
nothingFor(4 seconds), // 1
    atOnceUsers(10), // 2
    rampUsers(10) over (5 seconds), // 3
    constantUsersPerSec(20) during (15 seconds), // 4
    constantUsersPerSec(20) during (15 seconds) randomized, // 5
    rampUsersPerSec(10) to 20 during (10 minutes), // 6
    rampUsersPerSec(10) to 20 during (10 minutes) randomized, // 7
    splitUsers(1000) into (rampUsers(10) over (10 seconds)) separatedBy (10 seconds), // 8
    splitUsers(1000) into (rampUsers(10) over (10 seconds)) separatedBy atOnceUsers(30), // 9
    heavisideUsers(1000) over (20 seconds) // 10
複製代碼

壓力測試一

先來進行第一個測試,1000併發對data接口進行100次循環(還記得嗎,接口有100ms休眠or延遲的):

class StressTest extends Simulation {

  val scn = scenario("data").repeat(100) {
    exec(
      http("mvc data")
        .get("http://localhost:8080/data?size=10&length=1000")
        .header("Content-Type", "application/json")
        .check(status.is(200)).check(substring("payload")))
  }

  setUp(scn.inject(atOnceUsers(1000)))
}
複製代碼

下面兩個圖分別是MVC和WebFlux的測試結果(由於都是8080端口,因此測試的時候記得切換重啓兩個應用哦):

能夠看到WebFlux的吞吐幾乎是MVC的翻倍,平均響應時間少了兩倍不止,很明顯,在等待的時候,2000個併發用戶大大超過了咱們配置的250個線程池的線程數量,這個時候只能排隊,對於非阻塞的方式,延遲是不會佔用處理線程的,在延遲結束後纔會去佔用處理線程的資源進行處理,不會收到併發用戶數受限於線程池線程數的狀況。 咱們把Sleep相關代碼註釋再進行一次測試看看狀況,分別是MVC和WebFlux:
這個時候WebFlux優點沒有那麼明顯了。

性能測試二

如今咱們來訪問一下http://localhost:8080/saveData?size=100&length=1000 接口往Mongodb來初始化100條數據,而後修改一下測試腳本壓測dbData接口: class StressTest extends Simulation {

val scn = scenario("data").repeat(100) { exec( http("data") .get("http://localhost:8080/dbData") .header("Content-Type", "application/json") .check(status.is(200)).check(substring("payload"))) }

setUp(scn.inject(atOnceUsers(1000))) } 下面看下此次的測試結果 ,分別是MVC和WebFlux:

吞吐量沒有太多提升,平均響應時間快很多。

性能測試三

再來試一下第三個saveData接口的狀況。修改測試代碼:

class StressTest extends Simulation {

  val scn = scenario("data").repeat(100) {
    exec(
      http("data")
        .get("http://localhost:8080/saveData?size=5&length=100000")
        .header("Content-Type", "application/json")
        .check(status.is(200)).check(substring("payload")))
  }

  setUp(scn.inject(atOnceUsers(200)))
}
複製代碼

這裏咱們修改併發用戶爲200,每一個用戶進行100次測試,每次測試存入Mongodb 5條100KB的數據,一次測試後總數據量在10萬條。此次測試咱們並無使用1000併發用戶,緣由是這個測試咱們會先從遠端獲取數據而後再存入Mongodb,遠端的服務也是來自於當前應用程序,咱們的Tomcat最多隻有250個線程,在啓動1000個用戶後,一些線程服務於saveData接口,一些線程服務於data接口(saveData接口用到的),這樣至關於形成了循環依賴問題,請求在等待更多的可用線程執行服務data接口的響應,而這個時候線程又都被佔了致使沒法分配更多的請求,測試幾乎所有超時。 下面看下此次的測試結果 ,分別是MVC和WebFlux:

WebFlux也是併發略高,性能略好的優點。對於響應時間的分佈咱們再來細看下下面的圖:

第一個圖是MVC版本的響應時間分佈,能夠看到抖動比第二個圖的WebFlux的大很多。 最後來看看測試過程當中MVC的JVM狀況(263個線程):
以及WebFlux的(41線程):

性能測試四:

咱們來測試一下下面兩種狀況下對於WebFlux版本Mongodb側的狀況:

class StressTest extends Simulation {

  val scn = scenario("data").repeat(1000) {
    exec(
      http("data")
        .get("http://localhost:8080/saveData?size=1&length=1000")
        .header("Content-Type", "application/json")
        .check(status.is(200)).check(substring("payload")))
  }

  setUp(scn.inject(atOnceUsers(200)))
}
複製代碼

以及

class StressTest extends Simulation {

  val scn = scenario("data").repeat(1000) {
    exec(
      http("data")
        .get("http://localhost:8080/saveData?size=5&length=1000")
        .header("Content-Type", "application/json")
        .check(status.is(200)).check(substring("payload")))
  }

  setUp(scn.inject(atOnceUsers(200)))
}
複製代碼

區別就在遠程服務返回的Flux是1個仍是5個。在1個的時候運行測試能夠看到咱們Mongodb有64個鏈接(須要把以前鏈接池的配置最小設置爲小一點,好比50):

> db.serverStatus().connections
{ "current" : 64, "available" : 3212, "totalCreated" : 8899 }
複製代碼

在size爲5的時候,Flux返回的是5個對象,使用這個請求壓測的時候Mongodb的鏈接數以下:

> db.serverStatus().connections
{ "current" : 583, "available" : 2693, "totalCreated" : 10226 }
複製代碼

這是由於Flux拿到的數據直接以響應式進入Mongodb,並無等到全部數據拿到以後串行調用方法。 總結一下這幾回的測試,咱們發現WebFlux方式對於MVC方式能有略微的性能提高,對於請求阻塞的時候性能優點明顯。我本金的測試並無看到現象中的幾倍甚至幾十倍的性能提高,我猜緣由以下:

  1. 本機有性能瓶頸了,壓測客戶端、Mongodb服務器、服務端都在本機運行,干擾因素太多,CPU的使用你爭我奪,測試不公平
  2. 測試的時候CPU永遠是100%還死機好幾回,我根本沒法測試更高的併發,沒法徹底把非阻塞的性能壓出來
  3. 我本機測試的時候走的是localhost而不是內網,不通過物理網卡,可能沒法體現非阻塞的性能

若是有條件可使用三臺獨立服務器在內網進行1萬以上併發用戶的性能測試或許能夠獲得更科學的結果。

總結

本文咱們建立了WebFlux和MVC兩套應用對比演示了簡單返回數據、發出遠程請求、使用Mongodb等一些簡單的應用場景,而後來看了一下ThreadPerRequest和EventLoop方式線程模型的區別,最後使用Gatling進行了幾個Case的壓力測試而且觀察結果。我以爲:

  1. 非阻塞模型確定是好東西,在IO壓力和IO延遲很大的狀況下,非阻塞模型由於不須要更多的線程,內耗小,性能略好,並且也穩定,因此更利於高併發
  2. WebFlux的函數式和聲明方式實現須要有很高的API熟悉使用門檻,對於複雜的邏輯這種方式的實現比回調地獄更容易繞暈,並且容易產生Bug(或許之後有可能響應式的編程在API上有可能和傳統方式進行統一)
  3. 目前和WebFlux配套的其它一些Reactive的庫還不是很全面成熟,要對複雜的業務邏輯全面啓用響應式編程有點難,阻塞調用不是不能在WebFlux中混用,可是這種方式仍是採用了線程池來處理,如今容器也是NIO的了,有又多大區別
  4. 採用阻塞方式實現,由阻塞的線程進行自然背壓進行流控,非阻塞方式很直接一竿子到底,從外部請求直接到最底層存儲,須要作好流控,這是很是容易產生問題的一個點,當請求的處理無需經過線程來承載的時候,前端壓力會直通最底層數據源,不收任何擴容方面的限制,直接擊潰底層
  5. 對於阻塞的方式,多線程的調度自然就是一個任務的負載均衡,並不會出現太嚴重的卡死工做線程的問題,非阻塞應用編程咱們要有意識代碼在哪一個線程上運行,若是是reactor線程的話千萬不能長時間阻塞

綜上所述,使用WebFlux進行響應式編程我我的認爲目前只適合作類IO轉發的高併發的又看中資源使用效率的應用場景(好比Gateway網關服務),對於複雜的業務邏輯不太適合,在90%的狀況下響應式的編程模型和線程模型不會享受大幅性能優點,更不建議盲目把現有的應用使用WebFlux來重寫。固然,這確定是一個會持續發展的方向,能夠先接觸研究起來。

相關文章
相關標籤/搜索