Observable p=Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
e.onNext("hello world");
e.onComplete();
}
});
複製代碼
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
複製代碼
調用create方法以後實際上返回了一個ObservableCreate對象.繼承了Observable,是一個被觀察者對象.java
p.subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object value) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
複製代碼
咱們看下subscribe方法.bash
public final void subscribe(Observer<? super T> observer) {
...
subscribeActual(observer);
...
}
複製代碼
其餘代碼都刪掉了,剩下最核心的 subscribeActual(observer),這個observer就是咱們建立的匿名內部類對象.subscribeActual()方法是個抽象方法,咱們看下ObservableCreate中是怎麼實現的.app
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
複製代碼
CreateEmitter發射器,在這裏咱們調用了 observer.onSubscribe(parent)也就是咱們建立的匿名observer類的onSubscribe方法.ide
***source.subscribe(parent)***最重要的方法可能沒有之一,觀察者和被觀察者順利會師,事件開始執行,oop
@Override
public void subscribe(ObservableEmitter e) throws Exception {//這裏的ObservableEmitter就是parent,也就是CreateEmitter發射器對象
e.onNext("hello world");
e.onComplete();
}
複製代碼
接下來看看CreateEmitter的onNext和onComplete方法.ui
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
複製代碼
咱們看到在發射器的onNext方法中,啥也沒作,就是當了個二傳手,調用了咱們觀察者的onNext方法.this
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
複製代碼
onComplete方法中也就是調用了觀察者的onComplete方法. 咱們來縷縷這個過程 1 create方法傳返回了一個對象是ObservableCreate,ObservableCreate的構造方法中有一個ObservableOnSubscribe對象,也就是咱們使用create時候建立的匿名內部類對象. 2 p.subscribe(o)實際上調用了ObservableCreate的subscribeActual方法 3 subscribeActual中首先調用了 observer的onSubscribe方法,緊接着調用了source.subscribe(parent)也就是ObservableOnSubscribe的subscribe方法,事件開始執行 4 subscribe方法中調用CreateEmitter的onNext方法,這個方法調用了observer的onNext方法,觀察者對事件進行反應. 5 subscribe方法中調用CreateEmitter的onComplete方法,這個方法調用了observer的onComplete方法,整個流程結束.spa
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
複製代碼
map操做符把咱們的observable對象變化成了具體的ObservableMap,參數是咱們以前建立好的observable和mapper functioncode
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
複製代碼
注意注意:這裏造成了一個新的訂閱關係 這裏的source是咱們create建立的observable,要否則會懵,建立ObservableMap時候咱們傳進來的this是咱們生成的observable. 到這裏咱們會從新調用onSubscribe() subscribeActual(),這裏就回到了咱們最簡單模式時候的調用步驟.不一樣的是咱們真正的調用observer的方法實在MapObserver對應的方法中. 具體流程是***發射器調用onNext方法-->MapObserver的onNext方法-->再到咱們定義的observer的onNext方法***server
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
//調用mapper改變數據
**v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");**
} catch (Throwable ex) {
fail(ex);
return;
}
//actual咱們定義的observer
actual.onNext(v);
}
複製代碼
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
...
return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
複製代碼
看看ObservableFlatMap代碼
public ObservableFlatMap(ObservableSource<T> source,
Function<? super T, ? extends ObservableSource<? extends U>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
super(source);
this.mapper = mapper;
this.delayErrors = delayErrors;
this.maxConcurrency = maxConcurrency;
this.bufferSize = bufferSize;
}
@Override
public void subscribeActual(Observer<? super U> t) {
if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
return;
}
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
複製代碼
是否是和MAP超級像,咱們這幾看MergeObserver onNext作了什麼
@Override
public void onNext(T t) {
...
p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
...
subscribeInner(p);
}
@SuppressWarnings("unchecked")
void subscribeInner(ObservableSource<? extends U> p) {
for (;;) {
if (p instanceof Callable) {
} else {
InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
addInner(inner);
p.subscribe(inner);
break;
}
}
}
複製代碼
省略了不少代碼,咱們看主要邏輯,獲取到flatMap生成的observableSource,而後 p.subscribe(inner);注意這裏的P不是observable 看innerObserver的onNext作了什麼
//這裏的onNext事件由 p.subscribe(inner)觸發
@Override
public void onNext(U t) {
if (fusionMode == QueueDisposable.NONE) {
parent.tryEmit(t, this);
} else {
parent.drain();
}
}
void tryEmit(U value, InnerObserver<T, U> inner) {
if (get() == 0 && compareAndSet(0, 1)) {
actual.onNext(value);
if (decrementAndGet() == 0) {
return;
}
} else {
SimpleQueue<U> q = inner.queue;
if (q == null) {
q = new SpscLinkedArrayQueue<U>(bufferSize);
inner.queue = q;
}
q.offer(value);
if (getAndIncrement() != 0) {
return;
}
}
drainLoop();
}
複製代碼
在這裏咱們終於看到咱們定義的observer接收到了onNext事件
Observable ObservableSource要分清楚,他們都有一個方法叫subscribe() Observer Emitter分清楚,他們有共同的方法onNext() onError() onComplete() 不然話很容易暈頭轉向.
文章若有表述有錯誤,請指出,謝謝.