此次源碼學習的方法是帶着問題學習源碼實現,問題列表以下前端
Broker 怎麼接收消息的?node
Broker 異常狀況下怎麼保證數據可靠性?segmentfault
Broker 怎麼保證存儲高吞吐量?後端
Broker 消息堆積應該怎麼處理?網絡
Broker 怎麼處理定時消息的?多線程
Broker 的buffer滿了怎麼辦?併發
Broker 怎麼處理定時消息的?app
Broker 連接複用嗎?dom
Broker 和Name Server的心跳怎麼實現的?eclipse
Broker 怎麼處理超時鏈接?
消息中轉角色,負責存儲消息,轉發消息,通常也稱爲 Server。在 JMS 規範中稱爲 Provider。可是RocketMQ的Broker和JMS1.1定義的不太同樣,好比JMS中P2P消息消費事後會刪除.
源碼探尋的入口從BrokerController.initialize開始,其中啓動了個NettyRemotingServer,註冊了不少處理器
this.remotingServer = new NettyRemotingServer,(this.nettyServerConfig, this.clientHousekeepingService); ... this.registerProcessor();
BrokerController.registerProcessor
把SendMessageProcessor(生產者發送消息處理器)註冊至NettyServer
public void registerProcessor() { SendMessageProcessor sendProcessor = new SendMessageProcessor(this); sendProcessor.registerSendMessageHook(sendMessageHookList); sendProcessor.registerConsumeMessageHook(consumeMessageHookList); this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
SendMessageProcessor.processRequest
在真正處理消息先後加上hook
@Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { SendMessageContext mqtraceContext; switch (request.getCode()) { case RequestCode.CONSUMER_SEND_MSG_BACK: return this.consumerSendMsgBack(ctx, request); default: SendMessageRequestHeader requestHeader = parseRequestHeader(request); if (requestHeader == null) { return null; } mqtraceContext = buildMsgContext(ctx, requestHeader); this.executeSendMessageHookBefore(ctx, request, mqtraceContext); RemotingCommand response; if (requestHeader.isBatch()) { response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader); } else { response = this.sendMessage(ctx, request, mqtraceContext, requestHeader); } this.executeSendMessageHookAfter(response, mqtraceContext); return response; } }
SendMessageProcessor.sendMessage
構建內部存儲的MessageExtBrokerInner,而後委託給DefaultMessageStore作存儲
private RemotingCommand sendMessage(final ChannelHandlerContext ctx, // final RemotingCommand request, // final SendMessageContext sendMessageContext, // final SendMessageRequestHeader requestHeader) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader(); response.setOpaque(request.getOpaque()); response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId()); response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn())); log.debug("receive SendMessage request command, {}", request); final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp(); if (this.brokerController.getMessageStore().now() < startTimstamp) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp))); return response; } response.setCode(-1); super.msgCheck(ctx, requestHeader, response); if (response.getCode() != -1) { return response; } final byte[] body = request.getBody(); int queueIdInt = requestHeader.getQueueId(); TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); if (queueIdInt < 0) { queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums(); } MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); msgInner.setTopic(requestHeader.getTopic()); msgInner.setQueueId(queueIdInt); if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) { return response; } msgInner.setBody(body); msgInner.setFlag(requestHeader.getFlag()); MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties())); msgInner.setPropertiesString(requestHeader.getProperties()); msgInner.setBornTimestamp(requestHeader.getBornTimestamp()); msgInner.setBornHost(ctx.channel().remoteAddress()); msgInner.setStoreHost(this.getStoreHost()); msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes()); if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) { String traFlag = msgInner.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (traFlag != null) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark( "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden"); return response; } } //存放消息 PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt); }
DefaultMessageStore.putMessage
這裏的邏輯是先看數據能夠存儲不能夠,沒什麼問題的話再委託給CommitLog作存儲
public PutMessageResult putMessage(MessageExtBrokerInner msg) { if (this.shutdown) { log.warn("message store has shutdown, so putMessage is forbidden"); return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); } if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { long value = this.printTimes.getAndIncrement(); if ((value % 50000) == 0) { log.warn("message store is slave mode, so putMessage is forbidden "); } return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); } if (!this.runningFlags.isWriteable()) { long value = this.printTimes.getAndIncrement(); if ((value % 50000) == 0) { log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits()); } return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); } else { this.printTimes.set(0); } if (msg.getTopic().length() > Byte.MAX_VALUE) { log.warn("putMessage message topic length too long " + msg.getTopic().length()); return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); } if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) { log.warn("putMessage message properties length too long " + msg.getPropertiesString().length()); return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null); } if (this.isOSPageCacheBusy()) { return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null); } long beginTime = this.getSystemClock().now(); //委託CommitLog存儲消息 PutMessageResult result = this.commitLog.putMessage(msg); long eclipseTime = this.getSystemClock().now() - beginTime; if (eclipseTime > 500) { log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length); } this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime); if (null == result || !result.isOk()) { this.storeStatsService.getPutMessageFailedTimes().incrementAndGet(); } return result; }
CommitLog.putMessage
public PutMessageResult putMessage(final MessageExtBrokerInner msg) { // Set the storage time msg.setStoreTimestamp(System.currentTimeMillis()); // Set the message body BODY CRC (consider the most appropriate setting // on the client) msg.setBodyCRC(UtilAll.crc32(msg.getBody())); // Back to Results AppendMessageResult result = null; StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); String topic = msg.getTopic(); int queueId = msg.getQueueId(); final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE// || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { // Delay Delivery if (msg.getDelayTimeLevel() > 0) { if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); } topic = ScheduleMessageService.SCHEDULE_TOPIC; queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // Backup real topic, queueId MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); msg.setTopic(topic); msg.setQueueId(queueId); } } long eclipseTimeInLock = 0; MappedFile unlockMappedFile = null; //獲取到要存儲文件的內存映射 MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); putMessageLock.lock(); //spin or ReentrantLock ,depending on store config try { long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); this.beginTimeInLock = beginLockTimestamp; // Here settings are stored timestamp, in order to ensure an orderly // global msg.setStoreTimestamp(beginLockTimestamp); if (null == mappedFile || mappedFile.isFull()) { mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise } if (null == mappedFile) { log.error("create maped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null); } //附加消息 result = mappedFile.appendMessage(msg, this.appendMessageCallback); switch (result.getStatus()) { case PUT_OK: break; case END_OF_FILE: unlockMappedFile = mappedFile; // Create a new file, re-write the message mappedFile = this.mappedFileQueue.getLastMappedFile(0); if (null == mappedFile) { // XXX: warn and notify me log.error("create maped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result); } result = mappedFile.appendMessage(msg, this.appendMessageCallback); break; case MESSAGE_SIZE_EXCEEDED: case PROPERTIES_SIZE_EXCEEDED: beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result); case UNKNOWN_ERROR: beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); default: beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); } eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; beginTimeInLock = 0; } finally { putMessageLock.unlock(); } if (eclipseTimeInLock > 500) { log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result); } if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) { this.defaultMessageStore.unlockMappedFile(unlockMappedFile); } PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result); // Statistics storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet(); storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes()); //處理磁盤刷盤 handleDiskFlush(result, putMessageResult, msg); //處理高可用 handleHA(result, putMessageResult, msg); return putMessageResult; }
MappedFile.appendMessage
public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) { return appendMessagesInner(msg, cb); }
MappedFile.appendMessagesInner
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) { assert messageExt != null; assert cb != null; int currentPos = this.wrotePosition.get(); if (currentPos < this.fileSize) { ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice(); byteBuffer.position(currentPos); AppendMessageResult result = null; if (messageExt instanceof MessageExtBrokerInner) { //繼續委託給CommitLog.doAppend result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt); } else if (messageExt instanceof MessageExtBatch) { result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch)messageExt); } else { return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); } this.wrotePosition.addAndGet(result.getWroteBytes()); this.storeTimestamp = result.getStoreTimestamp(); return result; } log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize); return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); }
CommitLog.doAppend
爲結構化消息,又加點附屬信息,最終經過mappedByteBuffer存儲到內存中
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final MessageExtBrokerInner msgInner) { // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br> // PHY OFFSET long wroteOffset = fileFromOffset + byteBuffer.position(); this.resetByteBuffer(hostHolder, 8); String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset); // Record ConsumeQueue information keyBuilder.setLength(0); keyBuilder.append(msgInner.getTopic()); keyBuilder.append('-'); keyBuilder.append(msgInner.getQueueId()); String key = keyBuilder.toString(); Long queueOffset = CommitLog.this.topicQueueTable.get(key); if (null == queueOffset) { queueOffset = 0L; CommitLog.this.topicQueueTable.put(key, queueOffset); } // Transaction messages that require special handling final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag()); switch (tranType) { // Prepared and Rollback message is not consumed, will not enter the // consumer queuec case MessageSysFlag.TRANSACTION_PREPARED_TYPE: case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: queueOffset = 0L; break; case MessageSysFlag.TRANSACTION_NOT_TYPE: case MessageSysFlag.TRANSACTION_COMMIT_TYPE: default: break; } /** * Serialize message */ final byte[] propertiesData = msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8); final int propertiesLength = propertiesData == null ? 0 : propertiesData.length; if (propertiesLength > Short.MAX_VALUE) { log.warn("putMessage message properties length too long. length={}", propertiesData.length); return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED); } final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8); final int topicLength = topicData.length; final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length; final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength); // Exceeds the maximum message if (msgLen > this.maxMessageSize) { CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength + ", maxMessageSize: " + this.maxMessageSize); return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED); } // Determines whether there is sufficient free space if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) { this.resetByteBuffer(this.msgStoreItemMemory, maxBlank); // 1 TOTALSIZE this.msgStoreItemMemory.putInt(maxBlank); // 2 MAGICCODE this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE); // 3 The remaining space may be any value // // Here the length of the specially set maxBlank final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank); return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); } // Initialization of storage space this.resetByteBuffer(msgStoreItemMemory, msgLen); // 1 TOTALSIZE this.msgStoreItemMemory.putInt(msgLen); // 2 MAGICCODE this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE); // 3 BODYCRC this.msgStoreItemMemory.putInt(msgInner.getBodyCRC()); // 4 QUEUEID this.msgStoreItemMemory.putInt(msgInner.getQueueId()); // 5 FLAG this.msgStoreItemMemory.putInt(msgInner.getFlag()); // 6 QUEUEOFFSET this.msgStoreItemMemory.putLong(queueOffset); // 7 PHYSICALOFFSET this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position()); // 8 SYSFLAG this.msgStoreItemMemory.putInt(msgInner.getSysFlag()); // 9 BORNTIMESTAMP this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp()); // 10 BORNHOST this.resetByteBuffer(hostHolder, 8); this.msgStoreItemMemory.put(msgInner.getBornHostBytes(hostHolder)); // 11 STORETIMESTAMP this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp()); // 12 STOREHOSTADDRESS this.resetByteBuffer(hostHolder, 8); this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder)); //this.msgBatchMemory.put(msgInner.getStoreHostBytes()); // 13 RECONSUMETIMES this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes()); // 14 Prepared Transaction Offset this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset()); // 15 BODY this.msgStoreItemMemory.putInt(bodyLength); if (bodyLength > 0) this.msgStoreItemMemory.put(msgInner.getBody()); // 16 TOPIC this.msgStoreItemMemory.put((byte) topicLength); this.msgStoreItemMemory.put(topicData); // 17 PROPERTIES this.msgStoreItemMemory.putShort((short) propertiesLength); if (propertiesLength > 0) this.msgStoreItemMemory.put(propertiesData); final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); // Write messages to the queue buffer //這裏終於把消息放到bytebuffer了,刷盤邏輯看下節 byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen); AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId, msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); switch (tranType) { case MessageSysFlag.TRANSACTION_PREPARED_TYPE: case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: break; case MessageSysFlag.TRANSACTION_NOT_TYPE: case MessageSysFlag.TRANSACTION_COMMIT_TYPE: // The next update ConsumeQueue information CommitLog.this.topicQueueTable.put(key, ++queueOffset); break; default: break; } return result; }
至此,數據存儲於Broker磁盤最後的文件中了
異常狀況:
1. Broker 正常關閉 2. Broker 異常 Crash 3. OS Crash 4. 機器掉電,可是能當即恢復供電狀況。 5. 機器沒法開機(多是cpu、主板、內存等關鍵設備損壞) 6. 磁盤設備損壞。
1-4種狀況都屬於硬件資源可當即恢復狀況,RocketMQ在這四種狀況下能保證消息不丟,或者丟失少許數據(依賴刷盤方式是同步仍是異步).
5-6屬於單點故障,且沒法恢復,一旦發生,在此單點上的消息所有丟失。RocketMQ 在這兩種狀況下,經過異步複製,可保證99%的消息不丟,可是仍然會有極少許的消息可能丟失。經過同步雙寫技術能夠徹底避免單點,同步雙寫勢必會影響性能,適合對消息可靠性要求極高的場合,例如與Money相關的應用。
刷盤策略:
RocketMQ 的全部消息都是持久化的,先寫入系統PAGECACHE,而後刷盤,能夠保證內存與磁盤都有一份數據,訪問時,直接從內存讀取。
異步刷盤:
在有 RAID 卡,SAS 15000 轉磁盤測試順序寫文件,速度能夠達到 300M 每秒左右,而線上的網卡通常都爲千兆網卡,寫磁盤速度明顯快於數據網絡入口速度,那麼是否能夠作到寫完內存就向用戶返回,由後臺線程刷盤呢?
(1). 因爲磁盤速度大於網卡速度,那麼刷盤的進度確定能夠跟上消息的寫入速度。
(2). 萬一因爲此時系統壓力過大,可能堆積消息,除了寫入 IO,還有讀取 IO,萬一出現磁盤讀取落後狀況,
會不會致使系統內存溢出,答案是否認的,緣由以下:
a) 寫入消息到 PAGECACHE 時,若是內存不足,則嘗試丟棄乾淨的 PAGE,騰出內存供新消息使用,策略是 LRU 方式。
b) 若是乾淨頁不足,此時寫入PAGECACHE會被阻塞,系統嘗試刷盤部分數據,大約每次嘗試 32 個 PAGE,異步刷盤:消息收到後,返回Producer Ok,同時調用消息存儲過程,異常狀況下,可能形成少許數據丟失來找出更多幹淨 PAGE。
綜上,內存溢出的狀況不會出現
同步刷盤:
>同步刷盤與異步刷盤的惟一區別是異步刷盤寫完 PAGECACHE 直接返回,而同步刷盤須要等待刷盤完成才返回, 同步刷盤流程以下:
(1). 寫入 PAGECACHE 後,線程等待,通知刷盤線程刷盤。
(2). 刷盤線程刷盤後,喚醒前端等待線程,多是一批線程。
(3). 前端等待線程向用戶返回成功。
代碼走讀:
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { // Synchronization flush if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; if (messageExt.isWaitStoreMsgOK()) { //構建同步刷盤請求 GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); //通知刷盤 service.putRequest(request); boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); if (!flushOK) { log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() + " client address: " + messageExt.getBornHostString()); putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); } } else { service.wakeup(); } } // Asynchronous flush else { if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { //異步耍 flushCommitLogService.wakeup(); } else { commitLogService.wakeup(); } } }
CommitLog的構造方法中,同步刷和異步刷用兩個class實現
public CommitLog(final DefaultMessageStore defaultMessageStore) { this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(), defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService()); this.defaultMessageStore = defaultMessageStore; if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { //同步刷盤 this.flushCommitLogService = new GroupCommitService(); } else { //異步刷盤 this.flushCommitLogService = new FlushRealTimeService(); } this.commitLogService = new CommitRealTimeService(); this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize()); batchEncoderThreadLocal = new ThreadLocal<MessageExtBatchEncoder>() { @Override protected MessageExtBatchEncoder initialValue() { return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize()); } }; this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock(); }
先看同步刷盤:GroupCommitService
通知刷盤
public synchronized void putRequest(final GroupCommitRequest request) { //把請求加入待寫列表 synchronized (this.requestsWrite) { this.requestsWrite.add(request); } //通知刷盤 if (hasNotified.compareAndSet(false, true)) { waitPoint.countDown(); // notify } }
等待刷盤
public boolean waitForFlush(long timeout) { try { this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS); return this.flushOK; } catch (InterruptedException e) { e.printStackTrace(); return false; } }
這裏也沒作什麼啊,只是經過閉鎖來等待刷盤,接下來找下哪裏countDownLatch.countDown.
public void wakeupCustomer(final boolean flushOK) { this.flushOK = flushOK; this.countDownLatch.countDown(); }
終於看到了刷盤flush
private void doCommit() { synchronized (this.requestsRead) { if (!this.requestsRead.isEmpty()) { for (GroupCommitRequest req : this.requestsRead) { //重試機制刷盤 boolean flushOK = false; for (int i = 0; i < 2 && !flushOK; i++) { flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); if (!flushOK) { CommitLog.this.mappedFileQueue.flush(0); } } //刷盤成功,釋放閉鎖 req.wakeupCustomer(flushOK); } long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); if (storeTimestamp > 0) { CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } this.requestsRead.clear(); } else { // Because of individual messages is set to not sync flush, it // will come to this process CommitLog.this.mappedFileQueue.flush(0); } } }
又封裝了一層,接着往裏看
public boolean flush(final int flushLeastPages) { boolean result = true; MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, false); if (mappedFile != null) { long tmpTimeStamp = mappedFile.getStoreTimestamp(); int offset = mappedFile.flush(flushLeastPages); long where = mappedFile.getFileFromOffset() + offset; result = where == this.flushedWhere; this.flushedWhere = where; if (0 == flushLeastPages) { this.storeTimestamp = tmpTimeStamp; } } return result; }
終於看到了Java NIO的操做,
public int flush(final int flushLeastPages) { if (this.isAbleToFlush(flushLeastPages)) { if (this.hold()) { int value = getReadPosition(); try { //We only append data to fileChannel or mappedByteBuffer, never both. if (writeBuffer != null || this.fileChannel.position() != 0) { //JAVA NIO 刷盤 this.fileChannel.force(false); } else { //JAVA NIO 刷盤 this.mappedByteBuffer.force(); } } catch (Throwable e) { log.error("Error occurred when force data to disk.", e); } this.flushedPosition.set(value); this.release(); } else { log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get()); this.flushedPosition.set(getReadPosition()); } } return this.getFlushedPosition(); }
而doCommit又在另外一個線程裏面不停的循環.
猜想:同步刷盤爲何使用閉鎖控制另外一個線程進行刷盤操做(而不是同步方法),我認爲是io讀寫是個費時操做,須要控制timeout,主要使用countDownLatch.await(timeout, TimeUnit.MILLISECONDS)函數的超時功能才用多線程控制.
public void run() { CommitLog.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { // 等待時機喚醒,而後執行flush操做 this.waitForRunning(10); this.doCommit(); } catch (Exception e) { CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); } } // Under normal circumstances shutdown, wait for the arrival of the // request, and then flush try { Thread.sleep(10); } catch (InterruptedException e) { CommitLog.log.warn("GroupCommitService Exception, ", e); } synchronized (this) { this.swapRequests(); } this.doCommit(); CommitLog.log.info(this.getServiceName() + " service end"); }
異步刷盤:
異步喚醒邏輯就是試圖喚醒刷盤邏輯而不等待阻塞.具體的刷盤邏輯就再也不看了,與同步相似,在FlushRealTimeService的run()方法內
flushCommitLogService.wakeup();
public void wakeup() { if (hasNotified.compareAndSet(false, true)) { waitPoint.countDown(); // notify } }
Broker系統的瓶頸在IO操做,RocketMQ使用的時文件存儲的方式,使用Java NIO的內存直接映射避免了文件到系統調用再到用戶空間的兩次調用,根據kafka官方文檔能夠達到600M/s.上述代碼中的MappedByteBuffer就是內存映射文件的Java實現.
關於NIO,能夠參見Java NIO-閱讀筆記及總結
消息中間件的主要功能是異步解耦,還有個重要功能是擋住前端的數據洪峯,保證後端系統的穩定性,這就要 求消息中間件具備必定的消息堆積能力,消息堆積分如下兩種狀況:
消息堆積在內存 Buffer,一旦超過內存 Buffer,能夠根據必定的丟棄策略來丟棄消息,如 CORBA Notification 規範中描述。適合能容忍丟棄消息的業務,這種狀況消息的堆積能力主要在於內存 Buffer 大小,並且消息 堆積後,性能降低不會太大,由於內存中數據多少對於對外提供的訪問能力影響有限。
消息堆積到持久化存儲系統中,例如DB,KV存儲,文件記錄形式。
當消息不能在內存 Cache 命中時,要不可避免的訪問磁盤,會產生大量讀 IO,讀 IO 的吞吐量直接決定了 消息堆積後的訪問能力。
評估消息堆積能力主要有如下四點:
消息能堆積多少條,多少字節?即消息的堆積容量。
依賴磁盤大小
b. 消息堆積後,發消息的吞吐量大小,是否會受堆積影響?無 SLAVE 狀況,會受必定影響
有 SLAVE 狀況,不受影響
c. 消息堆積後,正常消費的Consumer是否會受影響?
無 SLAVE 狀況,會受必定影響
有 SLAVE 狀況,不受影響
d . 消息堆積後,訪問堆積在磁盤的消息時,吞吐量有多大?
與訪問的併發有關,最慢會降到 5000 左右。
在有 Slave 狀況下,Master 一旦發現 Consumer 訪問堆積在磁盤的數據時,會向 Consumer 下達一個重定向指令,令 Consumer從Slave拉取數據,這樣正常的發消息與正常消費的Consumer都不會由於消息堆積受影響,由於系統將堆積場景與非堆積場景分割在了兩個不一樣的節點處理。這裏會產生另外一個問題,Slave會不會寫性能降低,答案是否認的。由於 Slave 的消息寫入只追求吞吐量,不追求實時性,只要總體的吞吐量高就能夠,而 Slave每次都是從Master拉取一批數據,如1M,這種批量順序寫入方式即便堆積狀況,總體吞吐量影響相對較小,只是寫入 RT 會變長。
Borker的定時處理原理是將定時消息放入特定的topic(SCHEDULE_TOPIC),而後經過後臺線程,到時間過再把msg放入原有topic.
放入ScheduleMessageService.SCHEDULE_TOPIC
在CommitLog.putMessage中有處理這段的邏輯,經過下降QueueId,並設置topic爲ScheduleMessageService.SCHEDULE_TOPIC,將原來的這兩個值放入屬性保存
if (msg.getDelayTimeLevel() > 0) { if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); } topic = ScheduleMessageService.SCHEDULE_TOPIC; queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // Backup real topic, queueId MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); msg.setTopic(topic); msg.setQueueId(queueId); }
後臺線程處理定時topic.ScheduleMessageService.SCHEDULE_TOPIC
BrokerController啓動時,會啓動一個定時線程來處理延時消息
if (this.messageStore != null) { this.messageStore.start(); } //上述代碼的 這裏的start public void start() throws Exception { this.flushConsumeQueueService.start(); this.commitLog.start(); this.storeStatsService.start(); if (this.scheduleMessageService != null && SLAVE != messageStoreConfig.getBrokerRole()) { this.scheduleMessageService.start(); } if (this.getMessageStoreConfig().isDuplicationEnable()) { this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset()); } else { this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset()); } this.reputMessageService.start(); this.haService.start(); this.createTempFile(); this.addScheduleTask(); this.shutdown = false; }
public void start() { for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) { Integer level = entry.getKey(); Long timeDelay = entry.getValue(); Long offset = this.offsetTable.get(level); if (null == offset) { offset = 0L; } if (timeDelay != null) { //這個定時任務就是處理延遲消息的 this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME); } } this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { ScheduleMessageService.this.persist(); } catch (Exception e) { log.error("scheduleAtFixedRate flush exception", e); } } }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval()); }
@Override public void run() { try { this.executeOnTimeup(); } catch (Exception e) { // XXX: warn and notify me log.error("ScheduleMessageService, executeOnTimeup exception", e); ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask( this.delayLevel, this.offset), DELAY_FOR_A_PERIOD); } }
延遲putMessage,計算時間邏輯很複雜,暫時不深究
public void executeOnTimeup() { ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel)); long failScheduleOffset = offset; if (cq != null) { SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset); if (bufferCQ != null) { try { long nextOffset = offset; int i = 0; ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { long offsetPy = bufferCQ.getByteBuffer().getLong(); int sizePy = bufferCQ.getByteBuffer().getInt(); long tagsCode = bufferCQ.getByteBuffer().getLong(); if (cq.isExtAddr(tagsCode)) { if (cq.getExt(tagsCode, cqExtUnit)) { tagsCode = cqExtUnit.getTagsCode(); } else { //can't find ext content.So re compute tags code. log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}", tagsCode, offsetPy, sizePy); long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy); tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime); } } long now = System.currentTimeMillis(); long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode); nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); long countdown = deliverTimestamp - now; if (countdown <= 0) { MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset( offsetPy, sizePy); if (msgExt != null) { try { MessageExtBrokerInner msgInner = this.messageTimeup(msgExt); PutMessageResult putMessageResult = ScheduleMessageService.this.defaultMessageStore .putMessage(msgInner); if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { continue; } else { // XXX: warn and notify me log.error( "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}", msgExt.getTopic(), msgExt.getMsgId()); ScheduleMessageService.this.timer.schedule( new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_PERIOD); ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); return; } } catch (Exception e) { /* * XXX: warn and notify me */ log.error( "ScheduleMessageService, messageTimeup execute error, drop it. msgExt=" + msgExt + ", nextOffset=" + nextOffset + ",offsetPy=" + offsetPy + ",sizePy=" + sizePy, e); } } } else { ScheduleMessageService.this.timer.schedule( new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown); ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); return; } } // end of for nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask( this.delayLevel, nextOffset), DELAY_FOR_A_WHILE); ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); return; } finally { bufferCQ.release(); } } // end of if (bufferCQ != null) else { /* */ long cqMinOffset = cq.getMinOffsetInQueue(); if (offset < cqMinOffset) { failScheduleOffset = cqMinOffset; log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset=" + cqMinOffset + ", queueId=" + cq.getQueueId()); } } } // end of if (cq != null) ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, failScheduleOffset), DELAY_FOR_A_WHILE); }
Broker 的 Buffer 一般指的是 Broker 中一個隊列的內存 Buffer 大小,這類 Buffer 一般大小有限,若是 Buffer 滿 了之後怎麼辦?
CORBA Notification 規範
(1). RejectNewEvents
拒絕新來的消息,向 Producer 返回 RejectNewEvents 錯誤碼。
(2). 按照特定策略丟棄已有消息
a) AnyOrder - Any event may be discarded on overflow. This is the default setting for this property.
b) FifoOrder - The first event received will be the first discarded.
c) LifoOrder - The last event received will be the first discarded.
d) PriorityOrder - Events should be discarded in priority order,
such that lower priority,events will be discarded before higher priority events.
e) DeadlineOrder - Events should be discarded in the order of shortest expiry deadline first.
RocketMQ 沒有內存Buffer概念,RocketMQ的隊列都是持久化磁盤,數據按期清除。
對於此問題的解決思路,RocketMQ 同其餘 MQ 有很是顯著的區別,RocketMQ 的內存 Buffer 抽象成一個無限長度的隊列,無論有多少數據進來都能裝得下,這個無限是有前提的,Broker 會按期刪除過時的數據,例如 Broker 只保存3天的消息,那麼這個Buffer雖然長度無限,可是3天前的數據會被從隊尾刪除。
Broker HA怎麼實現的?
在刷盤後有與Slave通訊的邏輯,具體調用HAService中的服務,只是個tcp請求,邏輯比較簡單,就再也不詳細分析.這裏值得一提的是,這裏居然沒有委託netty實現,而使用原始的Java NIO請求和處理.
public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) { HAService service = this.defaultMessageStore.getHaService(); if (messageExt.isWaitStoreMsgOK()) { // Determine whether to wait if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) { GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); service.putRequest(request); service.getWaitNotifyObject().wakeupAll(); boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); if (!flushOK) { log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString()); putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT); } } // Slave problem else { // Tell the producer, slave not available putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE); } } } }
1. Producer向Broker發送1條類型爲TransactionPreparedType的消息,Broker接收消息保存在CommitLog中,而後返回消息的queueOffset和MessageId到Producer,MessageId包含有commitLogOffset(即消息在CommitLog中的偏移量,經過該變量能夠直接定位到消息自己),因爲該類型的消息在保存的時候,commitLogOffset沒有被保存到consumerQueue中,此時客戶端經過consumerQueue取不到commitLogOffset,因此該類型的消息沒法被取到,致使不會被消費。 2. Producer端的TransactionExecuterImpl執行本地操做,返回本地事務的狀態,而後發送一條類型爲TransactionCommitType或者TransactionRollbackType的消息到Broker確認提交或者回滾,Broker經過Request中的commitLogOffset,獲取到上面狀態爲TransactionPreparedType的消息(簡稱消息A),而後從新構造一條與消息A內容相同的消息B,設置狀態爲TransactionCommitType或者TransactionRollbackType,而後保存。其中TransactionCommitType類型的,會放commitLogOffset到consumerQueue中,TransactionRollbackType類型的,消息體設置爲空,不會放commitLogOffset到consumerQueue中.
上述第一步,prepared消息與普通消息相似,只不過不放入ConsumerQueue.
第二部,結束事務消息見EndTransactionProcessor
@Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final EndTransactionRequestHeader requestHeader = (EndTransactionRequestHeader) request.decodeCommandCustomHeader(EndTransactionRequestHeader.class); if (requestHeader.getFromTransactionCheck()) { switch (requestHeader.getCommitOrRollback()) { case MessageSysFlag.TRANSACTION_NOT_TYPE: { LOGGER.warn("check producer[{}] transaction state, but it's pending status." + "RequestHeader: {} Remark: {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.toString(), request.getRemark()); return null; } case MessageSysFlag.TRANSACTION_COMMIT_TYPE: { LOGGER.warn("check producer[{}] transaction state, the producer commit the message." + "RequestHeader: {} Remark: {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.toString(), request.getRemark()); break; } case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: { LOGGER.warn("check producer[{}] transaction state, the producer rollback the message." + "RequestHeader: {} Remark: {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.toString(), request.getRemark()); break; } default: return null; } } else { switch (requestHeader.getCommitOrRollback()) { case MessageSysFlag.TRANSACTION_NOT_TYPE: { LOGGER.warn("the producer[{}] end transaction in sending message, and it's pending status." + "RequestHeader: {} Remark: {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.toString(), request.getRemark()); return null; } case MessageSysFlag.TRANSACTION_COMMIT_TYPE: { break; } case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: { LOGGER.warn("the producer[{}] end transaction in sending message, rollback the message." + "RequestHeader: {} Remark: {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.toString(), request.getRemark()); break; } default: return null; } } final MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getCommitLogOffset()); if (msgExt != null) { final String pgroupRead = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP); if (!pgroupRead.equals(requestHeader.getProducerGroup())) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("the producer group wrong"); return response; } if (msgExt.getQueueOffset() != requestHeader.getTranStateTableOffset()) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("the transaction state table offset wrong"); return response; } if (msgExt.getCommitLogOffset() != requestHeader.getCommitLogOffset()) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("the commit log offset wrong"); return response; } //構建與prepare同樣的消息,並設置flag爲commit或rollback MessageExtBrokerInner msgInner = this.endMessageTransaction(msgExt); msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback())); msgInner.setQueueOffset(requestHeader.getTranStateTableOffset()); msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset()); msgInner.setStoreTimestamp(msgExt.getStoreTimestamp()); //若是rollback則消息體爲空 if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) { msgInner.setBody(null); } final MessageStore messageStore = this.brokerController.getMessageStore(); final PutMessageResult putMessageResult = messageStore.putMessage(msgInner); if (putMessageResult != null) { switch (putMessageResult.getPutMessageStatus()) { // Success case PUT_OK: case FLUSH_DISK_TIMEOUT: case FLUSH_SLAVE_TIMEOUT: case SLAVE_NOT_AVAILABLE: response.setCode(ResponseCode.SUCCESS); response.setRemark(null); break; // Failed case CREATE_MAPEDFILE_FAILED: response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("create mapped file failed."); break; case MESSAGE_ILLEGAL: case PROPERTIES_SIZE_EXCEEDED: response.setCode(ResponseCode.MESSAGE_ILLEGAL); response.setRemark("the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k."); break; case SERVICE_NOT_AVAILABLE: response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE); response.setRemark("service not available now."); break; case OS_PAGECACHE_BUSY: response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("OS page cache busy, please try another machine"); break; case UNKNOWN_ERROR: response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("UNKNOWN_ERROR"); break; default: response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("UNKNOWN_ERROR DEFAULT"); break; } return response; } else { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("store putMessage return null"); } } else { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("find prepared transaction message failed"); return response; } return response; }
同一個網絡鏈接,客戶端多個線程能夠同時發送請求,應答響應經過 header 中的 opaque 字段來標識。
若是某個鏈接超過特定時間沒有活動(無讀寫事件),則自動關閉此鏈接,並通知上層業務,清除鏈接對應的 註冊信息。
Broker啓動時,會在定時線程池中每30秒註冊信息至Name Server
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.registerBrokerAll(true, false); } catch (Throwable e) { log.error("registerBrokerAll Exception", e); } } }, 1000 * 10, 1000 * 30, TimeUnit.MILLISECONDS);