在平常開發中,線程池是使用很是頻繁的一種技術,不管是服務端多線程接收用戶請求,仍是客戶端多線程處理數據,都會用到線程池技術,那麼全面的瞭解線程池的使用、背後的實現原理以及合理的優化線程池的大小等都是很是有必要的。這篇文章會經過對一系列的問題的解答來說解線程池的基本功能以及背後的原理,但願能對你們有所幫助。java
先來看這樣一個場景,服務端在一個線程內經過監聽8888端口來接收多個客戶端的消息。爲了不阻塞主線程,每收到一個消息,就開啓一個新的線程來處理,這樣主線程就能夠不停的接收新的消息。不使用線程池時代碼的簡單實現以下:spring
public static void main(String[] args) throws IOException { ServerSocket serverSocket = new ServerSocket(8888); while (true) { try { Socket socket = serverSocket.accept(); new Thread(() -> { try { InputStream inputStream = socket.getInputStream(); //do something } catch (IOException e) { e.printStackTrace(); } }).start(); } catch (IOException e) { } } }
經過每次new一個新的線程的方式,不會阻塞主線程,提升了服務端接收消息的能力。可是存在幾個很是明顯的問題:數據庫
newFixedThreadPool,固定線程數的線程池。它的核心線程數(corePoolSize)和最大線程數(maximumPoolSize)是相等的。同時它使用一個無界的阻塞隊列LinkedBlockingQueue來存儲額外的任務,也就是說當達到nThreads的線程數在運行以後,全部的後續線程都會進入LinkedBlockingQueue中,不會再建立新的線程。編程
使用場景:由於線程數固定,通常適用於可預測的並行任務執行環境。緩存
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
newCachedThreadPool,可緩存線程的線程池。默認核心線程數(corePoolSize)爲0,最大線程數(maximumPoolSize)爲Integer.MAX_VALUE,它還有一個過時時間爲60秒,當線程閒置超過60秒以後會被回收。內部使用SynchronousQueue做爲阻塞隊列。性能優化
使用場景:因爲SynchronousQueue無容量的特性,致使了newCachedThreadPool不適合作長時間的任務。由於若是單個任務執行時間過長,每當無空閒線程時,會致使開啓新線程,而線程數量能夠達到Integer.MAX_VALUE,存儲隊列又不能緩存任務,很容易致使OOM的問題。因此他的使用場景通常在大量短期任務的執行上。服務器
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
newSingleThreadExecutor,單線程線程池。默認核心線程數(corePoolSize)和最大線程數(maximumPoolSize)都爲1,使用無界阻塞隊列LinkedBlockingQueue。多線程
使用場景:因爲只能有一個線程在執行,並且其餘任務都會排隊,適用於單線程串行執行有序任務的環境。架構
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
newScheduledThreadPool與newSingleThreadScheduledExecutor,執行延時或者週期性任務的線程池,使用了一個內部實現的DelayedWorkQueue阻塞隊列。能夠看到它的返回結果是ScheduledExecutorService,它擴展了ExecutorService接口,提供了用於延時和週期執行任務的方法。併發
使用場景:用於延時啓動任務,或須要週期性執行的任務。
public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1)); } public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
newWorkStealingPool,它是jdk1.8提供的一種線程池,用於執行並行任務。默認並行級別爲當前可用最大可用cpu數量的線程。
使用場景:用於大耗時同時能夠分段並行的任務。
public static ExecutorService newWorkStealingPool() { return new ForkJoinPool (Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); }
線程池簡單來講能夠分爲四大組件:Executor、ExecutorService、Executors以及ThreadPoolExecutor。
public interface Executor { void execute(Runnable command); }
那麼在ThreadPoolExecutor中,是怎麼實現execute方法的呢?來看下ThreadPoolExecutor中execute方法的源碼,裏面的註釋實在太詳細了,簡直時良好註釋的典範。這裏只作個簡單總結:首先當工做線程小於核心線程數時會嘗試添加worker到隊列中去運行,若是核心線程不夠用會將任務加入隊列中,若是入隊也不成功,會採起拒絕策略。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ //ctl經過位運算同時標記了線程數量以及線程狀態 int c = ctl.get(); //workerCountOf方法用來統計當前運行的線程數量 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
public interface ExecutorService extends Executor { //關閉線程池,關閉狀態 void shutdown(); //當即關閉線程池,關閉狀態 List<Runnable> shutdownNow(); boolean isShutdown(); boolean isTerminated(); //提交一個Callable類型的任務,帶Future返回值 <T> Future<T> submit(Callable<T> task); //提交一個Runnable類型的任務,帶Future返回值 Future<?> submit(Runnable task); //一段時間後終止線程池,終止狀態 boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; ...... }
仍是經過ThreadPoolExecutor來講明,ThreadPoolExecutor中將線程池狀態進行了擴展,定義了5種狀態,這5種狀態經過Integer.SIZE的高3位來表示。代碼以下:
* The runState provides the main lifecycle control, taking on values: * 可以接受新任務也能處理隊列中的任務 * RUNNING: Accept new tasks and process queued tasks * 不能接受新任務,但能處理隊列中的任務 * SHUTDOWN: Don't accept new tasks, but process queued tasks 不能接受新任務,也不能處理隊列中的任務,同時會中斷正在執行的任務 * STOP: Don't accept new tasks, don't process queued tasks, * and interrupt in-progress tasks 全部的任務都被終止,工做線程爲0 * TIDYING: All tasks have terminated, workerCount is zero, * the thread transitioning to state TIDYING * will run the terminated() hook method terminated方法執行完成 * TERMINATED: terminated() has completed private static final int COUNT_BITS = Integer.SIZE - 3; private static final int RUNNING = -1 << COUNT_BITS;//101 private static final int SHUTDOWN = 0 << COUNT_BITS;//000 private static final int STOP = 1 << COUNT_BITS;//001 private static final int TIDYING = 2 << COUNT_BITS;//010 private static final int TERMINATED = 3 << COUNT_BITS;//011
再來看看經過ExecutorService接口對這5種狀態的轉換:
public interface ExecutorService extends Executor { //關閉線程池,線程池狀態會從RUNNING變爲SHUTDOWN void shutdown(); //當即關閉線程池RUNNING或者SHUTDOWN到STOP List<Runnable> shutdownNow(); //STOP、TIDYING以及TERMINATED都返回true boolean isShutdown(); //TERMINATED狀態返回true boolean isTerminated(); //一段時間後終止線程池,TERMINATED boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; ...... }
Executors提供了一系列獲取線程池的靜態方法,至關於線程池工廠,是對ThreadPoolExecutor的一個封裝,簡化了用戶切換Executor和ExecutorService的各類實現細節。
ThreadPoolExecutor是對Executor以及ExecutorService的實現,提供了具體的線程池實現。
這個問題在上面已經作了解說,ExecutorService的生命週期經過接口定義能夠分爲運行中,關閉,終止三種狀態。
ThreadPoolExecutor在具體實現上提供了更加詳細的五種狀態:RUNNING、SHUTDOWN、STOP、TIDYING以及TERMINATED。各類狀態的說明以及轉換能夠看上一個問題的答案。
線程池中的線程是能夠進行超時控制的,經過ExecutorService的submit來提交任務,這樣會返回一個Future類型的結果,來看看Future接口的代碼:
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); //獲取返回結果,並在出現錯誤或者中斷時throws Exception V get() throws InterruptedException, ExecutionException; //timeout時間內獲取返回結果,並在出現錯誤、中斷以及超時時throws Exception V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
Future定義了get()以及get(long timeout, TimeUnit unit)方法,get()方法會阻塞當前調用,一直到獲取到返回結果,get(long timeout, TimeUnit unit)會在指定時間內阻塞,當超時後會拋出TimeoutException錯誤。這樣就能夠達到線程超時控制的目的。簡單使用示例以下:
Future<String> future = executor.submit(callable); try { future.get(2000, TimeUnit.SECONDS); } catch (InterruptedException e1) { //中斷後處理 } catch (ExecutionException e1) { //拋出異常處理 } catch (TimeoutException e1) { //超時處理 }
這裏有一個問題就是由於get方法是阻塞的---經過LockSupport.park實現,那麼線程池中線程比較多的狀況下要怎麼獲取每一個線程的超時呢?這裏除了自定義線程池實現或者自定義線程工廠來實現以外,使用ThreadPoolExecutor自己的功能我也沒想到更好的辦法。有一個很是笨的解決方案是開啓同線程池數量相等的線程進行監聽。你們若是有更好的辦法能夠留言提出。
這個問題和上面的問題解決方案同樣,一樣也是經過ExecutorService的submit來提交任務,獲取Future,調用Future中的cancel方法來達到目的。
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); }
cancel方法有一個mayInterruptIfRunning參數,當爲true時,表明任務能接受並處理中斷,調用了interrupt方法。若是爲false,表明若是任務沒啓動就不要運行它,不會調用interrupt方法。
取消的本質實際上仍是經過interrupt來實現的,這就是說,若是線程自己不能響應中斷,就算調用了cancel方法也沒用。通常狀況下經過lockInterruptibly、park和await方法阻塞的線程都是能響應中斷的,運行中的線程就須要開發者本身實現中斷了。
如何設置一個合適的線程池大小,這個問題我以爲是沒有一個固定公式的。或者能夠說,只有一些簡單的設置規則,但放到具體業務中,又各有不一樣,只能根據現場環境測試事後再來分析。
設置合適的線程池大小分爲兩部分,一部分是最大線程池大小,一部分是最小線程池大小。在ThreadPoolExecutor中體如今最大線程數(maximumPoolSize)和核心線程數(corePoolSize)。
最大線程池大小的設置首先跟當前機器cpu核心數密切相關,通常狀況來講要想最大化利用cpu,設置爲cpu核心數就能夠了,好比4核cpu服務器能夠設置爲4。但實際狀況又大有不一樣,由於每每咱們執行的任務都會涉及到IO,好比任務中執行了一個從數據庫查詢數據的操做,那麼這段時間cpu其實是沒有最大化利用的,這樣咱們就能夠適當擴大maximumPoolSize的大小。在有些狀況下任務會是cpu密集型的,若是這樣設置更多的線程不只不會提升效率,反而由於線程的建立銷燬以及切換開銷而大大下降了效率,因此說最大線程池的大小須要根據業務狀況具體測試後才能設置一個合適的大小。
最小線程池大小相比較最大線程池大小設置起來相對容易一些,由於最小線程通常來講是能夠根據業務狀況來預估進行設置,好比大多數狀況下會有2個任務在運行,很小几率會有超過2個任務運行,那麼直接設置最小線程池大小爲2就能夠。但有一點須要知道的是每間隔多長時間會有超過2個任務,若是每2分鐘會有一次超過2個任務的狀況,那麼咱們能夠將線程過時時間設置的稍微久一點,好比4分鐘,這樣就算頻繁的超過2個任務,也能夠利用緩存的線程池。
總的來講設置最大和最小線程池都是一個沒有固定公式的問題,都須要考慮實際業務狀況和機器配置,根據實際業務狀況多作測試才能作到最優化設置。在一切沒有決定以前,可使用軟件架構的KISS原則,設置最大以及最小線程數都爲cpu核心數便可,後續在作優化。
要設置合適的隊列大小,先要明白隊列何時會被使用。在ThreadPoolExecutor的實現中,使用隊列的狀況有點特殊。它會先使用核心線程池大小的線程,以後會將任務加入隊列中,再以後隊列滿了以後纔會擴大到最大線程池大小的線程。也就是說隊列的使用並非等待線程不夠用了才使用,而是等待覈心線程不夠用了就使用。我不是太能理解這樣設計的意圖,按《Java性能權威權威指南》一書中的說法是這樣提供了兩個節流閥,第一個是隊列,第二個是最大線程池。但這樣作並不能給使用者最優的體驗,既然要使用最大線程池,那爲何不在第一次就使用呢?
知道了ThreadPoolExecutor使用線程池的時機,那麼再來預估合適的隊列大小就很方便了。若是單個任務執行時間在100ms,最小線程數是2,使用者能忍受的最大延時在2s,那麼咱們能夠這樣簡單推算出隊列大小:2/2s/100ms=10,這樣滿隊列時最大延時就在2s以內。固然還有其餘一些影響因素,好比部分任務超過或者小於100ms,最大線程池的利用等等,能夠在這基礎上作簡單調整。
ThreadPoolExecutor中提供了四種RejectedExecutionHandler,每種分工都比較明確,選擇起來並不困難。它們分別是:AbortPolicy、DiscardPolicy、DiscardOldestPolicy以及CallerRunsPolicy。下面貼出了他們的源碼並作了簡單說明,使用的時候能夠根據須要自行選擇。
//AbortPolicy //默認的拒絕策略,直接拋出RejectedExecutionException異常供調用者作後續處理 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } //DiscardPolicy //不作任何處理,將任務直接拋棄掉 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } //DiscardOldestPolicy //拋棄隊列中的下一個任務,而後嘗試作提交。這個使用我以爲應該是在知道當前要提交的任務比較重要,必需要被執行的場景 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } //CallerRunsPolicy //直接使用調用者線程執行,至關於同步執行,會阻塞調用者線程,不太友好感受。 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } }
要統計線程池中的線程執行時間,就須要瞭解線程池中的線程是在什麼地方,什麼時機執行的?知道了線程的執行狀態,而後在線程執行先後添加本身的處理就能夠了,因此先來找到ThreadPoolExecutor中線程具體執行的代碼:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); //執行task.run()的前置方法 Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown);//執行task.run()的後置方法 } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
能夠看到runWorker方法中在task.run()也就是任務執行先後分別執行了beforeExecute以及afterExecute方法,着兩個方法在ThreadPoolExecutor的繼承類中都是可重寫的,提供了極大的靈活性。咱們能夠在繼承ThreadPoolExecutor以後在任務執行先後作任何本身須要作的事情,固然也就包括任務執行的時間統計了。
順便說一句,熟悉spring源碼的同窗看到這裏是否是發現和spring中的postprocesser先後置處理器有殊途同歸之妙?區別在於一個是經過繼承來覆蓋,一個是經過接口來實現。
其實線程池框架涉及到的問題遠不止這些,包括ThreadFactory、ForkJoinPool等等還有不少值得花時間研究的地方。本文也只是閱讀jdk源碼、《Java併發編程實戰》以及《Java性能優化權威指南》以後的一點點總結,若有錯誤遺漏的地方,但願你們能多多指出。
參考資料: