RocketMQ將事務拆分紅小事務異步執行的方式來執行。
RocketMQ第一階段發送Prepared消息時,會拿到消息的地址,第二階段執行本地事物,第三階段經過第一階段拿到的地址去訪問消息,並修改狀態。RocketMQ會按期掃描消息集羣中的事物消息,這時候發現了Prepared消息,它會向消息發送者確認,RocketMQ會根據發送端設置的策略來決定是回滾仍是繼續發送確認消息。這樣就保證了消息發送與本地事務同時成功或同時失敗。
RocketMQ事務消息:
TransactionCheckListenerImpl:java
package aaron.mq.producer; import com.alibaba.rocketmq.client.producer.LocalTransactionState; import com.alibaba.rocketmq.client.producer.TransactionCheckListener; import com.alibaba.rocketmq.common.message.MessageExt; /** * Created by Aaron Sheng on 10/19/16. * TransactionCheckListenerImpl handle transaction unsettled. * Broker will notify producer to check local transaction. */ public class TransactionCheckListenerImpl implements TransactionCheckListener { @Override public LocalTransactionState checkLocalTransactionState(MessageExt messageExt) { System.out.println("checkLocalTransactionState"); System.out.println("topic: " + messageExt.getTopic()); System.out.println("body: " + messageExt.getBody()); return LocalTransactionState.ROLLBACK_MESSAGE; } }
TransactionExecuterImpl:異步
package aaron.mq.producer; import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter; import com.alibaba.rocketmq.client.producer.LocalTransactionState; import com.alibaba.rocketmq.common.message.Message; import java.util.concurrent.atomic.AtomicInteger; /** * Created by Aaron Sheng on 10/19/16. * TransactionExecuterImpl executre local trancation and return result to broker. */ public class TransactionExecuterImpl implements LocalTransactionExecuter { private AtomicInteger transactionIndex = new AtomicInteger(0); @Override public LocalTransactionState executeLocalTransactionBranch(Message message, Object o) { System.out.println("executeLocalTransactionBranch " + message.toString()); int value = transactionIndex.getAndIncrement(); if ((value % 3) == 0) { return LocalTransactionState.COMMIT_MESSAGE; } else if ((value % 3) == 1) { return LocalTransactionState.ROLLBACK_MESSAGE; } else{ return LocalTransactionState.UNKNOW; } } }
TransactionProducer:ide
package aaron.mq.producer; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.client.producer.TransactionCheckListener; import com.alibaba.rocketmq.client.producer.TransactionMQProducer; import com.alibaba.rocketmq.common.message.Message; /** * Created by Aaron Sheng on 10/19/16. */ public class TransactionProducer { public static void produce() throws MQClientException { TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl(); TransactionMQProducer producer = new TransactionMQProducer("TxProducer"); producer.setCheckThreadPoolMinSize(2); producer.setCheckThreadPoolMaxSize(4); producer.setCheckRequestHoldMax(2000); producer.setTransactionCheckListener(transactionCheckListener); producer.setNamesrvAddr("127.0.0.1:9876"); producer.setInstanceName("TxProducer-instance1"); producer.setVipChannelEnabled(false); producer.start(); TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl(); try { for (int i = 0; i < 1000; i++) { Message msg = new Message("Topic1", "Tag1", "OrderId" + i, ("Body" + i).getBytes()); SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null); System.out.println(sendResult); Thread.sleep(1000); } } catch (Exception e) { e.printStackTrace(); } finally { producer.shutdown(); } } }
RocketMQConsumer:atom
package aaron.mq.consumer; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.message.MessageExt; import java.util.List; /** * Created by Aaron Sheng on 10/17/16. */ public class RocketMQConsumer { public static void consume() throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Consumer"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setVipChannelEnabled(false); consumer.setInstanceName("rmq-instance"); consumer.subscribe("Topic1", "Tag1"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println(msg.getKeys() + " " + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } }