來源:https://juejin.im/post/5ce1f3...
前言
原覺得線程池還挺簡單的(平時經常使用,也分析過原理),此次是想本身動手寫一個線程池來更加深刻的瞭解它;但在動手寫的過程當中落地到細節時發現並沒想的那麼容易。結合源碼對比後確實不得不佩服 Doug Lea 。
我以爲大部分人直接去看 java.util.concurrent.ThreadPoolExecutor 的源碼時都是看一個大概,由於其中涉及到了許多細節處理,還有部分 AQS 的內容,因此想要理清楚具體細節並非那麼容易。
與其挨個分析源碼不如本身實現一個簡版,固然簡版並不意味着功能缺失,須要保證核心邏輯一致。
因此也是本篇文章的目的:java
本身動手寫一個五臟俱全的線程池,同時會了解到線程池的工做原理,以及如何在工做中合理的利用線程池。git
再開始以前建議對線程池不是很熟悉的朋友看看這幾篇:
這裏我截取了部份內容,也許能夠埋個伏筆(坑)。
github
具體請看這兩個連接。segmentfault
如何優雅的使用和理解線程池
線程池中你不容錯過的一些細節安全
因爲篇幅限制,本次可能會分爲上下兩篇。
建立線程池
如今進入正題,新建了一個 CustomThreadPool 類,它的工做原理以下:多線程
簡單來講就是往線程池裏邊丟任務,丟的任務會緩衝到隊列裏;線程池裏存儲的其實就是一個個的 Thread ,他們會一直不停的從剛纔緩衝的隊列裏獲取任務執行。
流程仍是挺簡單。
先來看看咱們這個自創的線程池的效果如何吧:
初始化了一個核心爲三、最大線程數爲五、隊列大小爲 4 的線程池。
先往其中丟了 10 個任務,因爲阻塞隊列的大小爲 4 ,最大線程數爲 5 ,因此因爲隊列裏緩衝不了最終會建立 5 個線程(上限)。
過段時間沒有任務提交後(sleep)則會自動縮容到三個線程(保證不會小於核心線程數)。
構造函數
來看看具體是如何實現的。
下面則是這個線程池的構造函數:架構
會有如下幾個核心參數:併發
miniSize 最小線程數,等效於 ThreadPool 中的核心線程數。
maxSize 最大線程數。
keepAliveTime 線程保活時間。
workQueue 阻塞隊列。
notify 通知接口。函數
大體上都和 ThreadPool 中的參數相同,而且做用也是相似的。
須要注意的是其中初始化了一個 workers 成員變量:post
/** * 存放線程池 */ private volatile Set<Worker> workers; public CustomThreadPool(int miniSize, int maxSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, Notify notify) { workers = new ConcurrentHashSet<>(); }
複製代碼workers 是最終存放線程池中運行的線程,在 j.u.c 源碼中是一個 HashSet 因此對他全部的操做都是須要加鎖。
我這裏爲了簡便起見就本身定義了一個線程安全的 Set 稱爲 ConcurrentHashSet。
其實原理也很是簡單,和 HashSet 相似也是藉助於 HashMap 來存放數據,利用其 key 不可重複的特性來實現 set ,只是這裏的 HashMap 是用併發安全的 ConcurrentHashMap 來實現的。
這樣就能保證對它的寫入、刪除都是線程安全的。
不過因爲 ConcurrentHashMap 的 size() 函數並不許確,因此我這裏單獨利用了一個 AtomicInteger 來統計容器大小。
建立核心線程
往線程池中丟一個任務的時候其實要作的事情還蠻多的,最重要的事情莫過於建立線程存放到線程池中了。
固然咱們不能無限制的建立線程,否則拿線程池來就沒任何意義了。因而 miniSize maxSize 這兩個參數就有了它的意義。
但這兩個參數再哪一步的時候才起到做用呢?這就是首先須要明確的。點擊免費「領取Java架構資料」
從這個流程圖能夠看出第一步是須要判斷是否大於核心線程數,若是沒有則建立。
結合代碼能夠發如今執行任務的時候會判斷是否大於核心線程數,從而建立線程。
worker.startTask() 執行任務部分放到後面分析。
這裏的 miniSize 因爲會在多線程場景下使用,因此也用 volatile 關鍵字來保證可見性。
隊列緩衝
結合上面的流程圖,第二步天然是要判斷隊列是否能夠存聽任務(是否已滿)。
優先會往隊列裏存放。
上至封頂
一旦寫入失敗則會判斷當前線程池的大小是否大於最大線程數,若是沒有則繼續建立線程執行。
否則則執行會嘗試阻塞寫入隊列(j.u.c 會在這裏執行拒絕策略)
以上的步驟和剛纔那張流程圖是同樣的,這樣你們是否有看出什麼坑嘛?
時刻當心
從上面流程圖的這兩步能夠看出會直接建立新的線程。
這個過程相對於中間直接寫入阻塞隊列的開銷是很是大的,主要有如下兩個緣由:
建立線程會加鎖,雖然說最終用的是 ConcurrentHashMap 的寫入函數,但依然存在加鎖的可能。
會建立新的線程,建立線程還須要調用操做系統的 API 開銷較大。點擊免費「領取Java架構資料」
因此理想狀況下咱們應該避免這兩步,儘可能讓丟入線程池中的任務進入阻塞隊列中。
執行任務
任務是添加進來了,那是如何執行的?
在建立任務的時候提到過 worker.startTask() 函數:
/** * 添加任務,須要加鎖 * @param runnable 任務 */ private void addWorker(Runnable runnable) { Worker worker = new Worker(runnable, true); worker.startTask(); workers.add(worker); }
複製代碼也就是在建立線程執行任務的時候會建立 Worker 對象,利用它的 startTask() 方法來執行任務。
因此先來看看 Worker 對象是長啥樣的:
其實他自己也是一個線程,將接收到須要執行的任務存放到成員變量 task 處。
而其中最爲關鍵的則是執行任務 worker.startTask() 這一步驟。
public void startTask() { thread.start(); }
複製代碼其實就是運行了 worker 線程本身,下面來看 run 方法。
第一步是將建立線程時傳過來的任務執行(task.run),接着會一直不停的從隊列裏獲取任務執行,直到獲取不到新任務了。
任務執行完畢後將內置的計數器 -1 ,方便後面任務所有執行完畢進行通知。
worker 線程獲取不到任務後退出,須要將本身從線程池中釋放掉(workers.remove(this))。
從隊列裏獲取任務
其實 getTask 也是很是關鍵的一個方法,它封裝了從隊列中獲取任務,同時對不須要保活的線程進行回收。
很明顯,核心做用就是從隊列裏獲取任務;但有兩個地方須要注意:
當線程數超過核心線程數時,在獲取任務的時候須要經過保活時間從隊列裏獲取任務;一旦獲取不到任務則隊列確定是空的,這樣返回 null 以後在上文的 run() 中就會退出這個線程;從而達到了回收線程的目的,也就是咱們以前演示的效果
這裏須要加鎖,加鎖的緣由是這裏確定會出現併發狀況,不加鎖會致使 workers.size() > miniSize 條件屢次執行,從而致使線程被所有回收完畢。
關閉線程池
最後來談談線程關閉的事;
仍是以剛纔那段測試代碼爲例,若是提交任務後咱們沒有關閉線程,會發現即使是任務執行完畢後程序也不會退出。
從剛纔的源碼裏其實也很容易看出來,不退出的緣由是 Worker 線程必定還會一直阻塞在 task = workQueue.take(); 處,即使是線程縮容了也不會小於核心線程數。
經過堆棧也能證實:
剛好剩下三個線程阻塞於此處。
而關閉線程一般又有如下兩種:
當即關閉:執行關閉方法後無論如今線程池的運行情況,直接一刀切所有停掉,這樣會致使任務丟失。
不接受新的任務,同時等待現有任務執行完畢後退出線程池。
當即關閉
咱們先來看第一種當即關閉:
/** * 當即關閉線程池,會形成任務丟失 */ public void shutDownNow() { isShutDown.set(true); tryClose(false); } /** * 關閉線程池 * * @param isTry true 嘗試關閉 --> 會等待全部任務執行完畢 * false 當即關閉線程池--> 任務有丟失的可能 */ private void tryClose(boolean isTry) { if (!isTry) { closeAllTask(); } else { if (isShutDown.get() && totalTask.get() == 0) { closeAllTask(); } } } /** * 關閉全部任務 */ private void closeAllTask() { for (Worker worker : workers) { //LOGGER.info("開始關閉"); worker.close(); } } public void close() { thread.interrupt(); }
複製代碼很容易看出,最終就是遍歷線程池裏全部的 worker 線程挨個執行他們的中斷函數。
咱們來測試一下:
能夠發現後面丟進去的三個任務實際上是沒有被執行的。
完過後關閉
而正常關閉則不同:
/** * 任務執行完畢後關閉線程池 */ public void shutdown() { isShutDown.set(true); tryClose(true); }
複製代碼
他會在這裏多了一個判斷,須要全部任務都執行完畢以後纔會去中斷線程。
同時在線程須要回收時都會嘗試關閉線程:
來看看實際效果:
回收線程
上文或多或少提到了線程回收的事情,其實總結就是如下兩點:
一旦執行了 shutdown/shutdownNow 方法都會將線程池的狀態置爲關閉狀態,這樣只要 worker 線程嘗試從隊列裏獲取任務時就會直接返回空,致使 worker 線程被回收。
一旦線程池大小超過了核心線程數就會使用保活時間來從隊列裏獲取任務,因此一旦獲取不到返回 null 時就會觸發回收。
但若是咱們的隊列足夠大,致使線程數都不會超過核心線程數,這樣是不會觸發回收的。點擊免費「領取Java架構資料」
好比這裏我將隊列大小調爲 10 ,這樣任務就會累計在隊列裏,不會建立五個 worker 線程。
因此一直都是 Thread-1~3 這三個線程在反覆調度任務。
總結
本次實現了線程池裏大部分核心功能,我相信只要看完並動手敲一遍必定會對線程池有不同的理解。
結合目前的內容來總結下:
線程池、隊列大小要設計的合理,儘可能的讓任務從隊列中獲取執行。
慎用 shutdownNow() 方法關閉線程池,會致使任務丟失(除非業務容許)。
若是任務多,線程執行時間短能夠調大 keepalive 值,使得線程儘可能不被回收從而能夠複用線程。
同時下次會分享一些線程池的新特性,如:
執行帶有返回值的線程。
異常處理怎麼辦?
全部任務執行完怎麼通知我?
本文全部源碼:github.com/crossoverJi…你的點贊與分享是對我最大的支持