多線程學習(6)ThreadPoolExecutor 線程池學習-1

threadpool模型:

調用方經過調用api將任務,裝進queue裏,而後會有一個機制監事queue裏有沒有task,若是有task,就分配給某個worker去執行。workers表明線程池的話.worker就是某條線程了。java

線程池的構造方法:

Executor框架最核心的類是ThreadPoolExecutor,他是線程池的實現類,主要由下列7個組件構成。web

package java.util.concurrent;

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

int corePoolSize,   //  線程池可以使用線程數的最小值api


int maximumPoolSize,  // 線程池容量的最大值tomcat

maximumPoolSize:是一個靜態變量,在變量初始化的時候,有構造函數指定.多線程

long keepAliveTime, //  當線程池中的線程數大於corePoolSize時,keepAliveTime爲多餘的空閒線程等待新任務的最長時間,超過這個時間後多餘的線程將被終止。這裏把keepAliveTime設置爲0L,意味着多餘的空閒線程會被當即終止。併發

TimeUnit unit, //  線程的阻塞時間單位,它的執行方法是TimeUnit.unit.Sleep(keepAliveTime);框架

內部調用了Thread.sleep()方法。可是它和Thread.sleep()方法的區別是,Thread.Sleep只能設置毫秒數,而TimeUnit.unit.Sleep()中的unit能夠換成時間單位,好比DAYS、HOURS、MINUTES,SECONDS、MILLISECONDS和NANOSECONDS。jvm

TimeUnit.MINUTES.sleep(4);  // sleeping for 4 minutes

BlockingQueue<Runnable> workQueue,  // 阻塞隊列,裏面是Runnable類型,線程的任務
ThreadFactory threadFactory,  // 建立線程,併爲線程指定queue裏面的runnable,線程池的構造方法,支持自定義threadFactory傳入,咱們能夠本身編寫newThread()方法,來實現自定義的線程建立邏輯。函數

public interface ThreadFactory {
    Thread newThread(Runnable r);
}


RejectedExecutionHandler handler  // 當ThreadPoolExecutor已經關閉或ThreadPoolExecutor已經飽和時(達到了最大線程池大小且工做隊列已滿),execute()方法將要調用的Handler。oop

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

而且這些成員變量,都是volatile修飾的

private volatile ThreadFactory threadFactory;

    private volatile RejectedExecutionHandler handler;

    private volatile long keepAliveTime;

    private volatile boolean allowCoreThreadTimeOut;

    private volatile int corePoolSize;

    private volatile int maximumPoolSize;

線程池成員屬性和api方法介紹

largestPoolSize: 是一個動態變量,是記錄線程曾經達到的最高值,也就是 largestPoolSize<= maximumPoolSize.

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

    public int getLargestPoolSize() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            return largestPoolSize;
        } finally {
            mainLock.unlock();
        }
    }

completedtaskcount:

返回已完成執行的近似任務總數。由於在計算期間任務和線程狀態可能動態改變,因此返回值只是一個近似值,可是該值在整個連續調用過程當中不會減小。

當一個線程在workers容器中,準備remove時,線程會將本身的completedtaskcount賦值給線程池的completedtaskcount。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }
public long getCompletedTaskCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            long n = completedTaskCount;
            for (Worker w : workers)
                n += w.completedTasks;
            return n;
        } finally {
            mainLock.unlock();
        }
    }

TaskCount 線程池執行的總任務數,包括已經執行完的任務數和任務隊列中目前還須要執行的任務數

public long getTaskCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            long n = completedTaskCount;
            for (Worker w : workers) {
                n += w.completedTasks;
                if (w.isLocked())
                    ++n;
            }
            return n + workQueue.size();
        } finally {
            mainLock.unlock();
        }
    }

getActiveCount();Thread.activeCount()  獲得是存活的線程數  返回值是int類型

public int getActiveCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            int n = 0;
            for (Worker w : workers)
                if (w.isLocked())
                    ++n;
            return n;
        } finally {
            mainLock.unlock();
        }
    }

常見線程池類型:

singletenthreadPool:

SingleThreadExecutor是使用單個worker線程的Executor。下面是SingleThreadExecutor的源代碼實現。

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

SingleThreadExecutor的corePoolSize和maximumPoolSize被設置爲1。其餘參數與FixedThreadPool相同。SingleThreadExecutor使用無界隊列LinkedBlockingQueue做爲線程池的工做隊列(隊列的容量爲Integer.MAX_VALUE)。SingleThreadExecutor使用無界隊列做爲工做隊列對線程池帶來的影響與FixedThreadPool相同,這裏就不贅述了。

  1. 若是當前運行的線程數少於corePoolSize(即線程池中無運行的線程),則建立一個新線程來執行任務。
  2. 在線程池完成預熱以後(當前線程池中有一個運行的線程),將任務加入LinkedBlockingQueue。
  3. 線程執行完1中的任務後,會在一個無限循環中反覆從LinkedBlockingQueue獲取任務來執行。

fixedthreadpool:

package java.util.concurrent;
public class Executors {
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
}

FixedThreadPool的corePoolSize和maximumPoolSize都被設置爲建立FixedThreadPool時指定的參數nThreads。

當線程池中的線程數大於corePoolSize時,keepAliveTime爲多餘的空閒線程等待新任務的最長時間,超過這個時間後多餘的線程將被終止。這裏把keepAliveTime設置爲0L,意味着多餘的空閒線程會被當即終止。
FixedThreadPool的execute()方法的運行示意圖以下所示。

對上圖的說明以下。

  1. 若是當前運行得線程數少於corePoolSize,則建立線程來執行任務。
  2. 在線程池完成預熱以後(當前運行的線程數等於corePoolSize),將任務加入LinkedBlockingQueue。
  3. 線程執行完1中的任務後,會在循環中反覆從LinkedBlockingQueue獲取任務來執行。

FixedThreadPool使用無界隊列LinkedBlockingQueue做爲線程池的工做隊列(隊列的容量爲Integer.MAX_VALUE)。使用無界隊列做爲工做隊列會對線程池帶來以下影響。

  1. 當線程池中的線程數達到corePoolSize後,新任務將在無界隊列中等待,所以線程池中的線程數不會超過corePoolSize。
  2. 因爲1,使用無界隊列時maximumPoolSize將是一個無效參數。
  3. 因爲1和2,使用無界隊列時keepAliveTime將是一個無效參數。
  4. 因爲使用無界隊列,運行中的FixedThreadPool(未執行方法shutdown()或shutdownNow())不會拒絕任務(不會調用RejectedExecutionHandler.rejectedExecution方法)。

cachethreadpool:

CacheThreadPool是一個會根據須要建立新線程的線程池。下面是建立CacheThreadPool的源代碼。

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

CacheThreadPool的corePoolSize被設置爲0,即corePool爲空;maximumPoolSize被設置爲Integer.MAX_VALUE,即maximumPool是無界的。這裏把keepAliveTime設置爲60L,意味着CacheThreadPool中的空閒線程等待新任務的最長時間爲60秒,空閒線程超過60秒後將會被終止。

FixedThreadPool和SingleThreadExecutor使用無界隊列LinkedBlockingQueue做爲線程池的工做隊列。CacheThreadPool使用沒有容量的SynchronousQueue做爲線程池的工做隊列,但CacheThreadPool的maximumPool是無界的。這意味着,若是主線程提交任務的速度高於maximumPool中線程處理任務的速度時,CacheThreadPool會不斷建立新線程。極端狀況下,CacheThreadPool會由於建立過多線程而耗盡CPU和內存資源。

對上圖的說明以下。

  1. 首先執行SynchronousQueue.offer(Runnable task)。若是當前maximumPool中有空閒線程正在執行SynchronousQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),那麼主線程執行offer操做與空閒線程執行的poll操做配對成功,主線程把任務交給空閒線程執行,execute()方法執行完成;不然執行下面的步驟2。
  2. 當初始maximumPool爲空,或者maximumPool中當前沒有空閒線程時,將沒有線程執行SynchronousQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)。這種狀況下,步驟1將失敗。此時CachedThreadPool會建立一個新線程執行任務,execute()方法執行完成。
  3. 在步驟2中新建立的線程將任務執行完後,會執行SynchronousQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)。這個poll操做會讓空閒線程最多在SynchronousQueue中等待60秒鐘。若是60秒鐘內主線程提交了一個新任務(主線程執行步驟1),那麼這個空閒線程將執行主線程提交的新任務;不然,這個空閒線程將終止。因爲空閒60秒的空閒線程會被終止,所以長時間保持空閒的CachedThreadPool不會使用任務資源。

前面提到過,SynchronousQueue是一個沒有容量的阻塞隊列。每一個插入操做必須等待另外一個線程的對應移除操做,反之亦然。CachedThreadPool使用SynchronousQueue,把主線程提交的任務傳遞給空閒線程執行。CachedThreadPool中任務傳遞的示意圖以下所示。

ScheduledThreadPool

執行定時任務的線程池

建立線程池的四種方式

這四種方式,都實現了RejectedExecutionHandler接口

Abortpolicy 

會拋出異常,致使當前線程退出

當咱們建立線程池時,不指定rejectedExecutionHandler時,就會默認使用AbortPolicy,當咱們經過executor.execute(runnable)任務時,可能會發生異常,並將異常直接返回給了調用者。

 

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }

CallerRunsPolicy

當線程池的存活線程數,達到了最大值,此時又有新的請求過來,線程池會調用rejectedExecutionHandler這個接口的實現類的rejectedExecution的方法,此時該實現類正好是CallerRunsPolicy,它會讓新請求,在本身的線程上執行run方法,若是run方法消耗時間長,它會阻塞web容器的請求,影響web容器處理其餘請求的性能。

當有外部請求訪問web服務端時,tomcat會分配一條線程(tomcat默認有150個線程,能夠配置最大的爲1500個線程來接收處理請求,且這些線程之間具備隔離性不會互相影響對方)來處理這個請求,當這個請求要用到線程池,且咱們的線程池是基於CallerRunsPolicy來建立的,那麼CallerRunsPolicy會,使用當前請求的線程,來執行run方法。而當這個run方法執行時間過長時,tomcat的請求就會被佔用不放,致使沒法拿出空閒的線程去處理其餘請求,就會影響到服務端的性能。

應用場景:當咱們但願線程池滿了以後,進行阻塞,就使用CallerRunsPolicy,阻塞的是調用方的,不會往queue裏聽任務了。

 

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }

看上面的rejectedExecution方法體,頗有意思,它執行線程的方式,是r.run()而不是start()方法,這很回味無窮,緣由有兩個

咱們在main方法中,準備啓動一個線程時,若是在代碼中咱們使用thread.star()方法,jvm在執行到這行時,實際上會建立一個新的線程,來執行線程對象中的run方法,此時在執行run方法的線程,與執行main方法的線程,是兩條線程,沒有關聯。而上面調用了runnable接口實例的run方法,jvm在執行時,根本不會建立新線程去執行,而是就在當前的請求(線程)裏之心run方法,此時的run方法,根本不須要開闢或分配新線程來運行,而是當作一個普通方法來執行了。因此此時run方法卡住了,他就會卡住當前的請求,就會卡住web容器的請求。影響web容器處理其餘請求的性能。

DiscardOldestPolicy  在個人隊列裏面,踢出出最老的一個任務

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }

DiscardPolicy

不作任何處理

ThreadPool的三個階段

Workers容器

0<active<coresize 

當一個task準備分配給workers容器,但願調用一個線程去執行它時,若是此時容器中存活的線程數小於coresize指定線程數時,會一次性建立一條新線程來執行任務,並且新線程也會駐留在內存中。而當線程執行完任務,並不會收回,而是變成等待狀態了。

問題:何時出現activesize會超過coresize?

當coreSize向maxsize變遷的時候,不是由workers決定的,而是由queue決定的。queue裏面的task數量達到最大值的時候,coreSize就會向maxsize變遷了。咱們在建立線程池的時候。線程池的構造方法會有一個BlockingQueue<Runnable> workQueue,而後咱們初始化線程池時會指定這個queue的size,那麼調用者一邊往queue裏裝task,task也會一邊分配給workers去執行。只有當queue裏面的任務數,size達到了設置的最大size時,wokers纔會去建立更多的線程,來處理任務,建立新線程的數量,不能超過maxsize。

core<active<maxsize

條件:任務queue滿了,會新建立線程去處理任務

active == maxsize

跟rejectHandlerPolicy有關係,配置了CallerRunsPolicy就會阻塞請求方,拒絕接受任務;配置了abortPolicy就會返回異常,意思是線程數已經創夠了,不能繼續建立了;配置了discardOldPolicy就會刪除最老任務,配置了discardPolicy就什麼都不作。

 

 

本文章參考了:https://blog.csdn.net/en_joker/article/details/84973420   《併發:ThreadPoolExecutor詳解》

相關文章
相關標籤/搜索