摘要: 本文詳細分析了RocketMQ事務消息實現原理中的事務狀態回查實現,RocketMQ會默認一分鐘的頻率處理消息狀態爲Prepare的消息,經過調用消息生產者的事務狀態查詢接口得知消息的事務狀態,從而決定提交或回滾消息。html
在閱讀本文前,若您對RocketMQ技術感興趣,請加入 RocketMQ技術羣交流web
上節已經梳理了RocketMQ發送事務消息的流程(基於二階段提交),本節將繼續深刻學習事務狀態消息回查,咱們知道,第一次提交到消息服務器時消息的主題被替換爲RMQ_SYS_TRANS_HALF_TOPIC,本地事務執行完後若是返回本地事務狀態爲UN_KNOW時,第二次提交到服務器時將不會作任何操做,也就是說此時消息還存在與RMQ_SYS_TRANS_HALF_TOPIC主題中,並不能被消息消費者消費,那這些消息最終如何被提交或回滾呢?apache
原來RocketMQ使用TransactionalMessageCheckService線程定時去檢測
RMQ_SYS_TRANS_HALF_TOPIC主題中的消息,回查消息的事務狀態。TransactionalMessageCheckService的檢測頻率默認1分鐘,可經過在broker.conf文件中設置transactionCheckInterval的值來改變默認值,單位爲毫秒。服務器
接下來將深刻分析該線程的實現原理,從而解開事務消息回查機制。數據結構
TransactionalMessageCheckService#onWaitEnd protected void onWaitEnd() { long timeout = brokerController.getBrokerConfig().getTransactionTimeOut(); // @1 int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax(); // @2 long begin = System.currentTimeMillis(); log.info("Begin to check prepare message, begin time:{}", begin); this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener()); // @3 log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin); }
代碼@1:從broker配置文件中獲取transactionTimeOut參數值。
代碼@2:從broker配置文件中獲取transactionCheckMax參數值,表示事務的最大檢測次數,若是超過檢測次數,消息會默認爲丟棄,即回滾消息。異步
接下來重點分析TransactionalMessageService#check的實現邏輯:ide
org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl TransactionalMessageServiceImpl#check String topic = MixAll.RMQ_SYS_TRANS_HALF_TOPIC; Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic); if (msgQueues == null || msgQueues.size() == 0) { log.warn("The queue of topic is empty :" + topic); return; }
step1:根據主題名稱,獲取該主題下全部的消息隊列。學習
TransactionalMessageServiceImpl#check for (MessageQueue messageQueue : msgQueues) { // ... }
Step2:循環遍歷消息隊列,從單個消息消費隊列去獲取消息。fetch
TransactionalMessageServiceImpl#check long startTime = System.currentTimeMillis(); MessageQueue opQueue = getOpQueue(messageQueue); long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue); long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue); log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset); if (halfOffset < 0 || opOffset < 0) { log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue, halfOffset, opOffset); continue; }
Step3:獲取對應的操做隊列,其主題爲:RMQ_SYS_TRANS_OP_HALF_TOPIC,而後獲取操做隊列的消費進度、待操做的消費隊列的消費進度,若是任意一小於0,忽略該消息隊列,繼續處理下一個隊列。this
TransactionalMessageServiceImpl#check List<Long> doneOpOffset = new ArrayList<>(); HashMap<Long, Long> removeMap = new HashMap<>(); PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset); if (null == pullResult) { log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null", messageQueue, halfOffset, opOffset); continue; }
Step4:調用fillOpRemoveMap主題填充removeMap、doneOpOffset數據結構,這裏主要的目的是避免重複調用事務回查接口,這裏說一下RMQ_SYS_TRANS_HALF_TOPIC、RMQ_SYS_TRANS_OP_HALF_TOPIC這兩個主題的做用。
RMQ_SYS_TRANS_HALF_TOPIC:prepare消息的主題,事務消息首先先進入到該主題。
RMQ_SYS_TRANS_OP_HALF_TOPIC:當消息服務器收到事務消息的提交或回滾請求後,會將消息存儲在該主題下。
TransactionalMessageServiceImpl#check // single thread int getMessageNullCount = 1; long newOffset = halfOffset; long i = halfOffset; // @1 while (true) { if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) { // @2 log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT); break; } if (removeMap.containsKey(i)) { // @3 log.info("Half offset {} has been committed/rolled back", i); removeMap.remove(i); } else { GetResult getResult = getHalfMsg(messageQueue, i); // @4 MessageExt msgExt = getResult.getMsg(); if (msgExt == null) { // @5 if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) { break; } if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) { log.info("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i, messageQueue, getMessageNullCount, getResult.getPullResult()); break; } else { log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}", i, messageQueue, getMessageNullCount, getResult.getPullResult()); i = getResult.getPullResult().getNextBeginOffset(); newOffset = i; continue; } } if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) { // @6 listener.resolveDiscardMsg(msgExt); newOffset = i + 1; i++; continue; } if (msgExt.getStoreTimestamp() >= startTime) { log.info("Fresh stored. the miss offset={}, check it later, store={}", i, new Date(msgExt.getStoreTimestamp())); break; } long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp(); // @7 long checkImmunityTime = transactionTimeout; String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS); if (null != checkImmunityTimeStr) { // @8 checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout); if (valueOfCurrentMinusBorn < checkImmunityTime) { if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt, checkImmunityTime)) { newOffset = i + 1; i++; continue; } } } else { // @9 if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) { log.info("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i, checkImmunityTime, new Date(msgExt.getBornTimestamp())); break; } } List<MessageExt> opMsg = pullResult.getMsgFoundList(); boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime) || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout)) || (valueOfCurrentMinusBorn <= -1); // @10 if (isNeedCheck) { if (!putBackHalfMsgQueue(msgExt, i)) { // @11 continue; } listener.resolveHalfMsg(msgExt); } else { pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset); // @12 log.info("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i, messageQueue, pullResult); continue; } } newOffset = i + 1; i++; } if (newOffset != halfOffset) { // @13 transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset); } long newOpOffset = calculateOpOffset(doneOpOffset, opOffset); if (newOpOffset != opOffset) { // @14 transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset); }
本段代碼比較長,倒是事務狀態回查的重點實現。
代碼@1:先解釋幾個局部變量的含義。
代碼@2:這段代碼應該不陌生,這是RocketMQ處理任務的一個通用處理邏輯,就是一個任務處理,能夠限制每次最多處理的時間,RocketMQ爲待檢測主題RMQ_SYS_TRANS_HALF_TOPIC的每一個隊列,作事務狀態回查,一次最多不超過60S,目前該值不可配置。
代碼@3:若是removeMap中包含當前處理的消息,則繼續下一條,removeMap中的值是經過Step3中填充的,具體實現邏輯是從RMQ_SYS_TRANS_OP_HALF_TOPIC主題中拉取32條,若是拉取的消息隊列偏移量大於等於RMQ_SYS_TRANS_HALF_TOPIC#queueId當前的處理進度時,會添加到removeMap中,表示已處理過。
代碼@4:根據消息隊列偏移量i從消費隊列中獲取消息。
代碼@5:若是消息爲空,則根據容許重複次數進行操做,默認重試一次,目前不可配置。其具體實現爲:
代碼@6:判斷該消息是否須要discard(吞沒,丟棄,不處理)、或skip(跳過),其依據以下:
代碼@7:處理事務超時相關概念,先解釋幾個局部變量:、
transactionTimeout:事務消息的超時時間,其設計的意義是,應用程序在發送事務消息後,事務不會立刻提交,該時間就是假設事務消息發送成功後,應用程序事務提交的時間,在這段時間內,RocketMQ任務事務未提交,故不該該在這個時間段嚮應用程序發送回查請求。
代碼@8:若是消息指定了事務消息過時時間屬性(PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS),若是當前時間已超過該值。
代碼@9:若是當前時間還未過(應用程序事務結束時間),則跳出本次回查處理的,等下一次再試。
代碼@10:判斷是否須要發送事務回查消息,具體邏輯:
代碼@11:若是須要發送事務狀態回查消息,則先將消息再次發送到RMQ_SYS_TRANS_HALF_TOPIC主題中,發送成功則返回true,不然返回false,這裏還有一個實現關鍵點:
if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { msgExt.setQueueOffset( putMessageResult.getAppendMessageResult().getLogicsOffset()); msgExt.setCommitLogOffset( putMessageResult.getAppendMessageResult().getWroteOffset()); msgExt.setMsgId(putMessageResult.getAppendMessageResult().getMsgId()); }
若是發送成功,會將該消息的queueOffset、commitLogOffset設置爲從新存入的偏移量,爲何須要這樣呢,答案在listener.resolveHalfMsg(msgExt)中。
AbstractTransactionalMessageCheckListener#resolveHalfMsg public void resolveHalfMsg(final MessageExt msgExt) { executorService.execute(new Runnable() { @Override public void run() { try { sendCheckMessage(msgExt); } catch (Exception e) { LOGGER.error("Send check message error!", e); } } }); }
發送具體的事務回查機制,這裏用一個線程池來異步發送回查消息,爲了回查進度保存的簡化,這裏只要發送了回查消息,當前回查進度會向前推進,若是回查失敗,上一步驟新增的消息將能夠再次發送回查消息,那若是回查消息發送成功,那會不會下一次又重複發送回查消息呢?這個能夠根據OP隊列中的消息來判斷是否重複,若是回查消息發送成功而且消息服務器完成提交或回滾操做,這條消息會發送到OP隊列中,而後fillOpRemoveMap根據處理進度獲取一批已處理的消息,來與消息判斷是否重複,因爲fillopRemoveMap一次只拉32條消息,那又如何保證必定能拉取到與當前消息的處理記錄呢?其實就是經過代碼@10來實現的,若是此批消息最後一條未超過事務延遲消息,則繼續拉取更多消息進行判斷(@12)和(@14),op隊列也會隨着回查進度的推動而推動。
代碼@12:若是沒法判斷是否發送回查消息,則加載更多的已處理消息進行刷選。
代碼@13:保存(Prepare)消息隊列的回查進度。
代碼@14:保存處理隊列(op)的進度。
上述講解了TransactionalMessageCheckService回查定時線程的發送回查消息的總體流程與實現細節,接下來重點分析一下上述步驟@11,經過異步方式發送消息回查的實現過程。
AbstractTransactionalMessageCheckListener#sendCheckMessage public void sendCheckMessage(MessageExt msgExt) throws Exception { CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader(); checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset()); checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId()); checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX)); checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId()); checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset()); // @1 msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC)); msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID))); msgExt.setStoreSize(0); // @2 String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP); // @3 Channel channel = brokerController.getProducerManager().getAvaliableChannel(groupId); if (channel != null) { brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt); // @4 } else { LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId); } }
代碼@1:首先構建回查事務狀態請求消息,請求核心參數包括:消息offsetId、消息ID(索引)、消息事務ID、事務消息隊列中的偏移量(RMQ_SYS_TRANS_HALF_TOPIC)。
代碼@2:恢復原消息的主題、隊列,並設置storeSize爲0。
代碼@3:獲取生產者組名稱。
代碼@4:根據生產者組獲取任意一個生產者,經過與其鏈接發送事務回查消息,回查消息的請求者爲【Broker服務器】,接收者爲(client,具體爲消息生產者)。
其處理類爲:org.apache.rocketmq.client.impl.ClientRemotingProcessor#processRequest,其詳細邏輯實現方法爲:
ClientRemotingProcessor#checkTransactionState public RemotingCommand checkTransactionState(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final CheckTransactionStateRequestHeader requestHeader = (CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class); final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody()); final MessageExt messageExt = MessageDecoder.decode(byteBuffer); if (messageExt != null) { String transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); if (null != transactionId && !"".equals(transactionId)) { messageExt.setTransactionId(transactionId); } final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP); if (group != null) { MQProducerInner producer = this.mqClientFactory.selectProducer(group); if (producer != null) { final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); producer.checkTransactionState(addr, messageExt, requestHeader); // @1 } else { log.debug("checkTransactionState, pick producer by group[{}] failed", group); } } else { log.warn("checkTransactionState, pick producer group failed"); } } else { log.warn("checkTransactionState, decode message failed"); } return null; }
代碼@1:最終調用生產者的checkTransactionState方法。
DefaultMQProducerImpl#checkTransactionState public void checkTransactionState(final String addr, final MessageExt msg, final CheckTransactionStateRequestHeader header) { Runnable request = new Runnable() { // @1 private final String brokerAddr = addr; private final MessageExt message = msg; private final CheckTransactionStateRequestHeader checkRequestHeader = header; private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup(); @Override public void run() { TransactionListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener(); // @1 if (transactionCheckListener != null) { LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; Throwable exception = null; try { localTransactionState = transactionCheckListener.checkLocalTransaction(message); // @2 } catch (Throwable e) { log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e); exception = e; } this.processTransactionState( // @3 localTransactionState, group, exception); } else { log.warn("checkTransactionState, pick transactionCheckListener by group[{}] failed", group); } } private void processTransactionState( final LocalTransactionState localTransactionState, final String producerGroup, final Throwable exception) { final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader(); thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset()); thisHeader.setProducerGroup(producerGroup); thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset()); thisHeader.setFromTransactionCheck(true); String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); if (uniqueKey == null) { uniqueKey = message.getMsgId(); } thisHeader.setMsgId(uniqueKey); thisHeader.setTransactionId(checkRequestHeader.getTransactionId()); switch (localTransactionState) { case COMMIT_MESSAGE: thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE); break; case ROLLBACK_MESSAGE: thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); log.warn("when broker check, client rollback this transaction, {}", thisHeader); break; case UNKNOW: thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE); log.warn("when broker check, client does not know this transaction state, {}", thisHeader); break; default: break; } String remark = null; if (exception != null) { remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception); } try { DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark, 3000); } catch (Exception e) { log.error("endTransactionOneway exception", e); } } }; this.checkExecutor.submit(request); }
上述代碼雖多,其實實現思路很是清晰,先使用一個匿名類( Runnable )構建一個運行任務,而後提交到checkExecutor線程池中執行,這與我第一篇文章的猜想是吻合的,那重點分析一下該任務的容許邏輯,對應在run方法中。
代碼@1:獲取消息發送者的TransactionListener。
代碼@2:執行TransactionListener#checkLocalTransaction,檢測本地事務狀態,也就是應用程序須要實現TransactionListener#checkLocalTransaction,告知RocketMQ該事務的事務狀態,而後返回COMMIT_MESSAGE、ROLLBACK_MESSAGE、UNKNOW中的一個,而後向Broker發送END_TRANSACTION命令便可,
代碼@3:發送END_TRANSACTION到Broker,其具體實現,已經在 http://www.javashuo.com/article/p-choefxxr-ct.html 中詳細講解過,在此不重複分析。
到這裏,事務消息狀態回查流程就講解完畢,接下來以一張流程圖結束本篇的講解。
下一篇,將重點分析Broker在收到事務狀態爲COMMIT_MESSAGE、ROLLBACK_MESSAGE時如何提交、回滾事務。
若您對RocketMQ技術感興趣,請加入做者所在的 RocketMQ技術羣交流