Rxjava2 介紹與詳解實例

前言

如今咱們能夠看到愈來愈多的開發者都在使用 Rx 相關的技術進行 App,Java 後端等領域進行開發。在開源的社區以及互聯網公司,Rx、響應式編程、函數式都是熱門的存在。因此筆者將結合自身的學習以及實際使用狀況,寫一個針對 Rxjava2 的系列文章,一塊兒學習和使用 Rxjava 所帶來的便捷。java

筆者將利用工做之餘,結合 ReactiveX 官方 Wiki 對 Rxjava 的定義與介紹,對相關基礎知識、基本操做,經常使用部分的 API 進行整理,並加上我的理解和相關操做的示例。react

相關參考連接:git

Rxjava2 系列文章目錄:github

實例代碼:編程

RX介紹

ReactiveX的歷史後端

ReactiveX 是Reactive Extensions的縮寫,通常簡寫爲Rx,最初是LINQ的一個擴展,由微軟的架構師Erik Meijer領導的團隊開發,在2012年11月開源,Rx是一個編程模型,目標是提供一致的編程接口,幫助開發者更方便的處理異步數據流,Rx庫支持.NET、JavaScript和C++,Rx近幾年愈來愈流行了,如今已經支持幾乎所有的流行編程語言了,Rx的大部分語言庫由ReactiveX這個組織負責維護,比較流行的有RxJava/RxJS/Rx.NET,社區網站是reactivex.io。數組

什麼是ReactiveX緩存

微軟給的定義是,Rx是一個函數庫,讓開發者能夠利用可觀察序列和LINQ風格查詢操做符來編寫異步和基於事件的程序,使用Rx,開發者能夠用Observables表示異步數據流,用LINQ操做符查詢異步數據流, 用Schedulers參數化異步數據流的併發處理,Rx能夠這樣定義:Rx = Observables + LINQ + Schedulers。安全

ReactiveX.io給的定義是,Rx是一個使用可觀察數據流進行異步編程的編程接口,ReactiveX結合了觀察者模式迭代器模式函數式編程的精華。

RxJava 究竟是什麼

RxJava 在 GitHub 主頁上的自我介紹是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一個在 Java VM 上使用可觀測的序列來組成異步的、基於事件的程序的庫)。這就是 RxJava ,歸納得很是精準。

然而,對於初學者來講,這仍是比較含蓄難懂的。由於它是一個總結,而初學者更須要一個入門的介紹或者理解。其實, RxJava 的本質能夠總結爲異步的概念。說到本質上,它就是一個實現異步操做的庫。RxJava 的異步實現,是經過一種擴展的觀察者模式來實現的。

RxJava 優勢

一樣是作異步,爲何去使用它,而不用現成的 Thread,ThreadPoolExecutor,Android的AsyncTask / Handler / ... ?其實就是簡潔,易用 !

異步操做很關鍵的一點是程序的簡潔性,由於在調度過程比較複雜的狀況下,異步代碼常常會既難寫也難被讀懂。 正如Android 創造的 AsyncTask 和Handler ,其實都是爲了讓異步代碼更加簡潔。RxJava 的優點也是簡潔,但它的簡潔的不同凡響之處在於,隨着程序邏輯變得愈來愈複雜,它依然可以保持簡潔

名詞定義

  • Reactive 直譯爲反應性的,有活性的,根據上下文通常翻譯爲反應式、響應式。
  • Iterable 可迭代對象,支持以迭代器的形式遍歷,許多語言中都存在這個概念。
  • Observable 可觀察對象,在Rx中定義爲更強大的Iterable,在觀察者模式中是被觀察的對象,一旦數據產生或發生變化,會經過某種方式通知觀察者或訂閱者。
  • Observer 觀察者對象,監聽Observable發射的數據並作出響應,Subscriber是它的一個特殊實現。
  • emit 直譯爲發射,發佈,發出,含義是Observable在數據產生或變化時發送通知給Observer,調用Observer對應的方法,文章裏一概譯爲發射。
  • items 直譯爲項目,條目,在Rx裏是指Observable發射的數據項,文章裏一概譯爲數據,數據項。

Rx模式

使用觀察者模式

  • 建立:Rx能夠方便的建立事件流和數據流
  • 組合:Rx使用查詢式的操做符組合和變換數據流
  • 監聽:Rx能夠訂閱任何可觀察的數據流並執行操做

簡化代碼

  • 函數式風格:對可觀察數據流使用無反作用的輸入輸出函數,避免了程序裏錯綜複雜的狀態
  • 簡化代碼:Rx的操做符統統常能夠將複雜的難題簡化爲不多的幾行代碼
  • 異步錯誤處理:傳統的try/catch沒辦法處理異步計算,Rx提供了合適的錯誤處理機制
  • 輕鬆使用併發:Rx的Observables和Schedulers讓開發者能夠擺脫底層的線程同步和各類
    併發問題

使用Observable的優點

Rx擴展了觀察者模式用於支持數據和事件序列,添加了一些操做符,它讓你能夠聲明式的組合這些序列,而無需關注底層的實現:如線程、同步、線程安全、併發數據結構和非阻塞IO。

Observable經過使用最佳的方式訪問異步數據序列填補了這個間隙。

類型 單個數據 多個數據
同步 T getData() Iterable getData
異步 Future<T> getData() Observable<T> getData()

Rx的Observable模型讓你能夠像使用集合數據同樣操做異步事件流,對異步事件流使用各類
簡單、可組合的操做。

1. Observable可組合

對於單層的異步操做來講,Java中Future對象的處理方式是很是簡單有效的,可是一旦涉及到嵌套,它們就開始變得異常繁瑣和複雜。使用Future很難很好的組合帶條件的異步執行流程(考慮到運行時各類潛在的問題,甚至能夠說是不可能的),固然,要想實現仍是能夠作到的,可是很是困難,或許你能夠用 Future.get() ,但這樣作,異步執行的優點就徹底沒有了。從另外一方面說,Rx的bservable一開始就是爲組合異步數據流準備的。

2. Observable更靈活

Rx的Observable不只支持處理單獨的標量值(就像Future能夠作的),也支持數據序列,甚至是無窮的數據流。 Observable 是一個抽象概念,適用於任何場景。Observable擁有它的近親Iterable的所有優雅與靈活。

Observable是異步的雙向push,Iterable是同步的單向pull,對比:

事件 Iterable(pull) Observable(push)
獲取數據 T next() onNext(T)
異常處理 throws Exception onError(Exception)
任務完成 !hasNext() onCompleted

3. Observable無偏見

Rx對於對於併發性或異步性沒有任何特殊的偏好,Observable能夠用任何方式實現,線程池、事件循環、非阻塞IO、Actor模式,任何知足你的需求的,你擅長或偏好的方式均可以。不管你選擇怎樣實現它,不管底層實現是阻塞的仍是非阻塞的,客戶端代碼將全部與Observable的交互都當作是異步的。

Rx使用依賴:

下列是筆者使用的版本(可根據實際狀況進行選擇):

  1. 使用Gradle依賴:implementation "io.reactivex.rxjava2:rxjava:2.2.12"
  2. 使用Maven依賴或者Jar包下載 :Rxjava 2.2.12
  3. 其餘版本以及相關下載 :Maven

Rxjava的入門基礎

1. Observable

1.1 觀察者模式

基本概念:Observable (可觀察者,即被觀察者)、Observer (觀察者)、 subscribe (訂閱)、事件。Observable 和 Observer 經過 subscribe() 方法實現訂閱關係,從而 Observable 能夠在須要的時候發出事件來通知 Observer(觀察者觀察被觀察者的通知事件)。

在RxJava中,一個實現了 Observer 接口的對象能夠訂閱 (subscribe) 一個 Observable 類的實例。訂閱者(subscriber) 對 Observable 發射 (emit) 的任何數據或數據序列做出響應。這種模式 簡化了併發操做,由於它不須要阻塞等待 Observable 發射數據,而是建立了一個處於待命狀態的觀察者哨兵,哨兵在將來某個時刻響應Observable的通知。

RxJava 的事件回調方法: onSubscribe()onNext()onCompleted()onError()

  • onSubscribe(): 當被觀察者被觀察者訂閱的時候觸發。
  • onNext(): 當被觀察者發送數據的時候經過此方法通知觀察者數據變換。
  • onCompleted(): 事件隊列完結。RxJava 不只把每一個事件單獨處理,還會把它們看作一個隊列。RxJava 規定,當不會再有新的 onNext() 發出時,須要觸發 onCompleted() 方法做爲標誌。
  • onError(): 事件隊列異常。在事件處理過程當中出異常時,onError() 會被觸發,同時隊列自動終止,不容許再有事件發出。

注意: 在一個正確運行的事件序列中, onCompleted() 和 onError() 有且只有一個,而且是事件序列中的最後一個。須要注意的是,onCompleted() 和 onError() 兩者也是互斥的,即在隊列中調用了其中一個,就不該該再調用另外一個。

1.2 Consumer 和 Action

這兩個詞意思分別是 消費者(能夠理解爲消費被觀察者發射出來的事件)和 行爲(能夠理解爲響應被觀察者的行爲)。對於 Observer 中的 4 個回調方法,咱們未必都能用獲得,若是隻須要用到其中的一部分,就須要 Consumer 和 Action 上場了。

簡單示例:

// 1. 進行訂閱,subscribe(Observer)
    observable.subscribe(observer);
        
    System.out.println("---------------------------------------------");
    // 2. 進行訂閱,subscribe(Consumer onNext)
    observable.subscribe(nextConsumer);
    
    System.out.println("---------------------------------------------");
    // 3. 進行訂閱,subscribe(Consumer onNext, Consumer onError)
    observable.subscribe(nextConsumer, errorConsumer);
        
    System.out.println("---------------------------------------------");
    // 4. 進行訂閱,subscribe(Consumer onNext, Consumer onError, Action onCompleted)
    observable.subscribe(nextConsumer, errorConsumer, completedAction);
        
    System.out.println("---------------------------------------------");
    // 5. 進行訂閱,subscribe(Consumer onNext, Consumer onError, Action onCompleted, Consumer onSubscribe)
    observable.subscribe(nextConsumer, errorConsumer, completedAction, onSubscribeComsumer);

1.3 Observable的分類

在RxJava中,Observable 有 Hot 與 Cold 之分。

  • Hot Observable : 不管有沒有觀察者進行訂閱,事件始終都會發生。當有多個觀察者訂閱時,Hot Observable此時與訂閱者們的關係時一對多的關係,能夠與多個訂閱者共享信息。
  • Cold Observable : 只有有觀察者訂閱了,纔開始執行數據流的發送,而且與觀察者時一對一的關係。當有多個不一樣的訂閱者時,消息是從新完整發送的,也就是說對於訂閱者們來講,它們的事件是彼此獨立的。

Javadoc: Observable

2. Flowable

Rxjava2.x 中有這麼一個被觀察者 Flowable,一樣做爲被觀察者,它和Observable有什麼區別呢,在Rxjava2中,Observable再也不支持背壓,而新增的Flowable支持背壓,何爲背壓,就是異步場景下上游發送事件的速度大於下游處理事件的速度所產生的現象。

img-Flowable

提示:在本系列後面會有詳細的單獨篇章來介紹和如何使用背壓。
Javadoc: Flowable

3. Single

Single 相似於 Observable,不一樣的是,它老是隻發射一個值,或者一個錯誤通知,而不是發射一系列的值。

所以,不一樣於Observable須要三個方法 onNext, onError, onCompleted,訂閱Single只須要兩個方法:

  • onSuccess: Single發射單個的值到這個方法
  • onError: 若是沒法發射須要的值,Single發射一個Throwable對象到這個方法

Single 只會調用這兩個方法中的一個,並且只會調用一次,調用了任何一個方法以後,訂閱關係終止。

img-single

示例代碼:

// Single: 只發送 onSuccess or onError 通知,而且只會發送一次, 第一次發送數據後的都不會在處理
    Single.create(new SingleOnSubscribe<String>() {
    
            @Override
            public void subscribe(SingleEmitter<String> emitter) throws Exception {
                emitter.onSuccess("Success");           // 發送success通知
                emitter.onSuccess("Success2");      // 只能發送一次通知,後續不在處理
            }
    }).subscribe(new BiConsumer<String, Throwable>() {
    
            @Override
            public void accept(String t1, Throwable t2) throws Exception {
                System.out.println("--> accept: t1 = " + t1 + ",  t2 = " + t2);
            }
    });

輸出:

--> accept: t1 = Success,  t2 = null

提示:Single 能夠經過 toXXX 方法轉換爲 Observable, Flowable, Completable與Maybe。
Javadoc: Single

4. Completable

Completable 在建立後,不會發射任何數據, 只有 onCompleteonError事件,同時沒有Observable中的一些操做符,如 map,flatMap。一般與 andThen 操做符結合使用。

img-Completable

示例代碼:

// 1. Completable:只發送complete 或 error 事件,不發送任何數據
    Completable.fromAction(new Action() {

        @Override
        public void run() throws Exception {
            System.out.println("Hello World! This is Completable.");
        }
    }).subscribe(new CompletableObserver() {

        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe");
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError");
        }

        @Override
        public void onComplete() {
            System.out.println("--> onComplete");
        }
    });

    System.out.println("----------------------------------------------");
    // 2. 與 andThen 結合使用,當Completable執行完onCompleted後,執行andThen裏的任務
    Completable.create(new CompletableOnSubscribe() {

        @Override
        public void subscribe(CompletableEmitter emitter) throws Exception {
            Thread.sleep(1000);
            System.out.println("--> completed");
            emitter.onComplete();
        }
    }).andThen(Observable.range(1, 5)).subscribe(new Consumer<Integer>() {

        @Override
        public void accept(Integer t) throws Exception {
            System.out.println("--> accept: " + t);
        }
    });

輸出:

--> onSubscribe
Hello World! This is Completable.
--> onComplete
----------------------------------------------
--> completed
--> accept: 1
--> accept: 2
--> accept: 3
--> accept: 4
--> accept: 5

提示:Completable 能夠經過 toXXX 方法轉換爲 Observable, Flowable, Single與Maybe。
Javadoc: Completable

5. Maybe

Maybe 是 Rxjava 2.x 之後的新類型,只能發射 0 或者 1 項數據,即便後續有多個數據,後面的數據也不會被處理。能夠看作是 Single 與 Completable 結合。

img-Maybe

示例代碼:

// Maybe 只發送0個或者1個數據,後續數據將被忽略
    Maybe.create(new MaybeOnSubscribe<String>() {

        @Override
        public void subscribe(MaybeEmitter<String> emitter) throws Exception {
            // 若是先發送了,將會調用MaybeObserver的onCompleted方法,若是有數據發送或者調用onError,則不會去調用
            // emitter.onComplete();
            emitter.onSuccess("Hello"); // 若是發送了第一個數據後續數據將不會被處理
            emitter.onSuccess("World");
        }
    }).subscribe(new MaybeObserver<String>() {

        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe");
        }

        @Override
        public void onSuccess(String t) {
            System.out.println("--> onSuccess: " + t);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError: " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onComplete");
        }
    });

輸出:

--> onSubscribe
--> onSuccess: Hello

提示:Maybe 能夠經過 toXXX 方法轉換爲 Observable, Flowable, Single與Completable。
Javadoc: Maybe

6. Subject

Subject 能夠當作是一個橋樑或者代理,在 RxJava 實現中,它同時充當了 ObserverObservable 的角色。由於它是一個Observer,它能夠訂閱一個或多個 Observable;又由於它是一個 Observable ,它能夠轉發它收到(Observe)的數據,也能夠發射新的數據。

它既能夠是數據源observerable,也能夠是數據的訂閱者Observer。這個能夠經過源碼來了解一下。

public abstract class Subject<T> extends Observable<T> implements Observer<T> {
    ...
}

Subject 實際上仍是 Observable,不過它由於實現了Observer接口,能夠經過onNext、onComplete、onError方法發射和終止發射數據。

注意: 不要使用just(T)from(T)create(T)來使用Subject,由於會把Subject轉換爲Obserable。

在 Rxjava 中,官方一共爲咱們提供了幾種Subject:

  • AsyncSubject (僅釋放接收到的最後一個數據)
  • BehaviorSubject (釋放訂閱前最後一個數據和訂閱後接收到的全部數據)
  • PublishSubject (釋放訂閱後接收到的數據)
  • ReplaySubject (釋放接收到的全部數據)
  • UnicastSubject (僅支持訂閱一次的Subject)
  • Serialized(串行化)
  • TestSubject(在2.x中被TestScheduler和TestObserver替代)

6.1 AsyncSubject

AsyncSubject 僅釋放 onComplete() 以前的最後一個數據(必須調用subject.onComplete()纔會發送數據,不然觀察者不會接收到任何數據)。

能夠獲取數據業務邏輯的最後的結果數據。

img-AsyncSubject

注意: 若是因異常(Error)終止,將不會向後續的Observer釋放數據,可是會向Observer傳遞一個異常通知。

實例代碼:

// 注意: 不要使用just(T)、from(T)、create(T)來使用Subject,由於會把Subject轉換爲Obserable
    // 不管訂閱的時候AsyncSubject是否Completed,都可以收到最後一個值的回調
    AsyncSubject<String> asyncSubject = AsyncSubject.create();
    asyncSubject.onNext("emitter 1");
    asyncSubject.onNext("emitter 2");
    asyncSubject.onNext("emitter 3");
    asyncSubject.onNext("emitter 4");
    asyncSubject.onNext("emitter 5"); // 此時訂閱後將近發送此項數據
    // asyncSubject.onNext(1/0 + ""); // 發生error時將不會有數據發射,僅發送error通知
    asyncSubject.onComplete();

    // 訂閱後只會接收最後一個數據
    asyncSubject.subscribe(new Observer<String>() {

        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe");
        }

        @Override
        public void onNext(String t) {
            System.out.println("--> onNext = " + t);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError = " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onComplete");
        }
    });

輸出:

--> onSubscribe
--> onNext = emitter 5
--> onComplete

Javadoc: AsyncSubject

6.2 BehaviorSubject

當觀察者訂閱 BehaviorSubject 時,它開始發射原始Observable在訂閱前的最後一個發射的數據(若是此時尚未收到任何數據,它會發射一個默認值),而後繼續發射其它任何來自原始Observable的數據。

能夠緩存訂閱前最後一次發出的數據,以及訂閱後發送的全部數據。

img-BehaviorSubject

注意: 若是因異常(Error)終止,將不會向後續的Observer釋放數據,可是會向Observer傳遞一個異常通知。

實例代碼:

// 建立無默認值的BehaviorSubject
    BehaviorSubject<Integer> subject = BehaviorSubject.create();
    // 建立有默認值的BehaviorSubject
    BehaviorSubject<Integer> subjectDefault = BehaviorSubject.createDefault(-1);
    
    // 觀察者對象
    Observer<Integer> observer = new Observer<Integer>() {

        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--------------------------------"); 
            System.out.println("--> onSubscribe");
        }

        @Override
        public void onNext(Integer t) {
            System.out.println("--> onNext: " + t);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError: " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onComplete");
        }
    };
    
    // 1. 無數據發送的時候,發送默認值
    //  subjectDefault.subscribe(observer);
    
    // 2. 此時會發射全部訂閱後正常發射的數據: 1, 2, 3, 4, error
    //  subject.subscribe(observer);
    subject.onNext(1);
    subject.onNext(2);
    subject.onNext(3);
    
    // 3. 此時會發射訂閱前的一個數據及後面正常發射的數據: 3, 4, error
    //  subject.subscribe(observer);
    subject.onNext(4);
    subject.onError(new NullPointerException());
    
    // 4. 此時不會發射後續數據,僅發送Error通知
    //  subject.subscribe(observer);
    subject.onNext(5);
    subject.onComplete();
    
    // 5. 此時沒有數據發射,若是有error存在的話,將會發送error
    subject.subscribe(observer);

輸出:

--------------------------------
--> onSubscribe
--> onNext: -1
--------------------------------
--> onSubscribe
--> onNext: 1
--> onNext: 2
--> onNext: 3
--> onNext: 4
--> onError: java.lang.NullPointerException
--------------------------------
--> onSubscribe
--> onNext: 3
--> onNext: 4
--> onError: java.lang.NullPointerException
--------------------------------
--> onSubscribe
--> onError: java.lang.NullPointerException
--------------------------------
--> onSubscribe
--> onError: java.lang.NullPointerException

Javadoc: BehaviorSubject

6.3 PublishSubject

PublishSubject 只會把在訂閱發生的時間點以後來自原始Observable的數據發射給觀察者。須要注意的是,PublishSubject 可能會一建立完成就馬上開始發射數據(除非你能夠阻止它發生),所以這裏有一個風險:在Subject被建立後到有觀察者訂閱它以前這個時間間隙內,可能有一個或多個數據可能會丟失。若是要確保來自原始Observable的全部數據都被分發,你須要這樣作:使用Create建立那個Observable以便手動給它引入 "冷" Observable的行爲(當全部觀察者都已經訂閱時纔開始發射數據),或者改用 ReplaySubject

若是 PublishSubject 在訂閱前已經調用了 onComplete() 方法,則觀察者不會接收到數據。

img-PublishSubject

注意: 若是因異常(Error)終止,將不會向後續的Observer釋放數據,可是會向Observer傳遞一個異常通知。

實例代碼:

// 釋放訂閱後接收到正常發射的數據,有error將不會發射任何數據
    PublishSubject<Integer> subject = PublishSubject.create();
    // 觀察者對象
    Observer<Integer> observer = new Observer<Integer>() {

        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--------------------------------");
            System.out.println("--> onSubscribe");
        }

        @Override
        public void onNext(Integer t) {
            System.out.println("--> onNext: " + t);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError: " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onComplete");
        }
    };
    
    // 1. 此時訂閱將釋放後續正常發射的數據: 1,2, 3, 4, error
    // subject.subscribe(observer);
    subject.onNext(1);
    subject.onNext(2);
    
    // 2. 此時訂閱,發射後續正常發射的數據:3, 4, error
    // subject.subscribe(observer);
    subject.onNext(3);
    subject.onNext(4);
    
    // 此時將不會發送任何數據,直接發送error
    subject.onError(new NullPointerException());
    subject.onNext(5);
    subject.onComplete();
    
    // 3. 此時訂閱若是有error,僅發送error,不然無數據發射
    subject.subscribe(observer);

輸出:

--------------------------------
--> onSubscribe
--> onNext: 1
--> onNext: 2
--> onNext: 3
--> onNext: 4
--> onError: java.lang.NullPointerException
--------------------------------
--> onSubscribe
--> onNext: 3
--> onNext: 4
--> onError: java.lang.NullPointerException
--------------------------------
--> onSubscribe
--> onError: java.lang.NullPointerException

Javadoc: PublishSubject

6.4 ReplaySubject

ReplaySubject 會發射全部來自原始Observable的數據給觀察者,不管它們是什麼時候訂閱的。也 有其它版本的ReplaySubject,在重放緩存增加到必定大小的時候或過了一段時間後會丟棄舊的數據(原始Observable發射的)。

若是你把 ReplaySubject 看成一個觀察者使用,注意不要從多個線程中調用它的onNext方法 (包括其它的on系列方法),這可能致使同時(非順序)調用,這會違反Observable協議, 給Subject的結果增長了不肯定性。

img-ReplaySubject
img-ReplaySubjectError

ReplaySubject 還能夠限制緩存數據的數量,限制緩存的時間:

  • create(bufferAge):指定內部緩存,減小內部緩存區增加過多的重分配
  • createWithSize(maxAge):指定訂閱後接受以前已經發射過數據的 maxAge 個數據項
  • createWithTime(timeout, TimeUnit, Scheduler) :接受訂閱後接受以前已經發射過指定 timeout 時間段內的數據項

實例代碼:

// 1. 接受收到的全部數據以及通知,對每隔Observer都執行相同的獨立的操做
    ReplaySubject<Integer> subject = ReplaySubject.create();
    
    // 2. 指定內部緩存大小,此方法避免在內部緩衝區增加以容納新緩衝區時過多的數組重分配
    // ReplaySubject<Integer> subject = ReplaySubject.create(5);
    
    // 3. createWithSize(count) 
    // 指定保留訂閱前數據項的個數的Subject,會發射訂閱前count個數據和後續的數據
    // ReplaySubject<Integer> subject = ReplaySubject.createWithSize(1);
    
    // 4. createWithTime(maxAge, unit, scheduler) 
    // 指定保留訂閱前指定maxAge時間段內數據和後續的數據
    // ReplaySubject<Integer> subject = ReplaySubject.createWithTime(1, TimeUnit.MILLISECONDS, Schedulers.trampoline());

    // 建立Observer(觀察者), 能夠接受Observable全部通知
    Observer<Integer> observer = new Observer<Integer>() {

        public void onSubscribe(Disposable d) {
            System.out.println("----------------------------------");
            System.out.println("--> onSubscribe");
        }

        public void onNext(Integer t) {
            System.out.println("--> onNext = " + t);
        }

        public void onError(Throwable e) {
            System.out.println("--> onError: " + e);
        }

        public void onComplete() {
            System.out.println("--> onComplete");
        }
    };
    
    // 正常接受全部Observable的數據和通知
    subject.subscribe(observer);
    subject.onNext(1);
    subject.onNext(2);
    subject.onNext(3);
    
    // 正常接受全部Observable的數據和通知
    subject.subscribe(observer);
    subject.onNext(4);
    // 若是有error,則發送error通知,不影響任何一個觀察者數據與通知接受
    // subject.onError(new NullPointerException());
    subject.onNext(5);
    subject.onComplete();
    
    // 正常接受全部Observable的數據和通知
    subject.subscribe(observer);

輸出:

----------------------------------
--> onSubscribe
--> onNext = 1
--> onNext = 2
--> onNext = 3
----------------------------------
--> onSubscribe
--> onNext = 1
--> onNext = 2
--> onNext = 3
--> onNext = 4
--> onNext = 4
--> onNext = 5
--> onNext = 5
--> onComplete
--> onComplete
----------------------------------
--> onSubscribe
--> onNext = 1
--> onNext = 2
--> onNext = 3
--> onNext = 4
--> onNext = 5
--> onComplete

Javadoc: ReplaySubject

6.5 UnicastSubject

UnicastSubject 是僅支持訂閱一次的 Subject ,若是多個訂閱者試圖訂閱這個 Subject 將會受到 IllegalStateException

經常使用於一次性消費或安全場合,如網絡結算,支付等。

實例代碼:

// 建立UnicastSubject,只能被訂閱一次,不能再次被訂閱
    UnicastSubject<Integer> subject = UnicastSubject.create();
    
    // 建立Observer(觀察者), 能夠接受Observable全部通知
    Observer<Integer> observer = new Observer<Integer>() {

        public void onSubscribe(Disposable d) {
            System.out.println("--------------------------------");
            System.out.println("--> onSubscribe");
        }

        public void onNext(Integer t) {
            System.out.println("--> onNext = " + t);
        }

        public void onError(Throwable e) {
            System.out.println("--> onError: " + e);
        }

        public void onComplete() {
            System.out.println("--> onComplete");
        }
    };
    // 訂閱後,此subject將不能夠再被訂閱了
    subject.subscribe(observer);
    
    subject.onNext(1);
    subject.onNext(2);
    subject.onNext(3);
    // 此時會有IllegalStateException,由於只能訂閱一次,不能重複訂閱
    subject.subscribe(observer);
    subject.onNext(4);
    subject.onNext(5);
    subject.onComplete();
    
    // 此時會有IllegalStateException,由於只能被訂閱一次,不能重複訂閱
    subject.subscribe(observer);

輸出:

--------------------------------
--> onSubscribe
--> onNext = 1
--> onNext = 2
--> onNext = 3
--------------------------------
--> onSubscribe
--> onError: java.lang.IllegalStateException: Only a single observer allowed.
--> onNext = 4
--> onNext = 5
--> onComplete
--------------------------------
--> onSubscribe
--> onError: java.lang.IllegalStateException: Only a single observer allowed.

Javadoc: UnicastSubject

6.6 SerializedSubject

在併發狀況下,不推薦使用一般的Subject對象,此時會產生屢次調用產生一系列不可控的問題,而是推薦使用 SerializedSubject,併發時只容許一個線程調用onNext等方法,將Subject 串行化 後,全部其餘的Observable和Subject方法都是線程安全的。

注意: 在Rxjava2 中 SerializedSubject 是一個不公開(不是public)的類型,意味着不能夠直接建立使用,可是能夠經過Subject.toSerialized()方法將Subject對象串行化保證其線程安全。同時也提供了 SerializedObserver,SerializedSubscriber等來包裝對象成爲串行化對象。

實例代碼:

// 建立Subject
    ReplaySubject<String> subject = ReplaySubject.create();

    // 經過toSerialized()進行串行化
    Subject<String> serialized = subject.toSerialized();

    // 訂閱
    serialized.subscribe(new Consumer<String>() {

        @Override
        public void accept(String t) throws Exception {
            System.out.println("--> accept: " + t + ", ReceiverThreadID: " + Thread.currentThread().getId());
        }
    });

    // 多線程執行
    for (int i = 0; i < 10; i++) {
        final int value = i + 1;
        new Thread(new Runnable() {

            @Override
            public void run() {
                serialized.onNext(value + "-SendThreadID: " + Thread.currentThread().getId());
            }
        }).start();
    }

    System.in.read();
    
    System.out.println("---------------------------------------------------------------------");

    // 建立一個 SerializedObserver來進行串行化,保證線程安全
    // 注意:只保證同時只有一個線程調用 onNext, onCompleted, onError方法,並非將全部emit的值放到一個線程上而後處理
    SerializedObserver<String> observer = new SerializedObserver<String>(new Observer<String>() {

        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe");
        }

        @Override
        public void onNext(String t) {
            System.out.println("--> onNext: " + t + ", ReceiverThreadID: " + Thread.currentThread().getId());
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError");
        }

        @Override
        public void onComplete() {
            System.out.println("--> onComplete");
        }
    });

    // 訂閱
    subject.subscribe(observer);

    // 多線程執行
    for (int i = 0; i < 10; i++) {
        final int value = i + 1;
        new Thread(new Runnable() {

            @Override
            public void run() {
                subject.onNext(value + "-SendThreadID: " + Thread.currentThread().getId());
        //      if (value == 10) {
        //          subject.onComplete();
        //      }
            }
        }).start();
    }

    System.in.read();

輸出:

--> accept: 1-SendThreadID: 11, ReceiverThreadID: 11
--> accept: 2-SendThreadID: 12, ReceiverThreadID: 11
--> accept: 10-SendThreadID: 20, ReceiverThreadID: 11
--> accept: 9-SendThreadID: 19, ReceiverThreadID: 11
--> accept: 8-SendThreadID: 18, ReceiverThreadID: 11
--> accept: 7-SendThreadID: 17, ReceiverThreadID: 11
--> accept: 6-SendThreadID: 16, ReceiverThreadID: 11
--> accept: 4-SendThreadID: 14, ReceiverThreadID: 11
--> accept: 5-SendThreadID: 15, ReceiverThreadID: 11
--> accept: 3-SendThreadID: 13, ReceiverThreadID: 11
---------------------------------------------------------------------
--> onSubscribe
--> onNext: 1-SendThreadID: 11, ReceiverThreadID: 11
--> onNext: 3-SendThreadID: 13, ReceiverThreadID: 11
--> onNext: 4-SendThreadID: 14, ReceiverThreadID: 11
--> onNext: 5-SendThreadID: 15, ReceiverThreadID: 11
--> onNext: 6-SendThreadID: 16, ReceiverThreadID: 16
--> onNext: 7-SendThreadID: 17, ReceiverThreadID: 16
--> onNext: 8-SendThreadID: 18, ReceiverThreadID: 16
--> onNext: 9-SendThreadID: 19, ReceiverThreadID: 16
--> onNext: 10-SendThreadID: 20, ReceiverThreadID: 16

6.7 TestSubject

Rxjava2 中已經取消了TestSubject,使用TestSchedulerTestObserver替代,下面主要以 TestObserver 爲例進行介紹。

TestObserver 是一個一個記錄事件並容許對其進行斷言的觀察者,多用於測試場合。通常能夠建立一個TestObserver 對象或者從Observable 或者 Subject 中直接調用 test() 方法獲取。

實例代碼:

// Observable
    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {

        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
            emitter.onNext(100);
            emitter.onError(new NullPointerException());
            emitter.onComplete();
        }
    });

    // 1. 建立TestObserver對象
    TestObserver<Integer> testObserver = TestObserver.create(new Observer<Integer>() {

        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe:");
        }

        @Override
        public void onNext(Integer t) {
            System.out.println("--> onNext: " + t);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError: " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onComplete:");
        }
    });

    observable.subscribe(testObserver);
    try {
        // 斷言是否爲收到訂閱,可是沒有事件發送
        testObserver.assertEmpty();
        // 斷言是否收到onComplete()
        testObserver.assertComplete();
        // 斷言沒有數據100發送
        testObserver.assertNever(100);
        // 斷言接收數據結果
        testObserver.assertResult(1, 2, 3);
        // 斷言異常
        testObserver.assertError(NullPointerException.class);
        ... 更多請參考Api
    } catch (Error e) {
        System.out.println("Error: " + e);
    }

    System.out.println("-----------------------------------------------");
    // Subject
    AsyncSubject<Object> subject = AsyncSubject.create();

    // 2. 從Observable或者Subject中獲取TestObserver對象
    TestObserver<Integer> test = observable.test();
    TestObserver<Object> test2 = subject.test();
    System.out.println(test.values()); // received onNext values
    try {
        // 斷言是否爲收到訂閱,可是沒有事件發送
        test.assertEmpty();
        test2.assertEmpty();
        // 斷言是否收到onComplete()
        test.assertComplete();
        // 斷言沒有數據100發送
        test.assertNever(100);
        // 斷言接收數據結果
        test.assertResult(1, 2, 3);
        // 斷言異常
        test.assertError(NullPointerException.class);
        ... 更多請參考Api
    } catch (Error e) {
        System.out.println("Error: " + e);
    }

輸出(當出現斷言不匹配的狀況,會有相應Error拋出):

--> onSubscribe:
--> onNext: 1
--> onNext: 2
--> onNext: 3
--> onNext: 100
--> onError: java.lang.NullPointerException
Error: java.lang.AssertionError: Value counts differ; expected: 0 but was: 4 (latch = 0, values = 4, errors = 1, completions = 0)
-----------------------------------------------
[1, 2, 3, 100]
Error: java.lang.AssertionError: Value counts differ; expected: 0 but was: 4 (latch = 0, values = 4, errors = 1, completions = 0)

Javadoc: TestObserver

6.8 Processor

ProcessSubject 的做用和使用相同。Process 是 Rxjava2 中的新功能,它是一個接口,繼承自 Subscriber、Publish。與Subject 最大的區別是 Process 支持背壓,關於背壓,後續將會有專題文章來作詳細介紹。

7. Scheduler

若是你想給Observable操做符鏈添加多線程功能,你能夠指定操做符(或者特定的 Observable)在特定的調度器(Scheduler)上執行。

某些ReactiveX的Observable操做符有一些變體,它們能夠接受一個Scheduler參數。這個參數指定操做符將它們的部分或所有任務放在一個特定的調度器上執行。

使用ObserveOn和SubscribeOn操做符,你可讓Observable在一個特定的調度器上執行, ObserveOn指示一個Observable在一個特定的調度器上調用觀察者的onNext, onError和 onCompleted方法,SubscribeOn更進一步,它指示Observable將所有的處理過程(包括髮射數據和通知)放在特定的調度器上執行。

調度器的種類

下表展現了RxJava中可用的調度器種類:

調度器類型 做用
Schedulers.computation() 用於計算任務,如事件循環或和回調處理,不要用於IO操做(IO操做請使用Schedulers.io()),默認線程數等於處理器的數量 。
Schedulers.from(executor) 使用指定的Executor做爲調度器。
Schedulers.trampoline() 調度在當前線程上工做,但不當即執行。當其它排隊的任務完成後,在當前線程排隊開始執行。
Schedulers.io() 用於IO密集型任務,如異步阻塞IO操做,這個調度器的線程池會根據須要增加;對於普通的計算任務,請使用 Schedulers.computation();Schedulers.io( )默認是一個CachedThreadScheduler,很像一個有線程緩存的新線程調度器。
Schedulers.newThread() 爲每一個任務建立一個新線程
Schedulers.single() 一個默認的、共享的、單線程支持的調度器實例,用於須要在同一後臺線程上強順序執行的工做。

關於Rxjava中的線程模型、線程轉換操做、調度器的使用等後面會有專題文章來詳細介紹。

小結:

本章主要介紹了Rxjava的概念與添加使用依賴、Rxjava中的觀察者模式、Observable、Flowable、Subject,Schedule等基礎對象的介紹與使用,應該能夠對Rxjava的概念及基本對象有了基本的認識和了解,以及簡單的上手使用。

有關Rxjava2 的其餘相關部份內容,後續將有系列的文章來介紹,請關注上面的實時文章目錄。

相關文章
相關標籤/搜索