史上最淺顯易懂的RxJava入門教程

RxJava是一個神奇的框架,用法很簡單,但內部實現有點複雜,代碼邏輯有點繞。我讀源碼時,確實有點似懂非懂的感受。網上關於RxJava源碼分析的文章,源碼貼了一大堆,代碼邏輯繞來繞去的,讓人看得雲裏霧裏的。既然用拆輪子的方式來分析源碼比較難啃,不如換種方式,以造輪子的方式,將源碼中與性能、兼容性、擴展性有關的代碼剔除,留下核心代碼帶你們揭祕 RxJava 的實現原理。java

什麼是RxJavareact

  • • Rx是Reactive Extensions的簡寫,翻譯爲響應的擴展。也就是經過由一方發出信息,另外一方響應信息並做出處理的核心框架代碼。
  • • 該框架由微軟的架構師Erik Meijer領導的團隊開發,並在2012年11月開源。
  • • Rx庫支持.NET、JavaScript和C++等,如今已經支持幾乎所有的流行編程語言了。
  • • Rx的大部分語言庫由ReactiveX這個組織負責維護,比較流行的有RxJava/RxJS/Rx.NET,社區網站是 reactivex.io。
  • • RxJava做爲一個流行的框架,其源碼依託在GitHub,除了支持RxJava,針對安卓系統也除了一個支持框架RxAndroid
    2.RxJava簡化代碼
    通常咱們在安卓項目中,若是想從後臺獲取數據並刷新界面,代碼大概以下,下面咱們來看一個例子:
    new Thread() {br/>@Override
    public void run() {
    super.run();
    for (File folder : folders) {
    File[] files = folder.listFiles();
    for (File file : files) {
    if (file.getName().endsWith(".png")) {
    final Bitmap bitmap = getBitmapFromFile(file);
    getActivity().runOnUiThread(new Runnable() {br/>@Override
    public void run() {
    imageCollectorView.addImage(bitmap);
    }
    });
    }
    }
    }
    }
    }.start();
    上面的代碼通過多層嵌套後 可讀性太差了!若是你用了RxJava 能夠這樣寫:
    Observable.from(folders)
    .flatMap(new Func1<File, Observable<File>>() {br/>@Override
    public Observable<File> call(File file) {
    return Observable.from(file.listFiles());
    }
    })
    .filter(new Func1<File, Boolean>() {br/>@Override
    public Boolean call(File file) {
    return file.getName().endsWith(".png");
    }
    })
    .map(new Func1<File, Bitmap>() {br/>@Override
    public Bitmap call(File file) {
    return getBitmapFromFile(file);
    }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Bitmap>() {br/>@Override
    public void call(Bitmap bitmap) {
    imageCollectorView.addImage(bitmap);
    }
    });
    這樣寫的好處就是減小層次嵌套 提升了代碼的可讀性,除了簡化代碼,RxJava還能夠爲每一個方法提供特定的運行線程。
    3.引入框架
    目前RxJava已經升級爲2.0版本,但爲了可以更好的理解RxJava,咱們能夠從1.0版本開始學習。也爲了讓咱們的安卓項目可以更好的使用RxJava,能夠在項目中引入gradle腳本依賴:
    compile 'io.reactivex:rxandroid:1.2.1'
    compile 'io.reactivex:rxjava:1.1.6'
    如今 咱們的項目已經支持RxJava的功能了。
    4.響應式的核心
    所謂的響應式,無非就是存在這樣的2個部分,一部分負責發送事件/消息,另外一部分負責響應事件/消息。
    之前若是咱們想看新聞,通常須要經過看報紙。好比,你對某個報刊雜誌比較感興趣,那麼你首先要作3件事:
      1. 提供你家的地址
      1. 找到對應的報社
      1. 去報社訂閱整個月的報紙
        通過了上面的流程,之後天天只要有新的報刊資料出來了,報社都會將雜誌發送到你家。
        史上最淺顯易懂的RxJava入門教程

將上面的例子進行代碼抽象,步驟以下:android

    1. 提供觀察者(由於你是關心雜誌內容的人 因此你是觀察該事件的人)
    1. 提供被觀察者(只要有新的雜誌出來 就須要通知關心的人 因此報社是被觀察的對象)
    1. 訂閱(也就是 觀察者&被觀察者之間要相互關聯 以便被觀察的對象一變化 就會立刻通知觀察該事件的對象)

史上最淺顯易懂的RxJava入門教程
上面示例的演示代碼以下:
//1.建立被觀察者
Observable<String> observable =
Observable.create(new Observable.OnSubscribe<String>() {br/>@Override
public void call(Subscriber<? super String> subscriber) {
//4.開始發送事件
//事件有3個類型 分別是onNext() onCompleted() onError()
//onCompleted() onError() 通常都是用來通知觀察者 事件發送完畢了,二者只取其一。
subscriber.onNext("Hello Android !");
subscriber.onNext("Hello Java !");
subscriber.onNext("Hello C !");
subscriber.onCompleted();
}
});編程

//2.建立觀察者
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onCompleted() {
        Log.i(TAG, "onCompleted ");
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError: "+e.getLocalizedMessage());
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext: "+s);
    }
};

//3.訂閱
observable.subscribe(subscriber);

輸出以下:
com.m520it.rxjava I/IT520: onNext: Hello Android !
com.m520it.rxjava I/IT520: onNext: Hello Java !
com.m520it.rxjava I/IT520: onNext: Hello C !
com.m520it.rxjava I/IT520: onCompleted
代碼運行的原理markdown

  • • 上面的代碼中,當觀察者subscriber訂閱了被觀察者observable以後,系統會自動回調observable對象內部的call()。
  • • 在observable的call()方法實體中,發送瞭如onNext/onCompleted/onError事件後。
  • • 接着subscriber就能回調到到對應的方法。
    5.被觀察者變種
    普通的Observable發送須要三個方法onNext, onError, onCompleted,而Single做爲Observable的變種,只須要兩個方法:
  • • onSuccess - Single發射單個的值到這個方法
  • • onError - 若是沒法發射須要的值,Single發射一個Throwable對象到這個方法
    Single只會調用這兩個方法中的一個,並且只會調用一次,調用了任何一個方法以後,訂閱關係終止。
    final Single<String> single = Single.create(new Single.OnSubscribe<String>() {br/>@Override
    public void call(SingleSubscriber<? super String> singleSubscriber) {
    //先調用onNext() 最後調用onCompleted()
    //singleSubscriber.onSuccess("Hello Android !");
    //只調用onError();
    singleSubscriber.onError(new NullPointerException("mock Exception !"));
    }
    });架構

    Observer<String> observer = new Observer<String>() {br/>@Override
    public void onCompleted() {
    Log.i(TAG, "onCompleted ");
    }框架

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError: "+e.getLocalizedMessage());
    }
    
    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext: "+s);
    }

    };
    single.subscribe(observer);
    6.觀察者變種
    Observer觀察者對象,上面咱們用Subscriber對象代替。由於該對象自己就是繼承了Observer。
    該對象實現了onNext()&onCompleted()&onError()事件,咱們若是對哪一個事件比較關心,只須要實現對應的方法便可,代碼以下:
    //建立觀察者
    Subscriber<String> subscriber = new Subscriber<String>() {br/>@Override
    public void onCompleted() {
    Log.i(TAG, "onCompleted ");
    }編程語言

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError: "+e.getLocalizedMessage());
    }
    
    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext: "+s);
    }

    };ide

    //訂閱
    observable.subscribe(subscriber);
    上面的代碼中,若是你只關心onNext()事件,但卻不得不實現onCompleted()&onError()事件.這樣的代碼就顯得很臃腫。鑑於這種需求,RxJava框架在訂閱方面作了特定的調整,代碼以下:
    //爲指定的onNext事件建立獨立的接口
    Action1<String> onNextAction = new Action1<String>() {br/>@Override
    public void call(String s) {
    Log.i(TAG, "call: "+s);
    }
    };函數

    //訂閱
    observable.subscribe(onNextAction);

     

不知道你們注意到沒有,subscribe()訂閱的再也不是觀察者,而是特定的onNext接口對象。相似的函數以下,咱們能夠根據須要實現對應的訂閱:

public Subscription subscribe(final Observer observer)
public Subscription subscribe(final Action1 onNext)
public Subscription subscribe(final Action1 onNext, Action1 onError)
public Subscription subscribe(final Action1 onNext, Action1 onError, Action0 onCompleted)

這裏還有一個forEach函數有相似的功能:

public void forEach(final Action1 onNext)
public void forEach(final Action1 onNext, Action1 onError)
public void forEach(final Action1 onNext, Action1 onError, Action0 onComplete)

##7.Subject變種

上面2節中既介紹了被觀察者變種,又介紹了觀察者變種,這裏再介紹一種雌雄同體的對象(既做爲被觀察者使用,也能夠做爲觀察者)。

針對不一樣的場景一共有四種類型的Subject。他們並非在全部的實現中所有都存在。

###AsyncSubject

一個AsyncSubject只在原始Observable完成後,發射來自原始Observable的最後一個值。它會把這最後一個值發射給任何後續的觀察者。

如下貼出代碼:
//建立被觀察者final AsyncSubject<String> subject = AsyncSubject.create();//建立觀察者
Subscriber<String> subscriber = new Subscriber<String>() {br/>@Override
public void onCompleted() {
Log.i(TAG, "onCompleted");
}

@Override
public void onError(Throwable e) {
    Log.i(TAG, "onError");
}

@Override
public void onNext(String s) {
    Log.i(TAG, "s:" + s);

}

};//訂閱事件
subject.subscribe(subscriber);//被觀察者發出事件 若是調用onCompleted(),onNext()則會打印最後一個事件;若是沒有,onNext()則不打印任何事件。
subject.onNext("Hello Android ");
subject.onNext("Hello Java ");
subject.onCompleted();
輸出:
s:Hello Java onCompleted
然而,若是原始的Observable由於發生了錯誤而終止,AsyncSubject將不會發射任何數據,只是簡單的向前傳遞這個錯誤通知。

上面的觀察者被觀察者代碼相同,如今發出一系列信號,並在最後發出異常 代碼以下:

subject.onNext("Hello Android ");
subject.onNext("Hello Java ");//由於發送了異常 因此onNext()沒法被打印
subject.onError(null);
###BehaviorSubject

當觀察者訂閱BehaviorSubject時,他會將訂閱前最後一次發送的事件和訂閱後的全部發送事件都打印出來,若是訂閱前無發送事件,則會默認接收構造器create(T)裏面的對象和訂閱後的全部事件,代碼以下:
BehaviorSubject subject=BehaviorSubject.create("NROMAL");

Subscriber subscriber = new Subscriber() {br/>@Override
public void onCompleted() {
Log.i(TAG, "onCompleted");
}

@Override
public void onError(Throwable e) {
    Log.i(TAG, "onError");
}

@Override
public void onNext(Object o) {
    Log.i(TAG, "onNext: " + o);
}

};
//subject.onNext("Hello Android !");//subject.onNext("Hello Java !");//subject.onNext("Hello C !");//這裏開始訂閱 若是上面的3個註釋沒去掉,則Hello C的事件和訂閱後面的事件生效//若是上面的三個註釋去掉 則打印構造器NORMAL事件生效後和訂閱後面的事件生效
subject.subscribe(subscriber);

subject.onNext("Hello CPP !");
subject.onNext("Hello IOS !");
PublishSubject

PublishSubject只會把在訂閱發生的時間點以後來自原始Observable的數據發射給觀察者。

須要注意的是,PublishSubject可能會一建立完成就馬上開始發射數據,所以這裏有一個風險:在Subject被建立後到有觀察者訂閱它以前這個時間段內,一個或多個數據可能會丟失。

代碼以下:
PublishSubject subject= PublishSubject.create();

Action1<String> onNextAction1 = new Action1<String>(){

@Override
public void call(String s) {
    Log.i(TAG, "onNextAction1 call: "+s);
}

};

Action1<String> onNextAction2 = new Action1<String>(){

@Override
public void call(String s) {
    Log.i(TAG, "onNextAction2 call: "+s);
}

};

subject.onNext("Hello Android !");
subject.subscribe(onNextAction1);
subject.onNext("Hello Java !");
subject.subscribe(onNextAction2);
subject.onNext("Hello IOS !");

輸出以下:

onNextAction1 call: Hello Java !
onNextAction1 call: Hello IOS !
onNextAction2 call: Hello IOS !
ReplaySubject

ReplaySubject會發射全部來自原始Observable的數據給觀察者,不管它們是什麼時候訂閱的。

代碼以下:

ReplaySubject subject= ReplaySubject.create();

Action1<String> onNextAction1 = new Action1<String>(){

@Override
public void call(String s) {
    Log.i(TAG, "onNextAction1 call: "+s);
}

};

Action1<String> onNextAction2 = new Action1<String>(){

@Override
public void call(String s) {
    Log.i(TAG, "onNextAction2 call: "+s);
}

};

subject.onNext("Hello Android !");
subject.subscribe(onNextAction1);
subject.onNext("Hello Java !");
subject.subscribe(onNextAction2);
subject.onNext("Hello IOS !");

輸出以下:

onNextAction1 call: Hello Android !
onNextAction1 call: Hello Java !
onNextAction2 call: Hello Android !
onNextAction2 call: Hello Java !
onNextAction1 call: Hello IOS !
onNextAction2 call: Hello IOS !
###Subject總結

AsyncSubject不管什麼時候訂閱 只會接收最後一次onNext()事件,若是最後出現異常,則不會打印任何onNext()BehaviorSubject會從訂閱前最後一次oNext()開始打印直至結束。若是訂閱前無調用onNext(),則調用默認creat(T)傳入的對象。若是異常後才調用,則不打印onNext()PublishSubject只會打印訂閱後的任何事件。ReplaySubject不管訂閱在什麼時候都會調用發送的事件。

相關文章
相關標籤/搜索