本篇文章已受權微信公衆號 YYGeeker
獨家發佈轉載請標明出處java
CSDN學院課程地址react
- RxJava2從入門到精通-初級篇:edu.csdn.net/course/deta…
- RxJava2從入門到精通-中級篇:edu.csdn.net/course/deta…
- RxJava2從入門到精通-進階篇:edu.csdn.net/course/deta…
- RxJava2從入門到精通-源碼分析篇:edu.csdn.net/course/deta…
在RxJava使用以前記得在Gradle中添加依賴引入android
implementation "io.reactivex.rxjava2:rxjava:2.1.12"
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
複製代碼
在上面的例子中,咱們能夠注意到被觀察者中有個Emitter(發射器),發射器的位置位於subscribe回調參數ObservableEmitter<String> e
中,經過名字咱們能夠知道,RxJava的事件通知就是經過它來進行發送的,因此它是一個事件發射器,發射器能發送的事件有onNext()
,onComplete()
,onError()
,在觀察者的回調中,分別對應着相同方法名進行回調,這裏對觀察者的回調方法進行簡單介紹編程
人類就喜歡酷炫的東西,炫耀自身的優勢,固然RxJava也少不了人類這種心理,就是鏈式編程,下面這段代碼能夠完美替代例子上面的全部代碼,其效果是和上面同樣的。這裏須要注意的是,建立Observer過程當中,會將全部的接收方法都建立出來,若是此時程序發生異常,RxJava默認會將異常信息try-catch緩存
public static void main(String[] args) {
//建立被觀察者
Observable.create(new ObservableOnSubscribe<String>() {
@Override
//默認在主線程裏執行該方法
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("俊俊俊很帥");
e.onNext("你值得擁有");
e.onNext("取消關注");
e.onNext("但仍是要保持微笑");
e.onComplete();
}
})
//建立觀察者並訂閱
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println("onNext=" + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
複製代碼
長此以往,人類喜歡簡潔,喜歡定製服務,巧了,RxJava也給你知足了,下面這段代碼中,實現的方法跟上面的實現方法是對應起來的,你們看參數就知道哪一個對應哪一個了,你能夠經過建立Consumer,不須要實現的方法你能夠不寫,看上去更簡潔,這裏我爲了方便你們看,都new出來了,Consumer就是消費者的意思,能夠理解爲消費了onNext等等等事件。這裏須要注意的是,建立Consumer過程當中,若是沒有將第二個Throwable的Consumer建立出來,若是此時程序發生異常,程序將會崩潰bash
public static void main(String[] args) {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("俊俊俊很帥");
e.onNext("你值得擁有");
e.onNext("取消關注");
e.onNext("但仍是要保持微笑");
e.onComplete();
}
}).subscribe(
new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
//對應onNext()
System.out.println("accept=" + s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
//對應onError()
}
}, new Action() {
@Override
public void run() throws Exception {
//對應onComplete()
}
}, new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
//對應onSubscribe()
}
});
}
複製代碼
固然Rxjava的使用不單單這麼簡單的事件發送,他還能完成一些業務上的邏輯。好比註冊登陸操做,正常的邏輯是經過註冊去獲取用戶的Token,而後經過Token進行登陸,這個過程涉及到註冊須要在子線程去進行網絡請求,而後在UI線程中更新界面提示,而後再切換到子線程進行登陸操做,最後又得切換到UI線程去更新界面,這一系列的操做,也是能夠經過RxJava的線程切換來進行實現,在RxJava中的線程切換特別簡單,只要下面這兩句話就能自由的在子線程和UI線程中自由切換微信
public static void main(String[] args) {
//建立被觀察者
Observable.create(new ObservableOnSubscribe<String>() {
@Override
//默認在主線程裏執行該方法
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("俊俊俊很帥");
e.onNext("你值得擁有");
e.onNext("取消關注");
e.onNext("但仍是要保持微笑");
e.onComplete();
}
})
//將被觀察者切換到子線程
.subscribeOn(Schedulers.io())
//將觀察者切換到主線程 須要在Android環境下運行
.observeOn(AndroidSchedulers.mainThread())
//建立觀察者並訂閱
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println("onNext=" + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
複製代碼
說到線程切換,就必須不得不說的是RxJava的線程調度器,其調度器就是Schedulers,在調度器中封裝了各式各樣的線程提供給咱們使用,下面舉例其現有的調度器列表網絡
調度器類型 | 效果 |
---|---|
Schedulers.computation() | 用於計算任務,如事件循環或和回調處理,不要用於IO操做(IO操做請使用Schedulers.io());默認線程數等於處理器的數量 |
Schedulers.from(executor) | 使用指定的Executor做爲調度器 |
Schedulers.immediate() | 在當前線程當即開始執行任務 |
Schedulers.io() | 用於IO密集型任務,如異步阻塞IO操做,這個調度器的線程池會根據須要增加;對於普通的計算任務,請使用Schedulers.computation();Schedulers.io()默認是一個CachedThreadScheduler,很像一個有線程緩存的新線程調度器 |
Schedulers.newThread() | 爲每一個任務建立一個新線程 |
Schedulers.trampoline() | 當其它排隊的任務完成後,在當前線程排隊開始執行 |
RxJava事件發出去並非置之不顧,要有合理的管理者來管理它們,在合適的時機要進行釋放事件,這樣纔不會致使內存泄漏,這裏的管理者咱們稱爲事件調度器(或事件管理者)CompositeDisposable
。Rxjava的事件流發出去後,會返回Disposable
類型的對象,咱們能夠將該對象添加到事件調度器上,而後進行相關操做,這裏的事件調度器咱們能夠簡單的理解爲事件的容器異步
public class Main {
private static CompositeDisposable mRxEvent = new CompositeDisposable();
public static void main(String[] args) {
Disposable subscribe = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("俊俊俊很帥");
e.onNext("你值得擁有");
e.onNext("取消關注");
e.onNext("但仍是要保持微笑");
e.onComplete();
}
}).subscribe(
new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
//對應onNext()
System.out.println("accept=" + s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
//對應onError()
}
}, new Action() {
@Override
public void run() throws Exception {
//對應onComplete()
}
}, new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
//對應onSubscribe()
}
});
mRxEvent.add(subscribe);
mRxEvent.clear();
}
}
複製代碼
CompositeDisposable
提供的方法中,都是對事件的管理ide
RxJava的事件發射分爲冷與熱,一個"熱"的Observable可能一建立完就開始發射數據,所以全部後續訂閱它的觀察者可能從序列中間的某個位置開始接受數據(有一些數據錯過了)。一個"冷"的Observable會一直等待,直到有觀察者訂閱它纔開始發射數據,所以這個觀察者能夠確保會收到整個數據序列
RxJava能夠簡單的歸結爲三步驟