🙂🙂🙂關注微信公衆號:【芋艿的後端小屋】有福利:
- RocketMQ / MyCAT / Sharding-JDBC 全部源碼分析文章列表
- RocketMQ / MyCAT / Sharding-JDBC 中文註釋源碼 GitHub 地址
- 您對於源碼的疑問每條留言都將獲得認真回覆。甚至不知道如何讀源碼也可以請教噢。
- 新的源碼解析文章實時收到通知。每週更新一篇左右。
必須必須必須 前置閱讀內容:
核心代碼
理解):1: // ⬇️⬇️⬇️【DefaultMQProducerImpl.java】
2: /** 3: * 發送事務消息 4: * 5: * @param msg 消息 6: * @param tranExecuter 【本地事務】執行器 7: * @param arg 【本地事務】執行器參數 8: * @return 事務發送結果 9: * @throws MQClientException 當 Client 發生異常時 10: */
11: public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg) 12: throws MQClientException {
13: if (null == tranExecuter) {
14: throw new MQClientException("tranExecutor is null", null);
15: }
16: Validators.checkMessage(msg, this.defaultMQProducer);
17:
18: // 發送【Half消息】
19: SendResult sendResult;
20: MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
21: MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
22: try {
23: sendResult = this.send(msg);
24: } catch (Exception e) {
25: throw new MQClientException("send message Exception", e);
26: }
27:
28: // 處理發送【Half消息】結果
29: LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
30: Throwable localException = null;
31: switch (sendResult.getSendStatus()) {
32: // 發送【Half消息】成功,執行【本地事務】邏輯
33: case SEND_OK: {
34: try {
35: if (sendResult.getTransactionId() != null) { // 事務編號。目前開源版本暫時沒用到,猜測ONS在使用。
36: msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
37: }
38:
39: // 執行【本地事務】邏輯
40: localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
41: if (null == localTransactionState) {
42: localTransactionState = LocalTransactionState.UNKNOW;
43: }
44:
45: if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
46: log.info("executeLocalTransactionBranch return {}", localTransactionState);
47: log.info(msg.toString());
48: }
49: } catch (Throwable e) {
50: log.info("executeLocalTransactionBranch exception", e);
51: log.info(msg.toString());
52: localException = e;
53: }
54: }
55: break;
56: // 發送【Half消息】失敗,標記【本地事務】狀態爲回滾
57: case FLUSH_DISK_TIMEOUT:
58: case FLUSH_SLAVE_TIMEOUT:
59: case SLAVE_NOT_AVAILABLE:
60: localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
61: break;
62: default:
63: break;
64: }
65:
66: // 結束事務:提交消息 COMMIT / ROLLBACK
67: try {
68: this.endTransaction(sendResult, localTransactionState, localException);
69: } catch (Exception e) {
70: log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
71: }
72:
73: // 返回【事務發送結果】
74: TransactionSendResult transactionSendResult = new TransactionSendResult();
75: transactionSendResult.setSendStatus(sendResult.getSendStatus());
76: transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
77: transactionSendResult.setMsgId(sendResult.getMsgId());
78: transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
79: transactionSendResult.setTransactionId(sendResult.getTransactionId());
80: transactionSendResult.setLocalTransactionState(localTransactionState);
81: return transactionSendResult;
82: }
83:
84: /** 85: * 結束事務:提交消息 COMMIT / ROLLBACK 86: * 87: * @param sendResult 發送【Half消息】結果 88: * @param localTransactionState 【本地事務】狀態 89: * @param localException 執行【本地事務】邏輯產生的異常 90: * @throws RemotingException 當遠程調用發生異常時 91: * @throws MQBrokerException 當 Broker 發生異常時 92: * @throws InterruptedException 當線程中斷時 93: * @throws UnknownHostException 當解碼消息編號失敗時 94: */
95: public void endTransaction(// 96: final SendResult sendResult, // 97: final LocalTransactionState localTransactionState, // 98: final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
99: // 解碼消息編號
100: final MessageId id;
101: if (sendResult.getOffsetMsgId() != null) {
102: id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
103: } else {
104: id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
105: }
106:
107: // 建立請求
108: String transactionId = sendResult.getTransactionId();
109: final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
110: EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
111: requestHeader.setTransactionId(transactionId);
112: requestHeader.setCommitLogOffset(id.getOffset());
113: switch (localTransactionState) {
114: case COMMIT_MESSAGE:
115: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
116: break;
117: case ROLLBACK_MESSAGE:
118: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
119: break;
120: case UNKNOW:
121: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
122: break;
123: default:
124: break;
125: }
126: requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
127: requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
128: requestHeader.setMsgId(sendResult.getMsgId());
129: String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
130:
131: // 提交消息 COMMIT / ROLLBACK。!!!通訊方式爲:Oneway!!!
132: this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, this.defaultMQProducer.getSendMsgTimeout());
133: }複製代碼
1: // ⬇️⬇️⬇️【EndTransactionProcessor.java】
2: public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
3: final RemotingCommand response = RemotingCommand.createResponseCommand(null);
4: final EndTransactionRequestHeader requestHeader = (EndTransactionRequestHeader) request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
5:
6: // 省略代碼 =》打印日誌(只處理 COMMIT / ROLLBACK)
7:
8: // 查詢提交的消息
9: final MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getCommitLogOffset());
10: if (msgExt != null) {
11: // 省略代碼 =》校驗消息
12:
13: // 生成消息
14: MessageExtBrokerInner msgInner = this.endMessageTransaction(msgExt);
15: msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
16: msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
17: msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
18: msgInner.setStoreTimestamp(msgExt.getStoreTimestamp());
19: if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
20: msgInner.setBody(null);
21: }
22:
23: // 存儲生成消息
24: final MessageStore messageStore = this.brokerController.getMessageStore();
25: final PutMessageResult putMessageResult = messageStore.putMessage(msgInner);
26:
27: // 處理存儲結果
28: if (putMessageResult != null) {
29: switch (putMessageResult.getPutMessageStatus()) {
30: // Success
31: case PUT_OK:
32: case FLUSH_DISK_TIMEOUT:
33: case FLUSH_SLAVE_TIMEOUT:
34: case SLAVE_NOT_AVAILABLE:
35: response.setCode(ResponseCode.SUCCESS);
36: response.setRemark(null);
37: break;
38: // Failed
39: case CREATE_MAPEDFILE_FAILED:
40: response.setCode(ResponseCode.SYSTEM_ERROR);
41: response.setRemark("create maped file failed.");
42: break;
43: case MESSAGE_ILLEGAL:
44: case PROPERTIES_SIZE_EXCEEDED:
45: response.setCode(ResponseCode.MESSAGE_ILLEGAL);
46: response.setRemark("the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
47: break;
48: case SERVICE_NOT_AVAILABLE:
49: response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
50: response.setRemark("service not available now.");
51: break;
52: case OS_PAGECACHE_BUSY:
53: response.setCode(ResponseCode.SYSTEM_ERROR);
54: response.setRemark("OS page cache busy, please try another machine");
55: break;
56: case UNKNOWN_ERROR:
57: response.setCode(ResponseCode.SYSTEM_ERROR);
58: response.setRemark("UNKNOWN_ERROR");
59: break;
60: default:
61: response.setCode(ResponseCode.SYSTEM_ERROR);
62: response.setRemark("UNKNOWN_ERROR DEFAULT");
63: break;
64: }
65:
66: return response;
67: } else {
68: response.setCode(ResponseCode.SYSTEM_ERROR);
69: response.setRemark("store putMessage return null");
70: }
71: } else {
72: response.setCode(ResponseCode.SYSTEM_ERROR);
73: response.setRemark("find prepared transaction message failed");
74: return response;
75: }
76:
77: return response;
78: }複製代碼
COMMIT
)後才生成 ConsumeQueue
。1: // ⬇️⬇️⬇️【DefaultMessageStore.java】
2: public void doDispatch(DispatchRequest req) {
3: // 非事務消息 或 事務提交消息 創建 消息位置信息 到 ConsumeQueue
4: final int tranType = MessageSysFlag.getTransactionValue(req.getSysFlag());
5: switch (tranType) {
6: case MessageSysFlag.TRANSACTION_NOT_TYPE: // 非事務消息
7: case MessageSysFlag.TRANSACTION_COMMIT_TYPE: // 事務消息COMMIT
8: DefaultMessageStore.this.putMessagePositionInfo(req.getTopic(), req.getQueueId(), req.getCommitLogOffset(), req.getMsgSize(),
9: req.getTagsCode(), req.getStoreTimestamp(), req.getConsumeQueueOffset());
10: break;
11: case MessageSysFlag.TRANSACTION_PREPARED_TYPE: // 事務消息PREPARED
12: case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: // 事務消息ROLLBACK
13: break;
14: }
15: // 省略代碼 =》 創建 索引信息 到 IndexFile
16: }複製代碼
版本 | 【事務消息回查】 | |
---|---|---|
官方V3.0.4 ~ V3.1.4 | 基於 文件系統 實現 | 已開源 |
官方V3.1.5 ~ V4.0.0 | 基於 數據庫 實現 | 未完全開源 |
我們來看看兩種情況下是怎麼實現的。
倉庫地址:github.com/YunaiV/rock…
相較於普通消息,【事務消息】多依賴如下三個組件:
MappedFileQueue
實現,默認存儲路徑爲 ~/store/transaction/statetable
,每條【事務消息】狀態存儲結構如下:

第幾位 | 字段 | 說明 | 數據類型 | 字節數 |
---|---|---|---|---|
1 | offset | CommitLog 物理存儲位置 | Long | 8 |
2 | size | 消息長度 | Int | 4 |
3 | timestamp | 消息存儲時間,單位:秒 | Int | 4 |
4 | producerGroupHash | producerGroup 求 HashCode | Int | 4 |
5 | state | 事務狀態 | Int | 4 |
TranStateTable
重放日誌,每次寫操作 TranStateTable
記錄重放日誌。當 Broker
異常關閉時,使用 TranRedoLog
恢復 TranStateTable
。基於 ConsumeQueue
實現,Topic
爲 TRANSACTION_REDOLOG_TOPIC_XXXX
,默認存儲路徑爲 ~/store/transaction/redolog
。簡單手繪邏輯圖如下😈:
CommitLog
時,消息隊列位置(queueOffset
)使用 TranStateTable
最大物理位置(可寫入物理位置)。這樣,消息可以索引到自己對應的 TranStateTable
的位置和記錄。核心代碼如下:
1: // ⬇️⬇️⬇️【DefaultAppendMessageCallback.java】
2: class DefaultAppendMessageCallback implements AppendMessageCallback {
3: public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final Object msg) {
4: // ...省略代碼
5:
6: // 事務消息需要特殊處理
7: final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
8: switch (tranType) {
9: case MessageSysFlag.TransactionPreparedType: // 消息隊列位置(queueOffset)使用 TranStateTable 最大物理位置(可寫入物理位置)
10: queueOffset = CommitLog.this.defaultMessageStore.getTransactionStateService().getTranStateTableOffset().get();
11: break;
12: case MessageSysFlag.TransactionRollbackType:
13: queueOffset = msgInner.getQueueOffset();
14: break;
15: case MessageSysFlag.TransactionNotType:
16: case MessageSysFlag.TransactionCommitType:
17: default:
18: break;
19: }
20:
21: // ...省略代碼
22:
23: switch (tranType) {
24: case MessageSysFlag.TransactionPreparedType:
25: // 更新 TranStateTable 最大物理位置(可寫入物理位置)
26: CommitLog.this.defaultMessageStore.getTransactionStateService().getTranStateTableOffset().incrementAndGet();
27: break;
28: case MessageSysFlag.TransactionRollbackType:
29: break;
30: case MessageSysFlag.TransactionNotType:
31: case MessageSysFlag.TransactionCommitType:
32: // 更新下一次的ConsumeQueue信息
33: CommitLog.this.topicQueueTable.put(key, ++queueOffset);
34: break;
35: default:
36: break;
37: }
38:
39: // 返回結果
40: return result;
41: }
42: }複製代碼
TranStateTable
)。TranStateTable
) COMMIT / ROLLBACK。TranStateTable
),記錄重放日誌(TranRedoLog
)。核心代碼如下:
1: // ⬇️⬇️⬇️【DispatchMessageService.java】
2: private void doDispatch() {
3: if (!this.requestsRead.isEmpty()) {
4: for (DispatchRequest req : this.requestsRead) {
5:
6: // ...省略代碼
7:
8: // 二、寫【事務消息】狀態存儲(TranStateTable)
9: if (req.getProducerGroup() != null) {
10: switch (tranType) {
11: case MessageSysFlag.TransactionNotType:
12: break;
13: case MessageSysFlag.TransactionPreparedType:
14: // 新增 【事務消息】狀態存儲(TranStateTable)
15: DefaultMessageStore.this.getTransactionStateService().appendPreparedTransaction(
16: req.getCommitLogOffset(), req.getMsgSize(), (int) (req.getStoreTimestamp() / 1000), req.getProducerGroup().hashCode());
17: break;
18: case MessageSysFlag.TransactionCommitType:
19: case MessageSysFlag.TransactionRollbackType:
20: // 更新 【事務消息】狀態存儲(TranStateTable) COMMIT / ROLLBACK
21: DefaultMessageStore.this.getTransactionStateService().updateTransactionState(
22: req.getTranStateTableOffset(), req.getPreparedTransactionOffset(), req.getProducerGroup().hashCode(), tranType);
23: break;
24: }
25: }
26: // 三、記錄 TranRedoLog
27: switch (tranType) {
28: case MessageSysFlag.TransactionNotType:
29: break;
30: case MessageSysFlag.TransactionPreparedType:
31: // 記錄 TranRedoLog
32: DefaultMessageStore.this.getTransactionStateService().getTranRedoLog().putMessagePostionInfoWrapper(
33: req.getCommitLogOffset(), req.getMsgSize(), TransactionStateService.PreparedMessageTagsCode,
34: req.getStoreTimestamp(), 0L);
35: break;
36: case MessageSysFlag.TransactionCommitType:
37: case MessageSysFlag.TransactionRollbackType:
38: // 記錄 TranRedoLog
39: DefaultMessageStore.this.getTransactionStateService().getTranRedoLog().putMessagePostionInfoWrapper(
40: req.getCommitLogOffset(), req.getMsgSize(), req.getPreparedTransactionOffset(),
41: req.getStoreTimestamp(), 0L);
42: break;
43: }
44: }
45:
46: // ...省略代碼
47: }
48: }
49: // ⬇️⬇️⬇️【TransactionStateService.java】
50: /** 51: * 新增事務狀態 52: * 53: * @param clOffset commitLog 物理位置 54: * @param size 消息長度 55: * @param timestamp 消息存儲時間 56: * @param groupHashCode groupHashCode 57: * @return 是否成功 58: */
59: public boolean appendPreparedTransaction(// 60: final long clOffset,// 61: final int size,// 62: final int timestamp,// 63: final int groupHashCode// 64: ) {
65: MapedFile mapedFile = this.tranStateTable.getLastMapedFile();
66: if (null == mapedFile) {
67: log.error("appendPreparedTransaction: create mapedfile error.");
68: return false;
69: }
70:
71: // 首次建立,加入定時任務中
72: if (0 == mapedFile.getWrotePostion()) {
73: this.addTimerTask(mapedFile);
74: }
75:
76: this.byteBufferAppend.position(0);
77: this.byteBufferAppend.limit(TSStoreUnitSize);
78:
79: // Commit Log Offset
80: this.byteBufferAppend.putLong(clOffset);
81: // Message Size
82: this.byteBufferAppend.putInt(size);
83: // Timestamp
84: this.byteBufferAppend.putInt(timestamp);
85: // Producer Group Hashcode
86: this.byteBufferAppend.putInt(groupHashCode);
87: // Transaction State
88: this.byteBufferAppend.putInt(MessageSysFlag.TransactionPreparedType);
89:
90: return mapedFile.appendMessage(this.byteBufferAppend.array());
91: }
92:
93: /** 94: * 更新事務狀態 95: * 96: * @param tsOffset tranStateTable 物理位置 97: * @param clOffset commitLog 物理位置 98: * @param groupHashCode groupHashCode 99: * @param state 事務狀態 100: * @return 是否成功 101: */
102: public boolean updateTransactionState( 103: final long tsOffset, 104: final long clOffset, 105: final int groupHashCode, 106: final int state) {
107: SelectMapedBufferResult selectMapedBufferResult = this.findTransactionBuffer(tsOffset);
108: if (selectMapedBufferResult != null) {
109: try {
110:
111: // ....省略代碼:校驗是否可以更新
112:
113: // 更新事務狀態
114: selectMapedBufferResult.getByteBuffer().putInt(TS_STATE_POS, state);
115: }
116: catch (Exception e) {
117: log.error("updateTransactionState exception", e);
118: }
119: finally {
120: selectMapedBufferResult.release();
121: }
122: }
123:
124: return false;
125: }複製代碼
TranStateTable
每一個 MappedFile
都對應一個 Timer
。Timer
固定週期(默認:60s)遍歷 MappedFile
,查找【half消息】,向 Producer
發起【事務消息】回查請求。【事務消息】回查結果的邏輯不在此處進行,在 CommitLog dispatch時執行。實現代碼如下:
1: // ⬇️⬇️⬇️【TransactionStateService.java】
2: /** 3: * 初始化定時任務 4: */
5: private void initTimerTask() {
6: //
7: final List<MapedFile> mapedFiles = this.tranStateTable.getMapedFiles();
8: for (MapedFile mf : mapedFiles) {
9: this.addTimerTask(mf);
10: }
11: }
12:
13: /** 14: * 每一個文件初始化定時任務 15: * @param mf 文件 16: */
17: private void addTimerTask(final MapedFile mf) {
18: this.timer.scheduleAtFixedRate(new TimerTask() {
19: private final MapedFile mapedFile = mf;
20: private final TransactionCheckExecuter transactionCheckExecuter = TransactionStateService.this.defaultMessageStore.getTransactionCheckExecuter();
21: private final long checkTransactionMessageAtleastInterval = TransactionStateService.this.defaultMessageStore.getMessageStoreConfig()
22: .getCheckTransactionMessageAtleastInterval();
23: private final boolean slave = TransactionStateService.this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE;
24:
25: @Override
26: public void run() {
27: // Slave不需要回查事務狀態
28: if (slave) {
29: return;
30: }
31: // Check功能是否開啓
32: if (!TransactionStateService.this.defaultMessageStore.getMessageStoreConfig()
33: .isCheckTransactionMessageEnable()) {
34: return;
35: }
36:
37: try {
38: SelectMapedBufferResult selectMapedBufferResult = mapedFile.selectMapedBuffer(0);
39: if (selectMapedBufferResult != null) {
40: long preparedMessageCountInThisMapedFile = 0; // 回查的【half消息】數量
41: int i = 0;
42: try {
43: // 循環每條【事務消息】狀態,對【half消息】進行回查
44: for (; i < selectMapedBufferResult.getSize(); i += TSStoreUnitSize) {
45: selectMapedBufferResult.getByteBuffer().position(i);
46:
47: // Commit Log Offset
48: long clOffset = selectMapedBufferResult.getByteBuffer().getLong();
49: // Message Size
50: int msgSize = selectMapedBufferResult.getByteBuffer().getInt();
51: // Timestamp
52: int timestamp = selectMapedBufferResult.getByteBuffer().getInt();
53: // Producer Group Hashcode
54: int groupHashCode = selectMapedBufferResult.getByteBuffer().getInt();
55: // Transaction State
56: int tranType = selectMapedBufferResult.getByteBuffer().getInt();
57:
58: // 已經提交或者回滾的消息跳過
59: if (tranType != MessageSysFlag.TransactionPreparedType) {
60: continue;
61: }
62:
63: // 遇到時間不符合最小輪詢間隔,終止
64: long timestampLong = timestamp * 1000;
65: long diff = System.currentTimeMillis() - timestampLong;
66: if (diff < checkTransactionMessageAtleastInterval) {
67: break;
68: }
69:
70: preparedMessageCountInThisMapedFile++;
71:
72: // 回查Producer
73: try {
74: this.transactionCheckExecuter.gotoCheck(groupHashCode, getTranStateOffset(i), clOffset, msgSize);
75: } catch (Exception e) {
76: tranlog.warn("gotoCheck Exception", e);
77: }
78: }
79:
80: // 無回查的【half消息】數量,且遍歷完,則終止定時任務
81: if (0 == preparedMessageCountInThisMapedFile //
82: && i == mapedFile.getFileSize()) {
83: tranlog.info("remove the transaction timer task, because no prepared message in this mapedfile[{}]", mapedFile.getFileName());
84: this.cancel();
85: }
86: } finally {
87: selectMapedBufferResult.release();
88: }
89:
90: tranlog.info("the transaction timer task execute over in this period, {} Prepared Message: {} Check Progress: {}/{}", mapedFile.getFileName(),//
91: preparedMessageCountInThisMapedFile, i / TSStoreUnitSize, mapedFile.getFileSize() / TSStoreUnitSize);
92: } else if (mapedFile.isFull()) {
93: tranlog.info("the mapedfile[{}] maybe deleted, cancel check transaction timer task", mapedFile.getFileName());
94: this.cancel();
95: return;
96: }
97: } catch (Exception e) {
98: log.error("check transaction timer task Exception", e);
99: }
100: }
101:
102:
103: private long getTranStateOffset(final long currentIndex) {
104: long offset = (this.mapedFile.getFileFromOffset() + currentIndex) / TransactionStateService.TSStoreUnitSize;
105: return offset;
106: }
107: }, 1000 * 60, this.defaultMessageStore.getMessageStoreConfig().getCheckTransactionMessageTimerInterval());
108: }
109:
110: // 【DefaultTransactionCheckExecuter.java】
111: @Override
112: public void gotoCheck(int producerGroupHashCode, long tranStateTableOffset, long commitLogOffset, 113: int msgSize) {
114: // 第一步、查詢Producer
115: final ClientChannelInfo clientChannelInfo = this.brokerController.getProducerManager().pickProducerChannelRandomly(producerGroupHashCode);
116: if (null == clientChannelInfo) {
117: log.warn("check a producer transaction state, but not find any channel of this group[{}]", producerGroupHashCode);
118: return;
119: }
120:
121: // 第二步、查詢消息
122: SelectMapedBufferResult selectMapedBufferResult = this.brokerController.getMessageStore().selectOneMessageByOffset(commitLogOffset, msgSize);
123: if (null == selectMapedBufferResult) {
124: log.warn("check a producer transaction state, but not find message by commitLogOffset: {}, msgSize: ", commitLogOffset, msgSize);
125: return;
126: }
127:
128: // 第三步、向Producer發起請求
129: final CheckTransactionStateRequestHeader requestHeader = new CheckTransactionStateRequestHeader();
130: requestHeader.setCommitLogOffset(commitLogOffset);
131: requestHeader.setTranStateTableOffset(tranStateTableOffset);
132: this.brokerController.getBroker2Client().checkProducerTransactionState(clientChannelInfo.getChannel(), requestHeader, selectMapedBufferResult);
133: }複製代碼
核心代碼如下:
1: // ⬇️⬇️⬇️【TransactionStateService.java】
2: /** 3: * 初始化 TranRedoLog 4: * @param lastExitOK 是否正常退出 5: */
6: public void recoverStateTable(final boolean lastExitOK) {
7: if (lastExitOK) {
8: this.recoverStateTableNormal();
9: } else {
10: // 第一步,刪除State Table
11: this.tranStateTable.destroy();
12: // 第二步,通過RedoLog全量恢復StateTable
13: this.recreateStateTable();
14: }
15: }
16:
17: /** 18: * 掃描 TranRedoLog 重建 StateTable 19: */
20: private void recreateStateTable() {
21: this.tranStateTable = new MapedFileQueue(StorePathConfigHelper.getTranStateTableStorePath(defaultMessageStore
22: .getMessageStoreConfig().getStorePathRootDir()), defaultMessageStore
23: .getMessageStoreConfig().getTranStateTableMapedFileSize(), null);
24:
25: final TreeSet<Long> preparedItemSet = new TreeSet<Long>();
26:
27: // 第一步,從頭掃描RedoLog
28: final long minOffset = this.tranRedoLog.getMinOffsetInQuque();
29: long processOffset = minOffset;
30: while (true) {
31: SelectMapedBufferResult bufferConsumeQueue = this.tranRedoLog.getIndexBuffer(processOffset);
32: if (bufferConsumeQueue != null) {
33: try {
34: long i = 0;
35: for (; i < bufferConsumeQueue.getSize(); i += ConsumeQueue.CQStoreUnitSize) {
36: long offsetMsg = bufferConsumeQueue.getByteBuffer().getLong();
37: int sizeMsg = bufferConsumeQueue.getByteBuffer().getInt();
38: long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
39:
40: if (TransactionStateService.PreparedMessageTagsCode == tagsCode) { // Prepared
41: preparedItemSet.add(offsetMsg);
42: } else { // Commit/Rollback
43: preparedItemSet.remove(tagsCode);
44: }
45: }
46:
47: processOffset += i;
48: } finally { // 必須釋放資源
49: bufferConsumeQueue.release();
50: }
51: } else {
52: break;
53: }
54: }
55: log.info("scan transaction redolog over, End offset: {}, Prepared Transaction Count: {}", processOffset, preparedItemSet.size());
56:
57: // 第二步,重建StateTable
58: Iterator<Long> it = preparedItemSet.iterator();
59: while (it.hasNext()) {
60: Long offset = it.next();
61: MessageExt msgExt = this.defaultMessageStore.lookMessageByOffset(offset);
62: if (msgExt != null) {
63: this.appendPreparedTransaction(msgExt.getCommitLogOffset(), msgExt.getStoreSize(),
64: (int) (msgExt.getStoreTimestamp() / 1000),
65: msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP).hashCode());
66: this.tranStateTableOffset.incrementAndGet();
67: }
68: }
69: }
70:
71: /** 72: * 加載(解析)TranStateTable 的 MappedFile 73: * 1. 清理多餘 MappedFile,設置最後一個 MappedFile的寫入位置(position) 74: * 2. 設置 TranStateTable 最大物理位置(可寫入位置) 75: */
76: private void recoverStateTableNormal() {
77: final List<MapedFile> mapedFiles = this.tranStateTable.getMapedFiles();
78: if (!mapedFiles.isEmpty()) {
79: // 從倒數第三個文件開始恢復
80: int index = mapedFiles.size() - 3;
81: if (index < 0) {
82: index = 0;
83: }
84:
85: int mapedFileSizeLogics = this.tranStateTable.getMapedFileSize();
86: MapedFile mapedFile = mapedFiles.get(index);
87: ByteBuffer byteBuffer = mapedFile.sliceByteBuffer();
88: long processOffset = mapedFile.getFileFromOffset();
89: long mapedFileOffset = 0;
90: while (true) {
91: for (int i = 0; i < mapedFileSizeLogics; i += TSStoreUnitSize) {
92:
93: final long clOffset_read = byteBuffer.getLong();
94: final int size_read = byteBuffer.getInt();
95: final int timestamp_read = byteBuffer.getInt();
96: final int groupHashCode_read = byteBuffer.getInt();
97: final int state_read = byteBuffer.getInt();
98:
99: boolean stateOK = false;
100: switch (state_read) {
101: case MessageSysFlag.TransactionPreparedType:
102: case MessageSysFlag.TransactionCommitType:
103: case MessageSysFlag.TransactionRollbackType:
104: stateOK = true;
105: break;
106: default:
107: break;
108: }
109:
110: // 說明當前存儲單元有效
111: if (clOffset_read >= 0 && size_read > 0 && stateOK) {
112: mapedFileOffset = i + TSStoreUnitSize;
113: } else {
114: log.info("recover current transaction state table file over, " + mapedFile.getFileName() + " "
115: + clOffset_read + " " + size_read + " " + timestamp_read);
116: break;
117: }
118: }
119:
120: // 走到文件末尾,切換至下一個文件
121: if (mapedFileOffset == mapedFileSizeLogics) {
122: index++;
123: if (index >= mapedFiles.size()) { // 循環while結束
124: log.info("recover last transaction state table file over, last maped file " + mapedFile.getFileName());
125: break;
126: } else { // 切換下一個文件
127: mapedFile = mapedFiles.get(index);
128: byteBuffer = mapedFile.sliceByteBuffer();
129: processOffset = mapedFile.getFileFromOffset();
130: mapedFileOffset = 0;
131: log.info("recover next transaction state table file, " + mapedFile.getFileName());
132: }
133: } else {
134: log.info("recover current transaction state table queue over " + mapedFile.getFileName() + " " + (processOffset + mapedFileOffset));
135: break;
136: }
137: }
138:
139: // 清理多餘 MappedFile,設置最後一個 MappedFile的寫入位置(position)
140: processOffset += mapedFileOffset;
141: this.tranStateTable.truncateDirtyFiles(processOffset);
142:
143: // 設置 TranStateTable 最大物理位置(可寫入位置)
144: this.tranStateTableOffset.set(this.tranStateTable.getMaxOffset() / TSStoreUnitSize);
145: log.info("recover normal over, transaction state table max offset: {}", this.tranStateTableOffset.get());
146: }
147: }複製代碼
RocketMQ 這種實現事務方式,沒有通過 KV 存儲做,而是通過 Offset 方式,存在一個顯著缺陷,即通過 Offset 更改數據,會令系統的髒頁過多,需要特別關注。
官方V4.0.0 暫時未完全開源【事務消息回查】功能,So 我們需要進行一些猜測,可能不一定正確😈。
😆我們來對比【官方V3.1.4:基於文件】的實現。
TranStateTable
。TranStateTable | TransactionRecord | |
---|---|---|
offset | offset | |
producerGroupHash | producerGroup | |
size | 無 | 非必須字段:【事務消息】回查時,使用 offset 讀取 CommitLog 獲得。 |
timestamp | 無 | 非必須字段:【事務消息】回查時,使用 offset 讀取 CommitLog 獲得。 |
state | 無 | 非必須字段: 事務開始,增加記錄;事務結束,刪除記錄。 |
另外,數據庫自己保證了數據存儲的可靠性,無需 TranRedoLog
。
簡單手繪邏輯圖如下😈:
1: // ⬇️⬇️⬇️【DefaultMQProducerImpl.java】
2: /** 3: * 檢查【事務狀態】狀態 4: * 5: * @param addr broker地址 6: * @param msg 消息 7: * @param header 請求 8: */
9: @Override
10: public void checkTransactionState(final String addr, final MessageExt msg, final CheckTransactionStateRequestHeader header) {
11: Runnable request = new Runnable() {
12: private final String brokerAddr = addr;
13: private final MessageExt message = msg;
14: private final CheckTransactionStateRequestHeader checkRequestHeader = header;
15: private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();
16:
17: @Override
18: public void run() {
19: TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
20: if (transactionCheckListener != null) {
21: // 獲取事務執行狀態
22: LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
23: Throwable exception = null;
24: try {
25: localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
26: } catch (Throwable e) {
27: log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
28: exception = e;
29: }
30:
31: // 處理事務結果,提交消息 COMMIT / ROLLBACK
32: this.processTransactionState(//
33: localTransactionState, //
34: group, //
35: exception);
36: } else {
37: log.warn("checkTransactionState, pick transactionCheckListener by group[{}] failed", group);
38: }
39: }
40:
41: /** 42: * 處理事務結果,提交消息 COMMIT / ROLLBACK 43: * 44: * @param localTransactionState 【本地事務】狀態 45: * @param producerGroup producerGroup 46: * @param exception 檢查【本地事務】狀態發生的異常 47: */
48: private void processTransactionState(// 49: final LocalTransactionState localTransactionState, // 50: final String producerGroup, // 51: final Throwable exception) {
52: final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
53: thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
54: thisHeader.setProducerGroup(producerGroup);
55: thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
56: thisHeader.setFromTransactionCheck(true);
57:
58: // 設置消息編號
59: String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
60: if (uniqueKey == null) {
61: uniqueKey = message.getMsgId();
62: }
63: thisHeader.setMsgId(uniqueKey);
64:
65: thisHeader.setTransactionId(checkRequestHeader.getTransactionId());
66: switch (localTransactionState) {
67: case COMMIT_MESSAGE:
68: thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
69: break;
70: case ROLLBACK_MESSAGE:
71: thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
72: log.warn("when broker check, client rollback this transaction, {}", thisHeader);
73: break;
74: case UNKNOW:
75: thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
76: log.warn("when broker check, client does not know this transaction state, {}", thisHeader);
77: break;
78: default:
79: break;
80: }
81:
82: String remark = null;
83: if (exception != null) {
84: remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);
85: }
86:
87: try {
88: // 提交消息 COMMIT / ROLLBACK
89: DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
90: 3000);
91: } catch (Exception e) {
92: log.error("endTransactionOneway exception", e);
93: }
94: }
95: };
96:
97: // 提交執行
98: this.checkExecutor.submit(request);
99: }
100:
101: // ⬇️⬇️⬇️【DefaultMQProducerImpl.java】
102: /** 103: * 【事務消息回查】檢查監聽器 104: */
105: public interface TransactionCheckListener {
106:
107: /** 108: * 獲取(檢查)【本地事務】狀態 109: * 110: * @param msg 消息 111: * @return 事務狀態 112: */
113: LocalTransactionState checkLocalTransactionState(final MessageExt msg);
114:
115: }複製代碼