Fork/Join 很像 MapReduce 的處理過程：先將任務切割成小的任務分別計算，而後再將小任務的計算結果合併起來。Java 實現如下：
package forkJoin;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.RecursiveTask;

/**
 * Created by luozhitao on 2017/9/29.
 *
 * <p>Fork/join task that sums every integer in the inclusive range
 * {@code [start, end]}. Ranges spanning fewer than {@link #LIMIT} steps are
 * summed directly; larger ranges are split at the midpoint into two subtasks
 * whose results are added together.
 *
 * <p>The result type is {@code Integer}, so the sum is computed with 32-bit
 * arithmetic and wraps around silently when it exceeds the {@code int} range.
 */
public class forkTask extends RecursiveTask<Integer> {

    /** Ranges with {@code end - start} below this value are summed without splitting. */
    private static final int LIMIT = 10;

    /** First value of the range (inclusive). */
    int start;

    /** Last value of the range (inclusive). */
    int end;

    /** Result of the most recent {@link #compute()} call. */
    int sum;

    /**
     * Creates a task summing the integers from {@code start} to {@code end}.
     *
     * @param start first value of the range, inclusive
     * @param end   last value of the range, inclusive; a value below
     *              {@code start} describes an empty range whose sum is 0
     */
    public forkTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    /**
     * Computes the sum of the range, splitting it into two subtasks when it is
     * too large to be summed directly.
     *
     * @return the sum of all integers in {@code [start, end]}, wrapped to
     *         {@code int} on overflow
     */
    @Override
    protected Integer compute() {
        // Accumulate locally so that re-running the task never adds to a stale field value.
        int total = 0;

        // Widen to long: end - start overflows int for ranges wider than Integer.MAX_VALUE.
        if ((long) end - start < LIMIT) {
            // A long counter is required: with an int counter the condition
            // i <= end can never become false when end == Integer.MAX_VALUE.
            for (long i = start; i <= end; i++) {
                total += (int) i;
            }
        } else {
            // Overflow-safe midpoint; start + end may not fit in an int.
            int mid = (int) (((long) start + end) / 2);
            forkTask left = new forkTask(start, mid);
            forkTask right = new forkTask(mid + 1, end);
            invokeAll(left, right);
            // Collect the subtask results. join() rethrows a subtask failure
            // instead of requiring checked exceptions that would be swallowed.
            total = left.join() + right.join();
        }

        sum = total;
        return total;
    }
}
main併發
package forkJoin;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

/**
 * Created by luozhitao on 2017/9/29.
 *
 * <p>Demo entry point: submits a {@link forkTask} to a {@link ForkJoinPool},
 * logs pool statistics while the task is running and finally logs the result.
 */
public class Taskapp {

    private static final Logger logger = LoggerFactory.getLogger(Taskapp.class);

    /** Pause between two statistics log lines, so the monitor loop does not spin a core. */
    private static final long POLL_INTERVAL_MILLIS = 10L;

    /**
     * Runs the demo.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        try {
            forkTask forktask = new forkTask(1, 1000000);
            Future<Integer> future = forkJoinPool.submit(forktask);

            // Log the pool state at least once, then keep polling until the task completes.
            // getParallelism() is the pool's target parallelism level.
            boolean done;
            do {
                logger.info("activeThreadCount {},stealCount {},parallelisem {},QueueCount {}",
                        forkJoinPool.getActiveThreadCount(),
                        forkJoinPool.getStealCount(),
                        forkJoinPool.getParallelism(),
                        forkJoinPool.getQueuedTaskCount());
                done = forktask.isDone();
                if (!done) {
                    Thread.sleep(POLL_INTERVAL_MILLIS);
                }
            } while (!done);

            // The task yields an Integer, so a sum beyond the int range is reported wrapped.
            logger.info("results is {}", future.get());
        } catch (ExecutionException e) {
            logger.error("fork/join computation failed", e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can observe the interruption.
            Thread.currentThread().interrupt();
            logger.warn("interrupted while waiting for the fork/join result", e);
        } finally {
            // Always release the pool's worker threads, including on failure.
            forkJoinPool.shutdown();
        }
    }
}