Fork/Join框架的核心是由下列兩個類組成的。
ForkJoinPool:這個類實現了 ExecutorService 接口和工作竊取算法(Work-Stealing Algorithm)。它管理工作者線程,並提供任務的狀態信息,以及任務的執行信息。
ForkJoinTask:這個類是一個將在ForkJoinPool中執行的任務的基類。
Fork/Join框架提供了在一個任務裏執行fork()和join()操作的機制和控制任務狀態的方法。一般,爲了實現Fork/Join任務,需要實現以下兩個類之一的子類。

RecursiveAction: 用於任務沒有返回結果的場景。
RecursiveTask: 用於任務有返回結果的場景。
Code:
package com.packtpub.java7.concurrency.chapter5.recipe01.util; /** * This class stores the data of a Product. It's name and it's price * */ public class Product { /** * Name of the product */ private String name; /** * Price of the product */ private double price; /** * This method returns the name of the product * @return the name of the product */ public String getName() { return name; } /** * This method establish the name of the product * @param name the name of the product */ public void setName(String name) { this.name = name; } /** * This method returns the price of the product * @return the price of the product */ public double getPrice() { return price; } /** * This method establish the price of the product * @param price the price of the product */ public void setPrice(double price) { this.price = price; } }
package com.packtpub.java7.concurrency.chapter5.recipe01.util;

import java.util.ArrayList;
import java.util.List;

/**
 * Factory for product lists of a requested size. Every generated product
 * receives a predefined name ("Product " followed by its index) and a
 * predefined price of 10.
 */
public class ProductListGenerator {

    /**
     * Builds a new list holding {@code size} freshly created products.
     *
     * @param size the number of products to create; a value of zero or less
     *             produces an empty list
     * @return a new mutable list with the generated products, in index order
     */
    public List<Product> generate(int size) {
        List<Product> products = new ArrayList<Product>();
        int index = 0;
        while (index < size) {
            Product item = new Product();
            item.setName("Product " + index);
            item.setPrice(10);
            products.add(item);
            index++;
        }
        return products;
    }
}
package com.packtpub.java7.concurrency.chapter5.recipe01.task;

import java.util.List;
import java.util.concurrent.RecursiveAction;

import com.packtpub.java7.concurrency.chapter5.recipe01.util.Product;

/**
 * Fork/join task that updates the price of the products located in the
 * half-open index range {@code [first, last)} of a list.
 *
 * <p>If the assigned range holds fewer than {@link #THRESHOLD} elements the
 * task updates the prices itself. Otherwise it divides the range in two,
 * creates one new task per part and executes both.
 */
public class Task extends RecursiveAction {

    /**
     * Serial version UID. The ForkJoinTask class implements the Serializable
     * interface.
     */
    private static final long serialVersionUID = 1L;

    /** Ranges with fewer elements than this are processed without splitting. */
    private static final int THRESHOLD = 10;

    /** List of products; each task only touches its own index range. */
    private final List<Product> products;

    /** First position (inclusive) of the range assigned to the task. */
    private final int first;

    /** Last position (exclusive) of the range assigned to the task. */
    private final int last;

    /**
     * Increment applied to the price of every product in the range, expressed
     * as a fraction of the current price (each price is multiplied by
     * {@code 1 + increment}).
     */
    private final double increment;

    /**
     * Creates a task for the range {@code [first, last)} of the given list.
     *
     * @param products  list of products
     * @param first     index of the first element assigned to the task (inclusive)
     * @param last      index just past the last element assigned to the task (exclusive)
     * @param increment price increment that this task has to apply
     */
    public Task(List<Product> products, int first, int last, double increment) {
        this.products = products;
        this.first = first;
        this.last = last;
        this.increment = increment;
    }

    /**
     * Performs the job of the task.
     *
     * <p>When the difference between {@code last} and {@code first} is at
     * least {@link #THRESHOLD}, two new {@code Task} objects are created, one
     * for the first part of the range and one for the second part, and both
     * are run through {@code invokeAll()}, a method inherited from
     * ForkJoinTask. Smaller ranges are updated directly.
     */
    @Override
    protected void compute() {
        if (last - first < THRESHOLD) {
            updatePrices();
            return;
        }

        // Written as first + (last - first) / 2 rather than (last + first) / 2
        // so the sum of two large indices cannot overflow int.
        int middle = first + (last - first) / 2;
        System.out.printf("Task: Pending tasks: %s\n", getQueuedTaskCount());

        // Both ranges are half-open and meet at middle + 1, so together they
        // cover [first, last) exactly once with no overlap.
        Task t1 = new Task(products, first, middle + 1, increment);
        Task t2 = new Task(products, middle + 1, last, increment);
        invokeAll(t1, t2);
    }

    /**
     * Multiplies the price of every product in {@code [first, last)} by
     * {@code 1 + increment}.
     */
    private void updatePrices() {
        for (int i = first; i < last; i++) {
            Product product = products.get(i);
            product.setPrice(product.getPrice() * (1 + increment));
        }
    }
}
package com.packtpub.java7.concurrency.chapter5.recipe01.core;

import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

import com.packtpub.java7.concurrency.chapter5.recipe01.task.Task;
import com.packtpub.java7.concurrency.chapter5.recipe01.util.Product;
import com.packtpub.java7.concurrency.chapter5.recipe01.util.ProductListGenerator;

/**
 * Main class of the example. It creates a list of products, a ForkJoinPool
 * and a task that updates the prices of the products, then reports on the
 * pool while the task runs and finally verifies the resulting prices.
 */
public class Main {

    /**
     * Main method of the example.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        // Create a list of products
        ProductListGenerator generator = new ProductListGenerator();
        List<Product> products = generator.generate(10000);

        // Create a task covering the whole list with a 20% price increment
        Task task = new Task(products, 0, products.size(), 0.20);

        // Create a ForkJoinPool
        ForkJoinPool pool = new ForkJoinPool();

        // Remembers an interruption received while polling so that it can be
        // restored afterwards instead of being silently lost.
        boolean interrupted = false;
        try {
            // Execute the task asynchronously
            pool.execute(task);

            // Write information about the pool until the task has finished
            do {
                System.out.printf("Main: Thread Count: %d\n", pool.getActiveThreadCount());
                System.out.printf("Main: Thread Steal: %d\n", pool.getStealCount());
                System.out.printf("Main: Paralelism: %d\n", pool.getParallelism());
                try {
                    TimeUnit.MILLISECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    // Keep waiting for the task; the interrupt status is
                    // re-asserted once polling is over.
                    interrupted = true;
                }
            } while (!task.isDone());
        } finally {
            // Shutdown the pool, even if polling failed unexpectedly
            pool.shutdown();
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        // Check if the task has completed normally
        if (task.isCompletedNormally()) {
            System.out.printf("Main: The process has completed normally.\n");
        }

        // Expected result: 12 (initial price 10 with a 0.20 increment).
        // Write products whose price is not 12. The comparison is exact.
        for (Product product : products) {
            if (product.getPrice() != 12) {
                System.out.printf("Product %s: %f\n", product.getName(), product.getPrice());
            }
        }

        // End of the program
        System.out.println("Main: End of the program.\n");
    }
}