線程池ThreadPoolExecutor、Executors參數詳解與源代碼分析

歡迎探討,若有錯誤敬請指正 html

如需轉載,請註明出處 http://www.cnblogs.com/nullzx/ java

1. ThreadPoolExecutor數據成員

Private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING,0));

     ctl主要用於存儲線程池的工做狀態以及池中正在運行的線程數。顯然要在一個整型變量存儲兩個數據,只能將其一分爲二。其中高3bit用於存儲線程池的狀態,低位的29bit用於存儲正在運行的線程數。 緩存

     線程池具備如下五種狀態,當建立一個線程池時初始化狀態爲RUNNING 服務器

RUNNING多線程

容許提交併處理任務併發

SHUTDOWN函數

不容許提交新的任務,可是會處理完已提交的任務性能

STOPui

不容許提交新的任務,也不會處理阻塞隊列中未執行的任務,並設置正在執行的線程的中斷標誌位this

TIDYING

全部任務執行完畢,池中工做的線程數爲0,等待執行terminated()勾子方法

TERMINATED

terminated()勾子方法執行完畢

      注意,這裏說的是線程池的狀態而不是池中線程的狀態。

      調用線程池的shutdown方法,將線程池由RUNNING(運行狀態)轉換爲SHUTDOWN狀態。

      調用線程池的shutdownNow方法,將線程池由RUNNING或SHUTDOWN狀態轉換爲STOP狀態。

      SHUTDOWN狀態和STOP狀態先會轉變爲TIDYING狀態,最終都會變爲TERMINATED

Private static int runStateOf(int c)
Private static int workerCountOf(int c)
Private static int ctlOf(int rs,int wc)

      ThreadPoolExecutor同時提供上述三個方法用於池中的線程查看線程池的狀態和計算正在運行的線程數。

Private int largestPoolSize;
Private final BlockingQueue<Runnable>workQueue;
Private volatile long keepAliveTime;
private volatile int corePoolSize;
private volatile int maximumPoolSize;
private volatile ThreadFactory threadFactory;
private volatile RejectedExecutionHandler handler;

      上述數據成員對線程池的性能也有很大的影響,我會將它們放到構造中講解。

Privatefinal HashSet<Worker> workers= new HashSet<Worker>();
Privatelong completedTaskCount;
Private volatile boolean allowCoreThreadTimeOut;
private int largestPoolSize;

       completedTaskCount表示線程池已完成的任務數。

      allowCoreThreadTimeeOut表示是否容許核心線程在空閒狀態下自行銷燬。

      largestPoolSize 表示線程池從建立到如今,池中線程的最大數量

private final HashSet<Worker> workers = new HashSet<Worker>();

     workers是個HashSet容器,它存儲的是Worker類的對象,Worker是線程池的內部類,它繼承了Runnable接口,不嚴格的狀況下,能夠將一個Worker對象當作Thread對象,也就是工做的線程。shutdown和shutdownNow方法中會使用workers完成對全部線程的遍歷。

Privatefinal ReentrantLock mainLock =new ReentrantLock();
Privatefinal Condition termination = mainLock.newCondition();

      mainLock主要用於同步訪問(或者說改變)線程池的狀態以及線程池的各項參數,好比completedTaskCount和workers等。

     在awaitTermination方法中,(mianLock的)termination是用於延時的條件隊列。

2. 構造函數

publicThreadPoolExecutor(intcorePoolSize,
		int maximumPoolSize,
		long keepAliveTime,
		TimeUnit unit,
		BlockingQueue<Runnable> workQueue,
		ThreadFactory threadFactory,
		RejectedExecutionHandler handler)

       線程池的構造函數參數多達7個,如今咱們一一來分析它們對線程池的影響。

       corePoolSize:線程池中核心線程數的最大值

       maximumPoolSize:線程池中能擁有最多線程數

       workQueue:用於緩存任務的阻塞隊列

       咱們如今經過向線程池添加新的任務來講明着三者之間的關係。

     (1)若是沒有空閒的線程執行該任務且當前運行的線程數少於corePoolSize,則添加新的線程執行該任務。

     (2)若是沒有空閒的線程執行該任務且當前的線程數等於corePoolSize同時阻塞隊列未滿,則將任務入隊列,而不添加新的線程

     (3)若是沒有空閒的線程執行該任務且阻塞隊列已滿同時池中的線程數小於maximumPoolSize,則建立新的線程執行任務。

     (4)若是沒有空閒的線程執行該任務且阻塞隊列已滿同時池中的線程數等於maximumPoolSize,則根據構造函數中的handler指定的策略來拒絕新的任務。

       注意,線程池並無標記哪一個線程是核心線程,哪一個是非核心線程,線程池只關心核心線程的數量。

       通俗解釋,若是把線程池比做一個單位的話,corePoolSize就表示正式工,線程就能夠表示一個員工。當咱們向單位委派一項工做時,若是單位發現正式工還沒招滿,單位就會招個正式工來完成這項工做。隨着咱們向這個單位委派的工做增多,即便正式工所有滿了,工做仍是幹不完,那麼單位只能按照咱們新委派的工做按前後順序將它們找個地方擱置起來,這個地方就是workQueue,等正式工完成了手上的工做,就到這裏來取新的任務。若是不巧,年底了,各個部門都向這個單位委派任務,致使workQueue已經沒有空位置放新的任務,因而單位決定招點臨時工吧(臨時工:又是我!)。臨時工也不是想招多少就找多少,上級部門經過這個單位的maximumPoolSize肯定了你這個單位的人數的最大值,換句話說最多招maximumPoolSize–corePoolSize個臨時工。固然,在線程池中,誰是正式工,誰是臨時工是沒有區別,徹底同工同酬。

        keepAliveTime:表示空閒線程的存活時間。

        TimeUnitunit:表示keepAliveTime的單位。

        爲了解釋keepAliveTime的做用,咱們在上述狀況下作一種假設。假設線程池這個單位已經招了些臨時工,但新任務沒有繼續增長,因此隨着每一個員工忙完手頭的工做,都來workQueue領取新的任務(看看這個單位的員工多自覺啊)。隨着各個員工齊心合力,任務愈來愈少,員工數沒變,那麼就一定有閒着沒事幹的員工。這樣的話領導不樂意啦,可是又不能輕易fire沒事幹的員工,由於隨時可能有新任務來,因而領導想了個辦法,設定了keepAliveTime,當空閒的員工在keepAliveTime這段時間尚未找到事情幹,就被辭退啦,畢竟地主家也沒有餘糧啊!固然辭退到corePoolSize個員工時就再也不辭退了,領導也不想當光桿司令啊!

       handler:表示當workQueue已滿,且池中的線程數達到maximumPoolSize時,線程池拒絕添加新任務時採起的策略。

爲了解釋handler的做用,咱們在上述狀況下作另外一種假設。假設線程池這個單位招滿臨時工,但新任務依然繼續增長,線程池從上到下,從裏到外真心忙的不可開交,阻塞隊列也滿了,只好拒絕上級委派下來的任務。怎麼拒絕是門藝術,handler通常能夠採起如下四種取值。

ThreadPoolExecutor.AbortPolicy()

拋出RejectedExecutionException異常

ThreadPoolExecutor.CallerRunsPolicy()

由向線程池提交任務的線程來執行該任務

ThreadPoolExecutor.DiscardOldestPolicy()

拋棄最舊的任務(最早提交而沒有獲得執行的任務)

ThreadPoolExecutor.DiscardPolicy()

拋棄當前的任務

     workQueue:它決定了緩存任務的排隊策略。對於不一樣的應用場景咱們可能會採起不一樣的排隊策略,這就須要不一樣類型的阻塞隊列,在線程池中經常使用的阻塞隊列有如下2種:

    (1)SynchronousQueue<Runnable>:此隊列中不緩存任何一個任務。向線程池提交任務時,若是沒有空閒線程來運行任務,則入列操做會阻塞。當有線程來獲取任務時,出列操做會喚醒執行入列操做的線程。從這個特性來看,SynchronousQueue是一個無界隊列,所以當使用SynchronousQueue做爲線程池的阻塞隊列時,參數maximumPoolSizes沒有任何做用。

    (2)LinkedBlockingQueue<Runnable>:顧名思義是用鏈表實現的隊列,能夠是有界的,也能夠是無界的,但在Executors中默認使用無界的。

      threadFactory:指定建立線程的工廠

     實際上ThreadPoolExecutor類中還有不少重載的構造函數,下面這個構造函數在Executors中常常用到。

public ThreadPoolExecutor(int corePoolSize,
        int maximumPoolSize,
        long keepAliveTime,
        TimeUnit unit,
        BlockingQueue<Runnable> workQueue) {
	this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
	Executors.defaultThreadFactory(), defaultHandler);
}

      注意到上述的構造方法使用Executors中的defaultThreadFactory()線程工廠和ThreadPoolExecutor中的defaultHandler拋棄策略。

      使用defaultThreadFactory建立的線程同屬於相同的線程組,具備同爲Thread.NORM_PRIORITY的優先級,以及名爲"pool-XXX-thread-"的線程名(XXX爲建立線程時順序序號),且建立的線程都是非守護進程。

      defaultHandler缺省拋棄策是ThreadPoolExecutor.AbortPolicy()。

      除了在建立線程池時指定上述參數的值外,還可在線程池建立之後經過以下方法進行設置。

Public void allowCoreThreadTimeOut(boolean value)
Public void setKeepAliveTime(long time,TimeUnit unit)
Public void setMaximumPoolSize(int maximumPoolSize)
Public void setCorePoolSize(int corePoolSize)
Public void setThreadFactory(ThreadFactory threadFactory)
Public void setRejectedExecutionHandler(RejectedExecutionHandler handler)

3. 其它有關涉及池中線程數量的相關方法

public void allowCoreThreadTimeOut(boolean value) 
public int prestartAllCoreThreads()

     默認狀況下,當池中有空閒線程,且線程的數量大於corePoolSize時,空閒時間超過keepAliveTime的線程會自行銷燬,池中僅僅會保留corePoolSize個線程。若是線程池中調用了allowCoreThreadTimeOut這個方法,則空閒時間超過keepAliveTime的線程所有都會自行銷燬,而沒必要理會corePoolSize這個參數。

     若是池中的線程數量小於corePoolSize時,調用prestartAllCoreThreads方法,則不管是否有待執行的任務,線程池都會建立新的線程,直到池中線程數量達到corePoolSize。

4. Executors中的線程池的工廠方法

     爲了防止使用者錯誤搭配ThreadPoolExecutor構造函數的各個參數以及更加方便簡潔的建立ThreadPoolExecutor對象,JavaSE中又定義了Executors類,Eexcutors類提供了建立經常使用配置線程池的方法。如下是Executors經常使用的三個建立線程池的源代碼。

      從源碼中能夠看出,Executors間接的調用了重載的ThreadPoolExecutor構造函數,並幫助用戶根據不一樣的應用場景,配置不一樣的參數。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

      newCachedThreadPool:使用SynchronousQueue做爲阻塞隊列,隊列無界,線程的空閒時限爲60秒。這種類型的線程池很是適用IO密集的服務,由於IO請求具備密集、數量巨大、不持續、服務器端CPU等待IO響應時間長的特色。服務器端爲了能提升CPU的使用率就應該爲每一個IO請求都建立一個線程,以避免CPU由於等待IO響應而空閒。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

      newFixedThreadPool:需指定核心線程數,核心線程數和最大線程數相同,使用LinkedBlockingQueue 做爲阻塞隊列,隊列無界,線程空閒時間0秒。這種類型的線程池能夠適用CPU密集的工做,在這種工做中CPU忙於計算而不多空閒,因爲CPU能真正併發的執行的線程數是必定的(好比四核八線程),因此對於那些須要CPU進行大量計算的線程,建立的線程數超過CPU可以真正併發執行的線程數就沒有太大的意義。

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

      newSingleThreadExecutor:池中只有一個線程工做,阻塞隊列無界,它能保證按照任務提交的順序來執行任務。

5. 任務的提交過程

submit方法源碼

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

       submit的實現方法位於抽象類AbstractExecutorService中,而此時execute方法還未實現(而是在AbstractExecutorService的繼承類ThreadPoolExecutor中實現)。submit有三種重載方法,這裏我選取了兩個經常使用的進行分析,能夠看出不管哪一個submit方法都最終調用了execute方法。

execute方法源碼

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

         因爲execute方法中屢次調用addWorker,咱們這裏就簡要介紹一下它,這個方法的主要做用就是建立一個線程來執行Runnnable對象。

addWorker(Runnable firstTask, boolean core)

       第一個參數firstTask不爲null,則建立的線程就會先執行firstTask對象,而後去阻塞隊列中取任務,否直接到阻塞隊列中獲取任務來執行。第二個參數,core參數爲真,則用corePoolSize做爲池中線程數量的最大值;爲假,則以maximumPoolSize做爲池中線程數量的最大值。

      簡要分析一下execute源碼,執行一個Runnable對象時,首先經過workerCountOf(c)獲取線程池中線程的數量,若是池中的數量小於corePoolSize就調用addWorker添加一個線程來執行這個任務。不然經過workQueue.offer(command)方法入列。若是入列成功還須要在一次判斷池中的線程數,由於咱們建立線程池時可能要求核心線程數量爲0,因此咱們必須使用addWorker(null, false)來建立一個臨時線程去阻塞隊列中獲取任務來執行。

       isRunning(c) 的做用是判斷線程池是否處於運行狀態,若是入列後發現線程池已經關閉,則出列。不須要在入列前判斷線程池的狀態,由於判斷一個線程池工做處於RUNNING狀態到執行入列操做這段時間,線程池可能被其它線程關閉了,因此提早判斷毫無心義。

addWorker源碼

private boolean addWorker(Runnable firstTask, boolean core) {
    //這個兩個for循環主要是判斷可否增長一個線程,
	//外循環來判斷線程池的狀態
	//內循環主要是個增長線程數的CAS操做
	retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // 若是是由於線程數的改變致使CAS失敗,只須要重複內循環
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);//建立線程
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();//啓動線程
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

6. 線程的執行過程

runWorker源碼

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

       Thread的run方法實際上調用了Worker類的runWorker方法,而Worker類繼承了AQS類,並實現了lock、unlock、trylock方法。可是這些方法不是真正意義上的鎖,因此在代碼中加鎖操做和解鎖操做沒有成對出現。runWorker方法中獲取到任務就「加鎖」,完成任務後就「解鎖」。也就是說在「加鎖」到「解鎖」的這段時間內,線程處於忙碌狀態,而其它時間段,處於空閒狀態。線程池就能夠經過trylock方法來肯定這個線程是否空閒。

       getTask方法的主要做用是從阻塞隊列中獲取任務。

       beforeExecute(wt, task)和afterExecute(task, thrown)是個鉤子函數,若是咱們須要在任務執行以前和任務執行之後進行一些操做,那麼咱們能夠自定義一個繼承ThreadPoolExecutor類,並覆蓋這兩個方法。

getTask源代碼

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

     能夠看出若是容許線程在keepAliveTime時間內未獲取到任務線程就銷燬就調用workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),不然會調用workQueue.take()方法(該方法即便獲取不到任務就會一直阻塞下去)。而肯定是否使用workQueue.poll方法只有兩個條件決定,一個是當前池中的線程是否大於核心線程數量,第二個是是否容許核心線程銷燬,二者其一知足就會調用該方法。

7. 線程池的關閉過程

shutdown源碼

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

       advanceRunState(SHUTDOWN)的做用是經過CAS操做將線程池的狀態更改成SHUTDOWN狀態。

       interruptIdleWorkers是對空閒的線程進行中斷,它實際上調用了重載帶參數的函數interruptIdleWorkers(false)

       onShutdown也是一個鉤子函數

interruptIdleWorkers源碼

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

      經過workers容器,遍歷池中的線程,對每一個線程進行tryLock()操做,若是成功說明線程空閒,則設置其中斷標誌位。而線程是否響應中斷則由任務的編寫者決定。

8. 參考文章

[1] http://www.infoq.com/cn/articles/java-threadPool/

[2] http://my.oschina.net/u/1398304/blog/376827?fromerr=limo9iEj

[3] http://www.cnblogs.com/dolphin0520/p/3932921.html

[4] http://cuisuqiang.iteye.com/blog/2019372

[5] http://blog.sina.com.cn/s/blog_5eeabe8b0100v9i5.html

相關文章
相關標籤/搜索