以前寫RxJava相關的文章或小冊的時候,有讀者但願我能寫一點RxJava實踐相關的文章,雖然答應下來,可是也一直沒有找到太好的機會,由於糗百一直處在維護老項目的工做中,真的在業務層面深度使用RxJava的機會很少,至於用RxJava請求一下網絡接口就顯得有點不值一提了,所以一直沒有找到合適的業務代碼。最近在作一個新項目,是直播相關的產品,在一個重要的小業務裏用上了RxJava,就以此做爲案例,來向你們介紹RxJava在生產實踐中的使用思路。web
在講如何使用RxJava以前,確定要講如何理解RxJava,由於你怎樣看待它決定了你會怎樣使用它,對於如何理解和使用RxJava,我在掘金小冊《響應式編程 —— RxJava 高階指南》中的第六章中有很是詳細的說明,你們若是有興趣去看看,順即可以支持一下。數據庫
固然,我在這裏仍是簡要的介紹一下,你須要把RxJava做爲一種構建對象(業務)關係的一種編程範式,而不是一種異步編程庫。編程
那麼什麼叫構建關係的編程範式?咱們舉個例子,以APP初始化工做爲例,假設咱們APP須要初始化sdk,數據庫,登錄以後,才跳轉到下一個頁面。那麼這裏的業務關係是怎樣的呢?後端
下面是一個初始化的業務,而他們的關係以下圖所示緩存
它應當是上圖這樣的,頁面跳轉以來上述三個業務的完成,而sdk,數據庫初始化,登錄接口這三者之間並不依賴。這樣咱們就梳理了這個小需求裏面的幾個業務之間的關係。那麼RxJava如何經過代碼實現構建關係呢?bash
之前,咱們多是這樣寫這段業務:websocket
initSDK(context) // 業務1
initDatabase(context) // 業務2
login(getUserId(context),new Callback{ // 業務3
void onSucc(){
startActivity() // 業務4
}
})
複製代碼
以上這段代碼,是很常見的業務初始化代碼,但實際上他們是順序嚴格的前後執行,這並非他們這個幾個業務之間真實的關係,他們真正的關係應該是分別完成業務1,2,3,而後完成業務4。網絡
所以想要真正構建業務之間的關係,RxJava就派上用場了。異步
// 初始化sdk
Observable obserInitSDK = Observable.create((context)->{initSDK(context)}).subscribeOn(Schedulers.newThread())
// 初始化數據庫
Observable obserInitDB = Observable.create((context)->{initDatabase(context)}).subscribeOn(Schedulers.newThread())
// 完成登錄
Observable obserLogin = Observable.create((context)->{login(getUserId(context))})
.map((isLogin)->{returnContext()})
.subscribeOn(Schedulers.newThread())
// 分別完成三個業務
Observable observable = Observable.merge(obserInitSDK,obserInitDB,obserLogin)
//分開完成三個業務以後跳轉到下一個
observable.subscribe(()->{startActivity()})
複製代碼
固然,不少時候,咱們並不須要嚴格按照業務關係來編寫代碼,由於不必,好比這裏initSDK(context),initDatabase(context)可能都不是耗時操做,咱們把它放在某個線程中前後執行也沒有問題。可是若是他們都是須要網絡請求接口的呢?顯然此時經過RxJava取構建業務關係是更簡單而高效的。socket
這個例子能夠說比較生動的解釋了關係構建的範式編程。RxJava經過構建關係,實際上使它具有了提升複雜/耗時操做的執行效率,以及解構複雜邏輯的特殊能力。而反過來只有以構建關係的角度來理解RxJava,你對它的使用纔會駕輕就熟,而不是侷限於異步編程,只見樹木不見森林。
更多關於RxJava相關的深度內容,你們能夠看看個人《響應式編程 —— RxJava 高階指南》
說到這裏,才進入正題,關於RxJava的一次實踐。
先說一下需求背景:直播間的消息模塊,經過websocket鏈接實現實時收發直播間消息(單獨的線程),後端提供了一個獲取最近離線消息的接口,用於離線重連後獲取離線期間產生的直播間消息(顯然是另外一個線程或者主線程),此時的業務需求是,當websoket異常斷開時,客戶端須要主動重連,鏈接成功後,一邊接受websocket新消息,一邊拉取離線期間所產生的直播間消息,按照時間順序(離線消息在前)把消息統一轉發到上層模塊。
下圖是我對這段業務邏輯畫的一個示意圖,基本描述了這個業務邏輯的總體結構:
看完這個需求,咱們分析下這裏可能的難點;
你們能夠想想,若是不用RxJava的話,正常狀況下要如何實現?複雜度如何?
我沒有想過這個問題,由於我首先考慮的是怎樣用RxJava完成這個需求,這裏也告訴你們一個原則,複雜的業務邏輯優先使用RxJava,
咱們仍然以構建關係的思路來思考,可是咱們須要先思考那些業務是須要用RxJava實現的,在這裏,websocket的斷開重連,websocket如何獲取消息都是被sdk封裝好或者和由sdk本身的回調觸發,這些咱們不該當強行用RxJava實現,所以真正用RxJava這裏大概是三個業務邏輯:
有人可能要問,爲何不把順序排序也算上一個業務呢?它看起來很像一個單獨的業務。由於在這裏的順序排序其實表示的就是業務1和業務2的關係,由於示意圖很差描述前後順序,所以就先這麼畫了,看起來像一個單獨的業務,其實不算。
這三個業務各自都很簡單,他們的關係,總結來講就是:業務2的數據必須在業務1以前轉發到業務3
// 業務1
// 用於傳遞從websocket sdk處獲取消息
private var onLineStream = ReplayProcessor.create<String>().toSerialized()
// websocket中獲取的消息直接轉發到這裏
fun senMessageFromOnLine(message: String = "", isCompleted: Boolean = false) {
var stream = onLineStream
if (stream.hasComplete() || stream.hasThrowable()) {
return
}
if (!TextUtils.isEmpty(message)) {
stream.onNext(message)
}
if (isCompleted) {
stream.onComplete()
}
}
// 業務2
//經過接口獲取離線的消息,若是沒有,則返回空列表
fun getOfflineMessageFlowable() {
return Flowable.create(object :FlowableOnSubscribe<String>{
override fun subscribe(emitter: FlowableEmitter<String>) {
emitter.onNext(getOfflineMessageFromServer())
emitter.onComplete()
}
},BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.io())
}
複製代碼
爲何使用的是Flowable類型,而不是Observable類型,由於這裏須要考慮消息的緩存,可能存在背壓的問題(關於RxJava最友好的文章——背壓),因此使用Flowable比較合理。
而爲何websocket消息使用的實現類是Processor不是Flowable呢?這由於websocket消息都是從外部獲取的,而不是咱們經過RxJava內部建立的數據,因此使用Processor會更靈活。
好了,咱們已經定義了前兩個業務了,第三個業務也很簡單,關鍵是經過關係構建把三個業務聯繫起來呢?咱們以前已經總結了業務之間的關係,從RxJava的角度來說,就是先接收離線消息的數據,再接收websocket消息的數據,咱們根據需求查找相關的文檔很容易找到concat操做符。因而:
/**
* wesocket鏈接成功以後,調用
*/
fun connected(){
// concat老是保證先訂閱拉取第一個flowable數據,等第一個完成後,
//再訂閱拉取第二個Flowable數據
Flowable.concat<String>(getOfflineMessageFlowable(),
onLineStream.onBackpressureBuffer(150, true))
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(message-> dispatchToActivity(message) // 把有序的消息轉發到activity 業務3
}
複製代碼
這樣,基本上核心代碼就完成了,這幾行代碼基本上實現了咱們的需求