Java線程池ThreadPoolExecutor類筆記

線程池ThreadPoolExecutor

threadpoolexecutoruml.png

爲何須要線程池?

線程池可以對線程進行統一分配,調優和監控:
- 下降資源消耗(防止線程不停的建立與銷燬,減小了資源消耗)
- 提升響應速度
- 提升線程的可管理性java

核心參數

源碼內部使用了一個Integer類型的原子變量來記錄線程池狀態(高三位)和線程池線程數(其他)。數組

狀態 高三位 表現
RUNNING -1(111) 接收並容許新任務
SHUTDOWN 0(000) 拒絕新任務,但處理阻塞隊列裏的任務
STOP 1(001) 拒絕新任務,放棄執行任務,並中斷正在處理的任務
TIDYING 2(010) 全部任務執行完後,當前線程池活動數爲0,將要調用 terminated 方法
TERMINATED 3(011) terminated()方法執行完畢
//原子變量 ctl 存儲線程池狀態(高三位)與線程數(其餘低位)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

//線程數的表示位數
private static final int COUNT_BITS = Integer.SIZE - 3;
//線程最大個數
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
//111 - 000~ 
private static final int RUNNING    = -1 << COUNT_BITS;
//000 - 000~ 拒絕新任務,但處理阻塞隊列裏的任務
private static final int SHUTDOWN   =  0 << COUNT_BITS;
//001 - 000~ 放棄執行任務,並中斷正在處理的任務
private static final int STOP       =  1 << COUNT_BITS;
//010 - 000~ 全部任務執行完後,當前線程池活動數爲0,將要調用 terminated 方法
private static final int TIDYING    =  2 << COUNT_BITS;
//011 - 000~ 終止狀態
private static final int TERMINATED =  3 << COUNT_BITS;

//取高三位
private static int runStateOf(int c)     { return c & ~COUNT_MASK; }
//取低其餘位
private static int workerCountOf(int c)  { return c & COUNT_MASK; }
//獲取 ctl 值
private static int ctlOf(int rs, int wc) { return rs | wc; }

//=========================分割線
private final ReentrantLock mainLock = new ReentrantLock();

private final HashSet<Worker> workers = new HashSet<>();

private final Condition termination = mainLock.newCondition();

private volatile boolean allowCoreThreadTimeOut;

private int largestPoolSize;//歷史最大建立線程數

private long completedTaskCount;//完成的任務數
//-------------------------分割線
//構造方法相關的參數
private volatile int corePoolSize;

private volatile int maximumPoolSize;

private volatile long keepAliveTime;
//阻塞隊列
private final BlockingQueue<Runnable> workQueue;

private volatile ThreadFactory threadFactory;

private volatile RejectedExecutionHandler handler;

核心構造方法

public ThreadPoolExecutor(int corePoolSize,//核心線程數
                          int maximumPoolSize,//最大線程數
                          long keepAliveTime,//線程空閒至銷燬等待時間
                          TimeUnit unit,//時間單位
                          BlockingQueue<Runnable> workQueue,//工做隊列
                          ThreadFactory threadFactory,//線程工廠
                          RejectedExecutionHandler handler//飽和策略
                         ) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

拒絕策略比較

拒絕策略 表現
ThreadPoolExecutor.AbortPolicy() 默認 丟棄任務並拋出RejectedExecutionException異常
ThreadPoolExecutor.CallerRunsPolicy() 由調用線程(提交任務的線程)處理該任務
ThreadPoolExecutor.DiscardOldestPolicy() 丟棄隊列最前面的任務,而後提交當前任務
ThreadPoolExecutor.DiscardPolicy() 丟棄任務,可是不拋出異常。

阻塞隊列的比較

隊列名 表現
ArrayBlockingQueue 基於數組的有界隊列
LinkedBlockingQueue 基於鏈表的無界隊列
SynchronousQueue 最多隻有一個元素的隊列
PriorityBlockingQueue 優先級隊列![]()

提交任務的源碼

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
   /*
    * 1.是否比核心線程數少,是則新建一個核心線程完成該任務
    * 2.嘗試添加到阻塞隊列
    * 3.嘗試新建線程完成任務,失敗則線程池已滿或關閉,執行拒絕策略
    */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        //1
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        //2
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))//3
        reject(command);
}
private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (int c = ctl.get();;) {
        /* 檢查隊列是否只在必要時爲空,返回false
         * 1.若是線程池狀態爲 shutdown 之後的狀態
         * 2.若是線程池狀態爲 shutdown 而且有了第一個任務
         * 3.若是線程池狀態爲 shutdown 且任務隊列爲空
         */
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP)
                || firstTask != null
                || workQueue.isEmpty()))
            return false;

        for (;;) {
            if (workerCountOf(c)
                >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateAtLeast(c, SHUTDOWN))
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int c = ctl.get();
                //線程池正在運行 或 shutdown狀態且第一個任務爲空
                if (isRunning(c) ||
                    (runStateLessThan(c, STOP) && firstTask == null)) {
                    if (t.isAlive()) // 線程處於運行態,狀態錯誤 precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);//添加任務
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();//啓動任務
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

線程池提交任務流程圖

tpelct.png

線程池的其餘方法 簡

方法名 表現
public void shutdown(); 線程池再也不接收新任務,但還會完成工做隊列裏的任務
public List<Runnable> shutdownNow(); 線程池再也不接收新任務,且會放棄工做隊列的任務,正在執行的任務會被中斷,返回被丟棄任務的集合
public boolean awaitTermination(long timeout,TimeUnit unit) throws InterruptedException; 當線程調用該方法後,當前線程會被阻塞,直到線程池狀態變成 TERMINATED 或超時才返回
相關文章
相關標籤/搜索