Android RxJava系列二: 經常使用拓展操做符

前言

本篇文章主要介紹Rxjava 2.x的一些經常使用的操做符,對Rxjava不熟悉的朋友能夠先去看下我以前的兩篇介紹java

建立操做符

  • create() 建立一個被觀察者
public static <T> Observable<T> create(ObservableOnSubscribe<T> source)

複製代碼
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        e.onNext("This is Observer"); //經過 ObservableEmitter 發射器向觀察者發送事件。
        e.onComplete();
    }
});

複製代碼
  • just() 建立一個被觀察者,併發送事件,發送的事件不能夠超過10個以上。
public static <T> Observable<T> just(T item) 
......
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10)

複製代碼
Observable.just(1, 2, 3)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "-------onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "-------onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "-------onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "-------onComplete ");
    }
});

複製代碼

使用just()方法建立Observable對象,Observable會將事件逐個發送segmentfault

  • From 操做符
    • fromArray() 這個方法和 just() 相似,只不過 fromArray 能夠傳入一個數組數組

    • fromCallable() Callable 和 Runnable 的用法基本一致,只是它會返回一個結果值bash

    • fromIterable() 直接發送一個 List 集合數據給觀察者併發

public static <T> Observable<T> fromArray(T... items)

Integer array[] = {1, 2, 3, 4};
Observable.fromArray(array)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "--------------onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "--------------onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "--------------onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "--------------onComplete ");
    }
});

複製代碼
public static <T> Observable<T> fromCallable(Callable<? extends T> supplier)

Observable.fromCallable(new Callable < Integer > () {

    @Override
    public Integer call() throws Exception {
        return 1;
    }
})
.subscribe(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "--------------accept " + integer);
    }
});


複製代碼
public static <T> Observable<T> fromIterable(Iterable<? extends T> source)


List<Integer> list = new ArrayList<>();
list.add(0);
list.add(1);
list.add(2);
list.add(3);
Observable.fromIterable(list)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "--------------onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "--------------onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "--------------onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "--------------onComplete ");
    }
});


複製代碼
  • empty() 直接發送 onComplete() 事件
Observable.empty()
.subscribe(new Observer < Object > () {

    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "---------------------onSubscribe");
    }

    @Override
    public void onNext(Object o) {
        Log.d(TAG, "---------------------onNext");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "---------------------onError " + e);
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete");
    }
});

複製代碼

轉換操做符

  • map() map 能夠將被觀察者發送的數據類型轉變成其餘的類型,是一對一的轉換
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper)


//將 Integer 類型的數據轉換成 String。
Observable.just(1, 2, 3)
.map(new Function < Integer, String > () {
    @Override
    public String apply(Integer integer) throws Exception {
        return  integer+"rxjava";
    }
})
.subscribe(new Observer < String > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.e(TAG, "----------------------onSubscribe");
    }

    @Override
    public void onNext(String s) {
        Log.e(TAG, "----------------------onNext " + s);
    }

    @Override
    public void onError(Throwable e) {
      Log.d(TAG, "---------------------onError " + e);
    }

    @Override
    public void onComplete() {
 Log.d(TAG, "---------------------onComplete" );
    }
});



複製代碼
  • flatMap() 這個方法能夠將事件序列中的元素進行整合加工,返回一個新的被觀察者。
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)

複製代碼

flatMap() 其實與 map() 相似,可是 flatMap() 返回的是一個 Observerable。如今用一個map()的例子和flatMap()的例子來對比說明 flatMap() 的用法。app

需求:咱們如今須要經過學校拿到院系列表,而後在每一個院系中拿到學生的信息. 傳統的實現方式有不少種,我就不舉例了,直接使用Rxjava實現:ide

//學校
 class School{
        private String name;
        private List<Department> departments;

        public School(){}
        public School(String name, List<Department> departments) {
            this.name = name;
            this.departments = departments;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public List<Department> getDepartments() {
            return departments;
        }

        public void setDepartments(List<Department> departments) {
            this.departments = departments;
        }
    }

複製代碼
//院系
 class Department{
        private String name;
        private List<Student> students;

        public Department(){}
        public Department(String name, List<Student> students) {
            this.name = name;
            this.students = students;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public List<Student> getStudents() {
            return students;
        }

        public void setStudents(List<Student> students) {
            this.students = students;
        }
    }


複製代碼
//學生
class Student {
        private String name;
        private String school;

        public Student(){}
        public Student(String name, String school) {
            this.name = name;
            this.school = school;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getSchool() {
            return school;
        }

        public void setSchool(String school) {
            this.school = school;
        }
    }


複製代碼

使用map()方法實現:post

//使用map()實現方式
        Observable.fromIterable(departments)
                .map(new Function<Department, List<Student>>() {
                    @Override
                    public List<Student> apply(Department department) throws Exception {
                        return department.getStudents();
                    }
                })
                .subscribe(new Observer<List<Student>>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(List<Student> students) {
                       for (Student student : students){
                           Log.d("----------", student.getName()+student.getSchool() );
                        //若是還須要獲取學生全部課程以及成績
                       ......................
                       }
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

複製代碼
//使用flatMap()實現
        Observable.fromIterable(departments)
                .flatMap(new Function<Department, ObservableSource<Student>>() {
                    @Override
                    public ObservableSource<Student> apply(Department department) throws Exception {
                        return Observable.fromIterable(department.getStudents());
                    }
                })
                .flatMap() //若是還須要獲取學生全部課程以及成績操做
                .subscribe(new Observer<Student>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        
                    }

                    @Override
                    public void onNext(Student student) {
                       Log.d("---------",student.getName()+student.getSchool());
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

複製代碼

以上代碼中map()方法實現中,能夠看到咱們在onNext()方法中使用了for循環.若是代碼邏輯在複雜一些,就可能須要嵌套for循環來實現,那就真的迷之縮進了,而使用flatMap()方法實現,只須要實現一個flatMap()轉換一下就行了,隨着代碼邏輯增長,代碼依然清晰,這就是flatMap()的強大之處,也是不少人喜歡使用Rxjava的緣由所在.fetch

  • concatMap()
    concatMap() 和 flatMap() 基本上是同樣的,只不過 concatMap() 轉發出來的事件是有序的,而 flatMap() 是無序的。
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, int prefetch)


  Observable.fromIterable(departments)
                .concatMap(new Function<Department, ObservableSource<Student>>() {
                    @Override
                    public ObservableSource<Student> apply(Department department) throws Exception {
                        return Observable.fromIterable(department.getStudents());
                    }
                })


複製代碼

功能操做符

  • delay() 延遲一段事件發送事件。
至關於handler的延遲發送事件

handler.sendEmptyMessageDelayed(0,2000);
複製代碼
public final Observable<T> delay(long delay, TimeUnit unit)

Observable.just(1, 2, 3)
.delay(2, TimeUnit.SECONDS) //延遲兩秒再發送事件
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d("------------onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d("------------"+integer);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {
        Log.d(TAG, "----------------onComplete");
    }
});

複製代碼
  • doOnSubscribe() Observable 每發送 onSubscribe() 以前都會回調這個方法。 此方法一般用來作準備工做,例如彈一個ProgressDialog提示用戶, But,這裏有一個小坑,特別拿出來講明一下:

前方有坑,請集中注意力ui

Observable.doOnSubscribe()方法是在subscribe() 調用後並且在事件發送前執行。默認狀況下, doOnSubscribe() 執行在 subscribe() 發生的線程;而若是在 doOnSubscribe() 以後有 subscribeOn() 的話,它將執行在離它最近的 subscribeOn() 所指定的線程。

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("1");
                emitter.onNext("2");
                emitter.onNext("3");
                emitter.onComplete();
            }
        })
          .subscribeOn(Schedulers.io()) //在io執行上述操做
          .doOnSubscribe(new Consumer<Disposable>() {
              @Override
              public void accept(Disposable disposable) throws Exception {
                  dialog.show(); //顯示dialog
              }
          })
          .subscribeOn(AndroidSchedulers.mainThread()) //在UI線程執行上述準備操做
          .observeOn(AndroidSchedulers.mainThread())//在UI線程執行下面操做
          .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d("----","開始了");
            }

            @Override
            public void onNext(String s) {
                Log.d("----", s);
                dialog.dismiss();
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {
                Log.d("----", "complete");
            }
        });

複製代碼
  • subscribeOn() 指定被觀察者的線程,要注意的時,若是屢次調用此方法,只有第一次有效。
public final Observable<T> subscribeOn(Scheduler scheduler)

複製代碼
  • observeOn() 指定觀察者的線程,每指定一次就會生效一次。
public final Observable<T> observeOn(Scheduler scheduler)

Observable.just(1, 2, 3, 4) // IO 線程,由 subscribeOn() 指定
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.newThread())
    .map(mapOperator) // 新線程,由 observeOn() 指定
    .observeOn(Schedulers.io())
    .map(mapOperator2) // IO 線程,由 observeOn() 指定
    .observeOn(AndroidSchedulers.mainThread) 
    .subscribe(subscriber);  // Android 主線程,由 observeOn() 指定

複製代碼

以上就是Rxjava經常使用的一些操做符介紹和使用方法實例了


  • RxJava2 只看這一篇文章就夠了這是玉剛說的一篇關於Rxjava經常使用API的介紹,基本囊括了Rxjava所用到的全部API,還有代碼舉例,也是強烈建議觀看收藏

關於Rxjava系列二就到此結束啦,後面有時間我還會寫寫與retrofit2的結合使用,歡迎關注訂閱!

歡迎關注做者darryrzhong,更多幹貨等你來拿喲.

請賞個小紅心!由於你的鼓勵是我寫做的最大動力!

更多精彩文章請關注

相關文章
相關標籤/搜索