目錄html
幾種主要的需求java
建立Observable的各類方式react
使用 Create
操做符從頭開始建立一個Observable,給這個操做符傳遞一個接受觀察者做爲參數的函數,編寫這個函數能夠調用觀察者的 onNext
,onError
和 onCompleted
方法,當發生訂閱的時候會自動調用觀察者的 onSubscribe
方法。git
經過 Subscribe 進行Observable 與 Observer 的訂閱,其中 subscribe 方法能夠接收一個完整通知參數的 Observer 對象,也能夠接收部分通知參數的 Consumer
(接收數據) 或者 Action
(僅接收通知) 對象。github
實例代碼:數組
// 建立Observable(被觀察者) Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() { public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("Hello"); emitter.onNext("World"); emitter.onComplete(); } }); // 建立Observer(觀察者), 能夠接受全部通知 Observer<String> observer = new Observer<String>() { public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe"); } public void onNext(String t) { System.out.println("--> onNext = " + t); } public void onError(Throwable e) { System.out.println("--> onError"); } public void onComplete() { System.out.println("--> onComplete"); } }; // 建立只接受 onNext(item) 通知的Consumer(觀察者) Consumer<String> nextConsumer = new Consumer<String>() { @Override public void accept(String t) throws Exception { System.out.println("--> accept nextConsumer: " + t); } }; // 建立只接受 onError(Throwable) 通知的Consumer(觀察者) Consumer<Throwable> errorConsumer = new Consumer<Throwable>() { @Override public void accept(Throwable t) throws Exception { System.out.println("-- accept errorConsumer: " + t); } }; // 建立只接受 onComplete() 通知的Action(觀察者) Action completedAction = new Action() { @Override public void run() throws Exception { System.out.println("--> run completedAction"); } }; // 建立只接受 onSubscribe 通知的Consumer(觀察者) Consumer<Disposable> onSubscribeComsumer = new Consumer<Disposable>() { @Override public void accept(Disposable t) throws Exception { System.out.println("--> accept onSubscribeComsumer "); } }; // 1. 進行訂閱,subscribe(Observer) observable.subscribe(observer); System.out.println("---------------------------------------------"); // 2. 進行訂閱,subscribe(Consumer onNext) observable.subscribe(nextConsumer); System.out.println("---------------------------------------------"); // 3. 進行訂閱,subscribe(Consumer onNext, Consumer onError) observable.subscribe(nextConsumer, errorConsumer); System.out.println("---------------------------------------------"); // 4. 進行訂閱,subscribe(Consumer onNext, Consumer onError, Action onCompleted) observable.subscribe(nextConsumer, errorConsumer, completedAction); System.out.println("---------------------------------------------"); // 5. 進行訂閱,subscribe(Consumer onNext, Consumer onError, Action onCompleted, Consumer onSubscribe) observable.subscribe(nextConsumer, errorConsumer, completedAction, onSubscribeComsumer);
輸出:緩存
--> onSubscribe --> onNext = Hello --> onNext = World --> onComplete --------------------------------------------- --> accept nextConsumer: Hello --> accept nextConsumer: World --------------------------------------------- --> accept nextConsumer: Hello --> accept nextConsumer: World --------------------------------------------- --> accept nextConsumer: Hello --> accept nextConsumer: World --> run completedAction --------------------------------------------- --> accept onSubscribeComsumer --> accept nextConsumer: Hello --> accept nextConsumer: World --> run completedAction
注意:create 方法默認不在任何特定的調度器上執行。app
- onSubscribe(Disposable): 在發生訂閱時接收。
- onNext(item): 在被觀察者發射數據接收。
- onError(Throwable): 在被觀察者發射Error時接收。
onComplete(): 在被觀察者完成數據發送時接收。ide
Javadoc: create(OnSubscribe)
Javadoc: subscribe()
Javadoc: subscribe(observer)
Javadoc: subscribe(onNext)
Javadoc: subscribe(onNext, onError)
Javadoc: subscribe(onNext, onError, onComplete)
Javadoc: subscribe(onNext, onError, onComplete, onSubscribe)函數
直到有觀察者訂閱時才建立 Observable,而且爲每一個觀察者建立一個新的 Observable.
Defer
操做符會一直等待直到有觀察者訂閱它,而後它使用Observable工廠方法生成一個 Observable。它對每一個觀察者都這樣作,所以儘管每一個訂閱者都覺得本身訂閱的是同一個 Observable,事實上每一個訂閱者獲取的是它們本身的單獨的數據序列。
實例代碼:
// 建立一個Defer類型的Observable Observable<Integer> deferObservable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() { public ObservableSource<? extends Integer> call() throws Exception { // 建立每一個觀察者訂閱所返回的 Observable Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() { public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onNext(4); emitter.onNext(5); emitter.onComplete(); } }); return observable; } }); // 建立第一個觀察者並訂閱defer Observable deferObservable.subscribe(new Consumer<Integer>() { public void accept(Integer t) throws Exception { System.out.println("No.1 --> accept = " + t); } }); // 建立第二個觀察者並訂閱defer Observable deferObservable.subscribe(new Consumer<Integer>() { public void accept(Integer t) throws Exception { System.out.println("No.2 --> accept = " + t); } }); // 建立第三個觀察者並訂閱defer Observable deferObservable.subscribe(new Consumer<Integer>() { public void accept(Integer t) throws Exception { System.out.println("No.3 --> accept = " + t); } });
輸出:
No.1 --> accept = 1 No.1 --> accept = 2 No.1 --> accept = 3 No.1 --> accept = 4 No.1 --> accept = 5 No.2 --> accept = 1 No.2 --> accept = 2 No.2 --> accept = 3 No.2 --> accept = 4 No.2 --> accept = 5 No.3 --> accept = 1 No.3 --> accept = 2 No.3 --> accept = 3 No.3 --> accept = 4 No.3 --> accept = 5
注意:defer 方法默認不在任何特定的調度器上執行。
Javadoc: defer(Func0)
Empty
:建立一個不發射任何數據可是正常終止的Observable
Never
:建立一個不發射數據也不終止的Observable
Error
:建立一個不發射數據以一個錯誤終止的Observable
這三個操做符生成的 Observable 行爲很是特殊和受限,多用於一些特殊的場景(某些操做狀態異常後返回一個error、empty、never 的 Observable)。測試的時候頗有用,有時候也用於結合其它的 Observables,或者做爲其它須要 Observable 的操做符的參數。
實例代碼:
System.out.println("--> 1 -----------------------------------"); // 1. 建立一個不發射任何數據可是正常終止的Observable Observable.empty() .subscribe(new Observer<Object>() { @Override public void onSubscribe(Disposable d) { System.out.println("onSubscribe"); } @Override public void onNext(Object t) { System.out.println("onNext: " + t); } @Override public void onError(Throwable e) { System.out.println("onError: " + e); } @Override public void onComplete() { System.out.println("onComplete"); } }); System.out.println("--> 2 -----------------------------------"); // 2. 建立一個不輸出數據,而且不會終止的Observable Observable.never() .subscribe(new Observer<Object>() { @Override public void onSubscribe(Disposable d) { System.out.println("onSubscribe"); } @Override public void onNext(Object t) { System.out.println("onNext: " + t); } @Override public void onError(Throwable e) { System.out.println("onError: " + e); } @Override public void onComplete() { System.out.println("onComplete"); } }); System.out.println("--> 3 -----------------------------------"); // 3. 建立一個不發射數據以一個錯誤終止的Observable Observable.error(new NullPointerException("error test")) .subscribe(new Observer<Object>() { @Override public void onSubscribe(Disposable d) { System.out.println("onSubscribe"); } @Override public void onNext(Object t) { System.out.println("onNext: " + t); } @Override public void onError(Throwable e) { System.out.println("onError: " + e); } @Override public void onComplete() { System.out.println("onComplete"); } });
輸出:
--> 1 ----------------------------------- onSubscribe onComplete --> 2 ----------------------------------- onSubscribe --> 3 ----------------------------------- onSubscribe onError: java.lang.NullPointerException: error test
注意:
- RxJava將這些操做符實現爲 empty,never和 error。
- error 操做符須要一 個 Throwable參數,你的Observable會以此終止。
- 這些操做符默認不在任何特定的調度器上執行,可是 empty 和 error 有一個可選參數是Scheduler,若是你傳遞了Scheduler參數,它 們會在這個調度器上發送通知.
Javadoc: empty()
Javadoc: never()
Javadoc: error(java.lang.Throwable)
建立一個發射指定值的Observable。
Just
將單個數據轉換爲發射那個數據的Observable。相似於From
,可是From會將數組或Iterable的數據取出而後逐個發射,而Just只是簡單的原樣發射,將數組或Iterable當作單個數據。
注意: 若是你傳遞 null
給 Just
,它會返回一個發射 null 值的 Observable。不要誤認爲它會返回一個空Observable(徹底不發射任何數據的Observable),若是須要空Observable你應該使用Empty
操做符。
實例代碼:
// 單個對象發送 Observable.just(1) .subscribe(new Consumer<Integer>() { public void accept(Integer t) throws Exception { System.out.println("--> singe accept: " + t); } }); System.out.println("---------------------------------"); // 多個對象發送,內部實際使用from實現 (接受一至九個參數,返回一個按參數列表順序發射這些數據的Observable) Observable.just(1, 2, 3, 4, 5) .subscribe(new Consumer<Integer>() { public void accept(Integer t) throws Exception { System.out.println("--> mutil accept: " + t); } });
輸出:
--> singe accept: 1 --------------------------------- --> mutil accept: 1 --> mutil accept: 2 --> mutil accept: 3 --> mutil accept: 4 --> mutil accept: 5
Javadoc: just(item ...)
將其它種類的對象和數據類型轉換爲Observable,發射來自對應數據源數據類型的數據,在RxJava中,from
操做符能夠轉換 Future
、Iterable
和數組
。對於Iterable和數組,產生的Observable會發射Iterable或數組的每一項數據。
實例代碼:
// 初始化數據 Integer[] array = { 1, 2, 3, 4, 5, 6 }; List<String> iterable = new ArrayList<String>(); iterable.add("A"); iterable.add("B"); iterable.add("C"); iterable.add("D"); iterable.add("E"); // 1. fromArray Observable.fromArray(array).subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept(1):fromArray: " + t); } }); System.out.println("---------------------------------------"); // 2. fromIterable Observable.fromIterable(iterable) .subscribe(new Consumer<String>() { @Override public void accept(String t) throws Exception { System.out.println("--> accept(2) fromIterable: " + t); } }); System.out.println("---------------------------------------"); // 3. fromCallable Observable.fromCallable(new Callable<Integer>() { @Override public Integer call() throws Exception { return 1; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept(3): fromCallable: " + t); } }); System.out.println("---------------------------------------"); // 4. fromFuture Observable.fromFuture(new Future<String>() { @Override public boolean cancel(boolean mayInterruptIfRunning) { return false; } @Override public String get() throws InterruptedException, ExecutionException { System.out.println("--> fromFutrue: get()"); return "hello"; } @Override public boolean isCancelled() { return false; } @Override public boolean isDone() { return false; } @Override public String get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return null; } }).subscribe(new Consumer<String>() { @Override public void accept(String t) throws Exception { System.out.println("--> accept(4): fromFuture: " + t); } });
輸出:
--> accept(1):fromArray: 1 --> accept(1):fromArray: 2 --> accept(1):fromArray: 3 --> accept(1):fromArray: 4 --> accept(1):fromArray: 5 --> accept(1):fromArray: 6 --------------------------------------- --> accept(2) fromIterable: A --> accept(2) fromIterable: B --> accept(2) fromIterable: C --> accept(2) fromIterable: D --> accept(2) fromIterable: E --------------------------------------- --> accept(3): fromCallable: 1 --------------------------------------- --> fromFutrue: get() --> accept(4): fromFuture: hello
注意:from默認不在任何特定的調度器上執行。然而你能夠將Scheduler做爲可選的第二個參數傳遞給Observable,它會在那個調度器上管理這個Future。
Javadoc: from(array)
Javadoc: from(Iterable)
Javadoc: from(Callable)
Javadoc: from(Future)
Javadoc: from(Future,Scheduler)
Javadoc: from(Future,timeout,timeUnit)
建立一個發射特定數據重複屢次的Observable,它不是建立一個Observable,而是重複發射原始 Observable的數據序列,這個序列或者是無限的,或者經過 repeat(n)
指定重複次數。
實例代碼:
// 1. repeat(): 一直重複發射原始 Observable的數據序列 Observable.range(1, 5) .repeat() .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept(1): " + t); } }); System.out.println("----------------------------------------"); // 2. repeat(n): 重複執行5次 Observable.range(1, 2) .repeat(3) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept(2): " + t); } });
輸出:
--> accept(1): 1 --> accept(1): 2 --> accept(1): 3 --> accept(1): 4 --> accept(1): 5 --> accept(1): 1 --> accept(1): 2 --> accept(1): 3 --> accept(1): 4 --> accept(1): 5 --> accept(1): 1 --> accept(1): 2 --> accept(1): 3 --> accept(1): 4 --> accept(1): 5 ...... ---------------------------------------- --> accept(2): 1 --> accept(2): 2 --> accept(2): 1 --> accept(2): 2 --> accept(2): 1 --> accept(2): 2
注意: repeat 操做符默認在 trampoline 調度器上執行。有一個變體能夠經過可選參數指定 Scheduler。
Javadoc: repeat()
Javadoc: repeat(long)
Javadoc: repeat(Scheduler)
Javadoc: repeat(long,Scheduler)
repeatWhen
的操做符,它不是緩存和重放原始 Observable 的數據序列,接收到原始 Observable 終止通知後,有條件的決定是否從新訂閱原來的 Observable 。
將原始 Observable 的終止通知(完成或錯誤)當作一個 void 數據傳遞給一個通知處理器,它以此來決定是否要從新訂閱和發射原來的 Observable。這個通知處理器就像一個 Observable 操做符,接受一個發射 void通知的 Observable爲輸入,返回一個發射 void 數據(意思是,從新訂閱和發射原始 Observable)或者直接終止(意思是,使用 repeatWhen 終止發射數據)的 Observable。
實例代碼:
// repeatWhen(Func1()):接收到終止通知後,在函數中決定是否從新訂閱原來的Observable // 須要注意的是repeatWhen的objectObservable處理(也能夠單獨自定義Observable返回),這裏使用flathMap進行處理, // 讓它延時發出onNext,這裏onNext發出什麼數據都不重要,它只是僅僅用來處理重訂閱的通知,若是發出的是onComplete/onError,則不會觸發重訂閱 Observable.range(1, 2) .doOnComplete(new Action() { @Override public void run() throws Exception { System.out.println("-----------> 完成一次訂閱"); } }).repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() { private int n = 0; @Override public ObservableSource<?> apply(Observable<Object> t) throws Exception { // 接收到原始Observable的終止通知,決定是否從新訂閱 System.out.println("--> apply repeat "); return t.flatMap(new Function<Object, ObservableSource<?>>() { @Override public ObservableSource<?> apply(Object t) throws Exception { if(n < 3) { // 從新訂閱3次 n ++; return Observable.just(0); } else { return Observable.empty(); } } }); // return Observable.timer(1, TimeUnit.SECONDS); // 間隔一秒後從新訂閱一次 // return Observable.interval(1, TimeUnit.SECONDS); // 每間隔一秒從新訂閱一次 } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept: " + t); } });
輸出:
--> apply repeat --> accept: 1 --> accept: 2 -----------> 完成一次訂閱 --> accept: 1 --> accept: 2 -----------> 完成一次訂閱 --> accept: 1 --> accept: 2 -----------> 完成一次訂閱 --> accept: 1 --> accept: 2 -----------> 完成一次訂閱
注意:repeatWhen操做符默認在 trampoline 調度器上執行。
Javadoc: repeatWhen(Func1)
根據條件(函數BooleanSupplier
)判斷是否須要繼續訂閱: false:繼續訂閱; true:取消訂閱
實例代碼:
// repeatUntil 根據條件(BooleanSupplier)判斷是否須要繼續訂閱 Observable.range(1, 2) .doOnComplete(new Action() { @Override public void run() throws Exception { System.out.println("-----------> 完成一次訂閱"); } }).repeatUntil(new BooleanSupplier() { private int n = 0; @Override public boolean getAsBoolean() throws Exception { System.out.println("getAsBoolean = " + (n < 3? false:true) ); // 是否須要終止 if (n < 3) { n++; return false; // 繼續從新訂閱 } return true; // 終止從新訂閱 } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept: " + t); } });
輸出:
--> accept: 1 --> accept: 2 -----------> 完成一次訂閱 getAsBoolean = false --> accept: 1 --> accept: 2 -----------> 完成一次訂閱 getAsBoolean = false --> accept: 1 --> accept: 2 -----------> 完成一次訂閱 getAsBoolean = false --> accept: 1 --> accept: 2 -----------> 完成一次訂閱 getAsBoolean = true
Javadoc: repeatWhen(Func1)
建立一個發射特定整數序列的Observable。
Range
操做符發射一個範圍內的有序整數序列,你能夠指定範圍的起始和長度。
RxJava將這個操做符實現爲 range 函數,它接受兩個參數,一個是範圍的起始值,一個是範圍的數據的數目。若是你將第二個參數設爲0,將致使Observable不發射任何數據(若是設置 爲負數,會拋異常)。
實例代碼:
// 1. range(n,m) 發射從n開始的m個整數序列,序列區間[n,n+m-1) Observable.range(0, 5) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("-- accept(range): " + t); } }); System.out.println("------------------------------"); // 2. rangeLong(n,m) 發射從n開始的m個長整型序列,序列區間[n,n+m-1) Observable.rangeLong(1, 5) .subscribe(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("-- accept(rangeLong): " + t); } });
輸出:
-- accept(range): 0 -- accept(range): 1 -- accept(range): 2 -- accept(range): 3 -- accept(range): 4 ------------------------------ -- accept(rangeLong): 1 -- accept(rangeLong): 2 -- accept(rangeLong): 3 -- accept(rangeLong): 4 -- accept(rangeLong): 5
Javadoc: range(int start,int count)
Javadoc: rangeLong(long start, long count)
建立一個按固定時間間隔發射整數序列的Observable,它按固定的時間間隔發射一個無限遞增的整數序列。
RxJava將這個操做符實現爲 interval 方法。它接受一個表示時間間隔的參數和一個表示時間單位的參數。
實例代碼:
// [1] interval(long period, TimeUnit unit) // 每間隔period時間單位,發射一次整數序列 Observable.interval(1, TimeUnit.SECONDS) .subscribe(new Consumer<Long>() { public void accept(Long l) throws Exception { System.out.println("--> accept(1): " + l); } }); System.out.println("------------------------------------"); // [2] interval(long initialDelay, long period, TimeUnit unit) // 在延遲initialDelay秒後每隔period時間單位發射一個整數序列 Observable.interval(0, 1, TimeUnit.SECONDS) .subscribe(new Consumer<Long>() { public void accept(Long t) throws Exception { System.out.println("--> accept(2): " + t); } }); System.out.println("------------------------------------"); // [3] intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit) // 延遲initialDelay秒後從起始數據start開始,每隔period秒發送一個數字序列,一共發送count個數據 Observable.intervalRange(1, 5, 3, 2, TimeUnit.SECONDS) .subscribe(new Consumer<Long>() { public void accept(Long t) throws Exception { System.out.println("--> accept(3): " + t); } });
注意:interval 默認在 computation 調度器上執行, 有一個變體能夠經過可選參數指定 Scheduler。
Javadoc: interval(long period, TimeUnit unit)
Javadoc: interval(long period, TimeUnit unit, Scheduler scheduler)
Javadoc: interval(long initialDelay, long period, TimeUnit unit)
Javadoc: interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
Javadoc: intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)
Javadoc: intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
輸出:
--> accept(1): 0 --> accept(1): 1 --> accept(1): 2 --> accept(1): 3 --> accept(1): 4 --> accept(1): 5 ... ------------------------------------ --> accept(2): 0 --> accept(2): 1 --> accept(2): 2 --> accept(2): 3 --> accept(2): 4 --> accept(2): 5 ... ------------------------------------ --> accept(3): 1 --> accept(3): 2 --> accept(3): 3 --> accept(3): 4 --> accept(3): 5
建立一個給定的延遲後發射一個特殊的值的Observable。
RxJava將這個操做符實現爲 timer 函數。timer 返回一個Observable,它在延遲一段給定的時間後發射一個簡單的數字0
實例代碼:
// timer(long delay, TimeUnit unit, Scheduler scheduler) // 定時delay時間 單位後發送數字0,指定可選參數Schedule調度器爲trampoline(當前線程排隊執行) Observable.timer(5, TimeUnit.SECONDS, Schedulers.trampoline()) .subscribe(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> accept: " + t); } });
輸出:
--> accept: 0
注意:timer 操做符默認在 computation 調度器上執行。有一個變體能夠經過可選參數指定 Scheduler。
Javadoc: timer(long delay, TimeUnit unit)
Javadoc: timer(long delay, TimeUnit unit, Scheduler scheduler)
根據實際狀況,使用不一樣的方式建立不一樣種類的Observable,這個在開發中很是有用,能夠減小不少重複、複雜、冗餘的操做,能夠快速的建立一個符合要求的Observable,必定程度上提升了開發的效率。
提示:以上使用的Rxjava2版本: 2.2.12
Rx介紹與講解及完整目錄參考:Rxjava2 介紹與詳解實例
實例代碼: