rocketmq事務消息html
參考:java
https://blog.csdn.net/u011686226/article/details/78106215swift
https://yq.aliyun.com/articles/55630服務器
https://my.oschina.net/u/2950586/blog/760677網絡
https://blog.csdn.net/chunlongyu/article/details/53844393併發
分佈式消息隊列RocketMQ--事務消息--解決分佈式事務的最佳實踐 less
說到分佈式事務,就會談到那個經典的」帳號轉帳」問題:2個帳號,分佈處於2個不一樣的DB,或者說2個不一樣的子系統裏面,A要扣錢,B要加錢,如何保證原子性?異步
通常的思路都是經過消息中間件來實現「最終一致性」:A系統扣錢,而後發條消息給中間件,B系統接收此消息,進行加錢。分佈式
但這裏面有個問題:A是先update DB,後發送消息呢? 仍是先發送消息,後update DB?ide
假設先update DB成功,發送消息網絡失敗,重發又失敗,怎麼辦?
假設先發送消息成功,update DB失敗。消息已經發出去了,又不能撤回,怎麼辦?
因此,這裏下個結論: 只要發送消息和update DB這2個操做不是原子的,不管誰先誰後,都是有問題的。
那這個問題怎麼解決呢??
有人可能想到了,我能夠把「發送消息」這個網絡調用和update DB放在同1個事務裏面,若是發送消息失敗,update DB自動回滾。這樣不就保證2個操做的原子性了嗎?
這個方案看似正確,實際上是錯誤的,緣由有2:
(1)網絡的2將軍問題:發送消息失敗,發送方並不知道是消息中間件真的沒有收到消息呢?仍是消息已經收到了,只是返回response的時候失敗了?
若是是已經收到消息了,而發送端認爲沒有收到,執行update db的回滾操做。則會致使A帳號的錢沒有扣,B帳號的錢卻加了。
(2)把網絡調用放在DB事務裏面,可能會由於網絡的延時,致使DB長事務。嚴重的,會block整個DB。這個風險很大。
基於以上分析,咱們知道,這個方案實際上是錯誤的!
假設消息中間件沒有提供「事務消息」功能,好比你用的是Kafka。那如何解決這個問題呢?
解決方案以下:
(1)Producer端準備1張消息表,把update DB和insert message這2個操做,放在一個DB事務裏面。
(2)準備一個後臺程序,源源不斷的把消息表中的message傳送給消息中間件。失敗了,不斷重試重傳。容許消息重複,但消息不會丟,順序也不會打亂。
(3)Consumer端準備一個判重表。處理過的消息,記在判重表裏面。實現業務的冪等。但這裏又涉及一個原子性問題:若是保證消息消費 + insert message到判重表這2個操做的原子性?
消費成功,但insert判重表失敗,怎麼辦?關於這個,在Kafka的源碼分析系列,第1篇, exactly once問題的時候,有過討論。
經過上面3步,咱們基本就解決了這裏update db和發送網絡消息這2個操做的原子性問題。
但這個方案的一個缺點就是:須要設計DB消息表,同時還須要一個後臺任務,不斷掃描本地消息。致使消息的處理和業務邏輯耦合額外增長業務方的負擔。
爲了能解決該問題,同時又不和業務耦合,RocketMQ提出了「事務消息」的概念。
具體來講,就是把消息的發送分紅了2個階段:Prepare階段和確認階段。
具體來講,上面的2個步驟,被分解成3個步驟:
(1) 發送Prepared消息
(2) update DB
(3) 根據update DB結果成功或失敗,Confirm或者取消Prepared消息。
可能有人會問了,前2步執行成功了,最後1步失敗了怎麼辦?這裏就涉及到了RocketMQ的關鍵點:RocketMQ會按期(默認是1分鐘)掃描全部的Prepared消息,詢問發送方,究竟是要確認這條消息發出去?仍是取消此條消息?
具體代碼實現以下:
也就是定義了一個checkListener,RocketMQ會回調此Listener,從而實現上面所說的方案。
// 也就是上文所說的,當RocketMQ發現`Prepared消息`時,會根據這個Listener實現的策略來決斷事務 TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl(); // 構造事務消息的生產者 TransactionMQProducer producer = new TransactionMQProducer("groupName"); // 設置事務決斷處理類 producer.setTransactionCheckListener(transactionCheckListener); // 本地事務的處理邏輯,至關於示例中檢查Bob帳戶並扣錢的邏輯 TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl(); producer.start() // 構造MSG,省略構造參數 Message msg = new Message(......); // 發送消息 SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null); producer.shutdown();
public TransactionSendResult sendMessageInTransaction(.....) { // 邏輯代碼,非實際代碼 // 1.發送消息 sendResult = this.send(msg); // sendResult.getSendStatus() == SEND_OK // 2.若是消息發送成功,處理與消息關聯的本地事務單元 LocalTransactionState localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg); // 3.結束事務 this.endTransaction(sendResult, localTransactionState, localException); }
總結:對比方案2和方案1,RocketMQ最大的改變,其實就是把「掃描消息表」這個事情,不讓業務方作,而是消息中間件幫着作了。
至於消息表,其實仍是沒有省掉。由於消息中間件要詢問發送方,事物是否執行成功,仍是須要一個「變相的本地消息表」,記錄事物執行狀態。
可能有人又要說了,不管方案1,仍是方案2,發送端把消息成功放入了隊列,但消費端消費失敗怎麼辦?
消費失敗了,重試,還一直失敗怎麼辦?是否是要自動回滾整個流程?
答案是人工介入。從工程實踐角度講,這種整個流程自動回滾的代價是很是巨大的,不但實現複雜,還會引入新的問題。好比自動回滾失敗,又怎麼處理?
對應這種極低機率的case,採起人工處理,會比實現一個高複雜的自動化回滾系統,更加可靠,也更加簡單。
rocketmq事務消息的理解
http://www.cnblogs.com/wxd0108/p/6038543.html
RocketMQ第一階段發送Prepared消息
時,會拿到消息的地址,第二階段執行本地事物,第三階段經過第一階段拿到的地址去訪問消息,並修改狀態。細心的你可能又發現問題了,若是確認消息發送失敗了怎麼辦?RocketMQ會按期掃描消息集羣中的事物消息,這時候發現了Prepared消息
,它會向消息發送者確認,Bob的錢究竟是減了仍是沒減呢?若是減了是回滾仍是繼續發送確認消息呢?RocketMQ會根據發送端設置的策略來決定是回滾仍是繼續發送確認消息。這樣就保證了消息發送與本地事務同時成功或同時失敗。
若是endTransaction
方法執行失敗,致使數據沒有發送到broker
,broker
會有回查線程定時(默認1分鐘)掃描每一個存儲事務狀態的表格文件,若是是已經提交或者回滾的消息直接跳過,若是是prepared狀態
則會向Producer
發起CheckTransaction
請求,Producer
會調用DefaultMQProducerImpl.checkTransactionState()
方法來處理broker
的定時回調請求,而checkTransactionState
會調用咱們的事務設置的決斷方法,最後調用endTransactionOneway
讓broker
來更新消息的最終狀態。
再回到轉帳的例子,若是Bob的帳戶的餘額已經減小,且消息已經發送成功,Smith端開始消費這條消息,這個時候就會出現消費失敗和消費超時兩個問題?解決超時問題的思路就是一直重試,直到消費端消費消息成功,整個過程當中有可能會出現消息重複的問題,按照前面的思路解決便可。
本質上仍是個二階段提交
重複消費冪等性要本身作
源代碼版本是3.2.6,仍是直接跑源代碼。rocketmq事務消息是發生在Producer和Broker之間,是二階段提交。
二階段提交過程看圖:
第一階段是:步驟1,2,3。
第二階段是:步驟4,5。
具體說明:
只有在消息發送成功,而且本地操做執行成功時,才發送提交事務消息,作事務提交。
其餘的狀況,例如消息發送失敗,直接發送回滾消息,進行回滾,或者發送消息成功,可是執行本地操做失敗,也是發送回滾消息,進行回滾。
事務消息原理實現過程:
一階段:
Producer向Broker發送1條類型爲TransactionPreparedType的消息,Broker接收消息保存在CommitLog中,而後返回消息的queueOffset和MessageId到Producer,MessageId包含有commitLogOffset(即消息在CommitLog中的偏移量,經過該變量能夠直接定位到消息自己),因爲該類型的消息在保存的時候,commitLogOffset沒有被保存到consumerQueue中,此時客戶端經過consumerQueue取不到commitLogOffset,因此該類型的消息沒法被取到,致使不會被消費。
一階段的過程當中,Broker保存了1條消息。
二階段:
Producer端的TransactionExecuterImpl執行本地操做,返回本地事務的狀態,而後發送一條類型爲TransactionCommitType或者TransactionRollbackType的消息到Broker確認提交或者回滾,Broker經過Request中的commitLogOffset,獲取到上面狀態爲TransactionPreparedType的消息(簡稱消息A),而後從新構造一條與消息A內容相同的消息B,設置狀態爲TransactionCommitType或者TransactionRollbackType,而後保存。其中TransactionCommitType類型的,會放commitLogOffset到consumerQueue中,TransactionRollbackType類型的,消息體設置爲空,不會放commitLogOffset到consumerQueue中。
二階段的過程當中,Broker也保存了1條消息。
總結:事務消息過程當中,broker一共保存2條消息。
貼代碼:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
|
TransactionCheckListenerImpl.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
|
本地操做類TransactionExecuterImpl.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
|
Producer類:TransactionProducer.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
|
RocketMQ 事務消息
RocketMQ將事務拆分紅小事務異步執行的方式來執行。
RocketMQ第一階段發送Prepared消息時,會拿到消息的地址,第二階段執行本地事物,第三階段經過第一階段拿到的地址去訪問消息,並修改狀態。RocketMQ會按期掃描消息集羣中的事物消息,這時候發現了Prepared消息,它會向消息發送者確認,RocketMQ會根據發送端設置的策略來決定是回滾仍是繼續發送確認消息。這樣就保證了消息發送與本地事務同時成功或同時失敗。
RocketMQ事務消息:
TransactionCheckListenerImpl:
package aaron.mq.producer; import com.alibaba.rocketmq.client.producer.LocalTransactionState; import com.alibaba.rocketmq.client.producer.TransactionCheckListener; import com.alibaba.rocketmq.common.message.MessageExt; /** * Created by Aaron Sheng on 10/19/16. * TransactionCheckListenerImpl handle transaction unsettled. * Broker will notify producer to check local transaction. */ public class TransactionCheckListenerImpl implements TransactionCheckListener { @Override public LocalTransactionState checkLocalTransactionState(MessageExt messageExt) { System.out.println("checkLocalTransactionState"); System.out.println("topic: " + messageExt.getTopic()); System.out.println("body: " + messageExt.getBody()); return LocalTransactionState.ROLLBACK_MESSAGE; } }
TransactionExecuterImpl:
package aaron.mq.producer; import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter; import com.alibaba.rocketmq.client.producer.LocalTransactionState; import com.alibaba.rocketmq.common.message.Message; import java.util.concurrent.atomic.AtomicInteger; /** * Created by Aaron Sheng on 10/19/16. * TransactionExecuterImpl executre local trancation and return result to broker. */ public class TransactionExecuterImpl implements LocalTransactionExecuter { private AtomicInteger transactionIndex = new AtomicInteger(0); @Override public LocalTransactionState executeLocalTransactionBranch(Message message, Object o) { System.out.println("executeLocalTransactionBranch " + message.toString()); int value = transactionIndex.getAndIncrement(); if ((value % 3) == 0) { return LocalTransactionState.COMMIT_MESSAGE; } else if ((value % 3) == 1) { return LocalTransactionState.ROLLBACK_MESSAGE; } else{ return LocalTransactionState.UNKNOW; } } }
TransactionProducer:
package aaron.mq.producer; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.client.producer.TransactionCheckListener; import com.alibaba.rocketmq.client.producer.TransactionMQProducer; import com.alibaba.rocketmq.common.message.Message; /** * Created by Aaron Sheng on 10/19/16. */ public class TransactionProducer { public static void produce() throws MQClientException { TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl(); TransactionMQProducer producer = new TransactionMQProducer("TxProducer"); producer.setCheckThreadPoolMinSize(2); producer.setCheckThreadPoolMaxSize(4); producer.setCheckRequestHoldMax(2000); producer.setTransactionCheckListener(transactionCheckListener); producer.setNamesrvAddr("127.0.0.1:9876"); producer.setInstanceName("TxProducer-instance1"); producer.setVipChannelEnabled(false); producer.start(); TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl(); try { for (int i = 0; i < 1000; i++) { Message msg = new Message("Topic1", "Tag1", "OrderId" + i, ("Body" + i).getBytes()); SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null); System.out.println(sendResult); Thread.sleep(1000); } } catch (Exception e) { e.printStackTrace(); } finally { producer.shutdown(); } } }
RocketMQConsumer:
package aaron.mq.consumer; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.message.MessageExt; import java.util.List; /** * Created by Aaron Sheng on 10/17/16. */ public class RocketMQConsumer { public static void consume() throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Consumer"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setVipChannelEnabled(false); consumer.setInstanceName("rmq-instance"); consumer.subscribe("Topic1", "Tag1"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println(msg.getKeys() + " " + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } }