(四)基於myeclipse的RocketMQ--Demo實踐

基於myeclipse的RocketMQ–Demo實踐

接上文,搭建好環境,用example中的示例只能進行有限的測試任務。Rocket-console沒法模擬發送和接收消息,因此自定義測試任務須要自行編寫demo程序。html

建立Demo項目流程

1.下載myeclipse

2.安裝maven環境,關聯到myeclipse

myeclipse 添加自定義jdk環境:參考文章Ajava

myeclipse添加自定義maven環境:參考文章B參考文章C markdown

3.建立maven項目,配置pom.xml

File–New–Other–Maven Projuect–(Create a simple project)eclipse

4.導入依賴包

直接把RocketMQ/devenv/lib下的jar包都copy到剛建立的maven項目內maven

5.配置pom.xml

直接把RocketMQ的pom.xml的內容copy過去測試

6.編寫消息生產者Producer

src–New Package–New Class–Producer.javaui

內容能夠參考com.alibaba.rocketmq.example.quickstart(simple)下的Producer,下文的Consumer相似spa

package com.alibaba.rocketmq.example.quickstart;

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter;
import com.alibaba.rocketmq.client.producer.LocalTransactionState;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.remoting.common.RemotingHelper;

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // tc_pro1爲Producer group name
        DefaultMQProducer producer = new DefaultMQProducer("tc_pro1");
        // 手動指定Namesrv服務地址
        producer.setNamesrvAddr("192.168.1.170:9876");
        producer.setInstanceName("Producer1-tp1");
        producer.start();

        // 若是broker關閉了自動建立Topic功能,請手動添加Topic:tc_demo,以確保能正常發送消息
        for (int i = 0; i < 1; i++) {
            try {
                Message msg = new Message("tc_demo",// topic
                        "TagA",// tag
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body
                );
                SendResult sendResult = producer.send(msg);
                LocalTransactionExecuter tranExecuter = new LocalTransactionExecuter() {

                    public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
                        // TODO Auto-generated method stub
                        return null;
                    }
                };

                //producer.sendMessageInTransaction(msg, tranExecuter, arg)
                System.out.println(sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        producer.shutdown();
    }
}

7.編寫消息消費者Consumer

xxx Package–New Class–Consumer.java.net

package com.alibaba.rocketmq.example.quickstart;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer1 {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        // tc_con1爲Consumer group name,若是broker關閉了自動訂閱功能,請手動添加訂閱tc_con1,以確保能正常接收消息
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tc_con1");

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 手動指定Namesrv服務地址
        consumer.setNamesrvAddr("192.168.1.170:9876");
        consumer.setInstanceName("Consumber1-tp1");

        consumer.subscribe("tc_demo", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();

        System.out.println("Consumer Started.");
    }
}

8.啓動Consumer,Producer進行消息收發

前提:環境搭建成功,Namesrv 和 Broker服務運行正常,可經過jps查看服務是否運行code

run Consumer.java /Producer.java 從myeclipse–console能夠看到Consumer角色成功啓動、Producer消息發送、Consumer接收消息。

至此,基於myeclipse上RocketMQ的demo實踐流程就走通了,更多的自定義擴展能夠參考其項目源碼

參考文章D:Producer多topic發送,Consumer多topic消費

FAQ

1.win10下安裝maven完成後,mvn -version顯示報錯

Error: JAVA_HOME is set to an invalid directory.JAVA_HOME = 「C:\Program Files\Java\jdk1.7.0_17\bin」Please set the JAVA_HOME variable in your environment to match thelocation of your Java installation.

解決方案:jdk,maven的環境變量雖已在path裏設置完成,且jdk正常。但maven啓動另需JAVA_HOME,因此手動添加JAVA_HOME的值:xxx/java/jdk_1.7.xx (no /bin)

2.Producer發送信息失敗或Consumer沒法接受信息

問題原由和解決方案:

  1. Namesrv地址未指定或錯誤,請確認Namesrv地址

  2. Namesrv或Broker未啓動,經過jps查詢集羣(單機)節點服務狀態,若是沒有NamesrvStartup和BrokerStartup,從新啓動(能夠參看系列文章(二)/(三))

  3. Broker關閉的自動建立topic和自動訂閱消費組的功能。調用mqadmin 下的 updateTopic 或updateSubGroup 建立topic或訂閱組

相關文章
相關標籤/搜索