RocketMQ-負載均衡和廣播模式

RocketMQ的消費者消費消息,有2種模式:負載均衡

  • 負載均衡模式
  • 廣播模式

首先咱們先啓動一個生產者:發送了10條消息,主題是TopicTest,tag是TagAide

public class SyncProducer {
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
        //設置生產者組
        defaultMQProducer.setProducerGroup("syncProducer");

        //設置nameserver
        defaultMQProducer.setNamesrvAddr("localhost:9876");

        //啓動生產者
        defaultMQProducer.start();

        for (int i = 0; i < 10; i++) {
            //構建消息 topic tag 內容
            Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " +
                            i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //同步發送,且返回結果
            SendResult sendResult = defaultMQProducer.send(msg);
            System.out.println("發送結果"+sendResult);
        }
        //關閉生產者
        defaultMQProducer.shutdown();
    }
}
//運行結果
發送結果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2040000, offsetMsgId=C0A81FF100002A9F0000000000045402, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=1], queueOffset=350]
發送結果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2230001, offsetMsgId=C0A81FF100002A9F00000000000454CB, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=2], queueOffset=350]
發送結果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2260002, offsetMsgId=C0A81FF100002A9F0000000000045594, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=3], queueOffset=350]
發送結果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E22A0003, offsetMsgId=C0A81FF100002A9F000000000004565D, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=0], queueOffset=350]
發送結果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E22C0004, offsetMsgId=C0A81FF100002A9F0000000000045726, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=1], queueOffset=351]
發送結果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E22E0005, offsetMsgId=C0A81FF100002A9F00000000000457EF, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=2], queueOffset=351]
發送結果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2300006, offsetMsgId=C0A81FF100002A9F00000000000458B8, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=3], queueOffset=351]
發送結果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2320007, offsetMsgId=C0A81FF100002A9F0000000000045981, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=0], queueOffset=351]
發送結果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2340008, offsetMsgId=C0A81FF100002A9F0000000000045A4A, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=1], queueOffset=352]
發送結果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2360009, offsetMsgId=C0A81FF100002A9F0000000000045B13, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=2], queueOffset=352]
14:29:56.930 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
14:29:56.932 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
14:29:56.932 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.31.241:10911] result: true

接下來,咱們啓動2個消費者,分別用負載均衡模式和廣播模式去進行消費信息:性能

負載均衡(或叫集羣模式)

public class ClusterConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("clusterConsumer");

        consumer.setNamesrvAddr("localhost:9876");

        //設置集羣模式,也就是負載均衡模式
        consumer.setMessageModel(MessageModel.CLUSTERING);
        //訂閱主題和標籤
        consumer.subscribe("TopicTest","TagA");


        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("消費信息:"+new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
    }
}
//運行2個實例,結果以下:
實例1:
消費信息:Hello RocketMQ 3
消費信息:Hello RocketMQ 2
消費信息:Hello RocketMQ 7
消費信息:Hello RocketMQ 6

實例2:
消費信息:Hello RocketMQ 1
消費信息:Hello RocketMQ 0
消費信息:Hello RocketMQ 4
消費信息:Hello RocketMQ 5
消費信息:Hello RocketMQ 8
消費信息:Hello RocketMQ 9

廣播模式

public class BoardConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("clusterConsumer");

        consumer.setNamesrvAddr("localhost:9876");

        //設置集羣模式,也就是負載均衡模式
        consumer.setMessageModel(MessageModel.BROADCASTING);
        //訂閱主題和標籤
        consumer.subscribe("TopicTest","TagA");


        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("消費信息:"+new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
    }
//運行2個實例,結果以下:
實例1:
消費信息:Hello RocketMQ 0
消費信息:Hello RocketMQ 1
消費信息:Hello RocketMQ 2
消費信息:Hello RocketMQ 3
消費信息:Hello RocketMQ 5
消費信息:Hello RocketMQ 9
消費信息:Hello RocketMQ 4
消費信息:Hello RocketMQ 6
消費信息:Hello RocketMQ 7
消費信息:Hello RocketMQ 8


實例2:
消費信息:Hello RocketMQ 2
消費信息:Hello RocketMQ 3
消費信息:Hello RocketMQ 1
消費信息:Hello RocketMQ 0
消費信息:Hello RocketMQ 5
消費信息:Hello RocketMQ 9
消費信息:Hello RocketMQ 4
消費信息:Hello RocketMQ 8
消費信息:Hello RocketMQ 7
消費信息:Hello RocketMQ 6

今天分享了消費者負載均衡模式和廣播模式。
在生產中,通常都是用負載均衡模式。廣播模式比較少用。但仍是得具體場景具體分析。spa

後續文章

  • RocketMQ-入門(已更新)
  • RocketMQ-消息發送(已更新)
  • RocketMQ-消費信息
  • RocketMQ-消費者的廣播模式和集羣模式(已更新)
  • RocketMQ-順序消息
  • RocketMQ-延遲消息
  • RocketMQ-批量消息
  • RocketMQ-過濾消息
  • RocketMQ-事務消息
  • RocketMQ-消息存儲
  • RocketMQ-高可用
  • RocketMQ-高性能
  • RocketMQ-主從複製
  • RocketMQ-刷盤機制
  • RocketMQ-冪等性
  • RocketMQ-消息重試
  • RocketMQ-死信隊列
    ...

歡迎各位入(guan)股(zhu),後續文章乾貨多多。code

相關文章
相關標籤/搜索