聊聊 Spring Boot 2.0 的 WebFlux## 前言
對照下 Spring Web MVC ,Spring Web MVC 是基於 Servlet API 和 Servlet 容器設計的。那麼 Spring WebFlux 確定不是基於前面二者,它基於 Reactive Streams API 和 Servlet 3.1+ 容器設計。java
那 Reactive Streams API 是什麼?
先理解 Stream 流是什麼?流是序列,是生產者生產,一個或多個消費者消費的元素序列。這種具體的設計模式成爲發佈訂閱模式。常見的流處理機制是 pull / push 模式。背壓是一種經常使用策略,使得發佈者擁有無限制的緩衝區存儲 item,用於確保發佈者發佈 item 太快時,不會去壓制訂閱者。
Reactive Streams (響應式流)是提供處理非阻塞背壓異步流的一種標準。主要針對的場景是運行時環境(包括 JVM 和 JS)和網絡。一樣,JDK 9 java.util.concurrent 包提供了兩個主要的 API 來處理響應流:
- Flow
- SubmissionPublisherreact
爲啥只能運行在 Servlet 3.1+ 容器?
你們知道,3.1 規範其中一個新特性是異步處理支持。
異步處理支持:Servlet 線程不需一直阻塞,即不須要到業務處理完畢再輸出響應,而後結束 Servlet線程。異步處理的做用是在接收到請求以後,Servlet 線程能夠將耗時的操做委派給另外一個線程來完成,在不生成響應的狀況下返回至容器。主要應用場景是針對業務處理較耗時的狀況,能夠減小服務器資源的佔用,而且提升併發處理速度。
因此 WebFlux 支持的容器有 Tomcat、Jetty(Non-Blocking IO API) ,也能夠像 Netty 和 Undertow 的自己就支持異步容器。在容器中 Spring WebFlux 會將輸入流適配成 Mono 或者 Flux 格式進行統一處理。git
Spring WebFlux 是什麼
先看這張圖,上面咱們瞭解了容器、響應流。這裏介紹下 Spring WebFlux 是什麼? Spring WebFlux 是 Spring 5 的一個新模塊,包含了響應式 HTTP 和 WebSocket 的支持,另外在上層服務端支持兩種不一樣的編程模型:
- 基於 Spring MVC 註解 @Controller 等
- 基於 Functional 函數式路由github
下面是兩個實現小案例,首先在 pom.xml 加入對應的依賴:web
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
基於 Spring MVC 註解 RESTful API
官方案例很簡單,以下:spring
@RestController
public class PersonController {
private final PersonRepository repository;
public PersonController(PersonRepository repository) {
this.repository = repository;
}
@PostMapping("/person")
Mono<Void> create(@RequestBody Publisher<Person> personStream) {
return this.repository.save(personStream).then();
}
@GetMapping("/person")
Flux<Person> list() {
return this.repository.findAll();
}
@GetMapping("/person/{id}")
Mono<Person> findById(@PathVariable String id) {
return this.repository.findOne(id);
}
}
可是 PersonRepository 這種 Spring Data Reactive Repositories 不支持 MySQL,進一步也不支持 MySQL 事務。因此用了 Reactivey 原來的 spring 事務管理就很差用了。jdbc jpa 的事務是基於阻塞 IO 模型的,若是 Spring Data Reactive 沒有升級 IO 模型去支持 JDBC,生產上的應用只能使用不強依賴事務的。也可使用透明的事務管理,即每次操做的時候以回調形式去傳遞數據庫鏈接 connection。數據庫
Spring Data Reactive Repositories 目前支持 Mongo、Cassandra、Redis、Couchbase 。編程
那提到的「用了 Reactivey 原來的 spring 事務管理就很差用了」,您可否再詳細介紹一下?另外,應用有強依賴事務,有沒有對應的解決方案?
咱們先看看這張圖。Spring Boot 2.0 這裏有兩條不一樣的線分別是:
Spring Web MVC -> Spring Data
Spring WebFlux -> Spring Data Reactive設計模式
因此這裏問題的答案是,若是使用 Spring Data Reactive ,原來的 Spring 針對 Spring Data (JDBC等)的事務管理確定不起做用了。由於原來的 Spring 事務管理(Spring Data JPA)都是基於 ThreadLocal 傳遞事務的,其本質是基於 阻塞 IO 模型,不是異步的。但 Reactive 是要求異步的,不一樣線程裏面 ThreadLocal 確定取不到值了。天然,咱們得想一想如何在使用 Reactive 編程是作到事務,有一種方式是 回調 方式,一直傳遞 conn :
newTransaction(conn ->{})api
由於每次操做數據庫也是異步的,因此 connection 在 Reactive 編程中沒法靠 ThreadLocal 傳遞了,只能放在參數上面傳遞。雖然會有必定的代碼侵入行。進一步,也能夠 kotlin 協程,去作到透明的事務管理,即把 conn 放到 協程的局部變量中去。
那 Spring Data Reactive Repositories 不支持 MySQL,進一步也不支持 MySQL 事務,怎麼辦?
答案是,這個問題其實和第一個問題也相關。 爲啥不支持 MySQL,即 JDBC 不支持。你們能夠看到 JDBC 是所屬 Spring Data 的。因此能夠等待 Spring Data Reactive Repositories 升級 IO 模型,去支持 MySQL。也能夠和上面也講到了,如何使用 Reactive 編程支持事務。
若是應用只能使用不強依賴數據事務,依舊使用 MySQL ,可使用下面的實現,代碼以下:
@RestController
@RequestMapping(value = "/city")
public class CityRestController {
@Autowired
private CityService cityService;
@RequestMapping(value = "/{id}", method = RequestMethod.GET)
public Mono<City> findOneCity(@PathVariable("id") Long id) {
return Mono.create(cityMonoSink -> cityMonoSink.success(cityService.findCityById(id)));
}
@RequestMapping(method = RequestMethod.GET)
public Flux<City> findAllCity() {
return Flux.create(cityFluxSink -> {
cityService.findAllCity().forEach(city -> {
cityFluxSink.next(city);
});
cityFluxSink.complete();
});
}
@RequestMapping(method = RequestMethod.POST)
public Mono<Long> createCity(@RequestBody City city) {
return Mono.create(cityMonoSink -> cityMonoSink.success(cityService.saveCity(city)));
}
@RequestMapping(method = RequestMethod.PUT)
public Mono<Long> modifyCity(@RequestBody City city) {
return Mono.create(cityMonoSink -> cityMonoSink.success(cityService.updateCity(city)));
}
@RequestMapping(value = "/{id}", method = RequestMethod.DELETE)
public Mono<Long> modifyCity(@PathVariable("id") Long id) {
return Mono.create(cityMonoSink -> cityMonoSink.success(cityService.deleteCity(id)));
}
}
findAllCity 方法中,利用 Flux.create 方法對響應進行建立封裝成 Flux 數據。而且使用 lambda 寫數據流的處理函數會十分的方便。
Service 層依舊是之前那套邏輯,業務服務層接口以下:
public interface CityService {
/**
* 獲取城市信息列表
*
* @return
*/
List<City> findAllCity();
/**
* 根據城市 ID,查詢城市信息
*
* @param id
* @return
*/
City findCityById(Long id);
/**
* 新增城市信息
*
* @param city
* @return
*/
Long saveCity(City city);
/**
* 更新城市信息
*
* @param city
* @return
*/
Long updateCity(City city);
/**
* 根據城市 ID,刪除城市信息
*
* @param id
* @return
*/
Long deleteCity(Long id);
}
具體案例在個人 Github:https://github.com/JeffLi1993/springboot-learning-example
基於 Functional 函數式路由實現 RESTful API
建立一個 Route 類來定義 RESTful HTTP 路由
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RequestPredicates.accept;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
@Configuration
public class Routes {
private CityService cityService;
public Routes(CityService cityService) {
this.cityService = cityService;
}
@Bean
public RouterFunction<?> routerFunction() {
return route(
GET("/api/city").and(accept(MediaType.APPLICATION_JSON)), cityService:: findAllCity).and(route(
GET("/api/user/{id}").and(accept(MediaType.APPLICATION_JSON)), cityService:: findCityById)
);
}
}
RoouterFunction 相似 Spring Web MVC 的 @RequestMapping ,用來定義路由信息,每一個路由會映射到一個處理方法,當接受 HTTP 請求時候會調用該處理方法。
建立 HttpServerConfig 自定義 Http Server,這裏建立一個 Netty HTTP 服務器:
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
import reactor.ipc.netty.http.server.HttpServer;
@Configuration
public class HttpServerConfig {
@Autowired
private Environment environment;
@Bean
public HttpServer httpServer(RouterFunction<?> routerFunction) {
HttpHandler httpHandler = RouterFunctions.toHttpHandler(routerFunction);
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
HttpServer server = HttpServer.create("localhost", Integer.valueOf(environment.getProperty("server.port")));
server.newHandler(adapter);
return server;
}
}
天然推薦 Netty 來運行 Reactive 應用,由於 Netty 是基於異步和事件驅動的。