今天,咱們以一個請求天氣數據的例子,來演示如何用RxJava
實現網絡重連時的自動請求,首先,咱們對這個需求進行一個簡單的描述,整個項目的框架以下所示: html
本文的示例代碼在 RxSample 的第十一章中。java
咱們經過一個後臺線程來模擬定位的過程,它每隔一段時間獲取一次定位的結果,並將該結果經過mCityPublish
發送數據給它的訂閱者。git
//用於發佈定位到的城市結果。
private PublishSubject<Long> mCityPublish;
//模擬定位模塊的回調。
private void startUpdateLocation() {
mLocationThread = new Thread() {
@Override
public void run() {
while (true) {
try {
for (long cityId : CITY_ARRAY) {
if (isInterrupted()) {
break;
}
Log.d(TAG, "從新定位");
Thread.sleep(5000);
Log.d(TAG, "定位到城市信息=" + cityId);
mCityPublish.onNext(cityId);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
mLocationThread.start();
}
複製代碼
在mCityPublish
發送消息到訂閱者收到消息之間,咱們還須要作一些特殊的處理:github
private Observable<Long> getCityPublish() {
return mCityPublish.distinctUntilChanged().doOnNext(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
saveCacheCity(aLong);
}
});
}
複製代碼
這裏咱們作了兩步處理:api
distinctUntilChanged
對定位結果進行過濾,若是這次定位的結果和上次定位的結果相同,那麼不通知訂閱者。distinctUntilChanged
的原理圖以下所示:
doOnNext
,在返回結果給訂閱者以前,先把最新一次的定位結果存儲起來,用於在以後網絡重連以後進行請求。與定位模塊相似,咱們也須要一個mNetStatusPublish
,其類型爲PublishSubject
,它在網絡狀態發生變化時通知訂閱者。這裏須要註冊一個廣播,在收到廣播以後,咱們經過mNetStatusPublish
通知訂閱者,代碼以下:緩存
private void registerBroadcast() {
mReceiver = new BroadcastReceiver() {
@Override
public void onReceive(Context context, Intent intent) {
if (mNetStatusPublish != null) {
mNetStatusPublish.onNext(isNetworkConnected());
}
}
};
IntentFilter filter = new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION);
registerReceiver(mReceiver, filter);
}
複製代碼
在收到網絡狀態變化的消息以後:服務器
private Observable<Long> getNetStatusPublish() {
return mNetStatusPublish.filter(new Predicate<Boolean>() {
@Override
public boolean test(Boolean aBoolean) throws Exception {
return aBoolean && getCacheCity() > 0;
}
}).map(new Function<Boolean, Long>() {
@Override
public Long apply(Boolean aBoolean) throws Exception {
return getCacheCity();
}
}).subscribeOn(Schedulers.io());
}
複製代碼
這裏咱們作了兩步處理:網絡
filter
對消息進行過濾,只有在 聯網狀況而且以前已經定位到了城市 以後才通知訂閱者,filter
的原理圖以下所示,該操做符用於過濾掉一些不須要的數據:
map
,讀取當前緩存的城市名,返回給訂閱者,map
的原理圖以下所示,該操做符能夠用於執行變換操做。
在2.1
和2.2
中,咱們分別用getCityPublish()
和getNetStatusPublish()
來獲取被訂閱者,它們分別對應於定位模塊和網絡狀態模塊發生變化時所發送的城市數據,下面來看咱們經過城市數據獲取城市天氣信息的代碼:app
private void startUpdateWeather() {
Observable.merge(getCityPublish(), getNetStatusPublish()).flatMap(new Function<Long, ObservableSource<WeatherEntity>>() {
@Override
public ObservableSource<WeatherEntity> apply(Long aLong) throws Exception {
Log.d(TAG, "嘗試請求天氣信息=" + aLong);
return getWeather(aLong).subscribeOn(Schedulers.io());
}
}).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Throwable throwable) throws Exception {
Log.d(TAG, "請求天氣信息過程當中發生錯誤,進行重訂閱");
return Observable.just(0);
}
});
}
}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<WeatherEntity>() {
@Override
public void onSubscribe(Disposable disposable) {
mCompositeDisposable.add(disposable);
}
@Override
public void onNext(WeatherEntity weatherEntity) {
WeatherEntity.WeatherInfo info = weatherEntity.getWeatherinfo();
if (info != null) {
Log.d(TAG, "嘗試請求天氣信息成功");
StringBuilder builder = new StringBuilder();
builder.append("城市名:").append(info.getCity()).append("\n").append("溫度:").append(info.getTemp()).append("\n").append("風向:").append(info.getWD()).append("\n").append("風速:").append(info.getWS()).append("\n");
mTvNetworkResult.setText(builder.toString());
}
}
@Override
public void onError(Throwable throwable) {
Log.d(TAG, "嘗試請求天氣信息失敗");
}
@Override
public void onComplete() {
Log.d(TAG, "嘗試請求天氣信息結束");
}
});
}
private Observable<WeatherEntity> getWeather(long cityId) {
WeatherApi api = new Retrofit.Builder()
.baseUrl("http://www.weather.com.cn/")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build().create(WeatherApi.class);
return api.getWeather(cityId);
}
複製代碼
這裏咱們作了如下幾個操做:框架
使用merge
合併兩個數據源,咱們經過getWeather(long cityId)
來獲取城市信息,這裏面用到了 RxJava2 實戰知識梳理(4) - 結合 Retrofit 請求新聞資訊 的知識,只不過這裏的接口是使用的天氣信息網的數據,merge
的原理在 RxJava2 實戰知識梳理(8) - 使用 publish + merge 優化先加載緩存,再讀取網絡數據的請求過程 也已經作了介紹。
使用retryWhen
進行重訂閱,由於在獲取到城市,以後轉換成城市天氣信息的時候有可能發生錯誤,若是發生了錯誤,那麼整個調用鏈就結束了,須要從新訂閱。這裏的重訂閱使用的retryWhen
操做符,關於重訂閱更詳細的解釋能夠看前面的這篇文章 RxJava2 實戰知識梳理(6) - 基於錯誤類型的重試請求,下面是其中的部分說明:
使用observeOn
切換到主線程進行界面的更新,原理如: RxJava2 實戰知識梳理(1) - 後臺執行耗時操做,實時通知 UI 更新
本章的示例代碼在 RxSample 的第十一章中,咱們演示兩種狀況:
1s
:
控制檯輸出以下,能夠看到只有當先後兩次定位信息不一樣時纔會發起網絡請求天氣信息:
在這個示例中,咱們用到了如下幾種操做符,若是有不明白的地方,你們能夠去對應的連接中查看更詳細的解釋: