目錄html
接續上篇: Rxjava2 Observable的數據變換詳解及實例(一)java
按期未來自原始Observable的數據分解爲一個Observable窗口,發射這些窗口,而不是每次發射一項數據。react
Window
和 Buffer
相似,但不是發射來自原始Observable的數據包,它發射的是 Observables,這些Observables中的每個都發射原始Observable數據的一個子集,最後發 射一個 onCompleted 通知。git
和 Buffer
同樣,Window 有不少變體,每一種都以本身的方式將原始Observable分解爲多個做爲結果的Observable,每個都包含一個映射原始數據的 window
。用 Window操做符的術語描述就是,當一個窗口打開(when a window "opens")意味着一個新的Observable已經發射 (產生)了,並且這個Observable開始發射來自原始Observable的數據;當一個窗口關閉 (when a window "closes")意味着發射(產生)的Observable中止發射原始Observable的數據, 而且發射終止通知 onCompleted 給它的觀察者們。github
在RxJava中有許多種Window
操做符的方法。緩存
window
的這個方法會當即打開它的第一個窗口。每當它觀察到closingSelector
返回的 Observable發射了一個對象時,它就關閉當前打開的窗口並當即打開一個新窗口。用這個方法,這種 window 變體發射一系列不重疊的窗口,這些窗口的數據集合與原始Observable發射的數據是一一對應的。網絡
解析: 一開始開啓一個 window
接收原始數據,每當它觀察到closingSelector
返回的 Observable發射了一個對象時,它就關閉當前打開的窗口並取消此時訂閱closingSelector 的Observable ( 此時多是沒有數據 window
)並當即打開一個新窗口,注意: 每一個窗口開啓前都會去訂閱一個closingSelector
返回的 Observable。app
實例代碼:ide
// 1. window(Callable boundary) // 開啓一個window,並訂閱觀察boundary返回的Observable發射了一個數據, // 則關閉此window,將收集的數據以Observable發送, 從新訂閱boundary返回的Observable,開啓新window Observable.intervalRange(1, 10, 0, 1, TimeUnit.SECONDS) .window(new Callable<Observable<Long>>() { @Override public Observable<Long> call() throws Exception { System.out.println("--> call(1)"); return Observable.timer(2, TimeUnit.SECONDS); // 兩秒後關閉當前窗口 } }).subscribe(new Consumer<Observable<Long>>() { @Override public void accept(Observable<Long> t) throws Exception { // 接受每一個window接受的數據的Observable t.subscribe(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> accept(1): " + t); } }); } });
輸出:函數
--> call(1) --> accept(1): 1 --> accept(1): 2 --> accept(1): 3 --> call(1) --> accept(1): 4 --> accept(1): 5 --> call(1) --> accept(1): 6 --> accept(1): 7 --> call(1) --> accept(1): 8 --> accept(1): 9 --> call(1) --> accept(1): 10
Javadoc: window(closingSelector)
Javadoc: window(closingSelector, bufferSize)
當 openingIndicator
發射一個數據,就會打開一個 window
, 同時訂閱 closingIndicator
返回的Observable,當這個Observable發射一個數據,就結束此 window 和 ,發送收集數據的 Observable。
不管什麼時候,只要 window 觀察到 windowOpenings 這個Observable發射了一個 Opening
對象,它就打開一個窗口,而且同時調用 closingSelector
生成一個與那個窗口關聯的關閉 (closing)Observable
。當這個關閉 (closing)Observable 發射了一個對象時,window
操做符就會關閉那個窗口以及關聯的closingSelector
的 Observable。
注意: 對這個方法來講,因爲當前窗口的關閉和新窗口的打開是由單獨的 Observable 管理的,它建立的窗口可能會存在重疊(重複某些來自原始Observable的數據) 或間隙(丟棄某些來自原始Observable的數據)。
實例代碼:
// 2. window(ObservableSource openingIndicator, Function<T, ObservableSource<R>> closingIndicator) // 當openingIndicator發射一個數據,就會打開一個window, 同時訂閱closingIndicator返回的Observable, // 當這個Observable發射一個數據,就結束此window以及對應的closingIndicator,發送收集數據的 Observable。 Observable<Long> openingIndicator = Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS) .doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable t) throws Exception { System.out.println("--> openingIndicator is subscribe!"); } }).doOnComplete(new Action() { @Override public void run() throws Exception { System.out.println("--> openingIndicator is completed!"); } }).doOnNext(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> openingIndicator emitter: " + t); } }); Observable<Long> dataSource = Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS) .doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable t) throws Exception { System.out.println("--> DataSource is subscribe!"); } }).doOnNext(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> DataSource emitter: " + t); } }); dataSource.window(openingIndicator, new Function<Long, Observable<Long>>() { @Override public Observable<Long> apply(Long t) throws Exception { System.out.println("--> apply(2): " + t); return Observable.timer(2, TimeUnit.SECONDS).doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable t) throws Exception { System.out.println("--> closingIndicator is subscribe!"); } }); } }).subscribe(new Consumer<Observable<Long>>() { @Override public void accept(Observable<Long> t) throws Exception { System.out.println("-------------------> new window data"); t.subscribe(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> accept(2): " + t); } }); } });
輸出:
--> DataSource is subscribe! --> openingIndicator is subscribe! --> openingIndicator emitter: 1 --> DataSource emitter: 1 -------------------> new window data --> apply(2): 1 --> closingIndicator is subscribe! --> openingIndicator emitter: 2 --> DataSource emitter: 2 -------------------> new window data --> apply(2): 2 --> closingIndicator is subscribe! --> accept(2): 2 --> accept(2): 2 --> openingIndicator emitter: 3 --> DataSource emitter: 3 -------------------> new window data --> apply(2): 3 --> closingIndicator is subscribe! --> accept(2): 3 --> accept(2): 3 --> accept(2): 3 --> DataSource emitter: 4 --> openingIndicator emitter: 4 --> accept(2): 4 --> accept(2): 4 -------------------> new window data --> apply(2): 4 --> closingIndicator is subscribe! --> DataSource emitter: 5 --> accept(2): 5 --> accept(2): 5 --> openingIndicator emitter: 5
Javadoc: window(openingIndicator, closingIndicator)
Javadoc: window(openingIndicator, closingIndicator,bufferSize)
這個 window
的方法當即打開它的第一個窗口。每當當前窗口發射了 count
項數據,它就關閉當前窗口並打開一個新窗口。若是從原始Observable收到了 onError
或 onCompleted
通知它也會關閉當前窗口。
這種 window 方法發射一系列不重疊的窗口,這些窗口的數據集合與原始 Observable發射的數據是 一一對應 的。
實例代碼:
// 3. window(count) // 以count爲緩存大小收集的不重疊的Observables對象,接受的數據與原數據彼此對應 Observable.range(1, 20) .window(5) // 設置緩存大小爲5 .subscribe(new Consumer<Observable<Integer>>() { @Override public void accept(Observable<Integer> t) throws Exception { System.out.println("--------------> new data window"); t.subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept window(3): " + t); } }); } });
輸出:
--------------> new data window --> accept window(3): 1 --> accept window(3): 2 --> accept window(3): 3 --> accept window(3): 4 --> accept window(3): 5 --------------> new data window --> accept window(3): 6 --> accept window(3): 7 --> accept window(3): 8 --> accept window(3): 9 --> accept window(3): 10 --------------> new data window --> accept window(3): 11 --> accept window(3): 12 --> accept window(3): 13 --> accept window(3): 14 --> accept window(3): 15 --------------> new data window --> accept window(3): 16 --> accept window(3): 17 --> accept window(3): 18 --> accept window(3): 19 --> accept window(3): 20
Javadoc: window(count)
這個 window
的方法當即打開它的第一個窗口。原始Observable每發射 skip
項數據它就打開 一個新窗口(例如,若是 skip 等於3,每到第三項數據,它會建立一個新窗口)。每當當前窗口發射了 count
項數據,它就關閉當前窗口並打開一個新窗口。若是從原始Observable 收到了onError
或 onCompleted
通知它也會關閉當前窗口。
解析: window 一開始打開一個 window,每發射 skip 項數據就會打開一個 window 獨立收集 原始數據,當 window 收集了 count 個數據就會關閉,開啓另一個。當原始Observable發送了onError或者onCompleted通知也會關閉當前窗口。
實例代碼:
// 4. window(count,skip) // window一開始打開一個window,每發射skip項數據就會打開一個window獨立收集原始數據 // 當window收集了count個數據就會關閉window,開啓另一個。 // 當原始Observable發送了onError 或者 onCompleted 通知也會關閉當前窗口。 // 4.1 skip = count: 會依次順序接受原始數據,同window(count) Observable.range(1, 10) .window(2, 2) // skip = count, 數據會依次順序輸出 .subscribe(new Consumer<Observable<Integer>>() { @Override public void accept(Observable<Integer> t) throws Exception { t.observeOn(Schedulers.newThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept window(4-1): " + t +" , ThreadID: "+ Thread.currentThread().getId()); } }); } }); // 4.2 skip > count: 兩個窗口可能會有 skip-count 項數據丟失 Observable.range(1, 10) .window(2, 3) // skip > count, 數據會存在丟失 .subscribe(new Consumer<Observable<Integer>>() { @Override public void accept(Observable<Integer> t) throws Exception { t.observeOn(Schedulers.newThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept window(4-2): " + t +" , ThreadID: "+ Thread.currentThread().getId()); } }); } }); // 4.3 skip < count: 兩個窗口可能會有 count-skip 項數據重疊 Observable.range(1, 10) .window(3, 2) // skip < count, 數據會重疊 .subscribe(new Consumer<Observable<Integer>>() { @Override public void accept(Observable<Integer> t) throws Exception { t.observeOn(Schedulers.newThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept window(4-3): " + t +" , ThreadID: "+ Thread.currentThread().getId()); } }); } });
輸出:
--> accept window(4-1): 1 , ThreadID: 11 --> accept window(4-1): 2 , ThreadID: 11 --> accept window(4-1): 4 , ThreadID: 12 --> accept window(4-1): 3 , ThreadID: 11 --> accept window(4-1): 5 , ThreadID: 12 --> accept window(4-1): 6 , ThreadID: 12 --> accept window(4-1): 7 , ThreadID: 13 --> accept window(4-1): 8 , ThreadID: 13 --> accept window(4-1): 9 , ThreadID: 13 --> accept window(4-1): 10 , ThreadID: 14 --> accept window(4-2): 1 , ThreadID: 15 --> accept window(4-2): 2 , ThreadID: 15 --> accept window(4-2): 4 , ThreadID: 16 --> accept window(4-2): 5 , ThreadID: 16 --> accept window(4-2): 7 , ThreadID: 17 --> accept window(4-2): 8 , ThreadID: 17 --> accept window(4-2): 10 , ThreadID: 18 --> accept window(4-3): 1 , ThreadID: 19 --> accept window(4-3): 2 , ThreadID: 19 --> accept window(4-3): 3 , ThreadID: 19 --> accept window(4-3): 3 , ThreadID: 20 --> accept window(4-3): 4 , ThreadID: 20 --> accept window(4-3): 5 , ThreadID: 20 --> accept window(4-3): 5 , ThreadID: 21 --> accept window(4-3): 6 , ThreadID: 21 --> accept window(4-3): 7 , ThreadID: 21 --> accept window(4-3): 7 , ThreadID: 22 --> accept window(4-3): 8 , ThreadID: 22 --> accept window(4-3): 9 , ThreadID: 22 --> accept window(4-3): 9 , ThreadID: 23 --> accept window(4-3): 10 , ThreadID: 23
Javadoc: window(count, skip)
這個 window
的方法當即打開它的第一個窗口收集數據。每當過了 timespan
這麼長的時間段它就關閉當前窗口並打開一個新窗口(時間單位是 unit
,可選在調度器 scheduler
上執行)收集數據。若是從原始 Observable 收到了 onError 或 onCompleted 通知它也會關閉當前窗口。
這種 window 方法發射一系列不重疊的窗口,這些窗口的數據集合與原始Observable發射的數據也是 一一對應 的。
實例代碼:
// 5. window(long timespan, TimeUnit unit) // 每當過了 timespan 的時間段,它就關閉當前窗口並打開另外一個新window收集數據 Observable.intervalRange(1, 10, 0, 1, TimeUnit.SECONDS) .window(2, TimeUnit.SECONDS) // 間隔2秒關閉當前 window 並打開一個新 window 收集數據 // .window(2, TimeUnit.SECONDS, Schedulers.newThread()) // 指定在 newThread 線程中 .subscribe(new Consumer<Observable<Long>>() { @Override public void accept(Observable<Long> t) throws Exception { t.observeOn(Schedulers.newThread()) .subscribe(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> accept window(5): " + t +" , ThreadID: "+ Thread.currentThread().getId() ); } }); } });
輸出:
--> accept window(5): 1 , ThreadID: 11 --> accept window(5): 2 , ThreadID: 11 --> accept window(5): 3 , ThreadID: 11 --> accept window(5): 4 , ThreadID: 14 --> accept window(5): 5 , ThreadID: 14 --> accept window(5): 6 , ThreadID: 15 --> accept window(5): 7 , ThreadID: 16 --> accept window(5): 8 , ThreadID: 16 --> accept window(5): 9 , ThreadID: 17 --> accept window(5): 10 , ThreadID: 17
Javadoc: window(timespan, TimeUnit)
Javadoc: window(timespan, TimeUnit, scheduler)
這個 window
的方法當即打開它的第一個窗口。這個變體是 window(count) 和 window(timespan, unit[, scheduler]) 的結合,每當過了 timespan
的時長或者當前窗口收到了 count
項數據,它就關閉當前窗口並打開另外一個。若是從原始 Observable收到了 onError
或 onCompleted
通知它也會關閉當前窗口。
這種window方法發射 一系列不重疊的窗口,這些窗口的數據集合與原始Observable發射的數據也是 一一對應 的。
實例代碼:
// 6. window(long timespan, TimeUnit unit, long count) // 每當過了timespan的時間段或者當前窗口收到了count項數據,它就關閉當前window並打開另外一個window收集數據 Observable.intervalRange(1, 12, 0, 500, TimeUnit.MILLISECONDS) .window(2, TimeUnit.SECONDS, 5) // 每隔2秒關閉當前收集數據的window並開啓一個window收集5項數據 // .window(2, TimeUnit.SECONDS,Schedulers.newThread(), 5 ) // 指定在 newThread 線程中 .subscribe(new Consumer<Observable<Long>>() { @Override public void accept(Observable<Long> t) throws Exception { t.observeOn(Schedulers.newThread()) .subscribe(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> accept window(6): " + t + " , ThreadID: "+ Thread.currentThread().getId() ); } }); } });
輸出:
--> accept window(6): 1 , ThreadID: 11 --> accept window(6): 2 , ThreadID: 11 --> accept window(6): 3 , ThreadID: 11 --> accept window(6): 4 , ThreadID: 11 --> accept window(6): 5 , ThreadID: 11 --> accept window(6): 6 , ThreadID: 14 --> accept window(6): 7 , ThreadID: 14 --> accept window(6): 8 , ThreadID: 14 --> accept window(6): 9 , ThreadID: 14 --> accept window(6): 10 , ThreadID: 14 --> accept window(6): 11 , ThreadID: 15 --> accept window(6): 12 , ThreadID: 15
Javadoc: window(timespan, TimeUnit, count)
Javadoc: window(timespan, TimeUnit, scheduler, count)
這個 window
的方法當即打開它的第一個窗口。隨後每當過了 timeskip
的時長就打開一個新窗口(時間單位是 unit
,可選在調度器 scheduler
上執行),當窗口打開的時長達 到 timespan
,它就關閉當前打開的窗口。若是從原始Observable收到 了 onError 或 onCompleted 通知它也會關閉當前窗口。窗口的數據可能重疊也可能有間隙,取決於你設置的 timeskip
和 timespan
的值。
解析: 在每個 timeskip 時期內都建立一個新的 window,而後獨立收集 timespan 時間段的原始Observable發射的每一項數據。注意:由於每一個 window 都是獨立接收數據,當接收數據的時間與建立新 window 的時間不一致時會有數據項重複,丟失等狀況。
實例代碼:
// 7. window(long timespan, long timeskip, TimeUnit unit) // 在每個timeskip時期內都建立一個新的window,而後獨立收集timespan時間段的原始Observable發射的每一項數據, // 若是timespan長於timeskip,它發射的數據包將會重疊,所以可能包含重複的數據項。 // 7.1 skip = timespan: 會依次順序接受原始數據,同window(count) Observable.intervalRange(1, 5, 0, 1000, TimeUnit.MILLISECONDS) .window(1, 1, TimeUnit.SECONDS) // 設置每秒建立一個window,收集2秒的數據 // .window(2, 1, TimeUnit.SECONDS, Schedulers.newThread()) // 指定在 newThread 線程中 .subscribe(new Consumer<Observable<Long>>() { @Override public void accept(Observable<Long> t) throws Exception { t.observeOn(Schedulers.newThread()) .subscribe(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> accept window(7-1): " + t + " , ThreadID: "+ Thread.currentThread().getId()); } }); } }); // 7.2 skip > timespan: 兩個窗口可能會有 skip-timespan 項數據丟失 Observable.intervalRange(1, 5, 0, 1000, TimeUnit.MILLISECONDS) .window(1, 2, TimeUnit.SECONDS) // 設置每秒建立一個window,收集2秒的數據 // .window(2, 1, TimeUnit.SECONDS, Schedulers.newThread()) // 指定在 newThread 線程中 .subscribe(new Consumer<Observable<Long>>() { @Override public void accept(Observable<Long> t) throws Exception { t.observeOn(Schedulers.newThread()) .subscribe(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> accept window(7-2): " + t + " , ThreadID: "+ Thread.currentThread().getId()); } }); } }); // 7.3 skip < timespan: 兩個窗口可能會有 timespan-skip 項數據重疊 Observable.intervalRange(1, 5, 0, 1000, TimeUnit.MILLISECONDS) .window(2, 1, TimeUnit.SECONDS) // 設置每秒建立一個window,收集2秒的數據 // .window(2, 1, TimeUnit.SECONDS, Schedulers.newThread()) // 指定在 newThread 線程中 .subscribe(new Consumer<Observable<Long>>() { @Override public void accept(Observable<Long> t) throws Exception { t.observeOn(Schedulers.newThread()) .subscribe(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> accept window(7-3): " + t + " , ThreadID: "+ Thread.currentThread().getId()); } }); } });
輸出:
--> accept window(7-1): 1 , ThreadID: 11 --> accept window(7-1): 2 , ThreadID: 11 --> accept window(7-1): 3 , ThreadID: 14 --> accept window(7-1): 4 , ThreadID: 15 --> accept window(7-1): 5 , ThreadID: 17 ---------------------------------------------------------------------- --> accept window(7-2): 1 , ThreadID: 11 --> accept window(7-2): 3 , ThreadID: 14 --> accept window(7-2): 5 , ThreadID: 15 ---------------------------------------------------------------------- --> accept window(7-3): 1 , ThreadID: 11 --> accept window(7-3): 2 , ThreadID: 11 --> accept window(7-3): 2 , ThreadID: 14 --> accept window(7-3): 3 , ThreadID: 14 --> accept window(7-3): 3 , ThreadID: 15 --> accept window(7-3): 4 , ThreadID: 15 --> accept window(7-3): 4 , ThreadID: 16 --> accept window(7-3): 5 , ThreadID: 16 --> accept window(7-3): 5 , ThreadID: 17
Javadoc: window(timespan, timeskip, TimeUnit)
Javadoc: window(timespan, timeskip, TimeUnit, scheduler)
將一個 Observable 分拆爲一些 Observables 集合,它們中的每個發射原始 Observable 的一個子序列。
RxJava實現了 groupBy
操做符。它返回Observable的一個特殊子類 GroupedObservable
,實現了GroupedObservable
接口的對象有一個額外的方法 getKey ,這個 Key 用於將數據分組到指定的Observable。有一個版本的 groupBy 容許你傳遞一個變換函數,這樣它能夠在發射結果 GroupedObservable 以前改變數據項。
若是你取消訂閱一個 GroupedObservable ,那個 Observable 將會終止。若是以後原始的 Observable又發射了一個與這個Observable的Key匹配的數據, groupBy 將會爲這個 Key 建立一個新的 GroupedObservable。
注意: groupBy
將原始 Observable 分解爲一個發射多個 GroupedObservable
的Observable,一旦有訂閱,每一個 GroupedObservable 就開始緩存數據。所以,若是你忽略這 些 GroupedObservable 中的任何一個,這個緩存可能造成一個潛在的內存泄露。所以,若是你不想觀察,也不要忽略 GroupedObservable 。你應該使用像 take(0)
這樣會丟棄本身的緩存的操做符。
GroupBy
操做符將原始 Observable 分拆爲一些 Observables
集合,它們中的每個發射原始 Observable 數據序列的一個子序列。哪一個數據項由哪個 Observable 發射是由一個函數斷定的,這個函數給每一項指定一個Key
,Key相同的數據會被同一個 Observable 發射。還有一個 delayError
參數的方法,指定是否延遲 Error
通知的Observable。
實例代碼:
// 1. groupBy(keySelector) // 將原始數據處理後加上分組tag,經過GroupedObservable發射分組數據 Observable.range(1, 10) .groupBy(new Function<Integer, String>() { @Override public String apply(Integer t) throws Exception { // 不一樣的key將會產生不一樣分組的Observable return t % 2 == 0 ? "Even" : "Odd"; // 將數據奇偶數進行分組, } }).observeOn(Schedulers.newThread()) .subscribe(new Consumer<GroupedObservable<String, Integer>>() { @Override public void accept(GroupedObservable<String, Integer> grouped) throws Exception { // 獲得每一個分組數據的的Observable grouped.subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { // 獲得數據 System.out.println("--> accept groupBy(1): groupKey: " + grouped.getKey() + ", value: " + t); } }); } });
輸出:
--> accept groupBy(1): groupKey: Odd, value: 1 --> accept groupBy(1): groupKey: Odd, value: 3 --> accept groupBy(1): groupKey: Odd, value: 5 --> accept groupBy(1): groupKey: Odd, value: 7 --> accept groupBy(1): groupKey: Odd, value: 9 --> accept groupBy(1): groupKey: Even, value: 2 --> accept groupBy(1): groupKey: Even, value: 4 --> accept groupBy(1): groupKey: Even, value: 6 --> accept groupBy(1): groupKey: Even, value: 8 --> accept groupBy(1): groupKey: Even, value: 10
Javadoc: groupBy(keySelector)
Javadoc: groupBy(keySelector, delayError)
GroupBy
操做符經過 keySelector
將原始 Observable 按照 Key
分組,產生不一樣的 Observable,再經過 valueSelector
對原始的數據進行處理,在發送每個被處理完成的數據。
實例代碼:
// 2. groupBy(Function(T,R),Function(T,R)) // 第一個func對原數據進行分組處理(僅僅分組添加key,不處理原始數據),第二個func對原始數據進行處理 Observable.range(1, 10) .groupBy(new Function<Integer, String>() { @Override public String apply(Integer t) throws Exception { // 對原始數據進行分組處理 return t % 2 == 0 ? "even" : "odd"; } },new Function<Integer, String>() { @Override public String apply(Integer t) throws Exception { // 對原始數據進行數據轉換處理 return t + " is " + (t % 2 == 0 ? "even" : "odd"); } }).observeOn(Schedulers.newThread()).subscribe(new Consumer<GroupedObservable<String, String>>() { @Override public void accept(GroupedObservable<String, String> grouped) throws Exception { grouped.subscribe(new Consumer<String>() { @Override public void accept(String t) throws Exception { // 接受最終的分組處理以及原數據處理後的數據 System.out.println("--> accept groupBy(2): groupKey = " + grouped.getKey() + ", value = " + t); } }); } });
輸出:
--> accept groupBy(2): groupKey = odd, value = 1 is odd --> accept groupBy(2): groupKey = odd, value = 3 is odd --> accept groupBy(2): groupKey = odd, value = 5 is odd --> accept groupBy(2): groupKey = odd, value = 7 is odd --> accept groupBy(2): groupKey = odd, value = 9 is odd --> accept groupBy(2): groupKey = even, value = 2 is even --> accept groupBy(2): groupKey = even, value = 4 is even --> accept groupBy(2): groupKey = even, value = 6 is even --> accept groupBy(2): groupKey = even, value = 8 is even --> accept groupBy(2): groupKey = even, value = 10 is even
Javadoc: groupBy(keySelector, valueSelector)
Javadoc: groupBy(keySelector, valueSelector, delayError)
Javadoc: groupBy(keySelector, valueSelector, delayError, bufferSize)
連續地對數據序列的每一項應用一個函數,而後連續發射結果。
Scan
操做符對原始 Observable 發射的第一項數據應用一個函數,而後將那個函數的結果做爲 本身的第一項數據發射。它將函數的結果同第二項數據一塊兒填充給這個函數來產生它本身的第二項數據。它持續進行這個過程來產生剩餘的數據序列。這個操做符在某些狀況下被叫作 accumulator
。
解析: 先發送原始數據第一項數據,而後將這個數據與下一個原始數據做爲參數傳遞給 accumulator
, 處理後發送這個數據,並與下一個原始數據一塊兒傳遞到下一次 accumulator ,直到數據序列結束。相似一個累積的過程。
實例代碼:
// 1. scan(BiFunction(Integer sum, Integer t2)) // 接受數據序列,從第二個數據開始,每次會將上次處理數據和如今接受的數據進行處理後發送 Observable.range(1, 10) .scan(new BiFunction<Integer, Integer, Integer>() { @Override public Integer apply(Integer LastItem, Integer item) throws Exception { System.out.println("--> apply: LastItem = " + LastItem + ", CurrentItem = " + item); return LastItem + item; // 實現求和操做 } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept scan(1): " + t); } });
輸出:
--> accept scan(1): 1 --> apply: LastItem = 1, CurrentItem = 2 --> accept scan(1): 3 --> apply: LastItem = 3, CurrentItem = 3 --> accept scan(1): 6 --> apply: LastItem = 6, CurrentItem = 4 --> accept scan(1): 10 --> apply: LastItem = 10, CurrentItem = 5 --> accept scan(1): 15 --> apply: LastItem = 15, CurrentItem = 6 --> accept scan(1): 21 --> apply: LastItem = 21, CurrentItem = 7 --> accept scan(1): 28 --> apply: LastItem = 28, CurrentItem = 8 --> accept scan(1): 36 --> apply: LastItem = 36, CurrentItem = 9 --> accept scan(1): 45 --> apply: LastItem = 45, CurrentItem = 10 --> accept scan(1): 55
Javadoc: scan(accumulator)
有一個 scan
操做符的方法,你能夠傳遞一個種子值給累加器函數的第一次調用(Observable 發射的第一項數據)。若是你使用這個版本,scan
將發射種子值做爲本身的第一項數據。
注意: 傳遞 null
做爲種子值與不傳遞是不一樣的,null
種子值是合法的。
解析: 指定初始種子值,第一次發送種子值,後續發送原始數據序列以及累計處理數據。
實例代碼:
// 2. scan(R,Func2) // 指定初始種子值,第一次發送種子值,後續發送原始數據序列以及累計處理數據 Observable.range(1, 10) .scan(100, new BiFunction<Integer, Integer, Integer>() { // 指定初始種子數據爲100 @Override public Integer apply(Integer lastValue, Integer item) throws Exception { System.out.println("--> apply: lastValue = " + lastValue + ", item = " + item); return lastValue + item; // 指定初值的求和操做 } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept sacn(2) = " + t); } });
輸出:
--> accept sacn(2) = 100 --> apply: lastValue = 100, item = 1 --> accept sacn(2) = 101 --> apply: lastValue = 101, item = 2 --> accept sacn(2) = 103 --> apply: lastValue = 103, item = 3 --> accept sacn(2) = 106 --> apply: lastValue = 106, item = 4 --> accept sacn(2) = 110 --> apply: lastValue = 110, item = 5 --> accept sacn(2) = 115 --> apply: lastValue = 115, item = 6 --> accept sacn(2) = 121 --> apply: lastValue = 121, item = 7 --> accept sacn(2) = 128 --> apply: lastValue = 128, item = 8 --> accept sacn(2) = 136 --> apply: lastValue = 136, item = 9 --> accept sacn(2) = 145 --> apply: lastValue = 145, item = 10 --> accept sacn(2) = 155
注意: 這個操做符默認不在任何特定的調度器上執行。
Javadoc: scan(initialValue, accumulator)
Cast
將原始Observable發射的每一項數據都強制轉換爲一個指定的類型,而後再發射數據,它是 map 的一個特殊版本。轉換失敗會有Error通知。
將原始數據強制轉換爲指定的 clazz
類型,若是轉換成功發送轉換後的數據,不然發送Error
通知。通常用於 數據類型的轉換 和 數據實際類型的檢查(多態)。
實例代碼:
// cast(clazz) // 1. 基本類型轉換 Observable.range(1, 5) .cast(Integer.class) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("-- accept cast(1): " + t); } }); // 2. 轉換失敗通知 System.out.println("------------------------------------"); Observable.just((byte)1) .cast(Integer.class) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe(2)"); } @Override public void onNext(Integer t) { System.out.println("--> onNext(2) = " + t); } @Override public void onError(Throwable e) { System.out.println("--> onError(2) = " + e.toString()); } @Override public void onComplete() { System.out.println("--> onComplete(2)"); } }); System.out.println("------------------------------------"); class Animal{ public int id; } class Dog extends Animal{ public String name; @Override public String toString() { return "Dog [name=" + name + ", id=" + id + "]"; } } // 3. 多態轉換,檢查數據的實際類型 Animal animal = new Dog(); animal.id = 666; Observable.just(animal) .cast(Dog.class) .subscribe(new Consumer<Dog>() { @Override public void accept(Dog t) throws Exception { System.out.println("--> accept cast(3): " + t); } });
輸出:
-- accept cast(1): 1 -- accept cast(1): 2 -- accept cast(1): 3 -- accept cast(1): 4 -- accept cast(1): 5 ------------------------------------ --> onSubscribe(2) --> onError(2) = java.lang.ClassCastException: Cannot cast java.lang.Byte to java.lang.Integer ------------------------------------ --> accept cast(3): Dog [name=null, id=666]
Javadoc: cast(clazz)
在實際開發場景中,好比網絡數據請求場景,原始的數據格式或類型可能並不知足開發的實際須要,須要對數據進行處理。數據變換操做在實際開發場景中仍是很是多的,因此數據的變換是很是重要的。使用Rx的數據變換操做能夠輕鬆完成大多數場景的數據變換操做,提升開發效率。
Rx介紹與講解及完整目錄參考:Rxjava2 介紹與詳解實例
實例代碼: