rocketmq源碼解析結束事務處理器②

說在前面apache

結束事務管理器微信

 

源碼解析ide

往上返回到這個方法,發送最終消息成功刪除準備事務提交消息,org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#deletePrepareMessageui

@Overridepublic boolean deletePrepareMessage(MessageExt msgExt) {//        =》if (this.transactionalMessageBridge.putOpMessage(msgExt, TransactionalMessageUtil.REMOVETAG)) {log.info("Transaction op message write successfully. messageId={}, queueId={} msgExt:{}", msgExt.getMsgId(), msgExt.getQueueId(), msgExt);return true;} else {log.error("Transaction op message write failed. messageId is {}, queueId is {}", msgExt.getMsgId(), msgExt.getQueueId());return false;}    }

進入這個方法,org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#putOpMessagethis

public boolean putOpMessage(MessageExt messageExt, String opType) {MessageQueue messageQueue = new MessageQueue(messageExt.getTopic(),this.brokerController.getBrokerConfig().getBrokerName(), messageExt.getQueueId());if (TransactionalMessageUtil.REMOVETAG.equals(opType)) {//            事務消息提交或回滾時添加刪除標記=》return addRemoveTagInTransactionOp(messageExt, messageQueue);}return true;    }

進入這個方法,事務消息提交或回滾時添加刪除標記,org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#addRemoveTagInTransactionOpcode

private boolean addRemoveTagInTransactionOp(MessageExt messageExt, MessageQueue messageQueue) {Message message = new Message(TransactionalMessageUtil.buildOpTopic(), TransactionalMessageUtil.REMOVETAG,String.valueOf(messageExt.getQueueOffset()).getBytes(TransactionalMessageUtil.charset));writeOp(message, messageQueue);return true;    }

進入這個方法,org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#writeOpblog

private void writeOp(Message message, MessageQueue mq) {MessageQueue opQueue;if (opQueueMap.containsKey(mq)) {opQueue = opQueueMap.get(mq);} else {opQueue = getOpQueueByHalf(mq);MessageQueue oldQueue = opQueueMap.putIfAbsent(mq, opQueue);if (oldQueue != null) {opQueue = oldQueue;}}if (opQueue == null) {opQueue = new MessageQueue(TransactionalMessageUtil.buildOpTopic(), mq.getBrokerName(), mq.getQueueId());}//        存儲消息putMessage(makeOpMessageInner(message, opQueue));    }

進入這個方法,org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#putMessage前面介紹過了。事務

往上返回到這個方法,事務消息回滾,org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#rollbackMessageget

@Overridepublic OperationResult rollbackMessage(EndTransactionRequestHeader requestHeader) {return getHalfMessageByOffset(requestHeader.getCommitLogOffset());    }

進入這個方法,org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#getHalfMessageByOffset源碼

private OperationResult getHalfMessageByOffset(long commitLogOffset) {OperationResult response = new OperationResult();//        根據offset查詢消息=》MessageExt messageExt = this.transactionalMessageBridge.lookMessageByOffset(commitLogOffset);if (messageExt != null) {response.setPrepareMessage(messageExt);response.setResponseCode(ResponseCode.SUCCESS);} else {response.setResponseCode(ResponseCode.SYSTEM_ERROR);response.setResponseRemark("Find prepared transaction message failed");}return response;    }

進入這個方法,根據offset查詢消息,org.apache.rocketmq.store.DefaultMessageStore#lookMessageByOffset(long)前面介紹過了。

往上返回到這個方法,事務消息回滾成功刪除準備提交事務消息,org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#deletePrepareMessage前面介紹過了。

往上返回到這個方法,org.apache.rocketmq.broker.processor.EndTransactionProcessor#processRequest結束。

 

說在最後

本次解析僅表明我的觀點,僅供參考。

 

加入技術微信羣

釘釘技術羣

相關文章
相關標籤/搜索