RxJava給咱們提供了不少變換的操做符,map、flatMap就是比較經常使用的操做符,通常咱們使用的時候,都是看官方文檔來了解每一個操做符的含義,可是我本身感受下來,看官方文檔使用沒問題,可是總有一點隔靴搔癢的意思,因此我還要去RxJava的源碼一探究竟,作到心中有數。html
咱們先從相對簡單的 Map 開始react
官方定義:transform the items emitted by an Observable by applying a function to each itemapp
拙劣的翻譯:應用一個函數 轉換全部的被髮射的item函數
官方的圖解:oop
到這裏咱們總結一下:線程
這裏拋出一個問題,map 調用咱們提供的function進行轉換,那麼這個function在何時被調用?在哪一個線程被調用?(這個對咱們實際工程中使用map有意義,知道代碼被執行的線程是必須的)翻譯
廢話很少說,進入源碼3d
Observable類是RxJava的門面,基本上全部的轉換符都在這裏定義,直接看Map 的方法定義orm
能夠看到,Function類,泛型有2個參數,第一個是原數據類型,第二個是轉換後的數據類型,最終返回的是ObservableMap 類(RxJava的類命名很規範,若是是Observable類型的就是Observable開頭 + 具體的操做符名稱,若是是Observer類型的 就是 具體的操做符名稱 + Observer結尾)咱們進入ObservableMap類,Observable類以前的文章有提到過,subscribeActual 是個重要的鉤子方法,因此咱們直接看ObservableMap如何重寫該方法的cdn
方法代碼就一行,調用裝飾的Observable的subscribe方法,傳遞一個MapObserver對象,Observer類咱們就比較熟悉了,咱們這裏主要看onNext方法
代碼也很簡單,紅框標識的就是 mapper 轉換函數被調用的地方,獲得轉換後的對象v,傳遞給被裝飾的Observer 的onNext方法,到這裏,一次數據的map轉換就結束了。源碼的實現仍是很簡單的,在咱們瞭解了源碼的實現後,思路會更清晰,寫代碼時也會更有把握。
如今咱們來解答前面咱們拋出的問題,Function在何時被調用?在哪一個線程被調用? Function調用的地方已經清楚了,在ObserverMap 的 onNext方法中,那麼調用的線程呢,由於是在Observer方法中被調用,因此若是在map 以前 調用了 ObserverOn 方法設置監聽線程,那麼就在該監聽線程,若是沒有設置 ObserverOn 可是設置了 SubscribeOn方法設置發射線程,那麼就在該 發射線程,若是SubscribeOn也沒有設置,那就在Observable的建立線程。
到此Map 就介紹完了,接下來是Map 的好兄弟 FlatMap,調用邏輯稍微複雜一點點,看官們耐心 -。-
###FlatMap
官方定義:transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable
拙劣的翻譯:應用一個函數 轉換全部的被髮射的item從一個Observable轉成爲多個Observable,並將全部要發射的數據平鋪爲一個Observable
官方的圖解:
到這裏咱們總結一下:
這裏拋出一個問題,flatMap會將原來的Observable,轉換爲多個Observable來發射數據,那麼這些發射的數據是否會嚴格按順序發射而後被Observer接收?
問題先留在這裏,進入源碼
FlatMap操做符涉及的代碼會相對多一些,可是也是有規律可循。 一樣到Observable 類中看 flatMap的定義,源碼做者爲了方便開發者調用,提供了多個方法重載,咱們最經常使用的方法定義以下
最終調用的方法是
跟map 的套路 差很少,咱們直接進入 ObservableFlatMap類, 咱們仍是看它的 subscribeActual 方法實現
能夠看到,它給原Observer 裝飾後的 Observer 是 MergeObserver,咱們再繼續看 MergeObserver 的 onNext 方法
因爲咱們默認調用的flatmap 的 maxConcurrency 大小是 Integer.MAX_VALUE, 因此最終會調用 subscribeInner(p),注意這裏咱們的mapper方法以及被調用了,p就是跟咱們傳入的Function生成的Observable,咱們再繼續往下看
通常咱們傳入的Function 生成的Observable 都不是 Callable類型的,因此最終傳給Observable p 的 是InnerObserver, 找到了最終元兇,直接去看它的onNext方法實現吧。
funsionMode 默認是 None,走第一個if 邏輯,最終調用的是 上面的MergeObserable 的 tryEmit 方法,繼續進去看
這裏要插一句,MergeObserver 繼承了 AtomicInteger,因此這裏的tryEmit方法就利用了 AtomicInteger 的同步機制,因此同時只會有一個 value 被 actual Observer 發射,並且這裏 恰好 能夠解答咱們上面留下的 問題,因爲 AtomicInteger CAS鎖只能保證操做的原子性,並不保證鎖的獲取順序,是搶佔式的,因此最終數據的發射順序並非固定的(同一個Observable發出的數據是有序的)
若是沒有獲取到鎖,就會將要發射的數據放入 隊列中,drainLoop 方法會循環去獲取隊列中的 數據,而後發射,因爲篇幅有限,更詳細的調用過程你們能夠看源碼。
Map 和 FlatMap 二個操做符的 源碼就解析到這裏,水平有限,有不對的,還望大佬不吝賜教。