RocketMQ源碼解析:事務消息

🙂🙂🙂關注微信公衆號:【芋艿的後端小屋】有福利: html

  1. RocketMQ / MyCAT / Sharding-JDBC 全部源碼分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文註釋源碼 GitHub 地址
  3. 您對於源碼的疑問每條留言將獲得認真回覆。甚至不知道如何讀源碼也能夠請教噢
  4. 新的源碼解析文章實時收到通知。每週更新一篇左右

1. 概述

必須必須必須 前置閱讀內容:java

2. 事務消息發送

2.1 Producer 發送事務消息

  • 活動圖以下(結合 核心代碼 理解):

Producer發送事務消息
Producer發送事務消息

  • 實現代碼以下:
1: // ⬇️⬇️⬇️【DefaultMQProducerImpl.java】
  2: /** 3: * 發送事務消息 4: * 5: * @param msg 消息 6: * @param tranExecuter 【本地事務】執行器 7: * @param arg 【本地事務】執行器參數 8: * @return 事務發送結果 9: * @throws MQClientException 當 Client 發生異常時 10: */
 11: public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg) 12: throws MQClientException {
 13:     if (null == tranExecuter) {
 14:         throw new MQClientException("tranExecutor is null", null);
 15:     }
 16:     Validators.checkMessage(msg, this.defaultMQProducer);
 17: 
 18:     // 發送【Half消息】
 19:     SendResult sendResult;
 20:     MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
 21:     MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
 22:     try {
 23:         sendResult = this.send(msg);
 24:     } catch (Exception e) {
 25:         throw new MQClientException("send message Exception", e);
 26:     }
 27: 
 28:     // 處理髮送【Half消息】結果
 29:     LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
 30:     Throwable localException = null;
 31:     switch (sendResult.getSendStatus()) {
 32:         // 發送【Half消息】成功,執行【本地事務】邏輯
 33:         case SEND_OK: {
 34:             try {
 35:                 if (sendResult.getTransactionId() != null) { // 事務編號。目前開源版本暫時沒用到,猜測ONS在使用。
 36:                     msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
 37:                 }
 38: 
 39:                 // 執行【本地事務】邏輯
 40:                 localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
 41:                 if (null == localTransactionState) {
 42:                     localTransactionState = LocalTransactionState.UNKNOW;
 43:                 }
 44: 
 45:                 if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
 46:                     log.info("executeLocalTransactionBranch return {}", localTransactionState);
 47:                     log.info(msg.toString());
 48:                 }
 49:             } catch (Throwable e) {
 50:                 log.info("executeLocalTransactionBranch exception", e);
 51:                 log.info(msg.toString());
 52:                 localException = e;
 53:             }
 54:         }
 55:         break;
 56:         // 發送【Half消息】失敗,標記【本地事務】狀態爲回滾
 57:         case FLUSH_DISK_TIMEOUT:
 58:         case FLUSH_SLAVE_TIMEOUT:
 59:         case SLAVE_NOT_AVAILABLE:
 60:             localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
 61:             break;
 62:         default:
 63:             break;
 64:     }
 65: 
 66:     // 結束事務:提交消息 COMMIT / ROLLBACK
 67:     try {
 68:         this.endTransaction(sendResult, localTransactionState, localException);
 69:     } catch (Exception e) {
 70:         log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
 71:     }
 72: 
 73:     // 返回【事務發送結果】
 74:     TransactionSendResult transactionSendResult = new TransactionSendResult();
 75:     transactionSendResult.setSendStatus(sendResult.getSendStatus());
 76:     transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
 77:     transactionSendResult.setMsgId(sendResult.getMsgId());
 78:     transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
 79:     transactionSendResult.setTransactionId(sendResult.getTransactionId());
 80:     transactionSendResult.setLocalTransactionState(localTransactionState);
 81:     return transactionSendResult;
 82: }
 83: 
 84: /** 85: * 結束事務:提交消息 COMMIT / ROLLBACK 86: * 87: * @param sendResult 發送【Half消息】結果 88: * @param localTransactionState 【本地事務】狀態 89: * @param localException 執行【本地事務】邏輯產生的異常 90: * @throws RemotingException 當遠程調用發生異常時 91: * @throws MQBrokerException 當 Broker 發生異常時 92: * @throws InterruptedException 當線程中斷時 93: * @throws UnknownHostException 當解碼消息編號失敗是 94: */
 95: public void endTransaction(// 96: final SendResult sendResult, // 97: final LocalTransactionState localTransactionState, // 98: final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
 99:     // 解碼消息編號
100:     final MessageId id;
101:     if (sendResult.getOffsetMsgId() != null) {
102:         id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
103:     } else {
104:         id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
105:     }
106: 
107:     // 建立請求
108:     String transactionId = sendResult.getTransactionId();
109:     final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
110:     EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
111:     requestHeader.setTransactionId(transactionId);
112:     requestHeader.setCommitLogOffset(id.getOffset());
113:     switch (localTransactionState) {
114:         case COMMIT_MESSAGE:
115:             requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
116:             break;
117:         case ROLLBACK_MESSAGE:
118:             requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
119:             break;
120:         case UNKNOW:
121:             requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
122:             break;
123:         default:
124:             break;
125:     }
126:     requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
127:     requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
128:     requestHeader.setMsgId(sendResult.getMsgId());
129:     String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
130: 
131:     // 提交消息 COMMIT / ROLLBACK。!!!通訊方式爲:Oneway!!!
132:     this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, this.defaultMQProducer.getSendMsgTimeout());
133: }複製代碼

2.2 Broker 處理結束事務請求

  • 🦅 查詢請求的消息,進行提交 / 回滾。實現代碼以下:
1: // ⬇️⬇️⬇️【EndTransactionProcessor.java】
  2: public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
  3:     final RemotingCommand response = RemotingCommand.createResponseCommand(null);
  4:     final EndTransactionRequestHeader requestHeader = (EndTransactionRequestHeader) request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
  5: 
  6:     // 省略代碼 =》打印日誌(只處理 COMMIT / ROLLBACK)
  7: 
  8:     // 查詢提交的消息
  9:     final MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getCommitLogOffset());
 10:     if (msgExt != null) {
 11:         // 省略代碼 =》校驗消息
 12: 
 13:         // 生成消息
 14:         MessageExtBrokerInner msgInner = this.endMessageTransaction(msgExt);
 15:         msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
 16:         msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
 17:         msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
 18:         msgInner.setStoreTimestamp(msgExt.getStoreTimestamp());
 19:         if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
 20:             msgInner.setBody(null);
 21:         }
 22: 
 23:         // 存儲生成消息
 24:         final MessageStore messageStore = this.brokerController.getMessageStore();
 25:         final PutMessageResult putMessageResult = messageStore.putMessage(msgInner);
 26: 
 27:         // 處理存儲結果
 28:         if (putMessageResult != null) {
 29:             switch (putMessageResult.getPutMessageStatus()) {
 30:                 // Success
 31:                 case PUT_OK:
 32:                 case FLUSH_DISK_TIMEOUT:
 33:                 case FLUSH_SLAVE_TIMEOUT:
 34:                 case SLAVE_NOT_AVAILABLE:
 35:                     response.setCode(ResponseCode.SUCCESS);
 36:                     response.setRemark(null);
 37:                     break;
 38:                 // Failed
 39:                 case CREATE_MAPEDFILE_FAILED:
 40:                     response.setCode(ResponseCode.SYSTEM_ERROR);
 41:                     response.setRemark("create maped file failed.");
 42:                     break;
 43:                 case MESSAGE_ILLEGAL:
 44:                 case PROPERTIES_SIZE_EXCEEDED:
 45:                     response.setCode(ResponseCode.MESSAGE_ILLEGAL);
 46:                     response.setRemark("the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
 47:                     break;
 48:                 case SERVICE_NOT_AVAILABLE:
 49:                     response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
 50:                     response.setRemark("service not available now.");
 51:                     break;
 52:                 case OS_PAGECACHE_BUSY:
 53:                     response.setCode(ResponseCode.SYSTEM_ERROR);
 54:                     response.setRemark("OS page cache busy, please try another machine");
 55:                     break;
 56:                 case UNKNOWN_ERROR:
 57:                     response.setCode(ResponseCode.SYSTEM_ERROR);
 58:                     response.setRemark("UNKNOWN_ERROR");
 59:                     break;
 60:                 default:
 61:                     response.setCode(ResponseCode.SYSTEM_ERROR);
 62:                     response.setRemark("UNKNOWN_ERROR DEFAULT");
 63:                     break;
 64:             }
 65: 
 66:             return response;
 67:         } else {
 68:             response.setCode(ResponseCode.SYSTEM_ERROR);
 69:             response.setRemark("store putMessage return null");
 70:         }
 71:     } else {
 72:         response.setCode(ResponseCode.SYSTEM_ERROR);
 73:         response.setRemark("find prepared transaction message failed");
 74:         return response;
 75:     }
 76: 
 77:     return response;
 78: }複製代碼

2.3 Broker 生成 ConsumeQueue

  • 🦅 事務消息,提交(COMMIT)後才生成 ConsumeQueue
1: // ⬇️⬇️⬇️【DefaultMessageStore.java】
  2: public void doDispatch(DispatchRequest req) {
  3:     // 非事務消息 或 事務提交消息 創建 消息位置信息 到 ConsumeQueue
  4:     final int tranType = MessageSysFlag.getTransactionValue(req.getSysFlag());
  5:     switch (tranType) {
  6:         case MessageSysFlag.TRANSACTION_NOT_TYPE: // 非事務消息
  7:         case MessageSysFlag.TRANSACTION_COMMIT_TYPE: // 事務消息COMMIT
  8:             DefaultMessageStore.this.putMessagePositionInfo(req.getTopic(), req.getQueueId(), req.getCommitLogOffset(), req.getMsgSize(),
  9:                 req.getTagsCode(), req.getStoreTimestamp(), req.getConsumeQueueOffset());
 10:             break;
 11:         case MessageSysFlag.TRANSACTION_PREPARED_TYPE: // 事務消息PREPARED
 12:         case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: // 事務消息ROLLBACK
 13:             break;
 14:     }
 15:     // 省略代碼 =》 創建 索引信息 到 IndexFile
 16: }複製代碼

3. 事務消息回查

  • 【事務消息回查】功能曾經開源過,目前(V4.0.0)暫未開源。以下是該功能的開源狀況:
版本 【事務消息回查】
官方V3.0.4 ~ V3.1.4 基於 文件系統 實現 已開源
官方V3.1.5 ~ V4.0.0 基於 數據庫 實現 未徹底開源

咱們來看看兩種狀況下是怎麼實現的。git

3.1 Broker 發起【事務消息回查】

3.1.1 官方V3.1.4:基於文件系統

倉庫地址:github.com/YunaiV/rock…github

相較於普通消息,【事務消息】多依賴以下三個組件:數據庫

  • TransactionStateService :事務狀態服務,負責對【事務消息】進行管理,包括存儲與更新事務消息狀態、回查事務消息狀態等等。
  • TranStateTable :【事務消息】狀態存儲。基於 MappedFileQueue 實現,默認存儲路徑爲 ~/store/transaction/statetable,每條【事務消息】狀態存儲結構以下:
第幾位 字段 說明 數據類型 字節數
1 offset CommitLog 物理存儲位置 Long 8
2 size 消息長度 Int 4
3 timestamp 消息存儲時間,單位:秒 Int 4
4 producerGroupHash producerGroup 求 HashCode Int 4
5 state 事務狀態 Int 4
  • TranRedoLogTranStateTable 重放日誌,每次寫操做 TranStateTable 記錄重放日誌。當 Broker 異常關閉時,使用 TranRedoLog 恢復 TranStateTable。基於 ConsumeQueue 實現,TopicTRANSACTION_REDOLOG_TOPIC_XXXX,默認存儲路徑爲 ~/store/transaction/redolog

簡單手繪邏輯圖以下😈:apache

Broker_V3.1.4_基於文件系統
Broker_V3.1.4_基於文件系統

3.1.1.1 存儲消息到 CommitLog

  • 🦅存儲【half消息】到 CommitLog 時,消息隊列位置(queueOffset)使用 TranStateTable 最大物理位置(可寫入物理位置)。這樣,消息能夠索引到本身對應的 TranStateTable 的位置和記錄。

核心代碼以下:後端

1: // ⬇️⬇️⬇️【DefaultAppendMessageCallback.java】
  2: class DefaultAppendMessageCallback implements AppendMessageCallback {
  3:     public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final Object msg) {
  4:         // ...省略代碼
  5: 
  6:         // 事務消息須要特殊處理 
  7:         final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
  8:         switch (tranType) {
  9:         case MessageSysFlag.TransactionPreparedType: // 消息隊列位置(queueOffset)使用 TranStateTable 最大物理位置(可寫入物理位置) 
 10:             queueOffset = CommitLog.this.defaultMessageStore.getTransactionStateService().getTranStateTableOffset().get();
 11:             break;
 12:         case MessageSysFlag.TransactionRollbackType:
 13:             queueOffset = msgInner.getQueueOffset();
 14:             break;
 15:         case MessageSysFlag.TransactionNotType:
 16:         case MessageSysFlag.TransactionCommitType:
 17:         default:
 18:             break;
 19:         }
 20: 
 21:         // ...省略代碼
 22: 
 23:         switch (tranType) {
 24:         case MessageSysFlag.TransactionPreparedType:
 25:             // 更新 TranStateTable 最大物理位置(可寫入物理位置) 
 26:             CommitLog.this.defaultMessageStore.getTransactionStateService().getTranStateTableOffset().incrementAndGet();
 27:             break;
 28:         case MessageSysFlag.TransactionRollbackType:
 29:             break;
 30:         case MessageSysFlag.TransactionNotType:
 31:         case MessageSysFlag.TransactionCommitType:
 32:             // 更新下一次的ConsumeQueue信息
 33:             CommitLog.this.topicQueueTable.put(key, ++queueOffset);
 34:             break;
 35:         default:
 36:             break;
 37:         }
 38: 
 39:         // 返回結果
 40:         return result;
 41:     }
 42: }複製代碼

3.1.1.2 寫【事務消息】狀態存儲(TranStateTable)

  • 🦅處理【Half消息】時,新增【事務消息】狀態存儲(TranStateTable)。
  • 🦅處理【Commit / Rollback消息】時,更新 【事務消息】狀態存儲(TranStateTable) COMMIT / ROLLBACK。
  • 🦅每次寫操做【事務消息】狀態存儲(TranStateTable),記錄重放日誌(TranRedoLog)。

核心代碼以下:微信

1: // ⬇️⬇️⬇️【DispatchMessageService.java】
  2: private void doDispatch() {
  3:     if (!this.requestsRead.isEmpty()) {
  4:         for (DispatchRequest req : this.requestsRead) {
  5: 
  6:             // ...省略代碼
  7: 
  8:             // 二、寫【事務消息】狀態存儲(TranStateTable)
  9:             if (req.getProducerGroup() != null) {
 10:                 switch (tranType) {
 11:                 case MessageSysFlag.TransactionNotType:
 12:                     break;
 13:                 case MessageSysFlag.TransactionPreparedType:
 14:                     // 新增 【事務消息】狀態存儲(TranStateTable)
 15:                     DefaultMessageStore.this.getTransactionStateService().appendPreparedTransaction(
 16:                         req.getCommitLogOffset(), req.getMsgSize(), (int) (req.getStoreTimestamp() / 1000), req.getProducerGroup().hashCode());
 17:                     break;
 18:                 case MessageSysFlag.TransactionCommitType:
 19:                 case MessageSysFlag.TransactionRollbackType:
 20:                     // 更新 【事務消息】狀態存儲(TranStateTable) COMMIT / ROLLBACK
 21:                     DefaultMessageStore.this.getTransactionStateService().updateTransactionState(
 22:                         req.getTranStateTableOffset(), req.getPreparedTransactionOffset(), req.getProducerGroup().hashCode(), tranType);
 23:                     break;
 24:                 }
 25:             }
 26:             // 三、記錄 TranRedoLog
 27:             switch (tranType) {
 28:             case MessageSysFlag.TransactionNotType:
 29:                 break;
 30:             case MessageSysFlag.TransactionPreparedType:
 31:                 // 記錄 TranRedoLog
 32:                 DefaultMessageStore.this.getTransactionStateService().getTranRedoLog().putMessagePostionInfoWrapper(
 33:                         req.getCommitLogOffset(), req.getMsgSize(), TransactionStateService.PreparedMessageTagsCode,
 34:                         req.getStoreTimestamp(), 0L);
 35:                 break;
 36:             case MessageSysFlag.TransactionCommitType:
 37:             case MessageSysFlag.TransactionRollbackType:
 38:                 // 記錄 TranRedoLog
 39:                 DefaultMessageStore.this.getTransactionStateService().getTranRedoLog().putMessagePostionInfoWrapper(
 40:                         req.getCommitLogOffset(), req.getMsgSize(), req.getPreparedTransactionOffset(),
 41:                         req.getStoreTimestamp(), 0L);
 42:                 break;
 43:             }
 44:         }
 45: 
 46:         // ...省略代碼
 47:     }
 48: }
 49: // ⬇️⬇️⬇️【TransactionStateService.java】
 50: /** 51: * 新增事務狀態 52: * 53: * @param clOffset commitLog 物理位置 54: * @param size 消息長度 55: * @param timestamp 消息存儲時間 56: * @param groupHashCode groupHashCode 57: * @return 是否成功 58: */
 59: public boolean appendPreparedTransaction(// 60: final long clOffset,// 61: final int size,// 62: final int timestamp,// 63: final int groupHashCode// 64: ) {
 65:     MapedFile mapedFile = this.tranStateTable.getLastMapedFile();
 66:     if (null == mapedFile) {
 67:         log.error("appendPreparedTransaction: create mapedfile error.");
 68:         return false;
 69:     }
 70: 
 71:     // 首次建立,加入定時任務中
 72:     if (0 == mapedFile.getWrotePostion()) {
 73:         this.addTimerTask(mapedFile);
 74:     }
 75: 
 76:     this.byteBufferAppend.position(0);
 77:     this.byteBufferAppend.limit(TSStoreUnitSize);
 78: 
 79:     // Commit Log Offset
 80:     this.byteBufferAppend.putLong(clOffset);
 81:     // Message Size
 82:     this.byteBufferAppend.putInt(size);
 83:     // Timestamp
 84:     this.byteBufferAppend.putInt(timestamp);
 85:     // Producer Group Hashcode
 86:     this.byteBufferAppend.putInt(groupHashCode);
 87:     // Transaction State
 88:     this.byteBufferAppend.putInt(MessageSysFlag.TransactionPreparedType);
 89: 
 90:     return mapedFile.appendMessage(this.byteBufferAppend.array());
 91: }
 92: 
 93: /** 94: * 更新事務狀態 95: * 96: * @param tsOffset tranStateTable 物理位置 97: * @param clOffset commitLog 物理位置 98: * @param groupHashCode groupHashCode 99: * @param state 事務狀態 100: * @return 是否成功 101: */
102: public boolean updateTransactionState( 103: final long tsOffset, 104: final long clOffset, 105: final int groupHashCode, 106: final int state) {
107:     SelectMapedBufferResult selectMapedBufferResult = this.findTransactionBuffer(tsOffset);
108:     if (selectMapedBufferResult != null) {
109:         try {
110: 
111:             // ....省略代碼:校驗是否可以更新
112: 
113:             // 更新事務狀態
114:             selectMapedBufferResult.getByteBuffer().putInt(TS_STATE_POS, state);
115:         }
116:         catch (Exception e) {
117:             log.error("updateTransactionState exception", e);
118:         }
119:         finally {
120:             selectMapedBufferResult.release();
121:         }
122:     }
123: 
124:     return false;
125: }複製代碼

3.1.1.3 【事務消息】回查

  • 🦅TranStateTable 每一個 MappedFile 都對應一個 TimerTimer 固定週期(默認:60s)遍歷 MappedFile,查找【half消息】,向 Producer 發起【事務消息】回查請求。【事務消息】回查結果的邏輯不在此處進行,在 CommitLog dispatch時執行。

實現代碼以下:app

1: // ⬇️⬇️⬇️【TransactionStateService.java】
  2: /** 3: * 初始化定時任務 4: */
  5: private void initTimerTask() {
  6:     //
  7:     final List<MapedFile> mapedFiles = this.tranStateTable.getMapedFiles();
  8:     for (MapedFile mf : mapedFiles) {
  9:         this.addTimerTask(mf);
 10:     }
 11: }
 12: 
 13: /** 14: * 每一個文件初始化定時任務 15: * @param mf 文件 16: */
 17: private void addTimerTask(final MapedFile mf) {
 18:     this.timer.scheduleAtFixedRate(new TimerTask() {
 19:         private final MapedFile mapedFile = mf;
 20:         private final TransactionCheckExecuter transactionCheckExecuter = TransactionStateService.this.defaultMessageStore.getTransactionCheckExecuter();
 21:         private final long checkTransactionMessageAtleastInterval = TransactionStateService.this.defaultMessageStore.getMessageStoreConfig()
 22:                     .getCheckTransactionMessageAtleastInterval();
 23:         private final boolean slave = TransactionStateService.this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE;
 24: 
 25:         @Override
 26:         public void run() {
 27:             // Slave不須要回查事務狀態
 28:             if (slave) {
 29:                 return;
 30:             }
 31:             // Check功能是否開啓
 32:             if (!TransactionStateService.this.defaultMessageStore.getMessageStoreConfig()
 33:                 .isCheckTransactionMessageEnable()) {
 34:                 return;
 35:             }
 36: 
 37:             try {
 38:                 SelectMapedBufferResult selectMapedBufferResult = mapedFile.selectMapedBuffer(0);
 39:                 if (selectMapedBufferResult != null) {
 40:                     long preparedMessageCountInThisMapedFile = 0; // 回查的【half消息】數量
 41:                     int i = 0;
 42:                     try {
 43:                         // 循環每條【事務消息】狀態,對【half消息】進行回查
 44:                         for (; i < selectMapedBufferResult.getSize(); i += TSStoreUnitSize) {
 45:                             selectMapedBufferResult.getByteBuffer().position(i);
 46: 
 47:                             // Commit Log Offset
 48:                             long clOffset = selectMapedBufferResult.getByteBuffer().getLong();
 49:                             // Message Size
 50:                             int msgSize = selectMapedBufferResult.getByteBuffer().getInt();
 51:                             // Timestamp
 52:                             int timestamp = selectMapedBufferResult.getByteBuffer().getInt();
 53:                             // Producer Group Hashcode
 54:                             int groupHashCode = selectMapedBufferResult.getByteBuffer().getInt();
 55:                             // Transaction State
 56:                             int tranType = selectMapedBufferResult.getByteBuffer().getInt();
 57: 
 58:                             // 已經提交或者回滾的消息跳過
 59:                             if (tranType != MessageSysFlag.TransactionPreparedType) {
 60:                                 continue;
 61:                             }
 62: 
 63:                             // 遇到時間不符合最小輪詢間隔,終止
 64:                             long timestampLong = timestamp * 1000;
 65:                             long diff = System.currentTimeMillis() - timestampLong;
 66:                             if (diff < checkTransactionMessageAtleastInterval) {
 67:                                 break;
 68:                             }
 69: 
 70:                             preparedMessageCountInThisMapedFile++;
 71: 
 72:                             // 回查Producer
 73:                             try {
 74:                                 this.transactionCheckExecuter.gotoCheck(groupHashCode, getTranStateOffset(i), clOffset, msgSize);
 75:                             } catch (Exception e) {
 76:                                 tranlog.warn("gotoCheck Exception", e);
 77:                             }
 78:                         }
 79: 
 80:                         // 無回查的【half消息】數量,且遍歷完,則終止定時任務
 81:                         if (0 == preparedMessageCountInThisMapedFile //
 82:                                 && i == mapedFile.getFileSize()) {
 83:                             tranlog.info("remove the transaction timer task, because no prepared message in this mapedfile[{}]", mapedFile.getFileName());
 84:                             this.cancel();
 85:                         }
 86:                     } finally {
 87:                         selectMapedBufferResult.release();
 88:                     }
 89: 
 90:                     tranlog.info("the transaction timer task execute over in this period, {} Prepared Message: {} Check Progress: {}/{}", mapedFile.getFileName(),//
 91:                             preparedMessageCountInThisMapedFile, i / TSStoreUnitSize, mapedFile.getFileSize() / TSStoreUnitSize);
 92:                 } else if (mapedFile.isFull()) {
 93:                     tranlog.info("the mapedfile[{}] maybe deleted, cancel check transaction timer task", mapedFile.getFileName());
 94:                     this.cancel();
 95:                     return;
 96:                 }
 97:             } catch (Exception e) {
 98:                 log.error("check transaction timer task Exception", e);
 99:             }
100:         }
101: 
102: 
103:         private long getTranStateOffset(final long currentIndex) {
104:             long offset = (this.mapedFile.getFileFromOffset() + currentIndex) / TransactionStateService.TSStoreUnitSize;
105:             return offset;
106:         }
107:     }, 1000 * 60, this.defaultMessageStore.getMessageStoreConfig().getCheckTransactionMessageTimerInterval());
108: }
109: 
110: // 【DefaultTransactionCheckExecuter.java】
111: @Override
112: public void gotoCheck(int producerGroupHashCode, long tranStateTableOffset, long commitLogOffset, 113: int msgSize) {
114:     // 第一步、查詢Producer
115:     final ClientChannelInfo clientChannelInfo = this.brokerController.getProducerManager().pickProducerChannelRandomly(producerGroupHashCode);
116:     if (null == clientChannelInfo) {
117:         log.warn("check a producer transaction state, but not find any channel of this group[{}]", producerGroupHashCode);
118:         return;
119:     }
120: 
121:     // 第二步、查詢消息
122:     SelectMapedBufferResult selectMapedBufferResult = this.brokerController.getMessageStore().selectOneMessageByOffset(commitLogOffset, msgSize);
123:     if (null == selectMapedBufferResult) {
124:         log.warn("check a producer transaction state, but not find message by commitLogOffset: {}, msgSize: ", commitLogOffset, msgSize);
125:         return;
126:     }
127: 
128:     // 第三步、向Producer發起請求
129:     final CheckTransactionStateRequestHeader requestHeader = new CheckTransactionStateRequestHeader();
130:     requestHeader.setCommitLogOffset(commitLogOffset);
131:     requestHeader.setTranStateTableOffset(tranStateTableOffset);
132:     this.brokerController.getBroker2Client().checkProducerTransactionState(clientChannelInfo.getChannel(), requestHeader, selectMapedBufferResult);
133: }複製代碼

3.1.1.4 初始化【事務消息】狀態存儲(TranStateTable)

  • 🦅根據最後 Broker 關閉是否正常,會有不一樣的初始化方式。

核心代碼以下:dom

1: // ⬇️⬇️⬇️【TransactionStateService.java】
  2: /** 3: * 初始化 TranRedoLog 4: * @param lastExitOK 是否正常退出 5: */
  6: public void recoverStateTable(final boolean lastExitOK) {
  7:     if (lastExitOK) {
  8:         this.recoverStateTableNormal();
  9:     } else {
 10:         // 第一步,刪除State Table
 11:         this.tranStateTable.destroy();
 12:         // 第二步,經過RedoLog全量恢復StateTable
 13:         this.recreateStateTable();
 14:     }
 15: }
 16: 
 17: /** 18: * 掃描 TranRedoLog 重建 StateTable 19: */
 20: private void recreateStateTable() {
 21:     this.tranStateTable = new MapedFileQueue(StorePathConfigHelper.getTranStateTableStorePath(defaultMessageStore
 22:                 .getMessageStoreConfig().getStorePathRootDir()), defaultMessageStore
 23:                 .getMessageStoreConfig().getTranStateTableMapedFileSize(), null);
 24: 
 25:     final TreeSet<Long> preparedItemSet = new TreeSet<Long>();
 26: 
 27:     // 第一步,從頭掃描RedoLog
 28:     final long minOffset = this.tranRedoLog.getMinOffsetInQuque();
 29:     long processOffset = minOffset;
 30:     while (true) {
 31:         SelectMapedBufferResult bufferConsumeQueue = this.tranRedoLog.getIndexBuffer(processOffset);
 32:         if (bufferConsumeQueue != null) {
 33:             try {
 34:                 long i = 0;
 35:                 for (; i < bufferConsumeQueue.getSize(); i += ConsumeQueue.CQStoreUnitSize) {
 36:                     long offsetMsg = bufferConsumeQueue.getByteBuffer().getLong();
 37:                     int sizeMsg = bufferConsumeQueue.getByteBuffer().getInt();
 38:                     long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
 39: 
 40:                     if (TransactionStateService.PreparedMessageTagsCode == tagsCode) { // Prepared
 41:                         preparedItemSet.add(offsetMsg);
 42:                     } else { // Commit/Rollback
 43:                         preparedItemSet.remove(tagsCode);
 44:                     }
 45:                 }
 46: 
 47:                 processOffset += i;
 48:             } finally { // 必須釋放資源
 49:                 bufferConsumeQueue.release();
 50:             }
 51:         } else {
 52:             break;
 53:         }
 54:     }
 55:     log.info("scan transaction redolog over, End offset: {}, Prepared Transaction Count: {}", processOffset, preparedItemSet.size());
 56: 
 57:     // 第二步,重建StateTable
 58:     Iterator<Long> it = preparedItemSet.iterator();
 59:     while (it.hasNext()) {
 60:         Long offset = it.next();
 61:         MessageExt msgExt = this.defaultMessageStore.lookMessageByOffset(offset);
 62:         if (msgExt != null) {
 63:             this.appendPreparedTransaction(msgExt.getCommitLogOffset(), msgExt.getStoreSize(),
 64:                 (int) (msgExt.getStoreTimestamp() / 1000),
 65:                 msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP).hashCode());
 66:             this.tranStateTableOffset.incrementAndGet();
 67:         }
 68:     }
 69: }
 70: 
 71: /** 72: * 加載(解析)TranStateTable 的 MappedFile 73: * 1. 清理多餘 MappedFile,設置最後一個 MappedFile的寫入位置(position 74: * 2. 設置 TanStateTable 最大物理位置(可寫入位置) 75: */
 76: private void recoverStateTableNormal() {
 77:     final List<MapedFile> mapedFiles = this.tranStateTable.getMapedFiles();
 78:     if (!mapedFiles.isEmpty()) {
 79:         // 從倒數第三個文件開始恢復
 80:         int index = mapedFiles.size() - 3;
 81:         if (index < 0) {
 82:             index = 0;
 83:         }
 84: 
 85:         int mapedFileSizeLogics = this.tranStateTable.getMapedFileSize();
 86:         MapedFile mapedFile = mapedFiles.get(index);
 87:         ByteBuffer byteBuffer = mapedFile.sliceByteBuffer();
 88:         long processOffset = mapedFile.getFileFromOffset();
 89:         long mapedFileOffset = 0;
 90:         while (true) {
 91:             for (int i = 0; i < mapedFileSizeLogics; i += TSStoreUnitSize) {
 92: 
 93:                 final long clOffset_read = byteBuffer.getLong();
 94:                 final int size_read = byteBuffer.getInt();
 95:                 final int timestamp_read = byteBuffer.getInt();
 96:                 final int groupHashCode_read = byteBuffer.getInt();
 97:                 final int state_read = byteBuffer.getInt();
 98: 
 99:                 boolean stateOK = false;
100:                 switch (state_read) {
101:                 case MessageSysFlag.TransactionPreparedType:
102:                 case MessageSysFlag.TransactionCommitType:
103:                 case MessageSysFlag.TransactionRollbackType:
104:                     stateOK = true;
105:                     break;
106:                 default:
107:                     break;
108:                 }
109: 
110:                 // 說明當前存儲單元有效
111:                 if (clOffset_read >= 0 && size_read > 0 && stateOK) {
112:                     mapedFileOffset = i + TSStoreUnitSize;
113:                 } else {
114:                     log.info("recover current transaction state table file over, " + mapedFile.getFileName() + " "
115:                             + clOffset_read + " " + size_read + " " + timestamp_read);
116:                     break;
117:                 }
118:             }
119: 
120:             // 走到文件末尾,切換至下一個文件
121:             if (mapedFileOffset == mapedFileSizeLogics) {
122:                 index++;
123:                 if (index >= mapedFiles.size()) { // 循環while結束
124:                     log.info("recover last transaction state table file over, last maped file " + mapedFile.getFileName());
125:                     break;
126:                 } else { // 切換下一個文件
127:                     mapedFile = mapedFiles.get(index);
128:                     byteBuffer = mapedFile.sliceByteBuffer();
129:                     processOffset = mapedFile.getFileFromOffset();
130:                     mapedFileOffset = 0;
131:                     log.info("recover next transaction state table file, " + mapedFile.getFileName());
132:                 }
133:             } else {
134:                 log.info("recover current transaction state table queue over " + mapedFile.getFileName() + " " + (processOffset + mapedFileOffset));
135:                 break;
136:             }
137:         }
138: 
139:         // 清理多餘 MappedFile,設置最後一個 MappedFile的寫入位置(position
140:         processOffset += mapedFileOffset;
141:         this.tranStateTable.truncateDirtyFiles(processOffset);
142: 
143:         // 設置 TanStateTable 最大物理位置(可寫入位置)
144:         this.tranStateTableOffset.set(this.tranStateTable.getMaxOffset() / TSStoreUnitSize);
145:         log.info("recover normal over, transaction state table max offset: {}", this.tranStateTableOffset.get());
146:     }
147: }複製代碼

3.1.1.5 補充

  • 爲何 V3.1.5 開始,使用 數據庫 實現【事務狀態】的存儲?以下是來自官方文檔的說明,多是一部分緣由:

RocketMQ 這種實現事務方式,沒有經過 KV 存儲作,而是經過 Offset 方式,存在一個顯著缺陷,即經過 Offset 更改數據,會令系統的髒頁過多,須要特別關注。

3.1.2 官方V4.0.0:基於數據庫

倉庫地址:github.com/apache/incu…

官方V4.0.0 暫時未徹底開源【事務消息回查】功能,So 咱們須要進行一些猜測,可能不必定正確😈

😆咱們來對比【官方V3.1.4:基於文件】的實現。

  • TransactionRecord :記錄每條【事務消息】。相似 TranStateTable
TranStateTable TransactionRecord
offset offset
producerGroupHash producerGroup
size 非必須字段:【事務消息】回查時,使用 offset 讀取 CommitLog 得到。
timestamp 非必須字段:【事務消息】回查時,使用 offset 讀取 CommitLog 得到。
state 非必須字段: 事務開始,增長記錄;事務結束,刪除記錄。

另外,數據庫自己保證了數據存儲的可靠性,無需 TranRedoLog


簡單手繪邏輯圖以下😈:

Broker_V4.0.0_基於數據庫
Broker_V4.0.0_基於數據庫

3.2 Producer 接收【事務消息回查】

  • 順序圖以下:

Producer接收【事務消息回查】
Producer接收【事務消息回查】

  • 核心代碼以下:
1: // ⬇️⬇️⬇️【DefaultMQProducerImpl.java】
  2: /** 3: * 檢查【事務狀態】狀態 4: * 5: * @param addr broker地址 6: * @param msg 消息 7: * @param header 請求 8: */
  9: @Override
 10: public void checkTransactionState(final String addr, final MessageExt msg, final CheckTransactionStateRequestHeader header) {
 11:     Runnable request = new Runnable() {
 12:         private final String brokerAddr = addr;
 13:         private final MessageExt message = msg;
 14:         private final CheckTransactionStateRequestHeader checkRequestHeader = header;
 15:         private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();
 16: 
 17:         @Override
 18:         public void run() {
 19:             TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
 20:             if (transactionCheckListener != null) {
 21:                 // 獲取事務執行狀態
 22:                 LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
 23:                 Throwable exception = null;
 24:                 try {
 25:                     localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
 26:                 } catch (Throwable e) {
 27:                     log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
 28:                     exception = e;
 29:                 }
 30: 
 31:                 // 處理事務結果,提交消息 COMMIT / ROLLBACK
 32:                 this.processTransactionState(//
 33:                     localTransactionState, //
 34:                     group, //
 35:                     exception);
 36:             } else {
 37:                 log.warn("checkTransactionState, pick transactionCheckListener by group[{}] failed", group);
 38:             }
 39:         }
 40: 
 41:         /** 42: * 處理事務結果,提交消息 COMMIT / ROLLBACK 43: * 44: * @param localTransactionState 【本地事務】狀態 45: * @param producerGroup producerGroup 46: * @param exception 檢查【本地事務】狀態發生的異常 47: */
 48:         private void processTransactionState(// 49: final LocalTransactionState localTransactionState, // 50: final String producerGroup, // 51: final Throwable exception) {
 52:             final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
 53:             thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
 54:             thisHeader.setProducerGroup(producerGroup);
 55:             thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
 56:             thisHeader.setFromTransactionCheck(true);
 57: 
 58:             // 設置消息編號
 59:             String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
 60:             if (uniqueKey == null) {
 61:                 uniqueKey = message.getMsgId();
 62:             }
 63:             thisHeader.setMsgId(uniqueKey);
 64: 
 65:             thisHeader.setTransactionId(checkRequestHeader.getTransactionId());
 66:             switch (localTransactionState) {
 67:                 case COMMIT_MESSAGE:
 68:                     thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
 69:                     break;
 70:                 case ROLLBACK_MESSAGE:
 71:                     thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
 72:                     log.warn("when broker check, client rollback this transaction, {}", thisHeader);
 73:                     break;
 74:                 case UNKNOW:
 75:                     thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
 76:                     log.warn("when broker check, client does not know this transaction state, {}", thisHeader);
 77:                     break;
 78:                 default:
 79:                     break;
 80:             }
 81: 
 82:             String remark = null;
 83:             if (exception != null) {
 84:                 remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);
 85:             }
 86: 
 87:             try {
 88:                 // 提交消息 COMMIT / ROLLBACK
 89:                 DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
 90:                     3000);
 91:             } catch (Exception e) {
 92:                 log.error("endTransactionOneway exception", e);
 93:             }
 94:         }
 95:     };
 96: 
 97:     // 提交執行
 98:     this.checkExecutor.submit(request);
 99: }
100: 
101: // ⬇️⬇️⬇️【DefaultMQProducerImpl.java】
102: /** 103: * 【事務消息回查】檢查監聽器 104: */
105: public interface TransactionCheckListener {
106: 
107:     /** 108: * 獲取(檢查)【本地事務】狀態 109: * 110: * @param msg 消息 111: * @return 事務狀態 112: */
113:     LocalTransactionState checkLocalTransactionState(final MessageExt msg);
114: 
115: }複製代碼
相關文章
相關標籤/搜索