翻譯自 Operator fusion in RxJava 2 html
RxJava是一個很是強大的庫,儘管它也存在一些問題。特別是性能和內存的問題。
爲了最小化RxJava中的開銷,有許多優化措施,稱爲「操做符融合」。
首先讓咱們回顧下RxJava如何工做以及他們存在哪些問題。 java
Observable不支持背壓,由於Observer沒法將自身處理能力通知給Observable。
android
與Observable類似,可是沒有Observer和Disposable,而是Subscriber和Subscription。
Subscription具備額外的request(n)方法,Subscriber可使用該方法顯式告知Flowable發出item的請求數量。若是不request值,Flowable將不會發出任何東西,這就是Flowable支持背壓的緣由。 安全
使用RxJava時,有兩個重要的階段組裝與訂閱:markdown
參考下面一段代碼app
Observable.fromArray(1, 2, 3)
.map { it + 1 }
.filter { it < 3 }
.subscribe { println(it) }
複製代碼
過程以下:1.組裝,2.訂閱,3.運行。 僅僅三條Rx鏈就發生了這麼多事情。若是換成Flowable,request(n)將會使得過程更加複雜。
異步
操做符的內部實現可能會有內部Queue,用來處理事件。Queue應當被串行訪問(這就意味着要有適當的同步機制)。RxJava2具備基於Atomics的無阻塞同步(例如AtomicInteger)和帶有compareAndSet方法的無限循環。
假設每一個操做符都有本身的內部Queue,那操做符中的Queue和Atomic對象一樣會帶來額外的開銷。
相似下面的代碼,ide
public final class QueueDrainHelper {
public static <T> boolean postCompleteRequest(...) {
for (; ; ) {
long r = state.get();
long r0 = r & REQUESTED_MASK;
long u = (r & COMPLETED_MASK) | BackpressureHelper.addCap(r0, n);
if (state.compareAndSet(r, u)) {
if (r == COMPLETED_MASK) {
postCompleteDrain(n | COMPLETED_MASK, actual, queue, state, isCancelled);
return true;
}
return false;
}
}
}
}
複製代碼
public final class ObservableObserveOn<T>{
void drainNormal() {
int missed = 1;
SimpleQueue<T> q = this.queue;
Observer a = this.actual;
do {
while(true) {
Object v;
try {
v = q.poll();
} catch (Throwable var7) {
...
}
boolean empty = v == null;
if (empty) {
missed = this.addAndGet(-missed);
break;
}
a.onNext(v);
}
} while(missed != 0);
}
}
複製代碼
綜上所述,RxJava存在的問題是:oop
爲了解決某些性能和內存問題,這就是「操做符融合」
操做符融合用兩種類型:post
組裝時的宏融合專一於最大程度地減小組裝期間建立的Observable和對象。
當咱們談「組裝時」,咱們指的是這個地方。
public abstract class Observable<T> {
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> fromCallable(Callable<? extends T> supplier) {
ObjectHelper.requireNonNull(supplier, "supplier is null");
return RxJavaPlugins.onAssembly(new ObservableFromCallable<T>(supplier));
}
}
複製代碼
優化某些Observable的最簡單方法是添加對特殊狀況的檢查,以建立更簡單的Observable。例如Observable.fromArray能夠被降級爲Observable.empty或Observable.just。
public static <T> Observable<T> fromArray(T... items) {
ObjectHelper.requireNonNull(items, "items is null");
if (items.length == 0) {
return empty();
} else if (items.length == 1) {
return just(items[0]);
}
return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}
複製代碼
fuseable包中第一個「進階」優化是ScalarCallable接口
public interface ScalarCallable<T> extends Callable<T> {
@Override
T call();
}
複製代碼
它繼承通用的Java Callable,而且具備相同的接口,區別在於不能拋出異常。ScalarCallable是一個標記接口,某個類實現該接口,就意味着該類在組裝期間能夠安全得提取一個常量值(也能夠是null值)。基於以上描述,只有empty和just相關的數據源操做符(Observable/Flowable/Maybe)能夠被 scalarCallable標記。
例如在xMap操做符(flatMap,switchMap,concatMap)中,若是source被標記,則能夠用簡化版本的xMap代替繁瑣的完整實現(比較ObservableFlatMap和ObservableScalarXMap)。
public abstract class Observable<T> {
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) {
if (this instanceof ScalarCallable) {
T v = ((ScalarCallable<T>)this).call();
if (v == null) {
return empty();
}
return ObservableScalarXMap.scalarXMap(v, mapper);
}
return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}
}
複製代碼
在fuseable包中有這三個接口
public interface FuseToObservable<T> {
// Returns a (direct) Observable for the operator.
Observable<T> fuseToObservable();
}
public interface FuseToMaybe<T> {
Maybe<T> fuseToMaybe();
}
public interface FuseToFlowable<T> {
Flowable<T> fuseToFlowable();
}
複製代碼
進一步看看FuseToObservable,其餘兩個接口相似。考慮下面的Rx鏈:
========================================
Observable.range(1, 10)
.count()
.toObservable()
.subscribe { println(it) }
========================================
class ObservableCountSingle<T> extends Single<Long> implements FuseToObservable<Long> {
@Override
public Observable<Long> fuseToObservable() {
return RxJavaPlugins.onAssembly(new ObservableCount<T>(source));
}
}
abstract class Single {
public final Observable<T> toObservable() {
if (this instanceof FuseToObservable) {
return ((FuseToObservable<T>)this).fuseToObservable();
}
return RxJavaPlugins.onAssembly(new SingleToObservable<T>(this));
}
}
複製代碼
有或者沒有FuseToObservable,組裝結構是不同的。
訂閱期間的宏融合與組裝期間是同樣的,只是發生在subscribeActual方法中。有時候在訂閱前數據是未知的,訂閱時優化會更方便。
public abstract class Observable<T> implements ObservableSource<T> {
@Override
public final void subscribe(Observer<? super T> observer) {
subscribeActual(observer);
}
protected abstract void subscribeActual(Observer<? super T> observer);
}
複製代碼
相似於在組裝期間,咱們添加對特殊狀況的檢查,對subscribe降級。例以下面ObservableAmb代碼
public final class ObservableAmb<T> extends Observable<T> {
final ObservableSource<? extends T>[] sources;
final Iterable<? extends ObservableSource<? extends T>> sourcesIterable;
public void subscribeActual(Observer<? super T> s) {
ObservableSource<? extends T>[] sources = this.sources;
...
if (count == 0) {
EmptyDisposable.complete(s);
return;
} else if (count == 1) {
sources[0].subscribe(s);
return;
}
AmbCoordinator<T> ac = new AmbCoordinator<T>(s, count);
ac.subscribe(sources);
}
複製代碼
相似於組裝期間的ScalarCallable。在訂閱期間,會經過Callable進行相似優化。
注意:由於ScalarCallable繼承Callable,因此在組裝期間ScalarCallable上的優化,一樣能夠在訂閱期間應用在Callable上。
好比說 XMap操做符,訂閱繼承了Callable接口的Observables,就有可能會被替換成簡化實現。參考ObservableFlatMap.MergeObserver,這個類太複雜了,我都不想看了。
Observable.fromCallable { 3 }
.flatMap { Observable.fromArray(it + 1, it + 2) }
.subscribe { println(it) }
=============================
public final class ObservableFlatMap<T, U> {
@Override
public void subscribeActual(Observer<? super U> t) {
if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
return;
}
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
}
public final class ObservableScalarXMap {
public static <T, R> boolean tryScalarXMapSubscribe(ObservableSource<T> source, Observer<? super R> observer, Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
if (source instanceof Callable) {
T t;
try {
t = ((Callable<T>)source).call();
} catch (Throwable ex) {
...
return true;
}
if (t == null) {
EmptyDisposable.complete(observer);
return true;
}
ObservableSource<? extends R> r;
try {
r = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
} catch (Throwable ex) {
...
return true;
}
...
r.subscribe(observer);
return true;
}
return false;
}
}
複製代碼
考慮 FlowableFilter操做符
public final class FlowableFilter<T> extends AbstractFlowableWithUpstream<T, T> {
static final class FilterSubscriber<T> extends BasicFuseableSubscriber<T, T> implements ConditionalSubscriber<T> {
final Predicate<? super T> filter;
@Override
public void onNext(T t) {
if (!tryOnNext(t)) {
s.request(1);
}
}
@Override
public boolean tryOnNext(T t) {
boolean b = filter.test(t);
if (b) {
actual.onNext(t);
}
return b;
}
}
}
複製代碼
試想若是有兩個filter,filter.test(value)執行兩次。N,request(1)或者 Y,onNext(value) 加起來會被執行兩次。
因而便有了FilterConditionalSubscriber.它聚合了連續的fliter.test(value)
static final class FilterConditionalSubscriber<T> extends BasicFuseableConditionalSubscriber<T, T> {
protected final ConditionalSubscriber<? super R> actual;
final Predicate<? super T> filter;
@Override
public void onNext(T t) {
if (!tryOnNext(t)) {
s.request(1);
}
}
@Override
public boolean tryOnNext(T t) {
return filter.test(t) && actual.tryOnNext(t);
}
}
複製代碼
相似的代碼能夠在FlowableRange,FlowableMap中看到。
最複雜的微融合是基於操做符間共享內部隊列。整個優化基於QueueFuseable接口。
public interface QueueFuseable<T> extends SimpleQueue<T> {
int NONE = 0;
int SYNC = 1;
int ASYNC = 2;
int ANY = SYNC | ASYNC;
int BOUNDARY = 4;
//Request a fusion mode from the upstream
int requestFusion(int mode);
}
public interface QueueSubscription<T> extends QueueFuseable<T>, Subscription {
}
複製代碼
咱們以Flowable全家桶舉例,一樣適用於Observable。
QueueSubscription接口繼承QueueFuseable和Subscription,容許Flowable子類型操做符之間協商融合模式。
協商(requestFusion),一般發生在訂閱期間,即上游調用subscriber.onSubsribe(Subscription)時。下游要在Subscription.request(n)以前調用Subscription.requestFusion(int mode)。
與常規subscriber的onXXX回調方法相比,上游不只提供subscription,還提供QueueSubscription,從而容許下游直接訪問上游內部隊列。在融合模式下,下游經過調用上游QueueSubscription.poll(),獲取上游數值。
�
有三種融合模式:
上游值要麼已經可用,要麼能夠在poll()中同步生成。若是上下游贊成適用SYNC融合,則意味着:
參考Flowable.range與observeOn代碼
Flowable.range(1, 9)
.observeOn(Schedulers.newThread())
.subscribe { println(it) }
複製代碼
RangeSubscription只支持SYNC融合
public final class FlowableRange extends Flowable<Integer> {
abstract static class BaseRangeSubscription extends BasicQueueSubscription<Integer> {
@Override
public final int requestFusion(int mode) {
return mode & SYNC;
}
@Nullable
@Override
public final Integer poll() {
int i = index;
if (i == end) {
return null;
}
index = i + 1;
return i;
}
}
}
複製代碼
咱們再看FlowableObserveOn.FlowableSubscriber.onSubscribe()方法中,
public final class FlowableObserveOn<T> extends AbstractFlowableWithUpstream<T, T> {
static final class ObserveOnSubscriber<T> extends BaseObserveOnSubscriber<T> implements FlowableSubscriber<T> {
@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validate(this.s, s)) {
//s爲上游RangeSubscription
this.s = s;
if (s instanceof QueueSubscription) {
QueueSubscription<T> f = (QueueSubscription<T>) s;
int m = f.requestFusion(ANY | BOUNDARY);
if (m == SYNC) {
sourceMode = SYNC;
queue = f;
done = true;
//actual爲下游yourSubscriber,
//此時下游會調用本層.request(Long.MAX_VALUE)
actual.onSubscribe(this);
return;
}
}
}
}
@Override
public final void run() {
if (sourceMode == SYNC) {
runSync();
}
}
@Override
void runSync() {
...
final Subscriber<? super T> a = actual;
//上游RangeSubScription
final SimpleQueue<T> q = queue;
for (;;) {
while (e != r) {
T v;
try {
v = q.poll();
} catch (Throwable ex) {
//須要處理異常
a.onError(ex);
return;
}
//須要判斷null值
if (v == null) {
a.onComplete();
return;
}
a.onNext(v);
...
}
...
}
}
}
}
複製代碼
fuse SYNC相比於no fuse:
ObserveOn內部會去判斷上游subscription是否爲QueueSubscription。FlowableRange內部爲RangeSubscription屬於QueueSubscription,且僅支持SYNC融合模式
相比於SYNC融合,poll()沒法當即同步獲取上游值。
若是上下游贊成使用ASYNC融合,則意味着:
ObserveOnSubscriber繼承QueueSubscription,只支持異步融合。
public final class FlowableObserveOn<T> extends AbstractFlowableWithUpstream<T, T> {
static final class ObserveOnSubscriber<T> extends BaseObserveOnSubscriber<T> implements FlowableSubscriber<T> {
@Override
public final void run() {
if (outputFused) {
runBackfused();
}
}
@Override
void runBackfused() {
for (;;) {
boolean d = done;
actual.onNext(null);
...
}
}
@Override
public final int requestFusion(int requestedMode) {
if ((requestedMode & ASYNC) != 0) {
outputFused = true;
return ASYNC;
}
return NONE;
}
@Nullable
@Override
public T poll() throws Exception {
T v = queue.poll();
if (v != null && sourceMode != SYNC) {
long p = produced + 1;
if (p == limit) {
produced = 0;
s.request(p);
} else {
produced = p;
}
}
return v;
}
}
}
複製代碼
咱們自定義一個FlowableSubscriber
var qs: QueueSubscription<*>? = null
Flowable.rangeLong(1, 5)
.observeOn(Schedulers.newThread())
.subscribe(object : FlowableSubscriber<Long> {
override fun onSubscribe(s: Subscription) {
if (s is QueueSubscription<*>) {
s.requestFusion(QueueFuseable.ASYNC)
qs = s
}
s.request(Long.MAX_VALUE)
}
//onNext值爲null,須要本身去poll()
override fun onNext(t: Long?) {
println("onNext:$t ")
while (true) {
var value = qs?.poll()
if (value == null) {break }
println("onNext poll:$value")
}
}
override fun onError(t: Throwable?) { println("onError:$t") }
override fun onComplete() {println("onComplete") }
})
//輸出結果爲:
onNext:null
onNext poll:1
onNext poll:2
onNext poll:3
onNext poll:4
onNext poll:5
onComplete
//把」Flowable.rangeLong(1, 5)「
//換成」Flowable.intervalRange(1,5,0,100,TimeUnit.MILLISECONDS)「
//輸出結果爲:
onNext:null
onNext:null
onNext poll:1
onNext:null
onNext poll:2
onNext:null
onNext poll:3
onNext:null
onNext poll:4
onNext:null
onNext poll:5
onComplete
複製代碼
先後兩次onNext次數不一樣,區別在於FlowableRangeLong支持同步融合,而FlowableIntervalRange
�不支持融合。
poll()可能會使得上游數值計算被切換到新線程。
requestFusion參數包含BOUNDARY時,則告訴上游,poll()會被切換線程。若是上游不但願計算被切換線程,則不經過下游的融合請求。
static final class MapSubscriber<T, U> extends BasicFuseableSubscriber<T, U> {
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
protected final int transitiveBoundaryFusion(int mode) {
QueueSubscription<T> qs = this.qs;
if (qs != null) {
if ((mode & BOUNDARY) == 0) {
int m = qs.requestFusion(mode);
if (m != NONE) {
sourceMode = m;
}
return m;
}
}
return NONE;
}
}
複製代碼
var qs: QueueSubscription<*>? = null
var executor = Executors.newSingleThreadExecutor()
Flowable.rangeLong(1, 5)
.map { println("int map: value $it,${Thread.currentThread()}");it }
.subscribe(object : FlowableSubscriber<Long> {
override fun onSubscribe(s: Subscription) {
if (s is QueueSubscription<*>) {
var mode = s.requestFusion(QueueFuseable.SYNC)
println("onSubscribe fusion mode result: $mode")
qs = s
}
s.request(Long.MAX_VALUE)
}
override fun onNext(t: Long?) {
executor.submit { qs?.poll() }
}
override fun onError(t: Throwable?) { }
override fun onComplete() {}
})
//輸出結果:
//能夠看到map計算是在新線程而非原始線程
onSubscribe fusion mode result: 1
int map: value 1,Thread[pool-1-thread-1,5,main]
int map: value 2,Thread[pool-1-thread-1,5,main]
int map: value 3,Thread[pool-1-thread-1,5,main]
int map: value 4,Thread[pool-1-thread-1,5,main]
int map: value 5,Thread[pool-1-thread-1,5,main]
複製代碼
註釋掉executor.submit { qs?.poll() } ,requestFusion參數新增QueueFuseable.BOUNDARY
var qs: QueueSubscription<*>? = null
var executor = Executors.newSingleThreadExecutor()
Flowable.rangeLong(1, 5)
.map { println("int map: value $it,${Thread.currentThread()}");it }
.subscribe(object : FlowableSubscriber<Long> {
override fun onSubscribe(s: Subscription) {
if (s is QueueSubscription<*>) {
var mode = s.requestFusion(QueueFuseable.SYNC or QueueFuseable.BOUNDARY)
println("onSubscribe fusion mode result: $mode")
qs = s
}
s.request(Long.MAX_VALUE)
}
override fun onNext(t: Long?) {}
override fun onError(t: Throwable?) { }
override fun onComplete() {}
})
//輸出結果:
//能夠看到map計算是在新線程而非原始線程
onSubscribe fusion mode result: 0
int map: value 1,Thread[main,5,main]
int map: value 2,Thread[main,5,main]
int map: value 3,Thread[main,5,main]
int map: value 4,Thread[main,5,main]
int map: value 5,Thread[main,5,main]
複製代碼
運算符融合很酷炫,但明沒有在全部操做符中獲得應用。有些運算符的優化,看着簡單,作起來難。
你能夠寫很長的Rx鏈,但千萬不要覺得RxJava能高效地完成全部事情。你本身能優化的,仍是要優化。
Operator fusion in RxJava 2
RxJava2做者親筆:Operator-fusion (Part 1)
RxJava2做者親筆:Operator fusion (part 2 - final)