(1)、Broker
RocketMQ 的核心,接收 Producer 發過來的消息、處理 Consumer 的消費消息請求、消息的持久化存儲、服務端過濾功能等。
(2)、NameServer
消息隊列中的狀態服務器,集羣的各個組件通過它來了解全局的信息。類似微服務中註冊中心的服務註冊、發現、下線、上線的概念。
熱備份: NameServer 可以部署多個,相互之間獨立,其他角色同時向多個 NameServer 機器上報狀態信息。
心跳機制: NameServer 中的 Broker、Topic 等狀態信息不會持久存儲,都是由各個角色定時上報並存儲到內存中,超時不上報的話,NameServer 會認爲某個機器出故障不可用。
(3)、Producer
消息的生產者,最常用的 Producer 類就是 DefaultMQProducer。
(4)、Consumer
消息的消費者,常用的 Consumer 類有兩個: DefaultMQPushConsumer 收到消息後自動調用傳入的處理方法來處理,實時性高; DefaultMQPullConsumer 由用戶自主控制,靈活性更高。
(1)、Broker 啓動後需要完成一次將自己註冊至 NameServer 的操作;隨後每隔 30s 定時向 NameServer 更新 Topic 路由信息。
(2)、Producer 發送消息時,需要根據消息的 Topic 從本地緩存獲取路由信息。如果沒有,則會從 NameServer 重新拉取並更新路由信息,同時 Producer 會默認每隔 30s 向 NameServer 拉取一次路由信息。
(3)、Consumer 消費消息時,從 NameServer 獲取路由信息,並在完成客戶端的負載均衡後,監聽指定消息隊列獲取消息並進行消費。
版本描述
<spring-boot.version>2.1.3.RELEASE</spring-boot.version> <rocketmq.version>4.3.0</rocketmq.version>
rocketmq:
  # Producer settings
  producer:
    isOnOff: on
    # Producers that send the same kind of message share one group name
    groupName: FeePlatGroup
    # NameServer address
    namesrvAddr: 10.1.1.207:9876
    # Maximum message size in bytes: 1024 * 1024 * 4 = 4194304 (4 MB).
    # The previous value 4096 was only 4 KB although it was described as 4M.
    maxMessageSize: 4194304
    # Send timeout in milliseconds, default 3000
    sendMsgTimeout: 3000
    # Number of retries when a send fails, default 2
    retryTimesWhenSendFailed: 2
  # Consumer settings
  consumer:
    isOnOff: on
    # Official recommendation: every consumer in the same group subscribes to the same topics
    groupName: FeePlatGroup
    # NameServer address
    namesrvAddr: 10.1.1.207:9876
    # Receive every tag under this topic; entries are written as topic~tag and separated by ';'
    topics: FeePlatTopic~*;
    consumeThreadMin: 20
    consumeThreadMax: 64
    # Number of messages consumed per call, default 1
    consumeMessageBatchMaxSize: 1

# Group / Topic / Tag names used by the application
fee-plat:
  fee-plat-group: FeePlatGroup
  fee-plat-topic: FeePlatTopic
  fee-account-tag: FeeAccountTag
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * RocketMQ 生產者配置 */ @Configuration public class ProducerConfig { private static final Logger LOG = LoggerFactory.getLogger(ProducerConfig.class) ; @Value("${rocketmq.producer.groupName}") private String groupName; @Value("${rocketmq.producer.namesrvAddr}") private String namesrvAddr; @Value("${rocketmq.producer.maxMessageSize}") private Integer maxMessageSize ; @Value("${rocketmq.producer.sendMsgTimeout}") private Integer sendMsgTimeout; @Value("${rocketmq.producer.retryTimesWhenSendFailed}") private Integer retryTimesWhenSendFailed; @Bean public DefaultMQProducer getRocketMQProducer() { DefaultMQProducer producer; producer = new DefaultMQProducer(this.groupName); producer.setNamesrvAddr(this.namesrvAddr); //若是須要同一個jvm中不一樣的producer往不一樣的mq集羣發送消息,須要設置不一樣的instanceName if(this.maxMessageSize!=null){ producer.setMaxMessageSize(this.maxMessageSize); } if(this.sendMsgTimeout!=null){ producer.setSendMsgTimeout(this.sendMsgTimeout); } //若是發送消息失敗,設置重試次數,默認爲2次 if(this.retryTimesWhenSendFailed!=null){ producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed); } try { producer.start(); } catch (MQClientException e) { e.printStackTrace(); } return producer; } }
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.annotation.Resource; /** * RocketMQ 消費者配置 */ @Configuration public class ConsumerConfig { private static final Logger LOG = LoggerFactory.getLogger(ConsumerConfig.class) ; @Value("${rocketmq.consumer.namesrvAddr}") private String namesrvAddr; @Value("${rocketmq.consumer.groupName}") private String groupName; @Value("${rocketmq.consumer.consumeThreadMin}") private int consumeThreadMin; @Value("${rocketmq.consumer.consumeThreadMax}") private int consumeThreadMax; @Value("${rocketmq.consumer.topics}") private String topics; @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}") private int consumeMessageBatchMaxSize; @Resource private RocketMsgListener msgListener; @Bean public DefaultMQPushConsumer getRocketMQConsumer(){ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName); consumer.setNamesrvAddr(namesrvAddr); consumer.setConsumeThreadMin(consumeThreadMin); consumer.setConsumeThreadMax(consumeThreadMax); consumer.registerMessageListener(msgListener); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize); try { String[] topicTagsArr = topics.split(";"); for (String topicTags : topicTagsArr) { String[] topicTag = topicTags.split("~"); consumer.subscribe(topicTag[0],topicTag[1]); } consumer.start(); }catch (MQClientException e){ e.printStackTrace(); } return consumer; } }
import com.rocket.queue.service.impl.ParamConfigService; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import javax.annotation.Resource; import java.util.List; /** * 消息消費監聽 */ @Component public class RocketMsgListener implements MessageListenerConcurrently { private static final Logger LOG = LoggerFactory.getLogger(RocketMsgListener.class) ; @Resource private ParamConfigService paramConfigService ; @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) { if (CollectionUtils.isEmpty(list)){ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } MessageExt messageExt = list.get(0); LOG.info("接受到的消息爲:"+new String(messageExt.getBody())); int reConsume = messageExt.getReconsumeTimes(); // 消息已經重試了3次,若是不須要再次消費,則返回成功 if(reConsume ==3){ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } if(messageExt.getTopic().equals(paramConfigService.feePlatTopic)){ String tags = messageExt.getTags() ; switch (tags){ case "FeeAccountTag": LOG.info("開戶 tag == >>"+tags); break ; default: LOG.info("未匹配到Tag == >>"+tags); break; } } // 消息消費成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

/**
 * Holder for the {@code fee-plat.*} configuration values.
 *
 * <p>The fields are public and are read directly by other components.
 */
@Service
public class ParamConfigService {

    /** Value of the {@code fee-plat.fee-plat-group} property. */
    @Value("${fee-plat.fee-plat-group}")
    public String feePlatGroup;

    /** Value of the {@code fee-plat.fee-plat-topic} property. */
    @Value("${fee-plat.fee-plat-topic}")
    public String feePlatTopic;

    /** Value of the {@code fee-plat.fee-account-tag} property. */
    @Value("${fee-plat.fee-account-tag}")
    public String feeAccountTag;
}
import com.rocket.queue.service.FeePlatMqService; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.springframework.stereotype.Service; import javax.annotation.Resource; @Service public class FeePlatMqServiceImpl implements FeePlatMqService { @Resource private DefaultMQProducer defaultMQProducer; @Resource private ParamConfigService paramConfigService ; @Override public SendResult openAccountMsg(String msgInfo) { // 能夠不使用Config中的Group defaultMQProducer.setProducerGroup(paramConfigService.feePlatGroup); SendResult sendResult = null; try { Message sendMsg = new Message(paramConfigService.feePlatTopic, paramConfigService.feeAccountTag, "fee_open_account_key", msgInfo.getBytes()); sendResult = defaultMQProducer.send(sendMsg); } catch (Exception e) { e.printStackTrace(); } return sendResult ; } }