fork Join框架

fork Join很像mapreduce的處理過程。先將任務切割成小的任務分別計算而後再將小任務的計算結果合併起來java

package forkJoin;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.RecursiveTask;

/**
 * Created by luozhitao on 2017/9/29.
 */
public class forkTask extends RecursiveTask<Integer> {
    private static int LIMIT=10;
    int start;
    int end;

    int sum;

    public forkTask(int start,int end){

        this.start=start;
        this.end=end;


    }


    @Override
    protected Integer compute() {


        if(end-start<LIMIT){


            for (int i=start;i<=end;i++){


                sum+=i;
            }

        }
        else{

            int mid=(start+end)/2;

         //   System.out.println("new -----");

            forkTask task1=new forkTask(start,mid);
            forkTask task2=new forkTask(mid+1,end);

            invokeAll(task1,task2);
            try {

//拿到小任務的計算結果 sum
= task1.get() + task2.get(); }catch (InterruptedException e){ } catch (ExecutionException e){} } return sum; } }

main併發

package forkJoin;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

/**
 * Created by luozhitao on 2017/9/29.
 */
public class Taskapp {

    private  static Logger logger= LoggerFactory.getLogger(Taskapp.class);

    public static void main(String [] args){


        ForkJoinPool forkJoinPool=new ForkJoinPool();


        forkTask forktask=new forkTask(1,1000000);

        Future<Integer> future=forkJoinPool.submit(forktask);


        do{

            logger.info("activeThreadCount {},stealCount {},parallelisem {},QueueCount {}",forkJoinPool.getActiveThreadCount(),forkJoinPool.getStealCount(),forkJoinPool.getParallelism(),forkJoinPool.getQueuedTaskCount());
        //    logger.info("stealCount",forkJoinPool.getStealCount());

            //最大併發數
      //      logger.info("parallelisem",forkJoinPool.getParallelism());
       //     logger.info("QueueCount",forkJoinPool.getQueuedTaskCount());


        }while (!forktask.isDone());


        forkJoinPool.shutdown();

        try {
            logger.info("results is {}",future.get());

        }catch (ExecutionException e){


        }catch (InterruptedException e){}



        //



        //

     //


    }
}
相關文章
相關標籤/搜索