從Reactor到WebFlux

寫在前面

爲了應對高併發場景下到服務端編程需求,微軟最早提出了一種異步編程到方案Reactive Programming,也就是反應式編程。react

以後在Java社區就出現了RxJava和Akka Stream等技術方案,讓Java平臺在反應式編程上有了多種選擇。算法

反應式編程

函數式編程編程

反應式編程通常是基於函數式編程實現的,函數式編程有以下特色:安全

  1. 惰性計算
  2. 函數是第一公民
  3. 只使用表達式而不是用語句

反應式編程是一種基於數據流,傳遞變化,聲明式的編程範式。服務器

事件驅動網絡

思想是組件之間交互經過鬆耦合的生產者和消費者來實現的,而且事件以異步,非阻塞方式進行發送和接收。數據結構

事件驅動是系統經過推模式實現的,也就是生產者在消息產生時推送數據給消費者進行處理,而不是讓消費者不斷輪詢或等待數據實現的。併發

響應及時框架

因爲反應式是異步的,好比進行數據處理的話,在交出任務以後就快速返回,而不是阻塞的等待任務執行完畢再返回。任務的執行給到後臺線程執行,等任務處理完成以後返回,好比Java8的CompletableFuture。異步

事件彈性

事件驅動系統是鬆耦合的,上下游之間不是直接依賴,可是在Debug時成本更高一些。

Spring Reactor

Spring Reactor是Pivotal基於反應式編程實現的一種方案。是一種非阻塞,事件驅動的編程方案,使用函數式編程實現。

觀察者模式

反應式編程和命令式編程在迭代器上的實現:

  • 事件 Iterable (pull) Observable (push)
  • 獲取數據 T next() onNext(T)
  • 發現異常 throws Exception onError(Exception)
  • 處理完成 hasNext() onCompleted()
  1. Publisher推送數據給Subscriber,觸發onNext()方法,在處理完成或發生異常時觸發onCompleted()和onError()方法。
  2. Publisher發生異常時,觸發Subscriber的onError()方法,進行異常捕獲處理。
  3. Publisher每次推送都會觸發一次onNext()方法,全部推送完成時,最後觸發onCompleted()方法。

背壓

若是Publisher發佈消息太快,超過Subscriber處理速度該怎麼辦?響應式編程引入了背壓概念,使得Subscriber可以控制消費消息的速度。

Reactive Stream

在Java生態中,Netflix的RxJava,TypeSafe的Scala,Akaka,pivatol的Sping,Reactor都是反應式編程的框架。

Stream不是集合元素,不是數據結構,也不保存數據,只是關於算法和計算,更像一種能夠編程的迭代器。

Stream能夠並行操做,迭代器只能命令式的,串型操做。 並行操做是將數據分紅多段,每個在不一樣線程中處理,最後將結果一塊兒輸出。這樣能夠大大利用硬件資源。

Reactor主要模塊基於Netty實現:

  • reactor-core:包含核心API
  • reactor-ipc:複雜高性能網絡通訊

核心類:

  • Mono:表明0到1個元素髮布者
  • Flux:表明0到N個元素髮布者
  • Scheduler:表明事件驅動的反應流調度器,一般由各類線程池實現。

反應式編程概念總結:

  1. ReactiveStreams 是一套反應式編程 標準 和 規範;
  2. Reactor 是基於 ReactiveStreams 一套 反應式編程框架;
  3. WebFlux 以 Reactor 爲基礎,實現 Web 領域的 反應式編程框架。

Reactor開發

Reactor使用方式上基本分爲三步:

  1. 開始階段建立
  2. 中間階段處理
  3. 最終階段消費

建立階段

Reactor編程須要先建立出Mono或Flux。

同步調用結果建立對象

Mono<String> helloWorld = Mono.just("Hello World"); // 能夠指定序列中包含的所有元素

Flux<String> fewWords = Flux.just("Hello","World");

Flux<String> manyWords = Flux.fromIterable(words);

這種方式通常用在通過一系列非IO型操做後,獲得一個對應的對象,當須要將這個對象交給IO操做時,能夠經過這種方式轉換成Mono或Flux。

異步調用結果建立

若是異步獲得結果,好比CompletableFuture能夠建立一個Mono:

Mono.fromFuture(completableFuture);

若是這個異步調用不返回CompletableFuture,而有本身的回調方法,那麼可使用:

static<T>Mono<T>create(Consumer<MonoSink<T>>callback)
Mono.create(sink ->{

 ListenableFuture<ResponseEntity<String>> entity = asyncRestTemplate.getForEntity(url,String.class);

 entity.addCallback(new ListenableFutureCallback<ResponseEntity<String>>(){
 
 @Override
 public void onSuccess(ResponseEntity<String> result){
  sink.success(result.getBody());
 }

 @Override
 public void onFailure(Throwable ex)
 {
   sink.error(ex);
 }
});
});

處理階段

在進行Mono和Flux處理階段,通常使用filtermapflatMapthenzipreduce等。

map,flatMap,then 三個頻率使用比較高。

數據處理方式

then

是下一步意思,表明執行順序的下一步,不表示下一步依賴於上一步。then方法參數只是一個Mono,入參不是上一步的執行結果。

flatMap和map的參數是Function,是上一步執行的結果。

flatMap和map

傳統的命令式編程

Object result1 = doStep1(params);

Object result2 = doStep2(result1);

Object result3 = doStep3(result2);

對應的反應式編程

Mono.just(params) .flatMap(v -> doStep1(v)) .flatMap(v -> doStep2(v)) .flatMap(v -> doStep3(v));

flatMap入參Function的返回值要求是Mono對象。map的入參Function只要求返回一個普通對象。 對於一些返回值是Mono的方法,想將調用串聯起鏈式調用,必須使用flatMap,而不是map。

併發處理方式

通常使用Mono.zip,Tuple2等。

傳統編程方式併發執行是經過線程池+Future方式實現的。可是在作Future.get時是阻塞的。 Reactor中使用Mono和Flux中的zip方法以下:

Mono<CustomType1> item1Mono = ...;

Mono<CustomType2> item2Mono = ...;

Mono.zip(items -> {
 CustomType1 item1 = CustomType1.class.cast(items[0]);
 CustomType2 item2 = CustomType2.class.cast(items[1]);
// Do merge
return mergeResult;
}, item1Mono, item2Mono);

這樣item1Mono 和 item2Mono 過程是並行執行的。

使用zip方法時須要作類型強轉換,類型強轉換是不安全的

數據循環處理

通常使用:Flux.fromIterable(),Flux.reduce()方法。

好比:

Data initData = ...;
List<SubData> list = ...;

Flux.fromIterable(list)
  .reduce(initData,(data,itemInList) -> {
// Do something on data and itemInList
return data;
});

結束階段

直接消費的Mono和Flux就是調用subscriber方法,其餘的WebFlux接口能夠直接返回框架的Response輸出就能夠了。

WebFlux

Serverlet3.1支持了異步處理方式,Servlet線程不須要一直阻塞的等待任務執行。Servlet在接收到請求後,將請求委託給業務線程完成,本身則直接返回繼續接收新的請求。

因此Servlet3.1適用於那些業務處理很是耗時場景,這樣能夠減小服務器資源佔用,能夠提升併發處理速度,可是對於自己響應較爲迅速的應用來講收益不大。

WebFlux的異步處理是基於Reactor實現的,是將輸入流適配成Mono或Flux進行統一處理。

在最新的Spring Cloud Gateway中也是基於Netty和WebFlux實現的。

Flux和Mono

Flux和Mono屬於事件發佈者,相似於生產者,爲消費者提供訂閱接口。在實現發生時,Flux和Mono會回調消費者對應的方法通知消費者處理事件。 Flux能夠觸發多個事件,Mono只觸發一個事件。

Flux.fromIterable(getSomeLongList())
	.mergeWith(Flux.interval(100))
	.doOnNext(serviceA::someObserver)
	.map(d -> d * 2)
	.take(3)
	.onErrorResumeWith(errorHandler::fallback)
	.doAfterTerminate(serviceM::incrementTerminate)
	.subscribe(System.out::println
);

Mono.fromCallable(System::currentTimeMillis)
	.flatMap(time -> Mono.first(serviceA.findRecent(time), serviceB.findRecent(time)))
	.timeout(Duration.ofSeconds(3), errorHandler::fallback)
	.doOnSuccess(r -> serviceM.incrementSuccess())
	.subscribe(System.out::println);

選型注意

若是在框架中使用了WebFlux,他依賴的安全認證,數據訪問都必須使用Reactive API,在存儲層目前Reactive只支持MongoDB,Redis和Couchbase等幾種不支持事務管理的NoSql,須要注意。

WebFlux並不能將接口耗時減小,只是能夠減小線程擴展,提高系統的吞吐和伸縮能力。因爲其爲異步非阻塞Web框架,因此適用於IO密集型服務,好比咱們交易網關這種。

WebFlux支持兩種編程模式:

  1. 基於註解@Controller和其餘的類Spring MVC的註解
  2. 函數式,Java8 lambda風格的路由處理

能夠經過Reactive Streams實現背壓控制。

ServerRequest和ServerResponse是JDK8友好訪問底層HTTP消息的不可變接口。徹底是響應式的。

實踐建議

在使用lambda寫處理函數時,若是多個處理函數可能缺少可讀性且不易於維護。能夠將相關處理函數分組到一個處理程序或控制器類中。

相關文章
相關標籤/搜索