RocketMQ之去重操做

以前聊過一個問題,RocketMQ的設計上,是不考慮消息去重的問題,即不考慮消息是否會重複的消費的問題,而是將這個問題拋給業務端本身去處理冪等的問題。前端

做爲RocketMQ的使用者,如今去討論RocketMQ爲什麼不支持消息去重的問題,己經是無關痛癢,而且也意義不大。若是站在如何設計一個消息隊列的角度去思考這個問題,這是設計上舍與得,無關對錯。而如今要考慮的是,既然它不支持消息去重,那麼就只能本身經過某些方式去保證消息去重。java

  • RocketMQ自己是否提供某些關鍵信息能夠幫助咱們去重
  • 業務上如何支持冪等

業務冪等

業務上支持冪等,實現起來定然不會很複雜,可是須要對如今的不少業務動刀,只有很是嚴謹的業務須要慎重考慮去重問題的時候纔會去考慮改造,從業務上支持冪等,從它整個實現思路可能都不是特別優雅,由於業務方與組件耦合了。而且,RocketMQ不支持消息去重就可能猜到若是要支持去重,整個吞吐量可能都會有嚴重的下滑。算法

假設設計一層db來解決業務冪等(好比經過記錄訂單id),那麼一條消息會有幾種狀態呢?數據庫

  • 消息不存在 NONE
  • 消息消費成功 SUCCESS
  • 消息還在處理 PROCESS
  • 消息消費失敗 ERROR

image.png

基於以上幾種狀態去考慮設計一個嚴謹的業務冪等的解決方案,整個吞吐量降低了很是多,一條消息的消費至少涉及三次db操做,其中兩次db writer,那麼回過頭來思考系統架構引入消息隊列須要去解決什麼樣的問題?後端

  • 解耦,提升響應速度
  • 流量削峯填谷
  • 數據異構
  • ......

在消息的消費速度遠遠低於生產者的生產速度,直接會形成大量的消息堆積,帶來的影響是很是嚴重的,好比不少業務的延遲大幅度提升,整個用戶體驗會不好,也許原先延遲僅在1s以內,忽然上升到1h甚至更久。前幾天阿里雲生產事故,能夠說是一片哀嚎,特別是消息隊列,有的人泰然處之,有的人慾哭無淚,有的人準備刪庫跑路……api

經過如下兩個方案至少能夠減輕若是組件方忽然出事故的狀況所帶來的生產事故:安全

  • 一致性方案
  • 消息去重

其餘

不經過業務上的各類惟一id來處理消息去重問題,而是基於原先對於RocketMQ的瞭解,下意識去考慮RocketMQ中的MsgId是否能夠做爲去重的關鍵點?網絡

具有去重最緊要的首要因素是——>該值能夠全局惟一的標識一條消息架構

你們都知道,在單機環境下想要生成惟一id是一件很是容易的事,好比經過數據庫主鍵來生成一個惟一的id。可是在分佈式做業環境下,想要生成一個全局惟一的id顯然是比較困難的,不過目前也有很是多的成熟方案能夠去處理,這裏不做贅述。在這裏更想提的是,RocketMQ提供全局惟一的id是如何作到的?併發

查看SendResult類能夠看到其中有兩個msgId

public class SendResult {
    private SendStatus sendStatus;
    private String msgId;// 客戶端生成的id
    private MessageQueue messageQueue;
    private long queueOffset;
    private String transactionId;
    private String offsetMsgId;//服務端生成的id
    private String regionId;
    private boolean traceOn = true;
  
    .....
}

客戶端生成惟一id

客戶端MsgId是怎麼生成的呢?以下源碼

public class MessageClientIDSetter {
    private static final String TOPIC_KEY_SPLITTER = "#";
    private static final int LEN;
    private static final String FIX_STRING;
    private static final AtomicInteger COUNTER;
    private static long startTime;
    private static long nextStartTime;

    static {
        LEN = 4 + 2 + 4 + 4 + 2;
        ByteBuffer tempBuffer = ByteBuffer.allocate(10);
        tempBuffer.position(2);
        tempBuffer.putInt(UtilAll.getPid());// 進程id
        tempBuffer.position(0);
        try {
            tempBuffer.put(UtilAll.getIP());// ip地址
        } catch (Exception e) {
            tempBuffer.put(createFakeIP());
        }
        tempBuffer.position(6);
        tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode());// 
        FIX_STRING = UtilAll.bytes2string(tempBuffer.array());
        setStartTime(System.currentTimeMillis());
        COUNTER = new AtomicInteger(0);
    }

    private synchronized static void setStartTime(long millis) {
        Calendar cal = Calendar.getInstance();
        cal.setTimeInMillis(millis);
        cal.set(Calendar.DAY_OF_MONTH, 1);
        cal.set(Calendar.HOUR_OF_DAY, 0);
        cal.set(Calendar.MINUTE, 0);
        cal.set(Calendar.SECOND, 0);
        cal.set(Calendar.MILLISECOND, 0);
        startTime = cal.getTimeInMillis();
        cal.add(Calendar.MONTH, 1);
        nextStartTime = cal.getTimeInMillis();
    }

    public static Date getNearlyTimeFromID(String msgID) {
        ByteBuffer buf = ByteBuffer.allocate(8);
        byte[] bytes = UtilAll.string2bytes(msgID);
        buf.put((byte) 0);
        buf.put((byte) 0);
        buf.put((byte) 0);
        buf.put((byte) 0);
        buf.put(bytes, 10, 4);
        buf.position(0);
        long spanMS = buf.getLong();
        Calendar cal = Calendar.getInstance();
        long now = cal.getTimeInMillis();
        cal.set(Calendar.DAY_OF_MONTH, 1);
        cal.set(Calendar.HOUR_OF_DAY, 0);
        cal.set(Calendar.MINUTE, 0);
        cal.set(Calendar.SECOND, 0);
        cal.set(Calendar.MILLISECOND, 0);
        long monStartTime = cal.getTimeInMillis();
        if (monStartTime + spanMS >= now) {
            cal.add(Calendar.MONTH, -1);
            monStartTime = cal.getTimeInMillis();
        }
        cal.setTimeInMillis(monStartTime + spanMS);
        return cal.getTime();
    }

    public static String getIPStrFromID(String msgID) {
        byte[] ipBytes = getIPFromID(msgID);
        return UtilAll.ipToIPv4Str(ipBytes);
    }

    public static byte[] getIPFromID(String msgID) {
        byte[] result = new byte[4];
        byte[] bytes = UtilAll.string2bytes(msgID);
        System.arraycopy(bytes, 0, result, 0, 4);
        return result;
    }

    public static String createUniqID() {
        StringBuilder sb = new StringBuilder(LEN * 2);
        sb.append(FIX_STRING);
        sb.append(UtilAll.bytes2string(createUniqIDBuffer()));
        return sb.toString();
    }

    private static byte[] createUniqIDBuffer() {
        ByteBuffer buffer = ByteBuffer.allocate(4 + 2);
        long current = System.currentTimeMillis();
        if (current >= nextStartTime) {
            setStartTime(current);
        }
        buffer.position(0);
        buffer.putInt((int) (System.currentTimeMillis() - startTime));
        buffer.putShort((short) COUNTER.getAndIncrement());
        return buffer.array();
    }

    public static void setUniqID(final Message msg) {
        if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) {
            msg.putProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, createUniqID());
        }
    }

    public static String getUniqID(final Message msg) {
        return msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
    }

    public static byte[] createFakeIP() {
        ByteBuffer bb = ByteBuffer.allocate(8);
        bb.putLong(System.currentTimeMillis());
        bb.position(4);
        byte[] fakeIP = new byte[4];
        bb.get(fakeIP);
        return fakeIP;
    }
}

從源碼中能夠看到

|value|part| |--|--| |FIX_STRING | ip地址 + 進程id + MessageClientIDSetter.class.getClassLoader().hashCode()組成 | |COUNTER | 是AtomicInteger值,可保證併發操做下的安全性| |TIME | System.currentTimeMillis() - 當前月開始的時間|

MsgId = FIX_STRING + bytes2string(TIME + COUNTER)

  • ip地址決定了分佈式做業環境下生產的id值惟一
  • 進程id決定了單機上多個客戶端實例間生產的id值惟一
  • count做爲原子Integer類型,決定了單實例運行時高併發下生產的值惟一
  • time 乃當前時間戳 - 當月開始時間戳的long值,保證應用月內重啓不會重複。

什麼狀況下會出現id重複?

  • 應用不重啓,id不可能重複
  • 月初重啓,在各個條件均不變的狀況下,獲得的值可能跟上個月開始的值相等。可是RocketMQ另外的一個機制保證不會出現重複的數據,即默認刪除三天前的數據。(可配置)

從這個裏能夠看到,經過ip+進程+自增值+時間戳達到了一個月內的數據時不會重複的,又經過默認清理數據的機制保證整個MQ運行時MsgId不會重複出現。可是整體來講,算法自己依賴兩個條件達到的惟一性,一個是數據月內惟一性,以及數據清理機制。這個算法不適合全部的分佈式惟一id生成場景,可是它很是適合消息隊列這個場景,簡單而且性能好(好比相較分佈式鎖生成id)。

服務端生成惟一id

offsetMsgId生成方式更加簡單。

public class MessageDecoder {
   
    public static String createMessageId(final ByteBuffer input, final ByteBuffer addr, final long offset) {
        input.flip();
        input.limit(MessageDecoder.MSG_ID_LENGTH);//長度16位

        input.put(addr);// ip地址
        input.putLong(offset);// 物理分區的偏移量offset

        return UtilAll.bytes2string(input.array());
    }
}

經過服務端ip地址+服務端消息的物理分區偏移量來達到惟一值id。

經過MsgId或者offsetMsgId去重

在日常的開發工做中,咱們經常會提醒本身,不能去信任前端的關鍵數據,爲什麼?由於它傳輸到服務端的過程是極大可能被修改的,因此它不是可信任的。

MsgId是客戶端生成的id,它可不可靠?先不說被串改這樣的問題,從算法的角度分析它是可靠的,可是它存在一個致命的問題:客戶端發送至服務端消息時,有沒有可能重複發送一條消息?正常狀況下不可能,可是當存在網絡波動,網路延時等諸多問題時,消息從客戶端發送至服務端過程當中,服務端正常寫入了commit-log,可在響應客戶端(ACK)的時候失敗了……

image.png

結果如何?

多是兩條同樣的消息內容,卻有了不同的MsgId跟OffsetMsgId,最終它仍是重複消費了(這種狀況極少出現,適合那些業務較爲寬鬆的場景),可是因爲在消費端沒法直接取到MsgId的值(亦或者我還沒看到),因此若是要以之做爲去重id,過程須要本身實現。

OffsetMsgId是服務端生成的id,它可不可靠?很明顯它也存在前文中說的客戶端id的狀況,可是它的好處是消費端能夠經過api直接取到。從代碼實現的角度來說,以OffsetMsgId做爲去重id是更爲優雅的,RocketMQ 做爲第三方組件嵌入系統,相似去重這樣的工做若是能夠與業務隔離開,無疑是最合適不過的。

另,忍不住吐槽,在RocketMQ-Console的客戶端上重試消息的時候,拿到的MsgId跟消息正常消息的Id居然不相同。前者爲客戶端Id,後端爲服務端Id。

好比能夠考慮最簡單的方案,以下:

image.png

總結

一、極爲嚴謹的業務必須業務冪等。 二、寬鬆業務能夠考慮使用OffsetMsgId做爲去重id。 三、惟一id的兩種方式很是值得借鑑與思考,簡單並且優雅。

相關文章
相關標籤/搜索