ActiveMQ學習筆記(四)http://my.oschina.net/xiaoxishan/blog/380446 中記錄瞭如何使用原生的方式從ActiveMQ中收發消息。能夠看出,每次收發消息都要寫許多重複的代碼,Spring 爲咱們提供了更爲方便的方式,這就是Spring JMS。咱們經過一個例子展開講述。包括隊列、主題消息的收發相關的Spring配置、代碼、測試。java
本例中,消息的收發都寫在了一個工程裏。spring
<dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.11.0</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.1.4.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>4.1.4.RELEASE</version> </dependency> </dependencies>
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <!-- 配置JMS鏈接工廠 --> <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="failover:(tcp://localhost:61616)" /> </bean> <!-- 定義消息隊列(Queue) --> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <!-- 設置消息隊列的名字 --> <constructor-arg> <value>queue1</value> </constructor-arg> </bean> <!-- 配置JMS模板(Queue),Spring提供的JMS工具類,它發送、接收消息。 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory" /> <property name="defaultDestination" ref="queueDestination" /> <property name="receiveTimeout" value="10000" /> </bean> <!--queue消息生產者 --> <bean id="producerService" class="guo.examples.mq02.queue.ProducerServiceImpl"> <property name="jmsTemplate" ref="jmsTemplate"></property> </bean> <!--queue消息消費者 --> <bean id="consumerService" class="guo.examples.mq02.queue.ConsumerServiceImpl"> <property name="jmsTemplate" ref="jmsTemplate"></property> </bean>
從下面的代碼能夠出,使用Spring JMS,能夠減小重複代碼(接口類ProducerService代碼省略)。shell
package guo.examples.mq02.queue; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; public class ProducerServiceImpl implements ProducerService { private JmsTemplate jmsTemplate; /** * 向指定隊列發送消息 */ public void sendMessage(Destination destination, final String msg) { System.out.println("向隊列" + destination.toString() + "發送了消息------------" + msg); jmsTemplate.send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(msg); } }); } /** * 向默認隊列發送消息 */ public void sendMessage(final String msg) { String destination = jmsTemplate.getDefaultDestination().toString(); System.out.println("向隊列" +destination+ "發送了消息------------" + msg); jmsTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(msg); } }); } public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } }
package guo.examples.mq02.queue; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.TextMessage; import org.springframework.jms.core.JmsTemplate; public class ConsumerServiceImpl implements ConsumerService { private JmsTemplate jmsTemplate; /** * 接受消息 */ public void receive(Destination destination) { TextMessage tm = (TextMessage) jmsTemplate.receive(destination); try { System.out.println("從隊列" + destination.toString() + "收到了消息:\t" + tm.getText()); } catch (JMSException e) { e.printStackTrace(); } } public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } }
接受消息的時候,能夠不用2.3節中的方式,Spring JMS一樣提供了消息監聽的模式,下面給出對應的配置和代碼。
apache
Spring配置session
<!-- 定義消息隊列(Queue),咱們監聽一個新的隊列,queue2 --> <bean id="queueDestination2" class="org.apache.activemq.command.ActiveMQQueue"> <!-- 設置消息隊列的名字 --> <constructor-arg> <value>queue2</value> </constructor-arg> </bean> <!-- 配置消息隊列監聽者(Queue),代碼下面給出,只有一個onMessage方法 --> <bean id="queueMessageListener" class="guo.examples.mq02.queue.QueueMessageListener" /> <!-- 消息監聽容器(Queue),配置鏈接工廠,監聽的隊列是queue2,監聽器是上面定義的監聽器 --> <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="queueDestination2" /> <property name="messageListener" ref="queueMessageListener" /> </bean>
監聽類代碼app
package guo.examples.mq02.queue; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; public class QueueMessageListener implements MessageListener { //當收到消息時,自動調用該方法。 public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("ConsumerMessageListener收到了文本消息:\t" + tm.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
在使用Spring JMS的時候,主題(Topic)和隊列消息的主要差別體如今JmsTemplate中"pubSubDomain"是否設置爲True。若是爲True,則是Topic;若是是false或者默認,則是queue。maven
<property name="pubSubDomain" value="true" />
<!-- 定義消息主題(Topic) --> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg> <value>guo_topic</value> </constructor-arg> </bean> <!-- 配置JMS模板(Topic),pubSubDomain="true"--> <bean id="topicJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory" /> <property name="defaultDestination" ref="topicDestination" /> <property name="pubSubDomain" value="true" /> <property name="receiveTimeout" value="10000" /> </bean> <!--topic消息發佈者 --> <bean id="topicProvider" class="guo.examples.mq02.topic.TopicProvider"> <property name="topicJmsTemplate" ref="topicJmsTemplate"></property> </bean> <!-- 消息主題監聽者 和 主題監聽容器 能夠配置多個,即多個訂閱者 --> <!-- 消息主題監聽者(Topic) --> <bean id="topicMessageListener" class="guo.examples.mq02.topic.TopicMessageListener" /> <!-- 主題監聽容器 (Topic) --> <bean id="topicJmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="topicDestination" /> <property name="messageListener" ref="topicMessageListener" /> </bean>
package guo.examples.mq02.topic; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; public class TopicProvider { private JmsTemplate topicJmsTemplate; /** * 向指定的topic發佈消息 * * @param topic * @param msg */ public void publish(final Destination topic, final String msg) { topicJmsTemplate.send(topic, new MessageCreator() { public Message createMessage(Session session) throws JMSException { System.out.println("topic name 是" + topic.toString() + ",發佈消息內容爲:\t" + msg); return session.createTextMessage(msg); } }); } public void setTopicJmsTemplate(JmsTemplate topicJmsTemplate) { this.topicJmsTemplate = topicJmsTemplate; } }
package guo.examples.mq02.topic; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** *和隊列監聽的代碼同樣。 */ public class TopicMessageListener implements MessageListener { public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("TopicMessageListener \t" + tm.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
package guo.examples.mq02; import javax.jms.Destination; import guo.examples.mq02.queue.ConsumerService; import guo.examples.mq02.queue.ProducerService; import guo.examples.mq02.topic.TopicProvider; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; /** * 測試Spring JMS * * 1.測試生產者發送消息 * * 2. 測試消費者接受消息 * * 3. 測試消息監聽 * * 4.測試主題監聽 * */ @RunWith(SpringJUnit4ClassRunner.class) // ApplicationContext context = new // ClassPathXmlApplicationContext("applicationContext.xml"); @ContextConfiguration("/applicationContext.xml") public class SpringJmsTest { /** * 隊列名queue1 */ @Autowired private Destination queueDestination; /** * 隊列名queue2 */ @Autowired private Destination queueDestination2; /** * 主題 guo_topic */ @Autowired @Qualifier("topicDestination") private Destination topic; /** * 主題消息發佈者 */ @Autowired private TopicProvider topicProvider; /** * 隊列消息生產者 */ @Autowired @Qualifier("producerService") private ProducerService producer; /** * 隊列消息生產者 */ @Autowired @Qualifier("consumerService") private ConsumerService consumer; /** * 測試生產者向queue1發送消息 */ @Test public void testProduce() { String msg = "Hello world!"; producer.sendMessage(msg); } /** * 測試消費者從queue1接受消息 */ @Test public void testConsume() { consumer.receive(queueDestination); } /** * 測試消息監聽 * * 1.生產者向隊列queue2發送消息 * * 2.ConsumerMessageListener監聽隊列,並消費消息 */ @Test public void testSend() { producer.sendMessage(queueDestination2, "Hello China~~~~~~~~~~~~~~~"); } /** * 測試主題監聽 * * 1.生產者向主題發佈消息 * * 2.ConsumerMessageListener監聽主題,並消費消息 */ @Test public void testTopic() throws Exception { topicProvider.publish(topic, "Hello T-To-Top-Topi-Topic!"); } }
topic name 是topic://guo_topic,發佈消息內容爲: Hello T-To-Top-Topi-Topic! TopicMessageListener Hello T-To-Top-Topi-Topic! 向隊列queue://queue2發送了消息------------Hello China~~~~~~~~~~~~~~~ ConsumerMessageListener收到了文本消息: Hello China~~~~~~~~~~~~~~~ 向隊列queue://queue1發送了消息------------Hello world! 從隊列queue://queue1收到了消息: Hello world!
http://pan.baidu.com/s/1gdvPpWftcp