[3]java1.8線程池—ThreadPoolExecutor

Wiki 上是這樣解釋的:Thread Pooljava

做用:利用線程池能夠大大減小在建立和銷燬線程上所花的時間以及系統資源的開銷!緩存

下面主要講下線程池中最重要的一個類 ThreadPoolExecutor 。ide

 

看到池這關鍵字,第一反應會是線程能緩存起來。請原諒我這個Java入門漢蹩腳的想象。oop

我會這麼想,實例化出線程A、線程B,而後把A、B線程放入容器,再寫個循環while,每次執行時從容器中取出線程的引用,而後傳遞任務Runnable給線程執行。ui

 

但認真的研究了一邊Java線程的調用和生命週期以後,上面的想象是無從實現的。this

1.線程的任務Runnable需在線程建立以前傳遞,線程在存活期就已經捆綁了Runnable,不能再次傳遞Runnale去執行。spa

2.線程獲取的CPU分派後,執行完Runnable的Run方法,線程就已經死亡了。不可能再次發起執行(Java線程生命週期)。 .net

因此,線程池中的線程是緩存起來的,可取出調用,執行完畢後可從新放入的邏輯是不對的。線程

 

 

那麼線程池是什麼概念呢?設計

我的理解是保持了多個線程是存活(running,沒有執行完畢)的一種狀態,而後有個變量對它們線程的引用進行了存儲,而這就是線程池的容器。

池中的線程被設計成外部不可取出調用的,由於在調用了線程池ThreadPoolExecutor的execute方法後,線程已經被建立並執行,並無線程的引用暴露出來。

一個緣由是爲了封裝保護,二是由於線程已是running狀態了,線程的執行能力已經在使用中,執行能力已經沒法釋放了!

其中Worker對象中的內部線程,經過自己Worker中的Run方法中的一個阻塞循環體來實現線程長期存活的。

 

線程池是指已經或者將要 有指定個數的線程執行while循環,不停的阻塞消費BlockingQueue隊列中提交的外部Runnable,同時內部的線程不會被銷燬(由於while會被阻塞)的一種表現或能力。

 

下面來說講線程池ThreadPoolExecutor類:

建立線程池pool

 

public static void main(String[] args) {
        //核心線程數量爲 2,最大線程數量 4,空閒線程存活的時間 60s,有界隊列長度爲 3
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(3));
        for (int i = 1; i <= 10; i++) {
            //建立 10 個任務
            //運行
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("任務已執行");
                }
            });
            System.out.println("活躍的線程數:"+pool.getActiveCount() + ",核心線程數:" + pool.getCorePoolSize() + ",線程池大小:" + pool.getPoolSize() + ",隊列的大小" + pool.getQueue().size());
        }
        //關閉線程池
        pool.shutdown();
    }

 

經過for循環提交了十個Runnable任務給了線程池pool,調用了ThreadPoolExecutor實例中的execute方法。

而後咱們查看一下ThreadPoolExecutor實例中execute方法,發現作了worker數量的判斷和建立方法addWorker()的調用。

 

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // 一、若是當前的線程數小於核心線程池的大小,根據現有的線程做爲第一個 Worker 運行的線程,新建一個 Worker,
    // addWorker 自動的檢查當前線程池的狀態和 Worker 的數量,防止線程池在不能添加線程的狀態下添加線程
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }

    //二、若是線程入隊成功,而後仍是要進行 double-check 的,由於線程在入隊以後狀態是可能會發生變化的
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        
        // recheck 防止線程池狀態的突變,若是突變,那麼將 reject 線程,防止 workQueue 中增長新線程
        if (! isRunning(recheck) && remove(command))
            reject(command);

        //上下兩個操做都有 addWorker 的操做,可是若是在workQueue.offer 的時候 Worker 變爲 0,那麼將沒有 Worker 執行新的 task,因此增長一個Worker.
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    
    //三、若是 task 不能入隊(隊列滿了),這時候嘗試增長一個新線程,若是增長失敗那麼當前的線程池狀態變化了或者線程池已經滿了而後拒絕task
    else if (!addWorker(command, false))
        reject(command);
}

 

咱們看到,最終是經過調用addWorker()方法來建立線程的。咱們再看看addWorker()方法,該方法判斷了線程的數量,同時實例化一個Worker類並執行了其內部捆綁的線程。

 

// firstTask: 新增一個線程並執行這個任務,可空,增長的線程從隊列獲取任務;core:是否使用 corePoolSize 做爲上限,不然使用 maxmunPoolSize
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        /**
         * rs!=Shutdown || fistTask!=null || workQueue.isEmpty
         * 若是當前的線程池的狀態 > SHUTDOWN 那麼拒絕 Worker 的 add 若是 =SHUTDOWN
         * 那麼此時不能新加入不爲 null 的 Task,若是在 workQueue 爲 empty 的時候不能加入任何類型的 Worker,
         * 若是不爲 empty 能夠加入 task 爲 null 的 Worker, 增長消費的 Worker
         */
        if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null &&! workQueue.isEmpty()))
            return false;
        for (;;) {
            int wc = workerCountOf(c);
            //若是當前的數量超過了 CAPACITY,或者超過了 corePoolSize 和 maximumPoolSize(試 core 而定)
            if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //CAS 嘗試增長線程數,若是失敗,證實有競爭,那麼從新到 retry。
            if (compareAndIncrementWorkerCount(c))// AtomicInteger 的 CAS 操做;
                break retry;
            c = ctl.get();  // Re-read ctl
            //判斷當前線程池的運行狀態,狀態發生改變,重試 retry;
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // Worker 爲內部類,封裝了線程和任務,經過 ThreadFactory 建立線程,可能失敗拋異常或者返回 null
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        // SHUTDOWN 之後的狀態和 SHUTDOWN 狀態下 firstTask 爲 null,不可新增線程
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;//記錄最大線程數
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                //執行worker中捆綁的線程
 t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            //失敗回退,從 wokers 移除 w, 線程數減一,嘗試結束線程池(調用tryTerminate 方法)
            addWorkerFailed(w);
    }
    return workerStarted;
}

 

上面標紅的代碼,就是啓動了Worker類中內部的一個線程。而後該線程以Worker自己實例爲Runnable參數,線程啓動以後將會常駐內存中執行。咱們再看一下Worker這個內部類

 

   /**
     * Worker自己就是一個任務,把本身當作任務傳遞給Thread類,來實例化出一個線程能力。
     */
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        //捆綁的線程,也就是線程池中的真正線程,是該線程在處理不斷提交的外部任務Runnable
        final Thread thread;

        //捆綁的外部任務Runnable實例
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * 構造器
         * @param firstTask
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            //經過外部傳遞的線程構造工廠實例 或 默認的內部線程工廠類來建立線程
            this.thread = getThreadFactory().newThread(this);
        }

        /**
         * 執行當前worker,也就是調用外部類ThreadPoolExecutor的runWorker
         */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

 

Worker類自己實現了Runnable接口,具有被線程執行的能力。因此addWorker()方法實例化一個Worker類後,啓動了它捆綁的內部線程,並執行之。

而後就會執行Worker的run方法,也就是調用下面的ThreadPoolExecutor類的runWorker方法。

 

    /**
     * 執行Worker中捆綁的Runnable
     * @param w
     */
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //循環獲取隊列中的外部提交過來的Runnable任務
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                        runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //執行咱們經過調用execute方法提交給線程池的Runnable任務
 task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

 

而後經過循環調用getTask()方法,從BlockingQueue阻塞隊列中,取出的外部提交至線程池的Ruannable任務實例。

並執行外部任務Runnable的run方法。從而執行到了咱們外部的業務代碼。

咱們看一下getTask()方法,該方法是阻塞取的。隊列中一旦沒有新的任務提交過來,那麼getTask方法將會阻塞執行,直到有新的外部任務Runnable實例提交進來。

 

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                //從阻塞隊列BlockingQueue中獲取任務
                Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

 

內部的四個線程是共享阻塞隊列BlockingQueue的,4個線程經過 runWorker() 方法中的 while 和 getTask() 這個阻塞方法,得到了不進入線程死亡狀態(java線程生命週期的圖示)的能力。

至此,線程池的執行流程就大概清晰了。線程池類就是幫助咱們管理線程,同時執行咱們外部提交的任務、並提供監控的一個包裝類。

 

執行流程:

一、當有任務進入時,線程池建立線程(Worker類)去執行任務,直到核心線程數(Worker實例數)滿爲止。

二、核心線程數量滿了以後,任務就會進入一個緩衝的阻塞任務隊列中

  • 當任務隊列爲無界隊列時,任務就會一直放入緩衝的任務隊列中,不會和最大線程數量進行比較
  • 當任務隊列爲有界隊列時,任務先放入緩衝的任務隊列中,當任務隊列滿了以後,纔會將任務放入線程池,此時會與線程池中最大的線程數量進行比較,若是超出了,則默認會拋出異常。而後線程池纔會執行任務,當任務執行完,又會將緩衝隊列中的任務放入線程池中,而後重複此操做。

 

 

 

其中最核心的是 利用了 阻塞隊列BlockingQueue 和 循環while,還有拒絕任務機制reject,同時巧妙的把任務 Runnable 和 Thread 捆綁在 一個 Worker 類上。

 

其實能夠不須要Worker類的存在,能夠先添加Runnable到阻塞隊列中,再利用線程去循環取出消費也能實現。

咱們本身也能夠設計一個線程池類,不過官方已經提供了這麼好的實現,就不必重複造輪子了,咱們明白其中的設計思想便可!

 

 

ps:

深刻理解java線程池—ThreadPoolExecutor

相關文章
相關標籤/搜索