深刻RxJava2 源碼解析(一)

本文做者JasonChen,原文地址: chblog.me/2018/12/19/…html

ReactiveX 響應式編程庫,這是一個程序庫,經過使用可觀察的事件序列來構成異步事件驅動的程序。java

其簡化了異步多線程編程,在之前多線程編程的世界中,鎖、可重入鎖、同步隊列器、信號量、併發同步器、同步計數器、並行框架等都是具備必定的使用門檻,稍有不慎或者使用不成熟或對其源碼理解不深刻都會形成相應的程序錯誤和程序性能的低下。android

觀察者模型

24種設計模式的一種,觀察者Observer和主題Subject之間創建組合關係:Subject類實例中包含觀察者Observer的引用,增長引用的目的就是爲了通知notify,重要點就是要在Subject的notify功能中調用Observer的接受處理函數receiveAndHandle。git

我的理解:觀察者模型實際上是一種異步回調通知,將數據的處理者先註冊到數據的輸入者那邊,這樣經過數據輸入者執行某個函數去調用數據處理者的某個處理方法。github

RxJava2

Rx有不少語言的實現庫,目前比較出名的就是RxJava2。本文主要講Rxjava2的部分源碼解讀,內部設計機制和內部執行的線程模型。編程

RxJava是近兩年來愈來愈流行的一個異步開發框架,其使用起來十分簡單方便,功能一應俱全,十分強大。設計模式

基本使用

使用RxJava2大體分爲四個操做:緩存

  1. 創建數據發佈者
  2. 添加數據變換函數
  3. 設置數據發佈線程池機制,訂閱線程池機制
  4. 添加數據訂閱者
// 建立flowable
Flowable<Map<String, Map<String,Object>>> esFlowable = Flowable.create(new ElasticSearchAdapter(), BackpressureStrategy.BUFFER);
Disposable disposeable = esFlowable
    // map操做 1.採集、2.清洗
    .map(DataProcess::dataProcess)
    .subscribeOn(Schedulers.single())
    //計算任務調度器
    .observeOn(Schedulers.computation())
    // 訂閱者 consumer 執行運算
    .subscribe(keyMaps -> new PredictEntranceForkJoin().predictLogic(keyMaps));
複製代碼

以上就是一個實際的例子,裏面的ElasticSearchAdapter實際隱藏了一個用戶自定義實現數據生產的subscribe接口:bash

FlowableOnSubscribe<T> source
複製代碼

用戶須要實現這個接口函數:微信

void subscribe(@NonNull FlowableEmitter<T> emitter) throws Exception
複製代碼

這個接口主要用於內部回調,後面會有具體分析, emitter 英文翻譯發射器,很形象,數據就是由它產生的,也是業務系統須要對接的地方,通常業務代碼實現這個接口類而後發射出須要處理的原始數據。

map函數做爲數據變換處理的功能函數將原來的數據輸入變換爲另外的數據集合,而後設置發佈的線程池機制subscribeOn(Schedulers.single()),訂閱的線程池機制observeOn(Schedulers.computation()),最後添加數據訂閱函數,也就是業務系統須要實現另一個地方,從而實現數據的自定義處理消費。

rxjava2支持的lambda語法

  • 建立操做符:just fromArray empty error never fromIterable timer interval intervalRange range/rangeLong defer
  • 變換操做符:map flatMap flatmapIterable concatMap switchmap cast scan buffer toList groupBy toMap
  • 過濾操做符:filter take takeLast firstElement/lastElement first/last firstOrError/lastOrError elementAt/elementAtOrError ofType skip/skipLast ignoreElements distinct/distinctUntilChanged timeout throttleFirst throttleLast/sample throttleWithTimeout/debounce
  • 合併聚合操做符:startWith/startWithArray concat/concatArray merge/mergeArray concatDelayError/mergeDelayError zip combineLatest combineLatestDelayError reduce count collect
  • 條件操做符:all ambArray contains any isEmpty defaultIfEmpty switchIfEmpty sequenceEqual takeUntil takeWhile skipUntil skipWhile

有一篇博客詳細介紹了rxjava的各類操做符,連接maxwell-nc.github.io/android/rxj…

RxJava2 源碼解析

閱讀源碼我的比較喜歡帶着疑惑去看,這樣與目標有方向。接下來的分析以Flowable爲例,這裏全部的例子都是按照Flowable爲例,由於Flowable在實際項目中比Observable可能用的多,由於實際場景中數據生產速度和數據消費速度都會有必定的不一致甚至數據生產速度遠大於數據消費速度。

數據發佈和訂閱

首先從數據訂閱者開始,點進源碼看進一步解析,裏面有不少subscribe重載接口:

public final Disposable subscribe(Consumer<? super T> onNext) {
    return subscribe(onNext, Functions.ON_ERROR_MISSING,
        Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE);
  }
  public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Subscription> onSubscribe) {
    ObjectHelper.requireNonNull(onNext, "onNext is null");
    ObjectHelper.requireNonNull(onError, "onError is null");
    ObjectHelper.requireNonNull(onComplete, "onComplete is null");
    ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

    //組裝成FlowableSubscriber
    LambdaSubscriber<T> ls = new LambdaSubscriber<T>(onNext, onError, onComplete, onSubscribe);
    //調用核心的訂閱方法
    subscribe(ls);

    return ls;
  }
  public final void subscribe(FlowableSubscriber<? super T> s) {
      ObjectHelper.requireNonNull(s, "s is null");
      try {
          //註冊一些鉤子這裏對此不進行講解,主要不是核心方法
          Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s);
          ObjectHelper.requireNonNull(z, "The RxJavaPlugins.onSubscribe hook returned a null FlowableSubscriber. Please check the handler provided to RxJavaPlugins.setOnFlowableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
          //核心訂閱方法,從名字也能讀出是指訂閱實際調用處
          //不一樣的數據產生類也就是實現Flowable抽象類的類
          //好比FlowableCreate,FlowSingle,FlowMap等等去實現本身的實際方法
          subscribeActual(z);
      } catch (NullPointerException e) { // NOPMD
          throw e;
      } catch (Throwable e) {
          Exceptions.throwIfFatal(e);
          // can't call onError because no way to know if a Subscription has been set or not
          // can't call onSubscribe because the call might have set a Subscription already
          RxJavaPlugins.onError(e);

          NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
          npe.initCause(e);
          throw npe;
      }
  }
複製代碼

下面選擇FlowCreate的subscribeActual(Subscriber<? super T> t)方法進行剖析。

public void subscribeActual(Subscriber<? super T> t) {
      BaseEmitter<T> emitter;
      //根據不一樣的回壓模式選擇不同的數據發射類
      //神奇的回壓模式其實本質上就是一個個數據發射-消費模式
      switch (backpressure) {
      case MISSING: {
          emitter = new MissingEmitter<T>(t);
          break;
      }
      //...
      default: {
          emitter = new BufferAsyncEmitter<T>(t, bufferSize());
          break;
      }
      }
      //回調註冊的FlowableSubscriber的onSubscribe方法
      //這裏很是重要,由於這裏涉及了rxjava特有的 request請求再消費數據的模式
      //也就是說若是沒有request數據,那麼就不會調用數據發射(發佈)者的onNext方法,
      //那麼數據訂閱者也就不會消費到數據
      t.onSubscribe(emitter);
      try {
          //回調註冊的FlowableOnSubscribe<T> source的subscribe方法
          //這個source其實就是在建立Flow流時註冊的數據產生類,進一步驗證了上文中
          //說起的其須要實現FlowableOnSubscribe<T>接口
          source.subscribe(emitter);
      } catch (Throwable ex) {
          Exceptions.throwIfFatal(ex);
          emitter.onError(ex);
      }
  }
  //重點分析BufferAsyncEmitter這個類,看字面意思這是一個switch的默認選擇類,
  //但其實它是回壓策略爲BUFFER時的數據發射類
  //首先這個類的構造函數具備兩個參數,很明顯這是 actul就是前面的t這個變量,也就是
  //註冊的數據消費(訂閱)者,capacityHint則是設置容量大小的,默認是128,若是須要擴大須要
  //自行設置環境變量 rx2.buffer-size
  BufferAsyncEmitter(Subscriber<? super T> actual, int capacityHint) {
      super(actual);
      this.queue = new SpscLinkedArrayQueue<T>(capacityHint);
      this.wip = new AtomicInteger();
  }

  public void onNext(T t) {
     if (done || isCancelled()) {
         return;
     }

     if (t == null) {
         onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
         return;
     }
     // queue 是存儲元素的隊列,也就是buffer的核心存儲。
     // 當咱們開始向下遊發送數據的時候首先存入隊列,而後下面的drain則是進行核心的
     queue.offer(t);
     drain();
  }
  //核心的類
  void drain() {
    //關鍵的地方 解決生產速率和消費速率不一致的關鍵地方,也是咱們寫併發程序值得借鑑的地方。
    //當數據的產生者(發佈)頻繁調用onNext方法時,這裏產生併發調用關係,wip變量是atomic變量,
    //當第一次執行drain函數時,爲0繼續執行後面的流程,當快速的繼續調用onNext方法時,wip不爲0而後返回
    //那麼後面的流程咱們其實已經很大機率會猜想到應該是去取隊列的數據而後作一些操做
   if (wip.getAndIncrement() != 0) {
       return;
   }

   int missed = 1;
   //這裏的downstream其實就是註冊的數據訂閱者,它是基類BaseEmitter的變量,前面初始化時調用了基類的構造函數
   final Subscriber<? super T> a = downstream;
   final SpscLinkedArrayQueue<T> q = queue;

   for (;;) {
       long r = get();
       long e = 0L;

       while (e != r) {
           if (isCancelled()) {
               q.clear();
               return;
           }
           boolean d = done;
           //取隊列中的數據
           T o = q.poll();

           boolean empty = o == null;

           if (d && empty) {
               Throwable ex = error;
               if (ex != null) {
                   error(ex);
               } else {
                   complete();
               }
               return;
           }

           if (empty) {
               break;
           }
           //此處回調訂閱者的onNext方法去真正的執行數據實例程序
           //到此數據從產生到消費其生命週期已經走完
           a.onNext(o);
           e++;
       }

       if (e == r) {
           if (isCancelled()) {
               q.clear();
               return;
           }
           boolean d = done;
           boolean empty = q.isEmpty();
           if (d && empty) {
               Throwable ex = error;
               if (ex != null) {
                   error(ex);
               } else {
                   complete();
               }
               return;
           }
       }
       if (e != 0) {
          //標記已經消費的個數
           BackpressureHelper.produced(this, e);
       }
       //前面說過wip會原子性的增長,並且是每調用一次onNext增長一次
       //missed從其名解釋是指錯過的意思,我的理解是錯過消費的數據個數,錯過消費
       //的意思其實就是指沒有進行a.onNext數據消費處理的數據
       missed = wip.addAndGet(-missed);
       if (missed == 0) {
          //若是沒有錯過的數據也就是所有都消費完那就跳出for循環
          //此處for循環方式和JUC源碼中Doug Lea的作法都有相似之處
           break;
          }
      }
  }
複製代碼

操做符與線程池機制原理剖析

首先在進行源碼分析以前講述一下一種模式:裝飾者模式 24種模式中的一種,在java io源碼包中普遍應用 簡單的來講是與被裝飾者具備相同接口父類同時又對被裝飾者進行一層封裝(持有被裝飾者的引用),以此用來加上自身的特性。

迴歸主題,當咱們使用操做符和線程池機制的時候作法都是在數據發佈者後面進行相應的函數操做:

Disposable disposeable = scheduleObservable
            .map(aLong -> dataAdapter.handlerDpti())
            .map(DataProcess::dataProcess)
            .subscribeOn(Schedulers.single())
複製代碼

那麼爲什麼這麼作,接下來咱們進行源碼分析:

  1. subscribeOn map 方法都在Flowable類中:
public final <R> Flowable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new FlowableMap<T, R>(this, mapper));
    }
public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler, boolean requestOn) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new FlowableSubscribeOn<T>(this, scheduler, requestOn));
}
複製代碼

這裏是實例方法調用,傳進了this對象這個很關鍵,這裏其實就是咱們前面提到的裝修者模式,持有上游對象也就是數據源source的引用。

以FlowableSubscribeOn爲例進行分析,這個類常常會用到,由於其內部設置了線程池的機制因此在實際使用項目中會大量使用,那麼是如何作到線程池方式的呢?進一步利用源碼進行分析。

2.裝飾者的內部代碼分析

以subscribeOn 爲例:

//很明顯 實現的抽象類實際上是裝修者抽象類
  public final class FlowableSubscribeOn<T> extends AbstractFlowableWithUpstream<T , T> // 這個在前面咱們重點分析過這是實際訂閱執行的類方法,其實也就是咱們說的裝飾方法,裏面實現了每一個類本身的特定「裝修」方法 @Override public void subscribeActual(final Subscriber<? super T> s) {
      // 獲取訂閱者,下一篇文章會重點講述rxjava的線程池分配機制
      Scheduler.Worker w = scheduler.createWorker();
      final SubscribeOnSubscriber<T> sos = new SubscribeOnSubscriber<T>(s, w, source, nonScheduledRequests);
      // 跟前面同樣調用數據訂閱者的onSubscribe方法
      s.onSubscribe(sos);
      // 由分配的調度者進行訂閱任務的執行
      w.schedule(sos);
  }

  // 開始分析SubscribeOnSubscriber這個靜態內部類的內部代碼
  // 實現了Runable用來異步執行
  static final class SubscribeOnSubscriber<T> extends AtomicReference<Thread> implements FlowableSubscriber<T>, Subscription, Runnable // 下游訂閱引用 final Subscriber<? super T> downstream;
    // 上游發射類引用
    final AtomicReference<Subscription> upstream;
    // 上游數據源引用 跟上游引用有區別,簡單的說每一個上游數據源引用有本身的上游發射類
    Publisher<T> source;
  // 這裏是裝飾的核心代碼
  @Override
  public void run() {
      lazySet(Thread.currentThread());
      // source即爲上游,表示其所裝飾的源
      Publisher<T> src = source;
      source = null;
      // 調用上游的自身的subscribe方法,在上面一開始咱們說這個方法內部會去調用自身實現的subscribeActual方法
      // 從而實現上游本身的特定方法,好比假設source是FlowCreate那麼此處就會調用前面一開始咱們所講到的數據的發射
      src.subscribe(this);
  }

  // 既然已經保證了數據的發射那麼數據的處理是否是也要處理
  // 很明顯這是調用了下游訂閱者的onNext方法
  @Override
  public void onNext(T t) {
      downstream.onNext(t);
  }
複製代碼

本文總結

筆者喜歡總結,總結意味着咱們反思和學習前面的知識點,應用點以及自身的不足。

  • 設計模式:觀察者模式和裝修者模式
  • 併發處理技巧:回壓策略(其實本質是緩存)的實現原理以及細節點

訂閱最新文章,歡迎關注個人公衆號

微信公衆號
相關文章
相關標籤/搜索