ActiveMQ official website: https://activemq.apache.org/
For background on ActiveMQ's message delivery models, see:
http://www.javashuo.com/article/p-ovkjfdgv-ev.html
http://www.javashuo.com/article/p-mgrbykqq-eh.html
This post addresses three questions:
1. How to use ActiveMQ in a plain Java environment
2. How to integrate ActiveMQ with Spring (XML configuration)
3. How to use ActiveMQ in Spring Boot
Environment:
1. Windows 10 64-bit
2. apache-activemq-5.14.4
3. JDK 1.8
4. Maven 3.3
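Prerequisites:
1. Install and start ActiveMQ:
Download ActiveMQ from the official website (https://activemq.apache.org/components/classic/download/).
After unpacking, go to the bin directory, enter win64 or win32 depending on whether your operating system is 64-bit or 32-bit, and double-click activemq.bat to start ActiveMQ.
Once it is running, open http://localhost:8161/ in a browser. If the ActiveMQ welcome page loads, the broker started successfully.
Click "Manage ActiveMQ broker" to open the ActiveMQ admin console. If you are prompted for credentials, the default username and password are admin / admin.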
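Before running any client code, it can help to confirm that the broker's ports accept TCP connections. The sketch below is a plain-Java probe that uses only the standard library. The names BrokerPortCheck and isOpen are made up for illustration, and the ports are assumptions taken from the URLs used in this post (8161 for the web console, 61616 for tcp://localhost:61616). An open port only shows that something is listening there, not that it is ActiveMQ.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not part of ActiveMQ): probes whether a TCP port accepts connections.
final class BrokerPortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed ports, taken from the URLs used in this post
        System.out.println("web console (8161): " + isOpen("localhost", 8161, 1000));
        System.out.println("broker (61616): " + isOpen("localhost", 61616, 1000));
    }
}
```

If either line prints false, check that activemq.bat is still running before debugging the JMS code.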
2. This post builds its projects with Maven. Add the following dependency (needed for questions 1 and 2):
<!-- ActiveMQ messaging middleware: start -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.14.4</version>
</dependency>
Message producer (PTP):
package at.flying.activemq.ptp;

import at.flying.domain.Student;
import com.github.flyinghe.tools.CommonUtils;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.lang3.time.DateFormatUtils;

import javax.jms.*;
import java.util.Date;

/**
 * Sends messages using the PTP (point-to-point) model.
 */
public class ActiveMQProducer {
    public static void main(String[] args) throws Exception {
        // Create the ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        // Obtain a connection from the factory
        Connection connection = connectionFactory.createConnection();
        // Start the connection
        connection.start();
        // Create a session: a single-threaded context for sending or receiving messages
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Create the destination the messages are sent to
        Destination destination = session.createQueue("test-queue");
        // Create the message producer
        MessageProducer producer = session.createProducer(destination);
        // Non-persistent delivery, for learning purposes only; choose per project in practice
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        // Build and send the messages
        for (int i = 1; i <= 4; i++) {
            Student student = new Student();
            student.setId((long) i);
            student.setName("student" + i);
            student.setBirthday(new Date());
            TextMessage message = session.createTextMessage(CommonUtils.serialize(student));
            // Send the message to the destination
            producer.send(message);
            System.out.println(String.format("Sent message: %d-%s-%s", student.getId(), student.getName(),
                    DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd")));
        }
        connection.close();
    }
}
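The producer relies on two classes this post does not show: at.flying.domain.Student and the author's own CommonUtils.serialize / deserialize helpers. Their real implementations are unknown, so the following is only one plausible stand-in, assuming the object is Serializable and travels inside a TextMessage as Base64 text. TextCodec and the Student fields below are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Base64;
import java.util.Date;

// Hypothetical stand-in for at.flying.domain.Student (the real class is not shown in the post).
final class Student implements Serializable {
    private static final long serialVersionUID = 1L;
    private Long id;
    private String name;
    private Date birthday;

    Long getId() { return id; }
    void setId(Long id) { this.id = id; }
    String getName() { return name; }
    void setName(String name) { this.name = name; }
    Date getBirthday() { return birthday; }
    void setBirthday(Date birthday) { this.birthday = birthday; }
}

// Hypothetical stand-in for CommonUtils.serialize/deserialize: Java serialization plus Base64.
final class TextCodec {

    /** Serializes the object and encodes the resulting bytes as Base64 text. */
    static String serialize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Base64.getEncoder().encodeToString(bytes.toByteArray());
    }

    /** Decodes Base64 text and deserializes the object it contains. */
    @SuppressWarnings("unchecked")
    static <T> T deserialize(String text) {
        byte[] raw = Base64.getDecoder().decode(text);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw))) {
            return (T) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

With this stand-in, a consumer would recover the object with Student s = TextCodec.deserialize(textMessage.getText()). Java deserialization should only be applied to messages from producers you trust.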
Message consumer 1:
package at.flying.activemq.ptp;

import at.flying.domain.Student;
import com.github.flyinghe.tools.CommonUtils;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.lang3.time.DateFormatUtils;

import javax.jms.*;

/**
 * Receives messages using the PTP (point-to-point) model.
 */
public class ActiveMQConsumer1 {
    public static void main(String[] args) throws Exception {
        // Create the ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        // Obtain a connection from the factory
        Connection connection = connectionFactory.createConnection();
        // Start the connection
        connection.start();
        // Create a session: a single-threaded context for sending or receiving messages
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Create the destination to receive from
        Destination destination = session.createQueue("test-queue");
        // Create the consumer (message receiver)
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    if (null != message) {
                        Student student = CommonUtils.deserialize(((TextMessage) message).getText());
                        System.out.println(
                                String.format("ActiveMQConsumer1-received message: %d-%s-%s", student.getId(),
                                        student.getName(),
                                        DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd")));
                    }
                } catch (JMSException e) {
                    // Report the failure instead of swallowing it silently
                    e.printStackTrace();
                }
            }
        });
        // Keep the listener alive until Enter is pressed
        System.in.read();
        connection.close();
    }
}
Message consumer 2:
package at.flying.activemq.ptp;

import at.flying.domain.Student;
import com.github.flyinghe.tools.CommonUtils;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.lang3.time.DateFormatUtils;

import javax.jms.*;

/**
 * Receives messages using the PTP (point-to-point) model.
 */
public class ActiveMQConsumer2 {
    public static void main(String[] args) throws Exception {
        // Create the ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        // Obtain a connection from the factory
        Connection connection = connectionFactory.createConnection();
        // Start the connection
        connection.start();
        // Create a session: a single-threaded context for sending or receiving messages
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Create the destination to receive from
        Destination destination = session.createQueue("test-queue");
        // Create the consumer (message receiver)
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    if (null != message) {
                        Student student = CommonUtils.deserialize(((TextMessage) message).getText());
                        System.out.println(
                                String.format("ActiveMQConsumer2-received message: %d-%s-%s", student.getId(),
                                        student.getName(),
                                        DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd")));
                    }
                } catch (JMSException e) {
                    // Report the failure instead of swallowing it silently
                    e.printStackTrace();
                }
            }
        });
        // Keep the listener alive until Enter is pressed
        System.in.read();
        connection.close();
    }
}
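Start the two message consumers first, then the message producer. Console output: [screenshots: message producer, message consumer 1, message consumer 2]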
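This result makes the PTP model easy to understand: each message on the queue is consumed by exactly one of the two consumers.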
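The queue behaviour can also be pictured without a broker. The following is a toy, in-memory model written only for illustration (QueueModel and its methods are invented names, not ActiveMQ API): each message goes to exactly one registered consumer, chosen in round-robin order, and messages sent before any consumer exists are kept until one arrives. On a real broker the split also depends on settings such as prefetch and on consumer speed, so the distribution you observe may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Toy in-memory model of point-to-point delivery; not ActiveMQ code.
final class QueueModel {
    private final List<List<String>> consumers = new ArrayList<>();
    private final List<String> backlog = new ArrayList<>();
    private int next = 0; // index of the consumer that receives the next message

    /** Registers a consumer; it first receives any messages that were waiting. */
    List<String> addConsumer() {
        List<String> inbox = new ArrayList<>(backlog);
        backlog.clear();
        consumers.add(inbox);
        return inbox;
    }

    /** Delivers the message to exactly one consumer, in round-robin order. */
    void send(String message) {
        if (consumers.isEmpty()) {
            backlog.add(message); // a queue keeps messages until a consumer arrives
            return;
        }
        consumers.get(next).add(message);
        next = (next + 1) % consumers.size();
    }

    public static void main(String[] args) {
        QueueModel queue = new QueueModel();
        List<String> consumer1 = queue.addConsumer();
        List<String> consumer2 = queue.addConsumer();
        for (int i = 1; i <= 4; i++) {
            queue.send("student" + i);
        }
        System.out.println("consumer1: " + consumer1); // [student1, student3]
        System.out.println("consumer2: " + consumer2); // [student2, student4]
    }
}
```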
Message producer (Pub/Sub):
package at.flying.activemq.pubsub;

import at.flying.domain.Student;
import com.github.flyinghe.tools.CommonUtils;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.lang3.time.DateFormatUtils;

import javax.jms.*;
import java.util.Date;

/**
 * Sends messages using the Pub/Sub (publish/subscribe) model.
 */
public class ActiveMQProducer {
    public static void main(String[] args) throws Exception {
        // Create the ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        // Obtain a connection from the factory
        Connection connection = connectionFactory.createConnection();
        // Start the connection
        connection.start();
        // Create a session: a single-threaded context for sending or receiving messages
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Create the destination the messages are published to
        Destination destination = session.createTopic("test-topic");
        // Create the message producer
        MessageProducer producer = session.createProducer(destination);
        // Non-persistent delivery, for learning purposes only; choose per project in practice
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        // Build and send the messages
        for (int i = 1; i <= 4; i++) {
            Student student = new Student();
            student.setId((long) i);
            student.setName("student" + i);
            student.setBirthday(new Date());
            TextMessage message = session.createTextMessage(CommonUtils.serialize(student));
            // Send the message to the destination
            producer.send(message);
            System.out.println(String.format("Sent message: %d-%s-%s", student.getId(), student.getName(),
                    DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd")));
        }
        connection.close();
    }
}
Message consumer 1:
package at.flying.activemq.pubsub;

import at.flying.domain.Student;
import com.github.flyinghe.tools.CommonUtils;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.lang3.time.DateFormatUtils;

import javax.jms.*;

/**
 * Receives messages using the Pub/Sub (publish/subscribe) model.
 */
public class ActiveMQConsumer1 {
    public static void main(String[] args) throws Exception {
        // Create the ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        // Obtain a connection from the factory
        Connection connection = connectionFactory.createConnection();
        // Start the connection
        connection.start();
        // Create a session: a single-threaded context for sending or receiving messages
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Create the destination to subscribe to
        Destination destination = session.createTopic("test-topic");
        // Create the consumer (message receiver)
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    if (null != message) {
                        Student student = CommonUtils.deserialize(((TextMessage) message).getText());
                        System.out.println(
                                String.format("ActiveMQConsumer1-received message: %d-%s-%s", student.getId(),
                                        student.getName(),
                                        DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd")));
                    }
                } catch (JMSException e) {
                    // Report the failure instead of swallowing it silently
                    e.printStackTrace();
                }
            }
        });
        // Keep the listener alive until Enter is pressed
        System.in.read();
        connection.close();
    }
}
Message consumer 2:
package at.flying.activemq.pubsub;

import at.flying.domain.Student;
import com.github.flyinghe.tools.CommonUtils;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.lang3.time.DateFormatUtils;

import javax.jms.*;

/**
 * Receives messages using the Pub/Sub (publish/subscribe) model.
 */
public class ActiveMQConsumer2 {
    public static void main(String[] args) throws Exception {
        // Create the ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        // Obtain a connection from the factory
        Connection connection = connectionFactory.createConnection();
        // Start the connection
        connection.start();
        // Create a session: a single-threaded context for sending or receiving messages
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Create the destination to subscribe to
        Destination destination = session.createTopic("test-topic");
        // Create the consumer (message receiver)
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    if (null != message) {
                        Student student = CommonUtils.deserialize(((TextMessage) message).getText());
                        System.out.println(
                                String.format("ActiveMQConsumer2-received message: %d-%s-%s", student.getId(),
                                        student.getName(),
                                        DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd")));
                    }
                } catch (JMSException e) {
                    // Report the failure instead of swallowing it silently
                    e.printStackTrace();
                }
            }
        });
        // Keep the listener alive until Enter is pressed
        System.in.read();
        connection.close();
    }
}
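Start the two message consumers first, then the message producer. Console output: [screenshots: message producer, message consumer 1, message consumer 2]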
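This result makes the Pub/Sub model easy to understand: every subscriber that is connected when a message is published receives its own copy.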
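The start order matters more for topics than for queues. The consumers above subscribe with a plain createConsumer call, which is a non-durable subscription, so a subscriber only receives messages published while it is connected. The toy model below illustrates this with an in-memory list of subscribers; TopicModel and its methods are invented for this sketch and are not ActiveMQ API.

```java
import java.util.ArrayList;
import java.util.List;

// Toy in-memory model of publish/subscribe delivery; not ActiveMQ code.
final class TopicModel {
    private final List<List<String>> subscribers = new ArrayList<>();

    /** Registers a non-durable subscriber and returns the list that collects its messages. */
    List<String> subscribe() {
        List<String> inbox = new ArrayList<>();
        subscribers.add(inbox);
        return inbox;
    }

    /** Copies the message to every subscriber registered right now; nothing is stored. */
    void publish(String message) {
        for (List<String> inbox : subscribers) {
            inbox.add(message);
        }
    }

    public static void main(String[] args) {
        TopicModel topic = new TopicModel();
        List<String> subscriber1 = topic.subscribe();
        topic.publish("student1"); // only subscriber1 is connected at this point
        List<String> subscriber2 = topic.subscribe();
        topic.publish("student2"); // both subscribers receive a copy
        System.out.println("subscriber1: " + subscriber1); // [student1, student2]
        System.out.println("subscriber2: " + subscriber2); // [student2]
    }
}
```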
Summary:
As the code above shows, PTP and Pub/Sub differ only in how the message destination is created:
PTP sends to a Queue, while Pub/Sub sends to a Topic.
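Integrating ActiveMQ with Spring does not require an extra dependency along the lines of xxx-spring.jar, because the activemq-all package already bundles ActiveMQ's Spring support. The Spring JMS classes used below (JmsTemplate, DefaultMessageListenerContainer) belong to Spring's spring-jms module, which must be on the classpath.
As with other frameworks integrated with Spring, such as the Quartz scheduler, you write an XML configuration file and import it into the main applicationContext.xml.
The ActiveMQ configuration file is as follows: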
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns = "http://www.springframework.org/schema/beans"
       xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation = "
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        ">

    <!-- The ConnectionFactory that actually creates connections, supplied by the JMS provider -->
    <bean id = "targetConnectionFactory" class = "org.apache.activemq.ActiveMQConnectionFactory">
        <constructor-arg name = "brokerURL" value = "tcp://localhost:61616" />
    </bean>

    <!-- Spring's wrapper around the provider's ConnectionFactory -->
    <bean id = "connectionFactory" class = "org.springframework.jms.connection.SingleConnectionFactory">
        <property name = "targetConnectionFactory" ref = "targetConnectionFactory" />
    </bean>

    <!-- Producer configuration -->
    <!-- Spring's JMS helper class, used to send and receive messages -->
    <bean id = "jmsTemplate" class = "org.springframework.jms.core.JmsTemplate">
        <!-- The Spring-managed ConnectionFactory -->
        <property name = "connectionFactory" ref = "connectionFactory" />
    </bean>

    <!-- Destination configuration -->
    <!-- Queue destination -->
    <bean id = "queueDestination" class = "org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value = "spring-test-queue" />
    </bean>
    <!-- Topic destination -->
    <bean id = "topicDestination" class = "org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value = "spring-test-topic" />
    </bean>

    <!-- Message listener -->
    <bean id = "iMessageListener" class = "at.flying.activemq.listener.IMessageListener" />

    <!-- Listener containers: one for the queue, one for the topic -->
    <bean class = "org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name = "connectionFactory" ref = "connectionFactory" />
        <property name = "destination" ref = "queueDestination" />
        <property name = "messageListener" ref = "iMessageListener" />
    </bean>
    <bean class = "org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name = "connectionFactory" ref = "connectionFactory" />
        <property name = "destination" ref = "topicDestination" />
        <property name = "messageListener" ref = "iMessageListener" />
    </bean>
</beans>
The message listener is defined as follows:
package at.flying.activemq.listener;

import at.flying.domain.Student;
import com.github.flyinghe.tools.CommonUtils;
import org.apache.commons.lang3.time.DateFormatUtils;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import javax.jms.Topic;

/**
 * Message listener (consumer).
 */
public class IMessageListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            try {
                Student student = CommonUtils.deserialize(textMessage.getText());
                // Label the output by destination type (checked by type rather than by the toString() prefix)
                System.out.println(
                        String.format("%sListener-received message: %d-%s-%s",
                                message.getJMSDestination() instanceof Topic ? "Topic" : "Queue",
                                student.getId(), student.getName(),
                                DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd")));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}
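Then import the ActiveMQ configuration file into the main applicationContext.xml (Spring's <import resource="..."/> element does this). With that, the configuration is complete.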
To test ActiveMQ in a web application, we need a page and a Controller.
Prepare a JSP page (any kind of page would do):
<%@ page language = "java" import = "java.util.*" pageEncoding = "UTF-8" %>
<%@taglib prefix = "c" uri = "http://java.sun.com/jsp/jstl/core" %>
<%
    String path = request.getContextPath();
    String basePath = request.getScheme() + "://" + request.getServerName() + ":" + request.getServerPort() + path + "/";
%>
<!DOCTYPE html>
<html lang = "en">
<head>
    <base href = "<%=basePath %>" />
    <meta charset = "UTF-8">
    <title>ActiveMQ Study</title>
</head>
<body>
<center><h1>ActiveMQ Study</h1></center>
<h1>Test PTP message delivery</h1>
<form action = "<c:url value='/activemq/testQueue'/>" method = "get">
    sid:<input name = "sid" type = "text" />
    name:<input name = "name" type = "text" />
    <button type = "submit">test Queue</button>
</form>
<br />
<h1>Test Pub/Sub message delivery</h1>
<form action = "<c:url value='/activemq/testTopic'/>" method = "get">
    sid:<input name = "sid" type = "text" />
    name:<input name = "name" type = "text" />
    <button type = "submit">test Topic</button>
</form>
</body>
</html>
Prepare a Controller to receive the requests:
package at.flying.web.action;

import at.flying.domain.Student;
import com.github.flyinghe.tools.CommonUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.servlet.ModelAndView;

import javax.jms.*;
import java.util.Date;

@Controller
@RequestMapping("activemq")
public class ActivemqAction {
    @Autowired
    @Qualifier("jmsTemplate")
    private JmsTemplate jmsTemplate;
    @Autowired
    @Qualifier("queueDestination")
    private Destination queueDestination;
    @Autowired
    @Qualifier("topicDestination")
    private Destination topicDestination;

    @RequestMapping(value = "testQueue", method = {RequestMethod.GET, RequestMethod.POST})
    public ModelAndView testQueue(
            @RequestParam("sid") Long sid,
            @RequestParam("name") String name) {
        ModelAndView modelAndView = new ModelAndView();
        // Send one message to the queue destination
        this.jmsTemplate.send(this.queueDestination, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                Student student = new Student();
                student.setBirthday(new Date());
                student.setId(sid);
                student.setName(name);
                TextMessage message = session.createTextMessage(CommonUtils.serialize(student));
                // Log the message that is about to be sent
                System.out.println(String.format("Sent message: %d-%s-%s", student.getId(), student.getName(),
                        DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd")));
                return message;
            }
        });
        modelAndView.setViewName("redirect:/activemq/start.jsp");
        return modelAndView;
    }

    @RequestMapping(value = "testTopic", method = {RequestMethod.GET, RequestMethod.POST})
    public ModelAndView testTopic(
            @RequestParam("sid") Long sid,
            @RequestParam("name") String name) {
        ModelAndView modelAndView = new ModelAndView();
        // Publish one message to the topic destination
        this.jmsTemplate.send(this.topicDestination, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                Student student = new Student();
                student.setBirthday(new Date());
                student.setId(sid);
                student.setName(name);
                TextMessage message = session.createTextMessage(CommonUtils.serialize(student));
                // Log the message that is about to be sent
                System.out.println(String.format("Sent message: %d-%s-%s", student.getId(), student.getName(),
                        DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd")));
                return message;
            }
        });
        modelAndView.setViewName("redirect:/activemq/start.jsp");
        return modelAndView;
    }
}
Test results:
① PTP: [screenshots: message-sending page, sender console, receiver console]
② Pub/Sub: [screenshots: message-sending page, sender console, receiver console]
In Spring Boot, first add the following dependency to the pom file:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
Add the following configuration to application.properties:
# ActiveMQ
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
# Messaging model: false means PTP, true means Pub/Sub; the default is false
spring.jms.pub-sub-domain=false
Create a configuration class, ActiveMQConfig:
package at.flying.springbootproject.config.activemq;

import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.jms.Destination;

@Configuration
public class ActiveMQConfig {
    // Queue destination used for the PTP test
    @Bean("test-queue")
    public Destination testQueue() {
        return new ActiveMQQueue("test-queue");
    }

    // Topic destination used for the Pub/Sub test
    @Bean("test-topic")
    public Destination testTopic() {
        return new ActiveMQTopic("test-topic");
    }
}
Configure the listeners (message consumers):
package at.flying.springbootproject.config.activemq;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class ConsumerListenters {
    // Two listeners on the same queue
    @JmsListener(destination = "test-queue")
    public void testQueue1(String msg) {
        System.out.println(String.format("test-queue1:%s", msg));
    }

    @JmsListener(destination = "test-queue")
    public void testQueue2(String msg) {
        System.out.println(String.format("test-queue2:%s", msg));
    }

    // Two listeners on the same topic
    @JmsListener(destination = "test-topic")
    public void testTopic1(String msg) {
        System.out.println(String.format("test-topic1:%s", msg));
    }

    @JmsListener(destination = "test-topic")
    public void testTopic2(String msg) {
        System.out.println(String.format("test-topic2:%s", msg));
    }
}
Configure the message producer:
package at.flying.springbootproject.service;

import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

import javax.jms.Destination;

@Service
public class ActiveMQService {
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;
    @Autowired
    @Qualifier("test-queue")
    private Destination testQueue;
    @Autowired
    @Qualifier("test-topic")
    private Destination testTopic;

    // Send a non-blank message to the queue
    public void testQueue(String msg) {
        if (StringUtils.isNotBlank(msg)) {
            this.jmsMessagingTemplate.convertAndSend(this.testQueue, msg);
        }
    }

    // Publish a non-blank message to the topic
    public void testTopic(String msg) {
        if (StringUtils.isNotBlank(msg)) {
            this.jmsMessagingTemplate.convertAndSend(this.testTopic, msg);
        }
    }
}
Configuration is complete at this point, but to see it in action we still need a Controller that accepts page requests and triggers the sends:
package at.flying.springbootproject.controller;

import at.flying.springbootproject.service.ActiveMQService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
@RequestMapping("activemq")
public class TestActiveMQController {
    @Autowired
    private ActiveMQService activeMQService;

    // Sends the msg request parameter to the queue and echoes it back
    @RequestMapping("test-queue")
    @ResponseBody
    public String test1(
            @RequestParam(value = "msg", required = false) String msg) {
        this.activeMQService.testQueue(msg);
        return msg;
    }

    // Publishes the msg request parameter to the topic and echoes it back
    @RequestMapping("test-topic")
    @ResponseBody
    public String test2(
            @RequestParam(value = "msg", required = false) String msg) {
        this.activeMQService.testTopic(msg);
        return msg;
    }
}
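Now open a browser and request test-queue: [screenshot: browser request]
After four consecutive requests, the console output is: [screenshot: console output]
Four messages are printed, and the two queue consumers take turns consuming them.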
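Now let's test test-topic.
Note:
Before this test, change spring.jms.pub-sub-domain in application.properties to true, because only true selects the Pub/Sub model. If you leave it as false, nothing fails, but the Topic message is sent without any consumer receiving it; in other words, the Topic listeners are never triggered. The flag applies to every @JmsListener that uses the default listener container factory, so with true the two test-queue listeners above subscribe to a topic named test-queue instead and no longer consume queue messages.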
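Open a browser and request test-topic: [screenshot: browser request]
After two consecutive requests, the console output is: [screenshot: console output]
Four messages are printed: each of the two messages was consumed by both Topic consumers.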