不少時候服務都有平滑退出的需求,例如RPC服務在中止以後須要從註冊服務摘除節點、從消息隊列已經消費的消息須要正常處理完成等。通常地咱們但願能讓服務在退出前能執行完當前正在執行的任務,這個時候就須要咱們在JVM關閉的時候運行一些清理現場的代碼。java
JDK提供了Java.Runtime.addShutdownHook(Thread hook)方法,容許用戶註冊一個JVM關閉的鉤子。這個鉤子能夠在如下幾種場景被調用:ide
通常地發佈系統會經過kill命令來中止服務。這個時候服務能夠接收到關閉信號並執行鉤子程序進行清理工做。測試
假設如下場景,有個生產者往內部隊列發消息,有個消費者讀取隊列消息並執行。當咱們中止服務的時候,但願隊列的消息都能正常處理完成,代碼示例以下:spa
/** * 服務關閉測試 */
public class ShutDownTest {
private static BlockingQueue<Long> queue = new ArrayBlockingQueue<Long>(50);
private static AtomicLong taskId = new AtomicLong(0);
// 生產任務
private static class ProduceTask implements Runnable {
private AtomicBoolean stopped = new AtomicBoolean(false);
@Override
public void run() {
while (!stopped.get()) {
long element = taskId.incrementAndGet();
queue.add(element);
System.out.println("add element : " + element);
try {
Thread.sleep(50);
} catch (InterruptedException e) {
}
}
}
public void setStopped() {
stopped.compareAndSet(false, true);
System.out.println("stop producer.");
}
}
// 消費任務
private static class ConsumeTask implements Runnable {
private AtomicBoolean stopped = new AtomicBoolean(false);
@Override
public void run() {
while (!stopped.get() || queue.size() > 0) {
try {
long element = queue.take();
System.out.println("consume element : " + element);
doWork();
} catch (InterruptedException e) {
}
}
}
private void doWork() {
try {
// 消費速度比生產速度稍慢,模擬積壓狀況
Thread.sleep(60);
} catch (InterruptedException e) {
}
}
public void setStopped() {
stopped.compareAndSet(false, true);
System.out.println("stop consumer.");
}
}
public static void main(String[] args) {
final ProduceTask producerTask = new ProduceTask();
final Thread producerThread = new Thread(producerTask);
final ConsumeTask consumeTask = new ConsumeTask();
Thread consumeThread = new Thread(consumeTask);
// 先啓動消費
consumeThread.start();
// 再啓動生產
producerThread.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("try close...");
// 先關閉生產
producerTask.setStopped();
// 再關閉消費
consumeTask.setStopped();
try {
System.out.println("close wait...");
Thread.sleep(5000);
} catch (InterruptedException e) {
}
System.out.println("close finished...");
}));
}
}
複製代碼
執行結果以下所示,能夠看到服務關閉的時候鉤子程序執行成功,在等待消息處理完成後才退出。線程
在使用ShutdownHook的時候,咱們每每控制不了鉤子的執行順序。Java.Runtime.addShutdownHook是對外公開的API接口。在前述場景裏面,倘若是獨立註冊鉤子,在更復雜的項目裏面是否是就沒辦法保證執行的順序呢?曾在實際場景中遇到過這樣的問題,從kafka隊列消費消息,交給內部線程池去處理,咱們自定義了線程池的拒絕策略爲一直等待(爲了保證消息確實處理),而後就會偶爾出現服務沒法關閉的問題。緣由正是線程池先被關閉,kafka隊列卻還在消費消息,致使消費線程一直在等待。code
Java同時提供了signal信號機制,咱們的服務也能夠接收到關閉信號。cdn
使用Signal機制有如下緣由:blog
這裏核心的緣由仍是但願能徹底保證服務關閉的順序,避免出現問題。咱們在服務內部按順序維護關閉任務,上述代碼調整後以下所示:接口
public class TermHelper {
private static AtomicBoolean signalTriggered = new AtomicBoolean(false);
private static AtomicBoolean stopping = new AtomicBoolean(false);
private static AtomicBoolean registeredHolder = new AtomicBoolean(false);
private static Deque<Runnable> terms = new ConcurrentLinkedDeque<>();
private static void tryRegisterOnlyOnce() {
boolean previousRegistered = registeredHolder.getAndSet(true);
if (!previousRegistered) {
registerTermSignal();
}
}
private static void registerTermSignal() {
Signal.handle(new Signal("TERM"), signal -> {
boolean previous = signalTriggered.getAndSet(true);
if (previous) {
System.out.println("Term has been triggered.");
return;
}
termAndExit();
});
}
public static void addTerm(Runnable runnable) {
tryRegisterOnlyOnce();
terms.addLast(runnable);
}
public static void addFirstTerm(Runnable runnable) {
tryRegisterOnlyOnce();
terms.addFirst(runnable);
}
private static void termAndExit() {
try {
Thread current = Thread.currentThread();
current.setName(current.getName() + "(退出線程)");
System.out.println("do term cleanup....");
doTerm();
System.out.println("exit success.");
System.exit(0);
} catch (Throwable e) {
e.printStackTrace();
System.exit(1);
}
}
public static void doTerm() {
boolean previousStopping = stopping.getAndSet(true);
if (previousStopping) {
System.out.println("Term routine already running, wait until done!");
return;
}
for (Runnable runnable : terms) {
try {
System.out.println("execute term runnable : " + runnable);
runnable.run();
} catch (Throwable e) {
e.printStackTrace();
}
}
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
複製代碼
TermHelper內部使用隊列維護關閉任務,在服務關閉的時候串行執行相關任務,保證其順序。咱們也能夠在此基礎上維護關閉任務的優先級,實現按優先級高低依次執行關閉任務。隊列
public class ShutDownTest {
private static BlockingQueue<Long> queue = new ArrayBlockingQueue<Long>(50);
private static AtomicLong taskId = new AtomicLong(0);
// 生產任務
private static class ProduceTask implements Runnable {
private AtomicBoolean stopped = new AtomicBoolean(false);
@Override
public void run() {
while (!stopped.get()) {
long element = taskId.incrementAndGet();
queue.add(element);
System.out.println("add element : " + element);
try {
Thread.sleep(50);
} catch (InterruptedException e) {
}
}
}
public void setStopped() {
stopped.compareAndSet(false, true);
System.out.println("stop producer.");
}
}
// 消費任務
private static class ConsumeTask implements Runnable {
private AtomicBoolean stopped = new AtomicBoolean(false);
@Override
public void run() {
while (!stopped.get() || queue.size() > 0) {
try {
long element = queue.take();
System.out.println("consume element : " + element);
doWork();
} catch (InterruptedException e) {
}
}
}
private void doWork() {
try {
// 消費速度比生產速度稍慢,模擬積壓狀況
Thread.sleep(60);
} catch (InterruptedException e) {
}
}
public void setStopped() {
stopped.compareAndSet(false, true);
System.out.println("stop consumer.");
}
}
public static void main(String[] args) {
final ProduceTask producerTask = new ProduceTask();
final Thread producerThread = new Thread(producerTask);
final ConsumeTask consumeTask = new ConsumeTask();
Thread consumeThread = new Thread(consumeTask);
// 先啓動消費
consumeThread.start();
// 再啓動生產
producerThread.start();
TermHelper.addFirstTerm(() -> {
// 關閉生產
producerTask.setStopped();
});
TermHelper.addTerm(() -> {
// 再關閉消費
consumeTask.setStopped();
});
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("shut down hook...");
}));
}
}
複製代碼
執行結果以下所示。須要注意的是咱們只註冊了TERM信號,因此須要經過kill -TERM的方式關閉服務。從圖中能夠看到咱們測試的生產者和消費者都正常退出了,內部的消息最後也處理完成。
若須要平滑中止服務,咱們通常能夠經過ShutdownHook和Signal來實現。ShutdownHook通常比較難保證關閉任務的執行順序,這個時候能夠考慮使用Signal機制來徹底託管咱們關閉服務的執行順序。