Java開發筆記(一百零六)Fork+Join框架實現分而治之

前面依次介紹了普通線程池和定時器線程池的用法,這兩種線程池有個共同點,就是線程池的內部線程之間並沒有什麼關聯,然而某些狀況下的各線程間存在着來龍去脈關係。譬如人口普查工做,你們都知道我國總人口爲14億左右,但是14億的數目是怎麼數出來呢?假若只有一我的去統計,從小數到老都數不完。比如一個線程老牛破車幹不了多少事情,既然如此,不妨多起一些線程唄。因而人口普查工做就由中央分解到各個省份,各省又分派到下面的市縣,再由市縣分派到更下面的街道或鄉鎮,每一個街道和鄉鎮統計完本轄區內的人口數量後,分別上報給對應的市縣,市縣再上報給省裏,最後由各省上報中央,這才統計完成全國的人口總數。在人口普查的案例中,這些線程不但存在上下級關係,並且下級線程的任務由上級線程分派而來,同時下級線程的處理結果又要交給上級線程彙總。根據任務流的走向,可將整個處理過程劃分紅下列三個階段:
一、第一階段從主線程開始,從上往下逐級分解任務,此時線程總數逐漸變多,每一個分線程都前後收到上級線程分派的任務;
二、第二階段由最下面的基層線程進行具體的任務操做,此時線程總數是不變的;
三、第三階段從基層線程開始,從下往上逐級彙總任務結果,此時線程總數逐漸變少,最後主線程會收到彙總完成的最終結果;
以上的第一階段,歸納地說叫作「分而治之」;至於第三階段,可歸納稱之爲「匯聚歸一」。爲了實現這種分而治之的業務需求,Java7新增了Fork/Join框架用以對症下藥。該框架的Fork操做會按照樹狀結構不斷分出下級線程,其對應的是分而治之的過程;而Join操做則把葉子線程的運算結果逐級合併,其對應的是匯聚歸一的過程。在這分分合合的過程中,悄然浮現出Fork/Join框架專用的線程池工具ForkJoinPool,而它正是從ExecutorService派生出來的一個子類。鑑於分治策略的特殊性質,Fork/Join框架並不使用常見的Runnable任務,而改成使用專門的遞歸任務RecursiveTask,該任務的fork方法實現了分而治之的Fork操做,join方法實現了匯聚歸一的Join操做。
舉個簡單應用的例子,對於一段連續的數列求和,好比對0到99之間的全部整數求和,一般的作法是寫個循環語句依次累加。常規的寫法顯然只有一個主線程在執行加法運算,沒法體現多核CPU的性能優點,故而能夠嘗試將求和操做分而治之,先把整段數列劃分爲若干個子數列,再對各個子數列分別求和,最後彙總全部子數列的求和結果。採起RecursiveTask實現這種分派求和任務的話,可參見下面的代碼例子,注意遞歸任務的入口由run方法改爲了compute方法:html

//定義一個求和的遞歸任務
public class SumTask extends RecursiveTask<Integer> {
	private static final long serialVersionUID = 1L;
	private static final int THRESHOLD = 20; // 不可再切割的元素個數門檻
	private int src[]; // 待求和的整型數組
	private int start; // 待求和的下標起始值
	private int end; // 待求和的下標終止值

	public SumTask(int[] src, int start, int end) {
		this.src = src;
		this.start = start;
		this.end = end;
	}

	// 對指定區間的數組元素求和
	private Integer subTotal() {
		Integer sum = 0;
		for (int i = start; i < end; i++) { // 求數組在指定區間的元素之和
			sum += src[i];
		}
		// 打印求和日誌,包括當前線程的名稱、起始數值、終止數值、區間之和
		String desc = String.format("%s ∑(%d~%d)=%d", Thread.currentThread().getName(), start, end, sum);
		System.out.println(desc);
		return sum;
	}

	@Override
	protected Integer compute() {
		if ((end - start) <= THRESHOLD) { // 不可再切割了
			return subTotal(); // 對指定區間的數組元素求和
		} else { // 區間過大,還能繼續切割
			int middle = (start + end) / 2; // 計算區間中線的位置
			// 建立左邊分區的求和任務
			SumTask left = new SumTask(src, start, middle);
			left.fork(); // 把左邊求和任務添加處處理隊列中
			// 建立右邊分區的求和任務
			SumTask right = new SumTask(src, middle, end);
			right.fork(); // 把右邊求和任務添加處處理隊列中
			// 左邊子任務的求和結果加上右邊子任務的求和結果,等於當前任務的求和結果
			int sum = left.join() + right.join();
			// 打印求和日誌,包括當前線程的名稱、起始數值、終止數值、區間之和
			String desc = String.format("%s ∑(%d~%d)=%d", Thread.currentThread().getName(), start, end, sum);
			System.out.println(desc);
			return sum; // 返回本次任務的求和結果
		}
	}
}

而後外部往上面的求和任務輸入待求和的整型數組,並調用任務對象的invoke獲取執行結果,便可命令內置的線程池啓動求和任務。調用代碼示例以下:數組

	// 測試任務自帶的線程池框架
	private static void testInternalTask() {
		// 下面初始化從0到99的整型數組
		int[] arr = new int[100];
		for (int i = 0; i < 100; i++) {
			arr[i] = i + 1;
		}
		// 建立一個求和的遞歸任務
		SumTask task = new SumTask(arr, 0, arr.length);
		try {
			// 執行同步任務,並返回執行結果。任務的invoke方法使用了內部的ForkJoinPool
			Integer result = task.invoke();
			System.out.println("最終計算結果: " + result);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

 

運行以上的調用代碼,輸出下列的線程池日誌:併發

ForkJoinPool.commonPool-worker-3: ∑(0~12)=78
ForkJoinPool.commonPool-worker-0: ∑(75~87)=978
ForkJoinPool.commonPool-worker-2: ∑(50~62)=678
ForkJoinPool.commonPool-worker-0: ∑(87~100)=1222
ForkJoinPool.commonPool-worker-3: ∑(12~25)=247
ForkJoinPool.commonPool-worker-3: ∑(0~25)=325
ForkJoinPool.commonPool-worker-0: ∑(75~100)=2200
ForkJoinPool.commonPool-worker-2: ∑(62~75)=897
ForkJoinPool.commonPool-worker-2: ∑(50~75)=1575
ForkJoinPool.commonPool-worker-1: ∑(37~50)=572
ForkJoinPool.commonPool-worker-3: ∑(25~37)=378
ForkJoinPool.commonPool-worker-3: ∑(25~50)=950
ForkJoinPool.commonPool-worker-1: ∑(0~50)=1275
ForkJoinPool.commonPool-worker-2: ∑(50~100)=3775
main: ∑(0~100)=5050
最終計算結果: 5050

 

從日誌可見,Fork/Join框架的默認線程池一共啓動了四個線程(正好是設備的CPU個數),同時最後一步的統計工做由主線程來完成。框架

注意到前述的調用代碼並未寫明Fork/Join框架的線程池工具ForkJoinPool,這是由於遞歸任務擁有默認的內置線程池,即便外部不指定線程池對象,遞歸任務也會使用內置線程池進行線程調度。不過默認的線程池沒法設置個性化的參數,因此仍是建議在代碼中顯式指定ForkJoinPool線程池,並調用線程池對象的execute/invoke/submit三個方法之一啓動遞歸任務。有關這三個方法的具體用途說明以下:
execute:異步執行指定任務,且無返回值。
invoke:同步執行指定任務,並等待返回值,返回值就是最終的運算結果。
submit:異步執行指定任務,且返回結果任務對象。以後可擇機調用結果任務的get方法獲取最終的運算結果。
下面是在外部調用時顯式指定線程池的求和代碼例子:異步

	// 測試任務之外的線程池框架
	private static void testPoolTask() {
		// 下面初始化從0到99的整型數組
		int[] arr = new int[100];
		for (int i = 0; i < 100; i++) {
			arr[i] = i + 1;
		}
		// 建立一個求和的遞歸任務
		SumTask task = new SumTask(arr, 0, arr.length);
		// 建立一個用於分而治之的線程池,併發數量爲6
		ForkJoinPool pool = new ForkJoinPool(6);
		// 命令線程池執行求和任務,並返回存放執行結果的任務對象
		ForkJoinTask<Integer> taskResult = pool.submit(task);
		try {
			Integer result = taskResult.get(); // 等待執行完成,並獲取求和的結果數值
			System.out.println("最終計算結果: " + result);
		} catch (Exception e) {
			e.printStackTrace();
		}
		pool.shutdown(); // 關閉線程池
	}

 

運行修改後的調用代碼,輸出下列的線程池日誌:ide

ForkJoinPool-1-worker-1: ∑(0~12)=78
ForkJoinPool-1-worker-3: ∑(62~75)=897
ForkJoinPool-1-worker-5: ∑(12~25)=247
ForkJoinPool-1-worker-5: ∑(87~100)=1222
ForkJoinPool-1-worker-5: ∑(25~37)=378
ForkJoinPool-1-worker-5: ∑(37~50)=572
ForkJoinPool-1-worker-5: ∑(25~50)=950
ForkJoinPool-1-worker-1: ∑(0~25)=325
ForkJoinPool-1-worker-4: ∑(50~62)=678
ForkJoinPool-1-worker-4: ∑(50~75)=1575
ForkJoinPool-1-worker-6: ∑(75~87)=978
ForkJoinPool-1-worker-6: ∑(75~100)=2200
ForkJoinPool-1-worker-2: ∑(0~50)=1275
ForkJoinPool-1-worker-3: ∑(50~100)=3775
ForkJoinPool-1-worker-1: ∑(0~100)=5050
最終計算結果: 5050

 

由日誌可見,此時的線程池運行狀況與剛纔相比有兩點不一樣:其一開啓的線程數量變多了,這緣於新的線程池對象設置了併發數量爲6;其二最後一步的統計工做仍在線程池內部執行,於是減輕了主線程的負擔。結論固然是外部顯式指定ForkJoinPool的方式更優。工具



更多Java技術文章參見《Java開發筆記(序)章節目錄性能

相關文章
相關標籤/搜索