import android.util.Log; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.TimeUnit; import io.reactivex.Flowable; import io.reactivex.Observable; import io.reactivex.ObservableEmitter; import io.reactivex.ObservableOnSubscribe; import io.reactivex.ObservableSource; import io.reactivex.Observer; import io.reactivex.Scheduler; import io.reactivex.Single; import io.reactivex.SingleObserver; import io.reactivex.android.schedulers.AndroidSchedulers; import io.reactivex.annotations.NonNull; import io.reactivex.disposables.Disposable; import io.reactivex.functions.BiFunction; import io.reactivex.functions.Consumer; import io.reactivex.functions.Function; import io.reactivex.functions.Predicate; import io.reactivex.schedulers.Schedulers; import io.reactivex.subjects.PublishSubject; /** * Created by vein on 2018/1/31. */ public class RxDemo { /** * 1.建立一個發射對象,並對其進行觀察 */ public void create() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { //發射數據源 // e.onNext(0); // e.onComplete(); // e.onError(new Throwable()); } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { //Disposable能夠終止事件傳遞,若是執行 d.dispose();,後續onNext則接收不到發射源數據 } @Override public void onNext(Integer integer) { //接收發射內容 } @Override public void onError(Throwable e) { } @Override public void onComplete() { //若是發射源 } }); } /** * 2. 合併發射事件 * <p> * 分別從兩個上游事件中各取出一個組合 * 一個事件只能被使用一次,順序嚴格按照事件發送的順序 * 最終下游事件收到的是和上游事件最少的數目相同(必須兩兩配對,多餘的捨棄) * <p> */ public void zip() { Observable.zip(getStringObservable(), getIntegerObservable(), new BiFunction<String, Integer, String>() { @Override public String apply(String s, Integer integer) throws Exception { return s + integer; } }).subscribe(new Consumer<String>() {//Consumer內部實現,可用subscribe(new Observer<Integer>() {}代替 @Override public void accept(String resultStr) throws Exception { Log.d("", resultStr); //結果爲 A1 B2 C3 //依據最短數據源爲結果長度,多餘捨棄 } }); } //構造事件1,做爲zip()的數據源 private Observable<String> getStringObservable() { return Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception { if (!e.isDisposed()) { e.onNext("A"); e.onNext("B"); e.onNext("C"); } } }); } //構造事件2,做爲zip()數據源 private Observable<Integer> getIntegerObservable() { return Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception { if (!e.isDisposed()) { e.onNext(1); e.onNext(2); e.onNext(3); e.onNext(4); e.onNext(5); } } }); } /** * 3.對上游發送的每個事件應用一個函數,使得每個事件都按照指定的函數去變化 */ public void map() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onNext(3); } }).map(new Function<Integer, String>() { @Override public String apply(Integer integer) throws Exception { //此處添加轉換方法 String resultStr = "map result " + integer; return resultStr; } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { //結果 //map result 1 //map result 2 //map result 3 } }); } /** * 4. * FlatMap將一個發送事件的上游Observable變換成多個發送事件的Observables, * 而後將它們發射的事件合併後放進一個單獨的Observable裏 * 可是flatMap並不保證事件的順序,若是想保證順序性用concatMap */ public void flatmap() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onNext(3); } }).flatMap(new Function<Integer, ObservableSource<String>>() { @Override public ObservableSource<String> apply(Integer integer) throws Exception { //事件合併方法 List<String> list = new ArrayList<>(); for (int i = 0; i < 3; i++) { list.add("I am value " + integer); } return Observable.fromIterable(list); // int delayTime = (int) (1 + Math.random() * 50); // return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS); } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { //結果(注意,每條輸出的順序不固定) //I am value 1 //I am value 2 //I am value 1 //I am value 2 //I am value 2 //I am value 3 //I am value 2 //I am value 3 //I am value 3 } }); } /** * 5. 與flatMap功能同樣,惟一區別是能保證順序 */ public void concatMap() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { } }).concatMap(new Function<Integer, ObservableSource<String>>() { @Override public ObservableSource<String> apply(Integer integer) throws Exception { List<String> list = new ArrayList<>(); for (int i = 0; i < 3; i++) { list.add("I am value " + integer); } return Observable.fromIterable(list); } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { //結果 //I am value 1 //I am value 1 //I am value 1 //I am value 2 //I am value 2 //I am value 2 //I am value 3 //I am value 3 //I am value 3 } }); } /** * 6. 讓訂閱者在接收到數據前作一些操做的操做符, * 例如來了消息以後,先保存數據,保存數據以後再顯示,能夠用該操做符 */ public void doOnNext() { Observable.just("1", "2").doOnNext(new Consumer<String>() { @Override public void accept(String s) throws Exception { //save 1 ,2 } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { } }); } /** * 7. 過濾操做符,取正確的值 */ public void filter() { Observable.just(1, 20, 65, -5, 7, 19) .filter(new Predicate<Integer>() { @Override public boolean test(@NonNull Integer integer) throws Exception { //過濾條件 return integer >= 10; } }).subscribe(new Consumer<Integer>() { @Override public void accept(@NonNull Integer integer) throws Exception { //返回過濾以後的值,結果 //20,65,19 } }); } /** * 8.跳過多少個事件後開始接收 */ public void skip() { Observable.just(1, 2, 3, 4, 5) .skip(2) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { //結果3,4,5 } }); } /** * 9.用於指定訂閱者最多收到多少數據 */ public void take() { Flowable.fromArray(1, 2, 3, 4, 5) .take(2) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { //結果3,4,5 } }); } /** * 10.延遲(間隔)執行,rxjava2中已經廢棄,用11.interval代替 */ @Deprecated public void timer() { Observable.timer(2, TimeUnit.SECONDS) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Long>() { @Override public void accept(@NonNull Long aLong) throws Exception { } }); } /** * 11.間隔執行操做,默認在新線程 */ public void interval() { Disposable mDisposable;//銷燬界面時注意取消訂閱 //延遲3秒開始執行,每2秒執行一次 mDisposable = Observable.interval(3, 2, TimeUnit.SECONDS) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) // 因爲interval默認在新線程,因此咱們應該切回主線程 .subscribe(new Consumer<Long>() { @Override public void accept(@NonNull Long aLong) throws Exception { } }); //注意在activity的onDestroy()裏取消訂閱 if (mDisposable != null && !mDisposable.isDisposed()) { mDisposable.dispose(); } } /** * 12.Single只會接收一個參數,SingleObserver只會調用onError或者onSuccess */ public void single() { Single.just(new Random().nextInt()).subscribe(new SingleObserver<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onSuccess(Integer integer) { } @Override public void onError(Throwable e) { } }); } /** * 13. 鏈接操做符,可接受Observable的可變參數,或者Observable的集合 */ public void concat() { Observable.concat(Observable.just(1, 2, 3), Observable.just(4, 5, 6)) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { //結果1,2,3,4,5,6 } }); } /** * 14.去重 */ public void distinct() { Observable.just(1, 1, 2, 2, 3, 4, 5, 6, 6) .distinct() .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { //結果1,2,3,4,5,6 } }); } /** * 15.一次用一個方法處理一個值,能夠有一個seed做爲初始值 * (scan用法與reduce相同,區別是scan輸出整個執行過程,reduce輸出最後計算的值) */ public void reduce() { Observable.just(1, 2, 3) .reduce(new BiFunction<Integer, Integer, Integer>() { @Override public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception { //integer + integer2用於求1+2+3+4+5+6 //integer*integer2 用於1*2*3*4*5*6 return integer + integer2; } }).subscribe(new Consumer<Integer>() { @Override public void accept(@NonNull Integer integer) throws Exception { //結果6 //若是是scan用法,則輸出1,3,6 } }); } /** * 按照時間劃分窗口,將數據發送給不一樣的Observable */ public void window() { Observable.interval(600, TimeUnit.MILLISECONDS) // 間隔一秒發一次 .take(15) // 最多接收15個 .window(3, TimeUnit.SECONDS) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Observable<Long>>() { @Override public void accept(@NonNull Observable<Long> longObservable) throws Exception { //時間按3秒總體傳遞一次事件,該處爲3秒執行一次 longObservable.subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Long>() { @Override public void accept(@NonNull Long aLong) throws Exception { //該輸出爲再take範圍內的,每個3秒傳送過來的數據 } }); } }); } //1.若是有被壓問題使用Flowable代替Observable //2.RxJava 2.x 新增Consumer,可自定義實現,accept 裏面至關於本來的onNext }