提高開發效率,下降維護成本一直是開發團隊永恆不變的宗旨。近兩年來國內的技術圈子中愈來愈多的開始說起 RxJava ,愈來愈多的應用和麪試中都會有 RxJava ,而就目前的狀況,Android 的網絡庫基本被 Retrofit + OkHttp 一統天下了,而配合上響應式編程 RxJava 可謂如魚得水。想必你們確定被近期的 Kotlin 炸開了鍋,筆者也在閒暇之時去了解了一番(做爲一個與時俱進的有理想的青年怎麼可能不與時俱進?),發現其中有個很是好的優勢就是簡潔,支持函數式編程。是的, RxJava 最大的優勢也是簡潔,但它不止是簡潔,並且是** 隨着程序邏輯變得愈來愈複雜,它依然可以保持簡潔 **(這貨潔身自好呀有木有)。
咳咳,要例子,猛戳這裏:給 Android 開發者的 RxJava 詳解html
上面咱們說起了響應式編程,很多新司機對它可謂一臉懵逼,那什麼是響應式編程呢?響應式編程是一種基於異步數據流概念的編程模式。數據流就像一條河:它能夠被觀測,被過濾,被操做,或者爲新的消費者與另一條流合併爲一條新的流。java
響應式編程的一個關鍵概念是事件。事件能夠被等待,能夠觸發過程,也能夠觸發其它事件。事件是惟一的以合適的方式將咱們的現實世界映射到咱們的軟件中:若是屋裏太熱了咱們就打開一扇窗戶。一樣的,當咱們的天氣app從服務端獲取到新的天氣數據後,咱們須要更新app上展現天氣信息的UI;汽車上的車道偏移系統探測到車輛偏移了正常路線就會提醒駕駛者糾正,就是是響應事件。react
今天,響應式編程最通用的一個場景是UI:咱們的移動App必須作出對網絡調用、用戶觸摸輸入和系統彈框的響應。在這個世界上,軟件之因此是事件驅動並響應的是由於現實生活也是如此。git
RxJava 這些年可謂愈來愈流行,而在去年的晚些時候發佈了2.0正式版。大半年已過,雖然網上已經出現了大部分的 RxJava 教程(其實細心的你仍是會發現 1.x 的超級多),前些日子,筆者花了大約兩週的閒暇之時寫了 RxJava 2.x 系列教程,也獲得了很多反饋,其中就有很多讀者以爲每一篇的教程過短,抑或是但願更多的側重適用場景的介紹,在這樣的大前提下,這篇完結版教程就此誕生,僅供各位新司機採納。
github
RxJava 2.x 已經按照 Reactive-Streams specification 規範徹底的重寫了,maven也被放在了io.reactivex.rxjava2:rxjava:2.x.y
下,因此 RxJava 2.x 獨立於 RxJava 1.x 而存在,而隨後官方宣佈的將在一段時間後終止對 RxJava 1.x 的維護,因此對於熟悉 RxJava 1.x 的老司機天然能夠直接看一下 2.x 的文檔和異同就能輕鬆上手了,而對於不熟悉的年輕司機,不要慌,本醬帶你裝逼帶你飛,立刻就發車,坐穩了:https://github.com/nanchen2251/RxJava2Examples面試
你只須要在 build.gradle 中加上:compile 'io.reactivex.rxjava2:rxjava:2.1.1'
(2.1.1爲寫此文章時的最新版本)
數據庫
RxJava 2.x 擁有了新的特性,其依賴於4個基礎接口,它們分別是編程
其中最核心的莫過於 Publisher
和 Subscriber
。Publisher
能夠發出一系列的事件,而 Subscriber
負責和處理這些事件。api
其中用的比較多的天然是 Publisher
的 Flowable
,它支持背壓,有興趣的能夠看一下官方對於背壓的講解緩存
能夠明顯地發現,RxJava 2.x 最大的改動就是對於 backpressure
的處理,爲此將原來的 Observable
拆分紅了新的Observable
和Flowable
,同時其餘相關部分也同時進行了拆分,但使人慶幸的是,是它,是它,仍是它,仍是咱們最熟悉和最喜歡的 RxJava。
在 RxJava 1.x 中,咱們最熟悉的莫過於 Observable
這個類了,筆者在剛剛使用 RxJava 2.x 的時候,建立了 一個 Observable
,瞬間一臉懵逼有木有,竟然連咱們最最熟悉的 Subscriber
都沒了,取而代之的是 ObservableEmmiter
,俗稱發射器。此外,因爲沒有了Subscriber
的蹤跡,咱們建立觀察者時需使用 Observer
。而 Observer
也不是咱們熟悉的那個 Observer
,又出現了一個 Disposable
參數帶你裝逼帶你飛。
廢話很少說,從會用開始,還記得 RxJava 的三部曲嗎?
** 第一步:初始化 Observable
第二步:初始化 Observer
第三步:創建訂閱關係 **
Observable.create(new ObservableOnSubscribe<Integer>() { // 第一步:初始化Observable @Override public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception { Log.e(TAG, "Observable emit 1" + "\n"); e.onNext(1); Log.e(TAG, "Observable emit 2" + "\n"); e.onNext(2); Log.e(TAG, "Observable emit 3" + "\n"); e.onNext(3); e.onComplete(); Log.e(TAG, "Observable emit 4" + "\n" ); e.onNext(4); } }).subscribe(new Observer<Integer>() { // 第三步:訂閱 // 第二步:初始化Observer private int i; private Disposable mDisposable; @Override public void onSubscribe(@NonNull Disposable d) { mDisposable = d; } @Override public void onNext(@NonNull Integer integer) { i++; if (i == 2) { // 在RxJava 2.x 中,新增的Disposable能夠作到切斷的操做,讓Observer觀察者再也不接收上游事件 mDisposable.dispose(); } } @Override public void onError(@NonNull Throwable e) { Log.e(TAG, "onError : value : " + e.getMessage() + "\n" ); } @Override public void onComplete() { Log.e(TAG, "onComplete" + "\n" ); } });
不難看出,RxJava 2.x 與 1.x 仍是存在着一些區別的。首先,建立 Observable
時,回調的是 ObservableEmitter
,字面意思即發射器,而且直接 throws Exception。其次,在建立的 Observer 中,也多了一個回調方法:onSubscribe
,傳遞參數爲Disposable
,Disposable
至關於 RxJava 1.x 中的 Subscription
, 用於解除訂閱。能夠看到示例代碼中,在 i 自增到 2 的時候,訂閱關係被切斷。
07-03 14:24:11.663 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onSubscribe : false 07-03 14:24:11.664 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 1 07-03 14:24:11.665 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onNext : value : 1 07-03 14:24:11.666 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 2 07-03 14:24:11.667 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onNext : value : 2 07-03 14:24:11.668 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onNext : isDisposable : true 07-03 14:24:11.669 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 3 07-03 14:24:11.670 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 4
固然,咱們的 RxJava 2.x 也爲咱們保留了簡化訂閱方法,咱們能夠根據需求,進行相應的簡化訂閱,只不過傳入對象改成了 Consumer
。
Consumer
即消費者,用於接收單個值,BiConsumer
則是接收兩個值,Function
用於變換對象,Predicate
用於判斷。這些接口命名大多參照了 Java 8 ,熟悉 Java 8 新特性的應該都知道意思,這裏也再也不贅述。
關於線程切換這點,RxJava 1.x 和 RxJava 2.x 的實現思路是同樣的。這裏簡單的說一下,以便於咱們的新司機入手。
同 RxJava 1.x 同樣,subscribeOn
用於指定subscribe()
時所發生的線程,從源碼角度能夠看出,內部線程調度是經過 ObservableSubscribeOn
來實現的。
@SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable<T> subscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); }
ObservableSubscribeOn
的核心源碼在 subscribeActual
方法中,經過代理的方式使用 SubscribeOnObserver
包裝 Observer
後,設置 Disposable
來將 subscribe
切換到 Scheduler
線程中。
observeOn
方法用於指定下游 Observer
回調發生的線程。
@SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)); }
RxJava 內置的線程調度器的確可讓咱們的線程切換駕輕就熟,但其中也有些須要注意的地方。
subscribeOn()
指定的就是發射事件的線程,observerOn
指定的就是訂閱者接收事件的線程。subscribeOn()
只有第一次的有效,其他的會被忽略。observerOn()
,下游的線程就會切換一次。Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception { Log.e(TAG, "Observable thread is : " + Thread.currentThread().getName()); e.onNext(1); e.onComplete(); } }).subscribeOn(Schedulers.newThread()) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .doOnNext(new Consumer<Integer>() { @Override public void accept(@NonNull Integer integer) throws Exception { Log.e(TAG, "After observeOn(mainThread),Current thread is " + Thread.currentThread().getName()); } }) .observeOn(Schedulers.io()) .subscribe(new Consumer<Integer>() { @Override public void accept(@NonNull Integer integer) throws Exception { Log.e(TAG, "After observeOn(io),Current thread is " + Thread.currentThread().getName()); } });
輸出:
07-03 14:54:01.177 15121-15438/com.nanchen.rxjava2examples E/RxThreadActivity: Observable thread is : RxNewThreadScheduler-1 07-03 14:54:01.178 15121-15121/com.nanchen.rxjava2examples E/RxThreadActivity: After observeOn(mainThread),Current thread is main 07-03 14:54:01.179 15121-15439/com.nanchen.rxjava2examples E/RxThreadActivity: After observeOn(io),Current thread is RxCachedThreadScheduler-2
實例代碼中,分別用 Schedulers.newThread()
和 Schedulers.io()
對發射線程進行切換,並採用 observeOn(AndroidSchedulers.mainThread()
和 Schedulers.io()
進行了接收線程的切換。能夠看到輸出中發射線程僅僅響應了第一個 newThread
,但每調用一次 observeOn()
,線程便會切換一次,所以若是咱們有相似的需求時,便知道如何處理了。
RxJava 中,已經內置了不少線程選項供咱們選擇,例若有:
Schedulers.io()
表明io操做的線程, 一般用於網絡,讀寫文件等io密集型的操做;Schedulers.computation()
表明CPU計算密集型的操做, 例如須要大量計算的操做;Schedulers.newThread()
表明一個常規的新線程;AndroidSchedulers.mainThread()
表明Android的主線程這些內置的 Scheduler
已經足夠知足咱們開發的需求,所以咱們應該使用內置的這些選項,而 RxJava 內部使用的是線程池來維護這些線程,因此效率也比較高。
關於操做符,在官方文檔中已經作了很是完善的講解,而且筆者前面的系列教程中也着重講解了絕大多數的操做符做用,這裏受於篇幅限制,就很少作贅述,只挑選幾個進行實際情景的講解。
map
操做符能夠將一個 Observable
對象經過某種關係轉換爲另外一個Observable
對象。在 2.x 中和 1.x 中做用幾乎一致,不一樣點在於:2.x 將 1.x 中的 Func1
和 Func2
改成了 Function
和 BiFunction
。
想必你們都知道,不少時候咱們在使用 RxJava 的時候老是和 Retrofit 進行結合使用,而爲了方便演示,這裏咱們就暫且採用 OkHttp3 進行演示,配合 map
,doOnNext
,線程切換進行簡單的網絡請求:
1)經過 Observable.create() 方法,調用 OkHttp 網絡請求;
2)經過 map 操做符集合 gson,將 Response 轉換爲 bean 類;
3)經過 doOnNext() 方法,解析 bean 中的數據,並進行數據庫存儲等操做;
4)調度線程,在子線程中進行耗時操做任務,在主線程中更新 UI ;
5)經過 subscribe(),根據請求成功或者失敗來更新 UI 。
Observable.create(new ObservableOnSubscribe<Response>() { @Override public void subscribe(@NonNull ObservableEmitter<Response> e) throws Exception { Builder builder = new Builder() .url("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512") .get(); Request request = builder.build(); Call call = new OkHttpClient().newCall(request); Response response = call.execute(); e.onNext(response); } }).map(new Function<Response, MobileAddress>() { @Override public MobileAddress apply(@NonNull Response response) throws Exception { if (response.isSuccessful()) { ResponseBody body = response.body(); if (body != null) { Log.e(TAG, "map:轉換前:" + response.body()); return new Gson().fromJson(body.string(), MobileAddress.class); } } return null; } }).observeOn(AndroidSchedulers.mainThread()) .doOnNext(new Consumer<MobileAddress>() { @Override public void accept(@NonNull MobileAddress s) throws Exception { Log.e(TAG, "doOnNext: 保存成功:" + s.toString() + "\n"); } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<MobileAddress>() { @Override public void accept(@NonNull MobileAddress data) throws Exception { Log.e(TAG, "成功:" + data.toString() + "\n"); }, new Consumer<Throwable>() { @Override public void accept(@NonNull Throwable throwable) throws Exception { Log.e(TAG, "失敗:" + throwable.getMessage() + "\n"); } });
concat
能夠作到不交錯的發射兩個甚至多個 Observable 的發射事件,而且只有前一個 Observable
終止(onComplete
) 後纔會訂閱下一個 Observable
。
想必在實際應用中,不少時候(對數據操做不敏感時)都須要咱們先讀取緩存的數據,若是緩存沒有數據,再經過網絡請求獲取,隨後在主線程更新咱們的UI。
concat
操做符簡直就是爲咱們這種需求量身定作。
利用 concat
的必須調用 onComplete
後才能訂閱下一個 Observable
的特性,咱們就能夠先讀取緩存數據,假若獲取到的緩存數據不是咱們想要的,再調用 onComplete()
以執行獲取網絡數據的Observable
,若是緩存數據能應咱們所需,則直接調用 onNext()
,防止過分的網絡請求,浪費用戶的流量。
Observable<FoodList> cache = Observable.create(new ObservableOnSubscribe<FoodList>() { @Override public void subscribe(@NonNull ObservableEmitter<FoodList> e) throws Exception { Log.e(TAG, "create當前線程:"+Thread.currentThread().getName() ); FoodList data = CacheManager.getInstance().getFoodListData(); // 在操做符 concat 中,只有調用 onComplete 以後纔會執行下一個 Observable if (data != null){ // 若是緩存數據不爲空,則直接讀取緩存數據,而不讀取網絡數據 isFromNet = false; Log.e(TAG, "\nsubscribe: 讀取緩存數據:" ); runOnUiThread(new Runnable() { @Override public void run() { mRxOperatorsText.append("\nsubscribe: 讀取緩存數據:\n"); } }); e.onNext(data); }else { isFromNet = true; runOnUiThread(new Runnable() { @Override public void run() { mRxOperatorsText.append("\nsubscribe: 讀取網絡數據:\n"); } }); Log.e(TAG, "\nsubscribe: 讀取網絡數據:" ); e.onComplete(); } } }); Observable<FoodList> network = Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list") .addQueryParameter("rows",10+"") .build() .getObjectObservable(FoodList.class); // 兩個 Observable 的泛型應當保持一致 Observable.concat(cache,network) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<FoodList>() { @Override public void accept(@NonNull FoodList tngouBeen) throws Exception { Log.e(TAG, "subscribe 成功:"+Thread.currentThread().getName() ); if (isFromNet){ mRxOperatorsText.append("accept : 網絡獲取數據設置緩存: \n"); Log.e(TAG, "accept : 網絡獲取數據設置緩存: \n"+tngouBeen.toString() ); CacheManager.getInstance().setFoodListData(tngouBeen); } mRxOperatorsText.append("accept: 讀取數據成功:" + tngouBeen.toString()+"\n"); Log.e(TAG, "accept: 讀取數據成功:" + tngouBeen.toString()); } }, new Consumer<Throwable>() { @Override public void accept(@NonNull Throwable throwable) throws Exception { Log.e(TAG, "subscribe 失敗:"+Thread.currentThread().getName() ); Log.e(TAG, "accept: 讀取數據失敗:"+throwable.getMessage() ); mRxOperatorsText.append("accept: 讀取數據失敗:"+throwable.getMessage()+"\n"); } });
有時候咱們的緩存可能還會分爲 memory 和 disk ,實際上都差很少,無非是多寫點 Observable ,而後經過 concat 合併便可。
想必這種狀況也在實際狀況中比比皆是,例如用戶註冊成功後須要自動登陸,咱們只須要先經過註冊接口註冊用戶信息,註冊成功後立刻調用登陸接口進行自動登陸便可。
咱們的 flatMap
剛好解決了這種應用場景,flatMap
操做符能夠將一個發射數據的 Observable
變換爲多個 Observables
,而後將它們發射的數據合併後放到一個單獨的 Observable
,利用這個特性,咱們很輕鬆地達到了咱們的需求。
Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list") .addQueryParameter("rows", 1 + "") .build() .getObjectObservable(FoodList.class) // 發起獲取食品列表的請求,並解析到FootList .subscribeOn(Schedulers.io()) // 在io線程進行網絡請求 .observeOn(AndroidSchedulers.mainThread()) // 在主線程處理獲取食品列表的請求結果 .doOnNext(new Consumer<FoodList>() { @Override public void accept(@NonNull FoodList foodList) throws Exception { // 先根據獲取食品列表的響應結果作一些操做 Log.e(TAG, "accept: doOnNext :" + foodList.toString()); mRxOperatorsText.append("accept: doOnNext :" + foodList.toString()+"\n"); } }) .observeOn(Schedulers.io()) // 回到 io 線程去處理獲取食品詳情的請求 .flatMap(new Function<FoodList, ObservableSource<FoodDetail>>() { @Override public ObservableSource<FoodDetail> apply(@NonNull FoodList foodList) throws Exception { if (foodList != null && foodList.getTngou() != null && foodList.getTngou().size() > 0) { return Rx2AndroidNetworking.post("http://www.tngou.net/api/food/show") .addBodyParameter("id", foodList.getTngou().get(0).getId() + "") .build() .getObjectObservable(FoodDetail.class); } return null; } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<FoodDetail>() { @Override public void accept(@NonNull FoodDetail foodDetail) throws Exception { Log.e(TAG, "accept: success :" + foodDetail.toString()); mRxOperatorsText.append("accept: success :" + foodDetail.toString()+"\n"); } }, new Consumer<Throwable>() { @Override public void accept(@NonNull Throwable throwable) throws Exception { Log.e(TAG, "accept: error :" + throwable.getMessage()); mRxOperatorsText.append("accept: error :" + throwable.getMessage()+"\n"); } });
在實際應用中,咱們極有可能會在一個頁面顯示的數據來源於多個接口,這時候咱們的 zip
操做符爲咱們排憂解難。
zip
操做符能夠將多個 Observable
的數據結合爲一個數據源再發射出去。
Observable<MobileAddress> observable1 = Rx2AndroidNetworking.get("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512") .build() .getObjectObservable(MobileAddress.class); Observable<CategoryResult> observable2 = Network.getGankApi() .getCategoryData("Android",1,1); Observable.zip(observable1, observable2, new BiFunction<MobileAddress, CategoryResult, String>() { @Override public String apply(@NonNull MobileAddress mobileAddress, @NonNull CategoryResult categoryResult) throws Exception { return "合併後的數據爲:手機歸屬地:"+mobileAddress.getResult().getMobilearea()+"人名:"+categoryResult.results.get(0).who; } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<String>() { @Override public void accept(@NonNull String s) throws Exception { Log.e(TAG, "accept: 成功:" + s+"\n"); } }, new Consumer<Throwable>() { @Override public void accept(@NonNull Throwable throwable) throws Exception { Log.e(TAG, "accept: 失敗:" + throwable+"\n"); } });
想必即時通信等須要輪訓的任務在現在的 APP 中已經是很常見,而 RxJava 2.x 的 interval
操做符可謂完美地解決了咱們的疑惑。
這裏就簡單的意思一下輪訓。
private Disposable mDisposable; @Override protected void doSomething() { mDisposable = Flowable.interval(1, TimeUnit.SECONDS) .doOnNext(new Consumer<Long>() { @Override public void accept(@NonNull Long aLong) throws Exception { Log.e(TAG, "accept: doOnNext : "+aLong ); } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Long>() { @Override public void accept(@NonNull Long aLong) throws Exception { Log.e(TAG, "accept: 設置文本 :"+aLong ); mRxOperatorsText.append("accept: 設置文本 :"+aLong +"\n"); } }); } /** * 銷燬時中止心跳 */ @Override protected void onDestroy() { super.onDestroy(); if (mDisposable != null){ mDisposable.dispose(); } }
因爲 RxJava 2.x 變化較大沒法直接升級,幸運的是,官方爲咱們提供了 RxJava2Interrop 這個庫,能夠方便地把 RxJava 1.x 升級到 RxJava 2.x,或者將 RxJava 2.x 轉回到 RxJava 1.x。
本醬看你都看到這兒了,實爲將來的棟樑之才,因此且送你一本經書:
https://github.com/nanchen2251/RxJava2Examples