RxJava如何結合觀察者與鏈式處理

RxJava如何結合觀察者與鏈式處理

Author: Dorae
Date: 2018年12月3日17:10:31 轉載請註明出處html


1、概述

首先問本身幾個問題,若是很是清楚這幾個問題的目的與答案,那麼恭喜你,不用繼續往下看了-_-。java

  1. RxJava是幹什麼的;
  2. 鏈式調用中當存在數個Observable.subscribeOn()時的處理方式;
  3. 鏈式調用中當存在數個Observable.observeOn()時的處理方式;
  4. 數據是如何通過操做符進行的處理。

回顧

觀察者模式

如圖1-1所示react

圖 1-1

Java8的stream

參見這裏api

2、RxJava是什麼

一款爲了簡化異步調用,且功能比較全面的框架。網絡

3、RxJava如何結合了觀察者模式與鏈式處理

參見Java8中的sink鏈,在RxJava中一樣實現了鏈式處理。如代碼片斷code1-1所示,咱們對其結構進行分析:框架

code1-1異步

Observable.create(new ObservableOnSubscribe<String>() {

		@Override
		public void subscribe(ObservableEmitter<String> emitter) throws Exception {
			emitter.onNext("Dorae");
		}
	})
	.filter(e -> e.contains("o"))
	.map(e -> "AfterMap: " + e)
	.filter(e -> e.contains("D"))
	.subscribe(new Observer<String>() {

				@Override
				public void onNext(@NonNull String o) {
					System.out.println("觀察者 onNext: " + o);
				}

				@Override
				public void onSubscribe(Disposable d) {
					System.out.println("觀察者onSubscribe: " + d + "### " + Thread.currentThread().getName());
				}

				@Override
				public void onError(Throwable e) {
				}

				@Override
				public void onComplete() {

				}
			});
複製代碼

一、首先看下其輸出結果:

觀察者onSubscribe: io.reactivex.internal.operators.observable.ObservableFilter$FilterObserver@52d455b8### mainide

觀察者 onNext: AfterMap: Dorae源碼分析

二、如今來對如何輸出這段結果進行分析

首先了解下RxJava中的幾種基本角色:spa

  • Observable,是RxJava描述的事件流,能夠說其與Observer構成了RxJava的基礎。在鏈式調用中,事件從建立到加工到最後被Observer接受,其實就是由一條Observerable鏈串起來的。
  • Observer,RxJava中的訂閱者,也就是須要對事件進行響應的一個角色。其實除了咱們一般本身實現的一個Observer,在鏈中的每一步都會產生一個Observer,而後這些Observer構成一條鏈,最終完成整個鏈的計算。
  • ObservableOnSubscribe,整個事件流的源頭,一般須要咱們本身實現,其依賴一個Emitter。
  • Emitter,能夠將其理解爲觸發器,推進整個流程的運轉。
  • Scheduler,這個其實不用太過關心,RxJava用其封裝了Thread,用於完成線程切換等任務。

是否是感受上邊的一堆廢話很是枯燥?先上一張RxJava的核心結構,如圖3-1所示。

圖 3-1

如今咱們再來看看code1-1,其最終造成的Observable鏈如圖3-2所示,每次調用map、filter等操做,都會生成一個新對象,而且保持了一個對上游的引用(用於生成Observer鏈)。

圖 3-2

Observer鏈如圖3-3所示,整個事件流程由CreateEmitter觸發,最終交由咱們的實現Observer$1處理。

圖 3-3

看了上邊幾張圖以後,是否是感受清晰了不少?那麼讓咱們進一步看下Rxjava如何完成了一鍵線程切換。

4、RxJava如何實現線程切換

一般咱們使用RxJava的線程切換功能時,只須要在調用鏈中加上一句subscribeOn()或observeOn(),其中Scheduler如上所述,其實就是一個包裝了ThreadPool的調度器。那麼咱們先來看下相關源碼。

一、subscribeOn

如代碼code4-1所示,爲subscribeOn的核心代碼。很明顯,其中在新線程中只是簡單的直接調用了source,也就是說這裏以後的全部操做均在一個新線程中進行,和單線程並無什麼區別。

code 4-1

public final Observable<T> subscribeOn(Scheduler scheduler) {
	return new ObservableSubscribeOn<T>() {
		@Override
		public void subscribeActual(final Observer<? super T> observer) {
			scheduler.createWorker().schedule(new SubscribeTask() {
				@Override
				public void run() {
					source.subscribe(e);
				}
			});
		}
	};
}
複製代碼

二、observeOn

如代碼段code4-2所示,爲observeOn的核心邏輯,能夠看出其在訂閱階段(生成Observer鏈的階段)仍是在當前線程執行,只有觸發以後,到了ObserverOn的Observer的節點時纔會真正的切換到新線程。

code 4-2

public final Observable<T> observeOn(Scheduler scheduler) {
	return new ObservableOnSubscribe<T>() {
		@Override
		public void subscribeActual(@NonNull Observer<Object> e) { 
			source.subscribe(new Observer<T>() {

				@Override
				public void onNext(T var1) {
					scheduler.createWorker().schedule(new Runnable() {
						@Override
						public void run() {
							e.onNext(var1);
						}
					});
				}
			});
		}
	};
}
複製代碼

屢次Observable.subscribeOn()、屢次Observable.observeOn()會發生什麼

經過上述code4-一、code4-2的分析,是否是能夠推斷出當屢次subscribeOn時會發生什麼?沒錯,雖然每次subscribeOn都會產生一次線程切換,可是真正起做用的只有最開始的一次subscribeOn,也就至關於只在最初的位置調用了subscribeOn;對於observeOn也是相似,每次都會產生新線程,可是每次都會產生必定的影響,也就是每一個線程都承擔了一部分工做。

小結

經過本文,咱們能夠簡要了解到RxJava的基本原理,可是對於其豐富的api還須要在實踐中進行磨合。可是,RxJava既然做爲一個異步框架,其必然有必定的侷限,好比其切換線程時沒法阻塞當前線程(這種對於Android等須要渲染或者網絡IO的需求來講很是適用),可是對於常見的服務端業務來講,還須要額外引入阻塞當前線程的操做(由於大部分的server代碼仍是單線程模型),假若徹底不用線程切換在服務端強行引入,可能會得不償失。我的更推薦Java8的CompletableFuture。

參考

理解RxJava(一)基本流程源碼分析

RxJava基本原理分析

相關文章
相關標籤/搜索