RxJava(三):建立操做符

博客主頁java

RxJava 的建立操做符主要包括以下內容:react

  • create —— 使用一個函數從頭建立一個 Observable
  • defer —— 只有當訂閱者訂閱才建立 Observable ,爲每一個訂閱建立一個新的 Observable
  • empty —— 建立一個什麼都不作直接通知完成的 Observable
  • never —— 建立一個不發射任何數據 Observable
  • error —— 建立一個什麼都不作直接通知錯誤的 Observable
  • from —— 將一個 Iterable、一個 Future 或者一個數組轉換成 Observable
  • interval —— 建立一個按照給定的時間間隔發射整數序列的 Observable
  • just —— 將一個或多個對象轉換成發射這個或這些對象的一個 Observable
  • range —— 建立一個發射指定範圍的整數序列的 Observable
  • timer —— 建立一個在給定的延時以後發射單個數據的 Observable
  • repeat —— 建立一個發射特定數據重複屢次的 Observable

1. create、just 和 from

1.1 create

使用一個函數從頭開始建立一個 Observable
segmentfault

咱們可使用 create 操做符從頭開始建立一個 Observable 給這個操做符傳遞一個接受觀察者做爲參數的函數,編寫這個函數讓它的行爲表現爲一個 Observable ——恰當地調用觀察者 onNext、onError、onComplete 方法。一個形式正確的有限 Observable 必須嘗試調用觀察者 onComplete() 一次或者它的 onError() 一次,並且此後不能再調用觀察者的任何其餘方法。數組

RxJava 建議咱們在傳遞給 create 方法的函數時,先檢查一下觀察者的 isDisposed 狀態,以便在沒有觀察者的時候,讓咱們的 Observable 中止發射數據,防止運行昂貴的運算。緩存

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        try {
            if (!emitter.isDisposed()) {
                for (int i = 0; i < 5; i++) {
                    emitter.onNext(i);
                }
                emitter.onComplete();
            }
        } catch (Exception e) {
            emitter.onError(e);
        }
    }
}).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "Next-> " + integer);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        Log.d(TAG, "Error-> " + throwable.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "Complete.");
    }
});

// 執行結果
 Next-> 0
 Next-> 1
 Next-> 2
 Next-> 3
 Next-> 4
 Complete.

1.2 just

建立一個發射指定值的 Observable

just 將單個數據轉換爲發射這個單個數據的 Observable數據結構

Observable.just("Observable#just")
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "Next: " + s);
            }
        });

// 執行結果
 Next: Observable#just

just 相似於 from,可是 from 會將數組或 Iterable 的數據取出而後逐個發射,而 just 只是簡單地原樣發射,將數組或 Iterable 看成單個數據。app

它能夠接受一至十個參數,返回一個按參數列表順序發射這些數據的 Observableide

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "Next: " + integer);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.d(TAG, "Error-> " + throwable.getMessage());
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "Complete.");
            }
        });

// 執行結果
 Next: 1
 Next: 2
 Next: 3
 Next: 4
 Next: 5
 Next: 6
 Next: 7
 Next: 8
 Next: 9
 Next: 10
 Complete.

1.3 from

from 能夠將其餘種類的對象和數據類型轉換爲 Observable

當咱們使用 Observable 時,若是要處理的數據均可以轉換成 Observables ,而不是須要混合使用 Observables 和其餘類型的數據,會很是方便。這讓咱們在數據流的整個生命週期中,可使用一組統一的操做符來管理它們。函數

例如, Iterable 能夠當作同步的 Observable; Future 能夠當作老是隻發射單個數據的 Observable。經過顯式地將那些數據轉換爲 Observables ,咱們能夠像使用 Observable 樣與它們交互。spa

所以,大部分 ReactiveX 實現都提供了將特定語言的對象和數據結構轉換爲 Observables 的方法。

RxJava 中, from 操做符能夠將 Future、Iterable 和數組轉換成 Obseruable 。對於 Iterable
和數組,產生的 Observable 會發射 Iterable 或數組的每一項數據。

fromArray

Observable.fromArray("Observable", "fromArray")
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "Next: " + s);
            }
        });

// 執行結果
 Next: Observable
 Next: fromArray

fromIterable

List<Integer> items = new ArrayList<>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}

Observable.fromIterable(items)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "Next: " + integer);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.d(TAG, "Error-> " + throwable.getMessage());
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "Complete.");
            }
        });

// 執行結果
 Next: 0
 Next: 1
 Next: 2
 Next: 3
 Next: 4
 Complete.

Future ,它會發射 Future.get() 方法返回的單個數據。

ExecutorService executorService = Executors.newCachedThreadPool();
Future<String> future = executorService.submit(new MyCallable());

Observable.fromFuture(future)
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "Next: " + s);
            }
        });

private class MyCallable implements Callable<String> {

    @Override
    public String call() throws Exception {
        Log.d(TAG, "模擬一些耗時的任務...");
        Thread.sleep(2000L);
        return "OK";
    }
}

// 執行結果
15:54:06.832  模擬一些耗時的任務...
15:54:08.833  Next: OK

fromFuture 方法有一個可接受兩個可選參數的版本,分別指定超時時長和時間單位。若是過了指定的時長, Future 尚未返回一個值,那麼這個 Observable 就會發射錯誤通知井終止。 下面的代碼,把超時時間設置爲 1s, Observable.fromFuture(future, 1, TimeUnit.SECONDS),執行結果以下:

16:00:44.260  模擬一些耗時的任務...
16:00:45.367  io.reactivex.exceptions.OnErrorNotImplementedException

2. repeat

建立一個發射特定數據重複屢次的 Observable

repeat 會重複地發射數據。某些實現容許咱們重複發射某個數據序列,還有一些容許咱們限制重複的次數。

repeat 不是建立一個 Observable,而是重複發射原始 Observable 的數據序列,這個序列或者是無限的,或者是經過 repeat(n) 指定的重複次數。

Observable.just("hello, repeat")
        .repeat(3) // 重複3次
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "Next-> " + s);
            }
        });

在RxJava 2.x 中還有兩個 repeat 相關的操做符: repeatWhen 和 repeatUntil

2.1 repeatWhen

repeatWhen 不是緩存和重放原始 Observable 的數據序列,而是有條件地從新訂閱和發射原來的 Observable

將原始 Observable 終止通知(完成或錯誤)看成一個 void 數據傳遞給一個通知處理器,以此來決定是否要從新訂閱和發射原來的 Observable。這個通知處理器就像一個 Observable 操做符,接受一個發射 void 通知的 Observable 做爲輸入,返回一個發射 void 數據(意思是,從新訂閱和發射原始 Observable )或者直接終止(即便用 repeatWhen 終止發射數據)的 Observable

Observable.range(0, 5)
        .repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
                return Observable.timer(10, TimeUnit.SECONDS);
            }
        }).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "Next-> " + integer);
    }
});

// 執行結果
16:35:18.627  Next-> 0
16:35:18.627  Next-> 1
16:35:18.627  Next-> 2
16:35:18.627  Next-> 3
16:35:18.627  Next-> 4
16:35:28.627  Next-> 0
16:35:28.627  Next-> 1
16:35:28.627  Next-> 2
16:35:28.627  Next-> 3
16:35:28.627  Next-> 4

會先發射 0 到 4 這5個數據 ,因爲使用了 repeatWhen 操做符,所以在 10s 以後還會再發射一次這些數據。

2.2 repeatUntil

repeatUntil 是 RxJava 2.x 新增的操做符,表示直到某個條件就再也不重複發射數據。當 BooleanSupplier的 getAsBoolean() 返回 false 時,表示重複發射上游的 Observable,當 getAsBoolean() 爲 true 時,表示停止重複發射上游的 Observable

final long startTimeMillis = System.currentTimeMillis();
Observable.interval(500, TimeUnit.MILLISECONDS)
        .take(5)
        .repeatUntil(new BooleanSupplier() {
            @Override
            public boolean getAsBoolean() throws Exception {
                return System.currentTimeMillis() - startTimeMillis > 5000;
            }
        }).subscribe(new Consumer<Long>() {
    @Override
    public void accept(Long aLong) throws Exception {
        Log.d(TAG, "Next-> " + aLong);
    }
});

// 執行結果
 Next-> 0
 Next-> 1
 Next-> 2
 Next-> 3
 Next-> 4
 Next-> 0
 Next-> 1
 Next-> 2
 Next-> 3
 Next-> 4

執行的結果裏打印了兩遍 0 到 4 ,之因此再也不打印第三遍是由於符合了 System.currentTimeMillis() - startTimeMillis > 5000 這個條件。

3. defer、interval 和 timer

3.1 defer

直到有觀察者訂閱時才建立 Observable ,而且爲每一個觀察者建立一個全新的 Observable

defer 操做符會一直等待直到有觀察者訂閱它,而後它使用 Observable 工廠方法生成一個 Observable。它對每一個觀察者都這樣作,所以儘管每一個訂閱者都覺得本身訂閱的是同一個 Observable ,但事實上每一個訂閱者獲取的是它們本身單獨的數據序列。

在某些狀況下,直到最後一分鐘(訂閱發生時)才生成 Observable ,以確保 Observable 包含最新的數據。

Observable<String> observable = Observable.defer(new Callable<ObservableSource<? extends String>>() {
    @Override
    public ObservableSource<? extends String> call() throws Exception {
        return Observable.just("hello, defer");
    }
});

observable.subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.d(TAG, "Next: " + s);
    }
});

// 執行結果
 Next: hello, defer

3.2 interval

建立一個按固定時間間隔發射整數序列的 Observable

interval 操做符返回一個 Observable ,它按固定的時間間隔發射一個無限遞增的整數序列。

interval 接受一個表示時間間隔的參數和一個表示時間單位的參數。 interval 默認在 computation 調度器上執行

Observable.interval(1, TimeUnit.SECONDS)
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                Log.d(TAG, "Next: " + aLong);
            }
        });

// 執行結果
 Next: 0
 Next: 1
 Next: 2
 Next: 3
 ......

每隔 1秒 打印一個數字。

3.3 timer

建立一個 Observable, 它在一個給定的延遲後發射一個特殊的值

timer 操做符建立一個在給定的時間段以後返回一個特殊值的 Observable

timer 返回一個 Observable,它在延遲一段給定的時間後發射一個簡單的數字 0。 timer 操做符默認在 computation 調度器上執行。

Observable.timer(2, TimeUnit.SECONDS)
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                // 2秒後打印
                Log.d(TAG, "Next: " + aLong);
            }
        });

// 執行結果
 Next: 0

若是個人文章對您有幫助,不妨點個贊鼓勵一下(^_^)

相關文章
相關標籤/搜索