最近在項目中遇到一個須要用線程池來處理任務的需求,因而我用ThreadPoolExecutor
來實現,可是在實現過程當中我發現提交大量任務時它的處理邏輯是這樣的(提交任務還有一個submit
方法內部也調用了execute
方法):java
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
註釋中已經寫的很是明白:瀏覽器
corePoolSize
,直接建立新線程處理任務corePoolSize
,嘗試將任務放到等待隊列裏maximumPoolSIze > corePoolSize
)可是在個人項目中一個線程啓動須要10s左右的時間(須要啓動一個瀏覽器對象),所以我但願實現一個更精細的邏輯提高資源的利用率:安全
corePoolSize
個線程確保有新任務到來時能夠當即獲得執行threshold
時,說明堆積的任務已經太多了,這個時候開始建立非核心線程直到線程數量已經等於maximumPoolSize
maximumPoolSize
,再將新來的任務放回到任務隊列中等待(直到隊列滿後開始拒絕任務)當我研究了常見的CachedThreadPool
、FixedThreadPool
以及嘗試本身配置ThreadPoolExecutor
的構造函數後,發現不管如何都不能實現上面提到的邏輯,由於默認的實現只有在workQueue
達到容量上限後纔會開始建立非核心線程,所以須要經過繼承的方法實現一個新的類來完成需求。多線程
怎麼實如今workQueue
到達容量上限前就建立非核心線程?還要回顧下execute
函數的代碼ide
//嘗試將任務插入等待隊列,若是返回false //說明隊列已經到達容量上限,進入else if邏輯 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } //嘗試建立非核心線程 else if (!addWorker(command, false)) reject(command);
那麼只要改變workQueue.offer()
的邏輯,在線程數量還小於maximumPoolSize
的時候就返回false拒絕插入,讓線程池調用addWoker
,等不能再建立更多線程時再容許添加到隊列便可。函數
能夠經過子類重寫offer
方法來實現添加邏輯的改變oop
@Override public boolean offer(E e) { if (threadPoolExecutor == null) { throw new NullPointerException(); } //當調用該方法時,已經肯定了workerCountOf(c) > corePoolSize //當數量小於threshold,在隊列裏等待 if (size() < threshold) { return super.offer(e); //當數量大於等於threshold,說明堆積的任務太多,返回false //讓線程池來建立新線程處理 } else { //此處可能會由於多線程致使錯誤的拒絕 if (threadPoolExecutor.getPoolSize() < threadPoolExecutor.getMaximumPoolSize()) { return false; //線程池中的線程數量已經到達上限,只能添加到任務隊列中 } else { return super.offer(e); } } }
這樣就實現了基本實現了我須要的功能,可是在寫代碼的過程當中我找到了一個可能出錯的地方:ThreadPoolExecutor
是線程安全的,那麼重寫的offer
方法也可能遇到多線程調用的狀況測試
//設想當poolSize = maximumPoolSize-1時,兩個任務到達此處同時返回false if (threadPoolExecutor.getPoolSize() < threadPoolExecutor.getMaximumPoolSize()) { return false; }
因爲添加到隊列返回false
,execute
方法進入到else if (!addWorker(command, false))
ui
if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } //添加到隊列失敗後進入addWorker方法中 else if (!addWorker(command, false)) reject(command); }
再來看一下addWorker
方法的代碼,這裏只截取須要的一部分this
for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || //兩個線程都認爲還能夠建立再建立一個新線程 wc >= (core ? corePoolSize : maximumPoolSize)) return false; //兩個線程同時調用cas方法只有一個可以成功 //成功的線程break retry;進入後面的建立線程的邏輯 //失敗的線程從新回到上面的檢查並返回false if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop }
最終,在競爭中失敗的線程因爲addWorker
方法返回了false
最終調用了reject(command)
。在前面寫的要實現的邏輯裏提到了,只有在等待隊列容量達到上限沒法再插入時才拒絕任務,可是因爲多線程的緣由,這裏只是超過了threshold
但沒有超過capacity
的時候就拒絕任務了,因此要對拒絕策略的觸發作出修改:第一次觸發Reject
時,嘗試從新添加到任務隊列中(不進行poolSize
的檢測),若是仍然不能添加,再拒絕任務。
這裏經過對execute
方法進行重寫來實現重試
@Override public void execute(Runnable command) { try { super.execute(command); } catch (RejectedExecutionException e) { /* 這裏參考源碼中將任務添加到任務隊列的實現 可是其中經過(workerCountOf(recheck) == 0) 檢查當任務添加到隊列後是否還有線程存活的部分 因爲是private權限的,沒法實現相似的邏輯,所以須要作必定的特殊處理 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } */ if (!this.isShutdown() && ((MyLinkedBlockingQueue)this.getQueue()).offerWithoutCheck(command)) { if (this.isShutdown() && remove(command)) //二次檢查 realRejectedExecutionHandler.rejectedExecution(command, this); } else { //插入失敗,隊列已經滿了 realRejectedExecutionHandler.rejectedExecution(command, this); } } } }
這裏有兩個小問題:
RejectedExecutionHandler
不必定會拋出異常(事實上,ThreadPoolExecutor
本身實現的4中拒絕策略中只有AbortPolicy
可以拋出異常並被捕捉到),所以須要在初始化父類時傳入AbortPolicy
拒絕策略並將構造函數中傳入的自定義拒絕策略保存下來,在重試失敗後才調用本身的rejectedExecution
。corePoolSize = 0
的極端狀況下,可能出現一個任務剛被插入隊列的同時,全部的線程都結束任務而後被銷燬了,此使這個被加入的任務就沒法被執行,在ThreadPoolExecutor
中是經過else if (workerCountOf(recheck) == 0) addWorker(null, false);在添加後再檢查工做線程是否爲0來確保任務能夠被執行,可是其中使用的方法是私有的,沒法在子類中實現相似的邏輯,所以在初始化時只能強制
corePoolSize
至少爲1來解決這個問題。所有代碼以下
public class MyThreadPool extends ThreadPoolExecutor { private RejectedExecutionHandler realRejectedExecutionHandler; public MyThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, int queueCapacity) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, queueCapacity, new AbortPolicy()); } public MyThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, int queueCapacity, RejectedExecutionHandler handler) { super(corePoolSize == 0 ? 1 : corePoolSize, maximumPoolSize, keepAliveTime, unit, new MyLinkedBlockingQueue<>(queueCapacity), new AbortPolicy()); ((MyLinkedBlockingQueue)this.getQueue()).setThreadPoolExecutor(this); realRejectedExecutionHandler = handler; } @Override public void execute(Runnable command) { try { super.execute(command); } catch (RejectedExecutionException e) { if (!this.isShutdown() && ((MyLinkedBlockingQueue)this.getQueue()).offerWithoutCheck(command)) { if (this.isShutdown() && remove(command)) //二次檢查 realRejectedExecutionHandler.rejectedExecution(command, this); } else { //插入失敗,隊列已經滿了 realRejectedExecutionHandler.rejectedExecution(command, this); } } } } public class MyLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> { private int threshold = 20; private ThreadPoolExecutor threadPoolExecutor = null; public MyLinkedBlockingQueue(int queueCapacity) { super(queueCapacity); } public void setThreadPoolExecutor(ThreadPoolExecutor threadPoolExecutor) { this.threadPoolExecutor = threadPoolExecutor; } @Override public boolean offer(E e) { if (threadPoolExecutor == null) { throw new NullPointerException(); } //當調用該方法時,已經肯定了workerCountOf(c) > corePoolSize //當數量小於threshold,在隊列裏等待 if (size() < threshold) { return super.offer(e); //當數量大於等於threshold,說明堆積的任務太多,返回false //讓線程池來建立新線程處理 } else { //此處可能會由於多線程致使錯誤的拒絕 if (threadPoolExecutor.getPoolSize() < threadPoolExecutor.getMaximumPoolSize()) { return false; //線程池中的線程數量已經到達上限,只能添加到任務隊列中 } else { return super.offer(e); } } } public boolean offerWithoutCheck(E e) { return super.offer(e); } }
最後進行簡單的測試
corePoolSize:2 maximumPoolSize:5 queueCapacity:10 threshold:7 任務2 線程數量:2 等待隊列大小:0 等待隊列大小小於閾值,繼續等待。 任務3 線程數量:2 等待隊列大小:1 等待隊列大小小於閾值,繼續等待。 任務4 線程數量:2 等待隊列大小:2 等待隊列大小小於閾值,繼續等待。 任務5 線程數量:2 等待隊列大小:3 等待隊列大小小於閾值,繼續等待。 任務6 線程數量:2 等待隊列大小:4 等待隊列大小小於閾值,繼續等待。 任務7 線程數量:2 等待隊列大小:5 等待隊列大小小於閾值,繼續等待。 任務8 線程數量:2 等待隊列大小:6 等待隊列大小小於閾值,繼續等待。 任務9 線程數量:2 等待隊列大小:7 等待隊列大小大於等於閾值,線程數量小於MaximumPoolSize,建立新線程處理。 任務10 線程數量:3 等待隊列大小:7 等待隊列大小大於等於閾值,線程數量小於MaximumPoolSize,建立新線程處理。 任務11 線程數量:4 等待隊列大小:7 等待隊列大小大於等於閾值,線程數量小於MaximumPoolSize,建立新線程處理。 任務12 線程數量:5 等待隊列大小:7 等待隊列大小大於等於閾值,但線程數量大於等於MaximumPoolSize,只能添加到隊列中。 任務13 線程數量:5 等待隊列大小:8 等待隊列大小大於等於閾值,但線程數量大於等於MaximumPoolSize,只能添加到隊列中。 任務14 線程數量:5 等待隊列大小:9 等待隊列大小大於等於閾值,但線程數量大於等於MaximumPoolSize,只能添加到隊列中。 任務15 線程數量:5 等待隊列大小:10 等待隊列大小大於等於閾值,但線程數量大於等於MaximumPoolSize,只能添加到隊列中。 隊列已滿 任務16 線程數量:5 等待隊列大小:10 等待隊列大小大於等於閾值,但線程數量大於等於MaximumPoolSize,只能添加到隊列中。 隊列已滿
再從新複習一遍要實現的功能:
corePoolSize
個線程確保有新任務到來時能夠當即獲得執行threshold
時,說明堆積的任務已經太多了,這個時候開始建立非核心線程直到線程數量已經等於maximumPoolSize
maximumPoolSize
,再將新來的任務放回到任務隊列中等待(直到隊列滿後開始拒絕任務)能夠看出,線程池運行的邏輯和要實現的目標是相同的。