當Kotlin碰見RxJava多數據源

舒適提醒

閱讀本文最好有Kotlin基礎,若沒有基礎,可參考以前文章Kotlin初探使用Kotlin優雅的開發Android應用,以及RxJava基礎(本文基於RxJava2),固然我也會盡量詳細解釋讓你順利閱讀本文。javascript

源碼傳送門java

寫在前面

最近幾天回過頭,看了以前的總結RxJava操做符系列,感受對Rxjava多數據源的處理不是很理解,因此在總結學習一波。你們都知道,最近Kotlin語言一直佔據熱搜榜,褒貶不一,但我想說,無論有什麼想法都要拋在腦後,畢竟Google爸爸出手,你不情願也要跟隨它的步伐。鑑於此,本篇對RxJava多數據源的總結是基於Kotlin語言,也讓你們明白,使用Kotlin開發應用並非不能使用Java庫,如今有一部分人擔憂,Kotlin第三方庫那麼少,若是使用Kotlin開發那不是給本身找罪受,其實你徹底錯了,當你說這話的時候,我敢判定你都尚未接觸Kotlin,由於Koltin有一個最重要的優點就是和Java絕對兼容。android

多數據源處理操做符

在RxJava中多數據源處理的操做符不少,可是最經典的就要數merge,contact,zip了。若是對這三個操做符不是很熟悉的話,能夠去查看它的使用,固然若是你懶得去看,我也會簡單提一下。merge操做符能夠處理多個Observable發送的數據,它是一個異步操做,不保證數據發送的順序,即有可能出現數據交叉,當一個Observable發送了onError後,未執行的Observable不在繼續執行,直接執行merge的onError方法。git

contact操做符執行時一個同步操做,嚴格按照contact中傳入Observable前後執行,即前面的先執行後面的後執行,而且最終發送的數據也是有序的,即第一個Observable的數據發送完畢再發送第二個,依次類推。github

zip操做符和contact和merge有了本質的區別,它會將每一個Observable個數據項分佈對應返回一個Observable再發送,最終發送的數據量與最小數據長度相同。網絡

使用場景分析

假如如今咱們有三種商品,有一個查詢商品信息的接口,根據接口能夠查詢該商品的價格以及出售地點。商品實體類併發

data class Goods(var id:Int,var price: Int, var address: String)複製代碼

在Kotlin語言中,實體類建立用data class 關鍵詞,咱們不須要和Java同樣建立get/set方法,只需一行代碼搞定。dom

建立模擬網絡請求異步

object NetRequest {
    //模擬網絡請求
    fun getGoodsObservable(id: Int): Observable<Goods> {
fun getGoodsObservable(id: Int): Observable<Goods> {
        return Observable.create {
            source ->
            Thread.sleep(Random().nextInt(1000).toLong())
            var data = Goods(id, Random().nextInt(20), "地址${id}")
            source.onNext(data)
            source.onComplete()
            Log.e("getGoodsObservable:", "${id}")
        }
    }
}複製代碼

在上面咱們建立了一個單例類,在Kotlin中使用object修飾類時即給咱們自動建立了一個單例對象。在每一句代碼結尾咱們不須要再和Java同樣寫一個分號「;」來結束,什麼也不用寫。ide

Observable.create使用的是lambda表達式,在Kotlin語言中是支持lambda表達式的。source 就是ObservableEmitter ,因此咱們能夠調用onNext發送數據。爲了更準確的模擬網絡請求,使用Thread.sleep隨機的延遲,模擬網絡請求的時間。

fun getGoodsObservable(id: Int): Observable<Goods> {
        return Observable.create {
            source ->
            Thread.sleep(Random().nextInt(1000).toLong())
            var data = Goods(id, Random().nextInt(20), "地址${id}")
            source.onNext(data)
            source.onComplete()
            Log.e("getGoodsObservable:", "${id}")
        }複製代碼

固然因爲subscribe只有一個參數,因此咱們也能夠這樣寫。也就是省略了source ->,此時it就表示該參數數據。

return Observable.create {
            Thread.sleep(Random().nextInt(1000).toLong())
            var data = Goods(id, Random().nextInt(20), "地址${id}")
            it.onNext(data)
            it.onComplete()
            Log.e("getGoodsObservable:", "${id}")
        }複製代碼

在java中實現以下

return Observable.create(new ObservableOnSubscribe<Goods>() {

            @Override
            public void subscribe(@NonNull ObservableEmitter<Goods> e) throws Exception {
             //處理邏輯
            }
        });複製代碼

merge

準備好了請求操做,開始使用merge看看執行的效果。

fun executeMerge() {
        Observable.merge(getGoodsObservable(1).subscribeOn(Schedulers.newThread()),
                getGoodsObservable(2).subscribeOn(Schedulers.newThread()),
                getGoodsObservable(3).subscribeOn(Schedulers.newThread()))
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .toList()
                .subscribe({
                    Log.e(TAG, it.toString())
                }, {
                    Log.e(TAG, it.toString())
                })
    }複製代碼

merge中有三個網絡請求操做,並經過subscribeOn(Schedulers.newThread())將網絡請求切換到線程中執行,數據都請求成功後,再經過observeOn(AndroidSchedulers.mainThread())切換到主線程請求數據。爲了三請求都成功後,咱們在更新UI,因此經過toList()將請求的數據轉換成List一塊發送。在上面的subscribe依然使用的lambda表達式,subscribe({},{})中第一個括號是onSuccess回調,裏面的it是接收到的List< Goods >數據,第二個括號是onError回調,it表示異常Throwable對象。
subscribe部分Java代碼

.subscribe(new Consumer<List<Goods>>() {
                    @Override
                    public void accept(@NonNull List<Goods> goodses) throws Exception {

                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {

                    }
                });複製代碼

固然若是你想使用RxJava2中onSubscribe(@NonNull Disposable d) ,你能夠這樣使用subscribe

.subscribe(object : SingleObserver<List<Goods>> {
                    override fun onSubscribe(d: Disposable?) {
                    }
                    override fun onError(e: Throwable?) {
                    }
                    override fun onSuccess(t: List<Goods>?) {
                    }
                })複製代碼

爲了觀察,咱們將請求成功的數據顯示在界面上,咱們建立一個Button,TextView。

class MainActivity : AppCompatActivity(), View.OnClickListener {

    val TAG = "MainActivity"
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        setSupportActionBar(toolbar)
        //加入這句import kotlinx.android.synthetic.main.activity_main.*
        //不用再findViewById,可直接使用
        merge.setOnClickListener(this)

    }
    override fun onClick(v: View) {
        when (v.id) {
            R.id.merge -> {
                executeMerge()
            }
        }
        //when 關鍵字和Java中的Switch關鍵詞是相似的,
        //只不過它比Java中的Switch強大的多,能夠接收任何參數,
        //而後判斷使用,也能夠以下使用
        when (v) {
            merge -> {
            }
        }
    }
}複製代碼

contact

咱們點擊執行幾回發現,返回的List的數據並非按照merge參數的前後順序執行的,它是併發的,最終的順序,是由網絡請求的快慢決定的,請求返回數據越快也就表示該數據最先發送,即在List中最靠前。那麼此時出現一個問題,若是我想返回數據的List順序嚴格按照位置的前後順序呢?那此時使用merge的話,是不太現實了。固然前面咱們提到contact可使用。那麼直接將merge更改成contact執行如下試試,

fun executeContact() {
        Observable.concat(getGoodsObservable(1).subscribeOn(Schedulers.newThread()),
                getGoodsObservable(2).subscribeOn(Schedulers.newThread()),
                getGoodsObservable(3).subscribeOn(Schedulers.newThread()))
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .toList()
                .subscribe({
                    Log.e(TAG, it.toString())
                }, {
                    Log.e(TAG, it.toString())
                })
    }複製代碼

的確,發現不管執行多少次List的數據都能按照contact中Observable順序發送,咱們想要的效果能夠實現了,不過你會發現,效率太差了,這是同步執行啊,只有第一個請求成功,纔會去請求第二個,而後第三個,假如一次請求須要一秒,那三次請求至少三秒啊,不能忍。

zip

鑑於上面兩種方式的利弊,若是咱們既想如merge同樣併發執行,又想和contact同樣保證順序,是否是有點強迫症的意思,固然強大的zip就能實現咱們想要的效果。以下實現。.

fun executeZip() {
        Observable.zip(getGoodsObservable(1),
                getGoodsObservable(2),
                getGoodsObservable(3),
                Function3<Goods, Goods, Goods, List<Goods>>
                { goods0, goods1, goods2 ->
                    val list = ArrayList<Goods>()
                    list.add(goods0)
                    list.add(goods1)
                    list.add(goods2)
                    list
                }).subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe({
                    Log.e(TAG, it.toString())
                }, {
                    Log.e(TAG, it.toString())
                })
    }複製代碼

既然實現了,那咱們運行幾回,發現完美的實現了咱們想要的效果,即併發的執行了,也保證了咱們請求數據的順序性。

在回調中運用RxJava

在上面咱們的單個網絡請求是一個同步的請求,若是咱們的網絡請求封裝了,在線程中請求,請求成功後在主線程中回調,那咱們又該如何建立呢使用呢?
先來模擬一個子線程請求網絡,請求成功回調數據給主線程。

fun getGoods(ctx:Context,id: Int,callbacks:(goods:Goods)->Unit): Unit {
        ctx.doAsync {
            Thread.sleep(Random().nextInt(1000).toLong())
            var data = Goods(id, Random().nextInt(20), "地址${id}")
            ctx.runOnUiThread {
                callbacks(data)
            }
        }
    }複製代碼

getGoods傳了三個參數,第一個Context對象,第二個是商品ID,第三個參數是一個函數,(goods:Goods)->Unit表示第三個參數的類型是一個參數爲Goods類型而且返回Unit的函數。使用doAsync 模擬異步請求,請求成功後runOnUiThread 切換到UI線程。而後callbacks(data)將數據回調。這種使用方式比Java中回調優美好用太多了。
接下來就開始在回調成功後建立Observable

fun getGoodsCallBack(id: Int): Observable<Goods> {
        var subscrbe: ObservableEmitter<Goods>? = null
        var o = Observable.create<Goods> {
            subscrbe = it
        }
        //Kotlin特性
        getGoods(this@MainActivity, id) {
            subscrbe?.onNext(it)
        }
        return o
    }
    fun executeZipCallBack() {
        Observable.zip(getGoodsCallBack(1).subscribeOn(Schedulers.newThread()),
                getGoodsCallBack(2).subscribeOn(Schedulers.newThread()),
                getGoodsCallBack(3).subscribeOn(Schedulers.newThread()),
                Function3<Goods, Goods, Goods, List<Goods>>
                { goods0, goods1, goods2 ->
                    val list = ArrayList<Goods>()
                    list.add(goods0)
                    list.add(goods1)
                    list.add(goods2)
                    list
                }).subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe({
                    Log.e(TAG, it.toString())
                }, {
                    Log.e(TAG, it.toString())
                })
    }複製代碼

ok,到這裏回調狀況下建立使用RxJava也介紹完畢,到此本篇文章就結束了,有問題歡迎指出,內容雜亂,多多擔待,Hava a wonderful day.

相關文章
相關標籤/搜索