閱讀PDF版本前端
本文會來作一些應用對比Spring MVC和Spring WebFlux,觀察線程模型的區別,而後作一下簡單的壓力測試。java
先來建立一個新的webflux-mvc的模塊:react
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>me.josephzhu</groupId> <artifactId>spring101-webflux-mvc</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>spring101-webflux-mvc</name> <description></description> <parent> <groupId>me.josephzhu</groupId> <artifactId>spring101</artifactId> <version>0.0.1-SNAPSHOT</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-mongodb</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
而後在項目裏定義一個咱們會使用到的POJO:web
package me.josephzhu.spring101webfluxmvc; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.springframework.data.annotation.Id; import org.springframework.data.mongodb.core.mapping.Document; @Data @AllArgsConstructor @NoArgsConstructor @Document(collection = "mydata") public class MyData { @Id private String id; private String payload; private long time; }
這裏的@Document和@Id是爲Mongodb服務的,咱們定義了MyData將會以mydata做爲Collection的名字,而後id字段是Document的Id列。
而後咱們來建立Controller,在這個Controller裏面咱們嘗試三種不一樣的操做:spring
package me.josephzhu.spring101webfluxmvc; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.HttpMethod; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.client.RestTemplate; import org.springframework.web.util.UriComponentsBuilder; import java.util.List; import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.IntStream; @RestController public class MyController { @Autowired private RestTemplate restTemplate; @Autowired private MyRepository myRepository; @GetMapping("/data") public List<MyData> getData(@RequestParam(value = "size", defaultValue = "10") int size,@RequestParam(value = "length", defaultValue = "100") int length) { try { Thread.sleep(100); } catch (InterruptedException e) { } String payload = IntStream.rangeClosed(1,length).mapToObj(i->"a").collect(Collectors.joining()); return IntStream.rangeClosed(1, size) .mapToObj(i->new MyData(UUID.randomUUID().toString(), payload, System.currentTimeMillis())) .collect(Collectors.toList()); } @GetMapping("/dbData") public List<MyData> getDbData() { return myRepository.findAll(); } @GetMapping("/saveData") public List<MyData> saveData(@RequestParam(value = "size", defaultValue = "10") int size,@RequestParam(value = "length", defaultValue = "100") int length){ UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl("http://localhost:8080/data") .queryParam("size", size) .queryParam("length", length); ResponseEntity<List<MyData>> responseEntity = restTemplate.exchange(builder.toUriString(), HttpMethod.GET, null, new ParameterizedTypeReference<List<MyData>>() {}); return responseEntity.getBody().stream().map(myRepository::save).collect(Collectors.toList()); } }
注意,在這裏咱們使用了Java 8的Steam來作一些操做避免使用for循環:mongodb
package me.josephzhu.spring101webfluxmvc; import org.springframework.data.mongodb.repository.MongoRepository; import org.springframework.stereotype.Repository; @Repository public interface MyRepository extends MongoRepository<MyData, String> { }
由於咱們沒有用到複雜的查詢,在代碼裏只是用到了findAll方法,因此這裏咱們無需定義額外的方法,只是聲明接口便可。
最後,咱們建立主應用程序,順便配置一下Mongodb和RestTemplate:數據庫
package me.josephzhu.spring101webfluxmvc; import com.mongodb.MongoClientOptions; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.web.client.RestTemplateBuilder; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.client.RestTemplate; @SpringBootApplication @Configuration public class Spring101WebfluxMvcApplication { @Bean MongoClientOptions mongoClientOptions(){ return MongoClientOptions.builder().connectionsPerHost(1000).build(); } @Bean public RestTemplate restTemplate(RestTemplateBuilder builder) { return builder.build(); } public static void main(String[] args) { SpringApplication.run(Spring101WebfluxMvcApplication.class, args); } }
這裏咱們配置了Mongodb客戶端使得以後在進行壓力測試的時候能有超過100個鏈接鏈接到Mongodb,不然會出現沒法獲取鏈接的問題。apache
如今咱們再來新建一個webflux模塊:編程
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>me.josephzhu</groupId> <artifactId>spring101-webflux</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>spring101-webflux</name> <description></description> <parent> <groupId>me.josephzhu</groupId> <artifactId>spring101</artifactId> <version>0.0.1-SNAPSHOT</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
這裏能夠注意到,咱們引入了webflux這個starter以及data-mongodb-reactive這個starter。在以前的Spring MVC項目中,咱們引入的是mvc和data-mongodb兩個starter。
而後,咱們一樣須要建立一下MyData類(代碼和以前如出一轍,這裏省略)。
最關鍵的一步,咱們來建立三個Controller方法的定義:json
package me.josephzhu.spring101webflux; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.MediaType; import org.springframework.stereotype.Component; import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.server.ServerRequest; import org.springframework.web.reactive.function.server.ServerResponse; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.Duration; import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.springframework.web.reactive.function.server.ServerResponse.ok; @Component public class MyHandler { @Autowired private MyReactiveRepository myReactiveRepository; public Mono<ServerResponse> getData(ServerRequest serverRequest) { int size = Integer.parseInt(serverRequest.queryParam("size").orElse("10")); int length = Integer.parseInt(serverRequest.queryParam("length").orElse("100")); String payload = IntStream.rangeClosed(1,length).mapToObj(i->"a").collect(Collectors.joining()); Flux<MyData> data = Flux.fromStream(IntStream.rangeClosed(1, size) .mapToObj(i->new MyData(UUID.randomUUID().toString(), payload, System.currentTimeMillis()))).delaySequence(Duration.ofMillis(100)); return ok() .contentType(MediaType.APPLICATION_JSON) .body(data, MyData.class); } public Mono<ServerResponse> getDbData(ServerRequest serverRequest) { Flux<MyData> data = myReactiveRepository.findAll(); return ok() .contentType(MediaType.APPLICATION_JSON) .body(data, MyData.class); } public Mono<ServerResponse> saveData(ServerRequest serverRequest) { int size = Integer.parseInt(serverRequest.queryParam("size").orElse("10")); int length = Integer.parseInt(serverRequest.queryParam("length").orElse("100")); Flux<MyData> data = WebClient.create().get() .uri(builder -> builder .scheme("http") .host("localhost") .port(8080) .path("data") .queryParam("size", size) .queryParam("length", length) .build()) .accept(MediaType.APPLICATION_JSON) .retrieve() .bodyToFlux(MyData.class) .flatMap(myReactiveRepository::save); return ok() .contentType(MediaType.APPLICATION_JSON) .body(data, MyData.class); } }
這裏要說明幾點:
package me.josephzhu.spring101webflux; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.reactive.function.server.RouterFunction; import org.springframework.web.reactive.function.server.ServerResponse; import static org.springframework.web.reactive.function.server.RequestPredicates.GET; import static org.springframework.web.reactive.function.server.RouterFunctions.route; @Configuration public class RouterConfig { @Autowired private MyHandler myHandler; @Bean public RouterFunction<ServerResponse> config() { return route(GET("/data"), myHandler::getData) .andRoute(GET("/dbData"), myHandler::getDbData) .andRoute(GET("/saveData"), myHandler::saveData); } }
這段代碼沒有太多須要說明,這裏咱們定義了三個GET請求(至關於MVC的@GetMapping),而後對應到注入的myHandler的三個方法上。
而後咱們還須要建立Mongodb的Repository:
package me.josephzhu.spring101webflux; import org.springframework.data.mongodb.repository.ReactiveMongoRepository; import org.springframework.stereotype.Repository; @Repository public interface MyReactiveRepository extends ReactiveMongoRepository<MyData, String> { }
以及配置和啓動類:
package me.josephzhu.spring101webflux; import com.mongodb.ConnectionString; import com.mongodb.async.client.MongoClientSettings; import com.mongodb.connection.ClusterSettings; import com.mongodb.connection.ConnectionPoolSettings; import com.mongodb.reactivestreams.client.MongoClient; import com.mongodb.reactivestreams.client.MongoClients; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @SpringBootApplication @Configuration public class Spring101WebfluxApplication { @Bean MongoClient mongoClient(){ return MongoClients.create(mongoClientSettings()); } @Bean MongoClientSettings mongoClientSettings(){ return MongoClientSettings.builder() .clusterSettings(ClusterSettings.builder().applyConnectionString(new ConnectionString("mongodb://localhost")).build()) .connectionPoolSettings(ConnectionPoolSettings.builder().minSize(200).maxSize(1000).maxWaitQueueSize(1000000).build()) .build(); } public static void main(String[] args) { SpringApplication.run(Spring101WebfluxApplication.class, args); } }
這裏對Mongodb作了一些配置,主要也是但願放大鏈接池這塊的默認限制,爲從此的壓測服務。注意,在這裏配置的Bean是com.mongodb.reactivestream.client下的MongoClient,以下圖所示,還有其它兩個MongoClient,若是修改了不匹配的MongoClient的話是不會有做用的,我在這個坑裏躺了兩小時。
完成後能夠打開瀏覽器測試一下接口:
下圖是官網的一個圖說明了二者的關係,而後官網也給出了一些建議:
咱們知道對於阻塞的實現方式,咱們採用線程池來服務請求(線程池中的會維護一組普通的線程,線程池只是節省線程建立的時間),對於每個請求的處理,至始至終都是在一個線程中進行,若是處理的過程當中咱們須要訪問外部的網絡或數據庫,那麼線程就處於阻塞狀態,這個線程沒法服務其它請求,若是當時還有更多的併發的話,就須要建立更多的線程來服務其它請求。這種實現方式是很是簡單的,應對壓力的增加擴容方式也是粗暴的,那就是增長更多線程。
對於非阻塞的方式,採用的是EventLoop的方式,IO操做的時候是不佔用工做線程的,所以只會建立一組和CPU核數至關的工做線程用於工做處理(NodeJS甚至是單線程的,這種就更危險了,就那麼一個工做線程,一旦被長時間佔用其它請求都沒法處理)。因爲整個處理過程當中IO請求不佔用線程時間,線程不會阻塞等待,再增長超過CPU核數的工做線程也是沒有意義的(只會白白增長線程切換的開銷)。對於這種方式在壓力增加後,由於咱們不須要增長額外的線程,也就沒有了絕對的瓶頸。
試想一下在阻塞模型下,對於5000的併發,並且每個併發阻塞的時間很是長,那麼咱們其實須要5000個線程來服務(這麼多線程99%其實都是在等待,屬於空耗系統資源),建立5000的線程不談其它的,若是線程棧大小是1M的話就須要5GB的內存。對於非阻塞的線程模型在8核機器上仍是8個工做線程,內存佔用仍是這麼小,能夠以最小的開銷應對大併發,系統的損耗不多。非阻塞的Reactive模式是內耗很是小的模式,可是這是有代價的,在實現上咱們須要確保處理過程當中沒有阻塞產生,不然就會浪費寶貴的數目固定的工做線程,也就是說咱們須要依賴配套的非阻塞IO類庫來使用。
在默認狀況下tomcat的工做線程池初始化爲10,最大200,咱們經過啓動本文建立的Spring101WebfluxMvcApplication程序,用jvisualvm工具來看下初始的狀況(35個線程):
在項目的application.properties文件中咱們配置tomcat的最大線程數:
server.tomcat.max-threads=250
在壓力的狀況下,咱們再來觀察一下線程的狀況(272個線程):
的確是建立多達250個工做線程。這裏看到大部分線程都在休眠,由於咱們這裏運行的是剛纔的data()方法,在方法內咱們休眠了100毫秒。對於一樣的壓力,咱們再來看一下Spring101WebfluxApplication程序的線程狀況(44個線程):
能夠看到用於處理HTTP的Reactor線程只有8個,和本機CPU核數量一致(下面有十個Thread打頭的線程是處理和Mongodb交互的,忽略),只須要這8個線程處理HTTP請求足以,由於HTTP請求的IO處理不會佔用線程。
咱們可使用Gatling類庫進行壓力測試,我我的感受比Jmeter方便。配置很簡單,首先咱們要安裝Scala的SDK,而後咱們新建一個模塊:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>me.josephzhu</groupId> <artifactId>spring101-webstresstest</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>spring101-webstresstest</name> <description></description> <dependencies> <dependency> <groupId>io.gatling.highcharts</groupId> <artifactId>gatling-charts-highcharts</artifactId> <version>2.3.1</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>io.gatling</groupId> <artifactId>gatling-maven-plugin</artifactId> <version>2.2.4</version> <configuration> <simulationClass>me.josephzhu.spring101.webstresstest.StressTest</simulationClass> <resultsFolder>/Users/zyhome/gatling</resultsFolder> </configuration> </plugin> </plugins> </build> </project>
引入了garling的maven插件,在這裏配置了測試結果輸出路徑以及壓測的類。接下去建立一下這個Scala測試類:
package me.josephzhu.spring101.webstresstest import io.gatling.core.Predef._ import io.gatling.core.scenario.Simulation import io.gatling.http.Predef._ class StressTest extends Simulation { val scn = scenario("data").repeat(1000) { exec( http("data") .get("http://localhost:8080/data?size=10&length=1000") .header("Content-Type", "application/json") .check(status.is(200)).check(substring("payload"))) } setUp(scn.inject(atOnceUsers(200))) }
這段代碼定義了以下的測試行爲:
nothingFor(4 seconds), // 1 atOnceUsers(10), // 2 rampUsers(10) over (5 seconds), // 3 constantUsersPerSec(20) during (15 seconds), // 4 constantUsersPerSec(20) during (15 seconds) randomized, // 5 rampUsersPerSec(10) to 20 during (10 minutes), // 6 rampUsersPerSec(10) to 20 during (10 minutes) randomized, // 7 splitUsers(1000) into (rampUsers(10) over (10 seconds)) separatedBy (10 seconds), // 8 splitUsers(1000) into (rampUsers(10) over (10 seconds)) separatedBy atOnceUsers(30), // 9 heavisideUsers(1000) over (20 seconds) // 10
先來進行第一個測試,1000併發對data接口進行100次循環(還記得嗎,接口有100ms休眠or延遲的):
class StressTest extends Simulation { val scn = scenario("data").repeat(100) { exec( http("mvc data") .get("http://localhost:8080/data?size=10&length=1000") .header("Content-Type", "application/json") .check(status.is(200)).check(substring("payload"))) } setUp(scn.inject(atOnceUsers(1000))) }
下面兩個圖分別是MVC和WebFlux的測試結果(由於都是8080端口,因此測試的時候記得切換重啓兩個應用哦):
能夠看到WebFlux的吞吐幾乎是MVC的翻倍,平均響應時間少了兩倍不止,很明顯,在等待的時候,2000個併發用戶大大超過了咱們配置的250個線程池的線程數量,這個時候只能排隊,對於非阻塞的方式,延遲是不會佔用處理線程的,在延遲結束後纔會去佔用處理線程的資源進行處理,不會收到併發用戶數受限於線程池線程數的狀況。
咱們把Sleep相關代碼註釋再進行一次測試看看狀況,分別是MVC和WebFlux:
這個時候WebFlux優點沒有那麼明顯了。
如今咱們來訪問一下http://localhost:8080/saveData?size=100&length=1000 接口往Mongodb來初始化100條數據,而後修改一下測試腳本壓測dbData接口:
class StressTest extends Simulation {
val scn = scenario("data").repeat(100) {
exec(
http("data")
.get("http://localhost:8080/dbData")
.header("Content-Type", "application/json")
.check(status.is(200)).check(substring("payload")))
}
setUp(scn.inject(atOnceUsers(1000)))
}
下面看下此次的測試結果 ,分別是MVC和WebFlux:
吞吐量沒有太多提升,平均響應時間快很多。
再來試一下第三個saveData接口的狀況。修改測試代碼:
class StressTest extends Simulation {
val scn = scenario("data").repeat(100) {
exec(
http("data")
.get("http://localhost:8080/saveData?size=5&length=100000")
.header("Content-Type", "application/json")
.check(status.is(200)).check(substring("payload")))
}
setUp(scn.inject(atOnceUsers(200)))
}
這裏咱們修改併發用戶爲200,每一個用戶進行100次測試,每次測試存入Mongodb 5條100KB的數據,一次測試後總數據量在10萬條。此次測試咱們並無使用1000併發用戶,緣由是這個測試咱們會先從遠端獲取數據而後再存入Mongodb,遠端的服務也是來自於當前應用程序,咱們的Tomcat最多隻有250個線程,在啓動1000個用戶後,一些線程服務於saveData接口,一些線程服務於data接口(saveData接口用到的),這樣至關於形成了循環依賴問題,請求在等待更多的可用線程執行服務data接口的響應,而這個時候線程又都被佔了致使沒法分配更多的請求,測試幾乎所有超時。
下面看下此次的測試結果 ,分別是MVC和WebFlux:
WebFlux也是併發略高,性能略好的優點。對於響應時間的分佈咱們再來細看下下面的圖:
第一個圖是MVC版本的響應時間分佈,能夠看到抖動比第二個圖的WebFlux的大很多。
最後來看看測試過程當中MVC的JVM狀況(263個線程):
以及WebFlux的(41線程):
咱們來測試一下下面兩種狀況下對於WebFlux版本Mongodb側的狀況:
class StressTest extends Simulation { val scn = scenario("data").repeat(1000) { exec( http("data") .get("http://localhost:8080/saveData?size=1&length=1000") .header("Content-Type", "application/json") .check(status.is(200)).check(substring("payload"))) } setUp(scn.inject(atOnceUsers(200))) }
以及
class StressTest extends Simulation { val scn = scenario("data").repeat(1000) { exec( http("data") .get("http://localhost:8080/saveData?size=5&length=1000") .header("Content-Type", "application/json") .check(status.is(200)).check(substring("payload"))) } setUp(scn.inject(atOnceUsers(200))) }
區別就在遠程服務返回的Flux
> db.serverStatus().connections { "current" : 64, "available" : 3212, "totalCreated" : 8899 }
在size爲5的時候,Flux返回的是5個對象,使用這個請求壓測的時候Mongodb的鏈接數以下:
> db.serverStatus().connections { "current" : 583, "available" : 2693, "totalCreated" : 10226 }
這是由於Flux拿到的數據直接以響應式進入Mongodb,並無等到全部數據拿到以後串行調用方法。
總結一下這幾回的測試,咱們發現WebFlux方式對於MVC方式能有略微的性能提高,對於請求阻塞的時候性能優點明顯。我本金的測試並無看到現象中的幾倍甚至幾十倍的性能提高,我猜緣由以下:
本文咱們建立了WebFlux和MVC兩套應用對比演示了簡單返回數據、發出遠程請求、使用Mongodb等一些簡單的應用場景,而後來看了一下ThreadPerRequest和EventLoop方式線程模型的區別,最後使用Gatling進行了幾個Case的壓力測試而且觀察結果。我以爲: