RxAndroid Tutorial
響應式編程(Reactive programming)不是一種API,而是一種新的很是有用的範式,而RxJava就是一套基於此思想的框架,在Android開發中咱們經過這個框架就能探索響應式的世界,同時結合另外一個庫,RxAndroid,這是一個擴展庫,更好的兼容了Android特性,好比主線程,UI事件等。
在這篇指南中,你將會學習到如下這些內容:html
從 the starter project for this tutorial 能夠下載這篇文章中項目的全部代碼, 能夠直接在Android Studio中打開。
大部分的代碼都在 CheeseActivity.java
這個類裏面,繼承於 BaseSearchActivity
;裏面有一些基礎方法:
showProgressBar(): 顯示一個進度條
hideProgressBar(): 隱藏一個進度條
showResult(List<String> result): 顯示一個列表數據
mCheeseSearchEngine: CheeseSearchEngine類的一個對象,內部有一個search方法,接收一個數據查詢並返回一個匹配的列表list。
直接運行的話,跑出來是這樣子,就是一個查詢的界面:java
在建立第一個observable以前,先看一下響應式編程的理論 :]react
通常的程序是這樣的,表達式只會計算一次,而後把賦值給變量android
int a = 2; int b = 3; int c = a * b; // c is 6 a = 10; // c is still 6
在a從新賦值後,前面的c並不會變化,而響應式編程會對值的變化作出響應。
有時候頗有可能你已經作過一些響應式編程,可是並無意識到這一點。
好比Excel中的表格,咱們能夠在表格裏面填上一些值,同時將某個格子的值設爲一個表達式,就像下面這樣git
設置這個表格裏面 B1區域的值爲2,B2區域的值爲3,B3是一個表達式,B3 = B1* B2,當其中一個值改變的時候,這個觀察者B3也會變化,如圖把B1改爲10,B3就會自動計算成30。github
RxJava使用的是觀察者模式,其中有兩個關鍵的接口:Observable 和 Observer,當Observable(被觀察的對象)狀態改變,全部subscribed(訂閱)的Observer(觀察者)會收到一個通知。
在Observable的接口中有一個方法 subscribe()
,這樣Observer 能夠調用來進行訂閱。
一樣,在Observer 接口中有三個方法,會被Observable 回調:編程
做爲一個表現良好的Observable,發射0到多個數據時後面都會跟上一個completion 或是error的回調。
聽起來有點複雜,可是一些例子能夠很清晰的解釋。小程序
一個網絡請求observable 一般只發射一個數據而且馬上completes。網絡
每個圓表明了從observable 發射出去的item數據,黑色的block表明告終束或是錯誤。
一個鼠標的移動observable 將會不斷的發送鼠標當前座標,而且從不會結束。app
在一個observable 已經結束後不能再發射新的item數據,下面這個就是一個很差的示範,違反了Observable 的準則
在已經發信號結束後依然發射了一個item。
你能夠直接經過 Observable.create()
建立一個Observable
Observable<T> create(ObservableOnSubscribe<T> source)
看起來十分的簡潔,可是這段代碼是什麼意思呢?這個 「source」 又是什麼? 想要理解這個,只須要知道 ObservableOnSubscribe
是什麼。 這是一個接口,其中有一個方法:
public interface ObservableOnSubscribe<T> { void subscribe(ObservableEmitter<T> emitter) throws Exception; }
這個你建立Observable 時的一個「source」 須要暴露一個 subscribe()
方法,從這裏又引出來另外一個 emitter(發射器),那麼什麼又是emitter?
RxJava中的 Emitter
接口和 Observer 比較類似,都有如下方法
public interface Emitter<T> { void onNext(T value); void onError(Throwable error); void onComplete(); }
ObservableEmitter
提供了一個方法用來取消訂閱,用一個實際場景來形容一下。想象一個水龍頭和水流,這個管道就至關於Observable,從裏面能放出水,ObservableEmitter 就至關因而水龍頭,控制開關,而水龍頭鏈接到管道就是 Observable.create()。
舉個例子省得前面描述太過於抽象,先來看看第一個例子
在 CheeseActivity
類中有這麼一段代碼
// 1 private Observable<String> createButtonClickObservable() { // 2 return Observable.create(new ObservableOnSubscribe<String>() { // 3 @Override public void subscribe(final ObservableEmitter<String> emitter) throws Exception { // 4 mSearchButton.setOnClickListener(new View.OnClickListener() { @Override public void onClick(View view) { // 5 emitter.onNext(mQueryEditText.getText().toString()); } }); // 6 emitter.setCancellable(new Cancellable() { @Override public void cancel() throws Exception { // 7 mSearchButton.setOnClickListener(null); } }); } }); }
上面這段代碼作了如下幾件事情
Observable.create()
建立了一個observable ,並提供了一個ObservableOnSubscribe。subscribe()
方法。setCancellable()
方法。經過重寫cancel()方法,而後當Observable 被處理的時候這個實現會被回調,好比已經結束或者是全部的觀察者都解除了訂閱。如今被觀察者Observable 已經有了,還須要觀察者來進行訂閱,在此以前,咱們先看看另外一個接口, Consumer
,它能夠十分簡單的從emitter 接收到數據。
public interface Consumer<T> { void accept(T t) throws Exception; }
若是僅是想要簡單的訂閱一下Observable,這個接口是很方便的。
Observable 的接口方法 subscribe() 能夠接收不少類型的參數,你能夠訂閱一個全參數的版本,只要你實現其中全部的方法就能夠。若是隻是想要接收一下發射的數據,可使用單一的 Consumer 的版本,這樣只須要實現一個方法,並且也是 onNext
。
咱們能夠直接在Activity的OnStart方法中來實現這個
@Override protected void onStart() { super.onStart(); // 1 Observable<String> searchTextObservable = createButtonClickObservable(); searchTextObservable // 2 .subscribe(new Consumer<String>() { //3 @Override public void accept(String query) throws Exception { // 4 showResult(mCheeseSearchEngine.search(query)); } }); }
其中Consumer須要導的包是
import io.reactivex.functions.Consumer;
依次解釋一下上面每一步
這樣一個簡單的實現也寫完了,運行一下APP,跑出來的結果就像下面這樣
雖然已經像模像樣的寫了一個小程序,但其實存在一些問題。當按鈕按下去後這個UI線程實際上被阻塞住了
若是在控制檯可能能夠看到這樣的提示
> 08-24 14:36:34.554 3500-3500/com.raywenderlich.cheesefinder I/Choreographer: Skipped 119 frames! The application may be doing too much work on its main thread.
這是因爲search 發生在主線程,若是是一個網絡請求的話,Android會直接crash,拋出一個NetworkOnMainThreadException 的異常。若是不指定線程,那麼RxJava的操做會一直在一個線程上。
經過 subscribeOn
和 observeOn
兩個操做符能改變線程的執行狀態。subscribeOn
在操做鏈上最好只調用一次,若是屢次調用,依然只有第一次生效subscribeOn
用來指定 observable 在哪一個線程上建立執行操做,若是想要經過observables 發射事件給Android的View,那麼須要保證訂閱者在Android的UI線程上執行操做。
另外一方面, observeOn
能夠在鏈上調用屢次,它主要是用來指定下一個操做在哪個線程上執行,來個例子:
myObservable // observable will be subscribed on i/o thread .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .map(/* this will be called on main thread... */) .doOnNext(/* ...and everything below until next observeOn */) .observeOn(Schedulers.io()) .subscribe(/* this will be called on i/o thread */);
主要用到三種schedulers:
Schedulers.io(): 適合I/O類型的操做,好比網絡請求,磁盤操做。
Schedulers.computation(): 適合計算任務,好比事件循環或者回調處理。
AndroidSchedulers.mainThread() : 回調主線程,好比UI操做。
map操做符經過運用一個方法把從一個observable 發射的數據再返回成另外一個observable給那些調用的。
好比你有一個observable稱之爲numbers,而且會發射一系列的值,以下所示
經過map操做符的apply方法
numbers.map(new Function<Integer, Integer>() { @Override public Integer apply(Integer number) throws Exception { return number * number; } }
而後結果就像下面這樣
再來個實例,咱們用這個操做符可以把前面的代碼拆分一下
@Override protected void onStart() { super.onStart(); Observable<String> searchTextObservable = createButtonClickObservable(); searchTextObservable // 1 .observeOn(Schedulers.io()) // 2 .map(new Function<String, List<String>>() { @Override public List<String> apply(String query) { return mCheeseSearchEngine.search(query); } }) // 3 .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<List<String>>() { @Override public void accept(List<String> result) { showResult(result); } }); }
簡述一下代碼,首先,指定下一次操做在I/O線程上,而後經過給的String,執行search返回一個結果列表,
再將線程從I/O上變動爲主線程,showResult
,展現返回的數據。
爲了用戶體驗,咱們須要一個進度條
這裏能夠引入 doOnNext
操做符,doOnNext
有一個 Consumer
,而且在每次observable 發射數據的時候都會被調用,再改一下前面的代碼
@Override protected void onStart() { super.onStart(); Observable<String> searchTextObservable = createButtonClickObservable(); searchTextObservable // 1 .observeOn(AndroidSchedulers.mainThread()) // 2 .doOnNext(new Consumer<String>() { @Override public void accept(String s) { showProgressBar(); } }) .observeOn(Schedulers.io()) .map(new Function<String, List<String>>() { @Override public List<String> apply(String query) { return mCheeseSearchEngine.search(query); } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<List<String>>() { @Override public void accept(List<String> result) { // 3 hideProgressBar(); showResult(result); } }); }
每次在點擊按鈕的時候就能收到一個事件
首先把線程切換到主線程,而後在 doOnNext
裏面來顯示進度條,再把線程切換到子線程,來進行請求數據,最後在切換回來關閉進度條,展現數據。RxJava很是適合這種需求,代碼也很清晰。
把這個例子跑起來的效果就像下面這樣,點擊的時候就顯示進度條:
除了經過點擊按鈕來搜索,更好的方式就是根據EditText的text內容變化自動的搜索。
首先,就須要對EditText的內容變化進行訂閱觀察,先看代碼實例:
//1 private Observable<String> createTextChangeObservable() { //2 Observable<String> textChangeObservable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(final ObservableEmitter<String> emitter) throws Exception { //3 final TextWatcher watcher = new TextWatcher() { @Override public void beforeTextChanged(CharSequence s, int start, int count, int after) {} @Override public void afterTextChanged(Editable s) {} //4 @Override public void onTextChanged(CharSequence s, int start, int before, int count) { emitter.onNext(s.toString()); } }; //5 mQueryEditText.addTextChangedListener(watcher); //6 emitter.setCancellable(new Cancellable() { @Override public void cancel() throws Exception { mQueryEditText.removeTextChangedListener(watcher); } }); } }); // 7 return textChangeObservable; }
分析一下上面這幾步代碼:
Observable.create
建立一個textChangeObservable ,傳入一個ObservableOnSubscribe 對象beforeTextChanged()
和 afterTextChanged()
,在onTextChanged 裏面,把這個數據經過emitter.onNext 發射出去,這樣訂閱的觀察者就能接收到實現了這個Observable後就能夠把前面的給替換掉
Observable<String> searchTextObservable = createTextChangeObservable();
再跑一次程序,就能夠邊輸入邊搜索了
如今可能有一個需求是在輸入長度比較短的時候不進行搜索,達到必定字符後才搜索,RxJava引入了一個 filter
操做符。
filter只會經過那些知足條件的item,filter經過一個 Predicate
,這個接口內部有一個 test
方法用來決定是否知足條件,最後會返回一個boolean 值。
這裏,Predicate 拿到的是一個輸入字符String,若是長度大於或等於2,就返回true,表示知足條件。
return textChangeObservable .filter(new Predicate<String>() { @Override public boolean test(String query) throws Exception { return query.length() >= 2; } });
注意Predicate須要導的包是:
import io.reactivex.functions.Predicate;
再前面建立Observable的代碼後面加一個 filter
後,當query的長度不足2時,那這個值就不會被髮射出去,而後訂閱的就收不到這個消息。
跑起來就像這樣,只輸一個數,返回false,不會觸發搜索。
再輸一個字符就經過了filter的過濾。
有時咱們對於EditText內容頻繁變化的場景並不想每次變化都去新發送一個請求,因此,這裏又引入了一個新的操做符 debounce
,意思就是防抖動,這個和filter比較相似,也是一種攔截的策略。
這個操做符是根據item被髮射的時間來進行過濾。每次在一個item被髮射後,debounce 會等待一段指定長度的時間,而後纔去發射下一個item。
若是在這段時間內都沒有一個item發生,那麼上一個最後的item會被髮射出去,這樣能保證起碼有一個item能被髮射成功。
從圖裏看到,2,3,4,5觸發的時間很是的接近,因此這一段時間內前三個都被過濾了,只留下了5。
在前面的 createTextChangeObservable()
中,咱們再添加一個 debounce
操做符在 filter
的後面
return textChangeObservable .filter(new Predicate<String>() { @Override public boolean test(String query) throws Exception { return query.length() >= 2; } }).debounce(1000, TimeUnit.MILLISECONDS); // add this line
再跑一下APP,能夠看到中間階段直接省略了,最後搜索了一下結果值
一開始咱們實現了一個observable 是監聽點擊按鈕的事件,而後又實現了一個observable 是監聽EditText的內容變化,那麼怎麼把這兩個合二爲一呢。
RxJava提供了不少的操做符來聯合observables,可是其中最有用和簡單的就是 merge
。merge
能夠將兩個或更多的observable 聯合起來,合成一個單一的observable。
這裏咱們把前面兩個observable 綁定起來
Observable<String> buttonClickStream = createButtonClickObservable(); Observable<String> textChangeStream = createTextChangeObservable(); Observable<String> searchTextObservable = Observable.merge(textChangeStream, buttonClickStream);
如今的效果就是前面的兩種效果的結合體,不管是自動搜索仍是手動搜索都是能夠觸發的。
前面咱們實現過 setCancellable
方法,這個方法會在解除訂閱的時候回調。Observable.subscribe()
會返回一個Disposable,Disposable是一個接口,其中有兩個方法:
public interface Disposable { void dispose(); // ends a subscription boolean isDisposed(); // returns true if resource is disposed (unsubscribed) }
咱們先在 CheeseActivity
中定義一個Disposable
private Disposable mDisposable;
在 onStart()
中,把 subscribe()
的返回值賦給mDisposable
mDisposable = searchTextObservable // change this line .observeOn(AndroidSchedulers.mainThread()) .doOnNext(new Consumer<String>() { @Override public void accept(String s) { showProgressBar(); } }) .observeOn(Schedulers.io()) .map(new Function<String, List<String>>() { @Override public List<String> apply(String query) { return mCheeseSearchEngine.search(query); } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<List<String>>() { @Override public void accept(List<String> result) { hideProgressBar(); showResult(result); } });
最後咱們就能在 onStop()
中去解除這個訂閱,代碼以下:
@Override protected void onStop() { super.onStop(); if (!mDisposable.isDisposed()) { mDisposable.dispose(); } }
這樣就解除了訂閱。
你能夠下載這篇文章中的代碼程序,下載地址
固然這篇文章只是講到了RxJava世界的一小點,好比,JakeWharton大神的庫 RxBinding ,這個庫裏面包括大量的Android View的API,你能夠經過調用 RxView.clicks(viewVariable)
來建立一個點擊事件observable 。
除此以外,學習更多有關RxJava的知識,能夠看 官方文檔。
做者:sheepm連接:http://www.jianshu.com/p/031745744bfa來源:簡書著做權歸做者全部。商業轉載請聯繫做者得到受權,非商業轉載請註明出處。