這一系列文章原本我發表在簡書。最近開始轉移到掘金。之後也會在掘金髮表(慢慢拋棄簡書了應該,掘金的技術環境確實比簡書好些)。javascript
上篇簡單講到了一些關於Event/Rx bus的優缺點。而且提到了如何「正確」使用RxJava,而不是使用RxBus來本身從新發明輪子。html
其中也講到了一個簡單使用 create() 方法來進行封裝Observable。但也留下了許多坑,好比內存泄漏,不能Multicast(多個Subscriber訂閱同一個Observable) 等問題。因此這篇,咱們接着經過這個例子,來具體瞭解下,如何封裝Observable。react
首先咱們來簡單看一下Observable的靜態方法,just/from/create都怎麼爲你提供Observable。
咱們先看just:編程
public static <T> Observable<T> just(T item) {
ObjectHelper.requireNonNull(item, "The item is null");
return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}複製代碼
咱們暫時不須要糾結 RxJavaPlugins.onAssembly() 這個方法。比較重要的是 just(T item) 方法會爲你提供一個 ObservableJust
在 RxJava 2.x 中 Observable 是一個抽象類,只有一個抽象方法,subscribeActual(Observer observer);(可是Observable的源碼足足有13518行!!!)
public abstract class Observable<T> implements ObservableSource<T>{
//implemented methods
protected abstract void subscribeActual(Observer<? super T> observer);
//other implements/operators
}複製代碼
那麼ObservableJust這個類究竟什麼樣呢?app
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
private final T value;
public ObservableJust(final T value) {
this.value = value;
}
@Override
protected void subscribeActual(Observer<? super T> s) {
ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
s.onSubscribe(sd);
sd.run();
}
@Override
public T call() {
return value;
}
}複製代碼
咱們首先看到構造方法裏,直接把value賦給了ObservableJust的成員。這也就是爲何Observable.just()裏的代碼會直接運行,而不是像create()方法,有Subscriber時候才能運行(Observable.create
的初始化方法在subscribeAcutal
裏執行)。
再來看看兩個item的just(T item1,T item2):ide
public static <T> Observable<T> just(T item1, T item2) {
ObjectHelper.requireNonNull(item1, "The first item is null");
ObjectHelper.requireNonNull(item2, "The second item is null");
return fromArray(item1, item2);
}複製代碼
誒?怎麼畫風突變?不是ObservableJust了?其實除了只有一個item的just,其餘的just方法也都是調用了這個fromArray。那咱們來看看這個fromArray:源碼分析
public static <T> Observable<T> fromArray(T... items) {
//NullCheck
return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}複製代碼
前面一些check咱們忽略,這裏咱們發現一些熟悉的身影了ObservableFromArray
public final class ObservableFromArray<T> extends Observable<T> {
final T[] array;
public ObservableFromArray(T[] array) {
this.array = array;
}
@Override
public void subscribeActual(Observer<? super T> s) {
FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);
s.onSubscribe(d);
d.run();
}
static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {
//implements
}
}複製代碼
是否是更熟悉?其實Observable幾乎全部的靜態方法和操做符都是這樣,甚至包括一些著名的RxJava庫好比RxBinding,也都是使用這種封裝方法。內部實現Observable的subscribeActual()方法。對外只提供靜態方法來爲你生成Observable。爲何這麼作,咱們來了解一下subscribeActual()方法。
subscribeActual()其實就是Observable和Observer溝通的橋樑。這個Observer(Subscriber)就是你在Observable.subscribe()方法裏寫的那個類,或者是Consumer(只處理onNext方法)。
public final void subscribe(Observer<? super T> observer) {
//NullCheck&Apply plugin
subscribeActual(observer);
}複製代碼
咱們看到其實這個方法除了Check和Apply就只有這一行subscribeActual(observer),鏈接了Observable和Observer。因此咱們知道了,subscribeActual()方法裏的代碼,只有在subscribe()調用後,纔回調用。
那麼他們是如何連接的呢?其實很簡單,根據你的邏輯一句一句的調用observer.onXX()方法就能夠了。好比剛纔咱們看到的ObservableJust:
@Override
public void run() {
if (get() == START && compareAndSet(START, ON_NEXT)) {
observer.onNext(value);
if (get() == ON_NEXT) {
lazySet(ON_COMPLETE);
observer.onComplete();
}
}
}複製代碼
再好比咱們的ObservableFromArray:
void run() {
T[] a = array;
int n = a.length;
for (int i = 0; i < n && !isDisposed(); i++) {
T value = a[i];
if (value == null) {
actual.onError(new NullPointerException("The " + i + "th element is null"));
return;
}
actual.onNext(value);
}
if (!isDisposed()) {
actual.onComplete();
}
}複製代碼
複雜點的例子,好比如何封裝button的OnClick事件:
@Override protected void subscribeActual(Observer<? super Object> observer) {
if (!checkMainThread(observer)) {
return;
}
Listener listener = new Listener(view, observer);
observer.onSubscribe(listener);
view.setOnClickListener(listener);
}
static final class Listener extends MainThreadDisposable implements OnClickListener {
private final View view;
private final Observer<? super Object> observer;
Listener(View view, Observer<? super Object> observer) {
this.view = view;
this.observer = observer;
}
@Override public void onClick(View v) {
if (!isDisposed()) {
observer.onNext(Notification.INSTANCE);
}
}
@Override protected void onDispose() {
view.setOnClickListener(null);
}
}
}複製代碼
可是細心的同窗應該看到了,每一個subscribeActual()方法裏,都會有 observer.onSubscribe(disposable)
這句。那麼這句又是作什麼的呢?根據Observable Contract,onSubscribe
是告知已經準備好接收item。並且經過這個方法將Disposable傳回給Subscriber。
Disposable其實就是控制你取消訂閱的。他只有兩個方法 dispose() 取消訂閱,和 isDisposed() 來通知是否已經取消了訂閱。
取消訂閱時,要根據需求釋放資源。
在subscribeActual()裏邏輯要嚴謹,好比onComplete()以後不要有onNext()。須要注意的點不少,因此可能這也就是爲何RxJava推薦用戶使用靜態方法生成Observable吧。若是有興趣,能夠直接閱讀
create()
方法是一個歷史遺留問題了。因爲這個命名,不少人都以爲Observable.create()
不就應該是生成Obseravble最早想到的方法嗎? 在 RxJava 1.x 這是錯誤的,Observable.create()
在 1.x 版本幾乎飽受詬病。不是他很差,而是他太難操控。 RxJava必定要遵循Observable Contract纔會按照預期執行,而使用create()
你能夠徹底無視這個規則。你能夠在onComplete以後繼續發送onNext事件,下游仍會收到事件。若是在1.x想正確的使用Observable.create()
你必須首先了解幾乎全部的規則。因此一直以來 RxJava 1.x 版本使用Observable.create
是不推薦的。(在新版的RxJava 1.3中,create()方法已經標記@deprecated
)
在經歷了1.x的失敗後,RxJava 2.x 提供了安全的create()
方法。他經過ObservableEmitter做爲中間人,代替處理。使得即使你在Emitter中沒有參照ObservableContract,下游仍會按照預期的進行。
咱們上文說到的just
,from
,create
等等是生成Observable的操做符,那麼如map
,filter
等等的操做符會有什麼區別嗎?
咱們來看下源碼:
map:
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}複製代碼
filter:
public final Observable<T> filter(Predicate<? super T> predicate) {
ObjectHelper.requireNonNull(predicate, "predicate is null");
return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate));
}複製代碼
咱們看到,這個區別就是在生成新Observable的時候,會須要兩個參數,一個是這個Observable自己,也就是代碼中的this,另外一個就是須要進行操做的接口實現(固然也有更多參數的好比Schduler等等,大同小異,再也不贅述)。而這個Observable自己,也就是咱們口中常說的上游。上游下游是根據操做符的來講,對於一個操做符,在這個操做符以前的就是上游,而這個操做符以後的就是下游。
好比咱們的map:
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
}複製代碼
source就是咱們的上游。而這個MapObserver就是咱們的中間人(其實也算是操做符自己),將數據根據需求,處理後發給下游。
操做符原理很是複雜,map能夠說是最簡單的了。若是有興趣我推薦能夠看一下publish(selector)
等等複雜的操做符。更深刻理解操做符。固然,有毅力的同窗也能夠關注RxJava 主要負責人的系列博客(純英文,並且很難懂,不是英語難懂,是原理很難懂)。
讀過扔物線大神文章入門的同窗應該對lift
有一個瞭解。RxJava 1.x 幾乎全部操做符都是基於lift
完成的。可是RxJava 2.x 能夠說幾乎看不到lift
。 目前lift
僅僅做爲提供自定義操做符的一個接口(雖然更推薦使用簡單好用的compose
,由於lift
須要複寫七個抽象方法。)。
最後再說一下幾點:
其實 Reactive Programming在Java上的實現不止 RxJava 一個。比較出名的還有Project Reactor和 google 的 agera 等等。 可是綜合考慮,不管是性能,擴展性上RxJava在Android平臺上是最優秀的。 因爲都在JVM上,你們都決定統一接口因此推出 Reactive Streams定義了這一套的幾個基本接口:
包括了 :
//對應RxJava中的Flowable
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
//RxJava並無直接對應,而是各類形式的實現類。
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
//同上,RxJava在flowable中直接使用Subscription
public interface Subscription {
public void request(long n);
public void cancel();
}
//Flowable版本的Subject
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}複製代碼
正由於這四個接口的命名關係。本在RxJava 1.x 的Observable更名爲Flowable。而RxJava 2.x的 Observable是徹底沒有backpressure支持。由於起名衝突的緣由,將原本的Subscription改成Disposable,Subscriber改成Observer。