RXJAVA提供瞭如下變換操做,對Observable的消息進行變換操做:app
1.windowide
按期未來自Observable的數據分拆成一些Observable窗口,而後發射這些窗口,而不是每次發射一項。函數
Observable<String> observable = Observable.just("123", "456","789","abc");spa
observable.window(3).subscribeWith(new Observer<Observable<String>>(){server
@Overrideit
public void onComplete() {io
System.out.println("complete");class
}map
@Override方法
public void onError(Throwable arg0) {
System.out.println("error");
}
@Override
public void onNext(Observable<String> arg0) {
arg0.subscribeWith(new Observer<String>(){
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onNext(String t) {
System.out.println(t);
}
@Override
public void onError(Throwable e) {
System.out.println("error");
}
@Override
public void onComplete() {
System.out.println("complete");
}});
}
@Override
public void onSubscribe(Disposable arg0) {
System.out.println("onSubscribe");
}
});
}
輸出結果
onSubscribe
onSubscribe
123
456
789
complete
onSubscribe
abc
complete
complete
2.map
變換接收到的數據,從新發放出去。map函數只有一個參數,參數通常是Func1,Func1的<I,O>I,O模版分別爲輸入和輸出值的類型,實現Func1的call方法對I類型進行處理後返回O類型數據。
Observable.just("123", "456","789").map(new Function<String,Integer>(){
@Override
public Integer apply(String t) throws Exception {
return Integer.parseInt(t);
}}).subscribeWith(new Observer<Integer>(){
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onNext(Integer t) {
System.out.println(t);
}
@Override
public void onError(Throwable e) {
System.out.println("onError");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}});
}
輸出結果
onSubscribe
123
456
789
onComplete
3.flatmap
將Observable發射的數據變換爲Observables集合,而後將這些Observable發射的數據平坦化的放進一個單獨的Observable,內部採用merge合併。
Observable<String> observable = Observable.just("123", "456","789","abc");
observable.flatMap(new Function<String,Observable<String>>(){
@Override
public Observable<String> apply(String t) throws Exception {
return Observable.just(t+"flatmap");
}}
).subscribeWith(new Observer<String>(){
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onNext(String t) {
System.out.println(t);
}
@Override
public void onError(Throwable e) {
System.out.println("onError");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}});
}
輸出結果
onSubscribe
123flatmap
456flatmap
789flatmap
abcflatmap
onComplete