本篇文章繼續介紹下面類型的操做符java
combineLatest操做符把兩個Observable產生的結果進行合併,合併的結果組成一個新的Observable。markdown
這兩個Observable中隨意一個Observable產生的結果,都和還有一個Observable最後產生的結果,依照必定的規則進行合併。ide
流程圖例如如下:
調用樣例例如如下:函數
//產生0,5,10,15,20數列
Observable<Long> observable1 = Observable.timer(0, 1000, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong * 5;
}
}).take(5);
//產生0,10,20,30,40數列
Observable<Long> observable2 = Observable.timer(500, 1000, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong * 10;
}
}).take(5);
Observable.combineLatest(observable1, observable2, new Func2<Long, Long, Long>() {
@Override
public Long call(Long aLong, Long aLong2) {
return aLong+aLong2;
}
}).subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
}
@Override
public void onNext(Long aLong) {
System.out.println("Next: " + aLong);
}
});
執行結果例如如下:
Next: 0
Next: 5
Next: 15
Next: 20
Next: 30
Next: 35
Next: 45
Next: 50
Next: 60
Sequence complete.post
join操做符把類似於combineLatest操做符,也是兩個Observable產生的結果進行合併,合併的結果組成一個新的Observable。但是join操做符可以控制每個Observable產生結果的生命週期,在每個結果的生命週期內,可以與還有一個Observable產生的結果依照必定的規則進行合併。流程圖例如如下:
this
join方法的使用方法例如如下:
observableA.join(observableB,
observableA產生結果生命週期控制函數,
observableB產生結果生命週期控制函數。
observableA產生的結果與observableB產生的結果的合併規則)spa
調用樣例例如如下:3d
//產生0,5,10,15,20數列
Observable<Long> observable1 = Observable.timer(0, 1000, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong * 5;
}
}).take(5);
//產生0,10,20,30,40數列
Observable<Long> observable2 = Observable.timer(500, 1000, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong * 10;
}
}).take(5);
observable1.join(observable2, new Func1<Long, Observable<Long>>() {
@Override
public Observable<Long> call(Long aLong) {
//使Observable延遲600毫秒執行
return Observable.just(aLong).delay(600, TimeUnit.MILLISECONDS);
}
}, new Func1<Long, Observable<Long>>() {
@Override
public Observable<Long> call(Long aLong) {
//使Observable延遲600毫秒執行
return Observable.just(aLong).delay(600, TimeUnit.MILLISECONDS);
}
}, new Func2<Long, Long, Long>() {
@Override
public Long call(Long aLong, Long aLong2) {
return aLong + aLong2;
}
}).subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
}
@Override
public void onNext(Long aLong) {
System.out.println("Next: " + aLong);
}
});
執行結果例如如下:
Next: 0
Next: 5
Next: 15
Next: 20
Next: 30
Next: 35
Next: 45
Next: 50
Next: 60
Sequence complete.code
groupJoin操做符很類似於join操做符,差異在於join操做符中第四個參數的傳入函數不一致。其流程圖例如如下:
blog
調用樣例例如如下:
//產生0,5,10,15,20數列
Observable<Long> observable1 = Observable.timer(0, 1000, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong * 5;
}
}).take(5);
//產生0,10,20,30,40數列
Observable<Long> observable2 = Observable.timer(500, 1000, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong * 10;
}
}).take(5);
observable1.groupJoin(observable2, new Func1<Long, Observable<Long>>() {
@Override
public Observable<Long> call(Long aLong) {
return Observable.just(aLong).delay(1600, TimeUnit.MILLISECONDS);
}
}, new Func1<Long, Observable<Long>>() {
@Override
public Observable<Long> call(Long aLong) {
return Observable.just(aLong).delay(600, TimeUnit.MILLISECONDS);
}
}, new Func2<Long, Observable<Long>, Observable<Long>>() {
@Override
public Observable<Long> call(Long aLong, Observable<Long> observable) {
return observable.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong2) {
return aLong + aLong2;
}
});
}
}).subscribe(new Subscriber<Observable<Long>>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
}
@Override
public void onNext(Observable<Long> observable) {
observable.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Long aLong) {
System.out.println("Next: " + aLong);
}
});
}
});
執行結果例如如下:
Next: 0
Next: 5
Next: 10
Next: 15
Next: 20
Next: 25
Next: 30
Next: 35
Next: 40
Next: 45
Next: 50
Next: 60
Next: 55
Sequence complete.
merge操做符是依照兩個Observable提交結果的時間順序,對Observable進行合併,如ObservableA每隔500毫秒產生數據爲0,5,10,15,20。而ObservableB每隔500毫秒產生數據0,10,20,30,40。當中第一個數據延遲500毫秒產生,最後合併結果爲:0,0,5,10,10,20,15,30,20,40;其流程圖例如如下:
調用樣例例如如下:
//產生0,5,10,15,20數列
Observable<Long> observable1 = Observable.timer(0, 1000, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong * 5;
}
}).take(5);
//產生0,10,20,30,40數列
Observable<Long> observable2 = Observable.timer(500, 1000, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong * 10;
}
}).take(5);
Observable.merge(observable1, observable2)
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
}
@Override
public void onNext(Long aLong) {
System.out.println("Next:" + aLong);
}
});
執行結果例如如下:
Next:0
Next:0
Next:5
Next:10
Next:10
Next:20
Next:15
Next:30
Next:20
Next:40
Sequence complete.
從merge操做符的流程圖可以看出,一旦合併的某一個Observable中出現錯誤,就會當即中止合併,並對訂閱者回調執行onError方法,而mergeDelayError操做符會把錯誤放到所有結果都合併完畢以後才執行,其流程圖例如如下:
調用樣例例如如下:
//產生0,5,10數列,最後會產生一個錯誤
Observable<Long> errorObservable = Observable.error(new Exception("this is end!"));
Observable < Long > observable1 = Observable.timer(0, 1000, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong * 5;
}
}).take(3).mergeWith(errorObservable.delay(3500, TimeUnit.MILLISECONDS));
//產生0,10,20,30,40數列
Observable<Long> observable2 = Observable.timer(500, 1000, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong * 10;
}
}).take(5);
Observable.mergeDelayError(observable1, observable2)
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
}
@Override
public void onNext(Long aLong) {
System.out.println("Next:" + aLong);
}
});
執行結果例如如下:
Next:0
Next:0
Next:5
Next:10
Next:10
Next:20
Next:30
Next:40
Error: this is end!
startWith操做符是在源Observable提交結果以前。插入指定的某些數據。其流程圖例如如下:
調用樣例例如如下:
Observable.just(10,20,30).startWith(2, 3, 4).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
}
@Override
public void onNext(Integer value) {
System.out.println("Next:" + value);
}
});
執行結果例如如下:
Next:2
Next:3
Next:4
Next:10
Next:20
Next:30
Sequence complete.
switchOnNext操做符是把一組Observable轉換成一個Observable,轉換規則爲:對於這組Observable中的每個Observable所產生的結果,假設在同一個時間內存在兩個或多個Observable提交的結果,僅僅取最後一個Observable提交的結果給訂閱者,其流程圖例如如下:
調用樣例例如如下:
//每隔500毫秒產生一個observable
Observable<Observable<Long>> observable = Observable.timer(0, 500, TimeUnit.MILLISECONDS).map(new Func1<Long, Observable<Long>>() {
@Override
public Observable<Long> call(Long aLong) {
//每隔200毫秒產生一組數據(0,10,20,30,40)
return Observable.timer(0, 200, TimeUnit.MILLISECONDS).map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong * 10;
}
}).take(5);
}
}).take(2);
Observable.switchOnNext(observable).subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
}
@Override
public void onNext(Long aLong) {
System.out.println("Next:" + aLong);
}
});
執行結果例如如下:
Next:0
Next:10
Next:20
Next:0
Next:10
Next:20
Next:30
Next:40
Sequence complete.
zip操做符是把兩個observable提交的結果,嚴格依照順序進行合併,其流程圖例如如下:
調用樣例例如如下:
Observable<Integer> observable1 = Observable.just(10,20,30);
Observable<Integer> observable2 = Observable.just(4, 8, 12, 16);
Observable.zip(observable1, observable2, new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) {
return integer + integer2;
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
}
@Override
public void onNext(Integer value) {
System.out.println("Next:" + value);
}
});
執行結果例如如下:
Next:14
Next:28
Next:42
Sequence complete.
onErrorReturn操做符是在Observable錯誤發生或異常的時候(即將回調oError方法時),攔截錯誤並執行指定的邏輯,返回一個跟源Observable一樣類型的結果。最後回調訂閱者的onComplete方法。其流程圖例如如下:
調用樣例例如如下:
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
if (subscriber.isUnsubscribed()) return;
//循環輸出數字
try {
for (int i = 0; i < 10; i++) {
if (i == 4) {
throw new Exception("this is number 4 error!"); } subscriber.onNext(i); } subscriber.onCompleted(); } catch (Exception e) { subscriber.onError(e); } } }); observable.onErrorReturn(new Func1<Throwable, Integer>() { @Override public Integer call(Throwable throwable) { return 1004; } }).subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { System.out.println("Sequence complete."); } @Override public void onError(Throwable e) { System.err.println("Error: " + e.getMessage()); } @Override public void onNext(Integer value) { System.out.println("Next:" + value); } });
執行結果例如如下:
Next:0
Next:1
Next:2
Next:3
Next:1004
Sequence complete.
onErrorResumeNext操做符跟onErrorReturn類似,僅僅只是onErrorReturn僅僅能在錯誤或異常發生時僅僅返回一個和源Observable一樣類型的結果,而onErrorResumeNext操做符是在錯誤或異常發生時返回一個Observable,也就是說可以返回多個和源Observable一樣類型的結果,其流程圖例如如下:
調用樣例例如如下:
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<?super Integer> subscriber) { if (subscriber.isUnsubscribed()) return; //循環輸出數字 try { for (int i = 0; i < 10; i++) { if (i == 4) { throw new Exception("this is number 4 error。"); } subscriber.onNext(i); } subscriber.onCompleted(); } catch (Exception e) { subscriber.onError(e); } } }); observable.onErrorResumeNext(new Func1<Throwable, Observable<? extends Integer>>() { @Override public Observable<? extends Integer> call(Throwable throwable) { return Observable.just(100,101, 102); } }).subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { System.out.println("Sequence complete."); } @Override public void onError(Throwable e) { System.err.println("Error: " + e.getMessage()); } @Override public void onNext(Integer value) { System.out.println("Next:" + value); } });
執行結果例如如下:
Next:0
Next:1
Next:2
Next:3
Next:100
Next:101
Next:102
Sequence complete.
onExceptionResumeNext操做符和onErrorResumeNext操做符類似。不一樣的地方在於onErrorResumeNext操做符是當Observable錯誤發生或異常時觸發,而onExceptionResumeNext是當Observable發生異常時才觸發。
這裏要普及一個概念就是,java的異常分爲錯誤(error)和異常(exception)兩種。它們都是繼承於Throwable類。
錯誤(error)一般是比較嚴重的系統問題,比方咱們經常遇到的OutOfMemoryError、StackOverflowError等都是錯誤。錯誤通常繼承於Error類,而Error類又繼承於Throwable類,假設需要捕獲錯誤,需要使用try..catch(Error e)或者try..catch(Throwable e)句式。
使用try..catch(Exception e)句式沒法捕獲錯誤
異常(Exception)也是繼承於Throwable類。一般是依據實際處理業務拋出的異常。分爲執行時異常(RuntimeException)和普通異常。普通異常直接繼承於Exception類。假設方法內部沒有經過try..catch句式進行處理。必須經過throws關鍵字把異常拋出外部進行處理(即checked異常);而執行時異常繼承於RuntimeException類,假設方法內部沒有經過try..catch句式進行處理,不需要顯式經過throws關鍵字拋出外部。如IndexOutOfBoundsException、NullPointerException、ClassCastException等都是執行時異常。固然RuntimeException也是繼承於Exception類,所以是可以經過try..catch(Exception e)句式進行捕獲處理的。
onExceptionResumeNext流程圖例如如下:
調用樣例例如如下:
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<?super Integer> subscriber) { if (subscriber.isUnsubscribed()) return; //循環輸出數字 try { for (int i = 0; i < 10; i++) { if (i == 4) { throw new Exception("this is number 4 error!"); } subscriber.onNext(i); } subscriber.onCompleted(); } catch (Throwable e) { subscriber.onError(e); } } }); observable.onExceptionResumeNext(Observable.just(100, 101, 102)).subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { System.out.println("Sequence complete."); } @Override public void onError(Throwable e) { System.err.println("Error: " + e.getMessage()); } @Override public void onNext(Integer value) { System.out.println("Next:" + value); } });
執行結果例如如下:
Next:0
Next:1
Next:2
Next:3
Next:100
Next:101
Next:102
Sequence complete.
retry操做符是當Observable錯誤發生或者異常時,又一次嘗試執行Observable的邏輯。假設通過n次又一次嘗試執行後仍然出現錯誤或者異常,則最後回調執行onError方法。固然假設源Observable沒有錯誤或者異常出現,則依照正常流程執行。
其流程圖例如如下:
調用樣例例如如下:
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<?super Integer> subscriber) { if (subscriber.isUnsubscribed()) return; //循環輸出數字 try { for (int i = 0; i < 10; i++) { if (i == 4) { throw new Exception("this is number 4 error!"); } subscriber.onNext(i); } subscriber.onCompleted(); } catch (Throwable e) { subscriber.onError(e); } } }); observable.retry(2).subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { System.out.println("Sequence complete."); } @Override public void onError(Throwable e) { System.err.println("Error: " + e.getMessage()); } @Override public void onNext(Integer value) { System.out.println("Next:" + value); } });
執行結果例如如下:
Next:0
Next:1
Next:2
Next:3
Next:0
Next:1
Next:2
Next:3
Next:0
Next:1
Next:2
Next:3
Error: this is number 4 error!
retryWhen操做符類似於retry操做符,都是在源observable出現錯誤或者異常時,又一次嘗試執行源observable的邏輯,不一樣在於retryWhen操做符是在源Observable出現錯誤或者異常時,經過回調第二個Observable來推斷是否又一次嘗試執行源Observable的邏輯。假設第二個Observable沒有錯誤或者異常出現。則就會又一次嘗試執行源Observable的邏輯,不然就會直接回調執行訂閱者的onError方法。其流程圖例如如下:
調用樣例例如如下:
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
System.out.println("subscribing");
subscriber.onError(new RuntimeException("always fails"));
}
});
observable.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
return observable.zipWith(Observable.range(1, 3), new Func2<Throwable, Integer, Integer>() {
@Override
public Integer call(Throwable throwable, Integer integer) {
return integer;
}
}).flatMap(new Func1<Integer, Observable<?>>() { @Override public Observable<?
> call(Integer integer) { System.out.println("delay retry by " + integer + " second(s)"); //每一秒中執行一次 return Observable.timer(integer, TimeUnit.SECONDS); } }); } }).subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { System.out.println("Sequence complete."); } @Override public void onError(Throwable e) { System.err.println("Error: " + e.getMessage()); } @Override public void onNext(Integer value) { System.out.println("Next:" + value); } });
執行結果例如如下:
subscribing
delay retry by 1 second(s)
subscribing
delay retry by 2 second(s)
subscribing
delay retry by 3 second(s)
subscribing
Sequence complete.
好了,先介紹這麼多。下回繼續介紹其它的操做符。敬請期待!