Java線程池瞭解一下

前言

立刻就要過年了,還在崗位上堅守「swimming」的小夥伴們頂住。博主給你們帶來一篇線程池的基本使用解解悶。java

爲何須要使用線程池

一、減小線程建立與切換的開銷編程

  • 在沒有使用線程池的時候,來了一個任務,就建立一個線程,咱們知道系統建立和銷燬工做線程的開銷很大,並且頻繁的建立線程也就意味着須要進行頻繁的線程切換,這都是一筆很大的開銷。

二、控制線程的數量設計模式

  • 使用線程池咱們能夠有效地控制線程的數量,當系統中存在大量併發線程時,會致使系統性能劇烈降低。

線程池作了什麼

重複利用有限的線程緩存

  • 線程池中會預先建立一些空閒的線程,他們不斷的從工做隊列中取出任務,而後執行,執行完以後,會繼續執行工做隊列中的下一個任務,減小了建立和銷燬線程的次數,每一個線程均可以一直被重用,變了建立和銷燬的開銷。

線程池的使用

其實經常使用Java線程池本質上都是由ThreadPoolExecutor或者ForkJoinPool生成的,只是其根據構造函數傳入不一樣的實參來實例化相應線程池而已。併發

Executors

Executors是一個線程池工廠類,該工廠類包含以下集合靜態工廠方法來建立線程池:dom

  • newFixedThreadPool():建立一個可重用的、具備固定線程數的線程池
  • newSingleThreadExecutor():建立只有一個線程的線程池
  • newCachedThreadPool():建立一個具備緩存功能的線程池
  • newWorkStealingPool():建立持有足夠線程的線程池來支持給定的並行級別的線程池
  • newScheduledThreadPool():建立具備指定線程數的線程池,它能夠在指定延遲後執行任務線程

ExecutorService接口

對設計模式有了解過的同窗都會知道,咱們儘可能面向接口編程,這樣對程序的靈活性是很是友好的。Java線程池也採用了面向接口編程的思想,能夠看到ThreadPoolExecutorForkJoinPool全部都是ExecutorService接口的實現類。在ExecutorService接口中定義了一些經常使用的方法,而後再各類線程池中均可以使用ExecutorService接口中定義的方法,經常使用的方法有以下幾個:ide

  • 向線程池提交線程
    • Future<?> submit():將一個Runnable對象交給指定的線程池,線程池將在有空閒線程時執行Runnable對象表明的任務,該方法既能接收Runnable對象也能接收Callable對象,這就意味着sumbit()方法能夠有返回值。
    • void execute(Runnable command):只能接收Runnable對象,意味着該方法沒有返回值。
  • 關閉線程池
    • void shutdown():阻止新來的任務提交,對已經提交了的任務不會產生任何影響。(等待全部的線程執行完畢才關閉)
    • List<Runnable> shutdownNow(): 阻止新來的任務提交,同時會中斷當前正在運行的線程,另外它還將workQueue中的任務給移除,並將這些任務添加到列表中進行返回。(立馬關閉)
  • 檢查線程池的狀態
    • boolean isShutdown():調用shutdown()或shutdownNow()方法後返回爲true。
    • boolean isTerminated():當調用shutdown()方法後,而且全部提交的任務完成後返回爲true;當調用shutdownNow()方法後,成功中止後返回爲true。

常見線程池使用示例

1、newFixedThreadPool

線程池中的線程數目是固定的,無論你來了多少的任務。函數

示例代碼性能

public class MyFixThreadPool {

    public static void main(String[] args) throws InterruptedException {
        // 建立一個線程數固定爲5的線程池
        ExecutorService service = Executors.newFixedThreadPool(5);

        System.out.println("初始線程池狀態:" + service);

        for (int i = 0; i < 6; i++) {
            service.execute(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
            });
        }
        System.out.println("線程提交完畢以後線程池狀態:" + service);

        service.shutdown();//會等待全部的線程執行完畢才關閉,shutdownNow:立馬關閉
        System.out.println("是否所有線程已經執行完畢:" + service.isTerminated());//全部的任務執行完了,就會返回true
        System.out.println("是否已經執行shutdown()" + service.isShutdown());
        System.out.println("執行完shutdown()以後線程池的狀態:" + service);

        TimeUnit.SECONDS.sleep(5);
        System.out.println("5秒鐘事後,是否所有線程已經執行完畢:" + service.isTerminated());
        System.out.println("5秒鐘事後,是否已經執行shutdown()" + service.isShutdown());
        System.out.println("5秒鐘事後,線程池狀態:" + service);
    }

}
複製代碼

運行結果:this

初始線程池狀態:[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
線程提交完畢以後線程池狀態:[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
是否所有線程已經執行完畢:false
是否已經執行shutdown():true
執行完shutdown()以後線程池的狀態:[Shutting down, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
pool-1-thread-2
pool-1-thread-1
pool-1-thread-4
pool-1-thread-5
pool-1-thread-3
pool-1-thread-2
5秒鐘事後,是否所有線程已經執行完畢:true
5秒鐘事後,是否已經執行shutdown():true
5秒鐘事後,線程池狀態:[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]

程序分析

  • 當咱們建立好一個FixedThreadPool以後,該線程池就處於Running狀態了,可是pool size(線程池線程的數量)、active threads(當前活躍線程) queued tasks(當前排隊線程)、completed tasks(已完成的任務數)都是0
  • 當咱們把6個任務都提交給線程池以後,
    • pool size = 5:由於咱們建立的是一個固定線程數爲5的線程池(注意:若是這個時候咱們只提交了3個任務,那麼pool size = 3,說明線程池也是經過懶加載的方式去建立線程)。
    • active threads = 5:雖然咱們向線程池提交了6個任務,可是線程池的固定大小爲5,因此活躍線程只有5個
    • queued tasks = 1:雖然咱們向線程池提交了6個任務,可是線程池的固定大小爲5,只能有5個活躍線程同時工做,因此有一個任務在等待
  • 咱們第一次執行shutdown()的時候,因爲任務尚未所有執行完畢,因此isTerminated()返回falseshutdown()返回true,而線程池的狀態會由Running變爲Shutting down
  • 從任務的運行結果咱們能夠看出,名爲pool-1-thread-2執行了兩次任務,證實線程池中的線程確實是重複利用的。
  • 5秒鐘後,isTerminated()返回trueshutdown()返回true,證實全部的任務都執行完了,線程池也關閉了,咱們再次檢查線程池的狀態[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6],狀態已經處於Terminated了,而後已完成的任務顯示爲6
2、newSingleThreadExecutor

從頭至尾整個線程池都只有一個線程在工做。

實例代碼

public class SingleThreadPool {

    public static void main(String[] args) {
        ExecutorService service = Executors.newSingleThreadExecutor();

        for (int i = 0; i < 5; i++) {
            final int j = i;
            service.execute(() -> {
                System.out.println(j + " " + Thread.currentThread().getName());
            });
        }
    }

}
複製代碼

運行結果

0 pool-1-thread-1 1 pool-1-thread-1 2 pool-1-thread-1 3 pool-1-thread-1 4 pool-1-thread-1

程序分析 能夠看到只有pool-1-thread-1一個線程在工做。

3、newCachedThreadPool

來多少任務,就建立多少線程(前提是沒有空閒的線程在等待執行任務,不然仍是會複用以前舊(緩存)的線程),直接你電腦能支撐的線程數的極限爲止。

實例代碼

public class CachePool {

    public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newCachedThreadPool();
        System.out.println("初始線程池狀態:" + service);

        for (int i = 0; i < 12; i++) {
            service.execute(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
            });
        }
        System.out.println("線程提交完畢以後線程池狀態:" + service);

        TimeUnit.SECONDS.sleep(50);
        System.out.println("50秒後線程池狀態:" + service);

        TimeUnit.SECONDS.sleep(30);
        System.out.println("80秒後線程池狀態:" + service);
    }

}
複製代碼

運行結果

初始線程池狀態:[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
線程提交完畢以後線程池狀態:[Running, pool size = 12, active threads = 12, queued tasks = 0, completed tasks = 0]
pool-1-thread-3
pool-1-thread-4
pool-1-thread-1
pool-1-thread-2
pool-1-thread-5
pool-1-thread-8
pool-1-thread-9
pool-1-thread-12
pool-1-thread-7
pool-1-thread-6
pool-1-thread-11
pool-1-thread-10
50秒後線程池狀態:[Running, pool size = 12, active threads = 0, queued tasks = 0, completed tasks = 12]
80秒後線程池狀態:[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 12]

程序分析

  • 由於咱們每一個線程任務至少須要500毫秒的執行時間,因此當咱們往線程池中提交12個任務的過程當中,基本上沒有空閒的線程供咱們重複使用,因此線程池會建立12個線程。
  • 緩存中的線程默認是60秒沒有活躍就會被銷燬掉,能夠看到在50秒鐘的時候回,全部的任務已經完成了,可是線程池線程的數量仍是12。
  • 80秒事後,能夠看到線程池中的線程已經所有被銷燬了。
4、newScheduledThreadPool

能夠在指定延遲後或週期性地執行線程任務的線程池。

ScheduledThreadPoolExecutor

  • newScheduledThreadPool()方法返回的實際上是一個ScheduledThreadPoolExecutor對象,ScheduledThreadPoolExecutor定義以下:
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {
複製代碼
  • 能夠看到,它仍是繼承了ThreadPoolExecutor並實現了ScheduledExecutorService接口,而ScheduledExecutorService也是繼承了ExecutorService接口,因此咱們也能夠像使用以前的線程池對象同樣使用,只不過是該對象會額外多了一些方法用於控制延遲與週期:
    • public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit):指定callable任務將在delay延遲後執行
    • public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit):指定的command任務將在delay延遲後執行,並且已設定頻率重複執行。(一開始並不會執行)
    • public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,ong initialDelay,long delay,TimeUnit unit):建立並執行一個在給定初始延遲後首期啓用的按期操做,隨後在每個執行終止和下一次執行開始之間都存在給定的延遲。

示例代碼

下面代碼每500毫秒打印一次當前線程名稱以及一個隨機數字。

public class MyScheduledPool {

    public static void main(String[] args) {
        ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
        service.scheduleAtFixedRate(() -> {
            System.out.println(Thread.currentThread().getName() + new Random().nextInt(1000));
        }, 0, 500, TimeUnit.MILLISECONDS);
    }
}
複製代碼
5、newWorkStealingPool

每一個線程維護着本身的隊列,執行完本身的任務以後,會去主動執行其餘線程隊列中的任務。

示例代碼

public class MyWorkStealingPool {

    public static void main(String[] args) throws IOException {
        ExecutorService service = Executors.newWorkStealingPool(4);
        System.out.println("cpu核心:" + Runtime.getRuntime().availableProcessors());

        service.execute(new R(1000));
        service.execute(new R(2000));
        service.execute(new R(2000));
        service.execute(new R(2000));
        service.execute(new R(2000));

        //因爲產生的是精靈線程(守護線程、後臺線程),主線程不阻塞的話,看不到輸出
        System.in.read();
    }

    static class R implements Runnable {

        int time;

        R(int time) {
            this.time = time;
        }

        @Override
        public void run() {
            try {
                TimeUnit.MILLISECONDS.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(time + " " + Thread.currentThread().getName());
        }
    }
}
複製代碼

運行結果

cpu核心:4 1000 ForkJoinPool-1-worker-1 2000 ForkJoinPool-1-worker-0 2000 ForkJoinPool-1-worker-3 2000 ForkJoinPool-1-worker-2 2000 ForkJoinPool-1-worker-1

程序分析 ForkJoinPool-1-worker-1任務的執行時間是1秒,它會最早執行完畢,而後它會去主動執行其餘線程隊列中的任務。

6、ForkJoinPool
  • ForkJoinPool能夠將一個任務拆分紅多個「小任務」並行計算,再把多個「小任務」的結果合併成總的計算結果。ForkJoinPool提供了以下幾個方法用於建立ForkJoinPool實例對象:

    • ForkJoinPool(int parallelism):建立一個包含parallelism個並行線程的ForkJoinPool,parallelism的默認值爲Runtime.getRuntime().availableProcessors()方法的返回值
    • ForkJoinPool commonPool():該方法返回一個通用池,通用池的運行狀態不會受shutdown()shutdownNow()方法的影響。
  • 建立了ForkJoinPool示例以後,就能夠調用ForkJoinPoolsubmit(ForkJoinTask task)invoke(ForkJoinTask task)方法來執行指定任務了。其中ForkJoinTask(實現了Future接口)表明一個能夠並行、合併的任務。ForkJoinTask是一個抽象類,他還有兩個抽象子類:RecursiveActionRecursiveTask。其中RecursiveTask表明有返回值的任務,而RecursiveAction表明沒有返回值的任務。

示例代碼

下面代碼演示了使用ForkJoinPool對1000000個隨機整數進行求和。

public class MyForkJoinPool {

    static int[] nums = new int[1000000];
    static final int MAX_NUM = 50000;
    static Random random = new Random();

    static {
        for (int i = 0; i < nums.length; i++) {
            nums[i] = random.nextInt(1000);
        }
        System.out.println(Arrays.stream(nums).sum());
    }

// static class AddTask extends RecursiveAction {
//
// int start, end;
//
// AddTask(int start, int end) {
// this.start = start;
// this.end = end;
// }
//
// @Override
// protected void compute() {
// if (end - start <= MAX_NUM) {
// long sum = 0L;
// for (int i = 0; i < end; i++) sum += nums[i];
// System.out.println("from:" + start + " to:" + end + " = " + sum);
// } else {
// int middle = start + (end - start) / 2;
//
// AddTask subTask1 = new AddTask(start, middle);
// AddTask subTask2 = new AddTask(middle, end);
// subTask1.fork();
// subTask2.fork();
// }
// }
// }

    static class AddTask extends RecursiveTask<Long> {

        int start, end;

        AddTask(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            // 當end與start之間的差大於MAX_NUM,將大任務分解成兩個「小任務」
            if (end - start <= MAX_NUM) {
                long sum = 0L;
                for (int i = start; i < end; i++) sum += nums[i];
                return sum;
            } else {
                int middle = start + (end - start) / 2;

                AddTask subTask1 = new AddTask(start, middle);
                AddTask subTask2 = new AddTask(middle, end);
                // 並行執行兩個「小任務」
                subTask1.fork();
                subTask2.fork();
                // 把兩個「小任務」累加的結果合併起來
                return subTask1.join() + subTask2.join();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        AddTask task = new AddTask(0, nums.length);
        forkJoinPool.execute(task);

        long result = task.join();
        System.out.println(result);

        forkJoinPool.shutdown();
    }
}
複製代碼

額外補充

上面咱們說到過:其實經常使用Java線程池都是由ThreadPoolExecutor或者ForkJoinPool兩個類生成的,只是其根據構造函數傳入不一樣的實參來生成相應線程池而已。那咱們如今一塊兒來看看Executors中幾個建立線程池對象的靜態方法相關的源碼:

ThreadPoolExecutor構造函數原型

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
複製代碼

參數說明

  • corePoolSize:核心運行的poolSize,也就是當超過這個範圍的時候,就須要將新的Runnable放入到等待隊列workQueue中了。
  • maximumPoolSize:線程池維護線程的最大數量,當大於了這個值就會將任務由一個丟棄處理機制來處理(固然也存在永遠不丟棄任務的線程池,具體得看策略)。
  • keepAliveTime:線程空閒時的存活時間(當線程數大於corePoolSize時該參數纔有效)[java doc中的是這樣寫的 :when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating.]
  • unit:keepAliveTime的單位。
  • workQueue:用來保存等待被執行的任務的阻塞隊列,且任務必須實現Runable接口。

執行任務的過程

  1. poolSize (當前實際須要使用的線程數) < corePoolSize,提交 Runnable 任務,會立馬執行。
  2. 當提交的任務數超過了 corePoolSize ,會將當前的 Runnable 提交到一個 BlockingQueue 中。
  3. 有界隊列滿了以後,若是 poolSize < maximumPoolSize 時,會嘗試new一個Thread進行急救處理,立馬執行對應的Runnable任務。
  4. 若是第三步也沒法處理了,就會走到第四步執行reject操做。

newFixedThreadPool

poolSize 和 maximumPoolSize 相等,使用無界隊列存儲,不管來多少任務,隊列都能塞的下,因此線程池中的線程數老是 poolSize。

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
複製代碼

newSingleThreadExecutor

poolSize 和 maximumPoolSize 都爲1,使用無界隊列存儲,不管來多少任務,隊列都能塞的下,因此線程池中的線程數老是 1。

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
複製代碼

newCachedThreadPool

poolSize 爲 0,來一個任務直接扔到隊列中,使用SynchronousQueue存儲(沒有容量的隊列),因此來來一個任務就得新建一個線程,maximumPoolSize 爲 Integer.MAX_VALUE,能夠當作是容許建立無限的線程。

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
複製代碼

newScheduledThreadPool

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
複製代碼

newWorkStealingPool

public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }
複製代碼

拉票環節

以爲文章寫得不錯的朋友能夠點贊、轉發、加關注呀!大家的支持就是我最大的動力,筆芯!

相關文章
相關標籤/搜索