跟我學RocketMQ之批量消息發送源碼解析

上篇文章 跟我學RocketMQ之消息發送源碼解析 中,咱們已經對普通消息的發送流程進行了詳細的解釋,可是因爲篇幅問題沒有展開講解批量消息的發送。本文中,咱們就一塊兒來集中分析一下批量消息的發送是怎樣的邏輯。java

DefaultProducer.send

RocketMQ提供了批量發送消息的API,一樣在DefaultProducer.java中數組

@Override
    public SendResult send(
        Collection<Message> msgs) throws MQClientException, RemotingException, 
            MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(batch(msgs));
    }複製代碼

它的參數爲Message集合,也就是一批消息。它的另一個重載方法提供了發送超時時間參數框架

@Override
    public SendResult send(Collection<Message> msgs,
        long timeout) throws MQClientException, RemotingException,
             MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(batch(msgs), timeout);
    }複製代碼

能夠看到是將消息經過batch()方法打包爲單條消息,咱們看一下batch方法的邏輯ide

DefaultProducer.batch

private MessageBatch batch(Collection<Message> msgs) throws MQClientException {複製代碼
// 聲明批量消息體
        MessageBatch msgBatch;
        try {複製代碼
// 從Message的list生成批量消息體MessageBatch
            msgBatch = MessageBatch.generateFromList(msgs);
            for (Message message : msgBatch) {
                Validators.checkMessage(message, this);
                MessageClientIDSetter.setUniqID(message);
                message.setTopic(withNamespace(message.getTopic()));
            }
            // 設置消息體,此時的消息體已是處理事後的批量消息體
            msgBatch.setBody(msgBatch.encode());
        } catch (Exception e) {
            throw new MQClientException("Failed to initiate the MessageBatch", e);
        }
        // 設置topic
        msgBatch.setTopic(withNamespace(msgBatch.getTopic()));
        return msgBatch;
    }複製代碼

從代碼能夠看到,核心思想是將一批消息(Collection msgs)打包爲MessageBatch對象,咱們看下MessageBatch的聲明 源碼分析

public class MessageBatch extends Message implements Iterable<Message> {複製代碼
private final List<Message> messages;複製代碼
private MessageBatch(List<Message> messages) {
            this.messages = messages;
        }複製代碼

能夠看到MessageBatch繼承自Message,持有List 引用。 學習

咱們接着看一下generateFromList方法this

MessageBatch.generateFromList

public static MessageBatch generateFromList(Collection<Message> messages) {
        assert messages != null;
        assert messages.size() > 0;複製代碼
// 首先實例化一個Message的list
        List<Message> messageList = new ArrayList<Message>(messages.size());
        Message first = null;複製代碼
// 對messages集合進行遍歷
        for (Message message : messages) {複製代碼
// 判斷延時級別,若是大於0拋出異常,緣由爲:批量消息發送不支持延時
            if (message.getDelayTimeLevel() > 0) {
                throw new UnsupportedOperationException
                    ("TimeDelayLevel in not supported for batching");
            }複製代碼
// 判斷topic是否以 **"%RETRY%"** 開頭,若是是,
            // 則拋出異常,緣由爲:批量發送消息不支持消息重試
            if (message.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                throw new UnsupportedOperationException("Retry Group is not supported for batching");
            }複製代碼
// 判斷集合中的每一個Message的topic與批量發送topic是否一致,
            // 若是不一致則拋出異常,緣由爲:
            // 批量消息中的每一個消息實體的Topic要和批量消息總體的topic保持一致。
            if (first == null) {
                first = message;
            } else {
                if (!first.getTopic().equals(message.getTopic())) {
                    throw new UnsupportedOperationException("The topic of the messages in one batch should be the same");
                }複製代碼
// 判斷批量消息的首個Message與其餘的每一個Message實體的等待消息存儲狀態是否相同,
                // 若是不一樣則報錯,緣由爲:批量消息中每一個消息的waitStoreMsgOK狀態均應該相同。
                if (first.isWaitStoreMsgOK() != message.isWaitStoreMsgOK()) {
                    throw new UnsupportedOperationException("The waitStoreMsgOK of the messages in one batch should the same");
                }
            }複製代碼
// 校驗經過後,將message實體添加到messageList中
            messageList.add(message);
        }複製代碼
// 將處理完成的messageList做爲構造方法,
        // 初始化MessageBatch實體,並設置topic以及isWaitStoreMsgOK狀態。
        MessageBatch messageBatch = new MessageBatch(messageList);複製代碼
messageBatch.setTopic(first.getTopic());
        messageBatch.setWaitStoreMsgOK(first.isWaitStoreMsgOK());
        return messageBatch;
    }複製代碼

總結一下,generateFromList方法對調用方設置的Collection 集合進行遍歷,通過前置校驗以後,轉換爲MessageBatch對象並返回給DefaultProducer.batch方法中,咱們接着看DefaultProducer.batch的邏輯。 編碼

到此,經過MessageBatch.generateFromList方法,將發送端傳入的一批消息集合轉換爲了MessageBatch實體。spa

DefaultProducer.batch

private MessageBatch batch(Collection<Message> msgs) throws MQClientException {複製代碼
// 聲明批量消息體
        MessageBatch msgBatch;
        try {
            // 從Message的list生成批量消息體MessageBatch
            msgBatch = MessageBatch.generateFromList(msgs);
            for (Message message : msgBatch) {
                Validators.checkMessage(message, this);
                MessageClientIDSetter.setUniqID(message);
                message.setTopic(withNamespace(message.getTopic()));
            }
            // 設置消息體,此時的消息體已是處理事後的批量消息體
            msgBatch.setBody(msgBatch.encode());
        } catch (Exception e) {
            throw new MQClientException("Failed to initiate the MessageBatch", e);
        }
        // 設置topic
        msgBatch.setTopic(withNamespace(msgBatch.getTopic()));
        return msgBatch;
    }複製代碼

注意下面這行代碼:.net

// 設置消息體,此時的消息體已是處理事後的批量消息體
        msgBatch.setBody(msgBatch.encode());複製代碼

這裏對MessageBatch進行消息編碼處理,經過調用MessageBatch的encode方法實現,代碼邏輯以下:

public byte[] encode() {
        return MessageDecoder.encodeMessages(messages);
    }複製代碼

能夠看到是經過靜態方法 encodeMessages(List messages) 實現的。

咱們看一下encodeMessages方法的邏輯:

public static byte[] encodeMessages(List<Message> messages) {
        //TO DO refactor, accumulate in one buffer, avoid copies
        List<byte[]> encodedMessages = new ArrayList<byte[]>(messages.size());
        int allSize = 0;
        for (Message message : messages) {複製代碼
// 遍歷messages集合,分別對每一個Message實體進行編碼操做,轉換爲byte[]
            byte[] tmp = encodeMessage(message);
            // 將轉換後的單個Message的byte[]設置到encodedMessages中
            encodedMessages.add(tmp);
            // 批量消息的二進制數據長度隨實際消息體遞增
            allSize += tmp.length;
        }
        byte[] allBytes = new byte[allSize];
        int pos = 0;
        for (byte[] bytes : encodedMessages) {
            // 遍歷encodedMessages,按序複製每一個Message的二進制格式消息體
            System.arraycopy(bytes, 0, allBytes, pos, bytes.length);
            pos += bytes.length;
        }
        // 返回批量消息總體的消息體二進制數組
        return allBytes;
    }複製代碼

encodeMessages的邏輯在註釋中分析的已經比較清楚了,其實就是遍歷messages,並按序拼接每一個Message實體的二進制數組格式消息體並返回。

咱們能夠繼續看一下單個Message是如何進行編碼的,調用了 MessageDecoder.encodeMessage(message) 方法,邏輯以下:

public static byte[] encodeMessage(Message message) {
        //only need flag, body, properties
        byte[] body = message.getBody();
        int bodyLen = body.length;
        String properties = messageProperties2String(message.getProperties());
        byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8);
        //note properties length must not more than Short.MAX
        short propertiesLength = (short) propertiesBytes.length;
        int sysFlag = message.getFlag();
        int storeSize = 4 // 1 TOTALSIZE
            + 4 // 2 MAGICCOD
            + 4 // 3 BODYCRC
            + 4 // 4 FLAG
            + 4 + bodyLen // 4 BODY
            + 2 + propertiesLength;
        ByteBuffer byteBuffer = ByteBuffer.allocate(storeSize);
        // 1 TOTALSIZE
        byteBuffer.putInt(storeSize);複製代碼
// 2 MAGICCODE
        byteBuffer.putInt(0);複製代碼
// 3 BODYCRC
        byteBuffer.putInt(0);複製代碼
// 4 FLAG
        int flag = message.getFlag();
        byteBuffer.putInt(flag);複製代碼
// 5 BODY
        byteBuffer.putInt(bodyLen);
        byteBuffer.put(body);複製代碼
// 6 properties
        byteBuffer.putShort(propertiesLength);
        byteBuffer.put(propertiesBytes);複製代碼
return byteBuffer.array();
    }複製代碼

這裏其實就是將消息按照RocektMQ的消息協議進行編碼,格式爲:

消息總長度          ---  4字節
    魔數                --- 4字節
    bodyCRC校驗碼       --- 4字節
    flag標識            --- 4字節
    body長度            --- 4字節
    消息體              --- 消息體實際長度N字節
    屬性長度            --- 2字節
    擴展屬性            --- N字節複製代碼

經過encodeMessage方法處理以後,消息便會被編碼爲固定格式,最終會被Broker端進行處理並持久化。

其餘

到此即是批量消息發送的源碼分析,實際上RocketMQ在處理批量消息的時候是將其解析爲單個消息再發送的,這樣就在底層統一了單條消息、批量消息發送的邏輯,讓整個框架的設計更加健壯,也便於咱們進行理解學習。

後續的發送流程這裏就再也不重複展開了,感興趣的同窗能夠移步咱們的上一篇文章查看

跟我學RocketMQ之消息發送源碼解析

批量消息的源碼分析就暫時告一段落,更多的源碼分析隨後奉上,感謝您的閱讀。

版權聲明: 原創不易,洗文可恥。除非註明,本博文章均爲原創,轉載請以連接形式標明本文地址。
相關文章
相關標籤/搜索