Java線程池 你知它有多深

總圖

ThreadPool.png

Thread

線程是一種資源,並非只存在程序的世界裏。程序,只是對生活的一種抽象表述。html

好比車站的售票窗口、退票窗口、檢票窗口,每一個窗口都在作不一樣的事情,就是同時在運行的不一樣線程。java

線程多了,須要管理,不一樣的線程也要能保證不會互相干擾,各作各的。segmentfault

線程像人的能量,線程池比如人能量的總場,這個比喻可能不太恰當,由於人同時作好幾件事情時,哪怕看似「同時」的,也是分心作的,是本身的CPU在不斷切換時間片,那只是「併發」,而不是「並行」。bash

線程的Life

ThreadLife.png

ThreadPoolExecutor

線程池狀態

public class ThreadPoolExecutor extends AbstractExecutorService {
    /** * The main pool control state, ctl, is an atomic integer packing * two conceptual fields * workerCount, indicating the effective number of threads * runState, indicating whether running, shutting down etc * * The runState provides the main lifecycle control, taking on values: * * RUNNING: Accept new tasks and process queued tasks * SHUTDOWN: Don't accept new tasks, but process queued tasks * STOP: Don't accept new tasks, don't process queued tasks, * and interrupt in-progress tasks * TIDYING: All tasks have terminated, workerCount is zero, * the thread transitioning to state TIDYING * will run the terminated() hook method * TERMINATED: terminated() has completed 複製代碼
  • runState:線程池運行狀態
  • workerCount:工做線程的數量
// Android-added: @ReachabilitySensitive
    @ReachabilitySensitive
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
    
    // Packing and unpacking ctl
    private static int runStateOf(int c) { return c & ~CAPACITY; }
    private static int workerCountOf(int c) { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }
複製代碼

線程池用一個32位的int來同時保存runState和workerCount,其中高3位(第31到29位)是runState,其他29位是workerCount(大約500 million)。併發

來個圖看看存儲結構dom

Construtor

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
複製代碼

它的參數

  • corePoolSize 核心線程數,比如班幹部的人數。ide

  • maximumPoolSize 最大線程數,比如教室裏的座位數。 當提交任務數超過了這個最大值,線程還有拒絕策略——RejectExecutionHandler,作不動了嘛。函數

  • keepAliveTime 除核心線程外的空閒線程保持存活時間。 當線程池裏線程數超過corePoolSize數量了,keepAliveTime時間到,就把空閒線程關了,否則也閒置了呀,節省能量嘛。工具

  • workQueue 經過workQueue,線程池實現了阻塞功能。 當線程池中的線程數超過它的corePoolSize的時候,線程會進入阻塞隊列進行阻塞等待。oop

  • threadFactory 建立線程的工廠。 全部的線程都經過這個Factory建立(經過addWorker方法)。

  • handler 線程池的飽和策略。

4種策略:

  • AbortPolicy:直接拋出異常,默認策略;
  • CallerRunsPolicy:用調用者所在的線程來執行任務;
  • DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務,並執行當前任務;
  • DiscardPolicy:直接丟棄任務。

更多被調用的構造方法:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }
複製代碼

排隊策略

  • SynchronousQueue 直接提交。直接提交任務給線程,而不是保存它們。

  • LinkedBlockingQueue 無界隊列。 當核心線程都在忙的時候,用一個無界隊列存放提交的任務。 線程數不會超過核心線程數,maximumPoolSize設置也無效。

  • ArrayBlockingQueue 有界隊列。 防止資源被消耗完,隊列也是有上限的。

Executors

Executors.defaultThreadFactory()是Executors靜態工廠裏默認的threadFactory。 後面再詳細說。

源碼分析

Worker —— 工做線程

線程池建立線程時,會將線程封裝成工做線程Worker,就是在線程池裏幹活的人。

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
複製代碼

既實現了Runnable,又繼承了AbstractQueuedSynchronizer(AQS),因此其既是一個可執行的任務,又能夠達到鎖的效果。

Worker和Task的區別: Worker是線程池中的線程,而Task雖然是runnable,可是並無真正執行,只是被Worker調用了run方法。

/** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
       this.thread = getThreadFactory().newThread(this);
   }
複製代碼

先來看看整體流程圖~~

workerQueue

注意點:

  • 線程池裏有不少Worker,核心的成員數就是corePoolSize,池子裏最多能容納的Worker數是maximumPoolSize;
  • workQueue是等待中的任務隊列。
  • 默認 corePoolSize 以內的線程是不會被回收的。

看源碼

execute(Runnable command)

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 若是當前正在運行的線程數 < corePoolSize,嘗試用給到的command來啓動一個新線程做爲第一個任務。 * 調用addWorker方法,檢查runState和workerCount, * 而且若是增長線程的話,能防止產生錯誤警報,若是不能增長線程,則返回false。 * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 若是一個任務被成功地加到隊列裏,仍然須要雙重檢驗來確認是否須要新建一個線程 *(由於可能在上一次檢查後,已經存在的線程已經died)或者進入這個方法後,線程池已經被關閉了。 * 因此咱們須要再次檢查state,若是線程池中止了須要回滾入隊列,若是池中沒有線程了,新建一個線程。 * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. * 若是不能把任務加入隊列(可能線程池已經關閉或者滿了),那麼須要新開一個線程(往maxPoolSize發展)。 * 若是失敗的話,說明線程池shutdown了或者滿了,就要拒絕這個任務了。 */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        /** * 二、若是線程池RUNNING狀態,且入隊列成功 */
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            
            //若是再次校驗過程當中,線程池不是RUNNING狀態,
            // 而且remove(command)--workQueue.remove()成功,拒絕當前command
            if (! isRunning(recheck) && remove(command))
                reject(command);

            //爲何只檢查運行的worker數量是否是0呢?? 爲何不和corePoolSize比較呢??
            // 只保證有一個worker線程能夠從queue中獲取任務執行就好了??
            // 由於只要還有活動的worker線程,就能夠消費workerQueue中的任務
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        /** * 三、若是線程池不是running狀態 或者 沒法入隊列 * 嘗試開啓新線程,擴容至maxPoolSize, * 若是addWork(command, false) 失敗了,拒絕當前command */
        else if (!addWorker(command, false))
            reject(command);
    }
複製代碼

看註解有點費勁,把execute方法裏的註釋單獨列出來,一步步說得很清楚:

  1. 若是當前正在運行的線程數 < corePoolSize,嘗試用給到的command來啓動一個新線程做爲第一個任務。 調用addWorker方法,檢查runState和workerCount,而且若是增長線程的話,能防止產生錯誤警報,若是不能增長線程,則返回false。

  2. 若是一個任務被成功地加到隊列裏,仍然須要雙重檢驗來確認是否須要新建一個線程。 (由於可能在上一次檢查後,已經存在的線程已經died)或者進入這個方法後,線程池已經被關閉了。因此咱們須要再次檢查state,若是線程池中止了須要回滾入隊列,若是池中沒有線程了,新建一個線程。

  3. 若是不能把任務加入隊列(可能線程池已經關閉或者滿了),那麼須要新開一個線程(往maxPoolSize發展)。若是失敗的話,說明線程池shutdown了或者滿了,就要拒絕這個任務了。

execute.png
(這是參考來的圖)

本身畫一個:

好像仍是很複雜,再簡單點:

總的來講,就是:

  1. 先看能不能加入核心線程裏,
  2. 再看能不能加入workQueue,
  3. 最後看有沒有超過線程池的最大數量,超過的話就拒絕這個task了。

Executors

它是一個Java中的工具類。提供工廠方法來建立不一樣類型的線程池。 用它能夠很方便地建立出下面幾種線程池來。

經常使用線程池 特色 適應場景
newSingleThreadExecutor 單線程的線程池 用於須要保證順序執行的場景,而且只有一個線程在執行
newFixedThreadPool 固定大小的線程池 用於已知併發壓力的狀況下,對線程數作限制。
newCachedThreadPool 能夠無限擴大的線程池 比較適合處理執行時間比較小的任務。
newScheduledThreadPool 能夠延時啓動,定時啓動的線程池 適用於須要多個後臺線程執行週期任務的場景。
newWorkStealingPool 擁有多個任務隊列的線程池 能夠減小鏈接數,建立當前可用cpu數量的線程來並行執行。

好比這樣建立:

ExecutorService singleService = Executors.newSingleThreadExecutor();

ExecutorService fixedService = Executors.newFixedThreadPool(9);

ExecutorService cacheService = Executors.newCacheThreadPool();
複製代碼

或者經過ThreadPoolExecutor的構造函數自定義須要的線程池。

先寫到這裏~

ref

相關文章
相關標籤/搜索