Java 線程池

系統啓動一個線程的成本是比較高的,由於它涉及到與操做系統的交互,使用線程池的好處是提升性能,當系統中包含大量併發的線程時,會致使系統性能劇烈降低,甚至致使JVM崩潰,而線程池的最大線程數參數能夠控制系統中併發線程數不超過次數。java

1、Executors 工廠類用來產生線程池,該工廠類包含如下幾個靜態工廠方法來建立對應的線程池。建立的線程池是一個ExecutorService對象,使用該對象的submit方法或者是execute方法執行相應的Runnable或者是Callable任務。線程池自己在再也不須要的時候調用shutdown()方法中止線程池,調用該方法後,該線程池將再也不容許任務添加進來,可是會直到已添加的全部任務執行完成後才死亡。緩存

一、newCachedThreadPool(),建立一個具備緩存功能的線程池,提交到該線程池的任務(Runnable或Callable對象)建立的線程,若是執行完成,會被緩存到CachedThreadPool中,供後面須要執行的任務使用。併發

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CacheThreadPool {
    static class Task implements Runnable {
        @Override
        public void run() {
            System.out.println(this + " " + Thread.currentThread().getName() + " AllStackTraces map size: "
                    + Thread.currentThread().getAllStackTraces().size());
        }
    }

    public static void main(String[] args) {
        ExecutorService cacheThreadPool = Executors.newCachedThreadPool();
        
        //先添加三個任務到線程池
        for(int i = 0 ; i < 3; i++) {
            cacheThreadPool.execute(new Task());
        }
        
        //等三個線程執行完成後,再次添加三個任務到線程池
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        for(int i = 0 ; i < 3; i++) {
            cacheThreadPool.execute(new Task());
        }
    }

}

執行結果以下:dom

CacheThreadPool$Task@2d312eb9 pool-1-thread-1 AllStackTraces map size: 7
CacheThreadPool$Task@59522b86 pool-1-thread-3 AllStackTraces map size: 7
CacheThreadPool$Task@73dbb89f pool-1-thread-2 AllStackTraces map size: 7
CacheThreadPool$Task@5795cedc pool-1-thread-3 AllStackTraces map size: 7
CacheThreadPool$Task@256d5600 pool-1-thread-1 AllStackTraces map size: 7
CacheThreadPool$Task@7d1c5894 pool-1-thread-2 AllStackTraces map size: 7

線程池中的線程對象進行了緩存,當有新任務執行時進行了複用。可是若是有特別多的併發時,緩存線程池仍是會建立不少個線程對象。ide

二、newFixedThreadPool(int nThreads) 建立一個指定線程個數,線程可複用的線程池。函數

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedThreadPool {
    static class Task implements Runnable {
        @Override
        public void run() {
            System.out.println(this + " " + Thread.currentThread().getName() + " AllStackTraces map size: "
                    + Thread.currentThread().getAllStackTraces().size());
        }
    }

    public static void main(String[] args) {
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);

        // 先添加三個任務到線程池
        for (int i = 0; i < 5; i++) {
            fixedThreadPool.execute(new Task());
        }

        // 等三個線程執行完成後,再次添加三個任務到線程池
        try {
            Thread.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        for (int i = 0; i < 3; i++) {
            fixedThreadPool.execute(new Task());
        }
    }

}

執行結果:性能

FixedThreadPool$Task@7045c12d pool-1-thread-2 AllStackTraces map size: 7
FixedThreadPool$Task@50fa0bef pool-1-thread-2 AllStackTraces map size: 7
FixedThreadPool$Task@ccb1870 pool-1-thread-2 AllStackTraces map size: 7
FixedThreadPool$Task@7392b4e3 pool-1-thread-1 AllStackTraces map size: 7
FixedThreadPool$Task@5bdeff18 pool-1-thread-2 AllStackTraces map size: 7
FixedThreadPool$Task@7d5554e1 pool-1-thread-1 AllStackTraces map size: 7
FixedThreadPool$Task@24468092 pool-1-thread-3 AllStackTraces map size: 7
FixedThreadPool$Task@fa7b978 pool-1-thread-2 AllStackTraces map size: 7this

三、newSingleThreadExecutor(),建立一個只有單線程的線程池,至關於調用newFixedThreadPool(1)spa

四、newSheduledThreadPool(int corePoolSize),建立指定線程數的線程池,它能夠在指定延遲後執行線程。也能夠以某一週期重複執行某一線程,知道調用shutdown()關閉線程池。操作系統

示例以下:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledThreadPool {
    static class Task implements Runnable {
        @Override
        public void run() {
            System.out.println("time " + System.currentTimeMillis()  + " " + Thread.currentThread().getName() + " AllStackTraces map size: "
                    + Thread.currentThread().getAllStackTraces().size());
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
        
        scheduledExecutorService.schedule(new Task(), 3, TimeUnit.SECONDS);
        
        scheduledExecutorService.scheduleAtFixedRate(new Task(), 3, 5, TimeUnit.SECONDS);
    
        try {
            Thread.sleep(30 * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        scheduledExecutorService.shutdown();
    }

}

運行結果以下:

time 1458921795240 pool-1-thread-1 AllStackTraces map size: 6
time 1458921795241 pool-1-thread-2 AllStackTraces map size: 6
time 1458921800240 pool-1-thread-1 AllStackTraces map size: 7
time 1458921805240 pool-1-thread-1 AllStackTraces map size: 7
time 1458921810240 pool-1-thread-1 AllStackTraces map size: 7
time 1458921815240 pool-1-thread-1 AllStackTraces map size: 7
time 1458921820240 pool-1-thread-1 AllStackTraces map size: 7

由運行時間可看出,任務是按照5秒的週期執行的。

五、newSingleThreadScheduledExecutor() 建立一個只有一個線程的線程池,同調用newScheduledThreadPool(1)。

2、ForkJoinPool和ForkJoinTask

ForkJoinPool是ExecutorService的實現類,支持將一個任務劃分爲多個小任務並行計算,在把多個小任務的計算結果合併成總的計算結果。它有兩個構造函數

ForkJoinPool(int parallelism)建立一個包含parallelism個並行線程的ForkJoinPool。

ForkJoinPool(),以Runtime.availableProcessors()方法返回值做爲parallelism參數來建立ForkJoinPool。

ForkJoinTask 表明一個能夠並行,合併的任務。它是實現了Future<T>接口的抽象類,它有兩個抽象子類,表明無返回值任務的RecuriveAction和有返回值的RecursiveTask。可根據具體需求繼承這兩個抽象類實現本身的對象,而後調用ForkJoinPool的submit 方法執行。

RecuriveAction 示例以下,實現並行輸出0-300的數字。

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

public class ActionForkJoinTask {
    static class PrintTask extends RecursiveAction {
        private static final int THRESHOLD = 50;
        private int start;
        private int end;

        public PrintTask(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected void compute() {
            if (end - start < THRESHOLD) {
                for(int i = start; i < end; i++) {
                    System.out.println(Thread.currentThread().getName() + " " + i);
                }
            } else {
                int middle = (start + end) / 2;
                PrintTask left = new PrintTask(start, middle);
                PrintTask right = new PrintTask(middle, end);
                left.fork();
                right.fork();
            }
        }

    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        
        pool.submit(new PrintTask(0,  300));
        try {
            pool.awaitTermination(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        pool.shutdown();
    }

}

在拆分小任務後,調用任務的fork()方法,加入到ForkJoinPool中並行執行。

RecursiveTask示例,實現並行計算100個整數求和。拆分爲每20個數求和後獲取結果,在最後合併爲最後的結果。

import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

public class TaskForkJoinTask {
    static class CalTask extends RecursiveTask<Integer> {
        private static final int THRESHOLD = 20;

        private int arr[];
        private int start;
        private int end;

        public CalTask(int[] arr, int start, int end) {
            this.arr = arr;
            this.start = start;
            this.end = end;
        }

        @Override
        protected Integer compute() {
            int sum = 0;

            if (end - start < THRESHOLD) {
                for (int i = start; i < end; i++) {
                    sum += arr[i];
                }
                System.out.println(Thread.currentThread().getName() + "  sum:" + sum);
                return sum;
            } else {
                int middle = (start + end) / 2;
                CalTask left = new CalTask(arr, start, middle);
                CalTask right = new CalTask(arr, middle, end);

                left.fork();
                right.fork();

                return left.join() + right.join();
            }
        }

    }

    public static void main(String[] args) {
        int arr[] = new int[100];
        Random random = new Random();
        int total = 0;

        for (int i = 0; i < arr.length; i++) {
            int tmp = random.nextInt(20);
            total += (arr[i] = tmp);
        }
        System.out.println("total " + total);

        ForkJoinPool pool = new ForkJoinPool(4);

        Future<Integer> future = pool.submit(new CalTask(arr, 0, arr.length));
        try {
            System.out.println("cal result: " + future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        pool.shutdown();
    }

}

執行結果以下:

total 912
ForkJoinPool-1-worker-2  sum:82
ForkJoinPool-1-worker-2  sum:123
ForkJoinPool-1-worker-2  sum:144
ForkJoinPool-1-worker-3  sum:119
ForkJoinPool-1-worker-2  sum:106
ForkJoinPool-1-worker-2  sum:128
ForkJoinPool-1-worker-2  sum:121
ForkJoinPool-1-worker-3  sum:89
cal result: 912

子任務執行完後,調用任務的join()方法獲取子任務執行結果,再相加得到最後的結果。

相關文章
相關標籤/搜索