摘要: RocketMQ源碼分析之從官方示例窺探RocketMQ事務消息實現基本思想。html
在閱讀本文前,若您對RocketMQ技術感興趣,請加入 RocketMQ技術羣交流 java
RocketMQ4.3.0版本開始支持事務消息,後續分享將開始將剖析事務消息的實現原理。首先從官方給出的Demo實例入手,以此通往RocketMQ事務消息的世界中。web
官方版本未發佈以前,從apache rocketmq第一個版本上線後,代碼中存在與事務消息相關的代碼,例如COMMIT、ROLLBACK、PREPARED,在事務消息未開源以前網上對於事務消息的「聲音」基本上是使用相似二階段提交,主要是根據消息系統標誌MessageSysFlag中定義來推測的:數據庫
消息發送者首先發送TRANSACTION_PREPARED_TYPE類型的消息,而後根據事務狀態來決定是提交或回滾事務發送commit請求或rollback請求,若是commit/rollback請求丟失後,rocketmq會在指定超時時間後回查事務狀態來決定提交或回滾事務。apache
讓咱們各自帶着本身的理解和猜想,從閱讀RocketMQ官方提供的Demo程序入手,試圖窺探一些大致的信息。緩存
Demo示例程序位於:/rocketmq-example/src/main/java/org/apache/rocketmq/example/transaction包中。該包中未放置消息消費者,爲了驗證事務的消息消費狀況,咱們能夠從其餘包copy一個消費者,從而先運行生產者,而後運行消費者,判斷事務消息的預發放、提交、回滾等效果,二話不說,先運行一下,看下效果再說:
消息發送端運行結果:架構
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5767EC0000, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=1], queueOffset=0] SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D57680F0001, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=2], queueOffset=1] SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D57681E0002, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=3], queueOffset=2] SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D57682B0003, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=0], queueOffset=3] SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5768380004, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=1], queueOffset=4] SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5768490005, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=2], queueOffset=5] SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5768560006, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=3], queueOffset=6] SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5768640007, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=0], queueOffset=7] SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5768730008, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=1], queueOffset=8] SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5768800009, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=2], queueOffset=9]
消息消費端效果:ide
Consumer Started. ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=0, storeSize=325, queueOffset=0, sysFlag=8, bornTimestamp=1532745715812, bornHost=/192.168.1.5:55482, storeTimestamp=1532745749010, storeHost=/192.168.1.5:10911, msgId=C0A8010500002A9F0000000000001DE8, commitLogOffset=7656, bodyCRC=988340972, reconsumeTimes=0, preparedTransactionOffset=5477, toString()=Message{topic='transaction_topic_test', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=transaction_topic_test, TRANSACTION_CHECK_TIMES=1, MAX_OFFSET=1, KEYS=KEY7, TRAN_MSG=true, CONSUME_START_TIME=1532746024360, UNIQ_KEY=C0A8010518DC6D06D69C8D5768640007, WAIT=true, PGROUP=please_rename_unique_group_name, TAGS=TagC, REAL_QID=0}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 55], transactionId='C0A8010518DC6D06D69C8D5768640007'}]] ConsumeMessageThread_2 Receive New Messages: [MessageExt [queueId=1, storeSize=325, queueOffset=0, sysFlag=8, bornTimestamp=1532745715768, bornHost=/192.168.1.5:55482, storeTimestamp=1532745749008, storeHost=/192.168.1.5:10911, msgId=C0A8010500002A9F0000000000001B91, commitLogOffset=7057, bodyCRC=601994070, reconsumeTimes=0, preparedTransactionOffset=4496, toString()=Message{topic='transaction_topic_test', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=transaction_topic_test, TRANSACTION_CHECK_TIMES=1, MAX_OFFSET=1, KEYS=KEY4, TRAN_MSG=true, CONSUME_START_TIME=1532746024361, UNIQ_KEY=C0A8010518DC6D06D69C8D5768380004, WAIT=true, PGROUP=please_rename_unique_group_name, TAGS=TagE, REAL_QID=1}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 52], transactionId='C0A8010518DC6D06D69C8D5768380004'}]] ConsumeMessageThread_3 Receive New Messages: [MessageExt [queueId=2, storeSize=325, queueOffset=0, sysFlag=8, bornTimestamp=1532745715727, bornHost=/192.168.1.5:55482, storeTimestamp=1532745748834, storeHost=/192.168.1.5:10911, msgId=C0A8010500002A9F000000000000193A, commitLogOffset=6458, bodyCRC=1401636825, reconsumeTimes=0, preparedTransactionOffset=3515, toString()=Message{topic='transaction_topic_test', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=transaction_topic_test, TRANSACTION_CHECK_TIMES=1, MAX_OFFSET=1, KEYS=KEY1, TRAN_MSG=true, CONSUME_START_TIME=1532746024368, UNIQ_KEY=C0A8010518DC6D06D69C8D57680F0001, WAIT=true, PGROUP=please_rename_unique_group_name, TAGS=TagB, REAL_QID=2}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49], transactionId='C0A8010518DC6D06D69C8D57680F0001'}]]
綜上所述,服務端發送了10條消息,而消費端只收到3條消息,應該是因爲事務回滾,形成只提交了3條消息,爲了更加嚴謹,能夠安裝一個rocketmq-consonse,更加直觀的觀察shangshagn's上述結果:
源碼分析
接下來對示例代碼進行解讀:學習
一、生產者端代碼解讀:
public class TransactionProducer { public static void main(String[] args) throws MQClientException, InterruptedException { TransactionListener transactionListener = new TransactionListenerImpl(); // @1 TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name"); producer.setNamesrvAddr("127.0.0.1:9876"); ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("client-transaction-msg-check-thread"); return thread; } }); // @2 producer.setExecutorService(executorService); // @3 producer.setTransactionListener(transactionListener); // @4 producer.start(); String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int i = 0; i < 10; i++) { // @5 try { Message msg = new Message("transaction_topic_test", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.printf("%s%n", sendResult); Thread.sleep(10); } catch (MQClientException | UnsupportedEncodingException e) { e.printStackTrace(); } } for (int i = 0; i < 100000; i++) { //這裏只是阻止生產者過早退出,致使事務消息的相關機制沒法運行 Thread.sleep(1000); } producer.shutdown(); } }
代碼@1:建立TransactionListener 實例,字面理解爲事務消息事件監聽器,下文詳細對其進行展開。
代碼@2:ExecutorService executorService,建立一個線程池,其線程的名稱前綴」client-transaction-msg-check-thread「,從字面理解爲客戶端事務消息狀態檢測線程,咱們能夠大膽的猜想一下是否是這個線程池調用TransactionListener方法,完成對事務消息的檢測呢?【這裏只是做者的猜想,你們不能當真,在做者後續文章發佈後,若是該觀點錯誤,會加以修復,這裏寫出來,主要是想分享一下我讀源碼的方法】。
代碼@3:爲事務消息發送者設置線程池。
代碼@4:爲事務消息發送者設置事務監聽器。
代碼@5:發送10條消息。
二、TransactionListener代碼解讀
public class TransactionListenerImpl implements TransactionListener { private AtomicInteger transactionIndex = new AtomicInteger(0); private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>(); @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { int value = transactionIndex.getAndIncrement(); int status = value % 3; localTrans.put(msg.getTransactionId(), status); return LocalTransactionState.UNKNOW; } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { Integer status = localTrans.get(msg.getTransactionId()); if (null != status) { switch (status) { case 0: return LocalTransactionState.UNKNOW; case 1: return LocalTransactionState.COMMIT_MESSAGE; case 2: return LocalTransactionState.ROLLBACK_MESSAGE; } } return LocalTransactionState.COMMIT_MESSAGE; } }
到這裏,基本上仍是能夠得知事務消息的實現方式,基本與文章開頭所示的「網上聲音」實現相似,下一節將詳細分析TransactionMQProducer事務消息發送的實現細節。
鄭重聲明:本文主要是展現事務消息的基本使用,本文所下的結論還僅僅是做者的猜想,下一篇文章,將重點分析事務消息的實現細節,本文一個很是重要的目的,是向讀者朋友們展現做者學習源碼的一個方法,總結爲:先作全面瞭解(網上,官方文檔)、而後加以本身的思考,從Demo實例入手學習,將學習任務分解之,邊寫邊看。
這算不算文末有彩蛋呢?呵呵,下一篇見:詳細分析RocketMQ事務消息的實現細節。