Fork/Join框架是Java7提供了的一個用於並行執行任務的框架,是一個把大任務分割成若干個小任務,最終彙總每一個小任務結果後獲得大任務結果的框架,這種開發方法也叫分治編程。分治編程能夠極大地利用CPU資源,提升任務執行的效率,也是目前與多線程有關的前沿技術。java
分治的原理上面已經介紹了,就是切割大任務成小任務來完成。看起來好像也不難實現啊!爲何專門弄一個新的框架呢?算法
咱們先看一下,在不使用 Fork-Join 框架時,使用普通的線程池是怎麼實現的。編程
咦,好像一切都很美好。真的嗎?別忘了, 每個切割任務的線程(如線程A)都被阻塞了,直到其子任務完成,才能繼續往下運行 。若是任務太大了,須要切割屢次,那麼就會有多個線程被阻塞,性能將會急速降低。更糟糕的是,若是你的線程池的線程數量是有上限的,很可能會形成池中全部線程被阻塞,線程池沒法執行任務。多線程
public class CountTest {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ForkJoinPool forkJoinPool = new ForkJoinPool();
//建立一個計算任務,計算 由1加到12
CountTask countTask = new CountTask(1, 12);
Future<Integer> future = forkJoinPool.submit(countTask);
System.out.println("最終的計算結果:" + future.get());
}
}
class CountTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 2;
private int start;
private int end;
public CountTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
boolean canCompute = (end - start) <= THRESHOLD;
//任務已經足夠小,能夠直接計算,並返回結果
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
System.out.println("執行計算任務,計算 " + start + "到 " + end + "的和 ,結果是:" + sum + " 執行此任務的線程:" + Thread.currentThread().getName());
return sum;
} else { //任務過大,須要切割
System.out.println("任務過大,切割的任務: " + start + "加到 " + end + "的和 執行此任務的線程:" + Thread.currentThread().getName());
int middle = (start + end) / 2;
//切割成兩個子任務
CountTask leftTask = new CountTask(start, middle);
CountTask rightTask = new CountTask(middle + 1, end);
//執行子任務
leftTask.fork();
rightTask.fork();
//等待子任務的完成,並獲取執行結果
invokeAll(leftTask,rightTask);
// int leftResult = leftTask.join();
// int rightResult = rightTask.join();
//合併子任務
// sum = leftResult + rightResult;
// return sum;
return leftTask.join()+rightTask.join();
}
}
}
複製代碼
運行結果:併發
切割的任務:1加到10 執行此任務的線程是 pool-1-thread-1
切割的任務:1加到5 執行此任務的線程是 pool-1-thread-2
切割的任務:6加到10 執行此任務的線程是 pool-1-thread-3
複製代碼
池的線程只有三個,當任務分割了三次後,池中的線程也就都被阻塞了,沒法再執行任何任務,一直卡着動不了。框架
爲了解決這個問題,工做竊取算法呼之欲出異步
針對上面的問題,Fork-Join 框架使用了「工做竊取(work-stealing)」算法。工做竊取(work-stealing)算法是指某個線程從其餘隊列裏竊取任務來執行。在《Java 併發編程的藝術》對工做竊取算法的解釋:ide
使用工做竊取算法有什麼優點呢?假如咱們須要作一個比較大的任務,咱們能夠把這個任務分割爲若干互不依賴的子任務,爲了減小線程間的競爭,因而把這些子任務分別放到不一樣的隊列裏,併爲每一個隊列建立一個單獨的線程來執行隊列裏的任務,線程和隊列一一對應,好比A線程負責處理A隊列裏的任務。可是有的線程會先把本身隊列裏的任務幹完,而其餘線程對應的隊列裏還有任務等待處理。幹完活的線程與其等着,不如去幫其餘線程幹活,因而它就去其餘線程的隊列裏竊取一個任務來執行。而在這時它們會訪問同一個隊列,因此爲了減小竊取任務線程和被竊取任務線程之間的競爭,一般會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。性能
Fork-Join 框架中的工做竊取算法的優勢能夠總結爲如下兩點:this
Fork/Join有三個核心類:
由於ForkJoinTask比較複雜,抽象方法比較多,平常使用時通常不會繼承ForkJoinTask來實現自定義的任務,而是繼承ForkJoinTask的兩個子類,實現 compute() 方法:
RecursiveTask: 子任務帶返回結果時使用
RecursiveAction: 子任務不帶返回結果時使用
複製代碼
compute 方法的實現模式通常是:
if 任務足夠小
直接返回結果
else
分割成N個子任務
依次調用每一個子任務的fork方法執行子任務
依次調用每一個子任務的join方法合併執行結果
複製代碼
計算 1+2+....+12 的結果。
使用Fork/Join框架首先要考慮到的是如何分割任務,若是咱們但願每一個子任務最多執行兩個數的相加,那麼咱們設置分割的閾值是2,因爲是12個數字相加。同時,觀察執行任務的線程名稱,理解工做竊取算法的實現。
public class CountTest {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ForkJoinPool forkJoinPool = new ForkJoinPool();
//建立一個計算任務,計算 由1加到12
CountTask countTask = new CountTask(1, 12);
Future<Integer> future = forkJoinPool.submit(countTask);
System.out.println("最終的計算結果:" + future.get());
}
}
class CountTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 2;
private int start;
private int end;
public CountTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
boolean canCompute = (end - start) <= THRESHOLD;
//任務已經足夠小,能夠直接計算,並返回結果
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
System.out.println("執行計算任務,計算 " + start + "到 " + end + "的和 ,結果是:" + sum + " 執行此任務的線程:" + Thread.currentThread().getName());
} else { //任務過大,須要切割
System.out.println("任務過大,切割的任務: " + start + "加到 " + end + "的和 執行此任務的線程:" + Thread.currentThread().getName());
int middle = (start + end) / 2;
//切割成兩個子任務
CountTask leftTask = new CountTask(start, middle);
CountTask rightTask = new CountTask(middle + 1, end);
//執行子任務
leftTask.fork();
rightTask.fork();
//等待子任務的完成,並獲取執行結果
int leftResult = leftTask.join();
int rightResult = rightTask.join();
//合併子任務
sum = leftResult + rightResult;
}
return sum;
}
}
複製代碼
運行結果:
任務過大,切割的任務: 1加到 12的和 執行此任務的線程:ForkJoinPool-1-worker-1
任務過大,切割的任務: 7加到 12的和 執行此任務的線程:ForkJoinPool-1-worker-3
任務過大,切割的任務: 1加到 6的和 執行此任務的線程:ForkJoinPool-1-worker-2
執行計算任務,計算 7到 9的和 ,結果是:24 執行此任務的線程:ForkJoinPool-1-worker-3
執行計算任務,計算 1到 3的和 ,結果是:6 執行此任務的線程:ForkJoinPool-1-worker-1
執行計算任務,計算 4到 6的和 ,結果是:15 執行此任務的線程:ForkJoinPool-1-worker-1
執行計算任務,計算 10到 12的和 ,結果是:33 執行此任務的線程:ForkJoinPool-1-worker-3
最終的計算結果:78
複製代碼
從結果能夠看出,提交的計算任務是由線程1執行,線程1進行了第一次切割,切割成兩個子任務 「7加到12「 和 」1加到6「,並提交這兩個子任務。而後這兩個任務便被 線程二、線程3 給竊取了。線程1 的內部隊列中已經沒有任務了,這時候,線程二、線程3 也分別進行了一次任務切割並各自提交了兩個子任務,因而線程1也去竊取任務(這裏竊取的都是線程2的子任務)。
遍歷指定目錄(含子目錄)找尋指定類型文件
public class FindDirsFiles extends RecursiveAction{
/** * 當前任務須要搜尋的目錄 */
private File path;
public FindDirsFiles(File path) {
this.path = path;
}
public static void main(String [] args){
try {
// 用一個 ForkJoinPool 實例調度總任務
ForkJoinPool pool = new ForkJoinPool();
FindDirsFiles task = new FindDirsFiles(new File("D:/"));
//異步調用
pool.execute(task);
System.out.println("Task is Running......");
Thread.sleep(1);
int otherWork = 0;
for(int i=0;i<1000000;i++){
otherWork = otherWork+i;
}
System.out.println("Main Thread done sth......,otherWork=" + otherWork);
//阻塞的方法
task.join();
System.out.println("Task end");
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
protected void compute() {
List<FindDirsFiles> subTasks = new ArrayList<>();
File[] files = path.listFiles();
if(files!=null) {
for(File file:files) {
if(file.isDirectory()) {
subTasks.add(new FindDirsFiles(file));
}else {
//遇到文件,檢查
if(file.getAbsolutePath().endsWith("txt")) {
System.out.println("文件:"+file.getAbsolutePath());
}
}
}
if(!subTasks.isEmpty()) {
for (FindDirsFiles subTask : invokeAll(subTasks)) {
//等待子任務執行完成
subTask.join();
}
}
}
}
}
複製代碼