轉載自雲棲社區
摘要: Spring-jms消息服務小項目 所需的包: spring的基礎包 spring-jms-xx包 spring-messaging-xx包 commons-collections-xx包 commons-pool2-xx包 aop切面的包: spring-aop,spring-aspects...
Spring-jms消息服務小項目
所需的包:
spring的基礎包
spring-jms-xx包
spring-messaging-xx包
commons-collections-xx包
commons-pool2-xx包
aop切面的包: spring-aop,spring-aspects,aopalliance,aspectjrt.jar,aspectjweaver.jar
配置:
1.配置ConnectionFactory
2.配置jmsTemplate;
3.配置Destination
4.配置listener
5.配置container
話不多說,直接上代碼
前提是你已經開啓了activemq服務
看一下項目架構
監聽器有兩種實現方案: 一種是採用原生的jms的MessageListener
另外一種是採用spring的方案:SessionAwareMessageListener
前一種只能消費消息,不能回送消息
後一種能夠在接到消息後,給生產者回送消息,它自己既是生產者也是消費者
所有所需的連接工廠、目的地等都是通過Spring注入的,具體看後面的配置文件
發送消息的接口:
package com.tg.service;
/**
 * Contract for a component that sends a text message.
 */
public interface PersonService {

    /**
     * Sends the given text.
     *
     * @param message the text to send
     */
    void sendMessage(String message);
}
生產者一:
package com.tg.service; import javax.annotation.Resource; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import org.springframework.context.annotation.Scope; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Service; @Service("personServiceImpl") @Scope("prototype") public class PersonServiceImpl implements PersonService { private Destination destination; private JmsTemplate jsmTemplate; private Destination replyQueueDestination; @Override public void sendMessage(final String message) { System.out.println("生產者發送消息"+ message); //回調 jsmTemplate.send(destination, new MessageCreator(){ @Override public Message createMessage(Session session) throws JMSException { //建立一個文本消息 Message msg = session.createTextMessage(message); //指定爲非持久化方式 msg.setJMSDeliveryMode( DeliveryMode.NON_PERSISTENT ); return msg; } }); } @Resource(name="queueDestination") public void setDestination(Destination destination) { this.destination = destination; } @Resource(name="jmsTemplate") public void setJsmTemplate(JmsTemplate jsmTemplate) { this.jsmTemplate = jsmTemplate; } @Resource(name = "replyQueueDestination") public void setReplyDestination(Destination replyDestination) { this.replyQueueDestination = replyDestination; } }
消費者一:
package com.tg.service; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; //臨聽器有兩種實現方案: 一種是採用原生的jms的MessageListener // 另外一種是採用spring的方案:SessionAwareMessageListener //注意: 這裏的MessageListener接口是 jms的接口 public class ConsumerMessageListener1 implements MessageListener { @Override public void onMessage(Message message) { if( message instanceof TextMessage){ TextMessage text=(TextMessage) message; System.out.println("接收到的消息是一個文本消息:"+ text); //這種方式沒法回覆 因此採用第二種 } } }
生產者二:
package com.tg.service; import javax.annotation.Resource; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Service; @Service("personServiceImpl2") public class PersonServiceImpl2 implements PersonService { private Destination destination; //用於存發送信息的隊列 private JmsTemplate jsmTemplate; //jms操做模板 private Destination replyQueueDestination; //用於存回覆信息的隊列, @Override public void sendMessage(final String message) { System.out.println("生產者2->發送消息" + message); // 回調 jsmTemplate.send(destination, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { Message msg = session.createTextMessage(message); // 設置回覆的信息的目的地. msg.setJMSReplyTo(replyQueueDestination); // 設置發送的信息類型 爲非持久化信息 msg.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT); //建立一個消費者,用於接收對方回覆的信息 注意,這個消費者臨聽 replyDestination MessageConsumer comsumer2 = session.createConsumer(replyQueueDestination); comsumer2.setMessageListener(new MessageListener() { public void onMessage(Message m) { try { System.out.println("接收到的回覆信息:" + ((TextMessage) m).getText()); } catch (JMSException e) { e.printStackTrace(); } } }); return msg; } }); } @Resource(name = "replyQueueDestination") public void setReplyDestination(Destination replyDestination) { this.replyQueueDestination = replyDestination; } @Resource(name = "sendQueueDestination") public void setDestination(Destination destination) { this.destination = destination; } @Resource(name = "jmsTemplate") public void setJsmTemplate(JmsTemplate jsmTemplate) { this.jsmTemplate = jsmTemplate; } }
消費者二:
package com.tg.service; import javax.annotation.Resource; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.springframework.jms.listener.SessionAwareMessageListener; import org.springframework.stereotype.Component; /** * SessionAwareMessageListener:是由spring提供,它能夠在回調方法中傳入session,以此回送信息到生產者 * @author Administrator * */ @Component("consumerMessageListener2") public class ConsumerMessageListener2 implements SessionAwareMessageListener<TextMessage> { private Destination destination; @Override public void onMessage(TextMessage message, Session session) throws JMSException { System.out.println("接收到的消息是一個文本消息:"+ message.getText()); //經過session 建立 producer對象,再回送信息 //從message中取出信息回送的目的地,以便建立生產者. MessageProducer producer=session.createProducer( message.getJMSReplyTo() ); //建立一條消息 Message textMessage=session.createTextMessage( "生產者發過來的信息已經處理完畢,game over..." ); //調用發送 producer.send(textMessage); } @Resource(name="sendQueueDestination") public void setDestination(Destination destination) { this.destination = destination; } }
Spring的配置文件:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">

    <!-- Enable annotation processing. -->
    <context:annotation-config />

    <!-- Configuration is mixed (part XML, part annotated classes), so scan the packages for annotated components. -->
    <context:component-scan base-package="com.*" />

    <!-- Enable AspectJ annotation-driven proxies. -->
    <aop:aspectj-autoproxy />

    <!-- The real connection factory supplied by the JMS provider (ActiveMQ). -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://169.254.173.100:61616" />
    </bean>

    <!-- ActiveMQ connection pool. -->
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
        <property name="connectionFactory" ref="targetConnectionFactory" />
        <property name="maxConnections" value="100" />
    </bean>

    <!-- Spring connection factory wrapping the pool. -->
    <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <property name="targetConnectionFactory" ref="pooledConnectionFactory" />
    </bean>

    <!-- Alternative (disabled):
    <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="pooledConnectionFactory" />
    </bean>
    -->

    <!-- JmsTemplate. -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="singleConnectionFactory" />
    </bean>

    <!-- Destinations come in two kinds: Queue (point-to-point) and Topic (publish/subscribe). -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="myqueue" />
    </bean>

    <!-- Topic alternative (disabled):
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg>
            <value>mytopic</value>
        </constructor-arg>
    </bean>
    -->

    <!-- Listener for the first scenario. -->
    <bean id="consumerMessageListener1" class="com.tg.service.ConsumerMessageListener1" />

    <!-- Container the listener runs in; the listener is called back whenever a message arrives. -->
    <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="singleConnectionFactory" />
        <property name="destination" ref="queueDestination" />
        <property name="messageListener" ref="consumerMessageListener1" />
    </bean>

    <!-- Second scenario. -->

    <!-- Queue holding the messages sent by the producer. -->
    <bean id="sendQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="sendQueue1" />
    </bean>

    <!-- Queue holding the replies sent by the consumer. -->
    <bean id="replyQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="replyQueue1" />
    </bean>

    <!-- Session-aware listener declared in XML (disabled):
    <bean id="consumerMessageListener2" class="com.yc.jms5.ConsumerMessageListener2">
        <property name="destination" ref="sendQueueDestination" />
        <property name="replyDestination" ref="replyQueueDestination" />
    </bean>
    -->

    <!-- Container for consumerMessageListener2. -->
    <bean id="sessionAwareListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="singleConnectionFactory" />
        <property name="destination" ref="sendQueueDestination" />
        <property name="messageListener" ref="consumerMessageListener2" />
    </bean>

</beans>
第一種的測試類:
package com.tg.test; import org.apache.xbean.spring.context.ClassPathXmlApplicationContext; import org.springframework.context.ApplicationContext; import com.tg.service.PersonService; //測試生產者發送了一個消息,消費者接收 //整合spring http://haohaoxuexi.iteye.com/blog/1893038 public class Test1 { public static void main(String[] args) { ApplicationContext ac=new ClassPathXmlApplicationContext("applicationContext.xml"); PersonService ps=(PersonService) ac.getBean("personServiceImpl"); for( int i=0;i<10;i++){ Thread t=new Thread(new User(ps,"湯"+i)); t.start(); } } } class User implements Runnable{ private PersonService ps; private String name; public User(PersonService ps, String name) { super(); this.ps = ps; this.name = name; } @Override public void run() { for( int i=0;i<10;i++){ ps.sendMessage(name+"向你問好"); } } }
運行結果 :
第二種方案測試類:
package com.tg.test; import org.apache.xbean.spring.context.ClassPathXmlApplicationContext; import org.springframework.context.ApplicationContext; import com.tg.service.PersonService; //測試生產者發送了一個消息,消費者接收後,再回復一個信息到生產者,生產者接收到後,顯示這個回覆的信息 public class Test2 { public static void main(String[] args) { ApplicationContext ac=new ClassPathXmlApplicationContext("applicationContext.xml"); PersonService ps=(PersonService) ac.getBean("personServiceImpl2"); for( int i=0;i<10;i++){ Thread t=new Thread(new User(ps,": 湯1"+i)); t.start(); } } }
運行結果 :