A Brief Analysis of RxJava

 

I: RxJava execution flow

Basic RxJava usage:

private final String tag = getClass().getSimpleName();
// Data source: the Observable being observed
    Observable<String> obser = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            Log.d(tag,"emit 1");
            emitter.onNext("t1");
            Log.d(tag,"emit 2");
            emitter.onNext("t2");
            Log.d(tag,"emit 3");
            emitter.onNext("t3");
            Log.d(tag,"emit complete");
            emitter.onComplete();
        }
    });


    private void observer_test(){
        Observer<String> dnObser = new Observer<String>() { // the Observer: handles each event
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(tag,"onSubscribe");
            }

            @Override
            public void onNext(String s) {
                Log.d(tag,"onNext "+s);
            }

            @Override
            public void onError(Throwable e) {
                Log.e(tag,"onError");
            }

            @Override
            public void onComplete() {
                Log.d(tag,"onComplete");
            }
        };
        obser.subscribe(dnObser);
    }

 

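The example shows the main components of RxJava:

Observable: the observed object itself, i.e. the data source
ObservableOnSubscribe: defines which events are emitted to the observer
Observer: the observer, which handles each event by implementing the corresponding callback methods

How subscription is handled: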

  @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
            // delegate to subscribeActual, which ends up calling ObservableOnSubscribe.subscribe
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

 

protected abstract void subscribeActual(Observer<? super T> observer);

subscribeActual is an abstract method. In this example the concrete implementation is ObservableCreate, as Observable's create method shows:

@CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

Looking at the source of ObservableCreate:

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try { // calls ObservableOnSubscribe.subscribe here, which starts the event flow
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
......
}
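
The chain above (create stores the source, subscribe delegates to subscribeActual, and subscribeActual invokes the source) can be sketched in plain Java. This is a minimal sketch, not RxJava's code: MiniObserver, MiniOnSubscribe, MiniObservable and MiniCreateDemo are hypothetical names made up for illustration, and Disposable handling and the RxJavaPlugins hooks are left out.

```java
import java.util.ArrayList;
import java.util.List;

class MiniCreateDemo {
    // Subscribes once and records the order in which things happen.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniObservable<String> source = MiniObservable.create(emitter -> {
            log.add("emit t1");
            emitter.onNext("t1");
            log.add("emit t2");
            emitter.onNext("t2");
            emitter.onComplete();
        });
        log.add("before subscribe"); // nothing has been emitted yet at this point
        source.subscribe(new MiniObserver<String>() {
            @Override public void onNext(String s) { log.add("onNext " + s); }
            @Override public void onError(Throwable e) { log.add("onError"); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}

// Hypothetical, simplified stand-in for Observer (no onSubscribe/Disposable).
interface MiniObserver<T> {
    void onNext(T value);
    void onError(Throwable e);
    void onComplete();
}

// Hypothetical stand-in for ObservableOnSubscribe.
interface MiniOnSubscribe<T> {
    void subscribe(MiniObserver<T> emitter) throws Exception;
}

abstract class MiniObservable<T> {
    // Public entry point: validates the argument, then delegates to the subclass.
    public final void subscribe(MiniObserver<T> observer) {
        if (observer == null) {
            throw new NullPointerException("observer is null");
        }
        subscribeActual(observer);
    }

    protected abstract void subscribeActual(MiniObserver<T> observer);

    // Plays the role of Observable.create: it only stores the source.
    static <T> MiniObservable<T> create(MiniOnSubscribe<T> source) {
        return new MiniObservable<T>() {
            @Override
            protected void subscribeActual(MiniObserver<T> observer) {
                try {
                    source.subscribe(observer); // emission starts only here
                } catch (Throwable ex) {
                    observer.onError(ex);
                }
            }
        };
    }
}
```

Because "before subscribe" is logged ahead of every emission, the sketch illustrates that creating the Observable does nothing by itself; the source runs only when subscribe is called.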

 

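II: Data transformation

Before the Observer receives items from the Observable, we may want to process the data stream, for example with map(), flatMap() or filter().
Take map() as an example: it receives each item from the upstream Observable, transforms it, and emits the transformed item downstream, acting as an intermediate proxy.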

 public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

 RxJava 2 uses the Function interface to provide the transformation:

public interface Function<T, R> {
   // converts a value of type T into a value of type R
    @NonNull
    R apply(@NonNull T t) throws Exception;
}

 

The actual work is done by MapObserver, a nested class of ObservableMap:

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }


    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
           .......

            U v;
            try {
                // apply the Function to the incoming item
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            } // pass the converted value on to the original Observer
            actual.onNext(v);
        }
......
    }
}

 

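MapObserver implements Observer and holds the Observer that was passed in (the downstream). It converts each item with mapper.apply(t) and then passes the result to the original Observer's onNext().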
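
The wrapping idea can be shown in a few lines of plain Java. This is a minimal sketch under simplifying assumptions: Sink, Source, MapSink and MapSketchDemo are hypothetical names, only onNext is modeled, and java.util.function.Function stands in for RxJava's own Function interface.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class MapSketchDemo {
    // Hypothetical minimal observer and observable; only onNext is modeled.
    interface Sink<T> { void onNext(T value); }
    interface Source<T> { void subscribe(Sink<T> sink); }

    // Plays the role of MapObserver: receives T, applies the mapper, forwards R.
    static final class MapSink<T, R> implements Sink<T> {
        private final Sink<R> downstream;
        private final Function<? super T, ? extends R> mapper;

        MapSink(Sink<R> downstream, Function<? super T, ? extends R> mapper) {
            this.downstream = downstream;
            this.mapper = mapper;
        }

        @Override
        public void onNext(T value) {
            R mapped = mapper.apply(value);
            if (mapped == null) {
                throw new NullPointerException("The mapper function returned a null value.");
            }
            downstream.onNext(mapped); // hand the converted value to the original sink
        }
    }

    // Plays the role of ObservableMap: subscribing wraps the downstream sink
    // and subscribes the wrapper to the upstream source.
    static <T, R> Source<R> map(Source<T> upstream, Function<? super T, ? extends R> mapper) {
        return downstream -> upstream.subscribe(new MapSink<T, R>(downstream, mapper));
    }

    static List<Integer> run() {
        Source<String> words = sink -> {
            sink.onNext("t1");
            sink.onNext("t22");
            sink.onNext("t333");
        };
        Source<Integer> lengths = map(words, s -> s.length());
        List<Integer> out = new ArrayList<>();
        lengths.subscribe(n -> out.add(n));
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [2, 3, 4]
    }
}
```

The downstream sink never sees a String; it only sees what the wrapper forwards, which is the same proxy relationship that MapObserver has with the original Observer.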


III: Task scheduling (Scheduler)

Passing a Scheduler to subscribeOn() and observeOn() specifies which thread each part of the chain runs on:

Observable.create(...)
    ...
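    .subscribeOn(Schedulers.io()) // subscribe() (and therefore emission) happens on an IO thread
    ...
    .subscribeOn(Schedulers.newThread()) // later subscribeOn calls do not move the source; the one closest to the source wins
    ...
    .subscribeOn(Schedulers.computation())
    ...
        .observeOn(AndroidSchedulers.mainThread()) // the Observer's callbacks are delivered on the main thread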
        .subscribe(...)

 @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

 

subscribeOn creates a new Observable, an ObservableSubscribeOn, which stores the original (upstream) Observable and the Scheduler:

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {

        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);
        // scheduleDirect runs this Runnable on the Scheduler's thread; the Runnable subscribes to the original Observable
        parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
            @Override
            public void run() {
                source.subscribe(parent);
            }
        }));
    }
...
}
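
The core trick, wrapping the upstream subscribe call in a Runnable and handing it to another thread, can be reproduced with a plain Executor. This is a minimal sketch, not RxJava's implementation: Sink, Source and SubscribeOnSketchDemo are hypothetical names, a java.util.concurrent Executor stands in for the Scheduler, and disposal is not modeled.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class SubscribeOnSketchDemo {
    // Hypothetical minimal observer and observable.
    interface Sink<T> { void onNext(T value); void onComplete(); }
    interface Source<T> { void subscribe(Sink<T> sink); }

    // Plays the role of ObservableSubscribeOn: the upstream subscribe call
    // is wrapped in a Runnable and executed by the given executor.
    static <T> Source<T> subscribeOn(Source<T> upstream, Executor executor) {
        return sink -> executor.execute(() -> upstream.subscribe(sink));
    }

    // Returns the name of the thread on which the source emitted.
    static String run() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-io"));
        AtomicReference<String> emitThread = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        Source<String> source = sink -> {
            emitThread.set(Thread.currentThread().getName());
            sink.onNext("t1");
            sink.onComplete();
        };

        try {
            subscribeOn(source, io).subscribe(new Sink<String>() {
                @Override public void onNext(String value) { }
                @Override public void onComplete() { done.countDown(); }
            });
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
        }
        return emitThread.get();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints sketch-io
    }
}
```

Although subscribe is called from the calling thread, the source body runs on the executor's thread, which is why subscribeOn affects where emission happens.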

 

Taking IoScheduler as an example:

Scheduler's scheduleDirect, followed by IoScheduler's createWorker and its worker classes:

@NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        // the Worker runs the task on its own thread, which is what switches the executing thread
        w.schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    decoratedRun.run();
                } finally {
                    w.dispose();
                }
            }
        }, delay, unit);

        return w;
    }
    @NonNull
    @Override
    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }

    public int size() {
        return pool.get().allWorkers.size();
    }

    static final class EventLoopWorker extends Scheduler.Worker {
        private final CompositeDisposable tasks;
        private final CachedWorkerPool pool;
        private final ThreadWorker threadWorker;

        final AtomicBoolean once = new AtomicBoolean();

        EventLoopWorker(CachedWorkerPool pool) {
            this.pool = pool;
            this.tasks = new CompositeDisposable();
            this.threadWorker = pool.get();
        }
        ...

        @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }

            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    }

    static final class ThreadWorker extends NewThreadWorker {
        private long expirationTime;

        ThreadWorker(ThreadFactory threadFactory) {
            super(threadFactory);
            this.expirationTime = 0L;
        }

        ...
    }

 A ThreadWorker is taken from the cached pool and wrapped in an EventLoopWorker, which finally calls NewThreadWorker's scheduleActual.

NewThreadWorker implementation:

public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor; // the executor that runs the tasks
...

    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }
......

   // uses the executor to run the Runnable passed in above, so the Observer's methods are invoked on the desired kind of thread
    public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        try {
            Future<?> f;
            if (delayTime <= 0) {
                f = executor.submit(decoratedRun); // submit to the executor for immediate execution
            } else {
                f = executor.schedule(decoratedRun, delayTime, unit);
            }
            return Disposables.fromFuture(f);
        } catch (RejectedExecutionException ex) {
            RxJavaPlugins.onError(ex);
            return EmptyDisposable.INSTANCE;
        }
    }
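
The submit-or-schedule branch can be tried out directly against the JDK. This is a minimal sketch: ScheduleDirectSketchDemo and its helper are hypothetical, the helper returns a plain Future instead of a Disposable, and the RxJavaPlugins hooks and the rejection handling are left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ScheduleDirectSketchDemo {
    // Same branch as in the listing above: no delay means submit,
    // a positive delay means schedule.
    static Future<?> scheduleDirect(ScheduledExecutorService executor,
                                    Runnable run, long delayTime, TimeUnit unit) {
        if (delayTime <= 0) {
            return executor.submit(run);
        }
        return executor.schedule(run, delayTime, unit);
    }

    // Registers a delayed task first and an immediate task second,
    // then reports the order in which they actually ran.
    static List<String> run() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        List<String> order = Collections.synchronizedList(new ArrayList<String>());
        try {
            Future<?> delayed = scheduleDirect(
                    executor, () -> order.add("delayed"), 200, TimeUnit.MILLISECONDS);
            Future<?> immediate = scheduleDirect(
                    executor, () -> order.add("immediate"), 0, TimeUnit.MILLISECONDS);
            immediate.get(5, TimeUnit.SECONDS);
            delayed.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return new ArrayList<>(order);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Both tasks run on the executor's single thread rather than on the caller's thread, and the task submitted without a delay is expected to run before the one scheduled 200 ms out.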

 

Now look at observeOn:

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

 

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {

        private static final long serialVersionUID = 6576896619930983584L;
        final Observer<? super T> actual;
        final Scheduler.Worker worker;
        final boolean delayError;
        final int bufferSize;

        SimpleQueue<T> queue;

        Disposable s;

        Throwable error;
        volatile boolean done;

        volatile boolean cancelled;

        int sourceMode;

        boolean outputFused;

        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.actual = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }

        @Override
        public void onSubscribe(Disposable s) {
            ...

                actual.onSubscribe(this);
            }
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }

        @Override
        public void onError(Throwable t) {
            if (done) {
                RxJavaPlugins.onError(t);
                return;
            }
            error = t;
            done = true;
            schedule();
        }

        @Override
        public void onComplete() {
            if (done) {
                return;
            }
            done = true;
            schedule();
        }
    ...
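        void schedule() { // hand this Runnable to the worker; run() is then called on the worker's thread
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }
        @Override
        public void run() { // callback on the worker's thread: forwards events to the original Observer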
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }
        
   void drainNormal() {
            int missed = 1;

            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = actual;
.......

                    a.onNext(v);
                ......
            }
        }        ... } }

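Here ObservableObserveOn acts as a proxy: ObserveOnObserver queues each event and replays it to the original Observer on the worker thread of the observeOn Scheduler.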
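
The queue-plus-counter pattern behind schedule() and the drain loop can be sketched with JDK classes only. This is a simplified illustration, not RxJava's code: Sink, ObserveOnSink and ObserveOnSketchDemo are hypothetical names, an Executor stands in for Scheduler.Worker, and errors, disposal and queue fusion are not modeled.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ObserveOnSketchDemo {
    // Hypothetical minimal observer.
    interface Sink<T> { void onNext(T value); void onComplete(); }

    // Plays the role of ObserveOnObserver: queues events on the caller's
    // thread and replays them to the downstream sink on the worker's thread.
    static final class ObserveOnSink<T> implements Sink<T>, Runnable {
        private final Sink<T> downstream;
        private final Executor worker;
        private final Queue<T> queue = new ConcurrentLinkedQueue<>();
        private final AtomicInteger wip = new AtomicInteger(); // "work in progress" counter
        private volatile boolean done;

        ObserveOnSink(Sink<T> downstream, Executor worker) {
            this.downstream = downstream;
            this.worker = worker;
        }

        @Override public void onNext(T value) { queue.offer(value); schedule(); }
        @Override public void onComplete() { done = true; schedule(); }

        // Only the caller that moves the counter from 0 to 1 starts a drain.
        private void schedule() {
            if (wip.getAndIncrement() == 0) {
                worker.execute(this);
            }
        }

        @Override
        public void run() {
            int missed = 1;
            for (;;) {
                for (;;) {
                    boolean d = done; // read the flag before polling the queue
                    T v = queue.poll();
                    if (v == null) {
                        if (d) { downstream.onComplete(); return; }
                        break;
                    }
                    downstream.onNext(v);
                }
                missed = wip.addAndGet(-missed); // account for schedule() calls made meanwhile
                if (missed == 0) {
                    return;
                }
            }
        }
    }

    static List<String> run() {
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-ui"));
        List<String> seen = Collections.synchronizedList(new ArrayList<String>());
        CountDownLatch finished = new CountDownLatch(1);
        Sink<String> observer = new Sink<String>() {
            @Override public void onNext(String v) {
                seen.add(v + "@" + Thread.currentThread().getName());
            }
            @Override public void onComplete() {
                seen.add("complete@" + Thread.currentThread().getName());
                finished.countDown();
            }
        };
        ObserveOnSink<String> proxy = new ObserveOnSink<>(observer, ui);
        try {
            proxy.onNext("t1"); // emitted on the calling thread
            proxy.onNext("t2");
            proxy.onNext("t3");
            proxy.onComplete();
            if (!finished.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            ui.shutdown();
        }
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Every recorded entry carries the worker thread's name even though all events were pushed from the calling thread. The counter ensures that at most one drain runs at a time, so the downstream sink sees the events in order.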

 

 

 

To be continued...
