RxJava系列二(基本概念及使用介紹)

轉載請註明出處:https://zhuanlan.zhihu.com/p/20687307java


前言

上一篇的示例代碼中你們必定發現了Observable這個類。從純Java的觀點看,Observable類源自於經典的觀察者模式。RxJava的異步實現正是基於觀察者模式來實現的,並且是一種擴展的觀察者模式。異步

觀察者模式

觀察者模式基於Subject這個概念,Subject是一種特殊對象,又叫作主題或者被觀察者。當它改變時那些由它保存的一系列對象將會獲得通知,而這一系列對象被稱做Observer(觀察者)。它們會對外暴漏了一個通知方法(比方說update之類的),當Subject狀態發生變化時會調用的這個方法。ide

觀察者模式很適合下面這些場景中的任何一個:

  1. 當你的架構有兩個實體類,一個依賴另外一個,你想讓它們互不影響或者是獨立複用它們時。

  2. 當一個變化的對象通知那些與它自身變化相關聯的未知數量的對象時。

  3. 當一個變化的對象通知那些無需推斷具體類型的對象時。

一般一個觀察者模式的類圖是這樣的:

若是你對觀察者模式不是很瞭解,那麼強烈建議你先去學習下。關於觀察者模式的詳細介紹能夠參考我以前的文章:設計模式之觀察者模式

擴展的觀察者模式

在RxJava中主要有4個角色:

  • Observable

  • Subject

  • Observer

  • Subscriber

Observable和Subject是兩個「生產」實體,Observer和Subscriber是兩個「消費」實體。說直白點Observable對應於觀察者模式中的被觀察者,而ObserverSubscriber對應於觀察者模式中的觀察者Subscriber實際上是一個實現了Observer的抽象類,後面咱們分析源碼的時候也會介紹到。Subject比較複雜,之後再分析。

上一篇文章中咱們說到RxJava中有個關鍵概念:事件。觀察者Observer和被觀察者Observable經過subscribe()方法實現訂閱關係。從而Observable 能夠在須要的時候發出事件來通知Observer

RxJava如何使用

我本身在學習一種新技術的時候一般喜歡先去了解它是怎麼用的,掌握了使用方法後再去深挖其原理。那麼咱們如今就來講說RxJava到底該怎麼用。

第一步:建立觀察者Observer

Observer<Object> observer = new Observer<Object>() {

    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onNext(Object s) {

    }
 };

這麼簡單,一個觀察者Observer建立了!

大兄弟你等等...,你以前那篇觀察者模式中不是說觀察者只提供一個update方法的嗎?這特麼怎麼有三個?!!

少年勿急,且聽我慢慢道來。在普通的觀察者模式中觀察者通常只會提供一個update()方法用於被觀察者的狀態發生變化時,用於提供給被觀察者調用。而在RxJava中的觀察者Observer提供了:onNext()onCompleted()onError()三個方法。還記得嗎?開篇咱們講過RxJava是基於一種擴展的觀察這模式實現,這裏多出的onCompleted和onError正是對觀察者模式的擴展。ps:onNext就至關於普通觀察者模式中的update

RxJava中添加了普通觀察者模式缺失的三個功能:

  1. RxJava中規定當再也不有新的事件發出時,能夠調用onCompleted()方法做爲標示;

  2. 當事件處理出現異常時框架自動觸發onError()方法;

  3. 同時Observables支持鏈式調用,從而避免了回調嵌套的問題。

第二步:建立被觀察者Observable

Observable.create()方法能夠建立一個Observable,使用crate()建立Observable須要一個OnSubscribe對象,這個對象繼承Action1。當觀察者訂閱咱們的Observable時,它做爲一個參數傳入並執行call()函數。

Observable<Object> observable = Observable.create(new 
            Observable.OnSubscribe<Object>() {
    @Override
    public void call(Subscriber<? super Object> subscriber) {

    }
});

除了create(),just()和from()一樣能夠建立Observable。看看下面兩個例子:

just(T...)將傳入的參數依次發送

Observable observable = Observable.just("One", "Two", "Three");
//上面這行代碼會依次調用
//onNext("One");
//onNext("Two");
//onNext("Three");
//onCompleted();

from(T[])/from(Iterable<? extends T>)將傳入的數組或者Iterable拆分紅Java對象依次發送

String[] parameters = {"One", "Two", "Three"};
Observable observable = Observable.from(parameters);
//上面這行代碼會依次調用
//onNext("One");
//onNext("Two");
//onNext("Three");
//onCompleted();

第三步:被觀察者Observable訂閱觀察者Observerps:你沒看錯,不一樣於普通的觀察者模式,這裏是被觀察者訂閱觀察者

有了觀察者和被觀察者,咱們就能夠經過subscribe()來實現兩者的訂閱關係了。

observable.subscribe(observer);

observable.subscribe(observer)

連在一塊兒寫就是這樣:

Observable.create(new Observable.OnSubscribe<Integer>() {

    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        for (int i = 0; i < 5; i++) {
            subscriber.onNext(i);
        }
        subscriber.onCompleted();
    }

}).subscribe(new Observer<Integer>() {

    @Override
    public void onCompleted() {
        System.out.println("onCompleted");
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("onError");
    }

    @Override
    public void onNext(Integer item) {
        System.out.println("Item is " + item);
    }
});

至此一個完整的RxJava調用就完成了。

兄臺,你叨逼叨叨逼叨的說了一大堆,但是我沒搞定你特麼到底在幹啥啊?!!不急,我如今就來告訴大家到底發生了什麼。

首先咱們使用Observable.create()建立了一個新的Observable<Integer>,併爲create()方法傳入了一個OnSubscribe,OnSubscribe中包含一個call()方法,一旦咱們調用subscribe()訂閱後就會自動觸發call()方法。call()方法中的參數Subscriber其實就是subscribe()方法中的觀察者Observer。咱們在call()方法中調用了5次onNext()和1次onCompleted()方法。一套流程周下來之後輸出結果就是下面這樣的:

Item is 0
Item is 1
Item is 2
Item is 3
Item is 4
onCompleted

看到這裏可能你又要說了,大兄弟你別唬我啊!OnSubscribe的call()方法中的參數Subscriber怎麼就變成了subscribe()方法中的觀察者Observer?!!!這倆兒貨明明看起來就是兩個不一樣的類啊。

咱們先看看Subscriber這個類:

public abstract class Subscriber<T> implements Observer<T>, Subscription {
    
    ...
}

從源碼中咱們能夠看到,Subscriber是Observer的一個抽象實現類,因此我首先能夠確定的是Subscriber和Observer類型是一致的。接着往下咱們看看subscribe()這個方法:

public final Subscription subscribe(final Observer<? super T> observer) {

    //這裏的if判斷對於咱們要分享的問題沒有關聯,能夠先無視
    if (observer instanceof Subscriber) {
        return subscribe((Subscriber<? super T>)observer);
    }
    return subscribe(new Subscriber<T>() {

        @Override
        public void onCompleted() {
            observer.onCompleted();
        }

        @Override
        public void onError(Throwable e) {
            observer.onError(e);
        }

        @Override
        public void onNext(T t) {
            observer.onNext(t);
        }

    });
}

咱們看到subscribe()方法內部首先將傳進來的Observer作了一層代理,將它轉換成了Subscriber。咱們再看看這個方法內部的subscribe()方法:

public final Subscription subscribe(Subscriber<? super T> subscriber) {
    return Observable.subscribe(subscriber, this);
}

進一步往下追蹤看看return後面這段代碼到底作了什麼。精簡掉其餘無關代碼後的subscribe(subscriber, this)方法是這樣的:

private static <T> Subscription subscribe(Subscriber<? super T> subscriber,                     Observable<T> observable) {

    subscriber.onStart();
    try {
        hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
        return hook.onSubscribeReturn(subscriber);
    } catch (Throwable e) {
        return Subscriptions.unsubscribed();
    }
}

咱們重點看看hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber),前面這個hook.onSubscribeStart(observable, observable.onSubscribe)返回的是它本身括號內的第二個參數observable.onSubscribe,而後調用了它的call方法。而這個observable.onSubscribe正是create()方法中的Subscriber,這樣整個流程就理順了。看到這裏是否是對RxJava的執行流程清晰了一點呢?這裏也建議你們在學習新技術的時候多去翻一翻源碼,知其然還要能知其因此然不是嗎。

subscribe()的參數除了能夠是Observer和Subscriber之外還能夠是Action一、Action0;這是一種更簡單的回調,只有一個call(T)方法;因爲太簡單這裏就不作詳細介紹了!

異步

上一篇文章中開篇就講到RxJava就是來處理異步任務的。可是默認狀況下咱們在哪一個線程調用subscribe()就在哪一個線程生產事件,在哪一個線程生產事件就在哪一個線程消費事件。那怎麼作到異步呢?RxJava爲咱們提供Scheduler用來作線程調度,咱們來看看RxJava提供了哪些Scheduler。

Schedulers 做用
Schedulers.immediate() 默認的Scheduler,直接在當前線程運行
Schedulers.newThread() 老是開啓一個新線程
Schedulers.io() 用於IO密集型任務,如異步阻塞IO操做,這個調度器的線程池會根據須要增加;對於普通的計算任務,請使用Schedulers.computation();Schedulers.io()默認是一個CachedThreadScheduler,很像一個有線程緩存的新線程調度器
Schedulers.computation() 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操做限制性能的操做,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小爲 CPU 核數。不要把 I/O 操做放在 computation() 中,不然 I/O 操做的等待時間會浪費 CPU
Schedulers.from(executor) 使用指定的Executor做爲調度器
Schedulers.trampoline() 當其它排隊的任務完成後,在當前線程排隊開始執行
AndroidSchedulers.mainThread() RxAndroid中新增的Scheduler,表示在Android的main線程中運行

同時RxJava還爲咱們提供了subscribeOn()observeOn()兩個方法來指定Observable和Observer運行的線程。

Observable.from(getCommunitiesFromServer())
            .flatMap(community -> Observable.from(community.houses))
            .filter(house -> house.price>=5000000).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(this::addHouseInformationToScreen);

上面這段代碼你們應該有印象吧,沒錯正是咱們上一篇文章中的例子。subscribeOn(Schedulers.io())指定了獲取小區列表、處理房源信息等一系列事件都是在IO線程中運行,observeOn(AndroidSchedulers.mainThread())指定了在屏幕上展現房源的操做在UI線程執行。這就作到了在子線程獲取房源,主線程展現房源。

好了,RxJava系列的入門內容咱們就聊到這。下一篇咱們再繼續介紹更多的API以及它們內部的原理。

若是你們喜歡這一系列的文章,歡迎關注個人知乎專欄和GitHub。

相關文章
相關標籤/搜索