博客主頁java
RxJava 的變換操做符主要包括如下幾種:segmentfault
對 Observable 發射的每一項數據應用一個函數,執行變換操做
數組
map 操做符對原始 Observable 發射的每一項數據應用一個你選擇的函數,而後返回一個發射這些結果的 Observable緩存
RxJava 將這個操做符實現爲 map 函數,這個操做符默認不在任何特定的調度器上執行。數據結構
Observable.just("HELLO") .map(new Function<String, String>() { @Override public String apply(String s) throws Exception { return s.toLowerCase(); } }) .map(new Function<String, String>() { @Override public String apply(String s) throws Exception { return s + " world!"; } }) .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "Next: " + s); } }); // 執行結果 Next: hello world!
第一次轉換,將字符串 「HELLO「 轉換成全是小寫字母的字符串 」hello「。第二次轉換,在字符串 」hello「 後面添加新的字符串 」 world!「,它們組成了新的字符串也就是執行結果。app
flatMap 將一個發射數據的 Observable 變換爲多個 Observables, 而後將它們發射的數據合併後放進一個單獨 Observable
ide
flatMap 操做符使用一個指定的函數對原始 Observable 發射的每一項數據執行變換操做,這個函數返回一個自己也發射數據 Observable,而後 flatMap 合併這些 Observables 發射的數據,最後將合併後的結果看成它本身的數據序列發射。函數
下面看一個例子。先定義一個用戶對象,包含用戶名和地址,因爲地址可能會包括生活、工做等地方,因此使用一個 List 對象來表示用戶的地址spa
public class User { public String username; public List<Address> addresses; public static class Address { public String street; public String city; } }
若是想打印出某個用戶全部的地址,那麼能夠藉助 map 操做符返回一個地址的列表。3d
Observable.just(user) .map(new Function<User, List<User.Address>>() { @Override public List<User.Address> apply(User user) throws Exception { return user.addresses; } }) .subscribe(new Consumer<List<User.Address>>() { @Override public void accept(List<User.Address> addresses) throws Exception { for (User.Address address : addresses) { Log.d(TAG, "Next: " + address.city); } } }); // 執行結果 Next: shang hai Next: su zhou
換成 flatMap 操做符以後,flatMap 內部將用戶的地址列表轉換成一個 Observable
Observable.just(user) .flatMap(new Function<User, ObservableSource<User.Address>>() { @Override public ObservableSource<User.Address> apply(User user) throws Exception { return Observable.fromIterable(user.addresses); } }) .subscribe(new Consumer<User.Address>() { @Override public void accept(User.Address address) throws Exception { Log.d(TAG, "Next: " + address.city); } }); // 執行結果 Next: shang hai Next: su zhou
flatMap 對這些 Observables 發射的數據作的是合併 (merge) 操做, 所以它們多是交錯的。還有一個操做符不會讓變換後 Observables 發射的數據交錯,它嚴格按照順序發射這些數據,這個操做符就是 concatMap
groupBy 操做符將一個 Observable 拆分爲一些 Observables 集合,它們中的每個都發射原始
Observable 的一個子序列
哪一個數據項由哪個 Observable 發射是由一個函數斷定的,這個函數給每一項指定一個 Key, Key 相同的數據會被同一個 Observable 發射。
最終返回的是 Observable 一個特殊子類 GroupedObservable 。它是一個抽象類。getKey()
方法是 GroupedObservable 方法,這個 Key 用於將數據分組到指定的 Observable
Observable.range(1, 8) .groupBy(new Function<Integer, String>() { @Override public String apply(Integer integer) throws Exception { return integer % 2 == 0 ? "偶數組" : "奇數組"; } }) .subscribe(new Consumer<GroupedObservable<String, Integer>>() { @Override public void accept(GroupedObservable<String, Integer> stringIntegerGroupedObservable) throws Exception { Log.d(TAG, "Next-> group name: " + stringIntegerGroupedObservable.getKey()); } }); // 執行結果 Next-> group name: 奇數組 Next-> group name: 偶數組
對上述代碼作一些修改, 對 GroupedObservable 使用 getKey() 方法,從而可以選出奇數組的
GroupedObservable ,最後打印出該 GroupedObservable 下的所有成員
Observable.range(1, 8) .groupBy(new Function<Integer, String>() { @Override public String apply(Integer integer) throws Exception { return integer % 2 == 0 ? "偶數組" : "奇數組"; } }) .subscribe(new Consumer<GroupedObservable<String, Integer>>() { @Override public void accept(GroupedObservable<String, Integer> stringIntegerGroupedObservable) throws Exception { final String key = stringIntegerGroupedObservable.getKey(); if (key.equals("奇數組")) { stringIntegerGroupedObservable.subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next->" + key + ":" + integer); } }); } } }); // 執行結果 Next->奇數組:1 Next->奇數組:3 Next->奇數組:5 Next->奇數組:7
buffer 會按期收集 Observable 數據並放進一個數據包裹,而後發射這些數據包裹,而不是一次發射一個值
buffer 操做符將 Observable 變換爲另外一個,原來的 Observable 正常發射數據,由變換產生 Observable 發射這些數據的緩存集合。
Observable.range(1, 10) .buffer(2) .subscribe(new Consumer<List<Integer>>() { @Override public void accept(List<Integer> integers) throws Exception { Log.d(TAG, "Next: " + integers); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 執行結果 Next: [1, 2] Next: [3, 4] Next: [5, 6] Next: [7, 8] Next: [9, 10] Complete.
上述代碼,發射了從 1 到 10 這 10 個數字,因爲使用了 buffer 操做符,它會將原先的 Observable 轉換成新的 Observable,而新的 Observable 每次可發射兩個數字,發射完畢後調用 onComplete()
方法。
查看 buffer 操做符的源碼,能夠看到使用 buffer 操做符以後轉換成
public final Observable<List<T>> buffer(int count) { return buffer(count, count); }
若是將發射的數據變成11,range(1, 11) 執行結果以下:
Next: [1, 2] Next: [3, 4] Next: [5, 6] Next: [7, 8] Next: [9, 10] Next: [11] Complete.
再修改一下代碼,緩存5個數字,執行結果以下:
Next: [1, 2, 3, 4, 5] Next: [6, 7, 8, 9, 10] Next: [11] Complete.
在 RxJava 有許多 buffer 的重載方法,例如比較經常使用的 buffer(count, skip)
buffer(count, skip) 從原始 Observable 的第一項數據開始建立新的緩存,此後每當收到 skip 項數據,就用 count 項數據填充緩存: 開頭的一項和後續的 count - 1 項。它以列表 (List) 的形式發射緩存,這些緩存可能會有重疊部分 (好比 skip < count),也可能會有間隙(好比 skip > count時),取決於 count 和 skip 的值。
Observable.range(1, 11) .buffer(5, 1) .subscribe(new Consumer<List<Integer>>() { @Override public void accept(List<Integer> integers) throws Exception { Log.d(TAG, "Next: " + integers); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 執行結果 Next: [1, 2, 3, 4, 5] Next: [2, 3, 4, 5, 6] Next: [3, 4, 5, 6, 7] Next: [4, 5, 6, 7, 8] Next: [5, 6, 7, 8, 9] Next: [6, 7, 8, 9, 10] Next: [7, 8, 9, 10, 11] Next: [8, 9, 10, 11] Next: [9, 10, 11] Next: [10, 11] Next: [11] Complete.
若是原來 Observable 發射了一個 onError 通知,那麼 buffer 會當即傳遞這個通知,而不是首先發射緩存的數據,即便在這以前緩存中包含了原始 Observable 發射的數據。
window 操做符與 buffer 相似, 但它在發射以前是把收集到的數據放進單獨的 Observable,而不是放進 一個數據結構。
按期未來自原始 Observable 的數據分解爲一個 Observable 窗口,發射這些窗口,而不是每次發射一項數據
window 發射的不是原始 Observable 數據包,而是 Observables,這些 Observables 中的每個都發射原始 Observable 數據的一個子集,最後發射一個 onComplete 通知。
Observable.range(1, 10) .window(2) .subscribe(new Consumer<Observable<Integer>>() { @Override public void accept(Observable<Integer> integerObservable) throws Exception { Log.d(TAG, "Next-> "); integerObservable.subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, " " + integer); } }); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 執行結果 Next-> 1 2 Next-> 3 4 Next-> 5 6 Next-> 7 8 Next-> 9 10 Complete.
若是個人文章對您有幫助,不妨點個贊鼓勵一下(^_^)