RxJava是ReactiveX推出在Java VM環境下使用的異步操做庫。除了在Java環境ReactiveX也爲其餘編程語言推出Rx庫,例如Py、Js、Go等。網上有不少關於對RxJava的介紹和使用,在Android開發中也有不少項目使用RxJava。那爲何還要使用RxJava呢,Android開發也有提供異步操做的方法供開發者使用,我想應該是RxJava比起Handle、AsyncTask簡潔優雅。html
1 RxJava採用鏈式調用,在程序邏輯上清晰簡潔java
2 採用擴展式觀察者設計模式react
關於觀察者模式以及其餘RxJava的介紹這個就不作重複,下面內容主要圍繞RxJava和RxAndroid使用。對於RxJava官方文檔已經有詳細介紹,本節是以學習討論爲主,如存在錯誤的地方但願你們能夠指出。android
使用RxJava須要建立Observable,Observable用於發射數據。
以下Observable的create方法須要傳入一個OnSubscribe,其繼承於Action1<Subscriber<? super T>>,Action中的Subscriber就是訂閱者。git
public static <T> Observable<T> create(OnSubscribe<T> f) { return new Observable<T>(RxJavaHooks.onCreate(f)); }
另外create方法中須要實現接口call,返回subscriber對象。call方法實如今observable訂閱後要執行的事件流。
subscriber.onNext發射data,subscriber.onCompleted能夠表示發射事件結束。接着調用observable的subscribe方法實現被訂閱後執行的事件流。github
Observable<String> observable = Observable .create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("1"); subscriber.onNext("2"); subscriber.onNext("3"); subscriber.onNext("4"); subscriber.onNext("5"); } }); Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { System.out.print(s + '\n'); } }; observable.subscribe(subscriber); //輸出結果 print: //1 //2 //3 //4 //5
Observable除了使用create方法建立外還能夠使用from或者just快速設置發射的事件流,簡化了create的步驟。
編程
Observable<String> o = Observable.from("a", "b", "c");
Observable<String> o = Observable.just("one object");
RxJava的線程由Schedulers調度者控制,經過它來控制具體操做在什麼線程中進行。設計模式
Schedulers.immediate() 在當前線程中執行數組
Schedulers.newThread() 爲每個任務開闢線程執行網絡
Schedulers.computation() 計算任務運行的線程
Schedulers.io() IO任務運行的線程
....
AndroidSchedulers.mainThread() Android 主線程運行
對於線程的控制主要由subscribeOn()和observeOn()兩個方法控制:
subscribeOn 控制Observable.OnSubscribe所處的線程,等同於Observable create、just、from時所處的線程。
observeOn 控制Subscriber的線程,也能夠說是控制事件被執行時所在的線程。
Observable .just(1,2,3) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Integer integer) { System.out.print(integer + '\n'); } }); //輸出結果 print: //1 //2 //3
寫下上面的RxJava鏈式調用的代碼,有沒有以爲比之前使用的異步調用清爽許多,對處女座還說這很治癒!
ReactiveX提供超級多的操做符,每一個操做符都具備不一樣的功能,但目的都是在Observable和Subscribe之間變換和修改發射出去的事件流。這節介紹幾個比較常見簡單的操做符,以後有機會再寫一節操做符篇詳細說說每一個操做符的做用。附上官方操做符文檔看看就知道有多少多了。
Map()
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) { return create(new OnSubscribeMap<T, R>(this, func)); }
首先先介紹一個操做符map,map實現Func1接口將T類型數據變換爲R類型數據,返回R類型數據。例如傳入Integer類型的事件隊列,通過map加工以後以String類型返回。
Observable .just(1,2,3) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .map(new Func1<Integer, String>() { @Override public String call(Integer integer) { return integer + ""; } }) .subscribe(new Subscriber<String>() { ...... @Override public void onNext(String str) { System.out.print(str + '\n'); } }); //輸出結果 print: //1 //2 //3
Filter()
public final Observable<T> filter(Func1<? super T, Boolean> predicate) { return create(new OnSubscribeFilter<T>(this, predicate)); }
filter和map同樣實現Func1接口不過它變換以後的類型爲boolean,對發射的事件流進行篩選,當變換後的boolean值爲true,訂閱者才能收到經過篩選的事件,反之該事件不被消費。例如事件流篩選要求當int值可被2整除才能繼續傳遞,因此最後訂閱者可消費的事件爲2,4,6,8,10。
Observable .just(1,2,3,4,5,6,7,8,9,10) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .filter(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return integer % 2 == 0; } }) .map(new Func1<Integer, String>() { @Override public String call(Integer integer) { return integer + ""; } }) .subscribe(new Subscriber<String>() { ...... @Override public void onNext(String str) { System.out.print(str + '\n'); Log.i("subscribe", str); } }); //輸出結果 print: //2 //3 //4 //6 //8 //10
Skip()
public final Observable<T> skip(int count) { return lift(new OperatorSkip<T>(count)); }
skip操做符表示跳過前幾個事件從某一個事件開始發射事件,下標從0開始。
Observable .just(1,2,3,4,5,6,7,8,9,10) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .skip(3) .map(new Func1<Integer, String>() { @Override public String call(Integer integer) { return integer + ""; } }) .subscribe(new Subscriber<String>() { ...... @Override public void onNext(String s) { System.out.print(s + '\n'); Log.i("subscribe", s); } }); //輸出結果 print: //4 //5 //6 //7 //8 //9 //10
Range()
public static Observable<Integer> range(int start, int count) { if (count < 0) { throw new IllegalArgumentException("Count can not be negative"); } if (count == 0) { return Observable.empty(); } if (start > Integer.MAX_VALUE - count + 1) { throw new IllegalArgumentException("start + count can not exceed Integer.MAX_VALUE"); } if(count == 1) { return Observable.just(start); } return Observable.create(new OnSubscribeRange(start, start + (count - 1))); }
range操做符能夠理解爲just,from傳遞一個連續的int類型待發射數組,n爲起始int值,m爲Count。例如n = 1,m = 5 int數組就是{1,2,3,4,5}
這節先學習到這,算是對RxJava的初步認識和學習。其實運用RxJava主要仍是依賴於對操做符的使用,前面所介紹的操做符屬於最最簡單基礎的,還有不少特別有用的操做符沒有介紹。以後再繼續介紹一些操做符。RxJava在Android開發中很受歡迎,就是由於它的強大,同時RxJava能夠和Retrofit組合使用,更高效處理網絡請求回值。另外GitHub GoogleSample上的android-architecture也有使用RxJava框架的TODO項目,能夠看看理解RxJava在項目中的實踐應用。