rocketMQ入門

一:什麼是MQjava

MQ 就是 消息中間件。apache

 

二:爲何使用MQide

場景:電商雙十一 零點的秒殺。在那一瞬間,來自用戶的請求將會激增,若是不作任何措施,那服務極可能會被壓垮。可是咱們又不能直接把這些請求丟棄,而爲了這個很小的時間段去擴容機器又顯得大題小作。因而咱們天然而言的想到,能不能把這些請求先放到一個消息隊列裏面,而後系統從消息隊列裏面拿出來請求作邏輯的處理和響應。經過拉長時間維度來保證服務的穩定性。這就是MQ。ui

使用MQ只要解決的就是 在生產者消費者模式中,生產者生產的數據可能會忽然激增,消費者來不及消費的問題。spa

 

三:rocketMQ中間件

rocketMQ是一個MQ的實現。咱們在開發中一直在強調,不要重複造輪子。既然咱們須要一個MQ,那就找個別人實現過的MQ來用就好了。rocketMQ就是其中的一種。固然,還有其餘的MQ組件,好比的 ActiveMQ、RabbitMQ,Kafka。blog

 

四:rocketMQ下載隊列

http://rocketmq.apache.org/release_notesip

下載bin的包,好比 rocketmq-all-4.3.2-bin-release.zip 開發

 

五:安裝

將下載的文件解壓到對應目錄。好比我解壓到 C:\rocketmq-all-4.3.2

 

六:啓動NAMESERVER

去 C:\rocketmq-all-4.3.2\bin目錄下找到 mqnamesrv.cmd,雙擊運營便可。

 

七:啓動BROKER

start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

( 假如彈出提示框提示‘錯誤: 找不到或沒法加載主類 xxxxxx’。打開runbroker.cmd,而後將‘%CLASSPATH%’加上英文雙引號。保存並從新執行start語句。)

 

至此爲止,rocketMQ就安裝啓動完成了。下面咱們寫的demo來使用rocket作一個helloWord

 

八:寫一個生產者,發消息

public class Producer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.setInstanceName("producer");
        producer.start();
        try {
            for (int i = 0; i < 10; i++) {
                Thread.sleep(2000);  //每2秒發送一次消息
                Message msg = new Message("TopicA-test",// topic
                        "TagA",// tag
                        (new Date() + "Hello RocketMQ ,QuickStart" + i)
                                .getBytes()// body
                );
                SendResult sendResult = producer.send(msg);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        
        producer.shutdown();
    }
}

  

 

九:寫一個消費者,用來監聽消息

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
                "rmq-group");

        consumer.setNamesrvAddr("127.0.0.1:9876");//設置rocketMQ服務的部署地址
        consumer.setInstanceName("consumer");
        /**
         * 被訂閱消息的topic 和 subExpression。
         * 注意:必定要與消息發佈者的topic 和 subExpression 一致
         */
        consumer.subscribe("TopicA-test", "TagA");

        consumer.registerMessageListener(new MessageListenerConcurrently() {//監聽器實現
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                    List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody()));//每次拿到消息我就打印出來
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

 

一個簡單的demo就OK了

相關文章
相關標籤/搜索