更多文章請點擊連接:http://77blogs.com/?p=162html
轉載請標明出處:http://www.javashuo.com/article/p-pfcoiphs-mh.html,http://77blogs.com/?p=162設計模式
RxJava到底是啥,從根本上來說,它就是一個實現異步操做的庫,而且可以使代碼很是簡潔。它的異步是使用觀察者模式來實現的。app
關於觀察者模式的介紹,能夠看我這一篇文章:異步
http://www.javashuo.com/article/p-bnuyxkop-mg.html ide
這裏我主要講RxJava的一些基本用法,基本案例,原理的話暫時不深究:spa
1、本身構造事件線程
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter emitter) { int i = getNumber(); if (i < 0) { emitter.onComplete(); return; } else { Log.d(TAG, Thread.currentThread().getName()); emitter.onNext(i); emitter.onComplete(); } } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, Thread.currentThread().getName()); Log.d(TAG, integer + ""); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { } });
RxJava 有四個基本概念:Observable
(可觀察者,即被觀察者)、 Observer
(觀察者)、 subscribe
(訂閱)、事件。Observable
和 Observer
經過 subscribe()
方法實現訂閱關係,從而 Observable
能夠在須要的時候發出事件來通知 Observer
。設計
onNext():方法用來發送事件。code
下面看看其餘兩個方法:server
onCompleted()
: 事件隊列完結。RxJava 不只把每一個事件單獨處理,還會把它們看作一個隊列。RxJava 規定,當不會再有新的 onNext()
發出時,須要觸發 onCompleted()
方法做爲標誌。onError()
: 事件隊列異常。在事件處理過程當中出異常時,onError()
會被觸發,同時隊列自動終止,不容許再有事件發出。onCompleted()
和 onError()
有且只有一個,而且是事件序列中的最後一個。須要注意的是,onCompleted()
和 onError()
兩者也是互斥的,即在隊列中調用了其中一個,就不該該再調用另外一個。講一下咱們上面的例子,上面這個例子是採用簡潔的鏈式調用來寫的:
首先使用 create()
方法來建立一個 Observable ,併爲它定義事件觸發規則,而後經過emitter.onNext(i)傳遞出來,.subscribeOn(Schedulers.io())即是指定該事件產生的所在的線程爲子線程,.observeOn(AndroidSchedulers.mainThread())指定觀察者執行的線程爲主線程。這時候爲止返回的對象爲Observable對象。
而後該Observable對象subscribe綁定觀察者(也就是觀察者進行訂閱),裏面有接收被觀察者發出來的事件,有一個成功的方法,和一個失敗的方法,這樣就實現了由被觀察者向觀察傳遞事件。
2、對集合裏的數據進行變換
List<Integer> list = new ArrayList<Integer>() { { add(0); add(1); add(2); } }; Observable.fromIterable(list).map(new Function() { @Override public Object apply(Object o) throws Exception { int i = (int) o + 1; return String.valueOf(i); } }) .toList() .toObservable().subscribeOn(Schedulers.io()) .subscribeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer() { @Override public void accept(Object o) throws Exception { Log.d(TAG, o.toString()); } });
且看,咱們須要對某個集合裏面的數據一一進行變換,而後發送出來執行其餘操做。
上面即是對集合裏面的每一項進行加一操做,而後再轉換爲String類型,而後toList(),組合成集合發送出來,最後在觀察者方法中打印出每一項。
3、合併執行
定義兩個被觀察者,各自產生事件,而後合併在一塊兒,發送給一個觀察者。
首先定義咱們上面第一個例子的被觀察者,用於發送一個數字:
Observable observable1 = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter emitter) { int i = getNumber(); if (i < 0) { emitter.onComplete(); return; } else { Log.d(TAG, Thread.currentThread().getName()); emitter.onNext(i); emitter.onComplete(); } } }) .subscribeOn(Schedulers.io());
其次再定義咱們上面第二個例子的被觀察者:
List<Integer> list = new ArrayList<Integer>() { { add(0); add(1); add(2); } }; Observable observable2 = Observable.fromIterable(list).map(new Function() { @Override public Object apply(Object o) { int i = (int) o + 1; return String.valueOf(i); } }) .toList() .toObservable().subscribeOn(Schedulers.io());
最後將這兩個被觀察者的事件合併起來發送給一個觀察者:
Disposable disposable = Observable.zip(observable1, observable2, new BiFunction() { @Override public Object apply(Object o, Object o2) throws Exception { int i = (int) o; String k = (String) ((List) o2).get(0); return k + i; } }) .subscribe(new Consumer() { @Override public void accept(Object o) { Log.d(TAG, (String) o); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) { Log.d(TAG, throwable.getMessage()); } });
zip方法,顧名思義,有點相似與於打包的意思。
o爲被觀察者1返回的結果,o2爲被觀察2返回的結果,將這兩個結果一塊兒處理後發送給觀察者。打印出來。
如今先介紹這幾個,找個時間再整理一些其餘的用法以及原理實現。