ExecutorService這個接口從Java 5開始就已經存在了。這得追溯到2004年了。這裏小小地提醒一下,官方已經再也不支持Java 5, Java 6了,Java 7在半年後也將中止支持。我之因此會提起ExecutorService這麼舊的一個接口是由於,大多數Java程序員並無搞清楚它的工做原理。關於它能夠介紹的有不少,這裏我只想分享它的一些較少爲人所知的特性以及實踐技巧。本文主要是面向初級程序員的,並無過於高深的東西。html
這點得反覆強調。對正在運行的JVM進行線程轉儲(thread dump)或者調試時,線程池默認的命名機制是pool-N-thread-M,這裏N是線程池的序號(每新建立一個線程池,這個N都會加一),而M是池裏線程的序號。比方說,pool-2-thread-3指的是JVM生命週期中第二個線程池裏的第三個線程。參考這裏 Executors.defaultThreadFactory()。這樣的名字表述性不佳。因爲JDK將命名機制都隱藏在ThreadFactory裏面,這使得要正確地命名線程得稍微費點工夫。所幸的是Guava提供了這麼一個工具類:java
import com.google.common.util.concurrent.ThreadFactoryBuilder; final ThreadFactory threadFactory = new ThreadFactoryBuilder() .setNameFormat("Orders-%d") .setDaemon(true) .build(); final ExecutorService executorService = Executors.newFixedThreadPool(10, threadFactory);
這是我從高效的jstack:如何對高速運行的服務器進行調試一文中學到的一個技巧。線程名能夠隨時進行修改,只要你想這麼作的話。這是有必定的意義的,由於線程轉儲只能看到類名和方法名,而沒有參數及本地變量。經過調整線程名能夠保留一些比較關鍵的上下文信息,這樣排查消息/記錄/查詢等變慢或者出現死鎖的問題時就容易多了。示例:react
private void process(String messageId) { executorService.submit(() -> { final Thread currentThread = Thread.currentThread(); final String oldName = currentThread.getName(); currentThread.setName("Processing-" + messageId); try { //real logic here... } finally { currentThread.setName(oldName); } }); }
在try-finally塊中當前線程的名字是Processing-某個消息ID。這對跟蹤系統內的消息流會比較有用。git
客戶端線程和線程池之間會有一個任務隊列。當程序要關閉時,你須要注意兩件事情:入隊的這些任務的狀況怎麼樣了以及正在運行的這個任務執行得如何了。使人驚訝的是不少開發人員並沒能正確地或者有意識地去關閉線程池。正確的方法有兩種:一個是讓全部的入隊任務都執行完畢(shutdown()),再就是捨棄這些任務(shutdownNow())——這徹底取決於你。好比說若是咱們提交了N多任務而且但願等它們都執行完後才返回的話,那麼就使用shutdown():程序員
private void sendAllEmails(List<String> emails) throws InterruptedException { emails.forEach(email -> executorService.submit(() -> sendEmail(email))); executorService.shutdown(); final boolean done = executorService.awaitTermination(1, TimeUnit.MINUTES); log.debug("All e-mails were sent so far? {}", done); }
本例中咱們發送了許多電子郵件,每一封郵件都對應着線程池中的一個任務。提交完這些任務後咱們會關閉線程池,這樣就不會再有新的任務進來了。而後咱們會至少等待一分鐘,直到這些任務執行完。若是1分鐘後仍是有的任務沒執行到的話,awaitTermination()便會返回false。可是剩下的任務還會繼續執行。我知道有些趕時髦的人會這麼寫:github
emails.parallelStream().forEach(this::sendEmail);
他們以爲我那樣很老套,不過我我的比較喜歡能控制併發線程的數量。還有一個優雅地關閉掉線程池的方法就是shutdownNow():spring
final List<Runnable> rejected = executorService.shutdownNow(); log.debug("Rejected tasks: {}", rejected.size());
這麼作的話隊列中的全部任務都會被捨棄並返回。已執行的任務仍會繼續執行。編程
Future的一個較少說起的特性即是cancelling。這裏我就不重複多說了,能夠看下我以前的一篇文章:InterruptedException及線程中斷。api
不當的線程池大小會使得處理速度變慢,穩定性降低,而且致使內存泄露。若是配置的線程過少,則隊列會持續變大,消耗過多內存。而過多的線程又會因爲頻繁的上下文切換致使整個系統的速度變緩——殊途而同歸。隊列的長度相當重要,它必須得是有界的,這樣若是線程池不堪重負了它能夠暫時拒絕掉新的請求:安全
final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100); executorService = new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS, queue);
上面的代碼等價於Executors.newFixedThreadPool(n),然而不一樣的是默認的實現是一個無界的LinkedBlockingQueue。這裏咱們用的是一個固定100大小的ArrayBlockingQueue。也就是說若是已經有100個任務在隊列中了(還有N個在執行中),新的任務就會被拒絕掉,並拋出RejectedExecutionException異常。因爲這裏的隊列是在外部聲明的,咱們還能夠時不時地調用下它的size()方法來將隊列大小記錄在到日誌/JMX/或者你所使用的監控系統中。
下面這段代碼執行的結果是什麼?
executorService.submit(() -> { System.out.println(1 / 0); });
我被它坑過無數回了:它什麼也不會輸出。沒有任何的java.lang.ArithmeticException: / by zero的徵兆,啥也沒有。線程池會把這個異常吞掉,就像什麼也沒發生過同樣。若是是你本身建立的java.lang.Thread還好,這樣UncaughtExceptionHandler還能起做用。不過若是是線程池的話你就得當心了。若是你提交的是Runnable對象的話(就像上面那個同樣,沒有返回值),你得將整個方法體用try-catch包起來,至少打印一下異常。若是你提交的是Callable的話,得確保你在用get()方法取值的時候從新拋出異常:
final Future<Integer> division = executorService.submit(() -> 1 / 0); //below will throw ExecutionException caused by ArithmeticException division.get();
有趣的是Spring框架的@Async爲此還弄出了個BUG,參見:SPR-8995以及 SPR-12090。
監控工做隊列的長度只是一個方面。然而排除故障時查看從提交任務到實際執行之間的時間差就顯得很是重要了。這個時間差越接近0就越好(說明正好線程池中有空閒的線程),不然任務要入隊的話這個時間就會增長了。再進一步說,若是線程池不是固定線程數的話,執行新的任務還得新建立一個線程,這個一樣也會消耗必定的時間。爲了能更好地監控這項指標,能夠對ExecutorService作一下封裝:
public class WaitTimeMonitoringExecutorService implements ExecutorService { private final ExecutorService target; public WaitTimeMonitoringExecutorService(ExecutorService target) { this.target = target; } @Override public <T> Future<T> submit(Callable<T> task) { final long startTime = System.currentTimeMillis(); return target.submit(() -> { final long queueDuration = System.currentTimeMillis() - startTime; log.debug("Task {} spent {}ms in queue", task, queueDuration); return task.call(); } ); } @Override public <T> Future<T> submit(Runnable task, T result) { return submit(() -> { task.run(); return result; }); } @Override public Future<?> submit(Runnable task) { return submit(new Callable<Void>() { @Override public Void call() throws Exception { task.run(); return null; } }); } //... }
這個實現並不完整,不過也能說明大概的意思了。當咱們將任務提交給線程池的時候,便當即開始記錄它的時間。一旦這個任務被取出並開始執行時便中止計時。不要被代碼中的startTime和queueDuration這兩個變量搞混了。事實上它們是在兩個不一樣的線程中進行求值的,一般都會差個毫秒級或者秒級:
Task com.nurkiewicz.MyTask@7c7f3894 spent 9883ms in queue
近來響應式編程受到了很多關注。 Reactive manifesto, reactive streams, RxJava(僅發佈了1.0版本!),Clojure agents, scala.rx等等。它們都很是不錯,但棧跟蹤信息就完蛋了,它們幾乎是毫無價值的。假設提交到線程池中的一個任務出現了異常:
java.lang.NullPointerException: null at com.nurkiewicz.MyTask.call(Main.java:76) ~[classes/:na] at com.nurkiewicz.MyTask.call(Main.java:72) ~[classes/:na] at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0] at java.lang.Thread.run(Thread.java:744) ~[na:1.8.0]
能夠很容易發現NPE異常出如今MyTask的76行。可是咱們並不知道是誰提交的這個任務,由於棧信息只能看到Thread以及ThreadPoolExecutor。技術上來說咱們固然是能夠看下代碼,看看是何處建立的MyTask。不過若是沒有線程在這中間的話,咱們立刻便能知道是誰提交的任務。那麼若是咱們能夠保留客戶端代碼(提交任務的那段代碼)的棧信息呢?這個想法並不是我獨創的,Hazelcast就將異常從全部者節點傳播到了客戶端中。下面是一個很是簡單的將客戶端棧信息保留下來以便失敗時查看的例子:
public class ExecutorServiceWithClientTrace implements ExecutorService { protected final ExecutorService target; public ExecutorServiceWithClientTrace(ExecutorService target) { this.target = target; } @Override public <T> Future<T> submit(Callable<T> task) { return target.submit(wrap(task, clientTrace(), Thread.currentThread().getName())); } private <T> Callable<T> wrap(final Callable<T> task, final Exception clientStack, String clientThreadName) { return () -> { try { return task.call(); } catch (Exception e) { log.error("Exception {} in task submitted from thrad {} here:", e, clientThreadName, clientStack); throw e; } }; } private Exception clientTrace() { return new Exception("Client stack trace"); } @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { return tasks.stream().map(this::submit).collect(toList()); } //... }
這樣一旦失敗的話咱們即可以取到完整的棧信息以及提交任務時所在的線程的名字。跟以前相比咱們有了一些更有價值的信息:
Exception java.lang.NullPointerException in task submitted from thrad main here: java.lang.Exception: Client stack trace at com.nurkiewicz.ExecutorServiceWithClientTrace.clientTrace(ExecutorServiceWithClientTrace.java:43) ~[classes/:na] at com.nurkiewicz.ExecutorServiceWithClientTrace.submit(ExecutorServiceWithClientTrace.java:28) ~[classes/:na] at com.nurkiewicz.Main.main(Main.java:31) ~[classes/:na] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0] at java.lang.reflect.Method.invoke(Method.java:483) ~[na:1.8.0] at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134) ~[idea_rt.jar:na]
Java 8中引入了更爲強大的CompletableFuture。有可能的話儘可能使用下它。ExecutorService並無擴展以支持這個加強型的接口,所以你得本身動手了。這麼寫是不行的了:
final Future<BigDecimal> future = executorService.submit(this::calculate);
你得這樣:
final CompletableFuture<BigDecimal> future = CompletableFuture.supplyAsync(this::calculate, executorService);
CompletableFuture 繼承自Future,所以跟以前的用法同樣。可是使用你接口的人必定會感謝CompletableFuture所提供的這些額外的功能的。
SynchronousQueue是一個很是有意思的BlockingQueue。它自己甚至都算不上是一個數據結構。最好的解釋就是它是一個容量爲0的隊列。這裏引用下Java文檔中的一段話:
每個insert操做都須要等待另外一個線程的一個對應的remove操做,反之亦然。同步隊列內部不會有任何空間,甚至連一個位置也沒有。你沒法對同步隊列執行peek操做,由於僅當你要移除一個元素的時候才存在這麼個元素;若是沒有別的線程在嘗試移除一個元素你也沒法往裏面插入元素;你也沒法對它進行遍歷,由於它什麼都沒有。。。 同步隊列與CSP和Ada中所用到的集結管道(rendezvous channel)有殊途同歸之妙。
它和線程池有什麼關係?你能夠試試在ThreadPoolExecutor中用下SynchronousQueue:
BlockingQueue<Runnable> queue = new SynchronousQueue<>(); ExecutorService executorService = new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS, queue);
咱們建立了一個擁有兩個線程的線程池,以及一個SynchronousQueue。因爲SynchronousQueue本質上是一個容量爲0的隊列,所以這個ExecutorService只有當有空閒線程的時候才能接受新的任務。若是全部的線程都在忙,新的任務便會立刻被拒絕掉,不會進行等待。這在要麼當即執行,要麼立刻丟棄的後臺執行的場景中會很是有用。
終於講完了,但願你能找到一個本身感興趣的特性!