java線程池的原理學習(三)

接上文:java線程池的原理學習(二)java

ThreadPoolExecutor深刻剖析

線程池的五種狀態

ThreadPoolExecutor 類中將線程狀態( runState)分爲了如下五種:segmentfault

RUNNING:能夠接受新任務而且處理進入隊列中的任務
SHUTDOWN:不接受新任務,可是仍然執行隊列中的任務
STOP:不接受新任務也不執行隊列中的任務
TIDYING:全部任務停止,隊列爲空,進入該狀態下的任務會執行 terminated()方法
TERMINATEDterminated()方法執行完成後進入該狀態緩存

狀態之間的轉換

  • RUNNING -> SHUTDOWNoop

調用了 shutdown()方法,多是在 finalize()方法中被隱式調用學習

  • (RUNNING or SHUTDOWN) -> STOPthis

調用 shutdownNow()線程

  • SHUTDOWN -> TIDYINGcode

當隊列和線程池都爲空時接口

  • STOP -> TIDYING隊列

線程池爲空時

  • TIDYING -> TERMINATED

terminated()方法執行完成

線程池狀態實現

若是查看 ThreadPoolExecutor的源碼,會發現開頭定義了這幾個變量來表明線程狀態和活動線程的數量:

//原子變量
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

這個類中將二進制數分爲了兩部分,高位表明線程池狀態( runState),低位表明活動線程數( workerCount), CAPACITY表明最大的活動線程數,爲2^29-1,下面爲了更直觀的看到這些數我作了些打印:

public class Test1 {
    public static void main(String[] args) {
            final int COUNT_BITS = Integer.SIZE - 3;
            final int CAPACITY   = (1 << COUNT_BITS) - 1;

            final int RUNNING    = -1 << COUNT_BITS;
            final int SHUTDOWN   =  0 << COUNT_BITS;
            final int STOP       =  1 << COUNT_BITS;
            final int TIDYING    =  2 << COUNT_BITS;
            final int TERMINATED =  3 << COUNT_BITS;
            
            System.out.println(Integer.toBinaryString(CAPACITY));
            System.out.println(Integer.toBinaryString(RUNNING));
            System.out.println(Integer.toBinaryString(SHUTDOWN));
            System.out.println(Integer.toBinaryString(STOP));
            System.out.println(Integer.toBinaryString(TIDYING));
            System.out.println(Integer.toBinaryString(TERMINATED));
    }
}

輸出:

11111111111111111111111111111
11100000000000000000000000000000
0
100000000000000000000000000000
1000000000000000000000000000000
1100000000000000000000000000000

打印的時候會將高位0省略
能夠看到,第一行表明線程容量,後面5行提取高3位獲得:

111 - RUNNING
000 - SHUTDOWN
001 - STOP
010 - TIDYING
011 - TERMINATED

分別對應5種狀態,能夠看到這樣定義以後,只須要經過簡單的移位操做就能夠進行狀態的轉換。

重要方法

execute方法:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
       
        int c = ctl.get();
        /**分三步執行
         * 若是workerCount<corePoolSize,則建立一個新線程執行該任務
         */
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true)) //建立成功則return
                return;
            c = ctl.get(); //建立失敗從新讀取狀態,隨時保持狀態的最新
        }
        /**
         * workerCount>=corePoolSize,判斷線程池是否處於運行狀態,再將任務加入隊列
         * */
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();     //用於double check
            //若是線程池處於非運行態,則將任務從緩存隊列中刪除
            if (! isRunning(recheck) && remove(command)) 
                reject(command);  //拒絕任務
            else if (workerCountOf(recheck) == 0) //若是活動線程數爲0,則建立新線程
                addWorker(null, false);
        }
        //若是線程池不處於RUNNING狀態,或者workQueue滿了,則執行如下代碼
        else if (!addWorker(command, false))
            reject(command);
    }

能夠看到,在類中使用了 Work類來表明任務,下面是 Work類的簡單摘要:

private final class Worker extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }
        ...

Work類實現了 Runnable接口,使用了線程工廠建立線程,使用 runWork方法來運行任務
建立新線程時用到了 addWorker()方法:

/**
     * 檢查在當前線程池狀態和限制下可否建立一個新線程,若是能夠,會相應改變workerCount,
     * 每一個worker都會運行他們的firstTask
     * @param firstTask 第一個任務
     * @param core true使用corePoolSize做爲邊界,false使用maximumPoolSize
     * @return false 線程池關閉或者已經具有關閉的條件或者線程工廠沒有建立新線程
     */
private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 只有當rs < SHUTDOWN纔有可能接受新任務
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c); //工做線程數量
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize)) //不合法則返回
                    return false;
                if (compareAndIncrementWorkerCount(c)) //將工做線程數量+1
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs) //判斷線程池狀態有沒有改變,改變了則進行外循環,不然只進行內循環
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        //建立新線程
        Worker w = new Worker(firstTask);
        Thread t = w.thread;

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //再次檢查狀態,防止ThreadFactory建立線程失敗或者狀態改變了
            int c = ctl.get();
            int rs = runStateOf(c);

            if (t == null ||
                (rs >= SHUTDOWN &&
                 ! (rs == SHUTDOWN &&
                    firstTask == null))) {
                decrementWorkerCount();  //減小線程數量
                tryTerminate();//嘗試停止線程
                return false;
            }

            workers.add(w);//添加到工做線程Set集合中

            int s = workers.size();
            if (s > largestPoolSize)
                largestPoolSize = s;
        } finally {
            mainLock.unlock();
        }

        t.start();//執行任務
       //狀態變成了STOP(調用了shutdownNow方法)
        if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())
            t.interrupt();

        return true;
    }

再看 Work中runWork方法:

final void runWorker(Worker w) {
        Runnable task = w.firstTask;
        w.firstTask = null;
        boolean completedAbruptly = true;//線程是否異常停止
        try {
            //先取firstTask,再從隊列中取任務直到爲null
            while (task != null || (task = getTask()) != null) {
                w.lock();
                clearInterruptsForTaskRun();
                try {
                    beforeExecute(w.thread, task);//實現鉤子方法
                    Throwable thrown = null;
                    try {
                        task.run();//運行任務
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);//實現鉤子方法
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;//成功運行,說明沒有異常停止
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
相關文章
相關標籤/搜索