響應式編程是一種關注於數據流(data streams)和變化傳遞(propagation of change)的異步編程方式。 這意味着它能夠用既有的編程語言表達靜態(如數組)或動態(如事件源)的數據流。html
在響應式編程方面,微軟跨出了第一步,它在 .NET 生態中建立了響應式擴展庫(Reactive Extensions library, Rx)。接着 RxJava 在 JVM 上實現了響應式編程。後來,在 JVM 平臺出現了一套標準的響應式 編程規範,它定義了一系列標準接口和交互規範。並整合到 Java 9 中(Flow 類)。java
響應式編程一般做爲面向對象編程中的「觀察者模式」(Observer design pattern)的一種擴展。 響應式流(reactive streams)與「迭代子模式」(Iterator design pattern)也有相通之處, 由於其中也有 Iterable-Iterator 這樣的對應關係。主要的區別在於,Iterator 是基於 「拉取」(pull)方式的,而響應式流是基於「推送」(push)方式的。react
此外,對推送來的數據的操做是經過一種聲明式(declaratively)而不是命令式(imperatively)的方式表達的:開發者經過描述「控制流程」來定義對數據流的處理邏輯。git
除了數據推送,對錯誤處理(error handling)和完成(completion)信號的定義也很完善。一個 Publisher 能夠推送新的值到它的 Subscriber(調用 onNext 方法), 一樣也能夠推送錯誤(調用 onError 方法)和完成(調用 onComplete 方法)信號。 錯誤和完成信號均可以終止響應式流。能夠用下邊的表達式描述:github
onNext x 0..N [onError | onComplete]
複製代碼
這種方式很是靈活,不管是有/沒有值,仍是 n 個值(包括有無限個值的流,好比時鐘的持續讀秒),均可處理。編程
以上來自 projectreactor.io/docs/core/r… 翻譯數組
Reactive Streams 是上面提到的一套標準的響應式編程規範。它由四個核心概念構成:bash
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
複製代碼
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
複製代碼
public interface Subscription {
// request(n)用來發起請求數據,其中n表示請求數據的數量,它必須大於0,
// 不然會拋出IllegalArgumentException,並觸發onError,request的調用會
// 累加,若是沒有終止,最後會觸發相應次數的onNext方法.
public void request(long n);
// cancel至關於取消訂閱,調用以後,後續不會再收到訂閱,onError 和
// onComplete也不會被觸發
public void cancel();
}
複製代碼
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
複製代碼
Reactive Streams 經過上面的四個核心概念和相關的函數,對響應式流進行了一個框架性的約定,它沒有具體實現。簡單來講,它只提供通用的、合適的解決方案,你們都按照這個規約來實現就行了。app
Java 的 Reactive Programming 類庫主要有三個,分別是 Akka-Streams ,RxJava 和 Project Reactor。Spring 5 開始支持 Reactive Programming,其底層使用的是 Project Reactor。本篇主要是對 Project Reactor 中的一些點進行學習總結。框架
Project Reactor 是一個基於 Java 8 的實現了響應式流規範 (Reactive Streams specification)的響應式庫。
Reactor 引入了實現 Publisher 的響應式類 Flux 和 Mono,以及豐富的操做方式。 一個 Flux 對象表明一個包含 0..N 個元素的響應式序列,而一個 Mono 對象表明一個包含零或者一個(0..1)元素的結果。
Flux 是生產者,即咱們上面提到的 Publisher,它表明的是一個包含 0-N 個元素的異步序列,Mono能夠看作 Flux 的有一個特例,表明 0-1 個元素,若是不須要生產任何元素,只是須要一個完成任務的信號,可使用 Mono。
先來看這張圖,這裏是直接從官方文檔上貼過來的。就這張圖作下說明,先來關注幾個點:
那總體來看就是 Flux 產生元數據,經過一系列 operator 操做獲得轉換結果,正常成功就是 onCompleted,出現錯誤就是 onError。看下面的一個小例子:
Flux.just("glmapper","leishu").subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription subscription) {
// subscription 表示訂閱關係
System.out.println("onSubscribe,"+ subscription.getClass());
// subscription 經過 request 來觸發 onNext
subscription.request(2);
}
@Override
public void onNext(String s) {
System.out.println("currrent value is = " + s);
}
@Override
public void onError(Throwable throwable) {
System.out.println("it's error.");
}
@Override
public void onComplete() {
System.out.println("it's completed.");
}
});
複製代碼
執行結果:
onSubscribe,class reactor.core.publisher.StrictSubscriber
currrent value is = glmapper
currrent value is = leishu
it's completed. 複製代碼
若是在 onSubscribe 方法中咱們不執行 request,則不會有後續任何操做。關於 request 下面看。
Flux 是一個可以發出 0 到 N 個元素的標準的 Publisher,它會被一個 "error" 或 "completion" 信號終止。所以,一個 Flux 的結果多是一個 value、completion 或 error。 就像在響應式流規範中規定的那樣,這三種類型的信號被翻譯爲面向下游的
onNext
,onComplete
和onError
方法。
這張圖也來自官方文檔,和上面 Flux 的區別就是,Mono 最多隻能 emitted 一個元素。
Mono.just("glmapper").subscribe(System.out::println);
複製代碼
經過上面兩段小的代碼來看,最直觀的感覺是,Flux 至關於一個 List,Mono 至關於 Optional。其實在編程中全部的結果咱們均可以用 List 來 表示,可是當只返回一個或者沒有結果時,用 Optional 可能會更精確些。
Optional 相關概念可自行搜索 jdk Optional
另外,Mono 和 Flux 都提供了一些工廠方法,用於建立相關的實例,這裏簡單羅列一下:
// 能夠指定序列中包含的所有元素。建立出來的 Flux
// 序列在發佈這些元素以後會自動結束。
Flux.just("glmapper", "leishu");
// 從一個Iterable 對象中建立 Flux 對象,固然還能夠是數組、Stream對象等
Flux.fromIterable(Arrays.asList("glmapper","leishu"));
// 建立一個只包含錯誤消息的序列。
Flux.error(new IllegalStateException());
// 建立一個包含了從 0 開始遞增的 Long 對象的序列。其中包含的元素按照指定的間
// 隔來發布。除了間隔時間以外,還能夠指定起始元素髮布以前的延遲時間。
Flux.interval(Duration.ofMillis(100)).take(10);
// 建立一個不包含任何消息通知的序列。
Flux.never();
// 建立一個不包含任何元素,只發布結束消息的序列。
Flux.empty();
// 建立包含從 start 起始的 count 個數量的 Integer 對象的序列
Flux.range(int start, int count);
// Mono 同上
Mono.empty();
Mono.never();
Mono.just("glmapper");
Mono.error(new IllegalStateException());
複製代碼
上面的這些靜態方法適合於簡單的序列生成,當序列的生成須要複雜的邏輯時,則應該使用 generate() 或 create() 方法。
Reactor 的核心調用過程大體能夠分爲圖中的幾個階段
當須要處理 Flux 或 Mono 中的消息時,能夠經過 subscribe 方法來添加相應的訂閱邏輯。在調用 subscribe 方法時能夠指定須要處理的消息類型。能夠只處理其中包含的正常消息,也能夠同時處理錯誤消息和完成消息。
Flux.just(1, 2)
.concatWith(Mono.error(new IllegalStateException()))
.subscribe(System.out::println, System.err::println);
複製代碼
結果:
1
2
java.lang.IllegalStateException
複製代碼
正常的消息處理相對簡單。當出現錯誤時,有多種不一樣的處理策略:
經過 onErrorReturn() 方法返回一個默認值
Flux.just(1, 2)
.concatWith(Mono.error(new IllegalStateException()))
.onErrorReturn(0)
.subscribe(System.out::println);
複製代碼
結果:
1
2
0
複製代碼
經過 onErrorResume()方法來根據不一樣的異常類型來選擇要使用的產生元素的流
Flux.just(1, 2)
.concatWith(Mono.error(new IllegalArgumentException()))
.onErrorResume(e -> {
if (e instanceof IllegalStateException) {
return Mono.just(0);
} else if (e instanceof IllegalArgumentException) {
return Mono.just(-1);
}
return Mono.empty();
}).subscribe(System.out::println);
複製代碼
結果:
1
2
-1
複製代碼
經過 retry 操做符來進行重試,重試的動做是經過從新訂閱序列來實現的。在使用 retry 操做符時能夠指定重試的次數。
Flux.just(1, 2)
.concatWith(Mono.error(new IllegalStateException()))
.retry(1)
.subscribe(System.out::println);
複製代碼
結果:
1
2
1
2
Exception in thread "main" reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException
Caused by: java.lang.IllegalStateException
at com.glmapper.bridge.boot.reactor.SimpleTest.testFluxSub(SimpleTest.java:75)
at com.glmapper.bridge.boot.reactor.SimpleTest.main(SimpleTest.java:23)
複製代碼
在 Reactor 中,執行模式以及執行過程取決於所使用的 Scheduler,Scheduler 是一個擁有普遍實現類的抽象接口,Schedulers 類提供的靜態方法用於達成以下的執行環境:
Schedulers.immediate().schedule(()->{
System.out.println(Thread.currentThread().getName()+"-"+11);
});
// main-11
複製代碼
Schedulers.single().schedule(()->{
System.out.println(Thread.currentThread().getName()+"-"+11);
});
// single-1-11
複製代碼
Schedulers.elastic().schedule(()->{
System.out.println(Thread.currentThread().getName()+"-"+11);
});
// elastic-2-11
複製代碼
Schedulers.parallel().schedule(()->{
System.out.println(Thread.currentThread().getName()+"-"+11);
});
// parallel-1-11
複製代碼
ExecutorService executorService = Executors.newSingleThreadExecutor();
Schedulers.fromExecutorService(executorService).schedule(()->{
System.out.println(Thread.currentThread().getName()+"-"+11);
});
// pool-4-thread-1-11
複製代碼
Schedulers.newElastic("test-elastic").schedule(()->{
System.out.println(Thread.currentThread().getName()+"-"+11);
});
// test-elastic-4-11
複製代碼
一些操做符默認會使用一個指定的調度器(一般也容許開發者調整爲其餘調度器)例如, 經過工廠方法 Flux.interval(Duration.ofMillis(100)) 生成的每 100ms 打點一次的 Flux, 默認狀況下使用的是 Schedulers.parallel(),下邊的代碼演示瞭如何將其裝換爲 Schedulers.single()
Flux<String> intervalResult = Flux.interval(Duration.ofMillis(100),
Schedulers.newSingle("test"))
.map(i -> Thread.currentThread().getName() +"@"+i);
intervalResult.subscribe(System.out::println);
複製代碼
結果:
test-1@0
test-1@1
test-1@2
test-1@3
test-1@4
// 省略
複製代碼
Reactor 提供了兩種在響應式鏈中調整調度器 Scheduler 的方法:publishOn 和 subscribeOn。 它們都接受一個 Scheduler 做爲參數,從而能夠改變調度器。可是 publishOn 在鏈中出現的位置是有講究的,而 subscribeOn 則無所謂。
Flux.create(sink -> {
sink.next(Thread.currentThread().getName());
sink.complete();
})
.publishOn(Schedulers.single())
.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))
.publishOn(Schedulers.elastic())
.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))
.subscribeOn(Schedulers.parallel())
.toStream()
.forEach(System.out::println);
複製代碼
結果:
[elastic-2] [single-1] parallel-1
複製代碼
上面這段代碼使用 create() 方法建立一個新的 Flux 對象,其中包含惟一的元素是當前線程的名稱。
接着是兩對 publishOn() 和 map()方法,其做用是先切換執行時的調度器,再把當前的線程名稱做爲前綴添加。
最後經過 subscribeOn()方法來改變流產生時的執行方式。
最內層的線程名字 parallel-1 來自產生流中元素時使用的 Schedulers.parallel()調度器,中間的線程名稱 single-1 來自第一個 map 操做以前的 Schedulers.single() 調度器,最外層的線程名字 elastic-2 來自第二個 map 操做以前的 Schedulers.elastic()調度器。
先到這裏,剩下的想到再補充...