Java併發編程(07):Fork/Join框架機制詳解

本文源碼:GitHub·點這裏 || GitEE·點這裏java

1、Fork/Join框架

Java提供Fork/Join框架用於並行執行任務,核心的思想就是將一個大任務切分紅多個小任務,而後彙總每一個小任務的執行結果獲得這個大任務的最終結果。git

這種機制策略在分佈式數據庫中很是常見,數據分佈在不一樣的數據庫的副本中,在執行查詢時,每一個服務都要跑查詢任務,最後在一個服務上作數據合併,或者提供一箇中間引擎層,用來彙總數據:github

核心流程:切分任務,模塊任務異步執行,單任務結果合併;在編程裏面,通用的代碼很少,可是通用的思想卻隨處可見。算法

2、核心API和方法

一、編碼案例

基於1+2..+100的計算案例演示Fork/Join框架基礎用法。數據庫

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class ForkJoin01 {

    public static void main (String[] args) {
        int[] numArr = new int[100];
        for (int i = 0; i < 100; i++) {
            numArr[i] = i + 1;
        }
        ForkJoinPool pool = new ForkJoinPool();
        ForkJoinTask<Integer> forkJoinTask =
                pool.submit(new SumTask(numArr, 0, numArr.length));
        System.out.println("合併計算結果: " + forkJoinTask.invoke());
        pool.shutdown();
    }
}
/**
 * 線程任務
 */
class SumTask extends RecursiveTask<Integer> {
    /*
     * 切分任務塊的閾值
     * 若是THRESHOLD=100
     * 輸出:main【求和:(0...100)=5050】 合併計算結果: 5050
     */
    private static final int THRESHOLD = 100;
    private int arr[];
    private int start;
    private int over;

    public SumTask(int[] arr, int start, int over) {
        this.arr = arr;
        this.start = start;
        this.over = over;
    }

    // 求和計算
    private Integer sumCalculate () {
        Integer sum = 0;
        for (int i = start; i < over; i++) {
            sum += arr[i];
        }
        String task = "【求和:(" + start + "..." + over + ")=" + sum +"】";
        System.out.println(Thread.currentThread().getName() + task);
        return sum ;
    }

    @Override
    protected Integer compute() {
        if ((over - start) <= THRESHOLD) {
            return sumCalculate();
        }else {
            int middle = (start + over) / 2;
            SumTask left = new SumTask(arr, start, middle);
            SumTask right = new SumTask(arr, middle, over);
            left.fork();
            right.fork();
            return left.join() + right.join();
        }
    }
}

二、核心API說明

ForkJoinPool:線程池最大的特色就是分叉(fork)合併(join)模式,將一個大任務拆分紅多個小任務,並行執行,再結合工做竊取算法提升總體的執行效率,充分利用CPU資源。編程

ForkJoinTask:運行在ForkJoinPool的一個任務抽象,能夠理解爲類線程可是比線程輕量的實體,在ForkJoinPool中運行的少許ForkJoinWorkerThread能夠持有大量的ForkJoinTask和它的子任務,同時也是一個輕量的Future,使用時應避免較長阻塞或IO。後端

繼承子類:api

  • RecursiveAction:遞歸無返回值的ForkJoinTask子類;
  • RecursiveTask:遞歸有返回值的ForkJoinTask子類;

核心方法:多線程

  • fork():在當前線程運行的線程池中建立一個子任務;
  • join():模塊子任務完成的時候返回任務結果;
  • invoke():執行任務,也能夠實時等待最終執行結果;

三、核心策略說明

任務拆分併發

ForkJoinPool基於分治算法,將大任務不斷拆分下去,每一個子任務再拆分一半,直到達到最閾值設定的任務粒度爲止,而且把任務放到不一樣的隊列裏面,而後從最底層的任務開始執行計算,而且往上一層合併結果,這樣用相對少的線程處理大量的任務。

工做竊取算法

大任務被分割爲獨立的子任務,而且子任務分別放到不一樣的隊列裏,併爲每一個隊列建立一個線程來執行隊列裏的任務,假設線程A優先把分配到本身隊列裏的任務執行完畢,此時若是線程E對應的隊列裏還有任務等待執行,空閒的線程A會竊取線程E隊列裏任務執行,而且爲了減小竊取任務時線程A和被竊取任務線程E之間的發生競爭,竊取任務的線程A會從隊列的尾部獲取任務執行,被竊取任務線程E會從隊列的頭部獲取任務執行。

工做竊取算法的優勢:線程間的競爭不多,充分利用線程進行並行計算,可是在任務隊列裏只有一個任務時,也可能會存在競爭狀況。

3、應用案例分析

在後端系統的業務開發中,可用作權限校驗,批量定時任務狀態刷新等各類功能場景:

如上圖,假設數據的主鍵id分段以下,數據場景多是數據源的鏈接信息,或者產品有效期相似業務,均可以基於線程池任務處理:

權限校驗

基於數據源的鏈接信息,判斷數據源是否可用,例如:判斷鏈接是否可用,用戶是否有庫表的讀寫權限,在數據源多的狀況下,基於線程池快速校驗。

狀態刷新

在定時任務中,常常見到狀態類的刷新操做,例如判斷產品是否在有效期範圍內,在有效期範圍以外,把數據置爲失效狀態,均可以利用線程池快速處理。

4、源代碼地址

GitHub·地址
https://github.com/cicadasmile/java-base-parent
GitEE·地址
https://gitee.com/cicadasmile/java-base-parent

推薦閱讀:Java併發系列

序號 文章標題
01 Java併發:線程的建立方式,狀態週期管理
02 Java併發:線程核心機制,基礎概念擴展
03 Java併發:多線程併發訪問,同步控制
04 Java併發:線程間通訊,等待/通知機制
05 Java併發:悲觀鎖和樂觀鎖機制
06 Java併發:Lock機制下API用法詳解
相關文章
相關標籤/搜索