談到並行,咱們可能最早想到的是線程,多個線程一塊兒運行,來提升咱們系統的總體處理速度;爲何使用多個線程就能提升處理速度,由於如今計算機廣泛都是多核處理器,咱們須要充分利用cpu資源;若是站的更高一點來看,咱們每臺機器均可以是一個處理節點,多臺機器並行處理;並行的處理方式能夠說無處不在,本文主要來談談Java在並行處理方面的努力。java
Java的垃圾回收器,咱們能夠看到每一代版本的更新,伴隨着GC更短的延遲,從serial到cms再到如今的G1,一直在摘掉Java慢的帽子;消息隊列從早期的ActiveMQ到如今的kafka和RocketMQ,引入的分區的概念,提升了消息的並行性;數據庫單表數據到必定量級以後,訪問速度會很慢,咱們會對錶進行分表處理,引入數據庫中間件;Redis你可能以爲自己處理是單線程的,可是Redis的集羣方案中引入了slot(槽)的概念;更廣泛的就是咱們不少的業務系統,一般會部署多臺,經過負載均衡器來進行分發;好了還有其餘的一些例子,此處不在一一例舉。數據庫
我以爲並行的核心在於"拆分",把大任務變成小任務,而後利用多核CPU也好,仍是多節點也好,同時並行的處理,Java歷代版本的更新,都在爲咱們開發者提供更方便的並行處理,從開始的Thread,到線程池,再到fork/join框架,最後到流處理,下面使用簡單的求和例子來看看各類方式是如何並行處理的;bash
首先看一下最簡單的單線程處理方式,直接使用主線程進行求和操做;併發
public class SingleThread {
public static long[] numbers;
public static void main(String[] args) {
numbers = LongStream.rangeClosed(1, 10_000_000).toArray();
long sum = 0;
for (int i = 0; i < numbers.length; i++) {
sum += numbers[i];
}
System.out.println("sum = " + sum);
}
}
複製代碼
求和自己是一個計算密集型任務,可是如今已是多核時代,只用單線程,至關於只使用了其中一個cpu,其餘cpu被閒置,資源的浪費;負載均衡
咱們把任務拆分紅多個小任務,而後每一個小任務分別啓動一個線程,以下所示:框架
public class ThreadTest {
public static final int THRESHOLD = 10_000;
public static long[] numbers;
private static long allSum;
public static void main(String[] args) throws Exception {
numbers = LongStream.rangeClosed(1, 10_000_000).toArray();
int taskSize = (int) (numbers.length / THRESHOLD);
for (int i = 1; i <= taskSize; i++) {
final int key = i;
new Thread(new Runnable() {
public void run() {
sumAll(sum((key - 1) * THRESHOLD, key * THRESHOLD));
}
}).start();
}
Thread.sleep(100);
System.out.println("allSum = " + getAllSum());
}
private static synchronized long sumAll(long threadSum) {
return allSum += threadSum;
}
public static synchronized long getAllSum() {
return allSum;
}
private static long sum(int start, int end) {
long sum = 0;
for (int i = start; i < end; i++) {
sum += numbers[i];
}
return sum;
}
}
複製代碼
以上指定了一個拆分閥值,計算拆分多少個認爲,同時啓動多少線程;這種處理就是啓動的線程數過多,而CPU數有限,更重要的是求和是一個計算密集型任務,啓動過多的線程只會帶來更多的線程上下文切換;同時線程處理完一個任務就終止了,也是對資源的浪費;另外能夠看到主線程不知道什麼時候子任務已經處理完了,須要作額外的處理;全部Java後續引入了線程池。ide
jdk1.5引入了併發包,其中包括了ThreadPoolExecutor,相關代碼以下:測試
public class ExecutorServiceTest {
public static final int THRESHOLD = 10_000;
public static long[] numbers;
public static void main(String[] args) throws Exception {
numbers = LongStream.rangeClosed(1, 10_000_000).toArray();
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
CompletionService<Long> completionService = new ExecutorCompletionService<Long>(executor);
int taskSize = (int) (numbers.length / THRESHOLD);
for (int i = 1; i <= taskSize; i++) {
final int key = i;
completionService.submit(new Callable<Long>() {
@Override
public Long call() throws Exception {
return sum((key - 1) * THRESHOLD, key * THRESHOLD);
}
});
}
long sumValue = 0;
for (int i = 0; i < taskSize; i++) {
sumValue += completionService.take().get();
}
// 全部任務已經完成,關閉線程池
System.out.println("sumValue = " + sumValue);
executor.shutdown();
}
private static long sum(int start, int end) {
long sum = 0;
for (int i = start; i < end; i++) {
sum += numbers[i];
}
return sum;
}
}
複製代碼
上面已經分析了計算密集型並非線程越多越好,這裏建立了JDK默認的線程數:CPU數+1,這是一個通過大量測試之後給出的一個結果;線程池顧名思義,能夠重複利用現有的線程;同時利用CompletionService來對子任務進行彙總;合理的使用線程池已經能夠充分的並行處理任務,只是在寫法上有點繁瑣,此時JDK1.7中引入了fork/join框架;ui
分支/合併框架的目的是以遞歸的方式將能夠並行的認爲拆分紅更小的任務,而後將每一個子任務的結果合併起來生成總體結果;相關代碼以下:this
public class ForkJoinTest extends java.util.concurrent.RecursiveTask<Long> {
private static final long serialVersionUID = 1L;
private final long[] numbers;
private final int start;
private final int end;
public static final long THRESHOLD = 10_000;
public ForkJoinTest(long[] numbers) {
this(numbers, 0, numbers.length);
}
private ForkJoinTest(long[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
int length = end - start;
if (length <= THRESHOLD) {
return computeSequentially();
}
ForkJoinTest leftTask = new ForkJoinTest(numbers, start, start + length / 2);
leftTask.fork();
ForkJoinTest rightTask = new ForkJoinTest(numbers, start + length / 2, end);
Long rightResult = rightTask.compute();
// 注:join方法會阻塞,所以有必要在兩個子任務的計算都開始以後才執行join方法
Long leftResult = leftTask.join();
return leftResult + rightResult;
}
private long computeSequentially() {
long sum = 0;
for (int i = start; i < end; i++) {
sum += numbers[i];
}
return sum;
}
public static void main(String[] args) {
System.out.println(forkJoinSum(10_000_000));
}
public static long forkJoinSum(long n) {
long[] numbers = LongStream.rangeClosed(1, n).toArray();
ForkJoinTask<Long> task = new ForkJoinTest(numbers);
return new ForkJoinPool().invoke(task);
}
}
複製代碼
ForkJoinPool是ExecutorService接口的一個實現,子認爲分配給線程池中的工做線程;同時須要把任務提交到此線程池中,須要建立RecursiveTask的一個子類;大致邏輯就是經過fork進行拆分,而後經過join進行結果的合併,JDK爲咱們提供了一個框架,咱們只須要在裏面填充便可,更加方便;有沒有更簡單的方式,連拆分都省了,自動拆分合並,jdk在1.8中引入了流的概念;
Java8引入了stream的概念,可讓咱們更好的利用並行,使用流代碼以下:
public class StreamTest {
public static void main(String[] args) {
System.out.println("sum = " + parallelRangedSum(10_000_000));
}
public static long parallelRangedSum(long n) {
return LongStream.rangeClosed(1, n).parallel().reduce(0L, Long::sum);
}
}
複製代碼
以上代碼是否是很是簡單,對於開發者來講徹底不須要手動拆分,使用同步機制等方式,就可讓任務並行處理,只須要對流使用parallel()方法,系統自動會對任務進行拆分,固然前提是沒有共享可變狀態;其實並行流內部使用的也是fork/join框架;
本文使用一個求和的實例,來介紹了jdk爲開發者提供並行處理的各類方式,能夠看到Java一直在爲提供更方便的並行處理而努力。
<<java8實戰>>