深刻學習java線程池

咱們都是經過new Thread來建立一個線程,因爲線程的建立和銷燬都須要消耗必定的CPU資源,因此在高併發下這種建立線程的方式將嚴重影響代碼執行效率。而線程池的做用就是讓一個線程執行結束後不立刻銷燬,繼續執行新的任務,這樣就節省了不斷建立線程和銷燬線程的開銷。編程

ThreadPoolExecutor

建立Java線程池最爲核心的類爲ThreadPoolExecutor:併發

QQ截圖20190630215357.png

它提供了四種構造函數來建立線程池,其中最爲核心的構造函數以下所示:dom

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
複製代碼

這7個參數的含義以下:ide

  • corePoolSize 線程池核心線程數。即線程池中保留的線程個數,即便這些線程是空閒的,也不會被銷燬,除非經過ThreadPoolExecutor的allowCoreThreadTimeOut(true)方法開啓了核心線程的超時策略;函數

  • maximumPoolSize 線程池中容許的最大線程個數;高併發

  • keepAliveTime 用於設置那些超出核心線程數量的線程的最大等待時間,超過這個時間尚未新任務的話,超出的線程將被銷燬;this

  • unit 超時時間單位;spa

  • workQueue 線程隊列。用於保存經過execute方法提交的,等待被執行的任務;線程

  • threadFactory 線程建立工程,即指定怎樣建立線程;3d

  • handler 拒絕策略。即指定當線程提交的數量超出了maximumPoolSize後,該使用什麼策略處理超出的線程。

在經過這個構造方法建立線程池的時候,這幾個參數必須知足如下條件,不然將拋出IllegalArgumentException異常:

  • corePoolSize不能小於0;

  • keepAliveTime不能小於0;

  • maximumPoolSize 不能小於等於0;

  • maximumPoolSize不能小於corePoolSize;

此外,workQueue、threadFactory和handler不能爲null,不然將拋出空指針異常。

下面舉些例子來深刻理解這幾個參數的含義。

使用上面的構造方法建立一個線程池:

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                1, 2, 10,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
                new ThreadPoolExecutor.AbortPolicy());
System.out.println("線程池建立完畢");

int activeCount = -1;
int queueSize = -1;
while (true) {
    if (activeCount != threadPoolExecutor.getActiveCount()
            || queueSize != threadPoolExecutor.getQueue().size()) {
        System.out.println("活躍線程個數 " + threadPoolExecutor.getActiveCount());
        System.out.println("核心線程個數 " + threadPoolExecutor.getCorePoolSize());
        System.out.println("隊列線程個數 " + threadPoolExecutor.getQueue().size());
        System.out.println("最大線程數 " + threadPoolExecutor.getMaximumPoolSize());
        System.out.println("------------------------------------");
        activeCount = threadPoolExecutor.getActiveCount();
        queueSize = threadPoolExecutor.getQueue().size();
    }
}
複製代碼

上面的代碼建立了一個核心線程數量爲1,容許最大線程數量爲2,最大活躍時間爲10秒,線程隊列長度爲1的線程池。

假如咱們經過execute方法向線程池提交1個任務,看看結果如何:

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                1, 2, 10,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
                new ThreadPoolExecutor.AbortPolicy());
System.out.println("線程池建立完畢");

threadPoolExecutor.execute(() -> sleep(100));

int activeCount = -1;
int queueSize = -1;
while (true) {
    if (activeCount != threadPoolExecutor.getActiveCount()
            || queueSize != threadPoolExecutor.getQueue().size()) {
        System.out.println("活躍線程個數 " + threadPoolExecutor.getActiveCount());
        System.out.println("核心線程個數 " + threadPoolExecutor.getCorePoolSize());
        System.out.println("隊列線程個數 " + threadPoolExecutor.getQueue().size());
        System.out.println("最大線程數 " + threadPoolExecutor.getMaximumPoolSize());
        System.out.println("------------------------------------");
        activeCount = threadPoolExecutor.getActiveCount();
        queueSize = threadPoolExecutor.getQueue().size();
    }
}
複製代碼

ThreadPoolExecutor的execute和submit方法均可以向線程池提交任務,區別是,submit方法可以返回執行結果,返回值類型爲Future

sleep方法代碼:

private static void sleep(long value) {
    try {
        System.out.println(Thread.currentThread().getName() + "線程執行sleep方法");
        TimeUnit.SECONDS.sleep(value);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
複製代碼

啓動程序,控制檯輸出以下:

QQ截圖20190630222238.png

線程池核心線程數量爲1,經過execute提交了一個任務後,因爲核心線程是空閒的,因此任務被執行了。因爲這個任務的邏輯是休眠100秒,因此在這100秒內,線程池的活躍線程數量爲1。此外,由於提交的任務被核心線程執行了,因此並無線程須要被放到線程隊列裏等待,線程隊列長度爲0。

假如咱們經過execute方法向線程池提交2個任務,看看結果如何:

threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(100));
複製代碼

QQ截圖20190701183457.png

線程池核心線程數量爲1,經過execute提交了2個任務後,一開始核心線程是空閒的,Thread-0被執行。因爲這個任務的邏輯是休眠100秒,因此在這100秒內,線程池的活躍線程數量爲1。由於核心線程數量爲1,因此另一個任務在這100秒內不能被執行,因而被放到線程隊列裏等待,線程隊列長度爲1。

假如咱們經過execute方法向線程池提交3個任務,看看結果如何:

threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(100));
複製代碼

QQ截圖20190701184303.png

這三個任務都是休眠100秒,因此核心線程池中第一個任務正在被執行,第二個任務被放入到了線程隊列。而當第三個任務被提交進來時,線程隊列滿了(咱們定義的長度爲1),因爲該線程池容許的最大線程數量爲2,因此線程池還能夠再建立一個線程來執行另一個任務,因而乎以前在線程隊列裏的線程被取出執行(FIFO),第三個任務被放入到了線程隊列。

改變第二個和第三個任務的睡眠時間,觀察輸出:

threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(5));
threadPoolExecutor.execute(() -> sleep(5));
複製代碼

QQ截圖20190701185215.png

第二個任務提交5秒後,任務執行完畢,因此線程隊列裏的任務被執行,因而隊列線程個數爲0,活躍線程數量爲2(第一個和第三個任務)。再過5秒後,第三個任務執行完畢,因而活躍線程數量爲1(第一個100秒還沒執行完畢)。

在第三個任務結束的瞬間,咱們觀察線程快照:

QQ截圖20190701185617.png

能夠看到,線程池中有兩個線程,Thread-0在執行第一個任務(休眠100秒,還沒結束),Thread-1執行完第三個任務後並無立刻被銷燬。過段時間後(10秒鐘後)再觀察線程快照:

QQ截圖20190701190444.png

能夠看到,Thread-1這個線程被銷燬了,由於咱們在建立線程池的時候,指定keepAliveTime 爲10秒,10秒後,超出核心線程池線程外的那些線程將被銷燬。

假如一次性提交4個任務,看看會怎樣:

threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(100));
複製代碼

QQ截圖20190701190808.png

由於咱們設置的拒絕策略爲AbortPolicy,因此最後提交的那個任務直接被拒絕了。更多拒絕策略下面會介紹到。

關閉線程池

線程池包含如下幾個狀態:

QQ截圖20190702100110.png

當線程池中全部任務都處理完畢後,線程並不會本身關閉。咱們能夠經過調用shutdown和shutdownNow方法來關閉線程池。二者的區別在於:

shutdown方法將線程池置爲shutdown狀態,拒絕新的任務提交,但線程池並不會立刻關閉,而是等待全部正在折行的和線程隊列裏的任務都執行完畢後,線程池纔會被關閉。因此這個方法是平滑的關閉線程池。

shutdownNow方法將線程池置爲stop狀態,拒絕新的任務提交,中斷正在執行的那些任務,而且清除線程隊列裏的任務並返回。因此這個方法是比較「暴力」的。

舉兩個例子觀察下二者的區別:

shutdown例子:

public static void main(String[] args) {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            2, 4, 10,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(2), (ThreadFactory) Thread::new,
            new ThreadPoolExecutor.AbortPolicy());

    threadPoolExecutor.execute(new shortTask());
    threadPoolExecutor.execute(new longTask());
    threadPoolExecutor.execute(new longTask());
    threadPoolExecutor.execute(new shortTask());

    threadPoolExecutor.shutdown();
    System.out.println("已經執行了線程池shutdown方法");
}

static class shortTask implements Runnable {
    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(1);
            System.out.println(Thread.currentThread().getName() + "執行shortTask完畢");
        } catch (InterruptedException e) {
            System.err.println("shortTask執行過程當中被打斷" + e.getMessage());
        }
    }
}

static class longTask implements Runnable {
    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(5);
            System.out.println(Thread.currentThread().getName() + "執行longTask完畢");
        } catch (InterruptedException e) {
            System.err.println("longTask執行過程當中被打斷" + e.getMessage());
        }
    }
}
複製代碼

啓動程序,控制檯輸出以下:

QQ截圖20190702101041.png

能夠看到,雖然在任務都被提交後立刻執行了shutdown方法,可是並不會立刻關閉線程池,而是等待全部被提交的任務都執行完了才關閉。

shutdownNow例子:

public static void main(String[] args) {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            2, 4, 10,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(2), (ThreadFactory) Thread::new,
            new ThreadPoolExecutor.AbortPolicy());

    threadPoolExecutor.execute(new shortTask());
    threadPoolExecutor.execute(new longTask());
    threadPoolExecutor.execute(new longTask());
    threadPoolExecutor.execute(new shortTask());

    List<Runnable> runnables = threadPoolExecutor.shutdownNow(); // 立刻關閉,並返回還未被執行的任務
    System.out.println(runnables);

    System.out.println("已經執行了線程池shutdownNow方法");
}

static class shortTask implements Runnable {
    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(1);
            System.out.println(Thread.currentThread().getName() + "執行shortTask完畢");
        } catch (InterruptedException e) {
            System.err.println("shortTask執行過程當中被打斷" + e.getMessage());
        }
    }
}

static class longTask implements Runnable {
    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(5);
            System.out.println(Thread.currentThread().getName() + "執行longTask完畢");
        } catch (InterruptedException e) {
            System.err.println("longTask執行過程當中被打斷" + e.getMessage());
        }
    }
}
複製代碼

啓動程序,控制檯輸出以下:

QQ截圖20190702101355.png

能夠看到,在執行shutdownNow方法後,線程池立刻就被關閉了,正在執行中的兩個任務被打斷,而且返回了線程隊列中等待被執行的兩個任務。

經過上面兩個例子咱們還能夠看到shutdown和shutdownNow方法都不是阻塞的。常與shutdown搭配的方法有awaitTermination。

awaitTermination方法接收timeout和TimeUnit兩個參數,用於設定超時時間及單位。當等待超過設定時間時,會監測ExecutorService是否已經關閉,若關閉則返回true,不然返回false。該方法是阻塞的:

public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            2, 4, 10,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(2), (ThreadFactory) Thread::new,
            new ThreadPoolExecutor.AbortPolicy());

    threadPoolExecutor.execute(new shortTask());
    threadPoolExecutor.execute(new longTask());
    threadPoolExecutor.execute(new longTask());
    threadPoolExecutor.execute(new shortTask());

    threadPoolExecutor.shutdown();
    boolean isShutdown = threadPoolExecutor.awaitTermination(3, TimeUnit.SECONDS);
    if (isShutdown) {
        System.out.println("線程池在3秒內成功關閉");
    } else {
        System.out.println("等了3秒還沒關閉,不等了╰(‵□′)╯");
    }
    System.out.println("------------");
}

static class shortTask implements Runnable {
    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(1);
            System.out.println(Thread.currentThread().getName() + "執行shortTask完畢");
        } catch (InterruptedException e) {
            System.err.println("shortTask執行過程當中被打斷" + e.getMessage());
        }
    }
}

static class longTask implements Runnable {
    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(5);
            System.out.println(Thread.currentThread().getName() + "執行longTask完畢");
        } catch (InterruptedException e) {
            System.err.println("longTask執行過程當中被打斷" + e.getMessage());
        }
    }
}
複製代碼

啓動程序輸出以下:

QQ截圖20190702102156.png

4大拒絕策略

當線程池沒法再接收新的任務的時候,可採起以下四種策略:

QQ截圖20190302111014.png

CallerRunsPolicy CallerRunsPolicy策略:由調用線程處理該任務:

public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            2, 3, 10,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
            new ThreadPoolExecutor.CallerRunsPolicy());

    threadPoolExecutor.execute(new shortTask("任務1"));
    threadPoolExecutor.execute(new longTask("任務2"));
    threadPoolExecutor.execute(new longTask("任務3"));
    threadPoolExecutor.execute(new shortTask("任務4"));
    threadPoolExecutor.execute(new shortTask("任務5"));

    threadPoolExecutor.shutdown();
}

static class shortTask implements Runnable {

    private String name;

    public shortTask(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(1);
            System.out.println(Thread.currentThread().getName() + "執行shortTask-name-" + name + "完畢");
        } catch (InterruptedException e) {
            System.err.println("shortTask執行過程當中被打斷" + e.getMessage());
        }
    }
}

static class longTask implements Runnable {

    private String name;

    public longTask(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(5);
            System.out.println(Thread.currentThread().getName() + "執行longTask-name-" + name + "完畢");
        } catch (InterruptedException e) {
            System.err.println("longTask執行過程當中被打斷" + e.getMessage());
        }
    }
}
複製代碼

上面的線程池最多隻能一次性提交4個任務,第5個任務提交後會被拒絕策略處理。啓動程序輸出以下:

QQ截圖20190702103818.png

能夠看到,第5個提交的任務由調用線程(即main線程)處理該任務。

AbortPolicy AbortPolicy策略:丟棄任務,並拋出RejectedExecutionException異常。前面的例子就是使用該策略,因此再也不演示。

DiscardOldestPolicy DiscardOldestPolicy策略:丟棄最先被放入到線程隊列的任務,將新提交的任務放入到線程隊列末端:

public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            2, 3, 10,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
            new ThreadPoolExecutor.DiscardOldestPolicy());

    threadPoolExecutor.execute(new shortTask("任務1"));
    threadPoolExecutor.execute(new longTask("任務2"));
    threadPoolExecutor.execute(new longTask("任務3"));
    threadPoolExecutor.execute(new shortTask("任務4"));
    threadPoolExecutor.execute(new shortTask("任務5"));

    threadPoolExecutor.shutdown();
}

static class shortTask implements Runnable {

    private String name;

    public shortTask(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(1);
            System.out.println(Thread.currentThread().getName() + "執行shortTask-name-" + name + "完畢");
        } catch (InterruptedException e) {
            System.err.println("shortTask執行過程當中被打斷" + e.getMessage());
        }
    }
}

static class longTask implements Runnable {

    private String name;

    public longTask(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(5);
            System.out.println(Thread.currentThread().getName() + "執行longTask-name-" + name + "完畢");
        } catch (InterruptedException e) {
            System.err.println("longTask執行過程當中被打斷" + e.getMessage());
        }
    }
}
複製代碼

啓動程序輸出以下:

QQ截圖20190702105646.png

能夠看到最後提交的任務被執行了,而第3個任務是第一個被放到線程隊列的任務,被丟棄了。

DiscardPolicy DiscardPolicy策略:直接丟棄新的任務,不拋異常:

public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            2, 3, 10,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
            new ThreadPoolExecutor.DiscardPolicy());

    threadPoolExecutor.execute(new shortTask("任務1"));
    threadPoolExecutor.execute(new longTask("任務2"));
    threadPoolExecutor.execute(new longTask("任務3"));
    threadPoolExecutor.execute(new shortTask("任務4"));
    threadPoolExecutor.execute(new shortTask("任務5"));

    threadPoolExecutor.shutdown();
}

static class shortTask implements Runnable {

    private String name;

    public shortTask(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(1);
            System.out.println(Thread.currentThread().getName() + "執行shortTask-name-" + name + "完畢");
        } catch (InterruptedException e) {
            System.err.println("shortTask執行過程當中被打斷" + e.getMessage());
        }
    }
}

static class longTask implements Runnable {

    private String name;

    public longTask(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(5);
            System.out.println(Thread.currentThread().getName() + "執行longTask-name-" + name + "完畢");
        } catch (InterruptedException e) {
            System.err.println("longTask執行過程當中被打斷" + e.getMessage());
        }
    }
}
複製代碼

啓動程序,輸出以下:

QQ截圖20190702110022.png

第5個任務直接被拒絕丟棄了,而沒有拋出任何異常。

線程池工廠方法

除了使用ThreadPoolExecutor的構造方法建立線程池外,咱們也可使用Executors提供的工廠方法來建立不一樣類型的線程池:

QQ截圖20190702110350.png

newFixedThreadPool 查看newFixedThreadPool方法源碼:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
複製代碼

能夠看到,經過newFixedThreadPool建立的是一個固定大小的線程池,大小由nThreads參數指定,它具備以下幾個特色:

由於corePoolSize和maximumPoolSize的值都爲nThreads,因此線程池中線程數量永遠等於nThreads,不可能新建除了核心線程數的線程來處理任務,即keepAliveTime實際上在這裏是無效的。

LinkedBlockingQueue是一個無界隊列(最大長度爲Integer.MAX_VALUE),因此這個線程池理論是能夠無限的接收新的任務,這就是爲何上面沒有指定拒絕策略的緣由。

newCachedThreadPool 查看newCachedThreadPool方法源碼:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
複製代碼

這是一個理論上無限大小的線程池:

核心線程數爲0,SynchronousQueue隊列是沒有長度的隊列,因此當有新的任務提交,若是有空閒的還未超時的(最大空閒時間60秒)線程則執行該任務,不然新增一個線程來處理該任務。

由於線程數量沒有限制,理論上能夠接收無限個新任務,因此這裏也沒有指定拒絕策略。

newSingleThreadExecutor 查看newSingleThreadExecutor源碼:

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
複製代碼

核心線程數和最大線程數都爲1,每次只能有一個線程處理任務。

LinkedBlockingQueue隊列能夠接收無限個新任務。

newScheduledThreadPool 查看newScheduledThreadPool源碼:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
   
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}
複製代碼

因此newScheduledThreadPool理論是也是能夠接收無限個任務,DelayedWorkQueue也是一個無界隊列。

使用newScheduledThreadPool建立的線程池除了能夠處理普通的Runnable任務外,它還具備調度的功能:

1.延遲指定時間後執行:

ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
// 延遲5秒執行
executorService.schedule(() -> System.out.println("hello"), 5, TimeUnit.SECONDS);
複製代碼

2.按指定的速率執行:

ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
// 延遲1秒執行,而後每5秒執行一次
executorService.scheduleAtFixedRate(
        () -> System.out.println(LocalTime.now()), 1, 5, TimeUnit.SECONDS
);
複製代碼

QQ截圖20190702152117.png

3.按指定的時延執行:

ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
executorService.scheduleWithFixedDelay(
        () -> System.out.println(LocalTime.now()), 1, 5, TimeUnit.SECONDS
);
複製代碼

QQ截圖20190702152440.png

乍一看,scheduleAtFixedRate和scheduleWithFixedDelay沒啥區別,實際它們仍是有區別的:

scheduleAtFixedRate按照固定速率執行任務,好比每5秒執行一個任務,即便上一個任務沒有結束,5秒後也會開始處理新的任務;

scheduleWithFixedDelay按照固定的時延處理任務,好比每延遲5秒執行一個任務,不管上一個任務處理了1秒,1分鐘仍是1小時,下一個任務老是在上一個任務執行完畢後5秒鐘後開始執行。

對於這些線程池工廠方法的使用,阿里巴巴編程規程指出:

QQ截圖20190702153306.png

由於這幾個線程池理論是均可以接收無限個任務,因此這就有內存溢出的風險。實際上只要咱們掌握了ThreadPoolExecutor構造函數7個參數的含義,咱們就能夠根據不一樣的業務來建立出符合需求的線程池。通常線程池的建立能夠參考以下規則:

IO密集型任務,線程池線程數量能夠設置爲2 X CPU核心數;

計算密集型任務,線程池線程數量能夠設置爲CPU核心數 + 1。

一些API的用法 ThreadPoolExecutor提供了幾個判斷線程池狀態的方法:

public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            1, 2, 5, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
            new ThreadPoolExecutor.AbortPolicy()
    );

    threadPoolExecutor.execute(() -> {
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });

    threadPoolExecutor.shutdown();
    System.out.println("線程池爲shutdown狀態:" + threadPoolExecutor.isShutdown());
    System.out.println("線程池正在關閉:" + threadPoolExecutor.isTerminating());
    System.out.println("線程池已經關閉:" + threadPoolExecutor.isTerminated());
    threadPoolExecutor.awaitTermination(6, TimeUnit.SECONDS);
    System.out.println("線程池已經關閉" + threadPoolExecutor.isTerminated());
}
複製代碼

程序輸出以下:

20190703205843.png

前面咱們提到,線程池核心線程即便是空閒狀態也不會被銷燬,除非使用allowCoreThreadTimeOut設置了容許核心線程超時:

public static void main(String[] args) throws InterruptedException {
       ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
               1, 2, 3, TimeUnit.SECONDS,
               new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
               new ThreadPoolExecutor.AbortPolicy()
       );
       threadPoolExecutor.allowCoreThreadTimeOut(true);
       threadPoolExecutor.execute(() -> {
           try {
               TimeUnit.SECONDS.sleep(5);
               System.out.println("任務執行完畢");
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       });
   }
複製代碼

程序輸出以下所示:

asdfasdfaaaaa.gif

5秒後任務執行完畢,核心線程處於空閒的狀態。由於經過allowCoreThreadTimeOut方法設置了容許核心線程超時,因此3秒後(keepAliveTime設置爲3秒),核心線程被銷燬。核心線程被銷燬後,線程池也就沒有做用了,因而就自動關閉了。

值得注意的是,若是一個線程池調用了allowCoreThreadTimeOut(true)方法,那麼它的keepAliveTime不能爲0。

ThreadPoolExecutor提供了一remove方法,查看其源碼:

public boolean remove(Runnable task) {
    boolean removed = workQueue.remove(task);
    tryTerminate(); // In case SHUTDOWN and now empty
    return removed;
}
複製代碼

可看到,它刪除的是線程隊列中的任務,而非正在被執行的任務。舉個例子:

public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            1, 2, 3, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
            new ThreadPoolExecutor.AbortPolicy()
    );
    threadPoolExecutor.execute(() -> {
        try {
            TimeUnit.SECONDS.sleep(5);
            System.out.println("任務執行完畢");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });

    Runnable r = () -> System.out.println("看看我是否會被刪除");
    threadPoolExecutor.execute(r);
    threadPoolExecutor.remove(r);

    threadPoolExecutor.shutdown();
}
複製代碼

執行程序,輸出以下:

QQ截圖20190703211746.png

可看到任務並無被執行,已經被刪除,由於惟一一個核心線程已經在執行任務了,因此後提交的這個任務被放到了線程隊列裏,而後經過remove方法刪除。

默認狀況下,只有當往線程池裏提交了任務後,線程池纔會啓動核心線程處理任務。咱們能夠經過調用prestartCoreThread方法,讓核心線程即便沒有任務提交,也處於等待執行任務的活躍狀態:

public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            2, 2, 3, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
            new ThreadPoolExecutor.AbortPolicy()
    );
    System.out.println("活躍線程數: " + threadPoolExecutor.getActiveCount());
    threadPoolExecutor.prestartCoreThread();
    System.out.println("活躍線程數: " + threadPoolExecutor.getActiveCount());
    threadPoolExecutor.prestartCoreThread();
    System.out.println("活躍線程數: " + threadPoolExecutor.getActiveCount());
    threadPoolExecutor.prestartCoreThread();
    System.out.println("活躍線程數: " + threadPoolExecutor.getActiveCount());
}
複製代碼

程序輸出以下所示:

QQ截圖20190703213145.png

該方法返回boolean類型值,若是因此核心線程都啓動了,返回false,反之返回true。

還有一個和它相似的prestartAllCoreThreads方法,它的做用是一次性啓動全部核心線程,讓其處於活躍地等待執行任務的狀態。

ThreadPoolExecutor的invokeAny方法用於隨機執行任務集合中的某個任務,並返回執行結果,該方法是同步方法:

public static void main(String[] args) throws InterruptedException, ExecutionException {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            2, 5, 3, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
            new ThreadPoolExecutor.AbortPolicy()
    );

    // 任務集合
    List<Callable<Integer>> tasks = IntStream.range(0, 4).boxed().map(i -> (Callable<Integer>) () -> {
        TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5));
        return i;
    }).collect(Collectors.toList());
    // 隨機執行結果
    Integer result = threadPoolExecutor.invokeAny(tasks);
    System.out.println("-------------------");
    System.out.println(result);
    threadPoolExecutor.shutdownNow();
}
複製代碼

啓動程序,輸出以下:

QQ截圖20190704091530.png

ThreadPoolExecutor的invokeAll則是執行任務集合中的全部任務,返回Future集合:

public static void main(String[] args) throws InterruptedException, ExecutionException {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            2, 5, 3, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
            new ThreadPoolExecutor.AbortPolicy()
    );

    List<Callable<Integer>> tasks = IntStream.range(0, 4).boxed().map(i -> (Callable<Integer>) () -> {
        TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5));
        return i;
    }).collect(Collectors.toList());

    List<Future<Integer>> futureList = threadPoolExecutor.invokeAll(tasks);
    futureList.stream().map(f->{
        try {
            return f.get();
        } catch (InterruptedException | ExecutionException e) {
           return null;
        }
    }).forEach(System.out::println);

    threadPoolExecutor.shutdownNow();
}
複製代碼

輸出以下:

QQ截圖20190704091836.png

總結下這些方法:

方法 描述
allowCoreThreadTimeOut(boolean value) 是否容許核心線程空閒後超時,是的話超時後核心線程將銷燬,線程池自動關閉
awaitTermination(long timeout, TimeUnit unit) 阻塞當前線程,等待線程池關閉,timeout用於指定等待時間。
execute(Runnable command) 向線程池提交任務,沒有返回值
submit(Runnable task) 向線程池提交任務,返回Future
isShutdown() 判斷線程池是否爲shutdown狀態
isTerminating() 判斷線程池是否正在關閉
isTerminated() 判斷線程池是否已經關閉
remove(Runnable task) 移除線程隊列中的指定任務
prestartCoreThread() 提早讓一個核心線程處於活躍狀態,等待執行任務
prestartAllCoreThreads() 提早讓全部核心線程處於活躍狀態,等待執行任務
getActiveCount() 獲取線程池活躍線程數
getCorePoolSize() 獲取線程池核心線程數
threadPoolExecutor.getQueue() 獲取線程池線程隊列
getMaximumPoolSize() 獲取線程池最大線程數
shutdown() 讓線程池處於shutdown狀態,再也不接收任務,等待全部正在運行中的任務結束後,關閉線程池。
shutdownNow() 讓線程池處於stop狀態,再也不接受任務,嘗試打斷正在運行中的任務,並關閉線程池,返回線程隊列中的任務。
相關文章
相關標籤/搜索