初識ActiveMQ消息中間件

ActiveMQ官方網站:https://activemq.apache.org/

關於ActiveMQ消息傳遞的方式詳見:

http://www.javashuo.com/article/p-ovkjfdgv-ev.html

http://www.javashuo.com/article/p-mgrbykqq-eh.html

本篇博客旨在解決的問題:

1.如何在普通Java環境中使用ActiveMQ

2.ActiveMQ如何與Spring的整合(XML配置)

3.在SpringBoot中如何使用ActiveMQ

環境:

1. Windows 10 64bit

2. apache-activemq-5.14.4

3. jdk 1.8

4. maven 3.3

前置條件:

1.安裝啓動ActiveMQ:

在官方網站(https://activemq.apache.org/components/classic/download/)上下載ActiveMQ

解壓後,進入到目錄bin中,根據自己操作系統的位數進入到win64或者win32目錄下,然後點擊activemq.bat啓動ActiveMQ。

啓動後在瀏覽器輸入http://localhost:8161/,看到如下畫面表示啓動成功:

點擊「Manage ActiveMQ broker」進入到ActiveMQ的後臺管理界面,若要求輸入用戶名密碼則初始用戶名密碼爲admin,admin,如下:

2.本博客使用Maven構建項目,引入如下依賴(問題1與問題2需要引入):

<!-- ActiveMQ message broker dependency: start -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.14.4</version>
        </dependency>

問題1-如何在普通Java環境中使用ActiveMQ:

採用PTP方式傳遞消息:

消息生產者:

package at.flying.activemq.ptp;

import at.flying.domain.Student;
import com.github.flyinghe.tools.CommonUtils;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.lang3.time.DateFormatUtils;

import javax.jms.*;
import java.util.Date;

/**
 * Sends four serialized {@code Student} messages to the queue {@code test-queue}
 * (point-to-point messaging) and then closes the connection.
 */
public class ActiveMQProducer {

    /**
     * Connects to the local broker, publishes the demo messages and exits.
     *
     * @param args unused
     * @throws Exception if connecting to the broker or sending a message fails
     */
    public static void main(String[] args) throws Exception {
        // Build the provider-specific ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        // Obtain a connection from the factory
        Connection connection = connectionFactory.createConnection();
        try {
            // Start the connection
            connection.start();
            // Non-transacted session with automatic acknowledgement
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Destination the messages are sent to
            Destination destination = session.createQueue("test-queue");
            // Producer bound to that destination
            MessageProducer producer = session.createProducer(destination);
            // Non-persistent delivery is used for this demo; choose per project in real code
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            // Build and send the messages
            for (int i = 1; i <= 4; i++) {
                Student student = new Student();
                student.setId((long) i);
                student.setName("學生" + i);
                student.setBirthday(new Date());
                TextMessage message = session.createTextMessage(CommonUtils.serialize(student));
                // Send the message to the destination
                producer.send(message);
                System.out.println(String.format("發送消息:%d-%s-%s", student.getId(), student.getName(),
                        DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd")));
            }
        } finally {
            // Always release the connection, even when sending fails part-way through
            connection.close();
        }
    }
}

消息消費者1:

package at.flying.activemq.ptp;

import at.flying.domain.Student;
import com.github.flyinghe.tools.CommonUtils;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.lang3.time.DateFormatUtils;

import javax.jms.*;

/**
 * Receives serialized {@code Student} messages from the queue {@code test-queue}
 * (point-to-point messaging) and prints them.
 */
public class ActiveMQConsumer1 {

    /**
     * Connects to the local broker, registers a listener and waits for input on stdin.
     *
     * @param args unused
     * @throws Exception if connecting to the broker or reading stdin fails
     */
    public static void main(String[] args) throws Exception {
        // Build the provider-specific ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        // Obtain a connection from the factory
        Connection connection = connectionFactory.createConnection();
        try {
            // Start the connection
            connection.start();
            // Non-transacted session with automatic acknowledgement
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Destination the messages are read from
            Destination destination = session.createQueue("test-queue");
            // Consumer bound to that destination
            MessageConsumer consumer = session.createConsumer(destination);
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    // Only text messages are handled; skip anything else instead of failing on the cast
                    if (!(message instanceof TextMessage)) {
                        return;
                    }
                    try {
                        Student student = CommonUtils.deserialize(((TextMessage) message).getText());
                        System.out.println(
                                String.format("ActiveMQConsumer1-接受消息:%d-%s-%s", student.getId(), student.getName(),
                                        DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd")));
                    } catch (JMSException e) {
                        // Report the failure instead of dropping it silently
                        e.printStackTrace();
                    }
                }
            });
            // Keep the process alive until input arrives on stdin
            System.in.read();
        } finally {
            // Always release the connection, even when setup fails
            connection.close();
        }
    }
}

消息消費者2:

package at.flying.activemq.ptp;

import at.flying.domain.Student;
import com.github.flyinghe.tools.CommonUtils;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.lang3.time.DateFormatUtils;

import javax.jms.*;

/**
 * Second receiver of serialized {@code Student} messages from the queue {@code test-queue}
 * (point-to-point messaging); prints each message it gets.
 */
public class ActiveMQConsumer2 {

    /**
     * Connects to the local broker, registers a listener and waits for input on stdin.
     *
     * @param args unused
     * @throws Exception if connecting to the broker or reading stdin fails
     */
    public static void main(String[] args) throws Exception {
        // Build the provider-specific ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        // Obtain a connection from the factory
        Connection connection = connectionFactory.createConnection();
        try {
            // Start the connection
            connection.start();
            // Non-transacted session with automatic acknowledgement
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Destination the messages are read from
            Destination destination = session.createQueue("test-queue");
            // Consumer bound to that destination
            MessageConsumer consumer = session.createConsumer(destination);
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    // Only text messages are handled; skip anything else instead of failing on the cast
                    if (!(message instanceof TextMessage)) {
                        return;
                    }
                    try {
                        Student student = CommonUtils.deserialize(((TextMessage) message).getText());
                        System.out.println(
                                String.format("ActiveMQConsumer2-接受消息:%d-%s-%s", student.getId(), student.getName(),
                                        DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd")));
                    } catch (JMSException e) {
                        // Report the failure instead of dropping it silently
                        e.printStackTrace();
                    }
                }
            });
            // Keep the process alive until input arrives on stdin
            System.in.read();
        } finally {
            // Always release the connection, even when setup fails
            connection.close();
        }
    }
}

先啓動兩個消息消費者,再啓動消息生產者,控制檯輸出信息如下:

消息生產者:

消息消費者1:

消息消費者2:

這個結果使我們很容易理解PTP的消息傳遞方式。

採用Pub/Sub方式傳遞消息:

消息生產者:

package at.flying.activemq.pubsub;

import at.flying.domain.Student;
import com.github.flyinghe.tools.CommonUtils;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.lang3.time.DateFormatUtils;

import javax.jms.*;
import java.util.Date;

/**
 * Publishes four serialized {@code Student} messages to the topic {@code test-topic}
 * (publish/subscribe messaging) and then closes the connection.
 */
public class ActiveMQProducer {

    /**
     * Connects to the local broker, publishes the demo messages and exits.
     *
     * @param args unused
     * @throws Exception if connecting to the broker or sending a message fails
     */
    public static void main(String[] args) throws Exception {
        // Build the provider-specific ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        // Obtain a connection from the factory
        Connection connection = connectionFactory.createConnection();
        try {
            // Start the connection
            connection.start();
            // Non-transacted session with automatic acknowledgement
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Destination the messages are published to
            Destination destination = session.createTopic("test-topic");
            // Producer bound to that destination
            MessageProducer producer = session.createProducer(destination);
            // Non-persistent delivery is used for this demo; choose per project in real code
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            // Build and send the messages
            for (int i = 1; i <= 4; i++) {
                Student student = new Student();
                student.setId((long) i);
                student.setName("學生" + i);
                student.setBirthday(new Date());
                TextMessage message = session.createTextMessage(CommonUtils.serialize(student));
                // Send the message to the destination
                producer.send(message);
                System.out.println(String.format("發送消息:%d-%s-%s", student.getId(), student.getName(),
                        DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd")));
            }
        } finally {
            // Always release the connection, even when sending fails part-way through
            connection.close();
        }
    }
}

消息消費者1:

package at.flying.activemq.pubsub;

import at.flying.domain.Student;
import com.github.flyinghe.tools.CommonUtils;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.lang3.time.DateFormatUtils;

import javax.jms.*;

/**
 * Subscribes to the topic {@code test-topic} (publish/subscribe messaging) and prints
 * every serialized {@code Student} message it receives.
 */
public class ActiveMQConsumer1 {

    /**
     * Connects to the local broker, registers a listener and waits for input on stdin.
     *
     * @param args unused
     * @throws Exception if connecting to the broker or reading stdin fails
     */
    public static void main(String[] args) throws Exception {
        // Build the provider-specific ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        // Obtain a connection from the factory
        Connection connection = connectionFactory.createConnection();
        try {
            // Start the connection
            connection.start();
            // Non-transacted session with automatic acknowledgement
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Destination the messages are read from
            Destination destination = session.createTopic("test-topic");
            // Consumer bound to that destination
            MessageConsumer consumer = session.createConsumer(destination);
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    // Only text messages are handled; skip anything else instead of failing on the cast
                    if (!(message instanceof TextMessage)) {
                        return;
                    }
                    try {
                        Student student = CommonUtils.deserialize(((TextMessage) message).getText());
                        System.out.println(
                                String.format("ActiveMQConsumer1-接受消息:%d-%s-%s", student.getId(), student.getName(),
                                        DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd")));
                    } catch (JMSException e) {
                        // Report the failure instead of dropping it silently
                        e.printStackTrace();
                    }
                }
            });
            // Keep the process alive until input arrives on stdin
            System.in.read();
        } finally {
            // Always release the connection, even when setup fails
            connection.close();
        }
    }
}

消息消費者2:

package at.flying.activemq.pubsub;

import at.flying.domain.Student;
import com.github.flyinghe.tools.CommonUtils;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.lang3.time.DateFormatUtils;

import javax.jms.*;

/**
 * Second subscriber to the topic {@code test-topic} (publish/subscribe messaging); prints
 * every serialized {@code Student} message it receives.
 */
public class ActiveMQConsumer2 {

    /**
     * Connects to the local broker, registers a listener and waits for input on stdin.
     *
     * @param args unused
     * @throws Exception if connecting to the broker or reading stdin fails
     */
    public static void main(String[] args) throws Exception {
        // Build the provider-specific ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        // Obtain a connection from the factory
        Connection connection = connectionFactory.createConnection();
        try {
            // Start the connection
            connection.start();
            // Non-transacted session with automatic acknowledgement
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Destination the messages are read from
            Destination destination = session.createTopic("test-topic");
            // Consumer bound to that destination
            MessageConsumer consumer = session.createConsumer(destination);
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    // Only text messages are handled; skip anything else instead of failing on the cast
                    if (!(message instanceof TextMessage)) {
                        return;
                    }
                    try {
                        Student student = CommonUtils.deserialize(((TextMessage) message).getText());
                        System.out.println(
                                String.format("ActiveMQConsumer2-接受消息:%d-%s-%s", student.getId(), student.getName(),
                                        DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd")));
                    } catch (JMSException e) {
                        // Report the failure instead of dropping it silently
                        e.printStackTrace();
                    }
                }
            });
            // Keep the process alive until input arrives on stdin
            System.in.read();
        } finally {
            // Always release the connection, even when setup fails
            connection.close();
        }
    }
}

先啓動兩個消息消費者,再啓動消息生產者,控制檯輸出信息如下:

消息生產者:

消息消費者1:

消息消費者2:

這個結果使我們很容易理解Pub/Sub的消息傳遞方式。

總結:

從以上代碼可以看出PTP與Pub/Sub方式的消息傳遞,只是在創建消息目的地的時候不一樣:

PTP方式創建的消息目的地是Queue(隊列),Pub/Sub方式創建的消息目的地是Topic(主題)。

問題2-ActiveMQ如何與Spring的整合(XML配置):

ActiveMQ與Spring整合時並不需要額外依賴類似xxx-spring.jar的jar包,因爲在activemq-all包中已經包含了這些依賴。

類似於其他框架諸如Quartz定時等框架與Spring整合一樣,需要配置xml並在applicationContext.xml總配置文件中引入ActiveMQ的配置文件。

ActiveMQ的配置文件以下:

<?xml version="1.0" encoding="UTF-8"?>
<beans
        xmlns = "http://www.springframework.org/schema/beans"
        xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation = "
			http://www.springframework.org/schema/beans
			http://www.springframework.org/schema/beans/spring-beans.xsd
            ">
    <!-- ConnectionFactory that actually creates connections; supplied by the JMS provider (ActiveMQ here). -->
    <!-- NOTE(review): the id "tagertConnectionFactory" misspells "target"; the ref below uses the same spelling, so the wiring is consistent. -->
    <bean id = "tagertConnectionFactory" class = "org.apache.activemq.ActiveMQConnectionFactory">
        <constructor-arg name = "brokerURL" value = "tcp://localhost:61616" />
    </bean>
    <!-- Spring-managed ConnectionFactory that wraps the provider's ConnectionFactory defined above -->
    <bean id = "connectionFactory" class = "org.springframework.jms.connection.SingleConnectionFactory">
        <property name = "targetConnectionFactory" ref = "tagertConnectionFactory" />
    </bean>
    <!-- Producer side -->
    <!-- JmsTemplate is Spring's JMS helper class; it can be used to send and receive messages -->
    <bean id = "jmsTemplate" class = "org.springframework.jms.core.JmsTemplate">
        <!-- Uses the Spring-managed connectionFactory -->
        <property name = "connectionFactory" ref = "connectionFactory" />
    </bean>
    <!-- Destinations -->
    <!-- Queue destination -->
    <bean id = "queueDestination" class = "org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value = "spring-test-queue" />
    </bean>
    <!-- Topic destination -->
    <bean id = "topicDestination" class = "org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value = "spring-test-topic" />
    </bean>

    <!-- Message listener bean (shared by both containers below) -->
    <bean id = "iMessageListener" class = "at.flying.activemq.listener.IMessageListener" />
    <!-- Listener containers: one attaches the listener to the queue, the other to the topic -->
    <bean class = "org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name = "connectionFactory" ref = "connectionFactory" />
        <property name = "destination" ref = "queueDestination" />
        <property name = "messageListener" ref = "iMessageListener" />
    </bean>
    <bean class = "org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name = "connectionFactory" ref = "connectionFactory" />
        <property name = "destination" ref = "topicDestination" />
        <property name = "messageListener" ref = "iMessageListener" />
    </bean>
</beans>

消息監聽器定義以下:

package at.flying.activemq.listener;

import at.flying.domain.Student;
import com.github.flyinghe.tools.CommonUtils;
import org.apache.commons.lang3.time.DateFormatUtils;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 消息監聽器(消費者)
 */
public class IMessageListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            try {
                Student student = CommonUtils.deserialize(textMessage.getText());
                System.out.println(
                        String.format("%sListener-接受消息:%d-%s-%s",
                                message.getJMSDestination().toString().toLowerCase().startsWith("topic") ? "Topic" :
                                        "Queue",
                                student.getId(), student.getName(),
                                DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd")));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}

在applicationContext.xml總配置文件中引入ActiveMQ的配置文件:

至此,配置文件配置完畢。

爲測試ActiveMQ在Web應用中的使用,我們需要寫一個頁面與一個Controller來做測試。

準備一個JSP頁面(其實隨便啥頁面都行):

<%@ page language = "java" import = "java.util.*" pageEncoding = "UTF-8" %>
<%@taglib prefix = "c" uri = "http://java.sun.com/jsp/jstl/core" %>
<%-- Test page: two forms that submit a student id and name to the queue and topic endpoints. --%>
<%
    // Absolute base URL (scheme://host:port/context/) used as the document base
    String path = request.getContextPath();
    String basePath =
            request.getScheme() + "://" + request.getServerName() + ":" + request.getServerPort() + path + "/";
%>
<!DOCTYPE html>
<html lang = "en">
<head>
    <base href = "<%=basePath %>" />
    <meta charset = "UTF-8">
    <title>Activemq 學習</title>
</head>
<body>
    <center><h1>Activemq 學習</h1></center>
    <h1>測試PTP方式傳遞消息</h1>
    <form action = "<c:url value='/activemq/testQueue'/>" method = "get">
        sid:<input name = "sid" type = "text" />
        name:<input name = "name" type = "text" />
        <%-- "onsubmit" is a form event handler, not a button attribute; declare the submit button explicitly --%>
        <button type = "submit">test Queue</button>
    </form>
    <br />
    <h1>測試Pub/Sub方式傳遞消息</h1>
    <form action = "<c:url value='/activemq/testTopic'/>" method = "get">
        sid:<input name = "sid" type = "text" />
        name:<input name = "name" type = "text" />
        <button type = "submit">test Topic</button>
    </form>
</body>
</html>

準備一個接收請求的Controller:

package at.flying.web.action;

import at.flying.domain.Student;
import com.github.flyinghe.tools.CommonUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.servlet.ModelAndView;

import javax.jms.*;
import java.util.Date;

/**
 * Test controller that builds a {@code Student} from request parameters, sends it to the
 * configured queue or topic, and redirects back to the test page.
 */
@Controller
@RequestMapping("activemq")
public class ActivemqAction {

    @Autowired
    @Qualifier("jmsTemplate")
    private JmsTemplate jmsTemplate;
    @Autowired
    @Qualifier("queueDestination")
    private Destination queueDestination;
    @Autowired
    @Qualifier("topicDestination")
    private Destination topicDestination;


    /**
     * Sends a student message to the queue destination (point-to-point).
     *
     * @param sid  student id taken from the request
     * @param name student name taken from the request
     * @return a redirect back to the test page
     */
    @RequestMapping(value = "testQueue", method = {RequestMethod.GET, RequestMethod.POST})
    public ModelAndView testQueue(
            @RequestParam("sid")
                    Long sid,
            @RequestParam("name")
                    String name) {
        return this.sendStudent(this.queueDestination, sid, name);
    }

    /**
     * Sends a student message to the topic destination (publish/subscribe).
     *
     * @param sid  student id taken from the request
     * @param name student name taken from the request
     * @return a redirect back to the test page
     */
    @RequestMapping(value = "testTopic", method = {RequestMethod.GET, RequestMethod.POST})
    public ModelAndView testTopic(
            @RequestParam("sid")
                    Long sid,
            @RequestParam("name")
                    String name) {
        return this.sendStudent(this.topicDestination, sid, name);
    }

    /**
     * Serializes a new student and sends it to the given destination; shared by both endpoints.
     */
    private ModelAndView sendStudent(Destination destination, final Long sid, final String name) {
        this.jmsTemplate.send(destination, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                Student student = new Student();
                student.setBirthday(new Date());
                student.setId(sid);
                student.setName(name);
                TextMessage message = session.createTextMessage(CommonUtils.serialize(student));
                // Log what is about to be sent; JmsTemplate performs the actual send after this returns
                System.out.println(String.format("發送消息:%d-%s-%s", student.getId(), student.getName(),
                        DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd")));
                return message;
            }
        });
        ModelAndView modelAndView = new ModelAndView();
        modelAndView.setViewName("redirect:/activemq/start.jsp");
        return modelAndView;
    }
}

測試結果如下:

①PTP方式:

消息發送頁面:

消息發送控制檯:

消息接收控制檯:

 

②Pub/Sub方式:

消息發送頁面:

消息發送控制檯:

消息接收控制檯:

 

問題3-在SpringBoot中如何使用ActiveMQ:

首先在pom文件中加入以下依賴:

<!-- Spring Boot starter for ActiveMQ. No version is given here; presumably it is supplied by the project's dependency management (verify in the parent POM). -->
<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

application.properties文件中加入以下配置:

# ActiveMQ broker connection
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
# Messaging domain: false = point-to-point (PTP), true = publish/subscribe (PUB/SUB); the default is false
spring.jms.pub-sub-domain=false

新建一個配置類ActiveMQConfig:

package at.flying.springbootproject.config.activemq;

import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.jms.Destination;

/**
 * Registers the JMS destinations used by the demo as Spring beans.
 */
@Configuration
public class ActiveMQConfig {

    /** Name shared by the queue bean and the physical queue. */
    private static final String QUEUE_NAME = "test-queue";
    /** Name shared by the topic bean and the physical topic. */
    private static final String TOPIC_NAME = "test-topic";

    /**
     * @return the queue destination, registered under the bean name {@code test-queue}
     */
    @Bean(QUEUE_NAME)
    public Destination testQueue() {
        Destination queue = new ActiveMQQueue(QUEUE_NAME);
        return queue;
    }

    /**
     * @return the topic destination, registered under the bean name {@code test-topic}
     */
    @Bean(TOPIC_NAME)
    public Destination testTopic() {
        Destination topic = new ActiveMQTopic(TOPIC_NAME);
        return topic;
    }
}

配置監聽器(消息消費者):

package at.flying.springbootproject.config.activemq;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

/**
 * JMS listeners (message consumers): two on {@code test-queue} and two on {@code test-topic}.
 * Each one prints the received text prefixed with its own label.
 */
@Component
public class ConsumerListenters {

    @JmsListener(destination = "test-queue")
    public void testQueue1(String msg) {
        print("test-queue1", msg);
    }

    @JmsListener(destination = "test-queue")
    public void testQueue2(String msg) {
        print("test-queue2", msg);
    }

    @JmsListener(destination = "test-topic")
    public void testTopic1(String msg) {
        print("test-topic1", msg);
    }

    @JmsListener(destination = "test-topic")
    public void testTopic2(String msg) {
        print("test-topic2", msg);
    }

    /** Writes one {@code label:payload} line to standard output. */
    private static void print(String label, String payload) {
        System.out.println(label + ":" + payload);
    }
}

配置消息生產者:

package at.flying.springbootproject.service;

import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

import javax.jms.Destination;

/**
 * Message producer: forwards non-blank text to the demo queue or topic.
 */
@Service
public class ActiveMQService {
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;
    @Autowired
    @Qualifier("test-queue")
    private Destination testQueue;
    @Autowired
    @Qualifier("test-topic")
    private Destination testTopic;

    /**
     * Sends {@code msg} to the queue destination; blank or null input is ignored.
     *
     * @param msg text payload
     */
    public void testQueue(String msg) {
        if (StringUtils.isBlank(msg)) {
            return;
        }
        jmsMessagingTemplate.convertAndSend(testQueue, msg);
    }

    /**
     * Sends {@code msg} to the topic destination; blank or null input is ignored.
     *
     * @param msg text payload
     */
    public void testTopic(String msg) {
        if (StringUtils.isBlank(msg)) {
            return;
        }
        jmsMessagingTemplate.convertAndSend(testTopic, msg);
    }
}

到這裏其實已經配置完畢,但是爲了測試效果我們還需要一個Controller來接受頁面請求然後觸發消息的發送:

package at.flying.springbootproject.controller;

import at.flying.springbootproject.service.ActiveMQService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;

/**
 * Test endpoints that hand the optional {@code msg} request parameter to
 * {@link ActiveMQService} and echo it back as the response body.
 */
@Controller
@RequestMapping("activemq")
public class TestActiveMQController {
    @Autowired
    private ActiveMQService activeMQService;

    /**
     * Passes {@code msg} to the queue sender.
     *
     * @param msg optional text payload
     * @return the same {@code msg} value
     */
    @RequestMapping("test-queue")
    @ResponseBody
    public String test1(@RequestParam(value = "msg", required = false) String msg) {
        activeMQService.testQueue(msg);
        return msg;
    }

    /**
     * Passes {@code msg} to the topic sender.
     *
     * @param msg optional text payload
     * @return the same {@code msg} value
     */
    @RequestMapping("test-topic")
    @ResponseBody
    public String test2(@RequestParam(value = "msg", required = false) String msg) {
        activeMQService.testTopic(msg);
        return msg;
    }
}

然後我們打開瀏覽器請求test-queue:

連續請求4次,控制檯輸出如下:

輸出了四條消息,並且是兩個消費者輪流消費。

現在我們來測試test-topic:

注意:

此時我們需要把application.properties裏的spring.jms.pub-sub-domain屬性改爲true,因爲true值才代表消息模式爲PUB/SUB,若不更改不會報錯,但是發送Topic消息時消息消費者不會消費該消息,也就是沒有觸發Topic消息監聽器。

我們打開瀏覽器請求test-topic:

連續請求2次,控制檯輸出如下:

輸出了四條消息,同一消息兩個Topic消費者均消費了。

相關文章
相關標籤/搜索