得益於MQ削峯填谷,系統解耦,操做異步等功能特性,在互聯網行業,能夠說有分佈式服務的地方,MQ都每每不會缺席。由阿里自研的RocketMQ更是經歷了多年的雙十一高併發挑戰,其中4.3.0版本推出了事務消息的新特性,本文對RocketMQ 4.5.0版本事務消息相關的源碼跟蹤介紹,經過閱讀讀者能夠知道:java
假設我所在的系統如今有這樣一個場景:數據庫
本地開啓數據庫事務進行扣款操做,成功後發送MQ消息給庫存中心進行發貨。編程
有人會想到開啓mybatis事務實現,把本地事務和MQ消息放在一塊兒不就好了嗎?若是MQ發送成功,就提交事務,發送失敗就回滾事務,整套操做一鼓作氣。緩存
transaction{
扣款();
boolean success = 發送MQ();
if(success){
commit();
}else{
rollBack();
}
}
複製代碼
看似沒什麼問題,可是網絡是不可靠的。服務器
假設MQ返回過來的響應由於網絡緣由遲遲沒有收到,因此在面對不肯定的MQ返回結果只好進行回滾。可是MQ 服務器又確實是收到了這條消息的,只是給客戶端的響應丟失了,因此致使的結果就是扣款失敗,成功發貨。網絡
既然MQ消息的發送不能和本地事務寫在一塊兒,那如何來保證其總體具備原子性的需求呢?答案就是今天咱們介紹的主角:事務消息。mybatis
整體而言RocketMQ事務消息分爲兩條主線併發
所以本文也經過這兩條主線對源碼進行分析異步
在本地應用發送事務消息的核心類是TransactionMQProducer,該類經過繼承DefaultMQProducer來複用大部分發送消息相關的邏輯,這個類的代碼量很是少只有100來行,下面是這個類的sendMessageTransaction方法分佈式
@Override
public TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException {
if (null == this.transactionListener) {
throw new MQClientException("TransactionListener is null", null);
}
return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}
複製代碼
這個方法作了兩件事,
TransactionListener在事務消息流程中起到相當重要的做用,一塊兒看看這個接口
public interface TransactionListener {
/** * When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction. * * @param msg Half(prepare) message * @param arg Custom business parameter * @return Transaction state */
LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
/** * When no response to prepare(half) message. broker will send check message to check the transaction status, and this * method will be invoked to get local transaction status. * * @param msg Check message * @return Transaction state */
LocalTransactionState checkLocalTransaction(final MessageExt msg);
}
複製代碼
接口註釋說的很明白,配合上面的概覽圖來看就是,executeLocalTransaction方法對應的就是執行本地事務操做,checkLocalTransaction對應的就是回查本地事務操做。
下面是DefaultMQProducer類的sendMessageInTransaction方法源碼
public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException {
...
SendResult sendResult = null;
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
...
sendResult = this.send(msg);
...
switch (sendResult.getSendStatus()) {
case SEND_OK: {
...
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
...
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
...
this.endTransaction(sendResult, localTransactionState, localException);
...
}
複製代碼
爲了使源碼的邏輯更加直觀,筆者精簡了核心代碼。sendMessageInTransaction方法主要作了如下事情
發送半消息流程,Client端代碼到這裏差很少就結束了,接下來看看RocketMQ Server端是如何處理的
Server在接收到消息事後會進行一些領域對象的轉化和是否支持事務消息的權限校驗,對理解事務消息用處不大,此處就省略對旁枝末節的介紹了。下面是TransactionalMessageBridge類處理half message的源碼
public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
return store.putMessage(parseHalfMessageInner(messageInner));
}
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}
複製代碼
這兩個方法主要作了如下事情:
public class Message implements Serializable {
private static final long serialVersionUID = 8445773977080406428L;
private String topic;
private int flag;
private Map<String, String> properties;
private byte[] body;
private String transactionId;
}
複製代碼
能夠看到全部的事務半消息都會被放進同一個topic的同一個queue裏面,經過對topic的區分,從而避免了半消息被consumer給消費到
Server將半消息持久化後而後會發送結果給咱們本地的應用程序。到了這裏Server端對半消息的處理就結束了,緊接着的是定時任務的登場。
定時任務是一個叫TransactionalMessageService類的線程,下面是該類的check方法
@Override
public void check(long transactionTimeout, int transactionCheckMax, AbstractTransactionalMessageCheckListener listener) {
...
if (!putBackHalfMsgQueue(msgExt, i)) {
continue;
}
listener.resolveHalfMsg(msgExt);
}
...
}
複製代碼
check方法很是長,省略的代碼大體都是對半消息進行過濾(如超過72小時的事務消息,就被算做過時),只保留符合條件的半消息對其進行回查。
其中頗有意思的是putBackHalfMsgQueue方法,由於每次把半消息從磁盤拉到內存裏進行處理都會對其屬性進行改變(例如TRANSACTION_CHECK_TIMES,這是是否丟棄事務消息的關鍵信息),因此在發送回查消息以前須要對半消息再次放進磁盤。RocketMQ採起的方法是基於最新的物理偏移量從新寫入,而不是對原有的半消息進行修改,其中的目的就是RocketMQ的存儲設計採用順序寫,若是去修改消息 ,沒法作到高性能。
下面是resolveHalfMsg方法,主要就是開啓一個線程而後發送check消息。
public void resolveHalfMsg(final MessageExt msgExt) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
sendCheckMessage(msgExt);
} catch (Exception e) {
LOGGER.error("Send check message error!", e);
}
}
});
}
複製代碼
下面是DefaultMQProducerImpl的checkTransactionState方法,是本地應用對回查消息的處理邏輯
@Override
public void checkTransactionState(final String addr, final MessageExt msg, final CheckTransactionStateRequestHeader header) {
Runnable request = new Runnable() {
...
@Override
public void run() {
...
TransactionListener transactionListener = getCheckListener();
...
localTransactionState = transactionListener.checkLocalTransaction(message);
...
this.processTransactionState(
localTransactionState,
group,
exception);
}
private void processTransactionState( ... DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark, 3000);
...
}
};
this.checkExecutor.submit(request);
}
複製代碼
精簡代碼邏輯後能夠清晰的看到
RocketMQ 服務器在收到Client發過來的Commit消息後會
讀出半消息——>恢復topic等原消息體的信息——>和普通消息同樣再次寫入磁盤——>刪除以前的半消息
若是是Rollback消息則直接刪除以前的半消息
到此,整條RocketMQ 事務消息的調用鏈就結束了
1. 分佈式事務等於事務消息嗎?
二者並無關係,事務消息僅僅保證本地事務和MQ消息發送造成總體的原子性,而投遞到MQ服務器後,消費者是否能必定消費成功是沒法保證的。
2. 源碼設計上有什麼亮點嗎?
經過對整條鏈路源碼的學習理解發現仍是有很多亮點的
3. 源碼設計上有什麼不足嗎?
RocketMQ做爲一款極其成功的消息中間件,要發現不足不是那麼容易了,筆者談幾點見解
sendMessageIntransaction等事務相關的方法被劃分在了DefaultMQProducer裏面,從內聚的角度來講這是跟事務相關的發送消息方法應該被劃分在TransactionMQProducer。
全部topic的半消息都會寫在topic爲RMQ_SYS_TRANS_OP_HALF_TOPIC的半消息隊列裏,而且每條半消息,在整個鏈路裏會被寫屢次,若是併發很大且大部分消息都是事務消息的話,可靠性會存在問題。