線程池

線程池

API

大多數併發應用程序是圍繞執行任務(Task)進行管理的。把一個應用程序的工做(work)分離到任務中,能夠簡化程序的管理;這種分離還在不一樣事物間劃分了天然的分界線,能夠方便程序在出現錯誤時進行恢復;同時這種分離還能夠爲並行工做提供一個天然的結構,有利於提升程序的併發性。java

所謂的任務拆分就是肯定每個執行任務(工做單元)的邊界。理想狀況下獨立的工做單元有最大的吞吐量,這些工做單元不依賴於其它工做單元的狀態、結果或者其餘資源等。所以將任務儘量的拆分紅一個個獨立的工做單元有利於提升程序的併發性。api

任務的執行策略包括4W3H 部分:緩存

  • 任務在什麼(What)線程中執行
  • 任務以什麼(What)順序執行(FIFO/LIFO/優先級等)
  • 同時有多少個(How Many)任務併發執行
  • 容許有多少個(How Many)個任務進入執行隊列
  • 系統過載時選擇放棄哪個(Which)任務,如何(How)通知應用程序這個動做
  • 任務執行的開始、結束應該作什麼(What)處理

如何知足上面的條件安全

  1. 首先明確必定是在Java 裏面能夠供使用者調用的啓動線程類是Thread。所以Runnable 或者Timer/TimerTask 等都是要依賴Thread 來啓動的,所以在ThreadPool 裏面一樣也是靠Thread 來啓動多線程的。
  2. 默認狀況下Runnable 接口執行完畢後是不能拿到執行結果的,所以在ThreadPool裏就定義了一個Callable 接口來處理執行結果。
  3. 爲了異步阻塞的獲取結果,Future 能夠幫助調用線程獲取執行結果。
  4. Executor 解決了向線程池提交任務的入口問題,同時ScheduledExecutorService 解決了如何進行重複調用任務的問題。
  5. CompletionService 解決了如何按照執行完畢的順序獲取結果的問題,這在某些狀況下能夠提升任務執行的併發,調用線程沒必要在長時間任務上等待過多時間。
  6. 顯然線程的數量是有限的,並且也不宜過多,所以合適的任務隊列是必不可少的,BlockingQueue 的容量正好能夠解決此問題。
  7. 固定任務容量就意味着在容量滿了之後須要必定的策略來處理過多的任務(新任務),RejectedExecutionHandler 正好解決此問題。
  8. 必定時間內阻塞就意味着有超時,所以TimeoutException 就是爲了描述這種現象。TimeUnit 是爲了描述超時時間方便的一個時間單元枚舉類。
  9. 有上述問題就意味了配置一個合適的線程池是很複雜的,所以Executors 默認的一些線程池配置能夠減小這個操做。

線程池的類體系結構

Executor 的execute 方法只是執行一個Runnable 的任務數據結構

ExecutorService 在Executor 的基礎上增長了一些方法,其中有兩個核心的方法:多線程

  • Future<?> submit(Runnable task)
  • Future submit(Callable task)

這兩個方法都是向線程池中提交任務,它們的區別在於Runnable 在執行完畢後沒有結果,Callable 執行完畢後有一個結果。這在多個線程中傳遞狀態和結果是很是有用的。另外他們的相同點在於都返回一個Future 對象。Future 對象能夠阻塞線程直到運行完畢(獲取結果,若是有的話),也能夠取消任務執行,固然也可以檢測任務是否被取消或者是否執行完畢。併發

ScheduledExecutorService 描述的功能和Timer/TimerTask 相似,解決那些須要任務重複執行
的問題。這包括延遲時間一次性執行、延遲時間週期性執行以及固定延遲時間週期性執行等。固然了繼承ExecutorService 的ScheduledExecutorService 擁有ExecutorService 的所有特性。異步

ScheduledThreadPoolExecutor 是繼承ThreadPoolExecutor 的ScheduledExecutorService 接口實現,週期性任務調度的類實現性能

CompletionService 接口,它是用於描述順序獲取執行結果的一個線程池包裝器ui

Executors 類裏面提供了一些靜態工廠,生成一些經常使用的線程池

  • newSingleThreadExecutonewSingleThreadExecutor:建立一個單線程的線程池。這個線程池只有一個線程在工做,也就是至關於單線程串行執行全部任務。若是這個惟一的線程由於異常結束,那
    麼會有一個新的線程來替代它。此線程池保證全部任務的執行順序按照任務的提交順序執行。
  • newFixedThreadPool:建立固定大小的線程池。每次提交一個任務就建立一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,若是某個線程由於執行異常而結束,那麼線程池會補充一個新線程。
  • newCachedThreadPool:建立一個可緩存的線程池。若是線程池的大小超過了處理任務所須要的線程,那麼就會回收部分空閒(60秒不執行任務)的線程,當任務數增長時,此線程池又能夠智能的添加新線程來處理任務。此線程池不會對線程池大小作限制,線程池大小徹底依賴於操做系統(或者說JVM)可以建立的最大線程大小。
  • newScheduledThreadPool:建立一個大小無限的線程池。此線程池支持定時以及週期性執行任務的需求。
  • newSingleThreadScheduledExecutor:建立一個單線程的線程池。此線程池支持定時以及週期性執行任務的需求。

退出

線程是有多種執行狀態的,一樣管理線程的線程池也有多種狀態。JVM 會在全部線程(非後臺daemon 線程)所有終止後才退出,爲了節省資源和有效釋放資源關閉一個線程池就顯得很重要。有時候沒法正確的關閉線程池,將會阻止JVM 的結束。

線程池Executor 是異步的執行任務,所以任什麼時候刻不可以直接獲取提交的任務的狀態。這些任務有可能已經完成,也有可能正在執行或者還在排隊等待執行。所以關閉線程池可能出現一下幾種狀況:

  • 平緩關閉:已經啓動的任務所有執行完畢,同時再也不接受新的任務
  • 當即關閉:取消全部正在執行和未執行的任務

線程池的四種狀態:

  • 線程池在構造前(new 操做)是初始狀態,一旦構造完成線程池就進入了執行狀態RUNNING
  • 線程池運行中能夠經過shutdown()和shutdownNow()來改變運行狀態
  • 一旦shutdown()或者shutdownNow()執行完畢,線程池就進入TERMINATED 狀態,此時線程池就結束了。
  • isTerminating()描述的是SHUTDOWN 和STOP 兩種狀態。
  • isShutdown()描述的是非RUNNING 狀態,也就是SHUTDOWN/STOP/TERMINATED三種狀態。

線程池API

shutdownNow()會返回那些已經進入了隊列可是尚未執行的任務列表
awaitTermination 描述的是等待線程池關閉的時間,若是等待時間線程池尚未關閉將會拋出一個超時異常。

  • 平緩關閉線程池使用shutdown()
  • 當即關閉線程池使用shutdownNow(),同時獲得未執行的任務列表
  • 檢測線程池是否正處於關閉中,使用isShutdown()
  • 檢測線程池是否已經關閉使用isTerminated()
  • 定時或者永久等待線程池關閉結束使用awaitTermination()操做

線程池數據結構與線程構造方法

  • 線程池須要支持多個線程併發執行,所以有一個線程集合Collection 來執行線程任務;
  • 涉及任務的異步執行,所以須要有一個集合來緩存任務隊列Collection
  • 很顯然在多個線程之間協調多個任務,那麼就須要一個線程安全的任務集合,同時還須要支持阻塞、超時操做,那麼BlockingQueue 是必不可少的;
  • 既然是線程池,出發點就是提升系統性能同時下降資源消耗,那麼線程池的大小就有限制,所以須要有一個核心線程池大小(線程個數)和一個最大線程池大小(線程個數),有一個計數用來描述當前線程池大小;
  • 若是是有限的線程池大小,那麼長時間不使用的線程資源就應該銷燬掉,這樣就須要一個線程空閒時間的計數來描述線程什麼時候被銷燬;
  • 前面描述過線程池也是有生命週期的,所以須要有一個狀態來描述線程池當前的運行狀態;
  • 線程池的任務隊列若是有邊界,那麼就須要有一個任務拒絕策略來處理過多的任務,同時在線程池的銷燬階段也須要有一個任務拒絕策略來處理新加入的任務;
  • 上面種的線程池大小、線程空閒實際那、線程池運行狀態等等狀態改變都不是線程安全的,所以須要有一個全局的鎖(mainLock)來協調這些競爭資源;
  • 除了以上數據結構之外,ThreadPoolExecutor 還有一些狀態用來描述線程池的運行計數,例如線程池運行的任務數、曾經達到的最大線程數

    建立線程

    public interface ThreadFactory {
    Thread newThread(Runnable r);
    }
    static class DefaultThreadFactory implements ThreadFactory {
    static final AtomicInteger poolNumber = new AtomicInteger(1);
    final ThreadGroup group;
    final AtomicInteger threadNumber = new AtomicInteger(1);
    final String namePrefix;
    DefaultThreadFactory() {
    SecurityManager s = System.getSecurityManager();
    group = (s != null)? s.getThreadGroup() :
    Thread.currentThread().getThreadGroup();
    namePrefix = "pool-" +
    poolNumber.getAndIncrement() +
    "-thread-";
    }
    public Thread newThread(Runnable r) {
    Thread t = new Thread(group, r,
    namePrefix + threadNumber.getAndIncrement(),
    0);
    if (t.isDaemon())
    t.setDaemon(false);
    if (t.getPriority() != Thread.NORM_PRIORITY)
    t.setPriority(Thread.NORM_PRIORITY);
    return t;
    }
    }
    同一個線程池的全部線程屬於同一個線程組,也就是建立線程池的那個線程組

另外對於線程池中的全部線程默認都轉換爲非後臺線程,這樣主線程退出時不會直接退出JVM,而是等待線程池結束。還有一點就是默認將線程池中的全部線程都調爲同一個級別,這樣在操做系統角度來看全部系統都是公平的,不會致使競爭堆積。

線程池中線程生命週期

一個線程Worker 被構造出來之後就開始處於運行狀態。

private final class Worker implements Runnable {
    private final ReentrantLock runLock = new ReentrantLock();
    private Runnable firstTask;
    Thread thread;
    Worker(Runnable firstTask) {
        this.firstTask = firstTask;
    }
    private void runTask(Runnable task) {
        final ReentrantLock runLock = this.runLock;
        runLock.lock();
        try {
            task.run();
        } finally {
            runLock.unlock();
        }
    }
    public void run() {
        try {
            Runnable task = firstTask;
            firstTask = null;
            while (task != null || (task = getTask()) != null) {
                runTask(task);
                task = null;
            }
        } finally {
            workerDone(this);
        }
    }
}

當提交一個任務時,若是須要建立一個線程(什麼時候須要在下一節中探討)時,就調用線程工廠建立一個線程,同時將線程綁定到Worker 工做隊列中。須要說明的是,Worker 隊列構造的時候帶着一個任務Runnable

一旦線程池啓動線程後(調用線程run())方法,那麼線程工做隊列Worker 就從第1個任務開始執行(這時候發現構造Worker 時傳遞一個任務的好處了),一旦第1個任務執行完畢,就從線程池的任務隊列中取出下一個任務進行執行。循環如此,直到線程池被關閉或者任務拋出了一個RuntimeException。

線程池任務執行流程

  • 任務可能在未來某個時刻被執行,有可能不是當即執行。
  • 任務可能在一個新的線程中執行或者線程池中存在的一個線程中執行。
  • 任務沒法被提交執行有如下兩個緣由:線程池已經關閉或者線程池已經達到了容量限制。
  • 全部失敗的任務都將被「當前」的任務拒絕策略RejectedExecutionHandler 處理。

    public void execute(Runnable command) {
          //1
          if (command == null)
              throw new NullPointerException();
          //2,3
          if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
          //4
              if (runState == RUNNING && workQueue.offer(command)) {
          //5
              if (runState != RUNNING || poolSize == 0)
          //6
                      ensureQueuedTaskHandled(command);
              }
          //7
              else if (!addIfUnderMaximumPoolSize(command))
                  reject(command); // is shutdown or saturated
          }
      }

上述過程:

  1. 若是任務command 爲空,則拋出空指針異常,返回。不然進行2。
  2. 若是當前線程池大小大於或等於核心線程池大小,進行4。不然進行3。
  3. 建立一個新工做隊列(線程,參考上一節),成功直接返回,失敗進行4。
  4. 若是線程池正在運行而且任務加入線程池隊列成功,進行5,不然進行7。
  5. 若是線程池已經關閉或者線程池大小爲0,進行6,不然直接返回。
  6. 若是線程池已經關閉則執行拒絕策略返回,不然啓動一個新線程來進行執行任務,返回。
  7. 若是線程池大小不大於最大線程池數量,則啓動新線程來進行執行,不然進行拒絕策略,結束。

什麼時候任務當即執行

runState == RUNNING && ( poolSize < corePoolSize || poolSize < maxnumPoolSize && workQueue.isFull())

submit

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Object> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

Future

在Future 接口中提供了5個方法。

  • V get() throws InterruptedException, ExecutionException: 等待計算完成,而後獲取其結果。
  • V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,TimeoutException。最多等待爲使計算完成所給定的時間以後,獲取其結果(若是結果可用)。
  • boolean cancel(boolean mayInterruptIfRunning):試圖取消對此任務的執行。
  • boolean isCancelled():若是在任務正常完成前將其取消,則返回true。
  • boolean isDone():若是任務已完成,則返回true。可能因爲正常終止、異常或取消而完成,在全部這些狀況中,此方法都將返回true。

執行

初始狀況下任務狀態state=0,任務執行(innerRun)後狀態變爲運行狀態RUNNING(state=1),執行完畢後變成運行結束狀態RAN(state=2)。任務在初始狀態或者執行狀態被取消後就變爲狀態CANCELLED(state=4)

void innerRun() {
    if (!compareAndSetState(0, RUNNING))
        return;
    try {
        runner = Thread.currentThread();
        if (getState() == RUNNING) // recheck after setting thread
            innerSet(callable.call());
        else
            releaseShared(0); // cancel
    } catch (Throwable ex) {
        innerSetException(ex);
    }
}

執行一個任務有四步:設置運行狀態、設置當前線程(AQS 須要)、執行任務(Runnable#run或者Callable#call)、設置執行結果。這裏也能夠看到,一個任務只能執行一次,由於執行完畢後它的狀態不在爲初始值0,要麼爲CANCELLED,要麼爲RAN。

取消任務

boolean innerCancel(boolean mayInterruptIfRunning) {
    for (;;) {
        int s = getState();
        if (ranOrCancelled(s))
            return false;
        if (compareAndSetState(s, CANCELLED))
            break;
    }
    if (mayInterruptIfRunning) {
        Thread r = runner;
        if (r != null)
            r.interrupt();
    }
    releaseShared(0);
    done();
    return true;
}

獲取結果

V innerGet() throws InterruptedException, ExecutionException {
    acquireSharedInterruptibly(0);
    if (getState() == CANCELLED)
        throw new CancellationException();
    if (exception != null)
        throw new ExecutionException(exception);
    return result;
}
//AQS#acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg); //park current Thread for result
}
protected int tryAcquireShared(int ignore) {
    return innerIsDone()? 1 : -1;
}
boolean innerIsDone() {
    return ranOrCancelled(getState()) && runner == null;
}

當調用Future#get()的時候嘗試去獲取一個共享變量。這就涉及到AQS 的使用方式了。這裏獲取一個共享變量的狀態是任務是否結束(innerIsDone()),也就是任務是否執行完畢或者被取消。若是不知足條件,那麼在AQS 中就會doAcquireSharedInterruptibly(arg)掛起當前線程,直到知足條件

延遲、週期性任務調度的實現

ScheduledThreadPoolExecutor 和ThreadPoolExecutor 的惟一區別在於任務是有序(按照執行時間順序)的,而且須要到達時間點(臨界點)才能執行,並非任務隊列中有任務就須要執行的。也就是說惟一不一樣的就是任務隊列BlockingQueue workQueue 不同。ScheduledThreadPoolExecutor 的任務隊列是java.util.concurrent.ScheduledThreadPoolExecutor.DelayedWorkQueue , 它是基於
java.util.concurrent.DelayQueue 隊列的實現。

因爲DelayQueue 在獲取元素時須要檢測元素是否「可用」,也就是任務是否達到「臨界點」(指定時間點),所以加入元素和移除元素會有一些額外的操做。

移除元素(出隊列)的過程是這樣的:

  • 老是檢測隊列的頭元素(順序最小元素,也是最早達到臨界點的元素)
  • 檢測頭元素與當前時間的差,若是大於0,表示還未到底臨界點,所以等待響應時間(使用條件變量available)
  • 若是小於或者等於0,說明已經到底臨界點或者已通過了臨界點,那麼就移除頭元素,而且喚醒其它等待任務隊列的線程。

一樣加入元素也會有相應的條件變量操做。當前僅當隊列爲空或者要加入的元素比隊列中的頭元素還小的時候才須要喚醒「等待線程」去檢測元素。由於頭元素都沒有喚醒那麼比頭元素更延遲的元素就更加不會喚醒。

相關文章
相關標籤/搜索