(5)Spring WebFlux快速上手——響應式Spring的道法術器

本系列文章索引《響應式Spring的道法術器》
前情提要 lambda與函數式 | Reactor 3快速上手
本文源碼html

1.3.3 Spring WebFlux

Spring WebFlux是隨Spring 5推出的響應式Web框架。java

(5)Spring WebFlux快速上手——響應式Spring的道法術器

1)服務端技術棧react

Spring提供了完整的支持響應式的服務端技術棧。linux

如上圖所示,左側爲基於spring-webmvc的技術棧,右側爲基於spring-webflux的技術棧,git

  • Spring WebFlux是基於響應式流的,所以能夠用來創建異步的、非阻塞的、事件驅動的服務。它採用Reactor做爲首選的響應式流的實現庫,不過也提供了對RxJava的支持。
  • 因爲響應式編程的特性,Spring WebFlux和Reactor底層須要支持異步的運行環境,好比Netty和Undertow;也能夠運行在支持異步I/O的Servlet 3.1的容器之上,好比Tomcat(8.0.23及以上)和Jetty(9.0.4及以上)。
  • 從圖的縱向上看,spring-webflux上層支持兩種開發模式:
    • 相似於Spring WebMVC的基於註解(@Controller@RequestMapping)的開發模式;
    • Java 8 lambda 風格的函數式開發模式。
  • Spring WebFlux也支持響應式的Websocket服務端開發。

由此看來,Spring WebFlux與Vert.x有一些相通之處,都是創建在非阻塞的異步I/O和事件驅動的基礎之上的。github

2)響應式Http客戶端web

此外,Spring WebFlux也提供了一個響應式的Http客戶端API WebClient。它能夠用函數式的方式異步非阻塞地發起Http請求並處理響應。其底層也是由Netty提供的異步支持。ajax

咱們能夠把WebClient看作是響應式的RestTemplate,與後者相比,前者:spring

  • 是非阻塞的,能夠基於少許的線程處理更高的併發;
  • 可使用Java 8 lambda表達式;
  • 支持異步的同時也能夠支持同步的使用方式;
  • 能夠經過數據流的方式與服務端進行雙向通訊。

固然,與服務端對應的,Spring WebFlux也提供了響應式的Websocket客戶端API。mongodb


簡單介紹這些,讓咱們來Coding吧~

本節,咱們經過如下幾個例子來逐步深刻地瞭解它的使用方法:

** 1. 先介紹一下使用Spring WebMVC風格的基於註解的方式如何編寫響應式的Web服務,這幾乎沒有學習成本,很是贊。雖然這種方式在開發上與Spring WebMVC變化不大,可是框架底層已是徹底的響應式技術棧了;

  1. 再進一步介紹函數式的開發模式;
  2. 簡單幾行代碼實現服務端推送(Server Send Event,SSE);
  3. 而後咱們再加入響應式數據庫的支持(使用Reactive Spring Data for MongoDB);
  4. 使用WebClient與前幾步作好的服務端進行通訊;
  5. 最後咱們看一下如何經過「流」的方式在Http上進行通訊。**

Spring Boot 2是基於Spring 5的,其中一個比較大的更新就在於支持包括spring-webflux和響應式的spring-data在內的響應式模塊。Spring Boot 2即將發佈正式版,不過目前的版本從功能上已經完備,下邊的例子咱們就用Spring Boot 2在進行搭建。

1.3.3.1 基於WebMVC註解的方式

咱們首先用Spring WebMVC開發一個只有Controller層的簡單的Web服務,而後僅僅作一點點調整就可切換爲基於Spring WebFlux的具備一樣功能的Web服務。

咱們使用Spring Boot 2搭建項目框架。

如下截圖來自IntelliJ IDEA,不過其餘IDE也都是相似的。

1)基於Spring Initializr建立項目

本節的例子很簡單,不涉及Service層和Dao層,所以只選擇spring-webmvc便可,也就是「Web」的starter。

(5)Spring WebFlux快速上手——響應式Spring的道法術器

也可使用網頁版的https://start.spring.io來建立項目:

(5)Spring WebFlux快速上手——響應式Spring的道法術器

建立後的項目POM中,包含下邊的依賴,即表示基於Spring WebMVC:

<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

2)建立Controller和Endpoint

建立Controller類HelloController,僅提供一個Endpoint:/hello

@RestController
    public class HelloController {

        @GetMapping("/hello")
        public String hello() {
            return "Welcome to reactive world ~";
        }
    }

3)啓動應用

OK了,一個簡單的基於Spring WebMVC的Web服務。咱們新增了HelloController.java,修改了application.properties

(5)Spring WebFlux快速上手——響應式Spring的道法術器

使用IDE啓動應用,或使用maven命令:

mvn spring-boot:run

經過打印的log能夠看到,服務運行於Tomcat的8080端口:

spring-webmvc

測試Endpoint。在瀏覽器中訪問http://localhost:8080/hello,或運行命令:

curl http://localhost:8080/hello

返回Welcome to reactive world ~

基於Spring WebFlux的項目與上邊的步驟一致,僅有兩點不一樣。咱們此次偷個懶,就不重新建項目了,修改一下上邊的項目:

4)依賴「Reactive Web」的starter而不是「Web」

修改項目POM,調整依賴使其基於Spring WebFlux:

<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-WebFlux</artifactId>    <!--【改】增長「flux」四個字符-->
    </dependency>

5)Controller中處理請求的返回類型採用響應式類型

@RestController
    public class HelloController {

        @GetMapping("/hello")
        public Mono<String> hello() {   // 【改】返回類型爲Mono<String>
            return Mono.just("Welcome to reactive world ~");     // 【改】使用Mono.just生成響應式數據
        }
    }

6)啓動應用

僅須要上邊兩步就改完了,是否是很簡單,一樣的方法啓動應用。啓動後發現應用運行於Netty上:

spring-webflux

訪問http://localhost:8080/hello,結果與Spring WebMVC的相同。

7)總結

從上邊這個很是很是簡單的例子中能夠看出,Spring真是用心良苦,WebFlux提供了與以前WebMVC相同的一套註解來定義請求的處理,使得Spring使用者遷移到響應式開發方式的過程變得異常輕鬆。

雖然咱們只修改了少許的代碼,可是其實這個簡單的項目已經脫胎換骨了。整個技術棧從命令式的、同步阻塞的【spring-webmvc + servlet + Tomcat】變成了響應式的、異步非阻塞的【spring-webflux + Reactor + Netty】。

Netty是一套異步的、事件驅動的網絡應用程序框架和工具,可以開發高性能、高可靠性的網絡服務器和客戶端程序,所以與一樣是異步的、事件驅動的響應式編程範式一拍即合。

下邊的內容瞭解便可,就不實戰了。
在Java 7推出異步I/O庫,以及Servlet3.1增長了對異步I/O的支持以後,Tomcat等Servlet容器也隨後開始支持異步I/O,而後Spring WebMVC也增長了對Reactor庫的支持,因此上邊第4)步若是不是將spring-boot-starter-web替換爲spring-boot-starter-WebFlux,而是增長reactor-core的依賴的話,仍然能夠用註解的方式開發基於Tomcat的響應式應用。

1.3.3.2 WebFlux的函數式開發模式

既然是響應式編程了,有些朋友可能會想統一用函數式的編程風格,WebFlux知足你。WebFlux提供了一套函數式接口,能夠用來實現相似MVC的效果。咱們先接觸兩個經常使用的。

再回頭瞧一眼上邊例子中咱們用Controller定義定義對Request的處理邏輯的方式,主要有兩個點:

  1. 方法定義處理邏輯;
  2. 而後用@RequestMapping註解定義好這個方法對什麼樣url進行響應。

在WebFlux的函數式開發模式中,咱們用HandlerFunctionRouterFunction來實現上邊這兩點。

  • HandlerFunction至關於Controller中的具體處理方法,輸入爲請求,輸出爲裝在Mono中的響應:
Mono<T extends ServerResponse> handle(ServerRequest request);
  • RouterFunction,顧名思義,路由,至關於@RequestMapping,用來判斷什麼樣的url映射到那個具體的HandlerFunction,輸入爲請求,輸出爲裝在Mono裏邊的Handlerfunction
Mono<HandlerFunction<T>> route(ServerRequest request);

咱們看到,在WebFlux中,請求和響應再也不是WebMVC中的ServletRequestServletResponse,而是ServerRequestServerResponse。後者是在響應式編程中使用的接口,它們提供了對非阻塞和回壓特性的支持,以及Http消息體與響應式類型Mono和Flux的轉換方法。

下面咱們用函數式的方式開發兩個Endpoint:

  1. /time返回當前的時間;
  2. /date返回當前的日期。

對於這兩個需求,HandlerFunction很容易寫:

// 返回包含時間字符串的ServerResponse
    HandlerFunction<ServerResponse> timeFunction = 
        request -> ServerResponse.ok().contentType(MediaType.TEXT_PLAIN).body(
            Mono.just("Now is " + new SimpleDateFormat("HH:mm:ss").format(new Date())), String.class);

    // 返回包含日期字符串的ServerResponse
    HandlerFunction<ServerResponse> dateFunction = 
        request -> ServerResponse.ok().contentType(MediaType.TEXT_PLAIN).body(
            Mono.just("Today is " + new SimpleDateFormat("yyyy-MM-dd").format(new Date())), String.class);

那麼RouterFunction爲:

RouterFunction<ServerResponse> router = 
        RouterFunctions.route(GET("/time"), timeFunction)
            .andRoute(GET("/date"), dateFunction);

按照常見的套路,RouterFunctions是工具類。

不過這麼寫在業務邏輯複雜的時候不太好組織,咱們一般採用跟MVC相似的代碼組織方式,將同類業務的HandlerFunction放在一個類中,而後在Java Config中將RouterFunction配置爲Spring容器的Bean。咱們繼續在第一個例子的代碼上開發:

1)建立統一存放處理時間的Handler類

建立TimeHandler.java

import static org.springframework.web.reactive.function.server.ServerResponse.ok;

    @Component
    public class TimeHandler {
        public Mono<ServerResponse> getTime(ServerRequest serverRequest) {
            return ok().contentType(MediaType.TEXT_PLAIN).body(Mono.just("Now is " + new SimpleDateFormat("HH:mm:ss").format(new Date())), String.class);
        }
        public Mono<ServerResponse> getDate(ServerRequest serverRequest) {
            return ok().contentType(MediaType.TEXT_PLAIN).body(Mono.just("Today is " + new SimpleDateFormat("yyyy-MM-dd").format(new Date())), String.class);
        }
    }

因爲出現次數一般比較多,這裏靜態引入ServerResponse.ok()方法。

2)在Spring容器配置RouterFunction

咱們採用Spring如今比較推薦的Java Config的配置Bean的方式,建立用於存放Router的配置類RouterConfig.java

import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
    import static org.springframework.web.reactive.function.server.RouterFunctions.route;

    @Configuration
    public class RouterConfig {
        @Autowired
        private TimeHandler timeHandler;

        @Bean
        public RouterFunction<ServerResponse> timerRouter() {
            return route(GET("/time"), req -> timeHandler.getTime(req))
                    .andRoute(GET("/date"), timeHandler::getDate);  // 這種方式相對於上一行更加簡潔
        }
    }

3)重啓服務試一試

重啓服務測試一下吧:

$ curl http://localhost:8080/date
Today is 2018-02-26

$ curl http://localhost:8080/time
Now is 21:12:53

1.3.3.3 服務器推送

咱們可能會遇到一些須要網頁與服務器端保持鏈接(起碼看上去是保持鏈接)的需求,好比相似微信網頁版的聊天類應用,好比須要頻繁更新頁面數據的監控系統頁面或股票看盤頁面。咱們一般採用以下幾種技術:

  • 短輪詢:利用ajax按期向服務器請求,不管數據是否更新立馬返回數據,高併發狀況下可能會對服務器和帶寬形成壓力;
  • 長輪詢:利用comet不斷向服務器發起請求,服務器將請求暫時掛起,直到有新的數據的時候才返回,相對短輪詢減小了請求次數;
  • SSE:服務端推送(Server Send Event),在客戶端發起一次請求後會保持該鏈接,服務器端基於該鏈接持續向客戶端發送數據,從HTML5開始加入。
  • Websocket:這是也是一種保持鏈接的技術,而且是雙向的,從HTML5開始加入,並不是徹底基於HTTP,適合於頻繁和較大流量的雙向通信場景。

既然響應式編程是一種基於數據流的編程範式,天然在服務器推送方面駕輕就熟,咱們基於函數式方式再增長一個Endpoint /times,能夠每秒推送一次時間。

1)增長Handler方法

TimeHandler.java

public Mono<ServerResponse> sendTimePerSec(ServerRequest serverRequest) {
        return ok().contentType(MediaType.TEXT_EVENT_STREAM).body(  // 1
                Flux.interval(Duration.ofSeconds(1)).   // 2
                        map(l -> new SimpleDateFormat("HH:mm:ss").format(new Date())), 
                String.class);
    }
  1. MediaType.TEXT_EVENT_STREAM表示Content-Typetext/event-stream,即SSE;
  2. 利用interval生成每秒一個數據的流。

2)配置router

RouterConfig.java

@Bean
        public RouterFunction<ServerResponse> timerRouter() {
            return route(GET("/time"), timeHandler::getTime)
                    .andRoute(GET("/date"), timeHandler::getDate)
                    .andRoute(GET("/times"), timeHandler::sendTimePerSec);  // 增長這一行
        }

3)重啓服務試一下

重啓服務後,測試一下:

curl http://localhost:8080/times
data:21:32:22
data:21:32:23
data:21:32:24
data:21:32:25
data:21:32:26
<Ctrl+C>

就醬,訪問這個url會收到持續不斷的報時數據(時間數據是在data中的)。

那麼用註解的方式如何進行服務端推送呢,這個演示就融到下一個例子中吧~

1.3.3.3 響應式Spring Data

開發基於響應式流的應用,就像是在搭建數據流流動的管道,從而異步的數據可以順暢流過每一個環節。前邊的例子主要聚焦於應用層,然而絕大多數系統免不了要與數據庫進行交互,因此咱們也須要響應式的持久層API和支持異步的數據庫驅動。就像從自來水廠到家裏水龍頭這個管道中,若是任何一個環節發生了阻塞,那就可能形成總體吞吐量的降低。

各個數據庫都開始陸續推出異步驅動,目前Spring Data支持的能夠進行響應式數據訪問的數據庫有MongoDB、Redis、Apache Cassandra和CouchDB。今天咱們用MongoDB來寫一個響應式demo。

咱們這個例子很簡單,就是關於User的增刪改查,以及基於註解的服務端推送。

1)編寫User

既然是舉例,咱們隨便定義幾個屬性吧~

public class User {
        private String id;
        private String username;
        private String phone;
        private String email;
        private String name;
        private Date birthday;
    }

而後爲了方便開發,咱們引入lombok庫,它可以經過註解的方式爲咱們添加必要的Getter/Setter/hashCode()/equals()/toString()/構造方法等,添加依賴(版本可自行到http://search.maven.org搜索最新):

<dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.16.20</version>
    </dependency>

而後爲User添加註解:

@Data   // 生成無參構造方法/getter/setter/hashCode/equals/toString
    @AllArgsConstructor // 生成全部參數構造方法
    @NoArgsConstructor  // @AllArgsConstructor會致使@Data不生成無參構造方法,須要手動添加@NoArgsConstructor,若是沒有無參構造方法,可能會致使好比com.fasterxml.jackson在序列化處理時報錯
    public class User {
        ...

咱們能夠利用IDE看一下生成的方法(以下圖黃框所示):

(5)Spring WebFlux快速上手——響應式Spring的道法術器

可能須要先在IDE中進行少許配置以便支持lombok的註解,好比IntelliJ IDEA:

  1. 安裝「lombok plugin」:
    (5)Spring WebFlux快速上手——響應式Spring的道法術器
  2. 開啓對註解編譯的支持:
    (5)Spring WebFlux快速上手——響應式Spring的道法術器

lombok對於Java開發者來講絕對算是個福音了,但願使用Kotlin的朋友不要笑話咱們土哦~

2)增長Spring Data的依賴

在POM中增長Spring Data Reactive Mongo的依賴:

<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
    </dependency>

MongoDB是文檔型的NoSQL數據庫,所以,咱們使用@Document註解User類:

@Data
    @AllArgsConstructor
    @Document
    public class User {
        @Id
        private String id;      // 註解屬性id爲ID
        @Indexed(unique = true) // 註解屬性username爲索引,而且不能重複
        private String username;
        private String name;
        private String phone;
        private Date birthday;
    }

OK,這樣咱們的模型就準備好了。MongoDB會自動建立collection,默認爲類名首字母小寫,也就是user

3)配置數據源

Spring Boot爲咱們搞定了幾乎全部的配置,太讚了,下邊是MongoDB的默認配置:

# MONGODB (MongoProperties)
spring.data.mongodb.authentication-database= # Authentication database name.
spring.data.mongodb.database=test # Database name.
spring.data.mongodb.field-naming-strategy= # Fully qualified name of the FieldNamingStrategy to use.
spring.data.mongodb.grid-fs-database= # GridFS database name.
spring.data.mongodb.host=localhost # Mongo server host. Cannot be set with uri.
spring.data.mongodb.password= # Login password of the mongo server. Cannot be set with uri.
spring.data.mongodb.port=27017 # Mongo server port. Cannot be set with uri.
spring.data.mongodb.repositories.enabled=true # Enable Mongo repositories.
spring.data.mongodb.uri=mongodb://localhost/test # Mongo database URI. Cannot be set with host, port and credentials.
spring.data.mongodb.username= # Login user of the mongo server. Cannot be set with uri.

請根據須要添加自定義的配置,好比個人MongoDB是跑在IP爲192.168.0.101的虛擬機的Docker中的,就可在application.properties中增長一條:

spring.data.mongodb.host=192.168.0.101

4)增長DAO層repository

與非響應式Spring Data的CrudReposity對應的,響應式的Spring Data也提供了相應的Repository庫:ReactiveCrudReposity,固然,咱們也可使用它的子接口ReactiveMongoRepository

咱們增長UserRepository

public interface UserRepository extends ReactiveCrudRepository<User, String> {  // 1
        Mono<User> findByUsername(String username);     // 2
        Mono<Long> deleteByUsername(String username);
    }
  1. 一樣的,ReactiveCrudRepository的泛型分別是UserID的類型;
  2. ReactiveCrudRepository已經提供了基本的增刪改查的方法,根據業務須要,咱們增長四個方法(在此膜拜一下Spring團隊的牛人們,使得咱們僅需按照規則定義接口方法名便可完成DAO層邏輯的開發,牛~)

5)Service層

因爲業務邏輯幾乎爲零,只是簡單調用了DAO層,直接貼代碼:

@Service
    public class UserService {
        @Autowired
        private UserRepository userRepository;

        /**
         * 保存或更新。
         * 若是傳入的user沒有id屬性,因爲username是unique的,在重複的狀況下有可能報錯,
         * 這時找到以保存的user記錄用傳入的user更新它。
         */
        public Mono<User> save(User user) {
            return userRepository.save(user)
                    .onErrorResume(e ->     // 1
                            userRepository.findByUsername(user.getUsername())   // 2
                                    .flatMap(originalUser -> {      // 4
                                        user.setId(originalUser.getId());
                                        return userRepository.save(user);   // 3
                                    }));
        }

        public Mono<Long> deleteByUsername(String username) {
            return userRepository.deleteByUsername(username);
        }

        public Mono<User> findByUsername(String username) {
            return userRepository.findByUsername(username);
        }

        public Flux<User> findAll() {
            return userRepository.findAll();
        }
    }
  1. onErrorResume進行錯誤處理;
  2. 找到username重複的記錄;
  3. 拿到ID從而進行更新而不是建立;
  4. 因爲函數式爲User -&gt; Publisher,因此用flatMap

6)Controller層

直接貼代碼:

@RestController
    @RequestMapping("/user")
    public class UserController {
        @Autowired
        private UserService userService;

        @PostMapping("")
        public Mono<User> save(User user) {
            return this.userService.save(user);
        }

        @DeleteMapping("/{username}")
        public Mono<Long> deleteByUsername(@PathVariable String username) {
            return this.userService.deleteByUsername(username);
        }

        @GetMapping("/{username}")
        public Mono<User> findByUsername(@PathVariable String username) {
            return this.userService.findByUsername(username);
        }

        @GetMapping("")
        public Flux<User> findAll() {
            return this.userService.findAll();
        }
    }

7)啓動應用測試一下

因爲涉及到POST和DELETE方法的請求,建議用支持RESTful的client來測試,好比「Restlet client」:

title

如圖,增長操做是成功的,只要username不變,再次發送請求會更新該記錄。

圖中birthday的時間差8小時,不去管它。

用一樣的方法增長一個李四,以後咱們再來測試一下查詢。

1) 根據用戶名查詢(METHOD:GET URL:http://localhost:8080/user/zhangsan),下邊輸出是格式化的JSON

{
    "id": "5a9504a167646d057051e229",
    "username": "zhangsan",
    "name": "張三",
    "phone": "18610861861",
    "birthday": "1989-12-31T16:00:00.000+0000"
}

2) 查詢所有(METHOD:GET URL:http://localhost:8080/user

[{"id":"5a9504a167646d057051e229","username":"zhangsan","name":"張三","phone":"18610861861","birthday":"1989-12-31T16:00:00.000+0000"},{"id":"5a9511db67646d3c782f2e7f","username":"lisi","name":"李四","phone":"18610861862","birthday":"1992-02-01T16:00:00.000+0000"}]

測試一下刪除(METHOD:DELETE URL:http://localhost:8080/user/zhangsan),返回值爲1,再查詢所有,發現張三已經被刪除了,OK

8)stream+json

看到這裏細心的朋友可能會有點嘀咕,怎麼看是否是異步的呢?畢竟查詢所有的時候,結果都用中括號括起來了,這和原來返回List&lt;User&gt;的效果彷佛沒多大區別。假設一下查詢100個數據,若是是異步的話,以咱們對「異步響應式流」的印象彷佛應該是一個一個至少是一批一批的到達客戶端的嘛。咱們加個延遲驗證一下:

@GetMapping("")
    public Flux<User> findAll() {
        return this.userService.findAll().delayElements(Duration.ofSeconds(1));
    }

每一個元素都延遲1秒,如今咱們在數據庫里弄三條記錄,而後請求查詢所有的那個URL,發現並非像/times同樣一秒一個地出來,而是3秒以後一起出來的。果真如此,這一點都不響應式啊!

/times相似,咱們也加一個MediaType,不過因爲這裏返回的是JSON,所以不能使用TEXT_EVENT_STREAM,而是使用APPLICATION_STREAM_JSON,即application/stream+json格式。

@GetMapping(value = "", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<User> findAll() {
    return this.userService.findAll().delayElements(Duration.ofSeconds(2));
}
  1. produces後邊的值應該是application/stream+json字符串,所以用APPLICATION_STREAM_JSON_VALUE

重啓服務再次請求,發現三個user是一秒一個的速度出來的,中括號也沒有了,而是一個一個獨立的JSON值構成的json stream:

{"id":"5a9504a167646d057051e229","username":"zhangsan","name":"張三","phone":"18610861861","birthday":"1989-12-31T16:00:00.000+0000"}
{"id":"5a9511db67646d3c782f2e7f","username":"lisi","name":"李四","phone":"18610861862","birthday":"1992-02-01T16:00:00.000+0000"}
{"id":"5a955f08fa10b93ec48df37f","username":"wangwu","name":"王五","phone":"18610861865","birthday":"1995-05-04T16:00:00.000+0000"}

9)總結

若是有Spring Data開發經驗的話,切換到Spring Data Reactive的難度並不高。跟Spring WebFlux相似:原來返回User的話,那如今就返回Mono&lt;User&gt;;原來返回List&lt;User&gt;的話,那如今就返回Flux&lt;User&gt;

對於稍微複雜的業務邏輯或一些必要的異常處理,好比上邊的save方法,請必定採用響應式的編程方式來定義,從而一切都是異步非阻塞的。以下圖所示,從HttpServer(如Netty或Servlet3.1以上的Servlet容器)到ServerAdapter(Spring WebFlux框架提供的針對不一樣server的適配器),到咱們編寫的Controller和DAO,以及異步數據庫驅動,構成了一個完整的異步非阻塞的管道,裏邊流動的就是響應式流。

title

1.3.3.4 使用WebClient開發響應式Http客戶端

下面,咱們用WebClient測試一下前邊幾個例子的成果。

1) /hello,返回Mono

@Test
    public void webClientTest1() throws InterruptedException {
        WebClient webClient = WebClient.create("http://localhost:8080");   // 1
        Mono<String> resp = webClient
                .get().uri("/hello") // 2
                .retrieve() // 3
                .bodyToMono(String.class);  // 4
        resp.subscribe(System.out::println);    // 5
        TimeUnit.SECONDS.sleep(1);  // 6
    }
  1. 建立WebClient對象並指定baseUrl;
  2. HTTP GET;
  3. 異步地獲取response信息;
  4. 將response body解析爲字符串;
  5. 打印出來;
  6. 因爲是異步的,咱們將測試線程sleep 1秒確保拿到response,也能夠像前邊的例子同樣用CountDownLatch

運行效果以下:
title

2) /user,返回Flux

爲了多演示一些不一樣的實現方式,下邊的例子咱們調整幾個地方,可是效果跟上邊是同樣的:

@Test
    public void webClientTest2() throws InterruptedException {
        WebClient webClient = WebClient.builder().baseUrl("http://localhost:8080").build(); // 1
        webClient
                .get().uri("/user")
                .accept(MediaType.APPLICATION_STREAM_JSON) // 2
                .exchange() // 3
                .flatMapMany(response -> response.bodyToFlux(User.class))   // 4
                .doOnNext(System.out::println)  // 5
                .blockLast();   // 6
    }
  1. 此次咱們使用WebClientBuilder來構建WebClient對象;
  2. 配置請求Header:Content-Type: application/stream+json
  3. 獲取response信息,返回值爲ClientResponseretrive()能夠看作是exchange()方法的「快捷版」;
  4. 使用flatMap來將ClientResponse映射爲Flux;
  5. 只讀地peek每一個元素,而後打印出來,它並非subscribe,因此不會觸發流;
  6. 上個例子中sleep的方式有點low,blockLast方法,顧名思義,在收到最後一個元素前會阻塞,響應式業務場景中慎用。

運行效果以下:
title

3) /times,服務端推送

@Test
    public void webClientTest3() throws InterruptedException {
        WebClient webClient = WebClient.create("http://localhost:8080");
        webClient
                .get().uri("/times")
                .accept(MediaType.TEXT_EVENT_STREAM)    // 1
                .retrieve()
                .bodyToFlux(String.class)
                .log()  // 2
                .take(10)   // 3
                .blockLast();
    }
  1. 配置請求Header:Content-Type: text/event-stream,即SSE;
  2. 此次用log()代替doOnNext(System.out::println)來查看每一個元素;
  3. 因爲/times是一個無限流,這裏取前10個,會致使流被取消

運行效果以下:
title

1.3.3.5 讓數據在Http上雙向無限流動起來

許多朋友看到這個題目會想到Websocket,的確,Websocket確實能夠實現全雙工通訊,但它的數據傳輸並不是是徹底基於HTTP協議的,關於Websocket咱們後邊再聊。

下面咱們實現一個這樣兩個Endpoint:

  • POST方法的/events,「源源不斷」地收集數據,並存入數據庫;
  • GET方法的/events,「源源不斷」將數據庫中的記錄發出來。

0)準備

1、數據模型MyEvent

@Data
    @AllArgsConstructor
    @NoArgsConstructor
    @Document(collection = "event") // 1
    public class MyEvent {
        @Id
        private Long id;    // 2
        private String message;
    }
  1. 指定collection名爲event
  2. 此次咱們使用表示時間的long型數據做爲ID。

2、DAO層:

public interface MyEventRepository extends ReactiveMongoRepository<MyEvent, Long> { // 1
    }
  1. 下邊用到了能夠保存Flux的insert(Flux)方法,這個方法是在ReactiveMongoRepository中定義的。

3、簡單起見就不要Service層了,直接Controller:

@RestController
    @RequestMapping("/events")
    public class MyEventController {
        @Autowired
        private MyEventRepository myEventRepository;

        @PostMapping(path = "")
        public Mono<Void> loadEvents(@RequestBody Flux<MyEvent> events) {   // 1
            // TODO
            return null;
        }

        @GetMapping(path = "", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
        public Flux<MyEvent> getEvents() {  // 2
            // TODO
            return null;
        }
    }
  1. POST方法的接收數據流的Endpoint,因此傳入的參數是一個Flux,返回結果其實就看須要了,咱們用一個Mono&lt;Void&gt;做爲方法返回值,表示若是傳輸完的話只給一個「完成信號」就OK了;
  2. GET方法的無限發出數據流的Endpoint,因此返回結果是一個Flux&lt;MyEvent&gt;,不要忘了註解上produces = MediaType.APPLICATION_STREAM_JSON_VALUE

準備到此爲止,類以下。咱們來完成上邊的兩個TODO吧。

(5)Spring WebFlux快速上手——響應式Spring的道法術器

1)接收數據流的Endpoint

在客戶端,WebClient能夠接收text/event-streamapplication/stream+json格式的數據流,也能夠在請求的時候上傳一個數據流到服務器;
在服務端,WebFlux也支持接收一個數據流做爲請求參數,從而實現一個接收數據流的Endpoint。

咱們先看服務端。Controller中的loadEvents方法:

@PostMapping(path = "", consumes = MediaType.APPLICATION_STREAM_JSON_VALUE) // 1
    public Mono<Void> loadEvents(@RequestBody Flux<MyEvent> events) {
        return this.myEventRepository.insert(events).then();    // 2
    }
  1. 指定傳入的數據是application/stream+json,與getEvents方法的區別在於這個方法是consume這個數據流;
  2. insert返回的是保存成功的記錄的Flux,但咱們不須要,使用then方法表示「忽略數據元素,只返回一個完成信號」。

服務端寫好後,啓動之,再看一下客戶端怎麼寫(仍是放在src/test下):

@Test
    public void webClientTest4() {
        Flux<MyEvent> eventFlux = Flux.interval(Duration.ofSeconds(1))
                .map(l -> new MyEvent(System.currentTimeMillis(), "message-" + l)).take(5); // 1
        WebClient webClient = WebClient.create("http://localhost:8080");
        webClient
                .post().uri("/events")
                .contentType(MediaType.APPLICATION_STREAM_JSON) // 2
                .body(eventFlux, MyEvent.class) // 3
                .retrieve()
                .bodyToMono(Void.class)
                .block();
    }
  1. 聲明速度爲每秒一個MyEvent元素的數據流,不加take的話表示無限個元素的數據流;
  2. 聲明請求體的數據格式爲application/stream+json
  3. body方法設置請求體的數據。

運行一下這個測試,根據控制檯數據能夠看到是一條一條將數據發到/events的,看一下MongoDB中的數據:

(5)Spring WebFlux快速上手——響應式Spring的道法術器

2)發出無限流的Endpoint

回想一下前邊/user的例子,當數據庫中全部的內容都查詢出來以後,這個流就結束了,由於其後跟了一個「完成信號」,咱們能夠經過在UserServicefindAll()方法的流上增長log()操做符來觀察更詳細的日誌:

title

咱們能夠看到在三個onNext信號後是一個onComplete信號。

這樣的流是有限流,這個時候若是在數據庫中再新增一個User的話,已經結束的請求也不會再有新的內容出現了。

反觀/times請求,它會無限地發出SSE,而不會有「完成信號」出現,這是無限流。

咱們但願的狀況是不管是請求GET的/events以後,當全部數據都發完以後,不要結束,而是掛起等待新的數據。若是咱們用上邊的POST的/events傳入新的數據到數據庫後,新的數據會自動地流到客戶端。

這能夠在DAO層配置實現:

public interface MyEventRepository extends ReactiveMongoRepository<MyEvent, Long> {
        @Tailable   // 1
        Flux<MyEvent> findBy(); // 2
    }
  1. @Tailable註解的做用相似於linux的tail命令,被註解的方法將發送無限流,須要註解在返回值爲Flux這樣的多個元素的Publisher的方法上;
  2. findAll()是想要的方法,可是在ReactiveMongoRepository中咱們夠不着,因此使用findBy()代替。

而後完成Controller中的方法:

@GetMapping(path = "", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<MyEvent> getEvents() {
        return this.myEventRepository.findBy();
    }

不過,這還不夠,@Tailable僅支持有大小限制的(「capped」)collection,而自動建立的collection是不限制大小的,所以咱們須要先手動建立。Spring Boot提供的CommandLineRunner能夠幫助咱們實現這一點。

Spring Boot應用程序在啓動後,會遍歷CommandLineRunner接口的實例並運行它們的run方法。

@Bean   // 1
    public CommandLineRunner initData(MongoOperations mongo) {  // 2
        return (String... args) -> {    // 3
            mongo.dropCollection(MyEvent.class);    // 4
            mongo.createCollection(MyEvent.class, CollectionOptions.empty().size(200).capped()); // 5
        };
    }
  1. 對於複雜的Bean只能經過Java Config的方式配置,這也是爲何Spring3以後官方推薦這種配置方式的緣由,這段代碼能夠放到配置類中,本例咱們就直接放到啓動類WebFluxDemoApplication了;
  2. MongoOperations提供對MongoDB的操做方法,由Spring注入的mongo實例已經配置好,直接使用便可;
  3. CommandLineRunner也是一個函數式接口,其實例能夠用lambda表達;
  4. 若是有,先刪除collection,生產環境慎用這種操做;
  5. 建立一個記錄個數爲10的capped的collection,容量滿了以後,新增的記錄會覆蓋最舊的。

啓動應用,咱們檢查一下event collection:

(5)Spring WebFlux快速上手——響應式Spring的道法術器

OK,這個時候咱們請求一下http://localhost:8080/events,發現立馬返回了,並無掛起。緣由在於collection中一條記錄都沒有,而@Tailable起做用的前提是至少有一條記錄。

跑一下WebClient測試程序插入5條數據,而後再次請求:

(5)Spring WebFlux快速上手——響應式Spring的道法術器

請求是掛起的,這沒錯,可是隻有兩條數據,看WebClient測試程序的控制檯明明發出了5個請求啊。

緣由定義的CollectionOptions.empty().size(200).capped()中,size指的是以字節爲單位的大小,而且會向上取到256的整倍數,因此咱們剛纔定義的是256byte大小的collection,因此最多容納兩條記錄。咱們能夠這樣改一下:

CollectionOptions.empty().maxDocuments(200).size(100000).capped()

maxDocuments限制了記錄條數,size限制容量且是必須定義的,由於MongoDB不像關係型數據庫有嚴格的列和字段大小定義,鬼知道會存多大的數據進來,因此容量限制是必要的。

好了,再次啓動應用,先插入5條數據,而後請求/events,收到5條記錄後請求仍然掛起,在插入5條數據,curl客戶端又會陸續收到新的數據。

title

咱們用代碼搭建了圖中箭頭所表示的「管道」,看效果仍是很暢通的嘛。如今再回想咱們最初的那個Excel的例子,是否是感受這個demo頗有響應式的「範兒」了呢?

1.3.3.6 總結

這一節,咱們對WebFlux作了一個簡單的基於實例的介紹,相信你對響應式編程及其在WEB應用中如何發揮做用有了更多的體會,本章的實戰是比較基礎的,初衷是但願可以經過上手編寫代碼體會響應式編程的感受,由於切換到響應式思惟方式並不是易事。

這一章的核心關鍵詞其實翻來覆去就是:「異步非阻塞的響應式流」。咱們瞭解了異步非阻塞的好處,也知道如何讓數據流動起來,下面咱們就經過對實例的性能測試,藉助實實在在的數據,真切感覺一下異步非阻塞的「絲滑」。

相關文章
相關標籤/搜索