本文是 "RxJava 沉思錄" 系列的第二篇分享。本系列全部分享:html
在上一篇分享中,咱們澄清了目前有關 RxJava 的幾個最流行的誤解,它們分別是:「鏈式編程是 RxJava 的厲害之處」,「RxJava 等於異步加簡潔」,「RxJava 是用來解決 Callback Hell 的」。在上一篇的最後,咱們瞭解了 RxJava 其實給咱們最基礎的功能就是幫咱們統一了全部異步回調的接口。可是 RxJava 並不止於此,本文咱們將首先介紹 Observable 在空間維度上從新組織事件的能力。java
情景:有一個相冊應用,從網絡獲取當前用戶的照片列表,展現在 RecyclerView 裏:react
public interface NetworkApi {
@GET("/path/to/api")
Call<List<Photo>> getAllPhotos();
}
複製代碼
上面是使用 Retrofit 定義的從網絡獲取照片的 API 的接口。你們都知道,若是咱們使用 Retrofit 的 RxJavaCallAdapter 就能夠把接口中的返回類型從 Call<List<Photo>>
轉爲 Observable<List<Photo>>
:git
public interface NetworkApi {
@GET("/path/to/api")
Observable<List<Photo>> getAllPhotos();
}
複製代碼
那麼咱們使用這個接口展現照片的代碼應該長下面這樣:github
NetworkApi networkApi = ...
networkApi.getAllPhotos()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(photos -> {
adapter.setData(photos);
adapter.notifyDataSetChanged();
});
複製代碼
如今新加一個需求,請求當前用戶照片列表這個網絡請求,須要加入緩存功能(緩存的是網絡響應中的圖片的URL,圖片的 Bitmap 緩存交給專門的圖片加載框架,例如 Glide),也就是說,當用戶但願展現圖片列表時,先去緩存讀取用戶的照片列表進行加載(若是緩存裏有這個接口的上次訪問的數據),同時發起網絡請求,待網絡請求返回以後,更新緩存,同時使用使用最新的返回數據刷新照片列表。若是咱們選擇使用 JakeWharton 的 DiskLruCache 做爲咱們的緩存介質,那麼上面的代碼將變爲:編程
DiskLruCache cache = ...
DiskLruCache.Snapshot snapshot = cache.get("getAllPhotos");
if (snapshot != null) {
// 讀取緩存數據並反序列化
List<Photo> cachedPhotos = new Gson().fromJson(
snapshot.getString(VALUE_INDEX),
new TypeToken<List<Photo>>(){}.getType()
);
// 刷新照片列表
adapter.setData(photos);
adapter.notifyDataSetChanged();
}
NetworkApi networkApi = ...
networkApi.getAllPhotos()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(photos -> {
adapter.setData(photos);
adapter.notifyDataSetChanged();
// 更新緩存
DiskLruCache.Editor editor = cache.edit("getAllPhotos");
editor.set(VALUE_INDEX, new Gson().toJson(photos)).commit();
});
複製代碼
上面的代碼就是最直觀的能夠解決需求的代碼,咱們進一步思考一下,讀取文件緩存也屬於耗時操做,咱們最好把它封裝爲異步任務,既然網絡請求已經被封裝成 Observable
了,咱們嘗試把讀取文件緩存也封裝爲 Observable
:api
Observable<List<Photo>> cachedObservable = Observable.create(emitter -> {
DiskLruCache.Snapshot snapshot = cache.get("getAllPhotos");
if (snapshot != null) {
List<Photo> cachedPhotos = new Gson().fromJson(
snapshot.getString(VALUE_INDEX),
new TypeToken<List<Photo>>(){}.getType()
);
emitter.onNext(cachedPhotos);
}
emitter.onComplete();
});
複製代碼
到目前爲止,發起網絡請求和讀取緩存這兩個異步操做都被咱們封裝成了 Observable
的形式,前面作了這麼多鋪墊,接下來進入正題:把原先的面向 Callback 的異步操做統一改寫爲 Observable
的形式之後,首先帶來的好處就是能夠對 Observable 在空間維度上進行從新組織。數組
networkApi.getAllPhotos()
.doOnNext(photos ->
// 更新緩存
cache.edit("getAllPhotos")
.set(VALUE_INDEX, new Gson().toJson(photos))
.commit()
)
// 讀取現有緩存
.startWith(cachedObservable)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(photos -> {
adapter.setData(photos);
adapter.notifyDataSetChanged();
});
複製代碼
調用
startWith
操做符後,會生成一個新的 Observable,新的Observable
會首先發射傳入的Observable
包含的元素,然後纔會發射原來的Observable
包含的元素。例如Observable
A 包含 a1, a2 兩個元素,Observable
B 包含 b1, b2 兩個元素,那麼 b.startWith(a) 返回的新Observable
發射序列順序爲: a1, a2, b1, b2。—— 參考資料:StartWith緩存
在上面的例子中,咱們鏈接了網絡請求和讀取緩存這兩個 Observable,原先須要分別處理結果的兩個異步任務,咱們如今把它們結合成了一個,指定了一個觀察者就知足了需求。這個觀察者會被回調 2 次,第一次是來自緩存的結果,第二次是來自網絡的結果,體如今界面上就是列表刷新了兩次。網絡
這裏引起了咱們的思考,原先 Callback 的寫法,若是咱們有 n 個異步任務,咱們就須要指定 n 個回調;而若是在 n 個異步任務都已經被封裝成 Observable
的狀況下,咱們就能夠對 Observable
進行分類、組合、變換,通過這樣的處理之後,咱們的觀察者的數量就會減小,並且職責會變的簡單而直接,只須要對它所關心的數據類型作出響應,而不須要關心數據從何而來,經歷過怎樣的變化。
咱們再進一步,上面的例子再加一個需求:若是從網絡請求回來的數據和緩存中提早響應的數據一致,就不須要再刷新一次了。也就是說,若是緩存數據和網絡數據一致,那緩存數據刷新一次列表之後,網絡數據不須要再去刷新一次列表了。
咱們考慮一下,若是咱們使用傳統 Callback 的形式,指定了兩個 Callback 去處理這個需求,爲了保證第二次網絡請求回來的相同數據不刷新,咱們勢必須要在兩個 Callback 以外,定義一個變量來保存緩存數據,而後在網絡請求的回調內,比較兩個值,來決定是否須要刷新界面。
但若是咱們用 RxJava 如何來實現這個需求,該如何寫呢:
networkApi.getAllPhotos()
.doOnNext(photos ->
cache.edit("getAllPhotos")
.set(VALUE_INDEX, new Gson().toJson(photos))
.commit()
)
.startWith(cachedObservable)
// 保證不會出現相同數據
.distinctUntilChanged()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(photos -> {
adapter.setData(photos);
adapter.notifyDataSetChanged();
});
複製代碼
distinctUntilChanged
操做符用來確保Observable
發射的元素裏,相鄰的兩個元素必須是不相等的。 參考資料:Distinct
與原先的寫法相比,只多了一行 .distinctUntilChanged()
( 咱們假設用於比較兩個對象是否相等的 equals
方法已經實現 ),就能夠知足,在網絡數據和緩存數據一致的狀況下,觀察者只回調一次。
咱們比較一下使用 Callback 的寫法和使用 Observable
進行組裝的寫法,能夠發現,使用 Callback 的寫法,常常會因爲需求的變化,致使 Callback 內部的邏輯發生變更,而使用 Observable
的寫法,觀察者的核心邏輯則較爲穩定,不多發生變化(本例中爲刷新列表)。Observable 經過內置的操做符對自身發射的元素在空間維度上從新組織,或者與其餘的 Observable
一塊兒在空間維度上進行從新組織,使得觀察者的邏輯簡單而直接,不須要關心數據從何而來,從而使觀察者的邏輯較爲穩定。
情景:實現一個具備多種類型的 RecyclerView,如圖所示:
假設列表中有 3 種類型的數據,這 3 種類型共同填充了一個 RecyclerView,簡單起見,咱們定義 Retrofit 接口以下:
public interface NetworkApi {
@GET("/path/to/api")
Observable<List<ItemA>> getItemListOfTypeA();
@GET("/path/to/api")
Observable<List<ItemB>> getItemListOfTypeB();
@GET("/path/to/api")
Observable<List<ItemC>> getItemListOfTypeC();
}
複製代碼
到目前爲止,狀況仍是簡單的, 我只要維護 3 個 RecyclerView 並分別各自更新便可。可是咱們如今接到新加需求,這 3 種類型的數據在列表中出現的順序是可配置的,並且 3 種類型數據不必定所有須要展現,也就是說可能展現 3 種,也可能只展現其中 2 種。咱們定義與之對應的接口:
public interface NetworkApi {
@GET("/path/to/api")
Observable<List<ItemA>> getItemListOfTypeA();
@GET("/path/to/api")
Observable<List<ItemB>> getItemListOfTypeB();
@GET("/path/to/api")
Observable<List<ItemC>> getItemListOfTypeC();
// 須要展現的數據順序
@GET("/path/to/api")
Observable<List<String>> getColumns();
}
複製代碼
新加的 getColumns
接口,返回的數據形如:
["a", "b", "c"]
["b", "a"]
["b", "c"]
首先考慮使用普通的 Callback 形式如何來實現這個需求。因爲 3 種數據如今順序可變,數量也沒法肯定,若是仍是考慮由多個 RecyclerView 來維護的話須要在佈局中調用 addView
, removeView
來添加移除 RecyclerView,這樣的話性能上不夠好,咱們考慮把全部數據填充到一個 RecyclerView 中,不一樣類型的數據經過不一樣 ItemType 進行區分。下面的代碼中我依然使用了 Observable
,只是我僅僅把它當成普通的 Callback 功能使用:
private NetworkApi networkApi = ...
// 不一樣類型數據出現的順序
private List<String> resultTypes;
// 這些類型對應的數據的集合
private LinkedList<List<? extends Item>> responseList;
public void refresh() {
networkApi.getColumns().subscribe(columns -> {
// 保存配置的欄目順序
resultTypes = columns;
responseList = new LinkedList<>(Collections.nCopies(columns.size(), new ArrayList<>()));
for (String type : columns) {
switch (type) {
case "a":
networkApi.getItemListOfTypeA().subscribe(data -> onOk("a", data));
break;
case "b":
networkApi.getItemListOfTypeB().subscribe(data -> onOk("b", data));
break;
case "c":
networkApi.getItemListOfTypeC().subscribe(data -> onOk("c", data));
break;
}
}
});
}
private void onOk(String type, List<? extends Item> response) {
// 按配置的順序,更新對應位置上的數據
responseList.set(resultTypes.indexOf(type), response);
// 把當前已返回的數據填充到一個 List 中
List<Item> data = new ArrayList<>();
for (List<? extends Item> itemList: responseList) {
data.addAll(itemList);
}
// 更新列表
adapter.setData(data);
adapter.notifyDataSetChanged();
}
複製代碼
上面的代碼,爲了不 Callback Hell 出現,我已經提早把 onOk
提到了外部層次,使代碼便於從上往下閱讀。可是不知道你有沒有和我相同的感受,就是相似這樣的代碼總給人一種不是很 「內聚」 的感受,就是爲了把 Callback 展平,致使一些中間變量被暴露到了外層空間。
帶着這個問題,咱們先分析一下數據流動:
refresh
方法發起第一次請求,獲得須要被展現的 n 種數據的類型以及順序。onOk
方法做爲觀察者, 會被回調 n 次,按照第一個接口裏返回的順序正確的彙總 2 中每一個數據接口返回的結果,而且通知界面更新。有點像寫做文同樣,這是一種 總——分——總 的結構。
接下來咱們使用 RxJava 來實現這個需求,咱們會用到 RxJava 的一些操做符,來對 Observable
進行從新組織:
NetworkApi networkApi = ...
networkApi.getColumns()
.map(types -> {
List<Observable<? extends List<? extends Item>>> requestObservableList = new ArrayList<>();
for (String type : types) {
switch (type) {
case "a":
requestObservableList.add(
networkApi.getItemListOfTypeA().startWith(new ArrayList<ItemA>())
);
break;
case "b":
requestObservableList.add(
networkApi.getItemListOfTypeB().startWith(new ArrayList<ItemB>())
);
break;
case "c":
requestObservableList.add(
networkApi.getItemListOfTypeC().startWith(new ArrayList<ItemC>())
);
break;
}
}
return requestObservableList;
})
.flatMap(requestObservables -> Observable.combineLatest(requestObservables, objects -> {
List<Item> items = new ArrayList<>();
for (Object response : objects) {
items.addAll((List<? extends Item>) response);
}
return items;
}))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {
adapter.setData(data);
adapter.notifyDataSetChanged();
});
複製代碼
咱們一步一步分析 RxJava 處理的具體步驟。首先是第一步,獲取須要展現的欄目列表,這是最簡單的,networkApi.getColumns()
這個方法返回是一個只發射一個元素的 Observable
,這個元素即爲展現的欄目列表,爲了方便後續討論,假設欄目的順序爲 ["a", "b", "c"]
, 以下圖所示:
接下來的操做符是 map
操做符,原來的 Observable
進行了變換,變成了一個新的 Observable
,新的 Observable
仍是隻發射一個元素,這個元素的類型仍是 List ,只不過 List 內部的數據類型從原先的字符串(表明數據類型)變成了 Observable
。Observable
發射的元素還能夠是 「Observable
的 List 」 嗎?是的,沒有什麼不能夠 : )
map
操做符負責把一個Observable
裏發射的元素所有進行轉換,生成一個發射新的元素的Observable
,元素的種類會發生改變,可是發射的元素的數量不會發生改變。 參考資料:Map
這個操做,在業務上的含義是,根據上一步取回的欄目列表,即 ["a", "b", "c"]
,根據不一樣的數據類型,分別發起請求去獲取對應欄目的數據列表,例如欄目類型是 a
的話,就對應發起 networkApi.getItemListOfTypeA()
請求。這裏有一點值得注意,就是每個具體的請求後面都跟了一個 .startWith(new ArrayList<>())
,也就是說每一個具體請求欄目內容的 Observable
在返回真正的數據 List 以前都會返回一個空的 List ,這裏這麼處理的緣由咱們會在下一步中解釋。
接下來這一步多是最難理解的一步了,map
操做以後,緊接着是 flatMap
操做符,而 flatMap
操做符傳入的 lambda 表達式內部,又調用了 Observable.combineLatest
操做符,咱們先從裏面的 combineLatest
操做符開始講起,請看下圖:
combineLatest
操做符的第一個參數 requestObservables
,它的類型是 Observable
的 List,它就是上一步中 map
操做符進行變換以後,新的 Observable
發射的數據,即由
networkApi.getItemListOfTypeA().startWith(...)
networkApi.getItemListOfTypeB().startWith(...)
networkApi.getItemListOfTypeC().startWith(...)
3 個 Observable
組成的 List。
combineLatest
操做符的第二個參數是個 lambda 表達式,這個 lambda 表達式的參數類型是 Object[]
,這個數組的長度等於 requestObservables
的長度,Object[]
數組中每一個元素即爲 requestObservables
中每一個 Observable
發射的元素,即:
Object[0]
對應 requestObservables[0]
發射的元素Object[1]
對應 requestObservables[1]
發射的元素Object[2]
對應 requestObservables[2]
發射的元素那這個 lambda 表達式被調用的時機是何時呢?當 requestObservables
中任意一個 Observable
發射一個元素時,這個元素便會和 requestObservables
中剩餘的全部 Observable
最近一次 發射的元素一塊兒,做爲參數調用這個 lambda 表達式。
那麼整個 combineLatest
操做符的做用就是,返回一個新的 Observable
, 根據第一個參數裏輸入的一組 Obsevable
,按照上面說的時機,調用第二個參數裏的那個 lambda 表達式,把這個 lambda 表達式的返回值,做爲新的 Observable
發射的值,lambda 被調用幾回,就發射幾個元素。
咱們這裏 lambda 表達式內部的邏輯比較簡單,就是把 3 個接口裏返回的數據進行彙總,組成一個新的 List 。咱們再回過頭看上面那張圖,咱們能夠看到,Observable.combinLatest
返回的新的 Observable
一共發射了 4 個元素,它們分別是:
[]
[{ItemB}, {ItemB}, ...]
[{ItemA}, {ItemA}, ..., {ItemB}, {ItemB}, ...]
[{ItemA}, {ItemA}, ..., {ItemB}, {ItemB}, ..., {ItemC}, {ItemC}, ...]
前面留了一個問題沒有解釋,爲何 3 個獲取具體的欄目數據的接口須要調用 startWith
操做符發射一個空白列表,就像這樣:networkApi.getItemListOfTypeA().startWith(...)
,如今這個答案應該清晰了,若是不調用這個操做符,那麼 combineLatest
操做符生成的新 Observable
將會只發射一個元素, 即上面 4 個元素的最後一個,從用戶的感覺來看,必需要等全部欄目所有請求成功之後纔會一次性展現,而不是漸進地展現。
說完了內部的 combineLatest
操做符,如今該說外層的 flatMap
操做符了,flatMap
操做符也會生成一個新的 Observable
,它會經過傳入的 lambda 表達式,把舊的 Observable
裏發射的每個元素都映射成一個 Observable
,而後把這些 Observable
發射的全部元素做爲新的 Observable
發射的元素。
因爲咱們這裏的狀況,調用 flatMap
以前的 Observable
只發射了一個元素,因此 flatMap
以後生成的新 Observable
發射的元素,就是 flatMap
操做符傳入的那個 lambda 表達式執行完生成的那個 Observable
所發射的元素,也就是說 flatMap
操做符執行完後的那個新的 Observable
發射的元素,和咱們剛剛討論的 combineLatest
操做符執行完後的 Observable
發射的元素是一致的。
到這裏爲止,RxJava 實現的版本的每一步咱們都解釋完了,咱們回過頭從新梳理一下 RxJava 對 Observable
進行變換的過程,以下圖:
經過 RxJava 的操做符,咱們把 networkApi
裏的 4 個接口返回的 4 個 Observable
,在空間維度進行了從新組織,最終把它們轉成了一個 Observable
,這個 Observable
發射的元素類型是 List<Item>
,而這正是咱們的觀察者 -- Adapter 所關心的數據類型,觀察者只須要監聽這個 Observable
,並更新數據便可。
咱們在講 RxJava 實現的這個版本以前的時候,說到過 Callback 實現的版本不夠 內聚,比較一下如今這個 RxJava 的版本,確實能夠發現的確 RxJava 這個版本更內聚。可是並不是 Callback 版本沒有辦法作到更內聚,咱們能夠把 Callback 版本里的 onOk
, refresh
,resultTypes
, responseList
這幾個方法和字段封裝到一個對象中,對外只暴露 refresh
方法和一個設置觀察者的方法,也能夠作到同樣的內聚,可是這就須要額外的工做量了。可若是咱們使用 RxJava 就不同了,它提供了一堆現成的操做符,經過 Observable
之間的變換與重組,直接就能夠寫出內聚的代碼。
在上面代碼裏出現的全部操做符中,最核心的一個操做符就是 combineLatest
操做符,仔細比較 RxJava 版本和 Callback 版本就能夠發現,combineLatest
操做符的功能其實和 Callback 版本里的 onOk
方法前半部分, resultTypes
, responseList
合在一塊兒功能是至關的,一方面負責收集多個接口返回的數據,另外一方面保證收集回來的數據的順序是和上一個接口返回的應該展現的數據的順序是一致的。
從代碼量上來看,RxJava 版本與 Callback 版本相差無幾,對函數式編程比較擅長的人來講,RxJava 版本里 for
循環的寫法,不夠 「函數式」,咱們能夠把原來的寫法改爲一種更緊湊、更函數式的寫法:
NetworkApi networkApi = ...
netWorkApi.getColumns()
.flatMap(types -> Observable.fromIterable(types)
.map(type -> {
switch (type) {
case "a": return netWorkApi.getItemListOfTypeA().startWith(new ArrayList<ItemA>());
case "b": return netWorkApi.getItemListOfTypeB().startWith(new ArrayList<ItemB>());
case "c": return netWorkApi.getItemListOfTypeC().startWith(new ArrayList<ItemC>());
default: throw new IllegalArgumentException();
}
})
.<List<Observable<? extends List<? extends Item>>>>collectInto(new ArrayList<>(), List::add)
.toObservable()
)
.flatMap(requestObservables -> Observable.combineLates(requestObservables, objects -> objects))
.flatMap(objects -> Observable.fromArray(objects)
.<List<Item>>collectInto(new ArrayList<>(), (items, o) -> items.addAll((List<Item>) o))
.toObservable()
)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {
adapter.setData(data);
adapter.notifyDataSetChanged();
});
複製代碼
這裏引入了一個新的操做符
collectInto
,用於把一個Observable
裏面發射的元素,收集到一個可變的容器內部,本例中用它來替換for
循環相關邏輯,具體內容這裏再也不詳細展開。 參考資料:CollectInto
第二個例子花了這麼大篇幅來說,超出了我一開始的預期,這也能夠看出來的確 RxJava 學習的曲線是陡峭的,不過我認爲這個例子很好的表達我這一小節要闡述的觀點,即 Observable 在空間維度上對事件的從新組織,讓咱們的事件驅動型編程更具想象力 ,由於原先的編程中,咱們面對多少個異步任務,就會寫多少個回調,若是任務之間有依賴關係,咱們的作法就是修改觀察者(回調函數)邏輯以及新增數據結構保證依賴關係,RxJava 給咱們帶來的新思路是,Observable
的事件在到達觀察者以前,能夠先經過操做符進行一系列變換(固然變換的規則仍是和具體業務邏輯有關的),對觀察者屏蔽數據產生的複雜性,只提供給觀察者簡單的數據接口。
那麼是否在這個例子中,是否 RxJava 的版本更好呢,我我的的觀點是雖然 RxJava 版本展示了其更有想象力的編程方式,可是就這個具體的例子,二者並無太大的差距。RxJava 能夠寫出更短更內聚的代碼,可是編寫和理解的難度較大;Callback 版本雖然樸實無華,可是便於編寫以及理解,可維護性更好。對於二者的好壞,咱們也不要過於着急下結論,不妨繼續看看 RxJava 還有什麼其餘的優點。
(未完待續)
本文屬於 "RxJava 沉思錄" 系列,歡迎閱讀本系列的其餘分享:
若是您對個人技術分享感興趣,歡迎關注個人我的公衆號:麻瓜日記,不按期更新原創技術分享,謝謝!:)