Observable
) & 合併須要發送的事件
3.1 concat() / concatArray()數據結構
做用:組合多個被觀察者一塊兒發送數據,合併後 按發送順序串行執行app
區別:組合被觀察者的數量,即concat()
組合被觀察者數量≤4個,而concatArray()
則可>4個ide
public static void concat() { Observable.concat(Observable.just(1, 2, 3), Observable.just(4, 5)) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onNext(Integer value) { Log.d(TAG, "onNext: value = " + value); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: " + e.toString()); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); }
輸出:函數
08-06 16:28:21.348 22182 22182 D Operation: onSubscribe
08-06 16:28:21.348 22182 22182 D Operation: onNext: value = 1
08-06 16:28:21.348 22182 22182 D Operation: onNext: value = 2
08-06 16:28:21.348 22182 22182 D Operation: onNext: value = 3
08-06 16:28:21.349 22182 22182 D Operation: onNext: value = 4
08-06 16:28:21.349 22182 22182 D Operation: onNext: value = 5
08-06 16:28:21.349 22182 22182 D Operation: onComplete
3.2 merge() / mergeArray()----- 見rxdocs.pdf第139頁spa
做用:組合多個被觀察者一塊兒發送數據,合併後 按時間線並行執行3d
區別:組合被觀察者的數量,即merge()
組合被觀察者數量≤4個,而mergeArray()
則可>4個code
區別上述concat()
操做符:一樣是組合多個被觀察者一塊兒發送數據,但concat()
操做符合並後是按發送順序串行執行server
public static void merge() { Observable.merge(Observable.intervalRange(0, 3, 0, 2, TimeUnit.SECONDS), Observable.intervalRange(6, 3, 1, 2, TimeUnit.SECONDS)) .subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onNext(Long value) { Log.d(TAG, "onNext: value = " + value); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: " + e.toString()); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); }
輸出:blog
08-06 16:42:42.503 23013 23013 D Operation: onSubscribe
08-06 16:42:42.505 23013 23037 D Operation: onNext: value = 0
08-06 16:42:43.505 23013 23038 D Operation: onNext: value = 6
08-06 16:42:44.506 23013 23037 D Operation: onNext: value = 1
08-06 16:42:45.506 23013 23038 D Operation: onNext: value = 7
08-06 16:42:46.505 23013 23037 D Operation: onNext: value = 2
08-06 16:42:47.506 23013 23038 D Operation: onNext: value = 8
08-06 16:42:47.509 23013 23038 D Operation: onComplete
3.3 concatDelayError() / mergeDelayError()----- 見rxdocs.pdf第140頁事件
做用:若其中一個被觀察者發生onError事件,推遲到其餘觀察者發送完事件後才觸發。
3.4 zip()----- 見rxdocs.pdf第147頁
做用:將多個Observable發射的數據按順序組合起來,每一個數據只能組合一次,並且都是有序的。最終組合的數據的數量由發射數據最少的Observable來決定。
public static void zip() { Observable.zip(Observable.just(1, 2, 3, 4), Observable.just("A", "B", "C"), new BiFunction<Integer, String, String>() { @Override public String apply(Integer integer, String s) throws Exception { return integer + s; } }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onNext(String value) { Log.d(TAG, "onNext: value = " + value); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: " + e.toString()); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); }
輸出:
08-06 17:19:13.248 24303 24303 D Operation: onSubscribe 08-06 17:19:13.249 24303 24303 D Operation: onNext: value = 1A 08-06 17:19:13.249 24303 24303 D Operation: onNext: value = 2B 08-06 17:19:13.249 24303 24303 D Operation: onNext: value = 3C 08-06 17:19:13.249 24303 24303 D Operation: onComplete
3.5 combineLatest()----- 見rxdocs.pdf第133頁
做用:當兩個Observables中的任何一個發射了數據時,使用一個函數結合每一個Observable發射的最 近數據項,而且基於這個函數的結果發射數據
與zip()
的區別:zip()
= 按個數合併,即1對1合併;combineLatest()
= 按時間合併,即在同一個時間點上合併
public static void combineLatest() { Observable.combineLatest(Observable.intervalRange(0, 3, 0, 2, TimeUnit.SECONDS), Observable.intervalRange(6, 3, 1, 2, TimeUnit.SECONDS), new BiFunction<Long, Long, String>() { @Override public String apply(Long aLong, Long aLong2) throws Exception { return "" + aLong + "+" + aLong2; } }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onNext(String value) { Log.d(TAG, "onNext: value = " + value); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: " + e.toString()); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); }
輸出:
08-06 17:50:20.240 5792 5792 D Operation: onSubscribe
08-06 17:50:21.248 5792 5811 D Operation: onNext: value = 0+6
08-06 17:50:22.242 5792 5810 D Operation: onNext: value = 1+6
08-06 17:50:23.242 5792 5811 D Operation: onNext: value = 1+7
08-06 17:50:24.241 5792 5810 D Operation: onNext: value = 2+7
08-06 17:50:25.242 5792 5811 D Operation: onNext: value = 2+8
08-06 17:50:25.242 5792 5811 D Operation: onComplete
3.6 combineLatestDelayError()
做用:做用相似於concatDelayError()
/ mergeDelayError()
,即錯誤處理,此處不做過多描述
3.7 reduce()----- 見rxdocs.pdf第210頁
做用:把被觀察者須要發送的事件聚合成1個事件 & 發送
public static void reduce() { Observable.just(1, 2, 3, 4, 5) .reduce(new BiFunction<Integer, Integer, Integer>() { @Override public Integer apply(Integer integer, Integer integer2) throws Exception { return integer + integer2; } }).subscribe(new MaybeObserver<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onSuccess(Integer value) { Log.d(TAG, "onSuccess: value = " + value); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: " + e.toString()); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); }
輸出:
08-06 18:02:50.062 7606 7606 D Operation: onSubscribe
08-06 18:02:50.062 7606 7606 D Operation: onSuccess: value = 15
3.8 collect()
做用:將被觀察者Observable
發送的數據事件收集到一個數據結構裏
public static void collect() { Observable.just(1, 2, 3) .collect(new Callable<List<Integer>>() { @Override public List<Integer> call() throws Exception { return new ArrayList<>(); } }, new BiConsumer<List<Integer>, Integer>() { @Override public void accept(List<Integer> list, Integer integer) throws Exception { list.add(integer); } }).subscribe(new SingleObserver<List<Integer>>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onSuccess(List<Integer> list) { for (Integer i : list) { Log.d(TAG, "onSuccess: value = " + i); } } @Override public void onError(Throwable e) { Log.d(TAG, "onError: " + e.toString()); } }); }
輸出:
08-06 19:09:56.003 12640 12640 D Operation: onSubscribe
08-06 19:09:56.003 12640 12640 D Operation: onSuccess: value = 1
08-06 19:09:56.003 12640 12640 D Operation: onSuccess: value = 2
08-06 19:09:56.003 12640 12640 D Operation: onSuccess: value = 3
3.9 startWith() / startWithArray()----- 見rxdocs.pdf第144頁
做用:在一個被觀察者發送事件前,追加發送一些數據 / 一個新的被觀察者
public static void startWith() { Observable.just(1, 2, 3) .startWith(Observable.just(7, 8, 9)) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onNext(Integer value) { Log.d(TAG, "onNext: value = " + value); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: " + e.toString()); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); }
輸出:
08-06 19:18:16.663 13408 13408 D Operation: onSubscribe
08-06 19:18:16.663 13408 13408 D Operation: onNext: value = 7
08-06 19:18:16.663 13408 13408 D Operation: onNext: value = 8
08-06 19:18:16.663 13408 13408 D Operation: onNext: value = 9
08-06 19:18:16.663 13408 13408 D Operation: onNext: value = 1
08-06 19:18:16.663 13408 13408 D Operation: onNext: value = 2
08-06 19:18:16.663 13408 13408 D Operation: onNext: value = 3
08-06 19:18:16.664 13408 13408 D Operation: onComplete
3.10 count()----- 見rxdocs.pdf第207頁
做用:統計被觀察者發送事件的數量
public static void count() { Observable.just(1, 2, 3) .count() .subscribe(new SingleObserver<Long>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onSuccess(Long aLong) { Log.d(TAG, "onSuccess: value = " + aLong); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: " + e.toString()); } }); }
輸出:
08-06 19:25:07.151 14405 14405 D Operation: onSubscribe
08-06 19:25:07.151 14405 14405 D Operation: onSuccess: value = 3