RxJava 合併操做

在項目開發中經常會在一個頁面中執行多個任務,多線程異步執行任務時哪一個任務先結束出結果這些並很差控制,譬如要進行幾個併發的網絡請求在都拿到結果後須要對數據進行處理,但願的是用戶只感知一次數據加載,若是不能作到這樣,反饋到用戶界面的體驗也就很差了。經過 RxJava 的合併操做符咱們可以很方便的應對這些狀況。java

經常使用的幾個合併操做符

咱們先建立以下三個不一樣數據類型的 observable 做爲示例數據數組

// 先建立三個不一樣類型的示例 observable
    private val observable = Observable.fromArray(1, 2, 3)
            .concatMap(object : Function<Int, ObservableSource<Int>> {
                override fun apply(t: Int): ObservableSource<Int> {
                    return Observable.just(t).delay(1000, TimeUnit.MILLISECONDS)
                }
            })

    private val observable1 = Observable.just("a", "b", "c")
            .concatMap(object : Function<String, ObservableSource<String>> {
                override fun apply(t: String): ObservableSource<String> {
                    return Observable.just(t).delay(400, TimeUnit.MILLISECONDS)
                }
            })
			
	private val observable2 = Observable.fromArray(1.0f, 2.0f, 3.0f)
            .concatMap(object : Function<Float, ObservableSource<Float>> {
                override fun apply(t: Float): ObservableSource<Float> {
                    return Observable.just(t).delay(1000, TimeUnit.MILLISECONDS)
                }
            })
複製代碼

concat()

concat() 操做符將多個 Observable 按前後順序進行合併,當合並的 Observable 泛型類型不一致時,事件流中的對象類型只能使用 Object(java),Any(kotlin)。經過以下示例代碼能夠直觀的看到 concat 後事件的發送順序:網絡

Observable.concat(observable, observable1)
                .subscribe(object : AppCallBack<Any>() {
                    override fun success(data: Any?) {
                        println("contact----->$data")
                    }

                    override fun fail(code: Long?, msg: String?) {

                    }

                    override fun finish(success: Boolean) {
                        merge()
                    }

                })
複製代碼

獲得的輸出結果爲: 多線程

concat.png
observable 發送 一、二、3 的時間間隔是大於 observable1發送 a、b、c 的間隔的,但輸出結果爲 123 打印完畢後再打印 abc 。這說明 concat 操做符是線性有序的,要等前一個 observable 發送完畢後纔會處理下一個 observable。當咱們須要多個接口的返回數據按順序進行處理時可使用 concat 操做符合並請求。

多於四個 observable 的合併: 兩種方式併發

  • concat() 傳入一個 Iterable<? extends ObservableSource<? extends T> 對象
  • concatArray() 傳入一個 Observable 數組。

merge()

merge() 操做符將多個 Observable 無序合併,當合並的 Observable 泛型類型不一致時,事件流中的對象類型只能使用 Object(java),Any(kotlin)。app

Observable.merge(observable, observable1)
                .subscribe(object : AppCallBack<Any>() {
                    override fun success(data: Any?) {
                        println("merge----->$data")
                    }

                    override fun fail(code: Long?, msg: String?) {

                    }

                    override fun finish(success: Boolean) {
                        zip()
                    }

                })
複製代碼

獲得的輸出結果爲: 異步

merge.png
abc 與 123 是按照時間前後順序交錯進行輸出的,說明 merge() 後事件的發送是併發的無序的,先發送先處理

多於四個 observable 的合併: 兩種方式ide

  • merge() 傳入一個 Iterable<? extends ObservableSource<? extends T> 對象
  • mergeArray() 傳入一個 Observable 數組。

zip()

上面的兩種合併都是單個事件 item 訂閱監聽,若是想合併的事件 item 都接收到數據時處理這兩個事件數據就須要使用 zip() 操做符。spa

Observable.zip(observable, observable1, object : BiFunction<Int, String, String> {
            override fun apply(t1: Int, t2: String): String {
                return "t1=$t1 t2=$t2"
            }
        }).subscribe(object : AppCallBack<String>() {
            override fun success(data: String?) {
                println("zip----->$data")
            }

            override fun fail(code: Long?, msg: String?) {

            }

            override fun finish(success: Boolean) {

            }

        })
複製代碼

獲得的輸出結果爲: 線程

zip.png
使用 zip 合併時,會等待每一次的合併項都發送完畢後再發送下一輪的事件。當咱們須要從兩個數據源拿數據,可是須要統一合併顯示時可使用 zip 操做符對事件流進行合併。

多個 observable 的合併: zip() 的多事件合併就有點厲害了,支持九個 Observable 按數據類型合併,除了兩個觀察者對象合併時 zipper 是 BiFunction 其餘的爲 FunctionX ,X 爲合併個數。若是多於九個觀察者對象合併,與上面兩種合併同樣可使用 zipArray() 進行合併,可是合併後的觀察結果是一個 Object 數組對象,須要本身判斷數據類型

結語

除了以上的建立 Observable 對象級別的合併操做符,還有一些事件流中的操做符,譬如第一段代碼中的 concatMap ,在事件流前面添加其餘事件的 startWith() 等。仍是比較完善的

相關文章
相關標籤/搜索