RxJava 並行操做

上一篇文章RxJava 線程模型分析詳細介紹了RxJava的線程模型,被觀察者(Observable、Flowable...)發射的數據流能夠經歷各類線程切換,可是數據流的各個元素之間不會產生並行執行的效果。咱們知道並行並非併發,不是同步,更不是異步。java

Java 8新增了並行流來實現並行的效果,只須要在集合上調用parallelStream()便可。算法

List<Integer> result = new ArrayList();
        for(Integer i=1;i<=100;i++) {

            result.add(i);
        }

        result.parallelStream()
                .map(new java.util.function.Function<Integer, String>() {


            @Override
            public String apply(Integer integer) {
                return integer.toString();
            }
        }).forEach(new java.util.function.Consumer<String>() {
            @Override
            public void accept(String s) {
                System.out.println(s);
            }
        });複製代碼

若是要達到相似於 Java8 的 parallel 執行效果,能夠藉助 flatMap 操做符來實現並行的效果。服務器

Observable.range(1,100)
                .flatMap(new Function<Integer, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(Integer integer) throws Exception {
                        return Observable.just(integer)
                                .subscribeOn(Schedulers.computation())
                                .map(new Function<Integer, String>() {

                                    @Override
                                    public String apply(Integer integer) throws Exception {
                                        return integer.toString();
                                    }
                                });
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String str) throws Exception {

                        System.out.println(str);
                    }
                });複製代碼

flatMap操做符的原理是將這個Observable轉化爲多個以原Observable發射的數據做爲源數據的Observable,而後再將這多個Observable發射的數據整合發射出來,須要注意的是最後的順序可能會交錯地發射出來。併發

flatMap.png
flatMap.png

flatMap會對原始Observable發射的每一項數據執行變換操做。在這裏,生成的每一個Observable可使用線程池(指定了computation做爲Scheduler)併發的執行。app

固然咱們還可使用ExecutorService來建立一個Scheduler。負載均衡

int threadNum = Runtime.getRuntime().availableProcessors()+1;

        ExecutorService executor = Executors.newFixedThreadPool(threadNum);
        final Scheduler scheduler = Schedulers.from(executor);
        Observable.range(1,100)
                .flatMap(new Function<Integer, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(Integer integer) throws Exception {
                        return Observable.just(integer)
                                .subscribeOn(scheduler)
                                .map(new Function<Integer, String>() {

                                    @Override
                                    public String apply(Integer integer) throws Exception {
                                        return integer.toString();
                                    }
                                });
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String str) throws Exception {

                        System.out.println(str);
                    }
                });複製代碼

須要補充的是: 當完成全部的操做以後,ExecutorService須要執行shutdown()來關閉 ExecutorService。在這裏,可使用doFinally操做符來執行shutdown()。異步

doFinally操做符能夠在onError或者onComplete以後調用指定的操做,或由下游處理。ide

增長了doFinally操做符以後,代碼是這樣的。post

int threadNum = Runtime.getRuntime().availableProcessors()+1;

        final ExecutorService executor = Executors.newFixedThreadPool(threadNum);
        final Scheduler scheduler = Schedulers.from(executor);
        Observable.range(1,100)
                .flatMap(new Function<Integer, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(Integer integer) throws Exception {
                        return Observable.just(integer)
                                .subscribeOn(scheduler)
                                .map(new Function<Integer, String>() {

                                    @Override
                                    public String apply(Integer integer) throws Exception {
                                        return integer.toString();
                                    }
                                });
                    }
                })
                .doFinally(new Action() {
                    @Override
                    public void run() throws Exception {
                        executor.shutdown();
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String str) throws Exception {

                        System.out.println(str);
                    }
                });複製代碼

Round-Robin 算法實現並行spa

Round-Robin算法是最簡單的一種負載均衡算法。它的原理是把來自用戶的請求輪流分配給內部的服務器:從服務器1開始,直到服務器N,而後從新開始循環。也被稱爲哈希取模法,在實際中是很是經常使用的數據分片方法。Round-Robin算法的優勢是其簡潔性,它無需記錄當前全部鏈接的狀態,因此它是一種無狀態調度。

經過 Round-Robin 算法把數據分組, 按線程數分組,分紅5組每組個數相同,一塊兒發送處理。這樣作的目的能夠減小Observable的建立節省系統資源,可是會增長處理時間,Round-Robin 算法能夠當作是對時間和空間的綜合考慮。

final AtomicInteger batch = new AtomicInteger(0);

        Observable.range(1,100)
                .groupBy(new Function<Integer, Integer>() {
                    @Override
                    public Integer apply(@NonNull Integer integer) throws Exception {
                        return batch.getAndIncrement() % 5;
                    }
                })
                .flatMap(new Function<GroupedObservable<Integer, Integer>, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(@NonNull GroupedObservable<Integer, Integer> integerIntegerGroupedObservable) throws Exception {
                        return integerIntegerGroupedObservable.observeOn(Schedulers.io())
                                .map(new Function<Integer, String>() {

                                    @Override
                                    public String apply(@NonNull Integer integer) throws Exception {
                                        return integer.toString();
                                    }
                                });
                    }
                })
                .subscribe(new Consumer<Object>() {
                    @Override
                    public void accept(@NonNull Object o) throws Exception {
                        System.out.println(o);
                    }
                });複製代碼

在這裏,也可使用ExecutorService建立Scheduler,來替代Schedulers.io()

final AtomicInteger batch = new AtomicInteger(0);

        int threadNum = 5;

        final ExecutorService executor = Executors.newFixedThreadPool(threadNum);
        final Scheduler scheduler = Schedulers.from(executor);

        Observable.range(1,100)
                .groupBy(new Function<Integer, Integer>() {
                    @Override
                    public Integer apply(@NonNull Integer integer) throws Exception {
                        return batch.getAndIncrement() % threadNum;
                    }
                })
                .flatMap(new Function<GroupedObservable<Integer, Integer>, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(@NonNull GroupedObservable<Integer, Integer> integerIntegerGroupedObservable) throws Exception {
                        return integerIntegerGroupedObservable.observeOn(scheduler)
                                .map(new Function<Integer, String>() {

                                    @Override
                                    public String apply(@NonNull Integer integer) throws Exception {
                                        return integer.toString();
                                    }
                                });
                    }
                })
                .subscribe(new Consumer<Object>() {
                    @Override
                    public void accept(@NonNull Object o) throws Exception {
                        System.out.println(o);
                    }
                });複製代碼
相關文章
相關標籤/搜索