ThreadPoolExecutor 學習筆記

線程池的奧義

  在開發程序的過程當中,不少時候咱們會遇到遇到批量執行任務的場景,當各個具體任務之間互相獨立並不依賴其餘任務的時候,咱們會考慮使用併發的方式,將各個任務分散到不一樣的線程中進行執行來提升任務的執行效率。java

  咱們會想到爲每一個任務都分配一個線程,可是這樣的作法存在很大的問題:程序員

  一、資源消耗:首先當任務數量龐大的時候,大量線程會佔據大量的系統資源,特別是內存,當線程數量大於CPU可用數量時,空閒線程會浪費形成內存的浪費,並加大GC的壓力,大量的線程甚至會直接致使程序的內存溢出,並且大量線程在競爭CPU的時候會帶來額外的性能開銷。若是CPU已經足夠忙碌,再多的線程不只不會提升性能,反而會下降性能。web

  二、線程生命週期的開銷:線程的建立和銷燬都是有代價的,線程的建立須要時間、延遲處理的請求、須要JVM和操做系統提供一些輔助操做。若是請求特別龐大,而且任務的執行特別輕量級(好比只是計算1+1),那麼對比下來建立和銷燬線程代價就太昂貴了。面試

  三、穩定性:如資源消耗中所說若是程序由於大量的線程拋出OutOfMemoryEorror,會致使程序極大的不穩定。編程

  

  既然爲每一個任務分配一個線程的作法已經不可行,咱們考慮的代替方法中就必須考慮到,一、線程不能不能無限制建立,數量必須有一個合適的上限。二、線程的建立開銷昂貴,那咱們能夠考慮重用這些線程。理所固然,池化技術是一項比較容易想到的替代方案(馬後炮),線程的池化管理就叫線程池緩存

 

線程池族譜

  ThreadPoolExecutor的關係圖簡單以下。併發

  簡單介紹一些Executor、ExecutorService、AbstractExectorService。函數

  Executor接口比較簡單:性能

1 public interface Executor {
2     void execute(Runnable command);
3 }

  該接口只有一個方法,即任務的執行。學習

  ExecutorService在Executor接口上,添加了管理生命週期的方法、支持了Callable類型的任務、任務的執行方式。

  AbstractExecutorService是一個抽象類,實現了ExecutorService的任務執行方法,添加newTaskFor方法做爲鉤子對外提供任務的取消通道,可是AbstractExecutorService並無實現生命週期管理相關的方法,而是將生命週期相關的操做丟給了子類。

線程池奮鬥的一輩子

線程池的出生:

   線程池有多種構造器,參數最完整的構造器以下:

 1     public ThreadPoolExecutor(int corePoolSize,
 2                               int maximumPoolSize,
 3                               long keepAliveTime,
 4                               TimeUnit unit,
 5                               BlockingQueue<Runnable> workQueue,
 6                               ThreadFactory threadFactory,
 7                               RejectedExecutionHandler handler) {
 8         if (corePoolSize < 0 ||
 9             maximumPoolSize <= 0 ||
10             maximumPoolSize < corePoolSize ||
11             keepAliveTime < 0)
12             throw new IllegalArgumentException();
13         if (workQueue == null || threadFactory == null || handler == null)
14             throw new NullPointerException();
15         this.corePoolSize = corePoolSize;
16         this.maximumPoolSize = maximumPoolSize;
17         this.workQueue = workQueue;
18         this.keepAliveTime = unit.toNanos(keepAliveTime);
19         this.threadFactory = threadFactory;
20         this.handler = handler;
21     }

  corePoolSize:核心線程數量。當對線程池中空閒線程進行回收的時候。假設線程池中線程的數量小於corePoolSize,則不會對線程進行回收。若是線程由於異常緣由退出,若是線程退出後線程池的線程數量小於corePoolSize,則會對線程池添加一個線程。

  maximumPoolSize:線程池的最大大小。當線程池中任務已經溢出,若是線程數量已經等於maximunPoolSize,線程池也不會在添加線程。

  keepAliveTime:線程的空閒時間。若是線程池的線程數量已經大於corePoolSize,當線程空閒時間超過空閒時間,則該線程會被回收。

  unit:線程空閒時間的時間單位。能夠選擇納秒、微秒、毫秒、秒、分、小時、天爲單位。

  workQueue:工做隊列。用於存儲交付給線程池的任務。能夠選擇BlockingQueue的實現類來充當線程池的工做隊列,newFixThreadExecutor和newSingleThreadExecutor默認採用的是無界的LinkedBlockingQueue來充當工做隊列。更爲穩妥的方式是選擇一種有界的工做隊列來存儲。例若有界的LinkedBlockingQueue、ArrayBlockingQueue、PriorityBlockingQueue來充當消息隊列,防止由於任務無止境的堆積致使內存溢出。newCachedThreadPool使用的是SynchronousQueue來充當隊列,SynchronousQueue不是一個真正的消息隊列,而已一個任務在線程正當中的移交機制。通常只有在線程池能夠無限大,或者線程池能夠拒絕任務的狀況下使用SynchronousQueue。

  threadFactory:線程工廠。每當線程池須要建立一個線程時,能夠經過線程的工廠的new Thread方法來建立線程。能夠經過自定義一個ThreadFactory來實現對線程的定製。

  handler:拒絕機制。當線程池由於工做池已經飽和,準備拒絕任務時候。會調用RejectedExecutionHandler來拒絕該任務。Jdk提供了幾種不一樣的RejectedExecutionHandler實現,每種實現都包含不一樣的飽和策略:AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy。

  • Abort是默認的飽和策略,該策略會拋出未檢查的RejectedExecutionException。
  • CallerRuns實現一種調節機制,將任務回退到調用者,讓調用者執行,從而下降了新任務的流量。webServer經過使用該策略使得在請求負載太高的狀況下實現了性能的平緩下降。
  • Discard實現了會悄悄拋棄該任務,DiscardOldestPolicy會拋棄隊列中拋棄下一個即將被執行的任務。若是是在優先隊列裏,DiscardOldestPolicy會拋棄優先級最高的任務。

  

  ThreadLocalPool的池的大小設置,《Java併發編程實戰》書中給了一個推薦的設置值。

  Ncpu爲CPU的數量,Ucpu爲CPU的利用率,W/C爲任務的等待時間 / 任務的計算時間。在這種狀況下,通常線程池的最優大小:

 

N=Ncpu*Ucpu*(1+W/C)

  線程池建立也可使用Executors來建立:

  newFixedThreadPool:建立一個固定長度的線程池,每當提交一個任務就建立一個線程,直到達到最大線程數。若是由於異常致使未預期的異常結束。線程池將補充一個線程。

1     public static ExecutorService newFixedThreadPool(int nThreads) {
2         return new ThreadPoolExecutor(nThreads, nThreads,
3                                       0L, TimeUnit.MILLISECONDS,
4                                       new LinkedBlockingQueue<Runnable>());
5     }

  newCacheThreadPool:建立一個可緩存的線程池。該線程池核心線程數爲0,最大線程爲Integer.max_value。能夠理解爲該線程池規模沒有任何限制。

1     public static ExecutorService newCachedThreadPool() {
2         return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
3                                       60L, TimeUnit.SECONDS,
4                                       new SynchronousQueue<Runnable>());
5     }

  newScheduledThreadPool:建立一個固定長度的線程池,已延遲或者定時方式來執行任務,相似於Timer。

1     public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
2         return new ScheduledThreadPoolExecutor(corePoolSize);
3     }

  newSingleThreadExecutor:建立一個單線程的Executor來執行任務,能確保線程的執行順序,例如FIFO、LIFO、優先順序等。

    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }

  newWorkStealingPool:根據給定的並行等級,建立一個擁有足夠的線程數目的線程池。

1     public static ExecutorService newWorkStealingPool(int parallelism) {
2         return new ForkJoinPool
3             (parallelism,
4              ForkJoinPool.defaultForkJoinWorkerThreadFactory,
5              null, true);
6     }

  

線程池的人生起落:

  ThreadPoolExecutor中有一個ctl變量。ctl是一個32位的二級制數,其中高3位用於表示線程池的狀態,低29位表示線程池中的活動線程。

 1     private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
 2     private static final int COUNT_BITS = Integer.SIZE - 3;
 3     private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
 4 
 5 
 6     private static final int RUNNING    = -1 << COUNT_BITS;
 7     private static final int SHUTDOWN   =  0 << COUNT_BITS;
 8     private static final int STOP       =  1 << COUNT_BITS;
 9     private static final int TIDYING    =  2 << COUNT_BITS;
10     private static final int TERMINATED =  3 << COUNT_BITS;

  如上代碼所示,線程池有五種狀態。RUNNING、SHUTDOWN、STOP、TIDYING、TERMINNATED。幸虧ThreadPoolExecutor的代碼上有對應註釋,看着這些註釋能對ThreadPoolExecutor的狀態做用和狀態流轉能有一個大體的瞭解。

  RUNNING:在線程池建立的時候,線程池默認處於RUNNING狀態。當線程池處於RUNNING狀態的時候,任務隊列能夠接受任務,而且能夠執行QUEUE中任務。

  SHUTDOWN:不接受新任務,可是會繼續執行QUEUE中的任務。

  STOP:不接受新任務,也不執行QUEUE中的任務。

  TIDYING:全部的任務都停止了,沒有活動中的線程。當線程池進行該狀態時候,會執行鉤子方法terminated() 。

  如下是各個狀態對應的流轉圖:

  

線程池的壽終正寢:

  上面有說過,ExecutorService在Executor接口上,添加了管理生命週期的方法。在ThreadPoolExecutor中,主要關閉動做有三個shutdown()、shutdownNow()、awaitTermination()。  

  shutdown()是一個平緩的關閉方式,線程池被調用了shutdown函數若是還有事作就會把狀態設爲SHUTDOWN,可是不會真的停止。

 1     public void shutdown() {
 2         final ReentrantLock mainLock = this.mainLock;
 3         mainLock.lock();
 4         try {
 5             //檢查是否有關閉線程的權限
 6             checkShutdownAccess();
 7             //檢查線程池狀態、小於SHUTDOWN的用CAS的方式將線程池狀態設置爲SHUTDOWN
 8             advanceRunState(SHUTDOWN);
 9             //打斷沒事作的線程
10             interruptIdleWorkers()
11             //這個是ScheduledThreadPoolExecutor中用到的不,ThreadPoolExecutor中是個空的
12             onShutdown(); // hook for ScheduledThreadPoolExecutor
13         } finally {
14             mainLock.unlock();
15         }
16         //嘗試停止,若是還有事作就不會停止
17         tryTerminate();
18     }

  

  shutdownNow()跟shutdown()類似,可是shutdownNow()比起shutdown()更加粗暴。無論線程池中的線程有沒有事作,直接把線程打斷。而且狀態會設置爲STOP。狀態設置爲STOP後也表示無視任務隊列裏面是否是還有任務。shutdownNow()由於會關閉已經開始執行可是還沒有結束的任務,因此使用shutdownNow()的時候若是須要知道每一個任務被放棄時候的狀態,就必須拓展任務,記錄清楚任務中未成功執行完成的任務。

 1     public List<Runnable> shutdownNow() {
 2         List<Runnable> tasks;
 3         final ReentrantLock mainLock = this.mainLock;
 4         mainLock.lock();
 5         try {
 6             checkShutdownAccess();
 7             //檢查線程池狀態、小於SHUTDOWN的用CAS的方式將線程池狀態設置爲SHUTDOWN
 8             advanceRunState(STOP);
 9             //強行打斷
10             interruptWorkers();
11             tasks = drainQueue();
12         } finally {
13             mainLock.unlock();
14         }
15         tryTerminate();
16         return tasks;
17     }

  

  awaitTermination(long timeout, TimeUnit unit)方法,用於進行等待,假設傳入時間爲60s,若是60s以後ThreadPoolExecutor狀態變爲TERMINATED,則返回ture,若是狀態不爲TERMINATED,則會返回false。一般調用玩shutdown()後會使用awaitTermination方法進行等待,確認線程池已經停止。

 1     public boolean awaitTermination(long timeout, TimeUnit unit)
 2         throws InterruptedException {
 3         long nanos = unit.toNanos(timeout);
 4         final ReentrantLock mainLock = this.mainLock;
 5         mainLock.lock();
 6         try {
 7             for (;;) {
 8                 if (runStateAtLeast(ctl.get(), TERMINATED))
 9                     return true;
10                 if (nanos <= 0)
11                     return false;
12                 nanos = termination.awaitNanos(nanos);
13             }
14         } finally {
15             mainLock.unlock();
16         }
17     }

  以上幾個方法能夠對線程池的狀態進行操做。線程池還提供了isShutdown(),isTerminating(),isTerminated()對線程池的狀態進行查詢。

    

線程池核心工做方法

  當咱們要將一個任務提交給線程池時,通常調用的線程池的execute(Runnable command)方法。簡單分析一下這個方法:

 1     public void execute(Runnable command) {
 2         if (command == null)
 3             throw new NullPointerException();
 4         int c = ctl.get();
 5         //如何活動線程數量小於核心線程數量,則添加線程來處理該任務
 6         if (workerCountOf(c) < corePoolSize) {
 7             if (addWorker(command, true))
 8                 return;
 9             c = ctl.get();
10         }
11         //若是線程池在running狀態,而且往任務隊列裏推送任務成功:
12         if (isRunning(c) && workQueue.offer(command)) {
13             int recheck = ctl.get();
14             //二次檢查線程池已經關閉,任務隊列刪除任務,並拒絕任務
15             if (! isRunning(recheck) && remove(command))
16                 reject(command);
17             //若是工做線程數爲0,由於只有當corePoolSize==0的狀況下才能走到這裏,則此時添加一個非核心的工做者
18             else if (workerCountOf(recheck) == 0)
19                 addWorker(null, false);
20         }
21         //走到這邊,表示任務推送失敗或者線程池已經關閉,添加工做線程,若是線程池已經關閉會返回false,則拒絕該任務
22         else if (!addWorker(command, false))
23             reject(command);
24     }

 

 

  咱們從這裏能夠看出來,當線程池中的活動線程大於或等於核心線程的時候,線程池是不會立刻建立新的線程來執行任務的。只有線程池在任務隊列中推送任務失敗(任務隊列已經滿了)的時候纔會建立額外的線程來執行任務。若是線程池已經關閉,或者任務隊列和工做者已經滿了的時候,線程池會開始拒絕任務。reject(command)會用上面說過的RejectedExecutionHandler來對任務進行拒絕。 

 

  這裏的Worker是ThreadPoolExecutor的內部類,封裝Thread類。它的核心方法也就是run()方法。咱們來看一下Worker的run()方法,run()方法就是runWork()方法封裝一下。這裏的This值的是Worker本身。

1         public void run() {
2             runWorker(this);
3         }

 

  

  這個是工做者的工做方法。

 1     final void runWorker(ThreadPoolExecutor.Worker w) {
 2         Thread wt = Thread.currentThread();
 3         Runnable task = w.firstTask;
 4         w.firstTask = null;
 5         w.unlock(); // allow interrupts
 6         boolean completedAbruptly = true;
 7         try {
 8             //循環獲取任務,getTask()會阻塞的從任務隊列裏拿任務,
 9             while (task != null || (task = getTask()) != null) {
10                 w.lock();
11                 //判斷線程池和線程的狀態,是能夠繼續執行任務的
12                 if ((runStateAtLeast(ctl.get(), STOP) ||
13                         (Thread.interrupted() &&
14                                 runStateAtLeast(ctl.get(), STOP))) &&
15                         !wt.isInterrupted())
16                     wt.interrupt();
17                 try {
18                     //可拓展接口,任務執行前的動做
19                     beforeExecute(wt, task);
20                     Throwable thrown = null;
21                     try {
22                         //任務執行沒啥好說
23                         task.run();
24                     } catch (RuntimeException x) {
25                         thrown = x; throw x;
26                     } catch (Error x) {
27                         thrown = x; throw x;
28                     } catch (Throwable x) {
29                         thrown = x; throw new Error(x);
30                     } finally {
31                         //可拓展接口,任務執行前的動做
32                         afterExecute(task, thrown);
33                     }
34                 } finally {
35                     task = null;
36                     w.completedTasks++;
37                     w.unlock();
38                 }
39             }
40             completedAbruptly = false;
41         } finally {
42             //任務退出循環,根據是異常退出仍是正常退出進行收尾
43             //對工做任務進行回收也在這裏
44             processWorkerExit(w, completedAbruptly);
45         }
46     }

  從隊列中獲取任務。

 1     private Runnable getTask() {
 2         boolean timedOut = false; // Did the last poll() time out?
 3 
 4         for (;;) {
 5             int c = ctl.get();
 6             int rs = runStateOf(c);
 7 
 8             // 檢查線程池狀態和隊列是否爲空,若是沒任務可搞直接返回
 9             if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
10                 decrementWorkerCount();
11                 return null;
12             }
13 
14             int wc = workerCountOf(c);
15 
16             //判斷線程池是否須要提出線程
17             // timed參數用於判斷是否須要根據超時時間回收線程,
18             //若是容許核心線程回收或者線程數已經超過核心線程數,則爲ture
19             boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
20 
21             //工做者太多或者已經超時則幹掉
22             if ((wc > maximumPoolSize || (timed && timedOut))
23                     && (wc > 1 || workQueue.isEmpty())) {
24                 if (compareAndDecrementWorkerCount(c))
25                     return null;
26                 continue;
27             }
28 
29             try {
30                 //根據上面的判斷,讓工做者線程阻塞讀取直到被打斷或者超時返回
31                 Runnable r = timed ?
32                         workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
33                         workQueue.take();
34                 if (r != null)
35                     return r;
36                 timedOut = true;
37             } catch (InterruptedException retry) {
38                 timedOut = false;
39             }
40         }
41     }

  

  線程工做者退出。

 1     private void processWorkerExit(ThreadPoolExecutor.Worker w, boolean completedAbruptly) {
 2         //若是不是由於異常緣由致使線程退出,則不要進行Worker數量調整
 3         if (completedAbruptly) 
 4             decrementWorkerCount();
 5 
 6         final ReentrantLock mainLock = this.mainLock;
 7         mainLock.lock();
 8         try {
 9             completedTaskCount += w.completedTasks;
10             //從工做隊列中刪除,讓JVM能夠對Worker進行回收
11             workers.remove(w);
12         } finally {
13             mainLock.unlock();
14         }
15         //嘗試停止線程池
16         tryTerminate();
17 
18         int c = ctl.get();
19         
20         //線程池若是還在跑,線程異常退出,須要補充工做者,就對工做者進行補充。
21         if (runStateLessThan(c, STOP)) {
22             if (!completedAbruptly) {
23                 int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
24                 if (min == 0 && ! workQueue.isEmpty())
25                     min = 1;
26                 if (workerCountOf(c) >= min)
27                     return; // replacement not needed
28             }
29             addWorker(null, false);
30         }
31     }

 

  上面的代碼我就不細講, 主要的流程就寫了註釋在上面。當年第一次據說線程池會回收空閒線程的時候就會好奇這個操做是怎麼搞的,上面代碼的workqueue.poll()就是關鍵,當線程能夠回收,而且線程阻塞已經超時,則進行線程回收。

 

  

 

後記:

  寫這篇博客的時候,心情比較煩躁。遵從朋友建議惡搞部分標題名,果真心情好不少。之後能夠考慮在內容沒歧義的前提下,文章部分也這麼寫。線程池在java中算是比較基礎的內容,入行以來面試也被面了很多,可是一直沒看過源碼,最近看了一下發現確實學習到了很多東西,部分看懂了,部分由於水平不夠沒看懂的東西,部分看明白後有種還能夠這麼寫的感慨。果真JAVA程序員要多看看JDK源碼。

相關文章
相關標籤/搜索