RocketMQ中分佈式事務源碼剖析,面試阿里,帶上這個大殺器!

做者:有故事的驢apache

來自讀者的投稿bash

---------------------------------------服務器

分佈式事務是一個複雜的問題,上篇講了分佈式事務經常使用的幾種解決方案,其實最經常使用的是消息最終一致性方案,rocketMQ中的事務也是使用消息最終一致性方案的思路來實現的。微信

rocketMQ保證本地事務成功時,消息必定會發送成功並被成功消費,若是本地事務失敗了,消息就不會被髮送。網絡

下邊引用一張rocketMQ官網的事物處理流程圖:socket

首先介紹下上圖中提到的2個概念
分佈式

half消息是什麼?ide

half消息指的是暫時沒法投遞的消息。當消息成功發送到MQ服務器,但服務器沒有收到來自生產者的消息的第二次確認時,該消息被標記爲「暫時沒法投遞」。此狀態中的消息稱爲half消息。ui

消息狀態回查是什麼意思?this

因爲網絡斷開或生產者應用程序從新啓動可能致使丟失事務消息的第二次確認。當MQ服務器發現消息狀態長時間爲half消息時,它將向消息生產者發送回查請求,檢查生產者上次本地事物的執行結果,以此爲基礎進行消息的提交或回滾。

上邊的圖詳細描述了rocketMQ中分佈式事務的每一個階段,下邊用文字描述一下這個過程:

  1. 生產者向MQ服務器發送half消息。
  2. half消息發送成功後,MQ服務器返回確認消息給生產者。
  3. 生產者開始執行本地事務。
  4. 根據本地事務執行的結果(UNKNOW、commit、rollback)向MQ Server發送提交或回滾消息。
  5. 若是錯過了(可能由於網絡異常、生產者忽然宕機等致使的異常狀況)提交/回滾消息,則MQ服務器將向同一組中的每一個生產者發送回查消息以獲取事務狀態。
  6. 回查生產者本地事物狀態。
  7. 生產者根據本地事務狀態發送提交/回滾消息。
  8. MQ服務器將丟棄回滾的消息,但已提交(進行過二次確認的half消息)的消息將投遞給消費者進行消費。

從上述流程能夠知道,事務消息其實只是保證了生產者本地事務和發送消息的原子性

消費者在消費事務消息時,broker處理事務消息的消費與普通消息是同樣的,若消費不成功,則broker會重複投遞該消息16次,若仍然不成功則須要人工介入。

OK,知道了rocketMQ的事物處理流程後,咱們根據官網提供的例子跟下源碼再看看(rocketMQ版本爲4.3.0)

1、生產者發送prepare消息

客戶端發送事務消息的部分(完整代碼請查看org.apache.rocketmq.example.transaction.TransactionProducer)

生產者TransactionProducer代碼以下:

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        //本地事務回調組件
        TransactionListener transactionListener = new TransactionListenerImpl();
        //生產者初始化
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        //設置生產者的本地事物回調組件
        producer.setTransactionListener(transactionListener);
        producer.start();
        //發送消息
        SendResult sendResult = producer.sendMessageInTransaction(msg, null);
        producer.shutdown();
    }
}
複製代碼

回調組件TransactionListenerImpl代碼以下:

public class TransactionListenerImpl implements TransactionListener {
    //本地事務業務邏輯
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        //這裏會執行本地業務邏輯,此處省略...
        //返回本地事物的執行結果(UNKNOW、commit、rollback)
        return LocalTransactionState.UNKNOW;
    }
    //當prepare發送後超時沒有返回,那麼MQ服務器會回調執行這個方法用來檢查上次本地事務執行的狀態,
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        
        //這裏會實現檢查本地事物處理結果的邏輯,此處省略...
        //好比若是本地事物是往表A插入一條數據的話,那麼此處能夠去表A查下那條記錄是否存在,就能夠知道上次本地事物是否成功了
        //根據上次本地事物執行的結果返回消息狀態
        //本地事物執行成功返回COMMIT_MESSAGE,反之失敗返回ROLLBACK_MESSAGE
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
複製代碼

以上是官網例子中我精簡過的代碼,關鍵的一點是事務消息的生產者須要構造實現了TransactionListener的實現類,並註冊到TransactionMQProducer中,接下來咱們就以此爲入口按照上面的事物流程圖來分析下源碼吧

首先確定是從生產中發送消息開始嘍

//發送消息
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
複製代碼

最終會調用到org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendMessageInTransaction中來

這個方法很是重要,實現了事物消息發送的關鍵邏輯(發送消息->執行本地事物->commit/rollback消息)

public class DefaultMQProducerImpl implements MQProducerInner {
    public TransactionSendResult sendMessageInTransaction(final Message msg,
                                                          final LocalTransactionExecuter localTransactionExecuter, final Object arg)
        throws MQClientException {
        //獲取以前註冊的transactionListener本地事務回查組件
        TransactionListener transactionListener = getCheckListener();
        try {
            //發送prepare消息
            sendResult = this.send(msg);
        } catch (Exception e) {
            throw new MQClientException("send message Exception", e);
        }
        LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
        Throwable localException = null;
        switch (sendResult.getSendStatus()) {
            case SEND_OK: {//發送prepare消息成功
                try {
                    if (sendResult.getTransactionId() != null) {
                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                    }
                    String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                    if (null != transactionId && !"".equals(transactionId)) {
                        msg.setTransactionId(transactionId);
                    }
                    //localTransactionExecuter傳進來爲null,這個分支不會走
                    if (null != localTransactionExecuter) {
                        localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                    } else if (transactionListener != null) {
                        log.debug("Used new transaction API");
                        //執行本地事務
                        localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                    }
                    if (null == localTransactionState) {
                        localTransactionState = LocalTransactionState.UNKNOW;
                    }
                    if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                        log.info("executeLocalTransactionBranch return {}", localTransactionState);
                        log.info(msg.toString());
                    }
                } catch (Throwable e) {
                    log.info("executeLocalTransactionBranch exception", e);
                    log.info(msg.toString());
                    localException = e;
                }
            }
            break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        }
        try {
            //根據本地事務執行的結果去發送commit消息或者rollback消息
            this.endTransaction(sendResult, localTransactionState, localException);
        } catch (Exception e) {
            log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
        }
        TransactionSendResult transactionSendResult = new TransactionSendResult();
        transactionSendResult.setSendStatus(sendResult.getSendStatus());
        transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
        transactionSendResult.setMsgId(sendResult.getMsgId());
        transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
        transactionSendResult.setTransactionId(sendResult.getTransactionId());
        transactionSendResult.setLocalTransactionState(localTransactionState);
        return transactionSendResult;
    }
}
複製代碼

咱們先看下sendResult = this.send(msg);這行代碼內部是怎麼執行的,其實它最終調用的是MQClientAPIImpl組件的sendMessage()方法,代碼調用順序以下:

public class DefaultMQProducerImpl implements MQProducerInner {
    /**
     * DEFAULT SYNC -------------------------------------------------------
     */
    public SendResult send(
        Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return send(msg, this.defaultMQProducer.getSendMsgTimeout());
    }
    public SendResult send(Message msg,
        long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
    }
    private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);
        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
    }
    private SendResult sendKernelImpl(final Message msg,
                                      final MessageQueue mq,
                                      final CommunicationMode communicationMode,
                                      final SendCallback sendCallback,
                                      final TopicPublishInfo topicPublishInfo,
                                      final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        SendResult sendResult = null;
                switch (communicationMode) {
                    case ASYNC:
                        //省略非關鍵代碼
                        break;
                    case ONEWAY:
                    case SYNC://communicationMode傳參時爲SYNC,默認走的這個分支
                        long costTimeSync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeSync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        //發送消息
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            msg,
                            requestHeader,
                            timeout - costTimeSync,
                            communicationMode,
                            context,
                            this);
                        break;
                    default:
                        assert false;
                        break;
                }
    }
}
複製代碼

咱們接着MQClientAPIImpl組件的sendMessage()方法繼續往下看

public class MQClientAPIImpl {
    public SendResult sendMessage(
        final String addr,
        final String brokerName,
        final Message msg,
        final SendMessageRequestHeader requestHeader,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final SendMessageContext context,
        final DefaultMQProducerImpl producer
    ) throws RemotingException, MQBrokerException, InterruptedException {
        return sendMessage(addr, brokerName, msg, requestHeader, timeoutMillis, communicationMode, null, null, null, 0, context, producer);
    }
    public SendResult sendMessage(
        final String addr,
        final String brokerName,
        final Message msg,
        final SendMessageRequestHeader requestHeader,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final MQClientInstance instance,
        final int retryTimesWhenSendFailed,
        final SendMessageContext context,
        final DefaultMQProducerImpl producer
    ) throws RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        RemotingCommand request = null;
        if (sendSmartMsg || msg instanceof MessageBatch) {
            SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
            request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
        } else {
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
        }
        request.setBody(msg.getBody());
        switch (communicationMode) {
            case ONEWAY:
                this.remotingClient.invokeOneway(addr, request, timeoutMillis);
                return null;
            case ASYNC:
                final AtomicInteger times = new AtomicInteger();
                long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTimeAsync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
                    retryTimesWhenSendFailed, times, context, producer);
                return null;
            case SYNC://communicationMode傳參時爲SYNC,默認走的這個代碼分支
                long costTimeSync = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTimeSync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                //以同步的方式發送消息
                return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
            default:
                assert false;
                break;
        }
        return null;
    }
    private SendResult sendMessageSync(
        final String addr,
        final String brokerName,
        final Message msg,
        final long timeoutMillis,
        final RemotingCommand request
    ) throws RemotingException, MQBrokerException, InterruptedException {
        //後邊就是調用netty相關組件來處理髮送msg了
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        assert response != null;
        return this.processSendResponse(brokerName, msg, response);
    }
}
複製代碼

由以上代碼可知最終會調用到NettyRemotingClient組件的invokeSync()方法進行處理,在這個invokeSync()方法中會經過nio的方式把消息發送到MQ服務器端broker,invokeSync()方法的源碼以下

public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
    @Override
    public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
        throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
        long beginStartTime = System.currentTimeMillis();
        final Channel channel = this.getAndCreateChannel(addr);
        if (channel != null && channel.isActive()) {
            try {
                doBeforeRpcHooks(addr, request);
                long costTime = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTime) {
                    throw new RemotingTimeoutException("invokeSync call timeout");
                }
                RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
                doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
                return response;
            } catch (RemotingSendRequestException e) {
                log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
                this.closeChannel(addr, channel);
                throw e;
            } catch (RemotingTimeoutException e) {
                if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                    this.closeChannel(addr, channel);
                    log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
                }
                log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
                throw e;
            }
        } else {
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
    }
}
複製代碼

MQ服務器端broker處理完髮送過來的消息以後會給生產者DefaultMQProducerImpl一個返回值SendResult。

到這裏爲止咱們執行到事物消息的哪一個階段了呢?

如今咱們回到關鍵流程中去,關鍵流程就在DefaultMQProducerImpl組件的sendMessageInTransaction()方法中

而後咱們發現其實咱們剛把這段代碼sendResult = this.send(msg)執行完畢並拿到了一個sendResult返回值,這個返回值表明着咱們發送這條消息的一個結果(發送成功仍是失敗)

public class DefaultMQProducerImpl implements MQProducerInner {
    public TransactionSendResult sendMessageInTransaction(final Message msg,
                                                          final LocalTransactionExecuter localTransactionExecuter, final Object arg)
        throws MQClientException {
        //省略無關代碼...
        //發送prepare消息
        sendResult = this.send(msg);
}
複製代碼

接下來會根據sendResult返回值來執行不一樣的邏輯處理

若是sendResult爲SEND_OK,即發送prepare消息成功,那麼就開始執行本地事物(即TransactionListenerImpl組件的executeLocalTransaction()方法)

本地事物返回值是一個枚舉localTransactionState(有3種取值 COMMIT_MESSAGE , ROLLBACK_MESSAGE , UNKNOW)

若是發送prepare消息失敗,則直接設置localTransactionState的值爲ROLLBACK_MESSAGE。最後執行endTransaction()方法進行prepare消息的二次確認。源碼以下

public class DefaultMQProducerImpl implements MQProducerInner {
    public TransactionSendResult sendMessageInTransaction(final Message msg,
                                                          final LocalTransactionExecuter localTransactionExecuter, final Object arg)
        throws MQClientException {
        //獲取以前註冊的transactionListener本地事務回查組件
        TransactionListener transactionListener = getCheckListener();
        try {
            //發送prepare消息
            sendResult = this.send(msg);
        } catch (Exception e) {
            throw new MQClientException("send message Exception", e);
        }
        LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
        Throwable localException = null;
        switch (sendResult.getSendStatus()) {
            case SEND_OK: {//發送prepare消息成功
                try {
                    if (sendResult.getTransactionId() != null) {
                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                    }
                    String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                    if (null != transactionId && !"".equals(transactionId)) {
                        msg.setTransactionId(transactionId);
                    }
                    //localTransactionExecuter傳進來爲null,這個分支不會走
                    if (null != localTransactionExecuter) {
                        localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                    } else if (transactionListener != null) {
                        log.debug("Used new transaction API");
                        //執行本地事務
                        localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                    }
                } catch (Throwable e) {
                    log.info("executeLocalTransactionBranch exception", e);
                    log.info(msg.toString());
                    localException = e;
                }
            }
            break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        }
        try {
            //根據本地事務執行的結果去發送commit消息或者rollback消息
            this.endTransaction(sendResult, localTransactionState, localException);
        } catch (Exception e) {
            log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
        }
    }
}
複製代碼

咱們繼續接着endTransaction()往下看

public class DefaultMQProducerImpl implements MQProducerInner {
    public void endTransaction(
        EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
        requestHeader.setTransactionId(transactionId);
        requestHeader.setCommitLogOffset(id.getOffset());
        //根據本地事務執行的結果去設置請求頭信息commitOrRollback
        switch (localTransactionState) {
            case COMMIT_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                break;
            case ROLLBACK_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                break;
            case UNKNOW:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                break;
            default:
                break;
        }
        requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
        requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
        requestHeader.setMsgId(sendResult.getMsgId());
        String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
        //將request寫入channel中,經過socket把消息發送給消費端,後邊主要是netty的事兒了
        this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
            this.defaultMQProducer.getSendMsgTimeout());
    }
}
複製代碼


public class MQClientAPIImpl {
    public void endTransactionOneway(
        final String addr,
        final EndTransactionRequestHeader requestHeader,
        final String remark,
        final long timeoutMillis
    ) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);
        request.setRemark(remark);
        //將request寫入channel中,經過socket把消息發送給消費端,後邊主要是netty的事兒了
        this.remotingClient.invokeOneway(addr, request, timeoutMillis);
    }
}
複製代碼


public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
    @Override
    public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
        RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
        final Channel channel = this.getAndCreateChannel(addr);
        if (channel != null && channel.isActive()) {
            try {
                doBeforeRpcHooks(addr, request);
                this.invokeOnewayImpl(channel, request, timeoutMillis);
            } catch (RemotingSendRequestException e) {
                log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
                this.closeChannel(addr, channel);
                throw e;
            }
        } else {
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
    }
}
複製代碼


最終調用NettyRemotingClient組件的invokeOneway()方法完成prepare消息的二次確認,若是localTransactionState的值爲COMMIT_MESSAGE時則MQ服務端會將消息投遞給消費者進行消費;

可是若是localTransactionState的值爲ROLLBACK_MESSAGE時則MQ服務端會刪除已經存儲的prepare消息,此時消費者將沒有機會消費到這條消息。

2、Broker端對消息的處理

(1)Broker處理prepare消息

最終調用SendMessageProcessor組件的sendMessage方法處理事務消息,代碼以下

public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
    private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
                                        final RemotingCommand request,
                                        final SendMessageContext sendMessageContext,
                                        final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
        //判斷是不是事務消息 若是是事務消息則用事務消息的邏輯處理
        String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        if (traFlag != null && Boolean.parseBoolean(traFlag)) {
            if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark(
                    "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                        + "] sending transaction message is forbidden");
                return response;
            }
            //處理prepare消息
            putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
        } else {
            putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
        }
    }
}
複製代碼


public class TransactionalMessageServiceImpl implements TransactionalMessageService {
    @Override
    public PutMessageResult prepareMessage(MessageExtBrokerInner messageInner) {
        return transactionalMessageBridge.putHalfMessage(messageInner);
    }
}
複製代碼


public class TransactionalMessageBridge {
    public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
        //對prepare消息進行存儲
        return store.putMessage(parseHalfMessageInner(messageInner));
    }
}
複製代碼


(2)Broker處理prepare消息的二次確認,即結束事務消息的處理

2.1 會判斷本次事務的最終狀態,若是是Commit就改變事物消息狀態,使消費者可見,此時消費者就能夠消費消息了

2.2 若是是Rollback,那麼就刪除在broker端存儲的事物消息,此時消費者就永遠消費不到這條消息

你們能夠先這樣理解,由於這塊比較複雜,實際是經過3個隊列實現的,以後有機會我會詳細說下這塊的實現細節。

3、事務消息是如何處理回查的?

broker在啓動時會啓動線程回查的服務,在TransactionMessageCheckService的run方法中,該方法會執行到onWaitEnd方法:

@Override
protected void onWaitEnd() {
    //獲取超時時間 6s
    long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
    //獲取最大檢測次數 15次
    int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
    //獲取當前時間
    long begin = System.currentTimeMillis();
    log.info("Begin to check prepare message, begin time:{}", begin);
    //開始檢測
    this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
    log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}
複製代碼

經過netty傳遞消息最終調用到TransactionListenerImpl組件的checkLocalTransaction()方法來檢查本地事物的狀態

public class TransactionListenerImpl implements TransactionListener {
    //當prepare發送後超時沒有返回,那麼MQ服務器會回調執行這個方法用來檢查上次本地事務執行的狀態,
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        
        //這裏會實現檢查本地事物處理結果的邏輯,此處省略...
        //好比若是本地事物是往表A插入一條數據的話,那麼此處能夠去表A查下那條記錄是否存在,就能夠知道上次本地事物是否成功了
        //根據上次本地事物執行的結果返回消息狀態
        //本地事物執行成功返回COMMIT_MESSAGE,反之失敗返回ROLLBACK_MESSAGE
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
複製代碼


4、總結

本篇文章從生產者發送prepare消息、Broker端對消息的處理以及事務消息是如何處理回查的3個階段詳細分析了rocketMQ的源碼,按照上述代碼執行順序你們徹底能夠跟着走讀下源碼。

爲了給你們展現到事物消息的核心鏈路,不少細節我都隱藏掉了,你們能夠在走讀源碼的時候本身研究下。

5、關於rocketMQ某些版本不支持事務消息回查的說明

RocketMQ 3.0.8 以及以前的版本是 支持分佈式事務消息;

RocketMQ 3.0.8 以後 ,分佈式事務的閹割了,不支持分佈式事務消息;

RocketMQ 4.0.0 開始 apache 孵化,可是也不支持分佈式事務消息;

2018年08月, RocketMQ 4.3.0 又開始支持分佈式事務消息。

http://rocketmq.apache.org/release_notes/release-notes-4.3.0/ ,如圖所示:


總結:

從RocketMQ 3.0.8 以後 到 4.3.0 以前,在這期間的版本均不支持分佈式事務消息。包括這在期間使用比較普遍的3.2.6 就是不支持分佈式事務消息。

注意上邊說的不支持僅僅是指源碼中缺乏了broker事務回查的代碼,其餘的事物代碼仍是存在的,我是對比了4.2.0和4.3.0的源碼發現的,你們有興趣也能夠關注下這塊。

END

推薦一個專欄:

《從零開始帶你成爲JVM實戰高手》

做者是我多年好友,之前團隊的左膀右臂

一塊兒經歷過各類大型複雜系統上線的血雨腥風

現任阿里資深技術專家,對JVM有豐富的生產實踐經驗

專欄目錄參見文末,能夠掃下方海報進行試讀

經過上面海報購買,再返你24元

領取方式:加微信號:Giotto1245,暗號:返現

相關文章
相關標籤/搜索