這一系列文章原本我發表在簡書。最近開始轉移到掘金。之後也會在掘金髮表(慢慢拋棄簡書了應該,掘金的技術環境確實比簡書好些)。javascript
前言: 不少朋友誤會我文章的意思。我寫這個系列文章的意思主要是幫助瞭解一下RxJava的常見用法。而不是使用一下本身或別人封裝好的RxBus就以爲本身的項目使用RxJava了。可是這也僅僅是我的口味問題,不少狀況下確實RxBus/EventBus會很方便,很刺激,很上癮。因此從這篇文章開始,我把標題中的"放棄RxBus"去除。java
不管在簡書,微信平臺,GitHub,掘金等等分享平臺。一個名字上寫着 "MVP(MVVM) + RxJava + Retrofit + Dagger2 + ........"這樣的名字,再熟悉不過了。然而,大多數狀況進去看一下RxJava部分。要麼就是簡單的把取到的數據用Observable.just()
直接傳給下一層,要麼就是直接使用Retrofit的Adapter來直接得到Observable,而app中其餘部分並無reactive。並且還有不少Observable用法錯誤,好比冷熱不分,連續太多的Map/FlatMap等等。react
爲何不用RxBus我已經寫了兩篇文章了,可能因爲我不常寫文,不少人並無理解。在這裏我再解釋一次:EventBus若是是一輛穿梭在全部代碼之間的公交車。那麼Observable就是穿梭在少量人之間的Uber專車。他比起EventBus有不少優點,好比類型安全,異常處理,線程切換,強大的操做符等等。你固然能夠作出一輛超級Uber來當全局公交車(RxBus)使用,然而這卻損失了RxJava原本的許多優點,而且又給本身挖了許多坑,得不償失。android
剛開始使用RxJava的時候,咱們會以爲operator的鏈式調用會很是的爽,一個簡單的例子:git
Observable.just("1", "2", "3", "4", "5", "6", "7")
.map(x -> Integer.valueOf(x))
.map(x -> x * 2)
.map(x -> x + 4)
.filter(x -> x >2)
// and much more operators
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());複製代碼
當你只有不多數據的時候,這樣固然能夠,可是你數據量上來的時候,這就會有不少的overhead。 其實幾乎全部的operator都會給你生成一個新的Observable。因此在上面這個例子中,咱們在過程當中生成了至少7個Observable。然而咱們徹底能夠將中間的.map().map().map().filter合併在一個FlatMap中,減小不少的overhead。github
Observable.just()
即便你沒有調用subscribe方法。just()括號裏面的代碼也已經執行了。顯然,Observable.just()
不適合封裝網絡數據,由於咱們一般不想在subscribe以前作網絡請求。class TestClass{
TestClass(){
System.out.println("I'm created!");
}
}
Observable.just(new TestClass());複製代碼
這時你運行代碼,你就看到確實你的TestClass 已經被建立了:I/System.out: I'm created!複製代碼
同理,fromIterable也和just有一樣的缺點。固然,這個能夠簡單的用defer()
/fromCallable()
/create()
操做符來是實現只有subscribe只有才加載。// use fromCallable
Observable.fromCallable(TestClass::new);
//or
Observable.defer(() -> Observable.just(new TestClass()));複製代碼
固然ObservableJust在不少狀況下,確實不錯。若是你不須要監聽後續的更新,那麼ObservableJust能夠知足你的需求。設計模式
這部分是本篇文章的重點!緩存
不少人在封裝數據的時候,並無太多考慮冷熱的問題,一般狀況下並不會出錯。由於目前不少開源項目(Demo)裏除了RxBus,並無太多的RxJava的實時狀況。然而,當你的App愈來愈Reactive的時候,冷熱即是一個必須考慮的問題。
Hot Observable 意思是若是他開始傳輸數據,你不主動喊停(dispose()
/cancel()
),那麼他就不會停,一直髮射數據,即便他已經沒有Subscriber了。而Cold Observable則是subscribe時纔會發射數據。
然而,問題來了。我上篇文章講過,只有subscribeActual方法調用了的時候,Observable發射數據,那爲何Hot Observable沒有Subscriber也會發射數據,他把數據發射給誰了呢?咱們在解決這個問題以前,先看一下Cold Observable:安全
咱們常見的工廠方法提供的都是ColdObservable,包括just()
,fromXX
,create()
,interval()
,defer()
。 他們的共同點是當你有多個Subscriber的時候,他們的事件是獨立的,舉個例子:微信
Observable interval = Observable.interval(1,TimeUnit.SECONDS);複製代碼
若是咱們有兩個subscriber,那麼他們會各自有本身的計時器,而且互不干擾。效果以下圖:
不一樣於Cold Observable, Hot Observable是共享數據的。對於Hot Observable的全部subscriber,他們會在同一時刻收到相同的數據。咱們一般使用publish()
操做符來將ColdObservable變爲Hot。或者咱們在RxBus中經常用到的Subjects
也是Hot Observable。
剛剛咱們剛剛提出了一個問題,
既然Hot Observable在沒有subscriber的時候,還會繼續發送數據,那麼數據究竟發給誰了呢?
其實Hot Observable其實並無發送數據,而是他上層的Observable 發送數據給這個hot Observable。不信?咱們來分別看一下:
咱們在上面的誤區中知道了,幾乎全部operator都會生成一個新的Observable。publish固然不例外。可是有區別的是,publish會給你一個ConnectableObservable。具體實現類是ObservablePublish。這個Observable的區別是他提供一個connect()
方法,若是你調用connect()
方法,ConnectableObservable就會開始接收上游Observable的數據。咱們來測試一下:
ConnectableObservable interval = Observable.interval(1, TimeUnit.SECONDS).publish();
//connect even when no subscribers
interval.connect();複製代碼
果真,因爲咱們subscribe晚了一些。0這個數據沒有收到,當咱們兩個 Subscriber
都dispose的時候,ConnectableObservable
也仍在接受數據,致使咱們6這個數據沒有接收到。ConnectableObservable
其實在內部,有一個PublishObserver
,他有兩個做用。一個是當咱們調用 connect()
方法時, PublishObserver
開始接受上游的數據,咱們的例子裏即是 Observable.interval(1, TimeUnit.SECONDS)
。因此才能在咱們沒有調用 subscribe
方法時,他也能開始發送數據。第二個做用是 PublishObserver
存儲全部的下游Subscriber, 也就是咱們例子中的Subscriber1 和Subscriber2,在 PublishObserver
每次接到一個上游數據,就會將接收到的結果,依次分發給他存儲的全部 Subscribers
,若是下游 Subscriber
調用了 dispose
方法,那麼他就會在本身的緩存中刪除這個 Subscriber,下次接受到上游數據便不會傳給這個Subscriber
。
那麼這時候,有同窗應該要問了:
咱們可不能夠中止從上游接受數據?
咱們固然能夠。 connect()
方法會返回一個 Disposable 給咱們來控制是否繼續接受上游的數據。
咱們固然不但願每次都手動控制 ConnectableObservable
的開關。RxJava給咱們提供了一些經常使用的控制操做符
refCount()
能夠說是最經常使用的操做符了。他會把 ConnectableObservable
變爲一個一般的Observable但又保持了HotObservable的特性。也就是說,若是出現第一個Subscriber,他就會自動調用 connect()
方法,若是他開始接受以後,下游的 Subscribers
所有dispose,那麼他也會中止接受上游的數據。具體看圖: 每一個 Subscriber
每次都會接受一樣的數據,可是當全部 subscriber
都 dispose時候,他也會自動dipose上游的 Observable
。因此咱們從新subscribe的時候,又從新從0開始。
這個操做符經常使用到,RxJava將他和publish合併爲一個操做符 :share()
。
autoConnect()
看名字就知道,他會自動連接,若是你單純調用 autoConnect()
,那麼,他會在你連接第一個 Subscriber
的時候調用 connect()
,或者你調用 autoConnect(int Num)
,那麼他將會再收到Num個 subscriber
的時候連接。Disposable
來控制上游的開關。 不過沒問題,autoConnect提供了另外一種重載方法 :autoConnect(int numberOfSubscribers, Consumer<? super Disposable> connection)
Consumer
傳給你 你須要的那個總開關。並且,autoConnect並不會autoDisconnect, 也就是若是他即便沒有subscriber了。他也會繼續接受數據。replay()replay()
方法和 publish()
同樣,會返回一個 ConnectableObservable
,區別是, replay()
會爲新的subscriber重放他以前所收到的上游數據,咱們再來舉個例子:
//only replay 3 values
Observable.interval(1, TimeUnit.SECONDS).replay(3).refCount();複製代碼
ReplayingShare()
其實ReplayingShare並不能算是ConnectableObservable的一個操做符,他是JakeWhaton的一個開源庫,只有百來行。實現的功能是幾乎和replay(1).refCount()
差很少。可是若是中斷 Conncection以後,從新開始subscribe,他仍然會給你一個重放他上一次的結果。 具體看圖:
Subjects 做爲一個Reactive世界中的特殊存在,他特殊在於他本身既是一個Observable又是一個Observer(Subscriber)。你既能夠像普通Observable同樣讓別的Subscriber來訂閱,也能夠用Subjects來訂閱別人。更方便的是他甚至暴露了OnXX(),方法給你。你直接調用能夠通知全部的Subscriber。 這也是RxBus的基礎,RxBus幾乎離不開Subjects。 蜘蛛俠的老爹告訴咱們,力量越大,責任就也大。Subjects也同樣。 Subjects由於暴露了OnXX()方法,使得Subjects的數據來源變得難以控制。並且,Subjects一直是HotObservable,咱們來看下Subject的OnNext()
方法的實現:
@Override
public void onNext(T t) {
if (subscribers.get() == TERMINATED) {
return;
}
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
for (PublishDisposable<T> s : subscribers.get()) {
s.onNext(t);
}
}複製代碼
能夠看出來Subjects只要調用了OnNext()
方法就會當即發送數據。因此,使用時必定要注意Subjects和Subscriber的連接時序問題。具體Subjects的用法我想介紹帖子已經足夠多了。這裏就不贅述了。
View 的各類Listener 咱們經常使用create方法來封裝,好比OnClickListener:
Observable.create(emitter -> {
button.setOnClickListener(v -> emitter.onNext("I'm Clicked"));
emitter.setCancellable(() -> button.setOnClickListener(null));
});複製代碼
這裏很是關鍵的一點是必定要設置解除綁定,不然你將持續使用這個會形成內存泄漏。並且最好配合使用share()。不然只有最後一個Subscriber能收到OnClick。固然,若是不考慮方法數的話,推薦配合使用RxBinding。
並且,用create()方法封裝Listener適合幾乎全部的callback, 而且安全。
設想一個場景,咱們有一個User類。裏面有咱們的用戶名,頭像,各類信息。然而在咱們的app中,可能有三四個Fragment/Activity須要根據這個User作出不一樣的反應。這時咱們就能夠簡單的使用Subject來封裝User類。
public class UserRepository {
private User actualUser;
private Subject<User> subject = ReplaySubject.createWithSize(1);
/** * *Get User Data from wherever you want Network/Database etc */
public Observable<User> getUpdate(){
return subject;
}
public void updateUser(User user){
actualUser = user;
subject.onNext(actualUser);
}
}複製代碼
若是咱們某些模塊須要這個User,那麼只須要subscribe到這個Repository,若是User有更新,每個Subscriber都會收到更新後的User而且互相不影響。並且咱們使用ReplaySubject,即便有新的Subscriber,也會收到最新的一個User的緩存。
可是使用的時候必定要注意,由於用的是Subject.因此在onNext方法中一旦出現了error。那麼全部的Subscriber都將和這個subject斷開了連接。這裏也能夠用 RxRelay 代替Subject,簡單來講Relay就是一個沒有onError和onComplete的Subject。
Dan Lew在他的博客Loading data from multiple sources with RxJava 中介紹過他這種處理方法,
// Our sources (left as an exercise for the reader)
Observable<Data> memory = ...;
Observable<Data> disk = ...;
Observable<Data> network = ...;
// Retrieve the first source with data
Observable<Data> source = Observable
.concat(memory, disk, network)
.first();複製代碼
而後在每次作不一樣請求的時候刷新緩存
Observable<Data> networkWithSave = network.doOnNext(data -> {
saveToDisk(data);
cacheInMemory(data);
});
Observable<Data> diskWithCache = disk.doOnNext(data -> {
cacheInMemory(data);
});複製代碼
具體也能夠看這篇簡書,我也不在過多贅述 :RxJava(八)concat符操做處理多數據源
這裏也說一下這個方法的缺點。 首先,這個只適合一個Item的時候。若是咱們有多個Item從這個Observable中流出。 fisrt()
操做符只會取第一個。
這多是最靈活的寫法?若是你想用RxJava封裝本身的庫,推薦這種方法封裝。由於這樣不只僅能夠有效的進行錯誤處理,而且不會暴露過多邏輯給外面,許多優秀的RxJava相關庫都是這樣封裝,就連RxJava本身也是把一個個的operator封裝成一個個不一樣的Observable。可是這種方法確實要求很高,要作不少考慮,好比異步,多線程衝突,錯誤處理。對新手不是很推薦。
仍是稍微講一個例子: 仍是咱們的onClickLisnter的封裝:
RxBinding 的 封裝:
final class ViewClickObservable extends Observable<Object> {
private final View view;
ViewClickObservable(View view) {
this.view = view;
}
@Override protected void subscribeActual(Observer<? super Object> observer) {
if (!checkMainThread(observer)) {
return;
}
Listener listener = new Listener(view, observer);
observer.onSubscribe(listener);
view.setOnClickListener(listener);
}
static final class Listener extends MainThreadDisposable implements OnClickListener {
private final View view;
private final Observer<? super Object> observer;
Listener(View view, Observer<? super Object> observer) {
this.view = view;
this.observer = observer;
}
@Override public void onClick(View v) {
if (!isDisposed()) {
observer.onNext(Notification.INSTANCE);
}
}
@Override protected void onDispose() {
view.setOnClickListener(null);
}
}
}複製代碼
其實這裏雖然代碼更多,可是實質上是剛纔咱們說到的用Observable.create()
來封裝沒有不少區別。那咱們爲何還要這麼麻煩本身寫Observable? 由於與create相比,減小了對象個數。 Observable封裝一個OnClickListener 須要 ObservableCreate
, ObservableOnSubscribe
,ObservableEmitter
這三個類的實例。來確保你封裝出來的Observable 是遵照 Observable Contract的。 而若是你默認本身遵照Observable Contract, 你只須要一個 CustomObservable
來實現。減小了兩個對象的生成。 這個觀點也獲得了證明,我在StackOverFlow問過相關問題。很幸運獲得了RxJava 2.x 做者 David 的回覆:
若是直接繼承與Observable來封裝。大概分以下幾步:
ViewClickObservable(View view) {
this.view = view;
}複製代碼
將你須要使用的Listener/CallBack 和 Disposable/Observer接口結合成爲一個實現類,實現監聽。而且經過構造方法,將下游的Observer傳入,以實現傳輸數據。好比OnClickListener:
static final class Listener extends MainThreadDisposable implements OnClickListener {
private final View view;
private final Observer<? super Object> observer;
Listener(View view, Observer<? super Object> observer) {
this.view = view;
this.observer = observer;
}
@Override public void onClick(View v) {
if (!isDisposed()) {
observer.onNext(Notification.INSTANCE);
}
}
@Override protected void onDispose() {
view.setOnClickListener(null);
}
}複製代碼
這裏推薦使用內部類的形式,下降內部可見性。並且這裏須要注意的點很是多。
首先Disposable的 兩個方法都須要實現,isDisposed()通常使用AtomicBoolean來控制監聽是否已經取消訂閱,好比:
private final AtomicBoolean unsubscribed = new AtomicBoolean();
@Override
public final boolean isDisposed() {
return unsubscribed.get();
}複製代碼
dispose()方法通常會放一些取消監聽等方法。好比咱們上面看到的view.setOnClickListener(null);
。 這裏 onDispose
正常是沒有這個方法的。 這個是Jake Wharton爲了方便封裝出來的接口,放在dispose方法裏運行的。
public abstract class MainThreadDisposable implements Disposable {
@Override
public final void dispose() {
onDispose();
}
protected abstract void onDispose();
}複製代碼
他固然還在dipose方法裏作了其餘安全檢查,消除了一些Boilerplate。
其次,這裏的對於Observer的控制是十分寬鬆的。因此你的行爲最好必定遵循Observable Contract。不然出現其餘問題,好比下游取消訂閱上游還在發送數據。 又或者onComplete/onError以後還有onNext出發。 又或者出現異常並不會在onError中獲得等等。 這裏推薦看一下RxBinding其餘實現類的源碼。或者是Retrofit RxJava 2 的 Adapter。 都會頗有幫助。
最後,若是你是封裝一個生產Observable的方法,那麼使用Disposable。若是你是想封裝一個自定義Operator。那麼須要實現Observer接口。使用這個observer來監聽上游數據。自定義Operator實在複雜。這裏我就不講了,我本身也沒精通這個。
@Override protected void subscribeActual(Observer<? super Object> observer) {
if (!checkMainThread(observer)) {
return;
}
Listener listener = new Listener(view, observer);
observer.onSubscribe(listener);
view.setOnClickListener(listener);
}複製代碼
看了我第二篇文章的朋友這裏應該對subscribeActual方法不陌生。這裏的參數是下游的observer。將他直接傳入咱們剛纔設計好的Listener/Disposable/Observer。 而後經過observer.onSubscribe註冊咱們的disposable。 而後咱們開始註冊監聽。這裏順序很重要。必定要先調用onSubscribe方法再註冊監聽。不然可能會出現下游disposable空指針異常。
這篇文章在簡書上也發了有一陣子了。轉到掘金時我又從新檢查了下,補充了一些內容。可能有些地方看起來與我以前的文章有些許重複。但願見諒。