RxJava2源碼解讀之 Map、FlatMap

RxJava給咱們提供了不少變換的操做符,map、flatMap就是比較經常使用的操做符,通常咱們使用的時候,都是看官方文檔來了解每一個操做符的含義,可是我本身感受下來,看官方文檔使用沒問題,可是總有一點隔靴搔癢的意思,因此我還要去RxJava的源碼一探究竟,作到心中有數。html

咱們先從相對簡單的 Map 開始react

Map

官方定義:transform the items emitted by an Observable by applying a function to each itemapp

拙劣的翻譯:應用一個函數 轉換全部的被髮射的item函數

官方的圖解:oop

map 圖例.png

到這裏咱們總結一下:線程

  • map 轉換是一對一的,原來發射了幾個數據,轉換以後仍是幾個
  • map 轉換能夠改變發射的數據類型

這裏拋出一個問題,map 調用咱們提供的function進行轉換,那麼這個function在何時被調用?在哪一個線程被調用?(這個對咱們實際工程中使用map有意義,知道代碼被執行的線程是必須的)翻譯

廢話很少說,進入源碼3d

Map源碼

Observable類是RxJava的門面,基本上全部的轉換符都在這裏定義,直接看Map 的方法定義orm

map 方法.png

能夠看到,Function類,泛型有2個參數,第一個是原數據類型,第二個是轉換後的數據類型,最終返回的是ObservableMap 類(RxJava的類命名很規範,若是是Observable類型的就是Observable開頭 + 具體的操做符名稱,若是是Observer類型的 就是 具體的操做符名稱 + Observer結尾)咱們進入ObservableMap類,Observable類以前的文章有提到過,subscribeActual 是個重要的鉤子方法,因此咱們直接看ObservableMap如何重寫該方法的cdn

ObservaleMap.png

方法代碼就一行,調用裝飾的Observable的subscribe方法,傳遞一個MapObserver對象,Observer類咱們就比較熟悉了,咱們這裏主要看onNext方法

map onnext.png

代碼也很簡單,紅框標識的就是 mapper 轉換函數被調用的地方,獲得轉換後的對象v,傳遞給被裝飾的Observer 的onNext方法,到這裏,一次數據的map轉換就結束了。源碼的實現仍是很簡單的,在咱們瞭解了源碼的實現後,思路會更清晰,寫代碼時也會更有把握。

如今咱們來解答前面咱們拋出的問題,Function在何時被調用?在哪一個線程被調用? Function調用的地方已經清楚了,在ObserverMap 的 onNext方法中,那麼調用的線程呢,由於是在Observer方法中被調用,因此若是在map 以前 調用了 ObserverOn 方法設置監聽線程,那麼就在該監聽線程,若是沒有設置 ObserverOn 可是設置了 SubscribeOn方法設置發射線程,那麼就在該 發射線程,若是SubscribeOn也沒有設置,那就在Observable的建立線程。

到此Map 就介紹完了,接下來是Map 的好兄弟 FlatMap,調用邏輯稍微複雜一點點,看官們耐心 -。-

###FlatMap

官方定義:transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable

拙劣的翻譯:應用一個函數 轉換全部的被髮射的item從一個Observable轉成爲多個Observable,並將全部要發射的數據平鋪爲一個Observable

官方的圖解:

flatmap 圖例.png

到這裏咱們總結一下:

  • flatmap 轉換是一對多的(一對一固然也支持),原來發射了幾個數據,轉換以後能夠是更多個
  • flatMap 轉換一樣能夠改變發射的數據類型
  • flatMap 轉換後的數據,仍是會逐個發射給咱們的Observer來接收(就像這些數據是由一個Observable發射的同樣,實際上是多個Observable發射而後合併的)

這裏拋出一個問題,flatMap會將原來的Observable,轉換爲多個Observable來發射數據,那麼這些發射的數據是否會嚴格按順序發射而後被Observer接收

問題先留在這裏,進入源碼

FlatMap 源碼

FlatMap操做符涉及的代碼會相對多一些,可是也是有規律可循。 一樣到Observable 類中看 flatMap的定義,源碼做者爲了方便開發者調用,提供了多個方法重載,咱們最經常使用的方法定義以下

flatmap 方法1.png

最終調用的方法是

flatmap 方法.png

跟map 的套路 差很少,咱們直接進入 ObservableFlatMap類, 咱們仍是看它的 subscribeActual 方法實現

ObservableFlatMap.png

能夠看到,它給原Observer 裝飾後的 Observer 是 MergeObserver,咱們再繼續看 MergeObserver 的 onNext 方法

MergeObserver onnext.png

因爲咱們默認調用的flatmap 的 maxConcurrency 大小是 Integer.MAX_VALUE, 因此最終會調用 subscribeInner(p),注意這裏咱們的mapper方法以及被調用了,p就是跟咱們傳入的Function生成的Observable,咱們再繼續往下看

MergeObserver subscribeInner.png

通常咱們傳入的Function 生成的Observable 都不是 Callable類型的,因此最終傳給Observable p 的 是InnerObserver, 找到了最終元兇,直接去看它的onNext方法實現吧。

MergeObserver onnext.png

funsionMode 默認是 None,走第一個if 邏輯,最終調用的是 上面的MergeObserable 的 tryEmit 方法,繼續進去看

MergeObservable tryEmit.png

這裏要插一句,MergeObserver 繼承了 AtomicInteger,因此這裏的tryEmit方法就利用了 AtomicInteger 的同步機制,因此同時只會有一個 value 被 actual Observer 發射,並且這裏 恰好 能夠解答咱們上面留下的 問題,因爲 AtomicInteger CAS鎖只能保證操做的原子性,並不保證鎖的獲取順序,是搶佔式的,因此最終數據的發射順序並非固定的(同一個Observable發出的數據是有序的)

若是沒有獲取到鎖,就會將要發射的數據放入 隊列中,drainLoop 方法會循環去獲取隊列中的 數據,而後發射,因爲篇幅有限,更詳細的調用過程你們能夠看源碼。

dramloop.png

Map 和 FlatMap 二個操做符的 源碼就解析到這裏,水平有限,有不對的,還望大佬不吝賜教。

相關文章
相關標籤/搜索