在讀 Hystrix 源碼時,發現一些奇特的寫法。稍做搜索,知道使用了最新流行的響應式編程庫RxJava。那麼響應式編程到底是怎樣的呢? 本文對響應式編程及 RxJava 庫做一個初步的探索。html
在學習新的編程模型時,我喜歡將其與原來的編程模型聯繫起來。由於新的編程模型每每是對原來編程模型的承襲和組合。響應式編程的兩個基本要素是:java
函數式編程,在以前的文章 「徹底」函數式編程」、「Java8函數式編程探祕」、「精練代碼:一次Java函數式編程的重構之旅」 等有較多探索,觀察者模式在 「設計模式之觀察者模式:實現配置更新實時推送」 有講述過。咱們將在這二者的基礎上探索響應式編程。
react
初次接觸 RxJava ,很容易被一連串的 Observer, Observable, Disposable, subscribeOn, onSubscribe, onNext, onError, onComplete 等繞暈。不過軟件裏面無新鮮事。大多無非是用一種新的方式來組織邏輯罷了。基於觀察者模式的事件驅動也不例外。咱們只要梳理清楚脈絡,就能夠容易地理解。觀察者模式有三個基本參與者:git
基本流程是:被觀察者 Observable 裝備發射裝置 Emitter,發射消息,建立事件;觀察者 Observer 監聽到事件,接收觀察者發射的消息,調用對應的函數 onNext, onError 和 onComplete 進行處理。onError 和 OnComplete 只能有一個被觸發。github
不妨寫個基本 Demo 來模擬下基本流程。爲了更好滴理解,我把三者都區分開了。編程
首先定義觀察者 MyObserver,繼承抽象類 DefaultObserver ,這樣實現成本最小。json
package zzz.study.reactor; import com.alibaba.fastjson.JSON; import io.reactivex.observers.DefaultObserver; /** * @Description 觀察者定義 * @Date 2021/1/23 4:13 下午 * @Created by qinshu */ public class MyObserver extends DefaultObserver { @Override public void onStart() { System.out.println("MyObserver: Start"); } @Override public void onNext(Object o) { System.out.println("Observed: " + JSON.toJSONString(o)); } @Override public void onError(Throwable e) { System.out.println("Observed: " + e.getMessage()); } @Override public void onComplete() { System.out.println("MyObserver: Complete"); } }
接着,定義發射裝置(發射消息) MyEmitter:設計模式
package zzz.study.reactor; import io.reactivex.ObservableEmitter; import io.reactivex.ObservableOnSubscribe; import java.util.Random; import java.util.concurrent.TimeUnit; /** * @Description 發射裝置 * @Date 2021/1/24 7:04 上午 * @Created by qinshu */ public class MyEmitter implements ObservableOnSubscribe { Random random = new Random(System.currentTimeMillis()); @Override public void subscribe(ObservableEmitter emitter) throws Exception { TimeUnit.SECONDS.sleep(1); emitter.onNext("next"); if (random.nextInt(3) == 0) { emitter.onError(new RuntimeException("A RuntimeException")); } else { emitter.onComplete(); } } }
最後,建立被觀察者,並串起流程:併發
package zzz.study.reactor; import io.reactivex.Observable; import io.reactivex.ObservableOnSubscribe; import io.reactivex.Observer; /** * @Description RxJava基本Demo * @Date 2021/1/23 12:28 下午 * @Created by qinshu */ public class RxJavaBasic { public static void main(String[] args) { for (int i=0; i<5; i++) { ObservableOnSubscribe observableOnSubscribe = new MyEmitter(); Observable observable = Observable.create(observableOnSubscribe); Observer observer = new MyObserver(); observable.subscribe(observer); } } }
運行,可得結果:dom
MyObserver: Start Observed: "next" MyObserver: Complete MyObserver: Start Observed: "next" MyObserver: Complete MyObserver: Start Observed: "next" Observed: A RuntimeException MyObserver: Start Observed: "next" MyObserver: Complete MyObserver: Start Observed: "next" MyObserver: Complete
如何理解上述流程及結果呢?最好的辦法就是單步調試。通過單步調試,能夠知道整個過程以下:
步驟1: 整個過程由這一行觸發 observable.subscribe(observer);
,會去調用 Observable.subscribeActual 方法,分派給具體實現類 ObservableCreate.subscribeActual ;單步調試的好處就是能肯定具體實現者;
步驟2: ObservableCreate.subscribeActual 所作的事情,調用 observer.onSubscribe ( MyObserver.onStart 方法 ),而後轉發給 MyEmitter.subscribe 來發射消息。
@Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } }
步驟3:MyEmitter 執行 onNext ,分派給具體實現類 CreateEmitter.onNext ,進而調用 observer.onNext 方法;
步驟4:MyEmitter 執行 onError ,分派給具體實現類 CreateEmitter.onError ,進而 調用 observer.onError 方法;若是 MyEmitter 發射 onComplete ,那麼就會分派給具體實現類 CreateEmitter.onComplete ,進而調用 observer.onComplete 方法。注意,onError 和 onComplete 二者只可能執行一個。
基本流程就是這樣。
除了訂閱自定義 Emitter 來發射消息,類 Observable 還提供了各類工具方法,更便捷滴作訂閱和推送。好比:
public static void testDirectSubscribe() { Observable.fromArray("I", "Have", "a", "dream").subscribe(new MyObserver()); }
會輸出:
MyObserver: Start Observed: "I" Observed: "Have" Observed: "a" Observed: "dream" MyObserver: Complete
具體實現是: fromArray 方法會建立一個 Observable 的具體類 ObservableFromArray,而這個類的 subscribeActual 方法會建立一個 FromArrayDisposable 來處理。FromArrayDisposable 的 run 方法被調用,依次遍歷所指定列表,調用 observer.onNext ,最後調用 observer.onComplete。具體源碼以下:
public final class ObservableFromArray<T> extends Observable<T> { final T[] array; public ObservableFromArray(T[] array) { this.array = array; } @Override public void subscribeActual(Observer<? super T> observer) { FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array); observer.onSubscribe(d); if (d.fusionMode) { return; } d.run(); } static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> { final Observer<? super T> downstream; final T[] array; int index; boolean fusionMode; volatile boolean disposed; FromArrayDisposable(Observer<? super T> actual, T[] array) { this.downstream = actual; this.array = array; } // other methods @Override public void dispose() { disposed = true; } @Override public boolean isDisposed() { return disposed; } void run() { T[] a = array; int n = a.length; for (int i = 0; i < n && !isDisposed(); i++) { T value = a[i]; if (value == null) { downstream.onError(new NullPointerException("The element at index " + i + " is null")); return; } downstream.onNext(value); } if (!isDisposed()) { downstream.onComplete(); } } } }
那麼 Disposable 的意義何在呢 ? 個人理解是:它做爲訂閱完成的一個流程閉環。好比重複訂閱同一個觀察者,以下代碼:
public static void testDirectSubscribe() { Observer observer = new MyObserver(); Observable.fromArray("I", "Have", "a", "dream").subscribe(observer); Observable.fromArray("changed").subscribe(observer); }
會拋出異常:
io.reactivex.exceptions.ProtocolViolationException: It is not allowed to subscribe with a(n) zzz.study.reactor.MyObserver multiple times. Please create a fresh instance of zzz.study.reactor.MyObserver and subscribe that to the target source instead. at io.reactivex.internal.util.EndConsumerHelper.reportDoubleSubscription(EndConsumerHelper.java:148) at io.reactivex.internal.util.EndConsumerHelper.validate(EndConsumerHelper.java:57) at io.reactivex.observers.DefaultObserver.onSubscribe(DefaultObserver.java:70) at io.reactivex.internal.operators.observable.ObservableJust.subscribeActual(ObservableJust.java:34) at io.reactivex.Observable.subscribe(Observable.java:12284) at zzz.study.reactor.RxJavaBasic.testDirectSubscribe(RxJavaBasic.java:34) at zzz.study.reactor.RxJavaBasic.main(RxJavaBasic.java:17)
這個異常是在調用 DefaultObserver.onSubscribe 拋出的:
@Override public final void onSubscribe(@NonNull Disposable d) { if (EndConsumerHelper.validate(this.upstream, d, getClass())) { this.upstream = d; onStart(); } } public static boolean validate(Disposable upstream, Disposable next, Class<?> observer) { ObjectHelper.requireNonNull(next, "next is null"); if (upstream != null) { next.dispose(); if (upstream != DisposableHelper.DISPOSED) { reportDoubleSubscription(observer); } return false; } return true; }
這就是說,若是同一個觀察者,它的上一個 Disposable 訂閱沒有結束,那麼再次訂閱 Disposable 就會出錯。怎麼解決呢?能夠在 MyObserver 的 onError 和 onComplete 添加 super.cancel 調用,能夠結束上一次的訂閱,再次訂閱就不拋出異常了:
@Override public void onError(Throwable e) { System.out.println("Observed: " + e.getMessage()); super.cancel(); } @Override public void onComplete() { System.out.println("MyObserver: Complete"); super.cancel(); } /** * Cancels the upstream's disposable. */ protected final void cancel() { Disposable upstream = this.upstream; this.upstream = DisposableHelper.DISPOSED; upstream.dispose(); }
可是,即使這樣,也沒法發射咱們新的訂閱消息。這是由於上一次的 upstream 不爲 null,本次的訂閱就沒法發射。
咱們無法覆寫 DefaultObserver.onSubscribe 方法,由於該方法聲明爲 final 的,且 upstream 聲明爲 private ,也沒有公共方法能夠設置 upstream。這明確代表了設計者的意圖:這是 Observer 訂閱 Disposable 的前置檢測約定,不可被破壞,不然後果自負。
咱們能夠繞過 DefaultObserver , 不繼承它,而是直接實現 Observer 接口:
public static void testDirectSubscribe() { Observer observer = new RepeatedSubscribeMyObserver(); Observable.fromArray("I", "Have", "a", "dream").subscribe(observer); Observable.fromArray("changed").subscribe(observer); } /** * @Description 可重複訂閱的觀察者 * @Date 2021/1/24 10:11 上午 * @Created by qinshu */ public class RepeatedSubscribeMyObserver<T> implements Observer<T> { public Disposable upstream; @Override public void onSubscribe(@NonNull Disposable d){ System.out.println(getName() + ": Start"); this.upstream = d; } @Override public void onNext(T o) { System.out.println(getName() + ": " + JSON.toJSONString(o)); } @Override public void onError(Throwable e) { System.out.println(getName() + "RepeatedSubscribeMyObserver: " + e.getMessage()); cancel(); } @Override public void onComplete() { System.out.println(getName() + "RepeatedSubscribeMyObserver: Complete"); cancel(); } public String getName() { return this.getClass().getSimpleName(); } /** * Cancels the upstream's disposable. */ protected final void cancel() { Disposable upstream = this.upstream; this.upstream = DisposableHelper.DISPOSED; upstream.dispose(); } }
這樣就能夠實現屢次訂閱同一個 Observer 了。運行結果:
RepeatedSubscribeMyObserver: Start RepeatedSubscribeMyObserver: "I" RepeatedSubscribeMyObserver: "Have" RepeatedSubscribeMyObserver: "a" RepeatedSubscribeMyObserver: "dream" RepeatedSubscribeMyObserver: Complete RepeatedSubscribeMyObserver: Start RepeatedSubscribeMyObserver: "changed" RepeatedSubscribeMyObserver: Complete
弄懂了 Observable.fromArray 的實現原理,就弄清楚了 Observable 中不少基本方法的基本套路。好比 just 方法有兩個及以上參數時,實際上是 fromArray 的包裝,而 range 方法則是建立一個 RangeDisposable 來處理。
Observable.just(1,2,3).subscribe(observer); Observable.range(1,4).subscribe(observer);
上文談到了響應式編程的一大基本元素是函數式編程。函數式的優點是能夠無限疊加組合,構建出靈活多變的函數和行爲。這使得觀察者的行爲也能夠定製得更加靈活。能夠組合多個 Observable 的發射行爲。
合併
簡單的組合使用 merge 方法,構造一個 Observable 的列表,依次遍歷合併後的每一個 Observable 的發射信息:
Iterable<? extends ObservableSource<? extends Integer>> observableSourceSet = Sets.newHashSet( Observable.fromArray(3,4,5), Observable.range(10,3) ); Observable.merge(observableSourceSet).subscribe(observer);
流式
Observable 能夠經過 Stream 進行組合,這裏就是函數式編程的用武之地了。以下代碼所示:
Observable.range(1,10).filter(x -> x%2 ==0).subscribe(observer);
注意到,這裏使用到了裝飾器模式。filter 方法會建立一個 ObservableFilter 對象,而在這個對象裏,subscribeActual 方法會建立一個 FilterObserver 將傳入的 observer 裝飾起來。downstream 便是傳入的 observer。
@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Observable<T> filter(Predicate<? super T> predicate) { ObjectHelper.requireNonNull(predicate, "predicate is null"); return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate)); } public final class ObservableFilter<T> extends AbstractObservableWithUpstream<T, T> { final Predicate<? super T> predicate; public ObservableFilter(ObservableSource<T> source, Predicate<? super T> predicate) { super(source); this.predicate = predicate; } @Override public void subscribeActual(Observer<? super T> observer) { source.subscribe(new FilterObserver<T>(observer, predicate)); // FilterObserver 裝飾了傳入的自定義的 observer } static final class FilterObserver<T> extends BasicFuseableObserver<T, T> { final Predicate<? super T> filter; FilterObserver(Observer<? super T> actual, Predicate<? super T> filter) { super(actual); this.filter = filter; } @Override public void onNext(T t) { // 這裏對傳入的 Observer.onNext 作了個裝飾,僅當條件成立時才調用 if (sourceMode == NONE) { boolean b; try { b = filter.test(t); } catch (Throwable e) { fail(e); return; } if (b) { downstream.onNext(t); // downstream 便是咱們傳入的自定義的 Observer } } else { downstream.onNext(null); } } }
正如 filter 對發射數據流進行過濾,map 或 flatMap 則對發射數據流進行映射變換,與 stream.map 或 stream.flatMap 的功能相似:
Observable.range(1,10).map(x -> x*x).subscribe(observer); Observable.range(1,10).flatMap(x -> Observable.just(x*x)).subscribe(observer);
map 方法將建立一個 ObservableMap 對象,在 subscribeActual 中用 MapObserver 將所傳入的 observer 裝飾起來;flatMap 將建立一個 ObservableFlatMap 對象,在 subscribeActual 中 MergeObserver 將傳入的 observer 裝飾起來。
還可使用 scan:對於生成的每一個值,使用累加器 (x,y) -> x*y 生成新的值併發射。
Observable.range(1, 10).scan(1, (x,y) -> x*y).subscribe(observer);
最後再給個分組的示例:
Observable.just(28,520,25,999).groupBy( i -> ( i > 100 ? "old": "new")).subscribe(new GroupedRepeatedSubscribeMyObserver()); /** * @Description 可重複訂閱的分組觀察者 * @Date 2021/1/24 10:11 上午 * @Created by qinshu */ public class GroupedRepeatedSubscribeMyObserver extends RepeatedSubscribeMyObserver<GroupedObservable> { @Override public void onNext(GroupedObservable o) { o.subscribe(new RepeatedSubscribeMyObserver() { @Override public void onNext(Object v) { String info = String.format("GroupedRepeatedSubscribeMyObserver: [group=%s][value=%s]", o.getKey(), JSON.toJSONString(v)); System.out.println(info); } }); } }
groupBy 方法生成的是一個 GroupedObservable ,所以要訂閱一個 Observer
本文先寫到這裏。
項目代碼見工程: 「ALLIN」 的包 zzz.study.reactor 下。須要引入 Maven 依賴:
<dependency> <groupId>io.reactivex.rxjava2</groupId> <artifactId>rxjava</artifactId> <version>2.2.20</version> </dependency>
本文講解了響應式編程及 RxJava 庫的最基本概念:Observable , Observer 及 Emitter, Disposable ,也講到了如何組合 Observable 來構建更靈活的消息發射機制。這些基本構成了響應式編程的基本骨架流程。
響應式編程的強大能力構建在事件驅動機制和函數式編程上,裏面大量應用了裝飾器模式。所以,熟悉這些基本編程思想,對掌握響應式編程模型亦大有裨益。