Andrid Studio中引用RxAndroid
compile 'io.reactivex:rxandroid:1.2.0'
Hello World
RxJava最核心的兩個東西是Observables(被觀察者,事件源)和Subscribers(觀察者)。Observables發出一系列事件,Subscribers處理這些事件(例如:觸摸事件,web接口調用返回的數據...)。一個Observable能夠發出零個或者多個事件,直到結束或者出錯。每發出一個事件,就會調用它的Subscriber的onNext方法,最後調用Subscriber.onError()或者Subscriber.onError()結束。
Rxjava的看起來很想設計模式中的觀察者模式,可是有一點明顯不一樣,那就是若是一個Observerble沒有任何的的Subscriber,那麼這個Observable是不會發出任何事件的。
建立一個Observable對象很簡單,直接調用Observable.create便可。
Observable<String> myObservable = Observable.create(//建立事件源
new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> sub) {
sub.onNext("Hello, world!");
sub.onCompleted();//該方法必須主動調用
}
}
);
這裏定義的Observable對象
myObservable
僅僅發出一個"
Hello World
"字符串,而後就結束了。
接着咱們建立一個Subscriber來處理Observable對象發出的字符串。
Subscriber<String> mySubscriber = new Subscriber<String>() {//建立觀察者
@Override
public void onNext(String s) {
lg.e("RxJava 測試結果:" + s);
}
@Override
public void onCompleted() {
lg.e("RxJava onCompleted");
}
@Override
public void onError(Throwable e) {
lg.e("RxJava onError");
}
};
這
裏subscriber僅僅就是打印observable發出的字符串。
Subscriber實現了Observer接口,而在 RxJava 的 subscribe 過程當中,Observer 也老是會先被轉換成一個 Subscriber 。因此若是你只想使用基本功能,選擇 Observer 和 Subscriber 是徹底同樣的。它們的區別對於使用者來講主要有兩點:
1.onStart(): 這是 Subscriber 增長的方法。它會在 subscribe 剛開始而事件還未發送以前被調用,能夠用於作一些準備工做,例如數據的清零或重置,默認空實現。須要注意的是,onStart() 老是在 subscribe 所發生的線程被調用,並不能保證在主線程。要在指定的線程來作準備工做,可使用 doOnSubscribe() 方法,具體能夠在後面的文中看到。
2.unsubscribe(): 這是 Subscriber 所實現的另外一個接口 Subscription 的方法,用於取消訂閱。調用該方法將使得Subscriber 再也不接收事件。通常在調用前使用 isUnsubscribed() 先判斷一下狀態。 unsubscribe()方法很重要,由於在 subscribe() 以後, Observable 會持有 Subscriber 的引用,這個引用若是不能及時被釋放,將有內存泄露的風險。因此最好保持一個原則:要在再也不使用的時候儘快在合適的地方(例如 onPause() onStop() 等方法中)調用 unsubscribe() 來解除引用關係,以免內存泄露的發生。
經過subscribe函數就能夠將咱們定義的myObservable對象和mySubscriber對象關聯起來,這樣就完成了subscriber對observable的訂閱。
myObservable.subscribe(mySubscriber);
一旦mySubscriber訂閱了myObservable,myObservable就是調用mySubscriber對象的onNext和onComplete|onError方法。
變換-> Map &flatMap操做符
RxJava 提供了對事件序列進行變換的支持
,這是它的核心功能之一,也是大多數人說『RxJava 真是太好用了』的最大緣由。所謂變換,就是將事件序列中的對象或整個序列進行加工處理,轉換成不一樣的事件或事件序列。
使用場景:當須要修改接收的內容時,
a.能夠在Observable中直接修改字符串,侷限在於
1.若Observable是第三方提供的,可能不容許更改
2.此時修改屬於批量修改,凡是訂閱此Observable的數據均會改變
b.在訂閱者端處理:
1.與但願Subscribers越輕量越好的原則衝突
2.根據響應式函數編程的概念,Subscribers更應該作的事情是"
響應
",響應Observable發出的事件,而不是去修改
基於map針對對象進行變換
線程控制 -> Scheduler
在不指定線程的狀況下, RxJava 遵循的是線程不變的原則,即:在調用 subscribe()的線程生產事件;在生產事件的線程消費事件。若是須要切換線程,就須要用到 Scheduler(調度器)。
1) Scheduler 的 API (一)
在RxJava 中,Scheduler ——調度器,至關於線程控制器,RxJava 經過它來指定每一段代碼應該運行在什麼樣的線程。RxJava 已經內置了幾個 Scheduler ,它們已經適合大多數的使用場景:
a.Schedulers.immediate(): 直接在當前線程運行(與),至關於不指定線程。這是默認的 Scheduler。
b.Schedulers.newThread(): 老是啓用新線程,並在新線程執行操做。
c.Schedulers.io(): I/O 操做(讀寫文件、讀寫數據庫、網絡信息交互等)所使用的 Scheduler。行爲模式和 newThread() 差很少,區別在於 io() 的內部實現是是用一個無數量上限的線程池,能夠重用空閒的線程,所以多數狀況下 io() 比 newThread()更有效率。不要把計算工做放在 io() 中,能夠避免建立沒必要要的線程。
d.Schedulers.computation(): 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操做限制性能的操做,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小爲 CPU 核數。不要把 I/O 操做放在 computation() 中,不然 I/O 操做的等待時間會浪費 CPU。
另外, Android 還有一個專用的 AndroidSchedulers.mainThread(),它指定的操做將在 Android 主線程運行。
接下來就可使用subscribeOn() 和 observeOn() 兩個方法對線程進行控制了。
subscribeOn(): 即事件產生的線程
。
observeOn(): 指定Subscriber所在的線程(
map、
subscribe等方法的執行域
),即事件消費的線程
。
示例以下:
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {//這裏所屬線程受subscribeOn控制
lg.e("isFunctionInMainThread-call:" + UtilsThread.isFunctionInMainThread());
subscriber.onNext("***");
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.io()).map(new Func1<String, String>() {//該map操做由於沒調用observeOn指定線程,默認使用Observable建立所屬線程(即create方法所屬)
@Override
public String call(String s) {
lg.e("isFunctionInMainThread01:" + UtilsThread.isFunctionInMainThread());
return s;
}
}).observeOn(Schedulers.io()).map(new Func1<String, String>() {
@Override
public String call(String s) {
lg.e("isFunctionInMainThread02:" + UtilsThread.isFunctionInMainThread());
return s;
}
}).observeOn(AndroidSchedulers.mainThread()).map(new Func1<String, String>() {//這裏的map操做在主線程
@Override
public String call(String s) {
lg.e("isFunctionInMainThread03:" + UtilsThread.isFunctionInMainThread());
return s;
}
}).observeOn(Schedulers.io()).subscribe(new Subscriber<String>() {//這裏表示subscribe在io線程
@Override
public void onCompleted() {
lg.e("isFunctionInMainThread-subscribe:" + UtilsThread.isFunctionInMainThread());
}
@Override
public void onError(Throwable e) {
lg.e("onError");
}
@Override
public void onNext(String s) {
lg.e("onNext with " + s, "isFunctionInMainThread-subscribe:" + UtilsThread.isFunctionInMainThread());
}
});
運行結果以下所示,
思考: