1、Rx介紹html
1. 能夠把Observable當作Iterable的推送方式的等價物。java
2. Observable類型給GOF的觀察者模式添加了兩種缺乏的語義,這樣就和Iterable類型中可用的操做一致了:react
有了這兩種功能,Rx就能使Observable與Iterable保持一致了,惟一的不一樣是數據流的方向。任何對Iterable的操做,你均可以對Observable使用。android
2、Observablegit
1. 在異步模型中流程更像這樣的:github
2. 取消訂閱的結果會傳遞給這個Observable的操做符鏈,並且會致使這個鏈條上的每一個環節都中止發射數據項。這些並不保證會當即發生,然而,對一個Observable來講,即便沒有觀察者了,它也能夠在一個while循環中繼續生成並嘗試發射數據項。數據庫
3. Observable何時開始發射數據序列?這取決於Observable的實現,一個"熱"的Observable可能一建立完就開始發射數據,所以全部後續訂閱它的觀察者可能從序列中間的某個位置開始接受數據(有一些數據錯過了)。一個"冷"的Observable會一直等待,直到有觀察者訂閱它纔開始發射數據,所以這個觀察者能夠確保會收到整個數據序列。編程
3、Single緩存
1. Single相似於Observable,不一樣的是,它老是隻發射一個值,或者一個錯誤通知,而不是發射一系列的值。網絡
所以,不一樣於Observable須要三個方法onNext, onError, onCompleted,訂閱Single只須要兩個方法:
2. Single只會調用這兩個方法中的一個,並且只會調用一次,調用了任何一個方法以後,訂閱關係終止。
4、Subject
1. Subject能夠當作是一個橋樑或者代理,在某些ReactiveX實現中(如RxJava),它同時充當了Observer和Observable的角色。
因爲一個Observable訂閱一個Observable,它能夠觸發這個Observable開始發射數據(若是那個Observable是"冷"的--就是說,它等待有訂閱纔開始發射數據)。所以有這樣的效果,Subject能夠把原來那個"冷"的Observable變成"熱"的。
1. Schedulers.immediate():
直接在當前線程運行,至關於不指定線程。這是默認的 Scheduler。
2. Schedulers.newThread():
老是啓用新線程,並在新線程執行操做。
3. Schedulers.io( ):
用於IO密集型任務,如異步阻塞IO操做,這個調度器的線程池會根據須要增加;對於普通的計算任務,請使用Schedulers.computation();
Schedulers.io( )默認是一個CachedThreadScheduler,很像一個有線程緩存的新線程調度器。
Schedulers.io(): I/O 操做(讀寫文件、讀寫數據庫、網絡信息交互等)所使用的 Scheduler。
行爲模式和 newThread() 差很少,區別在於 io() 的內部實現是是用一個無數量上限的線程池,能夠重用空閒的線程,所以多數狀況下 io() 比 newThread() 更有效率。
不要把計算工做放在 io() 中,能夠避免建立沒必要要的線程。
4. Schedulers.computation():
計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操做限制性能的操做,例如圖形的計算。
這個 Scheduler 使用的固定的線程池,大小爲 CPU 核數。
不要把 I/O 操做放在 computation() 中,不然 I/O 操做的等待時間會浪費 CPU。
6. Schedulers.trampoline( ):
當其它排隊的任務完成後,在當前線程排隊開始執行。
7. 能夠用Scheduler.Worker調度你本身的任務:
worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {...});
8. Worker同時是Subscription,所以你能夠(一般也應該)調用它的unsubscribe方法通知能夠掛起任務和釋放資源了:
worker.unsubscribe();
9. 延時和週期調度器:
你可使用Worker.schedule(action,delayTime,timeUnit)在指定的調度器上延時執行你的任務;
Worker.schedulePeriodically(action,initialDelay,period,timeUnit)方法讓你能夠安排一個按期執行的任務。
1. FlatMap:能夠認爲是一個將嵌套的數據結構展開的過程。
FlatMap
對這些Observables發射的數據作的是合併(merge
)操做,所以它們多是交錯的。
2. Map:實質是對序列中的每一項執行一個函數,函數的參數就是這個數據項。
3. Scan:
Scan
操做符對原始Observable發射的第一項數據應用一個函數,而後將那個函數的結果做爲本身的第一項數據發射。它將函數的結果同第二項數據一塊兒填充給這個函數來產生它本身的第二項數據。它持續進行這個過程來產生剩餘的數據序列。這個操做符在某些狀況下被叫作accumulator(累加器)
。
4. Do:
doOnEach,doOnNext,doOnSubscribe,doOnUnsubscribe,doOnCompleted,doOnError,doOnTerminate,finallyDo;
5. ObserveOn:指定觀察者觀察Observable的調度程序(工做線程);
6. SubscribeOn:指定Observable應該在哪一個調度程序上執行;當使用了多個
subscribeOn()
的時候,只有第一個 subscribeOn()
起做用。
7. Subscribe:收到Observable發射的數據和通知後執行的操做;
8. Connect:指示一個可鏈接的Observable開始發射數據給訂閱者;
9. Publish:將一個普通的Observable轉換爲可鏈接的;
10. RefCount:使一個可鏈接的Observable表現得像一個普通的Observable;
11. Replay:確保全部的觀察者收到一樣的數據序列,即便他們在Observable開始發射數據以後才訂閱;
12. To:將Observable或者Observable發射的數據序列轉換爲另外一個對象或數據結構;
13. Blocking:
BlockingObservable
的方法不是將一個Observable變換爲另外一個,也不是過濾Observables,它們會打斷Observable的調用鏈,會阻塞等待直到Observable發射了想要的數據,而後返回這個數據(而不是一個Observable);
forEach( ),
first( ),
last( ),
getIterator( )
14. Unsubscribe:這個方法很重要,由於在subscribe()
以後, Observable
會持有 Subscriber
的引用,這個引用若是不能及時被釋放,將有內存泄露的風險。因此最好保持一個原則:要在再也不使用的時候儘快在合適的地方(例如 onPause()
onStop()
等方法中)調用 unsubscribe()
來解除引用關係,以免內存泄露的發生。
15. doOnSubscribe():而與 Subscriber.onStart()
相對應的,有一個方法 Observable.doOnSubscribe()
。它和 Subscriber.onStart()
一樣是在 subscribe()
調用後並且在事件發送前執行,但區別在於它能夠指定線程。默認狀況下, doOnSubscribe()
執行在 subscribe()
發生的線程;而若是在 doOnSubscribe()
以後有 subscribeOn()
的話,它將執行在離它最近的 subscribeOn()
所指定的線程;
16. 根據響應式函數編程的概念,Subscribers更應該作的事情是「響應」,響應Observable發出的事件,而不是去修改。Subscribers越輕量越好;
17. defer:只有當訂閱者訂閱才建立Observable;爲每一個訂閱建立一個新的Observable;使用defer()來包裝緩慢的代碼(用到才生成);
18. cache
:記住Observable發射的數據序列併發射相同的數據序列給後續的訂閱者;cache() (或者 replay())會繼續執行網絡請求(並記住數據序列)(甚至你調用了unsubscribe也不會中止);
19. firstOrDefault:firstOrDefault
與first
相似,可是在Observagle沒有發射任何數據時發射一個你在參數中指定的默認值。
20. toList:一般,發射多項數據的Observable會爲每一項數據調用onNext
方法。你能夠用toList
操做符改變這個行爲,讓Observable將多項數據組合成一個List
,而後調用一次onNext
方法傳遞整個列表。
21. onErrorResumeNext:返回一個鏡像原有Observable行爲的新Observable,後者會忽略前者的onError
調用,不會將錯誤傳遞給觀察者,做爲替代,它會開始鏡像另外一個,備用的Observable。
22. onErrorReturn:返回一個鏡像原有Observable行爲的新Observable,後者會忽略前者的onError
調用,不會將錯誤傳遞給觀察者,做爲替代,它會發射一個特殊的項並調用觀察者的onComleted
方法。
23. concatMap:相似於最簡單版本的flatMap
,可是它按次序鏈接而不是合併那些生成的Observables,而後產生本身的數據序列。
Further Reading:
1、Deferring Observable code until subscription in RxJava
just()
, from()
, and other Observable
creation tools store the value of data when created, not when subscribed.
defer(): none of the code inside of defer()
is executed until subscription.
Observable.create()
- no need to call onCompleted()
.2、Loading data from multiple sources with RxJava
first() vs. takeFirst(): The difference between the two calls is that first()
will throw a NoSuchElementException
if none of the sources emits valid data, whereas takeFirst()
will simply complete without exception.
3、Don't break the chain: use RxJava's compose() operator
1. compose(): feed it an Observable
of one type and it'll return an Observable
of another. reusable and the chain is preserved.
compose() vs flatMap(): The difference is that compose()
is a higher level abstraction: it operates on the entire stream, not individually emitted items(flatMap).
If you want to replace some operators with reusable code, use compose()
.
2. Reusing Transformers: don't have to care about type
final Transformer schedulersTransformer = observable -> observable.subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()); @SuppressWarnings("unchecked") <T> Transformer<T, T> applySchedulers() { return (Transformer<T, T>) schedulersTransformer; }
4、Functional Reactive Programming on Android With RxJava
1. Problems of concurrent programming with AsyncTask:
(1) There are many things that can go wrong, so we want to recover from errors, and add a try-catch block. Perhaps we want to inform the user about this error too, which likely involves interacting with the UI. Wait, we cannot do that because we are not allowed to update any user-interface elements from a background thread.
(2) how do we obtain a reference to a Context
, without which we cannot do anything meaningful with the UI? Apparently, we have to bind it to the task instance up front, at the point of instantiation, and keep a reference to it throughout a task’s execution. But what if the download takes a minute to run? Do we want to hold on to an Activity
instance for an entire minute?
(3) Composing service objects means nesting AsyncTask
, which leads to what is commonly referred to as 「callback hell」 because you start tasks from a task callback from a task callback from a …
(4) Dealing with AsyncTask and Screen Orientation. (AsyncTask configuration change)
2. Rxjava solves all of the problems in one fell swoop:
Context
5、What are the Hot and Cold observables?
hot happens even when nobody is subscribed, cold happens "on demand". Also, Publish() converts cold to hot and Defer() converts hot to cold.
passive sequences are Cold and active are described as being Hot.
7、HOW TO KEEP YOUR RXJAVA SUBSCRIBERS FROM LEAKING
1. The straightforward solution to this is to unsubscribe from your Observable
when theActivity
is about to be destroyed
2. If we subclass Observable
to wrap our Subscribers
in a Subscriber
decorator that delegates work to its weakly held, wrapped Subscriber
, we can keep clients from having to worry about leaking their Subscribers
without forcing them to write boilerplate code.
Add throttling behaviour: debounce() is what you usually need.
Kill the previous requests: introduce switchMap instead of flatMap.
No error functionality / no network functionality: You need a retry mechanism for these.
RxTextView.textChanges(searchEditText) .debounce(150, MILLISECONDS) // throttling behaviour .switchMap(Api::searchItems) // Kill the previous requests .retryWhen(new RetryWithConnectivityIncremental(context, 5, 15, SECONDS)) // a retry mechanism .subscribe(this::updateList, t->showErrorToUser());
9、Observe on the correct thread
Instead of doing this: Observable.just(1,2,3).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).flatMap(/** logic which doesn't touch ui **//).subscribe();
do this: Observable.just(1,2,3).subscribeOn(Schedulers.newThread()).flatMap(/** logic which doesn't touch ui **//).observeOn(AndroidSchedulers.mainThread()).subscribe();
Earlier .subscribeOn() wins.
AsyncSubject:僅釋放Observable釋放的最後一個數據,而且僅在Observable完成以後(subject.onComplete())。
observer will receive no onNext events if the subject.onCompleted() isn't called.
當Observer訂閱了一個BehaviorSubject,它一開始就會釋放Observable最近釋放的一個數據對象,當尚未任何數據釋放時,它則是一個默認值。
PublishSubject:僅會向Observer釋放在訂閱以後Observable釋放的數據。
無論Observer什麼時候訂閱ReplaySubject,ReplaySubject會向全部Observer釋放Observable釋放過的數據。
假設你有一個Subject,你想把它傳遞給其它的代理或者暴露它的Subscriber接口,你能夠調用它的asObservable方法,這個方法返回一個Observable。具體使用方法能夠參考Javadoc文檔。
因爲一個Observable訂閱一個Observable,它能夠觸發這個Observable開始發射數據(若是那個Observable是"冷"的--就是說,它等待有訂閱纔開始發射數據)。所以有這樣的效果,Subject能夠把原來那個"冷"的Observable變成"熱"的。
若是你把 Subject
看成一個 Subscriber
使用,注意不要從多個線程中調用它的onNext方法(包括其它的on系列方法),這可能致使同時(非順序)調用,這會違反Observable協議,給Subject的結果增長了不肯定性。
要避免此類問題,你能夠將 Subject
轉換爲一個 SerializedSubject
,相似於這樣:
mySafeSubject = new SerializedSubject( myUnsafeSubject );
12、Implementing an Event Bus With RxJava - RxBus
There’s an important semantic difference between the Observer and Pub-sub patterns though:
In the pub-sub pattern the focus is on 「broadcasting」 messages outside.
The Observable here doesn’t want to know who the events are going out to, just that they’ve gone out.
In other words the Observable (a.k.a Publisher) doesn’t want to know who the Observers (a.k.a Subscribers) are.
Why the anonymity: 「decoupling」, subscriber need not have logic coded in them that establish the dependencies between the two.
How the anonymity: "event bus", get hold of a middleman and let that middleman take care of all the communication.