這個頁面列出了不少用於Observable的輔助操做符html
materialize( )
— 將Observable轉換成一個通知列表convert an Observable into a list of Notificationsdematerialize( )
— 將上面的結果逆轉回一個Observabletimestamp( )
— 給Observable發射的每一個數據項添加一個時間戳serialize( )
— 強制Observable按次序發射數據而且要求功能是無缺的cache( )
— 記住Observable發射的數據序列併發射相同的數據序列給後續的訂閱者observeOn( )
— 指定觀察者觀察Observable的調度器subscribeOn( )
— 指定Observable執行任務的調度器doOnEach( )
— 註冊一個動做,對Observable發射的每一個數據項使用doOnCompleted( )
— 註冊一個動做,對正常完成的Observable使用doOnError( )
— 註冊一個動做,對發生錯誤的Observable使用doOnTerminate( )
— 註冊一個動做,對完成的Observable使用,不管是否發生錯誤doOnSubscribe( )
— 註冊一個動做,在觀察者訂閱時使用doOnUnsubscribe( )
— 註冊一個動做,在觀察者取消訂閱時使用finallyDo( )
— 註冊一個動做,在Observable完成時使用delay( )
— 延時發射Observable的結果delaySubscription( )
— 延時處理訂閱請求timeInterval( )
— 按期發射數據using( )
— 建立一個只在Observable生命週期存在的資源single( )
— 強制返回單個數據,不然拋出異常singleOrDefault( )
— 若是Observable完成時返回了單個數據,就返回它,不然返回默認數據toFuture( )
, toIterable( )
, toList( )
— 將Observable轉換爲其它對象或數據結構=========================================================java
Materialize
將數據項和事件通知都當作數據項發射,Dematerialize
恰好相反。react
一個合法的有限的Obversable將調用它的觀察者的onNext
方法零次或屢次,而後調用觀察者的onCompleted
或onError
正好一次。Materialize
操做符將這一系列調用,包括原來的onNext
通知和終止通知onCompleted
或onError
都轉換爲一個Observable發射的數據序列。緩存
RxJava的materialize
未來自原始Observable的通知轉換爲Notification
對象,而後它返回的Observable會發射這些數據。數據結構
materialize
默認不在任何特定的調度器 (Scheduler
) 上執行。多線程
Dematerialize
操做符是Materialize
的逆向過程,它將Materialize
轉換的結果還原成它本來的形式。併發
dematerialize
反轉這個過程,將原始Observable發射的Notification
對象還原成Observable的通知。異步
dematerialize
默認不在任何特定的調度器 (Scheduler
) 上執行。ide
給Observable發射的數據項附加一個時間戳函數
RxJava中的實現爲timestamp
,它將一個發射T類型數據的Observable轉換爲一個發射類型爲Timestamped<T>
的數據的Observable,每一項都包含數據的原始發射時間。
timestamp
默認在immediate
調度器上執行,可是能夠經過參數指定其它的調度器。
強制一個Observable連續調用並保證行爲正確
一個Observable能夠異步調用它的觀察者的方法,多是從不一樣的線程調用。這可能會讓Observable行爲不正確,它可能會在某一個onNext
調用以前嘗試調用onCompleted
或onError
方法,或者從兩個不一樣的線程同時調用onNext
方法。使用Serialize
操做符,你能夠糾正這個Observable的行爲,保證它的行爲是正確的且是同步的。
RxJava中的實現是serialize
,它默認不在任何特定的調度器上執行。
保證全部的觀察者收到相同的數據序列,即便它們在Observable開始發射數據以後才訂閱
可鏈接的Observable (connectable Observable)與普通的Observable差很少,不過它並不會在被訂閱時開始發射數據,而是直到使用了Connect
操做符時纔會開始。用這種方法,你能夠在任什麼時候候讓一個Observable開始發射數據。
若是在將一個Observable轉換爲可鏈接的Observable以前對它使用Replay
操做符,產生的這個可鏈接Observable將老是發射完整的數據序列給任何將來的觀察者,即便那些觀察者在這個Observable開始給其它觀察者發射數據以後才訂閱。
RxJava的實現爲replay
,它有多個接受不一樣參數的變體,有的能夠指定replay
的最大緩存數量,有的還能夠指定調度器。
有一種 replay
返回一個普通的Observable。它能夠接受一個變換函數爲參數,這個函數接受原始Observable發射的數據項爲參數,返回結果Observable要發射的一項數據。所以,這個操做符實際上是replay
變換以後的數據項。
指定一個觀察者在哪一個調度器上觀察這個Observable
不少ReactiveX實現都使用調度器 「Scheduler
」來管理多線程環境中Observable的轉場。你可使用ObserveOn
操做符指定Observable在一個特定的調度器上發送通知給觀察者 (調用觀察者的onNext
,onCompleted
, onError
方法)。
注意:當遇到一個異常時ObserveOn
會當即向前傳遞這個onError
終止通知,它不會等待慢速消費的Observable接受任何以前它已經收到但尚未發射的數據項。這可能意味着onError
通知會跳到(併吞掉)原始Observable發射的數據項前面,正如圖例上展現的。
SubscribeOn
操做符的做用相似,但它是用於指定Observable自己在特定的調度器上執行,它一樣會在那個調度器上給觀察者發通知。
RxJava中,要指定Observable應該在哪一個調度器上調用觀察者的onNext
, onCompleted
, onError
方法,你須要使用observeOn
操做符,傳遞給它一個合適的Scheduler
。
指定Observable自身在哪一個調度器上執行
不少ReactiveX實現都使用調度器 「Scheduler
」來管理多線程環境中Observable的轉場。你可使用SubscribeOn
操做符指定Observable在一個特定的調度器上運轉。
ObserveOn
操做符的做用相似,可是功能頗有限,它指示Observable在一個指定的調度器上給觀察者發通知。
在某些實現中還有一個UnsubscribeOn
操做符。
註冊一個動做做爲原始Observable生命週期事件的一種佔位符
你能夠註冊回調,當Observable的某個事件發生時,Rx會在與Observable鏈關聯的正常通知集合中調用它。Rx實現了多種操做符用於達到這個目的。
RxJava實現了不少Do
操做符的變體。
doOnEach
操做符讓你能夠註冊一個回調,它產生的Observable每發射一項數據就會調用它一次。你能夠以Action
的形式傳遞參數給它,這個Action接受一個onNext
的變體Notification
做爲它的惟一參數,你也能夠傳遞一個Observable給doOnEach
,這個Observable的onNext
會被調用,就好像它訂閱了原始的Observable同樣。
doOnNext
操做符相似於doOnEach(Action1)
,可是它的Action不是接受一個Notification
參數,而是接受發射的數據項。
示例代碼
Observable.just(1, 2, 3) .doOnNext(new Action1<Integer>() { @Override public void call(Integer item) { if( item > 1 ) { throw new RuntimeException( "Item exceeds maximum value" ); } } }).subscribe(new Subscriber<Integer>() { @Override public void onNext(Integer item) { System.out.println("Next: " + item); } @Override public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); } @Override public void onCompleted() { System.out.println("Sequence complete."); } });
輸出
Next: 1 Error: Item exceeds maximum value
doOnSubscribe
操做符註冊一個動做,當觀察者訂閱它生成的Observable它就會被調用。
doOnUnsubscribe
操做符註冊一個動做,當觀察者取消訂閱它生成的Observable它就會被調用。
doOnCompleted
操做符註冊一個動做,當它產生的Observable正常終止調用onCompleted
時會被調用。
doOnError
操做符註冊一個動做,當它產生的Observable異常終止調用onError
時會被調用。
doOnTerminate
操做符註冊一個動做,當它產生的Observable終止以前會被調用,不管是正常仍是異常終止。
finallyDo
操做符註冊一個動做,當它產生的Observable終止以後會被調用,不管是正常仍是異常終止。
延遲一段指定的時間再發射來自Observable的發射物
Delay
操做符讓原始Observable在發射每項數據以前都暫停一段指定的時間段。效果是Observable發射的數據項在時間上向前總體平移了一個增量。
RxJava的實現是 delay
和delaySubscription
。
第一種delay
接受一個定義時長的參數(包括數量和單位)。每當原始Observable發射一項數據,delay
就啓動一個定時器,當定時器過了給定的時間段時,delay
返回的Observable發射相同的數據項。
注意:delay
不會平移onError
通知,它會當即將這個通知傳遞給訂閱者,同時丟棄任何待發射的onNext
通知。然而它會平移一個onCompleted
通知。
delay
默認在computation
調度器上執行,你能夠經過參數指定使用其它的調度器。
另外一種delay
不實用常數延時參數,它使用一個函數針對原始Observable的每一項數據返回一個Observable,它監視返回的這個Observable,當任何那樣的Observable終止時,delay
返回的Observable就發射關聯的那項數據。
這種delay
默認不在任何特定的調度器上執行。
這個版本的delay
對每一項數據使用一個Observable做爲原始Observable的延時定時器。
這種delay
默認不在任何特定的調度器上執行。
還有一個操做符delaySubscription
讓你你能夠延遲訂閱原始Observable。它結合搜一個定義延時的參數。
delaySubscription
默認在computation
調度器上執行,你能夠經過參數指定使用其它的調度器。
還有一個版本的delaySubscription
使用一個Obseable而不是一個固定的時長來設置訂閱延時。
這種delaySubscription
默認不在任何特定的調度器上執行。
將一個發射數據的Observable轉換爲發射那些數據發射時間間隔的Observable
TimeInterval
操做符攔截原始Observable發射的數據項,替換爲發射表示相鄰發射物時間間隔的對象。
RxJava中的實現爲timeInterval
,這個操做符將原始Observable轉換爲另外一個Obserervable,後者發射一個標誌替換前者的數據項,這個標誌表示前者的兩個連續發射物之間流逝的時間長度。新的Observable的第一個發射物表示的是在觀察者訂閱原始Observable到原始Observable發射它的第一項數據之間流逝的時間長度。不存在與原始Observable發射最後一項數據和發射onCompleted
通知之間時長對應的發射物。
timeInterval
默認在immediate
調度器上執行,你能夠經過傳參數修改。
建立一個只在Observable生命週期內存在的一次性資源
Using
操做符讓你能夠指示Observable建立一個只在它的生命週期內存在的資源,當Observable終止時這個資源會被自動釋放。
using
操做符接受三個參數:
當一個觀察者訂閱using
返回的Observable時,using
將會使用Observable工廠函數建立觀察者要觀察的Observable,同時使用資源工廠函數建立一個你想要建立的資源。當觀察者取消訂閱這個Observable時,或者當觀察者終止時(不管是正常終止仍是因錯誤而終止),using
使用第三個函數釋放它建立的資源。
using
默認不在任何特定的調度器上執行。
只發射第一項(或者知足某個條件的第一項)數據
若是你只對Observable發射的第一項數據,或者知足某個條件的第一項數據感興趣,你可使用First
操做符。
在某些實現中,First
沒有實現爲一個返回Observable的過濾操做符,而是實現爲一個在當時就發射原始Observable指定數據項的阻塞函數。在這些實現中,若是你想要的是一個過濾操做符,最好使用Take(1)
或者ElementAt(0)
。
在一些實現中還有一個Single
操做符。它的行爲與First
相似,但爲了確保只發射單個值,它會等待原始Observable終止(不然,不是發射那個值,而是以一個錯誤通知終止)。你可使用它從原始Observable獲取第一項數據,並且也確保只發射一項數據。
在RxJava中,這個操做符被實現爲first
,firstOrDefault
和takeFirst
。
可能容易混淆,BlockingObservable
也有名叫first
和firstOrDefault
的操做符,它們會阻塞並返回值,不是當即返回一個Observable。
還有幾個其它的操做符執行相似的功能。
只發射第一個數據,使用沒有參數的first
操做符。
示例代碼
Observable.just(1, 2, 3) .first() .subscribe(new Subscriber<Integer>() { @Override public void onNext(Integer item) { System.out.println("Next: " + item); } @Override public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); } @Override public void onCompleted() { System.out.println("Sequence complete."); } });
輸出
Next: 1 Sequence complete.
傳遞一個謂詞函數給first
,而後發射這個函數斷定爲true
的第一項數據。
firstOrDefault
與first
相似,可是在Observagle沒有發射任何數據時發射一個你在參數中指定的默認值。
傳遞一個謂詞函數給firstOrDefault
,而後發射這個函數斷定爲true
的第一項數據,若是沒有數據經過了謂詞測試就發射一個默認值。
takeFirst
與first
相似,除了這一點:若是原始Observable沒有發射任何知足條件的數據,first
會拋出一個NoSuchElementException
,takeFist
會返回一個空的Observable(不調用onNext()
可是會調用onCompleted
)。
single
操做符也與first
相似,可是若是原始Observable在完成以前不是正好發射一次數據,它會拋出一個NoSuchElementException
。
single
的變體接受一個謂詞函數,發射知足條件的單個值,若是不是正好只有一個數據項知足條件,會以錯誤通知終止。
和firstOrDefault
相似,可是若是原始Observable發射超過一個的數據,會以錯誤通知終止。
和firstOrDefault(T, Func1)
相似,若是沒有數據知足條件,返回默認值;若是有多個數據知足條件,以錯誤通知終止。
first系列的這幾個操做符默認不在任何特定的調度器上執行。
將Observable轉換爲另外一個對象或數據結構
ReactiveX的不少語言特定實現都有一種操做符讓你能夠將Observable或者Observable發射的數據序列轉換爲另外一個對象或數據結構。它們中的一些會阻塞直到Observable終止,而後生成一個等價的對象或數據結構;另外一些返回一個發射那個對象或數據結構的Observable。
在某些ReactiveX實現中,還有一個操做符用於將Observable轉換成阻塞式的。一個阻塞式的Ogbservable在普通的Observable的基礎上增長了幾個方法,用於操做Observable發射的數據項。
getIterator
操做符只能用於BlockingObservable
的子類,要使用它,你首先必須把原始的Observable轉換爲一個BlockingObservable
。可使用這兩個操做符:BlockingObservable.from
或the Observable.toBlocking
。
這個操做符將Observable轉換爲一個Iterator
,你能夠經過它迭代原始Observable發射的數據集。
toFuture
操做符也是隻能用於BlockingObservable
。這個操做符將Observable轉換爲一個返回單個數據項的Future
,若是原始Observable發射多個數據項,Future
會收到一個IllegalArgumentException
;若是原始Observable沒有發射任何數據,Future
會收到一個NoSuchElementException
。
若是你想將發射多個數據項的Observable轉換爲Future
,能夠這樣用:myObservable.toList().toBlocking().toFuture()
。
toFuture
操做符也是隻能用於BlockingObservable
。這個操做符將Observable轉換爲一個Iterable
,你能夠經過它迭代原始Observable發射的數據集。
一般,發射多項數據的Observable會爲每一項數據調用onNext
方法。你能夠用toList
操做符改變這個行爲,讓Observable將多項數據組合成一個List
,而後調用一次onNext
方法傳遞整個列表。
若是原始Observable沒有發射任何數據就調用了onCompleted
,toList
返回的Observable會在調用onCompleted
以前發射一個空列表。若是原始Observable調用了onError
,toList
返回的Observable會當即調用它的觀察者的onError
方法。
toList
默認不在任何特定的調度器上執行。
toMap
收集原始Observable發射的全部數據項到一個Map(默認是HashMap)而後發射這個Map。你能夠提供一個用於生成Map的Key的函數,還能夠提供一個函數轉換數據項到Map存儲的值(默認數據項自己就是值)。
toMap
默認不在任何特定的調度器上執行。
toMultiMap
相似於toMap
,不一樣的是,它生成的這個Map同時仍是一個ArrayList
(默認是這樣,你能夠傳遞一個可選的工廠方法修改這個行爲)。
toMultiMap
默認不在任何特定的調度器上執行。
toSortedList
相似於toList
,不一樣的是,它會對產生的列表排序,默認是天然升序,若是發射的數據項沒有實現Comparable
接口,會拋出一個異常。然而,你也能夠傳遞一個函數做爲用於比較兩個數據項,這是toSortedList
不會使用Comparable
接口。
toSortedList
默認不在任何特定的調度器上執行。
nest
操做符有一個特殊的用途:將一個Observable轉換爲一個發射這個Observable的Observable。