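MQ components are an indispensable tool in system architecture. At the design level they reduce coupling between systems, and in high-concurrency scenarios they absorb traffic peaks (peak shaving and valley filling). From monolithic applications to clustered deployments and on to today's microservice architectures, MQ has earned wide acceptance thanks to its strong performance and high reliability.

As data volume grows and system load increases, a certain kind of problem starts to appear: the database has been updated but the message was never sent, or the message was sent first but the database update later failed. Developers then end up doing all sorts of manual data repairs. Such production problems are rare, but they are very frustrating. This is the consistency problem between database transactions and MQ messages. Simply put, sending an ordinary MQ message cannot be bound directly into a database transaction. Consider the two problem scenarios just described.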
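In scenario 1 (commit the database transaction, then send the message), the server may crash right after the transaction commits, so the MQ message is never sent. In scenario 2 (send the message, then commit), the MQ message has already gone out but the database transaction fails to commit, and there is no way to recall a message that has already been sent. The data was not updated, yet downstream services have already received the message, and the system ends up inconsistent.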
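To make the two failure windows concrete, here is a minimal, dependency-free Java sketch. NaiveOrderingSketch and its in-memory lists are hypothetical stand-ins for a database and a broker (they are not RocketMQ or JDBC APIs); the boolean flags simulate a crash right after commit and a commit that fails after the send.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical in-memory stand-ins for a database and an MQ broker, used only to
 * show where the two naive orderings break. Not RocketMQ or JDBC API.
 */
class NaiveOrderingSketch {
    final List<String> committedRows = new ArrayList<>(); // rows visible after commit
    final List<String> sentMessages = new ArrayList<>();  // messages visible to consumers

    /** Scenario 1: commit the DB transaction first, then send the message. */
    void commitThenSend(String order, boolean crashAfterCommit) {
        committedRows.add(order);        // DB transaction committed
        if (crashAfterCommit) {
            return;                      // server dies here: the message is never sent
        }
        sentMessages.add(order);
    }

    /** Scenario 2: send the message first, then commit the DB transaction. */
    void sendThenCommit(String order, boolean commitFails) {
        sentMessages.add(order);         // consumers may already be processing it
        if (commitFails) {
            return;                      // commit failed; the sent message cannot be recalled
        }
        committedRows.add(order);
    }

    /** True when the DB and the message stream agree about this order. */
    boolean consistent(String order) {
        return committedRows.contains(order) == sentMessages.contains(order);
    }
}
```

In both broken runs consistent(...) returns false; only the run without a failure keeps the database and the message stream in agreement.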
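Let's take a shopping scenario in a microservice architecture as an example, following the example in the official RocketMQ documentation: user A places an order and pays 100 yuan, and once the payment completes the user receives 100 points. The account service and the member service are two independent microservices, each with its own database. Given the failure modes described above, the following situations can occur:
1. The account service deducts the 100 yuan, but the message to the member service is never sent, so the user gets no points.
2. The message is sent and the member service adds 100 points, but the deduction transaction in the account service fails, so the user gets points without paying.
3. The deduction succeeds and the message is sent, but the member service fails to consume it, so the points are wrong.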
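This is the transactional consistency problem between database transactions and MQ messages. What RocketMQ transactional messages solve is the atomicity of executing the local transaction and sending the message. The boundary must be clear: the guarantee is that the MQ producer side sends the message out correctly, with no extra sends and no missed sends. Whether the consumer side then consumes it successfully (as in the third situation above: the money was deducted and the message was sent, but a downstream consumption problem left the points wrong) is handled by MQ's consumption-failure retry mechanism and is outside the scope of this article.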
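Common MQ components each have their own solution for this scenario; ActiveMQ, for example, can rely on two-phase commit (XA transactions) to guarantee that messages are sent correctly. Here we focus on RocketMQ.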
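Based on the CAP theorem, RocketMQ transactional messages guarantee eventual consistency of the transaction through an asynchronous assurance approach. The design borrows from two-phase commit; the flowchart is as follows: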
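Taking RocketMQ 4.5.2 as an example, transactional messages have a dedicated system topic, RMQ_SYS_TRANS_HALF_TOPIC, where every prepare (half) message is stored first. When the broker receives the Commit request for a message, it puts the message into the real topic's queue for consumers to consume, and at the same time writes a message to RMQ_SYS_TRANS_OP_HALF_TOPIC. The simplified flowchart is as follows: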
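The bookkeeping described above can be modelled roughly as follows. This is a simplified in-memory sketch, not the broker's implementation: only the two topic names come from RocketMQ, while HalfMessageStoreSketch, its maps, and its method names are hypothetical. On rollback only the op record is written, so the message never reaches the real topic; half messages without an op record are the ones still pending.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Simplified in-memory model of the broker-side half/op bookkeeping.
 * Topic names follow RocketMQ; storage and method names are hypothetical.
 */
class HalfMessageStoreSketch {
    static final String HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
    static final String OP_TOPIC = "RMQ_SYS_TRANS_OP_HALF_TOPIC";

    // topic -> message ids appended to it, in order
    final Map<String, List<String>> topics = new HashMap<>();
    // msgId -> the real topic the producer asked for
    private final Map<String, String> realTopicOf = new HashMap<>();

    private void append(String topic, String msgId) {
        topics.computeIfAbsent(topic, t -> new ArrayList<>()).add(msgId);
    }

    /** Phase 1: a prepared message goes to the half topic, invisible to consumers. */
    void prepare(String msgId, String realTopic) {
        realTopicOf.put(msgId, realTopic);
        append(HALF_TOPIC, msgId);
    }

    /** Commit: copy the message to its real topic, then record an op entry. */
    void commit(String msgId) {
        append(realTopicOf.get(msgId), msgId);
        append(OP_TOPIC, msgId);
    }

    /** Rollback: nothing reaches the real topic; only the op entry is written. */
    void rollback(String msgId) {
        append(OP_TOPIC, msgId);
    }

    /** Half messages with no op entry yet still await a decision. */
    Set<String> pending() {
        Set<String> result = new HashSet<>(topics.getOrDefault(HALF_TOPIC, Collections.emptyList()));
        result.removeAll(topics.getOrDefault(OP_TOPIC, Collections.emptyList()));
        return result;
    }
}
```

For example, after preparing m1, m2 and m3, committing m1 and rolling back m2, only m1 appears in the real topic and pending() returns just m3.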
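In the flow above, allow me to divide responsibilities among three modules: the application module (the transactional message producer, i.e. our business code), the RocketMQ client, and the RocketMQ broker.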
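If the application module's transaction is interrupted, or cannot respond immediately for network or other reasons, RocketMQ treats the result as UNKNOW. For this case RocketMQ transactional messages also provide a remedy: periodically checking back the database transaction status behind each transactional message. The simplified flowchart is as follows: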
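The check-back remedy can be sketched as a bounded retry. This is an illustration under simplifying assumptions: the producer's checkLocalTransaction is modelled as a function of the attempt number, and the limit stands in for the broker's transactionCheckMax setting (15 by default). CheckBackSketch and its enums are hypothetical names; the real broker spreads the attempts over successive scheduled runs instead of looping inside one call.

```java
import java.util.function.Function;

/**
 * Hypothetical sketch of the check-back for one half message whose local
 * transaction first answered UNKNOW. Illustrative only.
 */
class CheckBackSketch {
    enum State { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    enum Outcome { COMMITTED, ROLLED_BACK, DISCARDED }

    /**
     * Asks the producer (modelled as a function of the attempt number) until it gives
     * a definite answer, or gives up after checkMax attempts.
     */
    static Outcome resolve(Function<Integer, State> checkLocalTransaction, int checkMax) {
        for (int attempt = 1; attempt <= checkMax; attempt++) {
            State state = checkLocalTransaction.apply(attempt);
            if (state == State.COMMIT_MESSAGE) {
                return Outcome.COMMITTED;
            }
            if (state == State.ROLLBACK_MESSAGE) {
                return Outcome.ROLLED_BACK;
            }
            // UNKNOW: keep the half message and ask again on the next cycle
        }
        // too many UNKNOW answers: by default the broker gives up on the message
        return Outcome.DISCARDED;
    }
}
```

A producer that answers UNKNOW twice and then COMMIT_MESSAGE ends up COMMITTED; one that never decides ends up DISCARDED once the limit is reached.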
The walkthrough follows the flowchart below, analyzing each part in turn according to module responsibilities and the flow.
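Environment setup
Before reading the source code, you need to get the RocketMQ source into your IDE and be able to debug it; please look up how to do this on your own.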
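Core source of the application module (transactional message producer)
Create a listener class that implements the TransactionListener interface, and simulate results in its implementations of the local (database) transaction method and the transaction-status check-back method.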
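```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

/**
 * @program: rocket
 * @description: sample code for debugging transactional messages
 * @author: Huang
 * @create: 2019-10-16
 **/
public class SelfTransactionListener implements TransactionListener {

    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private AtomicInteger checkTimes = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    /**
     * Execute the local transaction.
     *
     * @param message the half message that was just sent
     * @param o       the arg passed to sendMessageInTransaction
     * @return the local transaction state
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        String msgKey = message.getKeys();
        System.out.println("start execute local transaction " + msgKey);
        LocalTransactionState state;
        if (msgKey.contains("1")) {
            // let the first message through
            state = LocalTransactionState.COMMIT_MESSAGE;
        } else if (msgKey.contains("2")) {
            // simulate a failure on the second message and explicitly roll back
            state = LocalTransactionState.ROLLBACK_MESSAGE;
        } else {
            // the remaining messages give no answer, so the broker will call the check-back method
            state = LocalTransactionState.UNKNOW;
            // assign states 1, 2, 3 to the remaining three messages
            localTrans.put(msgKey, transactionIndex.incrementAndGet());
        }
        System.out.println("executeLocalTransaction:" + message.getKeys() + ",execute state:" + state
                + ",current time:" + System.currentTimeMillis());
        return state;
    }

    /**
     * Check back the local transaction result.
     *
     * @param messageExt the half message being checked
     * @return the local transaction state
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        String msgKey = messageExt.getKeys();
        System.out.println("start check local transaction " + msgKey);
        Integer state = localTrans.get(msgKey);
        if (state == null) {
            // unknown key (localTrans lives in memory, e.g. it is empty after a restart):
            // switching on a null Integer would throw, so answer UNKNOW explicitly
            return LocalTransactionState.UNKNOW;
        }
        switch (state) {
            case 1:
                System.out.println("check result unknown, check count " + checkTimes.incrementAndGet());
                return LocalTransactionState.UNKNOW;
            case 2:
                System.out.println("check result commit message, check count " + checkTimes.incrementAndGet());
                return LocalTransactionState.COMMIT_MESSAGE;
            case 3:
                System.out.println("check result rollback message, check count " + checkTimes.incrementAndGet());
                return LocalTransactionState.ROLLBACK_MESSAGE;
            default:
                return LocalTransactionState.COMMIT_MESSAGE;
        }
    }
}
```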
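The transactional message producer below sends 5 messages in total, covering essentially every scenario. The sleep is long enough to keep the instance running when the broker checks back the transaction status. The code is as follows: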
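```java
import java.nio.charset.StandardCharsets;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;

/**
 * @program: rocket
 * @description: RocketMQ transactional messages
 * @author: Huang
 * @create: 2019-10-16
 **/
public class TransactionProducer {

    public static void main(String[] args) {
        try {
            TransactionMQProducer producer = new TransactionMQProducer("transactionMQProducer");
            producer.setNamesrvAddr("10.0.133.29:9876");
            producer.setTransactionListener(new SelfTransactionListener());
            producer.start();
            for (int i = 1; i < 6; i++) {
                // keys "msg-1" .. "msg-5" drive the branches in SelfTransactionListener
                Message message = new Message("TransactionTopic", "transactionTest", "msg-" + i,
                        ("Hello" + ":" + i).getBytes(StandardCharsets.UTF_8));
                try {
                    SendResult result = producer.sendMessageInTransaction(message, "Hello" + ":" + i);
                    System.out.printf("Topic:%s send success, msgId is:%s%n", message.getTopic(), result.getMsgId());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            // keep the process alive so the broker can call back checkLocalTransaction
            Thread.sleep(Integer.MAX_VALUE);
            producer.shutdown();
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
```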
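Because the listener's answers depend only on the message key and on the order in which UNKNOW keys arrive, the expected fate of each of the five messages can be worked out in advance. The sketch below replays the listener's rules without RocketMQ. ListenerTraceSketch is a hypothetical helper, and it assumes executeLocalTransaction runs once per message in send order, which holds for the single-threaded loop above because sendMessageInTransaction runs the local transaction before returning.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Dependency-free replay of the SelfTransactionListener rules for keys msg-1 .. msg-5.
 * It copies the listener's decisions; it does not talk to a broker.
 */
class ListenerTraceSketch {
    static Map<String, String> expectedOutcomes() {
        Map<String, String> outcomes = new LinkedHashMap<>();
        int transactionIndex = 0; // plays the role of the listener's AtomicInteger
        for (int i = 1; i < 6; i++) {
            String key = "msg-" + i;
            if (key.contains("1")) {
                outcomes.put(key, "COMMIT_MESSAGE when sent");
            } else if (key.contains("2")) {
                outcomes.put(key, "ROLLBACK_MESSAGE when sent");
            } else {
                // UNKNOW when sent; the stored index decides what each check-back returns
                transactionIndex++;
                switch (transactionIndex) {
                    case 1:
                        outcomes.put(key, "UNKNOW on every check-back, until the broker gives up");
                        break;
                    case 2:
                        outcomes.put(key, "COMMIT_MESSAGE on the first check-back");
                        break;
                    case 3:
                        outcomes.put(key, "ROLLBACK_MESSAGE on the first check-back");
                        break;
                    default:
                        throw new IllegalStateException("only three UNKNOW keys are expected");
                }
            }
        }
        return outcomes;
    }
}
```

So msg-1 is committed and msg-2 rolled back immediately, msg-4 and msg-5 are settled by their first check-back, and msg-3 keeps being checked until the broker's check limit is reached.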
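On the client side, the entry point is the sendMessageInTransaction method of org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl, which can be read in three segments.

Segment 1: mark the message as a prepared (half) message, attach the producer group, and send it to the broker synchronously:

```java
SendResult sendResult = null;
// mark the message as a transactional "prepared" (half) message
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
// record the producer group; the broker later uses it to find a channel for the check-back
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
    sendResult = this.send(msg);
} catch (Exception e) {
    throw new MQClientException("send message Exception", e);
}
```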
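Segment 2: after the message is sent successfully, call the application module's local (database) transaction method to get the transaction result (code abridged to save space):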
```java
switch (sendResult.getSendStatus()) {
    case SEND_OK: {
        try {
            if (null != localTransactionExecuter) {
                // legacy API (LocalTransactionExecuter)
                localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
            } else if (transactionListener != null) {
                log.debug("Used new transaction API");
                // calls executeLocalTransaction of our TransactionListener implementation
                localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
            }
            if (null == localTransactionState) {
                localTransactionState = LocalTransactionState.UNKNOW;
            }
        } catch (Throwable e) {
            log.info("executeLocalTransactionBranch exception", e);
            log.info(msg.toString());
            localException = e;
        }
    }
    break;
    case FLUSH_DISK_TIMEOUT:
    case FLUSH_SLAVE_TIMEOUT:
    case SLAVE_NOT_AVAILABLE:
        // the half message may not be stored as required: skip the local transaction and roll back
        localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
        break;
    default:
        break;
}
```
Segment 3: send the transaction result to the RocketMQ broker to end the transaction, then return the result to the application module:
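```java
try {
    // report COMMIT / ROLLBACK / UNKNOW to the broker
    this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
    // a failure here is only logged; the half message is left for the broker's check-back
    log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
```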
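The state returned by the listener ends up as a flag in the end-transaction request. Below is a sketch of that mapping, assuming the MessageSysFlag values of RocketMQ 4.x (TRANSACTION_NOT_TYPE = 0, TRANSACTION_COMMIT_TYPE = 0x2 << 2, TRANSACTION_ROLLBACK_TYPE = 0x3 << 2); EndTransactionMappingSketch is a hypothetical name, and the constants should be checked against the version you run. An UNKNOW state (which is also what remains when the local transaction throws) is sent as "not a transaction type", which the broker does not act on, leaving the half message to the check-back task.

```java
/**
 * Sketch of how the client's endTransaction step maps the local transaction state
 * onto the flag sent to the broker. Constant values are an assumption mirroring
 * MessageSysFlag in RocketMQ 4.x.
 */
class EndTransactionMappingSketch {
    enum LocalTransactionState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    static final int TRANSACTION_NOT_TYPE = 0;
    static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;   // 8
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2; // 12

    static int commitOrRollback(LocalTransactionState state) {
        switch (state) {
            case COMMIT_MESSAGE:
                return TRANSACTION_COMMIT_TYPE;   // broker moves the half message to the real topic
            case ROLLBACK_MESSAGE:
                return TRANSACTION_ROLLBACK_TYPE; // broker only marks the half message as deleted
            case UNKNOW:
            default:
                return TRANSACTION_NOT_TYPE;      // broker leaves it for the check-back task
        }
    }
}
```

The value 8 is also the sysFlag that appears in the broker log later in this article.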
On the broker side, the end-transaction request is handled in the processRequest method of org.apache.rocketmq.broker.processor.EndTransactionProcessor (excerpt):

```java
OperationResult result = new OperationResult();
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
    // look up the half message by its commit log offset
    result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
    if (result.getResponseCode() == ResponseCode.SUCCESS) {
        RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
        if (res.getCode() == ResponseCode.SUCCESS) {
            // change the message's topic from RMQ_SYS_TRANS_HALF_TOPIC back to the real topic
            MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
            msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
            msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
            msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
            msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
            // store the message in the real topic so consumers can consume it
            RemotingCommand sendResult = sendFinalMessage(msgInner);
            if (sendResult.getCode() == ResponseCode.SUCCESS) {
                // write a record to RMQ_SYS_TRANS_OP_HALF_TOPIC marking the half message as deleted;
                // the scheduled check-back task takes it into account
                this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
            }
            return sendResult;
        }
        return res;
    }
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
    // rollback: nothing goes to the real topic, only the op record is written
    result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
    if (result.getResponseCode() == ResponseCode.SUCCESS) {
        RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
        if (res.getCode() == ResponseCode.SUCCESS) {
            this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
        }
        return res;
    }
}
```
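The endMessageTransaction step works because the half message carries its real destination in its properties. A minimal sketch of that idea follows; the REAL_TOPIC, REAL_QID and TRAN_MSG keys are the ones visible in the broker log later in this article, while RestoreRealTopicSketch and SimpleMessage are hypothetical stand-ins for the broker's message classes, and any other fields the real method copies are not modelled.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Minimal sketch: on commit, the real topic and queue are read back from the
 * half message's properties. Classes here are hypothetical.
 */
class RestoreRealTopicSketch {
    static final class SimpleMessage {
        String topic;
        int queueId;
        final Map<String, String> properties = new HashMap<>();
    }

    static SimpleMessage endMessageTransaction(SimpleMessage half) {
        SimpleMessage committed = new SimpleMessage();
        committed.topic = half.properties.get("REAL_TOPIC");
        committed.queueId = Integer.parseInt(half.properties.get("REAL_QID"));
        committed.properties.putAll(half.properties);
        committed.properties.remove("TRAN_MSG"); // no longer a prepared message
        return committed;
    }
}
```

A half message stored under RMQ_SYS_TRANS_HALF_TOPIC with REAL_TOPIC=TransactionTopic and REAL_QID=3 comes out addressed to TransactionTopic, queue 3. This is also why anything that overwrites REAL_TOPIC loses the real destination.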
The transaction check-back is driven by a scheduled service on the broker, TransactionalMessageCheckService; on each cycle its onWaitEnd method starts a check:

```java
@Override
protected void onWaitEnd() {
    long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
    // by default, a message whose status has been checked more than 15 times is discarded
    int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
    long begin = System.currentTimeMillis();
    log.info("Begin to check prepare message, begin time:{}", begin);
    this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
    log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}
```
Entry point of the transaction check-back:
```java
// This excerpt is from the check method of TransactionalMessageServiceImpl
List<MessageExt> opMsg = pullResult.getMsgFoundList();
boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
    || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
    || (valueOfCurrentMinusBorn <= -1);

if (isNeedCheck) {
    // re-append a copy of the half message to the half queue before checking it
    if (!putBackHalfMsgQueue(msgExt, i)) {
        continue;
    }
    // calls resolveHalfMsg of AbstractTransactionalMessageCheckListener
    listener.resolveHalfMsg(msgExt);
} else {
    pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
    log.info("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
        messageQueue, pullResult);
    continue;
}

// This method is in AbstractTransactionalMessageCheckListener
public void resolveHalfMsg(final MessageExt msgExt) {
    executorService.execute(new Runnable() {
        @Override
        public void run() {
            try {
                sendCheckMessage(msgExt);
            } catch (Exception e) {
                LOGGER.error("Send check message error!", e);
            }
        }
    });
}

// This method is in AbstractTransactionalMessageCheckListener
public void sendCheckMessage(MessageExt msgExt) throws Exception {
    CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
    checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
    checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
    checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
    checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
    checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
    // present the message to the producer with its real topic and queue
    msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
    msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
    msgExt.setStoreSize(0);
    String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
    Channel channel = brokerController.getProducerManager().getAvaliableChannel(groupId);
    if (channel != null) {
        // send the request to the RocketMQ client over Netty, which runs checkTransactionState
        brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
    } else {
        LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
    }
}
```
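The isNeedCheck expression is dense, so here it is restated as a standalone function with one variable per branch. NeedCheckSketch is a hypothetical helper: hasOpMsg stands for opMsg != null, lastOpBornTimestamp for the born timestamp of the last op message pulled, and all values are milliseconds. The comments describe my reading of each branch, not documented semantics.

```java
/**
 * Standalone restatement of the isNeedCheck condition from
 * TransactionalMessageServiceImpl.check. Hypothetical helper for readability.
 */
class NeedCheckSketch {
    static boolean isNeedCheck(boolean hasOpMsg, long lastOpBornTimestamp, long startTime,
                               long valueOfCurrentMinusBorn, long checkImmunityTime,
                               long transactionTimeout) {
        // 1. no op messages were pulled, and the half message is older than the immunity window
        boolean noOpAndOldEnough = !hasOpMsg && valueOfCurrentMinusBorn > checkImmunityTime;
        // 2. op messages exist, and the newest one pulled was born more than
        //    transactionTimeout after this check round started
        boolean opBeyondTimeout = hasOpMsg && lastOpBornTimestamp - startTime > transactionTimeout;
        // 3. the half message appears to be born in the future (e.g. clock skew)
        boolean bornInFuture = valueOfCurrentMinusBorn <= -1;
        return noOpAndOldEnough || opBeyondTimeout || bornInFuture;
    }
}
```

With a 6000 ms immunity window, a half message aged 7000 ms with no op messages is checked, while one aged 5000 ms is not yet checked.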
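After the RocketMQ client receives the broker's request, it calls the local (database) transaction check-back method again and submits the result to the RocketMQ broker once more. Method entry: the checkTransactionState method of org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl: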
```java
try {
    if (transactionCheckListener != null) {
        // legacy API (TransactionCheckListener)
        localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
    } else if (transactionListener != null) {
        log.debug("Used new check API in transaction message");
        // calls checkLocalTransaction of our TransactionListener implementation
        localTransactionState = transactionListener.checkLocalTransaction(message);
    } else {
        log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);
    }
} catch (Throwable e) {
    log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
    exception = e;
}

// report the re-checked result back to the broker
this.processTransactionState(
    localTransactionState,
    group,
    exception);
```
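The client-side pattern above reduces to: start from UNKNOW, overwrite it with the listener's answer, and keep UNKNOW if the listener throws. Below is a sketch under that assumption; CheckFallbackSketch and its enum are hypothetical, and the null check is an extra safeguard of this sketch. It shows why an exception inside checkLocalTransaction, such as a lookup of a key that is no longer in memory, leads to another check-back later rather than a lost message.

```java
import java.util.concurrent.Callable;

/**
 * Sketch of the defensive pattern in the client's check handler:
 * the state defaults to UNKNOW and stays UNKNOW if the user's check throws.
 */
class CheckFallbackSketch {
    enum LocalTransactionState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    static LocalTransactionState safeCheck(Callable<LocalTransactionState> check) {
        LocalTransactionState state = LocalTransactionState.UNKNOW;
        try {
            LocalTransactionState result = check.call();
            if (result != null) {
                state = result;
            }
        } catch (Throwable t) {
            // keep UNKNOW so the broker simply asks again on a later cycle
            System.err.println("check failed, reporting UNKNOW: " + t);
        }
        return state;
    }
}
```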
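The official documentation notes that transactional messages do not support delayed messages or batch messages. I couldn't resist trying a delayed message anyway: I set a DelayTimeLevel on a transactional message, and that message could never be removed from RMQ_SYS_TRANS_HALF_TOPIC. The application module's log showed it checking back the transaction over and over, and in the Console the message query list for RMQ_SYS_TRANS_HALF_TOPIC quickly exceeded 2,000 records. Why?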
Let's analyze it at the code level. The process is as follows:
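1. With DelayTimeLevel set, after the database transaction commits (or after the check-back completes), the message is written to the target topic, but because of DelayTimeLevel the target topic becomes SCHEDULE_TOPIC_XXXX and REAL_TOPIC becomes RMQ_SYS_TRANS_HALF_TOPIC. The real topic is lost at this step.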
```java
// Broker-side handling after the transaction is committed:
// class org.apache.rocketmq.broker.processor.EndTransactionProcessor
OperationResult result = new OperationResult();
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
    // look up the half message by its commit log offset
    result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
    if (result.getResponseCode() == ResponseCode.SUCCESS) {
        RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
        if (res.getCode() == ResponseCode.SUCCESS) {
            // change the topic from RMQ_SYS_TRANS_HALF_TOPIC back to the real topic
            MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
            msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
            msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
            msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
            msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
            // store the message in the "real" topic; sendFinalMessage goes through
            // CommitLog.putMessage, where the topic becomes SCHEDULE_TOPIC_XXXX (see below)
            RemotingCommand sendResult = sendFinalMessage(msgInner);
            if (sendResult.getCode() == ResponseCode.SUCCESS) {
                // write a record to RMQ_SYS_TRANS_OP_HALF_TOPIC marking the half message as deleted;
                // the scheduled check-back task takes it into account
                this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
            }
            return sendResult;
        }
        return res;
    }
}

// This excerpt is from the putMessage method of org.apache.rocketmq.store.CommitLog.
// Because of DelayTimeLevel, the target topic becomes SCHEDULE_TOPIC_XXXX
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
    || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
    // Delay Delivery
    if (msg.getDelayTimeLevel() > 0) {
        if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
            msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
        }

        topic = ScheduleMessageService.SCHEDULE_TOPIC;
        queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

        // Backup real topic, queueId
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
        msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

        msg.setTopic(topic);
        msg.setQueueId(queueId);
    }
}
```
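The delay branch of putMessage reduces to a few lines, which shows how the real topic gets lost: whatever topic the message carries at that moment is what gets backed up into REAL_TOPIC. Below is a minimal sketch; DelayRedirectSketch and Msg are hypothetical, and the transaction-type condition and queue id handling are left out. If that topic is already RMQ_SYS_TRANS_HALF_TOPIC, the original business topic is overwritten, matching the REAL_TOPIC=RMQ_SYS_TRANS_HALF_TOPIC value in the broker log.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Simplified restatement of the delay branch of CommitLog.putMessage:
 * the current topic is backed up into REAL_TOPIC and replaced by the schedule topic.
 */
class DelayRedirectSketch {
    static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";

    static final class Msg {
        String topic;
        int delayTimeLevel;
        final Map<String, String> properties = new HashMap<>();
    }

    static void redirectIfDelayed(Msg msg) {
        if (msg.delayTimeLevel > 0) {
            // backs up the *current* topic; any earlier REAL_TOPIC value is overwritten
            msg.properties.put("REAL_TOPIC", msg.topic);
            msg.topic = SCHEDULE_TOPIC;
        }
    }
}
```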
A sample of the resulting log:
```
2019-10-17 14:41:05 INFO EndTransactionThread_4 - Transaction op message write successfully. messageId=0A00851D00002A9F0000000000000E09, queueId=0 msgExt:MessageExt [queueId=0, storeSize=335, queueOffset=5, sysFlag=8, bornTimestamp=1571293959305, bornHost=/10.0.133.29:54634, storeTimestamp=1571294460555, storeHost=/10.0.133.29:10911, msgId=0A00851D00002A9F0000000000000E09, commitLogOffset=3593, bodyCRC=1849408413, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='SCHEDULE_TOPIC_XXXX', flag=0, properties={REAL_TOPIC=RMQ_SYS_TRANS_HALF_TOPIC, TRANSACTION_CHECK_TIMES=3, KEYS=msg-test-3, TRAN_MSG=true, UNIQ_KEY=0A00851D422C18B4AAC25584B0880000, WAIT=false, DELAY=1, PGROUP=transactionMQProducer, TAGS=transactionTest, REAL_QID=0}, body=[72, 101, 108, 108, 111, 84, 105, 109, 101, 58, 51], transactionId='null'}]
```
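2. Delayed delivery is triggered by a scheduled task. The delay I set was 1 second (delay level 1), and when it expired the scheduled task put the message back into RMQ_SYS_TRANS_HALF_TOPIC. Note that at this point only RMQ_SYS_TRANS_HALF_TOPIC holds the message; RMQ_SYS_TRANS_OP_HALF_TOPIC has no entry for it. The code: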
```java
// This excerpt is from executeOnTimeup in org.apache.rocketmq.store.schedule.ScheduleMessageService
try {
    // messageTimeup restores the topic from REAL_TOPIC, which here is RMQ_SYS_TRANS_HALF_TOPIC,
    // so the message goes back into that queue
    MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
    PutMessageResult putMessageResult =
        ScheduleMessageService.this.writeMessageStore
            .putMessage(msgInner);

    if (putMessageResult != null
        && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
        continue;
    } else {
        log.error(
            "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
            msgExt.getTopic(), msgExt.getMsgId());
        ScheduleMessageService.this.timer.schedule(
            new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_PERIOD);
        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
        return;
    }
} catch (Exception e) {
    log.error(
        "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
            + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
            + offsetPy + ",sizePy=" + sizePy, e);
}
```
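What messageTimeup does with the backed-up topic can be sketched in the same style: the topic is restored from the REAL_TOPIC property. DelayTimeupSketch and Msg are hypothetical, and only the topic restoration is modelled. Given a message whose REAL_TOPIC is RMQ_SYS_TRANS_HALF_TOPIC, the restored message lands back in the half topic, which is step 2 above.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch of the topic restoration done when a delayed message times up.
 * Only the REAL_TOPIC handling is modelled; the classes are hypothetical.
 */
class DelayTimeupSketch {
    static final class Msg {
        String topic;
        final Map<String, String> properties = new HashMap<>();
    }

    static Msg messageTimeup(Msg delayed) {
        Msg restored = new Msg();
        restored.properties.putAll(delayed.properties);
        // deliver to whatever topic was backed up when the message was delayed
        restored.topic = restored.properties.get("REAL_TOPIC");
        return restored;
    }
}
```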
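3. When the transactional message check task runs, it finds the message in RMQ_SYS_TRANS_HALF_TOPIC but no corresponding entry in RMQ_SYS_TRANS_OP_HALF_TOPIC. To keep writes sequential, it appends the message to RMQ_SYS_TRANS_HALF_TOPIC again (putBackHalfMsgQueue) and triggers another transaction check-back. The code is the same as the check-back entry shown earlier: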
```java
// This excerpt is from the check method of TransactionalMessageServiceImpl
List<MessageExt> opMsg = pullResult.getMsgFoundList();
boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
    || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
    || (valueOfCurrentMinusBorn <= -1);

if (isNeedCheck) {
    // re-append a copy of the half message to the half queue before checking it
    if (!putBackHalfMsgQueue(msgExt, i)) {
        continue;
    }
    listener.resolveHalfMsg(msgExt);
} else {
    pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
    log.info("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
        messageQueue, pullResult);
    continue;
}
```
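This forms a loop that only ends when the message is discarded after 15 attempts (the default maximum number of checks is 15), which is a rather high cost. I have submitted a PR with a fix to the RocketMQ community; once the new version is released, transactional messages will ignore DelayTimeLevel and this problem will no longer occur.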
Until the new version is released, our workaround is:
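```java
/**
 * Send a transactional message.
 * Delayed sending and batch sending are not supported.
 */
public void sendMessageInTransaction(String topic, String tag, Object message, String requestId) throws Exception {
    // annotationScan and MessageBuilder are project-specific helpers, not part of the RocketMQ client API;
    // the signature deliberately offers no way to pass a delay level
    TransactionMQProducer producer = annotationScan.getProducer(topic + "_" + tag);
    producer.sendMessageInTransaction(MessageBuilder.of(topic, tag, message, requestId).build(), message);
}
```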
This should be more reliable, since it prevents DelayTimeLevel from being set at the source.
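The same idea can also be enforced with an explicit guard inside a send helper. The sketch below is hypothetical (TransactionalSendGuard is not part of RocketMQ) and uses a plain int for the delay level so that it has no dependencies; with the real client you would test msg.getDelayTimeLevel() > 0 on the org.apache.rocketmq.common.message.Message before calling sendMessageInTransaction.

```java
/**
 * Hypothetical guard for a transactional send helper: refuse any message that
 * carries a delay level, so the check-back loop described above cannot be triggered.
 */
class TransactionalSendGuard {
    static void requireNoDelay(String topic, int delayTimeLevel) {
        if (delayTimeLevel > 0) {
            throw new IllegalArgumentException("transactional message to " + topic
                    + " must not set DelayTimeLevel (got " + delayTimeLevel + ")");
        }
    }
}
```

Failing fast at send time turns a silent, repeated check-back into an immediate error the caller can see.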
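This article briefly introduced the scenarios that transactional messages address and the boundaries of their responsibility, along with the basic design and flow (borrowing diagrams from the RocketMQ authors), then walked through selected source code, and finally recounted a pitfall I ran into. If anything in the article is incorrect or incomplete, please leave a comment. Thank you.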
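Focused on Java high concurrency and distributed architecture. For more technical articles and insights, follow the WeChat official account: Java架構社區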