RxJava2 實戰知識梳理(8) 使用 publish + merge 優化先加載緩存,再讀取網絡數據的請求過程

1、前言

在不少資訊應用當中,當咱們進入一個新的頁面,爲了提高用戶體驗,不讓頁面空白過久,咱們通常會先讀取緩存中的數據,再去請求網絡。java

今天這篇文章,咱們將實現下面這個效果:同時發起讀取緩存、訪問網絡的請求,若是緩存的數據先回來,那麼就先展現緩存的數據,而若是網絡的數據先回來,那麼就再也不展現緩存的數據。緩存

爲了讓你們對這一過程有更深入的理解,咱們介紹"先加載緩存,再請求網絡"這種模型的四種實現方式,其中第四種實現能夠達到上面咱們所說的效果,而前面的三種實現雖然也可以實現相同的需求,而且能夠正常工做,可是在某些特殊狀況下,會出現意想不到的狀況:網絡

  • 使用concat實現
  • 使用concatEager實現
  • 使用merge實現
  • 使用publish實現

2、示例

2.1 準備工做

咱們須要準備兩個Observable,分別表示 緩存數據源網絡數據源,在其中填入相應的緩存數據和網絡數據,爲了以後演示一些特殊的狀況,咱們能夠在建立它的時候指定它執行的時間:app

//模擬緩存數據源。
    private Observable<List<NewsResultEntity>> getCacheArticle(final long simulateTime) {
        return Observable.create(new ObservableOnSubscribe<List<NewsResultEntity>>() {
            @Override
            public void subscribe(ObservableEmitter<List<NewsResultEntity>> observableEmitter) throws Exception {
                try {
                    Log.d(TAG, "開始加載緩存數據");
                    Thread.sleep(simulateTime);
                    List<NewsResultEntity> results = new ArrayList<>();
                    for (int i = 0; i < 10; i++) {
                        NewsResultEntity entity = new NewsResultEntity();
                        entity.setType("緩存");
                        entity.setDesc("序號=" + i);
                        results.add(entity);
                    }
                    observableEmitter.onNext(results);
                    observableEmitter.onComplete();
                    Log.d(TAG, "結束加載緩存數據");
                } catch (InterruptedException e) {
                    if (!observableEmitter.isDisposed()) {
                        observableEmitter.onError(e);
                    }
                }
            }
        });
    }
    //模擬網絡數據源。
    private Observable<List<NewsResultEntity>> getNetworkArticle(final long simulateTime) {
        return Observable.create(new ObservableOnSubscribe<List<NewsResultEntity>>() {
            @Override
            public void subscribe(ObservableEmitter<List<NewsResultEntity>> observableEmitter) throws Exception {
                try {
                    Log.d(TAG, "開始加載網絡數據");
                    Thread.sleep(simulateTime);
                    List<NewsResultEntity> results = new ArrayList<>();
                    for (int i = 0; i < 10; i++) {
                        NewsResultEntity entity = new NewsResultEntity();
                        entity.setType("網絡");
                        entity.setDesc("序號=" + i);
                        results.add(entity);
                    }
                    observableEmitter.onNext(results);
                    observableEmitter.onComplete();
                    Log.d(TAG, "結束加載網絡數據");
                } catch (InterruptedException e) {
                    if (!observableEmitter.isDisposed()) {
                        observableEmitter.onError(e);
                    }
                }
            }
        });
    }
複製代碼

在最終的下游,咱們接收數據,並在頁面上經過RecyclerView進行展現:ide

private DisposableObserver<List<NewsResultEntity>> getArticleObserver() {
        return new DisposableObserver<List<NewsResultEntity>>() {

            @Override
            public void onNext(List<NewsResultEntity> newsResultEntities) {
                mNewsResultEntities.clear();
                mNewsResultEntities.addAll(newsResultEntities);
                mNewsAdapter.notifyDataSetChanged();
            }

            @Override
            public void onError(Throwable throwable) {
                Log.d(TAG, "加載錯誤, e=" + throwable);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "加載完成");
            }
        };
    }
複製代碼

2.2 使用 concat 實現

concat是不少文章都推薦使用的方式,由於它不會有任何問題,實現代碼以下:函數

private void refreshArticleUseContact() {
        Observable<List<NewsResultEntity>> contactObservable = Observable.concat(
                getCacheArticle(500).subscribeOn(Schedulers.io()), getNetworkArticle(2000).subscribeOn(Schedulers.io()));
        DisposableObserver<List<NewsResultEntity>> disposableObserver = getArticleObserver();
        contactObservable.observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
    }
複製代碼

上面這段代碼的運行結果爲: 優化

從控制檯的輸出能夠看到,整個過程是先取讀取緩存,等緩存的數據讀取完畢以後,纔開始請求網絡,所以整個過程的耗時爲兩個階段的相加,即 2500ms

它的原理圖以下所示:
concat 原理圖
從原理圖中也驗證了咱們前面的現象,它會鏈接多個 Observable,而且必需要等到前一個 Observable的全部數據項都發送完以後,纔會開始下一個 Observable數據的發送。

那麼,concat操做符的缺點是什麼呢?很明顯,咱們白白浪費了前面讀取緩存的這段時間,能不能同時發起讀取緩存和網絡的請求,而不是等到讀取緩存完畢以後,纔去請求網絡呢?spa

2.3 使用 concatEager 實現

爲了解決前面沒有同時發起請求的問題,咱們可使用concatEager,它的使用方法以下:3d

private void refreshArticleUseConcatEager() {
        List<Observable<List<NewsResultEntity>>> observables = new ArrayList<>();
        observables.add(getCacheArticle(500).subscribeOn(Schedulers.io()));
        observables.add(getNetworkArticle(2000).subscribeOn(Schedulers.io()));
        Observable<List<NewsResultEntity>> contactObservable = Observable.concatEager(observables);
        DisposableObserver<List<NewsResultEntity>> disposableObserver = getArticleObserver();
        contactObservable.observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
    }
複製代碼

它和concat最大的不一樣就是多個Observable能夠同時開始發射數據,若是後一個Observable發射完成後,前一個Observable還有發射完數據,那麼它會將後一個Observable的數據先緩存起來,等到前一個Observable發射完畢後,纔將緩存的數據發射出去。code

上面代碼中,請求緩存的時長改成500ms,而請求網絡的時長改成2000ms,運行結果爲:

那麼這種實現方式的缺點是什麼呢?就是在某些異常狀況下,若是讀取緩存的時間要大於網絡請求的時間,那麼就會致使出現「網絡請求的結果」等待「讀取緩存」這一過程完成後才能傳遞給下游,白白浪費了一段時間。

咱們將請求緩存的時長改成2000ms,而請求網絡的時長改成500ms,查看控制檯的輸出,能夠驗證上面的結論:

2.4 使用 merge 實現

下面,咱們來看一下merge操做符的示例:

private void refreshArticleUseMerge() {
        Observable<List<NewsResultEntity>> contactObservable = Observable.merge(
                getCacheArticle(500).subscribeOn(Schedulers.io()), getNetworkArticle(2000).subscribeOn(Schedulers.io()));
        DisposableObserver<List<NewsResultEntity>> disposableObserver = getArticleObserver();
        contactObservable.observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
    }
複製代碼

merge的原理圖以下所示:

merge 原理圖
它和 concatEager同樣,會讓多個 Observable同時開始發射數據,可是它不須要 Observable之間的互相等待,而是直接發送給下游。

當緩存時間爲500ms,而請求網絡時間爲2000ms時,它的結果爲:

在讀取緩存的時間小於請求網絡的時間時,這個操做符可以很好的工做,可是反之,就會出現咱們先展現了網絡的數據,而後又被刷新成舊的緩存數據。
發生該異常時的現象以下所示:

2.5 使用 publish 實現

使用publish的實現以下所示:

private void refreshArticleUsePublish() {
        Observable<List<NewsResultEntity>> publishObservable = getNetworkArticle(2000).subscribeOn(Schedulers.io()).publish(new Function<Observable<List<NewsResultEntity>>, ObservableSource<List<NewsResultEntity>>>() {

            @Override
            public ObservableSource<List<NewsResultEntity>> apply(Observable<List<NewsResultEntity>> network) throws Exception {
                return Observable.merge(network, getCacheArticle(500).subscribeOn(Schedulers.io()).takeUntil(network));
            }

        });
        DisposableObserver<List<NewsResultEntity>> disposableObserver = getArticleObserver();
        publishObservable.observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
    }
複製代碼

這裏面一共涉及到了三個操做符,publishmergetakeUnti,咱們先來看一下它可否解決咱們以前三種方式的缺陷:

  • 讀取緩存的時間爲500ms,請求網絡的時間爲2000ms

  • 讀取緩存的時間爲2000ms,請求網絡的時間爲500ms

能夠看到,在讀取緩存的時間大於請求網絡時間的時候,僅僅只會展現網絡的數據,顯示效果爲:
而且讀取緩存和請求網絡是同時發起的,很好地解決了前面幾種實現方式的缺陷。

這裏要感謝簡友 無意下棋 在評論裏提到的問題:若是網絡請求先返回時發生了錯誤(例如沒有網絡等)致使發送了onError事件,從而使得緩存的Observable也沒法發送事件,最後界面顯示空白。

針對這個問題,咱們須要對網絡的Observable進行優化,讓其不將onError事件傳遞給下游。其中一種解決方式是經過使用onErrorResume操做符,它能夠接收一個Func函數,其形參爲網絡發送的錯誤,而在上游發生錯誤時會回調該函數。咱們能夠根據錯誤的類型來返回一個新的Observable,讓訂閱者鏡像到這個新的Observable,而且忽略onError事件,從而避免onError事件致使整個訂閱關係的結束。

這裏爲了不訂閱者在鏡像到新的Observable時會收到額外的時間,咱們返回一個Observable.never(),它表示一個永遠不發送事件的上游。

private Observable<List<NewsResultEntity>> getNetworkArticle(final long simulateTime) {
        return Observable.create(new ObservableOnSubscribe<List<NewsResultEntity>>() {
            @Override
            public void subscribe(ObservableEmitter<List<NewsResultEntity>> observableEmitter) throws Exception {
                try {
                    Log.d(TAG, "開始加載網絡數據");
                    Thread.sleep(simulateTime);
                    List<NewsResultEntity> results = new ArrayList<>();
                    for (int i = 0; i < 10; i++) {
                        NewsResultEntity entity = new NewsResultEntity();
                        entity.setType("網絡");
                        entity.setDesc("序號=" + i);
                        results.add(entity);
                    }
                    //a.正常狀況。
                    //observableEmitter.onNext(results);
                    //observableEmitter.onComplete();
                    //b.發生異常。
                    observableEmitter.onError(new Throwable("netWork Error"));
                    Log.d(TAG, "結束加載網絡數據");
                } catch (InterruptedException e) {
                    if (!observableEmitter.isDisposed()) {
                        observableEmitter.onError(e);
                    }
                }
            }
        }).onErrorResumeNext(new Function<Throwable, ObservableSource<? extends List<NewsResultEntity>>>() {

            @Override
            public ObservableSource<? extends List<NewsResultEntity>> apply(Throwable throwable) throws Exception {
                Log.d(TAG, "網絡請求發生錯誤throwable=" + throwable);
                return Observable.never();
            }
        });
    }
複製代碼

當發生錯誤時,控制檯的輸出以下,能夠看到緩存仍然正常地發送給了下游:

下面,咱們就來分析一下它的實現原理。

2.5.1 takeUntil

takeUntil的原理圖以下所示:

這裏,咱們給 sourceObservable經過 takeUntil傳入了另外一個 otherObservable,它表示 sourceObservableotherObservable發射數據以後,就不容許再發射數據了,這就恰好知足了咱們前面說的「只要網絡源發送了數據,那麼緩存源就不該再發射數據」。

以後,咱們再用前面介紹過的merge操做符,讓兩個緩存源和網絡源同時開始工做,去取數據。

2.5.2 publish

可是上面有一點缺陷,就是調用mergetakeUntil會發生兩次訂閱,這時候就須要使用publish操做符,它接收一個Function函數,該函數返回一個Observable,該Observable是對原Observable,也就是上面網絡源的Observable轉換以後的結果,該Observable能夠被takeUntilmerge操做符所共享,從而實現只訂閱一次的效果。

publish的原理圖以下所示:

publish 原理圖


更多文章,歡迎訪問個人 Android 知識梳理系列:

相關文章
相關標籤/搜索