微信公衆號:MyClass社區
若有問題或建議,請公衆號留言java
前面咱們研究了線程池的使用,可是有人說有比線程池更快的多線程處理機制,今天咱們來看看java併發ForkJoinPool的使用。首先,我以爲forkjoin不是用來替代線程池的,只是適用場景不同,下面進行簡單的測試來看看forkjoin是怎樣高效利用CPU的。簡答說一下Forkjoin 只有你在將一個任務拆分紅小任務時纔有用處。fork-join池是是一個work-stealing工做竊取線程池。就是將大的任務拆分fork成小的任務,而後合併join結果的一種處理機制。微信
普通ForLoop處理
看看簡單的批處理list,咱們平時可能都會遇到一個很大的隊列,forloop處理起來很耗時,下面是最簡單最直接的處理。不用想,這樣的處理時間複雜度是O(n),基本上是最耗時的。這個沒有什麼好討論的。多線程
private List<Integer> taskList = new ArrayList<>();
@Before
public void initList(){
for (int i = 0; i < 200; i++) {
taskList.add(i);
}
}
/**
* 普通循環
* @throws InterruptedException
*/
@Test
public void normalLoopTest() throws InterruptedException {
Instant start = Instant.now();
int resultSum = 0;
for (int i = 0; i < taskList.size(); i++) {
//模擬cpu計算
resultSum += taskList.get(i);
}
Instant end = Instant.now();
System.out.println("resultSum:"+resultSum+",耗費時間:"+ Duration.between( start,end ).toMillis());
}
併發
使用ThreadPoolExecutor
這裏將list拆成步長爲20的處理跨度,而後線程池提交10個任務,採用CountDownLatch 原子計數器記錄線程執行是否完成,最後獲取Future結果求和(200個任務裏都須要一個for1000000的計算操做)。ide
/**
* 線程池處理
* @throws InterruptedException
*/
@Test
public void ThreadPoolExecutorTest() throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
Instant start = Instant.now();
//這裏能夠設置一個外部其餘線程池運行干擾,
//由於一個應用程序通常的可能有多個線程池。
//跨度
int spanSize = 20;
//處理批次
int branchSize = taskList.size()/20;
//任務執行計數器
CountDownLatch countDownLatch = new CountDownLatch(branchSize);
List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
int resultSum = 0;
InterferenceThread();
for (int i = 0; i < branchSize; i++) {
final int finalI = i;
futures.add(executorService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int sum = 0;
int startIndex = finalI *spanSize;
int end = (finalI+1)*spanSize;
for (int j = startIndex; j < end; j++) {
//模擬cpu計算
sum += taskList.get(j);
//這裏模擬IO等待,
//Thread.sleep(10);
//可是爲了和FJ比較,最好設置爲耗cpu的計算
int a = 0;
for (int n = 0; n < 1000000; n++) {
a++;
}
}
countDownLatch.countDown();
return sum;
}
}));
}
//阻塞等待執行完全部的任務,計數器==0,標識全部任務執行完畢,關閉線程池
countDownLatch.await();
if(countDownLatch.getCount() ==0){
executorService.shutdown();
for (int i = 0; i < futures.size(); i++) {
resultSum += futures.get(i).get();
}
}
Instant end = Instant.now();
System.out.println("resultSum:"+resultSum+",耗費時間:"+ Duration.between( start,end ).toMillis());
}oop
運行結果:
4個核心線程:resultSum:19900,耗費時間:38ms
10個線程:resultSum:19900,耗費時間:37ms單元測試
總結:我分別進行各類線程個數測試,改變線程池核心線程數,發現對處理時間徹底沒有改進,時間在一個範圍區間變更,沒有任何明顯的變化(我首先設置一4個核心線程,由於我電腦是4核CPU,下邊ForkJoinPool使用的時候,默認poolsize也是和本電腦的核數一致)。測試結果能夠看出,對於計算型cpu密集型任務,增長線程數是沒有任何效果的。因此線程池大小設置和cpu個數和執行IO時間和cup處理時間都有關係。測試
使用ForkJoinPool
單元測試類,一樣(200個任務裏都須要一個for1000000的計算操做)。可是ForkJoinPool是將200任務進行拆分,而後合併結果:ui
@Test
public void ForkJoinPoolTest() throws InterruptedException {
ForkJoinPool forkJoinPool = new ForkJoinPool();
Instant start = Instant.now();
InterferenceThread();
int resultSum = forkJoinPool.invoke(new HandLerAction(taskList,0,taskList.size()-1));
System.out.println("forkJoinPool.getPoolSize:"+forkJoinPool.getPoolSize());
Instant end = Instant.now();
System.out.println("resultSum:"+resultSum+",耗費時間:"+ Duration.between( start,end ).toMillis());
}
this
執行類HandlerAction
public class HandLerAction extends RecursiveTask<Integer> {
private List<Integer> list;
private int start;
private int end;
public HandLerAction(List<Integer> list, int start, int end) {
this.list = list;
this.start = start;
this.end = end;
}
/**
* The main computation performed by this task.
*/
@Override
protected Integer compute() {
if(end -start < 1){
int resultSum = 0;
for (int i = start; i <= end; i++) {
try {
resultSum += list.get(i);
//Thread.sleep(10);
//可是爲了和FJ比較,最好設置爲耗cpu的計算
int a = 0;
for (int n = 0; n < 1000000; n++) {
a++;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return resultSum;
}else {
//二分法拆分任務隊列
int middle = (start + end) / 2;
HandLerAction preAction = new HandLerAction(list,start,middle);
HandLerAction endAction = new HandLerAction(list,middle+1,end);
//fork執行
preAction.fork();
endAction.fork();
//結果合併
return preAction.join()+endAction.join();
}
}
}
執行結果:
forkJoinPool.getPoolSize:4
resultSum:19900,耗費時間:45
結果分析:使用forkJoinPool來跑這樣的任務,才200個任務,二分任務隊列,
1.當任務處理數拆分到小於20時,loop執行任務,發現執行耗時在45ms左右
2.當任務處理數拆分到小於10時,loop執行任務,發現執行耗時在30ms左右
3.當任務處理數拆分到小於5時,loop執行任務,發現執行耗時在25左右
4.當任務處理數拆分到小於1時,loop執行任務,發現執行耗時在22左右
總結:能夠看出拆分的任務越多,cpu利用率越高,處理時間也越短,可是實際運用的時候拆分的粒度須要把握好,由於拆分越多,越壓縮CPU使用,而且越小的粒度效果會愈來愈不明顯。因此選擇一個折中的粒度比較好,任務數很少很多,既能節省開闢任務內存消耗,又能合理利用CPU。
寫在最後
在上面的例子中,處理Sum++求和的時候使用 Thread.sleep 來模擬業務處理時間,發現,線程池仍是線程池,處理起來比forkJoinPool快多了,因此forkJoinPool不適合IO等待型業務邏輯,適合CPU計算型的任務處理,因此瞭解了他們使用的區別,就能夠幫助咱們更好的使用這兩個多線程處理機制。下一篇將繼續討論forkJoinPool一些實現原理。
本文分享自微信公衆號 - MyClass社區(MyClass_ZZ)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。