本文會來作一些應用對比Spring MVC和Spring WebFlux,觀察線程模型的區別,而後作一下簡單的壓力測試。前端
先來建立一個新的webflux-mvc的模塊:java
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>me.josephzhu</groupId>
<artifactId>spring101-webflux-mvc</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>spring101-webflux-mvc</name>
<description></description>
<parent>
<groupId>me.josephzhu</groupId>
<artifactId>spring101</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
複製代碼
而後在項目裏定義一個咱們會使用到的POJO:react
package me.josephzhu.spring101webfluxmvc;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
@Data
@AllArgsConstructor
@NoArgsConstructor
@Document(collection = "mydata")
public class MyData {
@Id
private String id;
private String payload;
private long time;
}
複製代碼
這裏的@Document和@Id是爲Mongodb服務的,咱們定義了MyData將會以mydata做爲Collection的名字,而後id字段是Document的Id列。 而後咱們來建立Controller,在這個Controller裏面咱們嘗試三種不一樣的操做:web
package me.josephzhu.spring101webfluxmvc;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.util.UriComponentsBuilder;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@RestController
public class MyController {
@Autowired
private RestTemplate restTemplate;
@Autowired
private MyRepository myRepository;
@GetMapping("/data")
public List<MyData> getData(@RequestParam(value = "size", defaultValue = "10") int size,@RequestParam(value = "length", defaultValue = "100") int length) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
String payload = IntStream.rangeClosed(1,length).mapToObj(i->"a").collect(Collectors.joining());
return IntStream.rangeClosed(1, size)
.mapToObj(i->new MyData(UUID.randomUUID().toString(), payload, System.currentTimeMillis()))
.collect(Collectors.toList());
}
@GetMapping("/dbData")
public List<MyData> getDbData() {
return myRepository.findAll();
}
@GetMapping("/saveData")
public List<MyData> saveData(@RequestParam(value = "size", defaultValue = "10") int size,@RequestParam(value = "length", defaultValue = "100") int length){
UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl("http://localhost:8080/data")
.queryParam("size", size)
.queryParam("length", length);
ResponseEntity<List<MyData>> responseEntity =
restTemplate.exchange(builder.toUriString(),
HttpMethod.GET, null, new ParameterizedTypeReference<List<MyData>>() {});
return responseEntity.getBody().stream().map(myRepository::save).collect(Collectors.toList());
}
}
複製代碼
注意,在這裏咱們使用了Java 8的Steam來作一些操做避免使用for循環:spring
這些Stream的代碼都是同步處理,也不涉及外部IO,和非阻塞沒有任何關係,只是方便代碼編寫。爲了讓代碼能夠運行,咱們還須要繼續來配置下Mongodb的Repository:mongodb
package me.josephzhu.spring101webfluxmvc;
import org.springframework.data.mongodb.repository.MongoRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface MyRepository extends MongoRepository<MyData, String> { }
複製代碼
由於咱們沒有用到複雜的查詢,在代碼裏只是用到了findAll方法,因此這裏咱們無需定義額外的方法,只是聲明接口便可。 最後,咱們建立主應用程序,順便配置一下Mongodb和RestTemplate:數據庫
package me.josephzhu.spring101webfluxmvc;
import com.mongodb.MongoClientOptions;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestTemplate;
@SpringBootApplication
@Configuration
public class Spring101WebfluxMvcApplication {
@Bean
MongoClientOptions mongoClientOptions(){
return MongoClientOptions.builder().connectionsPerHost(1000).build();
}
@Bean
public RestTemplate restTemplate(RestTemplateBuilder builder) {
return builder.build();
}
public static void main(String[] args) {
SpringApplication.run(Spring101WebfluxMvcApplication.class, args);
}
}
複製代碼
這裏咱們配置了Mongodb客戶端使得以後在進行壓力測試的時候能有超過100個鏈接鏈接到Mongodb,不然會出現沒法獲取鏈接的問題。apache
如今咱們再來新建一個webflux模塊:編程
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>me.josephzhu</groupId>
<artifactId>spring101-webflux</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>spring101-webflux</name>
<description></description>
<parent>
<groupId>me.josephzhu</groupId>
<artifactId>spring101</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
複製代碼
這裏能夠注意到,咱們引入了webflux這個starter以及data-mongodb-reactive這個starter。在以前的Spring MVC項目中,咱們引入的是mvc和data-mongodb兩個starter。 而後,咱們一樣須要建立一下MyData類(代碼和以前如出一轍,這裏省略)。 最關鍵的一步,咱們來建立三個Controller方法的定義:json
package me.josephzhu.spring101webflux;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;
@Component
public class MyHandler {
@Autowired
private MyReactiveRepository myReactiveRepository;
public Mono<ServerResponse> getData(ServerRequest serverRequest) {
int size = Integer.parseInt(serverRequest.queryParam("size").orElse("10"));
int length = Integer.parseInt(serverRequest.queryParam("length").orElse("100"));
String payload = IntStream.rangeClosed(1,length).mapToObj(i->"a").collect(Collectors.joining());
Flux<MyData> data = Flux.fromStream(IntStream.rangeClosed(1, size)
.mapToObj(i->new MyData(UUID.randomUUID().toString(), payload, System.currentTimeMillis()))).delaySequence(Duration.ofMillis(100));
return ok()
.contentType(MediaType.APPLICATION_JSON)
.body(data, MyData.class);
}
public Mono<ServerResponse> getDbData(ServerRequest serverRequest) {
Flux<MyData> data = myReactiveRepository.findAll();
return ok()
.contentType(MediaType.APPLICATION_JSON)
.body(data, MyData.class);
}
public Mono<ServerResponse> saveData(ServerRequest serverRequest) {
int size = Integer.parseInt(serverRequest.queryParam("size").orElse("10"));
int length = Integer.parseInt(serverRequest.queryParam("length").orElse("100"));
Flux<MyData> data = WebClient.create().get()
.uri(builder -> builder
.scheme("http")
.host("localhost")
.port(8080)
.path("data")
.queryParam("size", size)
.queryParam("length", length)
.build())
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToFlux(MyData.class)
.flatMap(myReactiveRepository::save);
return ok()
.contentType(MediaType.APPLICATION_JSON)
.body(data, MyData.class);
}
}
複製代碼
這裏要說明幾點:
剛纔有提到,採用函數式聲明對外的Endpoint的話除了定義Handler,還須要配置Router來和Handler關聯,配置以下:
package me.josephzhu.spring101webflux;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
@Configuration
public class RouterConfig {
@Autowired
private MyHandler myHandler;
@Bean
public RouterFunction<ServerResponse> config() {
return route(GET("/data"), myHandler::getData)
.andRoute(GET("/dbData"), myHandler::getDbData)
.andRoute(GET("/saveData"), myHandler::saveData);
}
}
複製代碼
這段代碼沒有太多須要說明,這裏咱們定義了三個GET請求(至關於MVC的@GetMapping),而後對應到注入的myHandler的三個方法上。 而後咱們還須要建立Mongodb的Repository:
package me.josephzhu.spring101webflux;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface MyReactiveRepository extends ReactiveMongoRepository<MyData, String> { }
複製代碼
以及配置和啓動類:
package me.josephzhu.spring101webflux;
import com.mongodb.ConnectionString;
import com.mongodb.async.client.MongoClientSettings;
import com.mongodb.connection.ClusterSettings;
import com.mongodb.connection.ConnectionPoolSettings;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@SpringBootApplication
@Configuration
public class Spring101WebfluxApplication {
@Bean
MongoClient mongoClient(){
return MongoClients.create(mongoClientSettings());
}
@Bean
MongoClientSettings mongoClientSettings(){
return MongoClientSettings.builder()
.clusterSettings(ClusterSettings.builder().applyConnectionString(new ConnectionString("mongodb://localhost")).build())
.connectionPoolSettings(ConnectionPoolSettings.builder().minSize(200).maxSize(1000).maxWaitQueueSize(1000000).build())
.build();
}
public static void main(String[] args) {
SpringApplication.run(Spring101WebfluxApplication.class, args);
}
}
複製代碼
這裏對Mongodb作了一些配置,主要也是但願放大鏈接池這塊的默認限制,爲從此的壓測服務。注意,在這裏配置的Bean是com.mongodb.reactivestream.client下的MongoClient,以下圖所示,還有其它兩個MongoClient,若是修改了不匹配的MongoClient的話是不會有做用的,我在這個坑裏躺了兩小時。
完成後能夠打開瀏覽器測試一下接口:下圖是官網的一個圖說明了二者的關係,而後官網也給出了一些建議:
咱們知道對於阻塞的實現方式,咱們採用線程池來服務請求(線程池中的會維護一組普通的線程,線程池只是節省線程建立的時間),對於每個請求的處理,至始至終都是在一個線程中進行,若是處理的過程當中咱們須要訪問外部的網絡或數據庫,那麼線程就處於阻塞狀態,這個線程沒法服務其它請求,若是當時還有更多的併發的話,就須要建立更多的線程來服務其它請求。這種實現方式是很是簡單的,應對壓力的增加擴容方式也是粗暴的,那就是增長更多線程。
對於非阻塞的方式,採用的是EventLoop的方式,IO操做的時候是不佔用工做線程的,所以只會建立一組和CPU核數至關的工做線程用於工做處理(NodeJS甚至是單線程的,這種就更危險了,就那麼一個工做線程,一旦被長時間佔用其它請求都沒法處理)。因爲整個處理過程當中IO請求不佔用線程時間,線程不會阻塞等待,再增長超過CPU核數的工做線程也是沒有意義的(只會白白增長線程切換的開銷)。對於這種方式在壓力增加後,由於咱們不須要增長額外的線程,也就沒有了絕對的瓶頸。
試想一下在阻塞模型下,對於5000的併發,並且每個併發阻塞的時間很是長,那麼咱們其實須要5000個線程來服務(這麼多線程99%其實都是在等待,屬於空耗系統資源),建立5000的線程不談其它的,若是線程棧大小是1M的話就須要5GB的內存。對於非阻塞的線程模型在8核機器上仍是8個工做線程,內存佔用仍是這麼小,能夠以最小的開銷應對大併發,系統的損耗不多。非阻塞的Reactive模式是內耗很是小的模式,可是這是有代價的,在實現上咱們須要確保處理過程當中沒有阻塞產生,不然就會浪費寶貴的數目固定的工做線程,也就是說咱們須要依賴配套的非阻塞IO類庫來使用。 在默認狀況下tomcat的工做線程池初始化爲10,最大200,咱們經過啓動本文建立的Spring101WebfluxMvcApplication程序,用jvisualvm工具來看下初始的狀況(35個線程) :
在項目的application.properties文件中咱們配置tomcat的最大線程數: server.tomcat.max-threads=250 在壓力的狀況下,咱們再來觀察一下線程的狀況(272個線程): 的確是建立多達250個工做線程。這裏看到大部分線程都在休眠,由於咱們這裏運行的是剛纔的data()方法,在方法內咱們休眠了100毫秒。對於一樣的壓力,咱們再來看一下Spring101WebfluxApplication程序的線程狀況(44個線程): 能夠看到用於處理HTTP的Reactor線程只有8個,和本機CPU核數量一致(下面有十個Thread打頭的線程是處理和Mongodb交互的,忽略),只須要這8個線程處理HTTP請求足以,由於HTTP請求的IO處理不會佔用線程。咱們可使用Gatling類庫進行壓力測試,我我的感受比Jmeter方便。配置很簡單,首先咱們要安裝Scala的SDK,而後咱們新建一個模塊:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>me.josephzhu</groupId>
<artifactId>spring101-webstresstest</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>spring101-webstresstest</name>
<description></description>
<dependencies>
<dependency>
<groupId>io.gatling.highcharts</groupId>
<artifactId>gatling-charts-highcharts</artifactId>
<version>2.3.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>io.gatling</groupId>
<artifactId>gatling-maven-plugin</artifactId>
<version>2.2.4</version>
<configuration>
<simulationClass>me.josephzhu.spring101.webstresstest.StressTest</simulationClass>
<resultsFolder>/Users/zyhome/gatling</resultsFolder>
</configuration>
</plugin>
</plugins>
</build>
</project>
複製代碼
引入了garling的maven插件,在這裏配置了測試結果輸出路徑以及壓測的類。接下去建立一下這個Scala測試類:
package me.josephzhu.spring101.webstresstest
import io.gatling.core.Predef._
import io.gatling.core.scenario.Simulation
import io.gatling.http.Predef._
class StressTest extends Simulation {
val scn = scenario("data").repeat(1000) {
exec(
http("data")
.get("http://localhost:8080/data?size=10&length=1000")
.header("Content-Type", "application/json")
.check(status.is(200)).check(substring("payload")))
}
setUp(scn.inject(atOnceUsers(200)))
}
複製代碼
這段代碼定義了以下的測試行爲:
nothingFor(4 seconds), // 1
atOnceUsers(10), // 2
rampUsers(10) over (5 seconds), // 3
constantUsersPerSec(20) during (15 seconds), // 4
constantUsersPerSec(20) during (15 seconds) randomized, // 5
rampUsersPerSec(10) to 20 during (10 minutes), // 6
rampUsersPerSec(10) to 20 during (10 minutes) randomized, // 7
splitUsers(1000) into (rampUsers(10) over (10 seconds)) separatedBy (10 seconds), // 8
splitUsers(1000) into (rampUsers(10) over (10 seconds)) separatedBy atOnceUsers(30), // 9
heavisideUsers(1000) over (20 seconds) // 10
複製代碼
先來進行第一個測試,1000併發對data接口進行100次循環(還記得嗎,接口有100ms休眠or延遲的):
class StressTest extends Simulation {
val scn = scenario("data").repeat(100) {
exec(
http("mvc data")
.get("http://localhost:8080/data?size=10&length=1000")
.header("Content-Type", "application/json")
.check(status.is(200)).check(substring("payload")))
}
setUp(scn.inject(atOnceUsers(1000)))
}
複製代碼
下面兩個圖分別是MVC和WebFlux的測試結果(由於都是8080端口,因此測試的時候記得切換重啓兩個應用哦):
能夠看到WebFlux的吞吐幾乎是MVC的翻倍,平均響應時間少了兩倍不止,很明顯,在等待的時候,2000個併發用戶大大超過了咱們配置的250個線程池的線程數量,這個時候只能排隊,對於非阻塞的方式,延遲是不會佔用處理線程的,在延遲結束後纔會去佔用處理線程的資源進行處理,不會收到併發用戶數受限於線程池線程數的狀況。 咱們把Sleep相關代碼註釋再進行一次測試看看狀況,分別是MVC和WebFlux: 這個時候WebFlux優點沒有那麼明顯了。如今咱們來訪問一下http://localhost:8080/saveData?size=100&length=1000 接口往Mongodb來初始化100條數據,而後修改一下測試腳本壓測dbData接口: class StressTest extends Simulation {
val scn = scenario("data").repeat(100) { exec( http("data") .get("http://localhost:8080/dbData") .header("Content-Type", "application/json") .check(status.is(200)).check(substring("payload"))) }
setUp(scn.inject(atOnceUsers(1000))) } 下面看下此次的測試結果 ,分別是MVC和WebFlux:
吞吐量沒有太多提升,平均響應時間快很多。再來試一下第三個saveData接口的狀況。修改測試代碼:
class StressTest extends Simulation {
val scn = scenario("data").repeat(100) {
exec(
http("data")
.get("http://localhost:8080/saveData?size=5&length=100000")
.header("Content-Type", "application/json")
.check(status.is(200)).check(substring("payload")))
}
setUp(scn.inject(atOnceUsers(200)))
}
複製代碼
這裏咱們修改併發用戶爲200,每一個用戶進行100次測試,每次測試存入Mongodb 5條100KB的數據,一次測試後總數據量在10萬條。此次測試咱們並無使用1000併發用戶,緣由是這個測試咱們會先從遠端獲取數據而後再存入Mongodb,遠端的服務也是來自於當前應用程序,咱們的Tomcat最多隻有250個線程,在啓動1000個用戶後,一些線程服務於saveData接口,一些線程服務於data接口(saveData接口用到的),這樣至關於形成了循環依賴問題,請求在等待更多的可用線程執行服務data接口的響應,而這個時候線程又都被佔了致使沒法分配更多的請求,測試幾乎所有超時。 下面看下此次的測試結果 ,分別是MVC和WebFlux:
WebFlux也是併發略高,性能略好的優點。對於響應時間的分佈咱們再來細看下下面的圖: 第一個圖是MVC版本的響應時間分佈,能夠看到抖動比第二個圖的WebFlux的大很多。 最後來看看測試過程當中MVC的JVM狀況(263個線程): 以及WebFlux的(41線程):咱們來測試一下下面兩種狀況下對於WebFlux版本Mongodb側的狀況:
class StressTest extends Simulation {
val scn = scenario("data").repeat(1000) {
exec(
http("data")
.get("http://localhost:8080/saveData?size=1&length=1000")
.header("Content-Type", "application/json")
.check(status.is(200)).check(substring("payload")))
}
setUp(scn.inject(atOnceUsers(200)))
}
複製代碼
以及
class StressTest extends Simulation {
val scn = scenario("data").repeat(1000) {
exec(
http("data")
.get("http://localhost:8080/saveData?size=5&length=1000")
.header("Content-Type", "application/json")
.check(status.is(200)).check(substring("payload")))
}
setUp(scn.inject(atOnceUsers(200)))
}
複製代碼
區別就在遠程服務返回的Flux是1個仍是5個。在1個的時候運行測試能夠看到咱們Mongodb有64個鏈接(須要把以前鏈接池的配置最小設置爲小一點,好比50):
> db.serverStatus().connections
{ "current" : 64, "available" : 3212, "totalCreated" : 8899 }
複製代碼
在size爲5的時候,Flux返回的是5個對象,使用這個請求壓測的時候Mongodb的鏈接數以下:
> db.serverStatus().connections
{ "current" : 583, "available" : 2693, "totalCreated" : 10226 }
複製代碼
這是由於Flux拿到的數據直接以響應式進入Mongodb,並無等到全部數據拿到以後串行調用方法。 總結一下這幾回的測試,咱們發現WebFlux方式對於MVC方式能有略微的性能提高,對於請求阻塞的時候性能優點明顯。我本金的測試並無看到現象中的幾倍甚至幾十倍的性能提高,我猜緣由以下:
若是有條件可使用三臺獨立服務器在內網進行1萬以上併發用戶的性能測試或許能夠獲得更科學的結果。
本文咱們建立了WebFlux和MVC兩套應用對比演示了簡單返回數據、發出遠程請求、使用Mongodb等一些簡單的應用場景,而後來看了一下ThreadPerRequest和EventLoop方式線程模型的區別,最後使用Gatling進行了幾個Case的壓力測試而且觀察結果。我以爲:
綜上所述,使用WebFlux進行響應式編程我我的認爲目前只適合作類IO轉發的高併發的又看中資源使用效率的應用場景(好比Gateway網關服務),對於複雜的業務邏輯不太適合,在90%的狀況下響應式的編程模型和線程模型不會享受大幅性能優點,更不建議盲目把現有的應用使用WebFlux來重寫。固然,這確定是一個會持續發展的方向,能夠先接觸研究起來。