RocketMQ-初識RocketMQ消息

簡介

本文簡單介紹下RocketMQ消息結構,以及發送消息的簡單示例,爲後續學習RocketMQ消息發送的全流程打下基礎。對於最基礎的消息發送,咱們只須要調用Message(String topic, byte[] body)構造函數new一個Message對象,而後調用producer的send方法即可發送消息了。apache

消息結構

RocketMQ消息封裝類是org.apache.rocketmq.common.message.Message。
類圖以下:
消息結構.jpg函數

Message的基礎屬性只要包括:消息所屬主題topic、消息flag、擴展屬性、消息體。
構造函數有以下4個:學習

  • public Message(String topic, byte[] body)
  • public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK)
  • public Message(String topic, String tags, byte[] body)
  • public Message(String topic, String tags, String keys, byte[] body)

最基礎的即是第一個構造函數只須要topic和消息體便可。其餘的即是設置些擴展屬性,Message擴展屬性只要包含下面幾個:spa

  • tag:消息TAG,用於消息過濾。
  • keys:Message索引鍵,多個用空格隔開,RocketMQ能夠根據這些key快速檢索到消息。
  • waitStoreMsgOK:消息發送時是否等消息存儲完成後再返回。
  • delayTimeLevel:消息延遲級別,用於定時消息或消息重試。

這些擴展屬性都會經過方法putProperty存儲在Message的properties中。code

消息發送示例

public static void main(String[] args) throws MQClientException, InterruptedException {

        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        
        try {
            /*
             * Create a message instance, specifying topic and message body.
             */
            Message msg = new Message("TopicTest" , ("Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET));

            /*
             * Call send message to deliver message to one of brokers.
             */
            SendResult sendResult = producer.send(msg);

            System.out.printf("%s%n", sendResult);
        } catch (Exception e) {
            e.printStackTrace();
            Thread.sleep(1000);
        }

        /*
         * Shut down once the producer instance is not longer in use.
         */
        producer.shutdown();
    }

最關鍵的即是new一個Message對象,只須要topic消息體便可,而後調用producer的send方法即可發送消息。對象

相關文章
相關標籤/搜索