版權聲明:本文由晉中望原創文章,轉載請註明出處:
文章原文連接:https://www.qcloud.com/community/article/123編程
來源:騰雲閣 https://www.qcloud.com/community網絡
相信不少作Android或是Java研發的同窗對RxJava應該都早有耳聞了,尤爲是在Android開發的圈子裏,RxJava漸漸開始廣爲流行。一樣有不少同窗已經開始在本身的項目中使用RxJava。它可以幫助咱們在處理異步事件時可以省去那些複雜而繁瑣的代碼,尤爲是當某些場景邏輯中回調中嵌入回調時,使用RxJava依舊可以讓咱們的代碼保持極高的可讀性與簡潔性。不只如此,這種基於異步數據流概念的編程模式事實上一樣也能普遍運用在移動端這種包括網絡調用、用戶觸摸輸入和系統彈框等在內的多種響應驅動的場景。那麼如今,就讓咱們一塊兒分析一下RxJava的響應流程吧。
(本文基於RxJava-1.1.3)異步
首先來看一個簡單的例子:
函數
運行結果爲:
從結果中咱們不難看出總體的調用流程:spa
首先經過調用Observable.create()
方法生成一個被觀察者,緊接着在這裏咱們又調用了map()
方法對原被觀察者進行數據流的變換操做,生成一個新的被觀察者(爲什麼是新的被觀察者後文會講),最後調用subscribe()
方法,傳入咱們的觀察者,這裏觀察者訂閱的則是調用map()以後生成的新被觀察者。線程
在整個過程當中咱們會注意到三個主角:Observable、OnSubscribe、Subscriber,全部的操做都是圍繞它們進行的。不難看出這裏三個角色的分工:code
因此接下來咱們以這三個角色爲中心來分析具體的流程。orm
首先咱們進入Observable.create()
看看:
這裏調用構造函數生成了一個Observable對象並將傳入的OnSubscribe賦給本身的成員變量onsubscribe
,等等,這個hook是從哪裏冒出來的?咱們向上找:
RxJavaObservableExecutionHook
這個抽象Proxy類默認對OnSubscribe對象不作任何處理,不過經過繼承該類並重寫onCreate()
等方法咱們能夠對這些方法對應的時機作一些額外處理好比打Log或者一些數據收集方面的工做。對象
到目前最初始的被觀察者已經生成了,咱們再來看看觀察者這邊。咱們知道經過調用observable.subscribe()
方法傳入一個觀察者即構成了觀察者與被觀察者之間的訂閱關係,那麼這內部又是如何實現的呢?看代碼:
繼承
這裏咱們略去部分無關代碼看主要部分,subscribe.onStart()
默認空實現咱們暫且不用管它,對於傳進來的subscriber
要包裝成SafeSubscriber
,這個SafeSubscriber
對原來的subscriber
的一系列方法作了更完善的處理,包括:onError()
與onCompleted()
只會有一個被執行;保證一旦onError()
或者onCompleted()
被執行,將再也不能再執onNext()
等狀況。這裏封裝爲SafeSubscriber
以後,調用onSubscribe.call()
,並將subscriber傳入,這樣就完成了一次訂閱。
顯而易見,Subscriber做爲觀察者,在訂閱行爲完成後,其具體行爲在整個鏈式調用中起着相當重要的做用,咱們來看看它內部的構成的主要部分:
每一個Subscriber都持有一個SubscriptionList
,這個list保存的是全部該觀察者的訂閱事件,同時Subscriber也對應實現了Subscription
接口,當這個Subscriber取消訂閱的時候會將持有事件列表中的全部Subscription
取消訂閱,而且今後再也不接受任何訂閱事件。同時,經過Producer
能夠去限定該Subscriber所接收的數據流的總量,這個限制量實際上是加在Subscriber.onNext()
方法上的,onComplete()
、onError()
則不會受到其影響。由於是底層抽象類,onNext()
、onComplete()
、onError()
統一不在這裏處理。
在收到Observable的消息以前咱們有可能會對數據流進行處理,例如map()、flatMap()、deBounce()、buffer()等方法,本例中咱們用了map()方法,它接收了原被觀察者發射的數據並將經過該方法返回的結果做爲新的數據發射出去,至關於作了一層中間轉化:
咱們接着看這個轉化過程:
這裏是經過一個lift()
方法實現的,再查看其餘的轉化方法發現內部也都使用lift()實現的,看來這個lift()
就是關鍵所在了,不過不急,咱們先來看看這個OperationMap
是什麼:
OperationMap實現了Operator接口的call()
方法,該方法接受外部傳入的觀察者,並將其做爲參數構造出了一個新的觀察者,咱們不難發現o.onNext(transformer.call(t))
;這一句起了相當重要的做用,這裏的接口transformer
將泛型T轉化爲泛型R:
這樣以後,再將轉換後的數據傳回至原觀察者的onNext()方法,就完成了觀察數據流的轉化,可是你應該也注意到了,咱們用來作轉換的這個新的觀察者並無實現訂閱被觀察者的操做,這個訂閱操做又是在哪裏實現的呢?答案就是接下來的lift()
:
在這裏咱們新生成了一個Observable
對象,在這個新對象的onSubscribe
成員的call()方法中咱們經過operator.call()
拿到以前生成的未產生訂閱的觀察者st,以後將它做爲參數傳入一開始的onSubscribe.call()
中,即完成了這個中間訂閱的過程。
如今咱們將整個流程梳理一下:
一次map()變換
根據Operator實例生成新的Subscriber
經過lift()生成新的Observable
原Subscriber訂閱新的Observavble
新的Observable中onSubscribe通知新Subscriber訂閱原Observable
爲了便於理解,這裏借用一下扔物線的圖:
以上就是一次map()
變換的流程,事實上屢次map()
也是一樣道理:最外層的目標Subscriber發生訂閱行爲後,onSubscribe.onNext()
會逐層嵌套調用,直至初始Observable被最底層的Subscriber訂閱,經過Operator的一層層變化將消息傳到目標Subscriber。再次祭出扔物線的圖:
至於其餘的多種變化的實現流程也都很相似,藉助於Operator的不一樣實現來達到變換數據流的目的。例如其中的flatMap()
,它須要進行兩次lift()
,其中第二次是OperationMerge,將轉換成的每個Observable數據流經過InnerSubscriber
這個紐帶訂閱後,在InnerSubscriber的onNext()
中拿到R,再經過傳入的parent(也就是原MergeSubscriber
)將它們所有發射(emit)出去,由最外層咱們傳入的Subscriber
統一接收,這樣就完成了 T => Observable<R> => R
的轉化:
除此以外,還有許多各式各樣的操做符,若是它們還不能知足你的須要,你也能夠經過實現Operator接口定製新的操做符。靈活運用它們每每能達到事半功倍的效果,好比經過使用sample()
、debounce()
等操做符有效避免backpressure
的須要等等,這裏就不一一介紹了。
下篇將繼續從"線程切換過程"開始分析
文章來源公衆號:QQ空間終端開發團隊(qzonemobiledev)