Android源碼系列-解密RxJava

rxjava是什麼?

ReactiveX

ReactiveX是Reactive Extensions的縮寫,通常簡寫爲Rx,最初是LINQ的一個擴展,由微軟的架構師Erik Meijer領導的團隊開發,在2012年11月開源,Rx是一個編程模型,目標是提供一致的編程接口,幫助開發者更方便的處理異步數據流,Rx庫支持.NET、JavaScript和C++,Rx近幾年愈來愈流行了,如今已經支持幾乎所有的流行編程語言了,Rx的大部分語言庫由ReactiveX這個組織負責維護,比較流行的有RxJava/RxJS/Rx.NET。html

rxjava

rxjava是ReactiveX在java平臺的一個實現。是一個編程模型,以觀察者模式提供鏈式的接口調用,動態控制線程的切換,使得能夠簡便的處理異步數據流。java

簡介

Github:rxjavareact

中文文檔:ReactiveX/RxJava文檔中文版android

官網:reactivexgit

特色

  • 鏈式調用,使用簡單
  • 簡化邏輯
  • 靈活的線程調度
  • 提供完善的數據操做符,功能強大

觀察者模式

觀察者模式定義對象間一種一對多的依賴關係,使得每當一個對象改變狀態,則因此依賴於它的對象都會獲得通知並被自動更新。rxjava的核心設計就是採用觀察者模式。Observable是被觀察者,Observer是觀察者,經過subscribe方法進行訂閱。github

  • 優勢

觀察者和被觀察者之間是抽象解耦,應對業務變化數據庫

加強系統靈活性、可擴展性編程

具體代碼示例可參考設計模式-觀察者模式json

  • 缺點

在應用觀察者模式時須要考慮一下開發效率和運行效率問題,程序中包括一個被觀察者、多個觀察者、開發和調試等內容會比較複雜,並且在Java中消息的通知默認是順序執行,一個觀察者卡頓,會影響總體的執行效率,在這種狀況下,通常考慮採用異步的方式設計模式

rxjava怎麼用?

gradle引入版本

implementation 'io.reactivex.rxjava2:rxjava:2.2.0'
   implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
複製代碼

接着舉一個經常使用的rxjava使用的例子,咱們在項目常常須要請求服務端接口,而後獲取數據,將數據進行緩存,而後處理ui上的顯示。示例的代碼以下:

Observable.create(new ObservableOnSubscribe<Response>() {
            @Override
            public void subscribe(ObservableEmitter<Response> e) throws Exception {
                //獲取服務端的接口數據
                Request.Builder builder = new Request.Builder()
                        .url("http://xxx.com")
                        .get();
                Request request = builder.build();
                Call call = new OkHttpClient().newCall(request);
                Response response = call.execute();
                e.onNext(response);
            }
        }).map(new Function<Response, Model>() {
            @Override
            public Model apply( Response response) throws Exception {
                //將json數據轉化爲對應的Model
                if (response.isSuccessful()) {
                    ResponseBody body = response.body();
                    if (body != null) {
                        Log.e(TAG, "map:轉換前:" + response.body());
                        return new Gson().fromJson(body.string(), Model.class);
                    }
                }
                return null;
            }
        }).doOnNext(new Consumer<Model>() {
                    @Override
                    public void accept( Model s) throws Exception {
                        //對數據進行其餘緩存的處理
                        Log.e(TAG, "doOnNext: 保存網絡加載的數據:" + s.toString() + "\n");
                    }
                }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Model>() {
                    @Override
                    public void accept(Model model) throws Exception {
                        //刷新ui
                        Log.e(TAG, "成功刷新界面:" + data.toString() + "\n");
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        //進行失敗的異常提示
                        Log.e(TAG, "失敗處理異常:" + throwable.getMessage() + "\n");
                    }
                });

複製代碼

本文主要對rxjava的源碼進行梳理分析,關於rxjava操做符的使用,推薦參考中文的文檔,以及下面的博文介紹。

這多是最好的RxJava 2.x 教程(完結版)

rxjava核心執行流程是怎樣?

rxjava主要是採用觀察者模式進行設計,當執行相關的操做符是會生成新的Observable及Observer。Observable會持有上游被觀察者,Observer會持有下游的觀察者。當執行subscribe訂閱方法的時候,經過持有上游的被觀察者對象,會往上游逐步執行訂閱方法。當執行到起始的被觀察者回調方法時,若是執行ObservableEmitter的onNext方法時,因爲Observer會持有下游的Observer對象,會逐步調用下游的onNext方法,直到最終subscribe傳入的觀察者實例。這是rxjava鏈式調用的核心執行流程。

固然rxjava還涉及到線程的調度及數據的背壓處理,關於這些實現的原理會再後續進行梳理。但rxjava的鏈式調用的核心執行流程都是一致。下面咱們將經過2個部分來梳理rxjava的核心執行流程,包含一些關鍵類的說明,及經過示例的代碼相關的執行流程圖進行梳理。

關鍵類功能說明

說明
ObservableSource 接口類,只有一個subscribe方法,參數是Observer對象
Observer 接口類,觀察者。有onSubscribe、onNext、onError、onComplete方法
Consumer 接口有,觀察者。只有一個accept方法,在被訂閱時最終也會轉換成Observer,設計這個類是爲了簡化調用
Observable 抽象類,繼承了ObservableSource接口,操做符的實現都是繼承與它。內部封裝了大量的操做符調用方法,主要是有一個核心的抽象方法abstract void subscribeActual(Observer<? super T> observer),用於實現相關的訂閱分發邏輯。
AbstractObservableWithUpstream 繼承於Observable,構造方法須要傳入ObservableSource source對象,source是父被觀察者。
ObservableCreate 繼承於AbstractObservableWithUpstream,source爲ObservableOnSubscribe。subscribeActual方法會實例化一個CreateEmitter對象,執行ObservableOnSubscribe的subscribe方法
ObservableMap 繼承於AbstractObservableWithUpstream,訂閱會新生產一個觀察者MapObserver
MapObserver ObservableMap的內部類,onNext方法會觸發mapper.apply(t)回調,而後執行下游觀察者的onNext方法
ObservableDoOnEach 繼承於AbstractObservableWithUpstream,訂閱會新生產一個觀察者DoOnEachObserver
DoOnEachObserver ObservableDoOnEach的內部類,onNext會執行onNext.accept(t)方法,而後執行下游觀察者的onNext方法
ObservableSubscribeOn 繼承於AbstractObservableWithUpstream,被觀察者線程調度控制。subscribeActual會執行scheduler.scheduleDirect(new SubscribeTask(parent)),SubscribeTask的run方法會執行source.subscribe(parent)。ObservableSubscribeOn根據線程調度器的策略去執行上游的訂閱方法實現。
ObservableObserveOn 繼承於AbstractObservableWithUpstream,觀察者線程調度控制。subscribeActual方法會判斷scheduler是否爲TrampolineScheduler。如果則執行下游的觀察者,否會建立新的ObserveOnObserver,並傳入schedule的work。
ObserveOnObserver ObservableObserveOn內部類,onNext會觸發執行schedule()方法,根據worker去控制下游觀察者的回調線程

代碼執行流程

首先咱們根據上面demo例子,梳理出rxjava的簡單執行流程,以下圖:

image

經過流程圖可知,rxjava當執行相關的操做符是會生成新的Observable及Observer。Observable會持有上游被觀察者,Observer會持有下游的觀察者。當執行subscribe訂閱方法的時候,經過持有上游的被觀察者對象,會往上游逐步執行訂閱方法。當執行到起始的被觀察者回調方法時,若是執行ObservableEmitter的onNext方法時,因爲Observer會持有下游的Observer對象,會逐步調用下游的onNext方法,直到最終subscribe傳入的觀察者實例。

瞭解了rxjava大體的執行流程,下面咱們來詳細的看看源碼的執行流程。首先仍是先上一下總體的流程圖,因爲圖片較大,建議結合上述的demo及rxjava的源碼進行查看。

image

下面咱們分配經過幾個操做符來看看rxjava源碼具體的實現。

create

create的操做符會返回一個ObservableCreate的被觀察者。

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
複製代碼

接下來看看ObservableCreate對象的關鍵實現代碼,以下:

//構造方法會傳入ObservableOnSubscribe接口的引用,指定爲該被觀察者的source。
  public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

  //核心的subscribeActual 
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
      //建立了CreateEmitter發射器
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
          //執行了ObservableOnSubscribe的subscribe回調方法,傳入了CreateEmitter對象
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
複製代碼

當咱們在業務代碼執行了ObservableEmitter的onNext方法,咱們看一下CreateEmitter的onNext的實現代碼,以下:

//持有下游的觀察者引用
   CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            //若是沒有取消訂閱,則會執行下游的觀察者的onNext方法,達到鏈式調用的效果
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }
複製代碼

map

map的操做符會返回一個ObservableMap的被觀察者。

@CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }
複製代碼

接下來看看ObservableMap對象的關鍵實現代碼,以下:

@Override
    public void subscribeActual(Observer<? super U> t) {
        //將上游的被觀察者訂閱MapObserver觀察者
        source.subscribe(new MapObserver<T, U>(t, function));
    }
複製代碼

接下來主要看看MapObserver的onNext方法,該方法會在ObservableEmitter的onNext方法觸發後被調用,以下:

//持有下游的觀察者和回調函數mapper
   MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }
        
 @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }

            U v;

            try {
              //map的核心執行代碼,mapper.apply(t)會執行數據的轉換,並將轉換後的結果v繼續交由下游的觀察者執行
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            //將轉換後的結果v繼續交由下游的觀察者執行
            actual.onNext(v);
        }
複製代碼

doOnNext

doOnNext的操做符會返回一個ObservableDoOnEach的被觀察者。

private Observable<T> doOnEach(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Action onAfterTerminate) {
        ObjectHelper.requireNonNull(onNext, "onNext is null");
        ObjectHelper.requireNonNull(onError, "onError is null");
        ObjectHelper.requireNonNull(onComplete, "onComplete is null");
        ObjectHelper.requireNonNull(onAfterTerminate, "onAfterTerminate is null");
        return RxJavaPlugins.onAssembly(new ObservableDoOnEach<T>(this, onNext, onError, onComplete, onAfterTerminate));
    }
複製代碼

接下來看看ObservableDoOnEach對象的關鍵實現代碼,以下:

@Override
    public void subscribeActual(Observer<? super T> t) {
       //實例化一個DoOnEachObserver的觀察者對象
        source.subscribe(new DoOnEachObserver<T>(t, onNext, onError, onComplete, onAfterTerminate));
    }
複製代碼

這裏核心咱們仍是要看DoOnEachObserver的onNext對於數據的處理,以下:

@Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            try {
                //回調accept方法
                onNext.accept(t);
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                s.dispose();
                onError(e);
                return;
            }
            //繼續往下游調用觀察者的onNext
            actual.onNext(t);
        }
複製代碼

subscribeOn

subscribeOn的操做符會返回一個ObservableSubscribeOn的被觀察者,並傳入scheduler線程調度參數。

@CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
複製代碼

接下來看看ObservableSubscribeOn對象的關鍵實現代碼,以下:

public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        //建立了SubscribeOnObserver的觀察者
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);
        
        //這個是核心方法,調用了線程調度去的scheduleDirect方法,並傳入SubscribeTask任務
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
複製代碼

接下來咱們看看SubscribeTask的實現,以下:

final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            //執行上游被觀察的訂閱方法,這裏就是subscribeOn將上游的訂閱方法控制在scheduler指定線程執行的核心
            source.subscribe(parent);
        }
    }
複製代碼

最後看下SubscribeOnObserver的onNext方法,比較簡單,直接執行下游觀察者的onNext方法,以下:

@Override
        public void onNext(T t) {
            actual.onNext(t);
        }
複製代碼

關於scheduler的具體實現,在後續的線程原理進行分析。這裏咱們只須要知道上游的被觀察者的訂閱在指定的scheduler線程策略中執行就能夠了。

observerOn

observerOn 的操做符會返回一個ObservableObserveOn的被觀察者,並傳入scheduler線程調度參數。

@CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }
複製代碼

接下來看看ObservableObserveOn對象的關鍵實現代碼,以下:

@Override
    protected void subscribeActual(Observer<? super T> observer) {
       //TrampolineScheduler 若是是當前的線程 則直接將下游的觀察者與上游的被觀察訂閱
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            //其餘線程策略
            Scheduler.Worker w = scheduler.createWorker();
            //將線程策略的worker傳入ObserveOnObserver觀察者
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

複製代碼

接下來關鍵仍是看ObserveOnObserver的實現,以下:

@Override
public void onNext(T t) {
    // 上一級的模式若是不是異步的,加入隊列
    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
    }
    //進行線程調度
    schedule();
}

void schedule() {
    // 判斷當前正在執行的任務數目
    if (getAndIncrement() == 0) {
        worker.schedule(this);
    }
}


複製代碼

ObserveOnObserver自己繼承了Runnable接口,run方法實現以下:

@Override
public void run() {
    //輸出結果是否融合
    if (outputFused) {
        drainFused();
    } else {
        drainNormal();
    }
}
複製代碼

咱們先進入drainNormal方法:

void drainNormal() {
    int missed = 1;
    final SimpleQueue<T> q = queue;
    final Observer<? super T> a = actual;
    //第一層循環
    for (;;) {
        // 檢查異常處理
        if (checkTerminated(done, q.isEmpty(), a)) {
            return;
        }
        //第二層循環
        for (;;) {
            boolean d = done;
            T v;
            //從隊列中獲取數據
            v = q.poll();
            boolean empty = v == null;
            // 檢查異常
            if (checkTerminated(d, empty, a)) {
                return;
            }
            //若是沒有數據了,跳出
            if (empty) {
                break;
            }
            //執行下一次操做。
            a.onNext(v);
        }
        //減掉執行的次數,並獲取剩於任務數量,而後再次循環
        //直到獲取剩餘任務量爲0,跳出循環
        missed = addAndGet(-missed);
        if (missed == 0) {
            break;
        }
    }
}

複製代碼

關於scheduler的具體實現,在後續的線程原理進行分析。這裏咱們只須要知道下游的觀察者的onNext在指定的scheduler線程策略中執行就能夠了。

subscribe

@SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

複製代碼

最後的訂閱方法在作了非空檢查後,會調用subscribeActual方法,開始往上游逐層執行訂閱。

被觀察者Observable是如何發送數據?

經過上面的流程分析,咱們能夠知道。若是使用create建立了Observable,在ObservableOnSubscribe的subscribe方法中會經過ObservableEmitter的onNext去發送數據,onNext會觸發開始往下游觀察者傳遞數據。固然rxjava的建立型操做符還有不少,如just、from等,本質最後都是觸發下游觀察者的onNext進行數據的發送。

觀察者Observer是如何接收到數據的?

經過源碼分析,每個鏈層的Observer都會持有相鄰下游的Observer對象,當開始發送數據時,會依次鏈式執行Observer的onNext方法,最後執行到subscribe方法中建立的Observer對象。

被觀察者和觀察者之間是如何實現訂閱?

每個鏈層的Observable 都會持有相鄰上游的Observable對象,在subscribe方法開始調用後,最後會執行到subscribeActual方法,在subscribeActual方法中會將觀察者與上游的被觀察執行訂閱。

rxjava是如何進行線程的調度?

rxjava的Scheduler有不少種實現,下面咱們介紹Scheduler的相關說明,而後經過最經常使用的.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())來分析具體的線程調度流程。

Scheduler

咱們在調用subscribeOn與observeOn時,都會傳入Scheduler對象,首先咱們先看一下Scheduler的種類及其功能

Scheduler種類 說明
Schedulers.io( ) 用於IO密集型的操做,例如讀寫SD卡文件,查詢數據庫,訪問網絡等,具備線程緩存機制,在此調度器接收到任務後,先檢查線程緩存池中,是否有空閒的線程,若是有,則複用,若是沒有則建立新的線程,並加入到線程池中,若是每次都沒有空閒線程使用,能夠無上限的建立新線程
Schedulers.newThread( ) 在每執行一個任務時建立一個新的線程,不具備線程緩存機制,由於建立一個新的線程比複用一個線程更耗時耗力,雖然使用Schedulers.io( )的地方,均可以使用Schedulers.newThread( ),可是,Schedulers.newThread( )的效率沒有Schedulers.io( )高
Schedulers.computation() 用於CPU 密集型計算任務,即不會被 I/O 等操做限制性能的耗時操做,例如xml,json文件的解析,Bitmap圖片的壓縮取樣等,具備固定的線程池,大小爲CPU的核數。不能夠用於I/O操做,由於I/O操做的等待時間會浪費CPU
Schedulers.trampoline() 在當前線程當即執行任務,若是當前線程有任務在執行,則會將其暫停,等插入進來的任務執行完以後,再將未完成的任務接着執行
Schedulers.single() 擁有一個線程單例,全部的任務都在這一個線程中執行,當此線程中有任務執行時,其餘任務將會按照先進先出的順序依次執行
Scheduler.from(Executor executor) 指定一個線程調度器,由此調度器來控制任務的執行策略
AndroidSchedulers.mainThread() 在Android UI線程中執行任務,爲Android開發定製

subscribeOn(Schedulers.io())

根據上面的分析,subscribeOn()方法最後會執行到subscribeActual方法,SubscribeTask上面分析了,繼承了Runnable接口, run方法最後會執行source.subscribe(parent)方法。

@Override
   public void subscribeActual(final Observer<? super T> s) {
       final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

       s.onSubscribe(parent);
       
       parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
   }
複製代碼

這裏咱們主要要分析scheduler.scheduleDirect()方法。

@NonNull
  public Disposable scheduleDirect(@NonNull Runnable run) {
      return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
  }
  
   @NonNull
  public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    //建立一個Worker對象
      final Worker w = createWorker();


      final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
      
      //DisposeTasky也是一個包裝類 繼承了Runnable接口
      DisposeTask task = new DisposeTask(decoratedRun, w);

      //這裏是關鍵的實現,執行了worker的schedule方法
      w.schedule(task, delay, unit);

      return task;
  }
複製代碼

Worker的schedule是一個抽象的方法,Schedulers.io()對應的Worker實現爲EventLoopWorker。咱們看看EventLoopWorker的schedule實現以下:

static final class EventLoopWorker extends Scheduler.Worker {
      private final CompositeDisposable tasks;
      private final CachedWorkerPool pool;
      private final ThreadWorker threadWorker;

      final AtomicBoolean once = new AtomicBoolean();

      EventLoopWorker(CachedWorkerPool pool) {
          this.pool = pool;
          this.tasks = new CompositeDisposable();
          this.threadWorker = pool.get();
      }

      @Override
      public void dispose() {
          if (once.compareAndSet(false, true)) {
              tasks.dispose();

              // releasing the pool should be the last action
              pool.release(threadWorker);
          }
      }

      @Override
      public boolean isDisposed() {
          return once.get();
      }

      @NonNull
      @Override
      public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
          if (tasks.isDisposed()) {
              // don't schedule, we are unsubscribed return EmptyDisposable.INSTANCE; } return threadWorker.scheduleActual(action, delayTime, unit, tasks); } } 複製代碼

這裏會執行到 threadWorker的scheduleActual方法,繼續往下看

public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
      Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

      ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

      if (parent != null) {
          if (!parent.add(sr)) {
              return sr;
          }
      }

      Future<?> f;
      try {
          if (delayTime <= 0) {
              f = executor.submit((Callable<Object>)sr);
          } else {
              f = executor.schedule((Callable<Object>)sr, delayTime, unit);
          }
          sr.setFuture(f);
      } catch (RejectedExecutionException ex) {
          if (parent != null) {
              parent.remove(sr);
          }
          RxJavaPlugins.onError(ex);
      }

      return sr;
  }
複製代碼

在這裏會使用executor最終去執行run方法。固然看到這裏有一個疑問IoScheduler在這裏是怎麼實現線程的複用呢?咱們看看threadWorker在IoScheduler中的線程的建立,以下:

EventLoopWorker(CachedWorkerPool pool) {
          this.pool = pool;
          this.tasks = new CompositeDisposable();
          this.threadWorker = pool.get();
      }
複製代碼

這裏會經過維護一個Worker的線程池來達到線程複用的效果,具體咱們看看CachedWorkerPool的get方法,以下:

ThreadWorker get() {
          if (allWorkers.isDisposed()) {
              return SHUTDOWN_THREAD_WORKER;
          }
          //從已經release的work線程隊列中獲取緩存
          while (!expiringWorkerQueue.isEmpty()) {
              ThreadWorker threadWorker = expiringWorkerQueue.poll();
              //若是找到,返回複用的線程
              if (threadWorker != null) {
                  return threadWorker;
              }
          }

          // 若是沒有,則會建立一個新的ThreadWorker
          ThreadWorker w = new ThreadWorker(threadFactory);
          allWorkers.add(w);
          return w;
      }
複製代碼

observeOn(AndroidSchedulers.mainThread())

@Override
   protected void subscribeActual(Observer<? super T> observer) {
       //若是指定當前線程 則不進行調度
       if (scheduler instanceof TrampolineScheduler) {
           source.subscribe(observer);
       } else {
           //建立Worker
           Scheduler.Worker w = scheduler.createWorker();
           //實例化ObserveOnObserver觀察者並傳入Worker
           source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
       }
   }
複製代碼

這裏咱們主要須要分析ObserveOnObserver對象,onNext實現以下:

@Override
       public void onNext(T t) {
           if (done) {
               return;
           }

           if (sourceMode != QueueDisposable.ASYNC) {
               queue.offer(t);
           }
           schedule();
       }
       
        void schedule() {
           if (getAndIncrement() == 0) {
               worker.schedule(this);
           }
       }
複製代碼

關鍵仍是執行了worker的schedule,AndroidSchedulers的實現主要爲HandlerScheduler,HandlerScheduler中關於Worker的實現爲HandlerWorker,咱們看下schedule的實現以下:

public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
           if (run == null) throw new NullPointerException("run == null");
           if (unit == null) throw new NullPointerException("unit == null");

           if (disposed) {
               return Disposables.disposed();
           }

           run = RxJavaPlugins.onSchedule(run);

           ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

           Message message = Message.obtain(handler, scheduled);
           message.obj = this; // Used as token for batch disposal of this worker's runnables. if (async) { message.setAsynchronous(true); } //經過handler發送消息執行run接口 handler.sendMessageDelayed(message, unit.toMillis(delay)); // Re-check disposed state for removing in case we were racing a call to dispose(). if (disposed) { handler.removeCallbacks(scheduled); return Disposables.disposed(); } return scheduled; } 複製代碼

關於handler的實例,咱們看AndroidSchedulers中的建立以下:

private static final class MainHolder {
       static final Scheduler DEFAULT
           = new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
   }
複製代碼

綜上可知AndroidSchedulers.mainThread()是經過消息將run方法的實現交由主線程Looper進行處理,達到將觀察者的數據處理在主線程中執行的效果

rxjava背壓策略實現原理是怎樣的?

背壓(backpressure)

當上下游在不一樣的線程中,經過Observable發射,處理,響應數據流時,若是上游發射數據的速度快於下游接收處理數據的速度,這樣對於那些沒來及處理的數據就會形成積壓,這些數據既不會丟失,也不會被垃圾回收機制回收,而是存放在一個異步緩存池中,若是緩存池中的數據一直得不處處理,越積越多,最後就會形成內存溢出,這即是響應式編程中的背壓(backpressure)問題。

背壓處理機制

rxjava2.x使用Flowable來支持背壓的機制,調用create方法時須要傳入BackpressureStrategy策略。

Strategy 做用
MISSING 此策略表示,經過Create方法建立的Flowable沒有指定背壓策略,不會對經過OnNext發射的數據作緩存或丟棄處理,須要下游經過背壓操做符(onBackpressureBuffer()/onBackpressureDrop()/onBackpressureLatest())指定背壓策略
ERROR 在此策略下,若是放入Flowable的異步緩存池中的數據超限了,則會拋出MissingBackpressureException異常
BUFFER 此策略下,Flowable的異步緩存池同Observable的同樣,沒有固定大小,能夠無限制向裏添加數據,不會拋出MissingBackpressureException異常,但會致使OOM
DROP 在此策略下,若是Flowable的異步緩存池滿了,會丟掉上游發送的數據
LATEST 與Drop策略同樣,若是緩存池滿了,會丟掉將要放入緩存池中的數據,不一樣的是,無論緩存池的狀態如何,LATEST都會將最後一條數據強行放入緩存池中

實現原理

首先看看Flowable的create實現

public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
        ObjectHelper.requireNonNull(source, "source is null");
        ObjectHelper.requireNonNull(mode, "mode is null");
        return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
    }
複製代碼

這裏會建立一個FlowableCreate對象,並傳入指定的BackpressureStrategy策略。接着看看FlowableCreate的訂閱方法

@Override
    public void subscribeActual(Subscriber<? super T> t) {
        BaseEmitter<T> emitter;
        //根據不一樣的策略初始化不一樣的數據發射器
        switch (backpressure) {
        case MISSING: {
            emitter = new MissingEmitter<T>(t);
            break;
        }
        case ERROR: {
            emitter = new ErrorAsyncEmitter<T>(t);
            break;
        }
        case DROP: {
            emitter = new DropAsyncEmitter<T>(t);
            break;
        }
        case LATEST: {
            emitter = new LatestAsyncEmitter<T>(t);
            break;
        }
        default: {
            emitter = new BufferAsyncEmitter<T>(t, bufferSize());
            break;
        }
        }

        t.onSubscribe(emitter);
        try {
            source.subscribe(emitter);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            emitter.onError(ex);
        }
    }
複製代碼

BaseEmitter

abstract static class BaseEmitter<T>
    extends AtomicLong
    implements FlowableEmitter<T>, Subscription {
        private static final long serialVersionUID = 7326289992464377023L;

        final Subscriber<? super T> actual;

        final SequentialDisposable serial;

        BaseEmitter(Subscriber<? super T> actual) {
            this.actual = actual;
            this.serial = new SequentialDisposable();
        }
}
    
    //這裏須要注意的是,Request最終會把n負責給AtomicLong
   @Override
        public final void request(long n) {
            if (SubscriptionHelper.validate(n)) {
                BackpressureHelper.add(this, n);
                onRequested();
            }
        }
//省略其餘若干方法
複製代碼

經過上面的結束咱們知道Flowable有一個緩衝池,那個這個大小是多少,在哪裏進行復制給發射器呢?

//長度是128
    static final int BUFFER_SIZE;
    static {
        BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128));
    }
    
   public static int bufferSize() {
        return BUFFER_SIZE;
    }
    
    //在調用observeOn時,會將長度最後傳給emitter發射器,具體能夠打斷的追蹤查看調用鏈
    public final Flowable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
複製代碼

MissingEmitter

不會對經過OnNext發射的數據作緩存或丟棄處理

@Override
        public void onNext(T t) {
            if (isCancelled()) {
                return;
            }

            if (t != null) {
                actual.onNext(t);
            } else {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }

            for (;;) {
                long r = get();
                if (r == 0L || compareAndSet(r, r - 1)) {
                    return;
                }
            }
        }
複製代碼

NoOverflowBaseAsyncEmitter

DropAsyncEmitter和ErrorAsyncEmitter繼承了NoOverflowBaseAsyncEmitter

@Override
        public final void onNext(T t) {
            if (isCancelled()) {
                return;
            }

            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            //若是數量不爲0則減1,經過上面的Request,能夠知道get()爲Flowable的BUFFER_SIZE 128
            if (get() != 0) {
                actual.onNext(t);
                BackpressureHelper.produced(this, 1);
            } else {
                //超出閾值 執行onOverflow
                onOverflow();
            }
        }
複製代碼

DropAsyncEmitter

若是Flowable的異步緩存池滿了,會丟掉上游發送的數據

static final class DropAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {


        private static final long serialVersionUID = 8360058422307496563L;

        DropAsyncEmitter(Subscriber<? super T> actual) {
            super(actual);
        }

        @Override
        void onOverflow() {
            // nothing to do
        }

    }

複製代碼

ErrorAsyncEmitter

若是Flowable的異步緩存池滿了,會拋出異常

static final class ErrorAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {


        private static final long serialVersionUID = 338953216916120960L;

        ErrorAsyncEmitter(Subscriber<? super T> actual) {
            super(actual);
        }

        @Override
        void onOverflow() {
            onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
        }

    }
複製代碼

BufferAsyncEmitter

Flowable的異步緩存池同Observable的同樣,沒有固定大小,能夠無限制向裏添加數據

@Override
        public void onNext(T t) {
            if (done || isCancelled()) {
                return;
            }

            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            //加入隊列 queue爲SpscLinkedArrayQueue隊列
            queue.offer(t);
            //通知消費
            drain();
        }
複製代碼

LatestAsyncEmitter

Flowable的異步緩存池同Observable的同樣,沒有固定大小,能夠無限制向裏添加數據

@Override
        public void onNext(T t) {
           if (done || isCancelled()) {
                return;
            }

            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            //進行覆蓋 queue爲AtomicReference
            queue.set(t);
             //通知消費
            drain();
        }
複製代碼

總結

思考

本文主要對rxjava的鏈式執行流程、線程調度以及背壓機制進行梳理分析。rxjava的庫還有很是多的操做符及功能,但願後續有時間繼續進行分析。rxjava的源碼及一些概念命名仍是相對比較複雜,前先後後大約花了2周的時間進行源碼的學習,堅持下來了,仍是收穫滿滿。

參考資料

這多是最好的RxJava 2.x 教程(完結版)

ReactiveX中文文檔

Rxjava2入門教程五:Flowable背壓支持——對Flowable最全面而詳細的講解

RxJava2 源碼解析——線程調度 Scheduler

推薦

Android源碼系列-解密OkHttp

Android源碼系列-解密Retrofit

Android源碼系列-解密Glide

Android源碼系列-解密EventBus

Android源碼系列-解密RxJava

Android源碼系列-解密LeakCanary

Android源碼系列-解密BlockCanary

關於

歡迎關注個人我的公衆號

微信搜索:一碼一浮生,或者搜索公衆號ID:life2code

image

相關文章
相關標籤/搜索