Fork Join 體現了分而治之java
什麼是分而治之?jquery
規模爲N的問題,若是N<閾值,直接解決,N>閾值,將N分解爲K個小規模子問題,子問題互相對立,與原問題形式相同,將子問題的解合併獲得原問題的解 ajax
Fork Join 框架:編程
就是在必要的狀況下,將一個大任務,進行拆分(fork)成若干了小任務(拆到不可再拆時),再將一個個的小任務運算的結果進行join彙總數組
Fork Join的另外一大特色:工做密取網絡
什麼是工做密取?多線程
就是在按指定閾值拆分後,的多個線程,若是線程A的任務執行的比較快,得到到的CPU時間片比較多,那麼在他執行完畢後,就會從未執行完畢的線程的任務中的尾部,進行任務竊取,任務完成後再把結果放回去,不會形成任務競爭,由於自身執行線程的任務是從頭部開始獲取的,而空閒的線程是從尾部竊取的.併發
Fork Join使用的標準範式框架
在使用的過程當中咱們是沒法直接new 一個ForkJoinTask類的,他是一個抽象類,可是他提供了兩個子類,RecursiveTask和ResursiveAction兩個子抽象類.咱們使用的時候,若是須要有返回值,咱們就繼承RecursiveTask,若是不須要返回值咱們就繼承RecursiveActiondom
Fork Join實戰
Fork Join的同步用法同時演示返回結果值:統計整數數組中全部元素的和
先建立一個工具類用於製做整數數組
package org.dance.day2.forkjoin.sum; import java.util.Random; /** * 數組製做類 * @author ZYGisComputer */ public class MarkArray { public static final int ARRAY_LENGTH = 4000; /** * int數組生成器 * @return int數組 */ public static int[] markArray(){ Random random = new Random(); int[] array = new int[ARRAY_LENGTH]; for (int i = 0; i < ARRAY_LENGTH; i++) { array[i] = random.nextInt(ARRAY_LENGTH*3); } return array; } }
而後建立一個單線程的求和類,用於和多線程的對比
package org.dance.day2.forkjoin.sum; import org.dance.tools.SleepTools; /** * 單線程實現求和 * @author ZYGisComputer */ public class SumNormal { public static void main(String[] args) { int count = 0; // 獲取數組 int[] src = MarkArray.markArray(); long l = System.currentTimeMillis(); for (int i = 0; i < src.length; i++) { // 執行一毫秒的休眠 SleepTools.ms(1); count += src[i]; } System.out.println("The count is "+count+" spend time "+(System.currentTimeMillis() - l)); } }
使用繼承RecursiveTask的ForkJoin框架類,完成多線程的求和計算
package org.dance.day2.forkjoin.sum; import org.dance.tools.SleepTools; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; /** * 使用ForkJoin框架實現求和 * @author ZYGisComputer */ public class SumArray { /** * 由於須要返回值因此繼承RecursiveTask類 * 由於計算的是整型,因此泛型是Integer */ private static class SumTask extends RecursiveTask<Integer> { // 計算閾值 private final static int THRESHOLD = MarkArray.ARRAY_LENGTH/10; // 源數組 private int[] src; // 開始座標 private int fromIndex; // 結束座標 private int toIndex; /** * 經過建立時傳入 * @param src 元素組 * @param fromIndex 開始座標 * @param toIndex 結束座標 */ public SumTask(int[] src, int fromIndex, int toIndex) { this.src = src; this.fromIndex = fromIndex; this.toIndex = toIndex; } /** * 覆蓋執行方法 * @return 整型 */ @Override protected Integer compute() { // 若是 結束下標減去開始下標小於閾值的時候,那麼任務就能夠開始執行了 if( toIndex - fromIndex < THRESHOLD ){ int count = 0; // 從開始下標開始循環,循環到結束下標 for (int i = fromIndex; i < toIndex; i++) { // 休眠1毫秒 SleepTools.ms(1); count += src[i]; } return count; }else{ // 大於閾值 繼續拆分任務 // 從formIndex---------------------->到toIndex // 計算中間值,從formIndex----------計算mid------------>到toIndex int mid = (fromIndex + toIndex) / 2; // 左側任務 從formIndex------------>到mid結束 SumTask left = new SumTask(src, fromIndex, mid); // 右側任務 從mid+1開始------------->到toIndex結束 SumTask right = new SumTask(src, mid+1,toIndex); // 調用任務 invokeAll(left,right); // 獲取結果 return left.join() + right.join(); } } } public static void main(String[] args) { // 建立ForkJoin任務池 ForkJoinPool forkJoinPool = new ForkJoinPool(); // 製做源數組 int[] src = MarkArray.markArray(); long l = System.currentTimeMillis(); // 建立一個任務 下標由於從0開始因此結束下標須要-1 SumTask sumTask = new SumTask(src, 0, src.length - 1); // 提交同步任務 Integer invoke = forkJoinPool.invoke(sumTask); // 不管是接收invoke方法的返回值仍是調用任務的Join方法均可以獲取到結果值 System.out.println("The count is "+invoke+" spend time "+(System.currentTimeMillis() - l)); System.out.println("The count is "+sumTask.join()+" spend time "+(System.currentTimeMillis() - l)); } }
運行結果對比:
如今是4000大小的數組,每次循環休眠1毫秒
單線程執行的結果:
The count is 23751855 spend time 5395
多線程執行的結果:
The count is 23387745 spend time 1487
The count is 23387745 spend time 1487
結果對比多線程比單線程快大概3倍的時間
接下來咱們去掉休眠時間,再次進行結果對比:
單線程執行結果:
The count is 23460518 spend time 0
多線程執行結果:
The count is 24078313 spend time 3
The count is 24078313 spend time 3
而後咱們驚奇的發現,多線程比單線程還要慢,爲何呢,是由於在小數據量的狀況下,單線程,執行期間沒有花費上下文切換時間,多線程執行期間是須要花費線程之間上下文切換的時間的,每次上下文切換時間以前說過,大概花費5000-20000個時鐘週期的,因此多線程執行會比單線程慢一些,因此說咱們在用多線程的時候,就須要考慮線程之間的上下文切換問題,並不必定多線程就必定是好,咱們只是看需求,而選擇,就像Redis同樣設計的時候就是單線程的,可是他的強大,倒是比多線程的memcached更增強大,因此說沒有確定的結論,只有適合和不適合.
接下來咱們往大調整整型數組的大小
4000調整爲1億,而後對比結果
單線程執行結果:
The count is -331253431 spend time 51
多線程執行結果:
The count is 75277814 spend time 49
The count is 75277814 spend time 50
咱們能夠發現,所用的執行時間,已經大概一致了
繼續調大1億調整爲3億,繼續對比結果
單線程執行結果:
The count is 57724808 spend time 205
多線程執行結果:
The count is 1028352167 spend time 106
The count is 1028352167 spend time 106
如今單線程已是多線程的執行時間的兩倍了,因而可知,當數據量愈來愈大的時候,單線程的性能每每就會逐漸下降,而多線程的優點就漸漸體現出來了
所謂的同步用法就是在調用
forkJoinPool.invoke(sumTask);
以後主線程就在這裏阻塞了,須要等待,執行完成後,主線程才能繼續往下執行,接下里咱們看異步用法
Fork Join的異步用法同時演示不要求返回值:遍歷指定目錄(含子目錄)尋找指定類型文件
package org.dance.day2.forkjoin; import org.dance.tools.SleepTools; import java.io.File; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; /** * 使用ForkJoin框架實現不定個數的任務執行 * @author ZYGisComputer */ public class FindDirsFiles { /** * 由於搜索文件不須要返回值,因此咱們繼承RecursiveAction */ private static class FindFilesByDirs extends RecursiveAction{ private File path; public FindFilesByDirs(File path) { this.path = path; } @Override protected void compute() { // 建立任務容器 List<FindFilesByDirs> findFilesByDirs = new ArrayList<>(); // 獲取文件夾下全部的對象 File[] files = path.listFiles(); if(null!=files){ for (File file : files) { // 判斷是不是文件夾 if (file.isDirectory()){ // 添加到任務容器中 findFilesByDirs.add(new FindFilesByDirs(file)); }else{ // 若是是一個文件,那麼檢查這個文件是否符合需求 if(file.getAbsolutePath().endsWith(".txt")){ // 若是符合 打印 System.out.println("文件:"+file.getAbsolutePath()); } } } // 判斷任務容器是否爲空 if(!findFilesByDirs.isEmpty()){ // 遞交任務組 for (FindFilesByDirs filesByDirs : invokeAll(findFilesByDirs)) { // 等待子任務執行完成 filesByDirs.join(); } } } } } public static void main(String[] args) { // 建立ForkJoin池 ForkJoinPool forkJoinPool = new ForkJoinPool(); File path = new File("E:/"); // 建立任務 FindFilesByDirs findFilesByDirs = new FindFilesByDirs(path); // 異步調用 這個方法是沒有返回值的 forkJoinPool.execute(findFilesByDirs); System.out.println("Task is Running................"); SleepTools.ms(1); // 在這裏作這個只是測試ForkJoin是否爲異步,當執行ForkJoin的時候主線程是否繼續執行 int otherWork = 0; for (int i = 0; i < 100; i++) { otherWork += i; } System.out.println("Main thread done sth.......,otherWork:"+otherWork); // 若是是有返回值的話,能夠獲取,固然這個join方法是一個阻塞式的,由於主線程執行的太快了,ForkJoin還沒執行完成主線程就死亡了,因此在這裏調用一下阻塞,等待ForkJoin執行完成 findFilesByDirs.join(); System.out.println("Thread end!"); } }
執行結果:
Task is Running................ Main thread done sth.......,otherWork:4950 文件:E:\dance\activiti-ruoyi\RuoYi-Process\ruoyi-admin\src\main\resources\static\file\rml.txt 文件:E:\dance\activiti-ruoyi\RuoYi-Process\ruoyi-admin\target\classes\banner.txt 文件:E:\dance\activiti-ruoyi\RuoYi-Process\ruoyi-admin\target\classes\static\ajax\libs\jquery-ztree\3.5\log v3.x.txt 文件:E:\dance\activiti-ruoyi\RuoYi-Process\ruoyi-admin\target\classes\static\file\rml.txt ........................ Thread end!
從執行結果中能夠看到,主線程的執行時在ForkJoin執行以前就執行了,可是代碼中倒是在ForkJoin執行以後執行的,因此說這是異步的,線程是並行執行的,異步執行只能經過調用任務線程的Join方法獲取返回值,execute方法是沒有返回值的
做者:彼岸舞
時間:2020\09\18
內容關於:併發編程
本文來源於網絡,只作技術分享,一律不負任何責任