Preface
The end-transaction processor
Source code walkthrough
Back up to this method, which deletes the prepared (half) transaction message once the final message has been sent successfully: org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#deletePrepareMessage
@Override
public boolean deletePrepareMessage(MessageExt msgExt) {
    // =>
    if (this.transactionalMessageBridge.putOpMessage(msgExt, TransactionalMessageUtil.REMOVETAG)) {
        log.info("Transaction op message write successfully. messageId={}, queueId={} msgExt:{}",
            msgExt.getMsgId(), msgExt.getQueueId(), msgExt);
        return true;
    } else {
        log.error("Transaction op message write failed. messageId is {}, queueId is {}",
            msgExt.getMsgId(), msgExt.getQueueId());
        return false;
    }
}
Step into this method: org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#putOpMessage
public boolean putOpMessage(MessageExt messageExt, String opType) {
    MessageQueue messageQueue = new MessageQueue(messageExt.getTopic(),
        this.brokerController.getBrokerConfig().getBrokerName(), messageExt.getQueueId());
    if (TransactionalMessageUtil.REMOVETAG.equals(opType)) {
        // Add a remove tag when the transactional message is committed or rolled back =>
        return addRemoveTagInTransactionOp(messageExt, messageQueue);
    }
    return true;
}
Step into this method, which adds a remove tag when a transactional message is committed or rolled back: org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#addRemoveTagInTransactionOp
private boolean addRemoveTagInTransactionOp(MessageExt messageExt, MessageQueue messageQueue) {
    Message message = new Message(TransactionalMessageUtil.buildOpTopic(), TransactionalMessageUtil.REMOVETAG,
        String.valueOf(messageExt.getQueueOffset()).getBytes(TransactionalMessageUtil.charset));
    writeOp(message, messageQueue);
    return true;
}
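The body of the op message built above is simply the half message's queue offset rendered as decimal text. A minimal sketch of that encoding and its inverse follows. OpBodySketch and its methods are hypothetical names used only for illustration, and the sketch assumes that TransactionalMessageUtil.charset is UTF-8.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of RocketMQ: mirrors how the op message body is built.
final class OpBodySketch {
    // Assumption: stands in for TransactionalMessageUtil.charset.
    static final Charset CHARSET = StandardCharsets.UTF_8;

    // Same expression the broker uses: the queue offset as decimal text, then bytes.
    static byte[] encode(long halfQueueOffset) {
        return String.valueOf(halfQueueOffset).getBytes(CHARSET);
    }

    // Inverse step a reader of the op queue would need to recover the offset.
    static long decode(byte[] body) {
        return Long.parseLong(new String(body, CHARSET));
    }

    public static void main(String[] args) {
        byte[] body = encode(1024L);
        System.out.println(body.length + " bytes, offset " + decode(body));
    }
}
```

Storing the offset as text rather than as eight raw bytes keeps the op message readable in tooling, at the cost of a variable-length body.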
Step into this method: org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#writeOp
private void writeOp(Message message, MessageQueue mq) {
    MessageQueue opQueue;
    if (opQueueMap.containsKey(mq)) {
        opQueue = opQueueMap.get(mq);
    } else {
        opQueue = getOpQueueByHalf(mq);
        MessageQueue oldQueue = opQueueMap.putIfAbsent(mq, opQueue);
        if (oldQueue != null) {
            opQueue = oldQueue;
        }
    }
    if (opQueue == null) {
        opQueue = new MessageQueue(TransactionalMessageUtil.buildOpTopic(), mq.getBrokerName(), mq.getQueueId());
    }
    // Store the message
    putMessage(makeOpMessageInner(message, opQueue));
}
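The lookup at the top of writeOp is a common concurrent-cache idiom: read the map, derive the value on a miss, then use putIfAbsent so that if two threads race, both end up using whichever value was stored first. A minimal sketch of that idiom, assuming plain String keys in place of MessageQueue; OpQueueCacheSketch, deriveOpQueue, and the "OP:" prefix are hypothetical and only illustrate the pattern.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical cache, not RocketMQ code: String keys stand in for MessageQueue.
final class OpQueueCacheSketch {
    private final ConcurrentMap<String, String> opQueueMap = new ConcurrentHashMap<>();

    // Hypothetical stand-in for getOpQueueByHalf: derive the op queue from the half queue.
    static String deriveOpQueue(String halfQueue) {
        return "OP:" + halfQueue;
    }

    String resolve(String halfQueue) {
        String opQueue = opQueueMap.get(halfQueue);
        if (opQueue == null) {
            opQueue = deriveOpQueue(halfQueue);
            // If another thread stored a value first, adopt it so all callers agree.
            String oldQueue = opQueueMap.putIfAbsent(halfQueue, opQueue);
            if (oldQueue != null) {
                opQueue = oldQueue;
            }
        }
        return opQueue;
    }

    public static void main(String[] args) {
        OpQueueCacheSketch cache = new OpQueueCacheSketch();
        System.out.println(cache.resolve("broker-a/0"));
    }
}
```

ConcurrentHashMap.computeIfAbsent would express the same thing in one call. The explicit putIfAbsent form shown here matches the shape of the broker code.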
Step into this method, org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#putMessage, which was covered earlier.
Back up to this method, which rolls back a transactional message: org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#rollbackMessage
@Override
public OperationResult rollbackMessage(EndTransactionRequestHeader requestHeader) {
    return getHalfMessageByOffset(requestHeader.getCommitLogOffset());
}
Step into this method: org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#getHalfMessageByOffset
private OperationResult getHalfMessageByOffset(long commitLogOffset) {
    OperationResult response = new OperationResult();
    // Look up the message by offset =>
    MessageExt messageExt = this.transactionalMessageBridge.lookMessageByOffset(commitLogOffset);
    if (messageExt != null) {
        response.setPrepareMessage(messageExt);
        response.setResponseCode(ResponseCode.SUCCESS);
    } else {
        response.setResponseCode(ResponseCode.SYSTEM_ERROR);
        response.setResponseRemark("Find prepared transaction message failed");
    }
    return response;
}
Step into this method, which looks up the message by offset: org.apache.rocketmq.store.DefaultMessageStore#lookMessageByOffset(long). It was covered earlier.
Back up to this method, which deletes the prepared (half) transaction message once the rollback has succeeded: org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#deletePrepareMessage. It was covered above.
Back up to this method, org.apache.rocketmq.broker.processor.EndTransactionProcessor#processRequest, which ends here.
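To tie the rollback path together, here is a simplified sketch of the two steps traced above: find the half message by its commit log offset, then record an op message carrying its queue offset. It is a sketch under stated assumptions rather than the broker's implementation. RollbackFlowSketch, its in-memory map and list, and the numeric codes are hypothetical stand-ins, and anything processRequest does between the two steps is left out.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the rollback path, not RocketMQ code.
final class RollbackFlowSketch {
    // Illustrative stand-ins for ResponseCode.SUCCESS and ResponseCode.SYSTEM_ERROR.
    static final int SUCCESS = 0;
    static final int SYSTEM_ERROR = 1;

    // Stand-in for the store: commit log offset -> queue offset of the half message.
    final Map<Long, Long> halfMessages = new HashMap<>();
    // Stand-in for the op queue: bodies of the op messages written so far.
    final List<byte[]> opQueue = new ArrayList<>();

    int rollback(long commitLogOffset) {
        // Step 1 (getHalfMessageByOffset): the half message must still be findable.
        Long queueOffset = halfMessages.get(commitLogOffset);
        if (queueOffset == null) {
            return SYSTEM_ERROR;
        }
        // Step 2 (deletePrepareMessage): the "delete" is an appended op record;
        // the half message itself is not physically removed.
        opQueue.add(String.valueOf(queueOffset).getBytes(StandardCharsets.UTF_8));
        return SUCCESS;
    }

    public static void main(String[] args) {
        RollbackFlowSketch sketch = new RollbackFlowSketch();
        sketch.halfMessages.put(4096L, 7L);
        System.out.println("known offset: " + sketch.rollback(4096L));
        System.out.println("unknown offset: " + sketch.rollback(1L));
    }
}
```

The point the sketch tries to make visible is that deleting a prepared message is logical: the broker appends a marker to the op queue instead of rewriting the log.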
Closing remarks
This walkthrough reflects only my own understanding and is offered for reference.