上篇文章 跟我學RocketMQ之消息發送源碼解析 中,咱們已經對普通消息的發送流程進行了詳細的解釋,可是因爲篇幅問題沒有展開講解批量消息的發送。本文中,咱們就一塊兒來集中分析一下批量消息的發送是怎樣的邏輯。java
RocketMQ提供了批量發送消息的API,一樣在DefaultProducer.java中數組
@Override
public SendResult send(
Collection<Message> msgs) throws MQClientException, RemotingException,
MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(batch(msgs));
}複製代碼
它的參數爲Message集合,也就是一批消息。它的另一個重載方法提供了發送超時時間參數框架
@Override
public SendResult send(Collection<Message> msgs,
long timeout) throws MQClientException, RemotingException,
MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(batch(msgs), timeout);
}複製代碼
能夠看到是將消息經過batch()方法打包爲單條消息,咱們看一下batch方法的邏輯ide
private MessageBatch batch(Collection<Message> msgs) throws MQClientException {複製代碼
// 聲明批量消息體
MessageBatch msgBatch;
try {複製代碼
// 從Message的list生成批量消息體MessageBatch
msgBatch = MessageBatch.generateFromList(msgs);
for (Message message : msgBatch) {
Validators.checkMessage(message, this);
MessageClientIDSetter.setUniqID(message);
message.setTopic(withNamespace(message.getTopic()));
}
// 設置消息體,此時的消息體已是處理事後的批量消息體
msgBatch.setBody(msgBatch.encode());
} catch (Exception e) {
throw new MQClientException("Failed to initiate the MessageBatch", e);
}
// 設置topic
msgBatch.setTopic(withNamespace(msgBatch.getTopic()));
return msgBatch;
}複製代碼
從代碼能夠看到,核心思想是將一批消息(Collection
public class MessageBatch extends Message implements Iterable<Message> {複製代碼
private final List<Message> messages;複製代碼
private MessageBatch(List<Message> messages) {
this.messages = messages;
}複製代碼
能夠看到MessageBatch繼承自Message,持有List
咱們接着看一下generateFromList方法this
public static MessageBatch generateFromList(Collection<Message> messages) {
assert messages != null;
assert messages.size() > 0;複製代碼
// 首先實例化一個Message的list
List<Message> messageList = new ArrayList<Message>(messages.size());
Message first = null;複製代碼
// 對messages集合進行遍歷
for (Message message : messages) {複製代碼
// 判斷延時級別,若是大於0拋出異常,緣由爲:批量消息發送不支持延時
if (message.getDelayTimeLevel() > 0) {
throw new UnsupportedOperationException
("TimeDelayLevel in not supported for batching");
}複製代碼
// 判斷topic是否以 **"%RETRY%"** 開頭,若是是,
// 則拋出異常,緣由爲:批量發送消息不支持消息重試
if (message.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
throw new UnsupportedOperationException("Retry Group is not supported for batching");
}複製代碼
// 判斷集合中的每一個Message的topic與批量發送topic是否一致,
// 若是不一致則拋出異常,緣由爲:
// 批量消息中的每一個消息實體的Topic要和批量消息總體的topic保持一致。
if (first == null) {
first = message;
} else {
if (!first.getTopic().equals(message.getTopic())) {
throw new UnsupportedOperationException("The topic of the messages in one batch should be the same");
}複製代碼
// 判斷批量消息的首個Message與其餘的每一個Message實體的等待消息存儲狀態是否相同,
// 若是不一樣則報錯,緣由爲:批量消息中每一個消息的waitStoreMsgOK狀態均應該相同。
if (first.isWaitStoreMsgOK() != message.isWaitStoreMsgOK()) {
throw new UnsupportedOperationException("The waitStoreMsgOK of the messages in one batch should the same");
}
}複製代碼
// 校驗經過後,將message實體添加到messageList中
messageList.add(message);
}複製代碼
// 將處理完成的messageList做爲構造方法,
// 初始化MessageBatch實體,並設置topic以及isWaitStoreMsgOK狀態。
MessageBatch messageBatch = new MessageBatch(messageList);複製代碼
messageBatch.setTopic(first.getTopic());
messageBatch.setWaitStoreMsgOK(first.isWaitStoreMsgOK());
return messageBatch;
}複製代碼
總結一下,generateFromList方法對調用方設置的Collection
到此,經過MessageBatch.generateFromList方法,將發送端傳入的一批消息集合轉換爲了MessageBatch實體。spa
private MessageBatch batch(Collection<Message> msgs) throws MQClientException {複製代碼
// 聲明批量消息體
MessageBatch msgBatch;
try {
// 從Message的list生成批量消息體MessageBatch
msgBatch = MessageBatch.generateFromList(msgs);
for (Message message : msgBatch) {
Validators.checkMessage(message, this);
MessageClientIDSetter.setUniqID(message);
message.setTopic(withNamespace(message.getTopic()));
}
// 設置消息體,此時的消息體已是處理事後的批量消息體
msgBatch.setBody(msgBatch.encode());
} catch (Exception e) {
throw new MQClientException("Failed to initiate the MessageBatch", e);
}
// 設置topic
msgBatch.setTopic(withNamespace(msgBatch.getTopic()));
return msgBatch;
}複製代碼
注意下面這行代碼:.net
// 設置消息體,此時的消息體已是處理事後的批量消息體
msgBatch.setBody(msgBatch.encode());複製代碼
這裏對MessageBatch進行消息編碼處理,經過調用MessageBatch的encode方法實現,代碼邏輯以下:
public byte[] encode() {
return MessageDecoder.encodeMessages(messages);
}複製代碼
能夠看到是經過靜態方法 encodeMessages(List
咱們看一下encodeMessages方法的邏輯:
public static byte[] encodeMessages(List<Message> messages) {
//TO DO refactor, accumulate in one buffer, avoid copies
List<byte[]> encodedMessages = new ArrayList<byte[]>(messages.size());
int allSize = 0;
for (Message message : messages) {複製代碼
// 遍歷messages集合,分別對每一個Message實體進行編碼操做,轉換爲byte[]
byte[] tmp = encodeMessage(message);
// 將轉換後的單個Message的byte[]設置到encodedMessages中
encodedMessages.add(tmp);
// 批量消息的二進制數據長度隨實際消息體遞增
allSize += tmp.length;
}
byte[] allBytes = new byte[allSize];
int pos = 0;
for (byte[] bytes : encodedMessages) {
// 遍歷encodedMessages,按序複製每一個Message的二進制格式消息體
System.arraycopy(bytes, 0, allBytes, pos, bytes.length);
pos += bytes.length;
}
// 返回批量消息總體的消息體二進制數組
return allBytes;
}複製代碼
encodeMessages的邏輯在註釋中分析的已經比較清楚了,其實就是遍歷messages,並按序拼接每一個Message實體的二進制數組格式消息體並返回。
咱們能夠繼續看一下單個Message是如何進行編碼的,調用了 MessageDecoder.encodeMessage(message) 方法,邏輯以下:
public static byte[] encodeMessage(Message message) {
//only need flag, body, properties
byte[] body = message.getBody();
int bodyLen = body.length;
String properties = messageProperties2String(message.getProperties());
byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8);
//note properties length must not more than Short.MAX
short propertiesLength = (short) propertiesBytes.length;
int sysFlag = message.getFlag();
int storeSize = 4 // 1 TOTALSIZE
+ 4 // 2 MAGICCOD
+ 4 // 3 BODYCRC
+ 4 // 4 FLAG
+ 4 + bodyLen // 4 BODY
+ 2 + propertiesLength;
ByteBuffer byteBuffer = ByteBuffer.allocate(storeSize);
// 1 TOTALSIZE
byteBuffer.putInt(storeSize);複製代碼
// 2 MAGICCODE
byteBuffer.putInt(0);複製代碼
// 3 BODYCRC
byteBuffer.putInt(0);複製代碼
// 4 FLAG
int flag = message.getFlag();
byteBuffer.putInt(flag);複製代碼
// 5 BODY
byteBuffer.putInt(bodyLen);
byteBuffer.put(body);複製代碼
// 6 properties
byteBuffer.putShort(propertiesLength);
byteBuffer.put(propertiesBytes);複製代碼
return byteBuffer.array();
}複製代碼
這裏其實就是將消息按照RocektMQ的消息協議進行編碼,格式爲:
消息總長度 --- 4字節
魔數 --- 4字節
bodyCRC校驗碼 --- 4字節
flag標識 --- 4字節
body長度 --- 4字節
消息體 --- 消息體實際長度N字節
屬性長度 --- 2字節
擴展屬性 --- N字節複製代碼
經過encodeMessage方法處理以後,消息便會被編碼爲固定格式,最終會被Broker端進行處理並持久化。
到此即是批量消息發送的源碼分析,實際上RocketMQ在處理批量消息的時候是將其解析爲單個消息再發送的,這樣就在底層統一了單條消息、批量消息發送的邏輯,讓整個框架的設計更加健壯,也便於咱們進行理解學習。
後續的發送流程這裏就再也不重複展開了,感興趣的同窗能夠移步咱們的上一篇文章查看
批量消息的源碼分析就暫時告一段落,更多的源碼分析隨後奉上,感謝您的閱讀。
版權聲明: 原創不易,洗文可恥。除非註明,本博文章均爲原創,轉載請以連接形式標明本文地址。