Java
消息服務(Java Message Service
,JMS
)是一個Java
標準,定義了使用消息代理的通用API 。在JMS
出現以前,每一個消息代理都有私有的API
,這就使得不一樣代理之間的消息代碼很難通用。可是藉助JMS
,全部聽從規範的實現都使用通用的接口,這就相似於JDBC
爲數據庫操做提供了通用的接口同樣。java
Spring
經過基於模板的抽象爲JMS
功能提供了支持,這個模板也就是JmsTemplate
。使用JmsTemplate
,可以很是容易地在消息生產方發送隊列和主題消息,在消費消息的那一方,也可以很是容易地接收這些消息。spring
Queue
與Topic
比較以下圖所示:數據庫
使用maven管理依賴包,增長pom.xml文件以下:apache
<dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.11.0</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.1.4.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>4.1.4.RELEASE</version> </dependency> </dependencies>
隊列(Queue)消息的收發session
點對點消息,若是沒有消費者在監聽隊列,消息將保留在隊列中,直至消息消費者鏈接到隊列爲止。這種消息傳遞模型是傳統意義上的懶模型或輪詢模型。在此模型中,消息不是自動推進給消息消費者的,而是要由消息消費者從隊列中請求得到(拉模式)。app
Spring配置文件內容以下:maven
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <!-- 配置JMS鏈接工廠 --> <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="failover:(tcp://localhost:61616)" /> </bean> <!-- 定義消息隊列(Queue) --> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <!-- 設置消息隊列的名字 --> <constructor-arg> <value>queue1</value> </constructor-arg> </bean> <!-- 配置JMS模板(Queue),Spring提供的JMS工具類,它發送、接收消息。 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory" /> <property name="defaultDestination" ref="queueDestination" /> <property name="receiveTimeout" value="10000" /> </bean> <!--queue消息生產者 --> <bean id="producerService" class="com.yoodb.mq.queue.ProducerServiceImpl"> <property name="jmsTemplate" ref="jmsTemplate"></property> </bean> <!--queue消息消費者 --> <bean id="consumerService" class="com.yoodb.mq.queue.ConsumerServiceImpl"> <property name="jmsTemplate" ref="jmsTemplate"></property> </bean> </beans>
消息生產者使用Spring JMS
消息,減小重複代碼(接口類ProducerService
代碼省略)以下:tcp
package com.yoodb.mq.queue; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; public class ProducerServiceImpl implements ProducerService { private JmsTemplate jmsTemplate; /** * 向指定隊列發送消息 */ public void sendMessage(Destination destination, final String msg) { System.out.println("向隊列" + destination.toString() + "發送了消息___" + msg); jmsTemplate.send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(msg); } }); } /** * 向默認隊列發送消息 */ public void sendMessage(final String msg) { String destination = jmsTemplate.getDefaultDestination().toString(); System.out.println("向隊列" +destination+ "發送了消息___" + msg); jmsTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(msg); } }); } public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } }
消息消費者代碼以下:ide
package com.yoodb.mq.queue; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.TextMessage; import org.springframework.jms.core.JmsTemplate; public class ConsumerServiceImpl implements ConsumerService { private JmsTemplate jmsTemplate; /** * 接受消息 */ public void receive(Destination destination) { TextMessage tm = (TextMessage) jmsTemplate.receive(destination); try { System.out.println("從隊列" + destination.toString() + "收到了消息:\t" + tm.getText()); } catch (JMSException e) { e.printStackTrace(); } } public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } }
隊列消息監聽,在接受消息的時候,能夠不用消息消費者代碼的方式,Spring JMS一樣提供了消息監聽的模式,對應的配置和代碼內容。工具
Spring配置文件以下:
<!-- 定義消息隊列(Queue),咱們監聽一個新的隊列,queue2 --> <bean id="queueDestination2" class="org.apache.activemq.command.ActiveMQQueue"> <!-- 設置消息隊列的名字 --> <constructor-arg> <value>queue2</value> </constructor-arg> </bean> <!-- 配置消息隊列監聽者(Queue),代碼下面給出,只有一個onMessage方法 --> <bean id="queueMessageListener" class="com.yoodb.mq.queue.QueueMessageListener" /> <!-- 消息監聽容器(Queue),配置鏈接工廠,監聽的隊列是queue2,監聽器是上面定義的監聽器 --> <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="queueDestination2" /> <property name="messageListener" ref="queueMessageListener" /> </bean>
監聽類代碼以下:
package com.yoodb.mq.queue; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; public class QueueMessageListener implements MessageListener { //當收到消息時,自動調用該方法。 public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("ConsumerMessageListener收到了文本消息:\t" + tm.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
主題(Topic)消息收發
pub/sub消息傳遞模型基本上是一個推模型。在該模型中,消息會自動廣播,消息消費者無須經過主動請求或輪詢主題的方法來得到新的消息。在使用Spring JMS的時候,主題[訂閱/發佈](Topic)和隊列消息模式的主要差別體如今JmsTemplate中"pubSubDomain"是否設置爲True。若是爲True,則是Topic;若是是false或者默認,則是Queue。
<property name="pubSubDomain" value="true" />
Spring配置文件內容以下:
<!-- 定義消息主題(Topic) --> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg> <value>guo_topic</value> </constructor-arg> </bean> <!-- 配置JMS模板(Topic),pubSubDomain="true"--> <bean id="topicJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory" /> <property name="defaultDestination" ref="topicDestination" /> <property name="pubSubDomain" value="true" /> <property name="receiveTimeout" value="10000" /> </bean> <!--topic消息發佈者 --> <bean id="topicProvider" class="com.yoodb.mq.topic.TopicProvider"> <property name="topicJmsTemplate" ref="topicJmsTemplate"></property> </bean> <!-- 消息主題監聽者 和 主題監聽容器 能夠配置多個,即多個訂閱者 --> <!-- 消息主題監聽者(Topic) --> <bean id="topicMessageListener" class="com.yoodb.mq.topic.TopicMessageListener" /> <!-- 主題監聽容器 (Topic) --> <bean id="topicJmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="topicDestination" /> <property name="messageListener" ref="topicMessageListener" /> </bean>
消息發佈者代碼以下:
package com.yoodb.mq.topic; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; public class TopicProvider { private JmsTemplate topicJmsTemplate; /** * 向指定的topic發佈消息 * * @param topic * @param msg */ public void publish(final Destination topic, final String msg) { topicJmsTemplate.send(topic, new MessageCreator() { public Message createMessage(Session session) throws JMSException { System.out.println("topic name 是" + topic.toString() + ",發佈消息內容爲:\t" + msg); return session.createTextMessage(msg); } }); } public void setTopicJmsTemplate(JmsTemplate topicJmsTemplate) { this.topicJmsTemplate = topicJmsTemplate; } }
消息訂閱者(監聽)代碼以下:
package com.yoodb.mq.topic; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** *和隊列監聽的代碼同樣。 */ public class TopicMessageListener implements MessageListener { public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("TopicMessageListener \t" + tm.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
測試代碼以下:
package com.yoodb.mq; import javax.jms.Destination; import com.yoodb.mq.queue.ConsumerService; import com.yoodb.mq.queue.ProducerService; import com.yoodb.mq.topic.TopicProvider; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; /** * 測試Spring JMS * * 1.測試生產者發送消息 * * 2. 測試消費者接受消息 * * 3. 測試消息監聽 * * 4.測試主題監聽 * */ @RunWith(SpringJUnit4ClassRunner.class) // ApplicationContext context = new // ClassPathXmlApplicationContext("applicationContext.xml"); @ContextConfiguration("/applicationContext.xml") public class SpringJmsTest { /** * 隊列名queue1 */ @Autowired private Destination queueDestination; /** * 隊列名queue2 */ @Autowired private Destination queueDestination2; /** * 主題 guo_topic */ @Autowired @Qualifier("topicDestination") private Destination topic; /** * 主題消息發佈者 */ @Autowired private TopicProvider topicProvider; /** * 隊列消息生產者 */ @Autowired @Qualifier("producerService") private ProducerService producer; /** * 隊列消息生產者 */ @Autowired @Qualifier("consumerService") private ConsumerService consumer; /** * 測試生產者向queue1發送消息 */ @Test public void testProduce() { String msg = "Hello world!"; producer.sendMessage(msg); } /** * 測試消費者從queue1接受消息 */ @Test public void testConsume() { consumer.receive(queueDestination); } /** * 測試消息監聽 * * 1.生產者向隊列queue2發送消息 * * 2.ConsumerMessageListener監聽隊列,並消費消息 */ @Test public void testSend() { producer.sendMessage(queueDestination2, "Hello China!!!"); } /** * 測試主題監聽 * 1.生產者向主題發佈消息 * 2.ConsumerMessageListener監聽主題,並消費消息 */ @Test public void testTopic() throws Exception { topicProvider.publish(topic, "Hello T-To-Top-Topi-Topic!"); } }
測試結果以下:
topic name 是topic://guo_topic,發佈消息內容爲:Hello T-To-Top-Topi-Topic! TopicMessageListener Hello T-To-Top-Topi-Topic! 向隊列queue://queue2發送了消息___Hello China!!! ConsumerMessageListener收到了文本消息:Hello China!!! 向隊列queue://queue1發送了消息___Hello world! 從隊列queue://queue1收到了消息:Hello world!
其餘相關文章推薦:
ActiveMQ消息隊列從入門到實踐(4)—使用Spring JMS收發消息
ActiveMQ消息隊列從入門到實踐(3)—經過ActiveMQ收發消息