以前寫過一系列RxJava的文章,也承諾過會盡快有RxJava2的介紹。無奈實際項目中還未真正的使用RxJava2,不敢妄動筆墨。因此此次仍是給你們分享一個使用RxJava1解決問題的案例,但願對你們在使用RxJava的時候有一點點啓發。對RxJava還不瞭解的同窗能夠先去看看我以前的RxJava系列文章:git
拿MinimalistWeather這個開源的天氣App來舉例:github
進入App首頁後,首先咱們須要從數據庫中獲取當前城市的天氣數據,若是數據庫中存在天氣數據則在UI頁面上展現天氣數據;若是數據庫中未存儲當前城市的天氣數據,或者已存儲的天氣數據的發佈時間相比如今已經超過了一小時,而且網絡屬於鏈接狀態則調用API從服務端獲取天氣數據。若是獲取到到的天氣數據發佈時間和當前數據庫中的天氣數據發佈時間一致則丟棄掉從服務端獲取到的天氣數據,若是不一致則更新數據庫而且在頁面上展現最新的天氣信息。(同時天氣數據源是可配置的,可選擇是小米天氣數據源仍是Know天氣數據源)數據庫
首先咱們須要建立一個從數據庫獲取天氣數據的Observable observableForGetWeatherFromDB
,同時咱們也須要建立一個從API獲取天氣數據的Observable observableForGetWeatherFromNetWork
;爲了在無網絡狀態下免於建立observableForGetWeatherFromNetWork
咱們在這以前須要首先判斷下網絡狀態。最後使用contact
操做符將兩個Observable合併,同時使用distinct
和takeUntil
操做符來過濾篩選數據以符合業務需求,而後結合subscribeOn
和observeOn
作線程切換。上述這一套複雜的業務邏輯若是使用傳統編碼方式將是極其複雜的。下面咱們來看看使用RxJava如何清晰簡潔的來實現這個複雜的業務:markdown
Observable<Weather> observableForGetWeatherData;
//首先建立一個從數據庫獲取天氣數據的Observable
Observable<Weather> observableForGetWeatherFromDB = Observable.create(new Observable.OnSubscribe<Weather>() {
@Override
public void call(Subscriber<? super Weather> subscriber) {
try {
Weather weather = weatherDao.queryWeather(cityId);
subscriber.onNext(weather);
subscriber.onCompleted();
} catch (SQLException e) {
throw Exceptions.propagate(e);
}
}
});
if (!NetworkUtils.isNetworkConnected(context)) {
observableForGetWeatherData = observableForGetWeatherFromDB;
} else {
//接着建立一個從網絡獲取天氣數據的Observable
Observable<Weather> observableForGetWeatherFromNetWork = null;
switch (configuration.getDataSourceType()) {
case ApiConstants.WEATHER_DATA_SOURCE_TYPE_KNOW:
observableForGetWeatherFromNetWork = ApiClient.weatherService.getKnowWeather(cityId)
.map(new Func1<KnowWeather, Weather>() {
@Override
public Weather call(KnowWeather knowWeather) {
return new KnowWeatherAdapter(knowWeather).getWeather();
}
});
break;
case ApiConstants.WEATHER_DATA_SOURCE_TYPE_MI:
observableForGetWeatherFromNetWork = ApiClient.weatherService.getMiWeather(cityId)
.map(new Func1<MiWeather, Weather>() {
@Override
public Weather call(MiWeather miWeather) {
return new MiWeatherAdapter(miWeather).getWeather();
}
});
break;
}
assert observableForGetWeatherFromNetWork != null;
observableForGetWeatherFromNetWork = observableForGetWeatherFromNetWork
.doOnNext(new Action1<Weather>() {
@Override
public void call(Weather weather) {
Schedulers.io().createWorker().schedule(() -> { try { weatherDao.insertOrUpdateWeather(weather); } catch (SQLException e) { throw Exceptions.propagate(e); } }); } }); //使用concat操做符將兩個Observable合併 observableForGetWeatherData = Observable.concat(observableForGetWeatherFromDB, observableForGetWeatherFromNetWork) .filter(new Func1<Weather, Boolean>() { @Override public Boolean call(Weather weather) { return weather != null && !TextUtils.isEmpty(weather.getCityId()); } }) .distinct(new Func1<Weather, Long>() { @Override public Long call(Weather weather) { return weather.getRealTime().getTime();//若是天氣數據發佈時間一致,咱們再認爲是相同的數據從丟棄掉 } }) .takeUntil(new Func1<Weather, Boolean>() { @Override public Boolean call(Weather weather) { return System.currentTimeMillis() - weather.getRealTime().getTime() <= 60 * 60 * 1000;//若是天氣數據發佈的時間和當前時間差在一小時之內則終止事件流 } }); } observableForGetWeatherData.subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<Weather>() { @Override public void call(Weather weather) { displayWeatherInformation(); } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { Toast.makeText(context, throwable.getMessage(), Toast.LENGTH_LONG).show(); } });
上面的代碼看起來比較複雜,咱們採用Lambda表達式簡化下代碼:網絡
Observable<Weather> observableForGetWeatherData;
//首先建立一個從數據庫獲取天氣數據的Observable
Observable<Weather> observableForGetWeatherFromDB = Observable.create(new Observable.OnSubscribe<Weather>() {
@Override
public void call(Subscriber<? super Weather> subscriber) {
try {
Weather weather = weatherDao.queryWeather(cityId);
subscriber.onNext(weather);
subscriber.onCompleted();
} catch (SQLException e) {
throw Exceptions.propagate(e);
}
}
});
if (!NetworkUtils.isNetworkConnected(context)) {
observableForGetWeatherData = observableForGetWeatherFromDB;
} else {
//接着建立一個從網絡獲取天氣數據的Observable
Observable<Weather> observableForGetWeatherFromNetWork = null;
switch (configuration.getDataSourceType()) {
case ApiConstants.WEATHER_DATA_SOURCE_TYPE_KNOW:
observableForGetWeatherFromNetWork = ApiClient.weatherService.getKnowWeather(cityId)
.map(knowWeather -> new KnowWeatherAdapter(knowWeather).getWeather());
break;
case ApiConstants.WEATHER_DATA_SOURCE_TYPE_MI:
observableForGetWeatherFromNetWork = ApiClient.weatherService.getMiWeather(cityId)
.map(miWeather -> new MiWeatherAdapter(miWeather).getWeather());
break;
}
assert observableForGetWeatherFromNetWork != null;
observableForGetWeatherFromNetWork = observableForGetWeatherFromNetWork
.doOnNext(weather -> Schedulers.io().createWorker().schedule(() -> { try { weatherDao.insertOrUpdateWeather(weather); } catch (SQLException e) { throw Exceptions.propagate(e); } })); //使用concat操做符將兩個Observable合併 observableForGetWeatherData = Observable.concat(observableForGetWeatherFromDB, observableForGetWeatherFromNetWork) .filter(weather -> weather != null && !TextUtils.isEmpty(weather.getCityId())) .distinct(weather -> weather.getRealTime().getTime())//若是天氣數據發佈時間一致,咱們再認爲是相同的數據從丟棄掉 .takeUntil(weather -> System.currentTimeMillis() - weather.getRealTime().getTime() <= 60 * 60 * 1000);//若是天氣數據發佈的時間和當前時間差在一小時之內則終止事件流 } observableForGetWeatherData.subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(weather -> displayWeatherInformation(), throwable -> Toast.makeText(context, throwable.getMessage(), Toast.LENGTH_LONG).show());
在上述的實現中有幾點是咱們須要注意的:ide
爲何我須要在判斷網絡那塊整個if else?這樣看起來很不優雅,咱們經過RxJava符徹底能夠實現一樣的操做啊!之因此這樣作是爲了在無網絡情況下去建立沒必要要的Observable observableForGetWeatherFromNetWork
;源碼分析
更新數據庫的操做不該該阻塞更新UI,所以咱們在observableForGetWeatherFromNetWork
的doOnNext
中須要經過Schedulers.io().createWorker()
去另起一條線程,以此保證更新數據庫不會阻塞更新UI的操做。編碼
有同窗可能會問爲何不在
doOnNext
以後再調用一次observeOn
把更新數據庫的操做切換到一條新的子線程去操做呢?其實一開始我也是這樣作的,後來想一想不對。整個Observable的事件傳遞處理就像是在一條流水線上完成的,雖然咱們能夠經過observeOn
來指定子線程去處理更新數據庫的操做,可是隻有等這條子線程完成了更新數據庫的任務後事件纔會繼續日後傳遞,這樣就阻塞了更新UI的操做。對此有疑問的同窗能夠去看看我以前關於RxJava源碼分析的文章或者本身動手debug看看。spa
最後給你們留個兩個問題:線程
observableForGetWeatherData
中使用distinct
和takeUntil
過濾篩選天氣數據的時候網絡請求會不會已經發出去了?這樣作還有意義嗎?歡迎你們留言討論。
本文中的代碼在MinimalistWeather中的
WeatherDataRepository
類中有一樣的實現,文章中爲了更完整的將整個實現過程呈現出來,對代碼作了部分改動。若是你喜歡個人文章,就關注下個人知乎專欄或者在 GitHub 上添個 Star 吧!