RxJava && Agera 從源碼簡要分析基本調用流程(2)

版權聲明:本文由晉中望原創文章,轉載請註明出處: 
文章原文連接:https://www.qcloud.com/community/article/124react

來源:騰雲閣 https://www.qcloud.com/communitygit

 

接上篇RxJava && Agera 從源碼簡要分析基本調用流程(1)咱們從"1.訂閱過程"、「2.變換過程」進行分析,下篇文章咱們繼續分析"3.線程切換過程"github

3.線程切換過程

從上文中咱們知道了RxJava可以幫助咱們對數據流進行靈活的變換,以達到鏈式結構操做的目的,然而它的強大不止於此。下面咱們就來看看它的又一利器,調度器Scheduler:就像咱們所知道的,Scheduler是給Observable數據流添加多線程功能所準備的,通常咱們會經過使用subscribeOn()observeOn()方法傳入對應的Scheduler去指定數據流的每部分操做應該以何種方式運行在何種線程。對於咱們而言,最多見的莫過於在非主線程獲取並處理數據以後在主線程更新UI這樣的場景了:

這是咱們十分常見的調用方法,一鼓作氣就把不一樣線程之間的處理都搞定了,由於是鏈式因此結構也很清晰,咱們如今來看看這其中的線程切換流程。緩存

  • subscribeOn()

    當咱們調用subscribeOn()的時候:

    能夠看到這裏也是調用了create()去生成一個Observable,而OperatorSubscribeOn則是實現了OnSubscribe接口,同時將原始的Observable和咱們須要的scheduler傳入:


    能夠看出來,這裏對subscriber的處理與前文中OperatorMapcall()subscriber的處理很類似。在這裏咱們一樣會根據傳入的subscriber構造出新的Subscribers,不過這一系列的過程大部分都是由worker經過schedule()去執行的,從後面setProducer()中對於線程的判斷,再結合subscribeOn()方法的目的咱們能大概推測出,這個worker在必定程度上就至關於一個新線程的代理執行者,schedule()所實現的與Thread類中run()應該十分相似。咱們如今來看看這個worker的執行過程。
    首先從Schedulers.io()進入:

    這個經過hook拿到scheduler的過程咱們先無論,直接進CachedThreadScheduler,看它的createWorker()方法:

    這裏的pool是一個原子變量引用AtomicReference,所持有的則是CachedWorkerPool,於是這個pool顧名思義就是用來保存worker的緩存池啦,咱們從緩存池裏拿到須要的worker並做了一層封裝成爲EventLoopWorker

    在這裏咱們終於發現目標ThreadWorker,它繼承自NewThreadWorker,以前的schedule()方法最終都會到這個scheduleActual()方法裏:

    這裏咱們看到了executor線程池,咱們用Schedulers.io()最終實現的線程切換的本質就在這裏了。如今再結合以前的過程咱們從頭梳理一下:

    subscribeOn()時,咱們會新生成一個Observable,它的成員onSubscribe會在目標Subscriber訂閱時使用傳入的Scheduler的worker做爲線程調度執行者,在對應的線程中通知原始Observable發送消息給這個過程當中臨時生成的Subscriber,這個Subscriber又會通知到目標Subscriber,這樣就完成了subscribeOn()的過程。多線程

  • observeOn()

    下面咱們接着來看看observeOn()

    咱們直接看最終調用的部分,能夠看到這裏又是一個lift(),在這裏傳入了OperatorObserveOn,它與OperatorSubscribeOn不一樣,是一個OperatorOperator的功能咱們上文中已經講過就不贅述了),它構造出了新的觀察者ObserveOnSubscriber並實現了Action0接口:

    能夠看出來,這裏ObserveOnSubscriber全部的發送給目標Subscriber child的消息都被切換到了recursiveScheduler的線程做處理,也就達到了將線程切回的目的。app

總結observeOn()總體流程以下:

對比subscribeOn()observeOn()這兩個過程,咱們不難發現二者的區別:subscribeOn()將初始Observable的訂閱事件總體都切換到了另外一個線程;而observeOn()則是將初始Observable發送的消息切換到另外一個線程通知到目標Subscriber。前者把 「訂閱 + 發送」 的切換了一個線程,後者把 「發送」 切換了一個線程。因此,咱們的代碼中所實現的功能實際上是:

這樣就能很容易實現耗時任務在子線程操做,在主線程做更新操做等這些常見場景的功能啦。框架

4.其餘角色

Subject
Subject在Rx系列是一個比較特殊的角色,它繼承了Observable的同時也實現了Observer接口,也就是說它既可做爲觀察者,也可做爲被觀察者,他通常被用來做爲鏈接多個不一樣Observable、Observer之間的紐帶。可能你會奇怪,咱們不是已經有了像map()flatMap()這類的操做符去變化 Observable數據流了嗎,爲何還要引入Subject這個東西呢?這是由於Subject所承擔的工做並不是是針對Observable數據流內容的轉換鏈接,而是數據流自己在Observable、Observer之間的調度。光這麼說可能仍是很模糊,咱們舉個《RxJava Essentials》中的例子:
異步

咱們經過create()建立了一個PublishSubject,觀察者成功訂閱了這個subject,然而這個subject卻沒有任何數據要發送,咱們只是知道他將來會發送的會是String值而已。以後,當咱們調用subject.onNext()時,消息才被髮送,Observer的onNext()被觸發調用,輸出了"Hello World"。oop

這裏咱們注意到,當訂閱事件發生時,咱們的subject是沒有產生數據流的,直到它發射了"Hello World",數據流纔開始運轉,試想咱們若是將訂閱過程和subject.onNext()調換一下位置,那麼Observer就必定不會接受到"Hello World"了(這不是廢話嗎- -|||),於是這也在根本上反映了Observable的冷熱區別。google

通常而言,咱們的Observable都屬於Cold Observables,就像看視頻,每次點開新視頻咱們都要從頭開始播放;而Subject則默認屬於Hot Observables,就像看直播,視頻數據永遠都是新的。
基於這種屬性,Subject天然擁有了對接收到的數據流進行選擇調度等的能力了,所以,咱們對於Subject的使用也就一般基於以下的思路:

在前面的例子裏咱們用到的是PublishSubject,它只會把在訂閱發生的時間點以後來自原始Observable的數據發射給觀察者。等一下,這功能聽起來是否是有些似曾相識呢?

沒錯,就是EventBus和Otto。(RxJava的出現慢慢讓Otto退出了舞臺,如今Otto的Repo已是Deprecated狀態了,而EventBus依舊堅挺)基於RxJava的觀察訂閱取消的能力和PublishSubject的功能,咱們十分容易就能寫出實現了最基本功能的簡易事件總線框架:

固然Subject還有其餘如BehaviorSubjectReplaySubjectAsyncSubject等類型,你們能夠去看官方文檔,寫得十分詳細,這裏就不介紹了。

三.後記

前面相信最近這段日子裏,提到RxJava,你們就會想到Google最近剛剛開源的Agera。Agera做爲專門爲Android打造的Reactive Programming框架,不免會被拿來與RxJava作對比。本文前面RxJava的主體流程分析已近尾聲,如今咱們再來看看Agera這東東又是怎麼一回事。

首先先上結論

Agera最初是爲了Google Play Movies而開發的一個內部框架,如今開源出來了,它雖然是在RxJava以後纔出現,可是徹底獨立於RxJava,與它沒有任何關係(只不過開源的時間十分微妙罷了233333)。 與RxJava比起來,Agera更加專一於Android的生命週期,而RxJava則更加純粹地面向Java平臺而非Android。

也許你可能會問:「那麼RxAndroid呢,不是還有它嗎?」事實上,RxAndroid早在1.0版本的時候就進行了很大的重構,不少模塊被拆分到其餘的項目中去了,同時也刪除了部分代碼,僅存下來的部分可能是和Android線程相關的部分,好比AndroidSchedulersMainThreadSubscription等。鑑於這種狀況,咱們暫且不去關注RxAndroid,先把目光放在Agera上。

一樣也是基於觀察者模式,Agera和RxJava的角色分類大體類似,在Agera中,主要角色有兩個:Observable(被觀察者)、Updatable(觀察者)。



是的,相較於RxJava中的Observable,Agera中的Observable只是一個簡單的接口,也沒有範性的存在,Updatable亦是如此,這樣咱們要如何作到消息的傳遞呢?這就須要另一個接口了:

終於看到了泛型T,咱們的消息的傳遞能力就是依賴於此接口了。因此咱們將這個接口和基礎的Observable結合一下:

這裏的Repository<T>在必定程度上就是咱們想要的RxJava中的Observable<T>啦。相似地,Repository也有兩種類型的實現:

  • Direct - 所包含的數據老是可用的或者是可被同步計算出來的;一個Direct的Repository老是處於活躍(active)狀態下

  • Deferred - 所包含的數據是異步計算或拉去所得;一個Deffered的Repository直到有Updatable被添加進來以前都會是非活躍(inactive)狀態下
    是否是感到似曾相識呢?沒錯,Repository也是有冷熱區分的,不過咱們如今暫且不去關注這一點。回到上面接着看,既然如今發數據的角色有了,那麼咱們要如何接收數據呢?答案就是Receiver

相信看到這裏,你們應該也隱約感受到了:在Agera的世界裏,數據的傳輸與事件的傳遞是相互隔離開的,這是目前Agera與Rx系列的最大本質區別。Agera所使用的是一種push event, pull data的模型,這意味着event並不會攜帶任何data,Updatable在須要更新時,它本身會承擔起從數據源拉取數據的任務。這樣,提供數據的責任就從Observable中拆分了出來交給了Repository,讓其自身可以專一於發送一些簡單的事件如按鈕點擊、一次下拉刷新的觸發等等。

那麼,這樣的實現有什麼好處呢?

當這兩種處理分發邏輯分離開時,Updatable就沒必要觀察到來自Repository的完整數據變化的歷史,畢竟在大多數場景下,尤爲是更新UI的場景下,最新的數據每每纔是有用的數據。

可是我就是須要看到變化的歷史數據,怎麼辦?

不用擔憂,這裏咱們再請出一個角色Reservoir

顧名思義,Reservoir就是咱們用來存儲變化中的數據的地方,它繼承了ReceiverRepository,也就至關於同時具備了接收數據,發送數據的能力。經過查看其具體實現咱們能夠知道它的本質操做都是使用內部的Queue實現的:經過accept()接收到數據後入列,經過get()拿到數據後出列。若一個Updatable觀察了此Reservoir,其隊列中發生調度變化後即將出列的下一個數據若是是可用的(非空),就會通知該Updatable,進一步拉取這個數據發送給Receiver

如今,咱們已經大概瞭解了這幾個角色的功能屬性了,接下來咱們來看一段官方示例代碼:


是否是有些雲裏霧裏的感受呢?多虧有註釋,咱們大概可以猜出到底上面都作了什麼:使用須要的圖片規格做爲參數拼接到url中,拉取對應的圖片並用ImageView顯示出來。咱們結合API來看看整個過程:

  • Repositories.repositoryWithInitialValue(Result.absent())
    建立一個可運行(抑或說執行)的repository。
    初始化傳入值是Result,它用來歸納一些諸如apply()merge()的操做的結果的不可變對象,而且存在兩種狀態succeeded()failed()
    返回REventSource

  • observe()
    用於添加新的Observable做爲更新咱們的圖片的Event source,本例中不須要。
    返回RFrequency

  • onUpdatesPerLoop()
    在每個Looper Thread loop中如有來自多個Event Source的update()處理時,只需開啓一個數據處理流。
    返回RFlow

  • getFrom(new Supplier(…))
    忽略輸入值,使用來自給定Supplier的新獲取的數據做爲輸出值。
    返回RFlow

  • goTo(executor)
    切換到給定的executor繼續數據處理流。

  • attemptTransform(function())
    使用給定的function()變換輸入值,若變換失敗,則終止數據流;若成功,則取新的變換後的值做爲當前流指令的輸出。
    返回RTermination

  • orSkip()
    若前面的操做檢查爲失敗,就跳過剩下的數據處理流,而且不會通知全部已添加的Updatable。

  • thenTransform(function())
    與attemptTransform(function())類似,區別在於當必要時會發出通知。
    返回RConfig

  • onDeactivation(SEND_INTERRUPT)
    用於明確repository再也不active時的行爲。
    返回RConfig

  • compile()
    執行這個repository。
    返回Repository

總體流程乍看起來並無什麼特別的地方,可是真正的玄機其實藏在執行每一步的返回值裏:
初始的REventSource<T, T>表明着事件源的開端,它從傳入值接收了T initialValue,這裏的中,第一個T是當前repository的數據的類型,第二個T則是數據處理流開端的時候的數據的類型。

以後,當observe()調用後,咱們傳入事件源給REventSource,至關於設定好了須要的事件源和對應的開端,這裏返回的是RFrequency<T, T>,它繼承自REventSource,爲其添加了事件源的發送頻率的屬性。

以後,咱們來到了onUpdatesPerLoop(),這裏明確了所開啓的數據流的個數(也就是前面所講的頻率)後,返回了RFlow,這裏也就意味着咱們的數據流正式生成了。同時,這裏也是流式調用的起點。

拿到咱們的RFlow以後,咱們就能夠爲其提供數據源了,也就是前面說的Supplier,因而調用getFrom(),這樣咱們的數據流也就真正意義擁有了數據「乾貨」。

有了數據以後咱們就能夠按具體須要進行數據轉換了,這裏咱們能夠直接使用transform(),返回RFlow,以便進一步進行流式調用;也能夠調用attemptTransform()來對可能出現的異常進行處理,好比orSkip()、orEnd()以後繼續進行流式調用。

通過一系列的流式調用以後,咱們終於對數據處理完成啦,如今咱們能夠選擇先對成型的數據在作一次最後的包裝thenTransform(),或是與另外一個Supplier合併thenMergeIn()等。這些處理以後,咱們的返回值也就轉爲了RConfig,進入了最終配置和repository聲明結束的狀態。
在最終的這個配置過程當中,咱們調用了onDeactivation(),爲這個repository明確了最終進入非活躍狀態時的行爲,若是不須要其餘多餘的配置的話,咱們就能夠進入最終的compile()方法了。當咱們調用compile()時,就會按照前面所走過的全部流程與配置去執行並生成這個repository。到此,咱們的repository才真正被建立了出來。

以上就是repository從無到有的全過程。當repository誕生後,咱們也就能夠傳輸須要的數據啦。再回到上面的示例代碼:

咱們在onResume()onPause()這兩個生命週期下分別添加、移除了Updatable。相較於RxJava中經過Subscription去取消訂閱的作法,Agera的這種寫法顯然更爲清晰也更爲整潔。咱們的Activity實現了Updatable和Receiver接口,直接看其實現方法:

能夠看到這裏repository將數據發送給了receiver,也就是本身,在對應的accept()方法中接收到咱們想要的bitmap後,這張圖片也就顯示出來了,示例代碼中的完整流程也就結束了。

總結一下上述過程:

  • 首先Repositories.repositoryWithInitialValue()生成原點REventSource。

  • 配置完Observable以後進入RFrequency狀態,接着配置數據流的流數。

  • 前面配置完成後,數據流RFlow生成,以後經過getFrom()mergeIn()、transform()等方法可進一步進行流式調用;也可使用attemptXXX()方法代替原方法,後面接着調用orSkip()orEnd()進行error handling處理。當使用attemptXXX()方法時,數據流狀態會變爲RTermination,它表明此時的狀態已具備終結數據流的能力,是否終結數據流要根據failed check觸發,結合後面跟着調用的orSkip()orEnd(),咱們的數據流會從RTermination再次切換爲RFlow,以便進行後面的流式調用。

  • 通過前面一系列的流式處理,咱們須要結束數據流時,能夠選擇調用thenXXX()方法,對數據流進行最終的處理,處理以後,數據流狀態會變爲 RConfig;也能夠爲此行爲添加error handling處理,選擇thenAttemptXXX()方法,後面一樣接上orSkip()orEnd()便可,最終數據流也會轉爲Rconfig狀態。

  • 此時,咱們能夠在結束前按須要選擇對數據流進行最後的配置,例如:調用onDeactivation()配置從「訂閱」到「取消訂閱」的過程是否須要繼續執行數據流等等。

  • 一切都部署完畢後,咱們compile()這個RConfig,獲得最終的成型的Repository,它具備添加Updatable、發送數據通知Receiver的能力。

  • 咱們根據須要添加Updatablerepository在數據流處理完成後會經過update()發送event通知Updatable

  • Updatable收到通知後則會拉取repository的成果數據,並將數據經過accept()發送給Receiver。完成 Push event, pull data 的流程。

以上就是一次Agera的流式調用的內部基本流程。能夠看到,除了 Push event, pull data 這一特色、goLazy的加載模式(本文未介紹)等,依託於較爲精簡的方法,Agera的流式調用過程一樣也可以作到過程清晰,而且上手難度相較於RxJava也要簡單一些,開源做者是Google的團隊也讓一些G粉對其好感度提高很多。不過Agera在本文撰寫時則是 agera-1.0.0-rc2,將來的版本還有不少不肯定因素,相比之下Rx系列發展了這麼久,框架已經相對成熟。究竟用Agera仍是RxJava,你們按本身的喜愛選擇吧。

新人處女做,文章中不免會有錯誤遺漏以及表述不清晰的地方,但願你們多多批評指正,謝謝!

參考&拓展:
RxJava Wiki
Agera Wiki
給 Android 開發者的 RxJava 詳解
Google Agera vs. ReactiveX
When Iron Man becomes reactive
Top 7 Tips for RxJava on Android
How to Keep your RxJava Subscribers from Leaking
RxJava – the production line

文章來源公衆號:QQ空間終端開發團隊(qzonemobiledev)

相關文章
相關標籤/搜索