博客主頁java
RxJava 的建立操做符主要包括以下內容:react
使用一個函數從頭開始建立一個 Observable segmentfault
咱們可使用 create 操做符從頭開始建立一個 Observable 給這個操做符傳遞一個接受觀察者做爲參數的函數,編寫這個函數讓它的行爲表現爲一個 Observable ——恰當地調用觀察者 onNext、onError、onComplete 方法。一個形式正確的有限 Observable 必須嘗試調用觀察者 onComplete() 一次或者它的 onError() 一次,並且此後不能再調用觀察者的任何其餘方法。數組
RxJava 建議咱們在傳遞給 create 方法的函數時,先檢查一下觀察者的 isDisposed 狀態,以便在沒有觀察者的時候,讓咱們的 Observable 中止發射數據,防止運行昂貴的運算。緩存
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { try { if (!emitter.isDisposed()) { for (int i = 0; i < 5; i++) { emitter.onNext(i); } emitter.onComplete(); } } catch (Exception e) { emitter.onError(e); } } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next-> " + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error-> " + throwable.getMessage()); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 執行結果 Next-> 0 Next-> 1 Next-> 2 Next-> 3 Next-> 4 Complete.
建立一個發射指定值的 Observable
just 將單個數據轉換爲發射這個單個數據的 Observable數據結構
Observable.just("Observable#just") .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "Next: " + s); } }); // 執行結果 Next: Observable#just
just 相似於 from,可是 from 會將數組或 Iterable 的數據取出而後逐個發射,而 just 只是簡單地原樣發射,將數組或 Iterable 看成單個數據。app
它能夠接受一至十個參數,返回一個按參數列表順序發射這些數據的 Observableide
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next: " + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error-> " + throwable.getMessage()); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 執行結果 Next: 1 Next: 2 Next: 3 Next: 4 Next: 5 Next: 6 Next: 7 Next: 8 Next: 9 Next: 10 Complete.
from 能夠將其餘種類的對象和數據類型轉換爲 Observable
當咱們使用 Observable 時,若是要處理的數據均可以轉換成 Observables ,而不是須要混合使用 Observables 和其餘類型的數據,會很是方便。這讓咱們在數據流的整個生命週期中,可使用一組統一的操做符來管理它們。函數
例如, Iterable 能夠當作同步的 Observable; Future 能夠當作老是隻發射單個數據的 Observable。經過顯式地將那些數據轉換爲 Observables ,咱們能夠像使用 Observable 樣與它們交互。spa
所以,大部分 ReactiveX 實現都提供了將特定語言的對象和數據結構轉換爲 Observables 的方法。
RxJava 中, from 操做符能夠將 Future、Iterable 和數組轉換成 Obseruable 。對於 Iterable
和數組,產生的 Observable 會發射 Iterable 或數組的每一項數據。
fromArray
Observable.fromArray("Observable", "fromArray") .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "Next: " + s); } }); // 執行結果 Next: Observable Next: fromArray
fromIterable
List<Integer> items = new ArrayList<>(); for (int i = 0; i < 5; i++) { items.add(i); } Observable.fromIterable(items) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next: " + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error-> " + throwable.getMessage()); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 執行結果 Next: 0 Next: 1 Next: 2 Next: 3 Next: 4 Complete.
Future ,它會發射 Future.get() 方法返回的單個數據。
ExecutorService executorService = Executors.newCachedThreadPool(); Future<String> future = executorService.submit(new MyCallable()); Observable.fromFuture(future) .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "Next: " + s); } }); private class MyCallable implements Callable<String> { @Override public String call() throws Exception { Log.d(TAG, "模擬一些耗時的任務..."); Thread.sleep(2000L); return "OK"; } } // 執行結果 15:54:06.832 模擬一些耗時的任務... 15:54:08.833 Next: OK
fromFuture 方法有一個可接受兩個可選參數的版本,分別指定超時時長和時間單位。若是過了指定的時長, Future 尚未返回一個值,那麼這個 Observable 就會發射錯誤通知井終止。 下面的代碼,把超時時間設置爲 1s, Observable.fromFuture(future, 1, TimeUnit.SECONDS),執行結果以下:
16:00:44.260 模擬一些耗時的任務... 16:00:45.367 io.reactivex.exceptions.OnErrorNotImplementedException
建立一個發射特定數據重複屢次的 Observable
repeat 會重複地發射數據。某些實現容許咱們重複發射某個數據序列,還有一些容許咱們限制重複的次數。
repeat 不是建立一個 Observable,而是重複發射原始 Observable 的數據序列,這個序列或者是無限的,或者是經過 repeat(n) 指定的重複次數。
Observable.just("hello, repeat") .repeat(3) // 重複3次 .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "Next-> " + s); } });
在RxJava 2.x 中還有兩個 repeat 相關的操做符: repeatWhen 和 repeatUntil
repeatWhen 不是緩存和重放原始 Observable 的數據序列,而是有條件地從新訂閱和發射原來的 Observable
將原始 Observable 終止通知(完成或錯誤)看成一個 void 數據傳遞給一個通知處理器,以此來決定是否要從新訂閱和發射原來的 Observable。這個通知處理器就像一個 Observable 操做符,接受一個發射 void 通知的 Observable 做爲輸入,返回一個發射 void 數據(意思是,從新訂閱和發射原始 Observable )或者直接終止(即便用 repeatWhen 終止發射數據)的 Observable
Observable.range(0, 5) .repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() { @Override public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception { return Observable.timer(10, TimeUnit.SECONDS); } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next-> " + integer); } }); // 執行結果 16:35:18.627 Next-> 0 16:35:18.627 Next-> 1 16:35:18.627 Next-> 2 16:35:18.627 Next-> 3 16:35:18.627 Next-> 4 16:35:28.627 Next-> 0 16:35:28.627 Next-> 1 16:35:28.627 Next-> 2 16:35:28.627 Next-> 3 16:35:28.627 Next-> 4
會先發射 0 到 4 這5個數據 ,因爲使用了 repeatWhen 操做符,所以在 10s 以後還會再發射一次這些數據。
repeatUntil 是 RxJava 2.x 新增的操做符,表示直到某個條件就再也不重複發射數據。當 BooleanSupplier的 getAsBoolean() 返回 false 時,表示重複發射上游的 Observable,當 getAsBoolean() 爲 true 時,表示停止重複發射上游的 Observable
final long startTimeMillis = System.currentTimeMillis(); Observable.interval(500, TimeUnit.MILLISECONDS) .take(5) .repeatUntil(new BooleanSupplier() { @Override public boolean getAsBoolean() throws Exception { return System.currentTimeMillis() - startTimeMillis > 5000; } }).subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.d(TAG, "Next-> " + aLong); } }); // 執行結果 Next-> 0 Next-> 1 Next-> 2 Next-> 3 Next-> 4 Next-> 0 Next-> 1 Next-> 2 Next-> 3 Next-> 4
執行的結果裏打印了兩遍 0 到 4 ,之因此再也不打印第三遍是由於符合了 System.currentTimeMillis() - startTimeMillis > 5000 這個條件。
直到有觀察者訂閱時才建立 Observable ,而且爲每一個觀察者建立一個全新的 Observable
defer 操做符會一直等待直到有觀察者訂閱它,而後它使用 Observable 工廠方法生成一個 Observable。它對每一個觀察者都這樣作,所以儘管每一個訂閱者都覺得本身訂閱的是同一個 Observable ,但事實上每一個訂閱者獲取的是它們本身單獨的數據序列。
在某些狀況下,直到最後一分鐘(訂閱發生時)才生成 Observable ,以確保 Observable 包含最新的數據。
Observable<String> observable = Observable.defer(new Callable<ObservableSource<? extends String>>() { @Override public ObservableSource<? extends String> call() throws Exception { return Observable.just("hello, defer"); } }); observable.subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "Next: " + s); } }); // 執行結果 Next: hello, defer
建立一個按固定時間間隔發射整數序列的 Observable
interval 操做符返回一個 Observable ,它按固定的時間間隔發射一個無限遞增的整數序列。
interval 接受一個表示時間間隔的參數和一個表示時間單位的參數。 interval 默認在 computation 調度器上執行
Observable.interval(1, TimeUnit.SECONDS) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.d(TAG, "Next: " + aLong); } }); // 執行結果 Next: 0 Next: 1 Next: 2 Next: 3 ......
每隔 1秒 打印一個數字。
建立一個 Observable, 它在一個給定的延遲後發射一個特殊的值
timer 操做符建立一個在給定的時間段以後返回一個特殊值的 Observable
timer 返回一個 Observable,它在延遲一段給定的時間後發射一個簡單的數字 0。 timer 操做符默認在 computation 調度器上執行。
Observable.timer(2, TimeUnit.SECONDS) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { // 2秒後打印 Log.d(TAG, "Next: " + aLong); } }); // 執行結果 Next: 0
若是個人文章對您有幫助,不妨點個贊鼓勵一下(^_^)