RxJava2和Retrofit2配合使用詳解

不講 rxjava 和 retrofit 而是直接上手 2 了,由於 2 封裝的更好用的更多。java

1. 觀察者模式

常見的 button 點擊事件爲例,button 是被觀察者,listener 是觀察者,setOnClickListener 過程是訂閱,有了訂閱關係後在 button 被點擊的時候,監聽者 listener 就能夠響應事件。react

這裏的button.setOnClickListener(listener)看上去意思是被觀察者訂閱了觀察者(雜誌訂閱了讀者),邏輯上不符合平常生活習慣。其實這是設計模式的習慣,沒必要糾結,習慣了這種模式就利於理解觀察者模式了。數據庫

2. RxJava 中的觀察者模式

  • Observable:被觀察者(ble 結尾的單詞通常表示 可...的,可觀察的)
  • Observer:觀察者(er 結尾的單詞通常表示 ...者,...人)
  • subscribe:訂閱

首先建立 Observable 和 Observer,而後 observable.subscribe(observer),這樣 Observable 發出的事件就會被 Observer 響應。通常咱們不手動建立 Observable,而是由 Retrofit 返回給咱們,咱們拿到 Observable 以後只需關心如何操做 Observer 中的數據便可。
不過爲了由淺入深的演示,仍是手動建立 Observable 來說解。json

2.1 建立 Observable

常見的幾種方式,不經常使用的不寫了,由於我以爲這個模塊不是重點。設計模式

  • var observable=Observable.create(ObservableOnSubscribe<String> {...})
  • var observable=Observable.just(...)
  • var observable = Observable.fromIterable(mutableListOf(...))

2.1.1 create()

var observable2=Observable.create(object :ObservableOnSubscribe<String>{
            override fun subscribe(emitter: ObservableEmitter<String>) {
                emitter.onNext("Hello ")
                emitter.onNext("RxJava ")
                emitter.onNext("GoodBye ")
                emitter.onComplete()            }

        })
複製代碼

ObservableOnSubscribeObservableEmitter都是陌生人,這個要是詳細講涉及到源碼分析,東西可就多了(主要是我不熟悉),因此能夠理解成 ObservableOnSubscribe 是用來幫助建立 Observable 的,ObservableEmitter 是用來發出事件的(這些事件在觀察者 Observer 中能夠響應處理)。
emitter 一次發射了三個事件,而後調用了 onComplete() 這些在下面講觀察者 Observer 時還會提到,一併講解。api

2.1.2 just

var observable=Observable.just("Hello","RxJava","GoodBye")
複製代碼

這句的效果等同於上面用 create 建立 observable,即 調用 3 次 onNext 後再調 onComplete。數組

2.1.3 fromIterable

var observable = Observable.fromIterable(mutableListOf("Hello","RxJava","GoodBye"))
複製代碼

這句的效果等同於上面用 create 建立 observable,即 調用 3 次 onNext 後再調 onComplete。bash

2.2 建立 Observer

val observer = object : Observer<String> {
            override fun onComplete() {
                Log.e("abc", "-----onComplete-----")
            }

            override fun onSubscribe(d: Disposable) {
                Log.e("abc", "-----onSubscribe-----")
            }

            override fun onNext(t: String) {
                Log.e("abc", "-----onNext-----$t")
            }

            override fun onError(e: Throwable) {
                Log.e("abc", "-----onError-----$e")
            }
        }
//訂閱
observable.subscribe(observer)
複製代碼

log 打印狀況:網絡

-----onSubscribe-----
-----onNext-----Hello
-----onNext-----RxJava
-----onNext-----GoodBye
-----onComplete-----
複製代碼

能夠看到,先是創建訂閱關係,而後根據前面 observable 的發射順序來打印 onNext,參數經過 onNext(t: String) 傳進來,最後調用 onComplete,多說一句,在 just 和 fromIterable 的狀況下,沒有手動調用 Emitter,可是仍會先調用 onNext,最後調用 onCompleteapp

2.3 Consumer 和 Action

這兩個詞意思分別是消費者(能夠理解爲消費被觀察者發射出來的事件)和行爲(能夠理解爲響應被觀察者的行爲)。對於 Observer 中的 4 個回調方法,咱們未必都能用獲得,若是隻須要用到其中的一部分,就須要 Consumer 和 Action 上場了。

有參數的onSubscribeonNextonError咱們用 Consumer 來代替,無參的onComplete用 Action 代替:

2.3.1 subscribe(Consumer<? super T> onNext)

observable.subscribe(object :Consumer<String>{
            override fun accept(t: String?) {
                Log.e("abc", "-----onNext-----$t")
            }
        })
//打印
-----onNext-----Hello
-----onNext-----RxJava
-----onNext-----GoodBye
複製代碼

說明一下,若是 subscribe 中咱們只傳一個對象參數,那隻能是subscribe(Consumer<? super T> onNext)(onNext 方法),不能是 Action 或 Consumer<? super Throwable> onError、Consumer<? super Disposable> onSubscribe

==注意==:Consumer 中的回調方法名稱是 accept,區別於前面的 onNext

2.3.2 subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)

帶有兩個 Consumer 參數,分別負責 onNext 和 onError 的回調。

observable.subscribe(object : Consumer<String> {
            override fun accept(t: String?) {
                Log.e("abc", "-----onNext-----$t")
            }
        }, object : Consumer<Throwable> {
            override fun accept(t: Throwable?) {
                Log.e("abc", "-----onError-----$e")
            }
        })
複製代碼

若是想要一個帶有兩個 Consumer 可是不是這種搭配(好比subscribe(Consumer<? super T> onNext, Consumer<? super Disposable> onSubscribe)),能夠嗎?答案是:不行

2.3.3 subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete)

帶有三個參數,分別負責onNext、onError和onComplete的回調。

observable.subscribe(object : Consumer<String> {
            override fun accept(t: String?) {
                Log.e("abc", "-----onNext-----$t")
            }
        }, object : Consumer<Throwable> {
            override fun accept(t: Throwable?) {
                Log.e("abc", "-----onError-----$e")
            }
        }, object : Action {
            override fun run() {
                Log.e("abc", "-----onComplete-----")
            }
        })
複製代碼

一樣,三個參數只能有這一種搭配

==注意==:Action 中的回調方法名稱是 run,區別於前面的 onComplete

2.3.4 subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete, Consumer<? super Disposable> onSubscribe)

這種狀況和直接 new 出來的 Observer 效果同樣。

observable2.subscribe(object : Consumer<String> {
            override fun accept(t: String?) {
                Log.e("abc", "-----onNext-----$t")
            }
        }, object : Consumer<Throwable> {
            override fun accept(t: Throwable?) {
                Log.e("abc", "-----onError-----$e")
            }
        }, object : Action {
            override fun run() {
                Log.e("abc", "-----onComplete-----")
            }
        },object : Consumer<Disposable>{
            override fun accept(t: Disposable?) {
                Log.e("abc", "-----onSubscribe-----")            }
        })
複製代碼

3. 變換

在上面的例子中,Observable 發送的都是 String 類型的數據,因此在 Observer 中接收的也都是 String,現實開發中的數據多種多樣,並且有時候 Observable 提供的數據不是咱們理想的狀況,這種狀況下就須要用到轉換操做符。 一樣咱們只講經常使用的:

3.1 map

好比咱們想把上游的 Int 類型的數據轉換成 String 能夠這樣操做:

Observable.fromIterable(mutableListOf<Int>(1, 3, 5, 7, 8))
                .map(object : Function<Int, String> {
                    override fun apply(t: Int): String {
                        return "zb$t"
                    }
                })
                .subscribe(object : Consumer<String> {
                    override fun accept(t: String?) {
                        Log.e("abc","-- $t --")
                    }
                })
//Log日誌
-- zb1 --
-- zb3 --
-- zb5 --
-- zb7 --
-- zb8 --
複製代碼

經過map操做符,Int 類型數據,到 Consumer 裏已經成了 String(這裏爲了簡單的只看數據就沒用 Observer 而改用 Consumer,二者均可以)。這裏面用到了Function,它的第一個泛型是 Observable 中發射的數據類型,第二個泛型是咱們想要裝換以後的數據類型,在 Function 的 apply 方法中手動完成數據的轉化。
示意圖:map 把圓的變成了方的。

map

3.2 flatMap

與 map 類似,不過 flatMap 返回的是一個 Observable,也就是說 Function 的第二個泛型固定了,就是 Observable,這樣說不太好理解,看個例子:
假如如今有多個學生,每一個學生有多個科目,每一個科目考了屢次試,如今要打印全部的分數。單單隻用 map 就不能直接搞定,試試吧

class Course(var name: String, var scores: MutableList<Int>)
class Student(var name: String, var courses: MutableList<Course>)

var stu1Course1 = Course("體育",mutableListOf(80, 81, 82))
var stu1Course2 = Course("美術",mutableListOf(63, 62, 60))
var stu1 = Student("StuA", mutableListOf(stu1Course1, stu1Course2))
var stu2Course1 = Course("音樂",mutableListOf(74, 77, 79))
var stu2Course2 = Course("希臘語",mutableListOf(90, 90, 91))
var stu2 = Student("StuB", mutableListOf(stu2Course1, stu2Course2))

Observable.just(stu1,stu2)
                .map(object :Function<Student,MutableList<Course>>{
                    override fun apply(t: Student): MutableList<Course> {
                        return t.courses
                    }
                })
                .subscribe(object :Consumer<MutableList<Course>>{
                    override fun accept(t: MutableList<Course>?) {
                        for (item in t!!){
                            for (i in item.scores){
                                Log.e("abc","--->$i")
                            }
                        }
                    }
                })

複製代碼

經過兩層 for 循環能夠打印,這也是沒辦法的事,由於在 map 裏面只能拿到 Course 集合。使用 flatMap 的狀況是這樣的:

Observable.just(stu1, stu2)
                .flatMap(object : Function<Student, ObservableSource<Course>> {
                    override fun apply(t: Student): ObservableSource<Course> {
                        return Observable.fromIterable(t.courses)
                    }
                })
                .flatMap(object : Function<Course, ObservableSource<Int>> {
                    override fun apply(t: Course): ObservableSource<Int> {
                        return Observable.fromIterable(t.scores)
                    }

                })
                .subscribe(object : Consumer<Int> {
                    override fun accept(t: Int?) {
                        Log.e("abc", "---> $t")
                    }
                })
// log 打印
    ---> 80
    ---> 81
    ---> 82
    ---> 63
    ---> 62
    ---> 60
    ---> 74
    ---> 77
    ---> 79
    ---> 90
    ---> 90
    ---> 91
複製代碼

用了兩次 flatMap,鏈式調用比縮進式更清晰。這裏面的 flatMap 返回值類型的是 ObservableSource 並非咱們在前面提到的 Observable,查看 Observable 源碼能夠看到,它繼承了 ObservableSource,因此這種多態用法是能夠的。
另外在 apply 中返回的Observable.fromIterable(t.courses)這一句不就是咱們建立 Observable 的方式嗎?
簡單的說,map 是把 Observable 發射的數據變換一下類型,flatMap 是把數據中集合/數組中的每一個元素再次經過 Observable 發射。
示意圖:faltMap 把一系列圓的經過一系列的 Observable 變成了一系列方的。

flatMap

圖雖然畫的醜,可是我想意思比較明白了。

3.3 filter

filter是過濾的意思,經過判斷是否符合咱們想要的邏輯,來決定是否發射事件,只有返回 true 的事件才被髮射,其餘的被拋棄。還以上面的例子爲例,假如咱們只想看 80 分以上的成績能夠這樣過濾:

Observable.just(stu1, stu2)
                .flatMap(object : Function<Student, ObservableSource<Course>> {
                    override fun apply(t: Student): ObservableSource<Course> {
                        return Observable.fromIterable(t.courses)
                    }
                })
                .flatMap(object : Function<Course, ObservableSource<Int>> {
                    override fun apply(t: Course): ObservableSource<Int> {
                        return Observable.fromIterable(t.scores)
                    }

                })
                .filter(object :Predicate<Int>{
                    override fun test(t: Int): Boolean {
                        return t > 80
                    }

                })
                .subscribe(object : Consumer<Int> {
                    override fun accept(t: Int?) {
                        Log.e("abc", "---> $t")
                    }
                })
// log 打印
    ---> 81
    ---> 82
    ---> 90
    ---> 90
    ---> 91
複製代碼

注意,filter 裏面不是用 Function 了,而是 Predicate,這個單詞是「基於...」的意思,基於 t > 80,也就是選擇大於 80 分的成績。

4. 結合 Retrofit 使用

前面 3 小節講了不少,都是爲了講清楚 RxJava 的整個工做流程,還沒涉及到線程切換。現實開發中更多的時候 Observable 是經過 Retrofit 返回給咱們的。Retrofit 是一個網絡請求框架,它基於 OkHttp3,作了更好的封裝,結合 RxJava 用慣了的話能夠大大提到開發效率。仍是同樣,咱們只看怎麼用,不涉及源碼解讀。

4.1 Retrofit 進行簡單的 Get 請求

implementation 'com.squareup.retrofit2:retrofit:2.6.2'
implementation 'com.squareup.retrofit2:converter-gson:2.6.2'
複製代碼

先引入依賴,而後咱們請求一個知乎日報的新聞數據(點擊查看數據:news-at.zhihu.com/api/4/news/…):

// ZhEntity
class ZhEntity {
    var date: String? = null
    var stories: MutableList<StoriesBean>? = null
    var top_stories: MutableList<TopStoriesBean>? = null

    class StoriesBean {
        var image_hue: String? = null
        var title: String? = null
        var url: String? = null
        var hint: String? = null
        var ga_prefix: String? = null
        var type: Int = 0
        var id: Int = 0
        var images: MutableList<String>? = null
    }

    class TopStoriesBean {
        var image_hue: String? = null
        var hint: String? = null
        var url: String? = null
        var image: String? = null
        var title: String? = null
        var ga_prefix: String? = null
        var type: Int = 0
        var id: Int = 0
    }
}
複製代碼
// ApiService
import retrofit2.Call
import retrofit2.http.GET
import retrofit2.http.Url
interface ApiService {
    @GET("news/latest")
    fun getLatestNews(): Call<ZhEntity>
}
複製代碼
// 調用
val retrofit = Retrofit.Builder()
                .addConverterFactory(GsonConverterFactory.create())
                .baseUrl("https://news-at.zhihu.com/api/4/")
                .build()
        val service: ApiService = retrofit.create(ApiService::class.java)
        val call: Call<ZhEntity> = service.getLatestNews()
        call.enqueue(object : Callback<ZhEntity> {
            override fun onFailure(call: Call<ZhEntity>?, t: Throwable?) {
                Log.e("abc", "--> $t")
            }

            override fun onResponse(call: Call<ZhEntity>?, response: Response<ZhEntity>?) {
                Log.e("abc", "-->${Gson().toJson(response?.body())}")
            }
        })
複製代碼

代碼有點多,分別解釋一下,ZhEntity 是實體類,ApiService 是一個接口,裏面用註解的方式定義了一個方法 getLatestNews,@GET表示 Get 請求,由此能夠想象確定有@POST@GET裏面還有參數,這是請求地址 BaseUrl 後面的子文件夾。

getLatestNews 函數返回類型是 Call,這個是 Retrofit 定義用來請求網絡的。
第三段代碼,現實建立了一個 Retrofit 對象,addConverterFactory(GsonConverterFactory.create())是把接口返回的 json 類型的數據轉換成實體類的類型,這個東西在implementation 'com.squareup.retrofit2:converter-gson:2.6.2'時被引入。

而後是一系列的 Call 調用 qnqueue 操做什麼的,看得出,沒有用 Rxjava 同樣能夠完成網絡請求,並且代碼不復雜,好了,本文到此結束。

好吧,我在扯淡。繼續講,有人說不喜歡 url 被截成兩段,能夠這樣修改,效果徹底相同:

// ApiService
import retrofit2.Call
import retrofit2.http.GET
import retrofit2.http.Url
interface ApiService {
    @GET
    fun getLatestNews(@Url url:String): Call<ZhEntity>
}
複製代碼
// 調用
val retrofit = Retrofit.Builder()
                .addConverterFactory(GsonConverterFactory.create())
                .baseUrl("https://www.baidu.com")
                .build()
        val service: ApiService = retrofit.create(ApiService::class.java)
        val call: Call<ZhEntity> = service.getLatestNews("https://news-at.zhihu.com/api/4/news/latest")
        call.enqueue(object : Callback<ZhEntity> {
            override fun onFailure(call: Call<ZhEntity>?, t: Throwable?) {
                Log.e("abc", "--> $t")
            }

            override fun onResponse(call: Call<ZhEntity>?, response: Response<ZhEntity>?) {
                Log.e("abc", "-->${Gson().toJson(response?.body())}")
            }
        })
複製代碼

baseUrl 仍是要的,不過設置成其餘值無所謂了,由於不會被請求。

4.2 Retrofit 結合 RxJava

囉嗦了這麼多,纔講到這裏。抱歉水平有限,沒辦法用簡單的語言說清複雜的問題。
首先,引入依賴時多加一句對 RxJava 的支持:

implementation 'com.squareup.retrofit2:retrofit:2.6.2'
implementation 'com.squareup.retrofit2:converter-gson:2.6.2'
implementation 'com.squareup.retrofit2:adapter-rxjava2:2.6.2'
複製代碼

而後,咱們的 getLatestNews 就能夠直接返回一個 Observable 了!

import io.reactivex.Observable
import retrofit2.http.GET

interface ApiService {
    @GET("news/latest")
    fun getLatestNews(): Observable<ZhEntity>
}
複製代碼

放心寫,不會報錯,有了 Observable,就好辦了,輕車熟路:

val retrofit = Retrofit.Builder()
                .addConverterFactory(GsonConverterFactory.create())
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                .baseUrl("https://news-at.zhihu.com/api/4/")
                .build()
        val service: ApiService = retrofit.create(ApiService::class.java)
        val observable = service.getLatestNews()
        observable.subscribeOn(Schedulers.newThread())
                .subscribe(object : Observer<ZhEntity> {
            override fun onComplete() {
            }

            override fun onSubscribe(d: Disposable) {
            }

            override fun onNext(t: ZhEntity) {
                Log.e("abc","-->${Gson().toJson(t)}")
            }

            override fun onError(e: Throwable) {
                Log.e("abc","-->$e")
            }
        })
複製代碼

除了 Observable 來源變了,其餘與本文最先講的 RxJava 沒什麼不一樣。非要說不一樣,有一點,多了一句subscribeOn(Schedulers.newThread()),下面講講這個。

4.3 線程切換

  • subscribeOn:定義 Observable 發射事件所處的線程
  • observeOn:定義轉換/響應事件所處的線程(map、flatMap、Observer 等),可屢次切換

線程切換比較常見,好比 子線程請求網絡數據主線程更新 UI,subscribeOnobserveOn有哪些線程能夠選擇?它們又是怎樣使用的?咱們先看一個例子:

Thread(object : Runnable {
            override fun run() {
                Log.e("abc","Thread當前線程:${Thread.currentThread().name}")
                observable.subscribeOn(Schedulers.newThread())
                        .doOnNext(object :Consumer<ZhEntity>{
                            override fun accept(t: ZhEntity?) {
                                Log.e("abc","doOnNext當前線程:${Thread.currentThread().name}")
                            }
                        })
                        .observeOn(Schedulers.io())
                        .flatMap(object :Function<ZhEntity,ObservableSource<ZhEntity.StoriesBean>>{
                            override fun apply(t: ZhEntity): ObservableSource<ZhEntity.StoriesBean> {
                                Log.e("abc","flatMap當前線程:${Thread.currentThread().name}")
                                return Observable.fromIterable(t.stories)
                            }
                        })
                        .observeOn(AndroidSchedulers.mainThread())
                        .subscribe(object : Observer<ZhEntity.StoriesBean> {
                            override fun onComplete() {
                            }

                            override fun onSubscribe(d: Disposable) {
                                Log.e("abc","onSubscribe當前線程:${Thread.currentThread().name}")
                            }

                            override fun onNext(t: ZhEntity.StoriesBean) {
                                Log.e("abc","Observer當前線程:${Thread.currentThread().name}")
                                Log.e("abc", "-->${Gson().toJson(t)}")
                            }

                            override fun onError(e: Throwable) {
                                Log.e("abc", "-->$e")
                            }
                        })
            }
        }).start()
// log 打印
Thread當前線程:Thread-4
onSubscribe當前線程:Thread-4
doOnNext當前線程:RxNewThreadScheduler-1
flatMap當前線程:RxCachedThreadScheduler-1
Observer當前線程:main
Observer當前線程:main
Observer當前線程:main
複製代碼

這裏面只有doOnNext沒講過,如今說說:每發送 onNext() 以前都會先回調這個方法,因此 doOnNext 和 Observable 的 subscribe(發射事件的方法)處於同一個線程。
從這個例子能夠看出:

  1. Observable 和 Observer 創建訂閱關係是在當前線程中(Thread-4)
  2. subscribeOn決定 Observable 發射事件所處的線程(即 Retrofit 請求網絡所在線程)
  3. 第一次observeOn決定 flatMap 所在的線程(RxCachedThreadScheduler-1)
  4. 再次observeOn決定 Observer 所在線程(Android 主線程 main)

因此每次調用observeOn就會切換線程,而且決定的是接下來的變換/響應的線程。多說一句,屢次設置 subscribeOn,只有第一次生效

線程可選值

線 程 名 稱 說明
Schedulers.immediate() 默認的 Scheduler,直接在當前線程運行,至關於不指定線程
Schedulers.newThread() 啓用新線程,並在新線程執行操做
Schedulers.io() I/O 操做(讀寫文件、讀寫數據庫、網絡信息交互等)所使用的 Scheduler。行爲模式和 newThread() 差很少,區別在於 io() 的內部實現是是用一個無數量上限的線程池,能夠重用空閒的線程,所以多數狀況下 io() 比 newThread() 更有效率。不要把計算工做放在 io() 中,能夠避免建立沒必要要的線程
Schedulers.computation() 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操做限制性能的操做,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小爲 CPU 核數。不要把 I/O 操做放在 computation() 中,不然 I/O 操做的等待時間會浪費 CPU
AndroidSchedulers.mainThread() Android 主線程

4.4 Disposable 和 CompositeDisposable

最後介紹一下這兩個類,Disposable前文出現過,在 Observer 的 onSubscribe 函數中,有一個 Disposable 類型的參數:override fun onSubscribe(d: Disposable) {},經過前面介紹咱們知道,Observable 和 Observer 創建訂閱關係時會調用 onSubscribe 方法,可是沒有說這個參數的做用。

4.4.1 DisPosable

Disposable 的 dispose() 函數能夠用來解除訂閱,這樣就不會收到 Observable 發射的事件:

var dis ?= null
val observable = Observable.fromIterable(mutableListOf("Hello", "RxJava", "GoodBye"))
        val observer = object : Observer<String> {
            override fun onComplete() {
            }
            override fun onSubscribe(d: Disposable) {
                dis=d
                Log.e("abc", "-----onSubscribe-----$d")
            }

            override fun onNext(t: String) {
                if (t=="Hello") dis.dispose()
                Log.e("abc", "-----onNext-----$t")
            }

            override fun onError(e: Throwable) {
            }
        }
observable.subscribe(observer)
// log 打印
-----onNext-----Hello
複製代碼

能夠看到,調用dis.dispose()後,就不在打印上游發射的"RxJava"和"GoodBye"了。

4.4.2 CompositeDisposable

CompositeDisposable 能夠用來管理多個 Disposable,經過add()方法添加 Disposable 對象,而後在 onDestroy 方法裏面調用clear()或者dispose()來清除全部的 Disposable,這樣能夠防止內存泄漏。

val cDis = CompositeDisposable()
// ...代碼省略
override fun onSubscribe(d: Disposable) {
                cDis.add(d)
            }
// ...代碼省略
override fun onDestroy() {
        super.onDestroy()
        cDis.clear()
    }
複製代碼

多說一句,經過查看RxJava2CallAdapterFactory.create()源碼可知,dispose()方法能主動斷開 Observable 和 Observer 之間的鏈接,還能取消 Retrofit 的網絡請求,因此放心的用吧。

相關文章
相關標籤/搜索