於3月14號,RxJava開源了他的第三個版本。java
這個版本中,更新了一下的內容:react
(1)包結構變化 RxJava 3 components are located under the io.reactivex.rxjava3 package (RxJava 1 has rx and RxJava 2 is just io.reactivex. This allows version 3 to live side by side with the earlier versions. In addition, the core types of RxJava (Flowable, Observer, etc.) have been moved to io.reactivex.rxjava3.core.
爲了閱讀障礙的朋友們給出個人一份四級水準翻譯,有如下的幾點變化:git
io.reactivex.rxjava3
中
(2)行爲變化。 針對一些現有錯誤的糾正等。github
(3)API變化。 @FunctionalInterface
註解的使用等web
詳細見於文檔:What's different in 3.0緩存
就總體來講咱們的基本開發功能沒有很大的改變。網絡
RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.異步
RxJava 是一個在 Java VM 上使用可觀測的序列來組成異步、且基於事件的程序的庫。async
基於事件流的鏈式調用完成訂閱編輯器
Observable.create<String> {
it.onNext("items:1")
it.onNext("items:2")
it.onError(Exception())
it.onComplete()
}.subscribe(object : Observer<String> {
override fun onSubscribe(d: Disposable) {
Log.d(TAG, "subscribe事件");
}
override fun onNext(s: String) {
Log.d(TAG, "Next事件:$s");
}
override fun onError(e: Throwable) {
Log.d(TAG, "Error事件");
}
override fun onComplete() {
Log.d(TAG, "Complete事件");
}
})
複製代碼
咱們可以看到幾個特別顯眼的類和方法。
d.dispose()
這一段代碼,就可以讓鏈接斷開。
是否有這樣的一個問題,爲何會是被觀察者訂閱觀察者?
爲了更好的理解咱們將這Observable
、Observer
、Subscribe
這三者對應到咱們生活中,分別是顧客、廚師、服務員。顧客告訴服務員想吃什麼,服務員告訴廚師要作什麼。
接下來又出現了另一個問題,若是咱們的廚師忙不過來了呢? 想來這也是平常生活中很是容易遇到的問題了,顧客太多,廚師又只有那麼幾個,致使廚師忙的暈頭轉向了。那RxJava一樣的是存在這樣的問題的,處理速度必定,可是被觀察者的數據量過大,咱們該如何去進行處理呢?這就引出了背壓的概念。
上文中咱們知道了RxJava要有背壓的緣由,這裏咱們再圖解一下。
長時間出現這樣的狀況使得消息的堆疊,就可能會致使應用因OOM
而崩潰。
在看源碼的解決方案以前,咱們先進行一個思考,請看下圖:
注: 並不直接對應實際代碼
Flowable.create<Int>({ emitter ->
// 一共發送129個事件,即超出了緩存區的大小
// 將數值128改爲0來觀察一下變化
for (i in 0..128) {
Log.d(TAG, "發送了事件$i")
emitter.onNext(i)
}
emitter.onComplete()
}, BackpressureStrategy.ERROR) // 背壓策略加入
.subscribe(object : Subscriber<Int> {
override fun onSubscribe(s: Subscription) {
Log.d(TAG, "onSubscribe")
}
override fun onNext(integer: Int) { Log.d(TAG, "接收到了事件$integer") } override fun onError(t: Throwable) { Log.w(TAG, "onError: ", t) } override fun onComplete() { Log.d(TAG, "onComplete") } }) 複製代碼
從源碼中能夠看到這樣的一些使用:
Observer
差很少,可是多了一些適配
Flowable
的功能
這也就是咱們上文中所思考的問題了,如今先看看RxJava給咱們提供了什麼樣的方案。
public enum BackpressureStrategy {
/** * 提示緩存區已滿 */
MISSING,
/** * 默認模式,數據超出緩存的128時,拋出異常 */
ERROR,
/** * 無限緩存,可能會OOM */
BUFFER,
/** * 超出128時進行丟棄後面進來的數據 */
DROP,
/** * 超出128時進行丟棄最開始進來的數據 */
LATEST
}
複製代碼
2. ERROR
3. BUFFER: 成功發送了128的事件
4. DROP: 只能獲取到127數據 5. LATEST: 獲取到最後發送的數據,也就是149
講過了上面的內容,是否有主意要過另外一個很是重要的知識點,也就是線程該怎麼作?
在Android的開發過程當中咱們一直已經都有一個強烈的概念叫作耗時任務不要放在UI線程來運做,那咱們的RxJava呢?回到咱們上述的代碼中,作一個實驗進行一下觀察。
Observable.create<String> {
Log.e(TAG, "Observable的工做線程:" + Thread.currentThread().name)
}.subscribe(object : Observer<String> {
override fun onSubscribe(d: Disposable) {
Log.d(TAG, "subscribe事件");
Log.e(TAG, "Observer的工做線程:" + Thread.currentThread().name)
}
override fun onNext(s: String) { Log.d(TAG, "Next事件:$s"); } override fun onError(e: Throwable) { Log.d(TAG, "Error事件"); } override fun onComplete() { Log.d(TAG, "Complete事件"); } }) 複製代碼
從圖中明顯可以看出,當前的工做線程爲main
,也就是主線程。
????那不是糟了,咱們的耗時任務在主線程中進行完成的時候,不就會ANR
的問題了?天然就須要找一個解決方案了。
那咱們先來看看第一種,自我掩蓋式。在上述的代碼外加一層Thread
。
圖中顯示到工做線程切換了,可是如何進行UI的數據更新就又成了一個問題了,固然咱們仍是能夠本身加入Handler
來解決問題的。
爲了解決這樣的問題,RxJava
給了咱們一個很好的解決方案,也就是subscribeOn() & observeOn()
,以及一些已經定義好的場景內容。
類型 | 含義 | 應用場景 |
---|---|---|
Schedulers.immediate() | 當前線程 = 不指定線程 | 默認 |
AndroidSchedulers.mainThread() | Android主線程 | 操做UI |
Schedulers.newThread() | 常規新線程 | 耗時等操做 |
Schedulers.io() | io操做線程 | 網絡請求、讀寫文件等io密集型操做 |
Schedulers.computation() | CPU計算操做線程 | 大量計算操做 |
Observable.create<String> {
Log.e(TAG, "Observable的工做線程:" + Thread.currentThread().name)
}
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.io())
.subscribe(object : Observer<String> {
override fun onSubscribe(d: Disposable) {
Log.d(TAG, "subscribe事件");
Log.e(TAG, "Observer的工做線程:" + Thread.currentThread().name)
}
override fun onNext(s: String) { Log.d(TAG, "Next事件:$s"); } override fun onError(e: Throwable) { Log.d(TAG, "Error事件"); } override fun onComplete() { Log.d(TAG, "Complete事件"); } }) 複製代碼
固然這裏我就不作這麼多的Demo了,建議直接看看Carson_Ho
大佬的文章,下面是各個對應的連接:
接下來咱們就拿上面一份簡單源碼的使用過程進行分析。
Observable.create<String> {
it.onNext("items:1")
it.onNext("items:2")
it.onError(Exception())
it.onComplete()
}.subscribe(object : Observer<String> {
override fun onSubscribe(d: Disposable) {
Log.d(TAG, "subscribe事件");
}
override fun onNext(s: String) {
Log.d(TAG, "Next事件:$s");
}
override fun onError(e: Throwable) {
Log.d(TAG, "Error事件");
}
override fun onComplete() {
Log.d(TAG, "Complete事件");
}
})
複製代碼
那麼如今咱們就要對整個結構進行一個分析:
Observer
和
Observable
進行關聯,若是是不一樣線程之間呢?
create
函數做爲一個泛指的存在,他還能夠是just
、fromArray
。。他們最後都會出現一個相同的函數。
RxJavaPlugins.onAssembly(...);
// 出現了這樣的幾個類
// 1. ObservableFromArray
// 2. ObservableJust
// 3. ObservableFromIterable
// 4. ....
// 他們所有繼承了Observable,他們有這樣一個相同的重寫方法subscribeActual(Observer)
複製代碼
咱們主要拿create
這個函數和這一整套流程來作一個詳細的講解。
爲了讓代碼紋理更清晰,刪掉了健壯代碼。
public final void subscribe(@NonNull Observer<? super T> observer) {
try {
// 進行連接
observer = RxJavaPlugins.onSubscribe(this, observer);
// 使得observable和observer進行了連接
subscribeActual(observer); // 1 -->
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
npe.initCause(e);
throw npe;
}
}
複製代碼
那麼咱們就要看看這個subscribeActual()
這個函數幹了什麼事情了。
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<>(observer);
observer.onSubscribe(parent); // 2 -->
try { source.subscribe(parent); // 3 --> } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } } 複製代碼
但願讀者可以注意到這樣的代碼CreateEmitter
消息發射器的建立,以及onSubsrcibe()
的連接,以及source.subscribe(parent);
數據的訂閱。
onSubscribe()
說明咱們的函數已經完成了訂閱。
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
final Observer<? super T> observer; CreateEmitter(Observer<? super T> observer) { this.observer = observer; } // 下一消息發送 @Override public void onNext(T t) { // 。。。 } // 錯誤發送 @Override public void onError(Throwable t) { if (!tryOnError(t)) { RxJavaPlugins.onError(t); } } @Override public boolean tryOnError(Throwable t) { // 。。。 } // 完成鏈接 @Override public void onComplete() { // 。。。 } // 斷開鏈接 @Override public void dispose() { DisposableHelper.dispose(this); } @Override public boolean isDisposed() { return DisposableHelper.isDisposed(get()); } } 複製代碼
在這裏咱們清楚的看到了整個數據處理的邏輯,那麼咱們的Observer
能夠理解爲咱們一個用於自定義處理的類。
拋出一個問題,爲何咱們的數據在通過一個報錯以後日後的數據就不會再進行收發了?
請注意看看onError
的源碼。
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override public boolean tryOnError(Throwable t) { if (t == null) { t = ExceptionHelper.createNullPointerException("onError called with a null Throwable."); } if (!isDisposed()) { try { observer.onError(t); } finally { // 消息中出現錯誤後,斷開鏈接 dispose(); } return true; } return false; } 複製代碼
在完成一次報錯的操做以後,咱們的鏈接就被關閉了,因此咱們以後的數據也就沒法進行了接收。
上文中由於直接使用了Kotlin
的lambda
表達式,因此不夠直觀,這裏我轉成Java
寫一次。
Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Object> emitter) throws Throwable {
} }); 複製代碼
這是一個Observable
的建立流程,顯然咱們如今看到的函數就是咱們要找的被進行重寫的函數了。內部使用到的onNext()
、onCompelete()
等函數的定義就是由咱們的ObservableEmitter
來直接完成提供的。
對於observeOn()
而言,進入源碼中咱們能知道,它使用了這樣的一個類ObservableObserveOn
,而咱們傳入的值就是我上文所提到過的Schedulers
。
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler; // 咱們傳入的Scheduler
this.delayError = delayError; // 延遲onError輸出
this.bufferSize = bufferSize; // 緩衝區大小
}
複製代碼
讓咱們再這個類的他的subscribeActual()
函數。
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize)); } } 複製代碼
顯然對Scheduler
進行了使用,那麼咱們從前面的文章可以做出一個推測,這個數據的響應者就應該是ObserveOnObserver
這個類了。
那咱們再進入一層,看看他的構成,能看到以下的代碼(onComplete()、onSubscribe()..)
皆可。
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t); // 1--> } schedule(); // 2--> } 複製代碼
他的onNext()
的函數中存在一個異步判斷,而數據就是從一個隊列中取出來的。這個隊列先暫時放一邊,咱們猜想他和咱們以前的緩存區相關。
先看看註釋2的schedule()
函數。
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
複製代碼
他會調用到這個函數一個worker
,而這個worker
就是咱們傳入的Scheduler
所擁有的函數了,咱們選擇用newThread()
來進行一個查看,而this
就是ObserveOnObserver
自己了。
// Worker.createWorker()
public Worker createWorker() {
return new NewThreadWorker(threadFactory); // 1 -->
}
// 1-->
public NewThreadWorker(ThreadFactory threadFactory) {
// 建立了一個線程池
executor = SchedulerPoolFactory.create(threadFactory);
}
// worker.schedule(this);
public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (disposed) {
return EmptyDisposable.INSTANCE;
}
return scheduleActual(action, delayTime, unit, null); // 2 -->
}
// 2 -->
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent); // .... Future<?> f; try { // 使用一個線程池來維持數據的數據的有效運行 if (delayTime <= 0) { f = executor.submit((Callable<Object>)sr); } else { f = executor.schedule((Callable<Object>)sr, delayTime, unit); } sr.setFuture(f); // 經過返回新的線程並設置完成線程切換 } catch (RejectedExecutionException ex) { if (parent != null) { parent.remove(sr); } RxJavaPlugins.onError(ex); } return sr; } 複製代碼
將咱們的ObserveOnObserver
扔進了線程池之後就已經完成了線程切換了。
拋出一個問題,爲何網上都說subscribeOn()
只會生效一次?
咱們再次慢慢地用源碼說明問題,下方是ObservableSubscribeOn
類的subscribeActual()
函數。
@Override
public void subscribeActual(final Observer<? super T> observer) {
// 建立了與之綁定用的SubscribeOnObserver
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);
observer.onSubscribe(parent); // SubscribeTask就是一個Runnable // 而後scheduler不知幹了什麼事情 parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); // 1--> } public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { final Worker w = createWorker(); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); // 只是將Runnable 和 Worker進行綁定 DisposeTask task = new DisposeTask(decoratedRun, w); // schedule的函數是否有所眼熟 // 在上文的observeOn中咱們也有所說起 // 使用了線程池來進行維護的線程切換的位置 w.schedule(task, delay, unit); // 2--> return task; } 複製代碼
通過上述步驟後咱們就獲取了對應的Disposable
,那就進入了parent.setDisposable()
的函數了。
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
複製代碼
其實從名字咱們就看出來一個問題了setOnce()
,已經說明只能值設置一次,因此已經這裏已經證實了爲何個人subscribeOn()
只有第一次設置的時候纔會生效的緣由了。
收! 回到咱們的線程內容,既然是線程池,天然要看看他對應的線程了,看看咱們的DisposeTask
的run()
函數把。
@Override
public void run() {
runner = Thread.currentThread();
try {
decoratedRun.run(); // 1 --> SubscribeTask
} finally {
dispose();
runner = null; // 當前線程運行完就釋放
}
}
// 對應咱們SubscribeTask中的run()函數 public void run() { // 這個source就是咱們的observable source.subscribe(parent); } 複製代碼
上流事件也就是讓上層的Observable
又對咱們的數據SubscribeOnObserver
進行了一次訂閱,這個時候線程又一次進行了切換操做。
對咱們的一個RxAndroid而言,通常誰是在IO線程,誰在UI線程呢?
好吧,直接問,可能會沒有思路,那咱們換個問題,誰是數據產生者,誰是數據消費者? 對應到咱們的網絡請求過程,顯然網絡請求是一個在子線程工做的任務,而數據更新就是在主線程。那麼對應到咱們的RxJava
,顯然是Observable
是產生者,而Observer
是消費者,那麼咱們也就知道了誰應該在IO線程了,顯然是Observable
,而Observer
應該處於UI線程了。可是這就是問題所在了,咱們該如何進行數據的通訊呢?個人被觀察者有數據了,可是咱們的觀察者該如何知道?
先來看一下如何進行使用,咱們應該在IO線程中進行訂閱,在UI線程中進行觀察。
.subscribeOn(Schedulers.io()) // 對應 被觀察者
.observeOn(AndroidSchedulers.mainThread()) // 對應 觀察者
複製代碼
在上文中咱們提到了一個叫作緩存區的概念,在咱們的FlowableCreate
的源碼中能找到關於這一部分的源碼。
// 在源碼的64行上下
emitter = new BufferAsyncEmitter<>(t, bufferSize());
// bufferSize()函數對應的數據就是咱們的128
// 因此會有咱們緩存超出128時報錯的狀況存在
複製代碼
可是這裏咱們並無看到和數據發送相關的內容,只看到一個緩衝區的存在。那咱們就繼續往下進行分析了。咱們以前分析過Observer
的源碼,裏面使一些接收的過程,而Subscribe
也差很少,因此方案也一樣的不在這個類中。
那就進行定位了,是咱們最開始的代碼起了什麼樣的做用。
對於Emitter而言,其實他已經持有了訂閱的對象,能夠直接發送數據,有點相似於觀察者模式,可是Flowable
中咱們可以發現的數據拉取,實際上是經過FlowableSubscriber
來進行主動拉取,和觀察者模式的主動推送有必定的區別。
可是數據的通訊仍是須要看看咱們的AndroidSchedulers.mainThread()
。由於咱們要進行UI線程的數據更新,天然是不會使用上述的方法進行的,那RxJava是如何完成這樣的操做的呢。
進入到observeOn
的源碼中能看到
public void subscribeActual(Subscriber<? super T> s) {
Worker worker = scheduler.createWorker();
if (s instanceof ConditionalSubscriber) { source.subscribe(new ObserveOnConditionalSubscriber<>( (ConditionalSubscriber<? super T>) s, worker, delayError, prefetch)); } else { source.subscribe(new ObserveOnSubscriber<>(s, worker, delayError, prefetch)); } } 複製代碼
咱們可以看到這樣的一段代碼scheduler.createWorker()
,咱們拿AndroidSchedulers.mainThread()
來看上一看。
public final class AndroidSchedulers {
private static final class MainHolder { static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()), true); } } HandlerScheduler(Handler handler, boolean async) { this.handler = handler; this.async = async; } 複製代碼
在類AndroidSchedulers
中的構造可以發現其實最後使用的就是一個Handler
的機制,也就是說最後要切到主線程時使用的就是Handler
的機制來發送消息了,並且他直接獲取了主線程的Looper,將消息直接傳輸到了主線程。
那麼講述到這兒咱們的RxJava
的總體流程就已經講完了。
本文使用 mdnice 排版