本文主要研究一下rocketmq的maxMessageSize
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/producer/DefaultMQProducer.java
public class DefaultMQProducer extends ClientConfig implements MQProducer { private final InternalLogger log = ClientLogger.getLog(); //...... /** * Maximum allowed message size in bytes. */ private int maxMessageSize = 1024 * 1024 * 4; // 4M public int getMaxMessageSize() { return maxMessageSize; } public void setMaxMessageSize(int maxMessageSize) { this.maxMessageSize = maxMessageSize; } public SendResult send( Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { Validators.checkMessage(msg, this); msg.setTopic(withNamespace(msg.getTopic())); return this.defaultMQProducerImpl.send(msg); } private MessageBatch batch(Collection<Message> msgs) throws MQClientException { MessageBatch msgBatch; try { msgBatch = MessageBatch.generateFromList(msgs); for (Message message : msgBatch) { Validators.checkMessage(message, this); MessageClientIDSetter.setUniqID(message); message.setTopic(withNamespace(message.getTopic())); } msgBatch.setBody(msgBatch.encode()); } catch (Exception e) { throw new MQClientException("Failed to initiate the MessageBatch", e); } msgBatch.setTopic(withNamespace(msgBatch.getTopic())); return msgBatch; } //...... }
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/Validators.java
public class Validators { public static final String VALID_PATTERN_STR = "^[%|a-zA-Z0-9_-]+$"; public static final Pattern PATTERN = Pattern.compile(VALID_PATTERN_STR); public static final int CHARACTER_MAX_LENGTH = 255; //...... /** * Validate message */ public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException { if (null == msg) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null"); } // topic Validators.checkTopic(msg.getTopic()); // body if (null == msg.getBody()) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null"); } if (0 == msg.getBody().length) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero"); } if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize()); } } //...... }
rocketmq/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
public class MessageStoreConfig { //The root directory in which the log data is kept @ImportantField private String storePathRootDir = System.getProperty("user.home") + File.separator + "store"; //The directory in which the commitlog is kept @ImportantField private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store" + File.separator + "commitlog"; //...... // The maximum size of message,default is 4M private int maxMessageSize = 1024 * 1024 * 4; public int getMaxMessageSize() { return maxMessageSize; } public void setMaxMessageSize(int maxMessageSize) { this.maxMessageSize = maxMessageSize; } //...... }
在rocketmq安裝目錄的conf/broker.conf中指定maxMessageSize=65536
rocketmq/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
public class DefaultMessageStore implements MessageStore { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); private final MessageStoreConfig messageStoreConfig; // CommitLog private final CommitLog commitLog; //...... public PutMessageResult putMessages(MessageExtBatch messageExtBatch) { if (this.shutdown) { log.warn("DefaultMessageStore has shutdown, so putMessages is forbidden"); return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); } if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { long value = this.printTimes.getAndIncrement(); if ((value % 50000) == 0) { log.warn("DefaultMessageStore is in slave mode, so putMessages is forbidden "); } return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); } if (!this.runningFlags.isWriteable()) { long value = this.printTimes.getAndIncrement(); if ((value % 50000) == 0) { log.warn("DefaultMessageStore is not writable, so putMessages is forbidden " + this.runningFlags.getFlagBits()); } return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); } else { this.printTimes.set(0); } if (messageExtBatch.getTopic().length() > Byte.MAX_VALUE) { log.warn("PutMessages topic length too long " + messageExtBatch.getTopic().length()); return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); } if (messageExtBatch.getBody().length > messageStoreConfig.getMaxMessageSize()) { log.warn("PutMessages body length too long " + messageExtBatch.getBody().length); return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); } if (this.isOSPageCacheBusy()) { return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null); } long beginTime = this.getSystemClock().now(); PutMessageResult result = this.commitLog.putMessages(messageExtBatch); long elapsedTime = this.getSystemClock().now() - beginTime; if (elapsedTime > 500) { log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, 
messageExtBatch.getBody().length); } this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime); if (null == result || !result.isOk()) { this.storeStatsService.getPutMessageFailedTimes().incrementAndGet(); } return result; } //...... }
rocketmq的client端及broker端均有對消息體大小是否超出maxMessageSize進行校驗;client端的DefaultMQProducer定義了maxMessageSize,默認是4M大小;send方法及batch方法都會調用Validators.checkMessage(message, this)校驗消息;服務端conf/broker.conf可以指定maxMessageSize大小;如果需要修改maxMessageSize大小,需要跟服務端配合一起修改