RocketMQ源碼分析之RocketMQ事務消息實現原理上篇(二階段提交)

根據上文的描述,發送事務消息的入口爲:apache

TransactionMQProducer#sendMessageInTransaction:
public TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException {
        if (null == this.transactionListener) {    // @1
            throw new MQClientException("TransactionListener is null", null);
        }

        return this.defaultMQProducerImpl.sendMessageInTransaction(msg, transactionListener, arg);  // @2
    }

代碼@1:若是transactionListener爲空,則直接拋出異常。
代碼@2:調用defaultMQProducerImpl的sendMessageInTransaction方法。服務器

接下來重點分享sendMessageInTransaction方法函數

DefaultMQProducerImpl#sendMessageInTransaction
public TransactionSendResult sendMessageInTransaction(final Message msg,
           final TransactionListener tranExecuter, final Object arg)  throws MQClientException {

Step1:首先先闡述一下參數含義。源碼分析

  • final Message msg:消息
  • TransactionListener tranExecuter:事務監聽器
  • Object arg:其餘附加參數
DefaultMQProducerImpl#sendMessageInTransaction
SendResult sendResult = null;
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
       sendResult = this.send(msg);
} catch (Exception e) {
       throw new MQClientException("send message Exception", e);
}

Step2:在消息屬性中增長兩個屬性:TRAN_MSG,其值爲true,表示爲事務消息;PGROUP:消息所屬發送者組,而後以同步方式發送消息。在消息發送以前,會先檢查消息的屬性TRAN_MSG,若是存在而且值爲true,則經過設置消息系統標記的方式,設置消息爲MessageSysFlag.TRANSACTION_PREPARED_TYPE。ui

DefaultMQProducerImpl#sendKernelImpl
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
       sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}

SendMessageProcessor#sendMessage
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (traFlag != null && Boolean.parseBoolean(traFlag)) {
        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
             response.setCode(ResponseCode.NO_PERMISSION);
             response.setRemark(
                    "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                        + "] sending transaction message is forbidden");
             return response;
       }
      putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
      putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}

Step3:Broker端收到客戶端發送消息請求後,判斷消息類型。若是是事務消息,則調用TransactionalMessageService#prepareMessage方法,不然走原先的邏輯,調用MessageStore#putMessage方法將消息存入Broker服務端。
本節重點闡述事務消息的實現原理,故接下來將重點關注prepareMessage方法,如想了解RocketMQ消息存儲相關,能夠關注做者源碼分析RocketMQ系列this

org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#prepareMessage
public PutMessageResult prepareMessage(MessageExtBrokerInner messageInner) {
        return transactionalMessageBridge.putHalfMessage(messageInner);
 }

step4:事務消息,將調用TransactionalMessageServiceImpl#prepareMessage方法,繼而調用TransactionalMessageBridge#prepareMessage方法。.net

TransactionalMessageBridge#parseHalfMessageInner
public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
        return store.putMessage(parseHalfMessageInner(messageInner));
    }

    private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
            String.valueOf(msgInner.getQueueId()));
        msgInner.setSysFlag(
            MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
        msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
        msgInner.setQueueId(0);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
        return msgInner;
    }

Step5:備份消息的原主題名稱與原隊列ID,而後取消消息的事務消息標籤,從新設置消息的主題爲:RMQ_SYS_TRANS_HALF_TOPIC,隊列ID固定爲0。而後調用MessageStore#putMessage方法將消息持久化,這裏TransactionalMessageBridge橋接類,就是封裝事務消息的相關流程,最終調用MessageStore完成消息的持久化。消息入庫後,會繼續回到DefaultMQProducerImpl#sendMessageInTransaction,上文的Step2後面,也就是經過同步將消息發送到消息服務端。rest

注:這是事務消息Prepare狀態的處理邏輯,消息是存儲在消息服務器了,但存儲的並非原主題,而是RMQ_SYS_TRANS_HALF_TOPIC,故此時消費端是沒法消費shen
生產者發送的消息的。看到這裏,若是對RocketMQ比較熟悉的話,確定會有一個「定時任務」去取這個主題下的消息,而後則「合適」的時機將消息的主題恢復。code

DefaultMQProducerImpl#sendMessageInTransaction
switch (sendResult.getSendStatus()) {
            case SEND_OK: {
                try {
                    if (sendResult.getTransactionId() != null) {
                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                    }
                    String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                    if (null != transactionId && !"".equals(transactionId)) {
                        msg.setTransactionId(transactionId);
                    }
                    localTransactionState = tranExecuter.executeLocalTransaction(msg, arg);
                    if (null == localTransactionState) {
                        localTransactionState = LocalTransactionState.UNKNOW;
                    }

                    if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                        log.info("executeLocalTransactionBranch return {}", localTransactionState);
                        log.info(msg.toString());
                    }
                } catch (Throwable e) {
                    log.info("executeLocalTransactionBranch exception", e);
                    log.info(msg.toString());
                    localException = e;
                }
            }
            break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        }

Step6:若是消息發送成功,會回調TransactionListener#executeLocalTransaction方法,執行本地事務,而且返回本地事務狀態爲LocalTransactionState,枚舉值以下:blog

  • COMMIT_MESSAGE,
  • ROLLBACK_MESSAGE,
  • UNKNOW

注意:TransactionListener#executeLocalTransaction是在發送者成功發送PREPARED消息後,會執行本地事務方法,而後返回本地事務狀態;若是PREPARED消息發送失敗,則不會調用TransactionListener#executeLocalTransaction,而且本地事務消息,設置爲LocalTransactionState.ROLLBACK_MESSAGE,表示消息須要被回滾。

DefaultMQProducerImpl#sendMessageInTransaction
try {
this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}

step7:調用endTransaction方法結束事務(提交或回滾)。

DefaultMQProducerImpl#endTransaction
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());
switch (localTransactionState) {
    case COMMIT_MESSAGE:
         requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
         break;
    case ROLLBACK_MESSAGE:
         requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
         break;
     case UNKNOW:
         requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
         break;
     default:
         break;
}
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());

step8:組裝結束事務請求,主要參數爲:事務ID、事務操做(commitOrRollback)、消費組、消息隊列偏移量、消息ID,fromTransactionCheck,從這裏發出的請求,默認爲false。Broker端的請求處理器爲:EndTransactionProcessor。

step9:EndTransactionProcessor根據事務提交類型:TRANSACTION_COMMIT_TYPE(提交事務)、TRANSACTION_ROLLBACK_TYPE(回滾事務)、TRANSACTION_NOT_TYPE(忽略該請求)。

到目前爲止,已詳細梳理了RocketMQ事務消息的發送流程,更加準確的說是Prepare狀態的消息發送流程。具體流程如圖所示:

本文到這裏,初步展現了事務消息的發送流程,總的說來,RocketMQ的事務消息發送使用二階段提交思路,首先,在消息發送時,先發送消息類型爲Prepread類型的消息,而後在將該消息成功存入到消息服務器後,會回調TransactionListener#executeLocalTransaction,執行本地事務狀態回調函數,而後根據該方法的返回值,結束事務:
一、COMMIT_MESSAGE :提交事務。
二、ROLLBACK_MESSAGE:回滾事務。
三、UNKNOW:未知事務狀態,此時消息服務器(Broker)收到EndTransaction命令時,將不對這種消息作處理,消息還處於Prepared類型,存儲在主題爲:RMQ_SYS_TRANS_HALF_TOPIC的隊列中,而後消息發送流程將結束,那這些消息如何提交或回滾呢?

爲了實現避免客戶端須要再次發送提交、回滾命令,RocketMQ會採起定時任務將RMQ_SYS_TRANS_HALF_TOPIC中的消息取出,而後回到客戶端,判斷該消息是否須要提交或回滾,來完成事務消息的聲明週期,該部份內容將在下節重點探討。

原文連接

相關文章
相關標籤/搜索