就在前不久作了一個關於RXJava的相關教學視頻,事後整理了關於RxJava的預習資料和相關內容以及圖文和相關源碼,須要借鑑的能夠和我聯繫~java
承接上文:我把RXjava的源碼和這份面試都給你了,你還告訴我面不過拿不到offer?(一)
源碼和麪試大全PDF
(VX:mm14525201314)
git
在RxJava
中,一個實現了Observer接口的對象能夠訂閱(subscribe)一個Observable 類的實例。訂閱者(subscriber)對Observable發射(emit)的任何數據或數據序列做出響應。這種模式簡化了併發操做,由於它不須要阻塞等待Observable發射數據,而是建立了一個處於待命狀態的觀察者哨兵,哨兵在將來某個時刻響應Observable的通知。github
####Single面試
RxJava(以及它派生出來的RxGroovy
和RxScala
)中有一個名爲Single的Observable變種。Single相似於Observable,不一樣的是,它老是隻發射一個值,或者一個錯誤通知,而不是發射一系列的值。緩存
所以,不一樣於Observable須要三個方法onNext, onError, onCompleted
,訂閱Single只須要兩個方法:多線程
onSuccess
- Single發射單個的值到這個方法onError
- 若是沒法發射須要的值,Single發射一個Throwable
對象到這個方法Single只會調用這兩個方法中的一個,並且只會調用一次,調用了任何一個方法以後,訂閱關係終止併發
Single也能夠組合使用多種操做,一些操做符讓你能夠混合使用Observable和Single:
ide
Subject能夠當作是一個橋樑或者代理,在某些ReactiveX
實現中(如RxJava
),它同時充當了Observer
和Observable
的角色。由於它是一個Observer
,它能夠訂閱一個或多個Observable
;又由於它是一個Observable
,它能夠轉發它收到(Observe)的數據,也能夠發射新的數據。測試
因爲一個Subject
訂閱一個Observable
,它能夠觸發這個Observable
開始發射數據(若是那個Observable
是"冷"的--就是說,它等待有訂閱纔開始發射數據)。所以有這樣的效果,Subject能夠把原來那個"冷"的Observable變成"熱"的。this
針對不一樣的場景一共有四種類型的Subject。他們並非在全部的實現中所有都存在,並且一些實現使用其它的命名約定(例如,在RxScala
中Subject被稱做PublishSubject
)。
一個AsyncSubject只在原始Observable完成後,發射來自原始Observable的最後一個值。(若是原始Observable沒有發射任何值,AsyncObject
也不發射任何值)它會把這最後一個值發射給任何後續的觀察者。
然而,若是原始的Observable
由於發生了錯誤而終止,AsyncSubject
將不會發射任何數據,只是簡單的向前傳遞這個錯誤通知
當觀察者訂閱BehaviorSubject
時,它開始發射原始Observable最近發射的數據(若是此時尚未收到任何數據,它會發射一個默認值),而後繼續發射其它任何來自原始Observable的數據。
然而,若是原始的Observable由於發生了一個錯誤而終止,BehaviorSubject
將不會發射任何數據,只是簡單的向前傳遞這個錯誤通知
PublishSubject
只會把在訂閱發生的時間點以後來自原始Observable的數據發射給觀察者。須要注意的是,PublishSubject
可能會一建立完成就馬上開始發射數據(除非你能夠阻止它發生),所以這裏有一個風險:在Subject被建立後到有觀察者訂閱它以前這個時間段內,一個或多個數據可能會丟失。若是要確保來自原始Observable的全部數據都被分發,你須要這樣作:或者使用Create建立那個Observable以便手動給它引入"冷"Observable的行爲(當全部觀察者都已經訂閱時纔開始發射數據),或者改用ReplaySubject
。
若是原始的Observable由於發生了一個錯誤而終止,PublishSubject
將不會發射任何數據,只是簡單的向前傳遞這個錯誤通知。
ReplaySubject
會發射全部來自原始Observable的數據給觀察者,不管它們是什麼時候訂閱的。也有其它版本的ReplaySubject
,在重放緩存增加到必定大小的時候或過了一段時間後會丟棄舊的數據(原始Observable發射的)。
若是你把ReplaySubject
看成一個觀察者使用,注意不要從多個線程中調用它的onNext
方法(包括其它的on系列方法),這可能致使同時(非順序)調用,這會違反Observable協議,給Subject的結果增長了不肯定性。
假設你有一個Subject,你想把它傳遞給其它的代理或者暴露它的Subscriber接口,你能夠調用它的asObservable
方法,這個方法返回一個Observable。具體使用方法能夠參考Javadoc文檔。
若是你把 Subject 看成一個 Subscriber 使用,注意不要從多個線程中調用它的onNext方法(包括其它的on系列方法),這可能致使同時(非順序)調用,這會違反Observable協議,給Subject的結果增長了不肯定性。
要避免此類問題,你能夠將 Subject 轉換爲一個 SerializedSubject
,相似於這樣:
mySafeSubject = new SerializedSubject( myUnsafeSubject );
若是你想給Observable操做符鏈添加多線程功能,你能夠指定操做符(或者特定的Observable)在特定的調度器(Scheduler)上執行。
某些ReactiveX
的Observable操做符有一些變體,它們能夠接受一個Scheduler參數。這個參數指定操做符將它們的部分或所有任務放在一個特定的調度器上執行。
使用ObserveOn
和SubscribeOn
操做符,你可讓Observable在一個特定的調度器上執行,ObserveOn
指示一個Observable在一個特定的調度器上調用觀察者的onNext
, onError
和onCompleted
方法,SubscribeOn
更進一步,它指示Observable將所有的處理過程(包括髮射數據和通知)放在特定的調度器上執行。
下表展現了RxJava中可用的調度器種類:
在RxJava中,某些Observable操做符的變體容許你設置用於操做執行的調度器,其它的則不在任何特定的調度器上執行,或者在一個指定的默認調度器上執行。下面的表格個列出了一些操做符的默認調度器:
除了將這些調度器傳遞給RxJava
的Observable
操做符,你也能夠用它們調度你本身的任務。下面的示例展現了Scheduler.Worker
的用法:
worker = Schedulers.newThread().createWorker(); worker.schedule(new Action0() { @Override public void call() { yourWork(); } }); // some time later... worker.unsubscribe();
要調度遞歸的方法調用,你可使用schedule
,而後再用schedule(this)
,示例:
worker = Schedulers.newThread().createWorker(); worker.schedule(new Action0() { @Override public void call() { yourWork(); // recurse until unsubscribed (schedule will do nothing if unsubscribed) worker.schedule(this); } }); // some time later... worker.unsubscribe();
Worker類的對象實現了Subscription接口,使用它的isUnsubscribed和unsubscribe方法,因此你能夠在訂閱取消時中止任務,或者從正在調度的任務內部取消訂閱,示例:
Worker worker = Schedulers.newThread().createWorker(); Subscription mySubscription = worker.schedule(new Action0() { @Override public void call() { while(!worker.isUnsubscribed()) { status = yourWork(); if(QUIT == status) { worker.unsubscribe(); } } } });
Worker
同時是Subscription
,所以你能夠(一般也應該)調用它的unsubscribe
方法通知能夠掛起任務和釋放資源了。
你可使用schedule(action,delayTime,timeUnit)
在指定的調度器上延時執行你的任務,下面例子中的任務將在500毫秒以後開始執行:
someScheduler.schedule(someAction, 500, TimeUnit.MILLISECONDS);
使用另外一個版本的schedule,schedulePeriodically(action,initialDelay,period,timeUnit)
方法讓你能夠安排一個按期執行的任務,下面例子的任務將在500毫秒以後執行,而後每250毫秒執行一次:
someScheduler.schedulePeriodically(someAction, 500, 250, TimeUnit.MILLISECONDS);
TestScheduler讓你能夠對調度器的時鐘表現進行手動微調。這對依賴精確時間安排的任務的測試頗有用處。這個調度器有三個額外的方法:
advanceTimeTo(time,unit)
向前波動調度器的時鐘到一個指定的時間點advanceTimeBy(time,unit)
將調度器的時鐘向前撥動一個指定的時間段triggerActions( )
開始執行任何計劃中的可是未啓動的任務,若是它們的計劃時間等於或者早於調度器時鐘的當前時間(順手留下GitHub連接,須要獲取相關面試等內容的能夠本身去找)
https://github.com/xiangjiana/Android-MS
(VX:mm14525201314)