最近的幾篇博客裏我討論了長輪詢(long polling)和Spring的DeferredResult技術,而且利用這些概念將生產者消費者項目塞進了一個Web應用程序。 儘管博客中的示例代碼展現了相關概念,卻也包含了不少邏輯問題。除了在實際的應用程序中不會使用簡單的LinkedBlockingQueue而是選擇JMS或者其餘強健的消息隊列服務,而且只會有一個用戶能夠得到匹配更新。還有一個嚴重的問題就是在JVM關閉時,行爲不良的線程不會被關閉。java
你可能會問:爲何這會成爲問題……好吧,對程序員來講這真的算不上一個問題,只要隨便寫點代碼就能夠解決。可是對使用軟件的人而言這卻會帶來沒必要要的麻煩。緣由是這樣會產生不少行爲不良的線程,而執行Tomcat的shutdown.sh命令收效甚微。這時你不得不執行下面命令野蠻的殺掉web服務器:git
ps -ef | grep java程序員
先獲得進程pid,而後github
kill -9 <<pid>>web
……接着須要有一大片web服務器須要重啓,這種混亂絕對讓人頭痛。最後你執行shutdown.sh中止Tomcat。apache
在我最近的幾篇博客裏,我編寫的那些行爲不良的線程在run()方法開頭都包含了下面的代碼:安全
@Override服務器
public void run() { 併發
while (true) { app
try {
DeferredResult<Message> result = resultQueue.take();
Message message = queue.take();
result.setResult(message);
} catch (InterruptedException e) {
throw new UpdateException("Cannot get latest update. " + e.getMessage(), e);
}
}
}
在上面的代碼裏,我用了一個無限循環while(true),這意味着線程會一直運行而且不會終止。
@Override
public void run() {
sleep(5); // Sleep等待app從新加載
logger.info("The match has now started...");
long now = System.currentTimeMillis();
List<Message> matchUpdates = match.getUpdates();
for (Message message : matchUpdates) {
delayUntilNextUpdate(now, message.getTime());
logger.info("Add message to queue: {}", message.getMessageText());
queue.add(message);
}
start = true; // 結束,重啓
logger.warn("GAME OVER");
}
上面第二個示例中線程的行爲一樣很糟糕。線程會從MatchUpdates列表中取消息並在合適的時候添加到消息隊列。惟一的可取之處是他們會拋出異常InterruptedException,若是正確處理線程能夠正常中止。然而,沒有人能確保這一點。
對上面代碼的一個有效地快速修正……只要確保建立全部線程都是後臺線程。後臺線程的定義是:在程序結束時,即便線程還在運行但不會阻止JVM退出。一個後臺線程的例子就是JVM的垃圾回收線程。將線程設置爲後臺線程只須要調用:
thread.setDaemon(true);
……接着執行shutdown.sh,而後砰的一聲全部的線程都消失了。然而這種作法有一個問題:若是你的後臺線程正在執行重要的任務,剛剛開始執行就被忽然結束掉會致使丟失不少重要的數據該怎麼辦?
你須要確保全部線程都被友好地關閉,在關閉前完成全部正在執行的任務。本文接下來將爲這些錯誤的線程給出一個修復,使用ShutdownHook讓他們在關閉前互相協調。根據文檔的描述:「一個shutdown hook就是一個初始化但沒有啓動的線程。 當虛擬機開始執行關閉程序時,它會啓動全部已註冊的shutdown hook(不按前後順序)而且併發執行。」讀完最後一句話,你可能已經猜到了你須要的就是建立一個負責關閉多有其餘線程的線程並經過shutdown hook傳遞給JVM。只要在你已有線程的run() 方法裏用幾個小的class作一些手腳。
須要建立ShutdownService和Hook兩個類。首先展現的是Hook類,它會將ShutdownService 鏈接到你的線程,代碼以下:
public class Hook {
private static final Logger logger = LoggerFactory.getLogger(Hook.class);
private boolean keepRunning = true;
private final Thread thread;
Hook(Thread thread) {
this.thread = thread;
}
/**
* @return True 若是後臺線程繼續運行
*/
public boolean keepRunning() {
return keepRunning;
}
/**
* 告訴客戶端後臺線程關閉並等待友好地關閉
*/
public void shutdown() {
keepRunning = false;
thread.interrupt();
try {
thread.join();
} catch (InterruptedException e) {
logger.error("Error shutting down thread with hook", e);
}
}
}
Hook包含兩個實例變量:keepRunning和thread。thread是對Hook負責關閉實例對象的引用,而keepRunning則是告訴線程繼續運行。
Hook有兩個public方法:keepRunning()和shutdown()。線程調用keepRunning()來確認是否須要繼續運行,而shutdown()是由ShutdownService的shutdown hook線程調用以關閉目標線程。這就是兩個方法的有趣之處。首先將keepRunning變量置爲false,接着調用thread.interrupt()來打斷線程強制拋出一個InterruptedException,最後調用thread.join()等待線程實例關閉。
值得注意的是這種方法須要你的線程配合。若是其中某個線程出錯,那麼全部的工做都會失敗。爲了不這種狀況最好在thread.join(…)中加入一個超時。
@Service
public class ShutdownService {
private static final Logger logger = LoggerFactory.getLogger(ShutdownService.class);
private final List<Hook> hooks;
public ShutdownService() {
logger.debug("Creating shutdown service");
hooks = new ArrayList<Hook>();
createShutdownHook();
}
/**
* Protected for testing
*/
@VisibleForTesting
protected void createShutdownHook() {
ShutdownDaemonHook shutdownHook = new ShutdownDaemonHook();
Runtime.getRuntime().addShutdownHook(shutdownHook);
}
protected class ShutdownDaemonHook extends Thread {
/**
* 循環並使用hook關閉全部後臺線程
*
* @see java.lang.Thread#run()
*/
@Override
public void run() {
logger.info("Running shutdown sync");
for (Hook hook : hooks) {
hook.shutdown();
}
}
}
/**
* 建立hook class的新實例
*/
public Hook createHook(Thread thread) {
thread.setDaemon(true);
Hook retVal = new Hook(thread);
hooks.add(retVal);
return retVal;
}
@VisibleForTesting
List<Hook> getHooks() {
return hooks;
}
}
ShutdownService是一個Spring服務包含一個由引用的線程提供的Hook類列表用來關閉線程。它還包括了一個繼承了Thread的內部類ShutdownDaemonHook。在ShutdownService構造函數中會建立一個ShutdownDaemonHook實例並傳遞給JVM做爲shutdown hook,代碼以下:
Runtime.getRuntime().addShutdownHook(shutdownHook);
ShutdownService 有一個public方法:createHook()。createHook()作的第一步是確保全部傳遞的線程都被設置爲後臺線程。接下來會建立一個新的Hook實例,在最終存儲結果到列表返回給調用者以前做爲參數傳遞給線程。
最後要作的就是將ShutdownService繼承到DeferredResultService和MatchReporter。這兩個類包含了行爲不良的線程。
@Service("DeferredService")
public class DeferredResultService implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(DeferredResultService.class);
private final BlockingQueue<DeferredResult<Message>> resultQueue = new LinkedBlockingQueue<>();
private Thread thread;
private volatile boolean start = true;
@Autowired
private ShutdownService shutdownService;
private Hook hook;
@Autowired
@Qualifier("theQueue")
private LinkedBlockingQueue<Message> queue;
@Autowired
@Qualifier("BillSkyes")
private MatchReporter matchReporter;
public void subscribe() {
logger.info("Starting server");
matchReporter.start();
startThread();
}
private void startThread() {
if (start) {
synchronized (this) {
if (start) {
start = false;
thread = new Thread(this, "Studio Teletype");
hook = shutdownService.createHook(thread);
thread.start();
}
}
}
}
@Override
public void run() {
logger.info("DeferredResultService - Thread running");
while (hook.keepRunning()) {
try {
DeferredResult<Message> result = resultQueue.take();
Message message = queue.take();
result.setResult(message);
} catch (InterruptedException e) {
System.out.println("Interrupted when waiting for latest update. " + e.getMessage());
}
}
System.out.println("DeferredResultService - Thread ending");
}
public void getUpdate(DeferredResult<Message> result) {
resultQueue.add(result);
}
}
爲DeferredResultService作的第一個修改就是自動匹配ShutdownService實例。接着在線程建立之後thread.start()調用以前使用ShutdownService建立一個Hook實例:
thread = new Thread(this, "Studio Teletype");
hook = shutdownService.createHook(thread);
thread.start();
最後將while(true)替換爲:
while (hook.keepRunning()) {
……通知線程何時須要結束while循環並關閉。
你可能已經注意到上面的代碼裏有一些System.out.println()調用。緣由並非對關閉hook線程的執行順序不肯定。須要記住,不只僅是你編寫的類試圖關閉其餘的子系統也是如此。這就是爲這在我原來的代碼中,logger.info(…)會給出下面的異常輸出:
Exception in thread "Studio Teletype" java.lang.NoClassDefFoundError: org/apache/log4j/spi/ThrowableInformation
at org.apache.log4j.spi.LoggingEvent.(LoggingEvent.java:159)
at org.apache.log4j.Category.forcedLog(Category.java:391)
at org.apache.log4j.Category.log(Category.java:856)
at org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:382)
at com.captaindebug.longpoll.service.DeferredResultService.run(DeferredResultService.java:75)
at java.lang.Thread.run(Thread.java:722)
Caused by: java.lang.ClassNotFoundException: org.apache.log4j.spi.ThrowableInformation
at org.apache.catalina.loader.WebappClassLoader.loadClass(WebappClassLoader.java:1714)
at org.apache.catalina.loader.WebappClassLoader.loadClass(WebappClassLoader.java:1559)
... 6 more
這裏的異常是由於logger在調用時已經被卸載了,所以會給出報錯。再一次,文檔中是這麼描述的:「Shutdown hook是在JVM生命週期中的一個微妙的時間執行,所以須要進行防護性變成。尤爲是應該注意線程安全儘量地避免死鎖。Hook代碼應該不對任何服務盲目依賴,由於這些服務可能會註冊本身的shutdown hook而且此時也在關閉的過程當中。例如,試圖使用基於線程的服務好比AWT時間分發線程會致使死鎖。
MatchReport類也須要進行相似的修改。主要的區別在於run() 方法中的hook.keepRunning()是一個for循環:
public class MatchReporter implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(MatchReporter.class);
private final Match match;
private final Queue<Message> queue;
private volatile boolean start = true;
@Autowired
private ShutdownService shutdownService;
private Hook hook;
public MatchReporter(Match theBigMatch, Queue<Message> queue) {
this.match = theBigMatch;
this.queue = queue;
}
/**
* 由Spring加載上下文以後調用。會啓動匹配……
*/
public void start() {
if (start) {
synchronized (this) {
if (start) {
start = false;
logger.info("Starting the Match Reporter...");
String name = match.getName();
Thread thread = new Thread(this, name);
hook = shutdownService.createHook(thread);
thread.start();
}
}
} else {
logger.warn("Game already in progress");
}
}
/**
* The main run loop
*/
@Override
public void run() {
sleep(5); // Sleep等待應用加載
logger.info("The match has now started...");
long now = System.currentTimeMillis();
List<Message> matchUpdates = match.getUpdates();
for (Message message : matchUpdates) {
delayUntilNextUpdate(now, message.getTime());
if (!hook.keepRunning()) {
break;
}
logger.info("Add message to queue: {}", message.getMessageText());
queue.add(message);
}
start = true; // Game over, can restart
logger.warn("GAME OVER");
}
private void sleep(int deplay) {
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
logger.info("Sleep interrupted...");
}
}
private void delayUntilNextUpdate(long now, long messageTime) {
while (System.currentTimeMillis() < now + messageTime) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
logger.info("MatchReporter Thread interrupted...");
}
}
}
}
最終的代碼測試是在匹配更新過程到一半時執行Tomcat shutdown.sh命令。當JVM終止時會經過ShutdownDaemonHook類調用shutdown hook,其中的run()方法會對Hook實例列表循環執行通知他們關閉各自的線程。若是你執行tail -f查看服務器的日誌文件(我這裏是catalina.out,你的Tomcat配置可能與我不一樣),你會看到服務器友好地關閉記錄。
本文附帶的代碼能夠在GitHub上找到:https://github.com/roghughe/captaindebug/tree/master/long-poll。