這是《使用Kotlin開發一個現代的APP》系列文章的第三部分,還沒看過前2部分的,能夠先看一下:java
【譯】使用Kotlin從零開始寫一個現代Android 項目-Part1react
【譯】使用Kotlin從零開始寫一個現代Android 項目-Part2android
正文開始!git
關於RxJava,一個普遍的概念是-RxJava是用於異步編程的API的Java實現,它具備可觀察流和響應式的API。實際上,它是這三個概念的結合:觀察者模式、迭代器模式和函數式編程。這裏也有其餘編程語言實現的庫,如:RxSwift、RxJs 、RxNet等。github
我RxJava上手很難,有時,它確實很使人困惑,若是實施不當,可能會給您帶來一些問題。儘管如此,咱們仍是值得花時間學習它。我將嘗試經過簡單的步驟來解釋RxJava。面試
首先,讓咱們回答一些簡單的問題,當您開始閱讀有關RxJava時,可能會問本身:編程
答案是否認的,RxJava只是能夠在Android開發中使用的又一個庫。若是使用Kotlin開發,它也不是必須的,我但願你明白我說的,它只一個很幫助你的庫,就像你使用的因此其餘庫同樣。後端
你能夠直接從RxJava2開始,不過,做爲Android開發人員,知道這兩種狀況對你仍是有好處的,由於你可能會參與維護其餘人的RxJava1代碼。數組
RxJava能用在任何Java開發平臺,不只僅是Android,好比,對於後端開發來講,RxJava 能夠與Spring等框架一塊兒使用,RxAndroid是一個庫,其中包含在Android中使用RxJava所需的庫。所以,若是要在Android開發中使用RxJava,則必須再添加RxAndroid。稍後,我將解釋RxAndroid基於RxJava所添加的內容。微信
咱們沒有必要另外再添加一個Rx 庫了,由於Kotlin與Java是徹底兼容的,這裏確實有一個RxKotin庫:https://github.com/ReactiveX/... ,不過該庫是在RxJava之上編寫的。它只是將Kotlin功能添加到RxJava。您能夠將RxJava與Kotlin一塊兒使用,而無需使用RxKotlin庫。爲了簡單起見,在這一部分中我將不使用RxKotlin。
要使用RxJava,你須要在build.gradle
中添加以下代碼:
dependencies { ... implementation "io.reactivex.rxjava2:rxjava:2.1.8" implementation "io.reactivex.rxjava2:rxandroid:2.0.1" ... }
而後,點擊sync
,下載Rxjava庫。
我想把RxJava分爲如下三部分:
Observables
和 Observers
Observables
和 Observers
咱們已經解釋了這種模式。您能夠將Observable視爲數據的源(被觀察者
),將Observer視爲接收數據的源(觀察者
)。
有不少建立Observables的方式,最簡單的方法是使用Observable.just()
來獲取一個項目並建立Observable來發射該項目。
讓咱們轉到GitRepoRemoteDataSource
類並更改getRepositories
方法,以返回Observable:
class GitRepoRemoteDataSource { fun getRepositories() : Observable<ArrayList<Repository>> { var arrayList = ArrayList<Repository>() arrayList.add(Repository("First from remote", "Owner 1", 100, false)) arrayList.add(Repository("Second from remote", "Owner 2", 30, true)) arrayList.add(Repository("Third from remote", "Owner 3", 430, false)) return Observable.just(arrayList).delay(2,TimeUnit.SECONDS) } }
Observable <ArrayList <Repository >>
表示Observable發出Repository對象的數組列表。若是要建立發出Repository對象的Observable <Repository>,則應使用Observable.from(arrayList)
。
.delay(2,TimeUnit.SECONDS)
表示延遲2s後纔開始發射數據。
可是,等等!咱們並無高數Observable什麼時候發射數據啊?Observables一般在一些Observer訂閱後就開始發出數據。
請注意,咱們再也不須要如下接口了
interface OnRepoRemoteReadyCallback { fun onRemoteDataReady(data: ArrayList<Repository>) }
在GitRepoLocalDataSource:
類中作一樣的更改
class GitRepoLocalDataSource { fun getRepositories() : Observable<ArrayList<Repository>> { var arrayList = ArrayList<Repository>() arrayList.add(Repository("First From Local", "Owner 1", 100, false)) arrayList.add(Repository("Second From Local", "Owner 2", 30, true)) arrayList.add(Repository("Third From Local", "Owner 3", 430, false)) return Observable.just(arrayList).delay(2, TimeUnit.SECONDS) } fun saveRepositories(arrayList: ArrayList<Repository>) { //todo save repositories in DB } }
一樣的,也不須要這個接口了:
interface OnRepoLocalReadyCallback { fun onLocalDataReady(data: ArrayList<Repository>) }
如今,咱們須要在repository
中返回Observable
class GitRepoRepository(private val netManager: NetManager) { private val localDataSource = GitRepoLocalDataSource() private val remoteDataSource = GitRepoRemoteDataSource() fun getRepositories(): Observable<ArrayList<Repository>> { netManager.isConnectedToInternet?.let { if (it) { //todo save those data to local data store return remoteDataSource.getRepositories() } } return localDataSource.getRepositories() } }
若是網絡已鏈接,咱們從遠程數據源返回Observable,不然,從本地數據源返回Observable,一樣的,咱們也再也不須要OnRepositoryReadyCallback
接口。
如你所料,咱們須要更改在MainViewModel中獲取數據的方式。如今咱們應該從gitRepoRepository
獲取Observable並訂閱它。一旦咱們向Observer訂閱了該Observable,Observable將開始發出數據:
class MainViewModel(application: Application) : AndroidViewModel(application) { ... fun loadRepositories() { isLoading.set(true) gitRepoRepository.getRepositories().subscribe(object: Observer<ArrayList<Repository>>{ override fun onSubscribe(d: Disposable) { //todo } override fun onError(e: Throwable) { //todo } override fun onNext(data: ArrayList<Repository>) { repositories.value = data } override fun onComplete() { isLoading.set(false) } }) } }
一旦Observer訂閱了Observable,onSubscribe
方法將被調用,主要onSubscribe
的參數Disposable
,稍後將講到它。
每當Observable發出數據時,將調用onNext()
方法。當Observable完成s數據發射時,onComplete()
將被調用一次。以後,Observable終止。
若是發生某些異常,onError()
方法將被回調,而後Observable終止。這意味着Observable將再也不發出數據,所以onNext()
不會被調用,也不會調用onComplete()
。
另外,請注意。若是嘗試訂閱已終止的Observable,則將收到IllegalStateException
。
那麼,RxJava如何幫助咱們?
onError()
方法中返回,所以咱們能夠向用戶顯示適當的錯誤消息。咱們再一次看一下ViewModel的生命週期圖
一旦Activity銷燬,ViewModel的onCleared
方法將被調用,在onCleared
方法中,咱們須要取消全部訂閱
class MainViewModel(application: Application) : AndroidViewModel(application) { ... lateinit var disposable: Disposable fun loadRepositories() { isLoading.set(true) gitRepoRepository.getRepositories().subscribe(object: Observer<ArrayList<Repository>>{ override fun onSubscribe(d: Disposable) { disposable = d } override fun onError(e: Throwable) { //if some error happens in our data layer our app will not crash, we will // get error here } override fun onNext(data: ArrayList<Repository>) { repositories.value = data } override fun onComplete() { isLoading.set(false) } }) } override fun onCleared() { super.onCleared() if(!disposable.isDisposed){ disposable.dispose() } } }
咱們能夠優化一下上面的代碼:
首先,使用DisposableObserver
替換Observer
,它實現了Disposable而且有dispose()
方法,咱們再也不須要onSubscribe()
方法,由於咱們能夠直接在DisposableObserver實例上調用dispose()
。
第二步,替換掉返回Void的.subscribe()
方法,使用.subscribeWith()
方法,他能返回指定的Observer
class MainViewModel(application: Application) : AndroidViewModel(application) { ... lateinit var disposable: Disposable fun loadRepositories() { isLoading.set(true) disposable = gitRepoRepository.getRepositories().subscribeWith(object: DisposableObserver<ArrayList<Repository>>() { override fun onError(e: Throwable) { // todo } override fun onNext(data: ArrayList<Repository>) { repositories.value = data } override fun onComplete() { isLoading.set(false) } }) } override fun onCleared() { super.onCleared() if(!disposable.isDisposed){ disposable.dispose() } } }
上面的代碼還能夠繼續優化:
咱們保存了一個Disposable實例,所以,咱們才能夠在onCleared()
回調中調用dispose()
,可是等等!咱們須要爲每個調用都這樣作嗎?若是有10個回調,那麼咱們得保存10個實例,在onCleared()
中取消10次訂閱?顯然不可能,這裏有更好的方法,咱們應該將它們所有保存在一個存儲桶中,並在調用onCleared()
方法時,將它們所有一次處理。咱們可使用CompositeDisposable
。
CompositeDisposable
:可容納多個Disposable的容器
所以,每次建立一個Disposable,都須要將其添加到CompositeDisposable
中:
class MainViewModel(application: Application) : AndroidViewModel(application) { ... private val compositeDisposable = CompositeDisposable() fun loadRepositories() { isLoading.set(true) compositeDisposable.add(gitRepoRepository.getRepositories().subscribeWith(object: DisposableObserver<ArrayList<Repository>>() { override fun onError(e: Throwable) { //if some error happens in our data layer our app will not crash, we will // get error here } override fun onNext(data: ArrayList<Repository>) { repositories.value = data } override fun onComplete() { isLoading.set(false) } })) } override fun onCleared() { super.onCleared() if(!compositeDisposable.isDisposed){ compositeDisposable.dispose() } } }
感謝Kotlin的擴展函數,咱們還能夠更進一步:
與C#和Gosu類似,Kotlin提供了使用新功能擴展類的能力,而沒必要繼承該類,也就是擴展函數。
讓咱們建立一個新的包,叫作extensions
,而且添加一個新的文件RxExtensions.kt
import io.reactivex.disposables.CompositeDisposable import io.reactivex.disposables.Disposable operator fun CompositeDisposable.plusAssign(disposable: Disposable) { add(disposable) }
如今咱們可使用+ =
符號將Disposable對象添加到CompositeDisposable實例:
class MainViewModel(application: Application) : AndroidViewModel(application) { ... private val compositeDisposable = CompositeDisposable() fun loadRepositories() { isLoading.set(true) compositeDisposable += gitRepoRepository.getRepositories().subscribeWith(object : DisposableObserver<ArrayList<Repository>>() { override fun onError(e: Throwable) { //if some error happens in our data layer our app will not crash, we will // get error here } override fun onNext(data: ArrayList<Repository>) { repositories.value = data } override fun onComplete() { isLoading.set(false) } }) } override fun onCleared() { super.onCleared() if (!compositeDisposable.isDisposed) { compositeDisposable.dispose() } } }
如今,咱們運行程序,當你點擊Load Data
按鈕,2s以後,程序crash,而後,若是查看日誌,您將看到onNext
方法內部發生錯誤,而且異常的緣由是:
java.lang.IllegalStateException: Cannot invoke setValue on a background thread
爲什麼會發生這個異常?
RxJava附帶有調度器(Schedulers),使咱們能夠選擇在哪一個線程代碼上執行。更準確地說,咱們能夠選擇使用subscribeOn()
方在哪一個線程執行,observeOn()
方法能夠觀察哪一個線程觀察者。一般狀況下,咱們全部的數據層代碼都應該在後臺線程執行,例如,若是咱們使用Schedulers.newThread()
,每當咱們調用它時,調度器都會給咱們分配一個新的線程,爲了簡單起見,Scheduler中還有其餘一些方法,我將不在本博文中介紹。
可能您已經知道全部UI代碼都是在Android 主線程上完成的。 RxJava是Java庫,它不瞭解Android主線程,這就是咱們使用RxAndroid的緣由。 RxAndroid使咱們能夠選擇Android Main線程做爲執行代碼的線程。顯然,咱們的Observer應該在Android Main線程上運行。
讓咱們更改一下代碼:
... fun loadRepositories() { isLoading.set(true) compositeDisposable += gitRepoRepository .getRepositories() .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribeWith(object : DisposableObserver<ArrayList<Repository>>() { ... }) } ...
而後再運行代碼,一切都正常了,nice~
這裏還有一些其餘的observable 類型
onSuccess()
事件或者異常 Observable<T>
同樣,不發射數據,或者發射n個數據,或者發射異常,可是Observable不支持背壓,而Flowable卻支持。爲了記住一些概念,我喜歡將它們與現實中的一些例子類比
把它類比成一個通道,若是你向通道中塞入瓶頸可以接受的最多的商品,這將會變得很糟,這裏也是一樣的,有時,你的觀察者沒法處理其收到的事件數量,所以須要放慢速度。
你能夠看看RxJava 關於背壓的文檔:https://github.com/ReactiveX/...
RxJava中,最牛逼的就是它的操做符了,僅用一行代碼便可在RxJava中解決一些一般須要10行或更多行的問題。這些是操做符能夠幫咱們作的:
我給你舉一個例子,讓咱們將數據保存到GitRepoLocalDataSource中。由於咱們正在保存數據,因此咱們須要Completable來模擬它。假設咱們還想模擬1秒的延遲。天真的方法是:
fun saveRepositories(arrayList: ArrayList<Repository>): Completable { return Completable.complete().delay(1,TimeUnit.SECONDS) }
爲何說天真?
Completable.complete()
返回一個Completable實例,該實例在訂閱後當即完成。
一旦Completable 完成後,它將終止。所以,以後將不執行任何運算符(延遲是運算符之一)。在這種狀況下,咱們的Completable不會有任何延遲。讓咱們找解決方法:
fun saveRepositories(arrayList: ArrayList<Repository>): Completable { return Single.just(1).delay(1,TimeUnit.SECONDS).toCompletable() }
爲何是這種方式?
Single.just(1)
建立一個Single實例,而且僅發射一個數字1,由於咱們用了delay(1,TimeUnit.SECONDS)
,所以發射操做延遲1s。
toCompletable()
返回一個Completable,它丟棄Single的結果,並在此Single調用onSuccess
時調用onComplete
。
所以,上面的代碼將返回Completable,而且1s後調用onComplete()
。
如今,咱們應該更改咱們的GitRepoRepository。讓咱們回顧一下邏輯。咱們檢查互聯網鏈接。若是有互聯網鏈接,咱們從遠程數據源獲取數據,將其保存在本地數據源中並返回數據。不然,咱們僅從本地數據源獲取數據。看一看:
fun getRepositories(): Observable<ArrayList<Repository>> { netManager.isConnectedToInternet?.let { if (it) { return remoteDataSource.getRepositories().flatMap { return@flatMap localDataSource.saveRepositories(it) .toSingleDefault(it) .toObservable() } } } return localDataSource.getRepositories() }
使用了.flatMap
,一旦remoteDataSource.getRepositories()
發射數據,該項目將被映射到發出相同項目的新Observable。咱們從Completable建立的新Observable發射的相同項目保存在本地數據存儲中,而且將其轉換爲發出相同發射項的Single。由於咱們須要返回Observable,因此咱們必須將Single轉換爲Observable。
很瘋狂,huh? 想象一下RxJava還能爲咱們作些啥!
RxJava是一個很是有用的工具,去使用它,探索它,我相信你會愛上它的!
以上就是本文得所有內容,下一篇文章將是本系列的最後一篇文章,敬請期待!
本系列已更新完畢:
【譯】使用Kotlin從零開始寫一個現代Android 項目-Part1
【譯】使用Kotlin從零開始寫一個現代Android 項目-Part2
【譯】使用Kotlin從零開始寫一個現代Android 項目-Part3
【譯】使用Kotlin從零開始寫一個現代Android 項目-Part4
文章首發於公衆號:「 技術最TOP 」
,天天都有乾貨文章持續更新,能夠微信搜索「 技術最TOP 」
第一時間閱讀,回覆【思惟導圖】【面試】【簡歷】有我準備一些Android進階路線、面試指導和簡歷模板送給你