Java 線程池詳解

1 概述

本文主要講解了Java裏面線程池的接口以及實現類,以及它們的基本使用方法,內容包括:java

  • Executor/Executors
  • ExecutorService
  • ThreadPoolExecutor
  • ScheduledThreadPoolExecutor

2 兩個重要的接口:Executor+ExecutorService

Executor是一個接口,裏面只是定義了一個簡單的任務提交方法:shell

//Executor
package java.util.concurrent;

public interface Executor {
    void execute(Runnable var1);
}

ExecutorService也是一個接口,繼承了Executor,而且提供了更多用於任務提交和管理的一些方法,好比中止任務的執行等:安全

//ExecutorService
package java.util.concurrent;

import java.util.Collection;
import java.util.List;

public interface ExecutorService extends Executor {
    void shutdown();

    List<Runnable> shutdownNow();

    boolean isShutdown();

    boolean isTerminated();

    boolean awaitTermination(long var1, TimeUnit var3) throws InterruptedException;

    <T> Future<T> submit(Callable<T> var1);

    <T> Future<T> submit(Runnable var1, T var2);

    Future<?> submit(Runnable var1);

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1) throws InterruptedException;

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1, long var2, TimeUnit var4) throws InterruptedException;

    <T> T invokeAny(Collection<? extends Callable<T>> var1) throws InterruptedException, ExecutionException;

    <T> T invokeAny(Collection<? extends Callable<T>> var1, long var2, TimeUnit var4) throws InterruptedException, ExecutionException, TimeoutException;
}

下面將詳細講述ExecutorService的兩個重要實現:bash

  • ThreadPoolExecutor
  • ScheduledThreadPoolExecutor

3 ThreadPoolExecutor

這就是一般所說的線程池類,一般來講,一個線程池有以下特徵:併發

  • 線程池有必定數量的工做線程
  • 線程數量以及任務數量會受到必定的控制和管理
  • 任務的執行以異步的方式進行
  • 線程池會負責執行任務的信息統計

3.1 一個簡單的例子

先來看一個簡單的例子:框架

public class Main {
    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
        //執行沒有返回值的任務
        executor.execute(()-> System.out.println(" Execute the runnable task."));
        //執行帶返回值的任務,用到了Future泛型類
        Future<String> future = executor.submit(()->" Execute the callable task and this is the result.");
        //經過get()獲取任務結果,get()會在任務未完成時一直阻塞
        System.out.println(future.get());
           //手動關閉線程池
        executor.shutdown();
    }
}

從這個簡單的例子能夠看到,線程池能夠執行帶返回值以及不帶返回值的任務,帶返回值的話須要使用get()方法阻塞獲取。另外,運行完畢後須要手動關閉線程池,不然JVM不會退出,由於線程池中有指定數量的活躍線程數量,而JVM正常退出的條件是JVM進程中不存在任何運行着的非守護進程。異步

3.2 構造方法

構造方法的源碼以下:ide

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, 
TimeUnit unit, BlockingQueue<Runnable> workQueue) 

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, 
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) 

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, 
TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) 

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, 
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

雖然提供了四個構造方法,但本質上調用的是最後一個構造方法,該構造方法帶有7個參數,分別是:高併發

  • corePoolSize:核心線程數量,即便當線程池中的核心線程不工做,核心線程的數量也不會減小。該參數的最小值爲0,且小於等於maximumPoolSize
  • maximumPoolSize:用於設置線程池中容許的線程數量的最大值
  • keepAliveTime:當線程池中的線程數量超過核心線程數而且處於空閒時,線程池將會回收一部分線程讓出系統資源,該參數可用於設置超過corePoolSize數量的線程在多長時間後被回收,與後一個表示時間單位的參數unit配合使用
  • unit:用於設定keepAliveTime的時間單位
  • workQueure:用於存放已提交至線程池但未被執行的任務
  • threadFactory:用於建立線程的工廠,開發者能夠自定義ThreadFactory來建立線程
  • handler:拒絕策略,當任務超過阻塞隊列的邊界時,線程池會拒絕新增的任務,主要用於設置拒絕策略

3.3 任務執行流程

線程池被成功建立後,內部的運行線程並不會被當即建立,ThreadPoolExecutor會採用一種Lazy的方式去建立而且運行。首次調用執行任務方法時纔會建立線程,好比:this

ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
assert executor.getActiveCount() == 0;
assert executor.getMaximumPoolSize() == 4;
assert executor.getCorePoolSize() == 2;
executor.execute(()-> System.out.println(" Execute the runnable task."));
assert executor.getActiveCount() == 1;
assert executor.getMaximumPoolSize() == 4;
assert executor.getCorePoolSize() == 2;

(運行的時候請加上-ea參數)

下面看一下任務的具體執行流程:

  • 若是運行中線程數少於核心線程數,建立新線程並當即執行任務
  • 若是運行中的線程大於等於核心線程數,且任務隊列未滿時,會將任務先放進任務隊列,直到運行中的線程數執行完成本身的任務後,再去輪詢任務隊列以獲取任務運行
  • 若是任務隊列已滿,且運行中的線程數量小於最大線程數量時,線程池會建立線程執行任務,建立的線程數量會少於最大線程數
  • 若是任務隊列已滿且運行中的線程數量已到達最大線程數量,而且此刻沒有空閒的運行線程,會執行任務拒絕策略,取決於RejectedEcecutionHandler
  • 若線程池中的線程是空閒的,且空閒時間到達keepAliveTime指定時間,會回收線程,直到保留corePoolSize個核心線程爲止(不過核心線程也能夠設置被超時回收,默認不開啓核心線程超時)

3.4 線程工廠

線程工廠ThreadFactory是一個接口:

package java.util.concurrent;

public interface ThreadFactory {
    Thread newThread(Runnable var1);
}

使用線程工廠能夠在建立線程時加入自定義配置,好比指定名字、優先級、是否爲守護線程等,好比下面是線程工廠的一個簡單實現:

public class TestThreadFactory implements ThreadFactory {
    private final static String PREFIX = "Test thread[";
    private final static String SUFFIX = "]";
    private final static AtomicInteger THREAD_NUM = new AtomicInteger();
    @Override
    public Thread newThread(Runnable runnable) {
        ThreadGroup group = new ThreadGroup("My pool");
        Thread thread = new Thread(group,runnable,PREFIX+THREAD_NUM.getAndIncrement()+SUFFIX);
        thread.setPriority(5);
        return thread;
    }
}

3.5 拒絕策略

默認狀況下,ThreadPoolExecutor提供了四種拒絕策略:

  • DiscardPolicy:丟棄策略,直接丟棄任務
  • AbortPolicy:終止策略,拋出RejectedExecutionException
  • DiscardOldestPolicy:丟棄隊列中最老任務的策略(嚴格意義來講須要根據任務隊列去選擇,由於不是全部的隊列都是FIFO的)
  • CallerRunsPolicy:調用者線程執行策略,任務會在當前線程中阻塞執行

固然,若是不能知足須要,能夠實現RejectedExecutionHandler接口去自定義策略:

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable var1, ThreadPoolExecutor var2);
}

3.6 關閉線程池

若是不須要線程池,那麼須要手動對線程池關閉。線程池提供了以下三種方式:

  • 有序關閉:shutdown()
  • 當即關閉:shutdownNow()
  • 組合關閉:shutdown()+shutdownNow()

3.6.1 有序關閉

shutdown()提供了一種有序關閉的方式去關閉線程池,調用該方法後,會等待當前執行的任務所有執行完成而後關閉,同時新提交任務將會被拒絕。注意該方法是非阻塞,當即返回的。若是須要查看關閉狀態,可使用:

  • isShutdown():返回是否調用了shutdown()的結果
  • isTerminating():返回是否正在結束中
  • isTerminated():返回是否已經結束

3.6.2 當即關閉

shutdownNow()方法首先將線程池狀態修改成shutdown狀態,而後將未被執行的任務掛起,接着將嘗試中斷運行中的線程,最後返回未執行的任務:

public static void main(String[] args) throws Exception {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new TestThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
    IntStream.range(0,10).forEach(i-> executor.execute(()-> {
        try{
            TimeUnit.SECONDS.sleep(5);
        }catch (Exception e){
            e.printStackTrace();
        }
    }));
    List<Runnable> runnables = executor.shutdownNow();
    System.out.println(runnables.size());
}

輸出:

8

BUILD SUCCESSFUL in 326ms
2 actionable tasks: 2 executed
java.lang.InterruptedException: sleep interrupted
    at java.base/java.lang.Thread.sleep(Native Method)
    at java.base/java.lang.Thread.sleep(Thread.java:339)
    at java.base/java.util.concurrent.TimeUnit.sleep(TimeUnit.java:446)
    at com.company.Main.lambda$main$0(Main.java:29)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
java.lang.InterruptedException: sleep interrupted
    at java.base/java.lang.Thread.sleep(Native Method)
    at java.base/java.lang.Thread.sleep(Thread.java:339)
    at java.base/java.util.concurrent.TimeUnit.sleep(TimeUnit.java:446)
    at com.company.Main.lambda$main$0(Main.java:29)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
3:14:36 AM: Task execution finished 'Main.main()'.

3.6.3 組合關閉

爲了確保安全關閉線程池,通常會使用組合方式關閉,確保正在運行的任務被正常執行的同時又能提升線程池被關閉的成功率,例子以下:

ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new TestThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
IntStream.range(0,10).forEach(i-> executor.execute(()-> {
    try{
        TimeUnit.SECONDS.sleep(5);
    }catch (Exception e){
        e.printStackTrace();
    }
}));

//首先調用shutdown()嘗試關閉
executor.shutdown();
try{
    //若是等待一段時間後還沒關閉
    if(!executor.awaitTermination(10,TimeUnit.SECONDS)){
        //強制關閉
        executor.shutdownNow();
        //若是強制關閉失敗,好比運行的線程異常耗時且不能被中斷
        if(!executor.awaitTermination(10,TimeUnit.SECONDS)){
            //其餘處理,這裏只是輸出中斷失敗的信息
            System.out.println("Terminate failed.");
        }
    }
}catch (InterruptedException e){
    //若是當前線程被中斷,而且捕獲了異常,執行當即關閉方法
    executor.shutdownNow();
    //從新拋出中斷信號
    Thread.currentThread().interrupt();
}

4 ScheduledThreadPoolExecutor

ScheduledExecutorService繼承了ExecutorService,而且提供了任務被定時執行的特性,可使用ScheduledThreadPoolExecutor去實現某些特殊的任務執行。固然實現固定任務的方法或者框架有不少,有原生的shell實現,老式的Timer/TimerTask實現,或者專門的框架Quartz實現,這裏要說的是JDK內部的實現ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor,除了具有ThreadPoolExecutor的全部方法外,還定義了4個與schedule有關的方法:

  • <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit):一個one-shot(只執行一次)的方法, 任務(callable)會在單位時間(delay)後被執行,而且當即返回ScheduledFuture
  • ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit):也是一個one-shot方法,任務會在單位時間後被執行,與第一個方法不一樣的是返回的ScheduledFuture不包含任何執行結果,可是能夠經過返回的ScheduledFuture判斷任務是否執行結束
  • ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit):任務會根據固定的速率在initialDelay後不斷被執行
  • ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit):任務將以固定延遲單位時間的方式執行任務

關於後二者的區別以下:

public static void main(String[] args) throws Exception {
    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2);
    Runnable runnable = ()->{
        long startTimestamp = System.currentTimeMillis();
        System.out.println("current timestamp: "+startTimestamp);
        try{
            TimeUnit.MILLISECONDS.sleep(current().nextInt(100));
        }catch (Exception e){
            e.printStackTrace();
        }
        System.out.println("elapsed time: "+(System.currentTimeMillis() - startTimestamp));
    };

    executor.scheduleAtFixedRate(runnable,10,1000,TimeUnit.MILLISECONDS);
//        executor.scheduleWithFixedDelay(runnable,10,1000,TimeUnit.MILLISECONDS);
}

輸出:

current timestamp: 1619351675438
elapsed time: 97
current timestamp: 1619351676438
elapsed time: 85
current timestamp: 1619351677438
elapsed time: 1
current timestamp: 1619351678438
elapsed time: 1
current timestamp: 1619351679438
elapsed time: 68
current timestamp: 1619351680438
elapsed time: 99

能夠看到任務始終以一種固定的速率運行,每次運行的開始時間始終相隔1000ms

而使用FixedDelay的輸出以下:

current timestamp: 1619351754890
elapsed time: 53
current timestamp: 1619351755944
elapsed time: 30
current timestamp: 1619351756974
elapsed time: 13
current timestamp: 1619351757987
elapsed time: 80
current timestamp: 1619351759068
elapsed time: 94
current timestamp: 1619351760162
elapsed time: 29

每次開始的時間爲上一次執行完成後的時間再加上時間間隔(1000ms)。

5 Executors中的線程池

Executors類提供了六種建立線程池的靜態方法:

  • FixedThreadPool
  • SingleThreadExecutor
  • CachedThreadPool
  • ScheduledThreadPool
  • SingleThreadScheduledExecutor
  • WorkStealingPool

下面分別來看一下。

5.1 FixedThreadPool

源碼以下:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
}

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory);
}

FixedThreadPool底層調用的是ThreadPoolExecutor,默認建立的核心線程數與最大線程數相等,任務隊列爲無邊界的LinkedBlockingQueue

5.2 SingleThreadExecutor

相關源碼以下:

public static ExecutorService newSingleThreadExecutor() {
    return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));
}

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory));
}

private static class FinalizableDelegatedExecutorService extends Executors.DelegatedExecutorService {
    FinalizableDelegatedExecutorService(ExecutorService executor) {
        super(executor);
    }

    protected void finalize() {
        super.shutdown();
    }
}

能夠看到SingleThreadPool其實是內部類FinalizableDelegatedExecutorService的包裝,核心線程與最大線程數均爲1,任務隊列爲無邊界的LinkedBlockingQueue。發生GC的時候,會調用shutdown()方法。

5.3 CachedThreadPool

源碼以下:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue());
}

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue(), threadFactory);
}

CachedThreadPool會根據須要建立新線程,一般用於執行量大的,耗時較短的異步任務。未被使用且空閒時間超過60s的線程會被回收。

5.4 ScheduledThreadPool

源碼以下:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

建立指定核心數ScheduledThreadPoolExecutor

5.5 SingleThreadScheduledExecutor

源碼以下:

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new Executors.DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
}

public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
    return new Executors.DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1, threadFactory));
}

private static class DelegatedScheduledExecutorService extends Executors.DelegatedExecutorService implements ScheduledExecutorService {
    private final ScheduledExecutorService e;

    DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
        super(executor);
        this.e = executor;
    }

    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        return this.e.schedule(command, delay, unit);
    }

    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
        return this.e.schedule(callable, delay, unit);
    }

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
        return this.e.scheduleAtFixedRate(command, initialDelay, period, unit);
    }

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
        return this.e.scheduleWithFixedDelay(command, initialDelay, delay, unit);
    }
}

其實就是SingelThreadPool+ScheduledThreadPool

5.6 WorkStealingPool

源碼以下:

public static ExecutorService newWorkStealingPool(int parallelism) {
    return new ForkJoinPool(parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, (UncaughtExceptionHandler)null, true);
}

public static ExecutorService newWorkStealingPool() {
    return new ForkJoinPool(Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, (UncaughtExceptionHandler)null, true);
}

WorkStealingPoolJDK8引入的線程池,返回的是ForkJoinPool。在WorkStealingPool中,若是每一個線程處理的任務執行比較耗時,那麼它負責的任務會被其餘線程「竊取」,進而提升併發處理的效率。

相關文章
相關標籤/搜索