本系列文章索引《響應式Spring的道法術器》
前情提要 lambda與函數式 | Reactor 3快速上手
本文源碼html
Spring WebFlux是隨Spring 5推出的響應式Web框架。java
1)服務端技術棧react
Spring提供了完整的支持響應式的服務端技術棧。linux
如上圖所示,左側爲基於spring-webmvc的技術棧,右側爲基於spring-webflux的技術棧,git
@Controller
、@RequestMapping
)的開發模式;由此看來,Spring WebFlux與Vert.x有一些相通之處,都是創建在非阻塞的異步I/O和事件驅動的基礎之上的。github
2)響應式Http客戶端web
此外,Spring WebFlux也提供了一個響應式的Http客戶端API WebClient
。它能夠用函數式的方式異步非阻塞地發起Http請求並處理響應。其底層也是由Netty提供的異步支持。ajax
咱們能夠把WebClient
看作是響應式的RestTemplate
,與後者相比,前者:spring
固然,與服務端對應的,Spring WebFlux也提供了響應式的Websocket客戶端API。mongodb
簡單介紹這些,讓咱們來Coding吧~
本節,咱們經過如下幾個例子來逐步深刻地瞭解它的使用方法:
** 1. 先介紹一下使用Spring WebMVC風格的基於註解的方式如何編寫響應式的Web服務,這幾乎沒有學習成本,很是贊。雖然這種方式在開發上與Spring WebMVC變化不大,可是框架底層已是徹底的響應式技術棧了;
WebClient
與前幾步作好的服務端進行通訊;Spring Boot 2是基於Spring 5的,其中一個比較大的更新就在於支持包括spring-webflux和響應式的spring-data在內的響應式模塊。Spring Boot 2即將發佈正式版,不過目前的版本從功能上已經完備,下邊的例子咱們就用Spring Boot 2在進行搭建。
咱們首先用Spring WebMVC開發一個只有Controller層的簡單的Web服務,而後僅僅作一點點調整就可切換爲基於Spring WebFlux的具備一樣功能的Web服務。
咱們使用Spring Boot 2搭建項目框架。
如下截圖來自IntelliJ IDEA,不過其餘IDE也都是相似的。
1)基於Spring Initializr建立項目
本節的例子很簡單,不涉及Service層和Dao層,所以只選擇spring-webmvc便可,也就是「Web」的starter。
也可使用網頁版的https://start.spring.io來建立項目:
建立後的項目POM中,包含下邊的依賴,即表示基於Spring WebMVC:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
2)建立Controller和Endpoint
建立Controller類HelloController
,僅提供一個Endpoint:/hello
:
@RestController public class HelloController { @GetMapping("/hello") public String hello() { return "Welcome to reactive world ~"; } }
3)啓動應用
OK了,一個簡單的基於Spring WebMVC的Web服務。咱們新增了HelloController.java
,修改了application.properties
。
使用IDE啓動應用,或使用maven命令:
mvn spring-boot:run
經過打印的log能夠看到,服務運行於Tomcat的8080端口:
測試Endpoint。在瀏覽器中訪問http://localhost:8080/hello
,或運行命令:
curl http://localhost:8080/hello
返回Welcome to reactive world ~
。
基於Spring WebFlux的項目與上邊的步驟一致,僅有兩點不一樣。咱們此次偷個懶,就不重新建項目了,修改一下上邊的項目:
4)依賴「Reactive Web」的starter而不是「Web」
修改項目POM,調整依賴使其基於Spring WebFlux:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-WebFlux</artifactId> <!--【改】增長「flux」四個字符--> </dependency>
5)Controller中處理請求的返回類型採用響應式類型
@RestController public class HelloController { @GetMapping("/hello") public Mono<String> hello() { // 【改】返回類型爲Mono<String> return Mono.just("Welcome to reactive world ~"); // 【改】使用Mono.just生成響應式數據 } }
6)啓動應用
僅須要上邊兩步就改完了,是否是很簡單,一樣的方法啓動應用。啓動後發現應用運行於Netty上:
訪問http://localhost:8080/hello
,結果與Spring WebMVC的相同。
7)總結
從上邊這個很是很是簡單的例子中能夠看出,Spring真是用心良苦,WebFlux提供了與以前WebMVC相同的一套註解來定義請求的處理,使得Spring使用者遷移到響應式開發方式的過程變得異常輕鬆。
雖然咱們只修改了少許的代碼,可是其實這個簡單的項目已經脫胎換骨了。整個技術棧從命令式的、同步阻塞的【spring-webmvc + servlet + Tomcat】變成了響應式的、異步非阻塞的【spring-webflux + Reactor + Netty】。
Netty是一套異步的、事件驅動的網絡應用程序框架和工具,可以開發高性能、高可靠性的網絡服務器和客戶端程序,所以與一樣是異步的、事件驅動的響應式編程範式一拍即合。
下邊的內容瞭解便可,就不實戰了。
在Java 7推出異步I/O庫,以及Servlet3.1增長了對異步I/O的支持以後,Tomcat等Servlet容器也隨後開始支持異步I/O,而後Spring WebMVC也增長了對Reactor庫的支持,因此上邊第4)步若是不是將spring-boot-starter-web
替換爲spring-boot-starter-WebFlux
,而是增長reactor-core
的依賴的話,仍然能夠用註解的方式開發基於Tomcat的響應式應用。
既然是響應式編程了,有些朋友可能會想統一用函數式的編程風格,WebFlux知足你。WebFlux提供了一套函數式接口,能夠用來實現相似MVC的效果。咱們先接觸兩個經常使用的。
再回頭瞧一眼上邊例子中咱們用Controller
定義定義對Request的處理邏輯的方式,主要有兩個點:
@RequestMapping
註解定義好這個方法對什麼樣url進行響應。在WebFlux的函數式開發模式中,咱們用HandlerFunction
和RouterFunction
來實現上邊這兩點。
HandlerFunction
至關於Controller
中的具體處理方法,輸入爲請求,輸出爲裝在Mono
中的響應:Mono<T extends ServerResponse> handle(ServerRequest request);
RouterFunction
,顧名思義,路由,至關於@RequestMapping
,用來判斷什麼樣的url映射到那個具體的HandlerFunction
,輸入爲請求,輸出爲裝在Mono裏邊的Handlerfunction
:Mono<HandlerFunction<T>> route(ServerRequest request);
咱們看到,在WebFlux中,請求和響應再也不是WebMVC中的ServletRequest
和ServletResponse
,而是ServerRequest
和ServerResponse
。後者是在響應式編程中使用的接口,它們提供了對非阻塞和回壓特性的支持,以及Http消息體與響應式類型Mono和Flux的轉換方法。
下面咱們用函數式的方式開發兩個Endpoint:
/time
返回當前的時間;/date
返回當前的日期。對於這兩個需求,HandlerFunction很容易寫:
// 返回包含時間字符串的ServerResponse HandlerFunction<ServerResponse> timeFunction = request -> ServerResponse.ok().contentType(MediaType.TEXT_PLAIN).body( Mono.just("Now is " + new SimpleDateFormat("HH:mm:ss").format(new Date())), String.class); // 返回包含日期字符串的ServerResponse HandlerFunction<ServerResponse> dateFunction = request -> ServerResponse.ok().contentType(MediaType.TEXT_PLAIN).body( Mono.just("Today is " + new SimpleDateFormat("yyyy-MM-dd").format(new Date())), String.class);
那麼RouterFunction爲:
RouterFunction<ServerResponse> router = RouterFunctions.route(GET("/time"), timeFunction) .andRoute(GET("/date"), dateFunction);
按照常見的套路,
RouterFunctions
是工具類。
不過這麼寫在業務邏輯複雜的時候不太好組織,咱們一般採用跟MVC相似的代碼組織方式,將同類業務的HandlerFunction放在一個類中,而後在Java Config中將RouterFunction配置爲Spring容器的Bean。咱們繼續在第一個例子的代碼上開發:
1)建立統一存放處理時間的Handler類
建立TimeHandler.java
:
import static org.springframework.web.reactive.function.server.ServerResponse.ok; @Component public class TimeHandler { public Mono<ServerResponse> getTime(ServerRequest serverRequest) { return ok().contentType(MediaType.TEXT_PLAIN).body(Mono.just("Now is " + new SimpleDateFormat("HH:mm:ss").format(new Date())), String.class); } public Mono<ServerResponse> getDate(ServerRequest serverRequest) { return ok().contentType(MediaType.TEXT_PLAIN).body(Mono.just("Today is " + new SimpleDateFormat("yyyy-MM-dd").format(new Date())), String.class); } }
因爲出現次數一般比較多,這裏靜態引入
ServerResponse.ok()
方法。
2)在Spring容器配置RouterFunction
咱們採用Spring如今比較推薦的Java Config的配置Bean的方式,建立用於存放Router的配置類RouterConfig.java
:
import static org.springframework.web.reactive.function.server.RequestPredicates.GET; import static org.springframework.web.reactive.function.server.RouterFunctions.route; @Configuration public class RouterConfig { @Autowired private TimeHandler timeHandler; @Bean public RouterFunction<ServerResponse> timerRouter() { return route(GET("/time"), req -> timeHandler.getTime(req)) .andRoute(GET("/date"), timeHandler::getDate); // 這種方式相對於上一行更加簡潔 } }
3)重啓服務試一試
重啓服務測試一下吧:
$ curl http://localhost:8080/date Today is 2018-02-26 $ curl http://localhost:8080/time Now is 21:12:53
咱們可能會遇到一些須要網頁與服務器端保持鏈接(起碼看上去是保持鏈接)的需求,好比相似微信網頁版的聊天類應用,好比須要頻繁更新頁面數據的監控系統頁面或股票看盤頁面。咱們一般採用以下幾種技術:
既然響應式編程是一種基於數據流的編程範式,天然在服務器推送方面駕輕就熟,咱們基於函數式方式再增長一個Endpoint /times
,能夠每秒推送一次時間。
1)增長Handler方法
TimeHandler.java
:
public Mono<ServerResponse> sendTimePerSec(ServerRequest serverRequest) { return ok().contentType(MediaType.TEXT_EVENT_STREAM).body( // 1 Flux.interval(Duration.ofSeconds(1)). // 2 map(l -> new SimpleDateFormat("HH:mm:ss").format(new Date())), String.class); }
MediaType.TEXT_EVENT_STREAM
表示Content-Type
爲text/event-stream
,即SSE;2)配置router
RouterConfig.java
:
@Bean public RouterFunction<ServerResponse> timerRouter() { return route(GET("/time"), timeHandler::getTime) .andRoute(GET("/date"), timeHandler::getDate) .andRoute(GET("/times"), timeHandler::sendTimePerSec); // 增長這一行 }
3)重啓服務試一下
重啓服務後,測試一下:
curl http://localhost:8080/times data:21:32:22 data:21:32:23 data:21:32:24 data:21:32:25 data:21:32:26 <Ctrl+C>
就醬,訪問這個url會收到持續不斷的報時數據(時間數據是在data
中的)。
那麼用註解的方式如何進行服務端推送呢,這個演示就融到下一個例子中吧~
開發基於響應式流的應用,就像是在搭建數據流流動的管道,從而異步的數據可以順暢流過每一個環節。前邊的例子主要聚焦於應用層,然而絕大多數系統免不了要與數據庫進行交互,因此咱們也須要響應式的持久層API和支持異步的數據庫驅動。就像從自來水廠到家裏水龍頭這個管道中,若是任何一個環節發生了阻塞,那就可能形成總體吞吐量的降低。
各個數據庫都開始陸續推出異步驅動,目前Spring Data支持的能夠進行響應式數據訪問的數據庫有MongoDB、Redis、Apache Cassandra和CouchDB。今天咱們用MongoDB來寫一個響應式demo。
咱們這個例子很簡單,就是關於User
的增刪改查,以及基於註解的服務端推送。
1)編寫User
既然是舉例,咱們隨便定義幾個屬性吧~
public class User { private String id; private String username; private String phone; private String email; private String name; private Date birthday; }
而後爲了方便開發,咱們引入lombok庫,它可以經過註解的方式爲咱們添加必要的Getter/Setter/hashCode()/equals()/toString()/構造方法等,添加依賴(版本可自行到http://search.maven.org搜索最新):
<dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.16.20</version> </dependency>
而後爲User
添加註解:
@Data // 生成無參構造方法/getter/setter/hashCode/equals/toString @AllArgsConstructor // 生成全部參數構造方法 @NoArgsConstructor // @AllArgsConstructor會致使@Data不生成無參構造方法,須要手動添加@NoArgsConstructor,若是沒有無參構造方法,可能會致使好比com.fasterxml.jackson在序列化處理時報錯 public class User { ...
咱們能夠利用IDE看一下生成的方法(以下圖黃框所示):
可能須要先在IDE中進行少許配置以便支持lombok的註解,好比IntelliJ IDEA:
- 安裝「lombok plugin」:
- 開啓對註解編譯的支持:
lombok對於Java開發者來講絕對算是個福音了,但願使用Kotlin的朋友不要笑話咱們土哦~
2)增長Spring Data的依賴
在POM中增長Spring Data Reactive Mongo的依賴:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId> </dependency>
MongoDB是文檔型的NoSQL數據庫,所以,咱們使用@Document
註解User
類:
@Data @AllArgsConstructor @Document public class User { @Id private String id; // 註解屬性id爲ID @Indexed(unique = true) // 註解屬性username爲索引,而且不能重複 private String username; private String name; private String phone; private Date birthday; }
OK,這樣咱們的模型就準備好了。MongoDB會自動建立collection,默認爲類名首字母小寫,也就是user
。
3)配置數據源
Spring Boot爲咱們搞定了幾乎全部的配置,太讚了,下邊是MongoDB的默認配置:
# MONGODB (MongoProperties) spring.data.mongodb.authentication-database= # Authentication database name. spring.data.mongodb.database=test # Database name. spring.data.mongodb.field-naming-strategy= # Fully qualified name of the FieldNamingStrategy to use. spring.data.mongodb.grid-fs-database= # GridFS database name. spring.data.mongodb.host=localhost # Mongo server host. Cannot be set with uri. spring.data.mongodb.password= # Login password of the mongo server. Cannot be set with uri. spring.data.mongodb.port=27017 # Mongo server port. Cannot be set with uri. spring.data.mongodb.repositories.enabled=true # Enable Mongo repositories. spring.data.mongodb.uri=mongodb://localhost/test # Mongo database URI. Cannot be set with host, port and credentials. spring.data.mongodb.username= # Login user of the mongo server. Cannot be set with uri.
請根據須要添加自定義的配置,好比個人MongoDB是跑在IP爲192.168.0.101的虛擬機的Docker中的,就可在application.properties
中增長一條:
spring.data.mongodb.host=192.168.0.101
4)增長DAO層repository
與非響應式Spring Data的CrudReposity
對應的,響應式的Spring Data也提供了相應的Repository庫:ReactiveCrudReposity
,固然,咱們也可使用它的子接口ReactiveMongoRepository
。
咱們增長UserRepository
:
public interface UserRepository extends ReactiveCrudRepository<User, String> { // 1 Mono<User> findByUsername(String username); // 2 Mono<Long> deleteByUsername(String username); }
ReactiveCrudRepository
的泛型分別是User
和ID
的類型;ReactiveCrudRepository
已經提供了基本的增刪改查的方法,根據業務須要,咱們增長四個方法(在此膜拜一下Spring團隊的牛人們,使得咱們僅需按照規則定義接口方法名便可完成DAO層邏輯的開發,牛~)5)Service層
因爲業務邏輯幾乎爲零,只是簡單調用了DAO層,直接貼代碼:
@Service public class UserService { @Autowired private UserRepository userRepository; /** * 保存或更新。 * 若是傳入的user沒有id屬性,因爲username是unique的,在重複的狀況下有可能報錯, * 這時找到以保存的user記錄用傳入的user更新它。 */ public Mono<User> save(User user) { return userRepository.save(user) .onErrorResume(e -> // 1 userRepository.findByUsername(user.getUsername()) // 2 .flatMap(originalUser -> { // 4 user.setId(originalUser.getId()); return userRepository.save(user); // 3 })); } public Mono<Long> deleteByUsername(String username) { return userRepository.deleteByUsername(username); } public Mono<User> findByUsername(String username) { return userRepository.findByUsername(username); } public Flux<User> findAll() { return userRepository.findAll(); } }
onErrorResume
進行錯誤處理;User -> Publisher
,因此用flatMap
。6)Controller層
直接貼代碼:
@RestController @RequestMapping("/user") public class UserController { @Autowired private UserService userService; @PostMapping("") public Mono<User> save(User user) { return this.userService.save(user); } @DeleteMapping("/{username}") public Mono<Long> deleteByUsername(@PathVariable String username) { return this.userService.deleteByUsername(username); } @GetMapping("/{username}") public Mono<User> findByUsername(@PathVariable String username) { return this.userService.findByUsername(username); } @GetMapping("") public Flux<User> findAll() { return this.userService.findAll(); } }
7)啓動應用測試一下
因爲涉及到POST和DELETE方法的請求,建議用支持RESTful的client來測試,好比「Restlet client」:
如圖,增長操做是成功的,只要username不變,再次發送請求會更新該記錄。
圖中birthday的時間差8小時,不去管它。
用一樣的方法增長一個李四,以後咱們再來測試一下查詢。
1) 根據用戶名查詢(METHOD:GET URL:http://localhost:8080/user/zhangsan),下邊輸出是格式化的JSON:
{ "id": "5a9504a167646d057051e229", "username": "zhangsan", "name": "張三", "phone": "18610861861", "birthday": "1989-12-31T16:00:00.000+0000" }
2) 查詢所有(METHOD:GET URL:http://localhost:8080/user)
[{"id":"5a9504a167646d057051e229","username":"zhangsan","name":"張三","phone":"18610861861","birthday":"1989-12-31T16:00:00.000+0000"},{"id":"5a9511db67646d3c782f2e7f","username":"lisi","name":"李四","phone":"18610861862","birthday":"1992-02-01T16:00:00.000+0000"}]
測試一下刪除(METHOD:DELETE URL:http://localhost:8080/user/zhangsan),返回值爲1,再查詢所有,發現張三已經被刪除了,OK。
8)stream+json
看到這裏細心的朋友可能會有點嘀咕,怎麼看是否是異步的呢?畢竟查詢所有的時候,結果都用中括號括起來了,這和原來返回List<User>
的效果彷佛沒多大區別。假設一下查詢100個數據,若是是異步的話,以咱們對「異步響應式流」的印象彷佛應該是一個一個至少是一批一批的到達客戶端的嘛。咱們加個延遲驗證一下:
@GetMapping("") public Flux<User> findAll() { return this.userService.findAll().delayElements(Duration.ofSeconds(1)); }
每一個元素都延遲1秒,如今咱們在數據庫里弄三條記錄,而後請求查詢所有的那個URL,發現並非像/times
同樣一秒一個地出來,而是3秒以後一起出來的。果真如此,這一點都不響應式啊!
與/times
相似,咱們也加一個MediaType,不過因爲這裏返回的是JSON,所以不能使用TEXT_EVENT_STREAM
,而是使用APPLICATION_STREAM_JSON
,即application/stream+json
格式。
@GetMapping(value = "", produces = MediaType.APPLICATION_STREAM_JSON_VALUE) public Flux<User> findAll() { return this.userService.findAll().delayElements(Duration.ofSeconds(2)); }
produces
後邊的值應該是application/stream+json
字符串,所以用APPLICATION_STREAM_JSON_VALUE
。重啓服務再次請求,發現三個user是一秒一個的速度出來的,中括號也沒有了,而是一個一個獨立的JSON值構成的json stream:
{"id":"5a9504a167646d057051e229","username":"zhangsan","name":"張三","phone":"18610861861","birthday":"1989-12-31T16:00:00.000+0000"} {"id":"5a9511db67646d3c782f2e7f","username":"lisi","name":"李四","phone":"18610861862","birthday":"1992-02-01T16:00:00.000+0000"} {"id":"5a955f08fa10b93ec48df37f","username":"wangwu","name":"王五","phone":"18610861865","birthday":"1995-05-04T16:00:00.000+0000"}
9)總結
若是有Spring Data開發經驗的話,切換到Spring Data Reactive的難度並不高。跟Spring WebFlux相似:原來返回User
的話,那如今就返回Mono<User>
;原來返回List<User>
的話,那如今就返回Flux<User>
。
對於稍微複雜的業務邏輯或一些必要的異常處理,好比上邊的save方法,請必定採用響應式的編程方式來定義,從而一切都是異步非阻塞的。以下圖所示,從HttpServer(如Netty或Servlet3.1以上的Servlet容器)到ServerAdapter(Spring WebFlux框架提供的針對不一樣server的適配器),到咱們編寫的Controller和DAO,以及異步數據庫驅動,構成了一個完整的異步非阻塞的管道,裏邊流動的就是響應式流。
WebClient
開發響應式Http客戶端下面,咱們用WebClient測試一下前邊幾個例子的成果。
1) /hello,返回Mono
@Test public void webClientTest1() throws InterruptedException { WebClient webClient = WebClient.create("http://localhost:8080"); // 1 Mono<String> resp = webClient .get().uri("/hello") // 2 .retrieve() // 3 .bodyToMono(String.class); // 4 resp.subscribe(System.out::println); // 5 TimeUnit.SECONDS.sleep(1); // 6 }
WebClient
對象並指定baseUrl;CountDownLatch
。運行效果以下:
2) /user,返回Flux
爲了多演示一些不一樣的實現方式,下邊的例子咱們調整幾個地方,可是效果跟上邊是同樣的:
@Test public void webClientTest2() throws InterruptedException { WebClient webClient = WebClient.builder().baseUrl("http://localhost:8080").build(); // 1 webClient .get().uri("/user") .accept(MediaType.APPLICATION_STREAM_JSON) // 2 .exchange() // 3 .flatMapMany(response -> response.bodyToFlux(User.class)) // 4 .doOnNext(System.out::println) // 5 .blockLast(); // 6 }
Content-Type: application/stream+json
;ClientResponse
,retrive()
能夠看作是exchange()
方法的「快捷版」;flatMap
來將ClientResponse映射爲Flux;blockLast
方法,顧名思義,在收到最後一個元素前會阻塞,響應式業務場景中慎用。運行效果以下:
3) /times,服務端推送
@Test public void webClientTest3() throws InterruptedException { WebClient webClient = WebClient.create("http://localhost:8080"); webClient .get().uri("/times") .accept(MediaType.TEXT_EVENT_STREAM) // 1 .retrieve() .bodyToFlux(String.class) .log() // 2 .take(10) // 3 .blockLast(); }
Content-Type: text/event-stream
,即SSE;log()
代替doOnNext(System.out::println)
來查看每一個元素;/times
是一個無限流,這裏取前10個,會致使流被取消;運行效果以下:
許多朋友看到這個題目會想到Websocket,的確,Websocket確實能夠實現全雙工通訊,但它的數據傳輸並不是是徹底基於HTTP協議的,關於Websocket咱們後邊再聊。
下面咱們實現一個這樣兩個Endpoint:
/events
,「源源不斷」地收集數據,並存入數據庫;/events
,「源源不斷」將數據庫中的記錄發出來。0)準備
1、數據模型MyEvent
:
@Data @AllArgsConstructor @NoArgsConstructor @Document(collection = "event") // 1 public class MyEvent { @Id private Long id; // 2 private String message; }
event
;2、DAO層:
public interface MyEventRepository extends ReactiveMongoRepository<MyEvent, Long> { // 1 }
insert(Flux)
方法,這個方法是在ReactiveMongoRepository
中定義的。3、簡單起見就不要Service層了,直接Controller:
@RestController @RequestMapping("/events") public class MyEventController { @Autowired private MyEventRepository myEventRepository; @PostMapping(path = "") public Mono<Void> loadEvents(@RequestBody Flux<MyEvent> events) { // 1 // TODO return null; } @GetMapping(path = "", produces = MediaType.APPLICATION_STREAM_JSON_VALUE) public Flux<MyEvent> getEvents() { // 2 // TODO return null; } }
Mono<Void>
做爲方法返回值,表示若是傳輸完的話只給一個「完成信號」就OK了;Flux<MyEvent>
,不要忘了註解上produces = MediaType.APPLICATION_STREAM_JSON_VALUE
。準備到此爲止,類以下。咱們來完成上邊的兩個TODO吧。
1)接收數據流的Endpoint
在客戶端,WebClient
能夠接收text/event-stream
和application/stream+json
格式的數據流,也能夠在請求的時候上傳一個數據流到服務器;
在服務端,WebFlux也支持接收一個數據流做爲請求參數,從而實現一個接收數據流的Endpoint。
咱們先看服務端。Controller中的loadEvents
方法:
@PostMapping(path = "", consumes = MediaType.APPLICATION_STREAM_JSON_VALUE) // 1 public Mono<Void> loadEvents(@RequestBody Flux<MyEvent> events) { return this.myEventRepository.insert(events).then(); // 2 }
application/stream+json
,與getEvents
方法的區別在於這個方法是consume
這個數據流;insert
返回的是保存成功的記錄的Flux,但咱們不須要,使用then
方法表示「忽略數據元素,只返回一個完成信號」。服務端寫好後,啓動之,再看一下客戶端怎麼寫(仍是放在src/test
下):
@Test public void webClientTest4() { Flux<MyEvent> eventFlux = Flux.interval(Duration.ofSeconds(1)) .map(l -> new MyEvent(System.currentTimeMillis(), "message-" + l)).take(5); // 1 WebClient webClient = WebClient.create("http://localhost:8080"); webClient .post().uri("/events") .contentType(MediaType.APPLICATION_STREAM_JSON) // 2 .body(eventFlux, MyEvent.class) // 3 .retrieve() .bodyToMono(Void.class) .block(); }
take
的話表示無限個元素的數據流;application/stream+json
;body
方法設置請求體的數據。運行一下這個測試,根據控制檯數據能夠看到是一條一條將數據發到/events
的,看一下MongoDB中的數據:
2)發出無限流的Endpoint
回想一下前邊/user
的例子,當數據庫中全部的內容都查詢出來以後,這個流就結束了,由於其後跟了一個「完成信號」,咱們能夠經過在UserService
的findAll()
方法的流上增長log()
操做符來觀察更詳細的日誌:
咱們能夠看到在三個onNext
信號後是一個onComplete
信號。
這樣的流是有限流,這個時候若是在數據庫中再新增一個User的話,已經結束的請求也不會再有新的內容出現了。
反觀/times
請求,它會無限地發出SSE,而不會有「完成信號」出現,這是無限流。
咱們但願的狀況是不管是請求GET的/events
以後,當全部數據都發完以後,不要結束,而是掛起等待新的數據。若是咱們用上邊的POST的/events
傳入新的數據到數據庫後,新的數據會自動地流到客戶端。
這能夠在DAO層配置實現:
public interface MyEventRepository extends ReactiveMongoRepository<MyEvent, Long> { @Tailable // 1 Flux<MyEvent> findBy(); // 2 }
@Tailable
註解的做用相似於linux的tail
命令,被註解的方法將發送無限流,須要註解在返回值爲Flux這樣的多個元素的Publisher的方法上;findAll()
是想要的方法,可是在ReactiveMongoRepository
中咱們夠不着,因此使用findBy()
代替。而後完成Controller中的方法:
@GetMapping(path = "", produces = MediaType.APPLICATION_STREAM_JSON_VALUE) public Flux<MyEvent> getEvents() { return this.myEventRepository.findBy(); }
不過,這還不夠,@Tailable
僅支持有大小限制的(「capped」)collection,而自動建立的collection是不限制大小的,所以咱們須要先手動建立。Spring Boot提供的CommandLineRunner
能夠幫助咱們實現這一點。
Spring Boot應用程序在啓動後,會遍歷CommandLineRunner接口的實例並運行它們的run方法。
@Bean // 1 public CommandLineRunner initData(MongoOperations mongo) { // 2 return (String... args) -> { // 3 mongo.dropCollection(MyEvent.class); // 4 mongo.createCollection(MyEvent.class, CollectionOptions.empty().size(200).capped()); // 5 }; }
WebFluxDemoApplication
了;MongoOperations
提供對MongoDB的操做方法,由Spring注入的mongo實例已經配置好,直接使用便可;CommandLineRunner
也是一個函數式接口,其實例能夠用lambda表達;啓動應用,咱們檢查一下event
collection:
OK,這個時候咱們請求一下http://localhost:8080/events
,發現立馬返回了,並無掛起。緣由在於collection中一條記錄都沒有,而@Tailable
起做用的前提是至少有一條記錄。
跑一下WebClient測試程序插入5條數據,而後再次請求:
請求是掛起的,這沒錯,可是隻有兩條數據,看WebClient測試程序的控制檯明明發出了5個請求啊。
緣由定義的CollectionOptions.empty().size(200).capped()
中,size
指的是以字節爲單位的大小,而且會向上取到256的整倍數,因此咱們剛纔定義的是256byte大小的collection,因此最多容納兩條記錄。咱們能夠這樣改一下:
CollectionOptions.empty().maxDocuments(200).size(100000).capped()
maxDocuments
限制了記錄條數,size
限制容量且是必須定義的,由於MongoDB不像關係型數據庫有嚴格的列和字段大小定義,鬼知道會存多大的數據進來,因此容量限制是必要的。
好了,再次啓動應用,先插入5條數據,而後請求/events
,收到5條記錄後請求仍然掛起,在插入5條數據,curl客戶端又會陸續收到新的數據。
咱們用代碼搭建了圖中箭頭所表示的「管道」,看效果仍是很暢通的嘛。如今再回想咱們最初的那個Excel的例子,是否是感受這個demo頗有響應式的「範兒」了呢?
這一節,咱們對WebFlux作了一個簡單的基於實例的介紹,相信你對響應式編程及其在WEB應用中如何發揮做用有了更多的體會,本章的實戰是比較基礎的,初衷是但願可以經過上手編寫代碼體會響應式編程的感受,由於切換到響應式思惟方式並不是易事。
這一章的核心關鍵詞其實翻來覆去就是:「異步非阻塞的響應式流」。咱們瞭解了異步非阻塞的好處,也知道如何讓數據流動起來,下面咱們就經過對實例的性能測試,藉助實實在在的數據,真切感覺一下異步非阻塞的「絲滑」。