RxJava系列六(從微觀角度解讀RxJava源碼)

轉載請註明出處:https://zhuanlan.zhihu.com/p/22338235java


前言

經過前面五個篇幅的介紹,相信你們對RxJava的基本使用以及操做符應該有了必定的認識。可是知其然還要知其因此然;因此從這一章開始咱們聊聊源碼,分析RxJava的實現原理。本文咱們主要從三個方面來分析RxJava的實現:函數

  • RxJava基本流程分析oop

  • 操做符原理分析

  • 線程調度原理分析

本章節基於RxJava1.1.9版本的源碼

1、RxJava執行流程分析

RxJava系列2(基本概念及使用介紹)中咱們介紹過,一個最基本的RxJava調用是這樣的:

示例A

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello RxJava!");
        subscriber.onCompleted();
    }
}).subscribe(new Subscriber<String>() {
    @Override
    public void onCompleted() {
        System.out.println("completed!");
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(String s) {
        System.out.println(s);
    }
});

首先調用Observable.create()建立一個被觀察者Observable,同時建立一個OnSubscribe做爲create()方法的入參;接着建立一個觀察者Subscriber,而後經過subseribe()實現兩者的訂閱關係。這裏涉及到三個關鍵對象和一個核心的方法:

  • Observable(被觀察者)

  • OnSubscribe (從純設計模式的角度來理解,OnSubscribe.call()能夠看作是觀察者模式中被觀察者用來通知觀察者的notifyObservers()方法)

  • Subscriber (觀察者)

  • subscribe() (實現觀察者與被觀察者訂閱關係的方法)

一、Observable.create()源碼分析

首先咱們來看看Observable.create()的實現:

public static <T> Observable<T> create(OnSubscribe<T> f) {
    return new Observable<T>(RxJavaHooks.onCreate(f));
}

這裏建立了一個被觀察者Observable,同時將RxJavaHooks.onCreate(f)做爲構造函數的參數,源碼以下:

protected Observable(OnSubscribe<T> f) {
    this.onSubscribe = f;
}

咱們看到源碼中直接將參數RxJavaHooks.onCreate(f)賦值給了當前咱們構造的被觀察者Observable的成員變量onSubscribe。那麼RxJavaHooks.onCreate(f)返回的又是什麼呢?咱們接着往下看:

public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
    Func1<OnSubscribe, OnSubscribe> f = onObservableCreate;
    if (f != null) {
        return f.call(onSubscribe);
    }
    return onSubscribe;
}

因爲咱們並沒調用RxJavaHooks.initCreate(),因此上面代碼中的onObservableCreate爲null;所以RxJavaHooks.onCreate(f)最終返回的就是f,也就是咱們在Observable.create()的時候new出來的OnSubscribe。(因爲對RxJavaHooks的理解並不影響咱們對RxJava執行流程的分析,所以在這裏咱們不作進一步的探討。爲了方便理解咱們只須要知道RxJavaHooks一系列方法的返回值就是入參自己就OK了,例如這裏的RxJavaHooks.onCreate(f)返回的就是f)。

至此咱們作下邏輯梳理:Observable.create()方法構造了一個被觀察者Observable對象,同時將new出來的OnSubscribe賦值給了該Observable的成員變量onSubscribe

二、Subscriber源碼分析

接着咱們看下觀察者Subscriber的源碼,爲了增長可讀性,我去掉了源碼中的註釋和部分代碼。

public abstract class Subscriber<T> implements Observer<T>, Subscription {
    
    private final SubscriptionList subscriptions;//訂閱事件集,全部發送給當前Subscriber的事件都會保存在這裏
    
    ...

    protected Subscriber(Subscriber<?> subscriber, boolean shareSubscriptions) {
        this.subscriber = subscriber;
        this.subscriptions = shareSubscriptions && subscriber != null ? subscriber.subscriptions : new SubscriptionList();
    }

    ...

    @Override
    public final void unsubscribe() {
        subscriptions.unsubscribe();
    }

    @Override
    public final boolean isUnsubscribed() {
        return subscriptions.isUnsubscribed();
    }

    public void onStart() {
    }
    
    ...
}
public interface Subscription {
    void unsubscribe();
    boolean isUnsubscribed();
}

Subscriber實現了Subscription接口,從而對外提供isUnsubscribed()unsubscribe()方法。前者用於判斷是否已經取消訂閱;後者用於將訂閱事件列表(也就是當前觀察者的成員變量subscriptions)中的全部Subscription取消訂閱,而且再也不接受觀察者Observable發送的後續事件。

三、subscribe()源碼分析

前面咱們分析了觀察者和被觀察者相關的源碼,那麼接下來即是整個訂閱流程中最最關鍵的環節了。

public final Subscription subscribe(Subscriber<? super T> subscriber) {
    return Observable.subscribe(subscriber, this);
}
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {

    ...

    subscriber.onStart();
    
    if (!(subscriber instanceof SafeSubscriber)) {
        subscriber = new SafeSubscriber<T>(subscriber);
    }

    try {
        RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);

        return RxJavaHooks.onObservableReturn(subscriber);
    } catch (Throwable e) {
        ...
        return Subscriptions.unsubscribed();
    }
}

subscribe()方法中將傳進來的subscriber包裝成了SafeSubscriberSafeSubscriber實際上是subscriber的一個代理,對subscriber的一系列方法作了更加嚴格的安全校驗。保證了onCompleted()onError()只會有一個被執行且只執行一次,一旦它們其中方法被執行事後onNext()就不在執行了。

上述代碼中最關鍵的就是RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber)。這裏的RxJavaHooks和以前提到的同樣,RxJavaHooks.onObservableStart(observable, observable.onSubscribe)返回的正是他的第二個入參observable.onSubscribe,也就是當前observable的成員變量onSubscribe。而這個成員變量咱們前面提到過,它是咱們在Observable.create()的時候new出來的。因此這段代碼能夠簡化爲onSubscribe.call(subscriber)。這也印證了我在RxJava系列2(基本概念及使用介紹)中說的,onSubscribe.call(subscriber)中的subscriber正是咱們在subscribe()方法中new出來的觀察者。

到這裏,咱們對RxJava的執行流程作個總結:首先咱們調用crate()建立一個觀察者,同時建立一個OnSubscribe做爲該方法的入參;接着調用subscribe()來訂閱咱們本身建立的觀察者Subscriber
一旦調用subscribe()方法後就會觸發執行OnSubscribe.call()。而後咱們就能夠在call方法調用觀察者subscriberonNext(),onCompleted(),onError()

最後我用張圖來總結下以前的分析結果:

RxJava基本流程分析

2、操做符原理分析

以前咱們介紹過幾十個操做符,要一一分析它們的源碼顯然不太現實。在這裏我拋磚引玉,選取一個相對簡單且經常使用的map操做符來分析。

咱們先來看一個map操做符的簡單應用:

示例B

Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        subscriber.onNext(1);
        subscriber.onCompleted();
    }
}).map(new Func1<Integer, String>() {
    @Override
    public String call(Integer integer) {
        return "This is " + integer;
    }
}).subscribe(new Subscriber<String>() {
    @Override
    public void onCompleted() {
        System.out.println("onCompleted!");
    }
    @Override
    public void onError(Throwable e) {
        System.out.println(e.getMessage());
    }
    @Override
    public void onNext(String s) {
        System.out.println(s);
    }
});

爲了便於表述,我將上面的代碼作了以下拆解:

Observable<Integer> observableA = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        subscriber.onNext(1);
        subscriber.onCompleted();
    }
});

Subscriber<String> subscriberOne = new Subscriber<String>() {
    @Override
    public void onCompleted() {
        System.out.println("onCompleted!");
    }
    @Override
    public void onError(Throwable e) {
        System.out.println(e.getMessage());
    }
    @Override
    public void onNext(String s) {
        System.out.println(s);
    }
};

Observable<String> observableB = 
        observableA.map(new Func1<Integer, String>() {
                @Override
                public String call(Integer integer) {
                    return "This is " + integer;;
                }
            });

observableB.subscribe(subscriberOne);

map()的源碼和上一小節介紹的create()同樣位於Observable這個類中。

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
    return create(new OnSubscribeMap<T, R>(this, func));
}

經過查看源碼咱們發現調用map()的時候其實是建立了一個新的被觀察者Observable,咱們姑且稱它爲ObservableB;一開始經過Observable.create()建立的Observable咱們稱之爲ObservableA。在建立ObservableB的時候同時建立了一個OnSubscribeMap,而ObservableA和變換函數Func1則做爲構造OnSubscribeMap的參數。

public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {

    final Observable<T> source;//ObservableA
    
    final Func1<? super T, ? extends R> transformer;//map操做符中的轉換函數Func1。T爲轉換前的數據類型,在上面的例子中爲Integer;R爲轉換後的數據類型,在該例中爲String。

    public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
        this.source = source;
        this.transformer = transformer;
    }
    
    @Override
    public void call(final Subscriber<? super R> o) {//結合第一小節的分析結果,咱們知道這裏的入參o其實就是咱們本身new的觀察者subscriberOne。
        MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
        o.add(parent);
        source.unsafeSubscribe(parent);
    }
    
    static final class MapSubscriber<T, R> extends Subscriber<T> {
        
        final Subscriber<? super R> actual;//這裏的actual就是咱們在調用subscribe()時建立的觀察者mSubscriber
        final Func1<? super T, ? extends R> mapper;//變換函數
        boolean done;
        
        public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }
        
        @Override
        public void onNext(T t) {
            R result;
            try {
                result = mapper.call(t);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return;
            }
            actual.onNext(result);
        }
        
        @Override
        public void onError(Throwable e) {
            ...
            actual.onError(e);
        }
        
        @Override
        public void onCompleted() {
            ...
            actual.onCompleted();
        }
        
        @Override
        public void setProducer(Producer p) {
            actual.setProducer(p);
        }
    }
}

OnSubscribeMap實現了OnSubscribe接口,所以OnSubscribeMap就是一個OnSubscribe。在調用map()的時候建立了一個新的被觀察者ObservableB,而後咱們用ObservableB.subscribe(subscriberOne)訂閱了觀察者subscriberOne。結合咱們在第一小節的分析結果,因此OnSubscribeMap.call(o)中的o就是subscribe(subscriberOne)中的subscriberOne;一旦調用了ObservableB.subscribe(subscriberOne)就會執行OnSubscribeMap.call()

call()方法中,首先經過咱們的觀察者o和轉換函數transformer構造了一個MapSubscriber,最後調用了source也就是observableAunsafeSubscribe()方法。即observableA訂閱了一個觀察者MapSubscriber

public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
    try {
        ...
        RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
        return RxJavaHooks.onObservableReturn(subscriber);
    } catch (Throwable e) {
        ...
        return Subscriptions.unsubscribed();
    }
}

上面這段代碼最終執行了onSubscribe也就是OnSubscribeMapcall()方法,call()方法中的參數就是以前在OnSubscribeMap.call()中new出來的MapSubscriber。最後在call()方法中執行了咱們本身的業務代碼:

subscriber.onNext(1);
subscriber.onCompleted();

其實也就是執行了MapSubscriberonNext()onCompleted()

@Override
public void onNext(T t) {
    R result;
    try {
        result = mapper.call(t);
    } catch (Throwable ex) {
        ...
        return;
    }
    actual.onNext(result);
}

onNext(T t)方法中的的mapper就是變換函數,actual就是咱們在調用subscribe()時建立的觀察者subscriberOne。這個T就是咱們例子中的IntegerR就是String。在onNext()中首先調用變換函數mapper.call()T轉換成R(在咱們的例子中就是將Integer類型的1轉換成了String類型的「This is 1」);接着調用subscriberOne.onNext(String result)。一樣在調用MapSubscriber.onCompleted()時會執行subscriberOne.onCompleted()。這樣就完成了一直完成的調用流程。

我認可太囉嗦了,花費了這麼大的篇幅纔將map()的轉換原理解釋清楚。我也是但願儘可能的將每一個細節都呈現出來方便你們理解,若是看我囉嗦了這麼久仍是沒能理解,請看下面我畫的這張執行流程圖。

加入Map操做符後的執行流程

3、線程調度原理分析

在前面的文章中我介紹過RxJava能夠很方便的經過subscribeOn()observeOn()來指定數據流的每一部分運行在哪一個線程。其中subscribeOn()指定了處理Observable的所有的過程(包括髮射數據和通知)的線程;observeOn()指定了觀察者的onNext(), onError()onCompleted()執行的線程。接下來咱們就分析分析源碼,看看線程調度是如何實現的。

在分析源碼前咱們先看看一段常見的經過RxJava實現的線程調度代碼:

示例C

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello RxJava!");
        subscriber.onCompleted();
    }
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
    @Override
    public void onCompleted() {
        System.out.println("completed!");
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(String s) {
        System.out.println(s);
    }
});

一、subscribeOn()源碼分析

public final Observable<T> subscribeOn(Scheduler scheduler) {
    ...
    return create(new OperatorSubscribeOn<T>(this, scheduler));
}

經過上面的代碼咱們能夠看到,subscribeOn()map()同樣是建立了一個新的被觀察者Observable。所以我大體就能猜到subscribeOn()的執行流程應該和map()差很少,OperatorSubscribeOn確定也是一個OnSubscribe。那咱們接下來就看看OperatorSubscribeOn的源碼:

public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {

    final Scheduler scheduler;//線程調度器,用來指定訂閱事件發送、處理等所在的線程
    final Observable<T> source;

    public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
        this.scheduler = scheduler;
        this.source = source;
    }

    @Override
    public void call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();
        subscriber.add(inner);
        
        inner.schedule(new Action0() {
            @Override
            public void call() {
                final Thread t = Thread.currentThread();
                
                Subscriber<T> s = new Subscriber<T>(subscriber) {
                    @Override
                    public void onNext(T t) {
                        subscriber.onNext(t);
                    }
                    
                    @Override
                    public void onError(Throwable e) {
                        try {
                            subscriber.onError(e);
                        } finally {
                            inner.unsubscribe();
                        }
                    }
                    
                    @Override
                    public void onCompleted() {
                        try {
                            subscriber.onCompleted();
                        } finally {
                            inner.unsubscribe();
                        }
                    }
                    
                    @Override
                    public void setProducer(final Producer p) {
                        subscriber.setProducer(new Producer() {
                            @Override
                            public void request(final long n) {
                                if (t == Thread.currentThread()) {
                                    p.request(n);
                                } else {
                                    inner.schedule(new Action0() {
                                        @Override
                                        public void call() {
                                            p.request(n);
                                        }
                                    });
                                }
                            }
                        });
                    }
                };
                source.unsafeSubscribe(s);
            }
        });
    }
}

OperatorSubscribeOn實現了OnSubscribe接口,call()中對Subscriber的處理也和OperatorMapSubscriber的處理相似。首先經過scheduler構建了一個Worker;而後用傳進來的subscriber構造了一個新的Subscriber s,並將s丟到Worker.schedule()中來處理;最後用原Observable去訂閱觀察者s。而這個Worker就是線程調度的關鍵!前面的例子中咱們經過subscribeOn(Schedulers.io())指定了Observable發射處理事件以及通知觀察者的一系列操做的執行線程,正是經過這個Schedulers.io()建立了咱們前面提到的Worker。因此咱們來看看Schedulers.io()的實現。

首先經過Schedulers.io()得到了ioScheduler並返回,上面的OperatorSubscribeOn經過這個的SchedulercreateWorker()方法建立了咱們前面提到的Worker

public static Scheduler io() {
    return RxJavaHooks.onIOScheduler(getInstance().ioScheduler);
}

接着咱們看看這個ioScheduler是怎麼來的,下面的代碼向咱們展示了是如何在Schedulers的構造函數中經過RxJavaSchedulersHook.createIoScheduler()來初始化ioScheduler的。

private Schedulers() {

    ...

    Scheduler io = hook.getIOScheduler();
    if (io != null) {
        ioScheduler = io;
    } else {
        ioScheduler = RxJavaSchedulersHook.createIoScheduler();
    }

    ...
}

最終RxJavaSchedulersHook.createIoScheduler()返回了一個CachedThreadScheduler,並賦值給了ioScheduler

public static Scheduler createIoScheduler() {
    return createIoScheduler(new RxThreadFactory("RxIoScheduler-"));
}
public static Scheduler createIoScheduler(ThreadFactory threadFactory) {
    ...
    return new CachedThreadScheduler(threadFactory);
}

到這一步既然咱們知道了ioScheduler就是一個CachedThreadScheduler,那咱們就來看看它的createWorker()的實現。

public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}

上面的代碼向咱們赤裸裸的呈現了前面OperatorSubscribeOn中的Worker其實就是EventLoopWorker。咱們重點要關注的是他的scheduleActual()

static final class EventLoopWorker extends Scheduler.Worker implements Action0 {
    private final CompositeSubscription innerSubscription = new CompositeSubscription();
    private final CachedWorkerPool pool;
    private final ThreadWorker threadWorker;
    final AtomicBoolean once;

    EventLoopWorker(CachedWorkerPool pool) {
        this.pool = pool;
        this.once = new AtomicBoolean();
        this.threadWorker = pool.get();
    }

    ...

    @Override
    public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
        ...
        ScheduledAction s = threadWorker.scheduleActual(new Action0() {
            @Override
            public void call() {
                if (isUnsubscribed()) {
                    return;
                }
                action.call();
            }
        }, delayTime, unit);
        innerSubscription.add(s);
        s.addParent(innerSubscription);
        return s;
    }
}

經過對源碼的一步步追蹤,咱們知道了前面OperatorSubscribeOn.call()中的inner.schedule()最終會執行到ThreadWorkerscheduleActual()方法。

public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
    Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
    ScheduledAction run = new ScheduledAction(decoratedAction);
    Future<?> f;
    if (delayTime <= 0) {
        f = executor.submit(run);
    } else {
        f = executor.schedule(run, delayTime, unit);
    }
    run.add(f);
    return run;
}

scheduleActual()中的ScheduledAction實現了Runnable接口,經過線程池executor最終實現了線程切換。上面即是subscribeOn(Schedulers.io())實現線程切換的所有過程。

二、observeOn()源碼分析

observeOn()切換線程是經過lift來實現的,相比subscribeOn()在實現原理上相對複雜些。不過本質上最終仍是建立了一個新的Observable

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ...
    return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return create(new OnSubscribeLift<T, R>(onSubscribe, operator));
}

OperatorObserveOn做爲OnSubscribeLift構造函數的參數用來建立了一個新的OnSubscribeLift對象,接下來咱們看看OnSubscribeLift的實現:

public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {
    
    final OnSubscribe<T> parent;

    final Operator<? extends R, ? super T> operator;

    public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
        this.parent = parent;
        this.operator = operator;
    }

    @Override
    public void call(Subscriber<? super R> o) {
        try {
            Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
            try {
                st.onStart();
                parent.call(st);
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                st.onError(e);
            }
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            o.onError(e);
        }
    }
}

OnSubscribeLift繼承自OnSubscribe,經過前面的分析咱們知道一旦調用了subscribe()將觀察者與被觀察綁定後就會觸發被觀察者所對應的OnSubscribecall()方法,因此這裏會觸發OnSubscribeLift.call()。在call()中調用了OperatorObserveOn.call()並返回了一個新的觀察者Subscriber st,接着調用了前一級Observable對應OnSubscriber.call(st)

咱們再看看OperatorObserveOn.call()的實現:

public Subscriber<? super T> call(Subscriber<? super T> child) {
    ...
    ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
    parent.init();
    return parent;
}

OperatorObserveOn.call()中建立了一個ObserveOnSubscriber並調用init()進行了初始化。

static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {

    ...

    @Override
    public void onNext(final T t) {
        ...
        schedule();
    }

    @Override
    public void onCompleted() {
        ...
        schedule();
    }

    @Override
    public void onError(final Throwable e) {
        ...
        schedule();
    }

    protected void schedule() {
        if (counter.getAndIncrement() == 0) {
            recursiveScheduler.schedule(this);
        }
    }

    @Override
    public void call() {
        long missed = 1L;
        long currentEmission = emitted;

        final Queue<Object> q = this.queue;
        final Subscriber<? super T> localChild = this.child;
        final NotificationLite<T> localOn = this.on;
        
        for (;;) {
            long requestAmount = requested.get();
            
            while (requestAmount != currentEmission) {
                boolean done = finished;
                Object v = q.poll();
                boolean empty = v == null;
                
                if (checkTerminated(done, empty, localChild, q)) {
                    return;
                }
                
                if (empty) {
                    break;
                }
                
                localChild.onNext(localOn.getValue(v));

                currentEmission++;
                if (currentEmission == limit) {
                    requestAmount = BackpressureUtils.produced(requested, currentEmission);
                    request(currentEmission);
                    currentEmission = 0L;
                }
            }
            
            if (requestAmount == currentEmission) {
                if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
                    return;
                }
            }

            emitted = currentEmission;
            missed = counter.addAndGet(-missed);
            if (missed == 0L) {
                break;
            }
        }
    }
    
    ...
}

ObserveOnSubscriber繼承自Subscriber,並實現了Action0接口。咱們看到ObserveOnSubscriberonNext()onCompleted()onError()都有個schedule(),這個方法就是咱們線程調度的關鍵;經過schedule()將新觀察者ObserveOnSubscriber發送給subscriberOne的全部事件都切換到了recursiveScheduler所對應的線程,簡單的說就是把subscriberOneonNext()onCompleted()onError()方法丟到了recursiveScheduler對應的線程中來執行。

那麼schedule()又是如何作到這一點的呢?他內部調用了recursiveScheduler.schedule(this)recursiveScheduler其實就是一個Worker,和咱們在介紹subscribeOn()時提到的worker同樣,執行schedule()實際上最終是建立了一個runable,而後把這個runnable丟到了特定的線程池中去執行。在runnablerun()方法中調用了ObserveOnSubscriber.call(),看上面的代碼你們就會發如今call()方法中最終調用了subscriberOneonNext()onCompleted()onError()方法。這即是它實現線程切換的原理。

好了,咱們最後再看看示例C對應的執行流程圖,幫助你們加深理解。

RxJava執行流程

總結

這一章以執行流程操做符實現以及線程調度三個方面爲切入點剖析了RxJava源碼。下一章將站在更宏觀的角度來分析整個RxJava的框架結構、設計思想等等。敬請期待~~ :)

若是你們喜歡這一系列的文章,歡迎關注個人知乎專欄和GitHub。

相關文章
相關標籤/搜索