GitHub關於RxJava的介紹:編程
a library for composing asynchronous and event-based programs by using observable sequences安全
他的意思就是 一個經過可觀測的序列來組成異步和基於事件的庫。bash
RxJava的出現消除同步問題、線程安全等問題app
總的來講就是方便咱們異步編程。異步
異步async
鏈式調用結構ide
使用複雜的異步調用方式的時候依舊能夠保持簡潔異步編程
學習成本比較高,入門的門檻比較高學習
難以理解的API,須要查看源碼才能理解API的具體效果ui
首先明白他的基礎使用步驟:
正常建立被觀察者:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("ONE");
emitter.onNext("TWO");
emitter.onNext("THREE");
emitter.onComplete();
}
});
複製代碼
在這裏面一共產生了四個事件:One、Two、Three、結束。
PS:
非正常建立第一彈:
Observable observable = Observable.just("ONE","TWO","THREE");
非正常建立第二彈:
String[] values = {"ONE", "TWO", "THREE"};
Observable observable = Observable.fromArray(values);
複製代碼
其實這樣的非正常建立是內部將這些信息包裝成onNext()這樣的事件發送給觀察者。
正常建立:
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i("z", "onSubscribe: ");
}
@Override
public void onNext(String s) {
Log.i("z", "onNext: s = " + s);
}
@Override
public void onError(Throwable e) {
Log.i("z", "onError: ");
}
@Override
public void onComplete() {
Log.i("z", "onComplete: ");
}
};
複製代碼
非正常建立:
Consumer<String> observer = new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i("z", "accept: s = " + s);
}
};
複製代碼
observable.subscribe(observer);
你已經注意到不同的地方,爲何被觀察者訂閱了觀察者?
之因此會這樣,是由於RxJava爲了保持鏈式調用的流暢性。
RxJava既然是異步庫,固然對於異步的處理會更好
在咱們看RxJava的異步調用以前,咱們先來學習下其中比較重要的兩個點
這個表示Observable在一個指定的環境下建立,只能使用一次,屢次建立的話會以第一次爲準。
表示 事件傳遞和 最終處理髮生在哪一個環境下,能夠屢次調用,每次指定以後,下一步就生效。
好比:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("ONE");
emitter.onNext("TWO");
emitter.onNext("THREE");
emitter.onComplete();
}
}) // 被觀察者在一個新的線程中建立
.subscribeOn(Schedulers.newThread())
// 下面這個操做是在io線程中
.observeOn(Schedulers.io())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
return s.toLowerCase();
}
})
// 切換,觀察者是在主線程中
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
}
});
複製代碼
先看一下基礎的調用方式:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.i(TAG, "subscribe: ");
emitter.onNext("ONE");
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe: ");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext: s = " + s);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError: e = " + e.getMessage());
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete: ");
}
});
}
});
複製代碼
結果:
onSubscribe:
subscribe:
onNext: s = ONE
複製代碼
咱們先從訂閱開始看,也就是subscribe
方法
public final void subscribe(Observer<? super T> observer) {
... // 忽略部分源碼
subscribeActual(observer);
... // 忽略部分源碼
}
複製代碼
直接找到主要的方法subscribeActual(observer)
,這個是抽象的方法,會被實如今子類中。
因此咱們接着看看Observable
的子類實現:
咱們進入到create
方法中:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
複製代碼
其實 返回的就是 ObservableCreate
的對象,
須要注意的是: ObservableCreate
是 Observable
的一個子類 ObservableCreate
被建立都會傳入一個source
的字段,這個source
就是 ObservableOnSubscribe
。
在 ObservableCreate
具體實現了 subscribeActual
方法
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// 在這裏觸發 observer#onSubscribe()
observer.onSubscribe(parent);
try {
// 在這裏回調
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
複製代碼
在這裏方法能夠看到 觀察者observer
的 onSubscribe
會先於回調發生。
而後調用 ObservableOnSubscribe
的方法 subscribe
具體的事件後由開發者去作, 能夠看到在案例中調用了 CreateEmitter
,能夠進入到 CreateEmitter
看看onNext()
的實現
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
複製代碼
能夠看到 在CreateEmitter
的onNext()
中調用了 觀察者observer
的onNext()
方法.
而後能夠看到案例中的調用:
@Override
public void onNext(String s) {
Log.i(TAG, "onNext: s = " + s);
}
複製代碼