Java消息隊列任務的平滑關閉

1.問題背景

對於消息隊列任務的監聽,咱們通常使用Java寫一個獨立的程序,在Linux服務器上運行。當訂閱者程序啓動後,會經過消息隊列客戶端接收消息,放入線程池中併發的處理。java

那麼問題來了,當咱們修改程序後,須要從新啓動時,如何保證消息都可以被處理呢?linux

一些開源的消息隊列中間件,會提供ACK機制(消息確認機制),當訂閱者處理完消息後,會通知服務端刪除對應消息,若是訂閱者出現異常,服務端未收到確認消費,則會重試發送。服務器

那若是消息隊列中間件沒有提供ACK機制,或者爲了高吞度量的考慮關閉了ACK功能,如何最大可能保證消息都可以被處理呢?併發

正常來講,訂閱者程序關閉後,消息會在隊列中堆積,等待訂閱者下次訂閱消費,因此未接收的消息是不會丟失的。可能出現的問題就是在關閉的一瞬間,已經從消息隊列中取出,但尚未被處理的消息。ide

所以咱們須要一套平滑關閉的機制,保證在重啓的時候,已接收的消息能夠獲得正常處理。線程

2.問題分析

平滑關閉的思路以下:code

  1. 在關閉程序時,首先關閉消息訂閱,保證再也不接收新的消息。
  2. 關閉線程池,等待線程池中的消息處理完畢。
  3. 程序退出。

關閉消息訂閱:消息隊列的客戶端都會提供關閉鏈接的方法,具體能夠自行查看API。orm

關閉線程池:Java的ThreadPoolExecutor線程池提供shutdown()shutdownNow()兩個方法,區別是前者會等待線程池中的消息都處理完畢,後者會直接中止全部線程並返回未處理完的線程List。由於咱們須要使用shutdown()方法進行關閉,並經過isTerminated()方法,判斷線程池是否已經關閉。中間件

那麼問題又來了,咱們如何通知到程序,須要執行關閉操做呢?隊列

在Linux中,進程的關閉是經過信號傳遞的,咱們能夠用kill -9 pid關閉進程,除了-9以外,咱們能夠經過 kill -l,查看kill命令的其它信號量。

linux信號量

這裏提供兩種關閉方法:

  1. 程序中添加Runtime.getRuntime().addShutdownHook鉤子方法,SIGTERM,SIGINT,SIGHUP三種信號都會觸發該方法(分別對應kill -1/kill -2/kill -15,Ctrl+C也會觸發SIGINT信號)。

  2. 程序中經過Signal類註冊信號監聽,好比USR2(對應kill -12),在handle方法中執行關閉操做。

補充說明:addShutdownHook方法和handle方法中若是再調用System.exit,會形成deadlock,使進程沒法正常退出。

僞代碼分別以下

Runtime.getRuntime().addShutdownHook(new Thread() {
    public void run() {
        //關閉訂閱者
        //關閉線程池
        //退出
    }
});
//註冊linux kill信號量  kill -12
Signal sig = new Signal("USR2");
Signal.handle(sig, new SignalHandler() {
    @Override
    public void handle(Signal signal) {
        //關閉訂閱者
        //關閉線程池
        //退出
    }
});

模擬Demo

下面經過一個demo模擬相關邏輯操做

首先模擬一個生產者,每秒生產5個消息

而後模擬一個訂閱者,收到消息後,放入線程池進行處理,線程池固定4個線程,每一個線程處理時間1秒,這樣線程池每秒會積壓1個消息。

package com.lujianing.demo;

import sun.misc.Signal;
import sun.misc.SignalHandler;
import java.util.concurrent.*;

/**
 * @author lujianing01@58.com
 * @Description:
 * @date 2016/11/14
 */
public class MsgClient {

    //模擬消費線程池 同時4個線程處理
    private static final ThreadPoolExecutor THREAD_POOL = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
    
    //模擬消息生產任務
    private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();
    
    //用於判斷是否關閉訂閱
    private static volatile boolean isClose = false;

    public static void main(String[] args) throws InterruptedException {
    
        //註冊鉤子方法
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                close();
            }
        });

        BlockingQueue <String> queue = new ArrayBlockingQueue<String>(100);
        producer(queue);
        consumer(queue);

    }

    //模擬消息隊列生產者
    private static void producer(final BlockingQueue  queue){
        //每200毫秒向隊列中放入一個消息
        SCHEDULED_EXECUTOR_SERVICE.scheduleAtFixedRate(new Runnable() {
            public void run() {
                queue.offer("");
            }
        }, 0L, 200L, TimeUnit.MILLISECONDS);
    }

    //模擬消息隊列消費者 生產者每秒生產5個   消費者4個線程消費1個1秒  每秒積壓1個
    private static void consumer(final BlockingQueue queue) throws InterruptedException {
        while (!isClose){
            getPoolBacklogSize();
            //從隊列中拿到消息
            final String msg = (String)queue.take();
            //放入線程池處理
            if(!THREAD_POOL.isShutdown()) {
                THREAD_POOL.execute(new Runnable() {
                    public void run() {
                        try {
                            //System.out.println(msg);
                            TimeUnit.MILLISECONDS.sleep(1000L);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        }
    }

    //查看線程池堆積消息個數
    private static long getPoolBacklogSize(){
        long backlog = THREAD_POOL.getTaskCount()- THREAD_POOL.getCompletedTaskCount();
        System.out.println(String.format("[%s]THREAD_POOL backlog:%s",System.currentTimeMillis(),backlog));
        return backlog;
    }

    private static void close(){
        System.out.println("收到kill消息,執行關閉操做");
        //關閉訂閱消費
        isClose = true;
        //關閉線程池,等待線程池積壓消息處理
        THREAD_POOL.shutdown();
        //判斷線程池是否關閉
        while (!THREAD_POOL.isTerminated()) {
            try {
                //每200毫秒 判斷線程池積壓數量
                getPoolBacklogSize();
                TimeUnit.MILLISECONDS.sleep(200L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("訂閱者關閉,線程池處理完畢");
    }

    static {
        String osName = System.getProperty("os.name").toLowerCase();
        if(osName != null && osName.indexOf("window") == -1) {
            //註冊linux kill信號量  kill -12
            Signal sig = new Signal("USR2");
            Signal.handle(sig, new SignalHandler() {
                @Override
                public void handle(Signal signal) {
                    close();
                }
            });
        }
    }

}

模擬demo

當咱們在服務上運行時,經過控制檯能夠看到相關的輸出信息,demo中輸出了線程池的積壓消息個數

java -cp /home/work/lujianing/msg-queue-client/* com.lujianing.demo.MsgClient

啓動進程

另打開一個終端,經過ps命令查看進程號,或者經過nohup啓動Java進程拿到進程id

ps -fe|grep MsgClient

kill進程

當咱們執行kill -12 pid的時候 能夠看到關閉業務邏輯

平滑關閉

3.總結

其實不僅僅消息隊列任務,在常見的RPC服務中也會見到相似的功能,好比58的SCF,在源碼中,也會分別註冊了USR2信號量和addShutdownHook鉤子方法。

在重啓腳本中,首先會發送kill -12命令,RPC服務收到信號後會修改Server狀態爲關閉。接着會發送kill -15命令,觸發鉤子方法,關閉全部的鏈接。

相關文章
相關標籤/搜索