RocketMQ Setup and Demo

1. Hardware requirements: the RocketMQ machine should preferably have at least 8 GB of memory, run Linux, and already have a JDK installed.

2. Download URL for the installation package:
http://mirror.bit.edu.cn/apache/incubator/rocketmq/4.0.0-incubating/rocketmq-all-4.0.0-incubating-bin-release.zip

3. Download the RocketMQ installation package, upload it to the server, and unzip it:

unzip rocketmq-all-4.0.0-incubating-bin-release.zip

Change into the bin directory under the extracted directory.

Start the NameServer:

nohup sh mqnamesrv &

Start the broker:

nohup sh mqbroker -n localhost:9876 &

Running the jps command should now show two Java processes, one for the NameServer and one for the broker.

To make it easier for other machines to connect while debugging, you can temporarily stop the firewall:

service firewalld stop
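
To confirm from another machine that the NameServer port is actually reachable once the firewall is stopped, a small TCP check along these lines may help. This is only a sketch: the class name NameServerPortCheck and the 2-second timeout are assumptions made for illustration, and 9876 is the NameServer port used in this article.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class NameServerPortCheck {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or the host could not be resolved.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: the NameServer host is passed as the first argument.
        String host = args.length > 0 ? args[0] : "localhost";
        // 9876 is the NameServer port used throughout this article.
        System.out.println(host + ":9876 reachable = "
                + isReachable(host, 9876, 2000));
    }
}
```

A result of false only means the TCP connection failed; it does not say whether the cause is the firewall, the network, or a NameServer that is not running.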

 

4. Java demo program

Maven dependency:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.0.0-incubating</version>
</dependency>

Message producer demo code:

package com.classtest.rocketmq;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args){
        DefaultMQProducer producer = new DefaultMQProducer("Producer");
        producer.setNamesrvAddr("192.168.133.141:9876");
        try {
            producer.start();

            Message msg = new Message("PushTopic",
                    "push",
                    "1",
                    "Just for test1.".getBytes());

            SendResult result = producer.send(msg);
            System.out.println("id:" + result.getMsgId() +
                    " result:" + result.getSendStatus());

            msg = new Message("PushTopic",
                    "push",
                    "2",
                    "Just for test2.".getBytes());

            result = producer.send(msg);
            System.out.println("id:" + result.getMsgId() +
                    " result:" + result.getSendStatus());

            msg = new Message("PushTopic",
                    "push",
                    "1",
                    "Just for test3.".getBytes());

            result = producer.send(msg);
            System.out.println("id:" + result.getMsgId() +
                    " result:" + result.getSendStatus());
        } catch (Exception e) {
            e.printStackTrace();
        }finally{
            producer.shutdown();
        }
    }
}
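
The producer above calls getBytes() without a charset, so the bytes depend on the platform default encoding of the machine it runs on. If the producer and consumer run on different machines, fixing the charset on both sides may be safer. A minimal sketch using only the JDK follows; the helper class BodyCodec is a hypothetical name introduced here, not part of RocketMQ.

```java
import java.nio.charset.StandardCharsets;

class BodyCodec {
    /** Encodes message text to bytes with an explicit charset (UTF-8). */
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a body produced by encode() back into text. */
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = encode("Just for test1.");
        System.out.println(body.length + " bytes -> " + decode(body));
    }
}
```

Under this assumption the producer would pass BodyCodec.encode("Just for test1.") as the body argument of the Message constructor, and the consumer would call BodyCodec.decode(msg.getBody()) to get the text back.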

Message consumer demo code:

package com.classtest.rocketmq;

import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {
    public static void main(String[] args){
        DefaultMQPushConsumer consumer =
                new DefaultMQPushConsumer("PushConsumer");
        consumer.setNamesrvAddr("192.168.133.141:9876");
        try {
            // Subscribe to messages under PushTopic whose tag is "push".
            consumer.subscribe("PushTopic", "push");
            // On first start, begin consuming from the head of the queue.
            consumer.setConsumeFromWhere(
                    ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.registerMessageListener(
                    new MessageListenerConcurrently() {
                        @Override
                        public ConsumeConcurrentlyStatus consumeMessage(
                                List<MessageExt> list,
                                ConsumeConcurrentlyContext context) {
                            // Print every message in the batch, not only the first.
                            for (Message msg : list) {
                                System.out.println(msg.toString());
                            }
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        }
                    }
            );
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
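
The second argument of subscribe() is a tag expression: "*" subscribes to every tag, and several tags can be joined with "||". The sketch below is a simplified model of that matching rule, written only to illustrate which messages the consumer above would receive; the class TagFilterSketch is hypothetical and is not RocketMQ's own filtering code.

```java
import java.util.HashSet;
import java.util.Set;

class TagFilterSketch {
    /**
     * Returns true if a message carrying messageTag would be selected by
     * the given subscription expression, e.g. "push", "push || pull", "*".
     */
    static boolean matches(String subExpression, String messageTag) {
        // A null, empty, or "*" expression is treated as "all tags".
        if (subExpression == null) {
            return true;
        }
        String expr = subExpression.trim();
        if (expr.isEmpty() || expr.equals("*")) {
            return true;
        }
        // Otherwise collect the individual tags separated by "||".
        Set<String> wanted = new HashSet<>();
        for (String tag : expr.split("\\|\\|")) {
            String trimmed = tag.trim();
            if (!trimmed.isEmpty()) {
                wanted.add(trimmed);
            }
        }
        return messageTag != null && wanted.contains(messageTag);
    }

    public static void main(String[] args) {
        System.out.println(matches("push", "push"));
        System.out.println(matches("push", "other"));
        System.out.println(matches("push || pull", "pull"));
    }
}
```

With the subscription used in this demo, all three messages sent by the producer carry the tag "push" and are therefore selected.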

You can start the consumer demo first and then run the producer demo; the messages will then be printed in the consumer demo's console window.

 

References:

https://my.oschina.net/jayronwang/blog/861396

http://rocketmq.apache.org/docs/quick-start/
