Java Concurrent Programming (4): Creating and Using Thread Pools

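1. Preface

Multithreading is unavoidable in real projects. To avoid wasting resources, and to keep an uncontrolled thread count from causing unpredictable problems, we usually use a thread pool.

The JDK provides several ready-made factory methods for creating thread pools. Most of them are built on ThreadPoolExecutor, so ThreadPoolExecutor is the foundation. We first look at how to use ThreadPoolExecutor, then weigh the pros and cons of the quick factory methods, and finally look at more flexible ways to use multithreading.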

2. ThreadPoolExecutor

2.1 Constructor

/**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

The constructor above is the all-arguments one; it takes seven parameters:

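  • int corePoolSize : the number of core threads, i.e. the threads the pool keeps alive even when they are idle;
    • When a task arrives and fewer than corePoolSize threads exist, the pool creates a new thread for it (even if other threads are idle) until the thread count reaches corePoolSize. From then on the pool keeps at least corePoolSize threads alive, unless allowCoreThreadTimeOut is set to true;
    • A freshly created pool has zero threads; threads are created lazily as tasks are submitted, unless prestartAllCoreThreads() is called;
  • int maximumPoolSize : the maximum number of threads; it caps the size of the pool;
  • long keepAliveTime, TimeUnit unit : the keep-alive time and its unit;
    • When the pool has more than corePoolSize threads, an excess thread that has finished its task waits idle for at most this long for new work. After that it is terminated, until the thread count is back at corePoolSize;
  • BlockingQueue<Runnable> workQueue : the blocking queue;
    • It holds tasks submitted after the thread count has reached corePoolSize;
    • BlockingQueue has several implementations, discussed below;
    • When the number of waiting tasks equals the queue capacity, i.e. the queue is full, the pool creates new threads to handle tasks, up to the limit set by maximumPoolSize;
  • ThreadFactory threadFactory : the thread factory, used to set thread attributes such as the name;
  • RejectedExecutionHandler handler : the rejection policy;
    • It decides what happens to tasks submitted when the thread count is at its maximum and the queue is full. You can use one of the built-in policies or implement your own; both are discussed below;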
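The order in which these parameters take effect (core threads first, then the queue, then extra threads, then rejection) can be observed with a deliberately tiny pool. The following is a minimal sketch, not code from the original article: the class name PoolGrowthDemo and the sizes (core 1, max 2, queue capacity 1) are chosen purely for illustration, and every task blocks on a latch so that no thread becomes free while the tasks are being submitted.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolGrowthDemo {

    /** Submits four blocking tasks to a pool with core=1, max=2, queue capacity=1. */
    static String describeGrowth() {
        // Five-argument constructor: default thread factory and default AbortPolicy.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();                    // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        StringBuilder trace = new StringBuilder();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocker);
                    trace.append("task").append(i)
                         .append(": threads=").append(pool.getPoolSize())
                         .append(" queued=").append(pool.getQueue().size())
                         .append('\n');
                } catch (RejectedExecutionException e) {
                    trace.append("task").append(i).append(": rejected\n");
                }
            }
        } finally {
            release.countDown();                    // let the blocked tasks finish
            pool.shutdown();
        }
        return trace.toString();
    }

    public static void main(String[] args) {
        System.out.print(describeGrowth());
    }
}
```

The first task starts the core thread, the second waits in the queue, the third finds the queue full and starts a second thread, and the fourth is rejected.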

2.2 BlockingQueue

2.2.1 SynchronousQueue

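A queue with no capacity at all; think of it as a queue of size 0. Once the thread count has reached corePoolSize, a new task is handed directly to an idle thread if one is waiting; otherwise the pool creates a new thread for it. If the thread count has already reached maximumPoolSize, the task is rejected, which under the default AbortPolicy throws RejectedExecutionException.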
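A small sketch of this behaviour, under assumed sizes (core 0, max 2) and with a hypothetical class name: three tasks that block on a latch are submitted, so no idle thread is ever available for a hand-off.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SynchronousQueueDemo {

    /** Submits three blocking tasks to a pool with core=0, max=2 and a SynchronousQueue. */
    static String run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, 2, 30, TimeUnit.SECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        int rejected = 0;
        try {
            for (int i = 0; i < 3; i++) {
                try {
                    pool.execute(() -> awaitQuietly(release));
                    accepted++;                     // a new thread was created for the task
                } catch (RejectedExecutionException e) {
                    rejected++;                     // maximumPoolSize reached, nothing can queue
                }
            }
            return "accepted=" + accepted + " rejected=" + rejected
                    + " threads=" + pool.getPoolSize();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Two tasks get their own threads and the third is rejected, because the queue itself can never hold a task.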

2.2.2 LinkedBlockingQueue

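An optionally bounded queue backed by a linked list of nodes; created with the no-argument constructor it is effectively unbounded. With an unbounded queue the pool never has more than corePoolSize threads and maximumPoolSize has no effect: every task beyond corePoolSize goes into the queue, the queue never fills up, and so no thread beyond corePoolSize is ever created.

"Unbounded" does not mean literally without limit: the default capacity is Integer.MAX_VALUE. The queue also has a constructor that takes a capacity; with a bounded capacity, maximumPoolSize takes effect again.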
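The difference between the two constructors can be checked with a sketch like the following. The class name and the sizes (core 1, max 4, five blocking tasks) are assumptions made for the illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class LinkedQueueDemo {

    /** Submits five blocking tasks to a pool with core=1, max=4 and the given queue. */
    static String fill(BlockingQueue<Runnable> queue) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 4, 30, TimeUnit.SECONDS, queue);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < 5; i++) {
                pool.execute(() -> {
                    try {
                        release.await();            // occupy the thread until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return "threads=" + pool.getPoolSize() + " queued=" + queue.size();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(fill(new LinkedBlockingQueue<>()));   // default capacity
        System.out.println(fill(new LinkedBlockingQueue<>(1)));  // capacity 1
    }
}
```

With the default capacity the pool stays at one thread and four tasks wait; with capacity 1 the pool grows to four threads and only one task waits.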

2.2.3 ArrayBlockingQueue

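A bounded queue: a fixed capacity must be given at construction, hence the name. It is backed by an array and can be created with a fair or non-fair ordering policy.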

2.3 RejectedExecutionHandler

A handler for tasks that cannot be executed by a {@link ThreadPoolExecutor}.

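When both the queue and the thread count are full, this handler decides what happens to newly submitted tasks. The JDK ships the following four policies, and you can also write your own by implementing the rejectedExecution(Runnable r, ThreadPoolExecutor e) method.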
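As an illustration of a custom policy, here is a minimal sketch of a handler that counts and logs rejected tasks instead of throwing. The class CountingRejectionHandler and its logging format are hypothetical; only the RejectedExecutionHandler interface and the ThreadPoolExecutor calls come from the JDK.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical policy: count and log every rejected task instead of throwing. */
class CountingRejectionHandler implements RejectedExecutionHandler {

    private final AtomicInteger rejected = new AtomicInteger();

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        int n = rejected.incrementAndGet();
        System.err.println("rejected task #" + n + " (threads=" + e.getPoolSize()
                + ", queued=" + e.getQueue().size() + ")");
    }

    int rejectedCount() {
        return rejected.get();
    }

    /** Overloads a pool with core=max=1 and queue capacity 1; returns the rejection count. */
    static int demo() {
        CountingRejectionHandler handler = new CountingRejectionHandler();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < 4; i++) {
                pool.execute(() -> {
                    try {
                        release.await();            // keep the single thread busy
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return handler.rejectedCount();         // one running, one queued, two rejected
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("rejections: " + demo());
    }
}
```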

2.3.1 ThreadPoolExecutor.AbortPolicy

Its implementation:

/**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }

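So it never runs the task and always throws a RejectedExecutionException. This is the default policy.

This policy keeps both the thread count and the queue size of the service firmly under control, but the caller should catch the exception and return a status code, for example telling an app client that the service is busy and to try again later.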
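Catching the exception at the submission point might look like the sketch below. The class BusyResponseDemo, the status strings and the pool sizes are all made up for the example; a real service would map the rejection to whatever its protocol uses.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BusyResponseDemo {

    private final ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1),
            new ThreadPoolExecutor.AbortPolicy());

    /** Returns a status line; the codes and messages are illustrative only. */
    String handle(Runnable work) {
        try {
            pool.execute(work);
            return "202 accepted";
        } catch (RejectedExecutionException e) {
            return "503 service busy, please try again later";
        }
    }

    /** Sends three slow requests to a pool that can hold only two of them. */
    static String simulate() {
        BusyResponseDemo service = new BusyResponseDemo();
        CountDownLatch release = new CountDownLatch(1);
        Runnable slowWork = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            return service.handle(slowWork) + " | "
                    + service.handle(slowWork) + " | "
                    + service.handle(slowWork);
        } finally {
            release.countDown();
            service.pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(simulate());
    }
}
```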

2.3.2 ThreadPoolExecutor.DiscardPolicy

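Its rejectedExecution method has an empty body. As its Javadoc also says, this policy silently discards the new task, with no notice and no exception.

Because tasks are lost without anyone noticing, it should be used only in specific scenarios where that is acceptable.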

2.3.3 ThreadPoolExecutor.DiscardOldestPolicy

/**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }

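It removes the task at the head of the queue (the one that has waited longest) and then resubmits the new task through execute, which normally puts it at the tail of the queue.

Like 2.3.2, this policy loses tasks, so I personally do not recommend it outside of specific scenarios.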
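Which task gets lost can be seen in a sketch with a single thread and a queue of capacity 1. The class name and the task labels A, B and C are assumptions for the illustration.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class DiscardOldestDemo {

    /** Returns the labels of the tasks that actually ran, in execution order. */
    static String run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.DiscardOldestPolicy());
        List<String> executed = new CopyOnWriteArrayList<>();
        CountDownLatch release = new CountDownLatch(1);

        pool.execute(() -> {                        // A occupies the only thread
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            executed.add("A");
        });
        pool.execute(() -> executed.add("B"));      // B waits in the queue
        pool.execute(() -> executed.add("C"));      // queue full: B is dropped, C is queued

        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return String.join(",", executed);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Task B never runs and nothing reports its loss, which is why this policy needs care.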

2.3.4 ThreadPoolExecutor.CallerRunsPolicy

/**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }

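If the pool has not been shut down, the task is run directly on the thread that submitted it, instead of waiting for a pool thread.

This policy should be used only when the number of threads in the service is bounded, or when we understand the service's thread usage well.

The policy does not create new threads, but if many new tasks arrive in a short time, more and more submitting threads end up executing tasks themselves (and are blocked while doing so). The number of threads running tasks can then far exceed the pool's limit, which defeats the purpose of using a thread pool.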
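That the rejected task really runs on the submitting thread can be verified with a sketch like this one (class name and pool sizes are assumptions for the example):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class CallerRunsDemo {

    /** Returns true if the rejected task was executed by the submitting thread. */
    static boolean rejectedTaskRanOnCaller() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<Thread> runner = new AtomicReference<>();
        try {
            pool.execute(blocker);                  // occupies the only pool thread
            pool.execute(blocker);                  // fills the queue
            // Pool and queue are full: execute() runs this task synchronously, right here.
            pool.execute(() -> runner.set(Thread.currentThread()));
            return runner.get() == Thread.currentThread();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("ran on caller: " + rejectedTaskRanOnCaller());
    }
}
```

Because execute() does not return until the task is done, the policy also slows the submitter down, which acts as a simple form of back-pressure.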

3. Creating Thread Pools Quickly with Executors

3.1 ExecutorService newFixedThreadPool(int nThreads)

Creates a fixed-size thread pool:

/**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue.  At any point, at most
     * {@code nThreads} threads will be active processing tasks.
     * If additional tasks are submitted when all threads are active,
     * they will wait in the queue until a thread is available.
     * If any thread terminates due to a failure during execution
     * prior to shutdown, a new one will take its place if needed to
     * execute subsequent tasks.  The threads in the pool will exist
     * until it is explicitly {@link ExecutorService#shutdown shutdown}.
     *
     * @param nThreads the number of threads in the pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException if {@code nThreads <= 0}
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

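The implementation shows that it also creates a ThreadPoolExecutor, merely with some arguments preset.

corePoolSize and maximumPoolSize are both nThreads, which caps the thread count. The unbounded LinkedBlockingQueue makes every task that cannot get a thread wait in the queue, so no task is lost. keepAliveTime is 0 because no thread beyond the core count is ever created, so the value does not matter.

A fixed pool keeps the number of threads in the service under control, prevents thread-count spikes, and limits CPU usage. It brings another problem, though: if a large number of tasks pile up in the queue, memory usage will inevitably soar. A fixed pool therefore suits workloads whose concurrency is controllable and that do not submit a flood of tasks within a short time.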
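One way to keep the fixed thread count while avoiding an ever-growing queue is to build the pool by hand with a bounded queue. The sketch below is one possible variant, not something the JDK provides: the helper BoundedFixedPool.create, the capacity of 10 and the choice of CallerRunsPolicy for overflow are all assumptions.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedFixedPool {

    /** Hypothetical helper: like newFixedThreadPool, but with a bounded queue. */
    static ThreadPoolExecutor create(int nThreads, int queueCapacity) {
        return new ThreadPoolExecutor(
                nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy()); // overflow runs on the submitter
    }

    /** Runs taskCount trivial tasks and returns how many completed. */
    static int runTasks(int taskCount) {
        ThreadPoolExecutor pool = create(2, 10);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(completed::incrementAndGet);
        }
        pool.shutdown();                            // queued tasks still run after shutdown()
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed: " + runTasks(1000));
    }
}
```

At most 10 tasks ever wait in memory; when the queue is full the submitting thread does the work itself, so no task is lost.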

If many tasks arrive concurrently within a short time but each one is short-running, consider the next pool: ExecutorService newCachedThreadPool().

3.2 ExecutorService newCachedThreadPool()

Cached thread pool:

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

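corePoolSize is 0, so there are no core threads: every thread is terminated once it has been idle for longer than keepAliveTime. maximumPoolSize is Integer.MAX_VALUE, which is effectively no upper limit. The work queue is a SynchronousQueue, so every task is handed to a thread immediately and no task ever waits in the queue.

This pool suits bursts of concurrent tasks that each finish quickly. The maximumPoolSize value guarantees that every task is picked up at once, either by an idle thread or by a newly created one. keepAliveTime = 60L lets threads stay alive after finishing a task and wait for the next one, which avoids the cost of creating a thread every time. The pool can therefore create many threads without wasting them, and terminating threads that have been idle for the configured time keeps large numbers of threads from lingering.

For long-running tasks, threads stay occupied for a long time, every new task creates a new thread, and the thread count keeps climbing. The cached pool is then unsuitable and another pool should be considered.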

3.3 ExecutorService newSingleThreadExecutor()

Single-thread pool: a pool whose core and maximum thread counts are both 1, backed by an unbounded queue. All tasks run sequentially.
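The sequential behaviour is easy to confirm. In this sketch (the class name is an assumption) five tasks record their ids in the order they run:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SingleThreadOrderDemo {

    /** Submits five tasks and returns the order in which they ran. */
    static List<Integer> run() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        List<Integer> order = new CopyOnWriteArrayList<>();
        for (int i = 0; i < 5; i++) {
            int id = i;                             // effectively final copy for the lambda
            executor.execute(() -> order.add(id));
        }
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The tasks run in submission order because a single thread takes them from a FIFO queue.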

3.4 ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

A pool that supports delayed task execution; maximumPoolSize is Integer.MAX_VALUE. It can be used to run scheduled tasks and has many other flexible uses.
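A minimal sketch of delayed execution, assuming a hypothetical class DelayedTaskDemo and an arbitrary delay: the task is scheduled once and the caller measures how long it waits for the result.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class DelayedTaskDemo {

    /** Schedules one task with the given delay; returns how long the result took, in ms. */
    static long elapsedMillis(long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            Callable<String> task = () -> "done";
            long start = System.nanoTime();
            ScheduledFuture<String> future =
                    scheduler.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
            future.get();                           // blocks until the delayed task has run
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("waited " + elapsedMillis(200) + " ms");
    }
}
```

For repeated execution the same interface offers scheduleAtFixedRate and scheduleWithFixedDelay.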

3.5 ExecutorService newWorkStealingPool(int parallelism)

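Work-stealing pool. The argument is the target parallelism level; the no-argument overload uses the number of available processors. The pool is built on ForkJoinPool.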

3.6 A Custom Thread Pool Utility Class

Below is a custom thread pool that can be used directly; adapt it to the actual needs of your project:

package com.river.thread;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public enum ContextThreadPool {

    /**
     * The single instance of this class; the enum implements the singleton pattern
     */
    INSTANCE;

    public ThreadPoolExecutor getThreadPool(){
        return ThreadPoolHolder.pool;
    }

    private static class ThreadPoolHolder{
        /**
         * Capacity of the blocking queue
         */
        private final static int CAPACITY = 500;

        private static ThreadPoolExecutor pool ;
        /**
         * Number of available processors
         */
        private static int availableProcessors = Runtime.getRuntime().availableProcessors();

        /**
         * A LinkedBlockingQueue bounded to {@link #CAPACITY}
         */
        private static BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(CAPACITY);

        static {
            pool = new ThreadPoolExecutor(
                    availableProcessors * 2,
                    availableProcessors * 4 + 1,
                    0,
                    TimeUnit.MILLISECONDS,
                    queue,
                    new ThreadFactory() {
                        private AtomicInteger count = new AtomicInteger(0);

                        @Override
                        public Thread newThread(Runnable r) {
                            Thread thread = new Thread(r);
                            String threadName = ContextThreadPool.class.getSimpleName() + "-thread-" + count.incrementAndGet();
                            thread.setName(threadName);
                            return thread;
                        }
                    },
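                    // Custom policy for a full pool: the submitting thread blocks until the queue has room
                    (r, executor) -> {
                        try {
                            queue.put(r);
                        } catch (InterruptedException e) {
                            // The task is not enqueued; restore the interrupt flag for the caller
                            Thread.currentThread().interrupt();
                        }
                    }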
            );
        }

    }
}

Using the utility:

ContextThreadPool.INSTANCE.getThreadPool();

4. Future

Now that we have a thread pool, the next step is to submit tasks to it. There are two main ways:

  • void execute(Runnable command)    
  • <T> Future<T> submit(Callable<T> task)

                                                                             

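The former is defined in Executor and is meant for tasks with no return value; the latter is defined in ExecutorService and returns a Future from which the task's result can be obtained.

As we all know, a task can be defined in several ways, two of which are implementing the Runnable interface and implementing the Callable interface. In ordinary use they are much alike; the difference shows when we want the result of the execution: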

@FunctionalInterface
public interface Runnable {
    // returns void
    public abstract void run();
}
@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}
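The difference shows up as soon as both kinds of task are submitted. In this sketch (class name assumed) the Future of a Runnable yields null, while the Future of a Callable yields the returned value:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class RunnableVsCallableDemo {

    /** Submits one Runnable and one Callable and joins their results into a string. */
    static String run() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            Runnable runnable = () -> { };              // has nothing to return
            Callable<String> callable = () -> "123";    // returns a value

            Future<?> fromRunnable = pool.submit(runnable);
            Future<String> fromCallable = pool.submit(callable);

            // get() on a Runnable's Future returns null once the task has completed.
            return fromRunnable.get() + " / " + fromCallable.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```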

4.1 get result

  • V get() throws InterruptedException, ExecutionException;
  • V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

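The methods above, defined on Future, return the task's result. A task sometimes takes a long time and has not finished when we ask for the result, so we usually call boolean isDone() first to check whether it has completed: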

package com.river.thread;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

@Slf4j
public class FutureTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ThreadPoolExecutor threadPool = ContextThreadPool.INSTANCE.getThreadPool();
        // Normally I would write this as a lambda; the anonymous class makes the interface implementation explicit
        Future<String> result = threadPool.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "123";
            }
        });

        while (true){
            if (result.isDone()){
                System.out.println(result.get());
                break;
            }
            log.info("not finish");
        }

    }
}

Log output:

2018-09-17 17:47:37.352 myAppName [main] INFO  com.river.thread.FutureTest - not finish
123

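Here the first isDone() check found the task unfinished; by the second check the result was available.

Next we fetch the result with the get() overload that takes a timeout: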

public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        ThreadPoolExecutor threadPool = ContextThreadPool.INSTANCE.getThreadPool();
        Future<String> result = threadPool.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                Thread.sleep(2000);
                return "123";
            }
        });

        log.info("get result");
        log.info(result.get(3, TimeUnit.SECONDS));

    }

Log output:

2018-09-17 17:52:43.348 myAppName [main] INFO  com.river.thread.FutureTest - get result
2018-09-17 17:52:45.349 myAppName [main] INFO  com.river.thread.FutureTest - 123

The result arrives after 2 s. What if the task runs longer than the timeout?

We change the sleep argument to 5000:

2018-09-17 17:55:14.380 myAppName [main] INFO  com.river.thread.FutureTest - get result
Exception in thread "main" java.util.concurrent.TimeoutException
	at java.util.concurrent.FutureTask.get(FutureTask.java:205)
	at com.river.thread.FutureTest.main(FutureTest.java:21)

As we can see, a TimeoutException is thrown.
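Rather than letting the TimeoutException escape from main, the caller can catch it and cancel the task. The following is a sketch under assumed values (a 5 s task, a 100 ms timeout) with a hypothetical class name:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutHandlingDemo {

    /** Waits briefly for a slow task, then gives up and cancels it. */
    static String run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Callable<String> slow = () -> {
            Thread.sleep(5000);                     // much longer than the caller will wait
            return "123";
        };
        Future<String> result = pool.submit(slow);
        try {
            return result.get(100, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // true asks the pool to interrupt the thread that is running the task
            boolean cancelled = result.cancel(true);
            return "timed out, cancelled=" + cancelled;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Without the cancel call the task would keep running in the pool even though nobody is waiting for its result any more.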

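Similarly, an exception thrown inside the task never surfaces as long as we do not call get: the Future holds on to it. We can catch it once the task has finished (get rethrows it wrapped in an ExecutionException) and handle it appropriately.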
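A short sketch of this, with an assumed class name and a deliberately failing task: submit returns normally, and the failure appears only when get is called.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureExceptionDemo {

    /** Submits a task that always fails and reports what get() delivers. */
    static String run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> failing = () -> {
                throw new IllegalStateException("boom");
            };
            Future<String> result = pool.submit(failing);   // no exception surfaces here
            try {
                return result.get();
            } catch (ExecutionException e) {
                // The original exception is available as the cause.
                return "task failed: " + e.getCause().getMessage();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```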

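Typically we submit a collection of tasks to the pool, store the returned Futures in a collection, and finally iterate over them to assemble the final result.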
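This pattern can be sketched as follows. The class BatchSubmitDemo, the pool size of 4 and the squaring tasks are assumptions chosen to keep the example small.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class BatchSubmitDemo {

    /** Computes 1^2 + 2^2 + ... + n^2 with one task per term. */
    static int sumOfSquares(int n) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // Step 1: submit every task and keep its Future.
            List<Future<Integer>> results = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                int value = i;
                Callable<Integer> task = () -> value * value;
                results.add(pool.submit(task));
            }
            // Step 2: walk the Futures; get() waits for any task that is not done yet.
            int total = 0;
            for (Future<Integer> future : results) {
                total += future.get();
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(5));
    }
}
```

ExecutorService.invokeAll offers a similar shortcut: it takes a collection of Callables and returns their Futures once all of them have completed.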
