Android異步框架RxJava 1.x系列(三) - 線程調度器Scheduler

前言

RxJava 事件的發出和消費都在同一個線程,基於同步的觀察者模式。觀察者模式的核心是後臺處理,前臺回調的異步機制。要實現異步,須要引入 RxJava 的另外一個概念 - 線程調度器 Schedulerjava

 

 

正文

在不指定線程的狀況下,RxJava 遵循的是線程不變的原則。即在哪一個線程調用 subscribe() 方法,就在哪一個線程生產事件;在哪一個線程生產事件,就在哪一個線程消費事件。若是須要切換線程,就須要用到線程調度器 Schedulerweb

1. 幾種Scheduler介紹

在 RxJava 中,Scheduler - 調度器,至關於線程控制器,RxJava 經過它來指定每一段代碼應該運行在什麼樣的線程。RxJava 已經內置了幾個 Scheduler ,它們已經適合大多數的使用場景:數據庫

  • Schedulers.immediate()

直接在當前線程運行,至關於不指定線程。這是默認的 Scheduler編程

  • Schedulers.newThread()

老是啓用新線程,並在新線程執行操做。後端

  • Schedulers.io()

I/O 操做(讀寫文件、讀寫數據庫、網絡信息交互等)所使用的 Scheduler。行爲模式和 newThread() 差很少,區別在於 io() 內部採用的是一個無數量上限的線程池,能夠重用空閒的線程。所以多數狀況下 io() 比 newThread() 更有效率。緩存

注意:不要把計算任務放在 io() 中,能夠避免建立沒必要要的線程。網絡

  • Schedulers.computation()

計算任務所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操做限制性能的操做,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小爲 CPU 核數。多線程

注意:不要把 I/O 操做放在 computation() 中,不然 I/O 操做的等待時間會浪費 CPU。架構

  • AndroidSchedulers.mainThread()

Android 還有一個專用的 AndroidSchedulers.mainThread(),它指定的操做將在 Android 主線程運行。框架

2. Scheduler的線程切換

2.1. 單次線程切換

有了這幾個 Scheduler,就可使用 subscribeOn() 和 observeOn() 兩個方法來對線程進行控制了。

  • subscribeOn(): 指定 subscribe() 所發生的線程,即 Observable.OnSubscribe 被激活時所處的線程,或者叫作事件產生的線程。

  • observeOn(): 指定 Subscriber 所運行在的線程,或者叫作事件消費的線程。

直接看代碼:

Observable.just(1, 2, 3, 4) .subscribeOn(Schedulers.io()) // 指定 subscribe() 發生在 IO 線程 .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調發生在主線程 .subscribe(new Action1<Integer>() { @Override public void call(Integer number) { Log.d(tag, "number:" + number); } }); 複製代碼

上面這段代碼中,因爲 subscribeOn(Schedulers.io()) 的指定,被建立的事件的內容 1234 將會在 IO 線程發出;因爲 observeOn(AndroidScheculers.mainThread()) 的指定,所以 subscriber 數字的打印將發生在主線程。

事實上,這種使用方式很是常見,它適用於多數的 『後臺線程取數據,主線程顯示』的程序策略。

如下是一個完整的例子:

int drawableRes = ...; ImageView imageView = ...; Observable.create(new OnSubscribe<Drawable>() { @Override public void call(Subscriber<? super Drawable> subscriber) { Drawable drawable = getTheme().getDrawable(drawableRes)); subscriber.onNext(drawable); subscriber.onCompleted(); } }) // 指定事件發出,即圖片讀取發生在 IO 線程 .subscribeOn(Schedulers.io()) // 指定事件消費 - 回調,即頁面圖片渲染髮生在主線程 .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<Drawable>() { @Override public void onNext(Drawable drawable) { imageView.setImageDrawable(drawable); } @Override public void onCompleted() { } @Override public void onError(Throwable e) { Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show(); } }); 複製代碼

這樣的好處是,加載圖片的過程發生在 IO 線程,而設置圖片則發生在了主線程。這就意味着,即便加載圖片耗費了幾十甚至幾百毫秒的時間,也不會形成界面上的絲毫卡頓。

2.2. 屢次線程切換

上面介紹到能夠利用 subscribeOn() 結合 observeOn() 來實現線程控制,讓事件的產生和消費發生在不一樣的線程。在瞭解了 map() 和 flatMap() 等變換方法後,一個問題就產生了 - 能不能多切換幾回線程?

由於 observeOn() 指定的是 Subscriber 的線程,而這個 Subscriber 並非 subscribe()參數中的 Subscriber ,而是 observeOn() 執行時,當前 Observable 所對應的 Subscriber,即它的直接下級 Subscriber

也就是說,observeOn() 指定的是它以後的操做所在的線程。所以若是有屢次切換線程的需求,只要在每一個想要切換線程的位置調用一次 observeOn() 便可。

直接查看示例代碼:

Observable.just(1, 2, 3, 4) // 事件發出的 IO 線程,由 subscribeOn() 指定 .subscribeOn(Schedulers.io()) // 新線程,由 observeOn() 指定 .observeOn(Schedulers.newThread()) .map(mapOperator) // IO 線程,由 observeOn() 指定 .observeOn(Schedulers.io()) .map(mapOperator2) // Android 主線程,由 observeOn() 指定 .observeOn(AndroidSchedulers.mainThread) .subscribe(subscriber); 複製代碼

上面的代碼,經過 observeOn() 的屢次調用,程序實現了線程的屢次切換。不過,不一樣於 observeOn()的是,subscribeOn() 的位置放在哪裏均可以,但它是隻能調用一次的。

3. Scheduler的實現原理

其實,subscribeOn() 和 observeOn() 的內部實現,也是用的 lift() (見上文),具體看圖(不一樣顏色的箭頭表示不一樣的線程):

  • subscribeOn()的原理圖

 

 

從圖中能夠看出,subscribeOn() 進行了線程切換的工做(圖中的 schedule... 的位置)。

subscribeOn() 的線程切換髮生在 OnSubscribe 中,即在它通知上一級 OnSubscribe 時,這時事件尚未開始發送,所以 subscribeOn() 的線程控制只能在事件發出的開端形成影響,即只容許一次線程切換。

  • observeOn()的原理圖

 

 

從圖中能夠看出,和 observeOn() 進行了線程切換的工做(圖中的 schedule... 的位置)。

observeOn() 的線程切換則發生在它內建的 Subscriber 中,即發生在它即將給下一級Subscriber 發送事件時,所以 observeOn() 控制的是它後面的線程,容許屢次線程切換。

  • 混合切換原理圖

最後用一張圖來解釋當多個 subscribeOn() 和 observeOn() 混合使用時,線程調度是怎麼發生的:

 

 

圖中共有 5 處對事件的操做,由圖中能夠看出:

  1. ① 和 ② 兩處受第一個 subscribeOn() 影響,運行在紅色線程;

  2. ③ 和 ④ 處受第一個 observeOn() 的影響,運行在綠色線程;

  3. ⑤ 處受第二個 onserveOn() 影響,運行在紫色線程;

  4. 而第二個 subscribeOn() ,因爲在通知過程當中線程就被第一個 subscribeOn() 截斷,所以對整個流程並無任何影響。

注意:當使用了多個 subscribeOn() 的時候,只有第一個 subscribeOn() 起做用。

4. 延伸拓展

雖然超過一個的 subscribeOn() 對事件處理的流程沒有影響,但在流程以前倒是有用的。在前面的文章介紹 Subscriber 的時候,提到過 Subscriber 的 onStart() 能夠用做流程開始前的初始化處理。

因爲 onStart() 在 subscribe() 發生時就被調用了,所以不能指定線程,而是隻能執行在 subscribe() 被調用時的線程。這就致使若是 onStart() 中含有對線程有要求的代碼(例如:在界面上顯示一個 ProgressBar,這必須在主線程執行),將會有線程非法的風險,由於沒法預測 subscribe() 會在什麼線程執行。

與 Subscriber.onStart() 相對應的,有一個方法 Observable.doOnSubscribe()。它和 Subscriber.onStart() 一樣是在 subscribe() 調用後並且在事件發送前執行,但區別在於它能夠指定線程。默認狀況下,doOnSubscribe() 執行在 subscribe() 發生的線程;而若是在 doOnSubscribe() 以後有 subscribeOn() 的話,它將執行在離它最近的 subscribeOn() 所指定的線程。

示例代碼以下:

Observable.create(onSubscribe)
    .subscribeOn(Schedulers.io())
    .doOnSubscribe(new Action0() { @Override public void call() { // 須要在主線程執行 progressBar.setVisibility(View.VISIBLE); } }) .subscribeOn(AndroidSchedulers.mainThread()) // 指定主線程 .observeOn(AndroidSchedulers.mainThread()) .subscribe(subscriber); 複製代碼

上面的代碼,在 doOnSubscribe() 的後面跟一個 subscribeOn() ,就能指定特定工做的線程了!

小結

RxJava 的提供的各類事件及事件轉換模型,以及基於轉換的線程調度器,結合觀察者模式,使得 RxJava 在異步編程體驗、靈活性和運行效率上領先於其餘的開源框架!


歡迎關注技術公衆號: 零壹技術棧

 

零壹技術棧

 

本賬號將持續分享後端技術乾貨,包括虛擬機基礎,多線程編程,高性能框架,異步、緩存和消息中間件,分佈式和微服務,架構學習和進階等學習資料和文章。

相關文章
相關標籤/搜索