1: Usage:
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        //Log.d(TAG, "ObservableEmitter");
        //Log.d(TAG, "Observable thread is" + Thread.currentThread().getName());
        emitter.onNext(12);
        emitter.onNext(13);
        emitter.onNext(14);
        emitter.onNext(15);
        emitter.onComplete();
    }
});

Observer<Integer> observer = new Observer<Integer>() {
    private int i;
    private Disposable mDisposable;

    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "onSubscribe");
        mDisposable = d;
    }

    @Override
    public void onNext(Integer integer) {
        /*i++;
        if(i == 3){
            mDisposable.dispose();
        }*/
        Log.d(TAG, "onNext" + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "onError");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "onComplete");
    }
};

//observable.subscribe(observer);
observable.doOnSubscribe(disposable -> {
    Log.d(TAG, "doOnSubscribe");
}).doOnComplete(() -> {
    Log.d(TAG, "doOnComplete");
}).doOnNext((C) -> {
    Log.d(TAG, "doNext" + C);
}).subscribe(observer);
2: The Observable functions
First, look at the Observable:
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        //Log.d(TAG, "ObservableEmitter");
        //Log.d(TAG, "Observable thread is" + Thread.currentThread().getName());
        emitter.onNext(12);
        emitter.onNext(13);
        emitter.onNext(14);
        emitter.onNext(15);
        emitter.onComplete();
    }
});
Take a look at create:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
The parameter, ObservableOnSubscribe, is an interface with a single method, subscribe:
public interface ObservableOnSubscribe<T> {
    /**
     * Called for each Observer that subscribes.
     * @param e the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(ObservableEmitter<T> e) throws Exception;
}
create has to return an Observable:
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
From this line we can see that what is returned is ObservableCreate, a subclass of Observable:
public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
。。。。。。。。。。
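To summarize the classes seen so far:
Observable.create returns an Observable, and concretely it returns an ObservableCreate. That class extends Observable and holds an ObservableOnSubscribe. ObservableOnSubscribe is an interface, and its concrete implementation is written here: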
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        //Log.d(TAG, "ObservableEmitter");
        //Log.d(TAG, "Observable thread is" + Thread.currentThread().getName());
        emitter.onNext(12);
        emitter.onNext(13);
        emitter.onNext(14);
        emitter.onNext(15);
        emitter.onComplete();
    }
});
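The relationship summarized above can be modeled without RxJava. The sketch below is a simplified stand-in, not the library's code: MiniSource, MiniObservable, MiniCreate and CreateDemo are hypothetical names, and both the emitter and the observer are reduced to a plain Consumer. It illustrates that create only stores the user's callback, and that the callback body runs when subscribe is called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for ObservableOnSubscribe: one method that receives an emitter.
interface MiniSource<T> {
    void subscribe(Consumer<T> emitter) throws Exception;
}

// Hypothetical stand-in for Observable: a static factory plus an abstract hook.
abstract class MiniObservable<T> {
    static <T> MiniObservable<T> create(MiniSource<T> source) {
        if (source == null) {
            throw new NullPointerException("source is null");
        }
        return new MiniCreate<>(source); // only wraps the callback, nothing is emitted yet
    }

    final void subscribe(Consumer<T> observer) {
        subscribeActual(observer);
    }

    protected abstract void subscribeActual(Consumer<T> observer);
}

// Hypothetical stand-in for ObservableCreate: extends the base class and holds the source.
final class MiniCreate<T> extends MiniObservable<T> {
    final MiniSource<T> source;

    MiniCreate(MiniSource<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Consumer<T> observer) {
        try {
            source.subscribe(observer); // the user's callback runs here
        } catch (Exception ex) {
            throw new RuntimeException(ex); // error routing is left out of this sketch
        }
    }
}

class CreateDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniObservable<Integer> observable = MiniObservable.create(emitter -> {
            log.add("source runs");
            emitter.accept(12);
        });
        log.add("created");
        observable.subscribe(value -> log.add("got " + value));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [created, source runs, got 12]
    }
}
```

"created" is logged before "source runs", which matches the walkthrough: the anonymous ObservableOnSubscribe is only held as a field until something subscribes.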
That is enough analysis of Observable for now. Next, let's see how it gets connected to the observer:
//observable.subscribe(observer);
observable.doOnSubscribe(disposable -> {
    Log.d(TAG, "doOnSubscribe");
}).doOnComplete(() -> {
    Log.d(TAG, "doOnComplete");
}).doOnNext((C) -> {
    Log.d(TAG, "doNext" + C);
}).subscribe(observer);
Step into subscribe:
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);
        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);
        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}
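As you can see, this is a method of Observable. As noted earlier, create produces an ObservableCreate, which extends Observable, so for a direct observable.subscribe(observer) call we are now inside the subscribe method that ObservableCreate inherits. (In the chained version, each doOn* operator returns its own Observable wrapper, so the call passes through those wrappers before it reaches ObservableCreate.) The parameter passed in is an observer. Observer is also an interface, and its concrete implementation is the one written in application code: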
Observer<Integer> observer = new Observer<Integer>() {
    private int i;
    private Disposable mDisposable;

    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "onSubscribe");
        mDisposable = d;
    }

    @Override
    public void onNext(Integer integer) {
        /*i++;
        if(i == 3){
            mDisposable.dispose();
        }*/
        Log.d(TAG, "onNext" + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "onError");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "onComplete");
    }
};
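To summarize again: calling subscribe means that ObservableCreate (which extends Observable and holds an ObservableOnSubscribe, itself an interface) executes subscribe with observer as the argument, and observer is an interface implemented in user code. Next, let's look at how the subscribe function actually proceeds: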
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);
        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);
        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}
First, look at:
subscribeActual(observer);
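Stepping into this function shows that it is an abstract method of Observable. So where is the implementation? As mentioned repeatedly, we are actually in the ObservableCreate class, so open that class and find subscribeActual: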
@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);

    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}
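The order of operations in subscribeActual can be illustrated with a small model that does not depend on RxJava. This is only a sketch under stated assumptions: SketchObserver, SketchSource and SubscribeActualDemo are hypothetical names, onSubscribe takes no Disposable argument, the observer itself is handed to the source instead of a separate emitter, and the fatal-exception check is left out. It shows the two behaviors visible in the method: onSubscribe is delivered before the user's source runs, and anything the source throws is turned into onError.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, reduced observer: only the callbacks needed for this sketch.
interface SketchObserver<T> {
    void onSubscribe();
    void onNext(T value);
    void onError(Throwable error);
}

// Hypothetical stand-in for ObservableOnSubscribe.
interface SketchSource<T> {
    void subscribe(SketchObserver<T> emitter) throws Exception;
}

class SubscribeActualDemo {
    // Same shape as subscribeActual: announce the subscription first,
    // then run the user's source, and route whatever it throws to onError.
    static <T> void subscribeActual(SketchSource<T> source, SketchObserver<T> observer) {
        observer.onSubscribe();
        try {
            source.subscribe(observer);
        } catch (Throwable ex) {
            observer.onError(ex);
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        SketchObserver<Integer> observer = new SketchObserver<Integer>() {
            @Override public void onSubscribe() { log.add("onSubscribe"); }
            @Override public void onNext(Integer value) { log.add("onNext" + value); }
            @Override public void onError(Throwable error) { log.add("onError " + error.getMessage()); }
        };
        // A source that emits one item and then fails.
        SketchSource<Integer> source = emitter -> {
            emitter.onNext(12);
            throw new IllegalStateException("boom");
        };
        subscribeActual(source, observer);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [onSubscribe, onNext12, onError boom]
    }
}
```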
Look at this statement first:
source.subscribe(parent);
First, see where source comes from:
final ObservableOnSubscribe<T> source;

public ObservableCreate(ObservableOnSubscribe<T> source) {
    this.source = source;
}
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        //Log.d(TAG, "ObservableEmitter");
        //Log.d(TAG, "Observable thread is" + Thread.currentThread().getName());
        emitter.onNext(12);
        emitter.onNext(13);
        emitter.onNext(14);
        emitter.onNext(15);
        emitter.onComplete();
    }
});
Tracing back layer by layer, we can see that source is simply the interface implemented in user code:
public interface ObservableOnSubscribe<T> {
    /**
     * Called for each Observer that subscribes.
     * @param e the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(ObservableEmitter<T> e) throws Exception;
}
Back to:
source.subscribe(parent);
Here parent is a CreateEmitter. First look at subscribe:
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
    //Log.d(TAG, "ObservableEmitter");
    //Log.d(TAG, "Observable thread is" + Thread.currentThread().getName());
    emitter.onNext(12);
    emitter.onNext(13);
    emitter.onNext(14);
    emitter.onNext(15);
    emitter.onComplete();
}
As seen above, the argument passed to subscribe is parent:
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
This creates a CreateEmitter, which holds the observer:
implements ObservableEmitter<T>, Disposable {

    private static final long serialVersionUID = -3434801548987643227L;

    final Observer<? super T> observer;

    CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }

    @Override
    public void onNext(T t) {
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }

    @Override
    public void onError(Throwable t) {
        if (t == null) {
            t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
        }
        if (!isDisposed()) {
            try {
                observer.onError(t);
            } finally {
                dispose();
            }
        } else {
            RxJavaPlugins.onError(t);
        }
    }

    @Override
    public void onComplete() {
        if (!isDisposed()) {
            try {
                observer.onComplete();
            } finally {
                dispose();
            }
        }
    }
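The guards in the excerpt above imply that a terminal signal closes the emitter: once onError or onComplete has been delivered, later signals are dropped. The model below is a hedged sketch of that behavior, not the library class. TinyObserver, TinyEmitter and EmitterDemo are hypothetical names, and because isDisposed and dispose are not shown in the excerpt, the sketch assumes a simple AtomicBoolean flag for them. The forwarding of late errors to RxJavaPlugins.onError is also omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, reduced observer interface for this sketch.
interface TinyObserver<T> {
    void onNext(T value);
    void onError(Throwable error);
    void onComplete();
}

// Hypothetical model of the emitter: forwards signals until it is disposed.
final class TinyEmitter<T> {
    private final TinyObserver<T> observer;
    private final AtomicBoolean disposed = new AtomicBoolean(); // assumption: a plain flag

    TinyEmitter(TinyObserver<T> observer) {
        this.observer = observer;
    }

    boolean isDisposed() { return disposed.get(); }

    void dispose() { disposed.set(true); }

    void onNext(T value) {
        if (value == null) {
            onError(new NullPointerException("onNext called with null"));
            return;
        }
        if (!isDisposed()) {
            observer.onNext(value);
        }
    }

    void onError(Throwable error) {
        if (!isDisposed()) {
            try {
                observer.onError(error);
            } finally {
                dispose(); // terminal signal: close the emitter
            }
        }
    }

    void onComplete() {
        if (!isDisposed()) {
            try {
                observer.onComplete();
            } finally {
                dispose(); // terminal signal: close the emitter
            }
        }
    }
}

class EmitterDemo {
    // Builds an emitter whose observer records every callback in the given log.
    static TinyEmitter<Integer> emitterLoggingTo(List<String> log) {
        return new TinyEmitter<>(new TinyObserver<Integer>() {
            @Override public void onNext(Integer value) { log.add("onNext" + value); }
            @Override public void onError(Throwable error) { log.add("onError " + error.getClass().getSimpleName()); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
    }

    static List<String> runComplete() {
        List<String> log = new ArrayList<>();
        TinyEmitter<Integer> emitter = emitterLoggingTo(log);
        emitter.onNext(12);
        emitter.onComplete();
        emitter.onNext(13);   // dropped: the emitter is already disposed
        emitter.onComplete(); // dropped as well
        return log;
    }

    static List<String> runNull() {
        List<String> log = new ArrayList<>();
        TinyEmitter<Integer> emitter = emitterLoggingTo(log);
        emitter.onNext(12);
        emitter.onNext(null); // converted into onError
        emitter.onNext(13);   // dropped after the error
        return log;
    }
}
```

In this model runComplete() yields [onNext12, onComplete] and runNull() yields [onNext12, onError NullPointerException].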
Let's see how onNext is executed. When execution reaches:
emitter.onNext(12);
control passes to:
@Override
public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    if (!isDisposed()) {
        observer.onNext(t);
    }
}
Look at this statement:
observer.onNext(t);
As discussed earlier, the observer here is the one defined in user code:
Observer<Integer> observer = new Observer<Integer>() {
    private int i;
    private Disposable mDisposable;

    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "onSubscribe");
        mDisposable = d;
    }

    @Override
    public void onNext(Integer integer) {
        /*i++;
        if(i == 3){
            mDisposable.dispose();
        }*/
        Log.d(TAG, "onNext" + integer);
    }
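At this point the onNext called inside the Observable and the Observer's onNext are linked: each time the Observable emits an item, the Observer handles it. onError and onComplete work the same way.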
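The commented-out lines in the observer's onNext (dispose after the third item) follow from the same guard. The sketch below models that case without RxJava, under stated assumptions: DisposeDemo and Watcher are hypothetical names, a Runnable stands in for the Disposable handed to onSubscribe, and an AtomicBoolean stands in for the emitter's disposed state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

class DisposeDemo {
    // Hypothetical, reduced observer; the Runnable plays the role of the Disposable.
    interface Watcher {
        void onSubscribe(Runnable disposeHandle);
        void onNext(int value);
        void onComplete();
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Watcher watcher = new Watcher() {
            private int count;
            private Runnable handle;

            @Override public void onSubscribe(Runnable disposeHandle) {
                handle = disposeHandle;
                log.add("onSubscribe");
            }

            @Override public void onNext(int value) {
                count++;
                if (count == 3) {
                    handle.run(); // same idea as mDisposable.dispose()
                }
                log.add("onNext" + value);
            }

            @Override public void onComplete() {
                log.add("onComplete");
            }
        };

        AtomicBoolean disposed = new AtomicBoolean();
        watcher.onSubscribe(() -> disposed.set(true));

        // Emitter side: every signal is guarded by the disposed flag.
        for (int value : new int[] {12, 13, 14, 15}) {
            if (!disposed.get()) {
                watcher.onNext(value);
            }
        }
        if (!disposed.get()) {
            watcher.onComplete();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [onSubscribe, onNext12, onNext13, onNext14]
    }
}
```

The third item is still logged because the flag is set inside that onNext call, after the guard has already passed. The fourth item and onComplete are dropped.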