rocketmq源碼解析結束事務處理器①

說在前面java

結束事務處理器node

 

源碼解析apache

進入這個方法,org.apache.rocketmq.broker.processor.EndTransactionProcessor#processRequest緩存






@Overridepublic RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throwsRemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);final EndTransactionRequestHeader requestHeader =(EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);LOGGER.info("Transaction request:{}", requestHeader);//        若是broker是從節點if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");return response;}if (requestHeader.getFromTransactionCheck()) {switch (requestHeader.getCommitOrRollback()) {case MessageSysFlag.TRANSACTION_NOT_TYPE: {LOGGER.warn("Check producer[{}] transaction state, but it's pending status."+ "RequestHeader: {} Remark: {}",RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());return null;}case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {LOGGER.warn("Check producer[{}] transaction state, the producer commit the message."+ "RequestHeader: {} Remark: {}",RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());break;}case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {LOGGER.warn("Check producer[{}] transaction state, the producer rollback the message."+ "RequestHeader: {} Remark: {}",RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());break;}default:return null;}} else {switch (requestHeader.getCommitOrRollback()) {case MessageSysFlag.TRANSACTION_NOT_TYPE: {LOGGER.warn("The producer[{}] end transaction in sending message,  and it's pending status."+ "RequestHeader: {} Remark: {}",RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());return null;}case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {break;}case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {LOGGER.warn("The producer[{}] end transaction in sending message, rollback the message."+ "RequestHeader: {} Remark: {}",RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());break;}default:return null;}}OperationResult result = new OperationResult();//        事務提交if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {//            事務消息提交=》result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);if (result.getResponseCode() == ResponseCode.SUCCESS) {//                =》RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);if (res.getCode() == ResponseCode.SUCCESS) {//                    組裝結束事務消息=》MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());//                    發送最終消息=》RemotingCommand sendResult = sendFinalMessage(msgInner);if (sendResult.getCode() == ResponseCode.SUCCESS) {//                        刪除準備消息=》this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());}return sendResult;}return res;}} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {//            事務消息回滾=》result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);if (result.getResponseCode() == ResponseCode.SUCCESS) {RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);if (res.getCode() == ResponseCode.SUCCESS) {//                    刪除準備消息=》this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());}return res;}}response.setCode(result.getResponseCode());response.setRemark(result.getResponseRemark());return response;    }

進入這個方法,事務消息提交,org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#commitMessage微信

@Overridepublic OperationResult commitMessage(EndTransactionRequestHeader requestHeader) {return getHalfMessageByOffset(requestHeader.getCommitLogOffset());    }

進入這個方法,org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#getHalfMessageByOffsetapp

private OperationResult getHalfMessageByOffset(long commitLogOffset) {OperationResult response = new OperationResult();//        根據offset查詢消息=》MessageExt messageExt = this.transactionalMessageBridge.lookMessageByOffset(commitLogOffset);if (messageExt != null) {response.setPrepareMessage(messageExt);response.setResponseCode(ResponseCode.SUCCESS);} else {response.setResponseCode(ResponseCode.SYSTEM_ERROR);response.setResponseRemark("Find prepared transaction message failed");}return response;    }

進入這個方法,根據offset查詢消息,org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#lookMessageByOffseteclipse

public MessageExt lookMessageByOffset(final long commitLogOffset) {return this.store.lookMessageByOffset(commitLogOffset);    }

進入這個方法,org.apache.rocketmq.store.DefaultMessageStore#lookMessageByOffset(long)異步

public MessageExt lookMessageByOffset(long commitLogOffset) {//        根據commitLog查詢SelectMappedBufferResult=》SelectMappedBufferResult sbr = this.commitLog.getMessage(commitLogOffset, 4);if (null != sbr) {try {// 1 TOTALSIZEint size = sbr.getByteBuffer().getInt();//                根據offset和大小查詢消息=》return lookMessageByOffset(commitLogOffset, size);} finally {sbr.release();}}return null;    }

進入這個方法,根據commitLog查詢SelectMappedBufferResult,org.apache.rocketmq.store.CommitLog#getMessageide

public SelectMappedBufferResult getMessage(final long offset, final int size) {int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();//        根據offset找到映射文件 =》MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);if (mappedFile != null) {int pos = (int) (offset % mappedFileSize);return mappedFile.selectMappedBuffer(pos, size);}return null;    }

進入這個方法,根據offset找到映射文件,org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long, boolean)ui




public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {try {//            獲取隊列中第一個映射文件MappedFile firstMappedFile = this.getFirstMappedFile();//            獲取隊列中最後一個映射文件MappedFile lastMappedFile = this.getLastMappedFile();if (firstMappedFile != null && lastMappedFile != null) {//                若是offset不在索引文件的offset範圍內if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",offset,firstMappedFile.getFileFromOffset(),lastMappedFile.getFileFromOffset() + this.mappedFileSize,this.mappedFileSize,this.mappedFiles.size());} else {//                   找到映射文件在隊列中的索引位置int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));MappedFile targetFile = null;try {//                        獲取索引文件targetFile = this.mappedFiles.get(index);} catch (Exception ignored) {}//                    offset在目標文件的起始offset和結束offset範圍內if (targetFile != null && offset >= targetFile.getFileFromOffset()&& offset < targetFile.getFileFromOffset() + this.mappedFileSize) {return targetFile;}//                    若是按索引在隊列中找不到映射文件就遍歷隊列查找映射文件for (MappedFile tmpMappedFile : this.mappedFiles) {if (offset >= tmpMappedFile.getFileFromOffset()&& offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {return tmpMappedFile;}}}//                若是offset=0獲取隊列中第一個映射文件,我的感受這個邏輯是否放在前面判斷更爲合理,仍是放在這裏另有深意if (returnFirstOnNotFound) {return firstMappedFile;}}} catch (Exception e) {log.error("findMappedFileByOffset Exception", e);}return null;    }

往上返回到這個方法,根據offset和大小查詢消息,org.apache.rocketmq.store.DefaultMessageStore#lookMessageByOffset(long, int)

public MessageExt lookMessageByOffset(long commitLogOffset, int size) {//        根據offset和大小查詢offset=》SelectMappedBufferResult sbr = this.commitLog.getMessage(commitLogOffset, size);if (null != sbr) {try {return MessageDecoder.decode(sbr.getByteBuffer(), true, false);} finally {sbr.release();}}return null;    }

進入這個方法,根據offset和大小查詢offset,org.apache.rocketmq.store.CommitLog#getMessage上面介紹過了。

往上返回到這個方法,檢查準備提交的事務消息,org.apache.rocketmq.broker.processor.EndTransactionProcessor#checkPrepareMessage


private RemotingCommand checkPrepareMessage(MessageExt msgExt, EndTransactionRequestHeader requestHeader) {final RemotingCommand response = RemotingCommand.createResponseCommand(null);if (msgExt != null) {final String pgroupRead = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);if (!pgroupRead.equals(requestHeader.getProducerGroup())) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("The producer group wrong");return response;}if (msgExt.getQueueOffset() != requestHeader.getTranStateTableOffset()) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("The transaction state table offset wrong");return response;}if (msgExt.getCommitLogOffset() != requestHeader.getCommitLogOffset()) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("The commit log offset wrong");return response;}} else {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("Find prepared transaction message failed");return response;}response.setCode(ResponseCode.SUCCESS);return response;    }

進入這個方法,發送最終消息,org.apache.rocketmq.broker.processor.EndTransactionProcessor#sendFinalMessage

final RemotingCommand response = RemotingCommand.createResponseCommand(null);//        存儲消息=》final PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);if (putMessageResult != null) {switch (putMessageResult.getPutMessageStatus()) {// Successcase PUT_OK:case FLUSH_DISK_TIMEOUT:case FLUSH_SLAVE_TIMEOUT:case SLAVE_NOT_AVAILABLE:response.setCode(ResponseCode.SUCCESS);response.setRemark(null);break;// Failedcase CREATE_MAPEDFILE_FAILED:response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("Create mapped file failed.");break;//                    消息太大case MESSAGE_ILLEGAL://                    消息屬性太大case PROPERTIES_SIZE_EXCEEDED:response.setCode(ResponseCode.MESSAGE_ILLEGAL);response.setRemark("The message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");break;case SERVICE_NOT_AVAILABLE:response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);response.setRemark("Service not available now.");break;//                    系統繁忙case OS_PAGECACHE_BUSY:response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("OS page cache busy, please try another machine");break;case UNKNOWN_ERROR:response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("UNKNOWN_ERROR");break;default:response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("UNKNOWN_ERROR DEFAULT");break;}return response;} else {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("store putMessage return null");}return response;    }

進入這個方法,存儲消息,org.apache.rocketmq.store.DefaultMessageStore#putMessage











public PutMessageResult putMessage(MessageExtBrokerInner msg) {//        存儲服務不可用if (this.shutdown) {log.warn("message store has shutdown, so putMessage is forbidden");return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);}//        broker角色不是masterif (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {long value = this.printTimes.getAndIncrement();if ((value % 50000) == 0) {log.warn("message store is slave mode, so putMessage is forbidden ");}return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);}//        沒有寫權限if (!this.runningFlags.isWriteable()) {long value = this.printTimes.getAndIncrement();if ((value % 50000) == 0) {log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());}return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);} else {this.printTimes.set(0);}//        topic長度不合法if (msg.getTopic().length() > Byte.MAX_VALUE) {log.warn("putMessage message topic length too long " + msg.getTopic().length());return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);}//        消息屬性長度不合法if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);}//        系統繁忙if (this.isOSPageCacheBusy()) {return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);}long beginTime = this.getSystemClock().now();//        commitLog存儲消息=》PutMessageResult result = this.commitLog.putMessage(msg);long eclipseTime = this.getSystemClock().now() - beginTime;if (eclipseTime > 500) {log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length);}this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);if (null == result || !result.isOk()) {this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();}return result;    }

進入這個方法,commitLog存儲消息,org.apache.rocketmq.store.CommitLog#putMessage


















public PutMessageResult putMessage(final MessageExtBrokerInner msg) {// Set the storage timemsg.setStoreTimestamp(System.currentTimeMillis());// Set the message body BODY CRC (consider the most appropriate setting// on the client)msg.setBodyCRC(UtilAll.crc32(msg.getBody()));// Back to ResultsAppendMessageResult result = null;StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();//        從消息中獲取topicString topic = msg.getTopic();//        從消息中獲取queueIdint queueId = msg.getQueueId();//        獲取事務類型final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());//        若是沒有事務或提交事務延遲執行if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {// Delay Deliveryif (msg.getDelayTimeLevel() > 0) {if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}//                設置延遲消息的topictopic = ScheduleMessageService.SCHEDULE_TOPIC;queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());// Backup real topic, queueId 備份真正的topic和queueIdMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));msg.setTopic(topic);msg.setQueueId(queueId);}}long eclipseTimeInLock = 0;MappedFile unlockMappedFile = null;//        獲取映射文件隊列的最後一個映射文件MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();//        自旋鎖或者互斥鎖putMessageLock.lock(); //spin or ReentrantLock ,depending on store configtry {long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();//            開始鎖定時間this.beginTimeInLock = beginLockTimestamp;// Here settings are stored timestamp, in order to ensure an orderly// globalmsg.setStoreTimestamp(beginLockTimestamp);if (null == mappedFile || mappedFile.isFull()) {//                映射文件不存在或者映射文件滿了以起始位置的offset獲取最後的映射文件=》mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise}if (null == mappedFile) {log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());beginTimeInLock = 0;return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);}//            映射文件中添加消息=》result = mappedFile.appendMessage(msg, this.appendMessageCallback);switch (result.getStatus()) {case PUT_OK:break;//                    映射文件不存在或者映射文件滿了case END_OF_FILE:unlockMappedFile = mappedFile;// Create a new file, re-write the message 建立一個文件讀寫消息mappedFile = this.mappedFileQueue.getLastMappedFile(0);if (null == mappedFile) {// XXX: warn and notify melog.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());beginTimeInLock = 0;return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);}//                    處理消息=》result = mappedFile.appendMessage(msg, this.appendMessageCallback);break;//                    消息過大case MESSAGE_SIZE_EXCEEDED://                    消息屬性過大case PROPERTIES_SIZE_EXCEEDED:beginTimeInLock = 0;return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);case UNKNOWN_ERROR:beginTimeInLock = 0;return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);default:beginTimeInLock = 0;return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);}eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;beginTimeInLock = 0;} finally {putMessageLock.unlock();}if (eclipseTimeInLock > 500) {log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);}if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {//            解鎖映射文件this.defaultMessageStore.unlockMappedFile(unlockMappedFile);}PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);// Statistics 單次存儲消息topic次數storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();//        單次存儲消息topic大小storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());//        磁盤刷新=》handleDiskFlush(result, putMessageResult, msg);//        主從刷新=》handleHA(result, putMessageResult, msg);return putMessageResult;    }

進入這個方法,映射文件不存在或者映射文件滿了以起始位置的offset獲取最後的映射文件,org.apache.rocketmq.store.MappedFileQueue#getLastMappedFile(long, boolean)







public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {long createOffset = -1;//        獲取映射文件隊列中最後一個映射文件MappedFile mappedFileLast = getLastMappedFile();if (mappedFileLast == null) {createOffset = startOffset - (startOffset % this.mappedFileSize);}if (mappedFileLast != null && mappedFileLast.isFull()) {//            建立的offset=最後映射文件的開始offset+映射文件的大小createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;}//        建立文件的offset不是-1且須要建立映射文件if (createOffset != -1 && needCreate) {//            下個文件存儲路徑 System.getProperty("user.home") + File.separator + "store"//            + File.separator + "commitlog",根據offset建立文件名String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);//            下下個文件存儲路經String nextNextFilePath = this.storePath + File.separator+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);MappedFile mappedFile = null;if (this.allocateMappedFileService != null) {//                處理請求返回映射文件=》mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,nextNextFilePath, this.mappedFileSize);} else {try {//                    建立映射文件=》mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);} catch (IOException e) {log.error("create mappedFile exception", e);}}if (mappedFile != null) {if (this.mappedFiles.isEmpty()) {mappedFile.setFirstCreateInQueue(true);}this.mappedFiles.add(mappedFile);}return mappedFile;}return mappedFileLast;    }

進入這個方法,處理請求返回映射文件,org.apache.rocketmq.store.AllocateMappedFileService#putRequestAndReturnMappedFile






public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {int canSubmitRequests = 2;//        是否瞬間持久化if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {//            若是broker是master,buffer不夠用瞬間失敗if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()&& BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in poolcanSubmitRequests = this.messageStore.getTransientStorePool().remainBufferNumbs() - this.requestQueue.size();}}AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);//        緩存存儲請求boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;if (nextPutOK) {if (canSubmitRequests <= 0) {log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +"RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());this.requestTable.remove(nextFilePath);return null;}//            下一個請求添加到優先級阻塞隊列中boolean offerOK = this.requestQueue.offer(nextReq);if (!offerOK) {log.warn("never expected here, add a request to preallocate queue failed");}canSubmitRequests--;}AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);//        緩存下下個請求boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;if (nextNextPutOK) {if (canSubmitRequests <= 0) {log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +"RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());this.requestTable.remove(nextNextFilePath);} else {//                下下個請求加入優先級阻塞隊列boolean offerOK = this.requestQueue.offer(nextNextReq);if (!offerOK) {log.warn("never expected here, add a request to preallocate queue failed");}}}if (hasException) {log.warn(this.getServiceName() + " service has exception. so return null");return null;}AllocateRequest result = this.requestTable.get(nextFilePath);try {if (result != null) {//                同步等待boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);if (!waitOK) {log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());return null;} else {this.requestTable.remove(nextFilePath);return result.getMappedFile();}} else {log.error("find preallocate mmap failed, this never happen");}} catch (InterruptedException e) {log.warn(this.getServiceName() + " service has exception. ", e);}return null;    }

往上返回到這個方法,映射文件中添加消息,org.apache.rocketmq.store.MappedFile#appendMessagesInner


public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {assert messageExt != null;assert cb != null;//        獲取當前寫的位置int currentPos = this.wrotePosition.get();if (currentPos < this.fileSize) {ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();byteBuffer.position(currentPos);AppendMessageResult result = null;if (messageExt instanceof MessageExtBrokerInner) {//                消息序列化後組裝映射的buffer=》result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);} else if (messageExt instanceof MessageExtBatch) {//                批量消息序列化後組裝映射的bufferresult = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);} else {return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);}this.wrotePosition.addAndGet(result.getWroteBytes());this.storeTimestamp = result.getStoreTimestamp();return result;}log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);    }

進入這個方法,消息序列化後組裝映射的buffer,org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend(long, java.nio.ByteBuffer, int, org.apache.rocketmq.store.MessageExtBrokerInner)
















public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,final MessageExtBrokerInner msgInner) {// STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>// PHY OFFSET 寫的offsetlong wroteOffset = fileFromOffset + byteBuffer.position();this.resetByteBuffer(hostHolder, 8);String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);// Record ConsumeQueue informationkeyBuilder.setLength(0);keyBuilder.append(msgInner.getTopic());keyBuilder.append('-');keyBuilder.append(msgInner.getQueueId());String key = keyBuilder.toString();//            緩存topic、queue和offset信息Long queueOffset = CommitLog.this.topicQueueTable.get(key);if (null == queueOffset) {queueOffset = 0L;CommitLog.this.topicQueueTable.put(key, queueOffset);}// Transaction messages that require special handlingfinal int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());switch (tranType) {// Prepared and Rollback message is not consumed, will not enter the// consumer queueccase MessageSysFlag.TRANSACTION_PREPARED_TYPE:case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:queueOffset = 0L;break;case MessageSysFlag.TRANSACTION_NOT_TYPE:case MessageSysFlag.TRANSACTION_COMMIT_TYPE:default:break;}/*** Serialize message*/final byte[] propertiesData =msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;//            消息屬性長度過大if (propertiesLength > Short.MAX_VALUE) {log.warn("putMessage message properties length too long. length={}", propertiesData.length);return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);}//            獲取topic數據final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);final int topicLength = topicData.length;final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;//            計算消息長度final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);// Exceeds the maximum messageif (msgLen > this.maxMessageSize) {CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength+ ", maxMessageSize: " + this.maxMessageSize);return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);}// Determines whether there is sufficient free spaceif ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);// 1 TOTALSIZEthis.msgStoreItemMemory.putInt(maxBlank);// 2 MAGICCODEthis.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);// 3 The remaining space may be any value// Here the length of the specially set maxBlankfinal long beginTimeMills = CommitLog.this.defaultMessageStore.now();byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);}// Initialization of storage spacethis.resetByteBuffer(msgStoreItemMemory, msgLen);// 1 TOTALSIZEthis.msgStoreItemMemory.putInt(msgLen);// 2 MAGICCODEthis.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);// 3 BODYCRCthis.msgStoreItemMemory.putInt(msgInner.getBodyCRC());// 4 QUEUEIDthis.msgStoreItemMemory.putInt(msgInner.getQueueId());// 5 FLAGthis.msgStoreItemMemory.putInt(msgInner.getFlag());// 6 QUEUEOFFSETthis.msgStoreItemMemory.putLong(queueOffset);// 7 PHYSICALOFFSETthis.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());// 8 SYSFLAGthis.msgStoreItemMemory.putInt(msgInner.getSysFlag());// 9 BORNTIMESTAMPthis.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());// 10 BORNHOSTthis.resetByteBuffer(hostHolder, 8);this.msgStoreItemMemory.put(msgInner.getBornHostBytes(hostHolder));// 11 STORETIMESTAMPthis.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());// 12 STOREHOSTADDRESSthis.resetByteBuffer(hostHolder, 8);this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder));//this.msgBatchMemory.put(msgInner.getStoreHostBytes());// 13 RECONSUMETIMESthis.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());// 14 Prepared Transaction Offsetthis.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());// 15 BODYthis.msgStoreItemMemory.putInt(bodyLength);if (bodyLength > 0)this.msgStoreItemMemory.put(msgInner.getBody());// 16 TOPICthis.msgStoreItemMemory.put((byte) topicLength);this.msgStoreItemMemory.put(topicData);// 17 PROPERTIESthis.msgStoreItemMemory.putShort((short) propertiesLength);if (propertiesLength > 0)this.msgStoreItemMemory.put(propertiesData);final long beginTimeMills = CommitLog.this.defaultMessageStore.now();// Write messages to the queue bufferbyteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);switch (tranType) {case MessageSysFlag.TRANSACTION_PREPARED_TYPE:case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:break;case MessageSysFlag.TRANSACTION_NOT_TYPE:case MessageSysFlag.TRANSACTION_COMMIT_TYPE:// The next update ConsumeQueue informationCommitLog.this.topicQueueTable.put(key, ++queueOffset);break;default:break;}return result;        }

往上返回到這個方法,磁盤刷新,org.apache.rocketmq.store.CommitLog#handleDiskFlush

public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {// Synchronization flush 同步刷新if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;if (messageExt.isWaitStoreMsgOK()) {GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());service.putRequest(request);//                countdownLatch.await() 同步等待刷新結果,除非超時boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());if (!flushOK) {log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()+ " client address: " + messageExt.getBornHostString());putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);}} else {//                若是異步直接解除阻塞 countdownLatch.countDown()service.wakeup();}}// Asynchronous flush 異步刷新else {if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {flushCommitLogService.wakeup();} else {commitLogService.wakeup();}}    }

往上返回到這個方法,主從刷新,org.apache.rocketmq.store.CommitLog#handleHA

public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {//        若是master同步刷新if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {HAService service = this.defaultMessageStore.getHaService();if (messageExt.isWaitStoreMsgOK()) {// Determine whether to waitif (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());service.putRequest(request);service.getWaitNotifyObject().wakeupAll();//                    countDownLatch.await 同步等待刷新,除非等待超時boolean flushOK =request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());if (!flushOK) {log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "+ messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);}}// Slave problemelse {// Tell the producer, slave not availableputMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);}}}}

下篇繼續。

 

說在最後

本次解析僅表明我的觀點,僅供參考。

 

加入技術微信羣

釘釘技術羣

相關文章
相關標籤/搜索