RocketMQ 4.5.1: Single-Node Setup and a Produce/Consume Test

For learning and quick testing, I often need to spin up a single-node instance. Download: http://rocketmq.apache.org/dowloading/releases/

1. Configure the environment variable first

ROCKETMQ_HOME

E:\rocketmq-all-4.5.1-bin-release
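Before starting anything, it can help to confirm that the variable is visible to new processes. The following is a minimal sketch, not part of RocketMQ: the class name RocketMqHomeCheck and the method problemWith are made up for illustration, and the only thing it assumes about the distribution is that it contains a bin directory, as used in the steps below.

```java
import java.io.File;

class RocketMqHomeCheck {

    // Hypothetical helper: returns null when the value looks usable,
    // otherwise a short description of what is wrong.
    static String problemWith(String home) {
        if (home == null || home.trim().isEmpty()) {
            return "ROCKETMQ_HOME is not set";
        }
        // The scripts used in the following steps live in <home>/bin.
        File bin = new File(home, "bin");
        if (!bin.isDirectory()) {
            return "no bin directory under " + home;
        }
        return null;
    }

    public static void main(String[] args) {
        String problem = problemWith(System.getenv("ROCKETMQ_HOME"));
        System.out.println(problem == null ? "ROCKETMQ_HOME looks fine" : problem);
    }
}
```

Run it from a newly opened terminal, since a terminal that was already open when the variable was added will not see the change.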

2. Then go into the bin directory and start the NameServer

3. Start the Broker

Start it:

E:\rocketmq-all-4.5.1-bin-release\bin>mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true

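You may see an error: Could not find or load main class Files\Java\jdk1.8.0_161\lib;C:\Program

Fix: open runserver.cmd in the bin directory and edit it. The main class in the message looks like a classpath fragment cut at the space in C:\Program Files, so the classpath value most likely needs to be wrapped in double quotes.

Restart, and the Broker starts successfully.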
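The "main class" in the error above looks like a piece of a classpath that was cut at the space in C:\Program Files. The sketch below illustrates that effect with a deliberately simplified tokenizer; it is not how cmd.exe or the java launcher actually parse arguments. The classpath value and the name SomeMainClass are assumptions chosen to reproduce the message.

```java
import java.util.ArrayList;
import java.util.List;

class ClasspathSplitDemo {

    // Simplified model: split on spaces, but keep double-quoted runs together.
    static List<String> tokenize(String commandLine) {
        List<String> tokens = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (char c : commandLine.toCharArray()) {
            if (c == '"') {
                inQuotes = !inQuotes;          // quotes toggle grouping and are dropped
            } else if (c == ' ' && !inQuotes) {
                if (current.length() > 0) {    // an unquoted space ends the token
                    tokens.add(current.toString());
                    current.setLength(0);
                }
            } else {
                current.append(c);
            }
        }
        if (current.length() > 0) {
            tokens.add(current.toString());
        }
        return tokens;
    }

    public static void main(String[] args) {
        // Assumed classpath containing a path with a space in it.
        String cp = ".;C:\\Program Files\\Java\\jdk1.8.0_161\\lib;"
                + "C:\\Program Files\\Java\\jdk1.8.0_161\\lib\\tools.jar";
        // Without quotes the classpath breaks into several tokens.
        System.out.println(tokenize("java -cp " + cp + " SomeMainClass"));
        // With quotes it stays a single argument.
        System.out.println(tokenize("java -cp \"" + cp + "\" SomeMainClass"));
    }
}
```

In the unquoted case, the token right after the truncated classpath is Files\Java\jdk1.8.0_161\lib;C:\Program, which is exactly what the error reports as the main class. In the quoted case the classpath remains one argument and SomeMainClass is where a class name is expected.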

4. Set up a management console for easier inspection

https://github.com/apache/rocketmq-externals

After downloading, go into the rocketmq-console directory and build the package:

mvn clean package -Dmaven.test.skip=true

Go into the target directory and start it (the last argument is the NameServer address, which here is my local machine):

E:\rocketmq-externals-master\rocketmq-console\target>java -jar rocketmq-console-ng-1.0.1.jar --rocketmq.config.namesrvAddr=localhost:9876

Finally, open http://localhost:8080
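If the page does not load, a quick check is whether the two ports used so far accept TCP connections: 9876 for the NameServer and 8080 for the console. This is a minimal sketch using only the JDK; the class name PortProbe and the one-second timeout are arbitrary choices, and an open port only shows that something is listening, not that it is RocketMQ.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class PortProbe {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, or unresolved host: treat as not reachable.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("NameServer 9876 reachable: " + isOpen("localhost", 9876, 1000));
        System.out.println("Console    8080 reachable: " + isOpen("localhost", 8080, 1000));
    }
}
```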

5. A simple test

Add the dependency

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.5.1</version>
</dependency>

A simple producer

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.io.UnsupportedEncodingException;

public class Test {

    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
        // Create the producer with a producer group name
        DefaultMQProducer producer = new DefaultMQProducer("my_producer_group");
        // NameServer address (the local NameServer started above)
        producer.setNamesrvAddr("localhost:9876");
        // Start the producer
        producer.start();
        for (int i = 0; i < 10; i++) {
            // Create a message with a topic, a tag, and the message body
            Message msg = new Message("MyTopic", "MyTag", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            // Send it and print the send result
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        // Shut the producer down once it is no longer needed
        producer.shutdown();
    }

}
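The message body is just a byte array, and the producer above fills it by encoding a string with RemotingHelper.DEFAULT_CHARSET, which as far as I know is the string "UTF-8". The sketch below shows the same round trip with only the JDK: StandardCharsets.UTF_8 avoids the checked UnsupportedEncodingException, and the decode step is what a consumer would apply to the bytes returned by getBody(). The class and method names are made up for illustration.

```java
import java.nio.charset.StandardCharsets;

class BodyEncodingDemo {

    // What the producer side does: text -> bytes for the message body.
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // What a consumer can do with the body bytes: bytes -> text.
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = encode("Hello RocketMQ 0");
        System.out.println(body.length + " bytes, decoded: " + decode(body));
    }
}
```

Using the same charset on both sides matters as soon as the body contains non-ASCII text.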

Check the management console

View the details

Next, a simple consumer

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Test2 {

    public static void main(String[] args) throws MQClientException {
        // Create the consumer with a consumer group name
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_producer_group");
        // NameServer address
        consumer.setNamesrvAddr("localhost:9876");
        // Topic to subscribe to; "*" accepts every tag
        consumer.subscribe("MyTopic", "*");
        // Register the listener that is called for each batch of messages
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // Tell the broker this batch was consumed successfully
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Start the consumer
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }

}
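The second argument to subscribe is a tag expression: "*" accepts every tag, and several tags can be combined with "||", for example "TagA || MyTag". The sketch below models only that meaning with plain JDK code. It is not the client's or the broker's filtering implementation, and the class name TagExpressionSketch and the tag TagA are made up for illustration.

```java
import java.util.HashSet;
import java.util.Set;

class TagExpressionSketch {

    // Returns true if a message carrying `tag` would be selected by `subExpression`.
    static boolean matches(String subExpression, String tag) {
        // null, empty, or "*" means: subscribe to all tags.
        if (subExpression == null) {
            return true;
        }
        String expr = subExpression.trim();
        if (expr.isEmpty() || "*".equals(expr)) {
            return true;
        }
        // Otherwise the expression is a list of tags joined by "||".
        Set<String> wanted = new HashSet<>();
        for (String part : expr.split("\\|\\|")) {
            String candidate = part.trim();
            if (!candidate.isEmpty()) {
                wanted.add(candidate);
            }
        }
        return wanted.contains(tag);
    }

    public static void main(String[] args) {
        System.out.println(matches("*", "MyTag"));
        System.out.println(matches("TagA || MyTag", "MyTag"));
        System.out.println(matches("TagA", "MyTag"));
    }
}
```

With the producer above, every message carries the tag MyTag, so a subscription of "*" or "MyTag" would receive all ten messages, while one that lists only other tags would receive none.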

Terminal output

Leave the consumer running and check the management console
