本篇文章已受權微信公衆號 YYGeeker
獨家發佈轉載請標明出處java
CSDN學院課程地址react
- RxJava2從入門到精通-初級篇:edu.csdn.net/course/deta…
- RxJava2從入門到精通-中級篇:edu.csdn.net/course/deta…
- RxJava2從入門到精通-進階篇:edu.csdn.net/course/deta…
- RxJava2從入門到精通-源碼分析篇:edu.csdn.net/course/deta…
RxJava操做符也是其精髓之一,能夠經過一個簡單的操做符,實現複雜的業務邏輯,甚至還能夠將操做符組合起來(即RxJava的組合過程),完成更爲複雜的業務需求。好比咱們前面用到的.create()
,.subscribeOn()
,.observeOn()
,.subscribe()
都是RxJava的操做符之一,下面咱們將對RxJava的操做符進行分析c++
掌握RxJava操做符前,首先要學會看得懂RxJava的圖片,圖片是RxJava主導的精髓,下面咱們經過例子說明c#
這張圖片咱們先要分清楚概念上的東西,上下兩行橫向的直線區域表明着事件流,上面一行(上游)是咱們的被觀察者Observable
,下面一行(下游)是咱們的觀察者Observer
,事件流就是從上游的被觀察者發送給下游的觀察者的。而中間一行的flatMap區域則是咱們的操做符部分,它能夠對咱們的數據進行變換操做。最後,數據流則是圖片上的圓形、方形、菱形等區域,也是從上游流向下游的,不一樣的形狀表明着不一樣的數據類型緩存
這張圖片並非表示沒有被觀察者Observable
,而是Create方法自己就是建立了被觀察者,因此能夠將被觀察者的上游省略。在進行事件的onNext()
分發後,執行onComplete()
事件,這樣就表示事件流已經結束,後續若是上游繼續發事件,則下游表示不接收。當事件流的onCompleted()
或者onError()
正好被調用過一次後,此後就不能再調用觀察者的任何其它回調方法bash
在理解RxJava操做符以前,須要將這幾個概念弄明白,整個操做符的章節都是圍繞這幾個概念進行的微信
Observable
,事件流開始的地方和數據流發射的地方Observer
,事件流結束的地方和數據流接收的地方一、create網絡
Observable
最原始的建立方式,建立出一個最簡單的事件流,可使用發射器發射特定的數據類型數據結構
public static void main(String[] args) {
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
for (int i = 1; i < 5; i++) {
e.onNext(i);
}
e.onComplete();
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("onComplete");
}
});
}
複製代碼
輸出併發
onNext=1
onNext=2
onNext=3
onNext=4
onComplete
複製代碼
二、from
建立一個事件流併發出特定類型的數據流,其發射的數據流類型有以下幾個操做符
public static void main(String[] args) {
Observable.fromArray(new Integer[]{1, 2, 3, 4, 5})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
});
}
複製代碼
輸出
onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
複製代碼
三、just
just操做符和from操做符很像,只是方法的參數有所差異,它能夠接受多個參數
public static void main(String[] args) {
Observable.just(1, 2, 3, 4, 5)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
});
}
複製代碼
輸出
onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
複製代碼
四、defer
defer與just的區別是,just是直接將發射當前的數據流,而defer會等到訂閱的時候,纔會去執行它的call()回調,再去發射當前的數據流。複雜點的理解就是:defer操做符是將一組數據流在原有的事件流基礎上緩存一個新的事件流,直到有人訂閱的時候,纔會建立它緩存的事件流
public static void main(String[] args) {
i = 10;
Observable<Integer> just = Observable.just(i, i);
Observable<Object> defer = Observable.defer(new Callable<ObservableSource<?>>() {
@Override
public ObservableSource<?> call() throws Exception {
//緩存新的事件流
return Observable.just(i, i);
}
});
i = 15;
just.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
});
defer.subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
System.out.println("onNext=" + (int) o);
}
});
i = 20;
defer.subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
System.out.println("onNext=" + (int) o);
}
});
}
複製代碼
輸出
onNext=10
onNext=10
onNext=15
onNext=15
onNext=20
onNext=20
複製代碼
五、interval
interval操做符是按固定的時間間隔發射一個無限遞增的整數數據流,因爲這段代碼的的延時操做都是非阻塞型的,因此在Java上運行會致使JVM的立馬中止,只能把這段代碼放在Android來運行,interval默認在computation調度器上執行
public void interval() {
Observable.interval(1, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println("onNext=" + aLong);
}
});
}
複製代碼
輸出
onNext=1
onNext=2
onNext=3
onNext=4
......
複製代碼
六、range
range操做符發射一個範圍內的有序整數數據流,你能夠指定範圍的起始和長度
public static void main(String[] args) {
Observable.range(1, 5)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
});
}
複製代碼
輸出
onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
複製代碼
七、repeat
repeat操做符能夠重複發送指定次數的某個事件流,repeat操做符默認在trampoline調度器上執行
public static void main(String[] args) {
Observable.just(1).repeat(5)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
});
}
複製代碼
輸出
onNext=1
onNext=1
onNext=1
onNext=1
onNext=1
複製代碼
八、timer
timer操做符能夠建立一個延時的事件流,因爲這段代碼的的延時操做都是非阻塞型的,因此在Java上運行會致使JVM的立馬中止,只能把這段代碼放在Android來運行,默認在computation調度器上執行
public void timer() {
Observable.timer(5, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println("onNext=" + aLong);
}
});
}
複製代碼
輸出
onNext=0
複製代碼
九、小結
補充:interval()、timer()、delay()的區別
一、map
map操做符能夠將數據流進行類型轉換
public static void main(String[] args) {
Observable.just(1).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "發送過來的數據會被變成字符串" + integer;
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("onNext=" + s);
}
});
}
複製代碼
輸出
onNext=發送過來的數據會被變成字符串1
複製代碼
二、flatMap
flatMap操做符將數據流進行類型轉換,而後將新的數據流傳遞給新的事件流進行分發,這裏經過模擬請求登陸的延時操做進行說明,因爲這段代碼的的延時操做都是非阻塞型的,因此在Java上運行會致使JVM的立馬中止,只能把這段代碼放在Android來運行
public void flatMap() {
Observable.just(new UserParams("hensen", "123456")).flatMap(new Function<UserParams, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(UserParams userParams) throws Exception {
return Observable.just(userParams.username + "登陸成功").delay(2, TimeUnit.SECONDS);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
}
public static class UserParams {
public UserParams(String username, String password) {
this.username = username;
this.password = password;
}
public String username;
public String password;
}
複製代碼
輸出
hensen登陸成功
複製代碼
補充:
三、groupBy
groupBy操做符能夠將發射出來的數據項進行分組,並將分組後的數據項保存在具備key-value映射的事件流中。groupBy具體的分組規則由groupBy操做符傳遞進來的函數參數Function
所決定的,它能夠將key和value按照Function
的返回值進行分組,返回一個具備分組規則的事件流GroupedObservable
,注意這裏分組出來的事件流是按照原始事件流的順序輸出的,咱們能夠經過sorted()
對數據項進行排序,而後輸出有序的數據流。
public static void main(String[] args) {
Observable.just("java", "c++", "c", "c#", "javaScript", "Android")
.groupBy(new Function<String, Character>() {
@Override
public Character apply(String s) throws Exception {
return s.charAt(0);//按首字母分組
}
})
.subscribe(new Consumer<GroupedObservable<Character, String>>() {
@Override
public void accept(final GroupedObservable<Character, String> characterStringGroupedObservable) throws Exception {
//排序後,直接訂閱輸出key和value
characterStringGroupedObservable.sorted().subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("onNext= key:" + characterStringGroupedObservable.getKey() + " value:" + s);
}
});
}
});
}
複製代碼
輸出
onNext= key:A value:Android
onNext= key:c value:c
onNext= key:c value:c#
onNext= key:c value:c++
onNext= key:j value:java
onNext= key:j value:javaScript
複製代碼
四、scan
scan操做符會對發射的數據和上一輪發射的數據進行函數處理,並返回的數據供下一輪使用,持續這個過程來產生剩餘的數據流。其應用場景有簡單的累加計算,判斷全部數據的最小值等
public static void main(String[] args) {
Observable.just(8, 2, 13, 1, 15).scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer < integer2 ? integer : integer2;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer item) throws Exception {
System.out.println("onNext=" + item);
}
});
}
複製代碼
輸出
onNext=8
onNext=2
onNext=2
onNext=1
onNext=1
複製代碼
五、buffer
buffer操做符能夠將發射出來的數據流,在給定的緩存池中進行緩存,當緩存池中的數據項溢滿時,則將緩存池的數據項進行輸出,重複上述過程,直到將發射出來的數據所有發射出去。若是發射出來的數據不夠緩存池的大小,則按照當前發射出來的數量進行輸出。若是對buffer操做符設置了skip
參數,則buffer每次緩存池溢滿時,會跳過指定的skip
數據項,而後再進行緩存和輸出。
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
.buffer(5).subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Exception {
System.out.println("onNext=" + integers.toString());
}
});
複製代碼
輸出
onNext=[1, 2, 3, 4, 5]
onNext=[6, 7, 8, 9]
複製代碼
六、window
window操做符和buffer操做符在功能上實現的效果是同樣的,但window操做符最大區別在於一樣是緩存必定數量的數據項,window操做符最終發射出來的是新的事件流integerObservable
,而buffer操做符發射出來的是新的數據流,也就是說,window操做符發射出來新的事件流中的數據項,還能夠通過Rxjava其餘操做符進行處理。
public static void main(String[] args) {
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
.window(2, 1).subscribe(new Consumer<Observable<Integer>>() {
@Override
public void accept(Observable<Integer> integerObservable) throws Exception {
integerObservable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
});
}
});
}
複製代碼
輸出
onNext=1
onNext=2
onNext=2
onNext=3
onNext=3
onNext=4
onNext=4
onNext=5
onNext=5
onNext=6
onNext=6
onNext=7
onNext=7
onNext=8
onNext=8
onNext=9
onNext=9
複製代碼
七、小結
一、debounce
debounce操做符會去過濾掉髮射速率過快的數據項,下面的例子onNext
事件能夠想象成按鈕的點擊事件,若是在2秒種內頻繁的點擊,則其點擊事件會被忽略,當i爲3的除數的時候,發射的事件的時間會超過規定忽略事件的時間,那麼則容許觸發點擊事件。這就有點像咱們頻繁點擊按鈕,但始終只會觸發一次點擊事件,這樣就不會致使重複去響應點擊事件
public static void main(String[] args) {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 100; i++) {
if (i % 3 == 0) {
Thread.sleep(3000);
} else {
Thread.sleep(1000);
}
emitter.onNext(i);
}
}
}).debounce(2, TimeUnit.SECONDS)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
});
}
複製代碼
輸出
onNext=2
onNext=5
onNext=8
onNext=11
onNext=14
......
複製代碼
二、distinct
distinct操做符會過濾重複發送的數據項
public static void main(String[] args) {
Observable.just(1, 2, 3, 4, 1, 2, 3).distinct()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
});
}
複製代碼
輸出
onNext=1
onNext=2
onNext=3
onNext=4
複製代碼
三、elementAt
elementAt操做符只取指定的角標的事件
public static void main(String[] args) {
Observable.just(1, 2, 3, 4, 1, 2, 3).elementAt(0)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
});
}
複製代碼
輸出
onNext=1
複製代碼
四、filter
filter操做符能夠過濾指定函數的數據項
public static void main(String[] args) {
Observable.just(1, 2, 3, 4, 1, 2, 3)
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer > 2;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
});
}
複製代碼
輸出
onNext=3
onNext=4
onNext=3
複製代碼
五、first
first操做符只發射第一項數據項
public static void main(String[] args) {
Observable.just(1, 2, 3, 4, 1, 2, 3)
.first(7)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
});
}
複製代碼
輸出
onNext=1
複製代碼
六、ignoreElements
ignoreElements操做符不發射任何數據,只發射事件流的終止通知
public static void main(String[] args) {
Observable.just(1, 2, 3, 4, 1, 2, 3)
.ignoreElements()
.subscribe(new Action() {
@Override
public void run() throws Exception {
System.out.println("onComplete");
}
});
}
複製代碼
輸出
onComplete
複製代碼
七、last
last操做符只發射最後一項數據
public static void main(String[] args) {
Observable.just(1, 2, 3, 4, 1, 2, 3)
.last(7)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
});
}
複製代碼
輸出
onNext=3
複製代碼
八、sample
sample操做符會在指定的事件內從數據項中採集所須要的數據,因爲這段代碼的的延時操做都是非阻塞型的,因此在Java上運行會致使JVM的立馬中止,只能把這段代碼放在Android來運行
public void sample() {
Observable.interval(1, TimeUnit.SECONDS)
.sample(2, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println("onNext=" + aLong);
}
});
}
複製代碼
輸出
onNext=2
onNext=4
onNext=6
onNext=8
複製代碼
九、skip
skip操做符能夠忽略事件流發射的前N項數據項,只保留以後的數據
public static void main(String[] args) {
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
.skip(3)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer i) throws Exception {
System.out.println("onNext=" + i);
}
});
}
複製代碼
輸出
onNext=4
onNext=5
onNext=6
onNext=7
onNext=8
複製代碼
十、skipLast
skipLast操做符能夠抑制事件流發射的後N項數據
public static void main(String[] args) {
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
.skipLast(3)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer i) throws Exception {
System.out.println("onNext=" + i);
}
});
}
複製代碼
輸出
onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
複製代碼
十一、take
take操做符能夠在事件流中只發射前面的N項數據
public static void main(String[] args) {
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
.take(3)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer i) throws Exception {
System.out.println("onNext=" + i);
}
});
}
複製代碼
輸出
onNext=1
onNext=2
onNext=3
複製代碼
十二、takeLast
takeLast操做符事件流只發射數據流的後N項數據項,忽略前面的數據項
public static void main(String[] args) {
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
.takeLast(3)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer i) throws Exception {
System.out.println("onNext=" + i);
}
});
}
複製代碼
輸出
onNext=6
onNext=7
onNext=8
複製代碼
還有一個操做符叫takeLastBuffer,它和takeLast相似,,惟一的不一樣是它把全部的數據項收集到一個List再發射,而不是依次發射一個
1三、小結
一、merge/concat
merge操做符能夠合併兩個事件流,若是在merge操做符上增長延時發送的操做,那麼就會致使其發射的數據項是無序的,會跟着發射的時間點進行合併。雖然是將兩個事件流合併成一個事件流進行發射,但在最終的一個事件流中,發射出來的倒是兩次數據流。因爲concat操做符和merge操做符的效果是同樣的,這裏只舉一例
merge和concat的區別
public static void main(String[] args) {
Observable<String> just1 = Observable.just("A", "B", "C", "D", "E");
Observable<String> just2 = Observable.just("1", "2", "3", "4", "5");
Observable.merge(just1, just2).subscribe(new Consumer<Serializable>() {
@Override
public void accept(Serializable serializable) throws Exception {
System.out.println("onNext=" + serializable.toString());
}
});
}
複製代碼
輸出
onNext=A
onNext=B
onNext=C
onNext=D
onNext=E
onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
複製代碼
二、zip
zip操做符是將兩個數據流進行指定的函數規則合併
public static void main(String[] args) {
Observable<String> just1 = Observable.just("A", "B", "C", "D", "E");
Observable<String> just2 = Observable.just("1", "2", "3", "4", "5");
Observable.zip(just1, just2, new BiFunction<String, String, String>() {
@Override
public String apply(String s, String s2) throws Exception {
return s + s2;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("onNext=" + s);
}
});
}
複製代碼
輸出
onNext=A1
onNext=B2
onNext=C3
onNext=D4
onNext=E5
複製代碼
三、startWith
startWith操做符是將另外一個數據流合併到原數據流的開頭
public static void main(String[] args) {
Observable<String> just1 = Observable.just("A", "B", "C", "D", "E");
Observable<String> just2 = Observable.just("1", "2", "3", "4", "5");
just1.startWith(just2).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("onNext=" + s);
}
});
}
複製代碼
輸出
onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
onNext=A
onNext=B
onNext=C
onNext=D
onNext=E
複製代碼
四、join
join操做符是有時間期限的合併操做符,因爲這段代碼的的延時操做都是非阻塞型的,因此在Java上運行會致使JVM的立馬中止,只能把這段代碼放在Android來運行
public void join() {
Observable<String> just1 = Observable.just("A", "B", "C", "D", "E");
Observable<Long> just2 = Observable.interval(1, TimeUnit.SECONDS);
just1.join(just2, new Function<String, ObservableSource<Long>>() {
@Override
public ObservableSource<Long> apply(String s) throws Exception {
return Observable.timer(3, TimeUnit.SECONDS);
}
}, new Function<Long, ObservableSource<Long>>() {
@Override
public ObservableSource<Long> apply(Long l) throws Exception {
return Observable.timer(8, TimeUnit.SECONDS);
}
}, new BiFunction<String, Long, String>() {
@Override
public String apply(String s, Long l) throws Exception {
return s + l;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("onNext=" + s);
}
});
}
複製代碼
join操做符有三個函數須要設置
因爲just2的期限只有3秒的時間,而just2延時1秒發送一次,因此just2只發射了2次,其輸出的結果就只能和just2輸出的兩次進行合併,其輸出格式有點相似咱們的排列組合
onNext=A0
onNext=B0
onNext=C0
onNext=D0
onNext=E0
onNext=A1
onNext=B1
onNext=C1
onNext=D1
onNext=E1
複製代碼
五、combineLatest
conbineLatest操做符會尋找其餘事件流最近發射的數據流進行合併,因爲這段代碼的的延時操做都是非阻塞型的,因此在Java上運行會致使JVM的立馬中止,只能把這段代碼放在Android來運行
public static String[] str = {"A", "B", "C", "D", "E"};
public void combineLatest() {
Observable<String> just1 = Observable.interval(1, TimeUnit.SECONDS).map(new Function<Long, String>() {
@Override
public String apply(Long aLong) throws Exception {
return str[(int) (aLong % 5)];
}
});
Observable<Long> just2 = Observable.interval(1, TimeUnit.SECONDS);
Observable.combineLatest(just1, just2, new BiFunction<String, Long, String>() {
@Override
public String apply(String s, Long l) throws Exception {
return s + l;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("onNext=" + s);
}
});
}
複製代碼
輸出
onNext=A0
onNext=B0
onNext=B1
onNext=C1
onNext=C2
onNext=D2
onNext=D3
onNext=E3
onNext=E4
onNext=A4
onNext=A5
複製代碼
六、小結
一、onErrorReturn
onErrorReturn操做符表示當錯誤發生時,它會忽略onError的回調且會發射一個新的數據項並回調onCompleted()
public static void main(String[] args) {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
for (int i = 1; i < 5; i++) {
if(i == 4){
e.onError(new Exception("onError crash"));
}
e.onNext(i);
}
}
})
.onErrorReturn(new Function<Throwable, Integer>() {
@Override
public Integer apply(Throwable throwable) throws Exception {
return -1;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
System.out.println("onError");
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("onComplete");
}
});
}
複製代碼
輸出
onNext=1
onNext=2
onNext=3
onNext=-1
onComplete
複製代碼
二、onErrorResumeNext
onErrorResumeNext操做符表示當錯誤發生時,它會忽略onError的回調且會發射一個新的事件流並回調onCompleted()
public static void main(String[] args) {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
for (int i = 1; i < 5; i++) {
if(i == 4){
e.onError(new Exception("onError crash"));
}
e.onNext(i);
}
}
})
.onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
@Override
public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {
return Observable.just(-1);
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
System.out.println("onError");
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("onComplete");
}
});
}
複製代碼
輸出
onNext=1
onNext=2
onNext=3
onNext=-1
onComplete
複製代碼
三、onExceptionResumeNext
onExceptionResumeNext操做符表示當錯誤發生時,若是onError收到的Throwable不是一個Exception,它會回調onError方法,且不會回調備用的事件流,若是onError收到的Throwable是一個Exception,它會回調備用的事件流進行數據的發射
public static void main(String[] args) {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
for (int i = 1; i < 5; i++) {
if(i == 4){
e.onError(new Exception("onException crash"));
//e.onError(new Error("onError crash"));
}
e.onNext(i);
}
}
})
.onExceptionResumeNext(new ObservableSource<Integer>() {
@Override
public void subscribe(Observer<? super Integer> observer) {
//備用事件流
observer.onNext(8);
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
System.out.println("onError");
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("onComplete");
}
});
}
複製代碼
輸出
onNext=1
onNext=2
onNext=3
onNext=8
複製代碼
四、retry
retry操做符表示當錯誤發生時,發射器會從新發射
public static void main(String[] args) {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
for (int i = 1; i < 5; i++) {
if (i == 4) {
e.onError(new Exception("onError crash"));
}
e.onNext(i);
}
}
})
.retry(1)
.onErrorReturn(new Function<Throwable, Integer>() {
@Override
public Integer apply(Throwable throwable) throws Exception {
return -1;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
System.out.println("onError");
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("onComplete");
}
});
}
複製代碼
輸出
onNext=1
onNext=2
onNext=3
onNext=1
onNext=2
onNext=3
onNext=-1
onComplete
複製代碼
五、retryWhen
retryWhen操做符和retry操做符類似,區別在於retryWhen將錯誤Throwable傳遞給了函數進行處理併產生新的事件流進行處理,因爲這段代碼的的延時操做都是非阻塞型的,因此在Java上運行會致使JVM的立馬中止,只能把這段代碼放在Android來運行
private static int retryCount = 0;
private static int maxRetries = 2;
public void retryWhen(){
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
for (int i = 1; i < 5; i++) {
if (i == 4) {
e.onError(new Exception("onError crash"));
}
e.onNext(i);
}
}
})
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Throwable throwable) throws Exception {
if (++retryCount <= maxRetries) {
// When this Observable calls onNext, the original Observable will be retried (i.e. re-subscribed).
System.out.println("get error, it will try after " + 1 + " seconds, retry count " + retryCount);
return Observable.timer(1, TimeUnit.SECONDS);
}
return Observable.error(throwable);
}
});
}
})
.onErrorReturn(new Function<Throwable, Integer>() {
@Override
public Integer apply(Throwable throwable) throws Exception {
return -1;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
System.out.println("onError");
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("onComplete");
}
});
}
複製代碼
輸出
onNext=1
onNext=2
onNext=3
get error, it will try after 1 seconds, retry count 1
onNext=1
onNext=2
onNext=3
get error, it will try after 1 seconds, retry count 2
onNext=1
onNext=2
onNext=3
onNext=-1
onComplete
複製代碼
六、小結
一、delay
delay操做符能夠延時某次事件發送的數據流,因爲這段代碼的的延時操做都是非阻塞型的,因此在Java上運行會致使JVM的立馬中止,只能把這段代碼放在Android來運行
public void deley() {
Observable.just(1, 2, 3, 4, 5).delay(2, TimeUnit.SECONDS)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
});
}
複製代碼
輸出
onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
複製代碼
delay和delaySubscription的效果是同樣的,只不過delay是對數據流的延時,而delaySubscription是對事件流的延時
二、do
do操做符能夠監聽整個事件流的生命週期,do操做符分爲多個類型,並且每一個類型的做用都不一樣
public static void main(String[] args) {
Observable.just(1, 2, 3)
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("doOnNext");
}
})
.doOnEach(new Consumer<Notification<Integer>>() {
@Override
public void accept(Notification<Integer> integerNotification) throws Exception {
System.out.println("doOnEach");
}
})
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
System.out.println("doOnSubscribe");
}
})
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
System.out.println("doOnDispose");
}
})
.doOnTerminate(new Action() {
@Override
public void run() throws Exception {
System.out.println("doOnTerminate");
}
})
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
System.out.println("doOnError");
}
})
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
System.out.println("doOnComplete");
}
})
.doFinally(new Action() {
@Override
public void run() throws Exception {
System.out.println("doFinally");
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
});
}
複製代碼
輸出
doOnSubscribe
doOnNext
doOnEach
onNext=1
doOnNext
doOnEach
onNext=2
doOnNext
doOnEach
onNext=3
doOnEach
doOnTerminate
doOnComplete
doFinally
複製代碼
三、materialize/dematerialize
materialize操做符將發射出的數據項轉換成爲一個Notification對象,而dematerialize操做符則是跟materialize操做符相反,這兩個操做符有點相似咱們Java對象的裝箱和拆箱功能
public static void main(String[] args) {
Observable.just(1, 2, 3, 4, 5).materialize()
.subscribe(new Consumer<Notification<Integer>>() {
@Override
public void accept(Notification<Integer> integerNotification) throws Exception {
System.out.println("onNext=" + integerNotification.getValue());
}
});
Observable.just(1, 2, 3, 4, 5).materialize().dematerialize()
.subscribe(new Consumer<Object>() {
@Override
public void accept(Object object) throws Exception {
System.out.println("onNext=" + object.toString());
}
});
}
複製代碼
輸出
onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
onNext=null
onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
複製代碼
輸出的時候,materialize會輸出多個null,是由於null的事件爲onCompleted事件,而dematerialize把onCompleted事件給去掉了,這個緣由也能夠從圖片中看出來
四、serialize
serialize操做符能夠將異步執行的事件流進行同步操做,直到事件流結束
public static void main(String[] args) {
Observable.just(1, 2, 3, 4, 5).serialize()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
});
}
複製代碼
輸出
onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
複製代碼
五、timeInterval
timeInterval操做符能夠將發射的數據項轉換爲帶有時間間隔的數據項,因爲這段代碼的的延時操做都是非阻塞型的,因此在Java上運行會致使JVM的立馬中止,只能把這段代碼放在Android來運行
public void timeInterval(){
Observable.interval(2, TimeUnit.SECONDS).timeInterval(TimeUnit.SECONDS)
.subscribe(new Consumer<Timed<Long>>() {
@Override
public void accept(Timed<Long> longTimed) throws Exception {
System.out.println("onNext=" + longTimed.value() + " timeInterval=" + longTimed.time());
}
});
}
複製代碼
輸出
onNext=0 timeInterval=2
onNext=1 timeInterval=2
onNext=2 timeInterval=2
onNext=3 timeInterval=2
onNext=4 timeInterval=2
複製代碼
六、timeout
timeout操做符表示當發射的數據項超過了規定的限制時間,則發射onError事件,這裏直接讓程序超過規定的限制時間,因爲這段代碼的的延時操做都是非阻塞型的,因此在Java上運行會致使JVM的立馬中止,只能把這段代碼放在Android來運行
public void timeOut(){
Observable.interval(2, TimeUnit.SECONDS).timeout(1, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println("onNext=" + aLong);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
System.out.println("onError");
}
});
}
複製代碼
輸出
onError
複製代碼
七、timestamp
timestamp操做符會給每一個發射的數據項帶上時間戳,因爲這段代碼的的延時操做都是非阻塞型的,因此在Java上運行會致使JVM的立馬中止,只能把這段代碼放在Android來運行
public void timeStamp() {
Observable.interval(2, TimeUnit.SECONDS).timestamp(TimeUnit.MILLISECONDS)
.subscribe(new Consumer<Timed<Long>>() {
@Override
public void accept(Timed<Long> longTimed) throws Exception {
System.out.println("onNext=" + longTimed.value() + " timeInterval=" + longTimed.time());
}
});
}
複製代碼
輸出
onNext=0 timeInterval=1525755132132
onNext=1 timeInterval=1525755134168
onNext=2 timeInterval=1525755136132
onNext=3 timeInterval=1525755138132
複製代碼
八、using
using操做符可讓你的事件流存在一次性的數據項,即用完就將資源釋放掉
using操做符接受三個參數:
public static class UserBean {
String name;
int age;
public UserBean(String name, int age) {
this.name = name;
this.age = age;
}
}
public static void main(String[] args) {
Observable.using(new Callable<UserBean>() {
@Override
public UserBean call() throws Exception {
//從網絡中獲取某個對象
return new UserBean("俊俊俊", 22);
}
}, new Function<UserBean, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(UserBean userBean) throws Exception {
//拿出你想要的資源
return Observable.just(userBean.name);
}
}, new Consumer<UserBean>() {
@Override
public void accept(UserBean userBean) throws Exception {
//釋放對象
userBean = null;
}
}).subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
System.out.println("onNext=" + o.toString());
}
});
}
複製代碼
輸出
onNext=俊俊俊
複製代碼
九、to
to操做符能夠將數據流中的數據項進行集合的轉換,to操做符分爲多個類型,並且每一個類型的做用都不一樣
public static void main(String[] args) {
Observable.just(1, 2, 3, 4, 5).toList()
.subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Exception {
System.out.println("onNext=" + integers.toString());
}
});
}
複製代碼
輸出
onNext=[1, 2, 3, 4, 5]
複製代碼
十、小結
一、all
all操做符表示對全部數據項進行校驗,若是全部都經過則返回true,不然返回false
public static void main(String[] args) {
Observable.just(1, 2, 3, 4, 5)
.all(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer > 0;
}
})
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
System.out.println("onNext=" + aBoolean);
}
});
}
複製代碼
輸出
onNext=true
複製代碼
二、contains
contains操做符表示事件流中發射的數據項當中是否包含有指定的數據項
public static void main(String[] args) {
Observable.just(1, 2, 3, 4, 5)
.contains(2)
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
System.out.println("onNext=" + aBoolean);
}
});
}
複製代碼
輸出
onNext=true
複製代碼
三、amb
amb操做符在多個事件流中只發射最早發出數據的事件流,因爲這段代碼的的延時操做都是非阻塞型的,因此在Java上運行會致使JVM的立馬中止,只能把這段代碼放在Android來運行
public void amb(){
List<Observable<Integer>> list = new ArrayList<>();
list.add(Observable.just(1, 2, 3).delay(3, TimeUnit.SECONDS));
list.add(Observable.just(4, 5, 6).delay(2, TimeUnit.SECONDS));
list.add(Observable.just(7, 8, 9).delay(1, TimeUnit.SECONDS));
Observable.amb(list)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
});
}
複製代碼
輸出
onNext=7
onNext=8
onNext=9
複製代碼
四、defaultIfEmpty
defaultIfEmpty操做符會在事件流沒有發射任何數據時,發射一個指定的默認值
public static void main(String[] args) {
Observable.empty()
.defaultIfEmpty(-1)
.subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
System.out.println("onNext=" + o.toString());
}
});
}
複製代碼
輸出
onNext=-1
複製代碼
五、sequenceEqual
sequenceEqual操做符能夠判斷兩個數據流是否徹底相等
public static void main(String[] args) {
Observable<Integer> just1 = Observable.just(1, 2, 3);
Observable<Integer> just2 = Observable.just(1, 2, 3);
Observable.sequenceEqual(just1, just2)
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
System.out.println("onNext=" + aBoolean);
}
});
}
複製代碼
輸出
onNext=true
複製代碼
六、skipUntil/skipWhile
skipUtils操做符是在兩個事件流發射的時候,第一個事件流會等到第二個事件流開始發射的時候,第一個事件流纔開始發射出數據項,它會忽略以前發射過的數據項,因爲這段代碼的的延時操做都是非阻塞型的,因此在Java上運行會致使JVM的立馬中止,只能把這段代碼放在Android來運行
public void skipUntil(){
Observable<Long> just1 = Observable.interval(1, TimeUnit.SECONDS);
Observable<Integer> just2 = Observable.just(8).delay(3, TimeUnit.SECONDS);
just1.skipUntil(just2)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println("onNext=" + aLong);
}
});
}
複製代碼
輸出
onNext=2
onNext=3
onNext=4
onNext=5
......
複製代碼
skipWhile操做符是在一個事件流中,從第一項數據項開始判斷是否符合某個特定條件,若是判斷值返回true,則不發射該數據項,繼續從下一個數據項執行一樣的判斷,直到某個數據項的判斷值返回false時,則終止判斷,發射剩餘的全部數據項。須要注意的是,這裏只要一次判斷爲false則後面的全部數據項都不判斷
public static void main(String[] args) {
Observable.just(1, 2, 3, 4, 5)
.skipWhile(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer < 3;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
});
}
複製代碼
輸出
onNext=3
onNext=4
onNext=5
複製代碼
七、takeUntil/takeWhile
takeUntil操做符跟skipUntil相似,skip表示跳過的意思,而take表示取值的意思,takeUntil操做符是在兩個事件流發射的時候,第一個事件流會等到第二個事件流開始發射的時候,第一個事件流中止發射數據項,它會忽略以後的數據項,因爲這段代碼的的延時操做都是非阻塞型的,因此在Java上運行會致使JVM的立馬中止,只能把這段代碼放在Android來運行
public void takeUntil(){
Observable<Long> just1 = Observable.interval(1, TimeUnit.SECONDS);
Observable<Integer> just2 = Observable.just(8).delay(3, TimeUnit.SECONDS);
just1.takeUntil(just2)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println("onNext=" + aLong);
}
});
}
複製代碼
輸出
onNext=0
onNext=1
複製代碼
takeWhile操做符是在一個事件流中,從第一項數據項開始判斷是否符合某個特定條件,若是判斷值返回true,則發射該數據項,繼續從下一個數據項執行一樣的判斷,直到某個數據項的判斷值返回false時,則終止判斷,且剩餘的全部數據項不會發射。須要注意的是,這裏只要一次判斷爲false則後面的全部數據項都不判斷
public static void main(String[] args) {
Observable.just(1, 2, 3, 4, 0)
.takeWhile(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer < 3;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
});
}
複製代碼
輸出
onNext=1
onNext=2
複製代碼
八、小結
數學運算操做符比較簡單,對於數學運算操做符會放在小結中介紹,下面是對聚合操做符作介紹
一、reduce
reduce操做符跟scan操做符是同樣的,會對發射的數據和上一輪發射的數據進行函數處理,並返回的數據供下一輪使用,持續這個過程來產生剩餘的數據流。reduce與scan的惟一區別在於reduce只輸出最後的結果,而scan會輸出每一次的結果,這點從圖片中也能看出來
public static void main(String[] args) {
Observable.just(8, 2, 13, 1, 15).reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer < integer2 ? integer : integer2;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer item) throws Exception {
System.out.println("onNext=" + item);
}
});
}
複製代碼
輸出
onNext=1
複製代碼
二、collect
collect操做符跟reduce操做符相似,只不過collect增長了一個可改變數據結構的函數供咱們處理
public static void main(String[] args) {
Observable.just(8, 2, 13, 1, 15).collect(new Callable<String>() {
@Override
public String call() throws Exception {
return "A";
}
}, new BiConsumer<String, Integer>() {
@Override
public void accept(String s, Integer integer) throws Exception {
System.out.println("onNext=" + s + " " + integer);
}
}).subscribe(new BiConsumer<String, Throwable>() {
@Override
public void accept(String s, Throwable throwable) throws Exception {
System.out.println("onNext2=" + s);
}
});
}
複製代碼
輸出
onNext=A 8
onNext=A 2
onNext=A 13
onNext=A 1
onNext=A 15
onNext2=A
複製代碼
三、小結
數學運算操做符的使用須要在gradle中添加rxjava-math的依賴
implementation 'io.reactivex:rxjava-math:1.0.0'
複製代碼
一、publish
publish操做符是將普通的事件流轉化成可鏈接的事件流ConnectableObservable
,它與普通的事件流不同,ConnectableObservable
在沒有調用connect()進行鏈接的狀況下,事件流是不會發射數據的
public static void main(String[] args) {
ConnectableObservable<Integer> connectableObservable = Observable.just(1, 2, 3, 4, 5).publish();
connectableObservable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
});
}
複製代碼
輸出
無
複製代碼
二、connect
connect操做符是將可鏈接的事件流進行鏈接並開始發射數據。這個方法須要注意的是,connect操做符必須在全部事件流被訂閱後纔開始發射數據。若是放在subscribe
以前的話,則訂閱者是沒法收到數據的。若是後面還有訂閱者將訂閱這次事件流,則會丟失已經調用了connect
後,發射出去的數據項
public static void main(String[] args) {
ConnectableObservable<Integer> connectableObservable = Observable.just(1, 2, 3, 4, 5).publish();
connectableObservable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
});
connectableObservable.connect();
}
複製代碼
輸出
onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
複製代碼
三、refCount
refCount操做符能夠將可鏈接的事件流轉換成普通的事件流
public static void main(String[] args) {
ConnectableObservable<Integer> connectableObservable = Observable.just(1, 2, 3, 4, 5).publish();
connectableObservable.refCount().subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
});
}
複製代碼
輸出
onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
複製代碼
四、replay
replay操做符將彌補connect
操做符的缺陷,因爲connect會讓後面進行訂閱的訂閱者丟失以前發射出去的數據項,因此使用replay操做符能夠將發射出去的數據項進行緩存,這樣使得後面的訂閱者均可以得到完整的數據項。這裏須要注意的是,replay操做符不能和publish操做符同時使用,不然將不會發射數據。例子中,讀者能夠將replay操做符換成publish操做符,這時候的輸出就會丟失前2秒發射的數據項
public void replay(){
ConnectableObservable<Long> connectableObservable = Observable.interval(1, TimeUnit.SECONDS).replay();
connectableObservable.connect();
connectableObservable.delaySubscription(3, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println("onNext=" + aLong);
}
});
}
複製代碼
輸出
onNext=0
onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
......
複製代碼
五、小結