RocketMQ 事務消息

    RocketMQ通過將大事務拆分成小事務,並以異步方式執行來實現事務消息。
    RocketMQ第一階段發送Prepared消息時,會拿到消息的地址,第二階段執行本地事務,第三階段通過第一階段拿到的地址去訪問消息,並修改其狀態。RocketMQ會定期掃描消息集羣中的事務消息,如果發現了Prepared消息,它會向消息發送者確認,並根據發送端設置的策略來決定是回滾還是繼續發送確認消息。這樣就保證了消息發送與本地事務同時成功或同時失敗。
RocketMQ事務消息:


TransactionCheckListenerImpl:

package aaron.mq.producer;

import com.alibaba.rocketmq.client.producer.LocalTransactionState;
import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
import com.alibaba.rocketmq.common.message.MessageExt;

/**
 * Created by Aaron Sheng on 10/19/16.
 * Handles transactions left unsettled: the broker notifies the producer to check
 * the local transaction, and this listener supplies the answer.
 * This sample implementation logs the message and always answers ROLLBACK_MESSAGE.
 */
public class TransactionCheckListenerImpl implements TransactionCheckListener {

    /**
     * Prints the topic and decoded body of the message being checked, then reports its state.
     *
     * @param messageExt the message handed to the listener for the check
     * @return always {@link LocalTransactionState#ROLLBACK_MESSAGE}
     */
    @Override
    public LocalTransactionState checkLocalTransactionState(MessageExt messageExt) {
        System.out.println("checkLocalTransactionState");
        System.out.println("topic: " + messageExt.getTopic());

        // Decode the payload before printing: concatenating a byte[] directly prints only
        // the array's identity (e.g. "[B@1b6d3586"), not its content. The platform default
        // charset is used because TransactionProducer encodes the body with getBytes().
        byte[] body = messageExt.getBody();
        System.out.println("body: " + (body == null ? null : new String(body)));

        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
}


TransactionExecuterImpl:

package aaron.mq.producer;

import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter;
import com.alibaba.rocketmq.client.producer.LocalTransactionState;
import com.alibaba.rocketmq.common.message.Message;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by Aaron Sheng on 10/19/16.
 * TransactionExecuterImpl executes the local transaction and returns the result to the broker.
 * This sample does no real work: it picks the result from an invocation counter.
 */
public class TransactionExecuterImpl implements LocalTransactionExecuter {
    /** Number of invocations so far; drives which state is returned next. */
    private final AtomicInteger invocationCounter = new AtomicInteger(0);

    /**
     * Prints the message and returns a state chosen by the invocation count modulo 3.
     *
     * @param message the message whose local transaction branch is executed
     * @param o       extra argument passed through by the caller; not used here
     * @return COMMIT_MESSAGE for remainder 0, ROLLBACK_MESSAGE for remainder 1,
     *         UNKNOW for any other remainder
     */
    @Override
    public LocalTransactionState executeLocalTransactionBranch(Message message, Object o) {
        System.out.println("executeLocalTransactionBranch " + message.toString());

        switch (invocationCounter.getAndIncrement() % 3) {
            case 0:
                return LocalTransactionState.COMMIT_MESSAGE;
            case 1:
                return LocalTransactionState.ROLLBACK_MESSAGE;
            default:
                // Remainder 2, and negative remainders once the counter overflows.
                return LocalTransactionState.UNKNOW;
        }
    }
}


TransactionProducer:

package aaron.mq.producer;

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
import com.alibaba.rocketmq.client.producer.TransactionMQProducer;
import com.alibaba.rocketmq.common.message.Message;

/**
 * Created by Aaron Sheng on 10/19/16.
 * Sample producer that sends transactional messages to "Topic1".
 */
public class TransactionProducer {
    /**
     * Starts a {@link TransactionMQProducer}, sends up to 1000 transactional messages
     * (one per second) and shuts the producer down afterwards.
     *
     * <p>Any failure while sending stops the loop; the stack trace is printed and the
     * producer is still shut down. If the thread is interrupted while sleeping between
     * sends, the interrupt flag is restored before returning.
     *
     * @throws MQClientException if the producer cannot be started
     */
    public static void produce() throws MQClientException {
        TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("TxProducer");
        producer.setCheckThreadPoolMinSize(2);
        producer.setCheckThreadPoolMaxSize(4);
        producer.setCheckRequestHoldMax(2000);
        producer.setTransactionCheckListener(transactionCheckListener);
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.setInstanceName("TxProducer-instance1");
        producer.setVipChannelEnabled(false);
        producer.start();

        TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
        try {
            for (int i = 0; i < 1000; i++) {
                Message msg = new Message("Topic1",
                        "Tag1",
                        "OrderId" + i,
                        ("Body" + i).getBytes());
                SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
                System.out.println(sendResult);
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can still observe the interruption;
            // a blanket catch of Exception would silently clear it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Always release the producer, whether sending finished, failed or was interrupted.
            producer.shutdown();
        }
    }
}


RocketMQConsumer:

package aaron.mq.consumer;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * Created by Aaron Sheng on 10/17/16.
 * Sample push consumer that subscribes to "Topic1" / "Tag1" and prints every message.
 */
public class RocketMQConsumer {
    /**
     * Configures and starts a {@link DefaultMQPushConsumer} whose listener prints the keys
     * and decoded body of each received message and acknowledges the batch as consumed.
     *
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    public static void consume() throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Consumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setVipChannelEnabled(false);
        consumer.setInstanceName("rmq-instance");
        consumer.subscribe("Topic1", "Tag1");

        // Listener kept in a named local so the registration call stays short.
        MessageListenerConcurrently printingListener = new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt received : msgs) {
                    String keys = received.getKeys();
                    String payload = new String(received.getBody());
                    System.out.println(keys + " " + payload);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        };
        consumer.registerMessageListener(printingListener);

        consumer.start();
        System.out.println("Consumer Started.");
    }
}
相關文章
相關標籤/搜索