以前寫RxJava相關文章的時候,就有人想讓我談談RxJava2.0的新特性,說實話,一開始我是拒絕的。由於在我看來,RxJava2.0雖然是版本的重大升級,但總歸仍是RxJava,升級一個版本還能上天是咋的?瞭解一下它的更新文檔不就行了麼?真的有必要單出一篇文章來談這個麼?javascript
可是詳細的瞭解了RxJava2.0以及部分源碼以後,我以爲仍是有必要對RxJava2.0作一個說明,幫助你們對於RxJava有更好的認識。java
假如你還不是很熟悉RxJava,或者對於背壓這個概念(2.0更新中會涉及到背壓的概念)很模糊,但願你也能讀一讀下面兩篇鋪墊的文章:react
關於背壓的那篇文章原本是本文的一部分,由於篇幅過大,被剝離出去了,因此建議你們有時間也一併閱讀。android
RxJava2.0有不少的更新,一些改動甚至衝擊了我以前的文章裏的內容,這也是我想寫這篇文章的緣由之一。不過想要寫這篇文章其實也是有難度的,由於相關的資料去實際上是不多的,還得本身硬着頭皮上....不過俗話說得好,有困難要上,沒有困難創造困難也要上。git
在這裏,我會按照咱們以前關於RxJava的文章的講述順序:觀察者模式,操做符,線程調度,這三個方面依次看有哪些更新。github
這個估計得放在最前面。app
Android端使用RxJava須要依賴新的包名:異步
//RxJava的依賴包(我使用的最新版本)
compile 'io.reactivex.rxjava2:rxjava:2.0.1'
//RxAndroid的依賴包
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'複製代碼
首先聲明,RxJava以觀察者模式爲骨架,在2.0中依然如此。ide
不過這次更新中,出現了兩種觀察者模式:post
RxJava2.X中,Observeable用於訂閱Observer,是不支持背壓的,而Flowable用於訂閱Subscriber,是支持背壓(Backpressure)的。
關於背壓這個概念以及它在1.0版本中的缺憾在上一篇文章中我已經介紹到了,若是你不是很清楚,我在這裏在作一個介紹:背壓是指在異步場景中,被觀察者發送事件速度遠快於觀察者的處理速度的狀況下,一種告訴上游的被觀察者下降發送速度的策略,在1.0中,關於背壓最大的遺憾,就是集中在Observable這個類中,致使有的Observable支持背壓,有的不支持。爲了解決這種缺憾,新版本把支持背壓和不支持背壓的Observable區分開來。
Observable正經常使用法:
Observable mObservable=Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onComplete();
}
});
Observer mObserver=new Observer<Integer>() {
//這是新加入的方法,在訂閱後發送數據以前,
//回首先調用這個方法,而Disposable可用於取消訂閱
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
mObservable.subscribe(mObserver);複製代碼
這種觀察者模型是不支持背壓的。
啥叫不支持背壓呢?
當被觀察者快速發送大量數據時,下游不會作其餘處理,即便數據大量堆積,調用鏈也不會報MissingBackpressureException,消耗內存過大隻會OOM
我在測試的時候,快速發送了100000個整形數據,下游延遲接收,結果被觀察者的數據所有發送出去了,內存確實明顯增長了,遺憾的是沒有OOM。
因此,當咱們使用Observable/Observer的時候,咱們須要考慮的是,數據量是否是很大(官方給出以1000個事件爲分界線,僅供各位參考)
Flowable.range(0,10)
.subscribe(new Subscriber<Integer>() {
Subscription sub;
//當訂閱後,會首先調用這個方法,其實就至關於onStart(),
//傳入的Subscription s參數能夠用於請求數據或者取消訂閱
@Override
public void onSubscribe(Subscription s) {
Log.w("TAG","onsubscribe start");
sub=s;
sub.request(1);
Log.w("TAG","onsubscribe end");
}
@Override
public void onNext(Integer o) {
Log.w("TAG","onNext--->"+o);
sub.request(1);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
Log.w("TAG","onComplete");
}
});複製代碼
輸出以下:
onsubscribe start
onNext--->0
onNext--->1
onNext--->2
...
onNext--->9
onComplete
onsubscribe end複製代碼
Flowable是支持背壓的,也就是說,通常而言,上游的被觀察者會響應下游觀察者的數據請求,下游調用request(n)來告訴上游發送多少個數據。這樣避免了大量數據堆積在調用鏈上,使內存一直處於較低水平。
固然,Flowable也能夠經過creat()來建立:
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onComplete();
}
}
//須要指定背壓策略
, BackpressureStrategy.BUFFER);複製代碼
Flowable雖然能夠經過create()來建立,可是你必須指定背壓的策略,以保證你建立的Flowable是支持背壓的(這個在1.0的時候就很難保證,能夠說RxJava2.0收緊了create()的權限)。
根據上面的代碼的結果輸出中能夠看到,當咱們調用subscription.request(n)方法的時候,不等onSubscribe()中後面的代碼執行,就會馬上執行到onNext方法,所以,若是你在onNext方法中使用到須要初始化的類時,應當儘可能在subscription.request(n)這個方法調用以前作好初始化的工做;
固然,這也不是絕對的,我在測試的時候發現,經過create()自定義Flowable的時候,即便調用了subscription.request(n)方法,也會等onSubscribe()方法中後面的代碼都執行完以後,纔開始調用onNext。
TIPS: 儘量確保在request()以前已經完成了全部的初始化工做,不然就有空指針的風險。
固然,除了上面這兩種觀察者,還有一類觀察者
其實這三者都差很少,Maybe/MaybeObserver能夠說是前二者的複合體,所以以Maybe/MaybeObserver爲例簡單介紹一下這種觀察者模式的用法
//判斷是否登錄
Maybe.just(isLogin())
//可能涉及到IO操做,放在子線程
.subscribeOn(Schedulers.newThread())
//取回結果傳到主線程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new MaybeObserver<Boolean>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(Boolean value) {
if(value){
...
}else{
...
}
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});複製代碼
上面就是Maybe/MaybeObserver的普通用法,你能夠看到,實際上,這種觀察者模式並不用於發送大量數據,而是發送單個數據,也就是說,當你只想要某個事件的結果(true or false)的時候,你能夠用這種觀察者模式
這是上面那些被觀察者的上層接口:
//Observable接口
interface ObservableSource<T> {
void subscribe(Observer<? super T> observer);
}
//Single接口
interface SingleSource<T> {
void subscribe(SingleObserver<? super T> observer);
}
//Completable接口
interface CompletableSource {
void subscribe(CompletableObserver observer);
}
//Maybe接口
interface MaybeSource<T> {
void subscribe(MaybeObserver<? super T> observer);
}
//Flowable接口
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}複製代碼
其實咱們能夠看到,每一種觀察者都繼承自各自的接口,這也就把他們能徹底的區分開,各自獨立(特別是Observable和Flowable),保證了他們各自的拓展或者配套的操做符不會相互影響。
例如flatMap操做符實現:
//Flowable中flatMap的定義
Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper);
//Observable中flatMap的定義
Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper);複製代碼
假如你想爲Flowable寫一個自定義的操做符,那麼只要保證Function< Publisher >中的類型實現了Publisher接口便可。這麼說可能很抽象,你們不理解其實也不要緊,由於並不推薦你們自定義操做符,RxJava中的操縱符的組合已經能夠知足你們的需求了。
固然,你也會注意到上面那些接口中的subscribe()方法的返回類型爲void了,在1.X中,這個方法通常會返回一個Subscription對象,用於取消訂閱。如今,這個功能的對象已經被放到觀察者Observer或者subscriber的內部實現方法中了,
Flowable/Subscriber
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
public interface Subscription {
public void request(long n);
public void cancel();
}複製代碼
上面的實例中,onSubscribe(Subscription s)傳入的參數s就肩負着取消訂閱的功能,固然,他也能夠用於請求上游的數據。
在Observable/observer中,傳入的參數是另外一個對象
Observable/Observer
public interface Observer<T> {
void onSubscribe(Disposable d);
void onNext(T value);
void onError(Throwable e);
void onComplete();
}
public interface Disposable {
/** * Dispose the resource, the operation should be idempotent. */
void dispose();
/** * Returns true if this resource has been disposed. * @return true if this resource has been disposed */
boolean isDisposed();
}複製代碼
在Observer接口中,onSubscribe(Disposable d)方法傳入的Disposable也是用於取消訂閱,基本功能是差很少的,只不過命名不一致,你們知道就好。
其實這種設計能夠說仍是符合邏輯的,由於取消訂閱這個動做就只有觀察者(Observer等)才能作的,如今把它併入到觀察者內部,也算瓜熟蒂落吧。
最後再提一點更新,就是被觀察者再也不接收null做爲數據源了。
這一塊其實能夠說沒什麼改動,大部分以前你用過的操做符都沒變,即便有所變更,也只是包名或類名的改動。你們可能常常用到的就是Action和Function。
以前我在文章裏介紹過關於Action這類接口,在1.0中,這類接口是從Action0,Action1...日後排(數字表明可接受的參數),如今作出了改動
Rx1.0-----------Rx2.0
Action0--------Action
Action1--------Consumer
Action2--------BiConsumer
後面的Action都去掉了,只保留了ActionN
同上,也是命名方式的改變
上面那兩個類,和RxJava1.0相比,他們都增長了throws Exception,也就是說,在這些方法作某些操做就不須要try-catch。
例如:
Flowable.just("file.txt")
.map(name -> Files.readLines(name))
.subscribe(lines -> System.out.println(lines.size()), Throwable::printStackTrace);複製代碼
Files.readLines(name)這類io方法原本是須要try-catch的,如今直接拋出異常,就能夠放心的使用lambda ,保證代碼的簡潔優美。
以doOnCancel爲例,大概就是當取消訂閱時,會調用這個方法,例如:
Flowable.just(1, 2, 3)
.doOnCancel(() -> System.out.println("Cancelled!"))
.take(2)
.subscribe(System.out::println);複製代碼
take新操符會取消後面那些還未被髮送的事件,於是會觸發doOnCancel
其餘的一些操做符基本沒變,或者只是改變了名字,在這裏就不一一介紹了,須要提一下的是,不少操做符都有兩套,一套用於Observable,一套用於Flowable。
能夠說這一起基本也沒有改動,若是必定要說的話。
若是你想把本身的RxJava1.0的遷移到2.0的版本,可使用這個庫RxJava2Interop,它能夠在Rxjava1.0和2.0之間相互轉換,也就是說,不只能夠把1.0的代碼遷移到2.0,你還能夠把2.0的代碼遷移到1.0,哈哈。
在RxJava1.0中,有的人會使用CompositeSubscription來收集Subscription,來統一取消訂閱,如今在RxJava2.0中,因爲subscribe()方法如今返回void,那怎麼辦呢?
其實在RxJava2.0中,Flowable提供了subscribeWith這個方法能夠返回當前訂閱的觀察者,而且經過ResourceSubscriber DisposableSubscriber等觀察者來提供 Disposable的接口。
因此,若是想要達成RxJava1.0的效果,如今應該是這樣作:
CompositeDisposable composite = new CompositeDisposable();
composite.add(Flowable.range(1, 8).subscribeWith(subscriber));
這個subscriber 應該是 ResourceSubscriber 或者 DisposableSubscriber 的實例。
其實從整篇文章的分析來看,改動最大的仍是觀察者模式的實現,被拆分和細化了,主要分紅了Observable和Flowable兩大類,固然還有與之相關聯的其餘變更,整體來看這一版本能夠說是對於觀察者和被觀察者的重構。
RxJava2.0的範例代碼我沒精力去寫了,也正巧有位外國朋友已經寫了RxJava2.0的demo,下面是他的項目地址:
固然,學習2.0 的過程當中有什麼問題也能夠在這裏留言討論。
這篇文章半個月前就開始寫了,可是一直不太滿意,因此在草稿箱裏躺了好久,可是本着不放棄任何一篇落後文章的信念,仍是振做起來,完成關於RxJava2.0的介紹。你可能不信,寫完頓時有了一種解(xu)脫的感受。
身體被掏空...
下面我截圖展現一下2.0相對於1.0的一些改動的細節,僅作參考。
其實這些都是官方給出的列表,截圖在這裏只是方便你們觀摩。