RxJava介紹5:操做符融合

Operator Fusion

翻譯自 Operator fusion in RxJava 2 html

介紹

RxJava是一個很是強大的庫,儘管它也存在一些問題。特別是性能和內存的問題。
爲了最小化RxJava中的開銷,有許多優化措施,稱爲「操做符融合」。
首先讓咱們回顧下RxJava如何工做以及他們存在哪些問題。 java

Observable

截屏2021-05-17 下午8.43.57.png

  • 構建Observable
  • Observer訂閱Observable
  • Observable經過Observer.onSubscribe方法,將建立的Disposable傳遞給Observer。
  • 隨後Observable能夠調用Observer.onNext傳遞值

Observable不支持背壓,由於Observer沒法將自身處理能力通知給Observable。
android

Flowable

截屏2021-05-17 下午8.42.09.png

與Observable類似,可是沒有Observer和Disposable,而是Subscriber和Subscription。
Subscription具備額外的request(n)方法,Subscriber可使用該方法顯式告知Flowable發出item的請求數量。若是不request值,Flowable將不會發出任何東西,這就是Flowable支持背壓的緣由。 安全

Assembly and subscribe

使用RxJava時,有兩個重要的階段組裝與訂閱:markdown

  • 組裝階段,創建Rx鏈
  • 訂閱階段,啓動Rx鏈(觸發各類操做符內部的「風暴」)

參考下面一段代碼app

Observable.fromArray(1, 2, 3)
	.map { it + 1 }
	.filter { it < 3 }
	.subscribe { println(it) }
複製代碼

過程以下:1.組裝,2.訂閱,3.運行。 僅僅三條Rx鏈就發生了這麼多事情。若是換成Flowable,request(n)將會使得過程更加複雜。
截屏2021-05-17 下午9.19.33.png 異步

Queues and synchronization

操做符的內部實現可能會有內部Queue,用來處理事件。Queue應當被串行訪問(這就意味着要有適當的同步機制)。RxJava2具備基於Atomics的無阻塞同步(例如AtomicInteger)和帶有compareAndSet方法的無限循環。
假設每一個操做符都有本身的內部Queue,那操做符中的Queue和Atomic對象一樣會帶來額外的開銷。
相似下面的代碼,ide

public final class QueueDrainHelper {
    public static <T> boolean postCompleteRequest(...) {
        for (; ; ) {
            long r = state.get();
            long r0 = r & REQUESTED_MASK;
            long u = (r & COMPLETED_MASK) | BackpressureHelper.addCap(r0, n);
            if (state.compareAndSet(r, u)) {
                if (r == COMPLETED_MASK) {
                    postCompleteDrain(n | COMPLETED_MASK, actual, queue, state, isCancelled);
                    return true;
                }
                return false;
            }
        }
    }
}
複製代碼
public final class ObservableObserveOn<T>{
     void drainNormal() {
            int missed = 1;
            SimpleQueue<T> q = this.queue;
            Observer a = this.actual;
            do {
                while(true) {
                    Object v;
                    try {
                        v = q.poll();
                    } catch (Throwable var7) {
                       ...
                    }

                    boolean empty = v == null;                  
                    if (empty) {
                        missed = this.addAndGet(-missed);
                        break;
                    }
                    a.onNext(v);
                }
            } while(missed != 0);

        }
}
複製代碼

Issues

綜上所述,RxJava存在的問題是:oop

  • 組裝開銷,建立Rx鏈,會建立不少對象,這會帶來內存開銷
  • 訂閱開銷,會發生大量通訊,這會帶來性能開銷
  • 分配和串行化開銷-爲每一個操做符建立內部結構(好比隊列和原子對象),帶來內存和性能開銷

Operator fusion

爲了解決某些性能和內存問題,這就是「操做符融合」
操做符融合用兩種類型:post

  • 宏融合(Macro fusion),合併操做符,最大程度的減小在組裝和訂閱階段建立的對象數量。
  • 微融合(Micro fusion),移除沒必要要的同步和在操做符間共享內部結構(例如Queue)

Macro fusion on Assembly

組裝時

組裝時的宏融合專一於最大程度地減小組裝期間建立的Observable和對象。
當咱們談「組裝時」,咱們指的是這個地方。

public abstract class Observable<T> {
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> fromCallable(Callable<? extends T> supplier) {
        ObjectHelper.requireNonNull(supplier, "supplier is null");
        return RxJavaPlugins.onAssembly(new ObservableFromCallable<T>(supplier));
    }
}
複製代碼

組裝融合基礎

優化某些Observable的最簡單方法是添加對特殊狀況的檢查,以建立更簡單的Observable。例如Observable.fromArray能夠被降級爲Observable.empty或Observable.just。

public static <T> Observable<T> fromArray(T... items) {
        ObjectHelper.requireNonNull(items, "items is null");
        if (items.length == 0) {
            return empty();
        } else if (items.length == 1) {
            return just(items[0]);
        }
        return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
    }
複製代碼

ScalarCallable

fuseable包中第一個「進階」優化是ScalarCallable接口

public interface ScalarCallable<T> extends Callable<T> {
    @Override
    T call();
}
複製代碼

它繼承通用的Java Callable,而且具備相同的接口,區別在於不能拋出異常。ScalarCallable是一個標記接口,某個類實現該接口,就意味着該類在組裝期間能夠安全得提取一個常量值(也能夠是null值)。基於以上描述,只有empty和just相關的數據源操做符(Observable/Flowable/Maybe)能夠被 scalarCallable標記。
例如在xMap操做符(flatMap,switchMap,concatMap)中,若是source被標記,則能夠用簡化版本的xMap代替繁瑣的完整實現(比較ObservableFlatMap和ObservableScalarXMap)。

public abstract class Observable<T> {
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) {
        if (this instanceof ScalarCallable) {
            T v = ((ScalarCallable<T>)this).call();
            if (v == null) {
                return empty();
            }
            return ObservableScalarXMap.scalarXMap(v, mapper);
        }
        return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
    }
}
複製代碼

FuseToXXX

在fuseable包中有這三個接口

public interface FuseToObservable<T> {
    // Returns a (direct) Observable for the operator.
    Observable<T> fuseToObservable();
}

public interface FuseToMaybe<T> {
    Maybe<T> fuseToMaybe();
}

public interface FuseToFlowable<T> {
    Flowable<T> fuseToFlowable();
}
複製代碼

進一步看看FuseToObservable,其餘兩個接口相似。考慮下面的Rx鏈:

========================================
Observable.range(1, 10)
	.count()
	.toObservable()
	.subscribe { println(it) }
========================================

class ObservableCountSingle<T> extends Single<Long> implements FuseToObservable<Long> {
    @Override
    public Observable<Long> fuseToObservable() {
        return RxJavaPlugins.onAssembly(new ObservableCount<T>(source));
    }
}

abstract class Single {
 	public final Observable<T> toObservable() {
        if (this instanceof FuseToObservable) {
            return ((FuseToObservable<T>)this).fuseToObservable();
        }
        return RxJavaPlugins.onAssembly(new SingleToObservable<T>(this));
    }
}
複製代碼

有或者沒有FuseToObservable,組裝結構是不同的。
截屏2021-05-18 下午1.19.11.png

Macro fusion on subscribe

訂閱期間的宏融合與組裝期間是同樣的,只是發生在subscribeActual方法中。有時候在訂閱前數據是未知的,訂閱時優化會更方便。

public abstract class Observable<T> implements ObservableSource<T> {
    
    @Override
    public final void subscribe(Observer<? super T> observer) {
         subscribeActual(observer);
    }
    
    protected abstract void subscribeActual(Observer<? super T> observer);
}

複製代碼

Basic on subscribe fusion

相似於在組裝期間,咱們添加對特殊狀況的檢查,對subscribe降級。例以下面ObservableAmb代碼

public final class ObservableAmb<T> extends Observable<T> {
    final ObservableSource<? extends T>[] sources;
    final Iterable<? extends ObservableSource<? extends T>> sourcesIterable;

  
    public void subscribeActual(Observer<? super T> s) {
        ObservableSource<? extends T>[] sources = this.sources;
        ...
        if (count == 0) {
            EmptyDisposable.complete(s);
            return;
        } else if (count == 1) {
            sources[0].subscribe(s);
            return;
        }
        AmbCoordinator<T> ac = new AmbCoordinator<T>(s, count);
        ac.subscribe(sources);
    }
複製代碼

Callable

相似於組裝期間的ScalarCallable。在訂閱期間,會經過Callable進行相似優化。

注意:由於ScalarCallable繼承Callable,因此在組裝期間ScalarCallable上的優化,一樣能夠在訂閱期間應用在Callable上。

好比說 XMap操做符,訂閱繼承了Callable接口的Observables,就有可能會被替換成簡化實現。參考ObservableFlatMap.MergeObserver,這個類太複雜了,我都不想看了。

Observable.fromCallable { 3 }
	.flatMap { Observable.fromArray(it + 1, it + 2) }
    .subscribe { println(it) }
=============================
public final class ObservableFlatMap<T, U> {
    @Override
    public void subscribeActual(Observer<? super U> t) {
        if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
            return;
        }
        source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
    }
}
public final class ObservableScalarXMap {
     public static <T, R> boolean tryScalarXMapSubscribe(ObservableSource<T> source, Observer<? super R> observer, Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
        if (source instanceof Callable) {
            T t;
            try {
                t = ((Callable<T>)source).call();
            } catch (Throwable ex) {
                ...
                return true;
            }
            if (t == null) {
                EmptyDisposable.complete(observer);
                return true;
            }
            ObservableSource<? extends R> r;
            try {
                r = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
            } catch (Throwable ex) {
                ...
                return true;
            }
            ...
            r.subscribe(observer);
            return true;
        }
        return false;
    }
}
複製代碼

Micro fusion

微融合旨在經過減小同步或者共享內部結構來減小開銷。

ConditionalSubscriber

考慮 FlowableFilter操做符

public final class FlowableFilter<T> extends AbstractFlowableWithUpstream<T, T> {
    static final class FilterSubscriber<T> extends BasicFuseableSubscriber<T, T> implements ConditionalSubscriber<T> {
        final Predicate<? super T> filter;
        
        @Override
        public void onNext(T t) {
            if (!tryOnNext(t)) {
                s.request(1);
            }
        }

        @Override
        public boolean tryOnNext(T t) {
            boolean  b = filter.test(t);
            if (b) { 
                actual.onNext(t); 
            }
            return b;
        }
    }
}
複製代碼

截屏2021-05-18 下午4.16.06.png
試想若是有兩個filter,filter.test(value)執行兩次。N,request(1)或者 Y,onNext(value) 加起來會被執行兩次。
因而便有了FilterConditionalSubscriber.它聚合了連續的fliter.test(value)

static final class FilterConditionalSubscriber<T> extends BasicFuseableConditionalSubscriber<T, T> {
   
    protected final ConditionalSubscriber<? super R> actual;   
    
    final Predicate<? super T> filter;
        @Override
        public void onNext(T t) {
            if (!tryOnNext(t)) {
                s.request(1);
            }
        }
        @Override
        public boolean tryOnNext(T t) {
            return filter.test(t) && actual.tryOnNext(t);
        }
}
複製代碼

截屏2021-05-18 下午4.39.40.png
相似的代碼能夠在FlowableRange,FlowableMap中看到。

Queue fuseable

最複雜的微融合是基於操做符間共享內部隊列。整個優化基於QueueFuseable接口。

public interface QueueFuseable<T> extends SimpleQueue<T> {
    int NONE = 0;
	int SYNC = 1;
	int ASYNC = 2;
	int ANY = SYNC | ASYNC;
	int BOUNDARY = 4;
    //Request a fusion mode from the upstream
	int requestFusion(int mode);
}
public interface QueueSubscription<T> extends QueueFuseable<T>, Subscription {
}
複製代碼

咱們以Flowable全家桶舉例,一樣適用於Observable。
QueueSubscription接口繼承QueueFuseable和Subscription,容許Flowable子類型操做符之間協商融合模式。
協商(requestFusion),一般發生在訂閱期間,即上游調用subscriber.onSubsribe(Subscription)時。下游要在Subscription.request(n)以前調用Subscription.requestFusion(int mode)。
與常規subscriber的onXXX回調方法相比,上游不只提供subscription,還提供QueueSubscription,從而容許下游直接訪問上游內部隊列。在融合模式下,下游經過調用上游QueueSubscription.poll(),獲取上游數值。

有三種融合模式:

  • NONE — 不容許融合
  • SYNC — 支持同步方式融合
  • ASYNC — 支持異步方式融合


SYNC融合

上游值要麼已經可用,要麼能夠在poll()中同步生成。若是上下游贊成適用SYNC融合,則意味着:

  • 下游在須要時直接調用poll()方法
  • poll()會拋出異常,至關於onError
  • poll()能夠返回null,至關於onComplete
  • poll能夠返回非null值,至關於onNext
  • 上游不會調用任何onXXX回調

參考Flowable.range與observeOn代碼

Flowable.range(1, 9)
	.observeOn(Schedulers.newThread())
	.subscribe { println(it) }
複製代碼

RangeSubscription只支持SYNC融合

public final class FlowableRange extends Flowable<Integer> {
    abstract static class BaseRangeSubscription extends BasicQueueSubscription<Integer> {
        
        @Override
        public final int requestFusion(int mode) {
            return mode & SYNC;
        }

        @Nullable
        @Override
        public final Integer poll() {
            int i = index;
            if (i == end) {
                return null;
            }
            index = i + 1;
            return i;
        }
    }
}
複製代碼

咱們再看FlowableObserveOn.FlowableSubscriber.onSubscribe()方法中,

  • s爲上游RangeSubscription。
  • actual爲下游yourSubscriber
public final class FlowableObserveOn<T> extends AbstractFlowableWithUpstream<T, T> {
    static final class ObserveOnSubscriber<T> extends BaseObserveOnSubscriber<T> implements FlowableSubscriber<T> {

        @Override
        public void onSubscribe(Subscription s) {
            if (SubscriptionHelper.validate(this.s, s)) {
                //s爲上游RangeSubscription
                this.s = s;

                if (s instanceof QueueSubscription) {
                    QueueSubscription<T> f = (QueueSubscription<T>) s;
                    int m = f.requestFusion(ANY | BOUNDARY);
                    if (m == SYNC) {
                        sourceMode = SYNC;
                        queue = f;
                        done = true;
                        //actual爲下游yourSubscriber,
                        //此時下游會調用本層.request(Long.MAX_VALUE)
                        actual.onSubscribe(this);
                        return;
                    } 
                }
            }
        }
        @Override
        public final void run() {
           	if (sourceMode == SYNC) {
                runSync();
            }
        }
         @Override
        void runSync() {
           	...
            final Subscriber<? super T> a = actual;
            //上游RangeSubScription
            final SimpleQueue<T> q = queue;
            for (;;) {
                while (e != r) {
                    T v;
                    try {
                        v = q.poll();
                    } catch (Throwable ex) {
                        //須要處理異常
                        a.onError(ex);
                        return;
                    }
                    //須要判斷null值
                    if (v == null) {
                        a.onComplete();
                        return;
                    }
                    a.onNext(v);
					...
                }
				...
            }
        } 
    }
}
複製代碼

fuse SYNC相比於no fuse:

  • observerOn少了request(x)
  • observerOn不須要維護內部隊列SpscArrayQueue,使用上游QueueSubScription
  • observerOn與range少了onNext,這就意味着少了worker.schedule線程調度。

ObserveOn內部會去判斷上游subscription是否爲QueueSubscription。FlowableRange內部爲RangeSubscription屬於QueueSubscription,且僅支持SYNC融合模式
截屏2021-05-19 下午3.46.13.png

ASYNC融合

相比於SYNC融合,poll()沒法當即同步獲取上游值。
若是上下游贊成使用ASYNC融合,則意味着:

  • 上游仍然會調用onError,onNext,onComplete
  • 當onNext實際上爲null值時,下游能夠調用poll()獲取實際數值
  • 下游仍然要調用request(x)

ObserveOnSubscriber繼承QueueSubscription,只支持異步融合。

public final class FlowableObserveOn<T> extends AbstractFlowableWithUpstream<T, T> {
    static final class ObserveOnSubscriber<T> extends BaseObserveOnSubscriber<T> implements FlowableSubscriber<T> {

        @Override
        public final void run() {
           	if (outputFused) {
                runBackfused();
            }
        }
         @Override
        void runBackfused() {
            for (;;) {
                boolean d = done;
                actual.onNext(null);
                ...
            }
        }
        
        @Override
        public final int requestFusion(int requestedMode) {
            if ((requestedMode & ASYNC) != 0) {
                outputFused = true;
                return ASYNC;
            }
            return NONE;
        }
        
        @Nullable
        @Override
        public T poll() throws Exception {
            T v = queue.poll();
            if (v != null && sourceMode != SYNC) {
                long p = produced + 1;
                if (p == limit) {
                    produced = 0;
                    s.request(p);
                } else {
                    produced = p;
                }
            }
            return v;
        }
    }
}
複製代碼

咱們自定義一個FlowableSubscriber

var qs: QueueSubscription<*>? = null
Flowable.rangeLong(1, 5)
	.observeOn(Schedulers.newThread())
	.subscribe(object : FlowableSubscriber<Long> {
		override fun onSubscribe(s: Subscription) {
			if (s is QueueSubscription<*>) {
				s.requestFusion(QueueFuseable.ASYNC)
                qs = s
             }
             s.request(Long.MAX_VALUE)
        }

        //onNext值爲null,須要本身去poll()
        override fun onNext(t: Long?) {
        	println("onNext:$t ")
            while (true) {
            	 var value = qs?.poll()
                 if (value == null) {break }
                 println("onNext poll:$value")
            }
        }
        override fun onError(t: Throwable?) { println("onError:$t") }
        override fun onComplete() {println("onComplete") }
	})
//輸出結果爲:
onNext:null 
onNext poll:1
onNext poll:2
onNext poll:3
onNext poll:4
onNext poll:5
onComplete

//把」Flowable.rangeLong(1, 5)「
//換成」Flowable.intervalRange(1,5,0,100,TimeUnit.MILLISECONDS)「
//輸出結果爲:
onNext:null 
onNext:null 
onNext poll:1
onNext:null 
onNext poll:2
onNext:null 
onNext poll:3
onNext:null 
onNext poll:4
onNext:null 
onNext poll:5
onComplete
複製代碼

先後兩次onNext次數不一樣,區別在於FlowableRangeLong支持同步融合,而FlowableIntervalRange
�不支持融合。

融合的線程問題

poll()可能會使得上游數值計算被切換到新線程。
requestFusion參數包含BOUNDARY時,則告訴上游,poll()會被切換線程。若是上游不但願計算被切換線程,則不經過下游的融合請求。

static final class MapSubscriber<T, U> extends BasicFuseableSubscriber<T, U> {
	@Override
	public int requestFusion(int mode) {
		return transitiveBoundaryFusion(mode);
	}
    
    protected final int transitiveBoundaryFusion(int mode) {
        QueueSubscription<T> qs = this.qs;
        if (qs != null) {
            if ((mode & BOUNDARY) == 0) {
                int m = qs.requestFusion(mode);
                if (m != NONE) {
                    sourceMode = m;
                }
                return m;
            }
        }
        return NONE;
    }
}  
複製代碼
var qs: QueueSubscription<*>? = null
var executor = Executors.newSingleThreadExecutor()
Flowable.rangeLong(1, 5)
	.map { println("int map: value $it,${Thread.currentThread()}");it }
	.subscribe(object : FlowableSubscriber<Long> {
		override fun onSubscribe(s: Subscription) {
				if (s is QueueSubscription<*>) {
					var mode = s.requestFusion(QueueFuseable.SYNC)
                    println("onSubscribe fusion mode result: $mode")
                    qs = s
                 }
                 s.request(Long.MAX_VALUE)
        }

        override fun onNext(t: Long?) {
            executor.submit { qs?.poll() } 
        }
		override fun onError(t: Throwable?) { }
		override fun onComplete() {}
     })
//輸出結果: 
//能夠看到map計算是在新線程而非原始線程
onSubscribe fusion mode result: 1
int map: value 1,Thread[pool-1-thread-1,5,main]
int map: value 2,Thread[pool-1-thread-1,5,main]
int map: value 3,Thread[pool-1-thread-1,5,main]
int map: value 4,Thread[pool-1-thread-1,5,main]
int map: value 5,Thread[pool-1-thread-1,5,main]
複製代碼

註釋掉executor.submit { qs?.poll() } ,requestFusion參數新增QueueFuseable.BOUNDARY

var qs: QueueSubscription<*>? = null
var executor = Executors.newSingleThreadExecutor()
Flowable.rangeLong(1, 5)
	.map { println("int map: value $it,${Thread.currentThread()}");it }
	.subscribe(object : FlowableSubscriber<Long> {
		override fun onSubscribe(s: Subscription) {
			if (s is QueueSubscription<*>) {
				var mode = s.requestFusion(QueueFuseable.SYNC or QueueFuseable.BOUNDARY)
                println("onSubscribe fusion mode result: $mode")
                qs = s
            }
            s.request(Long.MAX_VALUE)
        }

        override fun onNext(t: Long?) {}
		override fun onError(t: Throwable?) { }
		override fun onComplete() {}
     })
//輸出結果: 
//能夠看到map計算是在新線程而非原始線程
onSubscribe fusion mode result: 0
int map: value 1,Thread[main,5,main]
int map: value 2,Thread[main,5,main]
int map: value 3,Thread[main,5,main]
int map: value 4,Thread[main,5,main]
int map: value 5,Thread[main,5,main]
複製代碼

結論

運算符融合很酷炫,但明沒有在全部操做符中獲得應用。有些運算符的優化,看着簡單,作起來難。
你能夠寫很長的Rx鏈,但千萬不要覺得RxJava能高效地完成全部事情。你本身能優化的,仍是要優化。

參考

Operator fusion in RxJava 2
RxJava2做者親筆:Operator-fusion (Part 1)
RxJava2做者親筆:Operator fusion (part 2 - final)

相關文章
相關標籤/搜索