RxJava操做符大全

整理歸類了比較全的比較經常使用的操做符,但不是最全的。還有個別沒有添加,歡迎你們交流補充。java

建立操做符

create

完整建立1個被觀察者對象(Observable)數組

just

  1. 快速建立1個被觀察者對象(Observable)
  2. 發送事件的特色:直接發送 傳入的事件

快速建立 被觀察者對象(Observable) & 發送10個如下事件數據結構

from

fromeArray

  1. 快速建立1個被觀察者對象(Observable)
  2. 發送事件的特色:直接發送 傳入的數組數據

將數組元素一次發射出,能夠用來遍歷數組app

fromIterable

  1. 快速建立1個被觀察者對象(Observable)
  2. 發送事件的特色:直接發送 傳入的集合List數據

同上,可用來遍歷集合ide

發送事件

下列方法通常用於測試使用函數

<-- empty()  -->
// 該方法建立的被觀察者對象發送事件的特色:僅發送Complete事件,直接通知完成
Observable observable1=Observable.empty(); 
// 即觀察者接收後會直接調用onCompleted()

<-- error()  -->
// 該方法建立的被觀察者對象發送事件的特色:僅發送Error事件,直接通知異常
// 可自定義異常
Observable observable2=Observable.error(new RuntimeException())
// 即觀察者接收後會直接調用onError()

<-- never()  -->
// 該方法建立的被觀察者對象發送事件的特色:不發送任何事件
Observable observable3=Observable.never();
// 即觀察者接收後什麼都不調用
複製代碼

延時操做符

  1. 定時操做:在通過了x秒後,須要自動執行y操做
  2. 週期性操做:每隔x秒後,須要自動執行y操做

delay

使得被觀察者延遲一段時間再發送事件測試

// 1. 指定延遲時間
// 參數1 = 時間;參數2 = 時間單位
delay(long delay,TimeUnit unit)

// 2. 指定延遲時間 & 調度器
// 參數1 = 時間;參數2 = 時間單位;參數3 = 線程調度器
delay(long delay,TimeUnit unit,mScheduler scheduler)

// 3. 指定延遲時間  & 錯誤延遲
// 錯誤延遲,即:若存在Error事件,則如常執行,執行後再拋出錯誤異常
// 參數1 = 時間;參數2 = 時間單位;參數3 = 錯誤延遲參數
delay(long delay,TimeUnit unit,boolean delayError)

// 4. 指定延遲時間 & 調度器 & 錯誤延遲
// 參數1 = 時間;參數2 = 時間單位;參數3 = 線程調度器;參數4 = 錯誤延遲參數
delay(long delay,TimeUnit unit,mScheduler scheduler,boolean delayError): 指定延遲多長時間並添加調度器,錯誤通知能夠設置是否延遲
複製代碼

defer

直到有觀察者(Observer )訂閱時,才動態建立被觀察者對象(Observable) & 發送事件this

  1. 經過 Observable工廠方法建立被觀察者對象(Observable)
  2. 每次訂閱後,都會獲得一個剛建立的最新的Observable對象,這能夠確保Observable對象裏的數據是最新的
<-- 1. 第1次對i賦值 ->>
Integer i = 10;

// 2. 經過defer 定義被觀察者對象
// 注:此時被觀察者對象還沒建立
Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
    @Override
    public ObservableSource<? extends Integer> call() throws Exception {
        return Observable.just(i);
    }
});

<-- 2. 第2次對i賦值 ->>
i = 15;

<-- 3. 觀察者開始訂閱 ->>
// 注:此時,纔會調用defer()建立被觀察者對象(Observable)
observable.subscribe(new Observer<Integer>() {

    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "開始採用subscribe鏈接");
    }

    @Override
    public void onNext(Integer value) {
        Log.d(TAG, "接收到的整數是"+ value  );
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "對Error事件做出響應");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "對Complete事件做出響應");
    }
});
複製代碼

timer

  1. 快速建立1個被觀察者對象(Observable)
  2. 發送事件的特色:延遲指定時間後,發送1個數值0(Long類型)

timer操做符默認運行在一個新線程上
也可自定義線程調度器(第3個參數):timer(long, TimeUnit, Scheduler)spa

interval

  1. 快速建立1個被觀察者對象(Observable)線程

  2. 發送事件的特色:每隔指定時間就發送事件

    /**
     * initialDelay 初始延時時間
     * unit 時間單位
     * period 間隔時間
     * scheduler 線程調度器
     */
     public static Observable<Long> interval(long interval, TimeUnit unit) {
     	return interval(interval, interval, unit, Schedulers.computation());
     }
     
     public static Observable<Long> interval(long interval, TimeUnit unit, Scheduler scheduler) {
     	return interval(interval, interval, unit, scheduler);
     }
     
     public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit) {
     	return interval(initialDelay, period, unit, Schedulers.computation());
     }
    複製代碼

range/rangeLong

  1. 快速建立1個被觀察者對象(Observable)

  2. 發送事件的特色:連續發送1個事件序列,可指定範圍

    /**
     * start 起始數字
     * count 數量
     */
     public static Observable<Integer> range(int start, int count)
    複製代碼

過濾操做符

  • take, takeFirst, takeLast

  • skip, skipFirst, skipLast

  • first

  • last

  • firstOrDefault, lastOrDefault (只發射最後一項(或者知足某個條件的最後一項)數據,能夠指定默認值。)

    // 跳過前面幾項
      public final Observable<T> skip(int count)
      // 跳過前面的時間,以後產生的數據提交
      public final Observable<T> skip(long time, TimeUnit unit)
      // skipLast和skip相反,跳事後面的幾項。
      // 忽略最後時間單位內產生的數據
      skipLast(long time,TimeUnit)  
      
      // 並非娶第n個,而是取前面n個數據
      take(n) 
      // 是在制定時間內取數據,若是超過了這個時間源Observable產生的數據將做廢
      take(long time, TimeUnit unit)  
    複製代碼

takeFirst操做符和first操做符相似,取知足條件的第一個
區別:first取不到要拋異常,takeFirst不會

takeLast操做符與last操做符類似。區別在於,若是取不到知足條件的值,last將拋出異常

filter

過濾數據,不知足條件的數據將被過濾不發射。

filter(Fun) 自定義過濾條件
return false的數據將不被髮射

ofType

過濾指定類型的數據

Observable.just(1,2,"3")
        .ofType(Integer.class)
        .subscribe(item -> Log.d("JG",item.toString()));
複製代碼

elementAt/elementAtOrDefault/elementAtOrError

發射某一項數據,若是超過了範圍能夠指定默認值。內部經過OperatorElementAt過濾。

Observable.just(3,4,5,6)
             .elementAt(2)
    .subscribe(item->Log.d("JG",item.toString())); //5
複製代碼

firstElement/lastElement

僅選取第1個元素 / 最後一個元素

ignoreElements

丟棄全部數據,只發射錯誤或正常終止的通知。內部經過OperatorIgnoreElements實現。

distinct

過濾重複數據,內部經過OperatorDistinct實現。

distinctUntilChanged

過濾掉連續重複的數據。內部經過OperatorDistinctUntilChanged實現

Observable.just(3,4,5,6,3,3,4,9)
   .distinctUntilChanged()
  .subscribe(item->Log.d("JG",item.toString())); //3,4,5,6,3,4,9
複製代碼

timeout

若是原始Observable過了指定的一段時長沒有發射任何數據,就發射一個異常或者使用備用的Observable。

Debounce/throtleWithTimeout

根據你指定的時間間隔進行限流

發送數據事件時,若2次發送事件的間隔<指定時間,就會丟棄前一次的數據,直到指定時間內都沒有新數據發射時纔會發送後一次的數據

條件操做符

single/singleOrDefault

檢測源Observable產生的數據項是否只有一個,不然報錯

onError()
java.lang.IllegalArgumentException: Sequence contains too many elements

all

all操做符接收一個函數參數,建立並返回一個單布爾值的Observable,
若是原Observable正常終止而且每一項數據都知足條件,就返回true,
若是原Observable的任何一項數據不知足條件或者非正常終止就返回False。

判斷全部的數據項是否知足某個條件,內部經過OperatorAll實現。

amb/ambWith

amb操做符對於給定兩個或多個Observables,它只發射首先發射數據或通知的那個Observable的全部數據。

當你傳遞多個Observable給amb操做符時,該操做符只發射其中一個Observable的數據和通知:首先發送通知給amb操做符的的那個Observable,無論發射的是一項數據仍是一個onError或onCompleted通知,amb將忽略和丟棄其它全部Observables的發射物。

amb(T o1, T ... o2)(可接受2到9個參數)

給定多個Observable,只讓第一個發射數據的Observable發射所有數據,其餘Observable將會被忽略。

contains

contains操做符將接收一個特定的值做爲一個參數,斷定原Observable是否發射該值,若已發射,則建立並返回的Observable將發射true,不然發射false。
判斷在發射的全部數據項中是否包含指定的數據,內部調用的實際上是exists

contains操做符默認不在任何特定的調度器上執行。

可用來判斷Observable發射的值中是否包含該值。

exists

exists操做符相似與contains操做符,不一樣的是,其接受一個函數參數,在函數中,對原Observable發射的數據,設定比對條件並作判斷。若任何一項知足條件就建立並返回一個發射true的Observable,不然返回一個發射false的Observable。

該操做符默認不在任何特定的調度器上執行。

判斷是否存在數據項知足某個條件。內部經過OperatorAny實現。

isEmpty

isEmpty操做符用於斷定原始Observable是否沒有發射任何數據。若原Observable未發射任何數據,建立建立並返回一個發射true的Observable,不然返回一個發射false的Observable。

isEmpty操做符默認不在任何特定的調度器上執行。

能夠用來判斷是否沒有數據發射。

defaultIfEmpty

defaultIfEmpty操做接受一個備用數據,在原Observable沒有發射任何數據正常終止(以onCompletedd的形式),該操做符以備用數據建立一個Observable並將數據發射出去。

RxJava將這個操做符實現爲defaultIfEmpty。它默認不在任何特定的調度器上執行。

switchIfEmpty

若是原始Observable正常終止後仍然沒有發射任何數據,就使用備用的Observable。

若是原始Observable正常終止後仍然沒有發射任何數據
defaultIfEmpty使用默認值發射,switchIfEmpty使用默認Observable發射

sequenceEqual

sequenceEqual(Observable,Observable,Func2)變體接收兩個Observable參數和一個函數參數,在函數參數中,能夠比較兩個參數是否相同。

該操做符默認不在任何特定的調度器上執行。

用於判斷兩個Observable發射的數據是否相同(數據,發射順序,終止狀態)

skipUntil

skipUntil操做符在觀察者訂閱原Observable時,該操做符將是忽略原Observable的發射的數據,直到第二個Observable發射了一項數據那一刻,它才 開始發射原Observable發射的數據。

該操做符默認不在任何特定的調度器上執行。

skipWhile

skipWhile操做符丟棄原Observable發射的數據,直到發射的數據不知足一個指定的條件,纔開始發射原Observable發射的數據。

在觀察者訂閱原Observable時,skipWhile操做符將忽略原Observable的發射物,直到你指定的某個條件變爲false時,它開始發射原Observable發射的數據。

skipWhile操做符默認不在任何特定的調度器上執行。

takeUntil

takeUntil操做符與skipUntil操做符做用相反,當第二個Observable發射了一項數據或者終止時,丟棄原Observable發射的任何數據。
takeUntil(Func1)變體接受一個函數參數,當知足條件時終止發射數據。

takeWhile

takeWhile操做符與skipWhile操做符做用相反。在觀察者訂閱原Observable時,takeWhile建立並返回原Oservable的鏡像Observable,暫命名爲_observable,發射原Observable發射的數據。當你指定的某個條件變爲false時,_observable發射onCompleted終止通知。

takeWhile操做符默認不在任何特定的調度器上執行。

變換操做符

map

對被觀察者發送的每1個事件都經過指定的函數處理,從而變換成另一種事件

即,將被觀察者發送的事件轉換爲任意的類型事件。

若是是list,可對list的每一個元素進行類型轉換,最後tolist發射轉換後的list。

flatmap

對Observable發射的數據都應用(apply)一個函數,這個函數返回一個Observable,而後合併這些Observables,而且發送(emit)合併的結果。 flatMap和map操做符很相像,flatMap發送的是合併後的Observables,map操做符發送的是應用函數後返回的結果集

將原Observable發射的每一個數據轉換爲新的Observable,發射每個轉換的Observable

新合併生成的事件序列順序是無序的,即與舊序列發送事件的順序無關

concatMap

做用同flatMap

與flatMap的區別是,新合併生成的事件序列順序是有序的

switchMap

當源Observable發射一個新的數據項時,若是舊數據項訂閱還未完成,就取消舊訂閱數據和中止監視那個數據項產生的Observable,開始監視新的數據項.

cast

cast操做符將原始Observable發射的每一項數據都強制轉換爲一個指定的類型,而後再發射數據,它是map的一個特殊版本

所相互轉換的類之間須要存在某種關係,如繼承、實現

concat

組合多個被觀察者一塊兒發送數據,合併後 按發送順序串行執行

按發送順序串行執行

merge

組合多個被觀察者一塊兒發送數據,合併後 按時間線並行執行

區別上述concat()操做符:一樣是組合多個被觀察者一塊兒發送數據,但concat()操做符合並後是按發送順序串行執行

並行執行

zip

合併多個被觀察者(Observable)發送的事件,生成一個新的事件序列(即組合事後的事件序列),並最終發送

事件組合方式 = 嚴格按照原先事件序列 進行對位合併
最終合併的事件數量 = 多個被觀察者(Observable)中數量最少的數量

reduce

把被觀察者須要發送的事件聚合成1個事件 & 發送

聚合的邏輯根據需求撰寫,但本質都是前2個數據聚合,而後與後1個數據繼續進行聚合,依次類推

自定義聚合條件,前2個數據聚合獲得結果與第三個數據再聚合。以此類推...

collect

將被觀察者Observable發送的數據事件收集到一個數據結構裏

Observable.just(1, 2, 3, 4)
    .collect(new Func0<ArrayList<Integer>>() {
        @Override
        public ArrayList<Integer> call() {
        	//建立收集容器
            return new ArrayList<>();
        }
    }, new Action2<ArrayList<Integer>, Integer>() {
        @Override
        public void call(ArrayList<Integer> list1, Integer integer) {
        	//開始收集每一項數據
            list1.add(integer);
        }
    }).subscribe(new Action1<ArrayList<Integer>>() {
        @Override
        public void call(ArrayList<Integer> integers) {
			//獲得收集後的數據
        }
    });
複製代碼

startWith

在一個被觀察者發送事件前,追加發送一些數據或是一個新的被觀察者

//源碼是經過concat實現,在前面追加一個Observable
public final Observable<T> startWith(Observable<T> values) {
    return concat(values, this);
}
複製代碼

compose

其餘操做符

retry

重試,即當出現錯誤時,讓被觀察者(Observable)從新發射數據

retryUntil

出現錯誤後,判斷是否須要從新發送數據

retryWhen

遇到錯誤時,將發生的錯誤傳遞給一個新的被觀察者(Observable),並決定是否須要從新訂閱原始被觀察者(Observable)& 發送事件

repeat

無條件地、重複發送 被觀察者事件

具有重載方法,可設置重複建立次數

repeatWhen

有條件地、重複發送 被觀察者事件

count

統計被觀察者發送事件的數量

相關文章
相關標籤/搜索