博客主頁java
被觀察者( Observable/Flowable/Single/Completable/May )發射的數據流能夠經歷各類線程切換,可是數據流的各個元素之間不會產生並行執行的效果。井行不是併發,也不是同步,更不是異步。算法
併發( concurrency )是指一個處理器同時處理多個任務。並行( parallelism )是多個處理器
或者是多核的處理器同時處理多個不一樣的任務。井行是同時發生的多個併發事件,具備井發的含義,而併發則不必定是並行。segmentfault
在 RxJava 中能夠藉助 flatMap 操做符來實現服務器
Observable.range(1, 100) .flatMap(new Function<Integer, ObservableSource<String>>() { @Override public ObservableSource<String> apply(Integer integer) throws Exception { return Observable.just(integer) .subscribeOn(Schedulers.computation()) .map(new Function<Integer, String>() { @Override public String apply(Integer integer) throws Exception { return integer.toString(); } }); } }) .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "Next: " + s); } });
flatMap 操做符的原理是將這個 Observable 轉化爲爲多個以原 Observable 發射的數據做爲源數據 Observable,而後再將這多個 Observable 發射的數據整合發射出來。最後的順序可能會交錯地發射出來。併發
flatMap 會對原始 Observable 發射的每一項數據執行變換操做。在這裏,生成的每一個 Observable 使用線程池(指定了 computation 做爲 Scheduler )併發地執行。app
還可使用 ExecutorService 來建立一個 Scheduler 對剛纔的代碼稍微作一些改動。負載均衡
int threadNum = Runtime.getRuntime().availableProcessors() + 1; final ExecutorService executorService = Executors.newFixedThreadPool(threadNum); final Scheduler scheduler = Schedulers.from(executorService); Observable.range(1, 100) .flatMap(new Function<Integer, ObservableSource<String>>() { @Override public ObservableSource<String> apply(Integer integer) throws Exception { return Observable.just(integer) .subscribeOn(scheduler) .map(new Function<Integer, String>() { @Override public String apply(Integer integer) throws Exception { return integer.toString(); } }); } }) .doFinally(new Action() { @Override public void run() throws Exception { Log.d(TAG, "Finally."); executorService.shutdown(); } }) .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "Next: " + s); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error."); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } });
當完成全部的操做以後, executorService 須要執行 shutdown() 來關閉 ExecutorService。咱們可使用 doFinally 操做符來執行 shutdown()。異步
doFinally 操做符能夠在 onError 或者 onComplete 以後調用指定的操做,或由下游處理。ide
Round-Robin 算法是最簡單的一種負載均衡算法。它的原理是把來自用戶的請求輪流分配給內部的服務器:從服務器 1 開始,直到服務器 N、,而後從新開始循環,也被稱爲啥希取模法,是很是經常使用的數據分片方法。 Round-Robin 算法的優勢是簡潔,它無須記錄當前全部鏈接的狀態,因此是一種無狀態調度。fetch
經過 Round-Robin 算法把數據按線程數分組,例如分紅 5 組,每組個數相同,一塊兒發送處理。這樣作的目的是能夠減小 Observable 的建立 ,從而節省系統資源,可是會增長處理時間。Round-Robin 算法能夠當作是對時間和空間的綜合考慮。
final AtomicInteger batch = new AtomicInteger(0); Observable.range(1, 100) .groupBy(new Function<Integer, Integer>() { @Override public Integer apply(Integer integer) throws Exception { return batch.getAndIncrement() % 5; } }) .flatMap(new Function<GroupedObservable<Integer, Integer>, ObservableSource<?>>() { @Override public ObservableSource<?> apply(GroupedObservable<Integer, Integer> integerIntegerGroupedObservable) throws Exception { return integerIntegerGroupedObservable.observeOn(Schedulers.io()) .map(new Function<Integer, String>() { @Override public String apply(Integer integer) throws Exception { return integer.toString(); } }); } }) .subscribe(new Consumer<Object>() { @Override public void accept(Object o) throws Exception { Log.d(TAG, "Next: " + o); } });
RxJava 2.0.5 版本新增了 ParallelFlowable API ,它容許並行地執行一些操做符,例如 map、filter、concatMap、flatMap、collect、reduce 等。
public abstract class ParallelFlowable<T> { }
ParallelFlowable 是並行的 Flowable 版本,並非新增的被觀察者類型。在 ParallelFlowable 中,不少典型的操做符( take、skip 等)是不可用的。
在 RxJava 並無 ParallelObservable ,由於在 RxJava 2.x 以後, Observable 再也不支持背壓。
然而在並行處理中背壓是必不可少的, 不然會淹沒在並行操做符的內部隊列中。
也不存在 ParallelSingle、ParallelCompletable、ParallelMaybe
在相應的操做符上調用 Flowable 的 parallel() 就會返回 ParallelFlowable
ParallelFlowable<Integer> parallelFlowable = Flowable.range(1, 100).parallel(); parallelFlowable .runOn(Schedulers.io()) .map(new Function<Integer, String>() { @Override public String apply(Integer integer) throws Exception { return integer.toString(); } }) .sequential() .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "Next: " + s); } });
其中 parallel() 調用了 ParallelFlowable.from
public final ParallelFlowable<T> parallel() { return ParallelFlowable.from(this); } public static <T> ParallelFlowable<T> from(@NonNull Publisher<? extends T> source) { return from(source, Runtime.getRuntime().availableProcessors(), Flowable.bufferSize()); }
Paralle!ParallelFlowable 的 from() 方法是經過 Publisher 並以循環的方式在多個「軌道」 (CPU 數)上消費它的。
默認狀況下,並行級別被設置爲可用 CPU 的數量 ( Runtime.getRuntime().availableProcessors() ),井且順序源的預取量設置爲 Flowable.bufferSize()。二者均可以經過重載 parallel() 方法來指定。
public final ParallelFlowable<T> parallel(int parallelism) { ObjectHelper.verifyPositive(parallelism, "parallelism"); return ParallelFlowable.from(this, parallelism); } public final ParallelFlowable<T> parallel(int parallelism, int prefetch) { ObjectHelper.verifyPositive(parallelism, "parallelism"); ObjectHelper.verifyPositive(prefetch, "prefetch"); return ParallelFlowable.from(this, parallelism, prefetch); }
若是己經使用了必要的井行操做,則能夠經過 ParallelFlowable.sequential() 操做符返回到順序流。
ParallelFlowable 遵循與 Flowable 相同的異步原理,所以 parallel() 自己並不引入順序源的異
步消耗,只准備並行流,可是能夠經過 runOn(Scheduler) 操做符定義異步。這點與 Flowable
很大不一樣, Flowable 使用 subscribeOn, observeOn 操做符。
runOn() 能夠指定 prefetch 的數量。
public final ParallelFlowable<T> runOn(@NonNull Scheduler scheduler) { return runOn(scheduler, Flowable.bufferSize()); } public final ParallelFlowable<T> runOn(@NonNull Scheduler scheduler, int prefetch) { ObjectHelper.requireNonNull(scheduler, "scheduler"); ObjectHelper.verifyPositive(prefetch, "prefetch"); return RxJavaPlugins.onAssembly(new ParallelRunOn<T>(this, scheduler, prefetch)); }
並不是全部的順序操做在並行世界中都是有意義的。目前 ParallelFlowable 只支持以下操做
map, filter, flatMap, concatMap, reduce, collect, sorted, toSortedList, compose,fromArray, doOnCancel, doOnError, doOnComplete, doOnNext, doAfterNext, doOnSubscribe, doAfterTerminated, doOnRequest
這些 ParallelFlowable 可用的操做符,使用方法與 Flowable 中的用法
Observable.flatMap 來實現並行, Flowable.flatMap 實現井行的原理和 Observable.flatMap 實現並行的原理相同。
那麼何時使用 flatMap 進行並行處理比較好,何時使用 ParallelFlowable 比較好呢?
RxJava 本質上是連續的,藉助 flatMap 操做符進行分離和加入一個序列可能會變得很複雜,
並引發必定的開銷 。是若是使用 ParallelF!owable ,則開銷會更小。
然而, parallelFlowable!Flowable 的操做符頗有限,若是有一些特殊的操做須要並行執行,而這些操
做不能用 parallelFlowable 所支持的操做符來表達,那麼就應該使用基於 Flowable.flatMap 來實
現井行。
所以,優先推薦使用 parallelFlowable ,對於沒法使用 parallelFlowable 的操做符,則可使
flatMap 來實現井行。
若是個人文章對您有幫助,不妨點個贊鼓勵一下(^_^)