RxJava 1.x使用與理解

RxJava 1.x使用與理解——2018.5.22html

前一段時間,項目引入RxJava,用起來很簡單,可是對原理不甚理解,因而參考各類資料,對照源碼,進行了深刻學習,寫在這裏,但願對看到的小夥伴有所幫助react

RxJava源碼理解並不簡單,感謝各位前輩們的無私分析分享,才讓我能更高效地學習進步,再次感謝!android

本文定位:學習筆記

學習過程記錄,加深理解,提高文字組合表達能力。也但願能給學習RxJava的同窗一些靈感
大部份內容整理於[此文]數據庫

RxJava是什麼


tips:無相關基礎的話,看了下面的相關定義,是不太可能直接理解的,因此,不建議一開始糾結於此段內容,能夠先看後面內容,回頭就會豁然開朗了。編程

RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM. ————官網
翻譯:RxJava – Reactive Extensions for the JVM – 一個在 Java VM 上使用可觀測的序列來組成異步的、基於事件的程序的庫網絡

這是須要強調的是**Reactive Extensions**,響應式編程擴展,RxJava實際是實現了Java中的響應式編程風格,RxJava是ReactiveX((有多語言版本)的Java實現數據結構

ReactiveX:An API for asynchronous programming with observable streams異步

RxJava優勢(ReactiveX優勢)


在Java平臺實現了響應式編程風格async

分步操做,每一步閱讀起來清晰:
複雜操做分步進行,使代碼在複雜邏輯下看下來依然邏輯清晰。經過鏈式調用,將每一步功能組合起來實現複雜功能。ide

異步操做很關鍵的一點是程序的簡潔性,由於在調度過程比較複雜的狀況下,異步代碼常常會既難寫也難被讀懂。 Android 創造的 AsyncTask 和Handler ,其實都是爲了讓異步代碼更加簡潔。RxJava 的優點也是簡潔,但它的簡潔的不同凡響之處在於,隨着程序邏輯變得愈來愈複雜,它依然可以保持簡潔。

響應式編程的思路


數據流:

Event buses 或者 Click events 本質上就是異步事件流,你能夠監聽並處理這些事件。響應式編程的思路大概以下:你能夠用包括 Click 和 Hover 事件在內的任何東西建立 Data stream。Stream 廉價且常見,任何東西均可以是一個 Stream:變量、用戶輸入、屬性、Cache、數據結構等等。舉個例子,想像一下你的 Twitter feed 就像是 Click events 那樣的 Data stream,你能夠監聽它並相應的做出響應。——jikexueyuan

數據流操做:

在這個基礎上,你還有使人驚豔的函數去組合、建立、過濾這些 Streams。這就是函數式魔法的用武之地。Stream 能接受一個,甚至多個 Stream 爲輸入。你能夠融合兩個 Stream,也能夠從一個 Stream 中過濾出你感興趣的 Events 以生成一個新的 Stream,還能夠把一個 Stream 中的數據值 映射到一個新的 Stream 中。

擴展的觀察者模式


觀察者模式

觀察者模式面向的需求是:A 對象(觀察者)對 B 對象(被觀察者)的某種變化高度敏感,須要在 B 變化的一瞬間作出反應。

Android 開發中一個比較典型的例子是點擊監聽器 OnClickListener 。對設置 OnClickListener 來講, View 是被觀察者, OnClickListener 是觀察者,兩者經過 setOnClickListener() 方法達成訂閱關係。訂閱以後用戶點擊按鈕的瞬間,Android Framework 就會將點擊事件發送給已經註冊的 OnClickListener 。

採起這樣被動的觀察方式,既省去了反覆檢索狀態的資源消耗,也可以獲得最高的反饋速度。

RxJava 的觀察者模式

RxJava 有四個基本概念:Observable (可觀察者,即被觀察者)、 Observer (觀察者)、 subscribe (訂閱)、事件。Observable 和 Observer 經過 subscribe() 方法實現訂閱關係,從而 Observable 能夠在須要的時候發出事件來通知 Observer。

與傳統觀察者模式不一樣, RxJava 的事件回調方法除了普通事件 onNext() (至關於 onClick() / onEvent())以外,還定義了兩個特殊的事件:onCompleted() 和 onError()。

  • onCompleted(): 事件隊列完結。RxJava 不只把每一個事件單獨處理,還會把它們看作一個隊列。RxJava 規定,當不會再有新的 onNext() 發出時,須要觸發 onCompleted() 方法做爲標誌。
  • onError(): 事件隊列異常。在事件處理過程當中出異常時,onError() 會被觸發,同時隊列自動終止,不容許再有事件發出。
  • 在一個正確運行的事件序列中, onCompleted() 和 onError() 有且只有一個,而且是事件序列中的最後一個。須要注意的是,onCompleted() 和 onError() 兩者也是互斥的,即在隊列中調用了其中一個,就不該該再調用另外一個。

RxJava變換


所謂變換,就是將事件序列中的對象或整個序列進行加工處理,轉換成不一樣的事件或事件序列。

RxJava 提供了對事件序列進行變換的支持,這是它的核心功能之一,也是大多數人說『RxJava 真是太好用了』的最大緣由。

RxJava的API,主要都是基於變換實現的。

變換的原理:lift()

這些變換雖然功能各有不一樣,但實質上都是針對事件序列的處理和再發送。而在 RxJava 的內部,它們是基於同一個基礎的變換方法: lift(Operator)。首先看一下 lift() 的內部實現(僅核心代碼):

// 注意:這不是 lift() 的源碼,而是將源碼中與性能、兼容性、擴展性有關的代碼剔除後的核心代碼。
// 若是須要看源碼,能夠去 RxJava 的 GitHub 倉庫下載。
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
    return Observable.create(new OnSubscribe<R>() {
        @Override
        public void call(Subscriber subscriber) {
            Subscriber newSubscriber = operator.call(subscriber);
            newSubscriber.onStart();
            onSubscribe.call(newSubscriber);
        }
    });
}

這段代碼頗有意思:它生成了一個新的 Observable 並返回,並且建立新 Observable 所用的參數 OnSubscribe 的回調方法 call() 中的實現居然看起來和前面講過的 Observable.subscribe() 同樣!然而它們並不同喲~不同的地方關鍵就在於第二行 onSubscribe.call(subscriber) 中的 onSubscribe 所指代的對象不一樣

subscribe() 中這句話的 onSubscribe 指的是 Observable 中的 onSubscribe 對象,這個沒有問題,可是 lift() 以後的狀況就複雜了點。

當含有 lift() 時:

  1. lift() 建立了一個 Observable 後,加上以前的原始 Observable,已經有兩個 Observable 了;
  2. 而一樣地,新 Observable 裏的新 OnSubscribe 加上以前的原始 Observable 中的原始 OnSubscribe,也就有了兩個 OnSubscribe;
  3. 當用戶調用通過 lift() 後的 Observable 的 subscribe() 的時候,使用的是 lift() 所返回的新的 Observable ,因而它所觸發的 onSubscribe.call(subscriber),也是用的新 Observable 中的新 OnSubscribe,即在 lift() 中生成的那個 OnSubscribe;
  4. 而這個新 OnSubscribe 的 call() 方法中的 onSubscribe ,就是指的原始 Observable 中的原始 OnSubscribe ,在這個 call() 方法裏,新 OnSubscribe 利用 operator.call(subscriber) 生成了一個新的 Subscriber(Operator 就是在這裏,經過本身的 call() 方法將新 Subscriber 和原始 Subscriber 進行關聯,並插入本身的『變換』代碼以實現變換),而後利用這個新 Subscriber 向原始 Observable 進行訂閱。

這樣就實現了 lift() 過程,有點像一種代理機制,經過事件攔截和處理實現事件序列的變換

精簡掉細節的話,也能夠這麼說:在 Observable 執行了 lift(Operator) 方法以後,會返回一個新的 Observable,這個新的 Observable 會像一個代理同樣,負責接收原始的 Observable 發出的事件,並在處理後發送給 Subscriber。

兩次和屢次的 lift() 圖示:

線程控制 —— Scheduler


在不指定線程的狀況下, RxJava 遵循的是線程不變的原則,即:在哪一個線程調用 subscribe(),就在哪一個線程生產事件;在哪一個線程生產事件,就在哪一個線程消費事件。若是須要切換線程,就須要用到 Scheduler (調度器)。

Scheduler 的 API

在RxJava 中,Scheduler ——調度器,至關於線程控制器,RxJava 經過它來指定每一段代碼應該運行在什麼樣的線程。RxJava 已經內置了幾個 Scheduler ,它們已經適合大多數的使用場景:

  • Schedulers.immediate(): 直接在當前線程運行,至關於不指定線程。這是默認的 Scheduler。
  • Schedulers.newThread(): 老是啓用新線程,並在新線程執行操做。
  • Schedulers.io(): I/O 操做(讀寫文件、讀寫數據庫、網絡信息交互等)所使用的 Scheduler。行爲模式和 newThread() 差很少,區別在於 io() 的內部實現是是用一個無數量上限的線程池,能夠重用空閒的線程,所以多數狀況下 io() 比 newThread() 更有效率。不要把計算工做放在 io() 中,能夠避免建立沒必要要的線程。
  • Schedulers.computation(): 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操做限制性能的操做,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小爲 CPU 核數。不要把 I/O 操做放在 computation() 中,不然 I/O 操做的等待時間會浪費 CPU。
  • 另外, Android 還有一個專用的 AndroidSchedulers.mainThread(),它指定的操做將在 Android 主線程運行。

有了這幾個 Scheduler ,就可使用 subscribeOn() 和 observeOn() 兩個方法來對線程進行控制了。 * subscribeOn(): 指定 subscribe() 所發生的線程,即 Observable.OnSubscribe 被激活時所處的線程。或者叫作事件產生的線程。 * observeOn(): 指定 Subscriber 所運行在的線程。或者叫作事件消費的線程。

Observable.just(1, 2, 3, 4)
    .subscribeOn(Schedulers.io()) // 指定 subscribe() 發生在 IO 線程
    .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調發生在主線程
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer number) {
            Log.d(tag, "number:" + number);
        }
    });

前面講到了,能夠利用 subscribeOn() 結合 observeOn() 來實現線程控制,讓事件的產生和消費發生在不一樣的線程。但是在瞭解了 map() flatMap() 等變換方法後,有些好事的(其實就是當初剛接觸 RxJava 時的我)就問了:能不能多切換幾回線程?

答案是:能。由於 observeOn() 指定的是 Subscriber 的線程,而這個 Subscriber 並非(嚴格說應該爲『不必定是』,但這裏不妨理解爲『不是』)subscribe() 參數中的 Subscriber ,而是 observeOn() 執行時的當前 Observable 所對應的 Subscriber ,即它的直接下級 Subscriber 。換句話說,observeOn() 指定的是它以後的操做所在的線程。所以若是有屢次切換線程的需求,只要在每一個想要切換線程的位置調用一次 observeOn() 便可。

Observable.just(1, 2, 3, 4) // IO 線程,由 subscribeOn() 指定
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.newThread())
    .map(mapOperator) // 新線程,由 observeOn() 指定
    .observeOn(Schedulers.io())
    .map(mapOperator2) // IO 線程,由 observeOn() 指定
    .observeOn(AndroidSchedulers.mainThread) 
    .subscribe(subscriber);  // Android 主線程,由 observeOn() 指定

不一樣於 observeOn() , subscribeOn() 的位置放在哪裏均可以,但它是隻能調用一次的。

Scheduler 的原理

subscribeOn() 和 observeOn() 的內部實現,也是用的 lift()

subscribeOn()在新返回的Observable中,線程切換髮生在 OnSubscribe 中,即在它通知上一級 OnSubscribe 時,這時事件尚未開始發送,所以 subscribeOn() 的線程控制能夠從事件發出的開端就形成影響

而 observeOn() 的線程切換則發生在它內建的 Subscriber 中,即發生在它即將給下一級 Subscriber 發送事件時,所以 observeOn() 控制的是它後面的線程

多個 subscribeOn() 和 observeOn() 混合使用時,線程調度圖解

圖中共有 5 處含有對事件的操做。由圖中能夠看出,①和②兩處受第一個 subscribeOn() 影響,運行在紅色線程;③和④處受第一個 observeOn() 的影響,運行在綠色線程;⑤處受第二個 onserveOn() 影響,運行在紫色線程;而第二個 subscribeOn() ,因爲在通知過程當中線程就被第一個 subscribeOn() 截斷,所以對整個流程並無任何影響。這裏也就回答了前面的問題:當使用了多個 subscribeOn() 的時候,只有第一個 subscribeOn() 起做用。

RxJava 的適用場景

綜合來說:對於能夠分步進行的複雜的數據變換很是友好

舉例:

  • 與 Retrofit 的結合
  • 數據多條件過濾
  • 各類異步操做

參考

相關文章
相關標籤/搜索