給初學者的 RxJava2.0 教程 (九)

Outlinejava

[TOC]react

前言

很久不見朋友們,最近一段時間在忙工做上的事情,沒來得及寫文章,這兩天正好有點時間,趕忙寫下了這篇教程,省得你們說我太監了。android

正題

先來回顧一下上上節,咱們講Flowable的時候,說它採用了響應式拉的方式,咱們還舉了個葉問打小日本的例子,再來回顧一下吧,咱們說把上游當作小日本, 把下游看成葉問, 當調用Subscription.request(1)時, 葉問就說我要打一個! 而後小日本就拿出一個鬼子給葉問, 讓他打, 等葉問打死這個鬼子以後, 再次調用request(10), 葉問就又說我要打十個! 而後小日本又派出十個鬼子給葉問, 而後就在邊上看熱鬧, 看葉問能不能打死十個鬼子, 等葉問打死十個鬼子後再繼續要鬼子接着打。app

可是不知道你們有沒有發現,在咱們前兩節中的例子中,咱們口中聲稱的響應式拉並無徹底體現出來,好比這個例子:異步

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "emit 1");
                emitter.onNext(1);
                Log.d(TAG, "emit 2");
                emitter.onNext(2);
                Log.d(TAG, "emit 3");
                emitter.onNext(3);
                Log.d(TAG, "emit complete");
                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;  
                        s.request(1);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                        mSubscription.request(1);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });複製代碼

雖然咱們在下游中是每次處理掉了一個事件以後才調用request(1)去請求下一個事件,也就是說葉問的確是在打死了一個鬼子以後才繼續打下一個鬼子,但是上游呢?上游真的是每次當下遊請求一個纔拿出一個嗎?從上上篇文章中咱們知道並非這樣的,上游仍然是一開始就發送了全部的事件,也就是說小日本並無等葉問打死一個纔拿出一個,而是一開始就拿出了全部的鬼子,這些鬼子從一開始就在這兒排隊等着被打死。ide

有個故事是這麼說的:oop

楚人有賣盾與矛者,先譽其盾之堅,曰:「吾盾之堅,物莫能陷也。」俄而又譽其矛之利,曰:「吾矛之利,萬物莫不陷也。」市人詰之曰:"以子之矛陷子之盾,何如?」其人弗能應也。衆皆笑之。學習

沒錯,咱們先後所說的就是自相矛盾了,這說明了什麼呢,說明咱們的實現並非一個完整的實現,那麼,究竟怎樣的實現纔是完整的呢?spa

咱們先本身來想想,在下游中調用Subscription.request(n)就能夠告訴上游,下游可以處理多少個事件,那麼上游要根據下游的處理能力正確的去發送事件,那麼上游是否是應該知道下游的處理能力是多少啊,對吧,否則,一個巴掌拍不響啊,這種事情得你情我願才行。線程

那麼上游從哪裏得知下游的處理能力呢?咱們來看看上游最重要的部分,確定就是FlowableEmitter了啊,咱們就是經過它來發送事件的啊,來看看它的源碼吧(別緊張,它的代碼灰常簡單):

public interface FlowableEmitter<T> extends Emitter<T> {
    void setDisposable(Disposable s);
    void setCancellable(Cancellable c);

    /** * The current outstanding request amount. * <p>This method is thread-safe. * @return the current outstanding request amount */
    long requested();

    boolean isCancelled();
    FlowableEmitter<T> serialize();
}複製代碼

FlowableEmitter是個接口,繼承Emitter,Emitter裏面就是咱們的onNext(),onComplete()和onError()三個方法。咱們看到FlowableEmitter中有這麼一個方法:

long requested();複製代碼

方法註釋的意思就是當前外部請求的數量,哇哦,這好像就是咱們要找的答案呢. 咱們仍是實際驗證一下吧.

先來看同步的狀況吧:

public static void demo1() {
        Flowable
                .create(new FlowableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                        Log.d(TAG, "current requested: " + emitter.requested());
                    }
                }, BackpressureStrategy.ERROR)
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }複製代碼

這個例子中,咱們在上游中打印出當前的request數量,下游什麼也不作。

咱們先猜想一下結果,下游沒有調用request(),說明當前下游的處理能力爲0,那麼上游獲得的requested也應該是0,是否是呢?

來看看運行結果:

D/TAG: onSubscribe
D/TAG: current requested: 0複製代碼

哈哈,結果果真是0,說明咱們的結論基本上是對的。

那下游要是調用了request()呢,來看看:

public static void demo1() {
        Flowable
                .create(new FlowableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                        Log.d(TAG, "current requested: " + emitter.requested());
                    }
                }, BackpressureStrategy.ERROR)
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                        s.request(10); //我要打十個!
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }複製代碼

此次在下游中調用了request(10),告訴上游我要打十個,看看運行結果:

D/TAG: onSubscribe
D/TAG: current requested: 10複製代碼

果真!上游的requested的確是根據下游的請求來決定的,那要是下游屢次請求呢?好比這樣:

public static void demo1() {
        Flowable
                .create(new FlowableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                        Log.d(TAG, "current requested: " + emitter.requested());
                    }
                }, BackpressureStrategy.ERROR)
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                        s.request(10);  //我要打十個!
                        s.request(100); //再給我一百個!
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }複製代碼

下游先調用了request(10), 而後又調用了request(100),來看看運行結果:

D/TAG: onSubscribe
D/TAG: current requested: 110複製代碼

看來屢次調用也沒問題,作了加法

誒加法?對哦,只是作加法,那何時作減法呢?

固然是發送事件啦!

來看個例子吧:

public static void demo2() {
        Flowable
                .create(new FlowableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(final FlowableEmitter<Integer> emitter) throws Exception {
                        Log.d(TAG, "before emit, requested = " + emitter.requested());

                        Log.d(TAG, "emit 1");
                        emitter.onNext(1);
                        Log.d(TAG, "after emit 1, requested = " + emitter.requested());

                        Log.d(TAG, "emit 2");
                        emitter.onNext(2);
                        Log.d(TAG, "after emit 2, requested = " + emitter.requested());

                        Log.d(TAG, "emit 3");
                        emitter.onNext(3);
                        Log.d(TAG, "after emit 3, requested = " + emitter.requested());

                        Log.d(TAG, "emit complete");
                        emitter.onComplete();

                        Log.d(TAG, "after emit complete, requested = " + emitter.requested());
                    }
                }, BackpressureStrategy.ERROR)
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                        s.request(10);  //request 10
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }複製代碼

代碼很簡單,來看看運行結果:

D/TAG: onSubscribe                        
D/TAG: before emit, requested = 10        
D/TAG: emit 1                             
D/TAG: onNext: 1                          
D/TAG: after emit 1, requested = 9        
D/TAG: emit 2                             
D/TAG: onNext: 2                          
D/TAG: after emit 2, requested = 8        
D/TAG: emit 3                             
D/TAG: onNext: 3                          
D/TAG: after emit 3, requested = 7        
D/TAG: emit complete                      
D/TAG: onComplete                         
D/TAG: after emit complete, requested = 7複製代碼

你們應該能看出端倪了吧,下游調用request(n) 告訴上游它的處理能力,上游每發送一個next事件以後,requested就減一,注意是next事件,complete和error事件不會消耗requested,當減到0時,則表明下游沒有處理能力了,這個時候你若是繼續發送事件,會發生什麼後果呢?固然是MissingBackpressureException啦,試一試:

public static void demo2() {
        Flowable
                .create(new FlowableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(final FlowableEmitter<Integer> emitter) throws Exception {
                        Log.d(TAG, "before emit, requested = " + emitter.requested());

                        Log.d(TAG, "emit 1");
                        emitter.onNext(1);
                        Log.d(TAG, "after emit 1, requested = " + emitter.requested());

                        Log.d(TAG, "emit 2");
                        emitter.onNext(2);
                        Log.d(TAG, "after emit 2, requested = " + emitter.requested());

                        Log.d(TAG, "emit 3");
                        emitter.onNext(3);
                        Log.d(TAG, "after emit 3, requested = " + emitter.requested());

                        Log.d(TAG, "emit complete");
                        emitter.onComplete();

                        Log.d(TAG, "after emit complete, requested = " + emitter.requested());
                    }
                }, BackpressureStrategy.ERROR)
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                        s.request(2);   //request 2
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }複製代碼

仍是這個例子,只不過此次只request(2), 看看運行結果:

D/TAG: onSubscribe
 D/TAG: before emit, requested = 2
 D/TAG: emit 1
 D/TAG: onNext: 1
 D/TAG: after emit 1, requested = 1
 D/TAG: emit 2
 D/TAG: onNext: 2
 D/TAG: after emit 2, requested = 0
 D/TAG: emit 3
 W/TAG: onError: io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
                 at io.reactivex.internal.operators.flowable.FlowableCreate$ErrorAsyncEmitter.onOverflow(FlowableCreate.java:411)
                 at io.reactivex.internal.operators.flowable.FlowableCreate$NoOverflowBaseAsyncEmitter.onNext(FlowableCreate.java:377)
                 at zlc.season.rxjava2demo.demo.ChapterNine$4.subscribe(ChapterNine.java:80)
                 at io.reactivex.internal.operators.flowable.FlowableCreate.subscribeActual(FlowableCreate.java:72)
                 at io.reactivex.Flowable.subscribe(Flowable.java:12218)
                 at zlc.season.rxjava2demo.demo.ChapterNine.demo2(ChapterNine.java:89)
                 at zlc.season.rxjava2demo.MainActivity$2.onClick(MainActivity.java:36)
                 at android.view.View.performClick(View.java:4780)
                 at android.view.View$PerformClick.run(View.java:19866)
                 at android.os.Handler.handleCallback(Handler.java:739)
                 at android.os.Handler.dispatchMessage(Handler.java:95)
                 at android.os.Looper.loop(Looper.java:135)
                 at android.app.ActivityThread.main(ActivityThread.java:5254)
                 at java.lang.reflect.Method.invoke(Native Method)
                 at java.lang.reflect.Method.invoke(Method.java:372)
                 at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:903)
                 at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:698)
 D/TAG: after emit 3, requested = 0
 D/TAG: emit complete
 D/TAG: after emit complete, requested = 0複製代碼

到目前爲止咱們一直在說同步的訂閱,如今同步說完了,咱們先用一張圖來總結一下同步的狀況:

同步request.png

這張圖的意思就是當上下游在同一個線程中的時候,在下游調用request(n)就會直接改變上游中的requested的值,屢次調用便會疊加這個值,而上游每發送一個事件以後便會去減小這個值,當這個值減小至0的時候,繼續發送事件便會拋異常了。

咱們再來講說異步的狀況,異步和同步會有區別嗎?會有什麼區別呢?帶着這個疑問咱們繼續來探究。

一樣的先來看一個基本的例子:

public static void demo3() {
        Flowable
                .create(new FlowableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                        Log.d(TAG, "current requested: " + emitter.requested());
                    }
                }, BackpressureStrategy.ERROR)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }複製代碼

此次是異步的狀況,上游啥也不作,下游也啥也不作,來看看運行結果:

D/TAG: onSubscribe
D/TAG: current requested: 128複製代碼

哈哈,又是128,看了我前幾篇文章的朋友確定很熟悉這個數字啊!這個數字爲何和咱們以前所說的默認的水缸大小同樣啊,莫非?

帶着這個疑問咱們繼續來研究一下:

public static void demo3() {
        Flowable
                .create(new FlowableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                        Log.d(TAG, "current requested: " + emitter.requested());
                    }
                }, BackpressureStrategy.ERROR)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                        s.request(1000); //我要打1000個!!
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }複製代碼

此次咱們在下游調用了request(1000)告訴上游我要打1000個,按照以前咱們說的,此次的運行結果應該是1000,來看看運行結果:

D/TAG: onSubscribe
D/TAG: current requested: 128複製代碼

臥槽,你肯定你沒貼錯代碼?

是的,真相就是這樣,就是128,蜜汁128。。。

what happened?

I don't know !

爲了答疑解惑,我就直接上圖了:

異步request.png

能夠看到,當上下游工做在不一樣的線程裏時,每個線程裏都有一個requested,而咱們調用request(1000)時,實際上改變的是下游主線程中的requested,而上游中的requested的值是由RxJava內部調用request(n)去設置的,這個調用會在合適的時候自動觸發。

如今咱們就能理解爲何沒有調用request,上游中的值是128了,由於下游在一開始就在內部調用了request(128)去設置了上游中的值,所以即便下游沒有調用request(),上游也能發送128個事件,這也能夠解釋以前咱們爲何說Flowable中默認的水缸大小是128,其實就是這裏設置的。

剛纔同步的時候咱們說了,上游每發送一個事件,requested的值便會減一,對於異步來講一樣如此,那有人確定有疑問了,一開始上游的requested的值是128,那這128個事件發送完了不就不能繼續發送了嗎?

剛剛說了,設置上游requested的值的這個內部調用會在合適的時候自動觸發,那到底何時是合適的時候呢?是發完128個事件纔去調用嗎?仍是發送了一半纔去調用呢?

帶着這個疑問咱們來看下一段代碼:

public static void request() {
        mSubscription.request(96); //請求96個事件
    }

public static void demo4() {
        Flowable
                .create(new FlowableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                        Log.d(TAG, "First requested = " + emitter.requested());
                        boolean flag;
                        for (int i = 0; ; i++) {
                            flag = false;
                            while (emitter.requested() == 0) {
                                if (!flag) {
                                    Log.d(TAG, "Oh no! I can't emit value!");
                                    flag = true;
                                }
                            }
                            emitter.onNext(i);
                            Log.d(TAG, "emit " + i + " , requested = " + emitter.requested());
                        }
                    }
                }, BackpressureStrategy.ERROR)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }複製代碼

此次的上游稍微複雜了一點點,首先仍然是個無限循環發事件,可是是有條件的,只有當上遊的requested != 0的時候纔會發事件,而後咱們調用request(96)去消費96個事件(爲何是96而不是其餘的數字先不要管),來看看運行結果吧:

D/TAG: onSubscribe
D/TAG: First requested = 128
D/TAG: emit 0 , requested = 127
D/TAG: emit 1 , requested = 126
D/TAG: emit 2 , requested = 125
  ...
D/TAG: emit 124 , requested = 3
D/TAG: emit 125 , requested = 2
D/TAG: emit 126 , requested = 1
D/TAG: emit 127 , requested = 0
D/TAG: Oh no! I can't emit value!複製代碼

首先運行以後上游便會發送完128個事件,以後便不作任何事情,從打印的結果中咱們也能夠看出這一點。

而後咱們調用request(96),這會讓下游去消費96個事件,來看看運行結果吧:

D/TAG: onNext: 0
D/TAG: onNext: 1
  ...
D/TAG: onNext: 92
D/TAG: onNext: 93
D/TAG: onNext: 94
D/TAG: onNext: 95
D/TAG: emit 128 , requested = 95
D/TAG: emit 129 , requested = 94
D/TAG: emit 130 , requested = 93
D/TAG: emit 131 , requested = 92
  ...
D/TAG: emit 219 , requested = 4
D/TAG: emit 220 , requested = 3
D/TAG: emit 221 , requested = 2
D/TAG: emit 222 , requested = 1
D/TAG: emit 223 , requested = 0
D/TAG: Oh no! I can't emit value!複製代碼

能夠看到,當下遊消費掉第96個事件以後,上游又開始發事件了,並且能夠看到當前上游的requested的值是96(打印出來的95是已經發送了一個事件減一以後的值),最終發出了第223個事件以後又進入了等待區,而223-127 正好等於 96。

這是否是說明當下游每消費96個事件便會自動觸發內部的request()去設置上游的requested的值啊!沒錯,就是這樣,而這個新的值就是96。

朋友們能夠手動試試請求95個事件,上游是不會繼續發送事件的。

至於這個96是怎麼得出來的(確定不是我猜的蒙的啊),感興趣的朋友能夠自行閱讀源碼尋找答案,對於初學者而言應該沒什麼必要,管它內部怎麼實現的呢對吧。

好了今天的教程就到這裏了!經過本節的學習,你們應該知道如何正確的去實現一個完整的響應式拉取了,在某一些場景下,能夠在發送事件前先判斷當前的requested的值是否大於0,若等於0則說明下游處理不過來了,則須要等待,例以下面這個例子。

實踐

這個例子是讀取一個文本文件,須要一行一行讀取,而後處理並輸出,若是文本文件很大的時候,好比幾十M的時候,所有先讀入內存確定不是明智的作法,所以咱們能夠一邊讀取一邊處理,實現的代碼以下:

public static void main(String[] args) {
        practice1();
        try {
            Thread.sleep(10000000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void practice1() {
        Flowable
                .create(new FlowableOnSubscribe<String>() {
                    @Override
                    public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                        try {
                            FileReader reader = new FileReader("test.txt");
                            BufferedReader br = new BufferedReader(reader);

                            String str;

                            while ((str = br.readLine()) != null && !emitter.isCancelled()) {
                                while (emitter.requested() == 0) {
                                    if (emitter.isCancelled()) {
                                        break;
                                    }
                                }
                                emitter.onNext(str);
                            }

                            br.close();
                            reader.close();

                            emitter.onComplete();
                        } catch (Exception e) {
                            emitter.onError(e);
                        }
                    }
                }, BackpressureStrategy.ERROR)
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.newThread())
                .subscribe(new Subscriber<String>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        mSubscription = s;
                        s.request(1);
                    }

                    @Override
                    public void onNext(String string) {
                        System.out.println(string);
                        try {
                            Thread.sleep(2000);
                            mSubscription.request(1);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    @Override
                    public void onError(Throwable t) {
                        System.out.println(t);
                    }

                    @Override
                    public void onComplete() {
                    }
                });
    }複製代碼

運行的結果即是:

poetry.gif

好了,本次的教程就到這裏了,謝謝你們捧場!下節見,敬請期待!(PS: 我這麼用心的寫文章, 大家也不給個贊嗎?)

相關文章
相關標籤/搜索