說說在項目裏是怎麼使用activemq(簡稱爲amq)進行通訊的。java
有2個系統,面向不一樣的用戶,簡稱爲系統A和系統B。本文爲了簡單,只將系統A做爲 隊列A.CreateMessage的生產端,系統B做爲 隊列A.CreateMessage的消費端,傳輸的message可爲一個設計好的類的對象,本文爲了簡單,傳輸的是一個String對象。spring
另外,系統A也能夠做爲另外一隊列QC的消費端,系統B做爲隊列QC的生產端。apache
1.下載一個apache-activemq-5.10.2,根據系統類型(操做系統位數),選擇啓動bin目錄下的win32或win64目錄下的activemq.bat文件。啓動後,打開瀏覽器,輸入localhost:8161/admin/queues.jsp,瀏覽器
若是頁面是下面這樣的服務器
輸入用戶名:admin,密碼:admin就OK了。session
2.amq也啓動了,那麼接下來是在系統A加上amq相關內容。app
項目目錄結構以下:jsp
系統A的applicationContext-amq.xml文件:tcp
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.10.2.xsd"> <!-- 使用spring的listenerContainer,消息用持久化保存,服務器重啓不會丟失 --> <!-- 鏈接外部的activeMQ--> <amq:connectionFactory id="jmsConnectionFactory" userName="admin" password="admin" brokerURL="tcp://localhost:61616"></amq:connectionFactory> <!-- Spring JmsTemplate config --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory"> <!-- lets wrap in a pool to avoid creating a connection per send --> <bean class="org.springframework.jms.connection.SingleConnectionFactory"> <property name="targetConnectionFactory" ref="jmsConnectionFactory" /> </bean> </property> <!-- custom MessageConverter --> <property name="messageConverter" ref="defaultMessageConverter" /> </bean> <!-- converter --> <bean id="defaultMessageConverter" class="com.pack.app.amq.DefaultMessageConverter" /> <!-- ActiveMQ destinations --> <!-- 使用Queue方式--> <amq:queue name="QUEUE" physicalName="TESTQ" /> <bean id="queueMessageProducer" class="tools.amq.QueueMessageProducer"> <property name="template" ref="jmsTemplate" /> <property name="destination" ref="QUEUE" /> </bean>
<!-- consumer for queue --> <bean id="queueConsumer" class="tools.amq.QueueConsumer" /> <!-- Message Listener for queue --> <bean id="queueListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter"> <constructor-arg ref="queueConsumer" /> <!-- may be other method --> <property name="defaultListenerMethod" value="receive" /> <!-- custom MessageConverter define --> <property name="messageConverter" ref="defaultMessageConverter" /> </bean> <!-- listener container,MDP無需實現接口 --> <bean id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="jmsConnectionFactory" /> <property name="destination" ref="QUEUE" /> <property name="messageListener" ref="queueListener" /> </bean>
<!-- 測試 向MQ發消息 --> <amq:queue name="CreateMessage" physicalName="A.CreateMessage" /> <!-- 生產數據 --> <bean id="createMessageProducer" class="com.pack.app.amq.producer.CreateMessageProducer"> <property name="template" ref="jmsTemplate" /> <property name="destination" ref="CreateMessage" /> </bean> </beans>
DefaultMessageConverter.java測試
public class DefaultMessageConverter implements MessageConverter { /** * Logger for this class */ private static final Log log = LogFactory.getLog(DefaultMessageConverter.class); public Message toMessage(Object obj, Session session) throws JMSException { if (log.isDebugEnabled()) { log.debug("toMessage(Object, Session) - start"); } // check Type ActiveMQObjectMessage objMsg = (ActiveMQObjectMessage) session.createObjectMessage(); HashMap<String, byte[]> map = new HashMap<String, byte[]>(); try { // POJO must implements Seralizable ByteArrayOutputStream bos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(bos); oos.writeObject(obj); map.put("POJO", bos.toByteArray()); objMsg.setObjectProperty("Map", map); } catch (IOException e) { e.printStackTrace(); log.error("toMessage(Object, Session)", e); } return objMsg; } public Object fromMessage(Message msg) throws JMSException { if (log.isDebugEnabled()) { log.debug("fromMessage(Message) - start"); } if (msg instanceof ObjectMessage) { HashMap<String, byte[]> map = (HashMap<String, byte[]>) ((ObjectMessage) msg).getObjectProperty("Map"); try { // POJO must implements Seralizable ByteArrayInputStream bis = new ByteArrayInputStream(map.get("POJO")); ObjectInputStream ois = new ObjectInputStream(bis); Object returnObject = ois.readObject(); return returnObject; } catch (IOException e) { e.printStackTrace(); log.error("fromMessage(Message)", e); } catch (ClassNotFoundException e) { e.printStackTrace(); log.error("fromMessage(Message)", e); } return null; } else { throw new JMSException("Msg:[" + msg + "] is not Map"); } } }
QueueMessageProducer.java
import org.springframework.jms.core.JmsTemplate; import javax.jms.Queue; /** * Date: 2015-7-1 * Time: 17:14:23 */ public class QueueMessageProducer { private JmsTemplate template; private Queue destination; public void setTemplate(JmsTemplate template) { this.template = template; } public void setDestination(Queue destination) { this.destination = destination; } public void send(FooMessage message) { template.convertAndSend(this.destination, message); } }
CreateMessageProducer.java(消息生產者)
import javax.jms.Queue; import org.springframework.jms.core.JmsTemplate; public class CreateMessageProducer { private JmsTemplate template; private Queue destination; public void setTemplate(JmsTemplate template) { this.template = template; } public void setDestination(Queue destination) { this.destination = destination; } public void send(String str) { template.convertAndSend(this.destination, str); System.out.println("system A send message to system B~~~~~~~~~~"); } }
3.在系統B加上amq相關內容。
applicationContext-amq.xml文件
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.10.2.xsd"> <!-- 使用spring的listenerContainer,消息用持久化保存,服務器重啓不會丟失 --> <!-- 鏈接外部的activeMQ--> <amq:connectionFactory id="jmsConnectionFactory" userName="admin" password="admin" brokerURL="tcp://localhost:61616"></amq:connectionFactory> <!-- ActiveMQ destinations --> <!-- 使用Queue方式--> <amq:queue name="QUEUE" physicalName="TESTQ" /> <!-- Spring JmsTemplate config --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory"> <!-- lets wrap in a pool to avoid creating a connection per send --> <bean class="org.springframework.jms.connection.SingleConnectionFactory"> <property name="targetConnectionFactory" ref="jmsConnectionFactory" /> </bean> </property> <!-- custom MessageConverter --> <property name="messageConverter" ref="defaultMessageConverter" /> </bean> <!-- converter --> <bean id="defaultMessageConverter" class="com.pack.app.amq.DefaultMessageConverter" /> <bean id="queueMessageProducer" class="tools.amq.QueueMessageProducer"> <property name="template" ref="jmsTemplate" /> <property name="destination" ref="QUEUE" /> </bean> <!-- consumer for queue --> <bean id="queueConsumer" class="tools.amq.QueueConsumer" /> <!-- Message Listener for queue --> <bean id="queueListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter"> <constructor-arg ref="queueConsumer" /> <!-- may be other method --> <property name="defaultListenerMethod" value="receive" /> <!-- custom MessageConverter define --> <property name="messageConverter" ref="defaultMessageConverter" /> </bean> <!-- listener container,MDP無需實現接口 --> <bean id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="jmsConnectionFactory" /> <property name="destination" ref="QUEUE" /> <property name="messageListener" ref="queueListener" /> </bean> <!-- 測試 接收消息 --> <amq:queue name="CreateMessage" physicalName="A.CreateMessage" /> <!-- 接收數據 --> <bean id="createMessageConsumer" class="com.pack.app.amq.consumer.CreateMessageConsumer" /> <!-- 監聽 --> <bean id="createMessageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter"> <constructor-arg ref="createMessageConsumer" /> <!-- may be other method --> <property name="defaultListenerMethod" value="process" /> <!-- custom MessageConverter define --> <property name="messageConverter" ref="defaultMessageConverter" /> </bean> <!-- listener container,MDP無需實現接口 --> <bean id="createMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="jmsConnectionFactory" /> <property name="destination" ref="CreateMessage" /> <property name="messageListener" ref="createMessageListener" /> <!-- 消費者個數 --> <!-- <property name="concurrentConsumers" value="4"></property> --> </bean> </beans>
DefaultMessageConverter.java、QueueMessageProducer.java、QueueConsumer.java與系統A同樣。
CreateMessageConsumer.java
public class CreateMessageConsumer { @Autowired public AgentService agentService; public void process(String str) { System.out.println("system B receive message from system A "); agentService.agentPath(str); } }
4.啓動系統A和系統B的應用,只要系統A往隊列A.CreateMessage產生消息,系統B會自動接收到消息。