線程池分析

1、概述

在執行一個異步任務或併發任務時,每每是經過直接new Thread()方法來建立新的線程,這樣作弊端較多,更好的解決方案是合理地利用線程池,線程池的優點很明顯,以下:數組

下降系統資源消耗,經過重用已存在的線程,下降線程建立和銷燬形成的消耗; 提升系統響應速度,當有任務到達時,無需等待新線程的建立便能當即執行; 方便線程併發數的管控,線程如果無限制的建立,不只會額外消耗大量系統資源,更是佔用過多資源而阻塞系統或oom等情況,從而下降系統的穩定性。線程池能有效管控線程,統一分配、調優,提供資源使用率; 更強大的功能,線程池提供了定時、按期以及可控線程數等功能的線程池,使用方便簡單。緩存

2、線程池用法

Java API針對不一樣需求,利用Executors類提供了4種不一樣的線程池:newCachedThreadPool, newFixedThreadPool, newScheduledThreadPool, newSingleThreadExecutor,接下來說講線程池的用法。服務器

2.1 newCachedThreadPool多線程

建立一個可緩存的無界線程池,該方法無參數。當線程池中的線程空閒時間超過60s則會自動回收該線程,當任務超過線程池的線程數則建立新線程。線程池的大小上限爲Integer.MAX_VALUE,可看作是無限大。併發

public void cachedThreadPoolDemo(){

    ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
    for (int i = 0; i < 5; i++) {
        final int index = i;

        cachedThreadPool.execute(new Runnable() {

            [@Override]
            public void run() {
                System.out.println(Thread.currentThread().getName()+", index="+index);
            }
        });

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

運行結果:異步

pool-1-thread-1, index=0
pool-1-thread-1, index=1
pool-1-thread-1, index=2
pool-1-thread-1, index=3
pool-1-thread-1, index=4

從運行結果能夠看出,整個過程都在同一個線程pool-1-thread-1中運行,後面線程複用前面的線程。ide

2.2 newFixedThreadPool優化

建立一個固定大小的線程池,該方法可指定線程池的固定大小,對於超出的線程會在LinkedBlockingQueue隊列中等待。this

public void fixedThreadPoolDemo(){
    ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
    for (int i = 0; i < 6; i++) {
        final int index = i;

        fixedThreadPool.execute(new Runnable() {

            [@Override]
            public void run() {
                System.out.println(Thread.currentThread().getName()+", index="+index);
            }
        });

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

運行結果:操作系統

pool-1-thread-1, index=0
pool-1-thread-2, index=1
pool-1-thread-3, index=2
pool-1-thread-1, index=3
pool-1-thread-2, index=4
pool-1-thread-3, index=5

從運行結果能夠看出,線程池大小爲3,每休眠1s後將任務提交給線程池的各個線程輪番交錯地執行。線程池的大小設置,可參考Runtime.getRuntime().availableProcessors()。

2.3 newSingleThreadExecutor

建立一個只有線程的線程池,該方法無參數,全部任務都保存隊列LinkedBlockingQueue中,等待惟一的單線程來執行任務,並保證全部任務按照指定順序(FIFO或優先級)執行。

public void singleThreadExecutorDemo(){
    ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
    for (int i = 0; i < 3; i++) {
        final int index = i;

        singleThreadExecutor.execute(new Runnable() {

            [@Override]
            public void run() {
                System.out.println(Thread.currentThread().getName()+", index="+index);
            }
        });

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

運行結果:

pool-1-thread-1, index=0
pool-1-thread-1, index=1
pool-1-thread-1, index=2

從運行結果能夠看出,全部任務都是在單一線程運行的。

2.4 newScheduledThreadPool

建立一個可定時執行或週期執行任務的線程池,該方法可指定線程池的核心線程個數。

public void scheduledThreadPoolDemo(){
    ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
    //定時執行一次的任務,延遲1s後執行
    scheduledThreadPool.schedule(new Runnable() {

        [@Override]
        public void run() {
            System.out.println(Thread.currentThread().getName()+", delay 1s");
        }
    }, 1, TimeUnit.SECONDS);

    //週期性地執行任務,延遲2s後,每3s一次地週期性執行任務
    scheduledThreadPool.scheduleAtFixedRate(new Runnable() {

        [@Override]
        public void run() {
            System.out.println(Thread.currentThread().getName()+", every 3s");
        }
    }, 2, 3, TimeUnit.SECONDS);
}

運行結果:

pool-1-thread-1, delay 1s
pool-1-thread-1, every 3s
pool-1-thread-2, every 3s
pool-1-thread-2, every 3s
...

schedule(Runnable command, long delay, TimeUnit unit),延遲必定時間後執行Runnable任務; schedule(Callable callable, long delay, TimeUnit unit),延遲必定時間後執行Callable任務; scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit),延遲必定時間後,以間隔period時間的頻率週期性地執行任務; scheduleWithFixedDelay(Runnable command, long initialDelay, long delay,TimeUnit unit),與scheduleAtFixedRate()方法很相似,可是不一樣的是scheduleWithFixedDelay()方法的週期時間間隔是以上一個任務執行結束到下一個任務開始執行的間隔,而scheduleAtFixedRate()方法的週期時間間隔是以上一個任務開始執行到下一個任務開始執行的間隔,也就是這一些任務系列的觸發時間都是可預知的。 ScheduledExecutorService功能強大,對於定時執行的任務,建議多采用該方法。

2.5 方法對比

上述4個方法的參數對比,以下:

輸入圖片說明

其餘參數都相同,其中線程工廠的默認類爲DefaultThreadFactory,線程飽和的默認策略爲ThreadPoolExecutor.AbortPolicy。

3、線程池原理

Executors類提供4個靜態工廠方法:newCachedThreadPool()、newFixedThreadPool(int)、newSingleThreadExecutor和newScheduledThreadPool(int)。這些方法最終都是經過ThreadPoolExecutor類來完成的,這裏強烈建議你們直接使用Executors類提供的便捷的工廠方法,能完成絕大多數的用戶場景,當須要更細節地調整配置,須要先了解每一項參數的意義。

3.1 ThreadPoolExecutor

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

建立線程池,在構造一個新的線程池時,必須知足下面的條件:

corePoolSize(線程池基本大小)必須大於或等於0;
maximumPoolSize(線程池最大大小)必須大於或等於1;
maximumPoolSize必須大於或等於corePoolSize;
keepAliveTime(線程存活保持時間)必須大於或等於0;
workQueue(任務隊列)不能爲空;
threadFactory(線程工廠)不能爲空,默認爲DefaultThreadFactory類
handler(線程飽和策略)不能爲空,默認策略爲ThreadPoolExecutor.AbortPolicy。

3.2 參數詳解

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

參數說明:

corePoolSize(線程池基本大小):當向線程池提交一個任務時,若線程池已建立的線程數小於corePoolSize,即使此時存在空閒線程,也會經過建立一個新線程來執行該任務,直到已建立的線程數大於或等於corePoolSize時,纔會根據是否存在空閒線程,來決定是否須要建立新的線程。除了利用提交新任務來建立和啓動線程(按需構造),也能夠經過 prestartCoreThread() 或 prestartAllCoreThreads() 方法來提早啓動線程池中的基本線程。 maximumPoolSize(線程池最大大小):線程池所容許的最大線程個數。當隊列滿了,且已建立的線程數小於maximumPoolSize,則線程池會建立新的線程來執行任務。另外,對於無界隊列,可忽略該參數。

keepAliveTime(線程存活保持時間):默認狀況下,當線程池的線程個數多於corePoolSize時,線程的空閒時間超過keepAliveTime則會終止。但只要keepAliveTime大於0,allowCoreThreadTimeOut(boolean) 方法也可將此超時策略應用於核心線程。另外,也可使用setKeepAliveTime()動態地更改參數。 unit(存活時間的單位):時間單位,分爲7類,從細到粗順序:NANOSECONDS(納秒),MICROSECONDS(微妙),MILLISECONDS(毫秒),SECONDS(秒),MINUTES(分),HOURS(小時),DAYS(天);

workQueue(任務隊列):用於傳輸和保存等待執行任務的阻塞隊列。可使用此隊列與線程池進行交互: 若是運行的線程數少於 corePoolSize,則 Executor 始終首選添加新的線程,而不進行排隊。 若是運行的線程數等於或多於 corePoolSize,則 Executor 始終首選將請求加入隊列,而不添加新的線程。 若是沒法將請求加入隊列,則建立新的線程,除非建立此線程超出 maximumPoolSize,在這種狀況下,任務將被拒絕。 threadFactory(線程工廠):用於建立新線程。由同一個threadFactory建立的線程,屬於同一個ThreadGroup,建立的線程優先級都爲Thread.NORM_PRIORITY,以及是非守護進程狀態。threadFactory建立的線程也是採用new Thread()方式,threadFactory建立的線程名都具備統一的風格:pool-m-thread-n(m爲線程池的編號,n爲線程池內的線程編號); handler(線程飽和策略):當線程池和隊列都滿了,則代表該線程池已達飽和狀態。 ThreadPoolExecutor.AbortPolicy:處理程序遭到拒絕,則直接拋出運行時異常 RejectedExecutionException。(默認策略) ThreadPoolExecutor.CallerRunsPolicy:調用者所在線程來運行該任務,此策略提供簡單的反饋控制機制,可以減緩新任務的提交速度。 ThreadPoolExecutor.DiscardPolicy:沒法執行的任務將被刪除。 ThreadPoolExecutor.DiscardOldestPolicy:若是執行程序還沒有關閉,則位於工做隊列頭部的任務將被刪除,而後從新嘗試執行任務(若是再次失敗,則重複此過程)。 排隊有三種通用策略:

直接提交。工做隊列的默認選項是 SynchronousQueue,它將任務直接提交給線程而不保持它們。在此,若是不存在可用於當即運行任務的線程,則試圖把任務加入隊列將失敗,所以會構造一個新的線程。此策略能夠避免在處理可能具備內部依賴性的請求集時出現鎖。直接提交一般要求無界 maximumPoolSizes 以免拒絕新提交的任務。當命令以超過隊列所能處理的平均數連續到達時,此策略容許無界線程具備增加的可能性。 無界隊列。使用無界隊列(例如,不具備預約義容量的 LinkedBlockingQueue)將致使在全部 corePoolSize 線程都忙時新任務在隊列中等待。這樣,建立的線程就不會超過 corePoolSize。(所以,maximumPoolSize 的值也就無效了。)當每一個任務徹底獨立於其餘任務,即任務執行互不影響時,適合於使用無界隊列;例如,在 Web 頁服務器中。這種排隊可用於處理瞬態突發請求,當命令以超過隊列所能處理的平均數連續到達時,此策略容許無界線程具備增加的可能性。 有界隊列。當使用有限的 maximumPoolSizes 時,有界隊列(如 ArrayBlockingQueue)有助於防止資源耗盡,可是可能較難調整和控制。隊列大小和最大池大小可能須要相互折衷:使用大型隊列和小型池能夠最大限度地下降 CPU 使用率、操做系統資源和上下文切換開銷,可是可能致使人工下降吞吐量。若是任務頻繁阻塞(例如,若是它們是 I/O 邊界),則系統可能爲超過您許可的更多線程安排時間。使用小型隊列一般要求較大的池大小,CPU 使用率較高,可是可能遇到不可接受的調度開銷,這樣也會下降吞吐量。

3.3 工做隊列BlockingQueue

BlockingQueue的插入/移除/檢查這些方法,對於不能當即知足但可能在未來某一時刻能夠知足的操做,共有4種不一樣的處理方式:第一種是拋出一個異常,第二種是返回一個特殊值(null 或 false,具體取決於操做),第三種是在操做能夠成功前,無限期地阻塞當前線程,第四種是在放棄前只在給定的最大時間限制內阻塞。以下表格:

操做	拋出異常	   特殊值	阻塞	超時
插入	add(e)	    offer(e)	put(e)	offer(e, time, unit)
移除	remove()	poll()	take()	poll(time, unit)
檢查	element()	peek()	不可用	不可用

實現BlockingQueue接口的常見類以下:

ArrayBlockingQueue:基於數組的有界阻塞隊列。隊列按FIFO原則對元素進行排序,隊列頭部是在隊列中存活時間最長的元素,隊尾則是存在時間最短的元素。新元素插入到隊列的尾部,隊列獲取操做則是從隊列頭部開始得到元素。 這是一個典型的「有界緩存區」,固定大小的數組在其中保持生產者插入的元素和使用者提取的元素。一旦建立了這樣的緩存區,就不能再增長其容量。試圖向已滿隊列中放入元素會致使操做受阻塞;試圖從空隊列中提取元素將致使相似阻塞。ArrayBlockingQueue構造方法可經過設置fairness參數來選擇是否採用公平策略,公平性一般會下降吞吐量,但也減小了可變性和避免了「不平衡性」,可根據狀況來決策。

LinkedBlockingQueue:基於鏈表的無界阻塞隊列。與ArrayBlockingQueue同樣採用FIFO原則對元素進行排序。基於鏈表的隊列吞吐量一般要高於基於數組的隊列。

SynchronousQueue:同步的阻塞隊列。其中每一個插入操做必須等待另外一個線程的對應移除操做,等待過程一直處於阻塞狀態,同理,每個移除操做必須等到另外一個線程的對應插入操做。SynchronousQueue沒有任何容量。不能在同步隊列上進行 peek,由於僅在試圖要移除元素時,該元素才存在;除非另外一個線程試圖移除某個元素,不然也不能(使用任何方法)插入元素;也不能迭代隊列,由於其中沒有元素可用於迭代。Executors.newCachedThreadPool使用了該隊列。

PriorityBlockingQueue:基於優先級的無界阻塞隊列。優先級隊列的元素按照其天然順序進行排序,或者根據構造隊列時提供的 Comparator 進行排序,具體取決於所使用的構造方法。優先級隊列不容許使用 null 元素。依靠天然順序的優先級隊列還不容許插入不可比較的對象(這樣作可能致使 ClassCastException)。雖然此隊列邏輯上是無界的,可是資源被耗盡時試圖執行 add 操做也將失敗(致使 OutOfMemoryError)。

3.4 線程池關閉

調用線程池的shutdown()或shutdownNow()方法來關閉線程池

shutdown原理:將線程池狀態設置成SHUTDOWN狀態,而後中斷全部沒有正在執行任務的線程。 shutdownNow原理:將線程池的狀態設置成STOP狀態,而後中斷全部任務(包括正在執行的)的線程,並返回等待執行任務的列表。 中斷採用interrupt方法,因此沒法響應中斷的任務可能永遠沒法終止。但調用上述的兩個關閉之一,isShutdown()方法返回值爲true,當全部任務都已關閉,表示線程池關閉完成,則isTerminated()方法返回值爲true。當須要馬上中斷全部的線程,不必定須要執行完任務,可直接調用shutdownNow()方法。

3.5 線程池流程

輸入圖片說明

判斷核心線程池是否已滿,即已建立線程數是否小於corePoolSize?沒滿則建立一個新的工做線程來執行任務。已滿則進入下個流程。 判斷工做隊列是否已滿?沒滿則將新提交的任務添加在工做隊列,等待執行。已滿則進入下個流程。 判斷整個線程池是否已滿,即已建立線程數是否小於maximumPoolSize?沒滿則建立一個新的工做線程來執行任務,已滿則交給飽和策略來處理這個任務。

4、優化

4.1 合理地配置線程池

須要針對具體狀況而具體處理,不一樣的任務類別應採用不一樣規模的線程池,任務類別可劃分爲CPU密集型任務、IO密集型任務和混合型任務。

對於CPU密集型任務:線程池中線程個數應儘可能少,不該大於CPU核心數; 對於IO密集型任務:因爲IO操做速度遠低於CPU速度,那麼在運行這類任務時,CPU絕大多數時間處於空閒狀態,那麼線程池能夠配置儘可能多些的線程,以提升CPU利用率; 對於混合型任務:能夠拆分爲CPU密集型任務和IO密集型任務,當這兩類任務執行時間相差無幾時,經過拆分再執行的吞吐率高於串行執行的吞吐率,但若這兩類任務執行時間有數據級的差距,那麼沒有拆分的意義。 4.2 線程池監控

利用線程池提供的參數進行監控,參數以下:

  • taskCount:線程池須要執行的任務數量。
  • completedTaskCount:線程池在運行過程當中已完成的任務數量,小於或等於taskCount。
  • largestPoolSize:線程池曾經建立過的最大線程數量,經過這個數據能夠知道線程池是否滿過。如等於線程池的最大大小,則表示線程池曾經滿了。
  • getPoolSize:線程池的線程數量。若是線程池不銷燬的話,池裏的線程不會自動銷燬,因此這個大小隻增不減。
  • getActiveCount:獲取活動的線程數。
  • 經過擴展線程池進行監控:繼承線程池並重寫線程池的beforeExecute(),afterExecute()和terminated()方法,能夠在任務執行前、後和線程池關閉前自定義行爲。如監控任務的平均執行時間,最大執行時間和最小執行時間等。
5、參考資料

Java線程池源碼剖析(ThreadPoolExecutor)

線程池裏的大學問:分析Java線程池的建立

聊聊併發(三)Java線程池的分析和使用

相關文章
相關標籤/搜索