在不少資訊應用當中,當咱們進入一個新的頁面,爲了提高用戶體驗,不讓頁面空白過久,咱們通常會先讀取緩存中的數據,再去請求網絡。java
今天這篇文章,咱們將實現下面這個效果:同時發起讀取緩存、訪問網絡的請求,若是緩存的數據先回來,那麼就先展現緩存的數據,而若是網絡的數據先回來,那麼就再也不展現緩存的數據。緩存
爲了讓你們對這一過程有更深入的理解,咱們介紹"先加載緩存,再請求網絡"這種模型的四種實現方式,其中第四種實現能夠達到上面咱們所說的效果,而前面的三種實現雖然也可以實現相同的需求,而且能夠正常工做,可是在某些特殊狀況下,會出現意想不到的狀況:網絡
concat
實現concatEager
實現merge
實現publish
實現咱們須要準備兩個Observable
,分別表示 緩存數據源 和 網絡數據源,在其中填入相應的緩存數據和網絡數據,爲了以後演示一些特殊的狀況,咱們能夠在建立它的時候指定它執行的時間:app
//模擬緩存數據源。
private Observable<List<NewsResultEntity>> getCacheArticle(final long simulateTime) {
return Observable.create(new ObservableOnSubscribe<List<NewsResultEntity>>() {
@Override
public void subscribe(ObservableEmitter<List<NewsResultEntity>> observableEmitter) throws Exception {
try {
Log.d(TAG, "開始加載緩存數據");
Thread.sleep(simulateTime);
List<NewsResultEntity> results = new ArrayList<>();
for (int i = 0; i < 10; i++) {
NewsResultEntity entity = new NewsResultEntity();
entity.setType("緩存");
entity.setDesc("序號=" + i);
results.add(entity);
}
observableEmitter.onNext(results);
observableEmitter.onComplete();
Log.d(TAG, "結束加載緩存數據");
} catch (InterruptedException e) {
if (!observableEmitter.isDisposed()) {
observableEmitter.onError(e);
}
}
}
});
}
//模擬網絡數據源。
private Observable<List<NewsResultEntity>> getNetworkArticle(final long simulateTime) {
return Observable.create(new ObservableOnSubscribe<List<NewsResultEntity>>() {
@Override
public void subscribe(ObservableEmitter<List<NewsResultEntity>> observableEmitter) throws Exception {
try {
Log.d(TAG, "開始加載網絡數據");
Thread.sleep(simulateTime);
List<NewsResultEntity> results = new ArrayList<>();
for (int i = 0; i < 10; i++) {
NewsResultEntity entity = new NewsResultEntity();
entity.setType("網絡");
entity.setDesc("序號=" + i);
results.add(entity);
}
observableEmitter.onNext(results);
observableEmitter.onComplete();
Log.d(TAG, "結束加載網絡數據");
} catch (InterruptedException e) {
if (!observableEmitter.isDisposed()) {
observableEmitter.onError(e);
}
}
}
});
}
複製代碼
在最終的下游,咱們接收數據,並在頁面上經過RecyclerView
進行展現:ide
private DisposableObserver<List<NewsResultEntity>> getArticleObserver() {
return new DisposableObserver<List<NewsResultEntity>>() {
@Override
public void onNext(List<NewsResultEntity> newsResultEntities) {
mNewsResultEntities.clear();
mNewsResultEntities.addAll(newsResultEntities);
mNewsAdapter.notifyDataSetChanged();
}
@Override
public void onError(Throwable throwable) {
Log.d(TAG, "加載錯誤, e=" + throwable);
}
@Override
public void onComplete() {
Log.d(TAG, "加載完成");
}
};
}
複製代碼
concat
是不少文章都推薦使用的方式,由於它不會有任何問題,實現代碼以下:函數
private void refreshArticleUseContact() {
Observable<List<NewsResultEntity>> contactObservable = Observable.concat(
getCacheArticle(500).subscribeOn(Schedulers.io()), getNetworkArticle(2000).subscribeOn(Schedulers.io()));
DisposableObserver<List<NewsResultEntity>> disposableObserver = getArticleObserver();
contactObservable.observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
}
複製代碼
上面這段代碼的運行結果爲: 優化
從控制檯的輸出能夠看到,整個過程是先取讀取緩存,等緩存的數據讀取完畢以後,纔開始請求網絡,所以整個過程的耗時爲兩個階段的相加,即2500ms
。
它的原理圖以下所示:
從原理圖中也驗證了咱們前面的現象,它會鏈接多個
Observable
,而且必需要等到前一個
Observable
的全部數據項都發送完以後,纔會開始下一個
Observable
數據的發送。
那麼,concat
操做符的缺點是什麼呢?很明顯,咱們白白浪費了前面讀取緩存的這段時間,能不能同時發起讀取緩存和網絡的請求,而不是等到讀取緩存完畢以後,纔去請求網絡呢?spa
爲了解決前面沒有同時發起請求的問題,咱們可使用concatEager
,它的使用方法以下:3d
private void refreshArticleUseConcatEager() {
List<Observable<List<NewsResultEntity>>> observables = new ArrayList<>();
observables.add(getCacheArticle(500).subscribeOn(Schedulers.io()));
observables.add(getNetworkArticle(2000).subscribeOn(Schedulers.io()));
Observable<List<NewsResultEntity>> contactObservable = Observable.concatEager(observables);
DisposableObserver<List<NewsResultEntity>> disposableObserver = getArticleObserver();
contactObservable.observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
}
複製代碼
它和concat
最大的不一樣就是多個Observable
能夠同時開始發射數據,若是後一個Observable
發射完成後,前一個Observable
還有發射完數據,那麼它會將後一個Observable
的數據先緩存起來,等到前一個Observable
發射完畢後,纔將緩存的數據發射出去。code
上面代碼中,請求緩存的時長改成500ms
,而請求網絡的時長改成2000ms
,運行結果爲:
咱們將請求緩存的時長改成2000ms
,而請求網絡的時長改成500ms
,查看控制檯的輸出,能夠驗證上面的結論:
下面,咱們來看一下merge
操做符的示例:
private void refreshArticleUseMerge() {
Observable<List<NewsResultEntity>> contactObservable = Observable.merge(
getCacheArticle(500).subscribeOn(Schedulers.io()), getNetworkArticle(2000).subscribeOn(Schedulers.io()));
DisposableObserver<List<NewsResultEntity>> disposableObserver = getArticleObserver();
contactObservable.observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
}
複製代碼
merge
的原理圖以下所示:
concatEager
同樣,會讓多個
Observable
同時開始發射數據,可是它不須要
Observable
之間的互相等待,而是直接發送給下游。
當緩存時間爲500ms
,而請求網絡時間爲2000ms
時,它的結果爲:
使用publish
的實現以下所示:
private void refreshArticleUsePublish() {
Observable<List<NewsResultEntity>> publishObservable = getNetworkArticle(2000).subscribeOn(Schedulers.io()).publish(new Function<Observable<List<NewsResultEntity>>, ObservableSource<List<NewsResultEntity>>>() {
@Override
public ObservableSource<List<NewsResultEntity>> apply(Observable<List<NewsResultEntity>> network) throws Exception {
return Observable.merge(network, getCacheArticle(500).subscribeOn(Schedulers.io()).takeUntil(network));
}
});
DisposableObserver<List<NewsResultEntity>> disposableObserver = getArticleObserver();
publishObservable.observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
}
複製代碼
這裏面一共涉及到了三個操做符,publish
、merge
和takeUnti
,咱們先來看一下它可否解決咱們以前三種方式的缺陷:
500ms
,請求網絡的時間爲2000ms
2000ms
,請求網絡的時間爲500ms
這裏要感謝簡友 無意下棋 在評論裏提到的問題:若是網絡請求先返回時發生了錯誤(例如沒有網絡等)致使發送了onError
事件,從而使得緩存的Observable
也沒法發送事件,最後界面顯示空白。
針對這個問題,咱們須要對網絡的Observable
進行優化,讓其不將onError
事件傳遞給下游。其中一種解決方式是經過使用onErrorResume
操做符,它能夠接收一個Func
函數,其形參爲網絡發送的錯誤,而在上游發生錯誤時會回調該函數。咱們能夠根據錯誤的類型來返回一個新的Observable
,讓訂閱者鏡像到這個新的Observable
,而且忽略onError
事件,從而避免onError
事件致使整個訂閱關係的結束。
這裏爲了不訂閱者在鏡像到新的Observable
時會收到額外的時間,咱們返回一個Observable.never()
,它表示一個永遠不發送事件的上游。
private Observable<List<NewsResultEntity>> getNetworkArticle(final long simulateTime) {
return Observable.create(new ObservableOnSubscribe<List<NewsResultEntity>>() {
@Override
public void subscribe(ObservableEmitter<List<NewsResultEntity>> observableEmitter) throws Exception {
try {
Log.d(TAG, "開始加載網絡數據");
Thread.sleep(simulateTime);
List<NewsResultEntity> results = new ArrayList<>();
for (int i = 0; i < 10; i++) {
NewsResultEntity entity = new NewsResultEntity();
entity.setType("網絡");
entity.setDesc("序號=" + i);
results.add(entity);
}
//a.正常狀況。
//observableEmitter.onNext(results);
//observableEmitter.onComplete();
//b.發生異常。
observableEmitter.onError(new Throwable("netWork Error"));
Log.d(TAG, "結束加載網絡數據");
} catch (InterruptedException e) {
if (!observableEmitter.isDisposed()) {
observableEmitter.onError(e);
}
}
}
}).onErrorResumeNext(new Function<Throwable, ObservableSource<? extends List<NewsResultEntity>>>() {
@Override
public ObservableSource<? extends List<NewsResultEntity>> apply(Throwable throwable) throws Exception {
Log.d(TAG, "網絡請求發生錯誤throwable=" + throwable);
return Observable.never();
}
});
}
複製代碼
當發生錯誤時,控制檯的輸出以下,能夠看到緩存仍然正常地發送給了下游:
下面,咱們就來分析一下它的實現原理。
takeUntil
的原理圖以下所示:
sourceObservable
經過
takeUntil
傳入了另外一個
otherObservable
,它表示
sourceObservable
在
otherObservable
發射數據以後,就不容許再發射數據了,這就恰好知足了咱們前面說的「只要網絡源發送了數據,那麼緩存源就不該再發射數據」。
以後,咱們再用前面介紹過的merge
操做符,讓兩個緩存源和網絡源同時開始工做,去取數據。
可是上面有一點缺陷,就是調用merge
和takeUntil
會發生兩次訂閱,這時候就須要使用publish
操做符,它接收一個Function
函數,該函數返回一個Observable
,該Observable
是對原Observable
,也就是上面網絡源的Observable
轉換以後的結果,該Observable
能夠被takeUntil
和merge
操做符所共享,從而實現只訂閱一次的效果。
publish
的原理圖以下所示: