Fork/Join框架

Fork/Join框架的核心是由下列兩個類組成的。java

ForkJoinPool:這個類實現了 ExecutorServcie接口和工做竊取算法(work-Stealing Algorithm)。他管理工組者線程,並提供任務的狀態信息,以及任務的執行信息。算法

ForkJoinTask:這個類是一個將在ForkJoinPool中執行的任務的基類。app

  Fork/Join框架提供了在一個任務裏執行fork()和join()操做的機制和控制狀態的方法。一般,爲了實現Fork/Join任務,須要實現一個如下兩個類之一的子類。
框架

    RecursiveAction: 用於惹怒沒有返回結果的場景。less

    RecursiveTask:  用於任務有返回結果的場景。ide


Code:this

package com.packtpub.java7.concurrency.chapter5.recipe01.util;

/**
 * This class stores the data of a Product. It's name and it's price
 *
 */
public class Product {
    
    /**
     * Name of the product
     */
    private String name;
    
    /**
     * Price of the product
     */
    private double price;
    
    /**
     * This method returns the name of the product
     * @return the name of the product
     */
    public String getName() {
        return name;
    }
    
    /**
     * This method establish the name of the product
     * @param name the name of the product
     */
    public void setName(String name) {
        this.name = name;
    }
    
    /**
     * This method returns the price of the product
     * @return the price of the product
     */
    public double getPrice() {
        return price;
    }
    
    /**
     * This method establish the price of the product
     * @param price the price of the product
     */
    public void setPrice(double price) {
        this.price = price;
    }

}
package com.packtpub.java7.concurrency.chapter5.recipe01.util;

import java.util.ArrayList;
import java.util.List;

/**
 * This class generates a product list of a determined size.
 * Each product is initialized with a predefined name and price.
 *
 */
public class ProductListGenerator {

    /**
     * This method generates the list of products
     * @param size the size of the product list
     * @return the generated list of products
     */
    public List<Product> generate (int size) {
        List<Product> ret=new ArrayList<Product>();
        
        for (int i=0; i<size; i++){
            Product product=new Product();
            product.setName("Product "+i);
            product.setPrice(10);
            ret.add(product);
        }
        
        return ret;
    }

}
package com.packtpub.java7.concurrency.chapter5.recipe01.task;

import java.util.List;
import java.util.concurrent.RecursiveAction;

import com.packtpub.java7.concurrency.chapter5.recipe01.util.Product;

/**
 * This class implements the tasks that are going to update the
 * price information. If the assigned interval of values is less that 10, it
 * increases the prices of the assigned products. In other case, it divides
 * the assigned interval in two, creates two new tasks and execute them
 *
 */
public class Task extends RecursiveAction {

    /**
     * serial version UID. The ForkJoinTask class implements the serializable interface.
     */
    private static final long serialVersionUID = 1L;

    /**
     * List of products
     */
    private List<Product> products;
    
    /**
     * Fist and Last position of the interval assigned to the task
     */
    private int first;
    private int last;
    
    /**
     * Increment in the price of products this task has to apply
     */
    private double increment;
    
    /**
     * Constructor of the class. Initializes its attributes
     * @param products list of products
     * @param first first element of the list assigned to the task
     * @param last last element of the list assigned to the task
     * @param increment price increment that this task has to apply
     */
    public Task (List<Product> products, int first, int last, double increment) {
        this.products=products;
        this.first=first;
        this.last=last;
        this.increment=increment;
    }
    
    /**
     * Method that implements the job of the task
     * 若是last 和 first屬性值的差別大於10,就建立兩個新的Task對象,一個處理前一半的產品,另外一個處理後一半的產品
     * 而後調用ForkJoinPool的invokeAll()方法來執行這兩個新的任務。
     */
    @Override
    protected void compute() {
        if (last-first<10) {
            updatePrices();
        } else {
            int middle=(last+first)/2;
            System.out.printf("Task: Pending tasks: %s\n",getQueuedTaskCount());
            Task t1=new Task(products, first,middle+1, increment);
            Task t2=new Task(products, middle+1,last, increment);
            invokeAll(t1, t2);    
        }
    }

    /**
     * Method that updates the prices of the assigned products to the task
     */
    private void updatePrices() {
        for (int i=first; i<last; i++){
            Product product=products.get(i);
            product.setPrice(product.getPrice()*(1+increment));
        }
    }

}
package com.packtpub.java7.concurrency.chapter5.recipe01.core;

import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

import com.packtpub.java7.concurrency.chapter5.recipe01.task.Task;
import com.packtpub.java7.concurrency.chapter5.recipe01.util.Product;
import com.packtpub.java7.concurrency.chapter5.recipe01.util.ProductListGenerator;

/**
 * Main class of the example. It creates a list of products, a ForkJoinPool and 
 * a task to execute the actualization of products. 
 *
 */
public class Main {

    /**
     * Main method of the example
     * @param args
     */
    public static void main(String[] args) {

        // Create a list of products
        ProductListGenerator generator=new ProductListGenerator();
        List<Product> products=generator.generate(10000);
        
        // Craete a task
        Task task=new Task(products, 0, products.size(), 0.20);
        
        // Create a ForkJoinPool
        ForkJoinPool pool=new ForkJoinPool();
        
        // Execute the Task
        pool.execute(task);

        // Write information about the pool
        do {
            System.out.printf("Main: Thread Count: %d\n",pool.getActiveThreadCount());
            System.out.printf("Main: Thread Steal: %d\n",pool.getStealCount());
            System.out.printf("Main: Paralelism: %d\n",pool.getParallelism());
            try {
                TimeUnit.MILLISECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } while (!task.isDone());
    
        // Shutdown the pool
        pool.shutdown();
        
        // Check if the task has completed normally
        if (task.isCompletedNormally()){
            System.out.printf("Main: The process has completed normally.\n");
        }

        // Expected result: 12. Write products which price is not 12
        for (int i=0; i<products.size(); i++){
            Product product=products.get(i);
            if (product.getPrice()!=12) {
                System.out.printf("Product %s: %f\n",product.getName(),product.getPrice());
            }
        }
        
        // End of the program
        System.out.println("Main: End of the program.\n");

    }

}
相關文章
相關標籤/搜索