RxJava
事件的發出和消費都在同一個線程,基於同步的觀察者模式。觀察者模式的核心是後臺處理,前臺回調的異步機制。要實現異步,須要引入 RxJava
的另外一個概念 - 線程調度器 Scheduler
。java
![](http://static.javashuo.com/static/loading.gif)
正文
在不指定線程的狀況下,RxJava
遵循的是線程不變的原則。即在哪一個線程調用 subscribe()
方法,就在哪一個線程生產事件;在哪一個線程生產事件,就在哪一個線程消費事件。若是須要切換線程,就須要用到線程調度器 Scheduler
。web
1. 幾種Scheduler介紹
在 RxJava
中,Scheduler
- 調度器,至關於線程控制器,RxJava
經過它來指定每一段代碼應該運行在什麼樣的線程。RxJava
已經內置了幾個 Scheduler
,它們已經適合大多數的使用場景:數據庫
- Schedulers.immediate()
直接在當前線程運行,至關於不指定線程。這是默認的 Scheduler
。編程
- Schedulers.newThread()
老是啓用新線程,並在新線程執行操做。後端
- Schedulers.io()
I/O
操做(讀寫文件、讀寫數據庫、網絡信息交互等)所使用的 Scheduler
。行爲模式和 newThread()
差很少,區別在於 io()
內部採用的是一個無數量上限的線程池,能夠重用空閒的線程。所以多數狀況下 io()
比 newThread()
更有效率。緩存
注意:不要把計算任務放在
io()
中,能夠避免建立沒必要要的線程。網絡
- Schedulers.computation()
計算任務所使用的 Scheduler
。這個計算指的是 CPU
密集型計算,即不會被 I/O
等操做限制性能的操做,例如圖形的計算。這個 Scheduler
使用的固定的線程池,大小爲 CPU
核數。多線程
注意:不要把 I/O 操做放在 computation() 中,不然 I/O 操做的等待時間會浪費 CPU。架構
- AndroidSchedulers.mainThread()
Android
還有一個專用的 AndroidSchedulers.mainThread()
,它指定的操做將在 Android
主線程運行。框架
2. Scheduler的線程切換
2.1. 單次線程切換
有了這幾個 Scheduler
,就可使用 subscribeOn()
和 observeOn()
兩個方法來對線程進行控制了。
-
subscribeOn()
: 指定subscribe()
所發生的線程,即Observable.OnSubscribe
被激活時所處的線程,或者叫作事件產生的線程。 -
observeOn()
: 指定Subscriber
所運行在的線程,或者叫作事件消費的線程。
直接看代碼:
Observable.just(1, 2, 3, 4) .subscribeOn(Schedulers.io()) // 指定 subscribe() 發生在 IO 線程 .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調發生在主線程 .subscribe(new Action1<Integer>() { @Override public void call(Integer number) { Log.d(tag, "number:" + number); } }); 複製代碼
上面這段代碼中,因爲 subscribeOn(Schedulers.io())
的指定,被建立的事件的內容 1
、2
、3
、4
將會在 IO
線程發出;因爲 observeOn(AndroidScheculers.mainThread())
的指定,所以 subscriber
數字的打印將發生在主線程。
事實上,這種使用方式很是常見,它適用於多數的 『後臺線程取數據,主線程顯示』的程序策略。
如下是一個完整的例子:
int drawableRes = ...; ImageView imageView = ...; Observable.create(new OnSubscribe<Drawable>() { @Override public void call(Subscriber<? super Drawable> subscriber) { Drawable drawable = getTheme().getDrawable(drawableRes)); subscriber.onNext(drawable); subscriber.onCompleted(); } }) // 指定事件發出,即圖片讀取發生在 IO 線程 .subscribeOn(Schedulers.io()) // 指定事件消費 - 回調,即頁面圖片渲染髮生在主線程 .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<Drawable>() { @Override public void onNext(Drawable drawable) { imageView.setImageDrawable(drawable); } @Override public void onCompleted() { } @Override public void onError(Throwable e) { Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show(); } }); 複製代碼
這樣的好處是,加載圖片的過程發生在 IO
線程,而設置圖片則發生在了主線程。這就意味着,即便加載圖片耗費了幾十甚至幾百毫秒的時間,也不會形成界面上的絲毫卡頓。
2.2. 屢次線程切換
上面介紹到能夠利用 subscribeOn()
結合 observeOn()
來實現線程控制,讓事件的產生和消費發生在不一樣的線程。在瞭解了 map()
和 flatMap()
等變換方法後,一個問題就產生了 - 能不能多切換幾回線程?
由於 observeOn()
指定的是 Subscriber
的線程,而這個 Subscriber
並非 subscribe()
參數中的 Subscriber
,而是 observeOn()
執行時,當前 Observable
所對應的 Subscriber
,即它的直接下級 Subscriber
。
也就是說,observeOn() 指定的是它以後的操做所在的線程。所以若是有屢次切換線程的需求,只要在每一個想要切換線程的位置調用一次 observeOn() 便可。
直接查看示例代碼:
Observable.just(1, 2, 3, 4) // 事件發出的 IO 線程,由 subscribeOn() 指定 .subscribeOn(Schedulers.io()) // 新線程,由 observeOn() 指定 .observeOn(Schedulers.newThread()) .map(mapOperator) // IO 線程,由 observeOn() 指定 .observeOn(Schedulers.io()) .map(mapOperator2) // Android 主線程,由 observeOn() 指定 .observeOn(AndroidSchedulers.mainThread) .subscribe(subscriber); 複製代碼
上面的代碼,經過 observeOn()
的屢次調用,程序實現了線程的屢次切換。不過,不一樣於 observeOn()
的是,subscribeOn()
的位置放在哪裏均可以,但它是隻能調用一次的。
3. Scheduler的實現原理
其實,subscribeOn()
和 observeOn()
的內部實現,也是用的 lift()
(見上文),具體看圖(不一樣顏色的箭頭表示不一樣的線程):
- subscribeOn()的原理圖
![](http://static.javashuo.com/static/loading.gif)
從圖中能夠看出,subscribeOn()
進行了線程切換的工做(圖中的 schedule...
的位置)。
subscribeOn()
的線程切換髮生在 OnSubscribe
中,即在它通知上一級 OnSubscribe
時,這時事件尚未開始發送,所以 subscribeOn()
的線程控制只能在事件發出的開端形成影響,即只容許一次線程切換。
- observeOn()的原理圖
![](http://static.javashuo.com/static/loading.gif)
從圖中能夠看出,和 observeOn()
進行了線程切換的工做(圖中的 schedule...
的位置)。
observeOn()
的線程切換則發生在它內建的 Subscriber
中,即發生在它即將給下一級Subscriber
發送事件時,所以 observeOn()
控制的是它後面的線程,容許屢次線程切換。
- 混合切換原理圖
最後用一張圖來解釋當多個 subscribeOn()
和 observeOn()
混合使用時,線程調度是怎麼發生的:
![](http://static.javashuo.com/static/loading.gif)
圖中共有 5
處對事件的操做,由圖中能夠看出:
-
① 和 ② 兩處受第一個
subscribeOn()
影響,運行在紅色線程; -
③ 和 ④ 處受第一個
observeOn()
的影響,運行在綠色線程; -
⑤ 處受第二個
onserveOn()
影響,運行在紫色線程; -
而第二個
subscribeOn()
,因爲在通知過程當中線程就被第一個subscribeOn()
截斷,所以對整個流程並無任何影響。
注意:當使用了多個 subscribeOn() 的時候,只有第一個 subscribeOn() 起做用。
4. 延伸拓展
雖然超過一個的 subscribeOn()
對事件處理的流程沒有影響,但在流程以前倒是有用的。在前面的文章介紹 Subscriber
的時候,提到過 Subscriber
的 onStart()
能夠用做流程開始前的初始化處理。
因爲 onStart() 在 subscribe() 發生時就被調用了,所以不能指定線程,而是隻能執行在 subscribe() 被調用時的線程。這就致使若是 onStart() 中含有對線程有要求的代碼(例如:在界面上顯示一個 ProgressBar,這必須在主線程執行),將會有線程非法的風險,由於沒法預測 subscribe() 會在什麼線程執行。
與 Subscriber.onStart()
相對應的,有一個方法 Observable.doOnSubscribe()
。它和 Subscriber.onStart()
一樣是在 subscribe()
調用後並且在事件發送前執行,但區別在於它能夠指定線程。默認狀況下,doOnSubscribe()
執行在 subscribe()
發生的線程;而若是在 doOnSubscribe()
以後有 subscribeOn()
的話,它將執行在離它最近的 subscribeOn()
所指定的線程。
示例代碼以下:
Observable.create(onSubscribe)
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Action0() { @Override public void call() { // 須要在主線程執行 progressBar.setVisibility(View.VISIBLE); } }) .subscribeOn(AndroidSchedulers.mainThread()) // 指定主線程 .observeOn(AndroidSchedulers.mainThread()) .subscribe(subscriber); 複製代碼
上面的代碼,在 doOnSubscribe()
的後面跟一個 subscribeOn()
,就能指定特定工做的線程了!
小結
RxJava
的提供的各類事件及事件轉換模型,以及基於轉換的線程調度器,結合觀察者模式,使得 RxJava
在異步編程體驗、靈活性和運行效率上領先於其餘的開源框架!
歡迎關注技術公衆號: 零壹技術棧
![零壹技術棧](http://static.javashuo.com/static/loading.gif)
本賬號將持續分享後端技術乾貨,包括虛擬機基礎,多線程編程,高性能框架,異步、緩存和消息中間件,分佈式和微服務,架構學習和進階等學習資料和文章。