本文由
玉剛說寫做平臺
提供寫做贊助java原做者:
四月葡萄
git版權聲明:本文版權歸微信公衆號 玉剛說 全部,未經許可,不得以任何形式轉載github
本文主要是對RxJava的消息訂閱和線程切換進行源碼分析,相關的使用方式等不做詳細介紹。數據庫
本文源碼基於rxjava:2.1.14
。json
RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.緩存
It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.安全
上面這段話來自於RxJava在github上面的官方介紹。翻譯成中文的大概意思就是:微信
RxJava是一個在Java虛擬機上的響應式擴展,經過使用可觀察的序列將異步和基於事件的程序組合起來的一個庫。網絡
它擴展了觀察者模式來支持數據/事件序列,而且添加了操做符,這些操做符容許你聲明性地組合序列,同時抽象出要關注的問題:好比低級線程、同步、線程安全和併發數據結構等。數據結構
簡單點來講, RxJava就是一個使用了觀察者模式,可以異步的庫。
上面說到,RxJava擴展了觀察者模式,那麼什麼是觀察模式呢?咱們先來了解一下。
舉個例子,以微信公衆號爲例,一個微信公衆號會不斷產生新的內容,若是咱們讀者對這個微信公衆號的內容感興趣,就會訂閱這個公衆號,當公衆號有新內容時,就會推送給咱們。咱們收到新內容時,若是是咱們感興趣的,就會點進去看下;若是是廣告的話,就可能直接忽略掉。這就是咱們生活中遇到的典型的觀察者模式。
在上面的例子中,微信公衆號就是一個被觀察者(Observable
),不斷的產生內容(事件),而咱們讀者就是一個觀察者(Observer
) ,經過訂閱(subscribe
)就可以接受到微信公衆號(被觀察者)推送的內容(事件),根據不一樣的內容(事件)作出不一樣的操做。
RxJava的擴展觀察者模式中就是存在這麼4種角色:
角色 | 角色功能 |
---|---|
被觀察者(Observable ) |
產生事件 |
觀察者(Observer ) |
響應事件並作出處理 |
事件(Event ) |
被觀察者和觀察者的消息載體 |
訂閱(Subscribe ) |
鏈接被觀察者和觀察者 |
RxJava中的事件分爲三種類型:Next
事件、Complete
事件和Error
事件。具體以下:
事件類型 | 含義 | 說明 |
---|---|---|
Next |
常規事件 | 被觀察者能夠發送無數個Next事件,觀察者也能夠接受無數個Next事件 |
Complete |
結束事件 | 被觀察者發送Complete事件後能夠繼續發送事件,觀察者收到Complete事件後將不會接受其餘任何事件 |
Error |
異常事件 | 被觀察者發送Error事件後,其餘事件將被終止發送,觀察者收到Error事件後將不會接受其餘任何事件 |
在分析RxJava消息訂閱原理前,咱們仍是先來看下它的簡單使用步驟。這裏爲了方便講解,就不用鏈式代碼來舉例了,而是採用分步驟的方式來逐一說明(平時寫代碼的話仍是建議使用鏈式代碼來調用,由於更加簡潔)。其使用步驟以下:
- 建立被觀察者(
Observable
),定義要發送的事件。- 建立觀察者(
Observer
),接受事件並作出響應操做。- 觀察者經過訂閱(
subscribe
)被觀察者把它們鏈接到一塊兒。
這裏咱們就根據上面的步驟來實現這個例子,以下:
//步驟1. 建立被觀察者(Observable),定義要發送的事件。
Observable observable = Observable.create(
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("文章1");
emitter.onNext("文章2");
emitter.onNext("文章3");
emitter.onComplete();
}
});
//步驟2. 建立觀察者(Observer),接受事件並作出響應操做。
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext : " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError : " + e.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
//步驟3. 觀察者經過訂閱(subscribe)被觀察者把它們鏈接到一塊兒。
observable.subscribe(observer);
複製代碼
其輸出結果爲:
onSubscribe
onNext : 文章1
onNext : 文章2
onNext : 文章3
onComplete
複製代碼
下面咱們對消息訂閱過程當中的源碼進行分析,分爲兩部分:建立被觀察者過程和訂閱過程。
首先來看下建立被觀察者(Observable
)的過程,上面的例子中咱們是直接使用Observable.create()
來建立Observable
,咱們點進去這個方法看下。
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
複製代碼
能夠看到,create()
方法中也沒作什麼,就是建立一個ObservableCreate
對象出來,而後把咱們自定義的ObservableOnSubscribe
做爲參數傳到ObservableCreate
中去,最後就是調用 RxJavaPlugins.onAssembly()
方法。
咱們先來看看ObservableCreate
類:
public final class ObservableCreate<T> extends Observable<T> {//繼承自Observable
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;//把咱們建立的ObservableOnSubscribe對象賦值給source。
}
}
複製代碼
能夠看到,ObservableCreate
是繼承自Observable
的,而且會把ObservableOnSubscribe
對象給存起來。
再看下RxJavaPlugins.onAssembly()
方法
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
//省略無關代碼
return source;
}
複製代碼
很簡單,就是把上面建立的ObservableCreate
給返回。
因此Observable.create()
中就是把咱們自定義的ObservableOnSubscribe
對象從新包裝成一個ObservableCreate
對象,而後返回這個ObservableCreate
對象。 注意,這種從新包裝新對象的用法在RxJava中會頻繁用到,後面的分析中咱們還會屢次遇到。 放個圖好理解,包起來哈~
Observable.create()
的時序圖以下所示:
接下來咱們就看下訂閱過程的代碼,一樣,點進去Observable.subscribe()
:
public final void subscribe(Observer<? super T> observer) {
//省略無關代碼
observer = RxJavaPlugins.onSubscribe(this, observer);
subscribeActual(observer);
//省略無關代碼
}
複製代碼
能夠看到,實際上其核心的代碼也就兩句,咱們分開來看下:
public static <T> Observer<? super T> onSubscribe(
@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
//省略無關代碼
return observer;
}
複製代碼
跟以前代碼同樣,這裏一樣也是把原來的observer
返回而已。 再來看下subscribeActual()
方法。
protected abstract void subscribeActual(Observer<? super T> observer);
複製代碼
Observable
類的subscribeActual()
中的方法是一個抽象方法,那麼其具體實如今哪呢?還記得咱們前面建立被觀察者的過程嗎,最終會返回一個ObservableCreate
對象,這個ObservableCreate
就是Observable
的子類,咱們點進去看下:
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//觸發咱們自定義的Observer的onSubscribe(Disposable)方法
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
複製代碼
能夠看到,subscribeActual()
方法中首先會建立一個CreateEmitter
對象,而後把咱們自定義的觀察者observer
做爲參數給傳進去。這裏一樣也是包裝起來,放個圖:
CreateEmitter
實現了
ObservableEmitter
接口和
Disposable
接口,以下:
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
//代碼省略
}
複製代碼
而後就是調用了observer.onSubscribe(parent)
,實際上就是調用觀察者的onSubscribe()
方法,即告訴觀察者已經成功訂閱到了被觀察者。
繼續往下看,subscribeActual()
方法中會繼續調用source.subscribe(parent)
,這裏的source
就是ObservableOnSubscribe
對象,即這裏會調用ObservableOnSubscribe
的subscribe()
方法。 咱們具體定義的subscribe()
方法以下:
Observable observable = Observable.create(
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("文章1");
emitter.onNext("文章2");
emitter.onNext("文章3");
emitter.onComplete();
}
});
複製代碼
ObservableEmitter
,顧名思義,就是被觀察者發射器。 因此,subscribe()
裏面的三個onNext()
方法和一個onComplete()
會逐一被調用。 這裏的ObservableEmitter
接口其具體實現爲CreateEmitter
,咱們看看CreateEmitte
類的onNext()
方法和onComplete()
的實現:
//省略其餘代碼
@Override
public void onNext(T t) {
//省略無關代碼
if (!isDisposed()) {
//調用觀察者的onNext()
observer.onNext(t);
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
//調用觀察者的onComplete()
observer.onComplete();
} finally {
dispose();
}
}
}
複製代碼
能夠看到,最終就是會調用到觀察者的onNext()
和onComplete()
方法。至此,一個完整的消息訂閱流程就完成了。 另外,能夠看到,上面有個isDisposed()
方法能控制消息的走向,即可以切斷消息的傳遞,這個後面再來講。
Observable
(被觀察者)和Observer
(觀察者)創建鏈接(訂閱)以後,會建立出一個發射器CreateEmitter
,發射器會把被觀察者中產生的事件發送到觀察者中去,觀察者對發射器中發出的事件作出響應處理。能夠看到,是訂閱以後,Observable
(被觀察者)纔會開始發送事件。
放張事件流的傳遞圖:
再來看下訂閱過程的時序流程圖:
以前有提到過切斷消息的傳遞,咱們先來看下如何使用:
Observable observable = Observable.create(
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("文章1");
emitter.onNext("文章2");
emitter.onNext("文章3");
emitter.onComplete();
}
});
Observer<String> observer = new Observer<String>() {
private Disposable mDisposable;
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe : " + d);
mDisposable=d;
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext : " + s);
mDisposable.dispose();
Log.d(TAG, "切斷觀察者與被觀察者的鏈接");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError : " + e.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
observable.subscribe(observer);
複製代碼
輸出結果爲:
onSubscribe : null
onNext : 文章1
切斷觀察者與被觀察者的鏈接
複製代碼
能夠看到,要切斷消息的傳遞很簡單,調用下Disposable
的dispose()
方法便可。調用dispose()
以後,被觀察者雖然能繼續發送消息,可是觀察者卻收不到消息了。 另外有一點須要注意,上面onSubscribe
輸出的Disposable
值是"null"
,並非空引用null
。
咱們這裏來看看下dispose()
的實現。Disposable
是一個接口,能夠理解Disposable
爲一個鏈接器,調用dispose()
後,這個鏈接器將會中斷。其具體實如今CreateEmitter
類,以前也有提到過。咱們來看下CreateEmitter
的dispose()
方法:
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
複製代碼
就是調用DisposableHelper.dispose(this)
而已。
public enum DisposableHelper implements Disposable {
DISPOSED
;
//其餘代碼省略
public static boolean isDisposed(Disposable d) {
//判斷Disposable類型的變量的引用是否等於DISPOSED
//即判斷該鏈接器是否被中斷
return d == DISPOSED;
}
public static boolean dispose(AtomicReference<Disposable> field) {
Disposable current = field.get();
Disposable d = DISPOSED;
if (current != d) {
//這裏會把field給設爲DISPOSED
current = field.getAndSet(d);
if (current != d) {
if (current != null) {
current.dispose();
}
return true;
}
}
return false;
}
}
複製代碼
能夠看到DisposableHelper
是一個枚舉類,而且只有一個值:DISPOSED
。dispose()
方法中會把一個原子引用field
設爲DISPOSED
,即標記爲中斷狀態。所以後面經過isDisposed()
方法便可以判斷鏈接器是否被中斷。
再回頭看看CreateEmitter
類中的方法:
@Override
public void onNext(T t) {
//省略無關代碼
if (!isDisposed()) {
//若是沒有dispose(),纔會調用onNext()
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
//若是dispose()了,會調用到這裏,即最終會崩潰
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
//省略無關代碼
if (!isDisposed()) {
try {
//若是沒有dispose(),纔會調用onError()
observer.onError(t);
} finally {
//onError()以後會dispose()
dispose();
}
//若是沒有dispose(),返回true
return true;
}
//若是dispose()了,返回false
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
//若是沒有dispose(),纔會調用onComplete()
observer.onComplete();
} finally {
//onComplete()以後會dispose()
dispose();
}
}
}
複製代碼
從上面的代碼能夠看到:
- 若是沒有
dispose
,observer.onNext()
纔會被調用到。onError()
和onComplete()
互斥,只能其中一個被調用到,由於調用了他們的任意一個以後都會調用dispose()
。- 先
onError()
後onComplete()
,onComplete()
不會被調用到。反過來,則會崩潰,由於onError()
中拋出了異常:RxJavaPlugins.onError(t)
。其實是dispose
後繼續調用onError()
都會炸。
上面的例子和分析都是在同一個線程中進行,這中間也沒涉及到線程切換的相關問題。可是在實際開發中,咱們一般須要在一個子線程中去進行一些數據獲取操做,而後要在主線程中去更新UI,這就涉及到線程切換的問題了,經過RxJava咱們也能夠把線程切換寫得還簡潔。
關於RxJava如何使用線程切換,這裏就不詳細講了。 咱們直接來看一個例子,並分別打印RxJava在運行過程當中各個角色所在的線程。
new Thread() {
@Override
public void run() {
Log.d(TAG, "Thread run() 所在線程爲 :" + Thread.currentThread().getName());
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "Observable subscribe() 所在線程爲 :" + Thread.currentThread().getName());
emitter.onNext("文章1");
emitter.onNext("文章2");
emitter.onComplete();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "Observer onSubscribe() 所在線程爲 :" + Thread.currentThread().getName());
}
@Override
public void onNext(String s) {
Log.d(TAG, "Observer onNext() 所在線程爲 :" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "Observer onError() 所在線程爲 :" + Thread.currentThread().getName());
}
@Override
public void onComplete() {
Log.d(TAG, "Observer onComplete() 所在線程爲 :" + Thread.currentThread().getName());
}
});
}
}.start();
複製代碼
輸出結果爲:
Thread run() 所在線程爲 :Thread-2 Observer onSubscribe() 所在線程爲 :Thread-2 Observable subscribe() 所在線程爲 :RxCachedThreadScheduler-1 Observer onNext() 所在線程爲 :main Observer onNext() 所在線程爲 :main Observer onComplete() 所在線程爲 :main 複製代碼
從上面的例子能夠看到:
Observer
(觀察者)的onSubscribe()
方法運行在當前線程中。Observable
(被觀察者)中的subscribe()
運行在subscribeOn()
指定的線程中。Observer
(觀察者)的onNext()
和onComplete()
等方法運行在observeOn()
指定的線程中。
下面咱們對線程切換的源碼進行一下分析,分爲兩部分:subscribeOn()
和observeOn()
。
首先來看下subscribeOn()
,咱們的例子中是這麼個使用的:
.subscribeOn(Schedulers.io())
複製代碼
subscribeOn()
方法要傳入一個Scheduler
類對象做爲參數,Scheduler
是一個調度類,可以延時或週期性地去執行一個任務。
經過Schedulers
類咱們能夠獲取到各類Scheduler
的子類。RxJava提供瞭如下這些線程調度類供咱們使用:
Scheduler類型 | 使用方式 | 含義 | 使用場景 |
---|---|---|---|
IoScheduler | Schedulers.io() |
io操做線程 | 讀寫SD卡文件,查詢數據庫,訪問網絡等IO密集型操做 |
NewThreadScheduler | Schedulers.newThread() |
建立新線程 | 耗時操做等 |
SingleScheduler | Schedulers.single() |
單例線程 | 只需一個單例線程時 |
ComputationScheduler | Schedulers.computation() |
CPU計算操做線程 | 圖片壓縮取樣、xml,json解析等CPU密集型計算 |
TrampolineScheduler | Schedulers.trampoline() |
當前線程 | 須要在當前線程當即執行任務時 |
HandlerScheduler | AndroidSchedulers.mainThread() |
Android主線程 | 更新UI等 |
下面咱們來看下Schedulers.io()
的代碼,其餘的Scheduler
子類都差很少,就不逐以分析了,有興趣的請自行查看哈~
@NonNull
static final Scheduler IO;
@NonNull
public static Scheduler io() {
//1.直接返回一個名爲IO的Scheduler對象
return RxJavaPlugins.onIoScheduler(IO);
}
static {
//省略無關代碼
//2.IO對象是在靜態代碼塊中實例化的,這裏會建立按一個IOTask()
IO = RxJavaPlugins.initIoScheduler(new IOTask());
}
static final class IOTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
//3.IOTask中會返回一個IoHolder對象
return IoHolder.DEFAULT;
}
}
static final class IoHolder {
//4.IoHolder中會就是new一個IoScheduler對象出來
static final Scheduler DEFAULT = new IoScheduler();
}
複製代碼
能夠看到,Schedulers.io()
中使用了靜態內部類的方式來建立出了一個單例IoScheduler
對象出來,這個IoScheduler
是繼承自Scheduler的。這裏mark一發,後面會用到這個IoScheduler
的。
而後,咱們就來看下subscribeOn()的代碼:
public final Observable<T> subscribeOn(Scheduler scheduler) {
//省略無關代碼
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
複製代碼
能夠看到,首先會將當前的Observable
(其具體實現爲ObservableCreate
)包裝成一個新的ObservableSubscribeOn
對象。 放個圖:
跟前面同樣,RxJavaPlugins.onAssembly()
也是將ObservableSubscribeOn
對象原樣返回而已,這裏就不看了。 能夠看下ObservableSubscribeOn
的構造方法:
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
複製代碼
也就是把source
和scheduler
這兩個保存一下,後面會用到。
而後subscribeOn()
方法就完了。好像也沒作什麼,就是從新包裝一下對象而已,而後將新對象返回。即將一箇舊的被觀察者包裝成一個新的被觀察者。
接下來咱們回到訂閱過程,爲何要回到訂閱過程呢?由於事件的發送是從訂閱過程開始的啊。 雖然咱們這裏用到了線程切換,可是呢,其訂閱過程前面的內容跟上一節分析的是同樣的,咱們這裏就不重複了,直接從不同的地方開始。還記得訂閱過程當中Observable
類的subscribeActual()
是個抽象方法嗎?所以要看其子類的具體實現。在上一節訂閱過程當中,其具體實現是在ObservableCreate
類。可是因爲咱們調用subscribeOn()
以後,ObservableCreate
對象被包裝成了一個新的ObservableSubscribeOn
對象了。所以咱們就來看看ObservableSubscribeOn
類中的subscribeActual()
方法:
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
複製代碼
subscribeActual()
中一樣也將咱們自定義的Observer
給包裝成了一個新的SubscribeOnObserver
對象。一樣,放張圖:
Observer
的
onSubscribe()
方法,能夠看到,到目前爲止,還沒出現過任何線程相關的東西,因此
Observer
的
onSubscribe()
方法就是運行在當前線程中。 而後咱們重點看下最後一行代碼,首先建立一個
SubscribeTask
對象,而後就是調用
scheduler.scheduleDirect()
.。 咱們先來看下
SubscribeTask
類:
//SubscribeTask是ObservableSubscribeOn的內部類
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
//這裏的source就是咱們自定義的Observable對象,即ObservableCreate
source.subscribe(parent);
}
}
複製代碼
很簡單的一個類,就是實現了Runnable
接口,而後run()
中調用Observer.subscribe()
。
再來看下scheduler.scheduleDirect()
方法
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
複製代碼
往下看:
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
//createWorker()在Scheduler類中是個抽象方法,因此其具體實如今其子類中
//所以這裏的createWorker()應當是在IoScheduler中實現的。
//Worker中能夠執行Runnable
final Worker w = createWorker();
//實際上decoratedRun仍是這個run對象,即SubscribeTask
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//將Runnable和Worker包裝成一個DisposeTask
DisposeTask task = new DisposeTask(decoratedRun, w);
//Worker執行這個task
w.schedule(task, delay, unit);
return task;
}
複製代碼
咱們來看下建立Worker
和Worker
執行任務的過程。
final AtomicReference<CachedWorkerPool> pool;
public Worker createWorker() {
//就是new一個EventLoopWorker,而且傳一個Worker緩存池進去
return new EventLoopWorker(pool.get());
}
static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
final AtomicBoolean once = new AtomicBoolean();
//構造方法
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
//從緩存Worker池中取一個Worker出來
this.threadWorker = pool.get();
}
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
//省略無關代碼
//Runnable交給threadWorker去執行
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}
複製代碼
注意,不一樣的Scheduler
類會有不一樣的Worker
實現,由於Scheduler
類最終是交到Worker
中去執行調度的。
咱們來看下Worker
緩存池的操做:
static final class CachedWorkerPool implements Runnable {
ThreadWorker get() {
if (allWorkers.isDisposed()) {
return SHUTDOWN_THREAD_WORKER;
}
while (!expiringWorkerQueue.isEmpty()) {
//若是緩衝池不爲空,就從緩存池中取threadWorker
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}
//若是緩衝池中爲空,就建立一個並返回。
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}
}
複製代碼
咱們再來看下threadWorker.scheduleActual()
。 ThreadWorker
類沒有實現scheduleActual()
方法,其父類NewThreadWorker
實現了該方法,咱們點進去看下:
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
volatile boolean disposed;
public NewThreadWorker(ThreadFactory threadFactory) {
//構造方法中建立一個ScheduledExecutorService對象,能夠經過ScheduledExecutorService來使用線程池
executor = SchedulerPoolFactory.create(threadFactory);
}
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
//這裏的decoratedRun實際仍是run對象
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//將decoratedRun包裝成一個新對象ScheduledRunnable
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
//省略無關代碼
if (delayTime <= 0) {
//線程池中當即執行ScheduledRunnable
f = executor.submit((Callable<Object>)sr);
} else {
//線程池中延遲執行ScheduledRunnable
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
//省略無關代碼
return sr;
}
}
複製代碼
這裏的executor
就是使用線程池去執行任務,最終SubscribeTask
的run()
方法會在線程池中被執行,即Observable
的subscribe()
方法會在IO線程中被調用。這與上面例子中的輸出結果符合:
Observable subscribe() 所在線程爲 :RxCachedThreadScheduler-1 複製代碼
Observer
(觀察者)的onSubscribe()
方法運行在當前線程中,由於在這以前都沒涉及到線程切換。- 若是設置了
subscribeOn(指定線程)
,那麼Observable
(被觀察者)中subscribe()
方法將會運行在這個指定線程中去。
來張總的subscribeOn()
切換線程時序圖
若是咱們屢次設置subscribeOn()
,那麼其執行線程是在哪個呢?先來看下例子
//省略先後代碼,看重點部分
.subscribeOn(Schedulers.io())//第一次
.subscribeOn(Schedulers.newThread())//第二次
.subscribeOn(AndroidSchedulers.mainThread())//第三次
複製代碼
其輸出結果爲:
Observable subscribe() 所在線程爲 :RxCachedThreadScheduler-1 複製代碼
即只有第一次的subscribeOn()
起做用了。這是爲何呢? 咱們知道,每調用一次subscribeOn()
就會把舊的被觀察者包裝成一個新的被觀察者,通過了三次調用以後,就變成了下面這個樣子:
ObservableSubscribeOn
(第一次)那一層時,管你以前是在哪一個線程,
subscribeOn(Schedulers.io())
都會把線程切到IO線程中去執行,因此屢次設置
subscribeOn()
時,只有第一次生效。
咱們再來看下observeOn()
,仍是先來回顧一下咱們例子中的設置:
//指定在Android主線程中執行
.observeOn(AndroidSchedulers.mainThread())
複製代碼
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
//省略無關代碼
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
複製代碼
一樣,這裏也是新包裝一個ObservableObserveOn
對象,注意,這裏包裝的舊被觀察者是ObservableSubscribeOn
對象了,由於以前調用過subscribeOn()
包裝了一層了,因此如今是以下圖所示:
RxJavaPlugins.onAssembly()
也是原樣返回。
咱們看看ObservableObserveOn
的構造方法。
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
複製代碼
裏面就是一些變量賦值而已。
和subscribeOn()
差很少,咱們就直接來看ObservableObserveOn
的subscribeActual()
方法了。
@Override
protected void subscribeActual(Observer<? super T> observer) {
//判斷是否當前線程
if (scheduler instanceof TrampolineScheduler) {
//是當前線程的話,直接調用裏面一層的subscribe()方法
//即調用ObservableSubscribeOn的subscribe()方法
source.subscribe(observer);
} else {
//建立Worker
//本例子中的scheduler爲AndroidSchedulers.mainThread()
Scheduler.Worker w = scheduler.createWorker();
//這裏會將Worker包裝到ObserveOnObserver對象中去
//注意:source.subscribe沒有涉及到Worker,因此仍是在以前設置的線程中去執行
//本例子中source.subscribe就是在IO線程中執行。
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
複製代碼
一樣,這裏也將observer
給包裝了一層,以下圖所示:
source.subscribe()
中將會把事件逐一發送出去,咱們這裏只看下ObserveOnObserver
中的onNext()
方法的處理,onComplete()
等就不看了,實際上都差很少。
@Override
public void onNext(T t) {
//省略無關代碼
if (sourceMode != QueueDisposable.ASYNC) {
//將信息存入隊列中
queue.offer(t);
}
schedule();
}
複製代碼
就是調用schedule()
而已。
void schedule() {
if (getAndIncrement() == 0) {
//ObserveOnObserver一樣實現了Runnable接口,因此就把它本身交給worker去調度了
worker.schedule(this);
}
}
複製代碼
Android主線程調度器裏面的代碼就不分析了,裏面其實是用handler
來發送Message
去實現的,感興趣的能夠看下。 既然ObserveOnObserver
實現了Runnable
接口,那麼就是其run()
方法會在主線程中被調用。 咱們來看下ObserveOnObserver
的run()
方法:
@Override
public void run() {
//outputFused默認是false
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
複製代碼
這裏會走到drainNormal()
方法。
void drainNormal() {
int missed = 1;
//存儲消息的隊列
final SimpleQueue<T> q = queue;
//這裏的actual其實是SubscribeOnObserver
final Observer<? super T> a = actual;
//省略無關代碼
//從隊列中取出消息
v = q.poll();
//...
//這裏調用的是裏面一層的onNext()方法
//在本例子中,就是調用SubscribeOnObserver.onNext()
a.onNext(v);
//...
}
複製代碼
至於SubscribeOnObserver.onNext()
,裏面也沒切換線程的邏輯,就是調用裏面一層的onNext()
,因此最終會調用到咱們自定義的Observer
中的onNext()
方法。所以,Observer
的onNext()
方法就在observeOn()
中指定的線程中給調用了,在本例中,就是在Android主線程中給調用。
- 若是設置了
observeOn(指定線程)
,那麼Observer
(觀察者)中的onNext()
、onComplete()
等方法將會運行在這個指定線程中去。subscribeOn()
設置的線程不會影響到observeOn()
。
最後,來張observeOn()時序圖:
因本人水平有限,若有錯誤,歡迎指出並交流~四月葡萄的博客