Android進階知識:RxJava相關

1. 前言

RxJava核心功能是一個用來完成異步操做的庫,相對於其它異步操做的方法,RxJavaAPI使用更加的簡潔。而且RxJava中還提供了不少功能強大的操做符,幫助咱們解決不少本來複雜繁瑣的代碼邏輯,提升了代碼質量。RxJava的實現是基於觀察者模式,觀察者模式中如下有三個比較重要的概念:git

  1. 被觀察者(Observable)
  2. 觀察者(Observer)
  3. 訂閱(subscribe)

被觀察者是事件的發起者,被觀察者與觀察者創建訂閱關係後,被觀察者發送事件,觀察者才能接收到事件。github

2. 基礎使用

RxJava的基礎使用也很簡單,分爲三個步驟,分別是建立被觀察者,建立觀察者和創建訂閱關係,具體代碼以下。bash

// 1. 建立被觀察者
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " ObservableOnSubscribe subscribe");
                emitter.onNext("string1");
                emitter.onNext("string2");
                emitter.onNext("string3");
                emitter.onComplete();
            }
        });
        // 2. 建立觀察者
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " onSubscribe");
            }

            @Override
            public void onNext(String s) {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " onNext "+s);

            }

            @Override
            public void onError(Throwable e) {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " onError");

            }

            @Override
            public void onComplete() {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " onComplete");
            }
        };
        Log.d(getClass().getName(), Thread.currentThread().getName() + " observable:"+observable.getClass().getName());
        // 3. 創建訂閱關係
        observable.subscribe(observer);
複製代碼

運行日誌: markdown

3. 訂閱源碼流程

本文中全部源碼基於RxJava22.2.11版本。首先來看看這個基本的訂閱流程源碼是怎麼實現的。app

3.1 建立被觀察者

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
複製代碼

使用RxJava能夠經過Observablecreate方法建立一個被觀察者對象。create方法從參數中傳入一個ObservableOnSubscribe類型的source,而後方法中先校驗了source是否爲空,接着將傳入的source封裝成一個ObservableCreate對象,而後調用了RxJavaPlugins.onAssembly方法返回建立的好的Observable。接着進入onAssembly方法查看。異步

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}
複製代碼

onAssembly方法中首先是一個Hook實現,這裏能夠理解爲一個代理。能夠看到這裏先判斷onObservableAssembly是否爲空,爲空就直接返回傳入的source,不然再調用apply方法。這裏能夠繼續跟蹤一下onObservableAssemblyasync

@SuppressWarnings("rawtypes")
@Nullable
static volatile Function<? super Observable, ? extends Observable> onObservableAssembly;

/**
  * Sets the specific hook function.
  * @param onObservableAssembly the hook function to set, null allowed
  */
@SuppressWarnings("rawtypes")
public static void setOnObservableAssembly(@Nullable Function<? super Observable, ? extends Observable> onObservableAssembly) {
    if (lockdown) {
        throw new IllegalStateException("Plugins can't be changed anymore");
    }
    RxJavaPlugins.onObservableAssembly = onObservableAssembly;
}
複製代碼

它是RxJavaPlugins中的成員變量,默認爲空,而且提供了一個set方法來設置它。由於默認爲空,因此默認返回的就是傳入的source。這裏的代理默認是不會對Observable作什麼操做,若是須要有特殊的需求能夠調用set方法實現本身的代理。而默認返回的source類型爲ObservableCreate對象也實現了Observable接口。ide

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    ......
}
複製代碼

3.2 建立觀察者

public interface Observer<T> {

    void onSubscribe(@NonNull Disposable d);

    void onNext(@NonNull T t);

    void onError(@NonNull Throwable e);

    void onComplete();
}
複製代碼

觀察者Observer是一個接口,其中提供了一些方法,使用時建立接口的實現,並根據需求在方法中作本身的實現。函數

3.3 創建訂閱關係

創建訂閱關係調用了Observablesubscribe方法。oop

public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
           ......
        } catch (Throwable e) {
           ......
        }
    }
複製代碼

方法中仍是先判斷了傳入參數observer是否爲空,接着仍是一個Hook實現,這裏就不細究了,得到Hook返回的observer後再次判斷是否爲空,以後調用了subscribeActual方法。

protected abstract void subscribeActual(Observer<? super T> observer);
複製代碼

ObservablesubscribeActual方法是個抽象方法,以前看過這裏的Observable實際實現是個ObservableCreate對象,因此再進入ObservableCreate類查看對應方法。

@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);
    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}
複製代碼

ObservableCreate中的subscribeActual方法中先建立了一個CreateEmitter發射器對象,並將observer對象傳入。接着調用了observeronSubscribe方法,此時觀察者的onSubscribe方法執行。最後調用了sourcesubscribe方法。

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " ObservableOnSubscribe subscribe");
                emitter.onNext("string1");
                emitter.onNext("string2");
                emitter.onNext("string3");
                emitter.onComplete();
            }
        });
複製代碼

這個source就是在create方法中傳入的ObservableOnSubscribe。它的subscribe方法中經過調用ObservableEmitter的方法發送事件,這裏的ObservableEmitter就是以前建立的CreateEmitter對象,因此再來進一步看看它其中的方法。

CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }
        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }
        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }
複製代碼

CreateEmitter的構造函數接收了觀察者對象,而後在調用onNext方法時先作了空判斷,再對isDisposed進行取消訂閱的判斷,以後調用了observeronNext方法,也就是觀察者的onNext方法。一樣的onComplete中最終也是調用了observeronComplete方法。至此RxJava中的基本訂閱流程的源碼就梳理完了。

4. 線程切換

RxJava中有個很重要的功能,就是能方便的切換線程,來看下它的使用,仍是以前基礎使用中的例子進行修改。

Observable<String> observable0 = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " ObservableOnSubscribe subscribe");
                emitter.onNext("string1");
                emitter.onNext("string2");
                emitter.onNext("string3");
                emitter.onComplete();
            }
        });
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " onSubscribe");
            }
            @Override
            public void onNext(String s) {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " onNext "+s);
            }
            @Override
            public void onError(Throwable e) {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " onError");
            }
            @Override
            public void onComplete() {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " onComplete");
            }
        };
        Observable<String> observable1 = observable0.subscribeOn(Schedulers.newThread());
        Log.d(getClass().getName(), Thread.currentThread().getName() + " observable1:"+observable1.getClass().getName());
        Observable<String> observable2 = observable1.observeOn(AndroidSchedulers.mainThread());
        Log.d(getClass().getName(), Thread.currentThread().getName() + " observable2:"+observable2.getClass().getName());
        observable2.subscribe(observer);
複製代碼

被觀察者和觀察者的建立和以前同樣,在創建訂閱關係時調用subscribeOnobserveOn方法進行線程的切換。這裏每一個方法返回的都是Observable類型,因此能夠採用鏈式調用,這也是RxJava的一個特色,可是這裏沒有采用這種寫法,而是將其拆分開來寫而且日誌打印出每一個Observable的具體類型,這是爲了方便以後源碼理解。 運行結果日誌:

4.1 subscribeOn

Observable<String> observable1 = observable0.subscribeOn(Schedulers.newThread());
Log.d(getClass().getName(), Thread.currentThread().getName() + " observable1:"+observable1.getClass().getName());
observable1.subscribe(observer);
複製代碼

運行結果:

先只調用 subscribeOn方法運行查看結果,發現不只被觀察者發射事件運行在了子線程,觀察者接收事件也運行在子線程,那麼進入 subscribeOn方法查看它的實現。

public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
複製代碼

能夠看到subscribeOn方法和subscribe方法有些相似。首先是判斷傳入的scheduler是否爲空,而後一樣調用RxJavaPlugins.onAssembly方法,此次構建了一個ObservableSubscribeOn對象返回。而subscribeOn方法以後仍是調用了subscribe方法,根據以前的分析,subscribe方法最終會調用到subscribeActual方法,不過此時的subscribeActual方法再也不是ObservableCreate中的而是ObservableSubscribeOn中的subscribeActual方法。

@Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
        observer.onSubscribe(parent);
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
複製代碼

ObservableSubscribeOnsubscribeActual方法中流程和以前的也很相似,此次是先建立了一個SubscribeOnObserver對象,將觀察者對象傳入,接着一樣先調用了observer.onSubscribe方法,而後將傳入的SubscribeOnObserver封裝入了一個SubscribeTask對象中,接着調用了scheduler.scheduleDirect方法再將返回結果獲得的Disposable設置到SubscribeOnObserver中。下面一個方法一個方法看。首先是建立SubscribeTask

final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;
        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }
        @Override
        public void run() {
            source.subscribe(parent);
        }
    }
複製代碼

SubscribeTaskObservableSubscribeOn的內部類,其實現很簡單就是實現了一個Runnable接口,構造方法中傳入了SubscribeOnObserver對象,在其run方法中調用了ObservableSubscribeOn中的成員變量sourcesubscribe方法。這個source是在建立ObservableSubscribeOn時傳入的,根據前面的代碼能夠找到是在subscribeOn方法中建立的對象而且這個source對應傳入的是當前這個Observable對象即經過Observable.create得到的被觀察者對象,其實現以前看過是一個ObservableCreate因此這裏就和以前同樣又會走到了其父類Observablesubscribe方法中,繼而調用ObservableCreatesubscribeActual方法,以後最終會調用到觀察者的對應onNext等方法,不過此時的觀察者不直接是在使用時建立傳入的Observer,而是以前看到的SubscribeOnObserver類型,不過其中的onNext等方法仍是調用了在使用時建立傳入的Observer的對應方法。

static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> downstream;
        final AtomicReference<Disposable> upstream;
        SubscribeOnObserver(Observer<? super T> downstream) {
            this.downstream = downstream;
            this.upstream = new AtomicReference<Disposable>();
        }
        @Override
        public void onNext(T t) {
            downstream.onNext(t);
        }
        @Override
        public void onError(Throwable t) {
            downstream.onError(t);
        }
        @Override
        public void onComplete() {
            downstream.onComplete();
        }
        ......
    }
複製代碼

下面接着看到scheduleDirect這個方法,在建立好SubscribeTask以後調用了scheduleDirect方法。這裏的scheduler就是subscribeOn中傳入的,對應開始例子中的Schedulers.newThread

public static Scheduler newThread() {
    return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}
// 靜態成員變量NEW_THREAD
static final Scheduler NEW_THREAD;

NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
複製代碼

進入Schedulers.newThread一步步跟蹤,看到newThread方法返回靜態成員變量中的NEW_THREAD,而NEW_THREAD又是經過NewThreadTask建立。

static final class NewThreadTask implements Callable<Scheduler> {
    @Override
    public Scheduler call() throws Exception {
        return NewThreadHolder.DEFAULT;
    }
}
static final Scheduler DEFAULT = new NewThreadScheduler();
複製代碼

繼續跟蹤查看發現NewThreadTask實際是實現了Callable接口,其call方法中返回了靜態內部類中的NewThreadHolder.DEFAULT。這個DEFAULT的實現類型爲NewThreadScheduler。至此終於找到了咱們傳入的Scheduler的真正實現類。因而繼續看其scheduleDirect方法。

public Disposable scheduleDirect(@NonNull Runnable run) {
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();

    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    DisposeTask task = new DisposeTask(decoratedRun, w);

    w.schedule(task, delay, unit);

    return task;
}
複製代碼

scheduleDirect方法是在其父類中實現的,看到其中進而調用了同名重載方法,方法中首先是調用createWorker方法建立一個Worker。這個方法的實現就是在NewThreadScheduler中了。

public Worker createWorker() {
    return new NewThreadWorker(threadFactory);
}
複製代碼

createWorker方法中只作了一件事就是建立返回了一個NewThreadWorker

public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;

    volatile boolean disposed;

    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }
    ......
}
複製代碼

NewThreadWorker中看到建立了一個線程池,再回到scheduleDirect方法,建立完Worker後將傳入的RunnableSubscribeTask進行一個裝飾獲得新的Runnable對象。接着將Worker和新的Runnable封裝到一個DisposeTask對象中。

static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {
        @NonNull
        final Runnable decoratedRun;
        @NonNull
        final Worker w;
        @Nullable
        Thread runner;
        DisposeTask(@NonNull Runnable decoratedRun, @NonNull Worker w) {
            this.decoratedRun = decoratedRun;
            this.w = w;
        }
        @Override
        public void run() {
            runner = Thread.currentThread();
            try {
                decoratedRun.run();
            } finally {
                dispose();
                runner = null;
            }
        }
    ......
}
複製代碼

DisposeTask一樣實現了Runnable接口,在run方法中調用了從構造傳入的decoratedRunrun方法執行任務。回到最後一步,調用Workerschedule方法,這裏就對應的NewThreadWorkerschedule方法。

public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
        if (disposed) {
            return EmptyDisposable.INSTANCE;
        }
        return scheduleActual(action, delayTime, unit, null);
    }
複製代碼

schedule方法中又進一步調用了其scheduleActual方法。

public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }

        Future<?> f;
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }
        return sr;
    }
複製代碼

scheduleActual方法裏看到又將decoratedRunDisposableContainer封裝成ScheduledRunnable最後將這個ScheduledRunnable 交給構造函數中建立的線程池去運行,最終就會執行到前面看過的SubscribeTask中的run方法完成訂閱邏輯,調用觀察者的onNext等方法。到這裏就看出最終的source.subscribe是會經過線程池切換到子線程中去執行了。

經過查看subscribeOn方法源碼能夠發現,方法裏其實是在前一個建立的ObservableCreate外面包了一層,把它包成一個ObservableSubscribeOn對象,一樣的原先的Observer也被包了一層包成一個SubscribeOnObserver對象,而線程切換的工做是由Scheduler完成的。

4.2 observeOn

接着再來看看切換回主線程的方法observeOn,仍是先修改使用代碼,查看運行日誌。

Observable<String> observable2 = observable0.observeOn(AndroidSchedulers.mainThread());
Log.d(getClass().getName(), Thread.currentThread().getName() + " observable2:"+observable2.getClass().getName());
observable2.subscribe(observer);
複製代碼

運行日誌:

接着仍是進入來看源碼。

public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, false, bufferSize());
}

 public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
複製代碼

這裏看到observeOn方法裏調用了重載方法,方法中仍是同一個套路,不過這裏建立的又是另外一個對象ObservableObserveOn了。根據前面的經驗這裏就又是將前一個Observable傳遞到ObservableObserveOn中的成員變量source上,這裏看到就是構造函數中的第一個參數。接着仍是會調用subscribe與觀察者創建訂閱關係進而會執行到ObservableObserveOn對象的subscribeActual方法。

@Override
protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();
        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}
複製代碼

subscribeActual方法中判斷了scheduler的類型,這裏的scheduler就是由AndroidSchedulers.mainThread()傳入的,因而先來看一下這個方法。

public static Scheduler mainThread() {
    return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}

private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
    @Override public Scheduler call() throws Exception {
        return MainHolder.DEFAULT;
    }
});

private static final class MainHolder {
    static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
}
複製代碼

mainThread開始看,發現代碼調用邏輯和以前的Schedulers.newThread方法相似,最終會返回一個HandlerScheduler而這個Scheduler中的Handler則是主線程的Handler,看到這裏就能猜測到了,後面觀察者的對應方法必定是由這個Handler來切換到主線程執行的。回到subscribeActual方法。

@Override
protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();
        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}
複製代碼

這裏判斷完類型會走else中的方法首先仍是會調用HandlerSchedulercreateWorker方法建立一個Worker

@Override
public Worker createWorker() {
    return new HandlerWorker(handler, async);
}
複製代碼

這裏是個HandlerWorker其中具體方法後面再看。接着上面建立完Worker後一樣仍是同樣調用source.subscribe建立了一個ObserveOnObserver對象傳入。這裏的source就仍是以前的ObservableCreate,因此這裏仍是會調用ObservableCreate中的subscribeActual方法。

protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
複製代碼

ObservableCreate中的subscribeActual方法中的邏輯以前看過,不過此時傳入的observer仍然再也不是在使用時建立的觀察者對象了,而是傳過來的ObserveOnObserver對象,此時建立的CreateEmitter中的observer也就是這個ObserveOnObserver對象。和以前邏輯同樣,接着就會調用observeronNext等方法,此時調用的便是ObserveOnObserver中的onNext等方法。因此進入ObserveOnObserver查看。

@Override
public void onNext(T t) {
    if (done) {
        return;
    }
    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
    }
        schedule();
}

@Override
public void onComplete() {
    if (done) {
        return;
    }
    done = true;
    schedule();
}

void schedule() {
     if (getAndIncrement() == 0) {
         worker.schedule(this);
     }
}
複製代碼

查看ObserveOnObserver中的代碼會發現onNext方法中先將傳入的參數放入了一個隊列,而後不管是onNext仍是onComplete方法最後都調用了schedule方法,進而再進入查看,發現schedule方法中又調用了worker.schedule方法。這裏的worker就是以前建立的HandlerWorker,這時再來看它的schedule方法。

public Disposable schedule(@NonNull Runnable run) {
   return schedule(run, 0L, TimeUnit.NANOSECONDS);
}
複製代碼

單個參數schedule方法是在其父類中的,而這個方法中又調用另外一個三個參數的schedule方法,這個方法父類中是抽象方法因此實現就在子類HandlerWorker裏了。

@Override
        @SuppressLint("NewApi") // Async will only be true when the API is available to call.
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");
	    // 判斷是否取消訂閱
            if (disposed) {
                return Disposables.disposed();
            }
            run = RxJavaPlugins.onSchedule(run);
	    // 建立ScheduledRunnable
            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
	    // 建立消息,並將主線程Handler和ScheduledRunnable
            Message message = Message.obtain(handler, scheduled);
            message.obj = this;
	    // 判斷設置異步消息
            if (async) {
                message.setAsynchronous(true);
            }
	    // 發送消息執行callback
            handler.sendMessageDelayed(message, unit.toMillis(delay));
            // 檢查是否取消訂閱
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }
            return scheduled;
        }
複製代碼

在子類的這個方法裏在作了取消訂閱的判斷後將方法傳入的RunnableHandler又封裝到一個ScheduledRunnable對象中。接着建立了一個Message並將ScheduledRunnable放入Message,最後調用handler.sendMessageDelayed方法經過這個主線程的Handler執行這個ScheduledRunnable

最後來追溯下ScheduledRunnable到底執行了什麼,不過猜也知道最後必定調用到觀察者中的對應方法。

private static final class ScheduledRunnable implements Runnable, Disposable {
        private final Handler handler;
        private final Runnable delegate;

        private volatile boolean disposed; // Tracked solely for isDisposed().

        ScheduledRunnable(Handler handler, Runnable delegate) {
            this.handler = handler;
            this.delegate = delegate;
        }

        @Override
        public void run() {
            try {
                delegate.run();
            } catch (Throwable t) {
                RxJavaPlugins.onError(t);
            }
        }
    ......
    }
複製代碼

ScheduledRunnable中的run方法很簡單就是調用了構造中傳入的Runnablerun方法。而根據以前看過得建立ScheduledRunnable時傳入的Runnable又是從scheduleDirect方法中傳入的,而scheduleDirect方法中的Runnable又是從worker.schedule(this)方法時傳入的,根據上下文代碼發現這個this指代的是ObserveOnObserver對象,因而進一步進入它的run方法查看。

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {
        ......
        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.downstream = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
        ......
        @Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }
    ......    
    }
複製代碼

能夠看到run方法中判斷了outputFused的真假,而後分別調用了drainFuseddrainNormal方法。這裏的outputFused是與RxJava2中的背壓處理相關暫時先無論,根據方法名也能知道正常調用會執行drainNormal方法,因而直接來看drainNormal方法。

void drainNormal() {
            int missed = 1;
            // 存放onNext傳入的事件對象隊列
            final SimpleQueue<T> q = queue;
            // 傳入的觀察者對象
            final Observer<? super T> a = downstream;
            // 循環check事件是否完成或者發生錯誤
            for (;;) {
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }
                for (;;) {
                    boolean d = done;
                    T v;
                    try {
                        // 從隊列中取出發送事件傳入的對象
                        v = q.poll();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        disposed = true;
                        upstream.dispose();
                        q.clear();
                        a.onError(ex);
                        worker.dispose();
                        return;
                    }
                    boolean empty = v == null;
                    // 再次判斷是否完成或者發生錯誤
                    if (checkTerminated(d, empty, a)) {
                        return;
                    }
                    // 判斷隊列中取出的發送事件傳入的對象v是否爲空
                    if (empty) {
                        break;
                    }
                    // 執行觀察者對象的onNext方法
                    a.onNext(v);
                }
                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }
複製代碼

drainNormal方法中先經過checkTerminated方法校驗發送事件是否完成或者發生異常,接着從隊列中取出事件對象,再次判斷是否完成或者發生錯誤和取出的對象是否爲空,沒有問題的話就會執行觀察者的onNext方法。而發送完成和出現異常的方法則是在checkTerminated方法處理。

boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
            if (disposed) {
                queue.clear();
                return true;
            }
            if (d) {
                Throwable e = error;
                if (delayError) {
                    if (empty) {
                        disposed = true;
                        if (e != null) {
                            a.onError(e);
                        } else {
                            a.onComplete();
                        }
                        worker.dispose();
                        return true;
                    }
                } else {
                    if (e != null) {
                        disposed = true;
                        queue.clear();
                        a.onError(e);
                        worker.dispose();
                        return true;
                    } else
                    if (empty) {
                        disposed = true;
                        a.onComplete();
                        worker.dispose();
                        return true;
                    }
                }
            }
            return false;
        }
複製代碼

checkTerminated方法里根據delayError判斷是否設置了超時的錯誤,接着再根據得到的錯誤e是否爲空再決定調用的是觀察者的onError()方法仍是onComplete方法。至此observeOn切換線程的流程也梳理結束。

5. map操做符

RxJava中有不少功能強大的操做符,經過使用這些操做符,能夠很容易的解決代碼編寫時遇到的一些複雜繁瑣的問題。這裏就用map操做符來做爲一個例子,來看看操做符是怎樣工做的。首先仍是來了解map操做符的使用方法和做用。

final Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " ObservableOnSubscribe subscribe");
                emitter.onNext("5");
                emitter.onComplete();
            }
        });
        Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " onSubscribe");
            }

            @Override
            public void onNext(Integer i) {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " onNext "+i);
            }
            @Override
            public void onError(Throwable e) {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " onError");
            }
            @Override
            public void onComplete() {
                Log.d(getClass().getName(), Thread.currentThread().getName() + " onComplete");
            }
        };
        Observable<Integer> mapObservable = observable.map(new Function<String, Integer>() {
            @Override
            public Integer apply(String s) throws Exception {
                return Integer.parseInt(s);
            }
        });
        Log.d(getClass().getName(), Thread.currentThread().getName() + " mapObservable:"+mapObservable.getClass().getName());
        mapObservable.subscribe(observer);
複製代碼

運行日誌:

map操做符做用是能夠將被觀察者發送事件的數據類型轉換成其餘的數據類型。它的使用方法很簡單,例如上面這個例子就將一開始發送的String類型轉換成觀察者接收到的Integer類型。下面開始看map方法的源碼。

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
複製代碼

看到map方法中依舊仍是一樣的套路,經過RxJavaPlugins.onAssembly方法返回一個被觀察者對象,只不過此次構建傳入的類型又是另外一個ObservableMap類型的對象。訂閱的流程前面已經看過了,這裏和以前的同樣最終會走到ObservableMapsubscribeActual方法,因此直接來看這個方法。

@Override
public void subscribeActual(Observer<? super U> t) {
    source.subscribe(new MapObserver<T, U>(t, function));
}
複製代碼

ObservableMapsubscribeActual方法裏看到很熟悉仍是會調用source.subscribe方法,只是這裏傳入的Observer對象是一個MapObserver對象。接下來的邏輯又和以前同樣,根據以前的經驗source.subscribe方法最終會調用ObserveronNext方法,因此接下來直接來看MapObserveronNext方法。

public void onNext(T t) {
            if (done) {
                return;
            }
            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }
            U v;
            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            downstream.onNext(v);
        }
複製代碼

MapObserveronNext方法裏的邏輯很簡單,在作了一些的判斷後調用mapper.apply(t)方法得到類型轉換後的事件傳遞對象,最後就會調用觀察者的downstream.onNext方法,這裏的downstream就是訂閱方法傳入的觀察者對象。跟蹤mapper能夠找到,它是從MapObserver構造時傳入的一個Function類型,便是在使用map操做符時傳入的那個Function對象,又由於在使用時實現了Functionapply方法完成了數據的類型轉換邏輯,因此這裏調用mapper.apply(t)方法就能夠得到到轉換後的數據。

6. 總結

以上就是關於RxJava源碼工做流程的相關總結,總而言之,觀察者模式仍是其核心設計思想。除此以外,經過源碼閱讀還發現,不管在線程切換方面仍是其它功能的操做符的實現,根本上來講都是在其原有的被觀察者或觀察者基礎上包裝成一個新的對象,功能邏輯由新對象中的方法來實現完成。

相關文章
相關標籤/搜索