Java線程之fork/join框架

fork/join框架是用多線程的方式實現分治法來解決問題。fork指的是將問題不斷地縮小規模,join是指根據子問題的計算結果,得出更高層次的結果。java

fork/join框架的使用有必定的約束條件:
算法

1. 除了fork()  和  join()方法外,線程不得使用其餘的同步工具。線程最好也不要sleep()數組

2. 線程不得進行I/O操做多線程

3. 線程不得拋出checked exception框架

此框架有幾個核心類:ForkJoinPool是實現了工做竊取算法的線程池。ForkJoinTask是任務類,他有2個子類:RecursiveAction無返回值,RecursiveTask有返回值,在定義本身的任務時,通常都是從這2類中挑一個,經過繼承的方式定義本身的新類。因爲ForkJoinTask類實現了Serializable接口,所以,定義本身的任務類時,應該定義serialVersionUID屬性。dom

在編寫任務時,推薦的寫法是這樣的:異步

[java] view plaincopyide

  1. If (problem size > default size){  工具

  2. task s = divide(task);  this

  3. execute(tasks);  

  4. else {  

  5. resolve problem using another algorithm;  

  6. }  

ForkJoinPool實現了工做竊取算法(work-stealing),線程會主動尋找新建立的任務去執行,從而保證較高的線程利用率。它使用守護線程(deamon)來執行任務,所以無需對他顯示的調用shutdown()來關閉。通常狀況下,一個程序只須要惟一的一個ForkJoinPool,所以應該按以下方式建立它:

static final ForkJoinPool mainPool = new ForkJoinPool(); //線程的數目等於CPU的核心數

下面給出一個很是簡單的例子,功能是將一個數組中每個元素的值加1。具體實現爲:將大數組不斷分解爲更短小的子數組,當子數組長度不超過10的時候,對其中全部元素進行加1操做。

[java] view plaincopy

  1. public class Test {  

  2.       

  3.     public final static ForkJoinPool mainPool = new ForkJoinPool();  

  4.       

  5.     public static void main(String[] args){  

  6.         int n = 26;  

  7.         int[] a = new int[n];  

  8.         for(int i=0; i<n; i++) {  

  9.             a[i] = i;  

  10.         }  

  11.         SubTask task = new SubTask(a, 0, n);  

  12.         mainPool.invoke(task);  

  13.         for(int i=0; i<n; i++) {  

  14.             System.out.print(a[i]+" ");  

  15.         }  

  16.     }  

  17. }  

  18.   

  19. class SubTask extends RecursiveAction {  

  20.   

  21.     private static final long serialVersionUID = 1L;  

  22.       

  23.     private int[] a;  

  24.     private int beg;  

  25.     private int end;  

  26.       

  27.     public SubTask(int[] a, int beg, int end) {  

  28.         super();  

  29.         this.a = a;  

  30.         this.beg = beg;  

  31.         this.end = end;  

  32.     }  

  33.   

  34.     @Override  

  35.     protected void compute() {  

  36.         if(end-beg>10) {  

  37.             int mid = (beg+end) / 2;  

  38.             SubTask t1 = new SubTask(a, beg, mid);  

  39.             SubTask t2 = new SubTask(a, mid, end);  

  40.             invokeAll(t1, t2);  

  41.         }else {  

  42.             for(int i=beg; i<end; i++) {  

  43.                 a[i] = a[i] + 1;  

  44.             }  

  45.         }  

  46.     }  

  47. }  

例子2,任務擁有返回值。隨機生成一個數組,每一個元素均是0-999之間的整數,統計該數組中每一個數字出現1的次數的和。

實現方法,將該數組不斷的分紅更小的數組,直到每一個子數組的長度爲1,即只包含一個元素。此時,統計該元素中包含1的個數。最後彙總,獲得數組中每一個數字共包含了多少個1。

[java] view plaincopy

  1. public class Test {  

  2.       

  3.     public final static ForkJoinPool mainPool = new ForkJoinPool();  

  4.       

  5.     public static void main(String[] args){  

  6.         int n = 26;  

  7.         int[] a = new int[n];  

  8.         Random rand = new Random();  

  9.         for(int i=0; i<n; i++) {  

  10.             a[i] = rand.nextInt(1000);  

  11.         }  

  12.         SubTask task = new SubTask(a, 0, n);  

  13.         int count = mainPool.invoke(task);  

  14.         for(int i=0; i<n; i++) {  

  15.             System.out.print(a[i]+" ");  

  16.         }  

  17.         System.out.println("\n數組中共出現了" + count + "個1");  

  18.     }  

  19. }  

  20.   

  21. class SubTask extends RecursiveTask<Integer> {  

  22.   

  23.     private static final long serialVersionUID = 1L;  

  24.       

  25.     private int[] a;  

  26.     private int beg;  

  27.     private int end;  

  28.       

  29.     public SubTask(int[] a, int beg, int end) {  

  30.         super();  

  31.         this.a = a;  

  32.         this.beg = beg;  

  33.         this.end = end;  

  34.     }  

  35.   

  36.     @Override  

  37.     protected Integer compute() {  

  38.         int result = 0;  

  39.         if(end-beg>1) {  

  40.             int mid = (beg+end)/2;  

  41.             SubTask t1 = new SubTask(a, beg, mid);  

  42.             SubTask t2 = new SubTask(a, mid, end);  

  43.             invokeAll(t1, t2);  

  44.             try {  

  45.                 result = t1.get()+t2.get();  

  46.             } catch (InterruptedException | ExecutionException e) {  

  47.                 e.printStackTrace();  

  48.             }  

  49.         } else {  

  50.             result = count(a[beg]);  

  51.         }  

  52.         return result;  

  53.     }  

  54.       

  55.     //統計一個整數中出現了幾個1  

  56.     private int count(int n) {  

  57.         int result = 0;  

  58.         while(n>0) {  

  59.             if(n % 10==1) {  

  60.                 result++;  

  61.             }  

  62.             n = n / 10;  

  63.         }  

  64.         return result;  

  65.     }  

  66. }  

例子3,異步執行任務。前面兩個例子都是同步執行任務,當啓動任務後,主線程陷入了阻塞狀態,直到任務執行完畢。若建立新任務後,但願當前線程能繼續執行而非陷入阻塞,則須要異步執行。ForkJoinPool線程池提供了execute()方法來異步啓動任務,而做爲任務自己,能夠調用fork()方法異步啓動新的子任務,並調用子任務的join()方法來取得計算結果。須要注意的是,異步使用ForkJoin框架,沒法使用「工做竊取」算法來提升線程的利用率,針對每一個子任務,系統都會啓動一個新的線程。

本例的功能是查找硬盤上某一類型的文件。給定文件擴展名後,將硬盤上全部該類型的文件名打印顯示出來。做爲主程序,啓動任務後,繼續顯示任務的執行進度,每3秒鐘打印顯示一個黑點,表示任務在繼續。最後,當全部線程都結束了,打印顯示結果。

[java] view plaincopy

  1. public class ThreadLocalTest {  

  2.   

  3.     public static void main(String[] args) throws Exception {  

  4.         Path p = Paths.get("D:/");  

  5.         List<Path> roots = (List<Path>) FileSystems.getDefault().getRootDirectories();  

  6.         List<Path> result = new ArrayList<>();  

  7.         List<MyTask> tasks = new ArrayList<>();  

  8.         ForkJoinPool pool = new ForkJoinPool();  

  9.         for(Path root:roots) {  

  10.             MyTask t = new MyTask(root, "pdf");  

  11.             pool.execute(t);  

  12.             tasks.add(t);  

  13.         }  

  14.           

  15.         System.out.print("正在處理中");  

  16.         while(isAllDone(tasks) == false) {  

  17.             System.out.print(". ");  

  18.             TimeUnit.SECONDS.sleep(3);  

  19.         }  

  20.           

  21.         for(MyTask t:tasks) {  

  22.             result.addAll(t.get());  

  23.         }  

  24.           

  25.         for(Path pp:result) {  

  26.             System.out.println(pp);  

  27.         }  

  28.     }  

  29.       

  30.     private static boolean isAllDone(List<MyTask> tasks) {  

  31.         boolean result = true;  

  32.         for(MyTask t:tasks) {  

  33.             if(t.isDone() == false) {  

  34.                 result = false;  

  35.                 break;  

  36.             }  

  37.         }  

  38.         return result;  

  39.     }  

  40. }  

  41.   

  42. class MyTask extends RecursiveTask<List<Path>> {  

  43.   

  44.     private static final long serialVersionUID = 1L;  

  45.       

  46.     private Path path;  

  47.     private String fileExtention;  

  48.   

  49.     public MyTask(Path path, String fileExtention) {  

  50.         super();  

  51.         this.path = path;  

  52.         this.fileExtention = fileExtention;  

  53.     }  

  54.   

  55.     @Override  

  56.     protected List<Path> compute() {  

  57.         List<Path> result = new ArrayList<>();  

  58.         try {  

  59.             DirectoryStream<Path> paths = Files.newDirectoryStream(path);  

  60.             List<MyTask> subTasks = new ArrayList<>();  

  61.             for(Path p:paths) {  

  62.                 if(Files.isDirectory(p)) {  

  63.                     MyTask t = new MyTask(p, fileExtention);  

  64.                     t.fork();  

  65.                     subTasks.add(t);  

  66.                 }else if(Files.isRegularFile(p)) {  

  67.                     if(p.toString().toLowerCase().endsWith("."+fileExtention)) {  

  68.                         result.add(p);  

  69.                     }  

  70.                 }  

  71.             }  

  72.               

  73.             for(MyTask t:subTasks) {  

  74.                 result.addAll(t.join());  

  75.             }  

  76.         } catch (IOException e) {  

  77.         }  

  78.         return result;  

  79.     }  

相關文章
相關標籤/搜索