上一篇文章RxJava 線程模型分析詳細介紹了RxJava的線程模型,被觀察者(Observable、Flowable...)發射的數據流能夠經歷各類線程切換,可是數據流的各個元素之間不會產生並行執行的效果。咱們知道並行並非併發,不是同步,更不是異步。java
Java 8新增了並行流來實現並行的效果,只須要在集合上調用parallelStream()便可。算法
List<Integer> result = new ArrayList();
for(Integer i=1;i<=100;i++) {
result.add(i);
}
result.parallelStream()
.map(new java.util.function.Function<Integer, String>() {
@Override
public String apply(Integer integer) {
return integer.toString();
}
}).forEach(new java.util.function.Consumer<String>() {
@Override
public void accept(String s) {
System.out.println(s);
}
});複製代碼
若是要達到相似於 Java8 的 parallel 執行效果,能夠藉助 flatMap 操做符來實現並行的效果。服務器
Observable.range(1,100)
.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
return Observable.just(integer)
.subscribeOn(Schedulers.computation())
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return integer.toString();
}
});
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String str) throws Exception {
System.out.println(str);
}
});複製代碼
flatMap操做符的原理是將這個Observable轉化爲多個以原Observable發射的數據做爲源數據的Observable,而後再將這多個Observable發射的數據整合發射出來,須要注意的是最後的順序可能會交錯地發射出來。併發
flatMap會對原始Observable發射的每一項數據執行變換操做。在這裏,生成的每一個Observable可使用線程池(指定了computation做爲Scheduler)併發的執行。app
固然咱們還可使用ExecutorService來建立一個Scheduler。負載均衡
int threadNum = Runtime.getRuntime().availableProcessors()+1;
ExecutorService executor = Executors.newFixedThreadPool(threadNum);
final Scheduler scheduler = Schedulers.from(executor);
Observable.range(1,100)
.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
return Observable.just(integer)
.subscribeOn(scheduler)
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return integer.toString();
}
});
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String str) throws Exception {
System.out.println(str);
}
});複製代碼
須要補充的是: 當完成全部的操做以後,ExecutorService須要執行shutdown()來關閉 ExecutorService。在這裏,可使用doFinally操做符來執行shutdown()。異步
doFinally操做符能夠在onError或者onComplete以後調用指定的操做,或由下游處理。ide
增長了doFinally操做符以後,代碼是這樣的。post
int threadNum = Runtime.getRuntime().availableProcessors()+1;
final ExecutorService executor = Executors.newFixedThreadPool(threadNum);
final Scheduler scheduler = Schedulers.from(executor);
Observable.range(1,100)
.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
return Observable.just(integer)
.subscribeOn(scheduler)
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return integer.toString();
}
});
}
})
.doFinally(new Action() {
@Override
public void run() throws Exception {
executor.shutdown();
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String str) throws Exception {
System.out.println(str);
}
});複製代碼
Round-Robin 算法實現並行spa
Round-Robin算法是最簡單的一種負載均衡算法。它的原理是把來自用戶的請求輪流分配給內部的服務器:從服務器1開始,直到服務器N,而後從新開始循環。也被稱爲哈希取模法,在實際中是很是經常使用的數據分片方法。Round-Robin算法的優勢是其簡潔性,它無需記錄當前全部鏈接的狀態,因此它是一種無狀態調度。
經過 Round-Robin 算法把數據分組, 按線程數分組,分紅5組每組個數相同,一塊兒發送處理。這樣作的目的能夠減小Observable的建立節省系統資源,可是會增長處理時間,Round-Robin 算法能夠當作是對時間和空間的綜合考慮。
final AtomicInteger batch = new AtomicInteger(0);
Observable.range(1,100)
.groupBy(new Function<Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer) throws Exception {
return batch.getAndIncrement() % 5;
}
})
.flatMap(new Function<GroupedObservable<Integer, Integer>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull GroupedObservable<Integer, Integer> integerIntegerGroupedObservable) throws Exception {
return integerIntegerGroupedObservable.observeOn(Schedulers.io())
.map(new Function<Integer, String>() {
@Override
public String apply(@NonNull Integer integer) throws Exception {
return integer.toString();
}
});
}
})
.subscribe(new Consumer<Object>() {
@Override
public void accept(@NonNull Object o) throws Exception {
System.out.println(o);
}
});複製代碼
在這裏,也可使用ExecutorService建立Scheduler,來替代Schedulers.io()
final AtomicInteger batch = new AtomicInteger(0);
int threadNum = 5;
final ExecutorService executor = Executors.newFixedThreadPool(threadNum);
final Scheduler scheduler = Schedulers.from(executor);
Observable.range(1,100)
.groupBy(new Function<Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer) throws Exception {
return batch.getAndIncrement() % threadNum;
}
})
.flatMap(new Function<GroupedObservable<Integer, Integer>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull GroupedObservable<Integer, Integer> integerIntegerGroupedObservable) throws Exception {
return integerIntegerGroupedObservable.observeOn(scheduler)
.map(new Function<Integer, String>() {
@Override
public String apply(@NonNull Integer integer) throws Exception {
return integer.toString();
}
});
}
})
.subscribe(new Consumer<Object>() {
@Override
public void accept(@NonNull Object o) throws Exception {
System.out.println(o);
}
});複製代碼