RxJava/RxAndroid/AutoDispose/RxBinding/RxBus

首先:html

  • RXJava建議先掌握Lambda, 不然要定義不少泛型. 而且代碼量冗餘.
  • RxJava分爲三部分, Observable 被觀察者 和 Observer 觀察者 以及操做符.
  • Reactive流式編程是微軟提出來的概念, 由Netfix公司實現的架構.
  • RxJava是針對Java語言, Android和JavaEE均可以使用, 可是目前主要是Android在使用.
  • 操做符(或者直接叫方法)有不少變形(或者說方法的重載)

優勢:java

  • 對於多線程任務的異步處理很是強大和方便
  • 某些本身實現很複雜的功能RxJava都提供好了

官方網站:react

  • ReactiveXandroid

    ReactiveX支持多種語言, RxJava只是其中一種而已(還例如RxPHP, RxSwift, RxKotlin).git

  • RxJavagithub

  • RxKotlin編程

針對Android的擴展數組

  • RxAndroid緩存

    主要是增長了Android中的線程(AndroidSchedulers)bash

  • RxBinding

    實現了Android View的事件被觀察者

  • AutoDispose

    自動解綁觀察者的擴展組件(跟隨Activity生命週期)

  • RxBus

    RxJava實現的事件總線

文檔

RxJavaDoc

RxJava中文翻譯文檔

基本使用

調度器

public final Observable<T> subscribeOn(Scheduler scheduler) public final Observable<T> observeOn(Scheduler scheduler) 複製代碼

AndroidScheduler是RxAndroid添加的調度器, 主要增長了主線程.

  1. 被觀察者默認在建立實例的當前線程下, 觀察者(操做符)跟隨被觀察者線程;
  2. subscribeOn決定被觀察者的線程(只有第一次指定線程有效), 某些被觀察者擁有默認線程就沒法變動(例如interval默認在computation線程下)
  3. observeOn決定如下的觀察者線程(包括操做符) (能夠屢次變動線程), 包括observerOn如下的無默認調度器的操做符回調函數的執行線程;

操做符

  • 不能指定Scheduler的操做符都是跟隨被觀察者的線程, 能夠經過subscribeOn來控制被觀察者發射事件的線程
  • 若是操做符擁有默認的線程調度器將沒法經過subscribeOn來控制(可是通常有重載方法能夠在參數中控制)

調度器類型

Schedulers.computation( )	
用於計算任務,如事件循環或和回調處理,不要用於io操做, 線程數等於處理器的數量

Schedulers.from(executor)	
使用指定的Executor做爲調度器

Schedulers.io( )
用於IO密集型任務,如異步阻塞IO操做,這個調度器的線程池會根據須要增加;對於普通的計算任務,請使用

Schedulers.newThread( )
爲每一個任務建立一個新的線程

Schedulers.trampoline( )
在當前線程執行任務

Schedulers.single()	
全部使用該調度器的都始終處於同一個線程,且任務以先進先出的順序被執行.
複製代碼

批量解除訂閱關係

CompositeDisposable

示例:

CompositeDisposable compositeDisposable=new CompositeDisposable();

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onComplete();或者 emitter.onError(new Throwable("O__O "));
            }
        }).subscribe(new Observer<Integer>() {
            private Disposable mDisposable;
            @Override
            public void onSubscribe(Disposable d) {
               // 訂閱 
                mDisposable = d;
               // 添加到容器中 
                compositeDisposable.add(d);
            }
            @Override
            public void onNext(Integer value) {
               // 判斷mDisposable.isDisposed() 若是解除了則不須要處理
            }
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onComplete() {
            }
        });
        // 解除全部訂閱者
        compositeDisposable.clear();
複製代碼

須要強調的是Observable只有存在訂閱者的時候纔會發送事件, 若是取消了訂閱者並不會發送任何事件, 不用擔憂內存泄漏等問題.

周期函數

doOn**()這一系列方法都是能夠在觀察者回調以前執行操做

public final Flowable<T> doOnDispose(Action onCancel) public final Flowable<T> doOnComplete(Action onComplete) public final Flowable<T> doOnEach(Consumer<? super Notification<T>> onNotification) // 發送任何事件都會觸發回調(包括onError onComplete) public final Flowable<T> doOnEach(org.reactivestreams.Subscriber<? super T> subscriber) public final Flowable<T> doOnError(Consumer<? super java.lang.Throwable> onError) public final Flowable<T> doOnLifecycle(Consumer<? super org.reactivestreams.Subscription> onSubscribe, LongConsumer onRequest, Action onCancel) public final Flowable<T> doOnNext(Consumer<? super T> onNext) public final Flowable<T> doOnRequest(LongConsumer onRequest) // 該方法用於跟蹤背壓, 經常用於調試, 故Observable沒有該方法 public final Flowable<T> doOnSubscribe(Consumer<? super org.reactivestreams.Subscription> onSubscribe) // 在訂閱被觀察者前回調 public final Flowable<T> doOnTerminate(Action onTerminate) // 該回調會在onComplete和onError方法前回調, 不管是異常仍是完成 複製代碼

Notification

由於doOnEach回調會在全部事件都觸發, 因此Notification包含了全部事件的信息

java.lang.Throwable getError() // 若是觸發的事件是onError則會返回異常信息, 不然null T getValue() // 若是觸發的事件是onNext則會返回該值, 不然返回null boolean isOnComplete() boolean isOnError() boolean isOnNext() // Notification提供靜態方法直接構成出三種事件的實例對象 static <T> Notification<T> createOnComplete() static <T> Notification<T> createOnError(java.lang.Throwable error) static <T> Notification<T> createOnNext(T value) 複製代碼

建立操做符

建立操做符所有屬於靜態方法調用

建立被觀察者

Create

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) 複製代碼

ObservableOnSubscribe中只有一個方法

void subscribe(ObservableEmitter<T> e) throws java.lang.Exception 複製代碼

ObservableEmitter 譯爲發射器. 能夠經過三種方法發送事件

void onComplete() void onError(java.lang.Throwable error) void onNext(T value) 複製代碼

Tip:

  • onError()onComplete()不能同時使用
  • onError()不能屢次使用, onComplete()能夠

其餘方法:

void setCancellable(Cancellable c) // 設置一個取消事件監聽器 void setDisposable(Disposable d) boolean isDisposed() 複製代碼

Just

經過直接傳入N個參數來批量發送事件(最多九個參數)

static <T> Observable<T> just(T item) 複製代碼

所有事件發送完畢後會回調onComplete方法

FromArray

經過傳入數組或者集合來發送事件

static <T> Observable<T> fromArray(T... items) 複製代碼

FromIterable

Iterable是可遍歷集合的根接口, 能夠經過發送集合;

List<String> list = new ArrayList<>();
list.add("a");
list.add("b");
list.add("c");

Flowable.fromIterable(list).subscribe(
        s -> Log.i("tag", s)
);
複製代碼

更多的方法

  • Javadoc: from(array)
  • Javadoc: from(Iterable)
  • Javadoc: from(Future)
  • Javadoc: from(Future,Scheduler)
  • Javadoc: from(Future,timeout, timeUnit)

defer

只可以在回調函數中建立被觀察者

public static <T> Observable<T> defer(java.util.concurrent.Callable<? extends ObservableSource<? extends T>> supplier) 複製代碼

追加事件

StartWith

在已經建立的事件前面再添加事件

public final Observable<T> startWith(T item) public final Observable<T> startWithArray(T... items) 複製代碼

直接結束事件

Empty

不發送任何事件給觀察者, 當即回調onComplete()

Flowable.empty().subscribe(
        obj -> Log.i("tag", "next" + obj.toString()),
        e -> Log.i("tag", "error"),
        () -> Log.i("tag", "complete"));
複製代碼

Never

不發送任何事件給觀察者, 而且不執行任何方法(例如OnComplete)

Error

不發送任何事件, 可是會直接執行onError()

輪循器

Interval

定時控制間隔發送事件. 默認在計算線程(ComputationScheduler), 能夠指定線程.

只能控制間隔時間

public static Observable<java.lang.Long> interval(long period,
                                                  java.util.concurrent.TimeUnit unit)

public static Observable<java.lang.Long> interval(long initialDelay, // 第一次的延遲時間
                                                    long period, // 間隔時間
                                                    java.util.concurrent.TimeUnit unit) // 時間單位
複製代碼

IntervalRange

更加精確的範圍內發送計時器事件, 默認在計算線程(ComputationScheduler), 能夠指定線程.

能夠控制發送次數

public static Observable<java.lang.Long> intervalRange(long start,  // 開始數字
                                                       long count, // 總共次數
                                                       long initialDelay, // 初始間隔時間
                                                       long period, // 後面每次間隔時間
                                                       java.util.concurrent.TimeUnit unit) // 時間單位
複製代碼

Range

能夠設置發送次數的定時器, 沒有默認在特定的線程執行; 不會根據時間發送, 直接一次性按照按照順序發送完畢.

// 發送int事件類型
public static Observable<java.lang.Integer> range(int start, // 開始
                                                  int count) // 結束

// 發送long的事件類型
public static Observable<java.lang.Long> rangeLong(long start,
                                                     long count)
複製代碼

Timer

定時器操做符. Timer默認在computationScheduler(計算線程)上運行, 能夠指定線程.

public static Observable<java.lang.Long> timer(long delay, // 間隔時間
                                               java.util.concurrent.TimeUnit unit) // 時間單位
複製代碼

TimerInterval

記錄輪循器的信息

  • 時間間隔
  • 時間單位
public final Observable<Timed<T>> timeInterval()

// 時間間隔設置固定單位
public final Observable<Timed<T>> timeInterval(java.util.concurrent.TimeUnit unit)
複製代碼

輸出示例:

// 無參默認單位爲毫秒
Timed[time=1003, unit=MILLISECONDS, value=12]

// 設置單位爲秒
Timed[time=1, unit=SECONDS, value=40]
複製代碼

Repeat

重複發送事件

  • 無限循環
  • 指定時間內循環
  • 中止
public final Observable<T> repeat()
// 無限循環

public final Observable<T> repeat(long times)
// 設置循環次數

public final Observable<T> repeatUntil(BooleanSupplier stop)
// 設置循環結束條件

// 添加一個被觀察者做爲從新發送事件的條件
public final Observable<T> repeatWhen(Function<? super Observable<java.lang.Object>,? extends ObservableSource<?>> handler)
複製代碼

示例

io.reactivex.Observable.just(1)
                .repeatUntil(
                        new BooleanSupplier() {
                            /**
                             * @return 返回true表示結束循環
                             * @throws Exception
                             */
                            @Override
                            public boolean getAsBoolean() throws Exception {
                                return true;
                            }
                        })
                .subscribe(System.out::println);
複製代碼

RepeatWhen

若是回調函數中的被觀察者發送onComplete和onError事件不會進入重複事件

可是若是發送onNext事件就會致使重複發送

Observable.intervalRange(0, 5, 1, 1, TimeUnit.SECONDS, TrampolineScheduler.instance())
    .repeatWhen(
    new Function<Observable<Object>, ObservableSource<?>>() {
        @Override
        public ObservableSource<?> apply(Observable<Object> objectObservable)
            throws Exception {

            // 源被觀察者結束後(onComplete)等待五秒再次從新發送
            return Observable.interval(5, TimeUnit.SECONDS);
        }
    })
    .subscribe(
    new Consumer<Long>() {
        @Override
        public void accept(Long aLong) throws Exception {
            // do something
        }
    });
複製代碼

多觀察者建立

Concat

將多個被觀察者鏈接起來按照順序發送

ConcatArrayConcat操做符其實都同樣, 只不過能夠接受數組而已

Merge

將多個被觀察者合併, 遵照時間順序(不遵照參數添加順序)

public static <T> Observable<T> mergeArray(int maxConcurrency, int bufferSize, ObservableSource<? extends T>... sources) 複製代碼

Zip

能夠將多個發射器發送的事件對應發送順序組合成一個而後統一一次接收事件, 遵照兩兩合併的原則.

若是存在異步狀況, 將會等待須要合併的兩個事件同時執行完畢後再發送給觀察者;

  • 遵照訂閱順序
public static <T,R> Observable<R> zip(ObservableSource<? extends ObservableSource<? extends T>> sources, Function<? super java.lang.Object[],? extends R> zipper) 複製代碼

示例:

Observable.zip(getStringObservable(), getIntegerObservable(),
               new BiFunction<String, Integer, String>() {
                 @Override public String apply(@NonNull String s, @NonNull Integer integer) throws Exception {
                   // 在這裏將兩個發射器的事件合併而後統一發送
                   return s + integer;
                 }
               }).subscribe(new Consumer<String>() {
  @Override public void accept(@NonNull String s) throws Exception {
    // 這裏只會接受到apply的返回值
  }
});
複製代碼

ZipWith

將傳入的被觀察者和源被觀察者對應組合(Zip)該方法屬於非靜態方法

Observable.just(1).zipWith(Observable.just(2), new BiFunction<Integer, Integer, String>() {
        @Override
        public String apply(Integer integer, Integer integer2) throws Exception {
            System.out.println("integer = [" + integer + "], integer2 = [" + integer2 + "]");
            return integer + "" + integer2;
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            // 結果: s = [12]
            System.out.println("s = [" + s + "]");
        }
    });
複製代碼

CombineLast

最後一個被觀察者的全部事件依次和前面的被觀察者的最後一個事件合併.

  • 遵照訂閱觀察者順序
public static <T1,T2,T3,R> Observable<R> combineLatest(ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, Function3<? super T1,? super T2,? super T3,? extends R> combiner) 複製代碼

示例結果:

最後一個被觀察者和以前的全部的被觀察者的最後一個事件同時被接收到

s = [輪循器一10輪循器二10輪循器三1]
s = [輪循器一10輪循器二10輪循器三2]
s = [輪循器一10輪循器二10輪循器三3]
s = [輪循器一10輪循器二10輪循器三4]
s = [輪循器一10輪循器二10輪循器三5]
s = [輪循器一10輪循器二10輪循器三6]
s = [輪循器一10輪循器二10輪循器三7]
s = [輪循器一10輪循器二10輪循器三8]
s = [輪循器一10輪循器二10輪循器三9]
s = [輪循器一10輪循器二10輪循器三10]
複製代碼

變化操做符

轉換Observable

FlatMap

將事件攔截而後轉成被觀察者再次發送

交錯順序, 即發射器發送事件的時候多是異步或者延遲的.

public final <R> Observable<R> flatMap(Function<? super T,? extends ObservableSource<? extends R>> mapper)


// onNext/onError/onComplete 分別回調
public final <R> Observable<R> flatMap(Function<? super T,? extends ObservableSource<? extends R>> onNextMapper,
                                                               Function<? super java.lang.Throwable,? extends ObservableSource<? extends R>> onErrorMapper,
                                                               java.util.concurrent.Callable<? extends ObservableSource<? extends R>> onCompleteSupplier)

    
public final <U,R> Observable<R> flatMap(Function<? super T,? extends ObservableSource<? extends U>> mapper,
                                                                 BiFunction<? super T,? super U,? extends R> resultSelector)

複製代碼

FlatMapIterable

public final <U> Observable<U> flatMapIterable(Function<? super T,? extends java.lang.Iterable<? extends U>> mapper) // 就是轉成集合類型被觀察者接收到 複製代碼

重載方法將可迭代對象最終又轉換成單個對象

public final <U,V> Observable<V> flatMapIterable( Function<? super T,? extends java.lang.Iterable<? extends U>> mapper, BiFunction<? super T,? super U,? extends V> resultSelector) 複製代碼

示例

Observable.just(1, 2, 3, 4, 5).flatMapIterable(new Function<Integer, Iterable<String>>() {
      @Override public Iterable<String> apply(Integer integer) throws Exception {
        Log.d("日誌",
            "(MainActivity.java:32) ___ " + "apply() called with: integer = [" + integer + "]");
        
        // iterable屬於全部集合的根接口
        ArrayList<String> strings = new ArrayList<>();
        strings.add(integer.toString() + "集合中");
        return strings;
      }
    }, new BiFunction<Integer, Object, String>() {

      /** * 獲得一個最終被觀察者接受的事件 * @param t1 發射器的事件 * @param t2 被添加到集合中對象 * @return 最終被觀察者接受的事件 * @throws Exception 若是返回null將拋出異常 */
      @Override public String apply(Integer integer, Object o) throws Exception {
        Log.d("日誌", "(MainActivity.java:39) ___ "
            + "apply() called with: integer = ["
            + integer
            + "], o = ["
            + o
            + "]");
        // 若是返回null則會拋出異常進入onError
        return "吳彥祖";
      }
    }).subscribe(new Observer<String>() {
      @Override public void onSubscribe(Disposable d) {
        Log.i("日誌", "(MainActivity.java:49) ___ onSubscribe");
      }

      @Override public void onNext(String s) {
        Log.d("日誌", "(MainActivity.java:53) ___ " + "onNext() called with: s = [" + s + "]");
      }

      @Override public void onError(Throwable e) {
        Log.i("日誌", "(MainActivity.java:57) ___ onError");
      }

      @Override public void onComplete() {
        Log.i("日誌", "(MainActivity.java:61) ___ onComplete");
      }
    });
複製代碼

FatMapCompletable

只能接受到onComplete回調

public final Completable flatMapCompletable(Function<? super T,? extends CompletableSource> mapper) public final Completable flatMapCompletable(Function<? super T,? extends CompletableSource> mapper, boolean delayErrors) 複製代碼

FlatMapMaybe

public final <R> Observable<R> flatMapMaybe(Function<? super T,? extends MaybeSource<? extends R>> mapper);


public final <R> Observable<R> flatMapMaybe(Function<? super T,? extends MaybeSource<? extends R>> mapper, boolean delayErrors) 複製代碼

FlatMapSingle

單一接受者

public final <R> Observable<R> flatMapSingle(Function<? super T,? extends SingleSource<? extends R>> mapper) public final <R> Observable<R> flatMapSingle(Function<? super T,? extends SingleSource<? extends R>> mapper, boolean delayErrors) 複製代碼

ConcatMap

和FlatMap的區別是保證順序發射(不存在交錯順序), 內部使用Concat實現;

例如: 兩個異步的被觀察者不管如何都會按照你參數添加的順序發送事件

public final <R> Flowable<R> concatMap(Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper) public final <R> Flowable<R> concatMap(Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper, int prefetch) 複製代碼

ConcatMapDelayError

延遲異常拋出到全部事件發送完畢後

public final <R> Flowable<R> concatMapDelayError(Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper) public final <R> Flowable<R> concatMapDelayError(Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper, int prefetch, boolean tillTheEnd) 複製代碼

ConcatMapEager

將全部被觀察者的事件所有添加到一個被觀察者上, 而後一次性發送;

public final <R> Observable<R> concatMapEager(Function<? super T,? extends ObservableSource<? extends R>> mapper)
複製代碼

事件變化

Map

將類型轉換

public final <R> Flowable<R> map(Function<? super T,? extends R> mapper)
複製代碼

SwitchMap

每次源被觀察者發送數據的時候都會向觀察者發送一個新的被觀察者, 可是若是有延遲操做就只會發送最後一個源被觀察者建立的新被觀察者;

在RxJava2以前SwitchMap叫作FlatMapLatest, 這樣是否更加容易理解

public final <R> Observable<R> switchMap(Function<? super T,? extends ObservableSource<? extends R>> mapper) 複製代碼

示例

Observable.just(1, 2, 3)
            .switchMap(new Function<Integer, ObservableSource<?>>() {
              @Override
              public ObservableSource<Long> apply(Integer integer) throws Exception {
                return interval1;
              }
            })
            .subscribe(ele -> Log.d("日誌", "(MainActivity.java:39) ___ Result = " + ele));
複製代碼

包裝事件成容器

Buffer

建立一個List集合存儲事件發送給觀察者

  • 數量
  • 跨度
  • 時間
public final Observable<java.util.List<T>> buffer(int count) // 向List中添加事件數量

public final Observable<java.util.List<T>> buffer(int count,
                                                  int skip) // 分段事件跨度
複製代碼

例如: {1.2.3.4.5} count = 3, skip = 2.

接收到的事件就爲 {1,2,3} {3,4,5} {5}

public final <B> Observable<java.util.List<T>> buffer(java.util.concurrent.Callable<? extends ObservableSource<B>> boundarySupplier)
// 在回調接口中boundarySupplier封裝成集合Collection


// 緩存必定時間內的事件添加到集合中
public final Observable<java.util.List<T>> buffer(long timespan,
                                                  long timeskip,
                                      				java.util.concurrent.TimeUnit unit)

複製代碼

Window

建立一個被觀察者存儲事件發送給觀察者

相似Buffer可是分組的結果是被觀察者, 相似GroupBy可是不是依靠於Key來分組;

分時間

public final Observable<Observable<T>> window(long timespan,
                                              java.util.concurrent.TimeUnit unit)
// 按照時間跨度分組

public final Observable<Observable<T>> window(long timespan,
                                                  java.util.concurrent.TimeUnit unit,
                                                  long count)
    // 按照時間跨度內的數量分組 (規定事件內只能添加規定數量的事件到被觀察者內)

public final Observable<Observable<T>> window(long timespan,
                                                  long timeskip,
                                                  java.util.concurrent.TimeUnit unit)
    // 按照時間跨度分組

public final Observable<Observable<T>> window(long timespan,
                                                  java.util.concurrent.TimeUnit unit,
                                                  long count,
                                                  boolean restart)
複製代碼

分觀察者發送

相似Buffer, 不一樣的是Buffer是將事件封裝成一個List集合, 而Window是將必定事件數量封裝成一個新的被觀察者.

public final Observable<Observable<T>> window(long count)
    // 按照數量分組

public final Observable<Observable<T>> window(long count,
                                                long skip)
// 按照跨度分組
複製代碼

示例

Observable.just(1, 2, 3, 4).window(3).subscribe(new Consumer<Observable<Integer>>() {
      @Override public void accept(Observable<Integer> integerObservable) throws Exception {
        Log.i("日誌", "(MainActivity.java:19) ___ 觀察者");
        
        integerObservable.subscribe(new Consumer<Integer>() {
          @Override public void accept(Integer integer) throws Exception {
            Log.i("日誌", "(MainActivity.java:23) ___ 接受到事件");
          }
        });

      }
    });
複製代碼
public final <B> Observable<Observable<T>> window(java.util.concurrent.Callable<? extends ObservableSource<B>> boundary)
    
public final <U,V> Observable<Observable<T>> window(ObservableSource<U> openingIndicator,
                                                                            Function<? super U,? extends ObservableSource<V>> closingIndicator)

public final <U,V> Observable<Observable<T>> window(ObservableSource<U> openingIndicator,
                                                                            Function<? super U,? extends ObservableSource<V>> closingIndicator,
                                                                            int bufferSize)
複製代碼

GroupBy

根據key將Observable被觀察者的事件分組變動爲GroupedObservable, 該類繼承自Observable, 可是新增一個方法

getKey()能夠獲取回調中返回對象key;

// 在回調中經過key來分組
public final <K> Observable<GroupedObservable<K,T>> groupBy(Function<? super T,? extends K> keySelector)



// 分組而後再次發送事件
public final <K,V> Observable<GroupedObservable<K,V>> groupBy(Function<? super T,? extends K> keySelector,
                                                                                      Function<? super T,? extends V> valueSelector)
複製代碼

Scan

public final Flowable<T> scan(BiFunction<T,T,T> accumulator) public final <R> Flowable<R> scan(R initialValue, BiFunction<R,? super T,R> accumulator) 複製代碼

回調返回值將做爲下次回調方法的第一個參數

Observable.just(1, 2, 3, 4, 5).scan(110, new BiFunction<Integer, Integer, Integer>() {

      /** * 第一次發射器發射的值會由觀察者接收到, 若是設置了initialValue則第一次接收到該初始值. 而後後面都會該方法的返回值 * @return 返回的值最終被觀察者接受到 * @throws Exception on error */
      @Override
      public Integer apply(Integer integer, Integer integer2) throws Exception {



        return integer2 + 1;
      }
    }).subscribe(new Consumer<Integer>() {
      @Override
      public void accept(Integer integer) throws Exception {
        Log.d("日誌",
              "(MainActivity.java:37) ___ " + "accept() called with: integer = [" + integer + "]");
      }
    });
複製代碼

ScanWith

第一個回調函數中返回值是第二個回調函數中的第一個參數(即初始值)

public final <R> Flowable<R> scanWith(java.util.concurrent.Callable<R> seedSupplier, BiFunction<R,? super T,R> accumulator) 複製代碼

ToList

將事件轉成集合一次接受; 可是主要要求被觀察者的事件類型統一;

public final Single<java.util.List<T>> toList()

public final Single<java.util.List<T>> toList(int capacityHint)
// 集合初始化空間

public final <U extends java.util.Collection<? super T>> Single<U> toList(java.util.concurrent.Callable<U> collectionSupplier) 複製代碼

過濾操做符

延遲

Delay

delay操做符會延遲每一個事件發送的時間(包括onComplete但包括onError)

public final Observable<T> delay(long delay,
                                                         java.util.concurrent.TimeUnit unit)
// 延遲時間單位

public final <U> Observable<T> delay(Function<? super T,? extends ObservableSource<U>> itemDelay)
// 在回調中經過觀察者演出

複製代碼
Observable.just(1, 2, 3)
        .delay(
            new Function<Integer, ObservableSource<Integer>>() {

              /**
               * 每次源被觀察者發送事件都會執行該方法, 可是事件不會被觀察者接收到(處於延遲中).
               *
               * @param integer 事件
               * @return 返回的被觀察者, 當這個觀察者發送事件的時候將終止延遲
               * @throws Exception
               */
              @Override
              public ObservableSource<Integer> apply(Integer integer) throws Exception {
                System.out.println("integer = [" + integer + "]");
                return Observable.just(1)
                    .delay(3, TimeUnit.SECONDS, TrampolineScheduler.instance());
              }
            })
        .subscribe(
            new Consumer<Integer>() {
              @Override
              public void accept(Integer integer) throws Exception {
                System.out.println("result = [" + integer + "]");
              }
            });
複製代碼

去除重複事件

Distinct

相同事件去重

public final Flowable<T> distinct() // 去除全部重複的事件 public final <K> Observable<T> distinct(Function<? super T,K> keySelector) // 在回調內返回一個泛型值, 而後比較該泛型值來判斷是否屬於重複 public final <K> Observable<T> distinct(Function<? super T,K> keySelector, java.util.concurrent.Callable<? extends java.util.Collection<? super K>> collectionSupplier) 複製代碼

示例:

/*演示只取兩個偶數*/
    Observable.just(1, 2, 3, 4, 5, 6).distinct(new Function<Integer, String>() {
      /** * 該方法每次發送事件都會回調 * @throws Exception */
      @Override public String apply(Integer integer) throws Exception {
        return integer % 2 == 0 ? "偶數" : "奇數";
      }
    }).subscribe(new Consumer<Integer>() {
      @Override public void accept(Integer integer) throws Exception {
        Log.d("日誌",
            "(MainActivity.java:34) ___ " + "accept() called with: integer = [" + integer + "]");
      }
    });
複製代碼

distinctUntilChanged

去除臨近的重複事件

public final Flowable<T> distinctUntilChanged() // 只會去除鄰近的重複事件 public final Observable<T> distinctUntilChanged(BiPredicate<? super T,? super T> comparer) // 該回調會每次返回鄰近的兩個事件, 而後你本身在回調內比較兩個值是否算重複, 返回布爾類型 public final <K> Observable<T> distinctUntilChanged(Function<? super T,K> keySelector) // 在回調內返回一個泛型值, 而後比較該泛型值來判斷是否屬於重複 複製代碼

示例:

Observable.just(1, 2, 2, 4, 5, 6).distinctUntilChanged(new BiPredicate<Integer, Integer>() {
      @Override public boolean test(Integer integer, Integer integer2) throws Exception {
        
        return integer.equals(integer2);
      }
    }).subscribe(new Consumer<Integer>() {
      @Override public void accept(Integer integer) throws Exception {
        Log.d("日誌",
            "(MainActivity.java:34) ___ " + "accept() called with: integer = [" + integer + "]");
      }
    });
複製代碼

只發送指定數據

Element

只發送指定位置(索引)的事件

public final Maybe<T> elementAt(long index) // 索引 public final Single<T> elementAt(long index, T defaultItem) // 索引越界後發送事件 public final Maybe<T> firstElement() // 只發送第一個 public final Maybe<T> lastElement() public final Completable ignoreElements() // 忽略所有事件 public final Single<T> elementAtOrError(long index) // 若是事件爲空, 則會拋出異常 複製代碼

Debounce

發送事件後的必定時間內再次發送的事件都會被捨棄, 而且從新開始計時.

這個場景經常使用語搜索輸入框的自動提示: 你連續輸入文字會不斷地發送事件,即會致使時間不斷地被重置始終沒法成功發送事件. 只有你在中止輸入後纔會成功發送事件.

public final <U> Observable<T> debounce(Function<? super T,? extends ObservableSource<U>> debounceSelector) public final Observable<T> debounce(long timeout, java.util.concurrent.TimeUnit unit) 複製代碼

一樣還有一種實現方式

public final Observable<T> throttleWithTimeout(long timeout, public final Observable<T> throttleWithTimeout(long timeout, java.util.concurrent.TimeUnit unit, Scheduler scheduler) 複製代碼

Filter

篩選事件,例如字符串爲空則不發送事件

Observable.just(1, 20, 65, -5, 7, 19)
                .filter(new Predicate<Integer>() {
                    @Override
                    public boolean test(@NonNull Integer integer) throws Exception {
                      // 根據返回結果布爾類型肯定是否攔截事件
                        return integer >= 10;
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                
            }
        });
複製代碼

All

判斷全部事件是否符合條件, 回調函數會執行屢次可是觀察者只能收到一個布爾類型值;

public final Single<java.lang.Boolean> all(Predicate<? super T> predicate)
複製代碼
Flowable.just(1, 2, 3, 4).all(new Predicate<Integer>() {
    @Override public boolean test(Integer integer) throws Exception {
        return integer < 4;
    }
}).subscribe(new Consumer<Boolean>() {
    @Override public void accept(Boolean aBoolean) throws Exception {

    }
});
複製代碼

First

只發射第一個事件, 不然就發射默認的事件

public final Single<T> first(T defaultItem) public final Single<T> firstOrError() // 若是爲空則拋出異常進入OnError() 複製代碼

Last

只發射最後一個事件, 不然就發射默認的事件

public final Single<T> last(T defaultItem) 複製代碼

Amb

僅僅處理第一個發射事件的被觀察者, 其餘的觀察者都會被拋棄.

要求添加的全部被觀察者的事件類型須要統一;

public static <T> Observable<T> amb(java.lang.Iterable<? extends ObservableSource<? extends T>> sources) // 集合類型 複製代碼

ambArray

public static <T> Observable<T> ambArray(ObservableSource<? extends T>... sources) // 可變參數類型 複製代碼

ambWith

非靜態函數, 結束對源被觀察者的訂閱, 而後對當前指定參數的被觀察者訂閱;

public final Observable<T> ambWith(ObservableSource<? extends T> other) 複製代碼

Blocking

該操做符會打斷RxJava的鏈式調用;

只發送第一個或者最後一個事件

public final T blockingFirst()

public final T blockingFirst(T defaultItem)

public final T blockingLast()

public final T blockingLast(T defaultItem)

public final T blockingSingle()
複製代碼
  • 若是沒有設置默認值的話, 被觀察者事件爲空將拋出異常
  • 使用single, 可是若是observable的事件不止一個, 將拋出異常

示例:

Long aLong = Observable.intervalRange(0, 5, 1, 1, TimeUnit.SECONDS)
.blockingFirst();
複製代碼

循環

public final void blockingForEach(Consumer<? super T> onNext)

public final java.lang.Iterable<T> blockingIterable()

public final java.lang.Iterable<T> blockingIterable(int bufferSize)

public final java.lang.Iterable<T> blockingMostRecent(T initialValue)

public final void blockingSubscribe()
複製代碼

限制接受事件

  • 指定時間內 第一個
  • 指定時間內 最後一個

throttleFirst

只發送必定時間內的第一個事件, 默認在ComputationScheduler上執行, 可是能夠指定Scheduler

public final Observable<T> throttleFirst(long windowDuration, java.util.concurrent.TimeUnit unit) public final Observable<T> throttleFirst(long skipDuration, java.util.concurrent.TimeUnit unit, Scheduler scheduler) 複製代碼

throttleLast

只發送必定時間內的最後一個事件

public final Observable<T> throttleLast(long intervalDuration, java.util.concurrent.TimeUnit unit) public final Observable<T> throttleLast(long intervalDuration, java.util.concurrent.TimeUnit unit, Scheduler scheduler) 複製代碼

throttleWithTimeout

發送事件A後將觸發計時, 若是規定時間內有新的事件B發送, 將丟棄事件A; 功能和debounce 相同, 可是debounce能夠添加一個Observable做爲計時;

public final Observable<T> throttleWithTimeout(long timeout, java.util.concurrent.TimeUnit unit) public final Observable<T> throttleWithTimeout(long timeout, java.util.concurrent.TimeUnit unit, Scheduler scheduler) 複製代碼

Sample

控制間隔, 在必定時間內只取最後發射的事件, 能夠指定線程. 等同於throttleLast

public final Observable<T> sample(long period, java.util.concurrent.TimeUnit unit) public final Observable<T> sample(long period, java.util.concurrent.TimeUnit unit, boolean emitLast) 複製代碼
public final <U> Observable<T> sample(ObservableSource<U> sampler) public final <U> Observable<T> sample(ObservableSource<U> sampler, boolean emitLast) 複製代碼

TimeOut

public final Observable<T> timeout(long timeout, java.util.concurrent.TimeUnit timeUnit) public final Observable<T> timeout(long timeout, java.util.concurrent.TimeUnit timeUnit, ObservableSource<? extends T> other) 複製代碼
  1. 設置一個超時的時間間隔, 若是發送事件的間隔超過這個事件就會發出一個異常事件TimeoutException(進入onError).
  2. 若是你不想發出的是異常而是發送一個新的被觀察者事件, 就可使用該方法

和上面的操做符不一樣的是, 這個操做符是經過回調函數返回的Observable來控制超時時間; 若是返回的Observable發送了事件可是源被觀察者還未發送事件, 將判斷爲超時; 進入onError拋出TimeOutException

public final <V> Observable<T> timeout(Function<? super T,? extends ObservableSource<V>> itemTimeoutIndicator) public final <V> Observable<T> timeout(Function<? super T,? extends ObservableSource<V>> itemTimeoutIndicator, ObservableSource<? extends T> other) 複製代碼

如下重載能夠給第一項被觀察者發送的事件單獨設置一個超時參數(Observable)

public final <U,V> Observable<T> timeout(ObservableSource<U> firstTimeoutIndicator, Function<? super T,? extends ObservableSource<V>> itemTimeoutIndicator) public final <U,V> Observable<T> timeout(ObservableSource<U> firstTimeoutIndicator, Function<? super T,? extends ObservableSource<V>> itemTimeoutIndicator, ObservableSource<? extends T> other) 複製代碼

跳過事件

  • 數量
  • 指定時間
  • 倒序跳過
  • 指定被觀察者發送完畢
  • 回調函數連續跳過

Skip

跳過事件

public final Observable<T> skip(long count) // 跳過指定數量的事件 public final Observable<T> skip(long time, java.util.concurrent.TimeUnit unit) // 跳過指定時間內的事件 複製代碼

SkipLast

從後開始跳過發送事件;

public final Observable<T> skipLast(int count) public final Observable<T> skipLast(long time, java.util.concurrent.TimeUnit unit) public final Flowable<T> skipLast(long time, java.util.concurrent.TimeUnit unit, boolean delayError) 複製代碼

SkipUntil

在做爲參數的被觀察者沒有發送完事件以前的全部源被觀察者的事件都將被跳過;

public final <U> Observable<T> skipUntil(ObservableSource<U> other) 複製代碼

示例代碼:

Observable.intervalRange(0, 5, 1, 1, TimeUnit.SECONDS)
    // 若是下面的被觀察者的事件沒有發送完畢(不包括onComplete)源被觀察者的事件都將被跳過
    .skipUntil(Observable.just(1)
        .delay(2, TimeUnit.SECONDS)) 
    .subscribe(new Consumer<Long>() {
      @Override
      public void accept(Long aLong) throws Exception {
        Log.d("日誌",
            "(MainActivity.java:80) ___ " + "accept() called with: aLong = [" + aLong + "]");
      }
    });
複製代碼

SkipWhile

在回調中判斷是否拋棄事件;

和過濾操做符filter不一樣的是skipWhile只能從開頭開始連續跳過事件, 即若是第一個事件你沒有跳過, 那麼該回調函數就不會再次執行, 也就是你以後都沒法跳過事件了;

public final Observable<T> skipWhile(Predicate<? super T> predicate)
複製代碼

中止訂閱

  • 能夠接受的最多事件數量
  • 能夠接收到事件的時間
  • 參數被觀察者發送事件將中止源被觀察者的訂閱狀態
  • 倒序
  • 回調函數判斷是否中止

Take

阻止發射器繼續發射事件, 結束之後會進入onComplete回調

public final Observable<T> take(long count) // 控制最多接受到的事件數量 public final Observable<T> take(long time, java.util.concurrent.TimeUnit unit) // 控制只有規定時間內才能接受到事件 複製代碼

TakeUntil

public final <U> Observable<T> takeUntil(ObservableSource<U> other) // 若是該參數指定的被觀察者發送了事件就會結束以前的被觀察者的事件發送 public final Observable<T> takeUntil(Predicate<? super T> stopPredicate) // 經過回調來判斷是否結束事件的發送, 返回true結束髮射器發射事件 複製代碼

TakeLast

public final Observable<T> takeLast(int count) public final Observable<T> takeLast(long count, long time, java.util.concurrent.TimeUnit unit) public final Observable<T> takeLast(long time, java.util.concurrent.TimeUnit unit) public final Observable<T> takeLast(long time, java.util.concurrent.TimeUnit unit, boolean delayError) 複製代碼

TakeWhile

再回調中判斷是否結束髮射器(一樣進入onComplete), 可是和TakeUntil不一樣的是返回false爲結束.

public final Observable<T> takeWhile(Predicate<? super T> predicate) 複製代碼

Join

添加一個被觀察者(稱爲目標被觀察者), 該被觀察者發送的每一個事件都將依次和源被觀察者的全部事件結合(在同一個回調函數中傳入)

public final <TRight,TLeftEnd,TRightEnd,R> Observable<R> join(ObservableSource<? extends TRight> other,
                                                                                      Function<? super T,? extends ObservableSource<TLeftEnd>> leftEnd,
                                                                                      Function<? super TRight,? extends ObservableSource<TRightEnd>> rightEnd,
                                                                                      BiFunction<? super T,? super TRight,? extends R> resultSelector)
複製代碼

示例代碼:

Observable.just(1L, 2L, 3L, 4L)
        .join(Observable.just(5L, 6L, 7L, 8L), new Function<Long, ObservableSource<Long>>() {

          /** * 接受源被觀察者事件 * @param aLong * @return 返回的被觀察者發送事件後將終止源被觀察者的事件發送 * @throws Exception */
          @Override
          public ObservableSource<Long> apply(Long aLong) throws Exception {
            Log.d("日誌",
                "(MainActivity.java:65) ___ " + "源被觀察者 aLong = [" + aLong + "]");
            return Observable.interval(3, TimeUnit.SECONDS);
          }
        }, new Function<Long, ObservableSource<Long>>() {

          /** * 接受添加的被觀察者事件(join Observable) * @param aLong * @return 返回的被觀察者發送事件後就將終止添加的被觀察者的 * @throws Exception */
          @Override
          public ObservableSource<Long> apply(Long aLong) throws Exception {
            Log.d("日誌",
                "(MainActivity.java:75) ___ " + "被添加的被觀察者 aLong = [" + aLong + "]");
            return Observable.interval(3, TimeUnit.SECONDS);
          }
        }, new BiFunction<Long, Long, String>() {
          /** * 同時接受被添加的被觀察者和源被觀察者的事件 * @param aLong 源被觀察者發送的事件 * @param aLong2 被添加的被觀察者發送的事件 * @return 該返回值最終被觀察者接收到 * @throws Exception */
          @Override
          public String apply(Long aLong, Long aLong2) throws Exception {
            Log.d("日誌", "(MainActivity.java:89) ___ " + "apply() called with: aLong = [" + aLong
                + "], aLong2 = [" + aLong2 + "]");
            return aLong + "" + aLong2;
          }
        })
        .subscribe(new Consumer<String>() {
          @Override
          public void accept(String s) throws Exception {
            Log.d("日誌", "(MainActivity.java:83) ___ " + "accept() called with: s = [" + s + "]");
          }
        });
複製代碼

GroupJoin

和Join相似

public final <TRight,TLeftEnd,TRightEnd,R> Observable<R> groupJoin(ObservableSource<? extends TRight> other, Function<? super T,? extends ObservableSource<TLeftEnd>> leftEnd, Function<? super TRight,? extends ObservableSource<TRightEnd>> rightEnd, BiFunction<? super T,? super Observable<TRight>,? extends R> resultSelector) 複製代碼

示例:

Observable.just(1L, 2L, 3L, 4L)
        .groupJoin(Observable.just(5L, 6L, 7L, 8L),
            new Function<Long, ObservableSource<Long>>() {
              @Override
              public ObservableSource<Long> apply(Long aLong) throws Exception {
                return null;
              }
            }, new Function<Long, ObservableSource<Long>>() {
              @Override
              public ObservableSource<Long> apply(Long aLong) throws Exception {
                return null;
              }
            }, new BiFunction<Long, Observable<Long>, String>() {
              @Override
              public String apply(Long aLong, Observable<Long> longObservable) throws Exception {
                return null;
              }
            })
        .subscribe(new Consumer<String>() {
          @Override
          public void accept(String s) throws Exception {
            Log.d("日誌", "(MainActivity.java:78) ___ " + "accept() called with: s = [" + s + "]");
          }
        });
複製代碼

ToMap

public final <K> Single<java.util.Map<K,T>> toMap(Function<? super T,? extends K> keySelector)

public final <K,V> Single<java.util.Map<K,V>> toMap(Function<? super T,? extends K> keySelector,
                                                        Function<? super T,? extends V> valueSelector)


public final <K,V> Single<java.util.Map<K,V>> toMap(Function<? super T,? extends K> keySelector,
                                                        Function<? super T,? extends V> valueSelector,
                                                        java.util.concurrent.Callable<? extends java.util.Map<K,V>> mapSupplier)
複製代碼

錯誤處理

onErrorResumeNext

若是發生異常將接收到一個回調返回另外一個被觀察者(執行OnNext不執行onError)

// 訂閱另外一個被觀察者
public final Observable<T> onErrorResumeNext(ObservableSource<? extends T> next)
複製代碼

onErrorReturn

若是發生異常將接收到一個回調返回事件(執行OnNext不執行onError)

public final Observable<T> onErrorReturn(Function<? super java.lang.Throwable,? extends T> valueSupplier)
複製代碼

兩個操做符最後都會執行onComplete

Retry

當被觀察者的發射器發出異常事件(onError)之後會執行重試操做

public final Observable<T> retry()
// 當發生錯誤將從新發送事件(被觀察者的全部事件所有從新發送)

public final Observable<T> retry(long times)
// 從新發送的次數


public final Observable<T> retry(Predicate<? super java.lang.Throwable> predicate)

public final Observable<T> retry(BiPredicate<? super java.lang.Integer,? super java.lang.Throwable> predicate)
// 在回調函數中判斷是否重試(和上面的操做符不一樣的是該回調函數裏面會有一個重試次數的參數)

public final Observable<T> retry(long times,
                                                         Predicate<? super java.lang.Throwable> predicate)
// 判斷函數 + 重試次數
複製代碼

RetryUntil

該操做符其實Retry已經有相同實現retry(predicate)

不一樣的是返回true表示中止重試

public final Observable<T> retryUntil(BooleanSupplier stop)
複製代碼

RetryWhen

在回調函數中返回一個被觀察者, 該被觀察者若是發出錯誤事件就會致使源被觀察者重試. 若是沒有發出錯誤事件就不會觸發重試;

public final Observable<T> retryWhen(Function<? super Observable<java.lang.Throwable>,? extends ObservableSource<?>> handler)
複製代碼

示例:

該示例不會觸發重試;

Observable.create(
            new ObservableOnSubscribe<Integer>() {
              @Override
              public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onError(null);
                emitter.onNext(3);
              }
            })
        .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
                return Observable.just(23);
            }
        })
        .subscribe(
            new DefaultObserver<Integer>() {
              @Override
              public void onNext(Integer integer) {
                System.out.println("integer = [" + integer + "]");
              }

              @Override
              public void onError(Throwable e) {
                System.out.println("Main.onError");
              }

              @Override
              public void onComplete() {
                System.out.println("Main.onComplete");
              }
            });
複製代碼

判斷操做

Contains

判斷髮射的事件是否包含指定的事件, 觀察者獲得一個布爾類型的值

public final Single<java.lang.Boolean> contains(java.lang.Object element)
複製代碼

Any

public final Single<java.lang.Boolean> any(Predicate<? super T> predicate)
複製代碼

依次判斷每一個事件, 若是返回true則立刻終止發射器

Observable.just(1, 2, 3, 4).any(new Predicate<Integer>() {
      @Override public boolean test(Integer integer) throws Exception {
        Log.d("日誌",
            "(MainActivity.java:27) ___ " + "test() called with: integer = [" + integer + "]");
        if (integer == 3) {
          return true;
        } else {
          return false;
        }
      }
    }).subscribe(new Consumer<Boolean>() {
      @Override public void accept(Boolean aBoolean) throws Exception {
        Log.d("日誌",
            "(MainActivity.java:33) ___ " + "accept() called with: aBoolean = [" + aBoolean + "]");
      }
    });
複製代碼

IsEmpty

public final Single<java.lang.Boolean> isEmpty()
複製代碼

判斷是否有發射過事件, 觀察者獲得個布爾類型的值.

DefaultIfEmpty

public final Observable<T> defaultIfEmpty(T defaultItem)
複製代碼

若是發射器沒有發射任何事件, 就會發射一個指定的默認事件, 例如發射器的事件被攔截

SwitchIfEmpty

public final Observable<T> switchIfEmpty(ObservableSource<? extends T> other)
複製代碼

若是沒有發射事件就用另外一個被觀察者替代

Flowable.empty()
        .switchIfEmpty(Flowable.just(3, 4, 5))
        .subscribe(ele -> Log.i("tag", String.valueOf(ele)));
複製代碼

SequenceEqual

public static <T> Single<java.lang.Boolean> sequenceEqual(ObservableSource<? extends T> source1,
                                                          ObservableSource<? extends T> source2,
                                                          BiPredicate<? super T,? super T> isEqual)
// 最後的

public static <T> Single<java.lang.Boolean> sequenceEqual(ObservableSource<? extends T> source1,
                                                              ObservableSource<? extends T> source2)
複製代碼

會比較兩個被觀察者是否相同, 而後觀察者接受一個布爾類型的值, 發射的事件數量不相同, 類型不相同都會爲false. 只要判斷到不相同立刻就會終止事件的發送.

Observable.sequenceEqual(Observable.intervalRange(0, 3, 0, 1, TimeUnit.SECONDS),
        Observable.just(0l, 1l, 2l), new BiPredicate<Long, Long>() {
          @Override public boolean test(Long aLong, Long aLong2) throws Exception {
            // 在這裏判斷是否相等
            return false;
          }
        }).subscribe(new Consumer<Boolean>() {
      @Override public void accept(Boolean aBoolean) throws Exception {
        // 最終結果
      }
    });
複製代碼

合併操做

Reduce

相加操做符, 每次均可以兩個事件一塊兒處理, 而後全部事件都被處理後就會被觀察者接受到最終的事件

public final Maybe<T> reduce(BiFunction<T,T,T> reducer) 複製代碼

第一次處理(apply)會接收到事件1和事件2, 而後第N次就是上次處理的結果(apply的返回值)和事件N

示例:

Observable.just(1, 2, 3, 4).reduce(new BiFunction<Integer, Integer, Integer>() {
      /** * 該方法會回調屢次直到全部事件都依次相加(或者說操做)之後纔會被觀察者接收到最終的結果 * @throws Exception */
      @Override public Integer apply(Integer integer, Integer integer2) throws Exception {
        // 三個泛型分別對應 上次運行結果 當前事件 返回值
        return integer + integer2;
      }
    }).subscribe(new Consumer<Integer>() {
      @Override public void accept(Integer integer) throws Exception {
      }
    });
複製代碼

待定操做符

public final <R> Single<R> reduce(R seed,
                                  BiFunction<R,? super T,R> reducer)

public final <R> Single<R> reduceWith(java.util.concurrent.Callable<R> seedSupplier,
                                      BiFunction<R,? super T,R> reducer)
複製代碼

Scan

和Reduce相似, 可是每次觀察者都能收到通過操做的事件.

public final Observable<T> scan(BiFunction<T,T,T> accumulator) 複製代碼

第一次觀察者就會接受到第一個事件, 後面纔是被處理事後的事件.

後面介紹個重載方法, 能夠單獨傳入一個初始化事件和待觀察事件組合.

public final <R> Observable<R> scan(R initialValue, // 加入一個初始化的值 BiFunction<R,? super T,R> accumulator) 複製代碼

這裏我將initialValue設置爲"吳彥祖"

Observable.just(1, 2, 3, 4).scan("吳彥祖", new BiFunction<String, Integer, String>() {
  @Override public String apply(String s, Integer integer) throws Exception {
    Log.d("日誌", "(MainActivity.java:19) ___ "
          + "apply() called with: s = ["
          + s
          + "], integer = ["
          + integer
          + "]");
    return s + integer;
  }
}).subscribe(new Consumer<String>() {
  @Override public void accept(String s) throws Exception {

  }
});
複製代碼

ScanWith

public final <R> Observable<R> scanWith(java.util.concurrent.Callable<R> seedSupplier, BiFunction<R,? super T,R> accumulator) 複製代碼

Collect

能夠建立容器來依次操做數據(觀察者只會收到一次事件, 也就是容器)

Flowable.just(1, 2, 3)
        .collect(
            new Callable<ArrayList<Integer>>() {//建立收集容器
              @Override
              public ArrayList<Integer> call() throws Exception {
                return new ArrayList<>();
              }
            }, new BiConsumer<ArrayList<Integer>, Integer>() {//建立收集
              @Override
              public void accept(ArrayList<Integer> list, Integer integer) throws Exception {//前者容器,後者數據
                list.add(integer);
              }
            })
        .subscribe(ele -> Log.d("日誌", "(MainActivity.java:33) ___ Result = " + String.valueOf(ele)));
複製代碼

Compose

public final <R> Observable<R> compose(ObservableTransformer<? super T,? extends R> composer)
複製代碼

輔助操做符

時間統計

TimeStamp

該操做符將事件和發送的時間都封裝到一個對象Timed

public final Observable<Timed<T>> timestamp()
public final Observable<Timed<T>> timestamp(Scheduler scheduler)
public final Observable<Timed<T>> timestamp(java.util.concurrent.TimeUnit unit)
public final Observable<Timed<T>> timestamp(java.util.concurrent.TimeUnit unit,
                                                                    Scheduler scheduler)
複製代碼

示例:

Observable.intervalRange(0, 5, 2, 2, TimeUnit.SECONDS)
        .timestamp()
        .subscribe(new Consumer<Timed<Long>>() {
          @Override
          public void accept(Timed<Long> longTimed) throws Exception {
            Log.d("日誌",
                "(MainActivity.java:62) ___ " + "accept() called with: longTimed = [" + longTimed
                    + "]");
          }
        });
複製代碼

結果

longTimed = [Timed[time=1525735346216, unit=MILLISECONDS, value=2]]
複製代碼

計數器

Count

觀察者將接收到事件數量, 而沒法收到事件自己.

Flowable.just(1,2,3,4,5).count().subscribe(new BiConsumer<Long, Throwable>() {
    @Override public void accept(Long aLong, Throwable throwable) throws Exception {
        Log.d("日誌", "(MainActivity.java:18) ___ Result = " + aLong);
    }
});
複製代碼

To

轉換操做符

public final <K,V> Single<java.util.Map<K,V>> toMap(Function<? super T,? extends K> keySelector,
                                                                            Function<? super T,? extends V> valueSelector)

複製代碼

BackPressure

意爲"背壓"

瞭解了Observable和Flowable的區別,咱們還不知什麼叫作背壓,下面咱們來簡單瞭解下概念。所謂背壓就是生產者(被觀察者)的生產速度大於消費者(觀察者)消費速度從而致使的問題。

舉一個簡單點的例子,若是被觀察者快速發送消息,可是觀察者處理消息的很緩慢,若是沒有特定的流(Flow)控制,就會致使大量消息積壓佔用系統資源,最終致使十分緩慢。

同步線程是不可能產生這種問題, 觀察者沒有處理完事件就不可能再次發送事件.

怎麼優化和減小這種狀況後面再探討,不過能夠注意到,Flowable建立的時候已經設置了BackpressureStrategy,並且Subscriber使用了request來控制最大的流量。

被觀察者

Observable

不支持背壓的被觀察者, 性能高;

Flowable

支持背壓的被觀察者, 性能比Observable低;

Publisher和ObservableSource

Publisher該類屬於Flowable的根接口, ObservableSource屬於Observable的根接口;

Single

該被觀察者只能發出一個事件, 重複發送不會受到(由於只能發送一條事件因此不存在背壓).

Single.create(new SingleOnSubscribe<Integer>() {
  @Override public void subscribe(SingleEmitter<Integer> emitter) throws Exception {
    emitter.onSuccess(1);
    emitter.onSuccess(2);
  }
}).subscribe(new Consumer<Integer>() {
  @Override public void accept(Integer integer) throws Exception {
    // 只會觸發一次事件
    Log.d("日誌",
          "(MainActivity.java:34) ___ " + "accept() called with: integer = [" + integer + "]");
  }
});
複製代碼

DisposableSingleObserver

能夠手動斷開觀察者

public final void dispose()

    public final boolean isDisposed()
複製代碼

ResourceSingleObserver

能夠添加其餘的Disposable, 而後一塊兒取消觀察者

public final void add(Disposable resource)
複製代碼

其餘被觀察者基本都遵照這幾個規則有相似名稱的觀察者類

Completable

若是你的觀察者連onNext事件都不關心,你可使用Completable,他只有onComplete和onError兩個事件:

Completable.create(new CompletableOnSubscribe() {//被觀察者

    @Override
    public void subscribe(CompletableEmitter e) throws Exception {
        e.onComplete();//單一onComplete或者onError
    }

}).subscribe(new CompletableObserver() {//觀察者
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onComplete() {

    }

    @Override
    public void onError(Throwable e) {

    }
});

複製代碼

一樣也可使用Actions來簡化Observer:

  • completable.subscribe(onComplete)
  • completable.subscribe(onComplete,onError)

要轉換成其餘類型的被觀察者,也是可使用toFlowable()toObservable()等方法去轉換。

Maybe

若是你有一個需求是可能發送一個數據或者不會發送任何數據,這時候你就須要Maybe,它相似於Single和Completable的混合體。

Maybe可能會調用如下其中一種狀況(也就是所謂的Maybe):

  • onSuccess或者onError
  • onComplete或者onError

能夠看到onSuccess和onComplete是互斥的存在,例子代碼以下:

//被觀察者
Maybe<String> maybe = Maybe.create(new MaybeOnSubscribe<String>() {
    @Override
    public void subscribe(MaybeEmitter<String> e) throws Exception {
        e.onSuccess("test");//發送一個數據的狀況,或者onError,不須要再調用onComplete(調用了也不會觸發onComplete回調方法)
        //e.onComplete();//不須要發送數據的狀況,或者onError
    }
});

//訂閱觀察者
maybe.subscribe(new MaybeObserver<String>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onSuccess(String s) {
        //發送一個數據時,至關於onNext和onComplete,但不會觸發另外一個方法onComplete
        Log.i("tag", s);
    }

    @Override
    public void onComplete() {
        //無數據發送時候的onComplete事件
        Log.i("tag", "onComplete");
    }

    @Override
    public void onError(Throwable e) {

    }

});
複製代碼

Subject

AsyncSubject

  • 該被觀察者只會發送最後一個事件(同時onComplete)
  • 若是出現異常終止任何事件;
  • 要求必須手動執行onComplete()纔會接收到最後一個事件和onComplete

BehaviorSubject

  • 該主題只會觀察訂閱前的最後一個發送事件onNext()以及訂閱以後發送的全部事件;

PublishSubject

  • 只觀察訂閱後發送的事件;

ReplaySubject

  • 該主題不管訂閱前仍是訂閱後發送的事件都將會被觀察者接收到;

觀察者

  • Action:無參數類型

  • Consumer:單一參數類型

  • BiConsumer<T1, T2>:雙參數類型

  • Consumer<Obejct[]>:多參數類型

Callable

測試

用於測試RxJava的事件類

TestObserver

TestScheduler

對於某些須要通過必定時間的輪循器等事件觀察者

// 時間快速通過多少
public void advanceTimeBy(long delayTime,
                 java.util.concurrent.TimeUnit unit)

// 時間直接到某個點
public void advanceTimeTo(long delayTime,
                 java.util.concurrent.TimeUnit unit)
複製代碼

TestSubsriber

@Test public void addition_isCorrect() throws Exception {
        TestSubscriber<String> testSubscriber = new TestSubscriber<>();
        //依次發射A,B,C
        Flowable.just("A", "B", "C").subscribe(testSubscriber);

        //斷言值是否不存在
        testSubscriber.assertNever("D");
        //斷言值是否相等
        testSubscriber.assertValues("A", "B", "C");
        //斷言值的數量是否相等
        testSubscriber.assertValueCount(3);
        //斷言是否結束
        testSubscriber.assertTerminated();
    }
複製代碼

RxBinding

依賴(根據須要來添加依賴便可)

compile 'com.jakewharton.rxbinding2:rxbinding:2.1.1'
compile 'com.jakewharton.rxbinding2:rxbinding-support-v4:2.1.1'
compile 'com.jakewharton.rxbinding2:rxbinding-appcompat-v7:2.1.1'
compile 'com.jakewharton.rxbinding2:rxbinding-design:2.1.1'
compile 'com.jakewharton.rxbinding2:rxbinding-recyclerview-v7:2.1.1'
複製代碼

使用方式

AutoDispose

會根據Activity的生命週期, 自動解除RxJava的觀察者訂閱. 由Uber開源. Rxlifecycler做者推薦使用.

依賴:

compile 'com.uber.autodispose:autodispose-android:x.y.z'
複製代碼

使用:

OnDestroy方法之後解除訂閱

Observable.interval(1, TimeUnit.SECONDS)
    .as(
    AutoDispose.autoDisposable(
        AndroidLifecycleScopeProvider.from(this)))
    .subscribe(
    new Consumer<Long>() {
        @Override
        public void accept(Long aLong) throws Exception {
            Log.d("日誌", "(MainActivity.java:33) ___ Result = " + aLong);
        }
    });
複製代碼

as方法只能在subscribe以前

自定義生命週期

Observable.interval(1, TimeUnit.SECONDS)
                .as(
                        AutoDispose.autoDisposable(
                                AndroidLifecycleScopeProvider.from(this, Lifecycle.Event.ON_STOP)))
                .subscribe(
                        new Consumer<Long>() {
                            @Override
                            public void accept(Long aLong) throws Exception {
                                Log.d("日誌", "(MainActivity.java:33) ___ Result = " + aLong);
                            }
                        });
複製代碼

RxBus

RxBus是一種實現方式而非單指某庫. 本文介紹的RxBus是GitHub Star最高的第三方庫. 主要特色是支持標記(tag)功能. 不支持粘性事件(粘性事件的缺點也比較明顯不作介紹);

依賴

compile 'com.hwangjr.rxbus:rxbus:1.0.6'
複製代碼

訂閱事件

@Subscribe(
    thread = EventThread.IMMEDIATE,
    tags = {@Tag}
)
public void heardFromMouseMam(String mouseWar) {
    Timber.e("Just heard from mouse mam: " + mouseWar + " from " + Thread.currentThread());
}
複製代碼

多個標籤接受事件

@Subscribe(
        thread = EventThread.IMMEDIATE,
        tags = {@Tag(Constants.EventType.TAG_STORY)}
)
public void heardFromMouse(String mouseWar) {
    Timber.e("Just heard from mouse: " + mouseWar + " from " + Thread.currentThread());
}
複製代碼

自動發送事件

@Produce(
        thread = EventThread.NEW_THREAD,
        tags = {@Tag}
)
public String tell() {
    return "執行Tell方法" + Thread.currentThread();
}
複製代碼
相關文章
相關標籤/搜索
本站公眾號
   歡迎關注本站公眾號,獲取更多信息