關於Observable的源碼解析能夠看Rxjava2 Observable源碼淺析java
關於Subject的源碼解析能夠看RxJava2 Subject源碼淺析緩存
看過Rxjava2 Observable源碼淺析的你會發現其實Rxjava的實現套路都差很少,因此其實Flowable
也差很少,只是在實現的細節上稍微有些差別而已。bash
Flowable
的出現其實主要是爲了解決在異步模型中上下游數據發送和接收的差別性而存在的。上游發送速度大於下游接收速度時就會產生數據積壓致使OOM,而Flowable
就提供了背壓(BackPressure) 策略來處理數據積壓問題。多線程
從最原始的Flowable#create
開始異步
//FlowableOnSubscribe就是最原始的數據源發生器
public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
ObjectHelper.requireNonNull(source, "source is null");
ObjectHelper.requireNonNull(mode, "mode is null");
//將FlowableOnSubscribe轉化成了FlowableCreate
return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
}
複製代碼
能夠看到create
方法也是將數據源進行了一層封裝。而subscribe
方法和Observable#subscribe
就是差很少,最終仍是調用的Flowable#subscribeActual
,而這裏就是FlowableCreate#subscribeActual
ide
public final class FlowableCreate<T> extends Flowable<T> {
final FlowableOnSubscribe<T> source;
final BackpressureStrategy backpressure;
public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) {
this.source = source;
this.backpressure = backpressure;
}
@Override
public void subscribeActual(Subscriber<? super T> t) {
BaseEmitter<T> emitter;
//根據不一樣的背壓策略實現不一樣Emitter
switch (backpressure) {
case MISSING: {
emitter = new MissingEmitter<T>(t);
break;
}
case ERROR: {
emitter = new ErrorAsyncEmitter<T>(t);
break;
}
case DROP: {
emitter = new DropAsyncEmitter<T>(t);
break;
}
case LATEST: {
emitter = new LatestAsyncEmitter<T>(t);
break;
}
default: {
emitter = new BufferAsyncEmitter<T>(t, bufferSize());
break;
}
}
//通常來講在Subscriber#onSubscribe,調用emitter.request指定拉取上游多少數據
t.onSubscribe(emitter);
try {
//將上下游關聯
//調用Flowable#create一開始建立的FlowableOnSubscribe#subscribe
source.subscribe(emitter);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
emitter.onError(ex);
}
}
}
複製代碼
能夠纔看到這裏核心就是根據不用背壓策略實現不一樣的Emitter
。通常來講在Subscriber#onSubscribe
,調用emitter.request
指定拉取上游多少數據,從而經過背壓策略對數據下發的策略不一樣。post
abstract static class BaseEmitter<T>
extends AtomicLong
implements FlowableEmitter<T>, Subscription {
private static final long serialVersionUID = 7326289992464377023L;
final Subscriber<? super T> actual;
final SequentialDisposable serial;
BaseEmitter(Subscriber<? super T> actual) {
this.actual = actual;
this.serial = new SequentialDisposable();
}
@Override
public void onComplete() {
complete();
}
protected void complete() {
if (isCancelled()) {
return;
}
try {
actual.onComplete();
} finally {
serial.dispose();
}
}
@Override
public final void onError(Throwable e) {
//嘗試下發完成緩存數據
if (!tryOnError(e)) {
RxJavaPlugins.onError(e);
}
}
@Override
public boolean tryOnError(Throwable e) {
return error(e);
}
protected boolean error(Throwable e) {
if (e == null) {
e = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (isCancelled()) {
return false;
}
try {
actual.onError(e);
} finally {
serial.dispose();
}
return true;
}
@Override
public final void cancel() {
serial.dispose();
onUnsubscribed();
}
@Override
public final boolean isCancelled() {
return serial.isDisposed();
}
@Override
public final void request(long n) {
//記錄請求的個數
if (SubscriptionHelper.validate(n)) {
BackpressureHelper.add(this, n);
onRequested();
}
}
void onRequested() {
// default is no-op
}
@Override
public final void setDisposable(Disposable s) {
serial.update(s);
}
@Override
public final void setCancellable(Cancellable c) {
setDisposable(new CancellableDisposable(c));
}
@Override
public final long requested() {
return get();
}
.....
}
複製代碼
這裏能夠看到BaseEmitter
經過自身繼承AtomicLong
取記錄請求個數,而不是經過鎖或者volatile
來提升性能。性能
不作任何處理,由下游自行處理overflow。MissingEmitter
實現很簡單。學習
static final class MissingEmitter<T> extends BaseEmitter<T> {
private static final long serialVersionUID = 3776720187248809713L;
MissingEmitter(Subscriber<? super T> actual) {
super(actual);
}
@Override
public void onNext(T t) {
if (isCancelled()) {
return;
}
//這裏能夠看出,對應數據下發沒有任何限制
if (t != null) {
actual.onNext(t);
} else {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
//request減1
for (;;) {
long r = get();
if (r == 0L || compareAndSet(r, r - 1)) {
return;
}
}
}
}
複製代碼
static final class BufferAsyncEmitter<T> extends BaseEmitter<T> {
private static final long serialVersionUID = 2427151001689639875L;
final SpscLinkedArrayQueue<T> queue;///數據緩存列表
Throwable error;
volatile boolean done;//標記是否onComplete或onError
final AtomicInteger wip;//標記調用了多少次drain
BufferAsyncEmitter(Subscriber<? super T> actual, int capacityHint) {
super(actual);
this.queue = new SpscLinkedArrayQueue<T>(capacityHint);
this.wip = new AtomicInteger();
}
@Override
public void onNext(T t) {
if (done || isCancelled()) {
return;
}
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
queue.offer(t);///數據入隊列
drain();//檢測並下發數據
}
@Override
public boolean tryOnError(Throwable e) {
if (done || isCancelled()) {
return false;
}
if (e == null) {
e = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
error = e;
done = true;//標記完成
drain();//檢測並下發未完成數據
return true;
}
@Override
public void onComplete() {//僅標記,若隊列有數據繼續下發完成
done = true;//標記完成
drain();//檢測並下發未完成數據
}
@Override
void onRequested() {//#request(long n)後調用
drain();//檢測並下發數據
}
@Override
void onUnsubscribed() {
if (wip.getAndIncrement() == 0) {
queue.clear();
}
}
void drain() {
//相似於if(wip++ != 0)
//因此這裏屢次調用#drain只有第一次調用纔會經過,或者已經清空隊列等待一下調用#drain
if (wip.getAndIncrement() != 0) {
return;
}
int missed = 1;
final Subscriber<? super T> a = actual;
final SpscLinkedArrayQueue<T> q = queue;
for (; ; ) {
long r = get();//數據請求數,由#request決定
long e = 0L;
while (e != r) {
if (isCancelled()) {
q.clear();
return;
}
//是否已完成,調用onComplete/onError後會標記done==true
boolean d = done;
//獲取隊列第一條數據
T o = q.poll();
//用於標記隊列是否爲空
boolean empty = o == null;
//已標記完成且隊列爲空,調用onComplete/onError
if (d && empty) {
Throwable ex = error;
if (ex != null) {
error(ex);
} else {
complete();
}
return;
}
//隊列爲空,退出獲取數據循環
if (empty) {
break;
}
//下發數據
a.onNext(o);
//標記已下發數據
e++;
}
//數據下發量和請求數相符
if (e == r) {
if (isCancelled()) {
q.clear();
return;
}
//標記是否完成
boolean d = done;
//標記隊列是否爲空
boolean empty = q.isEmpty();
//隊列爲空且已完成,調用onComplete/onError
if (d && empty) {
Throwable ex = error;
if (ex != null) {
error(ex);
} else {
complete();
}
return;
}
}
//request數減去已經下發數
if (e != 0) {
BackpressureHelper.produced(this, e);
}
//已處理一次drain,wip-missed避免錯過屢次調用drain
//和Observable#observeOn時的ObserveOnObserver#drainNormal處理方式同樣
missed = wip.addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
}
複製代碼
這裏的#drain
下發數據方法和Observable#observeOn
->ObserveOnObserver#drainNormal
的處理方式是有點類似的。經過自己記錄request
數和wip
協調下發數據量及正確的下發。在調用Subscriber#onSubscribe
、Emitter#onNext
、Emitter#onComplete
都會觸發#drain
嘗試去下發緩存的數據。其中Emitter#onNext
時先緩存數據在嘗試下發,並且數據還沒下發完成前調用onComplete
和onError
(這裏重寫了tryOnError
)僅先標記完成,還要等數據徹底下發纔會真正調用actual
對應方法。ui
其實這裏咱們仍是能夠學到一些東西的:
- 若是能夠的話,使用
Atomic
包下的類代替volatile
和鎖提升性能- 使用
missed
和wip
來協調多線程分發任務- 多線程中標誌位的判斷最好經過臨時變量存儲判斷並屢次判斷
BackpressureStrategy.LATEST
當數據背壓時只會緩存最後一次下發的數據(經過AtomicReference
來緩存)。具體實現原理和BackpressureStrategy.BUFFER
較爲相似就不貼代碼了。
abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {
private static final long serialVersionUID = 4127754106204442833L;
NoOverflowBaseAsyncEmitter(Subscriber<? super T> actual) {
super(actual);
}
@Override
public final void onNext(T t) {
if (isCancelled()) {
return;
}
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
//是否已達請求數
if (get() != 0) {
actual.onNext(t);//未達請求數,下發
BackpressureHelper.produced(this, 1);//請求數減1
} else {
onOverflow();//已超過請求,調用對應策略方法
}
}
//
abstract void onOverflow();
}
複製代碼
BackpressureStrategy.DROP
對應的DropAsyncEmitter
和BackpressureStrategy.ERROR
對應的ErrorAsyncEmitter
都是繼承於NoOverflowBaseAsyncEmitter
。實現方式也是很簡單,僅僅在onNext
判斷一下是否已經到達了請求數,未到達就下發,若到達了調用onOverflow()
處理溢出方案。
BackpressureStrategy.DROP
的溢出方案爲空實現即捨去溢出數據BackpressureStrategy.ERROR
的溢出方案爲調用onError
即溢出時報錯
MISS
策略須要下游自行處理背壓問題
BUFFER
策略則在還有數據未下發完成時就算上游調用onComplete
或onError
也會等待數據下發完成
LATEST
策略則當產生背壓時僅會緩存最新的數據
DROP
策略爲背壓時拋棄背壓數據
ERROR
策略是背壓時拋出異常調用onError
在學習源碼時獲得的一些關於多線程的領悟:
- 若是能夠的話,使用
Atomic
包下的類代替volatile
和鎖提升性能- 使用
missed
和wip
來協調多線程分發任務- 多線程中標誌位的判斷最好經過臨時變量存儲判斷並屢次判斷