<!-- xbean 如<amq:connectionFactory /> --> <dependency> <groupId>org.apache.xbean</groupId> <artifactId>xbean-spring</artifactId> <version>3.16</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>${spring.version}</version> </dependency> <!-- activemq --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.7.0</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.12.1</version> </dependency>
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.12.1.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd"> <!-- ActiveMQ 鏈接工廠 --> <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://localhost:61616" userName="admin" password="admin" /> <!-- 提升效率,配置JMS鏈接工廠 --> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <constructor-arg ref="amqConnectionFactory" /> <property name="sessionCacheSize" value="100" /> </bean> <!--定義消息隊列(Queue)--> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="order.queue"/> </bean> <!--定義主題(Topic)--> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="order.topic"/> </bean> <!-- 配置JMS模板(Queue),Spring提供的JMS工具類,利用它發送、接收消息。 --> <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory" /> <property name="defaultDestination" ref="queueDestination" /> <property name="receiveTimeout" value="10000" /> <!-- true是topic,false是queue,默認是false --> <property name="pubSubDomain" value="false" /> </bean> <!-- 配置JMS模板(topic),Spring提供的JMS工具類,利用它發送、接收消息。 --> <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory" /> <property name="defaultDestination" ref="topicDestination" /> <property name="receiveTimeout" value="10000" /> <!-- true是topic,false是queue,默認是false --> <property name="pubSubDomain" value="true" /> </bean> <!-- 配置消息隊列監聽者(Queue or Topic) --> <bean id="messageListener1" class="com.mq.QueueMessageListener" /> <bean id="messageListener2" class="com.mq.QueueMessageListener2" /> <bean id="messageListener3" class="com.mq.QueueMessageListener3" /> <bean id="messageListener4" class="com.mq.QueueMessageListener4" /> <!--queue監聽器--> <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto"> <jms:listener destination="order.queue" ref="messageListener1" /> <jms:listener destination="order.queue" ref="messageListener2" /> </jms:listener-container> <!-- Topic監聽器 --> <!--<jms:listener-container destination-type="topic" --> <!--container-type="default" connection-factory="connectionFactory"--> <!--acknowledge="auto">--> <!--在測試中發現該方式出現消息被重複接收的狀況--> <!--<jms:listener destination="order.topic" ref="messageListener3" />--> <!--<jms:listener destination="order.topic" ref="messageListener4" />--> <!--</jms:listener-container>--> <!--DefaultMessageListenerContainer實現topic監聽器--> <bean id="container" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="messageListener" ref="messageListener3" /> <property name="destination" ref="topicDestination" /> <!--解決topic消息被重複接收--> <property name="pubSubNoLocal" value="true"></property> </bean> <!--DefaultMessageListenerContainer實現topic監聽器--> <bean id="container2" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="messageListener" ref="messageListener4" /> <property name="destination" ref="topicDestination" /> <!--解決topic消息被重複接收--> <property name="pubSubNoLocal" value="true"></property> </bean> </beans>
點對點消息生產者 @Service public class QueueProductService { @Autowired private JmsTemplate jmsQueueTemplate; //使用默認目的地 public void sendMessageDefault(final String msg) { Destination destination = jmsQueueTemplate.getDefaultDestination(); System.out.println("向隊列: " + destination + " 成功發送一條消息"); jmsQueueTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(msg); } }); } //可指定目的地 public void sendMessage(Destination destination, final String msg) { jmsQueueTemplate.send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(msg); } }); } } 發佈/訂閱消息生產者 @Service public class TopicProductService { @Autowired private JmsTemplate jmsTopicTemplate; //使用默認目的地 public void sendMessageDefault(final String msg) { Destination destination = jmsTopicTemplate.getDefaultDestination(); System.out.println("向topic: " + destination + " 成功發送一條消息"); jmsTopicTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(msg); } }); } //可指定目的地 public void sendMessage(Destination destination, final String msg) { System.out.println(Thread.currentThread().getName() + " 向" + destination.toString() + "發送主題:" + msg); jmsTopicTemplate.send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(msg); } }); } }
public class QueueMessageListener implements MessageListener{ @Override public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("監聽器1收到的消息-----"+tm.getText()); } catch (JMSException e) { System.out.println("QueueMessageListener異常"); e.printStackTrace(); } } } QueueMessageListener二、QueueMessageListener三、QueueMessageListener4和上面同樣
@RequestMapping("/send") @ResponseBody public String sendMeg(){ System.out.println("發送queue消息"); Destination destination=new ActiveMQQueue("order.queue"); queueProductService.sendMessage(destination,"hello.queue"); return "ok"; } @RequestMapping("/send2") @ResponseBody public String sendMeg2(){ System.out.println("發送topic消息"); Destination destination=new ActiveMQTopic("order.topic"); topicProductService.sendMessage(destination,"hello.topic.123"); return "ok"; } 注意實例化 Destination 方式: Destination destination=new ActiveMQQueue("order.queue"); Destination destination=new ActiveMQTopic("order.topic"); 觀察控制檯輸出: 發送topic消息 http-nio-8080-exec-5 向topic://order.topic發送主題:hello.topic.123 監聽器3收到的消息-----hello.topic.123 監聽器4收到的消息-----hello.topic.123 發送queue消息 監聽器2收到的消息-----hello.queue 發送queue消息 監聽器1收到的消息-----hello.queue 能夠看出: queue消息只會被隨機消費一次,topic會被消費兩次