RxJava
的關鍵是異步,即便隨着程序的邏輯變得複雜,它依然可以保持簡潔。bash
API
介紹和原理剖析觀察者模式面向的需求是:A
對象(觀察者)對B
對象(被觀察者)的某種變化高度敏感,須要在B
變化的一瞬間作出反應,觀察者採用註冊Register
或者訂閱Subscribe
的方式,告訴觀察者,我須要你的某某狀態,並在它變化的時候通知我,在RxJava
當中,Observable
是被觀察者,Observer
就是觀察者。異步
RxJava
有四個基本概念:ide
Observable
:被觀察者。Observer
:觀察者。Subscribe
:訂閱。Event
:事件。Observable
和Observer
經過subscribe
方法實現訂閱關係,Observable
能夠在須要的時候發出事件來通知Observer
。函數
RxJava
有如下三種事件:ui
onNext
:普通事件。onCompleted
:RxJava
不只把每一個事件單獨處理,還會把它們看做一個隊列,當不會再有新的onNext
事件發出時,須要觸發onCompleted
事件做爲標誌。onError
:onCompleted
和有且僅有一個,而且是事件序列中的最後一個。RxJava
的基本實現有如下三點: 1)建立觀察者 - Observer
spa
Observer<String> observer = new Observer<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext");
}
};
複製代碼
除了Observer
接口以外,RxJava
還內置了一個實現了Observer
的抽象類:Subscriber
,它對Observer
接口進行了一些擴展,實質上在RxJava
的subscribe
過程當中,Observer
也老是被轉換成爲一個Subscriber
再使用,他們的區別在與:線程
onStart
:這是新增的方法,它會在subscribe
剛開始,而事件還未發送以前被調用,它老是在subscribe
所發生的線程被調用。unsubscribe
:這是它實現的另外一個接口Subscription
的方法,用於取消訂閱,在這個方法被調用後,Subscriber
將再也不接收事件,通常在調用這個方法前,可使用isUnsubscribed
判斷一下狀態,Observable
在訂閱以後會持有Subscriber
的引用,所以不釋放會有內存泄漏的危險。2)建立被觀察者 - Observable
RxJava
用create
方法來建立一個observable
,3d
rx.Observable observable = rx.Observable.create(new rx.Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello World!");
subscriber.onCompleted();
}
});
複製代碼
這裏傳入了一個Observable.OnSubscribe<T>
對象做爲參數,它會被存儲在返回的Observable
對象當中,它的做用至關於一個計劃表,當Observable
被訂閱的時候,OnSubscribe
的call
方法會自動被調用,事件序列被依次觸發。 create
是RxJava
最基本的創造事件序列的方法,基於這個方法,還提供了一些快捷方法來建立事件隊列:代理
just(T...)
Observable observable = Observable.just("Hello", "Hi", "Aloha");
複製代碼
from(T[]) / from(Iterable<? extends T>)
String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);
複製代碼
3)訂閱 - subscribe
code
observable.subscribe(observer);
observable.subscribe(subscriber);
複製代碼
其內部核心的代碼相似於:
public Subscription subscribe(Subscriber subscriber) {
//準備方法。
subscriber.onStart();
//事件發送的邏輯開始執行,這個onSubscribe就是建立Observable時新建的OnSubscribe對象。
onSubscribe.call(subscriber);
//把傳入的Subscriber轉換爲Subscription並返回,方便unsubscribe。
return subscriber;
}
複製代碼
Observable.subscribe
方法除了支持傳入Observer
和Subscriber
,還支持傳入Action0
、Action1
這樣不完整定義的回調,RxJava
會自動根據定義建立出Subscriber
。
在不指定線程的狀況下,RxJava
遵循這樣的原則,在哪一個線程調用subscribe
,就在哪一個線程產生事件,在哪一個線程產生事件,就在哪一個線程消費事件,若是須要消費線程,那麼就須要用到Scheduler
, RxJava
內置了幾個Scheduler
:
Schedulers.immediate
:直接在當前線程運行。Schedulers.newThread
:老是啓用新線程,並在線程執行操做。Schedulers.io
:其內部實現是一個無數量上限的的線程池,能夠重用空閒的線程,不要把計算工做放在io
,能夠避免建立沒必要要的線程。Schedulers.computation
:使用固定的線程池,大小爲CPU
核數。AndroidSchedulers.mainThread
:指定的操做將在Android
主線程中運行。對線程控制有如下兩個方法:
subscribeOn
:指定subscribe
發生的線程,即Observable.OnSubscribe
被激活時所處的線程,也就是call
方法執行時所處的線程。observeOn
:指定Subscriber
所運行在的線程。observeOn
指定的是Subscriber
的線程,而這個Subscriber
並不必定是subscribe()
參數中的Subscriber
,而是observeOn
執行時的當前Observable
所對應的Subscriber
,即它的直接下級Subscriber
,也就是它以後的操做所在的線程,所以,若是有屢次切換線程的要求,只要在每一個想要切換線程的位置調用依次observeOn
便可。 和observeOn
不一樣,subscribeOn
只能調用一次,下面咱們來分析一下它的內部實現,首先是subscribeOn
的原理: subscribeOn
和ObserveOn
都作了線程切換的工做:
subscribeOn
的線程切換髮生在OnSubscribe
中,即在它通知上一級的OnSubscribe
時,這時事件尚未發送,所以subscribeOn
的線程控制能夠從事件發出的開端形成影響。observeOn
的線程切換則發生在它內建的Subscriber
中,即發生在它即將給下一級Subscriber
發送事件時,所以控制的是它後面的線程。變換,就是將事件序列中的對象或整個序列進行加工處理,轉換不一樣的事件或者序列。
5.1 map()
經過FuncX
,把參數中的Integer
轉換成爲String
,是最經常使用的變換,這個變換是發生在subscribeOn
所指定的線程當中的。
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
long nextId = Thread.currentThread().getId();
Log.d(TAG, "onNext:" + s + ", threadId=" + nextId);
}
};
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
long callId = Thread.currentThread().getId();
subscriber.onNext(5);
subscriber.onCompleted();
}
});
observable.map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
long mapId = Thread.currentThread().getId();
return "My Number is:" + integer;
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(subscriber);
複製代碼
其示意圖相似於:
5.2 flatMap
它和map
有一個共同點,就是把傳入的參數轉化以後返回另外一個對象,可是和map
不一樣的是,flatMap
返回的是一個Observable
對象,並且它並不直接把這個對象傳給Subscriber
,而是經過這個新建的Observable
來發送事件,其整個的調用過程:
Observable
。Observable
,經過它來發送事件。Observable
發送的事件,被匯入同一個Observable
,它複雜將這些事件同一交給Subscriber
的回調方法。Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext, s=" + s);
}
};
Observable<List<String>> observable = Observable.create(new Observable.OnSubscribe<List<String>>() {
@Override
public void call(Subscriber<? super List<String>> subscriber) {
List<String> list = new ArrayList<>();
list.add("First");
list.add("Second");
list.add("Third");
subscriber.onNext(list);
}
});
observable.flatMap(new Func1<List<String>, Observable<String>>() {
@Override
public Observable<String> call(List<String> strings) {
return Observable.from(strings);
}
}).subscribe(subscriber);
複製代碼
其示意圖:
變換的實質是針對事件序列的處理和再發送,在RxJava
的內部,它們是基於同一個基礎的變換方法lift(operator)
//生成了一個新的Observable並返回。
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
//構造新的Observable時,同時新建了一個OnSubscribe對象。
return Observable.create(new OnSubscribe<R>() {
@Override
public void call(Subscriber subscriber) {
Subscriber newSubscriber = operator.call(subscriber);
newSubscriber.onStart();
//原始的onSubscribe。
onSubscribe.call(newSubscriber);
}
});
}
複製代碼
示意圖:
lift
建立了一個Observable
後,加上以前的原始Observable
,有兩個Observable
。Observable
裏的OnSubscribe
加上原始的,共有兩個OnSubscribe
。lift/map
建立的Observable
對象的subscribe
方法時,因而它觸發了上面的call
方法中的內容。OnSubscribe
的call
方法中,傳入了目標的Subscriber
,同時其外部類中還持有了原始的OnSubscribe
。咱們先經過operator.call(oldSubscriber)
方法,生成了新的Subscriber(new Subscriber)
,而後利用這個新的Subscriber
向原始的Observable
進行訂閱。下面咱們之前面map
實現的例子來分析一下源碼,上面的例子經過map
操做符把Integer
類型的Observable
和String
類型的Subscriber
生成了訂閱關係。
map
方法,它經過lift
方法返回了一個String
類型的Observable
。//其中T=Integer,R=String。
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return lift(new OperatorMap<T, R>(func));
}
複製代碼
OperatorMap
這個對象,這個對象實現了operator<R,T>
接口,而這個接口繼承於Func1<Subscriber<? super R>, Subscriber<? super T>>
,在它實現的call
方法中傳入了String
類型的Subscriber
(目標Subscriber
),並返回了Integer
類型的Subscriber
(代理Subscriber
),當它的方法被回調時,會調用目標Subscriber
的對應方法,其中在調用onNext
時,就用上了外部傳入的Func1
函數:@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
return new Subscriber<T>(o) {
@Override
public void onCompleted() {
o.onCompleted();
}
@Override
public void onError(Throwable e) {
o.onError(e);
}
@Override
public void onNext(T t) {
try {
o.onNext(transformer.call(t));
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
onError(OnErrorThrowable.addValueAsLastCause(e, t));
}
}
};
}
複製代碼
lift
方法:public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
try {
//返回一個Integer類型的Subscriber。
Subscriber<? super T> st = hook.onLift(operator).call(o);
try {
st.onStart();
//關鍵方法:Integer類型的OnSubscribe調用對應的Subscribe,這個call方法裏面寫了咱們的邏輯,當它調用onNext(Integer integer)時,實際上調用的是onNext(String str)。
onSubscribe.call(st);
} catch (Throwable e) {
if (e instanceof OnErrorNotImplementedException) {
throw (OnErrorNotImplementedException) e;
}
st.onError(e);
}
} catch (Throwable e) {
if (e instanceof OnErrorNotImplementedException) {
throw (OnErrorNotImplementedException) e;
}
o.onError(e);
}
}
});
}
複製代碼
subscribe
方法。