關於RxJava的一次實踐

前言

以前寫RxJava相關的文章或小冊的時候,有讀者但願我能寫一點RxJava實踐相關的文章,雖然答應下來,可是也一直沒有找到太好的機會,由於糗百一直處在維護老項目的工做中,真的在業務層面深度使用RxJava的機會很少,至於用RxJava請求一下網絡接口就顯得有點不值一提了,所以一直沒有找到合適的業務代碼。最近在作一個新項目,是直播相關的產品,在一個重要的小業務裏用上了RxJava,就以此做爲案例,來向你們介紹RxJava在生產實踐中的使用思路。web

如何理解RxJava

在講如何使用RxJava以前,確定要講如何理解RxJava,由於你怎樣看待它決定了你會怎樣使用它,對於如何理解和使用RxJava,我在掘金小冊《響應式編程 —— RxJava 高階指南》中的第六章中有很是詳細的說明,你們若是有興趣去看看,順即可以支持一下。數據庫

固然,我在這裏仍是簡要的介紹一下,你須要把RxJava做爲一種構建對象(業務)關係的一種編程範式,而不是一種異步編程庫。編程

那麼什麼叫構建關係的編程範式?咱們舉個例子,以APP初始化工做爲例,假設咱們APP須要初始化sdk,數據庫,登錄以後,才跳轉到下一個頁面。那麼這裏的業務關係是怎樣的呢?後端

下面是一個初始化的業務,而他們的關係以下圖所示緩存

它應當是上圖這樣的,頁面跳轉以來上述三個業務的完成,而sdk,數據庫初始化,登錄接口這三者之間並不依賴。這樣咱們就梳理了這個小需求裏面的幾個業務之間的關係。那麼RxJava如何經過代碼實現構建關係呢?bash

之前,咱們多是這樣寫這段業務:websocket

initSDK(context)  // 業務1

initDatabase(context)  // 業務2

login(getUserId(context),new Callback{  // 業務3
   void onSucc(){
        startActivity()  // 業務4
    }
})

複製代碼

以上這段代碼,是很常見的業務初始化代碼,但實際上他們是順序嚴格的前後執行,這並非他們這個幾個業務之間真實的關係,他們真正的關係應該是分別完成業務1,2,3,而後完成業務4。網絡

所以想要真正構建業務之間的關係,RxJava就派上用場了。異步

// 初始化sdk
Observable obserInitSDK = Observable.create((context)->{initSDK(context)}).subscribeOn(Schedulers.newThread())
// 初始化數據庫
Observable obserInitDB = Observable.create((context)->{initDatabase(context)}).subscribeOn(Schedulers.newThread())
// 完成登錄
Observable obserLogin = Observable.create((context)->{login(getUserId(context))})
                              .map((isLogin)->{returnContext()})
                            .subscribeOn(Schedulers.newThread())
                            
// 分別完成三個業務
Observable observable = Observable.merge(obserInitSDK,obserInitDB,obserLogin)
//分開完成三個業務以後跳轉到下一個
observable.subscribe(()->{startActivity()})

複製代碼

固然,不少時候,咱們並不須要嚴格按照業務關係來編寫代碼,由於不必,好比這裏initSDK(context),initDatabase(context)可能都不是耗時操做,咱們把它放在某個線程中前後執行也沒有問題。可是若是他們都是須要網絡請求接口的呢?顯然此時經過RxJava取構建業務關係是更簡單而高效的。socket

這個例子能夠說比較生動的解釋了關係構建的範式編程。RxJava經過構建關係,實際上使它具有了提升複雜/耗時操做的執行效率,以及解構複雜邏輯的特殊能力。而反過來只有以構建關係的角度來理解RxJava,你對它的使用纔會駕輕就熟,而不是侷限於異步編程,只見樹木不見森林。

更多關於RxJava相關的深度內容,你們能夠看看個人《響應式編程 —— RxJava 高階指南》

經過RxJava完成業務

說到這裏,才進入正題,關於RxJava的一次實踐。

先說一下需求背景:直播間的消息模塊,經過websocket鏈接實現實時收發直播間消息(單獨的線程),後端提供了一個獲取最近離線消息的接口,用於離線重連後獲取離線期間產生的直播間消息(顯然是另外一個線程或者主線程),此時的業務需求是,當websoket異常斷開時,客戶端須要主動重連,鏈接成功後,一邊接受websocket新消息,一邊拉取離線期間所產生的直播間消息,按照時間順序(離線消息在前)把消息統一轉發到上層模塊。

下圖是我對這段業務邏輯畫的一個示意圖,基本描述了這個業務邏輯的總體結構:

看完這個需求,咱們分析下這裏可能的難點;

  • websocket接受新消息和接口拉取的離線消息在不一樣的子線程,須要有序轉發到主線程
  • websocket重連以後,就開始接收新消息,另外一邊要從離線接口拉取離線消息,最後離線消息老是須要排在新消息以前

你們能夠想想,若是不用RxJava的話,正常狀況下要如何實現?複雜度如何?

我沒有想過這個問題,由於我首先考慮的是怎樣用RxJava完成這個需求,這裏也告訴你們一個原則,複雜的業務邏輯優先使用RxJava

咱們仍然以構建關係的思路來思考,可是咱們須要先思考那些業務是須要用RxJava實現的,在這裏,websocket的斷開重連,websocket如何獲取消息都是被sdk封裝好或者和由sdk本身的回調觸發,這些咱們不該當強行用RxJava實現,所以真正用RxJava這裏大概是三個業務邏輯:

  • 從websocket中獲取消息而後轉發(暫時稱業務1)
  • 從離線列表接口中獲取消息並轉發(暫時稱業務2)
  • 把兩個渠道的消息轉發到activity展現(暫時稱業務3)

有人可能要問,爲何不把順序排序也算上一個業務呢?它看起來很像一個單獨的業務。由於在這裏的順序排序其實表示的就是業務1和業務2的關係,由於示意圖很差描述前後順序,所以就先這麼畫了,看起來像一個單獨的業務,其實不算。

這三個業務各自都很簡單,他們的關係,總結來講就是:業務2的數據必須在業務1以前轉發到業務3

// 業務1
  // 用於傳遞從websocket sdk處獲取消息
   private var  onLineStream = ReplayProcessor.create<String>().toSerialized()
    // websocket中獲取的消息直接轉發到這裏
    fun senMessageFromOnLine(message: String = "", isCompleted: Boolean = false) {
        var stream = onLineStream
        if (stream.hasComplete() || stream.hasThrowable()) {
            return
        }
        if (!TextUtils.isEmpty(message)) {
            stream.onNext(message)
        }
        if (isCompleted) {
            stream.onComplete()
        }
    }
    
    // 業務2
    //經過接口獲取離線的消息,若是沒有,則返回空列表
    fun getOfflineMessageFlowable() {
        return Flowable.create(object :FlowableOnSubscribe<String>{
            override fun subscribe(emitter: FlowableEmitter<String>) {
                emitter.onNext(getOfflineMessageFromServer())
                emitter.onComplete()
            }

        },BackpressureStrategy.BUFFER)
                .subscribeOn(Schedulers.io())
    }
    
複製代碼

爲何使用的是Flowable類型,而不是Observable類型,由於這裏須要考慮消息的緩存,可能存在背壓的問題(關於RxJava最友好的文章——背壓),因此使用Flowable比較合理。

而爲何websocket消息使用的實現類是Processor不是Flowable呢?這由於websocket消息都是從外部獲取的,而不是咱們經過RxJava內部建立的數據,因此使用Processor會更靈活。

好了,咱們已經定義了前兩個業務了,第三個業務也很簡單,關鍵是經過關係構建把三個業務聯繫起來呢?咱們以前已經總結了業務之間的關係,從RxJava的角度來說,就是先接收離線消息的數據,再接收websocket消息的數據,咱們根據需求查找相關的文檔很容易找到concat操做符。因而:

/**
     * wesocket鏈接成功以後,調用
     */
    fun connected(){
        // concat老是保證先訂閱拉取第一個flowable數據,等第一個完成後,
        //再訂閱拉取第二個Flowable數據
        Flowable.concat<String>(getOfflineMessageFlowable(),
                onLineStream.onBackpressureBuffer(150, true))
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeWith(message-> dispatchToActivity(message) // 把有序的消息轉發到activity 業務3
    }

複製代碼

這樣,基本上核心代碼就完成了,這幾行代碼基本上實現了咱們的需求

相關文章
相關標籤/搜索