線程池整理

通常在生產環境中,咱們都不會直接new一個Thread,而後再去start(),由於這麼作會不斷頻繁的建立線程,銷燬線程,過大的線程會耗盡CPU和內存資源,大量的垃圾回收,也會給GC帶來壓力,延長GC停頓時間.java

一、固定大小線程池算法

public class ThreadPoolDemo {
    public static class MyTask implements Runnable {
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(1000);
            }catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        MyTask task = new MyTask();
        ExecutorService es = Executors.newFixedThreadPool(5);
        for (int i = 0;i < 10;i++) {
            es.submit(task);
        }
        es.shutdown();
    }
}

運行結果:併發

1539134496389:Thread ID:11
1539134496389:Thread ID:12
1539134496389:Thread ID:13
1539134496389:Thread ID:14
1539134496389:Thread ID:15
1539134497390:Thread ID:14
1539134497390:Thread ID:12
1539134497390:Thread ID:15
1539134497390:Thread ID:13
1539134497390:Thread ID:11框架

結果解讀:運行結果並非一次刷出來的,而是刷出了5個,中間會停頓1秒,再刷出5個,說明,並行處理是5個線程執行一次,而後再並行處理5個。ide

將Executors.newFixedThreadPool改爲Executors.newCachedThreadPool()性能

public class ThreadPoolDemo {
    public static class MyTask implements Runnable {
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(1000);
            }catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        MyTask task = new MyTask();
        ExecutorService es = Executors.newCachedThreadPool();
        for (int i = 0;i < 10;i++) {
            es.submit(task);
        }
        es.shutdown();
    }
}

結果相同,可是是同時並行處理的,中間沒有停頓,說明newCachedThreadPool()是根據須要來分配線程數的。this

二、計劃任務線程

newScheduledThreadPool()有兩個方法來調用線程對象,scheduleAtFixedRate()跟scheduleWithFixedDelay().他們之間的差異就是scheduleAtFixedRate()總共只佔用調度時間,而scheduleWithFixedDelay()佔用的是線程執行時間加調度時間.但若是scheduleAtFixedRate()的線程執行時間大於調度時間,也不會出現重複調度(即一個線程尚未執行完,另一個線程會啓動),而是一個線程執行完,另外一個線程立刻啓動.3d

public class ScheduledExecutorServiceDemo {
    public static void main(String[] args) {
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
        ses.scheduleAtFixedRate(new Runnable() {
            public void run() {
                try {
                    long start = System.currentTimeMillis();
                    Thread.sleep(2000);
                    System.out.println((System.currentTimeMillis() -start) +":" + Thread.currentThread().getName());
                }catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },0,2, TimeUnit.SECONDS);
    }
}

運行結果(部分截取)對象

2001:pool-1-thread-1
2000:pool-1-thread-1
2000:pool-1-thread-2
2000:pool-1-thread-1
2001:pool-1-thread-3
2000:pool-1-thread-2

結果解讀:儘管有時間調度,他們依然是不一樣的線程來運行的,每顯示一條中間停頓2秒(線程運行時間也是2秒)

public class ScheduledExecutorServiceDemo {
    public static void main(String[] args) {
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
        ses.scheduleWithFixedDelay(new Runnable() {
            public void run() {
                try {
                    long start = System.currentTimeMillis();
                    Thread.sleep(2000);
                    System.out.println((System.currentTimeMillis() -start) +":" + Thread.currentThread().getName());
                }catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },0,2, TimeUnit.SECONDS);
    }
}

運行結果與以前相同,可是每顯示一條的時間間隔爲4秒(線程運行時間依然爲2秒),其中2秒爲調度時間,2秒爲運行時間.

三、核心線程池的內部實現。

其實不管是Executors工廠的哪一種實現,都是調用了同一個類ThreadPoolExecutor,使用了不一樣的構造參數罷了.不一樣的構造參數能夠產生不一樣種類的線程池,所以咱們也能夠自定義線程池.

JDK實現

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

拒絕策略

當線程池任務數量超過系統實際承載能力時,能夠啓用拒絕策略。

直接中斷策略

public class RejectThreadPoolDemo {
    public static class MyTask implements Runnable {
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MyTask task = new MyTask();
        ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(10),
                Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
//                new RejectedExecutionHandler() {
//                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//                        System.out.println(r.toString() + " is discard");
//                    }
//                });
        for (int i = 0;i < Integer.MAX_VALUE;i++) {
            es.submit(task);
            Thread.sleep(10);
        }
    }
}

ThreadPoolExecutor()的最後一個參數爲中斷策略,上面的new ThreadPoolExecutor.AbortPolicy()爲直接中斷!

參數說明:

第一個參數corePoolSize:指定了線程池中的線程數量.

第二個參數maximumPoolSize:指定了線程池中的最大線程數量.

第三個參數KeepAliveTime:當線程池線程數量超過了corePoolSize時,多餘的空閒線程的存活時間.即超過corePoolSize的空閒線程,在多長時間內會被銷燬.

第四個參數unit:keepAliveTime的單位.

第五個參數workQueue:任務隊列,被提交但還沒有被執行的任務.

1,直接提交的隊列:SynchronousQueue,無容量,每個插入操做都要等待一個刪除操做,提交的任務不會被真實保存,老是將新任務提交給線程執行.若是沒有空閒進程,則嘗試建立新的進程.若是進程數量達到最大,則執行拒絕策略.

2,有界的任務隊列:ArrayBlockingQueue,必須帶一個容量參數,表示該隊列的最大容量.當線程池的實際線程數小於corePoolSize,會優先建立新的線程,若大於corePoolSize,則會將新任務加入到等待隊列.若等待隊列滿的時候,沒法加入,則在總線程數不大於maximumPoolSize的前提下,建立新的進程執行任務.若大於maximumPoolSize,執行拒絕策略.

3,無界的任務隊列:LinkedBlockingQueue,除非系統資源耗盡,不存在任務入隊失敗的狀況.當線程池的實際線程數小於corePoolSize,會優先建立新的線程,若大於corePoolSize,則會將新任務加入到等待隊列,若任務的建立和處理的速度差別很大,無界隊列會保持快速增加,直到耗盡系統內存.

4,優先任務隊列:PriorityBlockingQueue,能夠控制任務執行的前後順序.是一個特殊的無界隊列.不管是ArrayBlockingQueue仍是LinkedBlockingQueue都是按照先進先出算法處理任務的,而PriorityBlockingQueue則能夠根據任務自身的優先級順序前後執行,老是確保高優先級的任務先執行.

第六個參數threadFactory:線程工廠,用於建立線程,通常用默認的便可.

第七個參數handler:拒絕策略,當任務太多,來不及處理,如何拒絕任務.

運行結果

1539153799420:Thread ID:11
1539153799430:Thread ID:12
1539153799440:Thread ID:13
1539153799450:Thread ID:14
1539153799460:Thread ID:15
1539153799520:Thread ID:11
1539153799530:Thread ID:12
1539153799540:Thread ID:13
1539153799550:Thread ID:14
1539153799560:Thread ID:15
1539153799621:Thread ID:11
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@45ee12a7 rejected from java.util.concurrent.ThreadPoolExecutor@330bedb4[Running, pool size = 5, active threads = 5, queued tasks = 9, completed tasks = 6]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
    at com.guanjian.RejectThreadPoolDemo.main(RejectThreadPoolDemo.java:31)

結果解讀:因爲併發線程數量太大,Integer.MAX_VALUE,咱們線程池的最大線程數只有5個,而無界任務隊列LinkedBlockingQueue<Runnable>只有10個,沒法知足快速的線程數量增加,拒絕策略發揮做用,拋出異常,阻止系統正常工做.

public class RejectThreadPoolDemo {
    public static class MyTask implements Runnable {
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MyTask task = new MyTask();
        ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(10),
                Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
//                new RejectedExecutionHandler() {
//                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//                        System.out.println(r.toString() + " is discard");
//                    }
//                });
        for (int i = 0;i < Integer.MAX_VALUE;i++) {
            es.submit(task);
            Thread.sleep(10);
        }
    }
}

new ThreadPoolExecutor.CallerRunsPolicy()只要線程池未關閉,該策略直接在調用者線程中,運行當前被丟棄的任務,但性能極有可能會急劇降低.

public class RejectThreadPoolDemo {
    public static class MyTask implements Runnable {
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MyTask task = new MyTask();
        ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(10),
                Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardOldestPolicy());
//                new RejectedExecutionHandler() {
//                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//                        System.out.println(r.toString() + " is discard");
//                    }
//                });
        for (int i = 0;i < Integer.MAX_VALUE;i++) {
            es.submit(task);
            Thread.sleep(10);
        }
    }
}

ThreadPoolExecutor.DiscardOldestPolicy()該策略將丟棄最老的一個請求,也就是即將被執行的一個任務,並嘗試再次提交當前任務.

public class RejectThreadPoolDemo {
    public static class MyTask implements Runnable {
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MyTask task = new MyTask();
        ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(10),
                Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());
//                new RejectedExecutionHandler() {
//                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//                        System.out.println(r.toString() + " is discard");
//                    }
//                });
        for (int i = 0;i < Integer.MAX_VALUE;i++) {
            es.submit(task);
            Thread.sleep(10);
        }
    }
}

ThreadPoolExecutor.DiscardPolicy()丟棄沒法處理的任務,不予任何處理.

自定義拒絕策略

public class RejectThreadPoolDemo {
    public static class MyTask implements Runnable {
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MyTask task = new MyTask();
        ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(10),
                Executors.defaultThreadFactory(),
                new RejectedExecutionHandler() {
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        System.out.println(r.toString() + " is discard");
                    }
                });
        for (int i = 0;i < Integer.MAX_VALUE;i++) {
            es.submit(task);
            Thread.sleep(10);
        }
    }
}

運行結果:

1539159379178:Thread ID:11
1539159379187:Thread ID:12
1539159379197:Thread ID:13
1539159379207:Thread ID:14
1539159379217:Thread ID:15
1539159379279:Thread ID:11
1539159379288:Thread ID:12
1539159379301:Thread ID:13
1539159379308:Thread ID:14
1539159379318:Thread ID:15
1539159379379:Thread ID:11
1539159379388:Thread ID:12
1539159379401:Thread ID:13
java.util.concurrent.FutureTask@45ee12a7 is discard
1539159379408:Thread ID:14
1539159379418:Thread ID:15
java.util.concurrent.FutureTask@330bedb4 is discard
java.util.concurrent.FutureTask@2503dbd3 is discard
java.util.concurrent.FutureTask@4b67cf4d is discard
java.util.concurrent.FutureTask@7ea987ac is discard

這裏只是比ThreadPoolExecutor.DiscardPolicy()多了打印出丟棄的任務.

自定義線程建立

public class RejectThreadPoolDemo {
    public static class MyTask implements Runnable {
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MyTask task = new MyTask();
        ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactory() {
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r);
                        t.setDaemon(true);
                        System.out.println("create " + t);
                        return t;
                    }
                });
        for (int i = 0; i < 5; i++) {
            es.submit(task);
        }
        Thread.sleep(2000);
    }
}

就是能夠本身定義線程,如守護線程等等

運行結果:

create Thread[Thread-0,5,main]
create Thread[Thread-1,5,main]
create Thread[Thread-2,5,main]
create Thread[Thread-3,5,main]
create Thread[Thread-4,5,main]
1539159694414:Thread ID:11
1539159694414:Thread ID:12
1539159694414:Thread ID:13
1539159694414:Thread ID:14
1539159694414:Thread ID:15

擴展線程池

線程池能夠擴展出線程執行前,執行後,終止的後續處理

public class ExtThreadPool {
    public static class MyTask implements Runnable {
        public String name;
        public MyTask(String name) {
            this.name = name;
        }
        public void run() {
            System.out.println("正在執行" + ":Thread ID" + Thread.currentThread().getId() + ",Task Name=" + name);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("準備執行:" + ((MyTask)r).name);
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println("執行完成:" + ((MyTask)r).name);
            }

            @Override
            protected void terminated() {
                System.out.println("線程池退出!");
            }
        };
        for (int i = 0;i < 5;i++) {
            MyTask task = new MyTask("TASK-GEYM-" + i);
            es.execute(task);
            Thread.sleep(10);
        }
        es.shutdown();
    }
}

運行結果:

準備執行:TASK-GEYM-0
正在執行:Thread ID11,Task Name=TASK-GEYM-0
準備執行:TASK-GEYM-1
正在執行:Thread ID12,Task Name=TASK-GEYM-1
準備執行:TASK-GEYM-2
正在執行:Thread ID13,Task Name=TASK-GEYM-2
準備執行:TASK-GEYM-3
正在執行:Thread ID14,Task Name=TASK-GEYM-3
準備執行:TASK-GEYM-4
正在執行:Thread ID15,Task Name=TASK-GEYM-4
執行完成:TASK-GEYM-0
執行完成:TASK-GEYM-1
執行完成:TASK-GEYM-2
執行完成:TASK-GEYM-3
執行完成:TASK-GEYM-4
線程池退出!

在線程池中尋找堆棧

有時候線程執行時會出現Bug,拋出異常,若是使用submit()來提交線程時,不會打印異常信息,而使用execute()來執行線程時能夠打印異常信息.

public class DivTask implements Runnable {
    int a,b;
    public DivTask(int a,int b) {
        this.a = a;
        this.b = b;
    }
    public void run() {
        double re = a / b;
        System.out.println(re);
    }

    public static void main(String[] args) {
//        ExecutorService pools = new TraceThreadPoolExecutor(0,Integer.MAX_VALUE,0L,TimeUnit.MILLISECONDS,
//                new SynchronousQueue<Runnable>());
        ExecutorService pools = new ThreadPoolExecutor(0,Integer.MAX_VALUE,0L,TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
        for (int i = 0;i < 5;i++) {
            pools.submit(new DivTask(100,i));
        }
    }
}

這段代碼中,5個併發線程會有一個線程有除0錯誤

運行結果:

100.0
50.0
33.0
25.0

結果沒有任何提示,異常拋出.

public class DivTask implements Runnable {
    int a,b;
    public DivTask(int a,int b) {
        this.a = a;
        this.b = b;
    }
    public void run() {
        double re = a / b;
        System.out.println(re);
    }

    public static void main(String[] args) {
//        ExecutorService pools = new TraceThreadPoolExecutor(0,Integer.MAX_VALUE,0L,TimeUnit.MILLISECONDS,
//                new SynchronousQueue<Runnable>());
        ExecutorService pools = new ThreadPoolExecutor(0,Integer.MAX_VALUE,0L,TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
        for (int i = 0;i < 5;i++) {
            pools.execute(new DivTask(100,i));
        }
    }
}

運行結果:

100.0
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
50.0
33.0
    at com.guanjian.DivTask.run(DivTask.java:18)
25.0
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

有異常拋出

重寫跟蹤線程池,自定義跟蹤

public class TraceThreadPoolExecutor extends ThreadPoolExecutor {
    public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    public void execute(Runnable task) {
        super.execute(wrap(task,clientTrace(),Thread.currentThread().getName()));
    }

    @Override
    public Future<?> submit(Runnable task) {
        return super.submit(wrap(task,clientTrace(),Thread.currentThread().getName()));
    }

    private Exception clientTrace() {
        return new Exception("Client stack trace");
    }

    private Runnable wrap(final Runnable task,final Exception clientStack,String clientThreadName) {
        return new Runnable() {
            public void run() {
                try {
                    task.run();
                } catch (Exception e) {
                    clientStack.printStackTrace();
                    try {
                        throw e;
                    } catch (Exception e1) {
                        e1.printStackTrace();
                    }
                }
            }
        };
    }
}
public class DivTask implements Runnable {
    int a,b;
    public DivTask(int a,int b) {
        this.a = a;
        this.b = b;
    }
    public void run() {
        double re = a / b;
        System.out.println(re);
    }

    public static void main(String[] args) {
        ExecutorService pools = new TraceThreadPoolExecutor(0,Integer.MAX_VALUE,0L,TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>());
//        ExecutorService pools = new ThreadPoolExecutor(0,Integer.MAX_VALUE,0L,TimeUnit.SECONDS,
//                new SynchronousQueue<Runnable>());
        for (int i = 0;i < 5;i++) {
            pools.execute(new DivTask(100,i));
        }
    }
}

運行結果:

100.0
java.lang.Exception: Client stack trace
50.0
    at com.guanjian.TraceThreadPoolExecutor.clientTrace(TraceThreadPoolExecutor.java:27)
33.0
25.0
    at com.guanjian.TraceThreadPoolExecutor.execute(TraceThreadPoolExecutor.java:18)
    at com.guanjian.DivTask.main(DivTask.java:28)
java.lang.ArithmeticException: / by zero
    at com.guanjian.DivTask.run(DivTask.java:18)
    at com.guanjian.TraceThreadPoolExecutor$1.run(TraceThreadPoolExecutor.java:34)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

這樣咱們就能夠知道是在哪裏出了錯.

四、分而治之,Fork/Join框架

將一個大任務拆分紅各類較小規模的任務,進行並行處理,也許按照約定條件拆分的任務仍是大於約定條件就繼續拆分.有兩種線程類型,一種是有返回值的RecursiveTask<T>,一種是沒有返回值的RecursiveAction,他們都繼承於ForkJoinTask<>,一個帶泛型<T>,一個是<Void>.

/**
 * Created by Administrator on 2018/10/11.
 * 能夠理解成一個Runnable(線程類)
 */
public class CountTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 10000;
    private long start;
    private long end;

    public CountTask(long start, long end) {
        this.start = start;
        this.end = end;
    }

    /**
     * 能夠理解成run()方法
     * @return
     */
    @Override
    protected Long compute() {
        long sum = 0;
        boolean canCompute = (end - start) < THRESHOLD;
        //最終計算,全部的最終拆分都是在這裏計算
        if (canCompute) {
            for (long i = start;i <= end;i++) {
                sum += i;
            }
        }else {
            //並行計算的規模,拆分紅100個並行計算
            long step = (start + end) /100;
            //建立子任務線程集合
            List<CountTask> subTasks = new ArrayList<CountTask>();
            //每一個並行子任務的開始值
            long pos = start;
            //並行執行100個分叉線程
            for (int i = 0;i < 100;i++) {
                //每一個並行子任務的結束值
                long lastOne = pos + step;
                if (lastOne > end) {
                    lastOne = end;
                }
                //創建一個子任務的線程
                CountTask subTask = new CountTask(pos,lastOne);
                //建立下一個並行子任務的開始值
                pos += step + 1;
                //將當前子任務線程添加到線程集合
                subTasks.add(subTask);
                //執行該線程,實際上是一個遞歸,判斷lastOne-pos是否小於THRESHOLD,小於則真正執行,不然繼續分叉100個子線程
                subTask.fork();
            }
            for (CountTask t:subTasks) {
                //阻斷每一次分叉前的上一級線程進行等待,並將最終並行的結果進行層層累加
                sum += t.join();
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        CountTask task = new CountTask(0,200000L);
        ForkJoinTask<Long> result = forkJoinPool.submit(task);
        try {
            long res = result.get();
            System.out.println("sum: " + res);
        }catch (InterruptedException e) {
            e.printStackTrace();
        }catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

運行結果:

sum: 20000100000

相關文章
相關標籤/搜索