一:RxJava執行流程:
java
RxJava簡單使用緩存
private final String tag = getClass().getSimpleName(); //數據源,被觀察對象 Observable<String> obser = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { Log.d(tag,"emit 1"); emitter.onNext("t1"); Log.d(tag,"emit 2"); emitter.onNext("t2"); Log.d(tag,"emit 3"); emitter.onNext("t3"); Log.d(tag,"emit "); emitter.onComplete(); } }); private void observer_test(){ Observer<String> dnObser = new Observer<String>() {//觀察者,處理對應事件 @Override public void onSubscribe(Disposable d) { Log.d(tag,"onSubscribe"); } @Override public void onNext(String s) { Log.d(tag,"onNext "+s); } @Override public void onError(Throwable e) { Log.e(tag,"onError"); } @Override public void onComplete() { Log.d(tag,"onComplete"); } }; obser.subscribe(dnObser); }); }
從例子中看出RxJava主要組成:app
Observable:被觀察者,被觀察者自己
ObservableOnSubscribe:通知觀察者執行哪些行爲
Observer:觀察者,經過實現對應方法作具體處理ide
訂閱過程處理:oop
@Override public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); //開始調用ObservableOnSubscribe subscribe subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } }
protected abstract void subscribeActual(Observer<? super T> observer);ui
查看subscribe方法爲抽象方法,具體實現爲ObservableCreate,從Observabel的create方法能夠知道this
@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); }
查看Observable內的源碼spa
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {//這裏調用ObservableOnSubscribe 的subscribe方法,開始執行事件流程
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
......
}
二:數據轉換線程
收到Observable的消息以前咱們有可能會對數據流進行處理,例如map()、flatMap()、fllter()等方法,
這裏使用了map()方法,它接收了observeable的數據並將經過該方法將數據進行轉換後的新數據發出去,即作了中間轉化代理
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) { ObjectHelper.requireNonNull(mapper, "mapper is null"); return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper)); }
rxjava2使用了Function接口提供轉換功能,
public interface Function<T, R> { //將T類型數據轉化爲R處理 @NonNull R apply(@NonNull T t) throws Exception; }
具體操做交給ObservableMap內部類MapObserver處理
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> { final Function<? super T, ? extends U> function; public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) { super(source); this.function = function; } @Override public void subscribeActual(Observer<? super U> t) { source.subscribe(new MapObserver<T, U>(t, function)); } static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> { final Function<? super T, ? extends U> mapper; MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) { super(actual); this.mapper = mapper; } @Override public void onNext(T t) { ....... U v; try { //調用Function apply處理 v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value."); } catch (Throwable ex) { fail(ex); return; }//將轉換後的類型再傳遞給原Obsever actual.onNext(v); } ...... } }
MapObserver實現Observer,持有傳入的Observer,經過Function的mapper.apply(t)進行轉換後再傳遞給原observer onNext()
三:任務調度(scheduler)
經過使用subscribeOn()、observeOn()方法傳入對應的Scheduler去指定每一個操做應該運行在何種線程之中
Observable.create(...)
...
.subscribeOn(Schedulers.io()) // 指定 subscribe() 發生在 IO 線程
...
.subscribeOn(Schedulers.newThread())
...
.subscribeOn(Schedulers.computation())
...
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調發生在主線程
.subscribe(...)
@CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable<T> subscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); }
建立了一個新的Observable
,併爲新的Observable
建立了新的計劃表ObservableSubscribeOn對象,新的計劃表保存了原始Observable
對象和調度器scheduler
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> { final Scheduler scheduler; public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) { super(source); this.scheduler = scheduler; } @Override public void subscribeActual(final Observer<? super T> s) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); s.onSubscribe(parent); //調用了Scheduler的shedule方法,建立Runable內部執行原obseverable sbscribe parent.setDisposable(scheduler.scheduleDirect(new Runnable() { @Override public void run() { source.subscribe(parent); } })); } ... }
以IOScheduler爲例
Scheduler schdule
@NonNull public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { final Worker w = createWorker(); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); //調用Work 切換執行線程 w.schedule(new Runnable() { @Override public void run() { try { decoratedRun.run(); } finally { w.dispose(); } } }, delay, unit); return w; }
@NonNull @Override public Worker createWorker() { return new EventLoopWorker(pool.get()); } public int size() { return pool.get().allWorkers.size(); } static final class EventLoopWorker extends Scheduler.Worker { private final CompositeDisposable tasks; private final CachedWorkerPool pool; private final ThreadWorker threadWorker; final AtomicBoolean once = new AtomicBoolean(); EventLoopWorker(CachedWorkerPool pool) { this.pool = pool; this.tasks = new CompositeDisposable(); this.threadWorker = pool.get(); } 。。。。 @NonNull @Override public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) { if (tasks.isDisposed()) { // don't schedule, we are unsubscribed return EmptyDisposable.INSTANCE; } return threadWorker.scheduleActual(action, delayTime, unit, tasks); } } static final class ThreadWorker extends NewThreadWorker { private long expirationTime; ThreadWorker(ThreadFactory threadFactory) { super(threadFactory); this.expirationTime = 0L; } 。。。 }
咱們從緩存池裏拿到須要的worker並做了一層封裝成爲EventLoopWorker:最後調用NewThreadWorker 的scheduleActual
看NewThreadWorker實現:
public class NewThreadWorker extends Scheduler.Worker implements Disposable { private final ScheduledExecutorService executor;//線程執行器 ... public NewThreadWorker(ThreadFactory threadFactory) { executor = SchedulerPoolFactory.create(threadFactory); } ...... //經過Execotor來執行上面傳遞過來的Runable對象,達到在不一樣類型線程來執行調用Observer方法 public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); try { Future<?> f; if (delayTime <= 0) { f = executor.submit(decoratedRun);//提交到線程池執行 } else { f = executor.schedule(decoratedRun, delayTime, unit); } return Disposables.fromFuture(f); } catch (RejectedExecutionException ex) { RxJavaPlugins.onError(ex); return EmptyDisposable.INSTANCE; } }
再看看observeOn
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)); }
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> { final Scheduler scheduler; final boolean delayError; final int bufferSize; public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) { super(source); this.scheduler = scheduler; this.delayError = delayError; this.bufferSize = bufferSize; } @Override protected void subscribeActual(Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } } static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable { private static final long serialVersionUID = 6576896619930983584L; final Observer<? super T> actual; final Scheduler.Worker worker; final boolean delayError; final int bufferSize; SimpleQueue<T> queue; Disposable s; Throwable error; volatile boolean done; volatile boolean cancelled; int sourceMode; boolean outputFused; ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) { this.actual = actual; this.worker = worker; this.delayError = delayError; this.bufferSize = bufferSize; } @Override public void onSubscribe(Disposable s) { ... actual.onSubscribe(this); } } @Override public void onNext(T t) { if (done) { return; } if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t); } schedule(); } @Override public void onError(Throwable t) { if (done) { RxJavaPlugins.onError(t); return; } error = t; done = true; schedule(); } @Override public void onComplete() { if (done) { return; } done = true; schedule(); } ... void schedule() {//任務調度,交給線程池回調Runable if (getAndIncrement() == 0) { worker.schedule(this); } } @Override
public void run() {//回調處理,代理調用原Observer方法
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
.......
a.onNext(v);
......
}
} ... } }
這裏經過ObservableObserveOn代理,實現Observer observeOn線程切換處理
未完待續。。。