RxJava2 只看這一篇文章就夠了

本文由 玉剛說寫做平臺 提供寫做贊助java

原做者:ZedeChanreact

版權聲明:本文版權歸微信公衆號 玉剛說 全部,未經許可,不得以任何形式轉載android

0. 簡介

RxJava 其實就是提供一套異步編程的 API,這套 API 是基於觀察者模式的,並且是鏈式調用的,因此使用 RxJava 編寫的代碼的邏輯會很是簡潔。編程

RxJava 有如下三個基本的元素:數組

  1. 被觀察者(Observable)
  2. 觀察者(Observer)
  3. 訂閱(subscribe)

下面來講說以上三者是如何協做的:bash

首先在 gradle 文件中添加依賴:微信

implementation 'io.reactivex.rxjava2:rxjava:2.1.4'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
複製代碼
  1. 建立被觀察者:
Observable observable = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        Log.d(TAG, "=========================currentThread name: " + Thread.currentThread().getName());
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
    }
});
複製代碼
  1. 建立觀察者:
Observer observer = new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "======================onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "======================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "======================onError");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "======================onComplete");
    }
};
複製代碼
  1. 訂閱
observable.subscribe(observer);
複製代碼

這裏其實也可使用鏈式調用:數據結構

Observable.create(new ObservableOnSubscribe < Integer > () {
    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        Log.d(TAG, "=========================currentThread name: " + Thread.currentThread().getName());
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
    }
})
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "======================onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "======================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "======================onError");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "======================onComplete");
    }
});
複製代碼

被觀察者發送的事件有如下幾種,總結以下表:併發

事件種類 做用
onNext() 發送該事件時,觀察者會回調 onNext() 方法
onError() 發送該事件時,觀察者會回調 onError() 方法,當發送該事件以後,其餘事件將不會繼續發送
onComplete() 發送該事件時,觀察者會回調 onComplete() 方法,當發送該事件以後,其餘事件將不會繼續發送

其實能夠把 RxJava 比喻成一個作果汁,家裏有不少種水果(要發送的原始數據),你想榨點水果汁喝一下,這時候你就要想究竟要喝什麼水果汁呢?若是你想喝牛油果雪梨檸檬汁,那你就要把這三種水果混在一塊兒榨汁(使用各類操做符變換你想發送給觀察者的數據),榨完後,你就能夠喝上你想要的果汁了(把處理好的數據發送給觀察者)。app

總結以下圖:

下面就來說解 RxJava 各類常見的操做符。

1. 建立操做符

如下就是講解建立被觀察者的各類操做符。

1.1 create()

方法預覽:

public static <T> Observable<T> create(ObservableOnSubscribe<T> source)
複製代碼

有什麼用:

建立一個被觀察者

怎麼用:

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        e.onNext("Hello Observer");
        e.onComplete();
    }
});
複製代碼

上面的代碼很是簡單,建立 ObservableOnSubscribe 並重寫其 subscribe 方法,就能夠經過 ObservableEmitter 發射器向觀察者發送事件。

如下建立一個觀察者,來驗證這個被觀察者是否成功建立。

Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(String s) {
        Log.d("chan","=============onNext " + s);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {
        Log.d("chan","=============onComplete ");
    }
};
        
observable.subscribe(observer);
        
複製代碼

打印結果:

05-20 16:16:50.654 22935-22935/com.example.louder.rxjavademo D/chan: =============onNext Hello Observer
=============onComplete
複製代碼

1.2 just()

方法預覽:

public static <T> Observable<T> just(T item) 
......
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10)
複製代碼

有什麼用?

建立一個被觀察者,併發送事件,發送的事件不能夠超過10個以上。

怎麼用?

Observable.just(1, 2, 3)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "=================onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "=================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "=================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "=================onComplete ");
    }
});

複製代碼

上面的代碼直接使用鏈式調用,代碼也很是簡單,這裏就不細說了,看看打印結果:

05-20 16:27:26.938 23281-23281/? D/chan: =================onSubscribe
=================onNext 1
=================onNext 2
=================onNext 3
=================onComplete 
複製代碼

1.3 From 操做符

1.3.1 fromArray()

方法預覽:

public static <T> Observable<T> fromArray(T... items)
複製代碼

有什麼用?

這個方法和 just() 相似,只不過 fromArray 能夠傳入多於10個的變量,而且能夠傳入一個數組。

怎麼用?

Integer array[] = {1, 2, 3, 4};
Observable.fromArray(array)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "=================onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "=================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "=================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "=================onComplete ");
    }
});
複製代碼

代碼和 just() 基本上同樣,直接看打印結果:

05-20 16:35:23.797 23574-23574/com.example.louder.rxjavademo D/chan: =================onSubscribe
=================onNext 1
=================onNext 2
=================onNext 3
=================onNext 4
=================onComplete 
複製代碼

1.3.2 fromCallable()

方法預覽:

public static <T> Observable<T> fromCallable(Callable<? extends T> supplier)
複製代碼

有什麼用?

這裏的 Callable 是 java.util.concurrent 中的 Callable,Callable 和 Runnable 的用法基本一致,只是它會返回一個結果值,這個結果值就是發給觀察者的。

怎麼用?

Observable.fromCallable(new Callable < Integer > () {

    @Override
    public Integer call() throws Exception {
        return 1;
    }
})
.subscribe(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "================accept " + integer);
    }
});
複製代碼

打印結果:

05-26 13:01:43.009 6890-6890/? D/chan: ================accept 1
複製代碼

1.3.3 fromFuture()

方法預覽:

public static <T> Observable<T> fromFuture(Future<? extends T> future)
複製代碼

有什麼用?

參數中的 Future 是 java.util.concurrent 中的 Future,Future 的做用是增長了 cancel() 等方法操做 Callable,它能夠經過 get() 方法來獲取 Callable 返回的值。

怎麼用?

FutureTask < String > futureTask = new FutureTask < > (new Callable < String > () {
    @Override
    public String call() throws Exception {
        Log.d(TAG, "CallableDemo is Running");
        return "返回結果";
    }
});

Observable.fromFuture(futureTask)
    .doOnSubscribe(new Consumer < Disposable > () {
    @Override
    public void accept(Disposable disposable) throws Exception {
        futureTask.run();
    }
})
.subscribe(new Consumer < String > () {
    @Override
    public void accept(String s) throws Exception {
        Log.d(TAG, "================accept " + s);
    }
});
複製代碼

doOnSubscribe() 的做用就是隻有訂閱時纔會發送事件,具體會在下面講解。

打印結果:

05-26 13:54:00.470 14429-14429/com.example.rxjavademo D/chan: CallableDemo is Running
================accept 返回結果
複製代碼

1.3.4 fromIterable()

方法預覽:

public static <T> Observable<T> fromIterable(Iterable<? extends T> source)
複製代碼

有什麼用?

直接發送一個 List 集合數據給觀察者

怎麼用?

List<Integer> list = new ArrayList<>();
list.add(0);
list.add(1);
list.add(2);
list.add(3);
Observable.fromIterable(list)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "=================onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "=================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "=================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "=================onComplete ");
    }
});
複製代碼

打印結果以下:

05-20 16:43:28.874 23965-23965/? D/chan: =================onSubscribe
=================onNext 0
=================onNext 1
=================onNext 2
=================onNext 3
=================onComplete 

複製代碼

1.4 defer()

方法預覽:

public static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier)
複製代碼

有什麼用?

這個方法的做用就是直到被觀察者被訂閱後纔會建立被觀察者。

怎麼用?

// i 要定義爲成員變量
Integer i = 100;
        
Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
    @Override
    public ObservableSource<? extends Integer> call() throws Exception {
        return Observable.just(i);
    }
});

i = 200;

Observer observer = new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
};

observable.subscribe(observer);

i = 300;

observable.subscribe(observer);
複製代碼

打印結果以下:

05-20 20:05:01.443 26622-26622/? D/chan: ================onNext 200
================onNext 300
複製代碼

由於 defer() 只有觀察者訂閱的時候纔會建立新的被觀察者,因此每訂閱一次就會打印一次,而且都是打印 i 最新的值。

1.5 timer()

方法預覽:

public static Observable<Long> timer(long delay, TimeUnit unit) 
......
複製代碼

有什麼用?

當到指定時間後就會發送一個 0L 的值給觀察者。

怎麼用?

Observable.timer(2, TimeUnit.SECONDS)
.subscribe(new Observer < Long > () {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(Long aLong) {
        Log.d(TAG, "===============onNext " + aLong);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});
複製代碼

打印結果:

05-20 20:27:48.004 27204-27259/com.example.louder.rxjavademo D/chan: ===============onNext 0
複製代碼

1.6 interval()

方法預覽:

public static Observable<Long> interval(long period, TimeUnit unit)
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit)
......
複製代碼

有什麼用?

每隔一段時間就會發送一個事件,這個事件是從0開始,不斷增1的數字。

怎麼用?

Observable.interval(4, TimeUnit.SECONDS)
.subscribe(new Observer < Long > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==============onSubscribe ");
    }

    @Override
    public void onNext(Long aLong) {
        Log.d(TAG, "==============onNext " + aLong);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});
複製代碼

打印結果:

05-20 20:48:10.321 28723-28723/com.example.louder.rxjavademo D/chan: ==============onSubscribe 
05-20 20:48:14.324 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 0
05-20 20:48:18.324 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 1
05-20 20:48:22.323 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 2
05-20 20:48:26.323 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 3
05-20 20:48:30.323 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 4
05-20 20:48:34.323 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 5
複製代碼

從時間就能夠看出每隔4秒就會發出一次數字遞增1的事件。這裏說下 interval() 第三個方法的 initialDelay 參數,這個參數的意思就是 onSubscribe 回調以後,再次回調 onNext 的間隔時間。

1.7 intervalRange()

方法預覽:

public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
複製代碼

有什麼用?

能夠指定發送事件的開始值和數量,其餘與 interval() 的功能同樣。

怎麼用?

Observable.intervalRange(2, 5, 2, 1, TimeUnit.SECONDS)
.subscribe(new Observer < Long > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==============onSubscribe ");
    }

    @Override
    public void onNext(Long aLong) {
        Log.d(TAG, "==============onNext " + aLong);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});
複製代碼

打印結果:

05-21 00:03:01.672 2504-2504/com.example.louder.rxjavademo D/chan: ==============onSubscribe 
05-21 00:03:03.674 2504-2537/com.example.louder.rxjavademo D/chan: ==============onNext 2
05-21 00:03:04.674 2504-2537/com.example.louder.rxjavademo D/chan: ==============onNext 3
05-21 00:03:05.674 2504-2537/com.example.louder.rxjavademo D/chan: ==============onNext 4
05-21 00:03:06.673 2504-2537/com.example.louder.rxjavademo D/chan: ==============onNext 5
05-21 00:03:07.674 2504-2537/com.example.louder.rxjavademo D/chan: ==============onNext 6
複製代碼

能夠看出收到5次 onNext 事件,而且是從 2 開始的。

1.8 range()

方法預覽:

public static Observable<Integer> range(final int start, final int count)
複製代碼

有什麼用?

同時發送必定範圍的事件序列。

怎麼用?

Observable.range(2, 5)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==============onSubscribe ");
    }

    @Override
    public void onNext(Integer aLong) {
        Log.d(TAG, "==============onNext " + aLong);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});
複製代碼

打印結果:

05-21 00:09:17.202 2921-2921/? D/chan: ==============onSubscribe 
==============onNext 2
==============onNext 3
==============onNext 4
==============onNext 5
==============onNext 6
複製代碼

1.9 rangeLong()

方法預覽:

public static Observable<Long> rangeLong(long start, long count)
複製代碼

有什麼用?

做用與 range() 同樣,只是數據類型爲 Long

怎麼用?

用法與 range() 同樣,這裏就再也不贅述了。

1.10 empty() & never() & error()

方法預覽:

public static <T> Observable<T> empty()
public static <T> Observable<T> never()
public static <T> Observable<T> error(final Throwable exception)
複製代碼

有什麼用?

  1. empty() : 直接發送 onComplete() 事件
  2. never():不發送任何事件
  3. error():發送 onError() 事件

怎麼用?

Observable.empty()
.subscribe(new Observer < Object > () {

    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe");
    }

    @Override
    public void onNext(Object o) {
        Log.d(TAG, "==================onNext");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError " + e);
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete");
    }
});
複製代碼

打印結果:

05-26 14:06:11.881 15798-15798/com.example.rxjavademo D/chan: ==================onSubscribe
==================onComplete
複製代碼

換成 never() 的打印結果:

05-26 14:12:17.554 16805-16805/com.example.rxjavademo D/chan: ==================onSubscribe
複製代碼

換成 error() 的打印結果:

05-26 14:12:58.483 17817-17817/com.example.rxjavademo D/chan: ==================onSubscribe
==================onError java.lang.NullPointerException
複製代碼

2. 轉換操做符

2.1 map()

方法預覽:

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper)
複製代碼

有什麼用?

map 能夠將被觀察者發送的數據類型轉變成其餘的類型

怎麼用?

如下代碼將 Integer 類型的數據轉換成 String。

Observable.just(1, 2, 3)
.map(new Function < Integer, String > () {
    @Override
    public String apply(Integer integer) throws Exception {
        return "I'm " + integer;
    }
})
.subscribe(new Observer < String > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.e(TAG, "===================onSubscribe");
    }

    @Override
    public void onNext(String s) {
        Log.e(TAG, "===================onNext " + s);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});
複製代碼

打印結果:

05-21 09:16:03.490 5700-5700/com.example.rxjavademo E/chan: ===================onSubscribe
===================onNext I'm 1 ===================onNext I'm 2
===================onNext I'm 3 複製代碼

2.2 flatMap()

方法預覽:

public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
......
複製代碼

有什麼用?

這個方法能夠將事件序列中的元素進行整合加工,返回一個新的被觀察者。

怎麼用?

flatMap() 其實與 map() 相似,可是 flatMap() 返回的是一個 Observerable。如今用一個例子來講明 flatMap() 的用法。

假設一個有一個 Person 類,這個類的定義以下:

public class Person {

    private String name;
    private List<Plan> planList = new ArrayList<>();

    public Person(String name, List<Plan> planList) {
        this.name = name;
        this.planList = planList;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public List<Plan> getPlanList() {
        return planList;
    }

    public void setPlanList(List<Plan> planList) {
        this.planList = planList;
    }

}
複製代碼

Person 類有一個 name 和 planList 兩個變量,分別表明的是人名和計劃清單。

Plan 類的定義以下:

public class Plan {

    private String time;
    private String content;
    private List<String> actionList = new ArrayList<>();

    public Plan(String time, String content) {
        this.time = time;
        this.content = content;
    }

    public String getTime() {
        return time;
    }

    public void setTime(String time) {
        this.time = time;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public List<String> getActionList() {
        return actionList;
    }

    public void setActionList(List<String> actionList) {
        this.actionList = actionList;
    }
}
複製代碼

如今有一個需求就是要將 Person 集合中的每一個元素中的 Plan 的 action 打印出來。 首先用 map() 來實現這個需求看看:

Observable.fromIterable(personList)
.map(new Function < Person, List < Plan >> () {
    @Override
    public List < Plan > apply(Person person) throws Exception {
        return person.getPlanList();
    }
})
.subscribe(new Observer < List < Plan >> () {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(List < Plan > plans) {
        for (Plan plan: plans) {
            List < String > planActionList = plan.getActionList();
            for (String action: planActionList) {
                Log.d(TAG, "==================action " + action);
            }
        }
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});
複製代碼

能夠看到 onNext() 用了嵌套 for 循環來實現,若是代碼邏輯複雜起來的話,可能須要多重循環才能夠實現。

如今看下使用 flatMap() 實現:

Observable.fromIterable(personList)
.flatMap(new Function < Person, ObservableSource < Plan >> () {
    @Override
    public ObservableSource < Plan > apply(Person person) {
        return Observable.fromIterable(person.getPlanList());
    }
})
.flatMap(new Function < Plan, ObservableSource < String >> () {
    @Override
    public ObservableSource < String > apply(Plan plan) throws Exception {
        return Observable.fromIterable(plan.getActionList());
    }
})
.subscribe(new Observer < String > () {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(String s) {
        Log.d(TAG, "==================action: " + s);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});
複製代碼

從代碼能夠看出,只須要兩個 flatMap() 就能夠完成需求,而且代碼邏輯很是清晰。

2.3 concatMap()

方法預覽:

public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, int prefetch)
複製代碼

有什麼用?

concatMap() 和 flatMap() 基本上是同樣的,只不過 concatMap() 轉發出來的事件是有序的,而 flatMap() 是無序的。

怎麼用?

仍是使用上面 flatMap() 的例子來說解,首先來試下 flatMap() 來驗證發送的事件是不是無序的,代碼以下:

Observable.fromIterable(personList)
.flatMap(new Function < Person, ObservableSource < Plan >> () {
    @Override
    public ObservableSource < Plan > apply(Person person) {
        if ("chan".equals(person.getName())) {
            return Observable.fromIterable(person.getPlanList()).delay(10, TimeUnit.MILLISECONDS);
        }
        return Observable.fromIterable(person.getPlanList());
    }
})
.subscribe(new Observer < Plan > () {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(Plan plan) {
        Log.d(TAG, "==================plan " + plan.getContent());
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});
複製代碼

爲了更好的驗證 flatMap 是無序的,使用了一個 delay() 方法來延遲,直接看打印結果:

05-21 13:57:14.031 21616-21616/com.example.rxjavademo D/chan: ==================plan chan 上課
==================plan chan 寫做業
==================plan chan 打籃球
05-21 13:57:14.041 21616-21641/com.example.rxjavademo D/chan: ==================plan Zede 開會
==================plan Zede 寫代碼
==================plan Zede 寫文章
複製代碼

能夠看到原本 Zede 的事件發送順序是排在 chan 事件以前,可是通過延遲後, 這兩個事件序列發送順序互換了。

如今來驗證下 concatMap() 是不是有序的,使用上面一樣的代碼,只是把 flatMap() 換成 concatMap(),打印結果以下:

05-21 13:58:42.917 21799-21823/com.example.rxjavademo D/chan: ==================plan Zede 開會
==================plan Zede 寫代碼
==================plan Zede 寫文章
==================plan chan 上課
==================plan chan 寫做業
==================plan chan 打籃球
複製代碼

這就表明 concatMap() 轉換後發送的事件序列是有序的了。

2.4 buffer()

方法預覽:

public final Observable<List<T>> buffer(int count, int skip)
......
複製代碼

有什麼用?

從須要發送的事件當中獲取必定數量的事件,並將這些事件放到緩衝區當中一併發出。

怎麼用?

buffer 有兩個參數,一個是 count,另外一個 skip。count 緩衝區元素的數量,skip 就表明緩衝區滿了以後,發送下一次事件序列的時候要跳過多少元素。這樣說可能仍是有點抽象,直接看代碼:

Observable.just(1, 2, 3, 4, 5)
.buffer(2, 1)
.subscribe(new Observer < List < Integer >> () {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(List < Integer > integers) {
        Log.d(TAG, "================緩衝區大小: " + integers.size());
        for (Integer i: integers) {
            Log.d(TAG, "================元素: " + i);
        }
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});
複製代碼

打印結果:

05-21 14:09:34.015 22421-22421/com.example.rxjavademo D/chan: ================緩衝區大小: 2
================元素: 1
================元素: 2
================緩衝區大小: 2
================元素: 2
================元素: 3
================緩衝區大小: 2
================元素: 3
================元素: 4
================緩衝區大小: 2
================元素: 4
================元素: 5
================緩衝區大小: 1
================元素: 5
複製代碼

從結果能夠看出,每次發送事件,指針都會日後移動一個元素再取值,直到指針移動到沒有元素的時候就會中止取值。

2.5 groupBy()

方法預覽:

public final <K> Observable<GroupedObservable<K, T>> groupBy(Function<? super T, ? extends K> keySelector)
複製代碼

有什麼用?

將發送的數據進行分組,每一個分組都會返回一個被觀察者。

怎麼用?

Observable.just(5, 2, 3, 4, 1, 6, 8, 9, 7, 10)
.groupBy(new Function < Integer, Integer > () {
    @Override
    public Integer apply(Integer integer) throws Exception {
        return integer % 3;
    }
})
.subscribe(new Observer < GroupedObservable < Integer, Integer >> () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "====================onSubscribe ");
    }

    @Override
    public void onNext(GroupedObservable < Integer, Integer > integerIntegerGroupedObservable) {
        Log.d(TAG, "====================onNext ");
        integerIntegerGroupedObservable.subscribe(new Observer < Integer > () {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "====================GroupedObservable onSubscribe ");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "====================GroupedObservable onNext groupName: " + integerIntegerGroupedObservable.getKey() + " value: " + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "====================GroupedObservable onError ");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "====================GroupedObservable onComplete ");
            }
        });
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "====================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "====================onComplete ");
    }
});
複製代碼

在 groupBy() 方法返回的參數是分組的名字,每返回一個值,那就表明會建立一個組,以上的代碼就是將1~10的數據分紅3組,來看看打印結果:

05-26 14:38:02.062 21451-21451/com.example.rxjavademo D/chan: ====================onSubscribe 
05-26 14:38:02.063 21451-21451/com.example.rxjavademo D/chan: ====================onNext 
====================GroupedObservable onSubscribe     ====================GroupedObservable onNext  groupName: 2 value: 5
====================GroupedObservable onNext  groupName: 2 value: 2
====================onNext 
====================GroupedObservable onSubscribe 
====================GroupedObservable onNext  groupName: 0 value: 3
05-26 14:38:02.064 21451-21451/com.example.rxjavademo D/chan: ====================onNext 
====================GroupedObservable onSubscribe 
====================GroupedObservable onNext  groupName: 1 value: 4
====================GroupedObservable onNext  groupName: 1 value: 1
====================GroupedObservable onNext  groupName: 0 value: 6
====================GroupedObservable onNext  groupName: 2 value: 8
====================GroupedObservable onNext  groupName: 0 value: 9
====================GroupedObservable onNext  groupName: 1 value: 7
====================GroupedObservable onNext  groupName: 1 value: 10
05-26 14:38:02.065 21451-21451/com.example.rxjavademo D/chan: ====================GroupedObservable onComplete 
====================GroupedObservable onComplete 
====================GroupedObservable onComplete 
====================onComplete 
複製代碼

能夠看到返回的結果中是有3個組的。

2.6 scan()

方法預覽:

public final Observable<T> scan(BiFunction<T, T, T> accumulator)
複製代碼

有什麼用?

將數據以必定的邏輯聚合起來。

怎麼用?

Observable.just(1, 2, 3, 4, 5)
.scan(new BiFunction < Integer, Integer, Integer > () {
    @Override
    public Integer apply(Integer integer, Integer integer2) throws Exception {
        Log.d(TAG, "====================apply ");
        Log.d(TAG, "====================integer " + integer);
        Log.d(TAG, "====================integer2 " + integer2);
        return integer + integer2;
    }
})
.subscribe(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "====================accept " + integer);
    }
});
複製代碼

打印結果:

05-26 14:45:27.784 22519-22519/com.example.rxjavademo D/chan: ====================accept 1
====================apply 
====================integer 1
====================integer2 2
====================accept 3
====================apply 
05-26 14:45:27.785 22519-22519/com.example.rxjavademo D/chan: ====================integer 3
====================integer2 3
====================accept 6
====================apply 
====================integer 6
====================integer2 4
====================accept 10
====================apply 
====================integer 10
====================integer2 5
====================accept 15
複製代碼

2.7 window()

方法預覽:

public final Observable<Observable<T>> window(long count)
......
複製代碼

有什麼用?

發送指定數量的事件時,就將這些事件分爲一組。window 中的 count 的參數就是表明指定的數量,例如將 count 指定爲2,那麼每發2個數據就會將這2個數據分紅一組。

怎麼用?

Observable.just(1, 2, 3, 4, 5)
.window(2)
.subscribe(new Observer < Observable < Integer >> () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "=====================onSubscribe ");
    }

    @Override
    public void onNext(Observable < Integer > integerObservable) {
        integerObservable.subscribe(new Observer < Integer > () {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "=====================integerObservable onSubscribe ");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "=====================integerObservable onNext " + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "=====================integerObservable onError ");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "=====================integerObservable onComplete ");
            }
        });
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "=====================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "=====================onComplete ");
    }
});
複製代碼

打印結果:

05-26 15:02:20.654 25838-25838/com.example.rxjavademo D/chan: =====================onSubscribe 
05-26 15:02:20.655 25838-25838/com.example.rxjavademo D/chan: =====================integerObservable onSubscribe 
05-26 15:02:20.656 25838-25838/com.example.rxjavademo D/chan: =====================integerObservable onNext 1
=====================integerObservable onNext 2
=====================integerObservable onComplete 
=====================integerObservable onSubscribe 
=====================integerObservable onNext 3
=====================integerObservable onNext 4
=====================integerObservable onComplete 
=====================integerObservable onSubscribe 
=====================integerObservable onNext 5
=====================integerObservable onComplete 
=====================onComplete 
複製代碼

從結果能夠發現,window() 將 1~5 的事件分紅了3組。

3. 組合操做符

3.1 concat()

方法預覽:

public static <T> Observable<T> concat(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2, ObservableSource<? extends T> source3, ObservableSource<? extends T> source4)
......
複製代碼

有什麼用?

能夠將多個觀察者組合在一塊兒,而後按照以前發送順序發送事件。須要注意的是,concat() 最多隻能夠發送4個事件。

怎麼用?

Observable.concat(Observable.just(1, 2),
Observable.just(3, 4),
Observable.just(5, 6),
Observable.just(7, 8))
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});
複製代碼

打印以下:

05-21 15:40:26.738 7477-7477/com.example.rxjavademo D/chan: ================onNext 1
================onNext 2
05-21 15:40:26.739 7477-7477/com.example.rxjavademo D/chan: ================onNext 3
================onNext 4
================onNext 5
================onNext 6
================onNext 7
================onNext 8
複製代碼

3.2 concatArray()

方法預覽:

public static <T> Observable<T> concatArray(ObservableSource<? extends T>... sources)
複製代碼

有什麼用?

與 concat() 做用同樣,不過 concatArray() 能夠發送多於 4 個被觀察者。

怎麼用?

Observable.concatArray(Observable.just(1, 2),
Observable.just(3, 4),
Observable.just(5, 6),
Observable.just(7, 8),
Observable.just(9, 10))
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});
複製代碼

打印結果:

05-21 15:47:18.581 9129-9129/com.example.rxjavademo D/chan: ================onNext 1
================onNext 2
================onNext 3
================onNext 4
================onNext 5
================onNext 6
================onNext 7
================onNext 8
================onNext 9
================onNext 10
複製代碼

3.3 merge()

方法預覽:

public static <T> Observable<T> merge(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2, ObservableSource<? extends T> source3, ObservableSource<? extends T> source4)
......
複製代碼

有什麼用?

這個方法月 concat() 做用基本同樣,知識 concat() 是串行發送事件,而 merge() 並行發送事件。

怎麼用?

如今來演示 concat() 和 merge() 的區別。

Observable.merge(
Observable.interval(1, TimeUnit.SECONDS).map(new Function < Long, String > () {
    @Override
    public String apply(Long aLong) throws Exception {
        return "A" + aLong;
    }
}),
Observable.interval(1, TimeUnit.SECONDS).map(new Function < Long, String > () {
    @Override
    public String apply(Long aLong) throws Exception {
        return "B" + aLong;
    }
}))
    .subscribe(new Observer < String > () {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(String s) {
        Log.d(TAG, "=====================onNext " + s);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});
複製代碼

打印結果以下:

05-21 16:10:31.125 12801-12850/com.example.rxjavademo D/chan: =====================onNext B0
05-21 16:10:31.125 12801-12849/com.example.rxjavademo D/chan: =====================onNext A0
05-21 16:10:32.125 12801-12849/com.example.rxjavademo D/chan: =====================onNext A1
05-21 16:10:32.126 12801-12850/com.example.rxjavademo D/chan: =====================onNext B1
05-21 16:10:33.125 12801-12849/com.example.rxjavademo D/chan: =====================onNext A2
05-21 16:10:33.125 12801-12850/com.example.rxjavademo D/chan: =====================onNext B2
05-21 16:10:34.125 12801-12849/com.example.rxjavademo D/chan: =====================onNext A3
05-21 16:10:34.125 12801-12850/com.example.rxjavademo D/chan: =====================onNext B3
05-21 16:10:35.124 12801-12849/com.example.rxjavademo D/chan: =====================onNext A4
05-21 16:10:35.125 12801-12850/com.example.rxjavademo D/chan: =====================onNext B4
05-21 16:10:36.125 12801-12849/com.example.rxjavademo D/chan: =====================onNext A5
05-21 16:10:36.125 12801-12850/com.example.rxjavademo D/chan: =====================onNext B5
......
複製代碼

從結果能夠看出,A 和 B 的事件序列均可以發出,將以上的代碼換成 concat() 看看打印結果:

05-21 16:17:52.352 14597-14621/com.example.rxjavademo D/chan: =====================onNext A0
05-21 16:17:53.351 14597-14621/com.example.rxjavademo D/chan: =====================onNext A1
05-21 16:17:54.351 14597-14621/com.example.rxjavademo D/chan: =====================onNext A2
05-21 16:17:55.351 14597-14621/com.example.rxjavademo D/chan: =====================onNext A3
05-21 16:17:56.351 14597-14621/com.example.rxjavademo D/chan: =====================onNext A4
05-21 16:17:57.351 14597-14621/com.example.rxjavademo D/chan: =====================onNext A5
......
複製代碼

從結果能夠知道,只有等到第一個被觀察者發送完事件以後,第二個被觀察者纔會發送事件。

mergeArray() 與 merge() 的做用是同樣的,只是它能夠發送4個以上的被觀察者,這裏就再也不贅述了。

3.4 concatArrayDelayError() & mergeArrayDelayError()

方法預覽:

public static <T> Observable<T> concatArrayDelayError(ObservableSource<? extends T>... sources)
public static <T> Observable<T> mergeArrayDelayError(ObservableSource<? extends T>... sources)
複製代碼

有什麼用?

在 concatArray() 和 mergeArray() 兩個方法當中,若是其中有一個被觀察者發送了一個 Error 事件,那麼就會中止發送事件,若是你想 onError() 事件延遲到全部被觀察者都發送完事件後再執行的話,就可使用 concatArrayDelayError() 和 mergeArrayDelayError()

怎麼用?

首先使用 concatArray() 來驗證一下發送 onError() 事件是否會中斷其餘被觀察者發送事件,代碼以下:

Observable.concatArray(
Observable.create(new ObservableOnSubscribe < Integer > () {
    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        e.onNext(1);
        e.onError(new NumberFormatException());
    }
}), Observable.just(2, 3, 4))
    .subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "===================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "===================onError ");
    }

    @Override
    public void onComplete() {

    }
});
複製代碼

打印結果:

05-21 16:38:59.725 17985-17985/com.example.rxjavademo D/chan: ===================onNext 1
===================onError 
複製代碼

從結果能夠知道,確實中斷了,如今換用 concatArrayDelayError(),代碼以下:

Observable.concatArrayDelayError(
Observable.create(new ObservableOnSubscribe < Integer > () {
    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        e.onNext(1);
        e.onError(new NumberFormatException());
    }
}), Observable.just(2, 3, 4))
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "===================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "===================onError ");
    }

    @Override
    public void onComplete() {

    }
});
複製代碼

打印結果以下:

05-21 16:40:59.329 18199-18199/com.example.rxjavademo D/chan: ===================onNext 1
===================onNext 2
===================onNext 3
===================onNext 4
===================onError 
複製代碼

從結果能夠看到,onError 事件是在全部被觀察者發送完事件才發送的。mergeArrayDelayError() 也是有一樣的做用,這裏再也不贅述。

3.5 zip()

方法預覽:

public static <T1, T2, R> Observable<R> zip(ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, BiFunction<? super T1, ? super T2, ? extends R> zipper)
......
複製代碼

有什麼用?

會將多個被觀察者合併,根據各個被觀察者發送事件的順序一個個結合起來,最終發送的事件數量會與源 Observable 中最少事件的數量同樣。

怎麼用?

Observable.zip(Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS)
    .map(new Function<Long, String>() {
        @Override
        public String apply(Long aLong) throws Exception {
            String s1 = "A" + aLong;
            Log.d(TAG, "===================A 發送的事件 " + s1);
            return s1;
        }}),
        Observable.intervalRange(1, 6, 1, 1, TimeUnit.SECONDS)
            .map(new Function<Long, String>() {
            @Override
            public String apply(Long aLong) throws Exception {
                String s2 = "B" + aLong;
                Log.d(TAG, "===================B 發送的事件 " + s2);
                return s2;
            }
        }),
        new BiFunction<String, String, String>() {
            @Override
            public String apply(String s, String s2) throws Exception {
                String res = s + s2;
                return res;
            }
        })
.subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "===================onSubscribe ");
    }

    @Override
    public void onNext(String s) {
        Log.d(TAG, "===================onNext " + s);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "===================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "===================onComplete ");
    }
});
複製代碼

上面代碼中有兩個 Observable,第一個發送事件的數量爲5個,第二個發送事件的數量爲6個。如今來看下打印結果:

05-22 09:10:39.952 5338-5338/com.example.rxjavademo D/chan: ===================onSubscribe 
05-22 09:10:40.953 5338-5362/com.example.rxjavademo D/chan: ===================A 發送的事件 A1
05-22 09:10:40.953 5338-5363/com.example.rxjavademo D/chan: ===================B 發送的事件 B1
===================onNext A1B1
05-22 09:10:41.953 5338-5362/com.example.rxjavademo D/chan: ===================A 發送的事件 A2
05-22 09:10:41.954 5338-5363/com.example.rxjavademo D/chan: ===================B 發送的事件 B2
===================onNext A2B2
05-22 09:10:42.953 5338-5362/com.example.rxjavademo D/chan: ===================A 發送的事件 A3
05-22 09:10:42.953 5338-5363/com.example.rxjavademo D/chan: ===================B 發送的事件 B3
05-22 09:10:42.953 5338-5362/com.example.rxjavademo D/chan: ===================onNext A3B3
05-22 09:10:43.953 5338-5362/com.example.rxjavademo D/chan: ===================A 發送的事件 A4
05-22 09:10:43.953 5338-5363/com.example.rxjavademo D/chan: ===================B 發送的事件 B4
05-22 09:10:43.954 5338-5363/com.example.rxjavademo D/chan: ===================onNext A4B4
05-22 09:10:44.953 5338-5362/com.example.rxjavademo D/chan: ===================A 發送的事件 A5
05-22 09:10:44.953 5338-5363/com.example.rxjavademo D/chan: ===================B 發送的事件 B5
05-22 09:10:44.954 5338-5363/com.example.rxjavademo D/chan: ===================onNext A5B5
===================onComplete 
複製代碼

能夠發現最終接收到的事件數量是5,那麼爲何第二個 Observable 沒有發送第6個事件呢?由於在這以前第一個 Observable 已經發送了 onComplete 事件,因此第二個 Observable 不會再發送事件。

3.6 combineLatest() & combineLatestDelayError()

方法預覽:

public static <T1, T2, R> Observable<R> combineLatest(ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, BiFunction<? super T1, ? super T2, ? extends R> combiner)
....... 
複製代碼

有什麼用?

combineLatest() 的做用與 zip() 相似,可是 combineLatest() 發送事件的序列是與發送的時間線有關的,當 combineLatest() 中全部的 Observable 都發送了事件,只要其中有一個 Observable 發送事件,這個事件就會和其餘 Observable 最近發送的事件結合起來發送,這樣可能仍是比較抽象,看看如下例子代碼。

怎麼用?

Observable.combineLatest(
Observable.intervalRange(1, 4, 1, 1, TimeUnit.SECONDS)
    .map(new Function < Long, String > () {@Override
    public String apply(Long aLong) throws Exception {
        String s1 = "A" + aLong;
        Log.d(TAG, "===================A 發送的事件 " + s1);
        return s1;
    }
}),
Observable.intervalRange(1, 5, 2, 2, TimeUnit.SECONDS)
    .map(new Function < Long, String > () {@Override
    public String apply(Long aLong) throws Exception {
        String s2 = "B" + aLong;
        Log.d(TAG, "===================B 發送的事件 " + s2);
        return s2;
    }
}),
new BiFunction < String, String, String > () {@Override
    public String apply(String s, String s2) throws Exception {
        String res = s + s2;
        return res;
    }
})
.subscribe(new Observer < String > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "===================onSubscribe ");
    }

    @Override
    public void onNext(String s) {
        Log.d(TAG, "===================最終接收到的事件 " + s);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "===================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "===================onComplete ");
    }
});
複製代碼

分析上面的代碼,Observable A 會每隔1秒就發送一次事件,Observable B 會隔2秒發送一次事件。來看看打印結果:

05-22 11:41:20.859 15104-15104/? D/chan: ===================onSubscribe 
05-22 11:41:21.859 15104-15128/com.example.rxjavademo D/chan: ===================A 發送的事件 A1
05-22 11:41:22.860 15104-15128/com.example.rxjavademo D/chan: ===================A 發送的事件 A2
05-22 11:41:22.861 15104-15129/com.example.rxjavademo D/chan: ===================B 發送的事件 B1
05-22 11:41:22.862 15104-15129/com.example.rxjavademo D/chan: ===================最終接收到的事件 A2B1
05-22 11:41:23.860 15104-15128/com.example.rxjavademo D/chan: ===================A 發送的事件 A3
===================最終接收到的事件 A3B1
05-22 11:41:24.860 15104-15128/com.example.rxjavademo D/chan: ===================A 發送的事件 A4
05-22 11:41:24.861 15104-15129/com.example.rxjavademo D/chan: ===================B 發送的事件 B2
05-22 11:41:24.861 15104-15128/com.example.rxjavademo D/chan: ===================最終接收到的事件 A4B1
05-22 11:41:24.861 15104-15129/com.example.rxjavademo D/chan: ===================最終接收到的事件 A4B2
05-22 11:41:26.860 15104-15129/com.example.rxjavademo D/chan: ===================B 發送的事件 B3
05-22 11:41:26.861 15104-15129/com.example.rxjavademo D/chan: ===================最終接收到的事件 A4B3
05-22 11:41:28.860 15104-15129/com.example.rxjavademo D/chan: ===================B 發送的事件 B4
05-22 11:41:28.861 15104-15129/com.example.rxjavademo D/chan: ===================最終接收到的事件 A4B4
05-22 11:41:30.860 15104-15129/com.example.rxjavademo D/chan: ===================B 發送的事件 B5
05-22 11:41:30.861 15104-15129/com.example.rxjavademo D/chan: ===================最終接收到的事件 A4B5
===================onComplete 
複製代碼

分析上述結果能夠知道,當發送 A1 事件以後,由於 B 並無發送任何事件,因此根本不會發生結合。當 B 發送了 B1 事件以後,就會與 A 最近發送的事件 A2 結合成 A2B1,這樣只有後面一有被觀察者發送事件,這個事件就會與其餘被觀察者最近發送的事件結合起來了。

由於 combineLatestDelayError() 就是多了延遲發送 onError() 功能,這裏就再也不贅述了。

3.7 reduce()

方法預覽:

public final Maybe<T> reduce(BiFunction<T, T, T> reducer)
複製代碼

有什麼用?

與 scan() 操做符的做用也是將發送數據以必定邏輯聚合起來,這兩個的區別在於 scan() 每處理一次數據就會將事件發送給觀察者,而 reduce() 會將全部數據聚合在一塊兒纔會發送事件給觀察者。

怎麼用?

Observable.just(0, 1, 2, 3)
.reduce(new BiFunction < Integer, Integer, Integer > () {
    @Override
    public Integer apply(Integer integer, Integer integer2) throws Exception {
        int res = integer + integer2;
        Log.d(TAG, "====================integer " + integer);
        Log.d(TAG, "====================integer2 " + integer2);
        Log.d(TAG, "====================res " + res);
        return res;
    }
})
.subscribe(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "==================accept " + integer);
    }
});
複製代碼

打印結果:

05-22 14:21:46.042 17775-17775/? D/chan: ====================integer 0
====================integer2 1
====================res 1
====================integer 1
====================integer2 2
====================res 3
====================integer 3
====================integer2 3
====================res 6
==================accept 6
複製代碼

從結果能夠看到,其實就是前2個數據聚合以後,而後再與後1個數據進行聚合,一直到沒有數據爲止。

3.8 collect()

方法預覽:

public final <U> Single<U> collect(Callable<? extends U> initialValueSupplier, BiConsumer<? super U, ? super T> collector)
複製代碼

有什麼用?

將數據收集到數據結構當中。

怎麼用?

Observable.just(1, 2, 3, 4)
.collect(new Callable < ArrayList < Integer >> () {
    @Override
    public ArrayList < Integer > call() throws Exception {
        return new ArrayList < > ();
    }
},
new BiConsumer < ArrayList < Integer > , Integer > () {
    @Override
    public void accept(ArrayList < Integer > integers, Integer integer) throws Exception {
        integers.add(integer);
    }
})
.subscribe(new Consumer < ArrayList < Integer >> () {
    @Override
    public void accept(ArrayList < Integer > integers) throws Exception {
        Log.d(TAG, "===============accept " + integers);
    }
});
複製代碼

打印結果:

05-22 16:47:18.257 31361-31361/com.example.rxjavademo D/chan: ===============accept [1, 2, 3, 4]
複製代碼

3.9 startWith() & startWithArray()

方法預覽:

public final Observable<T> startWith(T item)
public final Observable<T> startWithArray(T... items)
複製代碼

有什麼用?

在發送事件以前追加事件,startWith() 追加一個事件,startWithArray() 能夠追加多個事件。追加的事件會先發出。

怎麼用?

Observable.just(5, 6, 7)
.startWithArray(2, 3, 4)
.startWith(1)
.subscribe(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "================accept " + integer);
    }
});
複製代碼

打印結果:

05-22 17:08:21.282 4505-4505/com.example.rxjavademo D/chan: ================accept 1
================accept 2
================accept 3
================accept 4
================accept 5
================accept 6
================accept 7
複製代碼

3.10 count()

方法預覽:

public final Single<Long> count()
複製代碼

有什麼用?

返回被觀察者發送事件的數量。

怎麼用?

Observable.just(1, 2, 3)
.count()
.subscribe(new Consumer < Long > () {
    @Override
    public void accept(Long aLong) throws Exception {
        Log.d(TAG, "=======================aLong " + aLong);
    }
});
複製代碼

打印結果:

05-22 20:41:25.025 14126-14126/? D/chan: =======================aLong 3
複製代碼

4. 功能操做符

4.1 delay()

方法預覽:

public final Observable<T> delay(long delay, TimeUnit unit)
複製代碼

有什麼用?

延遲一段事件發送事件。

怎麼用?

Observable.just(1, 2, 3)
.delay(2, TimeUnit.SECONDS)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "=======================onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "=======================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {
        Log.d(TAG, "=======================onSubscribe");
    }
});
複製代碼

這裏延遲了兩秒才發送事件,來看看打印結果:

05-22 20:53:43.618 16880-16880/com.example.rxjavademo D/chan: =======================onSubscribe
05-22 20:53:45.620 16880-16906/com.example.rxjavademo D/chan: =======================onNext 1
05-22 20:53:45.621 16880-16906/com.example.rxjavademo D/chan: =======================onNext 2
=======================onNext 3
=======================onSubscribe
複製代碼

從打印結果能夠看出 onSubscribe 回調2秒以後 onNext 纔會回調。

4.2 doOnEach()

方法預覽:

public final Observable<T> doOnEach(final Consumer<? super Notification<T>> onNotification)
複製代碼

有什麼用?

Observable 每發送一件事件以前都會先回調這個方法。

怎麼用?

Observable.create(new ObservableOnSubscribe < Integer > () {
    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        //      e.onError(new NumberFormatException());
        e.onComplete();
    }
})
.doOnEach(new Consumer < Notification < Integer >> () {
    @Override
    public void accept(Notification < Integer > integerNotification) throws Exception {
        Log.d(TAG, "==================doOnEach " + integerNotification.getValue());
    }
})
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});
複製代碼

打印結果:

05-23 09:07:05.547 19867-19867/? D/chan: ==================onSubscribe 
==================doOnEach 1
==================onNext 1
==================doOnEach 2
==================onNext 2
==================doOnEach 3
==================onNext 3
==================doOnEach null
==================onComplete 
複製代碼

從結果就能夠看出每發送一個事件以前都會回調 doOnEach 方法,而且能夠取出 onNext() 發送的值。

4.3 doOnNext()

方法預覽:

public final Observable<T> doOnNext(Consumer<? super T> onNext)
複製代碼

有什麼用?

Observable 每發送 onNext() 以前都會先回調這個方法。

怎麼用?

Observable.create(new ObservableOnSubscribe < Integer > () {
    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
    }
})
.doOnNext(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "==================doOnNext " + integer);
    }
})
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});
複製代碼

打印結果:

05-23 09:09:36.769 20020-20020/com.example.rxjavademo D/chan: ==================onSubscribe 
==================doOnNext 1
==================onNext 1
==================doOnNext 2
==================onNext 2
==================doOnNext 3
==================onNext 3
==================onComplete 
複製代碼

4.4 doAfterNext()

方法預覽:

public final Observable<T> doAfterNext(Consumer<? super T> onAfterNext)
複製代碼

有什麼用?

Observable 每發送 onNext() 以後都會回調這個方法。

怎麼用?

Observable.create(new ObservableOnSubscribe < Integer > () {
    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
    }
})
.doAfterNext(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "==================doAfterNext " + integer);
    }
})
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});

複製代碼

打印結果:

05-23 09:15:49.215 20432-20432/com.example.rxjavademo D/chan: ==================onSubscribe 
==================onNext 1
==================doAfterNext 1
==================onNext 2
==================doAfterNext 2
==================onNext 3
==================doAfterNext 3
==================onComplete 
複製代碼

4.5 doOnComplete()

方法預覽:

public final Observable<T> doOnComplete(Action onComplete)
複製代碼

有什麼用?

Observable 每發送 onComplete() 以前都會回調這個方法。

怎麼用?

Observable.create(new ObservableOnSubscribe < Integer > () {
    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
    }
})
.doOnComplete(new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "==================doOnComplete ");
    }
})
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});
複製代碼

打印結果:

05-23 09:32:18.031 20751-20751/? D/chan: ==================onSubscribe 
==================onNext 1
==================onNext 2
==================onNext 3
==================doOnComplete 
==================onComplete 
複製代碼

4.6 doOnError()

方法預覽:

public final Observable<T> doOnError(Consumer<? super Throwable> onError)
複製代碼

有什麼用?

Observable 每發送 onError() 以前都會回調這個方法。

怎麼用?

Observable.create(new ObservableOnSubscribe < Integer > () {
    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onError(new NullPointerException());
    }
})
.doOnError(new Consumer < Throwable > () {
    @Override
    public void accept(Throwable throwable) throws Exception {
        Log.d(TAG, "==================doOnError " + throwable);
    }
})
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});
複製代碼

打印結果:

05-23 09:35:04.150 21051-21051/? D/chan: ==================onSubscribe 
==================onNext 1
==================onNext 2
==================onNext 3
==================doOnError java.lang.NullPointerException
==================onError 

複製代碼

4.7 doOnSubscribe()

方法預覽:

public final Observable<T> doOnSubscribe(Consumer<? super Disposable> onSubscribe)
複製代碼

有什麼用?

Observable 每發送 onSubscribe() 以前都會回調這個方法。

怎麼用?

Observable.create(new ObservableOnSubscribe < Integer > () {
    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
    }
})
.doOnSubscribe(new Consumer < Disposable > () {
    @Override
    public void accept(Disposable disposable) throws Exception {
        Log.d(TAG, "==================doOnSubscribe ");
    }
})
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});
複製代碼

打印結果:

05-23 09:39:25.778 21245-21245/? D/chan: ==================doOnSubscribe 
==================onSubscribe 
==================onNext 1
==================onNext 2
==================onNext 3
==================onComplete 
複製代碼

4.8 doOnDispose()

方法預覽:

public final Observable<T> doOnDispose(Action onDispose)
複製代碼

有什麼用?

當調用 Disposable 的 dispose() 以後回調該方法。

怎麼用?

Observable.create(new ObservableOnSubscribe < Integer > () {
    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
    }
})
.doOnDispose(new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "==================doOnDispose ");
    }
})
.subscribe(new Observer < Integer > () {
    private Disposable d;
    
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
        this.d = d;
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "==================onNext " + integer);
        d.dispose();
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});
複製代碼

打印結果:

05-23 09:55:48.122 22023-22023/com.example.rxjavademo D/chan: ==================onSubscribe 
==================onNext 1
==================doOnDispose 
複製代碼

4.9 doOnLifecycle()

方法預覽:

public final Observable<T> doOnLifecycle(final Consumer<? super Disposable> onSubscribe, final Action onDispose)
複製代碼

有什麼用?

在回調 onSubscribe 以前回調該方法的第一個參數的回調方法,可使用該回調方法決定是否取消訂閱。

怎麼用?

doOnLifecycle() 第二個參數的回調方法的做用與 doOnDispose() 是同樣的,如今用下面的例子來說解:

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
    }
})
.doOnLifecycle(new Consumer<Disposable>() {
    @Override
    public void accept(Disposable disposable) throws Exception {
        Log.d(TAG, "==================doOnLifecycle accept");
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "==================doOnLifecycle Action");
    }
})
.doOnDispose(
    new Action() {
        @Override
        public void run() throws Exception {
            Log.d(TAG, "==================doOnDispose Action");
        }
})
.subscribe(new Observer<Integer>() {
    private Disposable d;
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
        this.d = d;
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "==================onNext " + integer);
        d.dispose();
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
    
});
複製代碼

打印結果:

05-23 10:20:36.345 23922-23922/? D/chan: ==================doOnLifecycle accept
==================onSubscribe 
==================onNext 1
==================doOnDispose Action
==================doOnLifecycle Action
複製代碼

能夠看到當在 onNext() 方法進行取消訂閱操做後,doOnDispose() 和 doOnLifecycle() 都會被回調。

若是使用 doOnLifecycle 進行取消訂閱,來看看打印結果:

05-23 10:32:20.014 24652-24652/com.example.rxjavademo D/chan: ==================doOnLifecycle accept
==================onSubscribe 
複製代碼

能夠發現 doOnDispose Action 和 doOnLifecycle Action 都沒有被回調。

4.10 doOnTerminate() & doAfterTerminate()

方法預覽:

public final Observable<T> doOnTerminate(final Action onTerminate)
public final Observable<T> doAfterTerminate(Action onFinally)
複製代碼

有什麼用?

doOnTerminate 是在 onError 或者 onComplete 發送以前回調,而 doAfterTerminate 則是 onError 或者 onComplete 發送以後回調。

怎麼用?

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
//      e.onError(new NullPointerException());
        e.onComplete();
    }
})
.doOnTerminate(new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "==================doOnTerminate ");
    }
})
.subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
    
});
複製代碼

打印結果:

05-23 10:00:39.503 22398-22398/com.example.rxjavademo D/chan: ==================onSubscribe 
==================onNext 1
==================onNext 2
05-23 10:00:39.504 22398-22398/com.example.rxjavademo D/chan: ==================onNext 3
==================doOnTerminate 
==================onComplete 
複製代碼

doAfterTerminate 也是差很少,這裏就再也不贅述。

4.11 doFinally()

方法預覽:

public final Observable<T> doFinally(Action onFinally)
複製代碼

有什麼用?

在全部事件發送完畢以後回調該方法。

怎麼用?

這裏可能你會有個問題,那就是 doFinally() 和 doAfterTerminate() 到底有什麼區別?區別就是在於取消訂閱,若是取消訂閱以後 doAfterTerminate() 就不會被回調,而 doFinally() 不管怎麼樣都會被回調,且都會在事件序列的最後。

如今用如下例子說明下:

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
    }
})
.doFinally(new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "==================doFinally ");
    }
})
.doOnDispose(new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "==================doOnDispose ");
    }
})
.doAfterTerminate(new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "==================doAfterTerminate ");
    }
})
.subscribe(new Observer<Integer>() {
    private Disposable d;
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
        this.d = d;
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "==================onNext " + integer);
        d.dispose();
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});
複製代碼

打印結果:

05-23 10:10:10.469 23196-23196/? D/chan: ==================onSubscribe 
05-23 10:10:10.470 23196-23196/? D/chan: ==================onNext 1
==================doOnDispose 
==================doFinally 
複製代碼

能夠看到若是調用了 dispose() 方法,doAfterTerminate() 不會被回調。

如今試試把 dispose() 註釋掉看看,看看打印結果:

05-23 10:13:34.537 23439-23439/com.example.rxjavademo D/chan: ==================onSubscribe 
==================onNext 1
==================onNext 2
==================onNext 3
==================onComplete 
==================doAfterTerminate 
==================doFinally 
複製代碼

doAfterTerminate() 已經成功回調,doFinally() 仍是會在事件序列的最後。

4.12 onErrorReturn()

方法預覽:

public final Observable<T> onErrorReturn(Function<? super Throwable, ? extends T> valueSupplier)
複製代碼

有什麼用?

當接受到一個 onError() 事件以後回調,返回的值會回調 onNext() 方法,並正常結束該事件序列。

怎麼用?

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onError(new NullPointerException());
    }
})
.onErrorReturn(new Function<Throwable, Integer>() {
    @Override
    public Integer apply(Throwable throwable) throws Exception {
        Log.d(TAG, "==================onErrorReturn " + throwable);
        return 404;
    }
})
.subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});
複製代碼

打印結果:

05-23 18:35:18.175 19239-19239/? D/chan: ==================onSubscribe 
==================onNext 1
==================onNext 2
==================onNext 3
==================onErrorReturn java.lang.NullPointerException
==================onNext 404
==================onComplete 
複製代碼

4.13 onErrorResumeNext()

方法預覽:

public final Observable<T> onErrorResumeNext(Function<? super Throwable, ? extends ObservableSource<? extends T>> resumeFunction)
複製代碼

有什麼用?

當接收到 onError() 事件時,返回一個新的 Observable,並正常結束事件序列。

怎麼用?

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onError(new NullPointerException());
    }
})
.onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
    @Override
    public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {
        Log.d(TAG, "==================onErrorResumeNext " + throwable);
        return Observable.just(4, 5, 6);
    }
})
.subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});
複製代碼

打印結果:

05-23 18:43:10.910 26469-26469/? D/chan: ==================onSubscribe 
==================onNext 1
==================onNext 2
==================onNext 3
==================onErrorResumeNext java.lang.NullPointerException
==================onNext 4
==================onNext 5
==================onNext 6
==================onComplete 
複製代碼

4.14 onExceptionResumeNext()

方法預覽:

public final Observable<T> onExceptionResumeNext(final ObservableSource<? extends T> next)
複製代碼

有什麼用?

與 onErrorResumeNext() 做用基本一致,可是這個方法只能捕捉 Exception。

怎麼用?

先來試試 onExceptionResumeNext() 是否能捕捉 Error。

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onError(new Error("404"));
    }
})
.onExceptionResumeNext(new Observable<Integer>() {
    @Override
    protected void subscribeActual(Observer<? super Integer> observer) {
        observer.onNext(333);
        observer.onComplete();
    }
})
.subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});
複製代碼

打印結果:

05-23 22:23:08.873 1062-1062/com.example.louder.rxjavademo D/chan: ==================onSubscribe 
05-23 22:23:08.874 1062-1062/com.example.louder.rxjavademo D/chan: ==================onNext 1
==================onNext 2
==================onNext 3
==================onError 
複製代碼

從打印結果能夠知道,觀察者收到 onError() 事件,證實 onErrorResumeNext() 不能捕捉 Error 事件。

將被觀察者的 e.onError(new Error("404")) 改成 e.onError(new Exception("404")),如今看看是否能捕捉 Exception 事件:

05-23 22:32:14.563 10487-10487/com.example.louder.rxjavademo D/chan: ==================onSubscribe 
==================onNext 1
==================onNext 2
==================onNext 3
==================onNext 333
==================onComplete 
複製代碼

從打印結果能夠知道,這個方法成功捕獲 Exception 事件。

4.15 retry()

方法預覽:

public final Observable<T> retry(long times)
......
複製代碼

有什麼用?

若是出現錯誤事件,則會從新發送全部事件序列。times 是表明從新發的次數。

怎麼用?

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onError(new Exception("404"));
    }
})
.retry(2)
.subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});
複製代碼

打印結果:

05-23 22:46:18.537 22239-22239/com.example.louder.rxjavademo D/chan: ==================onSubscribe 
05-23 22:46:18.538 22239-22239/com.example.louder.rxjavademo D/chan: ==================onNext 1
==================onNext 2
==================onNext 3
==================onNext 1
==================onNext 2
==================onNext 3
==================onNext 1
==================onNext 2
==================onNext 3
==================onError 
複製代碼

4.16 retryUntil()

方法預覽:

public final Observable<T> retryUntil(final BooleanSupplier stop)
複製代碼

有什麼用?

出現錯誤事件以後,能夠經過此方法判斷是否繼續發送事件。

怎麼用?

Observable.create(new ObservableOnSubscribe < Integer > () {
    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onError(new Exception("404"));
    }
})
.retryUntil(new BooleanSupplier() {
    @Override
    public boolean getAsBoolean() throws Exception {
        if (i == 6) {
            return true;
        }
        return false;
    }
})
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        i += integer;
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});
複製代碼

打印結果:

05-23 22:57:32.905 23063-23063/com.example.louder.rxjavademo D/chan: ==================onSubscribe 
05-23 22:57:32.906 23063-23063/com.example.louder.rxjavademo D/chan: ==================onNext 1
==================onNext 2
==================onNext 3
==================onError 

複製代碼

4.17 retryWhen()

方法預覽:

public final void safeSubscribe(Observer<? super T> s)
複製代碼

有什麼用?

當被觀察者接收到異常或者錯誤事件時會回調該方法,這個方法會返回一個新的被觀察者。若是返回的被觀察者發送 Error 事件則以前的被觀察者不會繼續發送事件,若是發送正常事件則以前的被觀察者會繼續不斷重試發送事件。

怎麼用?

Observable.create(new ObservableOnSubscribe < String > () {
    @Override
    public void subscribe(ObservableEmitter < String > e) throws Exception {
        e.onNext("chan");
        e.onNext("ze");
        e.onNext("de");
        e.onError(new Exception("404"));
        e.onNext("haha");
    }
})
.retryWhen(new Function < Observable < Throwable > , ObservableSource <? >> () {
    @Override
    public ObservableSource <? > apply(Observable < Throwable > throwableObservable) throws Exception {
        return throwableObservable.flatMap(new Function < Throwable, ObservableSource <? >> () {
            @Override
            public ObservableSource <? > apply(Throwable throwable) throws Exception {
                if(!throwable.toString().equals("java.lang.Exception: 404")) {
                    return Observable.just("能夠忽略的異常");
                } else {
                    return Observable.error(new Throwable("終止啦"));
                }
            }
        });
    }
})
.subscribe(new Observer < String > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(String s) {
        Log.d(TAG, "==================onNext " + s);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError " + e.toString());
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});
複製代碼

打印結果:

05-24 09:13:25.622 28372-28372/com.example.rxjavademo D/chan: ==================onSubscribe 
05-24 09:13:25.623 28372-28372/com.example.rxjavademo D/chan: ==================onNext chan
==================onNext ze
==================onNext de
05-24 09:13:25.624 28372-28372/com.example.rxjavademo D/chan: ==================onError java.lang.Throwable: 終止啦
複製代碼

將 onError(new Exception("404")) 改成 onError(new Exception("303")) 看看打印結果:

==================onNext chan
05-24 09:54:08.653 29694-29694/? D/chan: ==================onNext ze
==================onNext de
==================onNext chan
==================onNext ze
==================onNext de
==================onNext chan
==================onNext ze
==================onNext de
==================onNext chan
==================onNext ze
==================onNext de
==================onNext chan
==================onNext ze
==================onNext de
==================onNext chan
......
複製代碼

從結果能夠看出,會不斷重複發送消息。

4.18 repeat()

方法預覽:

public final Observable<T> repeat(long times)
......
複製代碼

有什麼用?

重複發送被觀察者的事件,times 爲發送次數。

怎麼用?

Observable.create(new ObservableOnSubscribe < Integer > () {
    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
    }
})
.repeat(2)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "===================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "===================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {
        Log.d(TAG, "===================onComplete ");
    }
});
複製代碼

打印結果:

05-24 11:33:29.565 8544-8544/com.example.rxjavademo D/chan: ===================onSubscribe 
===================onNext 1
===================onNext 2
===================onNext 3
===================onNext 1
===================onNext 2
===================onNext 3
05-24 11:33:29.565 8544-8544/com.example.rxjavademo D/chan: ===================onComplete 
複製代碼

從結果能夠看出,該事件發送了兩次。

4.19 repeatWhen()

方法預覽:

public final Observable<T> repeatWhen(final Function<? super Observable<Object>, ? extends ObservableSource<?>> handler)
複製代碼

有什麼用?

這個方法能夠會返回一個新的被觀察者設定必定邏輯來決定是否重複發送事件。

怎麼用?

這裏分三種狀況,若是新的被觀察者返回 onComplete 或者 onError 事件,則舊的被觀察者不會繼續發送事件。若是被觀察者返回其餘事件,則會重複發送事件。

如今試驗發送 onComplete 事件,代碼以下:

Observable.create(new ObservableOnSubscribe < Integer > () {
    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
    }
})
.repeatWhen(new Function < Observable < Object > , ObservableSource <? >> () {
    @Override
    public ObservableSource <? > apply(Observable < Object > objectObservable) throws Exception {
        return Observable.empty();
    //  return Observable.error(new Exception("404"));
    //  return Observable.just(4); null;
    }
})
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "===================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "===================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "===================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "===================onComplete ");
    }
});
複製代碼

打印結果:

05-24 11:44:33.486 9379-9379/com.example.rxjavademo D/chan: ===================onSubscribe 
05-24 11:44:33.487 9379-9379/com.example.rxjavademo D/chan: ===================onComplete 
複製代碼

下面直接看看發送 onError 事件和其餘事件的打印結果。

發送 onError 打印結果:

05-24 11:46:29.507 9561-9561/com.example.rxjavademo D/chan: ===================onSubscribe 
05-24 11:46:29.508 9561-9561/com.example.rxjavademo D/chan: ===================onError 
複製代碼

發送其餘事件的打印結果:

05-24 11:48:35.844 9752-9752/com.example.rxjavademo D/chan: ===================onSubscribe 
===================onNext 1
===================onNext 2
===================onNext 3
===================onComplete 
複製代碼

4.20 subscribeOn()

方法預覽:

public final Observable<T> subscribeOn(Scheduler scheduler)
複製代碼

有什麼用?

指定被觀察者的線程,要注意的時,若是屢次調用此方法,只有第一次有效。

怎麼用?

Observable.create(new ObservableOnSubscribe < Integer > () {
    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        Log.d(TAG, "=========================currentThread name: " + Thread.currentThread().getName());
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
    }
})
//.subscribeOn(Schedulers.newThread())
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "======================onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "======================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "======================onError");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "======================onComplete");
    }
});
複製代碼

如今不調用 subscribeOn() 方法,來看看打印結果:

05-26 10:40:42.246 21466-21466/? D/chan: ======================onSubscribe
05-26 10:40:42.247 21466-21466/? D/chan: =========================currentThread name: main
======================onNext 1
======================onNext 2
======================onNext 3
======================onComplete
複製代碼

能夠看到打印被觀察者的線程名字是主線程。

接着調用 subscribeOn(Schedulers.newThread()) 來看看打印結果:

05-26 10:43:26.964 22530-22530/com.example.rxjavademo D/chan: ======================onSubscribe
05-26 10:43:26.966 22530-22569/com.example.rxjavademo D/chan: =========================currentThread name: RxNewThreadScheduler-1
05-26 10:43:26.967 22530-22569/com.example.rxjavademo D/chan: ======================onNext 1
======================onNext 2
======================onNext 3
======================onComplete
複製代碼

能夠看到打印結果被觀察者是在一條新的線程。

如今看看屢次調用會不會有效,代碼以下:

Observable.create(new ObservableOnSubscribe < Integer > () {

    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        Log.d(TAG, "=========================currentThread name: " + Thread.currentThread().getName());
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
    }
})
.subscribeOn(Schedulers.computation())
.subscribeOn(Schedulers.newThread())
.subscribe(new Observer < Integer > () {@Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "======================onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "======================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "======================onError");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "======================onComplete");
    }
});
複製代碼

打印結果:

05-26 10:47:20.925 23590-23590/com.example.rxjavademo D/chan: ======================onSubscribe
05-26 10:47:20.930 23590-23629/com.example.rxjavademo D/chan: =========================currentThread name: RxComputationThreadPool-1
======================onNext 1
======================onNext 2
======================onNext 3
======================onComplete
複製代碼

能夠看到第二次調動的 subscribeOn(Schedulers.newThread()) 並無效果。

4.21 observeOn()

方法預覽:

public final Observable<T> observeOn(Scheduler scheduler)
複製代碼

有什麼用?

指定觀察者的線程,每指定一次就會生效一次。

怎麼用?

Observable.just(1, 2, 3)
.observeOn(Schedulers.newThread())
.flatMap(new Function < Integer, ObservableSource < String >> () {
    @Override
    public ObservableSource < String > apply(Integer integer) throws Exception {
        Log.d(TAG, "======================flatMap Thread name " + Thread.currentThread().getName());
        return Observable.just("chan" + integer);
    }
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer < String > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "======================onSubscribe");
    }

    @Override
    public void onNext(String s) {
        Log.d(TAG, "======================onNext Thread name " + Thread.currentThread().getName());
        Log.d(TAG, "======================onNext " + s);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "======================onError");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "======================onComplete");
    }
});
複製代碼

打印結果:

05-26 10:58:04.593 25717-25717/com.example.rxjavademo D/chan: ======================onSubscribe
05-26 10:58:04.594 25717-25753/com.example.rxjavademo D/chan: ======================flatMap Thread name RxNewThreadScheduler-1
05-26 10:58:04.595 25717-25753/com.example.rxjavademo D/chan: ======================flatMap Thread name RxNewThreadScheduler-1
======================flatMap Thread name RxNewThreadScheduler-1
05-26 10:58:04.617 25717-25717/com.example.rxjavademo D/chan: ======================onNext Thread name main
======================onNext chan1
======================onNext Thread name main
======================onNext chan2
======================onNext Thread name main
======================onNext chan3
05-26 10:58:04.618 25717-25717/com.example.rxjavademo D/chan: ======================onComplete
複製代碼

從打印結果能夠知道,observeOn 成功切換了線程。

下表總結了 RxJava 中的調度器:

調度器 做用
Schedulers.computation( ) 用於使用計算任務,如事件循環和回調處理
Schedulers.immediate( ) 當前線程
Schedulers.io( ) 用於 IO 密集型任務,若是異步阻塞 IO 操做。
Schedulers.newThread( ) 建立一個新的線程
AndroidSchedulers.mainThread() Android 的 UI 線程,用於操做 UI。

5. 過濾操做符

5.1 filter()

方法預覽:

public final Observable<T> filter(Predicate<? super T> predicate)
複製代碼

有什麼用?

經過必定邏輯來過濾被觀察者發送的事件,若是返回 true 則會發送事件,不然不會發送。

怎麼用?

Observable.just(1, 2, 3)
    .filter(new Predicate < Integer > () {
        @Override
        public boolean test(Integer integer) throws Exception {
            return integer < 2;
        }
})
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        i += integer;
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});
複製代碼

以上代碼只有小於2的事件纔會發送,來看看打印結果:

05-24 22:57:32.562 12776-12776/com.example.louder.rxjavademo D/chan: ==================onSubscribe 
==================onNext 1
==================onComplete 
複製代碼

5.2 ofType()

方法預覽:

public final <U> Observable<U> ofType(final Class<U> clazz)
複製代碼

有什麼用?

能夠過濾不符合該類型事件

怎麼用?

Observable.just(1, 2, 3, "chan", "zhide")
.ofType(Integer.class)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        i += integer;
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});
複製代碼

打印結果:

05-24 23:04:24.752 13229-13229/? D/chan: ==================onSubscribe 
==================onNext 1
==================onNext 2
==================onNext 3
05-24 23:04:24.753 13229-13229/? D/chan: ==================onComplete 
複製代碼

5.3 skip()

方法預覽:

public final Observable<T> skip(long count)
.......
複製代碼

有什麼用?

跳過正序某些事件,count 表明跳過事件的數量

怎麼用?

Observable.just(1, 2, 3)
.skip(2)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        i += integer;
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});
複製代碼

打印結果:

05-24 23:13:50.448 13831-13831/? D/chan: ==================onSubscribe 
05-24 23:13:50.449 13831-13831/? D/chan: ==================onNext 3
==================onComplete 
複製代碼

skipLast() 做用也是跳過某些事件,不過它是用來跳過正序的後面的事件,這裏就再也不講解了。

5.4 distinct()

方法預覽:

public final Observable<T> distinct() 
複製代碼

有什麼用?

過濾事件序列中的重複事件。

怎麼用?

Observable.just(1, 2, 3, 3, 2, 1)
.distinct()
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        i += integer;
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});
複製代碼

打印結果:

05-24 23:19:44.334 14206-14206/com.example.louder.rxjavademo D/chan: ==================onSubscribe 
==================onNext 1
==================onNext 2
==================onNext 3
==================onComplete 
複製代碼

5.5 distinctUntilChanged()

方法預覽:

public final Observable<T> distinctUntilChanged()
複製代碼

有什麼用?

過濾掉連續重複的事件

怎麼用?

Observable.just(1, 2, 3, 3, 2, 1)
.distinctUntilChanged()
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        i += integer;
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});
複製代碼

打印結果:

05-24 23:22:35.985 14424-14424/com.example.louder.rxjavademo D/chan: ==================onSubscribe 
==================onNext 1
==================onNext 2
==================onNext 3
==================onNext 2
==================onNext 1
==================onComplete 
複製代碼

由於事件序列中連續出現兩次3,因此第二次3並不會發出。

5.6 take()

方法預覽:

public final Observable<T> take(long count)
......
複製代碼

有什麼用?

控制觀察者接收的事件的數量。

怎麼用?

Observable.just(1, 2, 3, 4, 5)
.take(3)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        i += integer;
        Log.d(TAG, "==================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete ");
    }
});
複製代碼

打印結果:

05-24 23:28:32.899 14704-14704/? D/chan: ==================onSubscribe 
==================onNext 1
==================onNext 2
==================onNext 3
==================onComplete 
複製代碼

takeLast() 的做用就是控制觀察者只能接受事件序列的後面幾件事情,這裏就再也不講解了,你們能夠本身試試。

5.7 debounce()

方法預覽:

public final Observable<T> debounce(long timeout, TimeUnit unit)
......
複製代碼

有什麼用?

若是兩件事件發送的時間間隔小於設定的時間間隔則前一件事件就不會發送給觀察者。

怎麼用?

Observable.create(new ObservableOnSubscribe < Integer > () {

    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        e.onNext(1);
        Thread.sleep(900);
        e.onNext(2);
    }
})
.debounce(1, TimeUnit.SECONDS)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "===================onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "===================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "===================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "===================onComplete ");
    }
});
複製代碼

打印結果:

05-25 20:39:10.512 17441-17441/com.example.rxjavademo D/chan: ===================onSubscribe 
05-25 20:39:12.413 17441-17478/com.example.rxjavademo D/chan: ===================onNext 2
複製代碼

能夠看到事件1並無發送出去,如今將間隔時間改成1000,看看打印結果:

05-25 20:42:10.874 18196-18196/com.example.rxjavademo D/chan: ===================onSubscribe 
05-25 20:42:11.875 18196-18245/com.example.rxjavademo D/chan: ===================onNext 1
05-25 20:42:12.875 18196-18245/com.example.rxjavademo D/chan: ===================onNext 2
複製代碼

throttleWithTimeout() 與此方法的做用同樣,這裏就再也不贅述了。

5.8 firstElement() && lastElement()

方法預覽:
public final Maybe<T> firstElement()
public final Maybe<T> lastElement()
複製代碼

有什麼用?

firstElement() 取事件序列的第一個元素,lastElement() 取事件序列的最後一個元素。

怎麼用?

Observable.just(1, 2, 3, 4)
.firstElement()
.subscribe(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "====================firstElement " + integer);
    }
});

Observable.just(1, 2, 3, 4)
.lastElement()
.subscribe(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "====================lastElement " + integer);
    }
});
複製代碼

打印結果:

05-25 20:47:22.189 19909-19909/? D/chan: ====================firstElement 1
====================lastElement 4
複製代碼

5.9 elementAt() & elementAtOrError()

方法預覽:

public final Maybe<T> elementAt(long index)
public final Single<T> elementAtOrError(long index)
複製代碼

有什麼用?

elementAt() 能夠指定取出事件序列中事件,可是輸入的 index 超出事件序列的總數的話就不會出現任何結果。這種狀況下,你想發出異常信息的話就用 elementAtOrError() 。

怎麼用?

Observable.just(1, 2, 3, 4)
.elementAt(0)
.subscribe(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "====================accept " + integer);
    }
});
複製代碼

打印結果:

05-25 20:56:22.266 23346-23346/com.example.rxjavademo D/chan: ====================accept 1
複製代碼

將 elementAt() 的值改成5,這時是沒有打印結果的,由於沒有知足條件的元素。

替換 elementAt() 爲 elementAtOrError(),代碼以下:

Observable.just(1, 2, 3, 4)
.elementAtOrError(5)
.subscribe(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "====================accept " + integer);
    }
});
複製代碼

打印結果:

io.reactivex.exceptions.OnErrorNotImplementedException
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java: 704)
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java: 701)
at io.reactivex.internal.observers.ConsumerSingleObserver.onError(ConsumerSingleObserver.java: 47)
at io.reactivex.internal.operators.observable.ObservableElementAtSingle$ElementAtObserver.onComplete(ObservableElementAtSingle.java: 117)
at io.reactivex.internal.operators.observable.ObservableFromArray$FromArrayDisposable.run(ObservableFromArray.java: 110)
at io.reactivex.internal.operators.observable.ObservableFromArray.subscribeActual(ObservableFromArray.java: 36)
at io.reactivex.Observable.subscribe(Observable.java: 10903)
at io.reactivex.internal.operators.observable.ObservableElementAtSingle.subscribeActual(ObservableElementAtSingle.java: 37)
at io.reactivex.Single.subscribe(Single.java: 2707)
at io.reactivex.Single.subscribe(Single.java: 2693)
at io.reactivex.Single.subscribe(Single.java: 2664)
at com.example.rxjavademo.MainActivity.onCreate(MainActivity.java: 103)
at android.app.Activity.performCreate(Activity.java: 6942)
at android.app.Instrumentation.callActivityOnCreate(Instrumentation.java: 1126)
at android.app.ActivityThread.performLaunchActivity(ActivityThread.java: 2880)
at android.app.ActivityThread.handleLaunchActivity(ActivityThread.java: 2988)
at android.app.ActivityThread. - wrap14(ActivityThread.java)
at android.app.ActivityThread$H.handleMessage(ActivityThread.java: 1631)
at android.os.Handler.dispatchMessage(Handler.java: 102)
at android.os.Looper.loop(Looper.java: 154)
at android.app.ActivityThread.main(ActivityThread.java: 6682)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java: 1520)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java: 1410)
Caused by: java.util.NoSuchElementException
at io.reactivex.internal.operators.observable.ObservableElementAtSingle$ElementAtObserver.onComplete(ObservableElementAtSingle.java: 117) 
at io.reactivex.internal.operators.observable.ObservableFromArray$FromArrayDisposable.run(ObservableFromArray.java: 110) 
at io.reactivex.internal.operators.observable.ObservableFromArray.subscribeActual(ObservableFromArray.java: 36) 
at io.reactivex.Observable.subscribe(Observable.java: 10903) 
at io.reactivex.internal.operators.observable.ObservableElementAtSingle.subscribeActual(ObservableElementAtSingle.java: 37) 
at io.reactivex.Single.subscribe(Single.java: 2707) 
at io.reactivex.Single.subscribe(Single.java: 2693) 
at io.reactivex.Single.subscribe(Single.java: 2664) 
at com.example.rxjavademo.MainActivity.onCreate(MainActivity.java: 103) 
at android.app.Activity.performCreate(Activity.java: 6942) 
at android.app.Instrumentation.callActivityOnCreate(Instrumentation.java: 1126) 
at android.app.ActivityThread.performLaunchActivity(ActivityThread.java: 2880) 
at android.app.ActivityThread.handleLaunchActivity(ActivityThread.java: 2988) 
at android.app.ActivityThread. - wrap14(ActivityThread.java) 
at android.app.ActivityThread$H.handleMessage(ActivityThread.java: 1631) 
at android.os.Handler.dispatchMessage(Handler.java: 102) 
at android.os.Looper.loop(Looper.java: 154) 
at android.app.ActivityThread.main(ActivityThread.java: 6682) 
at java.lang.reflect.Method.invoke(Native Method) 
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java: 1520) 
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java: 1410) 
複製代碼

這時候會拋出 NoSuchElementException 異常。

6. 條件操做符

6.1 all()

方法預覽:

public final Observable<T> ambWith(ObservableSource<? extends T> other)
複製代碼

有什麼用?

判斷事件序列是否所有知足某個事件,若是都知足則返回 true,反之則返回 false。

怎麼用?

Observable.just(1, 2, 3, 4)
.all(new Predicate < Integer > () {
    @Override
    public boolean test(Integer integer) throws Exception {
        return integer < 5;
    }
})
.subscribe(new Consumer < Boolean > () {
    @Override
    public void accept(Boolean aBoolean) throws Exception {
        Log.d(TAG, "==================aBoolean " + aBoolean);
    }
});
複製代碼

打印結果:

05-26 09:39:51.644 1482-1482/com.example.rxjavademo D/chan: ==================aBoolean true
複製代碼

6.2 takeWhile()

方法預覽:

public final Observable<T> takeWhile(Predicate<? super T> predicate)
複製代碼

有什麼用?

能夠設置條件,當某個數據知足條件時就會發送該數據,反之則不發送。

怎麼用?

Observable.just(1, 2, 3, 4)
.takeWhile(new Predicate < Integer > () {
    @Override
    public boolean test(Integer integer) throws Exception {
        return integer < 3;
    }
})
.subscribe(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "========================integer " + integer);
    }
});
複製代碼

打印結果:

05-26 09:43:14.634 3648-3648/com.example.rxjavademo D/chan: ========================integer 1
========================integer 2
複製代碼

6.3 skipWhile()

方法預覽:

public final Observable<T> skipWhile(Predicate<? super T> predicate)
複製代碼

有什麼用?

能夠設置條件,當某個數據知足條件時不發送該數據,反之則發送。

怎麼用?

Observable.just(1, 2, 3, 4)
.skipWhile(new Predicate < Integer > () {
    @Override
    public boolean test(Integer integer) throws Exception {
        return integer < 3;
    }
})
.subscribe(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "========================integer " + integer);
    }
});
複製代碼

打印結果:

05-26 09:47:32.653 4861-4861/com.example.rxjavademo D/chan: ========================integer 3
========================integer 4
複製代碼

6.4 takeUntil()

方法預覽:

public final Observable<T> takeUntil(Predicate<? super T> stopPredicate
複製代碼

有什麼用?

能夠設置條件,當事件知足此條件時,下一次的事件就不會被髮送了。

怎麼用?

Observable.just(1, 2, 3, 4, 5, 6)
.takeUntil(new Predicate < Integer > () {
    @Override
    public boolean test(Integer integer) throws Exception {
        return integer > 3;
    }
})
.subscribe(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "========================integer " + integer);
    }
});
複製代碼

打印結果:

05-26 09:55:12.918 7933-7933/com.example.rxjavademo D/chan: ========================integer 1
========================integer 2
05-26 09:55:12.919 7933-7933/com.example.rxjavademo D/chan: ========================integer 3
========================integer 4
複製代碼

6.5 skipUntil()

方法預覽:

public final <U> Observable<T> skipUntil(ObservableSource<U> other)
複製代碼

有什麼用?

當 skipUntil() 中的 Observable 發送事件了,原來的 Observable 纔會發送事件給觀察者。

怎麼用?

Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS)
.skipUntil(Observable.intervalRange(6, 5, 3, 1, TimeUnit.SECONDS))
.subscribe(new Observer < Long > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "========================onSubscribe ");
    }

    @Override
    public void onNext(Long along) {
        Log.d(TAG, "========================onNext " + along);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "========================onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "========================onComplete ");
    }
});
複製代碼

打印結果:

05-26 10:08:50.574 13023-13023/com.example.rxjavademo D/chan: ========================onSubscribe 
05-26 10:08:53.576 13023-13054/com.example.rxjavademo D/chan: ========================onNext 4
05-26 10:08:54.576 13023-13054/com.example.rxjavademo D/chan: ========================onNext 5
========================onComplete 
複製代碼

從結果能夠看出,skipUntil() 裏的 Observable 並不會發送事件給觀察者。

6.6 sequenceEqual()

方法預覽:

public static <T> Single<Boolean> sequenceEqual(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2)
......
複製代碼

有什麼用?

判斷兩個 Observable 發送的事件是否相同。

怎麼用?

Observable.sequenceEqual(Observable.just(1, 2, 3),
Observable.just(1, 2, 3))
.subscribe(new Consumer < Boolean > () {
    @Override
    public void accept(Boolean aBoolean) throws Exception {
        Log.d(TAG, "========================onNext " + aBoolean);
    }
});
複製代碼

打印結果:

05-26 10:11:45.975 14157-14157/? D/chan: ========================onNext true
複製代碼

6.7 contains()

方法預覽:

public final Single<Boolean> contains(final Object element)
複製代碼

有什麼用?

判斷事件序列中是否含有某個元素,若是有則返回 true,若是沒有則返回 false。

怎麼用?

Observable.just(1, 2, 3)
.contains(3)
.subscribe(new Consumer < Boolean > () {
    @Override
    public void accept(Boolean aBoolean) throws Exception {
        Log.d(TAG, "========================onNext " + aBoolean);
    }
});
複製代碼

打印結果:

05-26 10:14:23.522 15085-15085/com.example.rxjavademo D/chan: ========================onNext true
複製代碼

6.8 isEmpty()

方法預覽:

public final Single<Boolean> isEmpty()
複製代碼

有什麼用?

判斷事件序列是否爲空。

怎麼用?

Observable.create(new ObservableOnSubscribe < Integer > () {

    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        e.onComplete();
    }
})
.isEmpty()
.subscribe(new Consumer < Boolean > () {
    @Override
    public void accept(Boolean aBoolean) throws Exception {
        Log.d(TAG, "========================onNext " + aBoolean);
    }
});
複製代碼

打印結果:

05-26 10:17:16.725 16109-16109/com.example.rxjavademo D/chan: ========================onNext true
複製代碼

6.9 amb()

方法預覽:

public static <T> Observable<T> amb(Iterable<? extends ObservableSource<? extends T>> sources)
複製代碼

有什麼用?

amb() 要傳入一個 Observable 集合,可是隻會發送最早發送事件的 Observable 中的事件,其他 Observable 將會被丟棄。

怎麼用?

ArrayList < Observable < Long >> list = new ArrayList < > ();

list.add(Observable.intervalRange(1, 5, 2, 1, TimeUnit.SECONDS));
list.add(Observable.intervalRange(6, 5, 0, 1, TimeUnit.SECONDS));

Observable.amb(list)
.subscribe(new Consumer < Long > () {
    @Override
    public void accept(Long aLong) throws Exception {
        Log.d(TAG, "========================aLong " + aLong);
    }
});
複製代碼

打印結果:

05-26 10:21:29.580 17185-17219/com.example.rxjavademo D/chan: ========================aLong 6
05-26 10:21:30.580 17185-17219/com.example.rxjavademo D/chan: ========================aLong 7
05-26 10:21:31.579 17185-17219/com.example.rxjavademo D/chan: ========================aLong 8
05-26 10:21:32.579 17185-17219/com.example.rxjavademo D/chan: ========================aLong 9
05-26 10:21:33.579 17185-17219/com.example.rxjavademo D/chan: ========================aLong 10
複製代碼

6.10 defaultIfEmpty()

方法預覽:

public final Observable<T> defaultIfEmpty(T defaultItem)
複製代碼

有什麼用?

若是觀察者只發送一個 onComplete() 事件,則能夠利用這個方法發送一個值。

怎麼用?

Observable.create(new ObservableOnSubscribe < Integer > () {

    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        e.onComplete();
    }
})
.defaultIfEmpty(666)
.subscribe(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "========================onNext " + integer);
    }
});
複製代碼

打印結果:

05-26 10:26:56.376 19249-19249/com.example.rxjavademo D/chan: ========================onNext 666
複製代碼

RxJava 常見的使用方式都已經介紹的差很少,相信你們若是都掌握這些操做符的用法的話,那麼使用 RxJava 將不會再是難題了。

歡迎關注微信公衆號
相關文章
相關標籤/搜索