經過源碼理解 Java 線程池的核心參數

背景

做爲一名 Java 開發者,對線程池絕對不陌生,不管是平時工做,仍是面試都會,線程池都是必會的知識點.並且不能不能之知其表面,理解不透徹,很容易在實戰中出現 OOM,也可能在面試中被問趴😄.java

參數含義

其實,若是研究過線程池的話,其實並不難,他的參數並很少,java.util.concurrent.ThreadPoolExecutor中的參數列舉出來就是這些.面試

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 複製代碼
  • corePoolSize:線程池中的核心線程數,即便沒有任務執行的時候,他們也是存在的.(不考慮配置了參數:allowCoreThreadTimeOut,allowCoreThreadTimeOut經過字面意思也能知道,就是是否容許核心線程超時,通常狀況下不須要設置,本文不考慮)
  • maximumPoolSize:線程池中的容許存在的最大線程數.
  • keepAliveTime: 當線程池中的線程超過核心線程數的時候,這部分多餘的空閒線程等待執行新任務的超時時間.例如:核心線程數爲1 ,最大線程數爲5,當前運行線程爲4,keepAliveTime爲60s,那麼4-1=3個線程在空閒狀態下等待60s 後尚未新任務到來,就會被銷燬了.
  • unit :keepAliveTime 的時間單位.
  • workQueue: 線程隊列,若是當前時間核心線程都在運行,又來了一個新任務,那麼這個新任務就會被放進這個線程隊列中,等待執行.
  • threadFactory: 線程池建立線程的工廠類.
  • handler: 若是線程隊列滿了同事執行線程數也達到了maximumPoolSize,若是此時再來新的線程,將執行什麼 handler 來處理這個線程. handler的默認提供的類型有:
    • AbortPolicy: 拋出RejectedExecutionException異常
    • DiscardPolicy: 什麼都不作.
    • DiscardOldestPolicy: 將線程隊列中的最老的任務拋棄掉,換區一個空間執行當前的任務.
    • CallerRunsPolicy: 使用當前的線程(好比 main)來執行這個線程.

執行流程

咱們知道了參數的含義,那麼這些參數在執行過程當中究竟是怎麼運行的呢,咱們先用文字分幾種狀況來描述一下.多線程

在說以前,先來看一個例子.ide

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 3, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), new BasicThreadFactory.Builder().namingPattern("name-%d").build());
threadPoolExecutor.execute(new Runnable() {
           @Override
           public void run() {
               System.out.println("test");
           }
       });
複製代碼

不得不說,在很長一段時間內,我都有一個疑問或者說誤區,明明是線程池,爲何每次都須要我 new 一個 線程(錯誤) 呢? 由於咱們開始學線程的時候先學 new Thread(),後來又學了new Runnable(),慢慢的就把這兩個混爲一罈了,其實 new Runnable()並無新起一個線程,只是新建了一個可運行的任務,就是一個普通的對象而已,哈哈,這應該是一個很傻的錯誤認知. 回到上面說的具體含義.oop

  1. 若是新加入一個運行的任務,當前運行的線程小於corePoolSize,這時候會在線程池中新建一個線程用於執行這個新的任務.
  2. 若是新加入一個運行的任務,當前運行的線程大於等於corePoolSize,這個時候就須要將這個新的任務加入到線程隊列workQueue中,一旦線程中的線程執行完成了一個任務,就會立刻從隊列中去一個任務來執行.
  3. 接2,若是隊列也滿了,怎麼辦呢? 若是maximumPoolSize大於corePoolSize,就會新建線程來處理這個新的任務,知道總運行線程數達到maximumPoolSize.
  4. 若是總運行線程數達到了maximumPoolSize,還來了新的任務怎麼辦呢?就須要執行上面所說的拒絕策略了handler了,按照配置的策略進行處理,默認不配置的狀況下,使用的是AbortPolicy.
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
複製代碼

源碼驗證

怎麼判斷上面說的流程是正確的呢?咱們能夠跟進源碼來仔細查看一下上面的流程,其實線程池執行的代碼比較簡單,一看變更,看了源碼,掌握得應該會更加深入一些.ui

首先來看看execute()方法this

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        // ctl是一個原子的控制位,能夠表示線程池的狀態和運行的線程數;
        int c = ctl.get();
        // 1. 若是運行線程數小於核心線程數
        if (workerCountOf(c) < corePoolSize) {
            //直接新建 worker(線程)執行.
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 2. 若是上面的addWorker 失敗了,就須要加入線程隊列中
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 加入後,檢查狀態;
            if (! isRunning(recheck) && remove(command))
                //檢查運行狀態不經過,移除任務,執行拒絕策略
                reject(command);
            // 若是當前的運行線程爲0
            else if (workerCountOf(recheck) == 0)
                //就是用核心線程執行剛剛添加到隊列的線程
                addWorker(null, false);
        }
        // 3. 若是隊列也滿了,就新建線程繼續處理
        else if (!addWorker(command, false))
            // 4. 若是不容許新建了,就執行拒絕策略
            reject(command);
    }
複製代碼

按照一個正常流程來講,咱們只考慮一個理想的環境.咱們能夠分爲上面的4步,正好和上面的文字描述對應.spa

可能愛思考的同窗發現,第2步,加入隊列後,何時執行這個新加入的呢,難道有一個定時任務嗎?並非.咱們能夠看看這個addWorker()方法.線程

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        //第一層循環
        for (;;) {
            int c = ctl.get();
            //獲取當前線程池的狀態;
            int rs = runStateOf(c);
            ...
            //第二層循環
            for (;;) {
                //獲取線程池的運行線程個數
                int wc = workerCountOf(c);
                // 大於了最大容許的線程個數,固然要返回 false
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //經過了檢查,就把 正在運行的線程數加1
                if (compareAndIncrementWorkerCount(c))
                    //跳出第一層循環
                    break retry;
                c = ctl.get();  // Re-read ctl
                //加1 失敗,可能有多線程衝突,檢查一下最新的狀態,繼續重試;
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //新建一個線程包裝咱們的 Runnable
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                ...
                //加入 hashSet 中管理存在於線程池中線程
                workers.add(w);
                workerAdded = true;
                if (workerAdded) {
                   // 啓動 worker,worker就是線程中真正執行的線程,包裝了咱們提供的 Runnable
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
複製代碼

上面的addWorker()方法中,就是靠t.start()來啓動線程的. Worker這個類存在於java.util.concurrent.ThreadPoolExecutor.Worker,定義以下 只保留了相對重要的代碼.code

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
             Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker */
        public void run() {
            runWorker(this);
        }
        
        final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                   ....
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    ...
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
        }
複製代碼

因此當t.start()的時候,實際上,新建了一個線程,執行了runWorker(this);方法: 這個方法裏面有一個while循環,getTask()是從隊列中獲取一個任務.因此說這裏能夠解答上面放到隊列裏面的任務何時執行了,等到任意一個核心線程空閒出來時候,他就會循環去取隊列中的任務執行.每一個核心線程和新起來的線程都是同步來執行你傳進來的Runnablerun方法.

整個流程應該就比較清楚了.

上面說了這麼多,核心參數都說的差很少了,那麼keepAliveTime 這個參數在源碼怎麼來用的呢?

上面說到一個getTask()方法從隊列中取一個任務,看一下這個方法的代碼(省略非主要的).

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            ...
             int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
複製代碼

主要就是用於取任務這裏,poll()不會阻塞,take()是阻塞的,因此當使用poll取數據的時候,到達設定的超時,就會繼續往下執行,若是超過設定時間仍是沒有任務進來,就會將timedOut設置爲 true,返回 null. 這個timedOut會控制上面的 if 判斷,最終控制compareAndDecrementWorkerCount()方法,就是講運行的線程數減1個,那麼下次若是又滿了,就會新建一個,因此這個 Alive 就失效了.

總結

整體來講,經過源碼來看問題能比較權威的解答一些問題.有時候源碼彷佛也沒有那麼高深😄

相關文章
相關標籤/搜索