【JUC】如何理解線程池?第四種使用線程的方式

線程池的概念

線程池的主要工做的控制運行的線程的數量,處理過程種將任務放在隊列,線程建立後再啓動折現任務,若是線程數量超過了最大的數量,則超過部分的線程排隊等待,直到其餘線程執行完畢後,從隊列種取出任務來執行。小程序

處理流程:數組

1.線程池判斷核心線程池的線程是否所有在執行任務?緩存

  1.1 不是:建立一個新的工做線程執行任務。服務器

  1.2 是:網絡

    2. 線程池判斷工做隊列是否已經滿了?多線程

      2.1 沒有滿:將新提交的任務存儲在工做隊列中。併發

      2.2 滿了:框架

        3. 線程池判斷線程池的線程是否都在工做?異步

          3.1 是:交由飽和策略來處理這個任務。ide

          3.2 不是:建立一個新的工做線程來執行任務。       

特色:線程複用、控制最大併發數、管理線程。

線程池的優點

1. 下降資源消耗,經過重複利用已經建立的線程,下降了線程建立和銷燬產生的消耗。

2. 提升響應速度,任務到達時,任務不須要等待線程建立就能當即執行。

3. 提升線程的可管理性,線程是稀缺資源,若是無限制的建立,不只會消耗系統資源,還會下降系統的穩定性,使用線程池能夠進行同一的分配、調優和監控。

線程池的使用

 Java線程池是經過Executor框架實現的,該框架中用到了Executor、Executors、ExecutorService和ThreadPoolExecutor類。

 

 

 

 

 

 具體使用示例:

 1     public static void fixedThreadPool() {
 2         ExecutorService threadPool = Executors.newFixedThreadPool(5);//固定線程
 3         try {
 4             for (int i = 0; i < 10; i++) {
 5                 threadPool.execute(()->{
 6                     System.out.println(Thread.currentThread().getName());
 7                 });
 8             }
 9         }catch (Exception e){
10             e.printStackTrace();
11         }finally {
12             threadPool.shutdown();
13         }
14     }

輸出結果

pool-1-thread-2
pool-1-thread-4
pool-1-thread-2
pool-1-thread-5
pool-1-thread-1
pool-1-thread-3
pool-1-thread-5
pool-1-thread-1
pool-1-thread-2
pool-1-thread-4
View Code

線程池的源碼及重要參數

Executors.newFixedThreadPool(int)

固定線程數,適用場景:執行長期任務,性能好。

1     public static ExecutorService newFixedThreadPool(int nThreads) {
2         return new ThreadPoolExecutor(nThreads, nThreads, 3 0L, TimeUnit.MILLISECONDS, 4 new LinkedBlockingQueue<Runnable>()); 5 }

Executors.newSingleThreadExecutor()

一池一個線程,使用場景:一個任務接一個任務執行的時候。

1     public static ExecutorService newSingleThreadExecutor() {
2         return new FinalizableDelegatedExecutorService 3 (new ThreadPoolExecutor(1, 1, 4 0L, TimeUnit.MILLISECONDS, 5 new LinkedBlockingQueue<Runnable>())); 6 }

Executors.newCachedThreadPool()

N個線程,帶緩存,適用場景:執行不少短時間異步的小程序或者負載較輕的服務器。

1     public static ExecutorService newCachedThreadPool() {
2         return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 3 60L, TimeUnit.SECONDS, 4 new SynchronousQueue<Runnable>()); 5 }

ThreadPoolExecutor

ThreadPoolExecutor的執行示意圖:

 

 

1. corePoolSize:線程池中的常駐核心線程數

2. maximumPoolSize:線程池可以容納同時執行的最大線程數,必須大於等於1【擴容的上限】。若是工做隊列滿了,core也滿了的時候,線程池會擴容,直到達到maximumPoolSize(新來的任務會直接搶佔擴容線程,不進入工做隊列,工做隊列中的任務繼續等待)。

 1     public static void main(String[] args) {
 2         ExecutorService threadPool = new ThreadPoolExecutor(
 3                 2, //corePoolSize
 4                 5, //maximumPoolSize
 5                 100L, //keepAliveTime
 6                 TimeUnit.SECONDS,
 7                 new LinkedBlockingDeque<>(3),
 8                 Executors.defaultThreadFactory(),
 9                 new ThreadPoolExecutor.AbortPolicy());//N個線程帶緩存
10         try {
11             for (int i = 1; i <= 6; i++) {
12                 final int tmp = i;
13                 threadPool.execute(()->{
14                     System.out.println(Thread.currentThread().getName()+"線程"+",執行任務"+tmp);
15                     try {
16                         TimeUnit.SECONDS.sleep(4);
17                     } catch (InterruptedException e) {
18                         e.printStackTrace();
19                     }
20                 });
21             }
22         }catch (Exception e){
23             e.printStackTrace();
24         }finally {
25             threadPool.shutdown();
26         }
27     }

輸出結果:

pool-1-thread-2線程,執行任務2
pool-1-thread-3線程,執行任務6
pool-1-thread-1線程,執行任務1
pool-1-thread-3線程,執行任務3
pool-1-thread-2線程,執行任務4
pool-1-thread-1線程,執行任務5
View Code

當線程池中有2個核心線程時,線程1和2正在執行任務1和2,任務三、四、5在工做隊列中等候,此時工做隊列滿了,core也滿了的時候,且core< maximumPoolSize,任務6的出現引發線程池的擴容,任務6在三、四、5執行任務前進行了搶佔。因此從輸出結果能夠看出新來的任務會直接搶佔新擴容的線程。

3. keepAliveTime:多餘的空閒線程的存活時間。當前線程數超過corePoolSize的時候,空閒時間達到keepAliveTime時,多餘的空閒線程會被銷燬直到剩下corePoolSize的數量的線程。

4. unit:keepAliveTime的單位

5. workQueue:(阻塞隊列)工做隊列,任務等待區,被提交可是沒有被執行的任務。

6. threadFactory:線程工廠,用於建立線程,通常用默認便可

7. handler:拒絕策略。當隊列滿了而且工做線程大於線程池的最大線程數的時候觸發拒絕策略。

 

5個參數的構造函數

1     public ThreadPoolExecutor(int corePoolSize,
2                               int maximumPoolSize, 3 long keepAliveTime, 4  TimeUnit unit, 5 BlockingQueue<Runnable> workQueue) { 6 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 7  Executors.defaultThreadFactory(), defaultHandler); 8 }

7個參數的構造函數

 1     public ThreadPoolExecutor(int corePoolSize,
 2                               int maximumPoolSize,
 3                               long keepAliveTime,
 4                               TimeUnit unit,
 5                               BlockingQueue<Runnable> workQueue,
 6                               ThreadFactory threadFactory,
 7  RejectedExecutionHandler handler) {
 8         if (corePoolSize < 0 ||
 9             maximumPoolSize <= 0 ||
10             maximumPoolSize < corePoolSize ||
11             keepAliveTime < 0)
12             throw new IllegalArgumentException();
13         if (workQueue == null || threadFactory == null || handler == null)
14             throw new NullPointerException();
15         this.acc = System.getSecurityManager() == null ?
16                 null :
17                 AccessController.getContext();
18         this.corePoolSize = corePoolSize;
19         this.maximumPoolSize = maximumPoolSize;
20         this.workQueue = workQueue;
21         this.keepAliveTime = unit.toNanos(keepAliveTime);
22         this.threadFactory = threadFactory;
23         this.handler = handler;
24     }

線程池的底層工做原理及源碼

ThreadPoolExecutor執行execute方法分下面4種狀況。

1)若是當前運行的線程少於corePoolSize,則建立新線程來執行任務(注意,執行這一步驟須要獲取全局鎖)。

2)若是運行的線程等於或多於corePoolSize,則將任務加入BlockingQueue。

3)若是沒法將任務加入BlockingQueue(隊列已滿),則建立新的線程來處理任務(注意,執行這一步驟須要獲取全局鎖)。

4)若是建立新線程將使當前運行的線程超出maximumPoolSize,任務將被拒絕,並調用RejectedExecutionHandler.rejectedExecution()方法。

 

ThreadPoolExecutor採起上述步驟的整體設計思路,是爲了在執行execute()方法時,儘量地避免獲取全局鎖(那將會是一個嚴重的可伸縮瓶頸)。在ThreadPoolExecutor完成預熱以後(當前運行的線程數大於等於corePoolSize),幾乎全部的execute()方法調用都是執行步驟2,而步驟2不須要獲取全局鎖。

 

 1     public void execute(Runnable command) {
 2         if (command == null)
 3             throw new NullPointerException();
 4         int c = ctl.get();
 5         //若是線程數小於核心線程數,建立線程執行任務
 6         if (workerCountOf(c) < corePoolSize) {
 7             if (addWorker(command, true))
 8                 return;
 9             c = ctl.get();
10         }
11         //若是線程數大於等於核心線程數或線程建立失敗,當前任務加入工做隊列
12         if (isRunning(c) && workQueue.offer(command)) {
13             int recheck = ctl.get();
14             if (! isRunning(recheck) && remove(command))
15                 reject(command);
16             else if (workerCountOf(recheck) == 0)
17                 addWorker(null, false);
18         }
19         //若是線程數不處於運行中或人物失效沒法放入隊列,
20         //且當前線程數量小於最大容許的線程數,則建立一個線程執行任務
21         else if (!addWorker(command, false))
22             reject(command);
23     }

 

工做線程:線程池建立線程時,會將線程封裝成工做線程Worker,Worker在執行完任務後,還會循環獲取工做隊列裏的任務來執行。咱們能夠從Worker類的run()方法裏看到。

 1     public void run() {
 2         runWorker(this);
 3     }
 4 
 5     final void runWorker(Worker w) {
 6         Thread wt = Thread.currentThread();
 7         Runnable task = w.firstTask;
 8         w.firstTask = null;
 9         w.unlock(); // allow interrupts
10         boolean completedAbruptly = true;
11         try {//循環獲取工做隊列裏的任務執行
12             while (task != null || (task = getTask()) != null) {
13                 w.lock();
14                 // If pool is stopping, ensure thread is interrupted;
15                 // if not, ensure thread is not interrupted.  This
16                 // requires a recheck in second case to deal with
17                 // shutdownNow race while clearing interrupt
18                 if ((runStateAtLeast(ctl.get(), STOP) ||
19                      (Thread.interrupted() &&
20                       runStateAtLeast(ctl.get(), STOP))) &&
21                     !wt.isInterrupted())
22                     wt.interrupt();
23                 try {
24                     beforeExecute(wt, task);
25                     Throwable thrown = null;
26                     try {
27                         task.run();
28                     } catch (RuntimeException x) {
29                         thrown = x; throw x;
30                     } catch (Error x) {
31                         thrown = x; throw x;
32                     } catch (Throwable x) {
33                         thrown = x; throw new Error(x);
34                     } finally {
35                         afterExecute(task, thrown);
36                     }
37                 } finally {
38                     task = null;
39                     w.completedTasks++;
40                     w.unlock();
41                 }
42             }
43             completedAbruptly = false;
44         } finally {
45             processWorkerExit(w, completedAbruptly);
46         }
47     }

線程池的拒絕策略(RejectedExecutionHandler)

當隊列和線程池都滿了,說明線程池處於飽和狀態,那麼必須採起一種策略處理提交的新任務。這個策略默認狀況下是AbortPolicy,表示沒法處理新任務時拋出異常。在JDK 1.5中Java線程池框架提供瞭如下4種策略。
1. AbortPolicy:直接拋出異常。RejectedExecutionException
2. CallerRunsPolicy:只用調用者所在線程來運行任務。不拋棄任務,也不拋出異常,將任務回退到調用者。

例如:任務數 > maximumPoolSize+Queue.capacity=8的時候拒絕任務9和10,任務回退給調用者,示例中的調用者就是main線程。

pool-1-thread-1線程,執行任務1
main線程,執行任務9
pool-1-thread-3線程,執行任務6
pool-1-thread-2線程,執行任務2
pool-1-thread-5線程,執行任務8
pool-1-thread-4線程,執行任務7
main線程,執行任務10
pool-1-thread-3線程,執行任務3
pool-1-thread-1線程,執行任務5
pool-1-thread-5線程,執行任務4

3. DiscardOldestPolicy:丟棄隊列裏最近的一個任務,並執行當前任務。
4. DiscardPolicy:不處理,丟棄掉。

任務隊列(runnableTaskQueue)

1. ArrayBlockingQueue:是一個基於數組結構的有界阻塞隊列,此隊列按FIFO(先進先出)原則對元素進行排序。
2. LinkedBlockingQueue:一個基於鏈表結構的阻塞隊列,此隊列按FIFO排序元素,吞吐量一般要高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個隊列。
3. SynchronousQueue:一個不存儲元素的阻塞隊列。每一個插入操做必須等到另外一個線程調用移除操做,不然插入操做一直處於阻塞狀態,吞吐量一般要高於Linked-BlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個隊列。

在實際開發中選擇那種線程池?

三種:單一/固定數/可變,都不能用。爲何不用?

在實際的開發中線程資源必須經過線程池提供,不容許在應用中自行顯式建立線程。

由於不使用線程池,有可能形成系統建立大量同類線程而致使消耗完內存或者過分切換的問題。

線程池不容許適用Executors去建立,而是經過ThreadPoolExecutor的方式,能夠避免資源耗盡的風險。

Executors中的線程池對象存在的問題:

1. FixedThreadPool和SingleThreadPool:容許請求隊列的長度爲Integer.MAX_VALUE,可能會堆積大量請求,致使OOM

2. CachedThreadPool和ScheduledThreadPool:容許建立線程數量爲Integer.MAX_VALUE,可能會建立大量請求,致使OOM

因此應該選擇自定義線程池。

如何配置自定義的線程池參數?

首先查詢服務器是幾核的?Runtime.getRuntime().availableProcessors();

1. CPU密集型

任務須要大量的運算,而沒有阻塞,CPU一直全速運行,CPU密集任務只有在真正的多核CPU上纔可能獲得加速。(經過多線程)

應該配置儘量少的線程數量:CPU核數+1個線程的線程池

2. IO密集型

IO密集型任務並非一直執行任務,則應配置儘量多的線程,

CPU核數 * 2

CPU核數 / 1 - 阻塞係數(0.8~0.9)

補充:CPU密集 & IO密集

CPU密集型,又稱計算密集型任務。它的特色是要進行大量的計算,消耗CPU資源,好比計算圓周率、對視頻進行高清解碼等等,全靠CPU的運算能力。這種計算密集型任務雖然也能夠用多任務完成,可是任務越多,花在任務切換的時間就越多,CPU執行任務的效率就越低,因此,要最高效地利用CPU,計算密集型任務同時進行的數量應當等於CPU的核心數。計算密集型任務因爲主要消耗CPU資源,所以,代碼運行效率相當重要。

IO密集型,涉及到網絡、磁盤IO的任務都是IO密集型任務,這類任務的特色是CPU消耗不多,任務的大部分時間都在等待IO操做完成(由於IO的速度遠遠低於CPU和內存的速度)。對於IO密集型任務,任務越多,CPU效率越高,但也有一個限度。常見的大部分任務都是IO密集型任務,好比Web應用。IO密集型任務執行期間,99%的時間都花在IO上,花在CPU上的時間不多。

相關文章
相關標籤/搜索