響應式編程簡介之:Reactor

簡介

Reactor是reactivex家族的一個很是重要的成員,Reactor是第四代的reactive library,它是基於Reactive Streams標準基礎上開發的,主要用來構建JVM環境下的非阻塞應用程序。java

今天給你們介紹一下Reactor。react

Reactor簡介

Reactor是基於JVM的非阻塞API,他直接跟JDK8中的API相結合,好比:CompletableFuture,Stream和Duration等。git

它提供了兩個很是有用的異步序列API:Flux和Mono,而且實現了Reactive Streams的標準。程序員

而且還能夠和reactor-netty相結合,做爲一些異步框架的底層服務,好比咱們很是熟悉的Spring MVC 5中引入的WebFlux。github

咱們知道WebFlux的底層使用的是reactor-netty,而reactor-netty又引用了Reactor。因此,若是你在POM中引入了webFlux依賴:web

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
複製代碼

那麼項目將會自動引入Reactor。spring

若是你用的不是Spring webflux,不要緊,你能夠直接添加下面的依賴來使用Reactor:編程

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
</dependency>
複製代碼

reactive programming的發展史

最最開始的時候微軟爲.NET平臺建立了Reactive Extensions (Rx) library。接着RxJava實現了JVM平臺的Reactive。markdown

而後Reactive Streams標準出現了,它定義了Java平臺必須知足的的一些規範。而且已經集成到JDK9中的java.util.concurrent類中。框架

在Flow中定義了實現Reactive Streams的四個很是重要的組件,分別是Publisher,Subscriber,Subscription和Processor。

Iterable-Iterator 和Publisher-Subscriber的區別

通常來講reactive在面向對象的編程語言中是以觀察者模式的擴展來使用的。

咱們來具體看一下這個觀察者模式的實現,以Publisher和Subscriber爲例:

public static interface Publisher<T> {
        public void subscribe(Subscriber<? super T> subscriber);
    }
複製代碼
public static interface Subscriber<T> {

        public void onSubscribe(Subscription subscription);

        public void onNext(T item);

        public void onError(Throwable throwable);

        public void onComplete();
    }
複製代碼

上面定義了兩個接口,Publisher和Subscriber,Publisher的做用就是subscribe到subscriber。

而subscriber定義了4個on方法,用來觸發特定的事件。

那麼Publisher中的subscribe是怎麼觸發Subscriber的onSubscribe事件呢?

很簡單,咱們看一個具體的實現:

public void subscribe(Flow.Subscriber<? super T> subscriber) {
        Subscription sub;
        if (throwable != null) {
            assert iterable == null : "non-null iterable: " + iterable;
            sub = new Subscription(subscriber, null, throwable);
        } else {
            assert throwable == null : "non-null exception: " + throwable;
            sub = new Subscription(subscriber, iterable.iterator(), null);
        }
        subscriber.onSubscribe(sub);

        if (throwable != null) {
            sub.pullScheduler.runOrSchedule();
        }
    }
複製代碼

上面的例子是PullPublisher的subscribe實現。咱們能夠看到,在這個subscribe中觸發了subscriber.onSubscribe方法。而這就是觀察者模式的祕密。

或者說,當Publisher調用subscribe的時候,是主動push subscriber的onSubscribe方法。

熟悉Iterable-Iterator模式的朋友應該都知道,Iterator模式,實際上是一個主動的pull模式,由於須要不斷的去調用next()方法。因此它的控制權是在調用方。

爲何要使用異步reactive

在現代應用程序中,隨着用戶量的增多,程序員須要考慮怎麼才能提高系統的處理能力。

傳統的block IO的方式,由於須要佔用大量的資源,因此是不適合這樣的場景的。咱們須要的是NO-block IO。

JDK中提供了兩種異步編程的模型:

第一種是Callbacks,異步方法能夠經過傳入一個Callback參數的形式來在Callback中執行異步任務。比較典型的像是java Swing中的EventListener。

第二中就是使用Future了。咱們使用Callable來提交一個任務,而後經過Future來拿到它的運行結果。

這兩種異步編程會有什麼問題呢?

callback的問題就在於回調地獄。熟悉JS的朋友應該很理解這個回調地獄的概念。

簡單點講,回調地獄就是在callback中又使用了callback,從而形成了這種callback的層級調用關係。

而Future主要是對一個異步執行的結果進行獲取,它的 get()其實是一個block操做。而且不支持異常處理,也不支持延遲計算。

當有多個Future的組合應該怎麼處理呢?JDK8 實際上引入了一個CompletableFuture類,這個類是Future也是一個CompletionStage,CompletableFuture支持then的級聯操做。不過CompletableFuture提供的方法不是那麼的豐富,可能知足不了個人需求。

因而咱們的Reactor來了。

Flux

Reactor提供了兩個很是有用的操做,他們是 Flux 和 Mono。 其中Flux 表明的是 0 to N 個響應式序列,而Mono表明的是0或者1個響應式序列。

咱們看一個Flux是怎麼transfer items的:

先看下Flux的定義:

public abstract class Flux<T> implements Publisher<T> 複製代碼

能夠看到Flux其實就是一個Publisher,用來產生異步序列。

Flux提供了很是多的有用的方法,來處理這些序列,而且提供了completion和error的信號通知。

相應的會去調用Subscriber的onNext, onComplete, 和 onError 方法。

Mono

咱們看下Mono是怎麼transfer items的:

看下Mono的定義:

public abstract class Mono<T> implements Publisher<T> 複製代碼

Mono和Flux同樣,也是一個Publisher,用來產生異步序列。

Mono由於只有0或者1個序列,因此只會觸發Subscriber的onComplete和onError方法,沒有onNext。

另外一方面,Mono其實能夠看作Flux的子集,只包含Flux的部分功能。

Mono和Flux是能夠互相轉換的,好比Mono#concatWith(Publisher)返回一個Flux,而 Mono#then(Mono)返回一個Mono.

Flux和Mono的基本操做

咱們看下Flux建立的例子:

Flux<String> seq1 = Flux.just("foo", "bar", "foobar");
List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);
Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);
複製代碼

能夠看到Flux提供了不少種建立的方式,咱們能夠自由選擇。

再看看Flux的subscribe方法:

Disposable subscribe(); 

Disposable subscribe(Consumer<? super T> consumer); 

Disposable subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer); 

Disposable subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer); 

Disposable subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer, Consumer<? super Subscription> subscriptionConsumer);
複製代碼

subscribe能夠一個參數都沒有,也能夠多達4個參數。

看下沒有參數的狀況:

Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);

numbersFromFiveToSeven.subscribe();
複製代碼

注意,沒有參數並不表示Flux的對象不被消費,只是不可見而已。

看下帶參數的狀況:consumer用來處理on each事件,errorConsumer用來處理on error事件,completeConsumer用來處理on complete事件,subscriptionConsumer用來處理on subscribe事件。

前面的3個參數很好理解,咱們來舉個例子:

Flux<Integer> ints3 = Flux.range(1, 4);
        ints3.subscribe(System.out::println,
                error -> System.err.println("Error " + error),
                () -> System.out.println("Done"),
                sub -> sub.request(2));
複製代碼

咱們構建了從1到4的四個整數的Flux,on each就是打印出來,若是中間有錯誤的話,就輸出Error,所有完成就輸出Done。

那麼最後一個subscriptionConsumer是作什麼用的呢?

subscriptionConsumer accept的是一個Subscription對象,咱們看下Subscription的定義:

public interface Subscription {

    public void request(long n);
    public void cancel();
}
複製代碼

Subscription 定義了兩個方法,用來作初始化用的,咱們能夠調用request(n)來決定此次subscribe獲取元素的最大數目。

好比上面咱們的例子中,雖然構建了4個整數,可是最終輸出的只有2個。

上面全部的subscribe方法,都會返回一個Disposable對象,咱們能夠經過Disposable對象的dispose()方法,來取消這個subscribe。

Disposable只定義了兩個方法:

public interface Disposable {

	void dispose();

	default boolean isDisposed() {
		return false;
	}
複製代碼

dispose的原理是向Flux 或者 Mono發出一箇中止產生新對象的信號,可是並不能保證對象產生立刻中止。

有了Disposable,固然要介紹它的工具類Disposables。

Disposables.swap() 能夠建立一個Disposable,用來替換或者取消一個現有的Disposable。

Disposables.composite(…​)能夠將多個Disposable合併起來,在後面統一作處理。

總結

本文介紹了Reactor的基本原理和兩很是重要的組件Flux和Mono,下一篇文章咱們會繼續介紹Reactor core的一些更加高級的用法。敬請期待。

本文的例子learn-reactive

本文做者:flydean程序那些事

本文連接:www.flydean.com/introductio…

本文來源:flydean的博客

歡迎關注個人公衆號:「程序那些事」最通俗的解讀,最深入的乾貨,最簡潔的教程,衆多你不知道的小技巧等你來發現!

相關文章
相關標籤/搜索