原覺得線程池還挺簡單的(平時經常使用,也分析過原理),此次是想本身動手寫一個線程池來更加深刻的瞭解它;但在動手寫的過程當中落地到細節時發現並沒想的那麼容易。結合源碼對比後確實不得不佩服 Doug Lea
。java
我以爲大部分人直接去看 java.util.concurrent.ThreadPoolExecutor
的源碼時都是看一個大概,由於其中涉及到了許多細節處理,還有部分 AQS
的內容,因此想要理清楚具體細節並非那麼容易。git
與其挨個分析源碼不如本身實現一個簡版,固然簡版並不意味着功能缺失,須要保證核心邏輯一致。github
因此也是本篇文章的目的:安全
本身動手寫一個五臟俱全的線程池,同時會了解到線程池的工做原理,以及如何在工做中合理的利用線程池。多線程
再開始以前建議對線程池不是很熟悉的朋友看看這幾篇:併發
這裏我截取了部份內容,也許能夠埋個伏筆(坑)。函數
具體請看這兩個連接。測試
因爲篇幅限制,本次可能會分爲上下兩篇。this
如今進入正題,新建了一個 CustomThreadPool
類,它的工做原理以下:操作系統
簡單來講就是往線程池裏邊丟任務,丟的任務會緩衝到隊列裏;線程池裏存儲的其實就是一個個的 Thread
,他們會一直不停的從剛纔緩衝的隊列裏獲取任務執行。
流程仍是挺簡單。
先來看看咱們這個自創的線程池的效果如何吧:
初始化了一個核心爲三、最大線程數爲五、隊列大小爲 4 的線程池。
先往其中丟了 10 個任務,因爲阻塞隊列的大小爲 4 ,最大線程數爲 5 ,因此因爲隊列裏緩衝不了最終會建立 5 個線程(上限)。
過段時間沒有任務提交後(sleep
)則會自動縮容到三個線程(保證不會小於核心線程數)。
來看看具體是如何實現的。
下面則是這個線程池的構造函數:
會有如下幾個核心參數:
miniSize
最小線程數,等效於 ThreadPool
中的核心線程數。maxSize
最大線程數。keepAliveTime
線程保活時間。workQueue
阻塞隊列。notify
通知接口。大體上都和 ThreadPool
中的參數相同,而且做用也是相似的。
須要注意的是其中初始化了一個 workers
成員變量:
/** * 存放線程池 */ private volatile Set<Worker> workers; public CustomThreadPool(int miniSize, int maxSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, Notify notify) { workers = new ConcurrentHashSet<>(); }
workers
是最終存放線程池中運行的線程,在 j.u.c
源碼中是一個 HashSet
因此對他全部的操做都是須要加鎖。
我這裏爲了簡便起見就本身定義了一個線程安全的 Set
稱爲 ConcurrentHashSet
。
其實原理也很是簡單,和 HashSet
相似也是藉助於 HashMap
來存放數據,利用其 key
不可重複的特性來實現 set
,只是這裏的 HashMap
是用併發安全的 ConcurrentHashMap
來實現的。
這樣就能保證對它的寫入、刪除都是線程安全的。
不過因爲 ConcurrentHashMap
的 size()
函數並不許確,因此我這裏單獨利用了一個 AtomicInteger
來統計容器大小。
往線程池中丟一個任務的時候其實要作的事情還蠻多的,最重要的事情莫過於建立線程存放到線程池中了。
固然咱們不能無限制的建立線程,否則拿線程池來就沒任何意義了。因而 miniSize maxSize
這兩個參數就有了它的意義。
但這兩個參數再哪一步的時候才起到做用呢?這就是首先須要明確的。
從這個流程圖能夠看出第一步是須要判斷是否大於核心線程數,若是沒有則建立。
結合代碼能夠發如今執行任務的時候會判斷是否大於核心線程數,從而建立線程。
worker.startTask()
執行任務部分放到後面分析。
這裏的 miniSize
因爲會在多線程場景下使用,因此也用 volatile
關鍵字來保證可見性。
結合上面的流程圖,第二步天然是要判斷隊列是否能夠存聽任務(是否已滿)。
優先會往隊列裏存放。
一旦寫入失敗則會判斷當前線程池的大小是否大於最大線程數,若是沒有則繼續建立線程執行。
否則則執行會嘗試阻塞寫入隊列(j.u.c
會在這裏執行拒絕策略)
以上的步驟和剛纔那張流程圖是同樣的,這樣你們是否有看出什麼坑嘛?
從上面流程圖的這兩步能夠看出會直接建立新的線程。
這個過程相對於中間直接寫入阻塞隊列的開銷是很是大的,主要有如下兩個緣由:
因此理想狀況下咱們應該避免這兩步,儘可能讓丟入線程池中的任務進入阻塞隊列中。
任務是添加進來了,那是如何執行的?
在建立任務的時候提到過 worker.startTask()
函數:
/** * 添加任務,須要加鎖 * @param runnable 任務 */ private void addWorker(Runnable runnable) { Worker worker = new Worker(runnable, true); worker.startTask(); workers.add(worker); }
也就是在建立線程執行任務的時候會建立 Worker
對象,利用它的 startTask()
方法來執行任務。
因此先來看看 Worker
對象是長啥樣的:
其實他自己也是一個線程,將接收到須要執行的任務存放到成員變量 task
處。
而其中最爲關鍵的則是執行任務 worker.startTask()
這一步驟。
public void startTask() { thread.start(); }
其實就是運行了 worker
線程本身,下面來看 run
方法。
task.run
),接着會一直不停的從隊列裏獲取任務執行,直到獲取不到新任務了。workers.remove(this)
)。其實 getTask
也是很是關鍵的一個方法,它封裝了從隊列中獲取任務,同時對不須要保活的線程進行回收。
很明顯,核心做用就是從隊列裏獲取任務;但有兩個地方須要注意:
null
以後在上文的 run()
中就會退出這個線程;從而達到了回收線程的目的,也就是咱們以前演示的效果workers.size() > miniSize
條件屢次執行,從而致使線程被所有回收完畢。最後來談談線程關閉的事;
仍是以剛纔那段測試代碼爲例,若是提交任務後咱們沒有關閉線程,會發現即使是任務執行完畢後程序也不會退出。
從剛纔的源碼裏其實也很容易看出來,不退出的緣由是 Worker
線程必定還會一直阻塞在 task = workQueue.take();
處,即使是線程縮容了也不會小於核心線程數。
經過堆棧也能證實:
剛好剩下三個線程阻塞於此處。
而關閉線程一般又有如下兩種:
咱們先來看第一種當即關閉
:
/** * 當即關閉線程池,會形成任務丟失 */ public void shutDownNow() { isShutDown.set(true); tryClose(false); } /** * 關閉線程池 * * @param isTry true 嘗試關閉 --> 會等待全部任務執行完畢 * false 當即關閉線程池--> 任務有丟失的可能 */ private void tryClose(boolean isTry) { if (!isTry) { closeAllTask(); } else { if (isShutDown.get() && totalTask.get() == 0) { closeAllTask(); } } } /** * 關閉全部任務 */ private void closeAllTask() { for (Worker worker : workers) { //LOGGER.info("開始關閉"); worker.close(); } } public void close() { thread.interrupt(); }
很容易看出,最終就是遍歷線程池裏全部的 worker
線程挨個執行他們的中斷函數。
咱們來測試一下:
能夠發現後面丟進去的三個任務實際上是沒有被執行的。
而正常關閉則不同:
/** * 任務執行完畢後關閉線程池 */ public void shutdown() { isShutDown.set(true); tryClose(true); }
他會在這裏多了一個判斷,須要全部任務都執行完畢以後纔會去中斷線程。
同時在線程須要回收時都會嘗試關閉線程:
來看看實際效果:
上文或多或少提到了線程回收的事情,其實總結就是如下兩點:
shutdown/shutdownNow
方法都會將線程池的狀態置爲關閉狀態,這樣只要 worker
線程嘗試從隊列裏獲取任務時就會直接返回空,致使 worker
線程被回收。null
時就會觸發回收。但若是咱們的隊列足夠大,致使線程數都不會超過核心線程數,這樣是不會觸發回收的。
好比這裏我將隊列大小調爲 10 ,這樣任務就會累計在隊列裏,不會建立五個 worker
線程。
因此一直都是 Thread-1~3
這三個線程在反覆調度任務。
本次實現了線程池裏大部分核心功能,我相信只要看完並動手敲一遍必定會對線程池有不同的理解。
結合目前的內容來總結下:
shutdownNow()
方法關閉線程池,會致使任務丟失(除非業務容許)。keepalive
值,使得線程儘可能不被回收從而能夠複用線程。同時下次會分享一些線程池的新特性,如:
本文全部源碼:
你的點贊與分享是對我最大的支持