首先說明:這裏是在activemq已配置好並啓動服務的狀況下進行,請先自行配置好。也可參考個人博文(消息中間件activemq安全驗證配置)進行配置。
1.首先看一下項目結構
2.所需jar包,這裏只列出mq相關jar包,spring相關不予說明。
3.消息生產service QueueMessageProducer
package cn.carowl.activemq; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; /** * 消息生產者service * @author weishengbin * */ public class QueueMessageProducer { private JmsTemplate jmsTemplate; private Destination notifyQueue; private NotifyMessageConverter messageConverter; public void sendQueue(String noticeInfo) { sendMessage(noticeInfo); } private void sendMessage(final String noticeInfo) { this.jmsTemplate.send(notifyQueue, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createObjectMessage(noticeInfo); } }); } public JmsTemplate getJmsTemplate() { return jmsTemplate; } public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } public Destination getNotifyQueue() { return notifyQueue; } public void setNotifyQueue(Destination notifyQueue) { this.notifyQueue = notifyQueue; } public NotifyMessageConverter getMessageConverter() { return messageConverter; } public void setMessageConverter(NotifyMessageConverter messageConverter) { this.messageConverter = messageConverter; } }
4.消息轉換NotifyMessageConverter
package cn.carowl.activemq;

import java.io.Serializable;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.apache.activemq.command.ActiveMQObjectMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jms.support.converter.MessageConverter;

/**
 * Converts between application payloads and JMS object messages.
 *
 * @author weishengbin
 */
public class NotifyMessageConverter implements MessageConverter {

    private static final Logger logger = LoggerFactory.getLogger(NotifyMessageConverter.class);

    /**
     * Wraps {@code object} in a JMS object message created from {@code session}.
     *
     * @param object  the payload; it is cast to {@link Serializable}
     * @param session the session used to create the message
     * @return the object message carrying {@code object}
     * @throws JMSException if the session or message rejects the operation
     */
    public Message toMessage(Object object, Session session) throws JMSException, MessageConversionException {
        // String concatenation is null-safe, unlike the explicit object.toString() call.
        System.out.println("sendMessage:" + object);
        // The ObjectMessage interface already exposes setObject, so no provider-specific cast is needed.
        ObjectMessage msg = session.createObjectMessage();
        msg.setObject((Serializable) object);
        return msg;
    }

    /**
     * Extracts the payload of a received message and returns its string form.
     *
     * @param message the received message; must be an {@link ActiveMQObjectMessage}
     * @return {@code toString()} of the object carried by the message
     * @throws JMSException if the message is not an ActiveMQ object message, or
     *                      its payload is missing or cannot be read
     */
    public Object fromMessage(Message message) throws JMSException, MessageConversionException {
        logger.debug("Receive JMS message :{}", message);

        if (!(message instanceof ObjectMessage)) {
            throw conversionFailure(message, "ObjectMessage", null);
        }
        if (!(message instanceof ActiveMQObjectMessage)) {
            throw conversionFailure(message, "ActiveMQObjectMessage", null);
        }

        Object payload;
        try {
            payload = ((ActiveMQObjectMessage) message).getObject();
        } catch (Exception e) {
            // Broad on purpose: every extraction failure is reported to callers as a JMSException.
            throw conversionFailure(message, "NoticeInfo", e);
        }
        if (payload == null) {
            // Previously surfaced as a caught NullPointerException; now an explicit check.
            throw conversionFailure(message, "NoticeInfo", null);
        }
        return payload.toString();
    }

    /**
     * Logs a conversion failure and builds the matching exception.
     *
     * @param message      the offending message
     * @param expectedType the type name the message or payload was expected to have
     * @param cause        the underlying exception, or {@code null} if there is none
     * @return a {@link JMSException} with {@code cause} attached when present
     */
    private static JMSException conversionFailure(Message message, String expectedType, Exception cause) {
        String reason = "Message:" + message + " is not an instance of " + expectedType + ".";
        JMSException failure = new JMSException(reason);
        if (cause == null) {
            logger.error(reason);
        } else {
            // Keep the root cause reachable through both the JMS and the Throwable APIs.
            failure.setLinkedException(cause);
            failure.initCause(cause);
            logger.error(reason, cause);
        }
        return failure;
    }
}
5.監聽接收消息QueueMessageListener
package cn.carowl.activemq;

import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * JMS listener that converts each incoming message to a string with the
 * injected {@link NotifyMessageConverter} and prints it to standard output.
 *
 * @author weishengbin
 */
public class QueueMessageListener implements MessageListener {

    private static Logger logger = LoggerFactory.getLogger(QueueMessageListener.class);

    private NotifyMessageConverter messageConverter;

    public void setMessageConverter(NotifyMessageConverter messageConverter) {
        this.messageConverter = messageConverter;
    }

    public NotifyMessageConverter getMessageConverter() {
        return messageConverter;
    }

    /**
     * Handles one incoming message. Any exception raised while casting or
     * converting is logged and not rethrown.
     *
     * @param message the delivered message, cast to {@link ObjectMessage}
     */
    public void onMessage(Message message) {
        String received;
        try {
            ObjectMessage incoming = (ObjectMessage) message;
            received = (String) messageConverter.fromMessage(incoming);
        } catch (Exception e) {
            logger.error("處理信息時發生異常", e);
            return;
        }
        System.out.println("接收到的消息:" + received);
    }
}
6.消息生產者Sender
package cn.carowl.activemq; import javax.servlet.ServletContext; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** * 消息生產者 * @author weishengbin * */ @Service public class Sender { private static ServletContext servletContext; private static BeanFactory factory; @Autowired private QueueMessageProducer notifyMessageProducer; /** * 發送點對點信息 * * @param noticeInfo */ public void setQueueSender(String obj) { // factory = new ClassPathXmlApplicationContext("classpath:/applicationContext-activemq.xml"); // factory.getBean("queueMessageProducer"); // QueueMessageProducer notifyMessageProducer = ((QueueMessageProducer) factory.getBean("queueMessageProducer")); notifyMessageProducer.sendQueue(obj); } public static ServletContext getServletContext() { return servletContext; } public static void setServletContext(ServletContext servletContext) { Sender.servletContext = servletContext; } public static BeanFactory getFactory() { return factory; } public static void setFactory(BeanFactory factory) { Sender.factory = factory; } }
7.發送消息TestSend
package cn.carowl.activemq;

import javax.annotation.Resource;

import org.springframework.http.MediaType;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;

/**
 * Test controller: a GET on {@code /api/send/test} pushes a fixed message
 * through the {@link Sender} bean.
 */
@Controller
@RequestMapping("/api")
public class TestSend {

    @Resource(name = "sender")
    private Sender sender;

    /** Sends the hard-coded test message; the handler returns no body. */
    @ResponseBody
    @RequestMapping(
            value = "/send/test",
            method = RequestMethod.GET,
            produces = MediaType.APPLICATION_JSON_VALUE)
    public void send() {
        this.sender.setQueueSender("發送的消息");
    }
}
8.activemq配置文件applicationContext-activemq.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring context for the ActiveMQ demo: connection factories, destinations,
     JmsTemplate, the queue producer and the asynchronous queue listener. -->
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:jee="http://www.springframework.org/schema/jee"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.2.xsd">

    <!-- Scan cn.carowl.activemq for annotated beans, excluding @Controller classes. -->
    <context:component-scan base-package="cn.carowl.activemq">
        <context:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller" />
    </context:component-scan>

    <!-- ActiveMQ connection factory -->
    <!-- The ConnectionFactory that actually creates connections; supplied by the JMS provider. -->
    <!-- NOTE(review): the bean id is spelled "connectinFactory" (sic); the refs in this file use the same spelling. -->
    <!-- NOTE(review): broker URL and admin/admin credentials are hard-coded here; consider externalizing them. -->
    <bean id="connectinFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="failover:(tcp://localhost:61616)" />
        <property name="closeTimeout" value="60000" />
        <property name="userName" value="admin" />
        <property name="password" value="admin" />
    </bean>

    <!-- Spring caching connection factory -->
    <!-- The ConnectionFactory Spring uses to manage the real ConnectionFactory. -->
    <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- Target ConnectionFactory: the real factory that creates JMS connections. -->
        <property name="targetConnectionFactory" ref="connectinFactory"></property>
        <!-- Number of sessions to cache. -->
        <property name="sessionCacheSize" value="10"></property>
    </bean>

    <!-- Message destinations -->
    <!-- Queue: only one subscriber receives each message; once processed, the message no longer stays in the queue. -->
    <bean id="notifyQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="q.notify"></constructor-arg>
    </bean>

    <!-- Destination: Topic. A message placed on it is received by all subscribers. -->
    <!-- This is the topic destination (one-to-many). -->
    <bean id="notifyTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="t.notify"></constructor-arg>
    </bean>

    <!-- Spring JMS template, backed by the caching connection factory. -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="cachingConnectionFactory" />
    </bean>

    <!-- Message producer that uses the Spring JmsTemplate. -->
    <bean id="queueMessageProducer" class="cn.carowl.activemq.QueueMessageProducer">
        <property name="jmsTemplate" ref="jmsTemplate"></property>
        <property name="notifyQueue" ref="notifyQueue"></property>
        <property name="messageConverter" ref="messageConverter"></property>
    </bean>

    <!-- Disabled topic producer:
    <bean id="topicMessageProducer" class="cn.carowl.activemq.TopicMessageProducer">
        <property name="jmsTemplate" ref="jmsTemplate"></property>
        <property name="notifyTopic" ref="notifyTopic"></property>
        <property name="messageConverter" ref="messageConverter"></property>
    </bean>
    -->

    <!-- Message consumer: normally Spring message-driven POJOs receiving asynchronously from the queue. -->
    <!-- Message listener container. It references connectinFactory directly, not the caching factory. -->
    <bean id="queueContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectinFactory"></property>
        <property name="destination" ref="notifyQueue"></property>
        <property name="messageListener" ref="queueMessageListener"></property>
    </bean>

    <!-- Disabled topic listener container:
    <bean id="topicContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectinFactory"></property>
        <property name="destination" ref="notifyTopic"></property>
        <property name="messageListener" ref="topicMessageListener"></property>
        <property name="pubSubDomain" value="true" />
    </bean>
    -->

    <!-- Handler class for asynchronously received messages. -->
    <bean id="queueMessageListener" class="cn.carowl.activemq.QueueMessageListener">
        <property name="messageConverter" ref="messageConverter"></property>
    </bean>

    <!-- Disabled topic listener:
    <bean id="topicMessageListener" class="cn.carowl.activemq.TopicMessageListener">
        <property name="messageConverter" ref="messageConverter"></property>
    </bean>
    -->

    <!-- Converter shared by the producer and the listener. -->
    <bean id="messageConverter" class="cn.carowl.activemq.NotifyMessageConverter">
    </bean>
</beans>
9.配置前端控制器web.xml
<!-- Spring config locations, including the ActiveMQ context file.
     NOTE(review): presumably read by a ContextLoaderListener declared elsewhere in web.xml; it is not shown in this fragment. -->
<context-param>
    <param-name>contextConfigLocation</param-name>
    <param-value> classpath*:/applicationContext.xml, classpath*:/applicationContext-shiro.xml, classpath*:/applicationContext-activemq.xml </param-value>
</context-param>

<!-- Spring MVC front controller; it loads its own context from applicationContext-mvc.xml at startup. -->
<servlet>
    <servlet-name>springmvc</servlet-name>
    <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
    <init-param>
        <param-name>contextConfigLocation</param-name>
        <param-value>classpath*:/applicationContext-mvc.xml</param-value>
    </init-param>
    <load-on-startup>1</load-on-startup>
</servlet>

<!-- Map the springmvc servlet to the "/" URL pattern. -->
<servlet-mapping>
    <servlet-name>springmvc</servlet-name>
    <url-pattern>/</url-pattern>
</servlet-mapping>
10.啓動服務,瀏覽器訪問接口路徑,測試發送消息是否成功!