對於ActiveMQ消息的發送,原聲的api操做繁瑣,並且若是不進行二次封裝,打開關閉會話以及各類建立操做也是夠夠的了。那麼,Spring提供了一個很方便的去收發消息的框架,spring jms。整合Spring後,代碼不只變得很是優雅,並且易用性和擴展性更好。php
<!-- activemq --> <dependency> <groupId>org.apache.xbean</groupId> <artifactId>xbean-spring</artifactId> <version>3.16</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>${springframework.version}</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>${activemq.version}</version> </dependency>
<?xml version="1.0" encoding="UTF-8"?> <!-- 查找最新的schemaLocation 訪問 http://www.springframework.org/schema/ --> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.1.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">
<amq:connectionFactory id="jmsConnectionFactory" brokerURL="tcp://${activemq.ip}:61616" userName="${activemq.username}" password="${activemq.password}" /> <bean id="jmsConnectionFactoryExtend" class="org.springframework.jms.connection.CachingConnectionFactory"> <constructor-arg ref="jmsConnectionFactory" /> <property name="sessionCacheSize" value="100" /> </bean> <!-- 消息處理器 --> <bean id="jmsMessageConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter" /> <!-- ====Producer side start==== --> <!-- 定義JmsTemplate的Queue類型 --> <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate"> <constructor-arg ref="jmsConnectionFactoryExtend" /> <!-- 非pub/sub模型(發佈/訂閱),即隊列模式 --> <property name="pubSubDomain" value="false" /> <property name="messageConverter" ref="jmsMessageConverter"></property> </bean> <!-- 定義JmsTemplate的Topic類型 --> <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate"> <constructor-arg ref="jmsConnectionFactoryExtend" /> <!-- pub/sub模型(發佈/訂閱) --> <property name="pubSubDomain" value="true" /> <property name="messageConverter" ref="jmsMessageConverter"></property> </bean> <jms:listener-container destination-type="queue" container-type="default" connection-factory="jmsConnectionFactoryExtend" acknowledge="auto" concurrency="5-10"> <jms:listener destination="testqueue" ref="queueReciver" /> </jms:listener-container>
第一個是配置咱們的mq鏈接,ip+端口號,賬號密碼的信息。java
第二個是引入spring的mq鏈接池。能夠配置緩存的鏈接數。spring
第三個是消息處理器,Spring默認提供了基於Jdk Serializable的消息處理和MappingJackson2MessageConventer,其實這兩個挺經常使用,在Spring Redis中,在Spring MVC中,都有着這幾種conventer的身影。apache
下面是兩個發送消息的模版類,相似於以前講過的RedisTemplate。向其注入上面定義的消息處理器,代碼中咱們會用到。(其實類中已經判斷若是不進行注入就設置一個默認的,可是本身注入的話,方便咱們控制)swift
listener-container是Spring提供的一個監聽器容器,用於統一控制咱們的監聽類來接收處理消息。這裏面有一些配置,schema有說明。能夠配置響應模式,消費者數量等。開啓多消費者,有助於加快隊列處理速度。api
若是要用註解的方式,就不須要在xml中本身定義消息監聽容器了。只須要加入如下的代碼:緩存
<bean id="jmsListenerContainerFactory" class="org.springframework.jms.config.DefaultJmsListenerContainerFactory"> <property name="connectionFactory" ref="jmsConnectionFactoryExtend"/> </bean> <!-- 監聽註解支持 --> <jms:annotation-driven/>
這樣,配置咱們消費處理類上的@listener註解,便可監聽對應的queue或者topic消息。session
隊列消息:app
@Resource @Component("queueSender") public class QueueSender { @Resource(name = "jmsQueueTemplate") private JmsTemplate jmsQueueTemplate;// 經過@Qualifier修飾符來注入對應的bean public void send(String destination, final Object message) { jmsQueueTemplate.send(destination, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { return jmsQueueTemplate.getMessageConverter().toMessage(message, session); } }); } }
訂閱消息:框架
@Component public class TopicSender { @Resource(name="jmsTopicTemplate") private JmsTemplate jmsTemplate; /** * 發送一條消息到指定的隊列(目標) * @param queueName 隊列名稱 * @param message 消息內容 */ public void publish(String destination,final Object message){ jmsTemplate.send(destination, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { return jmsTemplate.getMessageConverter().toMessage(message, session); } }); } }
package cn.test.activemq.consumer.queue; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jms.annotation.JmsListener; import org.springframework.jms.listener.adapter.MessageListenerAdapter; import org.springframework.jms.support.converter.MessageConversionException; import org.springframework.stereotype.Component; import cn.test.MqBean; import cn.test.activemq.message.types.QueueDefination; /** * @author Han */ @Component("spqueueconsumertest") public class SpringQueueReciverTest extends MessageListenerAdapter{ private static final Logger log = LoggerFactory.getLogger(SpringQueueReciverTest.class); @JmsListener(destination=QueueDefination.TEST_QUEUE,concurrency="5-10") public void onMessagehehe(Message message, Session session) throws JMSException { try { MqBean bean = (MqBean) getMessageConverter().fromMessage(message); System.out.println(bean.getName()); System.out.println(session); message.acknowledge(); message.acknowledge(); } catch (MessageConversionException | JMSException e) { e.printStackTrace(); } } }
上面的@JmsListener(destination=QueueDefination.TEST_QUEUE,concurrency="5-10")是在用註解方式監聽的時候加入。若是用xml配置容易,能夠忽略。
附上MqBean
public class MqBean implements Serializable{ private Integer age; private String name; public Integer getAge() { return age; } public void setAge(Integer age) { this.age = age; } public String getName() { return name; } public void setName(String name) { this.name = name; } }
運行效果截圖: