rocketmq事務消息

rocketmq事務消息html

 

參考:java

https://blog.csdn.net/u011686226/article/details/78106215swift

https://yq.aliyun.com/articles/55630服務器

https://my.oschina.net/u/2950586/blog/760677網絡

 https://blog.csdn.net/chunlongyu/article/details/53844393併發

 

 

分佈式消息隊列RocketMQ--事務消息--解決分佈式事務的最佳實踐 less

說到分佈式事務,就會談到那個經典的」帳號轉帳」問題:2個帳號,分佈處於2個不一樣的DB,或者說2個不一樣的子系統裏面,A要扣錢,B要加錢,如何保證原子性?異步

通常的思路都是經過消息中間件來實現「最終一致性」:A系統扣錢,而後發條消息給中間件,B系統接收此消息,進行加錢。分佈式

但這裏面有個問題:A是先update DB,後發送消息呢? 仍是先發送消息,後update DB?ide

假設先update DB成功,發送消息網絡失敗,重發又失敗,怎麼辦? 
假設先發送消息成功,update DB失敗。消息已經發出去了,又不能撤回,怎麼辦?

因此,這裏下個結論: 只要發送消息和update DB這2個操做不是原子的,不管誰先誰後,都是有問題的。

那這個問題怎麼解決呢??

錯誤的方案0

有人可能想到了,我能夠把「發送消息」這個網絡調用和update DB放在同1個事務裏面,若是發送消息失敗,update DB自動回滾。這樣不就保證2個操做的原子性了嗎?

這個方案看似正確,實際上是錯誤的,緣由有2:

(1)網絡的2將軍問題:發送消息失敗,發送方並不知道是消息中間件真的沒有收到消息呢?仍是消息已經收到了,只是返回response的時候失敗了?

若是是已經收到消息了,而發送端認爲沒有收到,執行update db的回滾操做。則會致使A帳號的錢沒有扣,B帳號的錢卻加了。

(2)把網絡調用放在DB事務裏面,可能會由於網絡的延時,致使DB長事務。嚴重的,會block整個DB。這個風險很大。

基於以上分析,咱們知道,這個方案實際上是錯誤的!

 

方案1–業務方本身實現

假設消息中間件沒有提供「事務消息」功能,好比你用的是Kafka。那如何解決這個問題呢?

解決方案以下: 
(1)Producer端準備1張消息表,把update DB和insert message這2個操做,放在一個DB事務裏面。

(2)準備一個後臺程序,源源不斷的把消息表中的message傳送給消息中間件。失敗了,不斷重試重傳。容許消息重複,但消息不會丟,順序也不會打亂。

(3)Consumer端準備一個判重表。處理過的消息,記在判重表裏面。實現業務的冪等。但這裏又涉及一個原子性問題:若是保證消息消費 + insert message到判重表這2個操做的原子性?

消費成功,但insert判重表失敗,怎麼辦?關於這個,在Kafka的源碼分析系列,第1篇, exactly once問題的時候,有過討論。

經過上面3步,咱們基本就解決了這裏update db和發送網絡消息這2個操做的原子性問題。

但這個方案的一個缺點就是:須要設計DB消息表,同時還須要一個後臺任務,不斷掃描本地消息。致使消息的處理和業務邏輯耦合額外增長業務方的負擔。

方案2 – RocketMQ 事務消息

爲了能解決該問題,同時又不和業務耦合,RocketMQ提出了「事務消息」的概念。

具體來講,就是把消息的發送分紅了2個階段:Prepare階段和確認階段。

具體來講,上面的2個步驟,被分解成3個步驟: 
(1) 發送Prepared消息 
(2) update DB 
(3) 根據update DB結果成功或失敗,Confirm或者取消Prepared消息。

可能有人會問了,前2步執行成功了,最後1步失敗了怎麼辦?這裏就涉及到了RocketMQ的關鍵點:RocketMQ會按期(默認是1分鐘)掃描全部的Prepared消息,詢問發送方,究竟是要確認這條消息發出去?仍是取消此條消息?

具體代碼實現以下:

也就是定義了一個checkListener,RocketMQ會回調此Listener,從而實現上面所說的方案。

// 也就是上文所說的,當RocketMQ發現`Prepared消息`時,會根據這個Listener實現的策略來決斷事務 TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl(); // 構造事務消息的生產者 TransactionMQProducer producer = new TransactionMQProducer("groupName"); // 設置事務決斷處理類 producer.setTransactionCheckListener(transactionCheckListener); // 本地事務的處理邏輯,至關於示例中檢查Bob帳戶並扣錢的邏輯 TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl(); producer.start() // 構造MSG,省略構造參數 Message msg = new Message(......); // 發送消息 SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null); producer.shutdown();

 

public TransactionSendResult sendMessageInTransaction(.....) { // 邏輯代碼,非實際代碼 // 1.發送消息 sendResult = this.send(msg); // sendResult.getSendStatus() == SEND_OK // 2.若是消息發送成功,處理與消息關聯的本地事務單元 LocalTransactionState localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg); // 3.結束事務 this.endTransaction(sendResult, localTransactionState, localException); }

 

 

總結:對比方案2和方案1,RocketMQ最大的改變,其實就是把「掃描消息表」這個事情,不讓業務方作,而是消息中間件幫着作了。

至於消息表,其實仍是沒有省掉。由於消息中間件要詢問發送方,事物是否執行成功,仍是須要一個「變相的本地消息表」,記錄事物執行狀態。

人工介入

可能有人又要說了,不管方案1,仍是方案2,發送端把消息成功放入了隊列,但消費端消費失敗怎麼辦?

消費失敗了,重試,還一直失敗怎麼辦?是否是要自動回滾整個流程?

答案是人工介入。從工程實踐角度講,這種整個流程自動回滾的代價是很是巨大的,不但實現複雜,還會引入新的問題。好比自動回滾失敗,又怎麼處理?

對應這種極低機率的case,採起人工處理,會比實現一個高複雜的自動化回滾系統,更加可靠,也更加簡單。

 

 

rocketmq事務消息的理解

 

http://www.cnblogs.com/wxd0108/p/6038543.html

RocketMQ第一階段發送Prepared消息時,會拿到消息的地址,第二階段執行本地事物,第三階段經過第一階段拿到的地址去訪問消息,並修改狀態。細心的你可能又發現問題了,若是確認消息發送失敗了怎麼辦?RocketMQ會按期掃描消息集羣中的事物消息,這時候發現了Prepared消息,它會向消息發送者確認,Bob的錢究竟是減了仍是沒減呢?若是減了是回滾仍是繼續發送確認消息呢?RocketMQ會根據發送端設置的策略來決定是回滾仍是繼續發送確認消息。這樣就保證了消息發送與本地事務同時成功或同時失敗。

 

若是endTransaction方法執行失敗,致使數據沒有發送到brokerbroker會有回查線程定時(默認1分鐘)掃描每一個存儲事務狀態的表格文件,若是是已經提交或者回滾的消息直接跳過,若是是prepared狀態則會向Producer發起CheckTransaction請求,Producer會調用DefaultMQProducerImpl.checkTransactionState()方法來處理broker的定時回調請求,而checkTransactionState會調用咱們的事務設置的決斷方法,最後調用endTransactionOnewaybroker來更新消息的最終狀態。

再回到轉帳的例子,若是Bob的帳戶的餘額已經減小,且消息已經發送成功,Smith端開始消費這條消息,這個時候就會出現消費失敗和消費超時兩個問題?解決超時問題的思路就是一直重試,直到消費端消費消息成功,整個過程當中有可能會出現消息重複的問題,按照前面的思路解決便可。

本質上仍是個二階段提交

重複消費冪等性要本身作

 

 

 

RocketMQ 事務消息

源代碼版本是3.2.6,仍是直接跑源代碼。rocketmq事務消息是發生在Producer和Broker之間,是二階段提交。

二階段提交過程看圖:

事務邏輯

 

第一階段是:步驟1,2,3。
第二階段是:步驟4,5。

具體說明:

只有在消息發送成功,而且本地操做執行成功時,才發送提交事務消息,作事務提交。

其餘的狀況,例如消息發送失敗,直接發送回滾消息,進行回滾,或者發送消息成功,可是執行本地操做失敗,也是發送回滾消息,進行回滾。

事務消息原理實現過程:

一階段:
Producer向Broker發送1條類型爲TransactionPreparedType的消息,Broker接收消息保存在CommitLog中,而後返回消息的queueOffset和MessageId到Producer,MessageId包含有commitLogOffset(即消息在CommitLog中的偏移量,經過該變量能夠直接定位到消息自己),因爲該類型的消息在保存的時候,commitLogOffset沒有被保存到consumerQueue中,此時客戶端經過consumerQueue取不到commitLogOffset,因此該類型的消息沒法被取到,致使不會被消費。

一階段的過程當中,Broker保存了1條消息。

二階段:
Producer端的TransactionExecuterImpl執行本地操做,返回本地事務的狀態,而後發送一條類型爲TransactionCommitType或者TransactionRollbackType的消息到Broker確認提交或者回滾,Broker經過Request中的commitLogOffset,獲取到上面狀態爲TransactionPreparedType的消息(簡稱消息A),而後從新構造一條與消息A內容相同的消息B,設置狀態爲TransactionCommitType或者TransactionRollbackType,而後保存。其中TransactionCommitType類型的,會放commitLogOffset到consumerQueue中,TransactionRollbackType類型的,消息體設置爲空,不會放commitLogOffset到consumerQueue中。

二階段的過程當中,Broker也保存了1條消息。

總結:事務消息過程當中,broker一共保存2條消息。

貼代碼:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

<properties>

<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

<logback.version>1.0.13</logback.version>

<rocketmq.version>3.2.6</rocketmq.version>

</properties>

<dependencies>

<dependency>

<groupId>ch.qos.logback</groupId>

<artifactId>logback-classic</artifactId>

<version>1.0.13</version>

</dependency>

<dependency>

<groupId>ch.qos.logback</groupId>

<artifactId>logback-core</artifactId>

<version>1.0.13</version>

</dependency>

<dependency>

<groupId>com.alibaba.rocketmq</groupId>

<artifactId>rocketmq-client</artifactId>

<version>${rocketmq.version}</version>

</dependency>

<dependency>

<groupId>junit</groupId>

<artifactId>junit</artifactId>

<version>4.10</version>

<scope>test</scope>

</dependency>

</dependencies>

TransactionCheckListenerImpl.java

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

package com.zoo.quickstart.transaction;

import java.util.concurrent.atomic.AtomicInteger;

import com.alibaba.rocketmq.client.producer.LocalTransactionState;

import com.alibaba.rocketmq.client.producer.TransactionCheckListener;

import com.alibaba.rocketmq.common.message.MessageExt;

/**

* 未決事務,服務器回查客戶端,broker端發起請求代碼沒有被調用,因此此處代碼可能沒用。

*/

public class TransactionCheckListenerImpl implements TransactionCheckListener {

private AtomicInteger transactionIndex = new AtomicInteger(0);

@Override

public LocalTransactionState checkLocalTransactionState(MessageExt msg) {

System.out.println("server checking TrMsg " + msg.toString());

int value = transactionIndex.getAndIncrement();

if ((value % 6) == 0) {

throw new RuntimeException("Could not find db");

}

else if ((value % 5) == 0) {

return LocalTransactionState.ROLLBACK_MESSAGE;

}

else if ((value % 4) == 0) {

return LocalTransactionState.COMMIT_MESSAGE;

}

return LocalTransactionState.UNKNOW;

}

}

本地操做類TransactionExecuterImpl.java

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

package com.zoo.quickstart.transaction;

import java.util.concurrent.atomic.AtomicInteger;

import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter;

import com.alibaba.rocketmq.client.producer.LocalTransactionState;

import com.alibaba.rocketmq.common.message.Message;

/**

* 執行本地事務

*/

public class TransactionExecuterImpl implements LocalTransactionExecuter {

private AtomicInteger transactionIndex = new AtomicInteger(1);

@Override

public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {

int value = transactionIndex.getAndIncrement();

if (value == 0) {

throw new RuntimeException("Could not find db");

}

else if ((value % 5) == 0) {

return LocalTransactionState.ROLLBACK_MESSAGE;

}

else if ((value % 4) == 0) {

return LocalTransactionState.COMMIT_MESSAGE;

}

return LocalTransactionState.UNKNOW;

}

}

Producer類:TransactionProducer.java

 

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

package com.zoo.quickstart.transaction;

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.client.producer.SendResult;

import com.alibaba.rocketmq.client.producer.TransactionCheckListener;

import com.alibaba.rocketmq.client.producer.TransactionMQProducer;

import com.alibaba.rocketmq.common.message.Message;

/**

* 發送事務消息例子

*

*/

public class TransactionProducer {

public static void main(String[] args) throws MQClientException, InterruptedException {

TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();

TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");

// 事務回查最小併發數

producer.setCheckThreadPoolMinSize(2);

// 事務回查最大併發數

producer.setCheckThreadPoolMaxSize(2);

// 隊列數

producer.setCheckRequestHoldMax(2000);

producer.setTransactionCheckListener(transactionCheckListener);

producer.setNamesrvAddr("192.168.0.104:9876");

producer.start();

String[] tags = new String[] { "TagA""TagB""TagC""TagD""TagE" };

TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();

for (int i = 0; i < 1; i++) {

try {

Message msg =

new Message("TopicTest", tags[i % tags.length], "KEY" + i,

("Hello RocketMQ " + i).getBytes());

SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);

System.out.println(sendResult);

Thread.sleep(10);

}

catch (MQClientException e) {

e.printStackTrace();

}

}

for (int i = 0; i < 100000; i++) {

Thread.sleep(1000);

}

producer.shutdown();

}

}

 

 

RocketMQ 事務消息

RocketMQ將事務拆分紅小事務異步執行的方式來執行。
    RocketMQ第一階段發送Prepared消息時,會拿到消息的地址,第二階段執行本地事物,第三階段經過第一階段拿到的地址去訪問消息,並修改狀態。RocketMQ會按期掃描消息集羣中的事物消息,這時候發現了Prepared消息,它會向消息發送者確認,RocketMQ會根據發送端設置的策略來決定是回滾仍是繼續發送確認消息。這樣就保證了消息發送與本地事務同時成功或同時失敗。
RocketMQ事務消息:


TransactionCheckListenerImpl:

package aaron.mq.producer; import com.alibaba.rocketmq.client.producer.LocalTransactionState; import com.alibaba.rocketmq.client.producer.TransactionCheckListener; import com.alibaba.rocketmq.common.message.MessageExt; /** * Created by Aaron Sheng on 10/19/16. * TransactionCheckListenerImpl handle transaction unsettled. * Broker will notify producer to check local transaction. */ public class TransactionCheckListenerImpl implements TransactionCheckListener { @Override public LocalTransactionState checkLocalTransactionState(MessageExt messageExt) { System.out.println("checkLocalTransactionState"); System.out.println("topic: " + messageExt.getTopic()); System.out.println("body: " + messageExt.getBody()); return LocalTransactionState.ROLLBACK_MESSAGE; } }


TransactionExecuterImpl:

package aaron.mq.producer; import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter; import com.alibaba.rocketmq.client.producer.LocalTransactionState; import com.alibaba.rocketmq.common.message.Message; import java.util.concurrent.atomic.AtomicInteger; /** * Created by Aaron Sheng on 10/19/16. * TransactionExecuterImpl executre local trancation and return result to broker. */ public class TransactionExecuterImpl implements LocalTransactionExecuter { private AtomicInteger transactionIndex = new AtomicInteger(0); @Override public LocalTransactionState executeLocalTransactionBranch(Message message, Object o) { System.out.println("executeLocalTransactionBranch " + message.toString()); int value = transactionIndex.getAndIncrement(); if ((value % 3) == 0) { return LocalTransactionState.COMMIT_MESSAGE; } else if ((value % 3) == 1) { return LocalTransactionState.ROLLBACK_MESSAGE; } else{ return LocalTransactionState.UNKNOW; } } }


TransactionProducer:

package aaron.mq.producer; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.client.producer.TransactionCheckListener; import com.alibaba.rocketmq.client.producer.TransactionMQProducer; import com.alibaba.rocketmq.common.message.Message; /** * Created by Aaron Sheng on 10/19/16. */ public class TransactionProducer { public static void produce() throws MQClientException { TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl(); TransactionMQProducer producer = new TransactionMQProducer("TxProducer"); producer.setCheckThreadPoolMinSize(2); producer.setCheckThreadPoolMaxSize(4); producer.setCheckRequestHoldMax(2000); producer.setTransactionCheckListener(transactionCheckListener); producer.setNamesrvAddr("127.0.0.1:9876"); producer.setInstanceName("TxProducer-instance1"); producer.setVipChannelEnabled(false); producer.start(); TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl(); try { for (int i = 0; i < 1000; i++) { Message msg = new Message("Topic1", "Tag1", "OrderId" + i, ("Body" + i).getBytes()); SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null); System.out.println(sendResult); Thread.sleep(1000); } } catch (Exception e) { e.printStackTrace(); } finally { producer.shutdown(); } } } 


RocketMQConsumer:

package aaron.mq.consumer; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.message.MessageExt; import java.util.List; /** * Created by Aaron Sheng on 10/17/16. */ public class RocketMQConsumer { public static void consume() throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Consumer"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setVipChannelEnabled(false); consumer.setInstanceName("rmq-instance"); consumer.subscribe("Topic1", "Tag1"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println(msg.getKeys() + " " + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } }
相關文章
相關標籤/搜索