在基本使用上(無背壓) 2.x版本和1.x版本極爲類似,針對擴展性作了優化java
一樣的方法再盤他一遍!bash
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
if (!e.isDisposed()) {
e.onNext("1");
e.onComplete();
}
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
d.dispose();
}
@Override
public void onNext(String value) {
System.out.println("onNext:" + value);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
System.out.println("onCompleted");
}
});
複製代碼
一樣的拆分一下,分爲兩部分
微信
// 經過Observable的create靜態方法,傳入一個ObservableOnSubscribe對象
// 這個OnSubscribe對象附帶了一個call方法,用於回調
// 整個create方法返回了一個Observable對象的實例
// 這裏對於1.x版本的區別是
// Observable.OnSubscribe->ObservableOnSubscribe
// Observer-> ObservableEmitter
// subscriber.isUnsubscribed->e.isDisposed
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
if (!e.isDisposed()) {
e.onNext("1");
e.onComplete();
}
}
});
// 調用observable的subscribe方法,傳入一個Observer對象
// 這個Observer對象實現了onSubscribe方法,onNext方法,onError方法,onComplete方法。
// 這裏對比1.x版本的區別是
// Observer接口增長了void onSubscribe(@NonNull Disposable d);方法須要實現
// 主動設置狀態d.dispose();增長了靈活性
observable.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
d.dispose();
}
@Override
public void onNext(String value) {
System.out.println("onNext:" + value);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
System.out.println("onCompleted");
}
});
複製代碼
上面的程序打印出的效果是:
onSubscribe
ide
將d.dispose()
註釋掉優化
上面的程序打印出的效果是:
onSubscribe
onNext:1
onCompleted
ui
1.一樣的,從從Observable.create方法開始this
// 在1.x版本中,Observable.create方法是棄用的
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
複製代碼
ObservableCreate的構造方法spa
final ObservableOnSubscribe<T> source;
// 對局部變量賦值,保存,等待回調調用
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
複製代碼
與1.x版本的實現方式大體相同,細節部分有些改變code
這裏經過RxJavaPlugins.onAssembly
方法,返回了一個ObservableCreate
對象,該對象繼承與Observable
,構造方法中傳入了一個ObservableOnSubscribe
實例,賦值給局部變量,等待被回調。該實例就是咱們在基本使用的例子中建立的對象,而且實現了subscribe(ObservableEmitter<String> e)
方法。cdn
回顧1.x版本,思路也是保存一個咱們本身實現的實例到局部變量,等待被回調。能夠說二者在基本使用這種狀況下沒有本質區別。
2.再來看看咱們的subscribe(ObservableEmitter<String> e)
方法是如何被回調的
從第二部分的observable.subscribe()
方法入手分析
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
// 省略一些判斷及異常捕捉。
...
subscribeActual(observer);
...
}
複製代碼
這裏的 subscribeActual是一個抽象方法,根據1中分析能夠得知,調用者observable是一個ObservableCreate
對象,咱們進入該類查找subscribeActual(observer)
方法。
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 構造了一個CreateEmitter對象,這個對象實現了Emitter、Disposable接口
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// 調用傳入observer的onSubscribe方法,這裏的observer就是咱們在基本使用第2步中構造出的observer實例,回調咱們本身實現的onSubscribe,傳入parent能夠進行對d.dispose()方法進行控制,標記是否已經進行了處理。
observer.onSubscribe(parent);
try {
// 調用第1步中保存的source,是一個咱們本身實現ObservableOnSubscribe對象,在這裏被回調。
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
複製代碼
再來看一下爲何經過基本使用例子裏面的e.onNext("1");e.onComplete();
就調用到了咱們在限免實現Observer裏面的的方法
@Override
public void onNext(T t) {
// 判空
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
// 調用了observer.onNext
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
// 調用了observer.onComplete();
observer.onComplete();
} finally {
dispose();
}
}
}
複製代碼
這樣下來,就經過subscribeActual(observer)
就實現了第1步和第2步的關聯
1)在內部先調用observer.onSubscribe(parent)
進入咱們本身的邏輯執行了System.out.println("onSubscribe")
2)再調用了source.subscribe(parent)
進入咱們本身的邏輯執行了
if (!e.isDisposed()) {
e.onNext("1");
e.onComplete();
}
複製代碼
接着執行了
@Override
public void onNext(String value) {
System.out.println("onNext:" + value);
}
@Override
public void onComplete() {
System.out.println("onCompleted");
}
複製代碼
將咱們的代碼鏈接了起來。
其實在2.x版本的基本使用中,有這麼幾個對象:
ObservableOnSubscribe
對象實現subscribe(ObservableEmitter<String> e)
方法等待被回調Observable
調用subscribe(new Observer)
方法的時候傳入一個觀察者,經過該方法,先調用了本身的onSubscribe方法,能夠進行Disposable的標記,再回調了等待被回調的被觀察者Observable的subscribe方法