ActiveMQ是Apache所提供的一個開源的消息系統,徹底採用Java來實現,所以,它能很好地支持J2EE提出的JMS(Java Message Service,即Java消息服務)規範。JMS是一組Java應用程序接口,它提供消息的建立、發送、讀取等一系列服務。JMS提供了一組公共應用程序接口和響應的語法,相似於Java數據庫的統一訪問接口JDBC,它是一種與廠商無關的API,使得Java程序可以與不一樣廠商的消息組件很好地進行通訊。html
JMS支持兩種消息發送和接收模型。一種稱爲P2P(Ponit to Point)模型,即採用點對點的方式發送消息。P2P模型是基於隊列的,消息生產者發送消息到隊列,消息消費者從隊列中接收消息,隊列的存在使得消息的異步傳輸稱爲可能,P2P模型在點對點的狀況下進行消息傳遞時採用。java
另外一種稱爲Pub/Sub(Publish/Subscribe,即發佈-訂閱)模型,發佈-訂閱模型定義瞭如何向一個內容節點發布和訂閱消息,這個內容節點稱爲topic(主題)。主題能夠認爲是消息傳遞的中介,消息發佈這將消息發佈到某個主題,而消息訂閱者則從主題訂閱消息。主題使得消息的訂閱者與消息的發佈者互相保持獨立,不須要進行接觸便可保證消息的傳遞,發佈-訂閱模型在消息的一對多廣播時採用。git
下載最新的安裝包apache-activemq-5.13.2-bin.tar.gzgithub
下載以後解壓: tar -zvxf apache-activemq-5.13.2-bin.tar.gzweb
ActiveMQ目錄內容有:spring
bin目錄包含ActiveMQ的啓動腳本sql
conf目錄包含ActiveMQ的全部配置文件數據庫
data目錄包含日誌文件和持久性消息數據apache
example: ActiveMQ的示例微信
lib: ActiveMQ運行所須要的lib
webapps: ActiveMQ的web控制檯和一些相關的demo
ActiveMQ的默認服務端口爲61616,這個能夠在conf/activemq.xml配置文件中修改:
<transportConnectors> <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --> <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> </transportConnectors>
JMS的規範流程
得到JMS connection factory. 經過咱們提供特定環境的鏈接信息來構造factory。
利用factory構造JMS connection
啓動connection
經過connection建立JMS session.
指定JMS destination.
建立JMS producer或者建立JMS message並提供destination.
建立JMS consumer或註冊JMS message listener.
發送和接收JMS message.
關閉全部JMS資源,包括connection, session, producer, consumer等。
pom.xml
<dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.11.1</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.1.4.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>4.1.4.RELEASE</version> </dependency>
Queue類型消息
spring配置文件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <!-- 配置JMS鏈接工廠 --> <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <!-- ActiveMQ服務的地址和端口--> <property name="brokerURL" value="failover:(tcp://192.168.147.131:61616)" /> </bean> <!-- 定義消息隊列(Queue) --> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <!-- 設置消息隊列的名字 --> <constructor-arg> <value>testSpringQueue</value> </constructor-arg> </bean> <!-- 定義消息發佈(Pub/Sub) --> <!-- <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> --> <!-- <constructor-arg> --> <!-- <value>topic</value> --> <!-- </constructor-arg> --> <!-- </bean> --> <!-- 配置JMS模板(Queue),Spring提供的JMS工具類,它發送、接收消息。 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory" /> <property name="defaultDestination" ref="queueDestination" /> <property name="receiveTimeout" value="10000" /> </bean> </beans>
推送代碼
package com.mq.spring.queue; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import javax.annotation.Resource; import javax.jms.*; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations={"classpath:spring-mq-queue.xml"}) public class QueueSender { @Resource private JmsTemplate jmsTemplate; @Test public void send(){ sendMqMessage(null,"spring activemq queue type message !"); } /** * 說明:發送的時候若是這裏沒有顯示的指定destination.將用spring xml中配置的destination * @param destination * @param message */ public void sendMqMessage(Destination destination, final String message){ if(null == destination){ destination = jmsTemplate.getDefaultDestination(); } jmsTemplate.send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message); } }); System.out.println("spring send message..."); } public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } }
消費代碼
package com.mq.spring.queue; import org.junit.Test; import javax.jms.*; import org.junit.runner.RunWith; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import javax.annotation.Resource; import javax.jms.Message; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations={"classpath:spring-mq-queue.xml"}) public class QueueReceiver { @Resource private JmsTemplate jmsTemplate; @Test public void receiveMqMessage(){ Destination destination = jmsTemplate.getDefaultDestination(); receive(destination); } /** * 接受消息 */ public void receive(Destination destination) { TextMessage tm = (TextMessage) jmsTemplate.receive(destination); try { System.out.println("從隊列" + destination.toString() + "收到了消息:\t" + tm.getText()); } catch (JMSException e) { e.printStackTrace(); } } public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } }
說明:上面的生產者和消費者使用同一套配置文件,使用獨立的程序去接收消息,spring jms也提供了消息監聽處理.接下來咱們換成監聽式消費
配置文件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <!-- 配置JMS鏈接工廠 --> <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="failover:(tcp://192.168.147.131:61616)" /> </bean> <!-- 定義消息隊列(Queue) --> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <!-- 設置消息隊列的名字 --> <constructor-arg> <value>testSpringQueue</value> </constructor-arg> </bean> <!-- 配置消息隊列監聽者(Queue) --> <bean id="consumerMessageListener" class="com.mq.spring.queue.ConsumerMessageListener" /> <!-- 消息監聽容器(Queue),配置鏈接工廠,監聽的隊列是testSpringQueue,監聽器是上面定義的監聽器 --> <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="queueDestination" /> <property name="messageListener" ref="consumerMessageListener" /> </bean> </beans>
監聽器代碼
public class ConsumerMessageListener implements MessageListener{ @Override public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("ConsumerMessageListener收到了文本消息:\t" + tm.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
這樣咱們的消息消費就能夠在監聽器中處理消費了.生產的代碼不變,修改發送者的消息體內容,執行生產程序
Topic類型消息
在使用 Spring JMS的時候,主題( Topic)和隊列消息的主要差別體如今JmsTemplate中 "pubSubDomain"是否設置爲 True。若是爲 True,則是 Topic;若是是false或者默認,則是 queue。
<property name="pubSubDomain" value="true" />
topic類型消費配置文件說明
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <!-- 配置JMS鏈接工廠 --> <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="failover:(tcp://192.168.147.131:61616)" /> </bean> <!-- 定義消息Destination --> <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"> <!-- 設置消息隊列的名字 --> <constructor-arg> <value>testSpringTopic</value> </constructor-arg> </bean> <!-- 配置JMS模板(Queue),Spring提供的JMS工具類,它發送、接收消息。 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory" /> <property name="defaultDestination" ref="destination" /> <property name="receiveTimeout" value="10000" /> </bean> <!-- 配置消息消費監聽者 --> <bean id="consumerMessageListener" class="com.mq.spring.topic.ConsumerMessageListener" /> <!-- 消息監聽容器,配置鏈接工廠,監聽器是上面定義的監聽器 --> <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="destination" /> <!--主題(Topic)和隊列消息的主要差別體如今JmsTemplate中"pubSubDomain"是否設置爲True。若是爲True,則是Topic;若是是false或者默認,則是queue--> <property name="pubSubDomain" value="true" /> <property name="messageListener" ref="consumerMessageListener" /> </bean> </beans>
生產者代碼
package com.mq.spring.topic; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import javax.annotation.Resource; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations={"classpath:spring-mq-topic.xml"}) public class TopicSender { @Resource private JmsTemplate jmsTemplate; @Test public void send(){ sendMqMessage(null,"spring activemq topic type message[with listener] !"); } /** * 說明:發送的時候若是這裏沒有顯示的指定destination.將用spring xml中配置的destination * @param destination * @param message */ public void sendMqMessage(Destination destination, final String message){ if(null == destination){ destination = jmsTemplate.getDefaultDestination(); } jmsTemplate.send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message); } }); System.out.println("spring send message..."); } public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } }
監聽器代碼
public class ConsumerMessageListener implements MessageListener{ @Override public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("ConsumerMessageListener收到了文本消息:\t" + tm.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
參考資料
更多內容能夠關注微信公衆號,或者訪問AppZone網站