rocketmq使用記錄(五)

消息類型

普通消息 順序消息 分佈式事務消息數據庫

普通消息

Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQbroker-b " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
  SendResult sendResult = producer.send(msg);

啓動2個同樣的消費者,普通消息會被分配到2個消費者上面去消費分佈式

順序消息

String[] tags = new String[]{"createTag","payTag","sendTag"};
                for(int order=0;order<5;order++){
                    for(int type=0;type<3;type++){
                        Message msg = new Message("TopicTest",tags[type % tags.length],order+":" +type,(order+":" +type).getBytes());
                        SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                            @Override
                            public MessageQueue select(List<MessageQueue> list, Message message, Object o) {

                                Integer id = (Integer)o;

                                int index = id % list.size();

                                return list.get(index);
                            }
                        },order);
                        System.out.printf("%s%n", sendResult);

                    }
                }

接收到的消息: 4:0 ConsumeMessageThread_1 接收到的消息: 4:0 [MessageExt [queueId=0, storeSize=179, queueOffset=1867, sysFlag=0, b 接收到的消息: 4:1 ConsumeMessageThread_1 接收到的消息: 4:1 接收到的消息: 4:2 ConsumeMessageThread_1 接收到的消息: 4:2ide

事務消息

TransactionCheckListener checkListener = new TransactionCheckListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("transactionProducerGroup");
        producer.setNamesrvAddr("10.10.10.108:9876;10.10.10.190:9876");
        producer.setTransactionCheckListener(checkListener);
        producer.start();


        TransactionExecuterImpl transactionExecuter = new TransactionExecuterImpl();

        try{

            Message msg1 = new Message("TopicTest", "TagA" ,"key1",("Hello RocketMQ1 " ).getBytes(RemotingHelper.DEFAULT_CHARSET) );

            Message msg2 = new Message("TopicTest", "TagA" ,"key2",("Hello RocketMQ1 " ).getBytes(RemotingHelper.DEFAULT_CHARSET) );


            SendResult sendResult1 = producer.sendMessageInTransaction(msg1,transactionExecuter,null);

            logger.info("{} mesg1: {}",new Date(),sendResult1);

             sendResult1 = producer.sendMessageInTransaction(msg2,transactionExecuter,null);
            logger.info("{} mesg2: {}",new Date(),sendResult1);


        }catch(Exception e){
            e.printStackTrace();
        }

        producer.shutdown();


public class TransactionExecuterImpl implements LocalTransactionExecuter{

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    //事務消息是否對消費者可見,徹底由事務返回給RMQ的狀態碼決定(狀態碼的本質也是一條消息)。
    public LocalTransactionState executeLocalTransactionBranch(Message message, Object o) {

       try{

           logger.info("{} 本地事務執行成功",new Date());
       }catch(Exception e){
           e.printStackTrace();
           logger.info("{} 事務執行失敗"+e,new Date());
           return LocalTransactionState.ROLLBACK_MESSAGE;
       }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
public class TransactionCheckListenerImpl implements TransactionCheckListener {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    //若是某條消息沒有返回,那麼此事件會主動詢問查詢數據庫等渠道 獲取消息是否正常
    public LocalTransactionState checkLocalTransactionState(MessageExt messageExt) {

        logger.info("server checking Msg {}",messageExt.toString());

        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

}
相關文章
相關標籤/搜索