RxJava2系列之轉換型操做符

原文首發於微信公衆號:躬行之(jzman-blog),歡迎關注交流!java

  1. buffer 操做符
  2. window操做符
  3. map操做符
  4. groupBy操做符
  5. cast操做符
  6. scan操做符
  7. To操做符

buffer 操做符

buffer 操做符重載方法比較多,這裏選取典型的幾個來講明 buffer 操做符的使用,buffer 操做符的使用能夠分爲以下三類,具體以下:緩存

//第一類
public final Observable<List<T>> buffer(int count) 
public final Observable<List<T>> buffer(int count, int skip) 
//第二類
public final Observable<List<T>> buffer(long timespan, TimeUnit unit)
public final Observable<List<T>> buffer(long timespan, long timeskip, TimeUnit unit) 
//第三類
public final <B> Observable<List<T>> buffer(ObservableSource<B> boundary)
public final <TOpening, TClosing> Observable<List<T>> buffer(
            ObservableSource<? extends TOpening> openingIndicator,
            Function<? super TOpening, ? extends ObservableSource<? extends TClosing>> closingIndicator)
複製代碼
buffer(int count)

buffer 操做符將一個 Observable 轉換爲一個 Observable,這個 Observable 用於收集原來發送的數據,而後發送這些緩存的數據集合,buffer 將發送的單個事件轉換成元素集合,下面是針對此種狀況的官方示意圖:bash

buffer(int count)

以下面的事件的發送過程,若是不設置 buffer 則須要發送四次,若是使用以下 buffer 進行轉換,則只需發送兩次,測試代碼以下:微信

count = 0;
Observable.just("Event1", "Event2", "Event3", "Event4")
        .buffer(2)
        .subscribe(new Consumer<List<String>>() {
            @Override
            public void accept(List<String> strings) throws Exception {
                count++;
                Log.i(TAG, "第" + count + "次接收...");
                Log.i(TAG, "accept--->" + strings.size());
                Log.i(TAG, "接收的數據...");
                for (String str : strings) {
                    Log.i(TAG, "accept--->" + strings.size() + "---" + str);
                }
            }
        });
複製代碼

上述代碼的執行結果以下:併發

第1次接收...
accept--->2
接收的數據...
accept--->2---Event1
accept--->2---Event2
第2次接收...
accept--->2
接收的數據...
accept--->2---Event3
accept--->2---Event4
複製代碼
buffer(int count, int skip)

相較 buffer(int count), skip 能夠指定下一次由源 Observable 轉換的 Observable 收集事件的位置,若是 count 等於 skip,則 buffer(int count,int skip) 等價於 buffer(int count),官方示意圖以下:app

buffer(int count, int skip)

以下面的事件發送過程,至關於每 3 個事件一組進行發送,但每次收集數據的位置參數 skip 爲 2,則每次收集的數據中會有數據重複,測試代碼以下:ide

count = 0;
Observable.just("Event1", "Event2", "Event3", "Event4", "Event5")
        .buffer(3, 2)
        .subscribe(new Consumer<List<String>>() {
            @Override
            public void accept(List<String> strings) throws Exception {
                count++;
                Log.i(TAG, "第" + count + "次接收...");
                Log.i(TAG, "accept--->" + strings.size());
                Log.i(TAG, "接收的數據...");
                for (String str : strings) {
                    Log.i(TAG, "accept--->" + strings.size() + "---" + str);
                }
            }
        });
複製代碼

上述代碼的執行結果以下:學習

第1次接收...
accept--->3
接收的數據...
accept--->3---Event1
accept--->3---Event2
accept--->3---Event3
第2次接收...
accept--->3
接收的數據...
accept--->3---Event3
accept--->3---Event4
accept--->3---Event5
第3次接收...
accept--->1
接收的數據...
accept--->1---Event5
複製代碼
buffer(long timespan, TimeUnit unit)

buffer 操做符會將一個 Observable 轉換爲一個新的 Observable,timespan 決定新的的 Observsable 在發出緩存的數據的時間間隔,官方示意圖以下:測試

buffer(long timespan, TimeUnit unit)

以下面的事件發送過程,源 Observable 每隔 2 秒發送事件,而 buffer 新生成的 Obsrevable 則以每隔 1 秒的間隔發送緩存的事件集合,固然,這樣會在間隔的時間段收集不到數據致使丟失數據,測試代碼以下:this

Observable.intervalRange(1,8,0,2, TimeUnit.SECONDS)
        .buffer(1,TimeUnit.SECONDS)
        .subscribe(new Consumer<List<Long>>() {
            @Override
            public void accept(List<Long> longs) throws Exception {
                Log.i(TAG, "accept--->" + String.valueOf(longs));
            }
        });
複製代碼

上述代碼的執行結果以下:

accept--->[1]
accept--->[]
accept--->[2]
accept--->[]
accept--->[3]
accept--->[]
accept--->[4]
accept--->[]
accept--->[5]
複製代碼
buffer(long timespan, long timeskip, TimeUnit unit)

buffer 操做符會將一個 Observable 轉換爲一個 Observable,timeskip 決定讓新生成的 Observable 按期啓動一個新的緩衝區,而後新的 Observable 會發出在 timespan 時間間隔內收集的事件集合,官方示意圖以下:

buffer(long timespan, long timeskip, TimeUnit unit)

以下面的事件發送過程,源 Observable 會每隔 1 秒發送 1 到 12 的整數,buffer 新生成的 Observable 會每隔 5 秒接收源 Observable 發送的事件,測試代碼以下:

Observable.intervalRange(1,12,0,1, TimeUnit.SECONDS)
        .buffer(1,5, TimeUnit.SECONDS)
        .subscribe(new Consumer<List<Long>>() {
            @Override
            public void accept(List<Long> longs) throws Exception {
                Log.i(TAG, "accept--->" + String.valueOf(longs));
            }
        });
複製代碼

上述代碼的執行結果以下:

accept--->[1]
accept--->[6]
accept--->[11]
複製代碼
buffer(ObservableSource boundary)

buffer(boundary) 會監視一個名叫 boundary 的 Observable,每當這個 Observable 發射了一個事件,它就建立一個新的 List 開始收集來自原始 Observable 的發送的事件併發送收集到的數據,官方示意圖以下:

buffer(ObservableSource<B> boundary)

以下面事件的發送過程,收集到的原事件會由於時間間隔的不一樣最終發送的收集到的事件的個數也不一樣,測試代碼以下:

Observable.intervalRange(1,10,0,2, TimeUnit.SECONDS)
        .buffer(Observable.interval(3, TimeUnit.SECONDS))
        .subscribe(new Consumer<List<Long>>() {
            @Override
            public void accept(List<Long> longs) throws Exception {
                Log.i(TAG, "accept--->" + String.valueOf(longs));
            }
        });
複製代碼

上述代碼的執行結果以下:

accept--->[1, 2]
accept--->[3]
accept--->[4, 5]
accept--->[6]
accept--->[7, 8]
accept--->[9]
accept--->[10]
複製代碼
buffer(openingIndicator, closingIndicator)

buffer(openingIndicator, closingIndicator)會監視一個名叫 openingIndicator 的 Observable,這個 Observable 每發射一個事件,它就建立一個 List 收集原始 Observable 發送的數據,並將收集的數據給 closingIndicator,closingIndicator 會返回一個 Observable,這個 buffer 會監視 closingIndicator 返回的Observable,檢測到這個 Observable 的數據時,就會關閉這個 List 發射剛纔從 openingIndicator 得到數據,也就是名爲 openingIndicator 的 Observable 收集的數據,下面是針對此種狀況的官方示意圖:

buffer(openingIndicator, closingIndicator)

以下面時間發送過程,原始的 Observable 以每一個 1 秒的間隔發送 1 到 12 之間的整數,名爲 openingIndicator 的 Observable 會每隔 3 秒建立一個 List 手機發送的事件,而後將收集的數據給 closingIndicator,closingIndicator 會延時 1 秒發送從名爲 openingIndicator 的 Observable 拿到的數據,下面是測試代碼:

Observable openingIndicator = Observable.interval(3, TimeUnit.SECONDS);
Observable closingIndicator = Observable.timer(1,TimeUnit.SECONDS);
Observable.intervalRange(1,12,0,1, TimeUnit.SECONDS)
        .buffer(openingIndicator, new Function<Object, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(Object o) throws Exception {
                return closingIndicator;
            }
        })
        .subscribe(new Consumer<List<Long>>() {
            @Override
            public void accept(List<Long> longs) throws Exception {
                Log.i(TAG, "accept--->" + String.valueOf(longs));
            }
        });
複製代碼

上述代碼的執行結果以下:

accept--->[4, 5]
accept--->[7]
accept--->[10]
複製代碼

window操做符

這裏就以 window(long count) 爲例來介紹 window 操做符的使用,window 操做符的使用和 buffer 使用相似,不一樣之處是經 buffer 轉換成的 Observable 發送的時源 Observable 發送事件的事件集合,而經 window 操做符轉換成的 Observable 會依次發送 count 個源 Observable 發送的事件,該操做符官方示意圖以下:

window(long count)

測試代碼以下:

Observable.just("Event1", "Event2", "Event3", "Event4")
        .window(2)
        .subscribe(new Consumer<Observable<String>>() {
            @Override
            public void accept(Observable<String> stringObservable) throws Exception {
                Log.i(TAG, "accept--Observable->");
                stringObservable.subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.i(TAG, "accept--->" + s);
                    }
                });
            }
        });
複製代碼

上述代碼的執行結果以下:

accept--Observable->
accept--->Event1
accept--->Event2
accept--Observable->
accept--->Event3
accept--->Event4
複製代碼

map操做符

map(mapper)

map 操做符可對發送的數據進行相應的類型轉化,map 操做的官方示意圖以下:

map(mapper)

以下面的事件發送過程,通過 map 操做符轉換,可對源 Observable 發送的事件進行進一步的加工和轉換,測試代碼以下:

Observable.just("Event1", "Event2", "Event3", "Event4")
        .map(new Function<String, String>() {
            @Override
            public String apply(String s) throws Exception {
                return "this is " + s;
            }
        }).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.i(TAG, "accept--->" + s);
    }
});
複製代碼

上述代碼的執行結果以下:

accept--->this is Event1
accept--->this is Event2
accept--->this is Event3
accept--->this is Event4
複製代碼
flatMap(mapper)

flatMap 操做符使用時,當源 Observable 發出事件會相應的轉換爲能夠發送多個事件的 Observable,這些 Observable 最終匯入同一個 Observable,而後這個 Observable 將這些事件統一發送出去,這裏決定再也不想上文中同樣,每一個重載方法都進行說明,這裏已經常使用的 flatMap(mapper) 爲例,其官方示意圖以下:

flatMap(mapper)

以下面的事件發送過程,使用了 flatMap 操做符以後,源 Observable 發送事件時,相應的生成對應的 Observable,最終發送的事件都匯入同一個 Observable,而後將事件結果回調給觀察者,測試代碼以下:

final Observable observable = Observable.just("Event5", "Event6");
Observable.just("Event1", "Event2", "Event3", "Event4")
        .flatMap(new Function<String, Observable<String>>() {
            @Override
            public Observable<String> apply(String s) throws Exception {
                return observable;
            }
        }).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.i(TAG, "accept--->" + s);
    }
});
複製代碼

上述代碼的執行結果以下:

accept--->Event5
accept--->Event6
accept--->Event5
accept--->Event6
accept--->Event5
accept--->Event6
accept--->Event5
accept--->Event6
複製代碼
concatMap(mapper)

concatMap 的使用與 flatMap 的使用大體相似,相較flatMap可以保證事件接收的順序,而 flatMap 不能保證事件接收的順序,concatMap 操做符的官方示意圖以下:

concatMap(mapper)

以下面的事件發送過程,咱們在源 Observable 發送整數 1 時延時 3 秒,而後繼續發送其餘事件,下面是測試代碼:

Observable.intervalRange(1, 2, 0, 1, TimeUnit.SECONDS)
        .concatMap(new Function<Long, ObservableSource<Long>>() {
            @Override
            public ObservableSource<Long> apply(Long aLong) throws Exception {
                int delay = 0;
                if (aLong == 1) {
                    delay = 3;
                }
                return Observable.intervalRange(4, 4, delay, 1, TimeUnit.SECONDS);
            }
        }).subscribe(new Consumer<Long>() {
    @Override
    public void accept(Long aLong) throws Exception {
        Log.i(TAG, "accept--->" + aLong);
    }
});

複製代碼

使用 concatMap 操做符上述代碼的執行結果以下:

accept--->4
accept--->5
accept--->6
accept--->7
accept--->4
accept--->5
accept--->6
accept--->7
複製代碼

使用 flatMap 操做符上述代碼的執行結果以下:

accept--->4
accept--->5
accept--->6
accept--->4
accept--->7
accept--->5
accept--->6
accept--->7
複製代碼

可見,concatMap 相較 flatMap 可以保證事件接收的順序。

switchMap(mapper)

當源 Observable 發送事件時會相應的轉換爲能夠發送多個事件的 Observable,switchMap 操做符只關心當前這個 Observable,也就是說,源 Observable 每當發送一個新的事件時,就會丟棄前面一個發送多個事件的 Observable,官方示意圖以下:

switchMap(mapper)

以下面的事件發送過程,源 Observable 每一個 2 秒發送 1 和 2,轉換成的能夠發送多個事件的 Observable 每一個 1 秒發送從 4 開始的整數,使用 switchMap 操做符時,源 Observable 發送一個整數 1 時,這個新的能夠發送多個事件的 Observable 只發送兩個整數,也就是 4 和 5 以後就中止發送了,由於此時源 Observable 又開始發送事件了,此時會丟棄前一個可發送多個時間的 Observable,開始下一次源 Observable 發送事件的監聽,測試代碼以下:

Observable.intervalRange(1, 2, 0, 2, TimeUnit.SECONDS)
        .switchMap(new Function<Long, ObservableSource<Long>>() {
            @Override
            public ObservableSource<Long> apply(Long aLong) throws Exception {
                Log.i(TAG, "accept-aLong-->" + aLong);
                return Observable.intervalRange(4, 4, 0, 1, TimeUnit.SECONDS);
            }
        }).subscribe(new Consumer<Long>() {
    @Override
    public void accept(Long aLong) throws Exception {
        Log.i(TAG, "accept--->" + aLong);
    }
});
複製代碼

上述代碼執行結果以下:

accept-aLong-->1
accept--->4
accept--->5
accept-aLong-->2
accept--->4
accept--->5
accept--->6
accept--->7
複製代碼

此外,還有與之相關的操做符:concatMapDelayError、concatMapEager、concatMapEagerDelayError concatMapIterable、flatMapIterable 、switchMapDelayError,都是上述操做符的擴展,這裏就不在介紹了。

groupBy操做符

groupBy 操做符會對接收的數據按照指定的規則進行分類,而後再被 GroupedObservable 等訂閱輸出,官方示意圖以下:

groupBy

以下面的事件發送過程,咱們會按照成績進行分組輸出,具體以下:

List<DataBean> beanList = new ArrayList<>();
beanList.add(new DataBean("成績是95分", 95));
beanList.add(new DataBean("成績是70分", 70));
beanList.add(new DataBean("成績是56分", 56));
beanList.add(new DataBean("成績是69分", 69));
beanList.add(new DataBean("成績是90分", 90));
beanList.add(new DataBean("成績是46分", 46));
beanList.add(new DataBean("成績是85分", 85));

Observable.fromIterable(beanList)
        .groupBy(new Function<DataBean, String>() {
            @Override
            public String apply(DataBean dataBean) throws Exception {
                int score = dataBean.getScore();
                if (score >= 80) {
                    return "A";
                }

                if (score >= 60 && score < 80) {
                    return "B";
                }

                if (score < 60) {
                    return "C";
                }
                return null;
            }
        })
        .subscribe(new Consumer<GroupedObservable<String, DataBean>>() {
            @Override
            public void accept(final GroupedObservable<String, DataBean> groupedObservable) throws Exception {
                groupedObservable.subscribe(new Consumer<DataBean>() {
                    @Override
                    public void accept(DataBean dataBean) throws Exception {
                        Log.i(TAG, "accept--->"+ groupedObservable.getKey() + "組--->"+dataBean.getDesc());
                    }
                });
            }
        });

複製代碼

上述代碼的執行結果以下:

accept--->A組--->成績是95分 
accept--->B組--->成績是70分
accept--->C組--->成績是56分
accept--->B組--->成績是69分
accept--->A組--->成績是90分
accept--->C組--->成績是46分
accept--->A組--->成績是85分
複製代碼

cast操做符

cast 操做符用於類型轉化,cast 操做符官方示意圖以下:

cast

測試代碼以下:

Observable.just(1,2,3,4,5)
        .cast(String.class)
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String String) throws Exception {
                Log.i(TAG, "accept--->" + String);
            }
        });
複製代碼

測試會出現以下異常:

java.lang.ClassCastException: Cannot cast java.lang.Integer to java.lang.String
複製代碼

從結果可知,發現不一樣類型之間轉化會出現類型轉化異常,cast 操做符並不能進行不一樣類型之間的轉化,可是可使用 cast 操做來校驗發送的事件數據類型是否是指定的類型。

scan操做符

scan 操做符會依次掃描每兩個元素,第一個元素沒有上一個元素,則第一個元素的上一個元素會被忽略,當掃描第二個元素時,會獲取到第一個元素,以後 apply 方法的返回值會做爲上一個元素的值參與計算,最終返回轉化後的結果,scan 官方示意圖以下:

scan(accumulator)

看一下下面的事件發送過程,第一次掃描時,第一個元素是 1,這裏至關於 last,第二個元素是 2 ,這裏至關於 item,此時 apply 方法返回的結果是 2,這個 2 會做爲 last 的值參與下一次掃描計算,則下一次返回的值確定是 2 * 3,也就是 6,測試代碼以下:

Observable.just(1, 2, 3, 4, 5)
        .scan(new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer last, Integer item) throws Exception {
                Log.i(TAG, "accept--last->" + last);
                Log.i(TAG, "accept--item->" + item);
                return last * item;
            }
        })
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.i(TAG, "accept--->" + integer);
            }
        });
複製代碼

上述代碼的執行結果以下:

accept--->1
accept--last->1
accept--item->2
accept--->2
accept--last->2
accept--item->3
accept--->6
accept--last->6
accept--item->4
accept--->24
accept--last->24
accept--item->5
accept--->120
複製代碼

To操做符

toList()

toList 操做符會將發送的一系列數據轉換成 List,而後一次性發送出去,toList 的官方示意圖以下:

toList()

測試代碼以下:

Observable.just(1, 2, 3, 4)
        .toList()
        .subscribe(new Consumer<List<Integer>>() {
            @Override
            public void accept(List<Integer> integers) throws Exception {
                Log.i(TAG, "accept--->" + integers);
            }
        });
複製代碼

上述代碼的執行結果以下:

accept--->[1, 2, 3, 4]
複製代碼
toMap(keySelector)

toMap操做符會將要發送的事件按照指定的規則轉化爲 Map 形式,而後一次性發送出去,toMap 操做符官方示意圖以下:

toMap(keySelector)

測試代碼以下:

Observable.just(1, 2, 3, 4)
        .toMap(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return "key"+integer;
            }
        })
        .subscribe(new Consumer<Map<String, Integer>>() {
            @Override
            public void accept(Map<String, Integer> map) throws Exception {
                Log.i(TAG, "accept--->" + map);
            }
        });
複製代碼

上述代碼的執行結果以下:

accept--->{key2=2, key4=4, key1=1, key3=3}
複製代碼

我的微信公衆號:jzman-blog 能夠一塊兒交流學習!

相關文章
相關標籤/搜索