RXJAVA之變換操做

RXJAVA提供瞭如下變換操做,對Observable的消息進行變換操做:app

1.windowide

按期未來自Observable的數據分拆成一些Observable窗口,而後發射這些窗口,而不是每次發射一項。函數

Observable<String> observable = Observable.just("123", "456","789","abc");spa

    observable.window(3).subscribeWith(new Observer<Observable<String>>(){server

@Overrideit

public void onComplete() {io

System.out.println("complete");class

}map

@Override方法

public void onError(Throwable arg0) {

System.out.println("error");

 

}

@Override

public void onNext(Observable<String> arg0) {

arg0.subscribeWith(new Observer<String>(){

@Override

public void onSubscribe(Disposable d) {

System.out.println("onSubscribe");

}

 

@Override

public void onNext(String t) {

System.out.println(t);

}

 

@Override

public void onError(Throwable e) {

System.out.println("error");

}

 

@Override

public void onComplete() {

System.out.println("complete");

}});

 

}

@Override

public void onSubscribe(Disposable arg0) {

System.out.println("onSubscribe");

 

}

});

    }

輸出結果

onSubscribe

onSubscribe

123

456

789

complete

onSubscribe

abc

complete

complete

2.map

變換接收到的數據,從新發放出去。map函數只有一個參數,參數通常是Func1,Func1的<I,O>I,O模版分別爲輸入和輸出值的類型,實現Func1的call方法對I類型進行處理後返回O類型數據。

Observable.just("123", "456","789").map(new Function<String,Integer>(){

@Override

public Integer apply(String t) throws Exception {

return Integer.parseInt(t);

}}).subscribeWith(new Observer<Integer>(){

 

@Override

public void onSubscribe(Disposable d) {

System.out.println("onSubscribe");

}

 

@Override

public void onNext(Integer t) {

System.out.println(t);

}

 

@Override

public void onError(Throwable e) {

System.out.println("onError");

}

 

@Override

public void onComplete() {

System.out.println("onComplete");

}});

    }

 輸出結果

onSubscribe

123

456

789

onComplete

3.flatmap 

將Observable發射的數據變換爲Observables集合,而後將這些Observable發射的數據平坦化的放進一個單獨的Observable,內部採用merge合併。

Observable<String> observable = Observable.just("123", "456","789","abc");

    observable.flatMap(new Function<String,Observable<String>>(){

 

@Override

public Observable<String> apply(String t) throws Exception {

return Observable.just(t+"flatmap");

}}

    ).subscribeWith(new Observer<String>(){

 

@Override

public void onSubscribe(Disposable d) {

System.out.println("onSubscribe");

}

 

@Override

public void onNext(String t) {

System.out.println(t);

}

 

@Override

public void onError(Throwable e) {

System.out.println("onError");

}

 

@Override

public void onComplete() {

System.out.println("onComplete");

}});

    }

輸出結果

onSubscribe

123flatmap

456flatmap

789flatmap

abcflatmap

onComplete

相關文章
相關標籤/搜索