關於
Flowable
的源碼解析能夠看RxJava2 Flowable源碼淺析java
關於
Subject
的源碼解析能夠看RxJava2 Subject源碼淺析緩存
Observable
、Flowable
、Subject
Observer
、Subscrption
Observable#subscribe
纔開始請求上游發送數據。當下遊請求dispose()
中止通知上游中止發送。Rxjava1
開始就有人說Rxjava能夠看做流水線,上游怎麼加工對於下游來講是無感知的,下游只要負責接收響應對應數據事件就行。對於rxajva的思考,能夠參考一下:Rxjava沉思錄系列和Rxjava主要負責人系列博客bash
通常cold Observable建立都是經過just
、create
、fromXX
、just
建立的。最簡單粗暴的建立方式:多線程
Observable.create<String> { it.onNext("") }.subscribe()
複製代碼
//[僅關注點相關代碼]
//ObservableOnSubscribe僅一個subscribe方法
public interface ObservableOnSubscribe<T> {
void subscribe(ObservableEmitter<T> e) throws Exception;
}
public abstract class Observable<T> implements ObservableSource<T> {
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
//RxJavaPlugins這是一個全局Hook,#onAssembly不實現默認直接返回
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
public final void subscribe(Observer<? super T> observer) {
try {
........
//真正調用subscribe的實現
subscribeActual(observer);
}
......
}
//整個Observable惟一的抽象方法,由子Observable實現,經過這個方法將上游和下游關聯起來
protected abstract void subscribeActual(Observer<? super T> observer);
}
複製代碼
由Observable#create
真正返回的是ObservableCreate
,當調用Observable#subscribe
才真正通知上游Observable
開始發送數據。其實質是經過#subscribeActual
將上下游創建聯繫,並調用上游#subscribe
(在ObservableCreate
中就是ObservableOnSubscribe#subscribe
)方法通知上游,下游已訂閱能夠開始發送數據。app
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
//source(上游)即Observable#create傳入的ObservableOnSubscribe
//這裏就將上下游真正的聯繫了起來。
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
.....
}
}
複製代碼
因此實質就是下游通知上游,下游已產生訂閱觸發上游下發數據/事件,上游再經過下發數據/事件,最終下游經過指定方法響應上游下發的數據/事件。因此一開始說的流水線方式就能夠理解了。異步
由於每次下游產生一次訂閱都會通知到上游的
#subscribe
,因此若是上游只在#subscribe
中去建立初始數據源就能夠每一個作到不一樣下游的數據不關聯ide
Observable.create<String> { it.onNext("") }.subscribe()
流程圖以下:post
//Observable#map
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
//**ObservableMap將上游Observable和當前的轉換Function創建聯繫
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
//ObservableMap.java
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
//將下游包裝成MapObserver,並將MapObserver和上游創建聯繫
//這樣上游下發時,先經過MapObserver處理才下發給真正的Observer
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
....
U v;
try {
//經過Function獲取到map後的數據
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch ....
//向下游下發數據
actual.onNext(v);
}
...
}
}
複製代碼
能夠看到map
操做符的做用就是經過將上游攔截返回ObservableMap
提供給下游訂閱,並在map上游返回數據前經過mapper
將上游數據轉化並下發給下游。ui
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
//emmmm,是否是點眼熟
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
//ObservableSubscribeOn
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//onSubscribe()方法執行在 訂閱處所在的線程
s.onSubscribe(parent);
//將上游放入scheduler中調用,且當即執行
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> actual;
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this.s, s);
}
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
@Override
//scheduler#scheduleDirect中執行完後
public void dispose() {
DisposableHelper.dispose(s);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
//該方法調用已經在scheduler中調用
source.subscribe(parent);
}
}
}
複製代碼
由源碼能夠看出由scheduler.scheduleDirect
->SubscribeTask#run
->SubscribeOnObserver#subscribe(observer)
將整個調度切換到指定線程中。this
由於訂閱是用下自上的,因此
subscribeOn
也老是離源最近的一個生效。由於觸發源的subscribe
是離源最近一個。
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
複製代碼
能夠看出Rxjava的操做符套路基本是將源
Observable
經過裝飾者模式封裝一層再返回新的Observable
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;//默認false
final int bufferSize;//通常128
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
//建立主線程調度器
Scheduler.Worker w = scheduler.createWorker();
//關聯上下游,觸發上游訂閱過程
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
}
複製代碼
這裏能夠看出ObservableObserveOn
仍是很簡單的,上游訂閱過程並不用關心,下游的觸發則由ObserveOnObserver
處理。
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
private static final long serialVersionUID = 6576896619930983584L;
final Observer<? super T> actual;
final Scheduler.Worker worker;
final boolean delayError;
final int bufferSize;
//上游數據的緩存隊列
SimpleQueue<T> queue;
Disposable s;
Throwable error;
volatile boolean done;
volatile boolean cancelled;
int sourceMode;
boolean outputFused;
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.actual = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
......
//建立對接緩存數據
queue = new SpscLinkedArrayQueue<T>(bufferSize);
//回調下游onSubscribe
actual.onSubscribe(this);
}
}
@Override
public void onNext(T t) {
if (done) {//執行過complete/error則done爲true
return;
}
if (sourceMode != QueueDisposable.ASYNC) {//非異步數據,默認同步數據
queue.offer(t);//入隊列
}
schedule();
}
@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
error = t;
done = true;//標記已完成
schedule();
}
@Override
public void onComplete() {
if (done) {
return;
}
done = true;//標記已完成
schedule();
}
@Override
public void dispose() {
if (!cancelled) {
cancelled = true;
s.dispose();
worker.dispose();
if (getAndIncrement() == 0) {
queue.clear();
}
}
}
@Override
public boolean isDisposed() {
return cancelled;
}
void schedule() {
//自旋+1,!=0則表示worker.schedule已在執行無需在調度
if (getAndIncrement() == 0) {
worker.schedule(this);//經過調度器處理,將數據取出下發到下游
}
}
@Override
public void run() {
if (outputFused) {//默認false
drainFused();
} else {
drainNormal();//取出數據下發
}
}
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
for (; ; ) {
//檢測是否不用再處理
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (; ; ) {
boolean d = done;
T v;
try {
v = q.poll();//取出一個數據
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {//可能已經提早disposed了
return;
}
if (empty) {//數據爲空隊列無數據,退出下發循環
break;
}
//下發
a.onNext(v);
}
//可能有錯過的schedule,再次循環檢測
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
//檢測是否compelte/error/隊列已空
boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
if (cancelled) {//已經disposed
queue.clear();
return true;
}
if (d) {//是否已結束
Throwable e = error;
if (delayError) {//延遲error,等待隊列清空
if (empty) {
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
worker.dispose();
return true;
}
} else {
if (e != null) {
queue.clear();
a.onError(e);//下發error
worker.dispose();
return true;
} else if (empty) {
a.onComplete();
worker.dispose();
return true;
}
}
}
return false;
}
}
複製代碼
ObserveOnObserver
繼承於BasicIntQueueDisposable
繼承於AtomicInteger
,經過自身的原子性(自旋/CAS)來消除多線程對#schedule
的調用。
能夠看出#observeOn
只對下游有影響。
由於subscribeOn()
切換線程是在subscribeActual
中切換,經過切換上游訂閱過程的整個線程,從而影響發射數據的下發所在線程。因此subscribeOn()
只有最靠近源的一次生效。
而observeOn
主動切換下發過程,對下發過程產生影響,·且屢次調用屢次生效。 PS:操做符的轉換效果都是在onXXX
下發過程當中實現的,因此對操做符也有做用。