本系列文章索引《響應式Spring的道法術器》
前情提要 響應式流 | Reactor 3快速上手
本文源碼html
上一章在響應式編程庫方面,本着「快速上手」的原則,介紹了響應式流的概念,以及Reactor 3的使用。這一章,咱們基於對Reactor 3源碼的模仿,從《響應式流規範》入手,深刻了解響應式流開發庫。java
現代軟件對近乎實時地處理數據的需求愈來愈強烈,對不斷變化的信息的即時響應,意味着更大的商業價值,流處理是一種快速將數據轉換爲有用信息的手段。react
數據流中的元素能夠是一個一個的待計算的數據,也能夠是一個一個待響應的事件。前者多用於大數據處理,好比Storm、Spark等產品,後者經常使用於響應式編程,好比Netflix在使用的RxJava、Scala編程語言的發明者Typesafe公司(已改名爲Lightbend)的Akka Stream、Java開發者都熟悉的Pivotal公司的Project Reactor、走在技術前沿的Vert.x等。git
軟件行業是一個很是注重分享和交流的行業。隨着對響應式編程技術的討論與溝通逐漸深刻,2013年底的時候,Netflix、Pivotal、Typesafe等公司的工程師們共同發起了關於制定「響應式流規範(Reactive Stream Specification)」的倡議和討論,並在github上建立了reactive-streams-jvm項目。到2015年5月份,1.0版本的規範出爐,項目README就是規範正文。github
各個響應式開發庫都要遵循這個規範,其好處也是顯而易見的。之因此咱們編寫的Java代碼能夠在Hotspot、J9和Zing等JVM運行,是由於它們都遵循Java虛擬機規範。相似的,因爲各個響應式開發庫都遵循響應式流規範,所以互相兼容,不一樣的開發庫之間能夠進行交互,咱們甚至能夠同時在項目中使用多個響應式開發庫。對於Spring WebFlux來講,也可使用RxJava做爲響應式庫。編程
<img height=300em src="https://leanote.com/api/file/getImage?fileId=5a9e3ed2ab644159cf00128e"/>;api
雖然響應式流規範是用來約束響應式開發庫的,做爲使用者的咱們若是可以瞭解這一規範對於咱們理解開發庫的使用也是頗有幫助的,由於規範的內容都是對響應式編程思想的精髓的呈現。訪問reactive-streams-jvm項目,能夠瀏覽規範的細節,包括其中定義的響應式流的特色:數組
響應式流規範定義了四個接口,以下:安全
1.Publisher
是可以發出元素的發佈者。併發
public interface Publisher<T> { public void subscribe(Subscriber<? super T> s); }
2.Subscriber
是接收元素並作出響應的訂閱者。
public interface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete(); }
當執行subscribe
方法時,發佈者會回調訂閱者的onSubscribe
方法,這個方法中,一般訂閱者會藉助傳入的Subscription
向發佈者請求n個數據。而後發佈者經過不斷調用訂閱者的onNext
方法向訂閱者發出最多n個數據。若是數據所有發完,則會調用onComplete
告知訂閱者流已經發完;若是有錯誤發生,則經過onError
發出錯誤數據,一樣也會終止流。
訂閱後的回調用表達式表示就是onSubscribe onNext* (onError | onComplete)?
,即以一個onSubscribe
開始,中間有0個或多個onNext
,最後有0個或1個onError
或onComplete
事件。
Publisher
和Subscriber
融合了迭代器模式和觀察者模式。
咱們常常用到的Iterable
和Iterator
就是迭代器模式的體現,能夠知足上邊第1和2個特色關於按需處理數據流的要求;而觀察者模式基於事件的回調機制有助於知足第3個特色關於異步傳遞元素的要求。
3.Subscription
是Publisher
和Subscriber
的「中間人」。
public interface Subscription { public void request(long n); public void cancel(); }
當發佈者調用subscribe
方法註冊訂閱者時,會經過訂閱者的回調方法onSubscribe
傳入Subscription
對象,以後訂閱者就可使用這個Subscription
對象的request
方法向發佈者「要」數據了。回壓機制正是基於此來實現的,所以第4個特色也可以實現了。
4.Processor
集Publisher
和Subscriber
於一身。
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }
這四個接口在JEP 266跟隨Java 9版本被引入了Java SDK。
這四個接口是實現各開發庫之間互相兼容的橋樑,響應式流規範也僅僅聚焦於此,而對諸如轉換、合併、分組等等的操做一律未作要求,所以是一個很是抽象且精簡的接口規範。
若是這時候有人要造輪子,再寫一套響應式開發庫,如何基於這幾個接口展開呢?
Reactor 3是遵循響應式流規範的實現,所以,小擼一把Reactor的源碼有助於咱們理解規範中定義的接口的使用。
Reactor中,咱們最早接觸的生成Publisher
的方法就是Flux.just()
,下面咱們來動手寫代碼模擬一下Reactor的實現方式。不過具有生產能力開發庫會考慮性能、併發安全性等諸多因素,所謂「照虎畫貓」,咱們的代碼只是模擬出實現思路,代碼少的多,但五臟俱全。
源碼位於:https://github.com/get-set/get-reactive/tree/master/my-reactor
首先,引入響應式流規範的四個接口定義,基於Java 9的話能夠直接使用java.util.concurrent.Flow:
<dependency> <groupId>org.reactivestreams</groupId> <artifactId>reactive-streams</artifactId> <version>1.0.2</version> </dependency>
首先建立最最基礎的類Flux
,它是一個Publisher
。
package reactor.core.publisher; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; public abstract class Flux<T> implements Publisher<T> { public abstract void subscribe(Subscriber<? super T> s); }
在Reactor中,Flux
既是一個發佈者,又充當工具類的角色,當咱們用Flux.just()
、Flux.range()
或Flux.interval()
等工廠方法生成Flux
時,會new一個新的Flux
,好比Flux.just
會返回一個FluxArray
對象。
public static <T> Flux<T> just(T... data) { return new FluxArray<>(data); }
返回的FluxArray
對象是Flux.just
生成的Publisher
,它繼承自Flux
,並實現了subscribe
方法。
public class FluxArray<T> extends Flux<T> { private T[] array; // 1 public FluxArray(T[] data) { this.array = data; } @Override public void subscribe(Subscriber<? super T> actual) { actual.onSubscribe(new ArraySubscription<>(actual, array)); // 2 } }
FluxArray
內部使用一個數組來保存數據;subscribe
方法一般會回調Subscriber
的onSubscribe
方法,該方法須要傳入一個Subscription
對象,從而訂閱者以後能夠經過回調傳回的Subscription
的request
方法跟FluxArray
請求數據,這也是回壓的應有之義。繼續編寫ArraySubscription
:
public class FluxArray<T> extends Flux<T> { ... static class ArraySubscription<T> implements Subscription { // 1 final Subscriber<? super T> actual; final T[] array; // 2 int index; boolean canceled; public ArraySubscription(Subscriber<? super T> actual, T[] array) { this.actual = actual; this.array = array; } @Override public void request(long n) { if (canceled) { return; } long length = array.length; for (int i = 0; i < n && index < length; i++) { actual.onNext(array[index++]); // 3 } if (index == length) { actual.onComplete(); // 4 } } @Override public void cancel() { // 5 this.canceled = true; } } }
ArraySubscription
是一個靜態內部類。靜態內部類是最簡單的一種內部類,你盡能夠把它當成普通的類,只不過剛好定義在其餘類的內部;Subscription
內也有一份數據;onNext
方法傳遞元素;onComplete
方法;Subscription
取消訂閱。到此爲止,發佈者就開發完了。咱們測試一下:
@Test public void fluxArrayTest() { Flux.just(1, 2, 3, 4, 5).subscribe(new Subscriber<Integer>() { // 1 @Override public void onSubscribe(Subscription s) { System.out.println("onSubscribe"); s.request(6); // 2 } @Override public void onNext(Integer integer) { System.out.println("onNext:" + integer); } @Override public void onError(Throwable t) { } @Override public void onComplete() { System.out.println("onComplete"); } }); }
Subscriber
經過匿名內部類定義,其中須要實現接口的四個方法;測試方法運行以下:
1 2 3 4 5 Completed.
若是請求3個元素呢?輸出以下:
1 2 3
沒有完成事件,OK,一個簡單的Flux.just
就完成了,經過這個例子咱們可以初步摸出Flux
工廠方法的一些「套路」:
Flux
子類的實例,如FluxArray
;FluxArray
的subscribe
方法會返回給訂閱者一個Subscription
實現類的對象,這個ArraySubscription
是FluxArray
的靜態內部類,定義了「如何發佈元素」的邏輯;ArraySubscription
對象向發佈者請求n個數據;發佈者也能夠藉助這個ArraySubscription
對象向訂閱者傳遞數據元素(onNext/onError/onComplete)。用圖來表示以下(因爲Subscription是靜態內部類,能夠看作普通類,就單獨放一邊了):
上圖的這個過程基本適用於大多數的用於生成Flux
/Mono
的靜態工廠方法,如Flux.just
、Flux.range
等。
首先,使用相似Flux.just
的方法建立發佈者後,會建立一個具體的發佈者(Publisher
),如FluxArray
。
.subscribe
訂閱這個發佈者時,首先會new一個具備相應邏輯的Subscription
(如ArraySubscription
,這個Subscription
定義瞭如何處理下游的request
,以及如何「發出數據」);Subscription
經過訂閱者的.onSubscribe
方法傳給訂閱者;.onSubscribe
方法中,須要經過Subscription
發起第一次的請求.request
;Subscription
收到請求,就能夠經過回調訂閱者的onNext
方法發出元素了,有多少發多少,但不能超過請求的個數;onNext
中一般定義對元素的處理邏輯,處理完成以後,能夠繼續發起請求;onComplete
予以告知;固然序列發送過程當中若是有錯誤,則經過訂閱者的onError
予以告知並傳遞錯誤信息;這兩種狀況都會致使序列終止,訂閱過程結束。以上從1~7這些階段稱爲訂閱期(subscribe time)。
響應式開發庫的一個很讚的特性就是能夠像組裝流水線同樣將操做符串起來,用來聲明覆雜的處理邏輯。好比:
Flux ff = Flux.just(1, 2, 3, 4, 5) .map(i -> i * i) .filter(i -> (i % 2) == 0); ff.subscribe(...)
經過源碼,咱們能夠了解這種「流水線」的實現機制。下面咱們仍然是經過照虎畫貓的方式模擬一下Reactor中Flux.map
的實現方式。
Flux.map
用於實現轉換,轉換後元素的類型可能會發生變化,轉換的邏輯由參數Function
決定。方法自己返回的是一個轉換後的Flux
,基於此,該方法實現以下:
public abstract class Flux<T> implements Publisher<T> { ... public <V> Flux<V> map(Function<? super T, ? extends V> mapper) { // 1 return new FluxMap<>(this, mapper); // 2 } }
FluxMap
就是新的Flux。既然FluxMap
是一個新的Flux,那麼與2.1.2中FluxArray
相似,其內部定義有MapSubscription
,這是一個Subscription
,可以根據其訂閱者的請求發出數據。
public class FluxMap<T, R> extends Flux<R> { private final Flux<? extends T> source; private final Function<? super T, ? extends R> mapper; public FluxMap(Flux<? extends T> source, Function<? super T, ? extends R> mapper) { this.source = source; this.mapper = mapper; } @Override public void subscribe(Subscriber<? super R> actual) { source.subscribe(new MapSubscriber<>(actual, mapper)); } static final class MapSubscription<T, R> implements Subscription { private final Subscriber<? super R> actual; private final Function<? super T, ? extends R> mapper; MapSubscriber(Subscriber<? super R> actual, Function<? super T, ? extends R> mapper) { this.actual = actual; this.mapper = mapper; } @Override public void request(long n) { // 1 // TODO 收到請求,發出元素 } @Override public void cancel() { // TODO 取消訂閱 } } }
map
操做符並不產生數據,只是數據的搬運工。收到request
後要發出的數據來自上游。因此MapSubscription
同時也應該是一個訂閱者,它訂閱上游的發佈者,並將數據處理後傳遞給下游的訂閱者(爲了跟Reactor源碼一致,將MapSubscription
更名爲MapSubscriber
,其實沒差)。
如圖,對下游是做爲發佈者,傳遞上游的數據到下游;對上游是做爲訂閱者,傳遞下游的請求到上游。
static final class MapSubscriber<T, R> implements Subscriber<T>, Subscription { // 1 ... }
Subscriber
和Subscription
。這樣,總共有5個方法要實現:來自Subscriber
接口的onSubscribe
、onNext
、onError
、onComplete
,和來自Subscription
接口的request
和cancel
。下面咱們本着「搬運工」的角色實現這幾個方法便可。
static final class MapSubscriber<T, R> implements Subscriber<T>, Subscription { private final Subscriber<? super R> actual; private final Function<? super T, ? extends R> mapper; boolean done; Subscription subscriptionOfUpstream; MapSubscriber(Subscriber<? super R> actual, Function<? super T, ? extends R> mapper) { this.actual = actual; this.mapper = mapper; } @Override public void onSubscribe(Subscription s) { this.subscriptionOfUpstream = s; // 1 actual.onSubscribe(this); // 2 } @Override public void onNext(T t) { if (done) { return; } actual.onNext(mapper.apply(t)); // 3 } @Override public void onError(Throwable t) { if (done) { return; } done = true; actual.onError(t); // 4 } @Override public void onComplete() { if (done) { return; } done = true; actual.onComplete(); // 5 } @Override public void request(long n) { this.subscriptionOfUpstream.request(n); // 6 } @Override public void cancel() { this.subscriptionOfUpstream.cancel(); // 7 } }
onSubscribe
,將自身做爲Subscription
傳遞過去;從這個對源碼的模仿,能夠體會到,當有多個操做符串成「操做鏈」的時候:
onSubscribe
、onNext
、onError
、onComplete
)是經過每個操做符向下傳遞的,傳遞的過程當中進行相應的操做處理,這一點並不難理解;request
,所以回壓(backpressure)能夠實現從下游向上遊的傳遞。這一節最開頭的那一段代碼的執行過程以下圖所示:
在1.3.2節的時候,介紹了.subscribe
的幾個不一樣方法簽名的變種:
subscribe( Consumer<? super T> consumer) subscribe( @Nullable Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer) subscribe( @Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer) subscribe( @Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Consumer<? super Subscription> subscriptionConsumer)
用起來很是方便,可是響應式流規範中只定義了一個訂閱方法subscribe(Subscriber subscriber)
。實際上,這幾個方法最終都是調用的subscribe(LambdaSubscriber subscriber)
,並經過LambdaSubscriber
實現了對不一樣個數參數的組裝。以下圖所示:
所以,
flux.subscribe(System.out::println, System.err::println);
是調用的:
flux.subscribe(new LambdaSubscriber(System.out::println, System.err::println, null, null));