rocketmq Don't have SubscriptionGroup

1. 問題描述java

   rocketmq 生產者發消息正常 mq後臺也能夠看到發出的消息spring

   可是消費者一直沒消費 好像訂閱沒成功apache

 

2. 問題排查dom

經過上圖查看 確實沒有檢測到訂閱者ide

 

 

3. 問題解決this

線上環境是 兩臺機器 共四個實例lua

項目中訂閱了兩個不一樣地址不一樣topic的mqurl

而後那個instanceName會有命名衝突spa

ip@進程id 3d

從新修改instanceName的value

//設置instanceName
 defaultMQPushConsumer.setInstanceName(System.currentTimeMillis()+ JVMRandom.nextLong(10)+"");

spring-rocketmq.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://www.springframework.org/schema/beans" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">


    <!--好評加分 差評扣分-->
    <bean id="evaluationConsumer" class="org.apache.rocketmq.client.consumer.DefaultMQPushConsumer">
        <property name="consumerGroup" value="${driver_x_group}"/>
        <property name="namesrvAddr" value="${driver_x_url}"/>
    </bean>
    <!--好評消費者 監聽器-->
    <bean id="evaluationMessageListener" class="com.x.mq.EvaluationXListener"/>
    <!--消費者啓動類-->
    <bean id="evaluationOrderlyConsumer" class="com.x.mq.base.EvaluationXConsumer" init-method="init" destroy-method="destroy" scope="singleton">
        <property name="defaultMQPushConsumer" ref="evaluationConsumer"/>
        <property name="evaluationMessageListener" ref="evaluationMessageListener"/>
        <property name="topic" value="${driver_score_topic}"/>
        <property name="tag" value=""/>
    </bean>
</beans>
View Code

consumer.java

package com.x.mq.base; import com.x.mq.EvaluationMessageListener; import org.apache.commons.lang.math.JVMRandom; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Random; /** * Created by admin on 2018/1/31. */
public class EvaluationConsumer { private static final Logger logger = LoggerFactory.getLogger(EvaluationConsumer.class); private DefaultMQPushConsumer defaultMQPushConsumer; private EvaluationMessageListener evaluationMessageListener; private String tag; private String topic; public EvaluationMessageListener getEvaluationMessageListener() { return evaluationMessageListener; } public void setEvaluationMessageListener(EvaluationMessageListener evaluationMessageListener) { this.evaluationMessageListener = evaluationMessageListener; } public String getTag() { return tag; } public void setTag(String tag) { this.tag = tag; } public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } public DefaultMQPushConsumer getDefaultMQPushConsumer() { return defaultMQPushConsumer; } public void setDefaultMQPushConsumer(DefaultMQPushConsumer defaultMQPushConsumer) { this.defaultMQPushConsumer = defaultMQPushConsumer; } public void init() throws MQClientException { // 訂閱指定Topic下tags
 defaultMQPushConsumer.subscribe(topic, tag); // 設置Consumer第一次啓動是從隊列頭部開始消費仍是隊列尾部開始消費<br> // 若是非第一次啓動,那麼按照上次消費的位置繼續消費
 defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); // 設置爲集羣消費(區別於廣播消費):集羣只消費一次,廣播會被多個消費者消費
 defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING); // defaultMQPushConsumer.registerMessageListener(messageListenerConcurrently);
 defaultMQPushConsumer.registerMessageListener(evaluationMessageListener); //設置instanceName
        defaultMQPushConsumer.setInstanceName(System.currentTimeMillis()+ JVMRandom.nextLong(10)+""); // 批量消費
        defaultMQPushConsumer.setConsumeMessageBatchMaxSize(20); // Consumer對象在使用以前必需要調用start初始化,初始化一次便可<br>
 defaultMQPushConsumer.start(); logger.info("==rocketmq==DefaultMQPushConsumer start success! consumerGroup:{},nameServiceAddr:{},topic:{},tag:{}",defaultMQPushConsumer.getConsumerGroup(),defaultMQPushConsumer.getNamesrvAddr(),topic,tag); } public void destroy(){ defaultMQPushConsumer.shutdown(); } }
View Code
相關文章
相關標籤/搜索