RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences. It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.java
RxJava是使用Java VM實現的響應式編程庫:一個經過使用可觀察序列來編寫異步和基於事件的程序的庫。react
它擴展了觀察者模式以支持數據/事件序列,並添加了容許您以聲明方式組合序列的運算符,同時抽象出對低級線程、線程同步、線程安全和併發數據結構等問題的關注。android
RxJava存在1.x版本和2.x版本,API改動較大,接入方法基本不兼容,但實現思路相似,作了必定的優化,下面是官方對RxJava2的介紹。git
Version 2.x (Javadoc)
- single dependency: Reactive-Streams
continued support for Java 6+ & Android 2.3+github
performance gains through design changes learned through the 1.x cycle and through Reactive-Streams-Commons research project.編程
Java 8 lambda-friendly API安全
non-opinionated about source of concurrency (threads, pools, event loops, fibers, actors, etc)微信
async or synchronous execution數據結構
virtual time and schedulers for parameterized concurrency併發
單一依賴:Reactive-Streams
繼續支持Java 6+和android 2.3+
經過1.x和Reactive-Streams-Commons項目的積累,實現設計變動,提升性能。
友好地支持Java 8 lambda表達式
靈活的處理併發,包括threads, pools, event loops, fibers, actors等
同步或異步操做
爲參數化的併發設計了調度器
既然都點到這篇文章裏了,難道還不下載源碼看看嗎??
這裏1.x版本和2.x版本在不一樣的分支上
public static void main(String[] args) {
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext("test");
subscriber.onCompleted();
}
}
}).subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println("onNext:" + s);
}
});
}
複製代碼
拆分一下,分爲兩部分
// 經過Observable的create靜態方法,傳入一個OnSubscribe對象
// 這個OnSubscribe對象附帶了一個call方法,用於回調
// 整個create方法返回了一個Observable對象的實例
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext("test");
subscriber.onCompleted();
}
}
});
複製代碼
// 調用observable的subscribe方法,傳入一個Observer對象,
// 這個Observer對象附帶了三個回調方法
// 經過這裏subscribe方法調用上面Observable.OnSubscribe對象中的call方法
// 再在call方法中的subscriber對象調用這裏Observer中的onNext,onCompleted方法
observable.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println("onNext:" + s);
}
});
複製代碼
上面的程序打印出的效果是:
onNext:test
onCompleted
從Observable.create方法開始
@Deprecated
public static <T> Observable<T> create(OnSubscribe<T> f) {
// 接收一個OnSubscribe參數,調用構造函數返回一個Observable
// 1)、先看Observable的構造函數作了什麼
// 2)、再看OnSubscribe類是怎樣的
return new Observable<T>(RxJavaHooks.onCreate(f));
}
複製代碼
Observable的構造函數
// Obserbable類中存在一個final變量是OnSubscribe類型的
final OnSubscribe<T> onSubscribe;
// 構造方法將傳入的OnSubscribe對象賦值給局部變量onSubscribe保存起來
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
複製代碼
再來看OnSubscribe類,其實就是一個包含有一個call方法的類,在基本使用的第2步,調用observable對象subscribe方法時觸發。
/** * 這裏的OnSubscribe是一個接口,繼承子Action1接口 * * 在Observable.subscribe方法被調用的時候執行 * 這裏的subscribe是第2部分,若是印象模糊能夠回頭看一下上面的基本使用部分 * @param <T> the output value type */
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
// cover for generics insanity
}
/** * 一個call方法,observable對象subscribe方法時觸發 * 用上面的基本使用的例子,這裏的call攜帶的範型T就是Subscriber的實例subscriber * 上面call方法就是調用subscribe中的onNext方法和onCompleted * 能夠看出這裏的onNext和onCompleted就是基本使用例子第二步的subscribe方法傳入Observer對象的方法 * * 繼續跟Action和 Action繼承的Function都沒有實現 * @param <T> the first argument type */
public interface Action1<T> extends Action {
void call(T t);
}
複製代碼
能夠看出例子第1步實例Observable.OnSubscribe對象的時候實現了一個call方法,call方法傳入的參數就是Subscriber實例,方法內調用的onNext方法和onCompleted方法執行了第2步中Observer實例的對應方法,因此這裏能夠猜想Observer是一個接口,Subscriber實現了Observer。咱們來看一下這兩個類。
/** * 很熟悉的三連,本身實現,完成業務 */
public interface Observer<T> {
void onCompleted();
void onError(Throwable e);
void onNext(T t);
}
/** * 和咱們的猜想一致,Subscriber實現了Observer */
public abstract class Subscriber<T> implements Observer<T>, Subscription {
...
}
複製代碼
到這裏應該對總體有一個理解了,咱們再來看一下observable.subscribe方法,結束後在進行總結。
public final Subscription subscribe(final Observer<? super T> observer) {
if (observer instanceof Subscriber) {
return subscribe((Subscriber<? super T>)observer);
}
if (observer == null) {
throw new NullPointerException("observer is null");
}
// 這裏我門傳入的是new Observer,直接進入下面部分
return subscribe(new ObserverSubscriber<T>(observer));
}
...
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
...
// 方法比較長,這裏只分析重要部分
// 這裏的RxJavaHooks.onObservableStart方法返回了observable.onSubscribe
// 再調用observable類中的局部變量onSubscribe的call方法,具體的實現就是咱們再例子中實現的call方法,調用了subscriber.onNext和subscriber.onCompleted
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
}
// RxJavaHooks.onObservableStart分析
public static <T> Observable.OnSubscribe<T> onObservableStart(Observable<T> instance, Observable.OnSubscribe<T> onSubscribe) {
Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableStart;
if (f != null) {
// 會進入該方法,最後返回的就是Observable構造函數中存儲的局部變量onSubscribe,不要將這個call和我門本身實現的call方法弄混了
return f.call(instance, onSubscribe);
}
return onSubscribe;
}
複製代碼
其實分析到這裏就結束了,將整個調用鏈鏈接了起來
咱們再來看一下關於RxJavaHooks.onObservableStart分析,
分析這裏的返回值就是Observable構造函數中存儲的局部變量onSubscribe。
對RxJavaHooks.onObservableStart方法進行斷點調試,進入f!=null 判斷。
// Func2是一個接口,咱們繼續使用斷點調試查看具體的實現
public interface Func2<T1, T2, R> extends Function {
R call(T1 t1, T2 t2);
}
複製代碼
// 經過斷點調試找到call方法的實現
onObservableStart = new Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe>() {
@Override
public Observable.OnSubscribe call(Observable t1, Observable.OnSubscribe t2) {
return RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeStart(t1, t2);
}
};
複製代碼
// 返回了第二個參數
@Deprecated
public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {
// pass through by default
return onSubscribe;
}
複製代碼
最後返回了第二個參數,就是observable.onSubscribe。
第一步:在調用create方法構造了一個Observable對象,而且在Observable對象的構造方法中,將局部變量onSubscribe賦值,該onSubscribe實現了call方法等待被回調,call方法中提供了一個subscriber實例,該實例實現了Observer,有onCompleted,onError,onNext方法能夠進行調用。
第二步:執行Observable實例的subscribe方法,傳入了Observer對象,實現了onCompleted,onError,onNext方法。再經過RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
方法回調到了第一步中OnSubscribe的call方法,這裏的subscriber方法就是第二步中實例的Observer。
總體就是在第一步中寫了一個回調,等待第二步subscribe方法調起。第二步中的Observer實現了一個回調的三個方法,供第一步中的回調函數內調用。