Fork/Join 框架(一):建立Fork/Join

說到Fork/Join 框架 ,得不提起執行器框架(Executor Framework),它將任務的建立和執行進行了分離,經過Executor Framework只須要實現Runnable接口的對象和使用Executor對象,而後將Runnable 對象發送個執行器。執行器在負責運行這些任務所須要的線程,包括線程的建立,線程的管理以及線程的結束。html

java 7 又更近了一步,它包括了ExecutorService 接口的另外一種實現,用來解決特殊類型的問題。就是Fork/Join 框架,有時也稱爲」分解/合併框架「。java

Fork/Join 框架,是用來解決可以經過分治技術,將問題拆分爲小任務的問題。它和執行器框架的只要區別在於工做竊取算法。算法

下面實現簡單的例子,咱們實現一項更新產品價格的任務。最初的任務將負責更新列表中的全部的元素的價格。若是一個任務須要更新大於10個元素,將分爲兩個部分去執行。而後再去更新各自部分的產品價格。框架

一、建立一個產品類,用來存儲產品的名稱和價格
package five2;
/**
 * 
 * @author qpx
 *
 */
public class Product {
	
	private String name;
	private double price;
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public double getPrice() {
		return price;
	}
	public void setPrice(double price) {
		this.price = price;
	}
	
	

}

  二、建立生成一個隨機產品列表的類異步

package five2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * 隨機產品列表
 * @author qpx
 *
 */
public class productListGenetator {
	
	public List<Product> genetate(int size){
		
		List<Product> products = new ArrayList<>();
		for(int i = 0 ;i<size;i++){
			Product product = new Product();
			product.setName("Product"+i);
			product.setPrice(10);
			products.add(product);
			
			
		}
		return products;
		
	}
	
	public static void main(String[] args) {
		List<Integer> aaa = new ArrayList<Integer>();
		aaa.add(1);
		aaa.add(0, 2);
		
		System.out.println(Arrays.toString(aaa.toArray()));
	}

}

 三、建立一個Task 的類,繼承RecursiveAction 類,這個是主任務類ide

package five2;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

public class Task extends RecursiveAction {

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;

	private List<Product> products;

	private int first;
	private int last;

	private double increment;

	public Task(List<Product> products, int first, int last, double increment) {
		super();
		this.products = products;
		this.first = first;
		this.last = last;
		this.increment = increment;
	}

	@Override
	protected void compute() {
		// TODO Auto-generated method stub‘
		if (this.last - this.first < 10) {

			updateprices();
		} else {
			int middle = (last + first) / 2+1;
			//System.out.println("middle:"+middle);
			System.out.printf(Thread.currentThread().getName()+"  Task:Pending tasks:%s\n", getQueuedTaskCount());
			Task t1 = new Task(products, first, (middle), increment);
			//System.out.println("t1:first:"+first+",last:"+middle);
			Task t2 = new Task(products, middle, last, increment);
			//System.out.println("t2:first:"+middle+",last:"+last);
			invokeAll(t1, t2);

		}

	}

	private void updateprices() {
		// TODO Auto-generated method stub
		for (int i = first; i < last; i++) {
			Product product = products.get(i);
			product.setPrice(product.getPrice() * (1 + increment));
			System.out.println(Thread.currentThread().getName() + "的i值:"    
                    + i);    

		}
	}

	public static void main(String[] args) throws InterruptedException {
			productListGenetator a = new productListGenetator();
			List<Product> products = a.genetate(1000);
			
			Task task = new Task(products,0,products.size(),0.20);
			
			//ForkJoinPool pool = new ForkJoinPool(10);
		        ForkJoinPool pool = new ForkJoinPool();

			
			pool.submit(task);
			
			
			do{
				System.out.printf("Main: 線程數量:%d\n",pool.getActiveThreadCount());
				
				System.out.printf("Main: Thread 竊取數量:%d\n",pool.getStealCount());
				
				System.out.printf("Main: Thread 平行數量:%d\n",pool.getParallelism());
				
				
				TimeUnit.MILLISECONDS.sleep(5);

				
				
			}while(!task.isDone());
			pool.shutdown();
			
			
			if(task.isCompletedNormally()){
				System.out.printf("Main: The process has completed normally.\n");
				
				
			}
			
			for(int i = 0;i<products.size();i++){
				
				Product p = products.get(i);
				if(p.getPrice()!=12){
					
					System.out.printf("Product %s:%f\n",p.getName(),p.getName());
				}
				
				
			}
			
			System.out.printf("Main:End of the Program.\n");
	}
}

 注意:咱們採用了無參的構造方式建立了this

ForkJoinPool pool = new ForkJoinPool(); 他將執行默認的配置,建立一個線程書等於計算機CPU數目的線程池。

   另外ForkJionPool 類還提供瞭如下方法用於執行任務。spa

   

execute(Runnabletask) 注意的是使用Runnable對象時,ForkJionPool 不會採用工做竊取算法。僅僅實用ForkJoinTask類的時候採用工做竊取算法
invoke(ForkJoinTask<T> list)  execute方法是異步調用的,此方法是同步調用的
相關文章
相關標籤/搜索