深刻理解線程Java線程池系列二之ForkJoinPool

ForkJoinPool核心原理

ForkJoinPool是JDK7引入的線程池,核心思想是將大的任務拆分紅多個小任務(即fork),而後在將多個小任務處理彙總到一個結果上(即join),很是像MapReduce處理原理。同時,它提供基本的線程池功能,支持設置最大併發線程數,支持任務排隊,支持線程池中止,支持線程池使用狀況監控,也是AbstractExecutorService的子類,主要引入了「工做竊取」機制,在多CPU計算機上處理性能更佳。java

先來看一下這個work-stealing處理機制程序員

blob.png

work-stealing(工做竊取),ForkJoinPool提供了一個更有效的利用線程的機制,當ThreadPoolExecutor還在用單個隊列存聽任務時,ForkJoinPool已經分配了與線程數相等的隊列,當有任務加入線程池時,會被平均分配到對應的隊列上,各線程進行正常工做,當有線程提早完成時,會從隊列的末端「竊取」其餘線程未執行完的任務,當任務量特別大時,CPU多的計算機會表現出更好的性能。併發

經常使用方法

 ForkJoinPool是Java線程池的第二版實現,在JDK7開始提供。構建方法主要是經過構造函數dom

public ForkJoinPool(int parallelism) {
    this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}

ForkJoinPool線程應用很簡單,刪減了ThreadPoolExecutor線程中一些參數配置,主要提供了併發數的配置,其餘參數由系統默認指定,不須要程序員干預,若不指定值,默認等CPU核數。ide

ForkJoinPool線程池也是經過submit方法提交待執行任務,任務的定義主要經過兩個接口:函數

  • RecursiveTask—支持任務設置返回結果;
  • RecursiveAction—不提供直接返回結果;

經過shutdown關閉線程池,但要等線程池中的隊列任務所有執行完成,經過shutdownNow關閉線程池並當即中止線程池待執行的任務。性能

線程池監控測試

在線程池使用監控方面,主要經過以下方法:    ui

  • isTerminated—判斷線程池對應的workQueue中是否有待執行任務未執行完;
  • awaitTermination—判斷線程池是否在約定時間內完成,並返回完成狀態;
  • getQueuedSubmissionCount—獲取全部待執行的任務數;
  • getRunningThreadCount—獲取正在運行的任務數。

這樣咱們就可以很容易計算出線程的執行進度等信息,下面樣例會展現。this

應用示例

Fork/Join示例(模擬統計大於18歲的人數)

Fork/Join示意圖

定義任務

package com.zhihuiku.threadpool.forkjoin;

public class Person {

    public Person(String name, int age) {
        this.name = name;
        this.age = age;
    }

    private String name;

    private int age;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

}

package com.zhihuiku.threadpool.forkjoin;

import java.util.concurrent.RecursiveTask;

public class JoinTask extends RecursiveTask {

    private static final long serialVersionUID = 1L;

    private Person[] persons = null;

    private int start;

    private int end;

    public JoinTask(Person[] persons, int start, int end) {
        this.persons = persons;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
    if (end - start < 1000) {
        Person p = null;
        long young = 0;
        for (int i = start; i > 18; i++) {
            young++;
        }
        return young;
    } else {
        int middle = (start + end) / 2;
        JoinTask lt = new JoinTask(persons, start, middle);
        JoinTask gt = new JoinTask(persons, middle, end);
        lt.fork();
        gt.fork();
        return lt.join() + gt.join();
    }
}

線程池應用

package com.zhihuiku.threadpool.forkjoin;

import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

public class ForkJoinMain {

    public static void main(String[] args) throws InterruptedException,
            ExecutionException {
        ForkJoinMain fj = new ForkJoinMain();
        int c = 100000;
        Person[] persons = new Person[c];
        for (int i = 0; i < c; i++) {
            persons[i] = new Person("姓名" + i, new Random().nextInt(100));
        }
        fj.start(4, persons);
    }

    public void start(int threadNum, Person[] persons)
            throws InterruptedException, ExecutionException {
        long s = System.currentTimeMillis();
        ForkJoinPool executor = new ForkJoinPool(threadNum);
        JoinTask task = new JoinTask(persons, 0, persons.length - 1);
        Future result = executor.submit(task);
        System.out.println("統計結果:" + result.get());
        long e = System.currentTimeMillis();
        System.out.println("耗時:" + (e - s));
    }
}

普通線程池示例(模擬執行任務)

定義任務

package com.zhihuiku.threadpool;

import java.util.concurrent.RecursiveTask;

public class JoinTask extends RecursiveTask {

    private static final long serialVersionUID = 1L;

    private String taskName = null;

    public JoinTask(String modelName) {
        this.taskName = modelName;
    }

    @Override
    protected Boolean compute() {
        boolean isOk = true;
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
        }
        if (Math.random() > 0.2) {
            isOk = true;
        } else {
            isOk = false;
        }
        System.out.println("模擬任務:" + this.taskName + ",測試結果:" + isOk);
        return isOk;
    }
}

線程池應用

package com.zhihuiku.threadpool;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ForkJoinTest {

    public static void main(String[] args) throws InterruptedException {
        ForkJoinTest fj = new ForkJoinTest();
        fj.start(4, 100);
    }

    public void start(int threadNum, int taskNum) throws InterruptedException {
        long s = System.currentTimeMillis();
        ForkJoinPool executor = new ForkJoinPool(threadNum);
        List futures = new ArrayList(taskNum);
        for (int i = 0; i < taskNum; i++) {
            futures.add(executor.submit(new JoinTask("modelname_" + i)));
        }
        executor.shutdown();
        System.out.println("等待全部任務執行...");
        while (!executor.isTerminated()) {
            executor.awaitTermination(1, TimeUnit.SECONDS);
            int sc = executor.getQueuedSubmissionCount();
            int runningCount = executor.getRunningThreadCount();
            int okNum = (taskNum - sc - runningCount);
            int progress = Math.round((okNum * 100) / taskNum);
            System.out.println("已執行完成任務數:" + okNum + ",當前執行進度:" + progress);
        }
        long e = System.currentTimeMillis();
        System.out.println("fork線程調配耗時:" + (e - s));
    }
}

執行效果

等待全部任務執行...
模擬任務:modelname_0,測試結果:true
模擬任務:modelname_2,測試結果:true
模擬任務:modelname_3,測試結果:true
模擬任務:modelname_1,測試結果:false
模擬任務:modelname_6,測試結果:true
...
模擬任務:modelname_32,測試結果:true
模擬任務:modelname_35,測試結果:false
模擬任務:modelname_33,測試結果:true
已執行完成任務數:36,當前執行進度:36
模擬任務:modelname_37,測試結果:true
模擬任務:modelname_36,測試結果:true
...
模擬任務:modelname_67,測試結果:false
模擬任務:modelname_69,測試結果:true
模擬任務:modelname_71,測試結果:true
模擬任務:modelname_70,測試結果:true
模擬任務:modelname_68,測試結果:true
模擬任務:modelname_75,測試結果:true
模擬任務:modelname_72,測試結果:true
模擬任務:modelname_73,測試結果:true
模擬任務:modelname_74,測試結果:true
已執行完成任務數:76,當前執行進度:76
模擬任務:modelname_78,測試結果:true
模擬任務:modelname_79,測試結果:false
...
模擬任務:modelname_96,測試結果:true
模擬任務:modelname_99,測試結果:true
模擬任務:modelname_97,測試結果:true
已執行完成任務數:100,當前執行進度:100
fork線程調配耗時:2526
相關文章
相關標籤/搜索