目錄html
MQ,是一種跨進程的通訊機制,用於上下游傳遞消息。在傳統的互聯網架構中一般使用MQ來對上下游來作解耦合。java
舉例:當A系統對B系統進行消息通信,如A系統發佈一條系統公告,B系統能夠訂閱該頻道進行系統公告同步,整個過程當中A系統並不關係B系統會不會同步,由訂閱該頻道的系統自行處理。spring
官方說明:apache
隨着使用愈來愈多的隊列和虛擬主題,ActiveMQ IO模塊遇到了瓶頸。咱們盡力經過節流,斷路器或降級來解決此問題,但效果不佳。所以,咱們那時開始關注流行的消息傳遞解決方案Kafka。不幸的是,Kafka不能知足咱們的要求,特別是在低延遲和高可靠性方面。springboot
看到這裏能夠很清楚的知道RcoketMQ 是一款低延遲、高可靠、可伸縮、易於使用的消息中間件。架構
具備如下特性:app
下載地址:https://rocketmq.apache.org/dowloading/releases/異步
從官方下載二進制或者源碼來進行使用。源碼編譯須要Maven3.2x,JDK8jvm
在根目錄進行打包:分佈式
mvn -Prelease-all -DskipTests clean packager -U
distribution/target/apache-rocketmq
文件夾中會存在一個文件夾版,zip,tar三個可運行的完整程序。
使用rocketmq-4.6.0.zip
:
SpringBoot 入門:https://www.cnblogs.com/SimpleWu/p/10027237.html
SpringBoot 經常使用start:https://www.cnblogs.com/SimpleWu/p/9798146.html
當前環境版本爲:
<!-- MQ Begin --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>${rocketmq.version}</version> </dependency> <!-- MQ End -->
因爲咱們這邊已經有工程了因此就不在進行建立這種過程了。主要是看看如何使用RocketMQ。
建立RocketMQProperties配置屬性類,類中內容以下:
@ConfigurationProperties(prefix = "rocketmq") public class RocketMQProperties { private boolean isEnable = false; private String namesrvAddr = "localhost:9876"; private String groupName = "default"; private int producerMaxMessageSize = 1024; private int producerSendMsgTimeout = 2000; private int producerRetryTimesWhenSendFailed = 2; private int consumerConsumeThreadMin = 5; private int consumerConsumeThreadMax = 30; private int consumerConsumeMessageBatchMaxSize = 1; //省略get set }
如今咱們全部子系統中的生產者,消費者對應:
isEnable 是否開啓mq
namesrvAddr 集羣地址
groupName 分組名稱
設置爲統一已方便系統對接,若有其它需求在進行擴展,類中咱們已經給了默認值也能夠在配置文件或配置中心中獲取配置,配置以下:
#發送同一類消息的設置爲同一個group,保證惟一,默認不須要設置,rocketmq會使用ip@pid(pid表明jvm名字)做爲惟一標示 rocketmq.groupName=please_rename_unique_group_name #是否開啓自動配置 rocketmq.isEnable=true #mq的nameserver地址 rocketmq.namesrvAddr=127.0.0.1:9876 #消息最大長度 默認1024*4(4M) rocketmq.producer.maxMessageSize=4096 #發送消息超時時間,默認3000 rocketmq.producer.sendMsgTimeout=3000 #發送消息失敗重試次數,默認2 rocketmq.producer.retryTimesWhenSendFailed=2 #消費者線程數量 rocketmq.consumer.consumeThreadMin=5 rocketmq.consumer.consumeThreadMax=32 #設置一次消費消息的條數,默認爲1條 rocketmq.consumer.consumeMessageBatchMaxSize=1
建立消費者接口 RocketConsumer.java 該接口用戶約束消費者須要的核心步驟:
/** * 消費者接口 * * @author SimpleWu * */ public interface RocketConsumer { /** * 初始化消費者 */ public abstract void init(); /** * 註冊監聽 * * @param messageListener */ public void registerMessageListener(MessageListener messageListener); }
建立抽象消費者 AbstractRocketConsumer.java:
/** * 消費者基本信息 * * @author SimpelWu */ public abstract class AbstractRocketConsumer implements RocketConsumer { protected String topics; protected String tags; protected MessageListener messageListener; protected String consumerTitel; protected MQPushConsumer mqPushConsumer; /** * 必要的信息 * * @param topics * @param tags * @param consumerTitel */ public void necessary(String topics, String tags, String consumerTitel) { this.topics = topics; this.tags = tags; this.consumerTitel = consumerTitel; } public abstract void init(); @Override public void registerMessageListener(MessageListener messageListener) { this.messageListener = messageListener; } }
在類中咱們必須指定這個topics,tags與消息監聽邏輯
public abstract void init();
該方法是用於初始化消費者,由子類實現。
接下來咱們編寫自動配置類RocketMQConfiguation.java,該類用戶初始化一個默認的生產者鏈接,以及加載全部的消費者。
@EnableConfigurationProperties({ RocketMQProperties.class }) 使用該配置文件
@Configuration 標註爲配置類
@ConditionalOnProperty(prefix = "rocketmq", value = "isEnable", havingValue = "true") 只有當配置中指定rocketmq.isEnable = true的時候纔會生效
核心內容以下:
/** * mq配置 * * @author SimpleWu */ @Configuration @EnableConfigurationProperties({ RocketMQProperties.class }) @ConditionalOnProperty(prefix = "rocketmq", value = "isEnable", havingValue = "true") public class RocketMQConfiguation { private RocketMQProperties properties; private ApplicationContext applicationContext; private Logger log = LoggerFactory.getLogger(RocketMQConfiguation.class); public RocketMQConfiguation(RocketMQProperties properties, ApplicationContext applicationContext) { this.properties = properties; this.applicationContext = applicationContext; } /** * 注入一個默認的消費者 * @return * @throws MQClientException */ @Bean public DefaultMQProducer getRocketMQProducer() throws MQClientException { if (StringUtils.isEmpty(properties.getGroupName())) { throw new MQClientException(-1, "groupName is blank"); } if (StringUtils.isEmpty(properties.getNamesrvAddr())) { throw new MQClientException(-1, "nameServerAddr is blank"); } DefaultMQProducer producer; producer = new DefaultMQProducer(properties.getGroupName()); producer.setNamesrvAddr(properties.getNamesrvAddr()); // producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY"); // 若是須要同一個jvm中不一樣的producer往不一樣的mq集羣發送消息,須要設置不一樣的instanceName // producer.setInstanceName(instanceName); producer.setMaxMessageSize(properties.getProducerMaxMessageSize()); producer.setSendMsgTimeout(properties.getProducerSendMsgTimeout()); // 若是發送消息失敗,設置重試次數,默認爲2次 producer.setRetryTimesWhenSendFailed(properties.getProducerRetryTimesWhenSendFailed()); try { producer.start(); log.info("producer is start ! groupName:{},namesrvAddr:{}", properties.getGroupName(), properties.getNamesrvAddr()); } catch (MQClientException e) { log.error(String.format("producer is error {}", e.getMessage(), e)); throw e; } return producer; } /** * SpringBoot啓動時加載全部消費者 */ @PostConstruct public void initConsumer() { Map<String, AbstractRocketConsumer> consumers = applicationContext.getBeansOfType(AbstractRocketConsumer.class); if (consumers == null || consumers.size() == 0) { log.info("init rocket consumer 0"); } Iterator<String> beans = consumers.keySet().iterator(); while (beans.hasNext()) { String beanName = (String) beans.next(); AbstractRocketConsumer consumer = consumers.get(beanName); consumer.init(); createConsumer(consumer); log.info("init success consumer title {} , toips {} , tags {}", consumer.consumerTitel, consumer.tags, consumer.topics); } } /** * 經過消費者信心建立消費者 * * @param consumerPojo */ public void createConsumer(AbstractRocketConsumer arc) { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(this.properties.getGroupName()); consumer.setNamesrvAddr(this.properties.getNamesrvAddr()); consumer.setConsumeThreadMin(this.properties.getConsumerConsumeThreadMin()); consumer.setConsumeThreadMax(this.properties.getConsumerConsumeThreadMax()); consumer.registerMessageListener(arc.messageListenerConcurrently); /** * 設置Consumer第一次啓動是從隊列頭部開始消費仍是隊列尾部開始消費 若是非第一次啓動,那麼按照上次消費的位置繼續消費 */ // consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); /** * 設置消費模型,集羣仍是廣播,默認爲集羣 */ // consumer.setMessageModel(MessageModel.CLUSTERING); /** * 設置一次消費消息的條數,默認爲1條 */ consumer.setConsumeMessageBatchMaxSize(this.properties.getConsumerConsumeMessageBatchMaxSize()); try { consumer.subscribe(arc.topics, arc.tags); consumer.start(); arc.mqPushConsumer=consumer; } catch (MQClientException e) { log.error("info consumer title {}", arc.consumerTitel, e); } } }
而後在src/main/resources文件夾中建立目錄與文件META-INF/spring.factories裏面添加自動配置類便可開啓啓動配置,咱們只須要導入依賴便可:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.xcloud.config.rocketmq.RocketMQConfiguation
接下來在服務中導入依賴,而後經過咱們的抽象類獲取全部必要信息對消費者進行建立,該步驟會在全部消費者初始化完成後進行,且只會管理是Spring Bean的消費者。
下面咱們看看如何建立一個消費者,建立消費者的步驟很是簡單,只須要繼承AbstractRocketConsumer而後再加上Spring的@Component就可以完成消費者的建立,咱們能夠在類中自定義消費的主題與標籤。
在項目能夠根據需求當消費者建立失敗的時候是否繼續啓動工程。
建立一個默認的消費者 DefaultConsumerMQ.java
@Component public class DefaultConsumerMQ extends AbstractRocketConsumer { /** * 初始化消費者 */ @Override public void init() { // 設置主題,標籤與消費者標題 super.necessary("TopicTest", "*", "這是標題"); //消費者具體執行邏輯 registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { msgs.forEach(msg -> { System.out.printf("consumer message boyd %s %n", new String(msg.getBody())); }); // 標記該消息已經被成功消費 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); } }
super.necessary("TopicTest", "*", "這是標題"); 是必需要設置的,表明該消費者監聽TopicTest主題下全部tags,標題那個字段是我本身定義的,因此對於該配置來講沒什麼意義。
咱們能夠在這裏注入Spring的Bean來進行任意邏輯處理。
建立一個消息發送類進行測試
@Override public String qmtest(@PathVariable("name")String name) throws MQClientException, RemotingException, MQBrokerException, InterruptedException, UnsupportedEncodingException { Message msg = new Message("TopicTest", "tags1", name.getBytes(RemotingHelper.DEFAULT_CHARSET)); // 發送消息到一個Broker SendResult sendResult = defaultMQProducer.send(msg); // 經過sendResult返回消息是否成功送達 System.out.printf("%s%n", sendResult); return null; }
咱們來經過Http請求測試:
http://localhost:10001/demo/base/mq/hello consumer message boyd hello http://localhost:10001/demo/base/mq/嘿嘿嘿嘿嘿 consumer message boyd 嘿嘿嘿嘿嘿
好了到這裏簡單的start算是設計完成了,後面還有一些:順序消息生產,順序消費消息,異步消息生產等一系列功能,官人可參照官方去自行處理。