Rx(ReactiveX,響應式編程)是一種事件驅動的基於異步數據流的編程模式,整個數據流就像一條河流,它能夠被觀測(監聽),過濾,操控或者與其餘數據流合併爲一條新的數據流。而RxJava是.Net Rx在JVM上的實現。RxJava能夠應用於大部分基於JVM的語言,如Scala,Groovy等。整個RxJava+RxAndroid的包大小爲(1125kb+10kb)java
RxJava
最核心的兩個東西是Observables
(被觀察者,事件源)和Subscribers
(觀察者),Observables
發出一系列事件,Subscribers
處理這些事件。而RxJava
的Observables
是擴展自設計模式中的觀察者模式,添加了如下幾個能力:android
Observables
;Observables
,但不會直接將錯誤或異常直接拋出;Observable在存活期間,生命週期包含三個可能的事件,與迭代器的生命週期很相似:git
Events | Iterable(pull) | Observable(push) |
---|---|---|
獲得數據 | T next() | onNext(T) |
發現錯誤 | throws Exception | onError(Throwable) |
完成 | !hasNext() | onCompleted() |
與使用迭代器的區別:在使用迭代器的時候,線程會阻塞直到他們須要的數據到來。而使用Observable,是使用異步的方式將數據推送到Observer;github
而根據推送機制的不一樣,Observable分爲熱Observable和冷Observable:編程
下面是一個簡單的建立觀察者的代碼:設計模式
Observable.create(new Observable.OnSubscribe<Object>(){ @Override public void call(Subscriber<? super Object> subscriber){} }); // example Observable<Integer> ob= Observable.create(new Observable.OnSubscribe<Integer>(){ @Override public void call(Subscriber<? super Integer> subscriber){ for(int i = 0; i < 5; i++){ observer.onNext(i); } observer.onCompleted(); } }); // 並不用關心有多少數據, Subscription subscriptionPrint = observableString.subscribe(new Observer<Integer(){ @Override public void onCompleted(){ System.out.println("Observable completed"); } @Override public void onError(Throwable e){ System.out.println("Oh no! Something wrong happened!"); } @Override public void onNext(Integer item) System.out.println("Item is " + item); })
Observable的構造方法:網絡
Subject = Observable + Observer
這意味着一個Subject能夠同時是觀察者和被觀察者,事件源(Observable),也就是說Subject能夠像觀察者同樣訂閱一個事件源,而且能夠像Observable同樣輸出它們收到的事件。RxJava提供了四種不一樣類型的subjects:多線程
cast() 相似於map();併發
zip(),能夠將多個輸入整合成一個輸出(會合並Items);app
repeat(),當接收到onComplete()事件時,觸發從新訂閱
.trampoline(),爲一些不須要當即執行的任務進行調度,會依次執行隊列裏的任務,是方法:repeat(),retry()的默認調度器;
RxAndroid還提供了一個Android特有的調度器:AndroidSchedulers.mainThread()來讓代碼在UI主線程中執行;
RxJava提供了一個每個Observables均可以使用的subscribeOn(),ObserveOn()方法,將Scheduler與Observables創建聯繫,咱們能夠這樣來使用:
.subscribeOn(Schedulers.io()) // 讓事件的產生髮生在IO線程,屢次調用,以最後一次調用的結果爲準 .observeOn(AndroidSchedulers.mainThread()) // 讓事件的回調發生在UI主線程中 .subscribe(....)
前面版本的RxAndroid還提供了AppObservable,ViewObservable,WidgetObservable,LifecycleObservable 等,但最新版本的RxAndroid直接刪掉了這些;
1
Grokking RxJava,Part 4
RxJava處理網絡失敗
RxJava Essentials
給Android開發者的RxJava詳解
拆RxJava