SpringBoot入門 (九) MQ使用

本文記錄學習在Spring Boot中使用MQ。html

一 什麼是MQ

  MQ全稱(Message Queue)又名消息隊列,是一種異步通信的中間件。它的做用相似於郵局,發信人(生產者)只須要將信(消息)交給郵局,而後由郵局再將信(消息)發送給具體的接收者(消費者),具體發送過程與時間發信人能夠不關注,也不會影響發信人作其它事情。目前常見的MQ有activemq、kafka、rabbitmq、zeromq、rocketmq等。java

  使用MQ的優勢主要有:spring

  1 方法的異步執行 使用MQ能夠將耗時的同步操做經過以發送消息的方式進行了異步化處理,減小了因爲同步而等待的時間;apache

  2 程序之間鬆耦合 使用MQ能夠減小了服務之間的耦合性,不一樣的服務能夠經過消息隊列進行通訊,只要約定好消息的內容格式就行;服務器

  JMS(Java Message Service)即java消息服務,是一個Java平臺中關於面向消息中間件(MOM)的 API,用於在兩個應用程序之間,或分佈式系統中發送消息,進行異步通訊。JMS的消息機制有2種模型,一種是1對1(Point to Point)的隊列的消息,這種消息,只能被一個消費者消費;另外一種是一對多的發佈/訂閱(Topic)消息,一條消息能夠被多個消費者消費。ActiveMq是對JMS的一個實現。app

二 SpringBoot集成Active MQ

  官網下載一個服務程序,解壓後直接啓動服務就能夠了,下載地址:http://activemq.apache.org/activemq-5158-release.htmldom

  SpringBoot也對Active MQ提供了支持,咱們使用時引入具體的依賴便可,修改pom.xml文件,添加依賴異步

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

  在application.properties文件中配置Active MQ服務器的鏈接信息tcp

spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
#消息模式 true:廣播(Topic),false:隊列(Queue),默認時false
#spring.jms.pub-sub-domain=true

  完成以上配置信息後,當咱們在啓動SpringBoot項目時,會自動幫咱們完成初始化操做,並提供一個JmsMessagingTemplate,提提供了咱們經常使用發送消息的各類方法供咱們使用。咱們只須要在使用的地方注入JmsMessagingTemplate便可使用。分佈式

  發送隊列消息

@RunWith(SpringRunner.class)
@SpringBootTest
public class ActivemqApplicationTests {

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    @Test
    public void testQueueMsg(){
        //建立名稱爲zyQueue的隊列
        Queue queue = new ActiveMQQueue("zyQueue");
        //向隊列發送消息
        jmsMessagingTemplate.convertAndSend(queue,"這是一個隊列消息!");
    }
}

  消息的接收方,監聽消息隊列,當隊列中有消息時就能夠獲取到消息

@Component
public class Consumer {

    private static DateFormat df =  new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,sss");

    /**
     * destination 目標地址即隊列
     */
    @JmsListener(destination = "zyQueue")
    public void receiveMessage(String text){
        System.out.println("接收隊列消息時間:"+ df.format(new Date()) +", 接收到消息內容:"+text);
    }
}

  執行測試方法發送消息能夠看到,控制檯輸出的消費者接受到消息

隊列消息只能有一個消費者,若是有多個消費者同時監聽一個隊列時,只能有一個拿到消息,咱們測試,修改發送方法,循環發送10調消息

@Test
    public void testQueueMsg(){
        //建立名稱爲zyQueue的隊列
        Queue queue = new ActiveMQQueue("zyQueue");
        //向隊列發送消息
        for (int i=0;i<10;i++) {
            jmsMessagingTemplate.convertAndSend(queue,"這是第"+i+"個隊列消息!");
        }
    }

  在Consumer 類中再添加一個消費者,監聽隊列zyQueue

@JmsListener(destination = "zyQueue")
    public void receiveMessage(String text){
        System.out.println("接收隊列消息時間:"+ df.format(new Date()) +", 接收到消息內容:"+text);
    }

    @JmsListener(destination = "zyQueue")
    public void receiveMessage1(String text){
        System.out.println("1接收隊列消息時間:"+ df.format(new Date()) +", 接收到消息內容:"+text);
    }

  執行發送消息,看到控制檯輸出的結果,2個消費者平分了這10條消息

  若是但願監聽同一個隊列的多個消費者都能接收到全部消息,咱們就只能發送Topic消息了,咱們修改application.properties中的

#消息模式 true:廣播(Topic),false:隊列(Queue),默認時false
spring.jms.pub-sub-domain=true

  表示要發送發佈/訂閱消息,發送消息的隊列改用Topic發送消息,以下

@Test
    public void testTopicMsg(){
        Topic topic = new ActiveMQTopic("zyTopic");
        for (int i=0;i<5;i++){
            jmsMessagingTemplate.convertAndSend(topic,"這是第"+i+"個Topic消息!");
        }
    }

  咱們在Consumer 類中添加兩個消費者來監聽zyTopic隊列,接受消息

@JmsListener(destination = "zyTopic")
    public void receiveTopicMessage1(String text){
        System.out.println("消費者1接收消息時間:"+ df.format(new Date()) +", 接收到消息內容:"+text);
    }

    @JmsListener(destination = "zyTopic")
    public void receiveTopicMessage2(String text){
        System.out.println("消費者2接收消息時間:"+ df.format(new Date()) +", 接收到消息內容:"+text);
    }

  執行發消息方法,能夠看到控制檯輸出的內容,2個消費者都完整的接收到了5條消息

 

   咱們在測試發送消息時修改了屬性文件中的配置信息,才能夠發送對應的類型的消息,這是因爲SpringBoot中默認的是隊列消息(查看源碼能夠知道,監聽器默認使用的DefaultJmsListenerContainerFactory),若是咱們想在不修改配置信息的狀況下能夠同時發送Queue和Topic消息怎麼辦呢,咱們須要手動的更改初始的配置類,分別針對Queue和Topic消息提供JmsListenerContainerFactory

  新建一個配置類,以下

@SpringBootConfiguration
public class ActiveMqConfig {

    @Bean("queueListenerFactory")
    public JmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory connectionFactory){
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        //設置消息模型爲隊列
        factory.setPubSubDomain(false);
        return factory;
    }
    
    @Bean("topicListenerFactory")
    public JmsListenerContainerFactory topicListenerFactory(ConnectionFactory connectionFactory){
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        //設置消息模型爲隊列
        factory.setPubSubDomain(true);
        return factory;
    }
}

  在容器啓動時會針對兩種消息類型,初始化獲得兩個不一樣的JmsListenerContainerFactory。下來再修改消費者類,在 @JmsListener 註解中指定 containerFactory,如

@JmsListener(destination = "zyQueue", containerFactory = "queueListenerFactory")
    public void receiveMessage(String text){
        System.out.println("接收隊列消息時間:"+ df.format(new Date()) +", 接收到消息內容:"+text);
    }

@JmsListener(destination = "zyTopic", containerFactory = "topicListenerFactory")
    public void receiveTopicMessage1(String text){
        System.out.println("消費者1接收消息時間:"+ df.format(new Date()) +", 接收到消息內容:"+text);
    }

  Queue消息使用 queueListenerFactory,Topic消息使用 topicListenerFactory,而後註釋掉屬性文件中的消息模式配置就能夠了。

相關文章
相關標籤/搜索