Rxjava源碼閱讀指南

版權聲明:轉載請務必註明做者與原文連接html

引言

本文不對Rxjava的基本使用進行講解,僅對源碼作分析,若是你對Rxjava的基本使用還有不清楚的,建議學習官方文檔以後再閱讀本文java

ReactiveX文檔中文翻譯
Rxjavagit

本文會逐一解析Rxjava的create()、subscribe()、操做符、subscribeOn()、obsweveOn()、背壓的源碼,模式是先給出一段模版代碼,而後逐漸深刻分析github

正文

Create()方法

這裏給出一個最簡單的Rxjava的實例app

Observable.create(new ObservableOnSubscribe<String>() {
			@Override
			public void subscribe(ObservableEmitter<String> e) throws Exception {
				e.onNext("next");
				e.onComplete();
			}
		}).subscribe(new Observer<String>() {
			@Override
			public void onSubscribe(Disposable d) {
				Log.d(TAG, "onSubscribe: " + d);
			}
			@Override
			public void onNext(String value) {
				Log.d(TAG, "onNext: " + value);
			}
			@Override
			public void onError(Throwable e) {
				Log.d(TAG, "onError: " + e);
			}
			@Override
			public void onComplete() {
				Log.d(TAG, "onComplete: ");
			}
		});
複製代碼

直接看create()方法主體ide

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
		ObjectHelper.requireNonNull(source, "source is null");
		return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
	}
複製代碼

調用對象和返回對象都爲Observable,而傳入參數爲ObservableOnSubscribe源碼分析

public interface ObservableOnSubscribe<T> {
		void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
	}
複製代碼

這是一個接口,僅包含一個方法,就是上面咱們在new ObservableOnSubscribe時候須要重寫的那個方法。 再看subscribe()的形參類型ObservableEmitter學習

public interface ObservableEmitter<T> extends Emitter<T> {
		void setDisposable(@Nullable Disposable d);
		void setCancellable(@Nullable Cancellable c);
		boolean isDisposed();

		@NonNull
		ObservableEmitter<T> serialize();
		@Experimental
		boolean tryOnError(@NonNull Throwable t);
	}
複製代碼

發現這也是一個接口,不須要太過關注,值得關注的是他的上層,由接口特性咱們知道Emitter確定也是一個接口,咱們來看下它定義了什麼方法ui

public interface Emitter<T> {
		void onNext(@NonNull T value);

		void onError(@NonNull Throwable error);

		void onComplete();
	}
複製代碼

看到這三個熟悉的方法,你就知道爲何咱們實例化的ObservableEmitter對象e能夠調用onNext()、onError()、onComplete()這三個方法了this

create()的參數已經看完了,下面看下create()的內容

第一句ObjectHelper.requireNonNull(source, "source is null");是判空代碼。 返回值是RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)),我看看下這個方法。

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
		...
		return source;
	}
複製代碼

具體的內容咱們不須要去解讀,咱們只看他的返回值和傳入參數,通過觀察發現都是Observable類型,乍看好像沒什麼問題,可是看上面,源碼中傳入的是一個ObservableCreate類型,因此這裏ObservableCreate有適配器的做用,將ObservableOnSubscribe適配爲Observable類型。 下面咱們就看看這個適配器ObservableCreate

public final class ObservableCreate<T> extends Observable<T> {
		final ObservableOnSubscribe<T> source;
		
		public ObservableCreate(ObservableOnSubscribe<T> source) {
			this.source = source;
		}
		
		@Override
		protected void subscribeActual(Observer<? super T> observer) {
			
			CreateEmitter<T> parent = new CreateEmitter<T>(observer);
			
			observer.onSubscribe(parent);
			
			source.subscribe(parent);
			
			...
		}
	}
複製代碼

成員變量、構造方法略過,咱們先看看這裏頻頻出現的觀察者observer

public interface Observer<T> {
    void onSubscribe(@NonNull Disposable d);
    void onNext(@NonNull T t);
    void onError(@NonNull Throwable e);
    void onComplete();
}
複製代碼

一樣的也是一個接口,定義的這個四個方法就是咱們在訂閱時,觀察者須要重寫的四個方法,注意與上面的Emitter接口及其三個方法進行區分。 看這行observer.onSubscribe(parent);,由上面咱們知道observer.onSubscribe()是接受Disposable類型,而這裏的parent是CreateEmitter類型,你可能已經猜出來了,沒錯,這裏的CreateEmitter也是一個適配器,前面的ObservableCreate對被觀察者進行了適配,CreateEmitter則對觀察者進行了適配,將observer類型轉化爲Disposable類型,下面看下他的源碼

static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {

		...
		
		@Override
		public void onNext(T t) {
			if (t == null) {
				onError(new NullPointerException("..."));
				return;
			}
			if (!isDisposed()) {
				observer.onNext(t);
			}
		}

		@Override
		public void onError(Throwable t) {
			if (!tryOnError(t)) {
				
				RxJavaPlugins.onError(t);
			}
		}

		@Override
		public void onComplete() {
			if (!isDisposed()) {
				try {
					observer.onComplete();
				} finally {
					dispose();
				}
			}
		}

		@Override
		public void dispose() {
			DisposableHelper.dispose(this);
		}
		
		...
	}
複製代碼

主要就是重寫了那四個方法,定義了規則,好比:

  • onComplete()與onError()互斥,切CreateEmitter在回調他們兩中任意一個後,都會自動dispose()
  • Observable和Observer的關係沒有被dispose,纔會回調Observer的onXXXX()方法

而且,到這裏你對onCreate()中的數據流動也必定有了必定的理解:
--> e.onNext("next")
--> CreateEmitter.onNext("next")
--> Observer.onNext("next")
--> Log.d(TAG, "onNext: "+value)

咱們再回到ObservableCreate的subscribeActual()中。

source.subscribe(parent);,最重要的是這一行,調用者是被觀察者,傳入的參數爲觀察者,基本能夠猜出來了,這裏是訂閱的做用,真正將被觀察者與觀察者聯繫起來的地方

subscribe()方法

public final void subscribe(Observer<? super T> observer) {
		ObjectHelper.requireNonNull(observer, "observer is null");

		observer = RxJavaPlugins.onSubscribe(this, observer);

		subscribeActual(observer);
		...
	}
複製代碼

第一句的做用一樣是判空,接下來先獲取了傳入的observer並進行了相關配置,而後調用subscribeActual(observer);,細心的同窗可能注意到了,subscribeActual()正是在上面ObservableCreate中被重寫的方法,而具備「訂閱」意義的那行代碼也包含其中,結合subscribe()的本意,這行代碼的做用也很明朗了

若是你只是想對Rxjava基本的數據傳輸流程、訂閱的原理感興趣,那麼就不用看下去了,下面的內容主要是Rxjava操做符線程調度揹包的源碼分析


操做符(Map)

開始分析Rxjava的操做符部分

咱們以Map操做符爲例展開分析,首先,仍是給出一個最簡單的實例

Observable.create(new ObservableOnSubscribe<String>() {
			@Override
			public void subscribe(ObservableEmitter<String> e) throws Exception {
				e.onNext("next");
				e.onComplete();
			}
		}).map(new Function<String, Integer>() {
			@Override
			public Integer apply(String s) throws Exception {
				return Integer.parseInt(s);
			}
		}).subscribe(new Observer<Integer>() {
			@Override
			public void onSubscribe(Disposable d) {
				Log.d(TAG, "onSubscribe: " + d);
			}
			@Override
			public void onNext(Integer value) {
				Log.d(TAG, "onNext: " + value);
			}
			@Override
			public void onError(Throwable e) {
				Log.d(TAG, "onError: " + e);
			}
			@Override
			public void onComplete() {
				Log.d(TAG, "onComplete: ");
			}
		});
複製代碼

先看map()方法總體

public final <R > Observable < R > map(Function < ? super T, ? extends R > mapper){
			ObjectHelper.requireNonNull(mapper, "mapper is null");
			return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
		}
複製代碼

返回值確定是Observable,參數是一個泛型接口,咱們看下這個接口

public interface Function<T, R> {
    R apply(@NonNull T t) throws Exception;
}
複製代碼

傳入T,返回R,符合Map操做符傳入兩個數據類型進行轉換的效果,在乎料之中。 繼續看map()的方法內容,第一行按照慣例是判空語句,咱們發現map()的return語句與create()極爲類似,都是調用了RxJavaPlugins.onAssembly(),僅是傳入的參數不一樣,其實不僅是Map操做符,大多操做符都是這樣的,他們的不一樣僅僅是傳入參數的不一樣,也就是適配器的不一樣,這說明,操做符的具體實現(好比Map的類型轉換)都是在各自的適配器中作的。

小結:create以及對大多數操做符的retun語句都是RxJavaPlugins.onAssembly(),僅是傳入參數不一樣

進入ObservableMap的部分

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
			final Function<? super T, ? extends U> function;

			public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
				super(source);
				this.function = function;
			}

			@Override
			public void subscribeActual(Observer<? super U> t) {
				source.subscribe(new MapObserver<T, U>(t, function));
			}
		}
複製代碼

咱們發現,ObservableMap作的事情不多,就三件事,第一:在構造方法中,將傳入的Observable也就是自己拋給父類(ObservableSource是Observable的父類,因此能夠接受);第二:對轉換邏輯funtion進行保存;第三:重寫subscribeActual()方法並在其中實現訂閱,這裏與ObservableCreate是同樣的,只是傳遞的參數不一樣

小結:create以及對大多數操做符的第一層適配器中都會重寫subscribeActual()並實現訂閱邏輯

咱們並無在ObservableMap的代碼中發現進行類型轉換的代碼,不要心急,有的同窗估計已經發現了,這裏的進行訂閱操做的source.subscribe()傳入的參數類型改變了 ,以前是CreateEmitter,如今變爲了一個叫MapObserver的類,咱們知道CreateEmitter中實現了那四個經常使用的方法並制定了相關規則,因此你推測MapObserver中作了一樣的操做,其實不是的,但也查不了太多,除onNext()以外的三個方法是在它的父類BasicFuseableObserver中重寫的,MapObserver中只對onNext()進行的重寫,並且在其中進行了數據類型轉換的工做,咱們看一下源碼(這裏咱們只看onNext()部分就能夠了)

static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
			@Override
			public void onNext(T t) {
				if (done) {
					return;
				}
				if (sourceMode != NONE) {
					actual.onNext(null);
					return;
				}
				U v;
				try {
					v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
				} catch (Throwable ex) {
					fail(ex);
					return;
				}
				actual.onNext(v);
			}
			...
		}
複製代碼

能夠看到再代碼中利用ObjectHelper將上游傳過來的T,轉換成了下游須要的U

到這裏你對.map()下的數據流動也必定有了必定的理解: --> e.onNext("next") --> ObservableMap.MapObserver.onNext ("next") --> Observer.onNext("next") --> Log.d(TAG, "onNext: "+value) 訂閱的發送順序: --> .subscribe(observer) --> ObservableMap.subscribeActual(observer) --> ObservableCreate.subscribeActual(new MapObserver(observer,function))


下面進入線程調度源碼分析的階段,先看subscribeOn()

線程調度-subscribeOn()

老規矩,先來一個參考代碼

Observable.create(new ObservableOnSubscribe<String>() {
			@Override
			public void subscribe(ObservableEmitter<String> e) throws Exception {
				e.onNext("next");
				e.onComplete();
			}
		}).subscribeOn(Schedulers.io())
				.subscribe(new Observer<String>() {
					@Override
					public void onSubscribe(Disposable d) {
						Log.d(TAG, "onSubscribe: " + d);
					}
					@Override
					public void onNext(String value) {
						Log.d(TAG, "onNext: " + value);
					}
					@Override
					public void onError(Throwable e) {
						Log.d(TAG, "onError: " + e);
					}
					@Override
					public void onComplete() {
						Log.d(TAG, "onComplete");
					}
				});
複製代碼

仍是同樣,直接看SubscribeOn()

public final Observable<T> subscribeOn(Scheduler scheduler) {
			ObjectHelper.requireNonNull(scheduler, "scheduler is null");
			return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
		}
複製代碼

返回值Observable情理之中,return返回RxJavaPlugins.onAssembly()也是同樣,兩點不一樣:

  • 裝飾類(也就是上文說的適配器)是ObservableSubscribeOn
  • 傳入參數爲一個Scheduler

進入ObservableSubscribeOn

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
	final Scheduler scheduler;

	public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
		super(source);
		this.scheduler = scheduler;
	}

	@Override
	public void subscribeActual(final Observer<? super T> s) {
		final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
		s.onSubscribe(parent);
		parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
	}
}
複製代碼

根據經驗,構造中進行調用父類、存值一些操做,沒什麼可看的,直接看訂閱的實現subscribeActual()方法,能夠看見,此次對下游觀察者進行封裝的適配器是SubscribeOnObserver類,根據CreateEmitter、MapObserver的經驗,咱們能夠猜想出它或它的父類確定實現了那四個方法,下面咱們看一下

static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

	private static final long serialVersionUID = 8094547886072529208L;

	final Observer<? super T> actual;

	final AtomicReference<Disposable> s;

	SubscribeOnObserver(Observer<? super T> actual) {
		this.actual = actual;
		this.s = new AtomicReference<Disposable>();
	}

	@Override
	public void onSubscribe(Disposable s) {
		DisposableHelper.setOnce(this.s, s);
	}

	@Override
	public void onNext(T t) {
		actual.onNext(t);
	}

	@Override
	public void onError(Throwable t) {
		actual.onError(t);
	}

	@Override
	public void onComplete() {
		actual.onComplete();
	}

	@Override
	public void dispose() {
		DisposableHelper.dispose(s);
		DisposableHelper.dispose(this);
	}

	@Override
	public boolean isDisposed() {
		return DisposableHelper.isDisposed(get());
	}

	void setDisposable(Disposable d) {
		DisposableHelper.setOnce(this, d);
	}
}
複製代碼

除去構造、四個方法、基本的存儲語句就剩下一個setDisposable()方法了,若是你對Scheduler有研究,你就知道在Scheduler中真正處理線程調用邏輯的是Worker類,這裏setDisposable()的做用就是將你傳入的Scheduler返回的worker加入管理。

目光回到subscribeActual()中,調用觀察者的onSubscribe()以後,立刻調用了parent.setDisposable(),這裏停一下,你能夠翻上去觀察一下其餘方法的subscribeActual()部分,都是在這時候執行訂閱操做,可是咱們在這裏並無發現,訂閱操做不可能沒有發生,那麼是否是發生在了parent.setDisposable()這個方法裏面呢?咱們以前只關注了這個方法的內容,對於傳入的參數尚未解析,咱們如今看一下,但願有新的發現。

傳入的參數是scheduler.scheduleDirect(new SubscribeTask(parent))。 先看SubscribeTask這個類

final class SubscribeTask implements Runnable {
	private final SubscribeOnObserver<T> parent;

	SubscribeTask(SubscribeOnObserver<T> parent) {
		this.parent = parent;
	}

	@Override
	public void run() {
		source.subscribe(parent);
	}
}
複製代碼

這個類繼承Runnable,因此實現了一個子線程,在run()中執行操做,沒錯,source.subscribe(parent);,熟悉的語句,這就證實了這裏的訂閱操做發生在了Scheduler的線程中。

咱們繼續看scheduleDirect()這個方法

public Disposable scheduleDirect(@NonNull Runnable run) {
	return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
複製代碼

繼續

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
	final Worker w = createWorker();

	final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

	DisposeTask task = new DisposeTask(decoratedRun, w);

	w.schedule(task, delay, unit);

	return task;
}
複製代碼

咱們能夠發現,傳入的子線程被包裝配置以後,開始在Worker也就是Scheduler線程中執行 咱們繼續看DisposeTask這個類,具體的訂閱子線程的啓動就在這裏

static final class DisposeTask implements Runnable, Disposable {
	final Runnable decoratedRun;
	final Worker w;

	Thread runner;

	DisposeTask(Runnable decoratedRun, Worker w) {
		this.decoratedRun = decoratedRun;
		this.w = w;
	}

	@Override
	public void run() {
		runner = Thread.currentThread();
		try {
			decoratedRun.run();
		} finally {
			dispose();
			runner = null;
		}
	}

	@Override
	public void dispose() {
		if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
			((NewThreadWorker) w).shutdown();
		} else {
			w.dispose();
		}
	}

	@Override
	public boolean isDisposed() {
		return w.isDisposed();
	}
}
複製代碼

能夠看到run()中調用了ecoratedRun.run();來啓動線程,注意這裏是使用的run()而不是start(),並且整個rxjava流程走完後會本身調用dispose();關閉線程。

到這裏,你應該明白了subscribeOn()線程調度的過程,正如它的效果描述同樣:將觀察者的操做運行在Scheduler.io()線程中
--> subscribeOn(Schedulers.io())
--> 返回一個ObservableSubscribeOn的包裝類
--> 當上遊的被觀察者被訂閱以後,回調ObservableSubscribeOn包裝類中的subscribeActual()
--> 線程切換至Schedulers.io(),並進行訂閱操做source.subscribe(parent)

理順思路以後咱們發現,這裏訂閱,模式與以前相同,仍是下游觀察者對上游被觀察者進行訂閱,依舊是自下向上的,可是咱們經過以前的源碼分析知道,上游發送數據時調用的那個四個方法實際是調用下游觀察者對應重寫的四個方法,因此這邊知足了線程調度的目的:將觀察者所作的操做置與Schedulers.io()線程中

而且,咱們這裏還能夠解釋一個問題 爲何subscribeOn(Schedulers.xxx())切換線程N次,老是以第一次爲準? 咱們知道使用subscribeOn()進行線程調度時訂閱的順序是從下往上,因此有多個subscribeOn()時,從最後一個開始執行,一直執行到第一個,最後的結果仍是以第一個爲準


而後看obsweveOn(),有了上面subscribeOn()的經驗,分析obsweveOn()就快了

線程調度-obsweveOn()

實例

Observable.create(new ObservableOnSubscribe<String>() {
			@Override
			public void subscribe(ObservableEmitter<String> e) throws Exception {
				e.onNext("next");
				e.onComplete();
			}
		}).subscribeOn(Schedulers.io())
				.observeOn(AndroidSchedulers.mainThread())
				.subscribe(new Observer<String>() {
					@Override
					public void onSubscribe(Disposable d) {
						Log.d(TAG, "onSubscribe: " + d);
					}

					@Override
					public void onNext(String value) {
						Log.d(TAG, "onNext: " + value);
					}

					@Override
					public void onError(Throwable e) {
						Log.d(TAG, "onError: " + e);
					}

					@Override
					public void onComplete() {
						Log.d(TAG, "onComplete: ");
					}
				});
複製代碼

observeOn()

public final Observable<T> observeOn (Scheduler scheduler){
	return observeOn(scheduler, false, bufferSize());
}
複製代碼

沒看見RxJavaPlugins.onAssembly(),擔憂不同?不存在的,被包了一層而已

public final Observable<T> observeOn (Scheduler scheduler,boolean delayError, int bufferSize){
	ObjectHelper.requireNonNull(scheduler, "scheduler is null");
	ObjectHelper.verifyPositive(bufferSize, "bufferSize");
	return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
複製代碼

仍是那個循序,ObservableObserveOn

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
	final Scheduler scheduler;
	final boolean delayError;
	final int bufferSize;

	public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
		super(source);
		this.scheduler = scheduler;
		this.delayError = delayError;
		this.bufferSize = bufferSize;
	}

	@Override
	protected void subscribeActual(Observer<? super T> observer) {
		if (scheduler instanceof TrampolineScheduler) {
			source.subscribe(observer);
		} else {
			Scheduler.Worker w = scheduler.createWorker();
			source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
		}
	}
}
複製代碼

看subscribeActual(),很好理解,就是先判斷是否是在主線程,是的話,直接訂閱完事,不是的話跳到主線程去,在訂閱,切換線程依舊是使用的Worker那一套,與subscribeOn()中相似,先建立一個主線程的Worker,而後把Worker放進觀察者的包裝類ObserveOnObserver中,不用多說,裏面確定有對那四個方法的實現,我這裏簡化一下他的代碼

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {

	@Override
	public void onNext(T t) {
		if (done) {
			return;
		}
		if (sourceMode != QueueDisposable.ASYNC) {
			queue.offer(t);
		}
		schedule();
	}
	
	...
	
	void schedule() {
		if (getAndIncrement() == 0) {
			worker.schedule(this);
		}
	}
}
複製代碼

其餘那三個方法與onNext()大體相同,只看這一個就能夠了,schedule();這行代碼上面都是取數據的操做,並無對數據進行發送,因此說即便使用線程調用將被觀察者的操做放在主線程,他的數據準備階段仍然是在原線程執行的,當schedule();執行後,進入上面傳入Workder線程,也就是主線程,而後纔將queue中的T取出,繼而發送給下游的觀察者。其餘方法也是同樣的流程,好比onError()、onComplete(),都是將錯誤或完成的信息先保存,等待切換線程後在執行發送操做。

由此,咱們可知ObserverOn()是向下做用的,每次調用都對下游的代碼產生做用,因此屢次調用ObserverOn(),是最後一次生效的


背壓Flowable

Flowable.create(new FlowableOnSubscribe<Integer>() {
			@Override
			public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
				Log.d(TAG, "next 1");
				emitter.onNext(1);
				Log.d(TAG, "next 2");
				emitter.onNext(2);
				Log.d(TAG, "next 3");
				emitter.onNext(3);
				Log.d(TAG, "發送完成");
				emitter.onComplete();
			}
		}, BackpressureStrategy.ERROR).subscribe(new Subscriber<Integer>() {
			@Override
			public void onSubscribe(Subscription s) {
				Log.d(TAG, "onSubscribe");
				s.request(3);
			}
			@Override
			public void onNext(Integer integer) {
				Log.d(TAG, integer);
			}
			@Override
			public void onError(Throwable t) {
				Log.w(TAG, "onError: ", t);
			}
			@Override
			public void onComplete() {
				Log.d(TAG, "onComplete");
			}
		});
複製代碼

由於使用背壓須要特定的觀察者類,因此這裏從頭開始分析。

先看最開始的Flowable.create()

public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
			ObjectHelper.requireNonNull(source, "source is null");
			ObjectHelper.requireNonNull(mode, "mode is null");
			return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
		}
複製代碼

返回Flowable類型,第一個參數是FlowableOnSubscribe接口,做用咱們確定都清楚了,定義subscribe()、以及那三個在subscribe()中調用的方法,第二個參數是BackpressureStrategy,用於肯定背壓的策略,方法內容與observable的create()大體相同,先是判空,而後返回RxJavaPlugins.onAssembly(),其中傳入裝飾類,這裏裝飾類爲FlowableCreate,做用咱們確定都能猜出來,將FlowableOnSubscribe類型適配爲Flowable類型。

下面咱們細看證實一下本身的猜測

public interface FlowableOnSubscribe<T> {
    void subscribe(@NonNull FlowableEmitter<T> e) throws Exception;
}
複製代碼
public interface FlowableEmitter<T> extends Emitter<T> {
			void setDisposable(@Nullable Disposable s);
			void setCancellable(@Nullable Cancellable c);
			long requested();
			boolean isCancelled();
			
			@NonNull
			FlowableEmitter<T> serialize();
			
			@Experimental
			boolean tryOnError(@NonNull Throwable t);
		}
複製代碼
public interface Emitter<T> {
			void onNext(@NonNull T value);

			void onError(@NonNull Throwable error);

			void onComplete();
		}
複製代碼
public final class FlowableCreate<T> extends Flowable<T> {
	final FlowableOnSubscribe<T> source;
	final BackpressureStrategy backpressure;

	public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) {
		this.source = source;
		this.backpressure = backpressure;
	}
	
	@Override
	public void subscribeActual(Subscriber<? super T> t) {
		BaseEmitter<T> emitter;

		switch (backpressure) {
			case MISSING: {
				emitter = new MissingEmitter<T>(t);
				break;
			}
			case ERROR: {
				emitter = new ErrorAsyncEmitter<T>(t);
				break;
			}
			case DROP: {
				emitter = new DropAsyncEmitter<T>(t);
				break;
			}
			case LATEST: {
				emitter = new LatestAsyncEmitter<T>(t);
				break;
			}
			default: {
				emitter = new BufferAsyncEmitter<T>(t, bufferSize());
				break;
			}
		}

		t.onSubscribe(emitter);
		try {
			source.subscribe(emitter);
		} catch (Throwable ex) {
			Exceptions.throwIfFatal(ex);
			emitter.onError(ex);
		}
	}
}
複製代碼

沒錯,基本都和observable同樣,只不過在FlowableCreate的subscribeActual()添加了相關代碼以對不一樣的背壓策略進行不一樣的操做。

到目前爲止,發現Flowable的create()和Observable類的大同小異,以前咱們分析出的結論仍然能夠用到這裏,咱們繼續看subscribe()

public final void subscribe(Subscriber<? super T> s) {
	if (s instanceof FlowableSubscriber) {
		subscribe((FlowableSubscriber<? super T>)s);
	} else {
		ObjectHelper.requireNonNull(s, "s is null");
		subscribe(new StrictSubscriber<T>(s));
	}
}
複製代碼

這裏發現與以前有些不一樣,observable中是在這裏new一個CreateEmitter適配器而後傳入並調用subscribeActual(),而Flowable這裏咱們並無發現subscribeActual()的調用,而是調用了重載的另外一個subscribe()並傳入一個StrictSubscriber包裝類,咱們先看下這個重載的subscribe()

public final void subscribe(FlowableSubscriber<? super T> s) {
		ObjectHelper.requireNonNull(s, "s is null");
		try {
			Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s);
			ObjectHelper.requireNonNull(z, "Plugin returned null Subscriber");
			subscribeActual(z);
		} catch (NullPointerException e) { // NOPMD
			throw e;
		} catch (Throwable e) {
			Exceptions.throwIfFatal(e);
			RxJavaPlugins.onError(e);

			NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
			npe.initCause(e);
			throw npe;
		}
	}
複製代碼

嗯,subscribeActual()在這裏被調用,再看下傳入的StrictSubscriber

public class StrictSubscriber<T> extends AtomicInteger implements FlowableSubscriber<T>, Subscription {

	...

	@Override
	public void request(long n) {
		if (n <= 0) {
			cancel();
			onError(new IllegalArgumentException("§3.9 violated: positive request amount required but it was " + n));
		} else {
			SubscriptionHelper.deferredRequest(s, requested, n);
		}
	}

	@Override
	public void cancel() {
		if (!done) {
			SubscriptionHelper.cancel(s);
		}
	}

	@Override
	public void onSubscribe(Subscription s) {
		if (once.compareAndSet(false, true)) {

			actual.onSubscribe(this);

			SubscriptionHelper.deferredSetOnce(this.s, requested, s);
		} else {
			s.cancel();
			cancel();
			onError(new IllegalStateException("§2.12 violated: onSubscribe must be called at most once"));
		}
	}

	@Override
	public void onNext(T t) {
		HalfSerializer.onNext(actual, t, this, error);
	}

	@Override
	public void onError(Throwable t) {
		done = true;
		HalfSerializer.onError(actual, t, this, error);
	}

	@Override
	public void onComplete() {
		done = true;
		HalfSerializer.onComplete(actual, this, error);
	}
}
複製代碼

其中重寫了咱們使用背壓時在onSubscribe()中會調用的request()方法,以知足「控流」的效果,並且還重寫了那四個方法,基本能夠肯定是功能與CreateEmitter相似的適配器了,注意下,上面重載的subscribe()中接受的參數是FlowableSubscriber類型,那StrictSubscriber的做用就是把Subscriber類型轉換爲FlowableSubscriber類型了

最後來看下這裏的subscribeActual()

public void subscribeActual(Subscriber<? super T> t) {
		BaseEmitter<T> emitter;

		switch (backpressure) {
			case MISSING: {
				emitter = new MissingEmitter<T>(t);
				break;
			}
			case ERROR: {
				emitter = new ErrorAsyncEmitter<T>(t);
				break;
			}
			case DROP: {
				emitter = new DropAsyncEmitter<T>(t);
				break;
			}
			case LATEST: {
				emitter = new LatestAsyncEmitter<T>(t);
				break;
			}
			default: {
				emitter = new BufferAsyncEmitter<T>(t, bufferSize());
				break;
			}
		}

		t.onSubscribe(emitter);
		try {
			source.subscribe(emitter);
		} catch (Throwable ex) {
			Exceptions.throwIfFatal(ex);
			emitter.onError(ex);
		}
	}
複製代碼

依舊是調用source.subscribe()進行訂閱操做,只很少傳入的參數多了一層由對應XXXEmitter類的包裝,這種有關背壓策略選擇的代碼在subscribeActual()中也出現過一次,咱們選擇就進去看一下他具體作了什麼包裝。

這裏以ERROR對應的ErrorAsyncEmitter爲例

static final class ErrorAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {

        private static final long serialVersionUID = 338953216916120960L;

        ErrorAsyncEmitter(Subscriber<? super T> actual) {
            super(actual);
        }

        @Override
        void onOverflow() {
            onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
        }

    }
複製代碼

內容很簡單就是重寫一個onOverflow()方法實現了對錯誤的輸出,那麼他是在什麼地方調用呢,咱們繼續看他的父類

abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {

	private static final long serialVersionUID = 4127754106204442833L;

	NoOverflowBaseAsyncEmitter(Subscriber<? super T> actual) {
		super(actual);
	}

	@Override
	public final void onNext(T t) {
		if (isCancelled()) {
			return;
		}

		if (t == null) {
			onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
			return;
		}

		if (get() != 0) {
			actual.onNext(t);
			BackpressureHelper.produced(this, 1);
		} else {
			onOverflow();
		}
	}

	abstract void onOverflow();
}
複製代碼

發現了onOverflow()調用的地方,可是有前置條件「get() == 0」,get()是在哪裏調用呢? 繼續看父類

abstract static class BaseEmitter<T> extends AtomicLong implements FlowableEmitter<T>, Subscription {
	...

	BaseEmitter(Subscriber<? super T> actual) {
		this.actual = actual;
		this.serial = new SequentialDisposable();
	}
	
	...
}
複製代碼

並無發現關於get()的定義,可是不要疏忽了,看他的父類是誰?是AtomicLong,若是你對Java的研究夠深的話,你不會對這個類陌生,正是Java中的一個原子類,用來對長整型進行原子操做,這個get()就是其中的了

到這裏,咱們來理順一下背壓發生的數據流動
--> emitter.onNext()
-->StrictSubscriber.onNext()
--> Subscriber.onNext()

好了,Rxjava的源碼分析到這裏結束了,文中有不少沒有講到的地方,往後有時間的會繼續講解剩餘部分。

總結

本文中,咱們對create()、subscribe()、map()、subscribeOn()、observeOn()的源碼進行的閱讀,想你已經能夠從源碼的角度回答如下問題:

  • 觀察者如何發送數據?
  • 被觀察者如何接受數據?
  • 操做符的實現原理是什麼?
  • Map關鍵字是如何實現類型轉換的?
  • 線程調度是如何實現的?
  • 爲何屢次調用subscribeOn(),只有第一次生效?
  • 爲何屢次調用observeOn(),只有最後一次生效?
  • 背壓是如何實現的?

參考連接


這是我第一次寫源碼閱讀類文章,寫的很差的地方還請多多指點,謝謝!個人郵箱:mail@jiguankai.cn

相關文章
相關標籤/搜索