ThreadPoolExecutor淺談

耐心看完的你或多或少會有收穫!html

ThreadPoolExecutor field 的解釋

在瞭解線程池以前,但願你已瞭解 Java 內存模型AQS CASjava

/** * The runState provides the main lifecycle control, taking on values: * * RUNNING: Accept new tasks and process queued tasks * SHUTDOWN: Don't accept new tasks, but process queued tasks * STOP: Don't accept new tasks, don't process queued tasks, * and interrupt in-progress tasks * TIDYING: All tasks have terminated, workerCount is zero, * the thread transitioning to state TIDYING * will run the terminated() hook method * TERMINATED: terminated() has completed * * The numerical order among these values matters, to allow * ordered comparisons. The runState monotonically increases over * time, but need not hit each state. The transitions are: * * RUNNING -> SHUTDOWN * On invocation of shutdown(), perhaps implicitly in finalize() * (RUNNING or SHUTDOWN) -> STOP * On invocation of shutdownNow() * SHUTDOWN -> TIDYING * When both queue and pool are empty * STOP -> TIDYING * When pool is empty * TIDYING -> TERMINATED * When the terminated() hook method has completed terminated() */
    
    // 前 3 位表示運行狀態,後面 29 位存儲當前運行 workerCount
    private static final int COUNT_BITS = Integer.SIZE - 3; // 32 - 3
    
    // 最大容量
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // 00011111111111111111111111111111
   
    /** * Maximum pool size. Note that the actual maximum is internally * bounded by CAPACITY. 實際線程池大小仍是由 CAPACITY 決定 */
    private volatile int maximumPoolSize;
    
    // 如下爲線程池的幾個狀態 官方註釋在最上方
    // 接受新的任務
    private static final int RUNNING    = -1 << COUNT_BITS; // 11100000000000000000000000000000
    
    // 不接受新的任務,可是已在隊列中的任務,還會繼續處理
    private static final int SHUTDOWN   =  0 << COUNT_BITS; // 00000000000000000000000000000000
    
    // 不接受,不處理新的任務,且中斷正在進行中的任務
    private static final int STOP       =  1 << COUNT_BITS; // 00100000000000000000000000000000
    
    // 全部任務已中止,workerCount 清零,注意 workerCount 是由 workerCountOf(int c) 計算得出的
    private static final int TIDYING    =  2 << COUNT_BITS; // 01000000000000000000000000000000
    
    // 全部任務已完成
    private static final int TERMINATED =  3 << COUNT_BITS; // 01100000000000000000000000000000
    
    // 線程池運行狀態和已工做的 workerCount 初始化爲 RUNNING 和 0
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    
    // 計算當前 state
    // ~CAPACITY 爲 11100000000000000000000000000000 & c(假如前三位爲 000 說明線程池已經 SHUTDOWN)
    private static int runStateOf(int c) { return c & ~CAPACITY; }
    
    // 同時拿到 state workerCount
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    
    // 能夠計算出當前工做的 workerCount
    private static int workerCountOf(int c) { return c & CAPACITY; }
    
    // 線程入列
    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
       
            // 得到當前 state 和 workerCount
            // 判斷是否知足加入核心線程
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
            	// 以核心線程的方式加入隊列
                if (addWorker(command, true))
                    return;
                // 添加失敗 獲取最新的線程池 state 和 workerCount
                c = ctl.get();
            }
            // 在運行且成功加入隊列
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                // 再檢查一次,不在運行就拒絕任務
                if (!isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                	// 加入一個 null
                    addWorker(null, false);
            }
            // 加入失敗就拒絕任務
            else if (!addWorker(command, false))
                reject(command);
        }
    
    // 實際的操做
    private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
            	// 得到當前 state 和 workerCount
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // 大於 SHUTDOWN 即 STOP TIDYING TERMINATED
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    // 計算 workerCount
                    int wc = workerCountOf(c);
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    // 成功了就退出
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        // 走到這一步說明 rs 爲 RUNNING 或 SHUTDOWN 能夠從新嘗試加入
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                // 統一線程的名字
                // 設置 daemon 和 priority
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());
    
                        // 異常檢查
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    // 添加成功 啓動線程
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
            	// 加入失敗 
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
        
    // 加入失敗 作一些掃尾清理
    private void addWorkerFailed(Worker w) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (w != null)
                    workers.remove(w);
                // workerCount-1
                decrementWorkerCount();
                // 嘗試更新狀態 何爲嘗試,即須要知足必定條件,而不是冒然去作某事
                tryTerminate();
            } finally {
                mainLock.unlock();
            }
        }
複製代碼

總結一下

寫得好的源碼,註釋必定要好好看一遍ide

線程池的狀態和工做線程數量用 32 位二進制數表示,而後經過二進制的位運算獲取狀態和數量,這種設計實在是太過精妙oop

膜拜大師post

相關文章
相關標籤/搜索