RxJava【變換】操做符 map flatMap concatMap buffer MD

Markdown版本筆記 個人GitHub首頁 個人博客 個人微信 個人郵箱
MyAndroidBlogs baiqiantao baiqiantao bqt20094 baiqiantao@sina.com

RxJava【變換】操做符 map flatMap concatMap buffer MD
demo地址
參考 html


目錄

變換操做符

經常使用的變換操做符 git

  • map、cast:【數據類型轉換】將Observable發送的事件轉換爲另外一種類型的事件
  • flatMap、concatMap、switchMap、flatMapIterable:扁平化
    • flatMap:【化解循環嵌套和接口嵌套】將Observable發送的事件序列進行拆分 & 轉換 後合併成一個新的事件序列,最後再進行發送
    • concatMap:【有序】與 flatMap 的 區別在於,拆分 & 從新合併生成的事件序列 的順序與被觀察者舊序列生產的順序一致
    • switchMap:當原始Observable發射一個新的數據時,它將取消訂閱並中止監視產生執以前那個數據的Observable,只監視當前這一個
    • flatMapIterable:至關於對 flatMap 的數據進行了二次扁平化
  • buffer:【打包】按期從Observable發送的事件中獲取必定數量的事件並放到緩存區中,而後把這些數據集合打包發射
  • scan:【連續】對Observable發射的每一項數據應用一個函數,而後按順序依次發射每個值
  • groupBy:【分組】將一個Observable分拆爲一些Observables集合,它們中的每個發射原始Observable的一個子序列
  • window:按期未來自Observable的數據分拆成一些Observable窗口,而後發射這些窗口,而不是每次發射一項

map cast

Map操做符對Observable發射的每一項數據應用一個函數,執行變換操做 github

Map操做符對原始Observable發射的每一項數據應用一個你選擇的函數,而後返回一個發射這些結果的Observable。 緩存

Observable.just(new Date()) // Date 類型 
      .map(Date::getTime) // long 類型 
      .map(time -> time + 1000 * 60 * 60)// 改變 long 類型時間的值 
      .map(time -> new SimpleDateFormat("HH:mm:ss", Locale.getDefault()).format(new Date(time))) //String 類型 
      .subscribe(this::log);

cast操做符將原始Observable發射的每一項數據都強制轉換爲一個指定的類型,而後再發射數據,它是map的一個特殊版本。微信

Observable.just(28)
    .cast(Number.class)
    .subscribe(number -> log(number.getClass().getSimpleName())); //Integer

flatMap concatMap switchMap flatMapIterable

flatMap 將一個發射數據的Observable變換爲多個Observables,而後將它們發射的數據合併後放進一個單獨的Observable 網絡

flatMap 操做符使用一個指定的函數對原始Observable發射的每一項數據執行變換操做,這個函數返回一個自己也發射數據的Observable,而後FlatMap合併這些Observables發射的數據,最後將合併後的結果當作它本身的數據序列發射。 數據結構

注意:flatMap 對這些Observables發射的數據作的是合併(merge)操做,所以它們多是交錯的。 併發

flatMap 有不少個重載方法。 app

flatMap(Function mapper) 
flatMap(Function mapper, boolean delayErrors) 
flatMap(Function mapper, boolean delayErrors, int maxConcurrency) 
flatMap(Function mapper, boolean delayErrors, int maxConcurrency, int bufferSize) 
flatMap(Function onNextMapper, Function onErrorMapper, Callable onCompleteSupplier) 
flatMap(Function onNextMapper, Function onErrorMapper, Callable onCompleteSupplier, int maxConcurrency) 
flatMap(Function mapper, int maxConcurrency) 
flatMap(Function  mapper, BiFunction resultSelector) 
flatMap(Function mapper, BiFunction combiner, boolean delayErrors) 
flatMap(Function mapper, BiFunction combiner, boolean delayErrors, int maxConcurrency) 
flatMap(Function mapper, BiFunction combiner, boolean delayErrors, int maxConcurrency, int bufferSize) 
flatMap(Function mapper, BiFunction combiner, int maxConcurrency)

使用 flatMap 化解循環嵌套

Observable.just(new Person(Arrays.asList("籃球", "足球", "排球")), new Person(Arrays.asList("畫畫", "跳舞"))) 
      .map(person -> person.loves) 
      .flatMap(Observable::fromIterable) //fromIterable:逐個發送集合中的元素 
      .subscribe(this::log);
籃球,22:56:43 009,true 
足球,22:56:43 010,true 
排球,22:56:43 010,true 
畫畫,22:56:43 011,true 
跳舞,22:56:43 012,true

注意:若是任何一個經過這個 flatMap 操做產生的單獨的 Observable 調用 onError 異常終止了,這個 Observable 自身會當即調用 onError 並終止。例如:函數

Observable.just(new Person(Arrays.asList("籃球", null, "排球")), new Person(Arrays.asList("畫畫", "跳舞"))) ...
籃球,00:20:14 762,true 
onError:The iterator returned a null value,00:20:14 767,true

concatMap

concatMap 操做符的功能和 flatMap 很是類似,只不過通過 flatMap 操做變換後,最後輸出的序列有多是交錯的(flatMap最後合併結果採用的是 merge 操做符),而 concatMap 最終輸出的數據序列和原數據序列是一致的。

flatMap:

long start = System.currentTimeMillis(); 
Observable.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5)) 
      .flatMap(list -> Observable.fromIterable(list).delay(list.size(), TimeUnit.SECONDS))//flatMap是無序的 
      .subscribe((i -> log("f:" + i)), e -> log("f"), () -> log("f耗時" + (System.currentTimeMillis() - start))); //3秒
f:4,23:21:07 944,false   //flatMap後,訂閱者首先接收到的事件是【4】而不是【1】 
f:5,23:21:07 945,false 
f:1,23:21:08 942,false 
f:2,23:21:08 943,false 
f:3,23:21:08 943,false 
f耗時3025,23:21:08 945,false    //flatMap耗時3秒

concatMap:

Observable.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5)) 
      .concatMap(list -> Observable.fromIterable(list).delay(list.size(), TimeUnit.SECONDS))//concatMap是有序的 
      .subscribe(i -> log("c:" + i), e -> log("c"), () -> log("c耗時" + (System.currentTimeMillis() - start))); //5秒
c:1,14:27:27 450,false   //concatMap後,訂閱者首先接收到的事件是【1】 
c:2,14:27:27 451,false 
c:3,14:27:27 451,false 
c:4,14:27:29 454,false 
c:5,14:27:29 455,false 
c耗時5021,14:27:29 455,false    //concatMap耗時5秒

switchMap

switchMap 和 flatMap 很像,除了一點:當源Observable發射一個新的數據項時,若是舊數據項訂閱還未完成,就取消舊訂閱數據和中止監視那個數據項產生的Observable,開始監視新的數據項。

It behaves much like flatMap, except that whenever a new item is emitted by the source Observable, it will unsubscribe to and stop mirroring鏡像 the Observable that was generated from the previously-emitted item, and begin only mirroring the current one.

若是是在同一線程產生數據,由於當第二個數據項來臨時,第一個已經完成了,此時其和 concatMap 是徹底一致的。
若是是併發產生數據項,當第二個數據項來臨時,若是前一個任務還沒有執行結束,就會被後一個任務給取消。

Observable.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5)) 
     .switchMap(Observable::fromIterable) 
     .subscribeOn(Schedulers.newThread()) //與這裏的線程無關 
     .subscribe(i -> log("s:" + i)); //1, 2, 3,4, 5 

Observable.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5)) 
     .switchMap(list -> Observable.fromIterable(list).subscribeOn(Schedulers.newThread()))  //只與這裏的線程有關 
     .subscribeOn(AndroidSchedulers.mainThread()) //與這裏的線程無關 
     .observeOn(AndroidSchedulers.mainThread()) //與這裏的線程無關 
     .subscribe(i -> log("s:" + i)); //4, 5 

Observable.range(1, 8) 
     .switchMap(i -> Observable.just(i).subscribeOn(Schedulers.newThread()))  //只與這裏的線程有關 
     .subscribe(i -> log("s:" + i)); //8
s:1,15:05:23 537,false 
s:2,15:05:23 537,false 
s:3,15:05:23 538,false 
s:4,15:05:23 538,false 
s:5,15:05:23 539,false 

s:4,15:05:21 013,true 
s:5,15:05:21 014,true 

s:8,15:05:22 669,false

使用 flatMap 化解接口嵌套

能夠利用 flatMap 操做符實現網絡請求依次依賴,即:第一個接口的返回值包含第二個接口請求須要用到的數據。

首先是兩個請求網絡的操做:

private Observable<String> firstRequest(String parameter) { 
   return Observable.create(emitter -> { 
      SystemClock.sleep(2000);//模擬網絡請求 
      emitter.onNext(parameter + ",第一次修改:" + FORMAT.format(new Date(System.currentTimeMillis()))); 
      emitter.onComplete(); 
   }); 
} 
private Observable<String> secondRequest(String parameter) { 
   return Observable.create(emitter -> { 
      SystemClock.sleep(3000);//模擬網絡請求 
      emitter.onNext(parameter + ",第二次修改:" + FORMAT.format(new Date(System.currentTimeMillis()))); 
      emitter.onComplete(); 
   }); 
}

而後能夠經過 flatMap 將二者串聯起來:

firstRequest("原始值:" + FORMAT.format(new Date(System.currentTimeMillis()))) 
      .subscribeOn(Schedulers.io()) // 在io線程進行網絡請求 
      .observeOn(AndroidSchedulers.mainThread()) // 在主線程處理請求結果 
      .doOnNext(response -> log("【第一個網絡請求結束,響應爲】" + response))//true 
      .observeOn(Schedulers.io()) // 回到 io 線程去處理下一個網絡請求 
      .flatMap(this::secondRequest)//實現多個網絡請求依次依賴 
      .observeOn(AndroidSchedulers.mainThread()) // 在主線程處理請求結果 
      .subscribe(string -> log("【第二個網絡請求結束,響應爲】" + string));//true,5 秒

打印結果爲:

【第一個網絡請求結束,響應爲】原始值:23:58:11 220,第一次修改:23:58:13 245,true
【第二個網絡請求結束,響應爲】原始值:23:58:11 220,第一次修改:23:58:13 245,第二次修改:23:58:16 256,true

簡化形式的Demo代碼爲:

Observable.just("包青天").delay(1000, TimeUnit.MILLISECONDS) //第一個網絡請求,返回姓名 
      .flatMap(s -> Observable.just(s + ",男").delay(1000, TimeUnit.MILLISECONDS)) //第二個網絡請求,返回性別 
      .flatMap(s -> Observable.just(s + ",28歲").delay(1000, TimeUnit.MILLISECONDS)) //第三個網絡請求,返回年齡 
      .subscribeOn(Schedulers.io()) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(this::log); //包青天,男,28歲,耗時:3058毫秒,true

固然這種狀況下使用 concatMap 的效果也是徹底同樣的,然而由於 concatMap 的核心是用來保證在合併時"有序"的,而這兩種狀況根本就沒涉及到合併,因此這些狀況下使用 concatMap是沒有任何意義的。

flatMapIterable

flatMapIterable這個變體成對的打包數據,而後生成Iterable而不是原始數據和生成的Observables,可是處理方式是相同的。

flatMapIterable 返回一個Observable,它將源ObservableSource發出的每一個項目與 the values in an Iterable corresponding to that item that is generated by a selector 合併[merges]。

public final <U> Observable<U> flatMapIterable(final Function<? super T, ? extends Iterable<? extends U>> mapper)

參數 mapper:一個在源 ObservableSource 發出指定項時,返回一個 Iterable 值序列的函數。

案例1:

Observable.just(Arrays.asList("籃球1", "足球1")) 
      .flatMap(Observable::fromIterable) //返回一個 Observable 
      .subscribe(string -> log("" + string)); 

Observable.just(Arrays.asList("籃球2", "足球2")) 
      .flatMapIterable(list -> list) //返回一個 Iterable 而不是另外一個 Observable 
      .subscribe(string -> log("" + string)); 

Observable.fromIterable(Arrays.asList("籃球3", "足球3")) //和上面兩種方式的結果同樣 
      .subscribe(string -> log("" + string));
籃球1,01:00:39 493,true 
足球1,01:00:39 494,true 
籃球2,01:00:39 496,true 
足球2,01:00:39 496,true 
籃球3,01:00:39 499,true 
足球3,01:00:39 499,true

案例2:

Observable.just(new Person(Arrays.asList("包青天", "哈哈")), new Person(Arrays.asList("白乾濤", "你好"))) 
      .map(person -> person.loves) 
      .flatMap(Observable::fromIterable) //返回一個 Observable 
      .flatMap(string -> Observable.fromArray(string.toCharArray())) //返回一個 Observable 
      .subscribe(array -> log(Arrays.toString(array))); 

Observable.just(new Person(Arrays.asList("廣州", "上海")), new Person(Arrays.asList("武漢", "長沙"))) 
      .map(person -> person.loves) 
      .flatMap(Observable::fromIterable) //返回一個 Observable 
      .flatMapIterable(string -> Arrays.asList(string.toCharArray())) //返回一個 Iterable 而不是另外一個 Observable 
      .subscribe(array -> log(Arrays.toString(array))); 

Observable.just(new Person(Arrays.asList("你妹", "泥煤")), new Person(Arrays.asList("你美", "你沒"))) 
      .map(person -> person.loves) 
      .flatMapIterable(list -> { 
               List<char[]> charList = new ArrayList<>(); 
               for (String string : list) { 
                  charList.add(string.toCharArray()); 
               } 
               return charList; //返回一個 Iterable 而不是另外一個 Observable 
            } 
      ).subscribe(array -> log(Arrays.toString(array)));
[包, 青, 天],21:48:37 917,true 
[哈, 哈],21:48:37 919,true 
[白, 乾, 濤],21:48:37 921,true 
[你, 好],21:48:37 921,true 

[廣, 州],21:48:37 925,true 
[上, 海],21:48:37 926,true 
[武, 漢],21:48:37 926,true 
[長, 沙],21:48:37 927,true 

[你, 妹],21:48:37 929,true 
[泥, 煤],21:48:37 929,true 
[你, 美],21:48:37 930,true 
[你, 沒],21:48:37 931,true

buffer

按期收集Observable的數據放進一個數據包裹,而後發射這些數據包裹,而不是一次發射一個值。
Buffer操做符將一個Observable變換爲另外一個,原來的Observable正常發射數據,變換產生的Observable發射這些數據的緩存集合。

注意:若是原來的Observable發射了一個onError通知,Buffer會當即傳遞這個通知,而不是首先發射緩存的數據,即便在這以前緩存中包含了原始Observable發射的數據[without first emitting the buffer it is in the process of assembling]。

Window操做符與Buffer相似,可是它在發射以前把收集到的數據放進單獨的Observable,而不是放進一個數據結構。

buffer
在RxJava中有許多Buffer的變體:

buffer(ObservableSource openingIndicator, Function closingIndicator) 
buffer(ObservableSource openingIndicator, Function closingIndicator, Callable bufferSupplier)

buffer(count)

buffer(count)以列表(List)的形式發射非重疊的緩存,每個緩存至多包含來自原始Observable的count項數據(最後發射的列表數據可能少於count項)。

每接收到 count 個數據包裹,將這 count 個包裹打包,發送給訂閱者

一次訂閱2個:

Observable.range(1, 5) 
    .buffer(2) //緩存區大小,步長==緩存區大小,等價於buffer(count, count) 
    .subscribe(list -> log(list.toString()), t -> log(""), () -> log("完成")); //[1, 2],[3, 4],[5],完成

一次所有訂閱(將全部元素組裝到集合中的效果):

Observable.range(1, 10)
    .buffer(10)
    .subscribe(list -> log(list.toString())); //[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

buffer(count, skip)

buffer(count, skip)從原始Observable的第一項數據開始建立新的緩存,此後每當收到skip項數據,用count項數據填充緩存:開頭的一項和後續的count-1項,它以列表(List)的形式發射緩存,取決於count和skip的值,這些緩存可能會有重疊部分(好比skip < count時),也可能會有間隙(好比skip > count時)。

生成的ObservableSource每隔 skip 項就會 emits buffers,每一個 buffers 都包含 count 個 items。

隊列效果(先進先出):

Observable.range(1, 5) 
      .buffer(3, 1) // 緩存區大小,步長(每次獲取新事件的數量) 
      .subscribe(list -> log(list.toString()));//[1, 2, 3],[2, 3, 4],[3, 4, 5],[4, 5],[5]

每次剔除一個效果:

Observable.range(1, 5).buffer(5, 1) 
    .subscribe(list -> log(list.toString()));//[1, 2, 3, 4, 5],[2, 3, 4, 5],[3, 4, 5],[4, 5],[5]

只取奇數個效果:

Observable.range(1, 5)
    .buffer(1, 2)
    .subscribe(list -> log(list.toString()));//[1],[3],[5]

buffer(timespan, unit)

buffer(timespan, unit)按期以List的形式發射新的數據,每一個時間段,收集來自原始Observable的數據(從前面一個數據包裹以後,或者若是是第一個數據包裹,從有觀察者訂閱原來的Observale以後開始)。

持續收集直到指定的每隔時間後,而後發射一次並清空緩存區。

週期性訂閱多個結果:

Observable.intervalRange(0, 8, 100, 100, TimeUnit.MILLISECONDS) //從0開始發射8個 
    .buffer(250, TimeUnit.MICROSECONDS) //等價於 count = Integer.MAX_VALUE 
    .subscribe(list -> log("緩存區中事件:" + list.toString())); //[0, 1],[2, 3],[4, 5, 6],[7]

buffer(timespan, unit, count)

每當收到來自原始Observable的count項數據,或者每過了一段指定的時間後,buffer(timespan, unit, count)就以List的形式發射這期間的數據,即便數據項少於count項。

當達到指定時間【或】緩衝區中達到指定數量時發射

Observable.intervalRange(0, 8, 100, 100, TimeUnit.MILLISECONDS) //從0開始發射8個 
    .buffer(250, TimeUnit.MICROSECONDS, 2) //能夠指定工做所在的線程 
    .subscribe(list -> log("緩存區中事件:" + list.toString())); //[0, 1],[],[2, 3],[],[4, 5],[6],[7]

scan

連續地對數據序列的每一項應用一個函數,而後連續發射結果
scan

Scan操做符對原始Observable發射的第一項數據應用一個函數,而後將那個函數的結果做爲本身的第一項數據發射。它將函數的結果同第二項數據一塊兒填充給這個函數來產生它本身的第二項數據。它持續進行這個過程來產生剩餘的數據序列。這個操做符在某些狀況下被叫作accumulator。

Observable.range(1, 10) 
      .scan((i1, i2) -> i1 + i2) 
      .subscribe(sum -> log("" + sum)); //1,3,6,10,15,21... 
Observable.just("包青天", "你好", "我是泥煤") 
      .scan((s1, s2) -> s1 + "," + s2) 
      .subscribe(s -> log("值爲:" + s)); //包青天,你好,我是泥煤

groupBy

GroupBy操做符將原始Observable分拆爲一些Observables集合,它們中的每個發射原始Observable數據序列的一個子序列。
哪一個數據項由哪個Observable發射是由一個函數斷定的,這個函數給每一項指定一個Key,Key相同的數據會被同一個Observable發射。

注意:groupBy將原始Observable分解爲一個發射多個GroupedObservable的Observable,一旦有訂閱,每一個GroupedObservable就開始緩存數據。所以,若是你忽略這些GroupedObservable中的任何一個,這個緩存可能造成一個潛在的內存泄露。所以,若是你不想觀察,也不要忽略GroupedObservable。你應該使用像take(0)這樣會丟棄本身的緩存的操做符。

若是你取消訂閱一個GroupedObservable,那個Observable將會終止。若是以後原始的Observable又發射了一個與這個Observable的Key匹配的數據,groupBy將會爲這個Key建立一個新的GroupedObservable。

有一個版本的groupBy容許你傳遞一個變換函數,這樣它能夠在發射結果GroupedObservable以前改變數據項。

groupBy(Function keySelector) 
groupBy(Function keySelector, boolean delayError) 
groupBy(Function keySelector, Function valueSelector) 
groupBy(Function keySelector, Function valueSelector, boolean delayError) 
groupBy(Function keySelector, Function valueSelector, boolean delayError, int bufferSize)

案例:

Observable.range(1, 5) 
      .groupBy(i -> "包青天" + i % 2) //返回值決定組名 
      .subscribe(groupedObservable -> 
            groupedObservable.subscribe(i -> log("組名爲:" + groupedObservable.getKey() + ",值爲:" + i)));
組名爲:包青天1,值爲:1,16:34:47 561,true 
組名爲:包青天0,值爲:2,16:34:47 562,true 
組名爲:包青天1,值爲:3,16:34:47 564,true 
組名爲:包青天0,值爲:4,16:34:47 565,true 
組名爲:包青天1,值爲:5,16:34:47 566,true

window

按期未來自原始Observable的數據分解爲一個Observable窗口,發射這些窗口,而不是每次發射一項數據

Window和Buffer相似,但不是發射來自原始Observable的數據包,它發射的是Observables,這些Observables中的每個都發射原始Observable數據的一個子集,最後發射一個onCompleted通知。

和Buffer同樣,Window有不少變體,每一種都以本身的方式將原始Observable分解爲多個做爲結果的Observable,每個都包含一個映射原始數據的window。用Window操做符的術語描述就是,當一個窗口打開(when a window "opens")意味着一個新的Observable已經發射了,並且這個Observable開始發射來自原始Observable的數據;當一個窗口關閉(when a window "closes")意味着發射的Observable中止發射原始Observable的數據,而且發射終止通知onCompleted給它的觀察者們。

若是從原始Observable收到了onError或onCompleted通知它也會關閉當前窗口。

和Buffer同樣,Window有不少變體:
window

window(ObservableSource openingIndicator, Function closingIndicator) 
window(ObservableSource openingIndicator, Function closingIndicator, int bufferSize)

如下案例中均用到如下代碼:

private Consumer<Observable<Integer>> consumer = observable -> { 
   SystemClock.sleep(100); 
   String name = new SimpleDateFormat("SSS", Locale.getDefault()).format(new Date()); 
   log("打開了一個新的窗口 " + name); //每當當前窗口發射了count項數據,它就關閉當前窗口並打開一個新窗口 
   observable.subscribe(i -> { 
      SystemClock.sleep(100); 
      log("窗口 " + name + " 發射了數據:" + i); 
      SystemClock.sleep(100); 
   }, e -> log("窗口 " + name + " 異常了"), () -> log("窗口 " + name + " 關閉了")); 
};

window(count)

這個window的變體當即打開它的第一個窗口。每當當前窗口發射了count項數據,它就關閉當前窗口並打開一個新窗口。
這種window變體發射一系列不重疊的窗口,這些窗口的數據集合與原始Observable發射的數據是一一對應的。

Observable.range(1, 5) 
      .window(2) 
      .subscribe(consumer);
打開了一個新的窗口 418 
窗口 418 發射了數據:1 
窗口 418 發射了數據:2 
窗口 418 關閉了 

打開了一個新的窗口 926 
窗口 926 發射了數據:3 
窗口 926 發射了數據:4 
窗口 926 關閉了 

打開了一個新的窗口 435 
窗口 435 發射了數據:5 
窗口 435 關閉了

window(count, skip)

這個window的變體當即打開它的第一個窗口。原始Observable每發射skip項數據它就打開一個新窗口(例如,若是skip等於3,每到第三項數據,它會打開一個新窗口)。每當當前窗口發射了count項數據,它就關閉當前窗口並打開一個新窗口。

若是skip > count,在兩個窗口之間會有skip - count項數據被丟棄。

Observable.range(20, 5)
    .window(1, 2)
    .subscribe(consumer);  //每發射2項就打開一個新窗口,每當當前窗口發射了1項就關閉當前窗口並打開一個新窗口
打開了一個新的窗口 714 
窗口 714 發射了數據:1 
窗口 714 關閉了 
打開了一個新的窗口 020 
窗口 020 發射了數據:3 
窗口 020 關閉了 
打開了一個新的窗口 326 
窗口 326 發射了數據:5 
窗口 326 關閉了

若是skip=count,它的行爲與window(count)相同

Observable.range(20, 5)
    .window(2, 2)
    .subscribe(consumer);  //每發射2項就打開一個新窗口,每當當前窗口發射了2項就關閉當前窗口並打開一個新窗口 `
打開了一個新的窗口 867 
窗口 867 發射了數據:10 
窗口 867 發射了數據:11 
窗口 867 關閉了 

打開了一個新的窗口 376 
窗口 376 發射了數據:12 
窗口 376 發射了數據:13 
窗口 376 關閉了 

打開了一個新的窗口 885 
窗口 885 發射了數據:14 
窗口 885 關閉了

若是skip < count,窗口可會有count - skip 個重疊的數據;

Observable.range(20, 5)
    .window(2, 1)
    .subscribe(consumer);  //每發射1項就打開一個新窗口,每當當前窗口發射了2項就關閉當前窗口並打開一個新窗口 `
打開了一個新的窗口 005 
窗口 005 發射了數據:20 
打開了一個新的窗口 310 
窗口 005 發射了數據:21 
窗口 310 發射了數據:21 
窗口 005 關閉了 
打開了一個新的窗口 818 
窗口 310 發射了數據:22 
窗口 818 發射了數據:22 
窗口 310 關閉了 
打開了一個新的窗口 327 
窗口 818 發射了數據:23 
窗口 327 發射了數據:23 
窗口 818 關閉了 
打開了一個新的窗口 836 
窗口 327 發射了數據:24 
窗口 836 發射了數據:24 
窗口 327 關閉了 
窗口 836 關閉了

window(timespan, unit)

這個window的變體當即打開它的第一個窗口。每當過了timespan這麼長的時間它就關閉當前窗口並打開一個新窗口。
這種window變體發射一系列不重疊的窗口,這些窗口的數據集合與原始Observable發射的數據也是一一對應的。

Observable.range(1, 6)
    .window(500, TimeUnit.MILLISECONDS)
    .subscribe(consumer); `
打開了一個新的窗口 195 
窗口 195 發射了數據:1 
窗口 195 發射了數據:2 
窗口 195 發射了數據:3 
窗口 195 關閉了 

打開了一個新的窗口 908 
窗口 908 發射了數據:4 
窗口 908 發射了數據:5 
窗口 908 關閉了 

打開了一個新的窗口 416 
窗口 416 發射了數據:6 
窗口 416 關閉了

window(timespan, unit, count)

這個window的變體當即打開它的第一個窗口。這個變體是window(count)和window(timespan, unit)的結合,每當過了timespan的時長或者當前窗口收到了count項數據,它就關閉當前窗口並打開另外一個。
這種window變體發射一系列不重疊的窗口,這些窗口的數據集合與原始Observable發射的數據也是一一對應的。

Observable.range(10, 4)
    .window(1000, TimeUnit.MILLISECONDS, 2)
    .subscribe(consumer);
打開了一個新的窗口 725 
窗口 725 發射了數據:10 
窗口 725 發射了數據:11 
窗口 725 關閉了 

打開了一個新的窗口 233 
窗口 233 發射了數據:12 
窗口 233 發射了數據:13 
窗口 233 關閉了 

打開了一個新的窗口 250 
窗口 250 關閉了

案例2:

Observable.range(10, 3)
    .window(200, TimeUnit.MILLISECONDS, 2)
    .subscribe(consumer); `
打開了一個新的窗口 696 
窗口 696 發射了數據:10 
窗口 696 關閉了 

打開了一個新的窗口 001 
窗口 001 發射了數據:11 
窗口 001 關閉了 

打開了一個新的窗口 306 
窗口 306 關閉了 

打開了一個新的窗口 408 
窗口 408 發射了數據:12 
窗口 408 關閉了 

打開了一個新的窗口 712 
窗口 712 關閉了 

打開了一個新的窗口 815 
窗口 815 關閉了

2018-9-18

相關文章
相關標籤/搜索