深刻理解Java線程池

編者注:Java中的線程池是運用場景最多的併發組件,幾乎全部須要異步或併發執行任務的程序均可以使用線程池。java

在開發過程當中,合理地使用線程池可以帶來至少如下幾個好處。web

  • 下降資源消耗:經過重複利用已建立的線程下降線程建立和銷燬形成的消耗。算法

  • 提升響應速度:當任務到達時,任務能夠不須要等到線程建立就能當即執行。數據庫

  • 提升線程的可管理性:線程是稀缺資源,若是無限制地建立,不只會消耗系統資源,還會下降系統的穩定性,使用線程池能夠進行統一分配、調優和監控。可是,要作到合理利用線程池,必須瞭解其實現原理。數組

  • 代碼解耦:好比生產者消費者模式。安全

線程池實現原理

當提交一個新任務到線程池時,線程池的處理流程以下:微信

  1. 若是當前運行的線程少於corePoolSize,則建立新線程來執行任務(注意,執行這一步驟須要獲取全局鎖)。併發

  2. 若是運行的線程等於或多於corePoolSize,則將任務加入BlockingQueue。app

  3. 若是沒法將任務加入BlockingQueue(隊列已滿),則建立新的線程來處理任務(注意,執行這一步驟也須要獲取全局鎖)。異步

  4. 若是建立新線程將使當前運行的線程數超出maximumPoolSize,該任務將被拒絕,並調用相應的拒絕策略來處理(RejectedExecutionHandler.rejectedExecution()方法,線程池默認的飽和策略是AbortPolicy,也就是拋異常)。

ThreadPoolExecutor採起上述步驟的整體設計思路,是爲了在執行execute()方法時,儘量地避免獲取全局鎖(那將會是一個嚴重的可伸縮瓶頸)。在ThreadPoolExecutor完成預熱以後(當前運行的線程數大於等於corePoolSize),幾乎全部的execute()方法調用都是執行步驟2,而步驟2不須要獲取全局鎖。

線程池任務 拒絕策略包括 拋異常直接丟棄丟棄隊列中最老的任務將任務分發給調用線程處理

線程池的建立:經過ThreadPoolExecutor來建立一個線程池。

new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, runnableTaskQueue, handler);

建立一個線程池時須要輸入如下幾個參數:

  • corePoolSize(線程池的基本大小):當提交一個任務到線程池時,線程池會建立一個線程來執行任務,即便其餘空閒的基本線程可以執行新任務也會建立線程,等到線程池的線程數等於線程池基本大小時就再也不建立。若是調用了線程池的prestartAllCoreThreads()方法,線程池會提早建立並啓動全部基本線程。

  • maximumPoolSize(線程池最大數量):線程池容許建立的最大線程數。若是隊列滿了,而且已建立的線程數小於最大線程數,則線程池會再建立新的線程執行任務。值得注意的是,若是使用了無界的任務隊列這個參數就沒什麼效果。

  • keepAliveTime(線程活動保持時間):線程池的工做線程空閒後,保持存活的時間。因此,若是任務不少,而且每一個任務執行的時間比較短,能夠調大時間,提升線程的利用率。

  • TimeUnit(線程活動保持時間的單位):可選的單位有天(DAYS)、小時(HOURS)、分鐘(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和納秒(NANOSECONDS,千分之一微秒)。

  • runnableTaskQueue(任務隊列):用於保存等待執行的任務的阻塞隊列。能夠選擇如下幾個阻塞隊列。

  • - ArrayBlockingQueue:是一個基於數組結構的有界阻塞隊列,此隊列按FIFO(先進先出)原則對元素進行排序。

  • LinkedBlockingQueue:一個基於鏈表結構的阻塞隊列,此隊列按FIFO排序元素,吞吐量一般要高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個隊列。

  • SynchronousQueue:一個不存儲元素的阻塞隊列。每一個插入操做必須等到另外一個線程調用移除操做,不然插入操做一直處於阻塞狀態,吞吐量一般要高於LinkedBlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個隊列。

  • PriorityBlockingQueue:一個具備優先級的無界阻塞隊列。

線程的狀態

在HotSpot VM線程模型中,Java線程被一對一映射到本地系統線程,Java線程啓動時會建立一個本地系統線程;當Java線程終止時,這個本地系統線程也會被回收。操做系統調度全部線程並把它們分配給可用的CPU。

thread運行週期中,有如下6種狀態,在 java.lang.Thread.State 中有詳細定義和說明:

// Thread類
public enum State {
    /**
     * 剛建立還沒有運行
     */

    NEW,

    /**
     * 可運行狀態,該狀態表示正在JVM中處於運行狀態,不過有多是在等待其餘資源,好比CPU時間片,IO等待
     */

    RUNNABLE,

    /**
     * 阻塞狀態表示等待monitor鎖(阻塞在等待monitor鎖或者在調用Object.wait方法後從新進入synchronized塊時阻塞)
     */

    BLOCKED,

    /**
     * 等待狀態,發生在調用Object.wait、Thread.join (with no timeout)、LockSupport.park
     * 表示當前線程在等待另外一個線程執行某種動做,好比Object.notify()、Object.notifyAll(),Thread.join表示等待線程執行完成
     */

    WAITING,

    /**
     * 超時等待,發生在調用Thread.sleep、Object.wait、Thread.join (in timeout)、LockSupport.parkNanos、LockSupport.parkUntil
     */

    TIMED_WAITING,

    /**
     *線程已執行完成,終止狀態
     */

    TERMINATED;
}

線程池操做

向線程池提交任務,可使用兩個方法向線程池提交任務,分別爲execute()和submit()方法。execute()方法用於提交不須要返回值的任務,因此沒法判斷任務是否被線程池執行成功。經過如下代碼可知execute()方法輸入的任務是一個Runnable類的實例。

threadsPool.execute(new Runnable() {
    @Override
    public void run() {
        // TODO Auto-generated method stub
    }
});

submit()方法用於提交須要返回值的任務。線程池會返回一個future類型的對象,經過這個future對象能夠判斷任務是否執行成功,經過future的get()方法來獲取返回值,future的get()方法會阻塞當前線程直到任務完成,而使用get(long timeout,TimeUnit unit)方法則會阻塞當前線程一段時間後當即返回,這時候有可能任務尚未執行完。

Future<Object> future = executor.submit(harReturnValuetask);
try {
    Object s = future.get();
catch (InterruptedException e) {
    // 處理中斷異常
catch (ExecutionException e) {
    // 處理沒法執行任務異常
finally {
    // 關閉線程池
    executor.shutdown();
}

合理配置線程池

要想合理配置線程池,必須先分析任務的特色,能夠從如下幾個角度分析:

  • 任務的性質:CPU密集型任務、IO密集型任務和混合型任務。

  • 任務的優先級:高、中和低。

  • 任務的執行時間:長、中和短。

  • 任務的依賴性:是否依賴其餘系統資源,如數據庫鏈接。

性質不一樣的任務能夠用不一樣規模的線程池分開處理。CPU密集型任務應配置儘量少的線程,如配置Ncpu+1個線程的線程池。因爲IO密集型任務線程並非一直在執行任務,則應配置多一點線程,如2*Ncpu。混合型的任務,若是能夠拆分,將其拆分紅一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那麼分解後執行的吞吐量將高於串行執行的吞吐量。若是這兩個任務執行時間相差太大,則不必進行分解。能夠經過Runtime.getRuntime().availableProcessors()方法得到當前設備的CPU個數。

優先級不一樣的任務可使用優先級隊列PriorityBlockingQueue來處理。它可讓優先級高的任務先執行。執行時間不一樣的任務能夠交給不一樣規模的線程池來處理,或者可使用優先級隊列,讓執行時間短的任務先執行。依賴數據庫鏈接池的任務,由於線程提交SQL後須要等待數據庫返回結果,等待的時間越長,則CPU空閒時間就越長,那麼線程數應該設置得越大,這樣才能更好地利用CPU。

線程池中線程數量未達到coreSize時,這些線程處於什麼狀態?

這些線程處於RUNNING或者WAITING,RUNNING表示線程處於運行當中,WAITING表示線程阻塞等待在阻塞隊列上。當一個task submit給線程池時,若是當前線程池線程數量還未達到coreSize時,會建立線程執行task,不然將任務提交給阻塞隊列,而後觸發線程執行。(從submit內部調用的代碼也能夠看出來)

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor。它主要用來在給定的延遲以後運行任務,或者按期執行任務。ScheduledThreadPoolExecutor的功能與Timer相似,但ScheduledThreadPoolExecutor功能更強大、更靈活。Timer對應的是單個後臺線程,而ScheduledThreadPoolExecutor能夠在構造函數中指定多個對應的後臺線程數。

ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor,ScheduledThreadPoolExecutor和ThreadPoolExecutor的區別是,ThreadPoolExecutor獲取任務時是從BlockingQueue中獲取的,而ScheduledThreadPoolExecutor是從DelayedWorkQueue中獲取的(注意,DelayedWorkQueue是BlockingQueue的實現類)。

ScheduledThreadPoolExecutor把待調度的任務(ScheduledFutureTask)放到一個DelayQueue中,其中ScheduledFutureTask主要包含3個成員變量:

  1. sequenceNumber:任務被添加到ScheduledThreadPoolExecutor中的序號;

  2. time:任務將要被執行的具體時間;

  3. period:任務執行的間隔週期。

ScheduledThreadPoolExecutor會把待執行的任務放到工做隊列DelayQueue中,DelayQueue封裝了一個PriorityQueue,PriorityQueue會對隊列中的ScheduledFutureTask進行排序,具體的排序比較算法實現以下:

ScheduledFutureTask在DelayQueue中被保存在一個PriorityQueue(基於數組實現的優先隊列,相似於堆排序中的優先隊列)中,在往數組中添加/移除元素時,會調用siftDown/siftUp來進行元素的重排序,保證元素的優先級順序。

static class DelayedWorkQueue extends AbstractQueue<Runnable>
    implements BlockingQueue<Runnable
{

    private static final int INITIAL_CAPACITY = 16;
    private RunnableScheduledFuture<?>[] queue =
        new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
    private final ReentrantLock lock = new ReentrantLock();
    private int size = 0;

    private Thread leader = null;
    private final Condition available = lock.newCondition();
}

從DelayQueue獲取任務的主要邏輯就在take()方法中,首選獲取lock,而後獲取queue[0],若是爲null則await等待任務的來臨,若是非null查看任務是否到期,是的話就執行該任務,不然再次await等待。這裏有一個leader變量,用來表示當前進行awaitNanos等待的線程,若是leader非null,表示已經有其餘線程在進行awaitNanos等待,本身await等待,不然本身進行awaitNanos等待。

// DelayedWorkQueue
public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            if (first == null)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return finishPoll(first);
                first = null// don't retain ref while waiting
                if (leader != null)
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}

獲取到任務以後,就會執行task的run()方法了,即ScheduledFutureTask.run():

public void run() {
    boolean periodic = isPeriodic();
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    else if (!periodic)
        ScheduledFutureTask.super.run();
    else if (ScheduledFutureTask.super.runAndReset()) {
        setNextRunTime();
        reExecutePeriodic(outerTask);
    }
}


 推薦閱讀 


歡迎小夥伴 關注【TopCoder】 閱讀更多精彩好文。

本文分享自微信公衆號 - TopCoder(gh_12e4a74a5c9c)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索