ThreadPoolExecutor與ForkJoin使用比較

微信公衆號:MyClass社區
若有問題或建議,請公衆號留言java

      前面咱們研究了線程池的使用,可是有人說有比線程池更快的多線程處理機制,今天咱們來看看java併發ForkJoinPool的使用。首先,我以爲forkjoin不是用來替代線程池的,只是適用場景不同,下面進行簡單的測試來看看forkjoin是怎樣高效利用CPU的。簡答說一下Forkjoin 只有你在將一個任務拆分紅小任務時纔有用處。fork-join池是是一個work-stealing工做竊取線程池。就是將大的任務拆分fork成小的任務,而後合併join結果的一種處理機制。微信

普通ForLoop處理

      看看簡單的批處理list,咱們平時可能都會遇到一個很大的隊列,forloop處理起來很耗時,下面是最簡單最直接的處理。不用想,這樣的處理時間複雜度是O(n),基本上是最耗時的。這個沒有什麼好討論的。多線程

private List<Integer> taskList = new ArrayList<>();

@Before
public void initList()
{
    for (int i = 0; i < 200; i++) {
        taskList.add(i);
    }
}

/**
 * 普通循環
 * @throws InterruptedException
 */

@Test
public void normalLoopTest() throws InterruptedException 
{
    Instant start = Instant.now();
    int resultSum = 0;
    for (int i = 0; i < taskList.size(); i++) {
        //模擬cpu計算
        resultSum += taskList.get(i);
    }
    Instant end = Instant.now();
    System.out.println("resultSum:"+resultSum+",耗費時間:"+ Duration.between( start,end ).toMillis());

}
併發

使用ThreadPoolExecutor

      這裏將list拆成步長爲20的處理跨度,而後線程池提交10個任務,採用CountDownLatch 原子計數器記錄線程執行是否完成,最後獲取Future結果求和(200個任務裏都須要一個for1000000的計算操做)。ide

/**
 * 線程池處理
 * @throws InterruptedException
 */

@Test
public void ThreadPoolExecutorTest() throws InterruptedException, ExecutionException 
{
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    Instant start = Instant.now();
    //這裏能夠設置一個外部其餘線程池運行干擾,
   //由於一個應用程序通常的可能有多個線程池。

    //跨度
    int spanSize = 20;
    //處理批次
    int branchSize = taskList.size()/20;
    //任務執行計數器
    CountDownLatch countDownLatch = new CountDownLatch(branchSize);
    List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
    int resultSum = 0;
    InterferenceThread();
    for (int i = 0; i < branchSize; i++) {
        final int finalI = i;
        futures.add(executorService.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception 
{
                int sum = 0;
                int startIndex = finalI *spanSize;
                int end = (finalI+1)*spanSize;

                for (int j = startIndex; j < end; j++) {
                    //模擬cpu計算
                    sum += taskList.get(j);
                    //這裏模擬IO等待,
                    //Thread.sleep(10);
                    //可是爲了和FJ比較,最好設置爲耗cpu的計算                    
                    int a = 0;
                    for (int n = 0; n < 1000000; n++) {
                         a++;
                    }                                                            
                }
                countDownLatch.countDown();
                return sum;
            }
        }));
    }
    //阻塞等待執行完全部的任務,計數器==0,標識全部任務執行完畢,關閉線程池
    countDownLatch.await();
    if(countDownLatch.getCount() ==0){
        executorService.shutdown();
        for (int i = 0; i < futures.size(); i++) {
            resultSum += futures.get(i).get();
        }
    }
    Instant end = Instant.now();
    System.out.println("resultSum:"+resultSum+",耗費時間:"+ Duration.between( start,end ).toMillis());
}oop

運行結果:
4個核心線程:resultSum:19900,耗費時間:38ms
10個線程:resultSum:19900,耗費時間:37ms單元測試

總結:我分別進行各類線程個數測試,改變線程池核心線程數,發現對處理時間徹底沒有改進,時間在一個範圍區間變更,沒有任何明顯的變化(我首先設置一4個核心線程,由於我電腦是4核CPU,下邊ForkJoinPool使用的時候,默認poolsize也是和本電腦的核數一致)。測試結果能夠看出,對於計算型cpu密集型任務,增長線程數是沒有任何效果的。因此線程池大小設置和cpu個數和執行IO時間和cup處理時間都有關係。測試

使用ForkJoinPool

      單元測試類,一樣(200個任務裏都須要一個for1000000的計算操做)。可是ForkJoinPool是將200任務進行拆分,而後合併結果:ui

@Test
public void ForkJoinPoolTest() throws InterruptedException 
{
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    Instant start = Instant.now();
    InterferenceThread();
    int resultSum = forkJoinPool.invoke(new HandLerAction(taskList,0,taskList.size()-1));
    System.out.println("forkJoinPool.getPoolSize:"+forkJoinPool.getPoolSize());
    Instant end = Instant.now();
    System.out.println("resultSum:"+resultSum+",耗費時間:"+ Duration.between( start,end ).toMillis());
}
this

執行類HandlerAction

public class HandLerAction extends RecursiveTask<Integer> {
    private List<Integer> list;
    private int start;
    private int end;
    public HandLerAction(List<Integer> listint start, int end) {
        this.list = list;
        this.start = start;
        this.end = end;
    }
    /**
     * The main computation performed by this task.
     */

    @Override
    protected Integer compute() 
{
        if(end -start < 1){
            int resultSum = 0;
            for (int i = start; i <= end; i++) {
                try {
                    resultSum += list.get(i);
                    //Thread.sleep(10);
                    //可是爲了和FJ比較,最好設置爲耗cpu的計算                    
                    int a = 0;
                    for (int n = 0; n < 1000000; n++) {
                         a++;
                    }                      
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return resultSum;
        }else {
            //二分法拆分任務隊列
            int middle = (start + end) / 2;
            HandLerAction preAction = new HandLerAction(list,start,middle);
            HandLerAction endAction = new HandLerAction(list,middle+1,end);
            //fork執行
            preAction.fork();
            endAction.fork();
            //結果合併
            return preAction.join()+endAction.join();
        }
    }
}

執行結果:
forkJoinPool.getPoolSize:4
resultSum:19900,耗費時間:45

結果分析:使用forkJoinPool來跑這樣的任務,才200個任務,二分任務隊列,
1.當任務處理數拆分到小於20時,loop執行任務,發現執行耗時在45ms左右
2.當任務處理數拆分到小於10時,loop執行任務,發現執行耗時在30ms左右
3.當任務處理數拆分到小於5時,loop執行任務,發現執行耗時在25左右
4.當任務處理數拆分到小於1時,loop執行任務,發現執行耗時在22左右

總結:能夠看出拆分的任務越多,cpu利用率越高,處理時間也越短,可是實際運用的時候拆分的粒度須要把握好,由於拆分越多,越壓縮CPU使用,而且越小的粒度效果會愈來愈不明顯。因此選擇一個折中的粒度比較好,任務數很少很多,既能節省開闢任務內存消耗,又能合理利用CPU。

寫在最後

      在上面的例子中,處理Sum++求和的時候使用 Thread.sleep 來模擬業務處理時間,發現,線程池仍是線程池,處理起來比forkJoinPool快多了,因此forkJoinPool不適合IO等待型業務邏輯,適合CPU計算型的任務處理,因此瞭解了他們使用的區別,就能夠幫助咱們更好的使用這兩個多線程處理機制。下一篇將繼續討論forkJoinPool一些實現原理。

本文分享自微信公衆號 - MyClass社區(MyClass_ZZ)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索