本篇文章主要介紹 RxJava 2.x 的一些常用操作符。對 RxJava 不熟悉的朋友可以先去看下我之前的兩篇介紹。
public static <T> Observable<T> create(ObservableOnSubscribe<T> source)
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { e.onNext("This is Observer"); //經過 ObservableEmitter 發射器向觀察者發送事件。 e.onComplete(); } });
// Signatures (overloads taking one up to ten items):
//   public static <T> Observable<T> just(T item)
//   ...
//   public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5,
//                                        T item6, T item7, T item8, T item9, T item10)
Observable.just(1, 2, 3)
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "-------onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "-------onNext " + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "-------onError ");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "-------onComplete ");
            }
        });
使用 just() 方法創建 Observable 對象,Observable 會將事件逐個發送。
From 操作符
// Signature: public static <T> Observable<T> fromArray(T... items)
Integer[] array = {1, 2, 3, 4};
Observable.fromArray(array)
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "--------------onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "--------------onNext " + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "--------------onError ");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "--------------onComplete ");
            }
        });
// Signature: public static <T> Observable<T> fromCallable(Callable<? extends T> supplier)
Observable.fromCallable(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                // The value returned here is what the consumer below receives.
                return 1;
            }
        })
        // Only an onNext consumer is supplied in this example.
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "--------------accept " + integer);
            }
        });
// Signature: public static <T> Observable<T> fromIterable(Iterable<? extends T> source)
List<Integer> list = new ArrayList<>();
list.add(0);
list.add(1);
list.add(2);
list.add(3);
Observable.fromIterable(list)
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "--------------onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "--------------onNext " + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "--------------onError ");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "--------------onComplete ");
            }
        });
// Subscribe a logging observer to Observable.empty(); every callback just
// writes a marker line to the log.
Observer<Object> emptyObserver = new Observer<Object>() {
    @Override
    public void onSubscribe(Disposable disposable) {
        Log.d(TAG, "---------------------onSubscribe");
    }

    @Override
    public void onNext(Object item) {
        Log.d(TAG, "---------------------onNext");
    }

    @Override
    public void onError(Throwable error) {
        Log.d(TAG, "---------------------onError " + error);
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete");
    }
};
Observable.empty().subscribe(emptyObserver);
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) //將 Integer 類型的數據轉換成 String。 Observable.just(1, 2, 3) .map(new Function < Integer, String > () { @Override public String apply(Integer integer) throws Exception { return integer+"rxjava"; } }) .subscribe(new Observer < String > () { @Override public void onSubscribe(Disposable d) { Log.e(TAG, "----------------------onSubscribe"); } @Override public void onNext(String s) { Log.e(TAG, "----------------------onNext " + s); } @Override public void onError(Throwable e) { Log.d(TAG, "---------------------onError " + e); } @Override public void onComplete() { Log.d(TAG, "---------------------onComplete" ); } });
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
flatMap() 其實與 map() 類似,但是 flatMap() 返回的是一個 Observable。現在用一個 map() 的例子和 flatMap() 的例子來對比說明 flatMap() 的用法。
需求:我們現在需要通過學校拿到院系列表,然後在每個院系中拿到學生的信息。
傳統的實現方式有很多種,我就不舉例了,直接使用 RxJava 實現:
//學校 class School{ private String name; private List<Department> departments; public School(){} public School(String name, List<Department> departments) { this.name = name; this.departments = departments; } public String getName() { return name; } public void setName(String name) { this.name = name; } public List<Department> getDepartments() { return departments; } public void setDepartments(List<Department> departments) { this.departments = departments; } }
//院系 class Department{ private String name; private List<Student> students; public Department(){} public Department(String name, List<Student> students) { this.name = name; this.students = students; } public String getName() { return name; } public void setName(String name) { this.name = name; } public List<Student> getStudents() { return students; } public void setStudents(List<Student> students) { this.students = students; } }
/**
 * A student: the student's name and the name of the school attended.
 */
class Student {
    private String name;
    private String school;

    /** Creates a student with neither name nor school set. */
    public Student() {
    }

    /**
     * @param name   the student's name
     * @param school the name of the student's school
     */
    public Student(String name, String school) {
        this.name = name;
        this.school = school;
    }

    /** @return the student's name */
    public String getName() {
        return name;
    }

    /** @param name the new name of the student */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the name of the student's school */
    public String getSchool() {
        return school;
    }

    /** @param school the new school name */
    public void setSchool(String school) {
        this.school = school;
    }
}
使用 map() 方法實現:
//使用map()實現方式 Observable.fromIterable(departments) .map(new Function<Department, List<Student>>() { @Override public List<Student> apply(Department department) throws Exception { return department.getStudents(); } }) .subscribe(new Observer<List<Student>>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(List<Student> students) { for (Student student : students){ Log.d("----------", student.getName()+student.getSchool() ); //若是還須要獲取學生全部課程以及成績 ...................... } } @Override public void onError(Throwable e) { } @Override public void onComplete() { } });
//使用flatMap()實現 Observable.fromIterable(departments) .flatMap(new Function<Department, ObservableSource<Student>>() { @Override public ObservableSource<Student> apply(Department department) throws Exception { return Observable.fromIterable(department.getStudents()); } }) .flatMap() //若是還須要獲取學生全部課程以及成績操做 .subscribe(new Observer<Student>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Student student) { Log.d("---------",student.getName()+student.getSchool()); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } });
以上代碼中,在 map() 方法的實現裡可以看到我們在 onNext() 方法中使用了 for 循環。如果代碼邏輯再複雜一些,就可能需要嵌套 for 循環來實現,那就真的迷之縮進了;而使用 flatMap() 方法實現,只需要再加一個 flatMap() 轉換一下就行了。隨着代碼邏輯增加,代碼依然清晰,這就是 flatMap() 的強大之處,也是很多人喜歡使用 RxJava 的原因所在。
// Signatures:
//   public final <R> Observable<R> concatMap(
//           Function<? super T, ? extends ObservableSource<? extends R>> mapper)
//   public final <R> Observable<R> concatMap(
//           Function<? super T, ? extends ObservableSource<? extends R>> mapper, int prefetch)
//
// Each Department is mapped to an Observable of its students. No subscriber is
// attached in this snippet.
Observable.fromIterable(departments)
        .concatMap(new Function<Department, ObservableSource<Student>>() {
            @Override
            public ObservableSource<Student> apply(Department department) throws Exception {
                return Observable.fromIterable(department.getStudents());
            }
        });
相當於 Handler 的延遲發送事件:handler.sendEmptyMessageDelayed(0, 2000);
public final Observable<T> delay(long delay, TimeUnit unit) Observable.just(1, 2, 3) .delay(2, TimeUnit.SECONDS) //延遲兩秒再發送事件 .subscribe(new Observer < Integer > () { @Override public void onSubscribe(Disposable d) { Log.d("------------onSubscribe"); } @Override public void onNext(Integer integer) { Log.d("------------"+integer); } @Override public void onError(Throwable e) { } @Override public void onComplete() { Log.d(TAG, "----------------onComplete"); } });
前方有坑,請集中注意力
Observable.doOnSubscribe() 方法是在 subscribe() 調用後、事件發送前執行。默認情況下,doOnSubscribe() 執行在 subscribe() 發生的線程;而如果在 doOnSubscribe() 之後有 subscribeOn() 的話,它將執行在離它最近的 subscribeOn() 所指定的線程。
// Show a dialog before any event is emitted, emit on the IO scheduler, and
// handle the results on the Android main thread.
Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("1");
                emitter.onNext("2");
                emitter.onNext("3");
                emitter.onComplete();
            }
        })
        .subscribeOn(Schedulers.io()) // run the emission above on the IO scheduler
        .doOnSubscribe(new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {
                dialog.show(); // show the dialog
            }
        })
        .subscribeOn(AndroidSchedulers.mainThread()) // run the preparation above on the UI thread
        .observeOn(AndroidSchedulers.mainThread())   // run the callbacks below on the UI thread
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d("----", "開始了");
            }

            @Override
            public void onNext(String s) {
                Log.d("----", s);
                dialog.dismiss();
            }

            @Override
            public void onError(Throwable e) {
                // Without this the dialog shown in doOnSubscribe() would stay
                // up if the stream fails before delivering any item.
                dialog.dismiss();
                Log.e("----", "onError", e);
            }

            @Override
            public void onComplete() {
                Log.d("----", "complete");
            }
        });
public final Observable<T> subscribeOn(Scheduler scheduler)
public final Observable<T> observeOn(Scheduler scheduler) Observable.just(1, 2, 3, 4) // IO 線程,由 subscribeOn() 指定 .subscribeOn(Schedulers.io()) .observeOn(Schedulers.newThread()) .map(mapOperator) // 新線程,由 observeOn() 指定 .observeOn(Schedulers.io()) .map(mapOperator2) // IO 線程,由 observeOn() 指定 .observeOn(AndroidSchedulers.mainThread) .subscribe(subscriber); // Android 主線程,由 observeOn() 指定
以上就是 RxJava 常用的一些操作符介紹和使用方法實例了。
關於 RxJava 系列二就到此結束啦,後面有時間我還會寫寫與 Retrofit2 的結合使用,歡迎關注訂閱!
歡迎關注作者 darryrzhong,更多乾貨等你來拿喲。
更多精彩文章請關注