Normal messages, ordered messages, and distributed transaction messages
Message msg = new Message(
        "TopicTest" /* Topic */,
        "TagA" /* Tag */,
        ("Hello RocketMQbroker-b " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
SendResult sendResult = producer.send(msg);
Start two identical consumers: normal messages are distributed across both of them for consumption.
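Why both consumers receive a share can be pictured with a small model. The sketch below assumes the two consumers belong to the same consumer group and run in clustering mode, so that the topic's queues are divided between them. It is a simplified illustration, not the RocketMQ client's actual allocation code; the class name `QueueSplitSketch` and the queue count of 4 are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class QueueSplitSketch {
    // Returns the queue indexes given to one consumer when queueCount queues
    // are split into contiguous, nearly equal shares (simplified model).
    public static List<Integer> assign(int queueCount, int consumerCount, int consumerIndex) {
        int base = queueCount / consumerCount;   // queues every consumer gets
        int extra = queueCount % consumerCount;  // leftovers go to the first consumers
        int size = base + (consumerIndex < extra ? 1 : 0);
        int start = consumerIndex * base + Math.min(consumerIndex, extra);
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            result.add(start + i);
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed values: 4 queues, 2 consumers.
        for (int consumer = 0; consumer < 2; consumer++) {
            System.out.println("consumer " + consumer + " -> queues " + assign(4, 2, consumer));
        }
    }
}
```

Under these assumptions each consumer pulls only from its own queues, so messages sent to the topic end up spread over both consumers.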
String[] tags = new String[]{"createTag", "payTag", "sendTag"};
for (int order = 0; order < 5; order++) {
    for (int type = 0; type < 3; type++) {
        // Key and body are both "<order>:<type>"; the tag marks the step of the order.
        Message msg = new Message("TopicTest", tags[type % tags.length],
                order + ":" + type, (order + ":" + type).getBytes());
        SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                // o is the order id passed as the last argument of send();
                // every message of the same order is routed to the same queue.
                Integer id = (Integer) o;
                int index = id % list.size();
                return list.get(index);
            }
        }, order);
        System.out.printf("%s%n", sendResult);
    }
}
Received message: 4:0
ConsumeMessageThread_1 Received message: 4:0 [MessageExt [queueId=0, storeSize=179, queueOffset=1867, sysFlag=0, b
Received message: 4:1
ConsumeMessageThread_1 Received message: 4:1
Received message: 4:2
ConsumeMessageThread_1 Received message: 4:2
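The routing rule in the selector above can be tried out without a broker. The following is a minimal sketch that reuses the same `id % list.size()` rule on plain integers; the class name `OrderQueueSketch` and the queue count of 4 are assumptions made for illustration, and nothing here calls the RocketMQ API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class OrderQueueSketch {
    // Same rule as the selector: the order id modulo the number of queues.
    public static int selectQueue(int orderId, int queueCount) {
        return orderId % queueCount;
    }

    // Routes 5 orders x 3 steps and groups the message bodies by queue index.
    public static Map<Integer, List<String>> route(int queueCount) {
        Map<Integer, List<String>> queues = new LinkedHashMap<>();
        for (int order = 0; order < 5; order++) {
            for (int type = 0; type < 3; type++) {
                int index = selectQueue(order, queueCount);
                queues.computeIfAbsent(index, k -> new ArrayList<>()).add(order + ":" + type);
            }
        }
        return queues;
    }

    public static void main(String[] args) {
        // queueCount = 4 is an assumed value.
        route(4).forEach((queue, bodies) ->
                System.out.println("queue " + queue + " -> " + bodies));
    }
}
```

Because all three steps of an order land in one queue in send order, a consumer that reads that queue sequentially sees them as 4:0, 4:1, 4:2, which matches the shape of the output shown above.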
TransactionCheckListener checkListener = new TransactionCheckListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("transactionProducerGroup");
producer.setNamesrvAddr("10.10.10.108:9876;10.10.10.190:9876");
producer.setTransactionCheckListener(checkListener);
producer.start();

TransactionExecuterImpl transactionExecuter = new TransactionExecuterImpl();
try {
    Message msg1 = new Message("TopicTest", "TagA", "key1",
            ("Hello RocketMQ1 ").getBytes(RemotingHelper.DEFAULT_CHARSET));
    Message msg2 = new Message("TopicTest", "TagA", "key2",
            ("Hello RocketMQ1 ").getBytes(RemotingHelper.DEFAULT_CHARSET));

    SendResult sendResult1 = producer.sendMessageInTransaction(msg1, transactionExecuter, null);
    logger.info("{} mesg1: {}", new Date(), sendResult1);
    sendResult1 = producer.sendMessageInTransaction(msg2, transactionExecuter, null);
    logger.info("{} mesg2: {}", new Date(), sendResult1);
} catch (Exception e) {
    e.printStackTrace();
}
producer.shutdown();

public class TransactionExecuterImpl implements LocalTransactionExecuter {
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    // Whether a transactional message becomes visible to consumers is decided
    // entirely by the status code the transaction returns to RMQ
    // (the status code is itself essentially a message).
    @Override
    public LocalTransactionState executeLocalTransactionBranch(Message message, Object o) {
        try {
            // The local transaction would run here.
            logger.info("{} local transaction succeeded", new Date());
        } catch (Exception e) {
            // Passing the exception as the last argument logs its stack trace.
            logger.error("{} local transaction failed", new Date(), e);
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

public class TransactionCheckListenerImpl implements TransactionCheckListener {
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    // If no status was returned for a message, this callback is invoked to find out,
    // for example by querying the database, whether its transaction completed normally.
    @Override
    public LocalTransactionState checkLocalTransactionState(MessageExt messageExt) {
        logger.info("server checking Msg {}", messageExt.toString());
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
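The check listener above always answers with a commit. The comment on it describes looking the outcome up instead, and a minimal sketch of that lookup could look like the following. Everything in it is hypothetical: `TxCheckSketch`, the `TxState` enum (a stand-in for RocketMQ's `LocalTransactionState`), and the in-memory map that plays the role of the database. It does not use the RocketMQ API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class TxCheckSketch {
    // Hypothetical stand-in for the three possible answers of a check.
    public enum TxState { COMMIT, ROLLBACK, UNKNOWN }

    // Hypothetical stand-in for a database table keyed by message key.
    private final Map<String, Boolean> localTxResults = new ConcurrentHashMap<>();

    // The local transaction records its outcome once it is known.
    public void record(String messageKey, boolean succeeded) {
        localTxResults.put(messageKey, succeeded);
    }

    // The check looks up the recorded outcome instead of always committing.
    public TxState check(String messageKey) {
        Boolean succeeded = localTxResults.get(messageKey);
        if (succeeded == null) {
            return TxState.UNKNOWN; // no outcome recorded yet
        }
        return succeeded ? TxState.COMMIT : TxState.ROLLBACK;
    }

    public static void main(String[] args) {
        TxCheckSketch sketch = new TxCheckSketch();
        sketch.record("key1", true);
        sketch.record("key2", false);
        System.out.println("key1 -> " + sketch.check("key1"));
        System.out.println("key2 -> " + sketch.check("key2"));
        System.out.println("key3 -> " + sketch.check("key3"));
    }
}
```

In a real listener the lookup key would presumably come from the checked message (for example its key, `key1` or `key2` in the producer code), and the map would be replaced by whatever store the local transaction writes to.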