說到Fork/Join 框架 ,得不提起執行器框架(Executor Framework),它將任務的建立和執行進行了分離,經過Executor Framework只須要實現Runnable接口的對象和使用Executor對象,而後將Runnable 對象發送個執行器。執行器在負責運行這些任務所須要的線程,包括線程的建立,線程的管理以及線程的結束。html
java 7 又更近了一步,它包括了ExecutorService 接口的另外一種實現,用來解決特殊類型的問題。就是Fork/Join 框架,有時也稱爲」分解/合併框架「。java
Fork/Join 框架,是用來解決可以經過分治技術,將問題拆分爲小任務的問題。它和執行器框架的只要區別在於工做竊取算法。算法
下面實現簡單的例子,咱們實現一項更新產品價格的任務。最初的任務將負責更新列表中的全部的元素的價格。若是一個任務須要更新大於10個元素,將分爲兩個部分去執行。而後再去更新各自部分的產品價格。框架
一、建立一個產品類,用來存儲產品的名稱和價格 package five2; /** * * @author qpx * */ public class Product { private String name; private double price; public String getName() { return name; } public void setName(String name) { this.name = name; } public double getPrice() { return price; } public void setPrice(double price) { this.price = price; } }
二、建立生成一個隨機產品列表的類異步
package five2; import java.util.ArrayList; import java.util.Arrays; import java.util.List; /** * 隨機產品列表 * @author qpx * */ public class productListGenetator { public List<Product> genetate(int size){ List<Product> products = new ArrayList<>(); for(int i = 0 ;i<size;i++){ Product product = new Product(); product.setName("Product"+i); product.setPrice(10); products.add(product); } return products; } public static void main(String[] args) { List<Integer> aaa = new ArrayList<Integer>(); aaa.add(1); aaa.add(0, 2); System.out.println(Arrays.toString(aaa.toArray())); } }
三、建立一個Task 的類,繼承RecursiveAction 類,這個是主任務類ide
package five2; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; import java.util.concurrent.TimeUnit; public class Task extends RecursiveAction { /** * */ private static final long serialVersionUID = 1L; private List<Product> products; private int first; private int last; private double increment; public Task(List<Product> products, int first, int last, double increment) { super(); this.products = products; this.first = first; this.last = last; this.increment = increment; } @Override protected void compute() { // TODO Auto-generated method stub‘ if (this.last - this.first < 10) { updateprices(); } else { int middle = (last + first) / 2+1; //System.out.println("middle:"+middle); System.out.printf(Thread.currentThread().getName()+" Task:Pending tasks:%s\n", getQueuedTaskCount()); Task t1 = new Task(products, first, (middle), increment); //System.out.println("t1:first:"+first+",last:"+middle); Task t2 = new Task(products, middle, last, increment); //System.out.println("t2:first:"+middle+",last:"+last); invokeAll(t1, t2); } } private void updateprices() { // TODO Auto-generated method stub for (int i = first; i < last; i++) { Product product = products.get(i); product.setPrice(product.getPrice() * (1 + increment)); System.out.println(Thread.currentThread().getName() + "的i值:" + i); } } public static void main(String[] args) throws InterruptedException { productListGenetator a = new productListGenetator(); List<Product> products = a.genetate(1000); Task task = new Task(products,0,products.size(),0.20); //ForkJoinPool pool = new ForkJoinPool(10); ForkJoinPool pool = new ForkJoinPool(); pool.submit(task); do{ System.out.printf("Main: 線程數量:%d\n",pool.getActiveThreadCount()); System.out.printf("Main: Thread 竊取數量:%d\n",pool.getStealCount()); System.out.printf("Main: Thread 平行數量:%d\n",pool.getParallelism()); TimeUnit.MILLISECONDS.sleep(5); }while(!task.isDone()); pool.shutdown(); if(task.isCompletedNormally()){ System.out.printf("Main: The process has completed normally.\n"); } for(int i = 0;i<products.size();i++){ Product p = products.get(i); if(p.getPrice()!=12){ System.out.printf("Product %s:%f\n",p.getName(),p.getName()); } } System.out.printf("Main:End of the Program.\n"); } }
注意:咱們採用了無參的構造方式建立了this
ForkJoinPool pool = new ForkJoinPool(); 他將執行默認的配置,建立一個線程書等於計算機CPU數目的線程池。
另外ForkJionPool 類還提供瞭如下方法用於執行任務。spa
execute(Runnabletask) 注意的是使用Runnable對象時,ForkJionPool 不會採用工做竊取算法。僅僅實用ForkJoinTask類的時候採用工做竊取算法 invoke(ForkJoinTask<T> list) execute方法是異步調用的,此方法是同步調用的