How to Shut Down a Service Gracefully

Background

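Many services need to exit gracefully. For example, an RPC service must remove its node from the service registry once it stops, and messages already consumed from a message queue must be processed to completion. In general we want a service to finish the tasks it is currently running before it exits, which means running some cleanup code while the JVM is shutting down.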

Approach

ShutdownHook

The JDK provides the java.lang.Runtime.addShutdownHook(Thread hook) method, which lets you register a JVM shutdown hook. The hook is invoked in the following situations:

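  • The program exits normally (the last non-daemon thread finishes);
  • System.exit() is called;
  • The user presses Ctrl+C in the terminal, sending an interrupt (SIGINT);
  • The operating system shuts down;
  • The process is stopped with kill pid (which sends SIGTERM by default);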

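Deployment systems usually stop services with the kill command. The service then receives the shutdown signal and runs its hooks to clean up. Note that kill -9 (SIGKILL) cannot be caught, so in that case the JVM terminates without running any hooks.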
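To make the registration API concrete, here is a minimal sketch; the class name HookBasics and the hook message are made up for illustration. Besides registering a hook, it relies on two behaviors documented for java.lang.Runtime: registering the same Thread object twice throws IllegalArgumentException, and removeShutdownHook returns true only if that hook was registered.

```java
// Hypothetical demo class; only the Runtime calls are real JDK API.
class HookBasics {

    // Creates a hook thread and registers it; the hook starts once the JVM begins shutting down.
    static Thread registerHook(String message) {
        Thread hook = new Thread(() -> System.out.println(message), "cleanup-hook");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    public static void main(String[] args) {
        registerHook("cleanup done");
        System.out.println("main finished");
        // When main returns and no other non-daemon thread is alive, the JVM shuts down
        // and runs the hook. kill -9 (SIGKILL) would end the process without running it.
    }
}
```

A hook that is no longer needed (for example, after a component has already been closed explicitly) can be de-registered with removeShutdownHook, which avoids running the same cleanup twice.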

Example scenario

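Suppose a producer sends messages to an internal queue and a consumer reads and processes them. When we stop the service, we want every message in the queue to be processed. The code looks like this: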

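import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Graceful shutdown test using a ShutdownHook.
 */
public class ShutDownTest {

    private static final BlockingQueue<Long> queue = new ArrayBlockingQueue<>(50);

    private static final AtomicLong taskId = new AtomicLong(0);

    // Producer task
    private static class ProduceTask implements Runnable {

        private final AtomicBoolean stopped = new AtomicBoolean(false);

        @Override
        public void run() {
            while (!stopped.get()) {
                long element = taskId.incrementAndGet();
                try {
                    // put() blocks while the queue is full; add() would throw IllegalStateException
                    // once the backlog reaches the capacity of 50
                    queue.put(element);
                    System.out.println("add element : " + element);
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        public void setStopped() {
            stopped.compareAndSet(false, true);
            System.out.println("stop producer.");
        }
    }

    // Consumer task
    private static class ConsumeTask implements Runnable {

        private final AtomicBoolean stopped = new AtomicBoolean(false);

        @Override
        public void run() {
            // Keep running until we have been stopped AND the backlog is empty
            while (!stopped.get() || !queue.isEmpty()) {
                try {
                    // poll() with a timeout instead of take(), so the stop flag is re-checked
                    // even when the queue is empty
                    Long element = queue.poll(100, TimeUnit.MILLISECONDS);
                    if (element == null) {
                        continue;
                    }
                    System.out.println("consume element : " + element);
                    doWork();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        private void doWork() throws InterruptedException {
            // Consuming is slightly slower than producing, to simulate a backlog
            Thread.sleep(60);
        }

        public void setStopped() {
            stopped.compareAndSet(false, true);
            System.out.println("stop consumer.");
        }
    }

    public static void main(String[] args) {
        final ProduceTask producerTask = new ProduceTask();
        final Thread producerThread = new Thread(producerTask);

        final ConsumeTask consumeTask = new ConsumeTask();
        Thread consumeThread = new Thread(consumeTask);

        // Start the consumer first
        consumeThread.start();
        // then the producer
        producerThread.start();

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("try close...");
            // Stop the producer first
            producerTask.setStopped();
            // then the consumer, which keeps going until the queue is empty
            consumeTask.setStopped();
            try {
                System.out.println("close wait...");
                // At most 50 queued elements x 60 ms is about 3 s, so 5 s is enough to drain
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("close finished...");
        }));
    }
}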

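The output is shown below. When the service is shut down, the hook runs successfully, and the process exits only after the queued messages have been processed.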

[Figure: output of the ShutdownHook example]
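The hook above waits a fixed 5 seconds, which works here only because the backlog is bounded (at most 50 queued elements at roughly 60 ms each). A hedged alternative, sketched below with hypothetical names (DrainingConsumer, stopAndAwait), is to set the stop flag and then join the consumer thread with a timeout, so shutdown takes only as long as the drain actually needs. The consumer uses poll with a timeout rather than take, so it notices the stop flag even when the queue is empty.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical consumer that drains its queue after stop() and then exits.
class DrainingConsumer implements Runnable {
    private final BlockingQueue<Long> queue;
    private final AtomicInteger processed = new AtomicInteger();
    private volatile boolean stopped;

    DrainingConsumer(BlockingQueue<Long> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                // Wait briefly for an element, then re-check the stop flag
                Long element = queue.poll(100, TimeUnit.MILLISECONDS);
                if (element != null) {
                    processed.incrementAndGet(); // stand-in for real message handling
                } else if (stopped) {
                    return; // stopped and nothing left to process
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void stop() {
        stopped = true;
    }

    int processed() {
        return processed.get();
    }

    // What a shutdown hook could call: signal stop, then wait at most timeoutMs for the drain.
    static boolean stopAndAwait(DrainingConsumer consumer, Thread worker, long timeoutMs)
            throws InterruptedException {
        consumer.stop();
        worker.join(timeoutMs);
        return !worker.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Long> queue = new LinkedBlockingQueue<>();
        for (long i = 1; i <= 10; i++) {
            queue.add(i);
        }
        DrainingConsumer consumer = new DrainingConsumer(queue);
        Thread worker = new Thread(consumer, "consumer");
        worker.start();
        // In a real service this call would sit inside the shutdown hook
        boolean drained = stopAndAwait(consumer, worker, 5000);
        System.out.println("drained=" + drained + ", processed=" + consumer.processed());
    }
}
```

The timeout still caps how long shutdown can take, which matters because deployment systems typically follow SIGTERM with SIGKILL after a grace period.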

Potential problems

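With ShutdownHook we usually cannot control the order in which hooks run. Runtime.addShutdownHook is a public API, and when the JVM begins shutting down it starts all registered hooks in an unspecified order and lets them run concurrently. In the scenario above, if the producer and the consumer each registered their own hook, nothing would guarantee which one stops first, and in a more complex project that ordering is even harder to control. We ran into exactly this in production: messages consumed from Kafka were handed to an internal thread pool whose rejection policy we had customized to wait indefinitely (to make sure every message really got processed), and occasionally the service could not shut down. The cause was that the thread pool had already been shut down while the Kafka consumer was still consuming messages, so the consuming thread waited forever.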
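The post does not show its thread-pool code, so the following is only a hedged sketch of the kind of "wait until there is room" rejection policy described above; BlockingRejectPolicy is a hypothetical name. Once the pool has been shut down, blocking in rejectedExecution can no longer achieve its goal: at best the task lands in a queue that no worker may drain, at worst the submitter may wait indefinitely. The isShutdown() check makes late submissions fail fast with RejectedExecutionException so the caller (for example the Kafka consumer loop) can react. It narrows the window but does not close it: a submitter already blocked in put() when shutdown starts can still slip a task in, which is why the shutdown order itself still matters.

```java
import java.util.concurrent.*;

// Hypothetical "block the submitter until there is room" policy that refuses to wait after shutdown.
class BlockingRejectPolicy implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (executor.isShutdown()) {
            // Waiting here is pointless: the pool accepts no new work
            throw new RejectedExecutionException("executor already shut down");
        }
        try {
            // Block the submitting thread until a queue slot frees up
            executor.getQueue().put(r);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("interrupted while waiting for queue space", e);
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1), new BlockingRejectPolicy());
        pool.shutdown();
        try {
            pool.execute(() -> System.out.println("never runs"));
        } catch (RejectedExecutionException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```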

Signal

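Java also offers a signal mechanism (the internal sun.misc.Signal class), through which our service can receive the shutdown signal directly.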
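As a minimal, hedged sketch of that API (SignalBasics and onTerm are hypothetical names): sun.misc.Signal.handle installs a handler for a named signal and returns the handler it replaced. Because the new handler replaces the JVM's default TERM handling, the JVM no longer starts shutting down by itself on SIGTERM, so a real handler has to call System.exit; that call then runs any registered ShutdownHooks as usual.

```java
import sun.misc.Signal;
import sun.misc.SignalHandler;

// Hypothetical helper around the internal sun.misc.Signal API (module jdk.unsupported on JDK 9+).
class SignalBasics {

    // Installs `action` as the SIGTERM handler and returns the handler it replaced.
    static SignalHandler onTerm(Runnable action) {
        return Signal.handle(new Signal("TERM"), sig -> action.run());
    }

    public static void main(String[] args) throws InterruptedException {
        onTerm(() -> {
            System.out.println("SIGTERM received, running ordered cleanup...");
            // Our handler replaced the JVM's default, so we must exit explicitly;
            // System.exit still runs any registered ShutdownHooks.
            System.exit(0);
        });
        System.out.println("running; stop with: kill -TERM <pid>");
        Thread.sleep(Long.MAX_VALUE);
    }
}
```

javac reports a warning that sun.misc.Signal is internal proprietary API; the code still compiles and runs on current JDKs.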

We use the Signal mechanism for the following reasons:

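  • The execution order of ShutdownHooks is not guaranteed, and third-party components may register their own hooks, so resources that our custom shutdown flow depends on may be closed and cleaned up too early;
  • Signal is a non-public (internal) API that third-party components rarely use, so we can take full control of the shutdown order inside the service;
  • After our cleanup finishes we call System.exit ourselves, so our cleanup does not interfere with the cleanup logic in the ShutdownHooks, which still run afterwards;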

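The core reason is to fully guarantee the shutdown order and avoid problems like the one above. We maintain the shutdown tasks in order inside the service; the adjusted code is shown below: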

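import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;

// sun.misc.Signal is an internal API (module jdk.unsupported on JDK 9+)
import sun.misc.Signal;

/**
 * Runs registered shutdown tasks in a fixed order when the process receives SIGTERM, then exits.
 */
public class TermHelper {

    // Set when the first TERM signal is handled; later signals are ignored
    private static final AtomicBoolean signalTriggered = new AtomicBoolean(false);
    // Prevents doTerm() from running more than once
    private static final AtomicBoolean stopping = new AtomicBoolean(false);
    // Ensures the signal handler is installed only once
    private static final AtomicBoolean registeredHolder = new AtomicBoolean(false);

    // Shutdown tasks, executed from head to tail
    private static final Deque<Runnable> terms = new ConcurrentLinkedDeque<>();

    private static void tryRegisterOnlyOnce() {
        boolean previousRegistered = registeredHolder.getAndSet(true);
        if (!previousRegistered) {
            registerTermSignal();
        }
    }

    private static void registerTermSignal() {
        // Replaces the JVM's default TERM handling: the JVM no longer shuts down by itself
        // on SIGTERM, so termAndExit() must call System.exit explicitly.
        Signal.handle(new Signal("TERM"), signal -> {
            boolean previous = signalTriggered.getAndSet(true);
            if (previous) {
                System.out.println("Term has been triggered.");
                return;
            }
            termAndExit();
        });
    }

    // Appends a task: it runs after the tasks registered so far
    public static void addTerm(Runnable runnable) {
        tryRegisterOnlyOnce();
        terms.addLast(runnable);
    }

    // Prepends a task: it runs before the tasks registered so far
    public static void addFirstTerm(Runnable runnable) {
        tryRegisterOnlyOnce();
        terms.addFirst(runnable);
    }

    private static void termAndExit() {
        try {
            Thread current = Thread.currentThread();
            current.setName(current.getName() + "(exit thread)");
            System.out.println("do term cleanup....");
            doTerm();
            System.out.println("exit success.");
            // Starts the normal JVM shutdown, so ShutdownHooks still run after our ordered cleanup
            System.exit(0);
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    public static void doTerm() {
        boolean previousStopping = stopping.getAndSet(true);
        if (previousStopping) {
            System.out.println("Term routine already running, skipping.");
            return;
        }
        // Run the tasks serially, in deque order; one failure does not stop the rest
        for (Runnable runnable : terms) {
            try {
                System.out.println("execute term runnable : " + runnable);
                runnable.run();
            } catch (Throwable e) {
                e.printStackTrace();
            }
        }
        try {
            // Fixed grace period so the consumer can drain its backlog before exit
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

}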

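TermHelper keeps the shutdown tasks in a deque and runs them serially when the service shuts down, which guarantees their order. On top of this we could also attach a priority to each shutdown task and run the tasks from highest to lowest priority.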
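One possible way to add such priorities, sketched under our own assumptions (PriorityTerms is a hypothetical class, and here a smaller number means higher priority): each task is stored with its priority and a registration sequence number, and the list is sorted at shutdown time so tasks with equal priority keep their registration order. As in TermHelper, a failing task is logged and does not stop the remaining ones.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical priority-ordered variant of TermHelper's task list.
class PriorityTerms {

    private static final class Entry {
        final int priority; // smaller value runs earlier
        final long seq;     // registration order, breaks ties
        final Runnable task;

        Entry(int priority, long seq, Runnable task) {
            this.priority = priority;
            this.seq = seq;
            this.task = task;
        }
    }

    private final List<Entry> entries = new ArrayList<>();
    private long nextSeq = 0;

    synchronized void add(int priority, Runnable task) {
        entries.add(new Entry(priority, nextSeq++, task));
    }

    // Runs every task once, lowest priority value first.
    void runAll() {
        List<Entry> snapshot;
        synchronized (this) {
            snapshot = new ArrayList<>(entries);
        }
        snapshot.sort(Comparator.comparingInt((Entry e) -> e.priority).thenComparingLong(e -> e.seq));
        for (Entry entry : snapshot) {
            try {
                entry.task.run();
            } catch (Throwable t) {
                t.printStackTrace(); // keep going with the remaining tasks
            }
        }
    }

    public static void main(String[] args) {
        PriorityTerms terms = new PriorityTerms();
        terms.add(20, () -> System.out.println("stop consumer"));
        terms.add(10, () -> System.out.println("stop producer"));
        terms.runAll(); // "stop producer" runs before "stop consumer"
    }
}
```

Sorting once at shutdown keeps registration cheap; a PriorityQueue would work too, but its iterator does not return elements in priority order, so it would have to be drained with poll().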

public class ShutDownTest {

    // queue, taskId, ProduceTask and ConsumeTask (and their imports) are the same as in the
    // first ShutDownTest listing and are omitted here; TermHelper must be on the classpath.

    public static void main(String[] args) {
        final ProduceTask producerTask = new ProduceTask();
        final Thread producerThread = new Thread(producerTask);

        final ConsumeTask consumeTask = new ConsumeTask();
        Thread consumeThread = new Thread(consumeTask);

        // Start the consumer first
        consumeThread.start();
        // then the producer
        producerThread.start();

        // addFirstTerm puts this task at the head of the deque: stop producing first
        TermHelper.addFirstTerm(() -> {
            producerTask.setStopped();
        });

        // addTerm appends: stop the consumer afterwards
        TermHelper.addTerm(() -> {
            consumeTask.setStopped();
        });

        // Still runs, because TermHelper calls System.exit() after its own cleanup
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("shut down hook...");
        }));
    }
}

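The output is shown below. Note that we only registered a handler for the TERM signal, so the service has to be stopped with kill -TERM (a plain kill pid also sends SIGTERM). Ctrl+C sends SIGINT, which TermHelper does not intercept, so in that case only the ShutdownHooks run. As the figure shows, both the producer and the consumer in our test exited normally, and all queued messages were processed before the process exited.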

[Figure: output of the Signal-based example]
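If Ctrl+C should go through the same ordered cleanup, one hedged option is to register a single handler for several signals. The sketch below (MultiSignalTerm and register are hypothetical names) installs the same handler for TERM and INT and, like TermHelper's signalTriggered flag, uses an AtomicBoolean so the cleanup runs only once even if both signals arrive.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import sun.misc.Signal;
import sun.misc.SignalHandler;

// Hypothetical extension: route SIGTERM (kill <pid>) and SIGINT (Ctrl+C) to one cleanup routine.
class MultiSignalTerm {
    private static final AtomicBoolean triggered = new AtomicBoolean(false);

    // Installs the handler for each named signal and returns the handlers it replaced.
    static SignalHandler[] register(Runnable cleanup, String... signalNames) {
        SignalHandler handler = sig -> {
            // Only the first signal runs the cleanup; later ones are ignored
            if (triggered.compareAndSet(false, true)) {
                cleanup.run();
            }
        };
        SignalHandler[] previous = new SignalHandler[signalNames.length];
        for (int i = 0; i < signalNames.length; i++) {
            previous[i] = Signal.handle(new Signal(signalNames[i]), handler);
        }
        return previous;
    }

    public static void main(String[] args) throws InterruptedException {
        register(() -> {
            System.out.println("ordered cleanup...");
            System.exit(0); // our handlers replaced the JVM defaults, so exit explicitly
        }, "TERM", "INT");
        Thread.sleep(Long.MAX_VALUE);
    }
}
```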

Summary

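To stop a service gracefully, we can generally use either ShutdownHook or Signal. With ShutdownHook it is hard to guarantee the order in which shutdown tasks run; in that case, consider using the Signal mechanism to take full control of the order in which the service shuts down.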
