(11)照虎畫貓深刻理解響應式流規範——響應式Spring的道法術器

本系列文章索引《響應式Spring的道法術器》
前情提要 響應式流 | Reactor 3快速上手
本文源碼html

2 響應式編程之法

上一章在響應式編程庫方面,本着「快速上手」的原則,介紹了響應式流的概念,以及Reactor 3的使用。這一章,咱們基於對Reactor 3源碼的模仿,從《響應式流規範》入手,深刻了解響應式流開發庫。java

2.1 響應式流規範

現代軟件對近乎實時地處理數據的需求愈來愈強烈,對不斷變化的信息的即時響應,意味着更大的商業價值,流處理是一種快速將數據轉換爲有用信息的手段。react

數據流中的元素能夠是一個一個的待計算的數據,也能夠是一個一個待響應的事件。前者多用於大數據處理,好比Storm、Spark等產品,後者經常使用於響應式編程,好比Netflix在使用的RxJava、Scala編程語言的發明者Typesafe公司(已改名爲Lightbend)的Akka Stream、Java開發者都熟悉的Pivotal公司的Project Reactor、走在技術前沿的Vert.x等。git

軟件行業是一個很是注重分享和交流的行業。隨着對響應式編程技術的討論與溝通逐漸深刻,2013年底的時候,Netflix、Pivotal、Typesafe等公司的工程師們共同發起了關於制定「響應式流規範(Reactive Stream Specification)」的倡議和討論,並在github上建立了reactive-streams-jvm項目。到2015年5月份,1.0版本的規範出爐,項目README就是規範正文。github

各個響應式開發庫都要遵循這個規範,其好處也是顯而易見的。之因此咱們編寫的Java代碼能夠在Hotspot、J9和Zing等JVM運行,是由於它們都遵循Java虛擬機規範。相似的,因爲各個響應式開發庫都遵循響應式流規範,所以互相兼容,不一樣的開發庫之間能夠進行交互,咱們甚至能夠同時在項目中使用多個響應式開發庫。對於Spring WebFlux來講,也可使用RxJava做爲響應式庫。編程

<img height=300em src="https://leanote.com/api/file/getImage?fileId=5a9e3ed2ab644159cf00128e"/>;api

雖然響應式流規範是用來約束響應式開發庫的,做爲使用者的咱們若是可以瞭解這一規範對於咱們理解開發庫的使用也是頗有幫助的,由於規範的內容都是對響應式編程思想的精髓的呈現。訪問reactive-streams-jvm項目,能夠瀏覽規範的細節,包括其中定義的響應式流的特色:數組

  1. 具備處理無限數量的元素的能力;
  2. 按序處理;
  3. 異步地傳遞元素;
  4. 必須實現非阻塞的回壓(backpressure)。

2.1.1 響應式流接口

響應式流規範定義了四個接口,以下:安全

1.Publisher是可以發出元素的發佈者。併發

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

2.Subscriber是接收元素並作出響應的訂閱者。

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

當執行subscribe方法時,發佈者會回調訂閱者的onSubscribe方法,這個方法中,一般訂閱者會藉助傳入的Subscription向發佈者請求n個數據。而後發佈者經過不斷調用訂閱者的onNext方法向訂閱者發出最多n個數據。若是數據所有發完,則會調用onComplete告知訂閱者流已經發完;若是有錯誤發生,則經過onError發出錯誤數據,一樣也會終止流。

title

訂閱後的回調用表達式表示就是onSubscribe onNext* (onError | onComplete)?,即以一個onSubscribe開始,中間有0個或多個onNext,最後有0個或1個onErroronComplete事件。

PublisherSubscriber融合了迭代器模式和觀察者模式。

咱們常常用到的IterableIterator就是迭代器模式的體現,能夠知足上邊第1和2個特色關於按需處理數據流的要求;而觀察者模式基於事件的回調機制有助於知足第3個特色關於異步傳遞元素的要求。

3.SubscriptionPublisherSubscriber的「中間人」。

public interface Subscription {
    public void request(long n);
    public void cancel();
}

當發佈者調用subscribe方法註冊訂閱者時,會經過訂閱者的回調方法onSubscribe傳入Subscription對象,以後訂閱者就可使用這個Subscription對象的request方法向發佈者「要」數據了。回壓機制正是基於此來實現的,所以第4個特色也可以實現了。

4.ProcessorPublisherSubscriber於一身。

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

這四個接口在JEP 266跟隨Java 9版本被引入了Java SDK

這四個接口是實現各開發庫之間互相兼容的橋樑,響應式流規範也僅僅聚焦於此,而對諸如轉換、合併、分組等等的操做一律未作要求,所以是一個很是抽象且精簡的接口規範。

若是這時候有人要造輪子,再寫一套響應式開發庫,如何基於這幾個接口展開呢?

2.1.2 照虎畫貓,理解訂閱後發生了什麼

Reactor 3是遵循響應式流規範的實現,所以,小擼一把Reactor的源碼有助於咱們理解規範中定義的接口的使用。

Reactor中,咱們最早接觸的生成Publisher的方法就是Flux.just(),下面咱們來動手寫代碼模擬一下Reactor的實現方式。不過具有生產能力開發庫會考慮性能、併發安全性等諸多因素,所謂「照虎畫貓」,咱們的代碼只是模擬出實現思路,代碼少的多,但五臟俱全。

源碼位於:https://github.com/get-set/get-reactive/tree/master/my-reactor

首先,引入響應式流規範的四個接口定義,基於Java 9的話能夠直接使用java.util.concurrent.Flow

<dependency>
        <groupId>org.reactivestreams</groupId>
        <artifactId>reactive-streams</artifactId>
        <version>1.0.2</version>
    </dependency>

首先建立最最基礎的類Flux,它是一個Publisher

package reactor.core.publisher;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

public abstract class Flux<T> implements Publisher<T> {
    public abstract void subscribe(Subscriber<? super T> s);
}

在Reactor中,Flux既是一個發佈者,又充當工具類的角色,當咱們用Flux.just()Flux.range()Flux.interval()等工廠方法生成Flux時,會new一個新的Flux,好比Flux.just會返回一個FluxArray對象。

public static <T> Flux<T> just(T... data) {
        return new FluxArray<>(data);
    }

返回的FluxArray對象是Flux.just生成的Publisher,它繼承自Flux,並實現了subscribe方法。

public class FluxArray<T> extends Flux<T> {
    private T[] array;  // 1

    public FluxArray(T[] data) {
        this.array = data;
    }

    @Override
    public void subscribe(Subscriber<? super T> actual) {
        actual.onSubscribe(new ArraySubscription<>(actual, array)); // 2
    }
}
  1. FluxArray內部使用一個數組來保存數據;
  2. subscribe方法一般會回調SubscriberonSubscribe方法,該方法須要傳入一個Subscription對象,從而訂閱者以後能夠經過回調傳回的Subscriptionrequest方法跟FluxArray請求數據,這也是回壓的應有之義。

繼續編寫ArraySubscription

public class FluxArray<T> extends Flux<T> {
    ...
    static class ArraySubscription<T> implements Subscription { // 1
        final Subscriber<? super T> actual;
        final T[] array;    // 2
        int index;
        boolean canceled;

        public ArraySubscription(Subscriber<? super T> actual, T[] array) {
            this.actual = actual;
            this.array = array;
        }

        @Override
        public void request(long n) {
            if (canceled) {
                return;
            }
            long length = array.length;
            for (int i = 0; i < n && index < length; i++) {
                actual.onNext(array[index++]);  // 3
            }
            if (index == length) {
                actual.onComplete();    // 4
            }
        }

        @Override
        public void cancel() {  // 5
            this.canceled = true;
        }
    }
}
  1. ArraySubscription是一個靜態內部類。靜態內部類是最簡單的一種內部類,你盡能夠把它當成普通的類,只不過剛好定義在其餘類的內部;
  2. 可見在Subscription內也有一份數據;
  3. 當有能夠發出的元素時,回調訂閱者的onNext方法傳遞元素;
  4. 當全部的元素都發完時,回調訂閱者的onComplete方法;
  5. 訂閱者可使用Subscription取消訂閱。

到此爲止,發佈者就開發完了。咱們測試一下:

@Test
public void fluxArrayTest() {
    Flux.just(1, 2, 3, 4, 5).subscribe(new Subscriber<Integer>() { // 1

        @Override
        public void onSubscribe(Subscription s) {
            System.out.println("onSubscribe");
            s.request(6);   // 2
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("onNext:" + integer);
        }

        @Override
        public void onError(Throwable t) {

        }

        @Override
        public void onComplete() {
            System.out.println("onComplete");
        }
    });
}
  1. Subscriber經過匿名內部類定義,其中須要實現接口的四個方法;
  2. 訂閱時請求6個元素。

測試方法運行以下:

1
2
3
4
5
Completed.

若是請求3個元素呢?輸出以下:

1
2
3

沒有完成事件,OK,一個簡單的Flux.just就完成了,經過這個例子咱們可以初步摸出Flux工廠方法的一些「套路」:

  • 工廠方法返回的是Flux子類的實例,如FluxArray
  • FluxArraysubscribe方法會返回給訂閱者一個Subscription實現類的對象,這個ArraySubscriptionFluxArray的靜態內部類,定義了「如何發佈元素」的邏輯;
  • 訂閱者能夠經過這個ArraySubscription對象向發佈者請求n個數據;發佈者也能夠藉助這個ArraySubscription對象向訂閱者傳遞數據元素(onNext/onError/onComplete)。

用圖來表示以下(因爲Subscription是靜態內部類,能夠看作普通類,就單獨放一邊了):

title

上圖的這個過程基本適用於大多數的用於生成Flux/Mono的靜態工廠方法,如Flux.justFlux.range等。

首先,使用相似Flux.just的方法建立發佈者後,會建立一個具體的發佈者(Publisher),如FluxArray

  1. 當使用.subscribe訂閱這個發佈者時,首先會new一個具備相應邏輯的Subscription(如ArraySubscription,這個Subscription定義瞭如何處理下游的request,以及如何「發出數據」);
  2. 而後發佈者將這個Subscription經過訂閱者的.onSubscribe方法傳給訂閱者;
  3. 在訂閱者的.onSubscribe方法中,須要經過Subscription發起第一次的請求.request
  4. Subscription收到請求,就能夠經過回調訂閱者的onNext方法發出元素了,有多少發多少,但不能超過請求的個數;
  5. 訂閱者在onNext中一般定義對元素的處理邏輯,處理完成以後,能夠繼續發起請求;
  6. 發佈者根據繼續知足訂閱者的請求;
  7. 直至發佈者的序列結束,經過訂閱者的onComplete予以告知;固然序列發送過程當中若是有錯誤,則經過訂閱者的onError予以告知並傳遞錯誤信息;這兩種狀況都會致使序列終止,訂閱過程結束。

以上從1~7這些階段稱爲訂閱期(subscribe time)

2.1.3 照虎畫貓——操做符「流水線」

響應式開發庫的一個很讚的特性就是能夠像組裝流水線同樣將操做符串起來,用來聲明覆雜的處理邏輯。好比:

Flux ff = Flux.just(1, 2, 3, 4, 5)
    .map(i -> i * i)
    .filter(i -> (i % 2) == 0);
ff.subscribe(...)

經過源碼,咱們能夠了解這種「流水線」的實現機制。下面咱們仍然是經過照虎畫貓的方式模擬一下Reactor中Flux.map的實現方式。

Flux.map用於實現轉換,轉換後元素的類型可能會發生變化,轉換的邏輯由參數Function決定。方法自己返回的是一個轉換後的Flux,基於此,該方法實現以下:

public abstract class Flux<T> implements Publisher<T> {
    ...
    public <V> Flux<V> map(Function<? super T, ? extends V> mapper) {   // 1
        return new FluxMap<>(this, mapper); // 2
    }
}
  1. 泛型方法,經過泛型表示可能出現的類型的變化(T → V);
  2. FluxMap就是新的Flux。

既然FluxMap是一個新的Flux,那麼與2.1.2中FluxArray相似,其內部定義有MapSubscription,這是一個Subscription,可以根據其訂閱者的請求發出數據。

public class FluxMap<T, R> extends Flux<R> {

    private final Flux<? extends T> source;
    private final Function<? super T, ? extends R> mapper;

    public FluxMap(Flux<? extends T> source, Function<? super T, ? extends R> mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    public void subscribe(Subscriber<? super R> actual) {
        source.subscribe(new MapSubscriber<>(actual, mapper));
    }

    static final class MapSubscription<T, R> implements Subscription {
        private final Subscriber<? super R> actual;
        private final Function<? super T, ? extends R> mapper;

        MapSubscriber(Subscriber<? super R> actual, Function<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }

        @Override
        public void request(long n) {   // 1
            // TODO 收到請求,發出元素
        }

        @Override
        public void cancel() {
            // TODO 取消訂閱
        }
    }
}
  1. 可是map操做符並不產生數據,只是數據的搬運工。收到request後要發出的數據來自上游。

因此MapSubscription同時也應該是一個訂閱者,它訂閱上游的發佈者,並將數據處理後傳遞給下游的訂閱者(爲了跟Reactor源碼一致,將MapSubscription更名爲MapSubscriber,其實沒差)。

title

如圖,對下游是做爲發佈者,傳遞上游的數據到下游;對上游是做爲訂閱者,傳遞下游的請求到上游。

static final class MapSubscriber<T, R> implements Subscriber<T>, Subscription { // 1
    ...
}
  1. 實現了SubscriberSubscription

這樣,總共有5個方法要實現:來自Subscriber接口的onSubscribeonNextonErroronComplete,和來自Subscription接口的requestcancel。下面咱們本着「搬運工」的角色實現這幾個方法便可。

static final class MapSubscriber<T, R> implements Subscriber<T>, Subscription {
    private final Subscriber<? super R> actual;
    private final Function<? super T, ? extends R> mapper;

    boolean done;

    Subscription subscriptionOfUpstream;

    MapSubscriber(Subscriber<? super R> actual, Function<? super T, ? extends R> mapper) {
        this.actual = actual;
        this.mapper = mapper;
    }

    @Override
    public void onSubscribe(Subscription s) {
        this.subscriptionOfUpstream = s;    // 1
        actual.onSubscribe(this);           // 2
    }

    @Override
    public void onNext(T t) {
        if (done) {
            return;
        }
        actual.onNext(mapper.apply(t));     // 3
    }

    @Override
    public void onError(Throwable t) { 
        if (done) {
            return;
        }
        done = true;
        actual.onError(t);                  // 4
    }

    @Override
    public void onComplete() {
        if (done) {
            return;
        }
        done = true;
        actual.onComplete();                // 5
    }

    @Override
    public void request(long n) {
        this.subscriptionOfUpstream.request(n);     // 6
    }

    @Override
    public void cancel() {
        this.subscriptionOfUpstream.cancel();       // 7
    }
}
  1. 拿到來自上游的Subscription;
  2. 回調下游的onSubscribe,將自身做爲Subscription傳遞過去;
  3. 收到上游發出的數據後,將其用mapper進行轉換,而後接着發給下游;
  4. 將上游的錯誤信號原樣發給下游;
  5. 將上游的完成信號原樣發給下游;
  6. 將下游的請求傳遞給上游;
  7. 將下游的取消操做傳遞給上游。

從這個對源碼的模仿,能夠體會到,當有多個操做符串成「操做鏈」的時候:

  • 向下:很天然地,數據和信號(onSubscribeonNextonErroronComplete)是經過每個操做符向下傳遞的,傳遞的過程當中進行相應的操做處理,這一點並不難理解;
  • 向上:然而在內部咱們看不到的是,有一個自下而上的「訂閱鏈」,這個訂閱鏈能夠用來傳遞request,所以回壓(backpressure)能夠實現從下游向上遊的傳遞。

這一節最開頭的那一段代碼的執行過程以下圖所示:

title

2.1.4 LambdaSubscriber

在1.3.2節的時候,介紹了.subscribe的幾個不一樣方法簽名的變種:

subscribe(  Consumer<? super T> consumer) 

subscribe(  @Nullable Consumer<? super T> consumer, 
            Consumer<? super Throwable> errorConsumer) 

subscribe(  @Nullable Consumer<? super T> consumer,
            @Nullable Consumer<? super Throwable> errorConsumer,
            @Nullable Runnable completeConsumer) 

subscribe(  @Nullable Consumer<? super T> consumer,
            @Nullable Consumer<? super Throwable> errorConsumer,
            @Nullable Runnable completeConsumer,
            @Nullable Consumer<? super Subscription> subscriptionConsumer)

用起來很是方便,可是響應式流規範中只定義了一個訂閱方法subscribe(Subscriber subscriber)。實際上,這幾個方法最終都是調用的subscribe(LambdaSubscriber subscriber),並經過LambdaSubscriber實現了對不一樣個數參數的組裝。以下圖所示:

title

所以,

flux.subscribe(System.out::println, System.err::println);

是調用的:

flux.subscribe(new LambdaSubscriber(System.out::println, System.err::println, null, null));
相關文章
相關標籤/搜索