Observable) satisfy a given condition.
3.1 all() ----- see rxdocs.pdf, page 192
Purpose: checks whether every emitted item satisfies the supplied predicate.
public static void all() {
    Observable.just(1, 2, 3, 4, 5, 10)
            // true only if every emitted item is >= 10
            .all(new Predicate<Integer>() {
                @Override
                public boolean test(Integer integer) throws Exception {
                    return integer >= 10;
                }
            })
            .subscribe(new SingleObserver<Boolean>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "onSubscribe");
                }

                @Override
                public void onSuccess(Boolean value) {
                    Log.d(TAG, "onSuccess: value = " + value);
                }

                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "onError: " + e.toString());
                }
            });
}
Output:
08-09 19:56:20.782 4775 4775 D Operation: onSubscribe
08-09 19:56:20.782 4775 4775 D Operation: onSuccess: value = false
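The same check can be modeled in plain Java, without RxJava, to make the semantics explicit. This is only a minimal sketch that assumes the emitted items sit in a List; `AllSketch` and its `all` method are illustrative names, not RxJava API. Like the operator, it answers false as soon as one item fails the predicate and true for an empty sequence.

```java
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of all(); a hypothetical helper, not part of RxJava.
final class AllSketch {

    // Returns true only if every item passes the predicate.
    static <T> boolean all(List<T> items, Predicate<T> condition) {
        for (T item : items) {
            if (!condition.test(item)) {
                return false; // the first failing item decides the result
            }
        }
        return true; // also reached when the list is empty
    }

    public static void main(String[] args) {
        List<Integer> items = List.of(1, 2, 3, 4, 5, 10);
        System.out.println(all(items, i -> i >= 10)); // false
        System.out.println(all(items, i -> i >= 1));  // true
    }
}
```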
3.2 takeWhile() ----- see rxdocs.pdf, page 201
Purpose: emits the items of the source Observable until a specified condition no longer holds, then completes.
public static void takeWhile() {
    Observable.just(1, 2, 3, 4, 5, 10)
            // pass items through while they are < 4, then complete
            .takeWhile(new Predicate<Integer>() {
                @Override
                public boolean test(Integer integer) throws Exception {
                    return integer < 4;
                }
            })
            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "onSubscribe");
                }

                @Override
                public void onNext(Integer value) {
                    Log.d(TAG, "onNext: value = " + value);
                }

                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "onError: " + e.toString());
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
}
Output:
08-09 20:02:26.093 5131 5131 D Operation: onSubscribe
08-09 20:02:26.093 5131 5131 D Operation: onNext: value = 1
08-09 20:02:26.093 5131 5131 D Operation: onNext: value = 2
08-09 20:02:26.093 5131 5131 D Operation: onNext: value = 3
08-09 20:02:26.093 5131 5131 D Operation: onComplete
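3.3 skipWhile() ----- see rxdocs.pdf, page 198
Purpose: discards the items emitted by the source Observable until a specified condition no longer holds. In other words, emission starts only once the predicate returns false. This is the opposite of takeWhile().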
public static void skipWhile() {
    Observable.just(1, 2, 3, 4, 5, 10)
            // drop items while they are < 4, then pass everything through
            .skipWhile(new Predicate<Integer>() {
                @Override
                public boolean test(Integer integer) throws Exception {
                    return integer < 4;
                }
            })
            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "onSubscribe");
                }

                @Override
                public void onNext(Integer value) {
                    Log.d(TAG, "onNext: value = " + value);
                }

                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "onError: " + e.toString());
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
}
Output:
08-12 11:04:21.003 12582 12582 D Operation: onSubscribe
08-12 11:04:21.003 12582 12582 D Operation: onNext: value = 4
08-12 11:04:21.003 12582 12582 D Operation: onNext: value = 5
08-12 11:04:21.003 12582 12582 D Operation: onNext: value = 10
08-12 11:04:21.003 12582 12582 D Operation: onComplete
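takeWhile() and skipWhile() split one sequence at the first item that fails the predicate, and neither looks at the predicate again afterwards. The sketch below models this on a plain List using `Stream.takeWhile` and `Stream.dropWhile`, which assumes Java 9 or later; `WhileSketch` is an illustrative name, not RxJava API. The trailing 1 in the sample data is added here only to show that a later matching item does not change the split.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Plain-Java model of takeWhile()/skipWhile(); hypothetical helper names.
final class WhileSketch {

    // Keeps the leading items that satisfy the condition.
    static <T> List<T> takeWhile(List<T> items, Predicate<T> condition) {
        return items.stream().takeWhile(condition).collect(Collectors.toList());
    }

    // Drops the leading items that satisfy the condition, keeps the rest.
    static <T> List<T> skipWhile(List<T> items, Predicate<T> condition) {
        return items.stream().dropWhile(condition).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> items = List.of(1, 2, 3, 4, 5, 10, 1);
        System.out.println(takeWhile(items, i -> i < 4)); // [1, 2, 3]
        System.out.println(skipWhile(items, i -> i < 4)); // [4, 5, 10, 1]
    }
}
```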
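3.4 takeUntil() ----- see rxdocs.pdf, page 199
Purpose: stops emitting once a condition is met. The item that satisfies the condition is still emitted before the sequence completes.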
public static void takeUntil() {
    Observable.just(1, 2, 3, 4, 5, 10)
            // stop after the first item that is > 4 (that item is still emitted)
            .takeUntil(new Predicate<Integer>() {
                @Override
                public boolean test(Integer integer) throws Exception {
                    return integer > 4;
                }
            })
            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "onSubscribe");
                }

                @Override
                public void onNext(Integer value) {
                    Log.d(TAG, "onNext: value = " + value);
                }

                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "onError: " + e.toString());
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
}
Output:
08-12 11:10:31.067 12856 12856 D Operation: onSubscribe
08-12 11:10:31.067 12856 12856 D Operation: onNext: value = 1
08-12 11:10:31.067 12856 12856 D Operation: onNext: value = 2
08-12 11:10:31.067 12856 12856 D Operation: onNext: value = 3
08-12 11:10:31.067 12856 12856 D Operation: onNext: value = 4
08-12 11:10:31.067 12856 12856 D Operation: onNext: value = 5
08-12 11:10:31.067 12856 12856 D Operation: onComplete
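The log shows that the value 5, the first item for which the predicate is true, is still delivered. A plain-Java model makes the order of operations visible: emit first, then test. This is a sketch under the assumption that items come from a List; `TakeUntilSketch` is an illustrative name, not RxJava API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of takeUntil(Predicate); hypothetical helper.
final class TakeUntilSketch {

    static <T> List<T> takeUntil(List<T> items, Predicate<T> stop) {
        List<T> out = new ArrayList<>();
        for (T item : items) {
            out.add(item);         // the item is emitted first
            if (stop.test(item)) { // then the stop condition is checked
                break;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> items = List.of(1, 2, 3, 4, 5, 10);
        System.out.println(takeUntil(items, i -> i > 4)); // [1, 2, 3, 4, 5]
    }
}
```

By contrast, a takeWhile-style check with the inverted condition (item <= 4) tests before emitting and would stop at 4.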
3.5 skipUntil() ----- see rxdocs.pdf, page 198
Purpose: discards the items emitted by the source Observable until a second Observable emits an item.
public static void skipUntil() {
    Observable.interval(1, TimeUnit.SECONDS)
            // drop interval ticks until the 5-second timer fires
            .skipUntil(Observable.timer(5, TimeUnit.SECONDS))
            .subscribe(new Observer<Long>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "onSubscribe");
                }

                @Override
                public void onNext(Long value) {
                    Log.d(TAG, "onNext: value = " + value);
                }

                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "onError: " + e.toString());
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
}
Output:
08-12 11:18:39.298 13159 13159 D Operation: onSubscribe
08-12 11:18:45.303 13159 13184 D Operation: onNext: value = 5
08-12 11:18:46.302 13159 13184 D Operation: onNext: value = 6
08-12 11:18:47.303 13159 13184 D Operation: onNext: value = 7
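Why the first logged value is 5 can be worked out with a small deterministic simulation. The sketch assumes the source emits value k at (k + 1) periods after subscription, as interval() does, and that only items emitted strictly after the trigger time pass through. In the real operator, a tick that coincides with the trigger is a race; in the log above the tick carrying 4 was dropped. `SkipUntilSketch` and its parameters are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;

// Time-based model of interval(...).skipUntil(timer(...)); hypothetical helper.
final class SkipUntilSketch {

    // Simulates `ticks` interval emissions and keeps those after the trigger.
    static List<Long> skipUntil(long periodSeconds, long triggerSeconds, int ticks) {
        List<Long> out = new ArrayList<>();
        for (long value = 0; value < ticks; value++) {
            long emittedAt = (value + 1) * periodSeconds; // value 0 arrives after one period
            if (emittedAt > triggerSeconds) {             // assumption: ties are dropped
                out.add(value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // 1-second interval, trigger after 5 seconds, first 8 ticks
        System.out.println(skipUntil(1, 5, 8)); // [5, 6, 7]
    }
}
```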
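3.6 sequenceEqual() ----- see rxdocs.pdf, page 196
Purpose: pass two Observables to the sequenceEqual operator and it compares their emissions. If the two sequences are identical (same items, same order, same termination state), it emits true; otherwise it emits false.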
public static void sequenceEqual() {
    // compare two sources item by item
    Observable.sequenceEqual(Observable.just(1, 2, 3), Observable.just(1, 2, 3))
            .subscribe(new SingleObserver<Boolean>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "onSubscribe");
                }

                @Override
                public void onSuccess(Boolean value) {
                    Log.d(TAG, "onSuccess: value = " + value);
                }

                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "onError: " + e.toString());
                }
            });
}
Output:
08-12 11:24:11.229 13438 13438 D Operation: onSubscribe
08-12 11:24:11.230 13438 13438 D Operation: onSuccess: value = true
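The pairwise comparison can be sketched in plain Java with two iterators. This model covers only items, order, and length; it does not model error termination, which the real operator also takes into account. `SequenceEqualSketch` is an illustrative name, not RxJava API.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

// Plain-Java model of sequenceEqual(); hypothetical helper.
final class SequenceEqualSketch {

    static <T> boolean sequenceEqual(Iterable<T> first, Iterable<T> second) {
        Iterator<T> a = first.iterator();
        Iterator<T> b = second.iterator();
        while (a.hasNext() && b.hasNext()) {
            if (!Objects.equals(a.next(), b.next())) {
                return false; // same position, different item
            }
        }
        // equal only if both sequences ended at the same time
        return !a.hasNext() && !b.hasNext();
    }

    public static void main(String[] args) {
        System.out.println(sequenceEqual(List.of(1, 2, 3), List.of(1, 2, 3))); // true
        System.out.println(sequenceEqual(List.of(1, 2, 3), List.of(1, 3, 2))); // false
        System.out.println(sequenceEqual(List.of(1, 2), List.of(1, 2, 3)));    // false
    }
}
```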
3.7 contains() ----- see rxdocs.pdf, page 194
Purpose: checks whether the emitted items include a specified item.
public static void contains() {
    Observable.just(1, 2, 3, 4)
            // true if any emitted item equals 2
            .contains(2)
            .subscribe(new SingleObserver<Boolean>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "onSubscribe");
                }

                @Override
                public void onSuccess(Boolean value) {
                    Log.d(TAG, "onSuccess: value = " + value);
                }

                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "onError: " + e.toString());
                }
            });
}
Output:
08-12 11:28:58.716 13670 13670 D Operation: onSubscribe
08-12 11:28:58.716 13670 13670 D Operation: onSuccess: value = true
3.8 isEmpty() ----- see rxdocs.pdf, page 195
Purpose: checks whether the source completes without emitting any item.
public static void isEmpty() {
    Observable.empty()
            // true because the source completes without emitting
            .isEmpty()
            .subscribe(new SingleObserver<Boolean>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "onSubscribe");
                }

                @Override
                public void onSuccess(Boolean value) {
                    Log.d(TAG, "onSuccess: value = " + value);
                }

                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "onError: " + e.toString());
                }
            });
}
Output:
08-12 11:32:22.533 13892 13892 D Operation: onSubscribe
08-12 11:32:22.533 13892 13892 D Operation: onSuccess: value = true
3.9 amb() ----- see rxdocs.pdf, page 193
Purpose: given several Observables, emits only the items of the Observable that emits first; the other Observables are discarded.
public static void amb() {
    // the second source is delayed 1 ms less, so it emits first and wins
    Observable.ambArray(
            Observable.just(1, 2, 3).delay(1, TimeUnit.SECONDS),
            Observable.just(7, 8, 9).delay(999, TimeUnit.MILLISECONDS))
            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "onSubscribe");
                }

                @Override
                public void onNext(Integer value) {
                    Log.d(TAG, "onNext: value = " + value);
                }

                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "onError: " + e.toString());
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
}
Output:
08-12 11:38:43.328 14136 14136 D Operation: onSubscribe
08-12 11:38:44.330 14136 14161 D Operation: onNext: value = 7
08-12 11:38:44.330 14136 14161 D Operation: onNext: value = 8
08-12 11:38:44.330 14136 14161 D Operation: onNext: value = 9
08-12 11:38:44.331 14136 14161 D Operation: onComplete
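The selection rule can be illustrated with a deterministic model in which each source is reduced to the time of its first emission plus its items. This is a rough sketch, not how RxJava implements amb: it ignores sources that terminate without emitting, and it resolves ties in favor of the earlier-listed source, which is an assumption made here for determinism. `AmbSketch` and `Source` are illustrative names.

```java
import java.util.Collections;
import java.util.List;

// Deterministic model of amb(); hypothetical helper types.
final class AmbSketch {

    // A source described by when it first emits and what it emits.
    static final class Source {
        final long firstEmissionMillis;
        final List<Integer> items;

        Source(long firstEmissionMillis, List<Integer> items) {
            this.firstEmissionMillis = firstEmissionMillis;
            this.items = items;
        }
    }

    // Returns the items of the source that emits earliest.
    static List<Integer> amb(List<Source> sources) {
        Source winner = null;
        for (Source candidate : sources) {
            if (winner == null || candidate.firstEmissionMillis < winner.firstEmissionMillis) {
                winner = candidate; // strictly earlier wins; ties keep the earlier-listed source
            }
        }
        if (winner == null) {
            return Collections.emptyList();
        }
        return winner.items;
    }

    public static void main(String[] args) {
        List<Source> sources = List.of(
                new Source(1000, List.of(1, 2, 3)),
                new Source(999, List.of(7, 8, 9)));
        System.out.println(amb(sources)); // [7, 8, 9]
    }
}
```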
3.10 defaultIfEmpty() ----- see rxdocs.pdf, page 196
Purpose: emits a default value when the source sends no valid event (no Next event) and only a Complete event.
public static void defaultIfEmpty() {
    Observable.empty()
            // the source is empty, so "Andy" is emitted instead
            .defaultIfEmpty("Andy")
            .subscribe(new Observer<Object>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "onSubscribe");
                }

                @Override
                public void onNext(Object value) {
                    Log.d(TAG, "onNext: value = " + value);
                }

                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "onError: " + e.toString());
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
}
Output:
08-12 11:48:24.847 14642 14642 D Operation: onSubscribe
08-12 11:48:24.847 14642 14642 D Operation: onNext: value = Andy
08-12 11:48:24.847 14642 14642 D Operation: onComplete
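The fallback applies only when nothing was emitted; a non-empty sequence passes through unchanged. A minimal plain-Java sketch of that rule, assuming the items are held in a List (`DefaultIfEmptySketch` is an illustrative name, not RxJava API):

```java
import java.util.Collections;
import java.util.List;

// Plain-Java model of defaultIfEmpty(); hypothetical helper.
final class DefaultIfEmptySketch {

    static <T> List<T> defaultIfEmpty(List<T> items, T fallback) {
        if (items.isEmpty()) {
            return Collections.singletonList(fallback); // nothing emitted: use the default
        }
        return items; // at least one item: the default is ignored
    }

    public static void main(String[] args) {
        System.out.println(defaultIfEmpty(List.<String>of(), "Andy")); // [Andy]
        System.out.println(defaultIfEmpty(List.of("Bob"), "Andy"));    // [Bob]
    }
}
```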