ThreadPoolExecutor是一個很是重要的類,用來構建帶有線程池的任務執行器,經過配置不一樣的參數來構造具備不一樣規格線程池的任務執行器。 html
寫在前面的是: java
線程池和任務執行器,線程池的定義比較直接,能夠看作多個線程的集合。而任務執行器的概念比較的具備針對性,它用來執行任務,經過對線程池的管理實現多任務的併發,是線程池的載體。 api
線程和任務的區別,線程不是任務,線程是用來執行任務的。 併發
隊列是用來存聽任務的,不是用來存放線程的。 oracle
主要的幾個參數解析: oop
一開始二者的存在很讓人摸不着頭腦,簡單的想法是用一個線程數(pool size)表示線程池的大小不就完了嗎,不到規定的線程數就建立新的線程來執行新的任務,到了規定的線程數就等待其餘線程處理完成,怎麼還出現兩個控制線程數的參數? ui
那這兩個參數是什麼意思幹什麼用的? this
核心線程數:這個數與上面那個簡單想法中的數有一個共同點,就是若是當前線程數達不到核心線程數時,不會使用已有的空閒的線程(若是有的話),來了新任務就會建立新的線程。 spa
若是當前線程數達到核心線程數,並且沒有空閒線程,那麼來了新任務是否要建立新的線程呢?這取決於兩點: .net
- 當前的任務隊列是否已滿。
- 線程池的最大線程數。
經過這個問題能夠引出最大線程數的概念
最大線程數 : 最大線程數是和任務隊列匹配使用的,確切的說是和有長度限制的任務隊列(即有界任務隊列)匹配使用的。
補充回答上面的問題,ThreadPoolExecutor的線程池擁有一個任務隊列,這個任務隊列只有在當前線程數>核心線程數的時候纔開始使用,若是該線程池使用的任務隊列是有界隊列,好比10,那麼當該隊列被新任務填滿時也就是說隊列中有10個新任務時ThreadPoolExecutor纔會建立一個新的線程來執行隊列中的一個任務,若是再發生隊列被填滿,並且依舊沒有空閒線程時ThreadPoolExecutor再次建立新的線程,一旦線程的數量等於最大線程數就再也不建立新的線程了,若是此時隊列中還有10個任務,那麼新來的任務就會被拒絕(reject)。
上述是針對有界隊列,若是這個任務執行器的隊列是無界隊列呢?
因爲無界隊列不會被填滿,因此永遠不能達到建立新線程所須要的條件,因此也就不會有新線程被建立,因此最大線程數在這種狀況下也就失去了其存在的意義。
在介紹上面的核心線程數和最大線程數時有提到空閒的線程,所謂空閒的線程就是執行完任務以後閒着的線程。
超過這個時間會使得那麼核心線程以外的空閒線程被殺死,若是想把這個時間也做用在覈心線程上須要設置allowCoreThreadTimeOut(boolean)爲true
這裏有必要說一下的是,任務執行器如何實現線程的重複利用,當任務執行器執行execute(task)的時候會建立一個worker,它是一個Runnable類,能夠看作task的載體,worker包含一個thread對象,這個thread啓動的時候執行worker自己的run方法,這樣worker和線程就融爲一體。當worker的thread start的時候,就會執行worker的run方法,而worker的run會調用任務執行器的runWorker(worker),並將自身傳遞過去,意思是任務執行器啓動了一個worker,而線程重複利用關鍵就在runWorker中,在啓動了一個worker後,worker會從任務執行器中尋找能夠運行的任務,而一開始建立worker使用的task就是它的第一個任務。
下面是jdk1.7的源碼
//執行一個任務 task public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) // 將task裝配到一個worker中 return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
//添加一個worker private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { final ReentrantLock mainLock = this.mainLock; w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); int rs = runStateOf(c); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) {//若是worker建立成功,就啓動它的對應的thread t.start(); //worker中的tread啓動 workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
//啓動這個worker final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) {//這裏是關鍵,使用一個while來尋找任務執行器中(主要仍是從任務隊列中獲取)還未執行的task。 w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }