先貼上前面幾篇的連接:
rxjava2源碼解析(一)基本流程分析
rxjava2源碼解析(二)線程切換分析
rxjava2源碼解析(三)線程池原理分析java
上一篇說了rxjava2
的線程池原理,這篇咱們來講說rxjava
的變換。緩存
變換和線程切換算是rxjava
最關鍵的兩個功能。常見的變換有map()
,flatMap()
。咱們先從map
方法提及吧。bash
咱們先舉一個簡單的例子,來看看map
能作什麼:併發
Student[] students = ...;
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String name) {
Log.d(tag, name);
}
...
};
Observable.from(students)
.map(new Function<Student, String>() {
@Override
public String apply(Student student) {
return student.getName();
}
})
.subscribe(subscriber);
複製代碼
上面的例子是一個功能,打印一個班級裏students的名字。很簡單,經過from
方法對student進行遍歷,一個map
方法將student變換成name,而後下游打印就完事了。咱們知道rxjava2
裏面是有不少泛型設定的,若是類型錯誤是會直接標紅。from
方法返回的下游數據類型是student,而subscriber
中接收的數據類型必須是String。很顯然,這裏map就將下游的數據類型進行了變換。
具體在源碼中是如何實現的呢?咱們先看map
的源碼:app
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
複製代碼
仍是老樣子,拋開判空代碼和hock機制,直接看ObservableMap
類。不過在此以前,先看看map
方法裏面設定的泛型。T是Observable裏設定的上游數據類型,map方法會返回一個Observable,這裏就將整個鏈條的數據類型進行了變換。異步
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
複製代碼
看過前面的幾篇就知道,這裏仍是老套路,仍是裝飾器模式,仍是建立一個內部處理器MapObserver
。內部處理器MapObserver
負責與上游綁定,因此它的處理數據類型仍爲T。ObservableMap
與下游進行綁定訂閱,因此ObservableMap
中數據的類型爲R。咱們在看MapObserver
以前,先看看Function
是什麼。ide
public interface Function<T, R> {
/**
* Apply some calculation to the input value and return some other value.
* @param t the input value
* @return the output value
* @throws Exception on error
*/
R apply(@NonNull T t) throws Exception;
}
複製代碼
OK,Function是一個接口,只有一個接口方法apply
。Function
規定了兩個泛型:T、R。其中T是apply
的參數類型,R是返回值類型。咱們在使用過程當中,重寫apply
方法進行數據類型變換,而後再用map
方法插入到整條流水線中,就達到了變換的目的。oop
下面看看MapObserver
中具體是怎麼實現的:post
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
}
複製代碼
很簡單,MapObserver
的onNext
負責處理上游下來的數據,在onNext
方法中調用Function
的apply
方法,將T
變換爲下游須要的U
(也就是前面的R
),而後再將數據傳遞下去,達到變換的目的。ui
map的使用和源碼都很簡單,咱們來看看flatMap
的。
仍是先用一個簡單的例子來看flatMap
的用途:
Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
@Override
public void onNext(Course course) {
Log.d(tag, course.getName());
}
...
};
Observable.from(students)
.flatMap(new Function<Student, Observable<Course>>() {
@Override
public Observable<Course> apply(Student student) {
return Observable.from(student.getCourses());
}
})
.subscribe(subscriber);
複製代碼
產品說功能要改一改,不是打印每一個student的名字,而是要打印每一個sutdent全部課程名稱。正常狀況下,咱們在subscriber
中獲取到每一個student,而後用個for循環進行遍歷打印就行,可是flatMap
能夠直接一步搞定。
細心的已經發現,這裏的Function
比較奇怪,它的返回值類型居然是Observable
。具體怎麼回事,咱們看看源碼:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
//這裏的delayErrors,maxConcurrency,bufferSize都是默認值。
return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}
複製代碼
先解釋一下,delayErrors
,maxConcurrency
,bufferSize
這幾個參數的意義:
delayErrors
表示異常是否須要延遲到全部內部數據都傳輸完畢後拋出。默認值是false
。maxConcurrency
表示最大併發數,默認值爲Integer.MAX_VALUE
。bufferSize
緩存的內部被觀察者事件總數大小,默認值爲128.老樣子,咱們直接看ObservableFlatMap
:
public ObservableFlatMap(ObservableSource<T> source,
Function<? super T, ? extends ObservableSource<? extends U>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
super(source);
this.mapper = mapper;
this.delayErrors = delayErrors;
this.maxConcurrency = maxConcurrency;
this.bufferSize = bufferSize;
}
@Override
public void subscribeActual(Observer<? super U> t) {
if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
return;
}
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
複製代碼
仍是原來的配方,仍是原來的味道。咱們來看看MergeObserver
的源碼一探究竟:
@Override
public void onNext(T t) {
//調用apply方法,獲取到轉換的Observable
ObservableSource<? extends U> p;
try {
p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
upstream.dispose();
onError(e);
return;
}
//隱藏了一些判斷代碼
subscribeInner(p);
}
@SuppressWarnings("unchecked")
void subscribeInner(ObservableSource<? extends U> p) {
for (;;) {
//這裏會走到else
if (p instanceof Callable) {
...
} else {
//這裏新建一個InnerObserver,調用addInner添加到隊列中,而後用apply中生成的Observable與之訂閱。
InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
if (addInner(inner)) {
p.subscribe(inner);
}
break;
}
}
}
複製代碼
如註釋中所示,這裏根據上游每個數據,生成一個Observable
,而後新建一個InnerObserver
,將這個InnerObserver
添加到內部處理器隊列中,並將Observable
與這個InnerObserver
進行訂閱。
咱們以Observable.from()
爲例,看看這中間的流程是什麼樣的。
//from 方法返回一個ObservableFromArray裝飾器
public static <T> Observable<T> fromArray(T... items) {
//省略部分判空代碼
return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}
//ObservableFromArray源碼
public final class ObservableFromArray<T> extends Observable<T> {
final T[] array;
public ObservableFromArray(T[] array) {
this.array = array;
}
@Override
public void subscribeActual(Observer<? super T> observer) {
//訂閱後,建立一個FromArrayDisposable內部類對象
FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array);
//這個方法很關鍵,咱們待會能夠看看InnerObserver的onSubscribe方法。
observer.onSubscribe(d);
if (d.fusionMode) {
return;
}
d.run();
}
//FromArrayDisposable不是一個處理器,他只是一個帶簡單隊列的Disposable
static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {
final Observer<? super T> downstream;
final T[] array;
int index;
boolean fusionMode;
volatile boolean disposed;
FromArrayDisposable(Observer<? super T> actual, T[] array) {
this.downstream = actual;
this.array = array;
}
// 這裏顯然是返回同步
@Override
public int requestFusion(int mode) {
if ((mode & SYNC) != 0) {
fusionMode = true;
return SYNC;
}
return NONE;
}
//poll方法會逐個返回隊列中的數據
@Nullable
@Override
public T poll() {
int i = index;
T[] a = array;
if (i != a.length) {
index = i + 1;
return ObjectHelper.requireNonNull(a[i], "The array element is null");
}
return null;
}
@Override
public boolean isEmpty() {
return index == array.length;
}
@Override
public void clear() {
index = array.length;
}
@Override
public void dispose() {
disposed = true;
}
@Override
public boolean isDisposed() {
return disposed;
}
//在run方法中,開始向下遊傳遞數據。不過這時候已經不重要了,由於在InnerObserver的onSubscribe方法中,已經經過poll方法將隊列中的數據都傳遞出去了。固然這僅僅是在這個示例中是這樣
void run() {
T[] a = array;
int n = a.length;
//開始向下遊傳遞數據
for (int i = 0; i < n && !isDisposed(); i++) {
T value = a[i];
if (value == null) {
downstream.onError(new NullPointerException("The element at index " + i + " is null"));
return;
}
downstream.onNext(value);
}
if (!isDisposed()) {
downstream.onComplete();
}
}
}
}
複製代碼
如上面註釋所示,from
方法返回一個簡單的ObservableFromArray
,ObservableFromArray
的subscribe
中,調用下游處理器的onSubscribe
方法,而後調用自身的run
方法。咱們看看InnerObserver
中是怎麼處理的:
static final class InnerObserver<T, U> extends AtomicReference<Disposable>
implements Observer<U> {
private static final long serialVersionUID = -4606175640614850599L;
final long id;
final MergeObserver<T, U> parent;
volatile boolean done;
volatile SimpleQueue<U> queue;
int fusionMode;
//這裏會用一個獨特的ID來給每一個InnerObserver作標記
InnerObserver(MergeObserver<T, U> parent, long id) {
this.id = id;
this.parent = parent;
}
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.setOnce(this, d)) {
//FromArrayDisposable知足這個條件
if (d instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<U> qd = (QueueDisposable<U>) d;
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
//由上面FromArrayDisposable的源碼可知這裏返回SYNC
if (m == QueueDisposable.SYNC) {
fusionMode = m;
queue = qd;
//這裏直接將done設置爲true,是由於下面的parent.drain()會直接取出全部數據並傳遞給下游
done = true;
//數據在這其中進行下發和傳遞
parent.drain();
return;
}
if (m == QueueDisposable.ASYNC) {
fusionMode = m;
queue = qd;
}
}
}
}
@Override
public void onNext(U t) {
if (fusionMode == QueueDisposable.NONE) {
parent.tryEmit(t, this);
} else {
//當上遊執行到這裏時,數據已經被傳遞完畢了。這裏單指此次示例
parent.drain();
}
}
....
}
複製代碼
具體的信息都寫在上面的註釋中,咱們直接來看MergeObserver
的drain()
方法。
void drain() {
//這裏進行判斷,確保drainLoop還在執行時不會被再次調用
if (getAndIncrement() == 0) {
drainLoop();
}
}
void drainLoop() {
//獲取到下游Observer
final Observer<? super U> child = this.downstream;
int missed = 1;
for (;;) {
//判斷是否有error
if (checkTerminate()) {
return;
}
...
boolean d = done;
svq = queue;
InnerObserver<?, ?>[] inner = observers.get();
int n = inner.length;
int nSources = 0;
...
int innerCompleted = 0;
if (n != 0) {
//初始lastId lastIndex都爲0
long startId = lastId;
int index = lastIndex;
...
int j = index;
sourceLoop:
for (int i = 0; i < n; i++) {
//獲取到當前InnerObserver
@SuppressWarnings("unchecked")
InnerObserver<T, U> is = (InnerObserver<T, U>)inner[j];
//q就是FromArrayDisposable。
SimpleQueue<U> q = is.queue;
if (q != null) {
for (;;) {
U o;
try {
//在這裏循環調取FromArrayDisposable隊列中數據,而後傳遞到下游
o = q.poll();
} catch (Throwable ex) {
....
}
if (o == null) {
break;
}
child.onNext(o);
...
}
}
//前面標記過,在onSubscribe中已經將done設置爲true.
boolean innerDone = is.done;
SimpleQueue<U> innerQueue = is.queue;
//因爲上面已經將數據處理完畢,這裏innerQueue.isEmpty()返回爲true。
if (innerDone && (innerQueue == null || innerQueue.isEmpty())) {
//將該InnerObserver從隊列中移除
removeInner(is);
if (checkTerminate()) {
return;
}
innerCompleted++;
}
j++;
if (j == n) {
j = 0;
}
}
lastIndex = j;
lastId = inner[j].id;
}
...
//這裏與開頭getAndIncrement()相呼應,確保drainLoop在執行時不會被再次調用
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
複製代碼
OK,整個流程就清晰了,劃重點:
flatMap()
是基礎裝飾器Observable
的一個方法,參數是一個Function
,只不過這個Function
中apply()
方法返回類型爲一個Observable
。flatMap()
返回一個ObservableFlatMap
裝飾器對象。ObservableFlatMap
被訂閱後會調用subscribeActual()
方法,在此方法中,會建立一個內部類MergeObserver
對象,並將上游裝飾器與之訂閱。MergeObserver
在接收到上游數據後,會調用Function
中apply()
方法,將數據轉換爲一個Observable
,並建立一個內部InnerObserver
,將這個InnerObserver
放入隊列中,而後將生成的Observable
與之訂閱。InnerObserver
的onSubscribe()
方法會直接調用MergeObserver
的drain()
方法,將數據所有都直接傳遞給下游。從而完成整個流程。觀察代碼會發現,同步僅僅是flatMap
的一個簡單狀況,更復雜的狀況在於異步。具體的你們能夠去源碼裏研究一下,畢竟這篇的篇幅已經夠長了。下一篇預告一下,咱們來看看背壓。