RocketMQ的消費者消費消息,有2種模式:負載均衡
首先咱們先啓動一個生產者:發送了10條消息,主題是TopicTest,tag是TagAide
public class SyncProducer { public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException { DefaultMQProducer defaultMQProducer = new DefaultMQProducer(); //設置生產者組 defaultMQProducer.setProducerGroup("syncProducer"); //設置nameserver defaultMQProducer.setNamesrvAddr("localhost:9876"); //啓動生產者 defaultMQProducer.start(); for (int i = 0; i < 10; i++) { //構建消息 topic tag 內容 Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); //同步發送,且返回結果 SendResult sendResult = defaultMQProducer.send(msg); System.out.println("發送結果"+sendResult); } //關閉生產者 defaultMQProducer.shutdown(); } } //運行結果 發送結果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2040000, offsetMsgId=C0A81FF100002A9F0000000000045402, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=1], queueOffset=350] 發送結果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2230001, offsetMsgId=C0A81FF100002A9F00000000000454CB, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=2], queueOffset=350] 發送結果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2260002, offsetMsgId=C0A81FF100002A9F0000000000045594, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=3], queueOffset=350] 發送結果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E22A0003, offsetMsgId=C0A81FF100002A9F000000000004565D, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=0], queueOffset=350] 發送結果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E22C0004, offsetMsgId=C0A81FF100002A9F0000000000045726, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=1], queueOffset=351] 發送結果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E22E0005, offsetMsgId=C0A81FF100002A9F00000000000457EF, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=2], queueOffset=351] 發送結果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2300006, offsetMsgId=C0A81FF100002A9F00000000000458B8, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=3], queueOffset=351] 發送結果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2320007, offsetMsgId=C0A81FF100002A9F0000000000045981, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=0], queueOffset=351] 發送結果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2340008, offsetMsgId=C0A81FF100002A9F0000000000045A4A, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=1], queueOffset=352] 發送結果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2360009, offsetMsgId=C0A81FF100002A9F0000000000045B13, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=2], queueOffset=352] 14:29:56.930 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true 14:29:56.932 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true 14:29:56.932 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.31.241:10911] result: true
接下來,咱們啓動2個消費者,分別用負載均衡模式和廣播模式去進行消費信息:性能
public class ClusterConsumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("clusterConsumer"); consumer.setNamesrvAddr("localhost:9876"); //設置集羣模式,也就是負載均衡模式 consumer.setMessageModel(MessageModel.CLUSTERING); //訂閱主題和標籤 consumer.subscribe("TopicTest","TagA"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println("消費信息:"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } } //運行2個實例,結果以下: 實例1: 消費信息:Hello RocketMQ 3 消費信息:Hello RocketMQ 2 消費信息:Hello RocketMQ 7 消費信息:Hello RocketMQ 6 實例2: 消費信息:Hello RocketMQ 1 消費信息:Hello RocketMQ 0 消費信息:Hello RocketMQ 4 消費信息:Hello RocketMQ 5 消費信息:Hello RocketMQ 8 消費信息:Hello RocketMQ 9
public class BoardConsumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("clusterConsumer"); consumer.setNamesrvAddr("localhost:9876"); //設置集羣模式,也就是負載均衡模式 consumer.setMessageModel(MessageModel.BROADCASTING); //訂閱主題和標籤 consumer.subscribe("TopicTest","TagA"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println("消費信息:"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } //運行2個實例,結果以下: 實例1: 消費信息:Hello RocketMQ 0 消費信息:Hello RocketMQ 1 消費信息:Hello RocketMQ 2 消費信息:Hello RocketMQ 3 消費信息:Hello RocketMQ 5 消費信息:Hello RocketMQ 9 消費信息:Hello RocketMQ 4 消費信息:Hello RocketMQ 6 消費信息:Hello RocketMQ 7 消費信息:Hello RocketMQ 8 實例2: 消費信息:Hello RocketMQ 2 消費信息:Hello RocketMQ 3 消費信息:Hello RocketMQ 1 消費信息:Hello RocketMQ 0 消費信息:Hello RocketMQ 5 消費信息:Hello RocketMQ 9 消費信息:Hello RocketMQ 4 消費信息:Hello RocketMQ 8 消費信息:Hello RocketMQ 7 消費信息:Hello RocketMQ 6
今天分享了消費者負載均衡模式和廣播模式。
在生產中,通常都是用負載均衡模式。廣播模式比較少用。但仍是得具體場景具體分析。spa
歡迎各位入(guan)股(zhu),後續文章乾貨多多。code