線程池的一些疑問和解答

疑問

  1. 線程池中的線程是如何實現一個線程執行多個任務的?html

  2. 構造線程池時爲什麼要用阻塞隊列做爲參數,非阻塞隊列不行嗎?java

線程池的幾個重要參數

  • corePoolSize 是線程池的核心線程數,一般線程池會維持這個線程數編程

  • maximumPoolSize 是線程池所能維持的最大線程數緩存

  • keepAliveTime 和 unit 則分別是超額線程的空閒存活時間數和時間單位併發

  • workQueue 是提交任務到線程池的入隊列this

  • threadFactory 是線程池建立新線程的線程構造器spa

  • handler 是當線程池不能接受提交任務的時候的處理策略線程

線程池處理任務通常流程

  1. 若是當前線程池中的線程數目小於corePoolSize,則每來一個任務,就會建立一個線程去執行這個任務;code

  2. 若是當前線程池中的線程數目>=corePoolSize,則每來一個任務,會嘗試將其添加到任務緩存隊列當中,若添加成功,則該任務會等待空閒線程將其取出去執行;若添加失敗(通常來講是任務緩存隊列已滿),則會嘗試建立新的線程去執行這個任務;orm

  3. 若是當前線程池中的線程數目達到maximumPoolSize,則會採起任務拒絕策略進行處理;

  4. 若是線程池中的線程數量大於 corePoolSize時,若是某線程空閒時間超過keepAliveTime,線程將被終止,直至線程池中的線程數目不大於corePoolSize;若是容許爲核心池中的線程設置存活時間,那麼核心池中的線程空閒時間超過keepAliveTime,線程也會被終止。

分析

addIfUnderPoolSize

以最簡單的poolSize<corePoolSize的狀況來分析好了,下面是關鍵代碼

private boolean addIfUnderCorePoolSize(Runnable firstTask) {
Thread t = null;final ReentrantLock mainLock = this.mainLock;
mainLock.lock();try {    if (poolSize < corePoolSize && runState == RUNNING)
        t = addThread(firstTask);        //建立線程去執行firstTask任務   
    } finally {
    mainLock.unlock();
}if (t == null)    return false;
t.start();return true;
}

addThread

看見了熟悉的t.start();,可是有一個問題,任務執行完線程就應該被銷燬纔對,爲何線程池裏的線程可以作到執行多個任務呢? t = addThread(firstTask); 裏確定大有名堂。

private Thread addThread(Runnable firstTask) {
Worker w = new Worker(firstTask);
Thread t = threadFactory.newThread(w);  //建立一個線程,執行任務   if (t != null) {
    w.thread = t;            //將建立的線程的引用賦值爲w的成員變量       
    workers.add(w);    int nt = ++poolSize;     //當前線程數加1       
    if (nt > largestPoolSize)
        largestPoolSize = nt;
}return t;
}

Worker

上面的代碼中出現了Worker這個類,來看看Worker的結構:

private final class Worker implements Runnable {
    private final ReentrantLock runLock = new ReentrantLock();
    private Runnable firstTask;
    volatile long completedTasks;
    Thread thread;
    Worker(Runnable firstTask) {
        this.firstTask = firstTask;
    }
    boolean isActive() {
        return runLock.isLocked();
    }
    void interruptIfIdle() {
        final ReentrantLock runLock = this.runLock;
        if (runLock.tryLock()) {
            try {
        if (thread != Thread.currentThread())
        thread.interrupt();
            } finally {
                runLock.unlock();
            }
        }
    }
    void interruptNow() {
        thread.interrupt();
    }
 
    private void runTask(Runnable task) {
        final ReentrantLock runLock = this.runLock;
        runLock.lock();
        try {
            if (runState < STOP &&
                Thread.interrupted() &&
                runState >= STOP)
            boolean ran = false;
            beforeExecute(thread, task);   //beforeExecute方法是ThreadPoolExecutor類的一個方法,沒有具體實現,用戶能夠根據
            //本身須要重載這個方法和後面的afterExecute方法來進行一些統計信息,好比某個任務的執行時間等           
            try {
                task.run();
                ran = true;
                afterExecute(task, null);
                ++completedTasks;
            } catch (RuntimeException ex) {
                if (!ran)
                    afterExecute(task, ex);
                throw ex;
            }
        } finally {
            runLock.unlock();
        }
    }
 
    public void run() {
        try {
            Runnable task = firstTask;
            firstTask = null;
            while (task != null || (task = getTask()) != null) {
                runTask(task);
                task = null;
            }
        } finally {
            workerDone(this);   //當任務隊列中沒有任務時,進行清理工做       
        }
    }
}

關於Worker這個類,有幾點須要注意:

  1. Worker實現了runnable接口,能做爲實例化thread的參數存在,thread.start()後執行的實際上是Worker的run()方法

  2. 在Worker的run()方法中,有一段代碼很是關鍵

    while (task != null || (task = getTask()) != null) {
               runTask(task);
               task = null;
           }

    task!=nulll好理解,task = getTask()) != null又是什麼意思呢?不羅嗦,繼續跟代碼

Runnable getTask() {
    for (;;) {
        try {
            int state = runState;
            if (state > SHUTDOWN)
                return null;
            Runnable r;
            if (state == SHUTDOWN)  // Help drain queue
                r = workQueue.poll();
            else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //若是線程數大於核心池大小或者容許爲核心池線程設置空閒時間,
                //則經過poll取任務,若等待必定的時間取不到任務,則返回null
                r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
            else
                r = workQueue.take();
            if (r != null)
                return r;
            if (workerCanExit()) {    //若是沒取到任務,即r爲null,則判斷當前的worker是否能夠退出
                if (runState >= SHUTDOWN) // Wake up others
                    interruptIdleWorkers();   //中斷處於空閒狀態的worker
                return null;
            }
            // Else retry
        } catch (InterruptedException ie) {
            // On interruption, re-check runState
        }
    }
}

workQueue

workQueue是BlockingQueue的一個實例,而BlockingQueue的take()方法當隊列爲空時會使當前線程發生阻塞
getTask()的功能就是去緩存隊列獲取任務,若是緩存隊列爲空,則處於等待狀態,不然拿到task回去執行runTask()方法,在runTask()方法裏執行咱們丟給線程池的任務的run()方法完成任務【兜了這麼一大圈子】
經過while循環使這個過程不斷進行

 while (task != null || (task = getTask()) != null) {
            runTask(task);
            task = null;
        }

總結

其實把整個流程梳理一遍就是,將要執行的任務封裝進Worker對象中,Worker實現Runnable接口重寫run()方法實現不斷獲取新任務(無論是直接得到仍是來自緩存隊列)的邏輯,將Worker做爲參數實例化Thread對象,這樣在開啓線程後Worker的run方法就被間接調用了。至此,第一個問題就解決了。至於第二個問題,若是熟悉生產者消費者模式的話也不難理解。

參考資料:

相關文章
相關標籤/搜索