系統啓動一個新線程的成本是比較高的,由於它涉及與操做系統的交互。使用線程池能夠很好的提升性能,尤爲是程序中須要建立大量生存期很短暫的線程。java
線程池的本質就是使用了一個線程安全的工做隊列鏈接工做者線程和客戶端線程,客戶端線程將任務放入工做隊列後便返回,而工做者線程則不斷地從工做隊列上取出工做並執行。緩存
當工做隊列爲空時,全部的工做者線程均等待在工做隊列上,當有客戶端提交了一個任務以後會通知任意一個工做者線程,隨着大量的任務被提交,更多的工做者線程會被喚醒。安全
注意的是,核心線程在完成任務後不會被銷燬,而是在循環getTask()時被阻塞隊列阻塞住。只有當線程數大於了核心線程數的那些普通線程會被銷燬。多線程
構造器參數:併發
corePoolSize:線程池中的核心線程數,當提交一個任務時,線程池建立一個新線程執行任務,直到當前線程數等於corePoolSize,即便有其餘空閒線程可以執行新來的任務,也會繼續建立線程;若是當前線程數爲corePoolSize,繼續提交的任務被保存到阻塞隊列中,等待被執行; maximumPoolSize:線程池中容許的最大線程數。若是當前阻塞隊列滿了,且繼續提交任務,則建立新的線程執行任務,前提是當前線程數小於maximumPoolSize;當阻塞隊列是無界隊列,則maximumPoolSize則不起做用,由於沒法提交至核心線程池的線程會一直持續地放入workQueue. keepAliveTime:線程存活時間(當線程池容許線程超時且運行中的線程數量超過corePoolSize時,會按照此變量設置時間關閉線程) TimeUnit:keepAliveTime的單位 BlockingQueue<Runnable> workQueue:緩衝隊列,來不及執行的任務存放的阻塞隊列 RejectedExecutionHandler handler:拒絕處理任務類(默認:AbortPolicy 會拋異常) AbortPolicy:直接拋出異常,默認策略; CallerRunsPolicy:用調用者所在的線程來執行任務; DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務,並執行當前任務; DiscardPolicy:直接丟棄任務; 固然也能夠根據應用場景實現RejectedExecutionHandler接口,自定義飽和策略,如記錄日誌或持久化存儲不能處理的任務。 threadFactory:建立線程的工廠,經過自定義的線程工廠能夠給每一個新建的線程設置一個具備識別度的線程名。默認爲DefaultThreadFactory ——————————————————————————————————————————————————————————————————————————————— //構造器 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
ThreadPoolExecutor.javaide
private final BlockingQueue<Runnable> workQueue;//緩衝隊列 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//原子類用來計數 public void execute(Runnable command) { if (command == null) throw new NullPointerException(); //1 當前運行的線程數量小於核心線程數量,直接將任務加入worker啓動運行。 int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; //若是失敗,則獲取最新的線程池數據 c = ctl.get(); } /*2 運行線程數量大於核心線程數量時,上面的if分支針對大於corePoolSize,而且緩存隊列加入任務操做成功的狀況。 運行中而且將任務加入緩衝隊列成功,正常來講這樣已經完成了處理邏輯。 可是爲了保險起見,增長了狀態出現異常的確認判斷,若是狀態出現異常會繼續remove操做,若是執行true,則按照拒絕處理策略駁回任務;*/ //運行線程數量大於核心線程數量時,若是線程池仍在運行,則把任務放到阻塞隊列中等待執行。 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); //當任務成功放入隊列時,若是recheck發現線程池已經再也不運行了則從隊列中把任務刪除 if (! isRunning(recheck) && remove(command)) //刪除成功之後,會調用構造參數傳入的拒絕策略。 reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } /*3 這裏針對運行線程數量超過了corePoolSize,而且緩存隊列也已經放滿的狀況。 注意第二個參數是false,能夠在下面addWorker方法看到,就是針對線程池最大線程數量maximumPoolSize的判斷。*/ else if (!addWorker(command, false)) //若是基於maximumPoolSize新建woker失敗,此時是線程池中線程數已達到上限,隊列已滿,則調用構造參數中傳入的拒絕策略 reject(command); }
addWorker方法函數
private boolean addWorker(Runnable firstTask, boolean core) { // CAS+死循環實現的關於線程池狀態,線程數量的校驗與更新邏輯 retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //把指定任務做爲參數新建一個worker線程 w = new Worker(firstTask); //變量t就是表明woker線程 final Thread t = w.thread; if (t != null) { // 線程池重入鎖 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } // 線程啓動,執行任務(Worker.thread(firstTask).start()) // 找到Worker的實現的run方法 if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) //若是woker啓動失敗,則進行一些善後工做,好比說修改當前woker數量等等 addWorkerFailed(w); } return workerStarted; }
Worker類oop
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ //Worker類run方法中調用了runWorker方法 public void run() { runWorker(this); } protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
runWorker方法性能
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); //task就是Woker構造函數入參指定的任務,即用戶提交的任務 Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 先執行firstTask,再從workerQueue中取task(getTask()),一直循環執行 //咱們都知道構造參數設置的時間表明瞭線程池中的線程,即woker線程的存活時間,若是到期則回收woker線程 //這個邏輯的實現就在getTask中。 //來不及執行的任務,線程池會放入一個阻塞隊列,getTask方法就是去阻塞隊列中取任務,用戶設置的存活時間,就是 //從這個阻塞隊列中取任務等待的最大時間,若是getTask返回null,意思就是woker等待了指定時間仍然沒有 //取到任務,此時就會跳過循環體,進入woker線程的銷燬邏輯。 while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //該方法是個空的實現,若是有須要用戶能夠本身繼承該類進行實現 beforeExecute(wt, task); Throwable thrown = null; try { task.run(); //運行傳入的線程的run方法 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { //當指定任務執行完成,阻塞隊列中也取不到可執行任務時,會進入這裏,作一些善後工做 //好比在corePoolSize跟maximumPoolSize之間的woker會進行回收 processWorkerExit(w, completedAbruptly); } }
getTask()方法ui
從阻塞任務隊列中取任務,若是設置了allowCoreThreadTimeOut(true) 或者當前運行的任務數大於設置的核心線程數,那麼timed =true 。此時將使用workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)從任務隊列中取任務,而若是沒有設置,那麼使用workQueue.take() 取任務,對於阻塞隊列,poll(long timeout, TimeUnit unit) 將會在規定的時間內去任務,若是沒取到就返回null。take()會一直阻塞,等待任務的添加。
到此相信咱們都可以理解爲何咱們的線程池可以一直等待任務的執行而不被銷燬了,其實也就是進入了阻塞狀態而已。
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? //死循環 for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling?注意,此處決定是否銷燬線程 //條件是開啓了 allowCoreThreadTimeOut,或者總線程數大於了核心線程數 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
Java經過Executors提供四種線程池,分別爲:
這四種線程池底層都是依賴ThreadPoolExecutor的構造器生成的。
//定長線程池 //corePoolSize跟maximumPoolSize值同樣,同時傳入一個無界阻塞隊列 //該線程池的線程會維持在指定線程數,不會進行回收 public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } //緩存線程池 //這個線程池corePoolSize爲0,maximumPoolSize爲Integer.MAX_VALUE //意思也就是說來一個任務就建立一個woker,回收時間是60s public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } //調度線程池 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); } //單線程池 //線程池中只有一個線程進行任務執行,其餘的都放入阻塞隊列 //外面包裝的FinalizableDelegatedExecutorService類實現了finalize方法,在JVM垃圾回收的時候會關閉線程池 public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
使用例子:
import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class Test { public static void main(String[] args) { ExecutorService cachedThreadPool = Executors.newCachedThreadPool();//建立一個可緩存線程池 ExecutorService fixedThreadPool = Executors.newFixedThreadPool(2);//建立一個定長線程池,可控制線程最大併發數,超出的線程會在隊列中等待 ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);//建立一個定長線程池,支持定時及週期性任務執行 ExecutorService single = Executors.newSingleThreadExecutor();//建立一個單線程化的線程池,它只會用惟一的工做線程來執行任務,保證全部任務按照指定順序(FIFO, LIFO, 優先級)執行 for(int i=0;i<10;i++){ final int index = i; try { Thread.sleep(index*500); } catch (InterruptedException e) { e.printStackTrace(); } cachedThreadPool.execute(new Runnable() { @Override public void run() { System.out.println(index); } }); fixedThreadPool.execute(new Runnable() { @Override public void run() { System.out.println(index); } }); //按期3s執行 scheduledExecutorService.schedule(new Runnable() { @Override public void run() { System.out.println("delay 3s"); } }, 3, TimeUnit.SECONDS); single.execute(new Runnable() { @Override public void run() { System.out.println(index); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }); } } }
可使用兩個方法向線程池提交任務,分別爲execute()和submit()方法。
submit()方法用於提交須要返回值的任務。線程池會返回一個future類型的對象,經過這個
future對象能夠判斷任務是否執行成功,而且能夠經過future的get()方法來獲取返回值,get()方
法會阻塞當前線程直到任務完成,而使用get(long timeout,TimeUnit unit)方法則會阻塞當前線
程一段時間後當即返回,這時候有可能任務沒有執行完。
Future<Object> future = executor.submit(harReturnValuetask); try { Object s = future.get(); } catch (InterruptedException e) { // 處理中斷異常 } catch (ExecutionException e) { // 處理沒法執行任務異常 } finally { // 關閉線程池 executor.shutdown(); }
任務通常分爲:CPU密集型、IO密集型
線程等待時間所佔比例越高,須要越多線程。線程CPU時間所佔比例越高,須要越少線程。
估算公式:
通常經驗上設置大小
一、CPU密集型
儘可能使用較小的線程池,通常Cpu核心數+1
由於CPU密集型任務CPU的使用率很高,若開過多的線程,只能增長線程上下文的切換次數,帶來額外的開銷
二、IO密集型
方法一:可使用較大的線程池,通常CPU核心數 * 2
IO密集型CPU使用率不高,可讓CPU等待IO的時候處理別的任務,充分利用cpu時間