Java線程框架_Executor

Executor 框架是 juc 裏提供的線程池的實現。前兩天看了下 Executor 框架的一些源碼,作個簡單的總結。java

線程池大概的思路是維護一個的線程池用於執行提交的任務。我理解池的技術的主要意義有兩個:數據庫

  1. 1.    資源的控制,如併發量限制。像鏈接池這種是對數據庫資源的保護。編程

  2. 2.    資源的有效利用,如線程複用,避免頻繁建立線程和線程上下文切換。併發

那麼想象中設計一個線程池就須要有線程池大小、線程生命週期管理、等待隊列等等功能,下面結合代碼看看原理。框架

Excutor 總體結構以下:
this

Executor 接口定義了最基本的 execute 方法,用於接收用戶提交任務。 ExecutorService 定義了線程池終止和建立及提交 futureTask 任務支持的方法。spa

AbstractExecutorService 是抽象類,主要實現了 ExecutorServicefutureTask 相關的一些任務建立和提交的方法。線程

ThreadPoolExecutor 是最核心的一個類,是線程池的內部實現。線程池的功能都在這裏實現了,平時用的最多的基本就是這個了。其源碼很精練,遠沒當時想象的多。設計

ScheduledThreadPoolExecutor 在 ThreadPoolExecutor 的基礎上提供了支持定時調度的功能。線程任務能夠在必定延時時間後才被觸發執行。代理

1.ThreadPoolExecutor 原理  

1.1 ThreadPoolExecutor內部的幾個重要屬性    

   

   

1.線程池自己的狀態    

Java代碼  收藏代碼

  1. volatile int runState;   

  2. static final int RUNNING = 0;   

  3. static final int SHUTDOWN = 1;   

  4. static final int STOP = 2;   

  5. static final int TERMINATED = 3;   

 

2.等待任務隊列和工做集

Java代碼  收藏代碼

  1. private final BlockingQueue<Runnable> workQueue; //等待被執行的Runnable任務   

  2. private final HashSet<Worker> workers = new HashSet<Worker>(); //正在被執行的Worker任務集   


3.線程池的主要狀態鎖。線程池內部的狀態變化 ( 如線程大小 ) 都須要基於此鎖。

Java代碼  收藏代碼

  1. private final ReentrantLock mainLock = new ReentrantLock();  

 

4.線程的存活時間和大小

Java代碼  收藏代碼

  1. private volatile long keepAliveTime;// 線程存活時間   

  2. private volatile boolean allowCoreThreadTimeOut;// 是否容許核心線程存活   

  3. private volatile int corePoolSize;// 核心池大小   

  4. private volatile int maximumPoolSize; // 最大池大小   

  5. private volatile int poolSize; //當前池大小   

  6. private int largestPoolSize; //最大池大小,區別於maximumPoolSize,是用於記錄線程池曾經達到過的最大併發,理論上小於等於maximumPoolSize。   

 

5.線程工廠和拒絕策略

Java代碼  收藏代碼

  1. private volatile RejectedExecutionHandler handler;// 拒絕策略,用於當線程池沒法承載新線程是的處理策略。  

  2.  private volatile ThreadFactory threadFactory;// 線程工廠,用於在線程池須要新建立線程的時候建立線程  

 

6.線程池完成任務數

Java代碼  收藏代碼

  1. private long completedTaskCount;//線程池運行到當前完成的任務數總和  

 

1.2 ThreadPoolExecutor 的內部工做原理

有了以上定義好的數據,下面來看看內部是如何實現的 。 Doug Lea 的整個思路總結起來就是  5 句話:

  1. 1.    若是當前池大小 poolSize 小於 corePoolSize ,則建立新線程執行任務。

  2. 2.    若是當前池大小 poolSize 大於 corePoolSize ,且等待隊列未滿,則進入等待隊列

  3. 3.    若是當前池大小 poolSize 大於 corePoolSize 且小於 maximumPoolSize ,且等待隊列已滿,則建立新線程執行任務。

  4. 4.    若是當前池大小 poolSize 大於 corePoolSize 且大於 maximumPoolSize ,且等待隊列已滿,則調用拒絕策略來處理該任務。

  5. 5.    線程池裏的每一個線程執行完任務後不會馬上退出,而是會去檢查下等待隊列裏是否還有線程任務須要執行,若是在 keepAliveTime 裏等不到新的任務了,那麼線程就會退出。

 

下面看看代碼實現 :

線程池最重要的方法是由 Executor 接口定義的 execute 方法 , 是任務提交的入口。  

咱們看看 ThreadPoolExecutor.execute(Runnable cmd) 的實現:

 

Java代碼  收藏代碼

  1. public void execute(Runnable command) {  

  2.         if (command == null)  

  3.             throw new NullPointerException();  

  4.         if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {  

  5.             if (runState == RUNNING && workQueue.offer(command)) {  

  6.                 if (runState != RUNNING || poolSize == 0)  

  7.                     ensureQueuedTaskHandled(command);  

  8.             }  

  9.             else if (!addIfUnderMaximumPoolSize(command))  

  10.                 reject(command); // is shutdown or saturated  

  11.         }  

  12. }  

 

解釋以下:  

當提交一個新的 Runnable 任務:

分支1 :    若是當前池大小小於 corePoolSize, 執行 addIfUnderCorePoolSize(command) , 若是線程池處於運行狀態且 poolSize < corePoolSize addIfUnderCorePoolSize(command) 會作以下事情,將 Runnable 任務封裝成 Worker 任務 , 建立新的 Thread ,執行 Worker 任務。若是不知足條件,則返回 false 。代碼以下:

 

Java代碼  收藏代碼

  1.  private boolean addIfUnderCorePoolSize(Runnable firstTask) {  

  2.         Thread t = null;  

  3.         final ReentrantLock mainLock = this.mainLock;  

  4.         mainLock.lock();  

  5.         try {  

  6.             if (poolSize < corePoolSize && runState == RUNNING)  

  7.                 t = addThread(firstTask);  

  8.         } finally {  

  9.             mainLock.unlock();  

  10.         }  

  11.         if (t == null)  

  12.             return false;  

  13.         t.start();  

  14.         return true;  

  15. }  

 

    分支2   若是大於 corePoolSize 或 1 失敗失敗,則:

  •     若是等待隊列未滿,把 Runnable 任務加入到 workQueue 等待隊列

    workQueue .offer(command)

     

  •     如多等待隊列已經滿了,調用 addIfUnderMaximumPoolSize(command) ,和 addIfUnderCorePoolSize 基本相似,只不過判斷條件是 poolSize < maximumPoolSize 。若是大於 maximumPoolSize ,則把 Runnable 任務交由 RejectedExecutionHandler 來處理。

   

問題:如何實現線程的複用 ?      

Doug Lea  的實現思路是 線程池裏的每一個線程執行完任務後不馬上退出,而是去檢查下等待隊列裏是否還有線程任務須要執行,若是在 keepAliveTime 裏等不到新的任務了,那麼線程就會退出。這個功能的實現 關鍵在於 Worker  。線程池在執行 Runnable  任務的時候,並不單純把 Runnable  任務交給建立一個 Thread  。而是會把 Runnable  任務封裝成 Worker  任務。

下面看看 Worker  的實現:

 代碼很簡單,能夠看出, worker  裏面包裝了 firstTask  屬性,在構造worker  的時候傳進來的那個 Runnable  任務就是 firstTask  。 同時也實現了Runnable接口,因此是個代理模式,看看代理增長了哪些功能。 關鍵看 woker  的 run  方法:

Java代碼  收藏代碼

  1. public void run() {  

  2.            try {  

  3.                Runnable task = firstTask;  

  4.                firstTask = null;  

  5.                while (task != null || (task = getTask()) != null) {  

  6.                    runTask(task);  

  7.                    task = null;  

  8.                }  

  9.            } finally {  

  10.                workerDone(this);  

  11.            }  

  12.        }  

 能夠看出 worker 的 run 方法是一個循環,第一次循環運行的必然是 firstTask ,在運行完 firstTask 以後,並不會馬上結束,而是會調用 getTask 獲取新的任務( getTask 會從等待隊列裏獲取等待中的任務),若是 keepAliveTime 時間內獲得新任務則繼續執行,得不到新任務則那麼線程纔會退出。這樣就保證了多個任務能夠複用一個線程,而不是每次都建立新任務。 keepAliveTime 的邏輯在哪裏實現的呢?主要是利用了 BlockingQueue 的 poll 方法支持等待。可看 getTask 的代碼段:

 

Java代碼  收藏代碼

  1. if (state == SHUTDOWN)  // Help drain queue  

  2.     r = workQueue.poll();  

  3. else if (poolSize > corePoolSize || allowCoreThreadTimeOut)  

  4.     r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);  

  5. else  

  6.     r = workQueue.take();  

2.ThreadFactory 和R ejectedExecutionHandler  

ThreadFactoryRejectedExecutionHandler是ThreadPoolExecutor的兩個屬性,也 能夠認爲是兩個簡單的擴展點 . ThreadFactory 是建立線程的工廠。  

默認的線程工廠會建立一個帶有「 pool-poolNumber-thread-threadNumber 」爲名字的線程,若是咱們有特別的須要,如線程組命名、優先級等,能夠定製本身的 ThreadFactory

RejectedExecutionHandler 是拒絕的策略。常見有如下幾種:

AbortPolicy :不執行,會拋出 RejectedExecutionException 異常。

CallerRunsPolicy :由調用者(調用線程池的主線程)執行。

DiscardOldestPolicy :拋棄等待隊列中最老的。

DiscardPolicy: 不作任何處理,即拋棄當前任務。

 

3.ScheduledThreadPoolExecutor    

ScheduleThreadPoolExecutor 是對ThreadPoolExecutor的集成。增長了定時觸發線程任務的功能。須要注意     

從內部實現看, ScheduleThreadPoolExecutor  使用的是  corePoolSize   線程和一個無界隊列的固定大小的池,因此調整  maximumPoolSize   沒有效果。無界隊列是一個內部自定義的 DelayedWorkQueue

ScheduleThreadPoolExecutor  線程池接收定時任務的方法是 schedule ,看看內部實現:

 

Java代碼  收藏代碼

  1. public ScheduledFuture<?> schedule(Runnable command,  

  2.                                    long delay,  

  3.                                    TimeUnit unit) {  

  4.     if (command == null || unit == null)  

  5.         throw new NullPointerException();  

  6.     RunnableScheduledFuture<?> t = decorateTask(command,  

  7.         new ScheduledFutureTask<Void>(command, null,  

  8.                                       triggerTime(delay, unit)));  

  9.   

  10.     delayedExecute(t);  

  11.     return t;  

  12. }  

 

       以上代碼會初始化一個 RunnableScheduledFuture 類型的任務 t, 並交給 delayedExecute 方法。 delayedExecute(t) 方法實現以下:

     

Java代碼  收藏代碼

  1.     private void delayedExecute(Runnable command) {  

  2.         if (isShutdown()) {  

  3.             reject(command);  

  4.             return;  

  5.         }  

  6.         if (getPoolSize() < getCorePoolSize())  

  7.             prestartCoreThread();  

  8.   

  9.         super.getQueue().add(command);  

  10. }  

 

 

若是當前線程池大小 poolSize 小於 CorePoolSize ,則建立一個新的線程,注意這裏建立的線程是空的,不會把任務直接交給線程來作,而是把線程任務放到隊列裏。由於任務是要定時觸發的,因此不能直接交給線程去執行。  

問題: 那如何作到定時觸發呢?  

關鍵在於DelayedWorkQueue,它代理了  DelayQueue 。能夠認爲 DelayQueue 是這樣一個隊列(具體能夠去看下源碼,不詳細分析):

  1. 1.         隊列裏的元素按照任務的 delay 時間長短升序排序, delay 時間短的在隊頭, delay 時間長的在隊尾。

  2. 2.                       DelayQueue  裏 FIFO  的獲取一個元素的時候,不會直接返回 head  。可能會阻塞,等到 head  節點到達 delay  時間後才能被獲取。能夠看下 DelayQueue  的 take  方法實現:

 

Java代碼  收藏代碼

  1. public E take() throws InterruptedException {  

  2.     final ReentrantLock lock = this.lock;  

  3.     lock.lockInterruptibly();  

  4.     try {  

  5.         for (;;) {  

  6.             E first = q.peek();  

  7.             if (first == null) {  

  8.                 available.await();  

  9.             } else {  

  10.                 long delay =  first.getDelay(TimeUnit.NANOSECONDS);  

  11.                 if (delay > 0) {  

  12.                     long tl = available.awaitNanos(delay);//等待delay時間  

  13.                 } else {  

  14.                     E x = q.poll();  

  15.                     assert x != null;  

  16.                     if (q.size() != 0)  

  17.                         available.signalAll(); // wake up other takers  

  18.                     return x;  

  19.                 }  

  20.             }  

  21.         }  

  22.     } finally {  

  23.         lock.unlock();  

  24.     }  

  25. }  

     

 

4.線程池使用策略

經過以上的詳解基本上可以定製出本身須要的策略了,下面簡單介紹下Executors裏面提供的一些常見線程池策略:

1.FixedThreadPool

Java代碼  收藏代碼

  1. public static ExecutorService newFixedThreadPool(int nThreads) {  

  2.     return new ThreadPoolExecutor(nThreads, nThreads,  

  3.                                   0L, TimeUnit.MILLISECONDS,  

  4.                                   new LinkedBlockingQueue<Runnable>());  

  5. }  

 實際上就是個不支持keepalivetime,且corePoolSize和maximumPoolSize相等的線程池。

2.SingleThreadExecutor

Java代碼  收藏代碼

  1. public static ExecutorService newSingleThreadExecutor() {  

  2.     return new FinalizableDelegatedExecutorService  

  3.         (new ThreadPoolExecutor(1, 1,  

  4.                                 0L, TimeUnit.MILLISECONDS,  

  5.                                 new LinkedBlockingQueue<Runnable>()));  

  6. }  

  實際上就是個不支持keepalivetime,且corePoolSize和maximumPoolSize都等1的線程池。

3.CachedThreadPool

Java代碼  收藏代碼

  1. public static ExecutorService newCachedThreadPool() {  

  2.     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  

  3.                                   60L, TimeUnit.SECONDS,  

  4.                                   new SynchronousQueue<Runnable>());  

  5. }  

 實際上就是個支持keepalivetime時間是60秒(線程空閒存活時間),且corePoolSize爲0,maximumPoolSize無窮大的線程池。

4.SingleThreadScheduledExecutor

Java代碼  收藏代碼

  1. public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {  

  2.     return new DelegatedScheduledExecutorService  

  3.         (new ScheduledThreadPoolExecutor(1, threadFactory));  

  4. }  

 其實是個corePoolSize爲1的ScheduledExecutor。上文說過ScheduledExecutor採用無界等待隊列,因此maximumPoolSize沒有做用。

5.ScheduledThreadPool

Java代碼  收藏代碼

  1. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {  

  2.     return new ScheduledThreadPoolExecutor(corePoolSize);  

  3. }  

  其實是corePoolSize課設定的ScheduledExecutor。上文說過ScheduledExecutor採用無界等待隊列,因此maximumPoolSize沒有做用。

 

以上還不必定知足你的須要,徹底能夠根據本身須要去定製。


<線程相關  先暫時不整理   等慢慢理解   由於這些比較難因此慢點  參考數據  java併發編程實戰>

相關文章
相關標籤/搜索