目錄html
需求瞭解:java
對於 Observable 發射的數據有的時候可能不知足咱們的要求,或者須要轉化爲其餘類型的數據,好比:緩存,數據類型轉化,數據攔截等。此時可使用 Rx 中的一些對於數據操做的操做進行數據的變換,方便咱們的開發。react
執行變換的操做方法:api
Cast:在發射以前強制將Observable發射的全部數據轉換爲指定類型緩存
按期收集Observable的數據放進一個數據包裹(緩存),而後發射這些數據包裹,而不是一次發射一個值。網絡
Buffer 操做符將一個Observable變換爲另外一個,原來的Observable正常發射數據,變換產生 的Observable發射這些數據的緩存集合。 Buffer 操做符在不少語言特定的實現中有不少種變 體,它們在如何緩存這個問題上存在區別。數據結構
Window 操做符與 Buffer 相似,可是它在發射以前把收集到的數據放進單獨的Observable, 而不是放進一個數據結構。併發
注意: 若是原來的Observable發射了一個 onError 通知, Buffer 會當即傳遞這個通知,而不是首先發射緩存的數據,即便在這以前緩存中包含了原始Observable發射的數據。app
在RxJava中的一些 Buffer 的操做以下:
ide
以列表(List)的形式發射非重疊的緩存,每個緩存至多包含來自原始 Observable 的 count
項數據(最後發射的列表數據可能少於count項)。
實例代碼:
// 1. buffer(count) // 以列表(List)的形式發射非重疊的緩存, // 每個緩存至多包含來自原始 Observable的count項數據(最後發射的列表數據可能少於count項) Observable.range(1, 10) .buffer(3) .subscribe(new Consumer<List<Integer>>() { @Override public void accept(List<Integer> t) throws Exception { System.out.println("--> bufferr(1) accept: " + t); } });
輸出:
--> bufferr(1) accept: [1, 2, 3] --> bufferr(1) accept: [4, 5, 6] --> bufferr(1) accept: [7, 8, 9] --> bufferr(1) accept: [10]
Javadoc: buffer(count)
開始建立一個List
收集原始 Observable 數據,監視一個名叫 boundary
的Observable,每當這個Observable發射了一個值,它就建立一個新的 List
開始收集來自原始Observable的數據併發射原來已經收集數據的 List
, 當 boundary
Observable 發送了完成通知,會將此時還未發送的 List 發送。
注意: 全部發送的 List 可能沒有收集到數據,此時數據的收集可能並不會完整收集全部原始 Observable 數據。
實例代碼:
// 2. buffer(boundary) 監視一個名叫boundary的Observable, // 開始建立一個List收集原始 Observable 數據,監視一個名叫boundary的Observable, // 每當這個Observable發射了一個值,它就建立一個新的List開始收集來自原始Observable的數據併發射原來已經收集數據的List, // 當boundary發送了完成通知,會將此時還未發送的 List 發送。 // 全部發送的 List 可能沒有收集到數據,此時數據的收集可能並不會完整收集全部原始Observable數據。 Observable.range(1, 10000) .buffer(Observable.timer(1, TimeUnit.MILLISECONDS)) // 1毫秒後開始接受原始數據 .subscribe(new Consumer<List<Integer>>() { @Override public void accept(List<Integer> t) throws Exception { System.out.println("--> accept(2): " + t.size()); // 每次收集的數據序列個數 } });
輸出:
--> accept(2): 2858 --> accept(2): 5471
Javadoc: buffer(boundary)
從原始Observable的第一項數據開始建立新的緩存,此後每當收 到 skip
項數據,用 count
項數據填充緩存:開頭的一項和後續的 count-1 項,它以列表 (List)的形式發射緩存,取決於 count 和 skip 的值,這些緩存可能會有重疊部分(好比skip < count時),也可能會有間隙(好比skip > count時)。
解析: 在指定的數據序列中移動指針
來獲取緩存數據:指針每次移動 skip
個數據長度,每次緩存指針位置及後面count
個數據,指針初始位置在原始數據的第一個(存在的狀況下)。
實例代碼:
// 3. buffer(int count, int skip) // 在指定的數據中移動指針來獲取緩存數據:指針每次移動1個數據長度,每次緩存3個數據 Observable.range(1, 5) .buffer(3, 1) .subscribe(new Consumer<List<Integer>>() { @Override public void accept(List<Integer> t) throws Exception { System.out.println("--> bufferr(3) accept: " + t); } });
輸出:
--> bufferr(3) accept: [1, 2, 3] --> bufferr(3) accept: [2, 3, 4] --> bufferr(3) accept: [3, 4, 5] --> bufferr(3) accept: [4, 5] --> bufferr(3) accept: [5]
Javadoc: buffer(count, skip)
按期以 List 的形式發射新的數據,在每一個時間段,收集來自原始 Observable 的數據(從前面一個數據包裹以後,或者若是是第一個數據包裹,從有觀察者訂閱原來的 Observale 以後開始)。還有另外一個版本的 buffer 接受一個 Scheduler 參數。
解析: 每隔 timespan
時間段以 List
的形式收集原始Observable的數據
實例代碼:
// 4. buffer(long timespan, TimeUnit unit) // 每隔timespan時間段以list的形式收集數據 Observable.range(1, 50000) .buffer(1, TimeUnit.MILLISECONDS) // 每隔1毫秒收集一次原始序列數據 .subscribe(new Consumer<List<Integer>>() { @Override public void accept(List<Integer> t) throws Exception { System.out.println("--> bufferr(4) accept: " + t.size()); // 每次收集的數據序列個數 } });
輸出:
--> bufferr(4) accept: 2571 --> bufferr(4) accept: 5457 --> bufferr(4) accept: 13248 --> bufferr(4) accept: 12755 --> bufferr(4) accept: 9543 --> bufferr(4) accept: 6426
注意:
buffer(timespan,TimeUnit)
默認狀況下會使用 computation 調度器
Javadoc: buffer(timespan,TimeUnit)
Javadoc: buffer(timespan,TimeUnit,Scheduler)
每當收到來自原始 Observable
的 count
項數據,或者每過了一段指定 timespan
時間後, 就以 List 的形式發射這期間的數據,即便數據項少於 count 項。還有另外一個版本的 buffer 接受一個 Scheduler
參數。
實例代碼:
// 5. buffer(long timespan, TimeUnit unit, int count) // 每隔1毫秒緩存50個數據 Observable.range(1, 1000) .buffer(1, TimeUnit.MILLISECONDS, 50) // 每隔1毫秒收集50個數據序列 .subscribe(new Consumer<List<Integer>>() { @Override public void accept(List<Integer> t) throws Exception { System.out.println("--> bufferr(5) accept: " + t.size()); // 每次收集的數據序列個數 } });
輸出:
--> bufferr(5) accept: 50 --> bufferr(5) accept: 50 --> bufferr(5) accept: 50 --> bufferr(5) accept: 50 --> bufferr(5) accept: 50 --> bufferr(5) accept: 50 --> bufferr(5) accept: 50 --> bufferr(5) accept: 50 --> bufferr(5) accept: 20 --> bufferr(5) accept: 50 --> bufferr(5) accept: 50 --> bufferr(5) accept: 50 --> bufferr(5) accept: 4 --> bufferr(5) accept: 50 --> bufferr(5) accept: 50 --> bufferr(5) accept: 50 --> bufferr(5) accept: 50 --> bufferr(5) accept: 50 --> bufferr(5) accept: 50 --> bufferr(5) accept: 50 --> bufferr(5) accept: 50 --> bufferr(5) accept: 26
注意:
buffer(timespan, TimeUnit, count)
默認狀況下會使用 computation 調度器
Javadoc: buffer(timespan, TimeUnit, count)
Javadoc: buffer(timespan, TimeUnit, scheduler, count)
在每個 timeskip
時期內都建立一個新的 List
,而後用原始 Observable 發射的每一項數據填充這個列表(在把這個 List 當作本身的數據發射前,從建立時開始,直到過了 timespan 這麼長的時間)。若是 timespan
長於 timeskip
,它發射的數據包將會重疊,所以可能包含重複的數據項。
解析: 在每隔 timeskip
時間段都建立一個新的 List ,每一個 List 都獨立收集 timespan
時間段原始Observable發射的數據。 所以在 timespan 長於 timeskip 時,它發射的數據包將會重疊,所以不一樣 List 中可能包含重複的數據項。 還有另外一個版本的 buffer 接受一個 Scheduler 參數。
實例代碼:
// 6. buffer(long timespan, long timeskip, TimeUnit unit) // 在每個timeskip時期內都建立一個新的 List, // 每一個List都獨立收集timespan時間段原始Observable發射的數據, // 若是 timespan 長於 timeskip,它發射的數據包將會重疊,所以不一樣List中可能包含重複的數據項 Observable.range(1, 50000) .buffer(1, 1, TimeUnit.MILLISECONDS, Schedulers.newThread()) .subscribe(new Consumer<List<Integer>>() { @Override public void accept(List<Integer> t) throws Exception { System.out.println("--> accept(6): " + t.size()); // 每次收集的數據序列個數 } });
輸出:
--> accept(6): 1412 --> accept(6): 733 --> accept(6): 10431 --> accept(6): 694 --> accept(6): 18944 --> accept(6): 10710 --> accept(6): 944 --> accept(6): 6132
注意:
buffer(imespan, timeskip, TimeUnit)
默認狀況下會使用 computation 調度器。
Javadoc: buffer(imespan, timeskip, TimeUnit)
Javadoc: buffer(imespan, timeskip, TimeUnit, schedule)
當它訂閱原來的Observable時,開始將數據收集到一個List
,而後它調用 bufferClosingSelector
生成第二個Observable
,當第二個Observable 發射一個TClosing
時,buffer 發射當前的 List
,而後重複
這個過程:開始組裝一個新的List,而後調用bufferClosingSelector建立一個新的Observable並監視它。
注意: 它會一直這樣作直到原來的Observable執行完成,能夠收集完整
的原始 Observable 的數據
實例代碼:
// 7. buffer(Callable<ObservableSource<T>> boundarySupplier) // 當它訂閱原來的Observable時,開始將數據收集到一個List,而後它調用 bufferClosingSelector 生成第二個Observable, // 當第二個Observable 發射一個 TClosing 時,buffer 發射當前的 List , // 而後重複這個過程:開始組裝一個新的List,而後調用bufferClosingSelector建立一個新的Observable並監視它。 // 它會一直這樣作直到原來的Observable執行完成。會收集完整的原始 Observable 的數據 Observable.range(1, 50000) .buffer(new Callable<Observable<Long>>() { @Override public Observable<Long> call() throws Exception { return Observable.timer(1, TimeUnit.MILLISECONDS); } }).subscribe(new Consumer<List<Integer>>() { @Override public void accept(List<Integer> t) throws Exception { System.out.println("--> accept(7): " + t.size()); // 每次收集的數據序列個數 } });
輸出:
--> accept(7): 14650 --> accept(7): 9708 --> accept(7): 25642
Javadoc: buffer(bufferClosingSelector)
對Observable發射的每一項數據應用一個函數,執行變換操做。
實例代碼:
// map(Function<T,R)) // 接受原始Observable的數據,發送處理後的數據 Observable.range(1, 5) .map(new Function<Integer, Integer>() { @Override public Integer apply(Integer t) throws Exception { System.out.println("--> apply: " + t); return t*t; // 計算原始數據的平方 } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept Map: " + t); } });
輸出:
--> apply: 1 --> accept Map: 1 --> apply: 2 --> accept Map: 4 --> apply: 3 --> accept Map: 9 --> apply: 4 --> accept Map: 16 --> apply: 5 --> accept Map: 25
Javadoc: map(mapper)
主要對原始數據進行轉換操做後發送至訂閱者。
RxJava2 中的一些 FlatMap 操做方法以下:
FlatMap 將一個發射數據的 Observable 變換爲 多個 Observables,而後將它們發射的數據合併後放進一個單獨的 Observable。
FlatMap 操做符使用一個指定的函數對原始Observable發射的每一項數據執行變換操做,這個函數返回一個自己也發射數據的Observable,而後 FlatMap
合併這些Observables發射的數據,最後將合併後的結果當作它本身的數據序列發射。
這個方法是頗有用的,例如,當你有一個這樣的Observable:它發射一個數據序列,這些數據自己包含Observable成員或者能夠變換爲Observable,所以你能夠建立一個新的 Observable發射這些次級Observable發射的數據的完整集合。
注意: FlatMap 對這些Observables發射的數據作的是合併(merge)操做,所以它們多是交錯的。
在許多語言特定的實現中,還有一個操做符不會讓變換後的Observables發射的數據交錯,它按照嚴格的順序發射這些數據,這個操做符一般被叫做ConcatMap
或者相似的名字。
實例代碼:
// 1. flatMap(Function) // 對原始Observable發射的每一項數據執行變換操做,這個函數返回一個自己也發射數據的Observable, // 而後FlatMap合併這些Observables發射的數據,最後將合併後的結果當作它本身的數據序列發射 Observable.range(1, 5) .flatMap(new Function<Integer, ObservableSource<? extends Integer>>() { @Override public ObservableSource<? extends Integer> apply(Integer t) throws Exception { System.out.println("--> apply(1): " + t); // 原始數據 return Observable.range(1, t).subscribeOn(Schedulers.newThread()); // 處理後數據 } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept flatMap(1): " + t); // 接受的全部數據 } });
輸出:
--> apply(1): 1 --> apply(1): 2 --> apply(1): 3 --> apply(1): 4 --> accept flatMap(1): 1 --> accept flatMap(1): 2 --> apply(1): 5 --> accept flatMap(1): 1 --> accept flatMap(1): 1 --> accept flatMap(1): 2 --> accept flatMap(1): 3 --> accept flatMap(1): 4 --> accept flatMap(1): 1 --> accept flatMap(1): 2 --> accept flatMap(1): 3 --> accept flatMap(1): 4 --> accept flatMap(1): 5 --> accept flatMap(1): 1 --> accept flatMap(1): 2 --> accept flatMap(1): 3
Javadoc: flatMap(mapper)
maxConcurrency
這個參數設置 flatMap 從原來的Observable映射Observables的最大同時訂閱數。當達到這個限制時,它會等待其中一個終止而後再訂閱另外一個。
實例代碼:
// 2. flatMap(Function, maxConcurrency) // maxConcurrency 這個參數設置 flatMap 從原來的Observable映射Observables的最大同時訂閱數。 // 當達到這個限制時,它會等待其中一個終止而後再訂閱另外一個 Observable.range(1, 5) .flatMap(new Function<Integer, ObservableSource<? extends Integer>>() { @Override public ObservableSource<? extends Integer> apply(Integer t) throws Exception { System.out.println("--> apply(2): " + t); return Observable.range(1, t).subscribeOn(Schedulers.newThread()); } // 指定最大訂閱數爲1,此時等待上一個訂閱的Observable結束,在進行下一個Observable訂閱 }, 1).subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept flatMap(2): "+ t); } });
輸出:
--> apply(2): 1 --> apply(2): 2 --> apply(2): 3 --> apply(2): 4 --> apply(2): 5 --> accept flatMap(2): 1 --> accept flatMap(2): 1 --> accept flatMap(2): 2 --> accept flatMap(2): 1 --> accept flatMap(2): 2 --> accept flatMap(2): 3 --> accept flatMap(2): 1 --> accept flatMap(2): 2 --> accept flatMap(2): 3 --> accept flatMap(2): 4 --> accept flatMap(2): 1 --> accept flatMap(2): 2 --> accept flatMap(2): 3 --> accept flatMap(2): 4 --> accept flatMap(2): 5
Javadoc: flatMap(mapper, maxConcurrency)
delayError
這個參數指定是否延遲發生 Error
的Observable通知。還有一個能夠指定最大訂閱數參數 maxConcurrency
的變體。
解析: 當值爲 true
時延遲發生Error
的這個訂閱的Observable通知,不中斷當前的訂閱操做,繼續下一個Observable的訂閱,在全部訂閱的Observable所有結束後發送 Error 這個Observable的通知,當值爲 false
時則中斷全部訂閱的操做,併發送 Error
的通知。
實例代碼:
// 3. flatMap(Function, delayErrors) // delayErrors 這個參數指定是否延遲發生Error的Observable通知 // 當true 時延遲發生Error的這個訂閱的Observable通知,不中斷當前的訂閱操做, // 繼續下一個Observable的訂閱,在全部訂閱的Observable所有結束後發送Error這個Observable的通知 Observable.range(1, 5) .flatMap(new Function<Integer, ObservableSource<? extends Integer>>() { @Override public ObservableSource<? extends Integer> apply(Integer t) throws Exception { System.out.println("--> apply(3): " + t); return Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { if( t == 3) { throw new NullPointerException("delayErrors test!"); // 測試 Error } for (int i = 1; i <= t; i++) { emitter.onNext(i); } emitter.onComplete(); } }); } // 設置延遲 Error 通知到最後 }, true).subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept flatMap(3): "+ t); } },new Consumer<Throwable>() { @Override public void accept(Throwable t) throws Exception { System.out.println("--> acceot Error(3): " + t); } });
輸出:
--> apply(3): 1 --> accept flatMap(3): 1 --> apply(3): 2 --> accept flatMap(3): 1 --> accept flatMap(3): 2 --> apply(3): 3 --> apply(3): 4 --> accept flatMap(3): 1 --> accept flatMap(3): 2 --> accept flatMap(3): 3 --> accept flatMap(3): 4 --> apply(3): 5 --> accept flatMap(3): 1 --> accept flatMap(3): 2 --> accept flatMap(3): 3 --> accept flatMap(3): 4 --> accept flatMap(3): 5 --> acceot Error(3): java.lang.NullPointerException: delayErrors test!
Javadoc: flatMap(Function, delayErrors)
Javadoc: flatMap(Function, delayErrors, maxConcurrency)
flatMapIterable 這個變體成對的打包數據,而後生成 Iterable
而不是原始數據和生成的 Observables,可是處理方式是相同的。
解析: 對數據進行處理轉換成 Iterable
來發射數據。
實例代碼:
// 4. flatMapIterable(Function(T,R)) // 對數據進行處理轉換成Iterable來發射數據 Observable.range(1, 5) .flatMapIterable(new Function<Integer, Iterable<? extends Integer>>() { @Override public Iterable<? extends Integer> apply(Integer t) throws Exception { System.out.println("--> apply: " + t); ArrayList<Integer> list = new ArrayList<Integer>(); list.add(888); list.add(999); return list; // 將原始數據轉換爲兩個數字發送 } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept flatMapIterable(4): " + t); } });
輸出:
--> apply: 1 --> accept flatMapIterable(4): 888 --> accept flatMapIterable(4): 999 --> apply: 2 --> accept flatMapIterable(4): 888 --> accept flatMapIterable(4): 999 --> apply: 3 --> accept flatMapIterable(4): 888 --> accept flatMapIterable(4): 999 --> apply: 4 --> accept flatMapIterable(4): 888 --> accept flatMapIterable(4): 999 --> apply: 5 --> accept flatMapIterable(4): 888 --> accept flatMapIterable(4): 999
Javadoc: flatMapIterable(mapper)
參數 mapper
接收原始數據,resultSelector
同時接收原始數據和 mapper
處理的數據,進行二次數據轉換。
實例代碼:
// 5. flatMapIterable(Function(T,R),Function(T,T,R)) // 第一個func接受原始數據,轉換數據,第二個func同時接受原始和處理的數據,進行二次轉換處理 Observable.range(1, 3) .flatMapIterable(new Function<Integer, Iterable<? extends Integer>>() { @Override public Iterable<? extends Integer> apply(Integer t) throws Exception { ArrayList<Integer> list = new ArrayList<Integer>(); list.add(888); list.add(999); return list; // 將原始數據轉換爲兩個數字發送 } }, new BiFunction<Integer, Integer, Integer>() { @Override public Integer apply(Integer t1, Integer t2) throws Exception { System.out.println("--> apply(5): t1 = " + t1 + ", t2 = " + t2); return t1 + t2; // 將原始數據和處理過的數據組合進行二次處理髮送 } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept flatMapIterable(5): " + t); } });
輸出:
--> apply(5): t1 = 1, t2 = 888 --> accept flatMapIterable(5): 889 --> apply(5): t1 = 1, t2 = 999 --> accept flatMapIterable(5): 1000 --> apply(5): t1 = 2, t2 = 888 --> accept flatMapIterable(5): 890 --> apply(5): t1 = 2, t2 = 999 --> accept flatMapIterable(5): 1001 --> apply(5): t1 = 3, t2 = 888 --> accept flatMapIterable(5): 891 --> apply(5): t1 = 3, t2 = 999 --> accept flatMapIterable(5): 1002
concatMap
操做符的功能和 flatMap
是很是類似的,只是有一點,concatMap 最終輸出的數據序列和原數據序列是一致,它是按順序連接Observables,而不是合併(flatMap用的是合併)。
經過 mapper
處理原數據後,轉換成 Observables ,按照順序進行鏈接 Observables 發送數據。
解析: concatMap
和flatMap
的功能是同樣的, 將一個發射數據的Observable變換爲多個Observables,而後將它們發射的數據放進一個單獨的Observable。只不過最後合併ObservablesflatMap採用的merge
,而concatMap採用的是鏈接(concat
)。區別:concatMap是有序的,flatMap是無序的,concatMap最終輸出的順序與原序列保持一致,而flatMap則不必定,有可能出現交錯。
實例代碼:
// 1. concatMap(Function(T,R)) // 按照順序依次處理原始數據和處理的數據 Observable.range(1, 3) .concatMap(new Function<Integer, ObservableSource<? extends Integer>>() { @Override public ObservableSource<? extends Integer> apply(Integer t) throws Exception { System.out.println("--> apply(1): " + t); return Observable.range(1, t).doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable t) throws Exception { System.out.println("--> accept(1): Observable on Subscribe"); // 當前的Observable被訂閱 } }); } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept concatMap(1): " + t); } }); System.out.println("--------------------------------------------"); // 2. concatMap(mapper, prefetch) // prefetch 參數是在處理後的Observables發射的數據流中預讀數據個數,不影響原數據的發射和接收順序 Observable.range(1, 3) .concatMap(new Function<Integer, ObservableSource<? extends Integer>>() { @Override public ObservableSource<? extends Integer> apply(Integer t) throws Exception { System.out.println("--> apply(2): " + t); return Observable.range(1, 3).doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable t) throws Exception { System.out.println("--> accept(2): Observable on Subscribe"); // 當前的Observable被訂閱 } }); } }, 2).subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept concatMap(2): " + t); } });
輸出:
--> apply(1): 1 --> accept(1): Observable on Subscribe --> accept concatMap(1): 1 --> apply(1): 2 --> accept(1): Observable on Subscribe --> accept concatMap(1): 1 --> accept concatMap(1): 2 --> apply(1): 3 --> accept(1): Observable on Subscribe --> accept concatMap(1): 1 --> accept concatMap(1): 2 --> accept concatMap(1): 3 -------------------------------------------- --> apply(2): 1 --> accept(2): Observable on Subscribe --> accept concatMap(2): 1 --> accept concatMap(2): 2 --> accept concatMap(2): 3 --> apply(2): 2 --> accept(2): Observable on Subscribe --> accept concatMap(2): 1 --> accept concatMap(2): 2 --> accept concatMap(2): 3 --> apply(2): 3 --> accept(2): Observable on Subscribe --> accept concatMap(2): 1 --> accept concatMap(2): 2 --> accept concatMap(2): 3
Javadoc: concatMap(mapper)
Javadoc: concatMap(mapper, refetch)
有選擇的訂閱 Observable
,當原始 Observable 發射一個數據,經過 witchMap
返回一個 Observable,
當原始Observable發射一個新的數據時,它將取消訂閱並中止監視產生執以前的Observable,開始監視當前新的Observable。
解析: 若是上一個任務還沒有完成時,就開始下一個任務的話,上一個任務就會被取消掉。若是全部任務都是在同一個線程裏執行的話,此時這個操做符與 ContactMap 一致,都是依次順序執行。只有在不一樣的線程裏執行的時候,即線程方案爲newThread的時候,纔會出現這種狀況,經常使用於網絡請求
中。
實例代碼:
// 1. witchMap(Function(T,R)) // 同一個線程執行 Observable.range(1, 3) .switchMap(new Function<Integer, ObservableSource<? extends Integer>>() { @Override public ObservableSource<? extends Integer> apply(Integer t) throws Exception { System.out.println("--> apply(1): " + t); return Observable.range(1, 3); // 每一個任務指定在同一個線程執行 } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept switchMap(1): " + t); } }); System.out.println("---------------------------------------"); // 2. witchMap(Function(T,R)) // 不一樣線程執行 Observable.range(1, 3) .switchMap(new Function<Integer, ObservableSource<? extends Integer>>() { @Override public ObservableSource<? extends Integer> apply(Integer t) throws Exception { System.out.println("--> apply(2): " + t); return Observable.range(1, 3) .subscribeOn(Schedulers.newThread()); // 每一個任務指定在子線程執行 } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept switchMap(2): " + t); } }); System.out.println("---------------------------------------"); // 3. switchMap(mapper, bufferSize) // bufferSize 參數是從當前活動的Observable中預讀數據的大小 Observable.range(1, 3) .switchMap(new Function<Integer, ObservableSource<? extends Integer>>() { @Override public ObservableSource<? extends Integer> apply(Integer t) throws Exception { System.out.println("--> apply(3): " + t); return Observable.range(1, 5).subscribeOn(Schedulers.newThread()); } }, 3).subscribe(new Consumer<Integer>() { // 指定緩存大小爲3 @Override public void accept(Integer t) throws Exception { System.out.println("--> accept switchMap(3): " + t); } });
輸出:
--> apply(1): 1 --> accept switchMap(1): 1 --> accept switchMap(1): 2 --> accept switchMap(1): 3 --> apply(1): 2 --> accept switchMap(1): 1 --> accept switchMap(1): 2 --> accept switchMap(1): 3 --> apply(1): 3 --> accept switchMap(1): 1 --> accept switchMap(1): 2 --> accept switchMap(1): 3 --------------------------------------- --> apply(2): 1 --> apply(2): 2 --> apply(2): 3 --> accept switchMap(2): 1 --> accept switchMap(2): 2 --> accept switchMap(2): 3 --------------------------------------- --> apply(3): 1 --> apply(3): 2 --> apply(3): 3 --> accept switchMap(3): 1 --> accept switchMap(3): 2 --> accept switchMap(3): 3 --> accept switchMap(3): 4 --> accept switchMap(3): 5
Javadoc: switchMap(mapper)
Javadoc: switchMap(mapper, bufferSize)
後續的Rx相關數據變換部分請參考: Rxjava2 Observable的數據變換詳解及實例(二)
Rx介紹與講解及完整目錄參考:Rxjava2 介紹與詳解實例