1、消息中間件基礎知識
2、ActiveMQ介紹
3、ActiveMQ下載安裝(Windows版本)
4、Java操做ActiveMQ代碼實戰
5、Spring整合ActiveMQ代碼實戰
6、項目源碼與參考資料下載
7、參考文章php
https://www.cnblogs.com/WUXIAOCHANG/p/10904987.htmlhtml
1.1 點對點(point-to-point,簡稱PTP)Queue消息傳遞模型
經過該消息傳遞模型,一個應用程序(即消息生產者)能夠向另一個應用程序(即消息消費者)發送消息。在此傳遞模型中,消息目的地類型是隊列(即Destination接口實現類實例由Session接口實現類實例經過調用其createQueue方法並傳入隊列名稱而建立)。消息首先被傳送至消息服務器端特定的隊列中,而後今後對列中將消息傳送至對此隊列進行監聽的某個消費者。同一個隊列能夠關聯多個消息生產者和消息消費者,但一條消息僅能傳遞給一個消息消費者。若是多個消息消費者正在監聽隊列上的消息,,JMS消息服務器將根據「先來者優先」的原則肯定由哪一個消息消費者接收下一條消息。若是沒有消息消費者在監聽隊列,消息將保留在隊列中,直至消息消費者鏈接到隊列爲止。這種消息傳遞模型是傳統意義上的懶模型或輪詢模型。在此模型中,消息不是自動推進給消息消費者的,而是要由消息消費者從隊列中請求得到。java
1.2 發佈/訂閱(publish/subscribe,簡稱pub/sub)Topic消息傳遞模型
經過該消息傳遞模型,應用程序可以將一條消息發送給多個消息消費者。在此傳送模型中,消息目的地類型是主題(即Destination接口實現類實例由Session接口實現類實例經過調用其createTopic方法並傳入主題名稱而建立)。消息首先由消息生產者發佈至消息服務器中特定的主題中,而後由消息服務器將消息傳送至全部已訂閱此主題的消費者。主題目標也支持長期訂閱。長期訂閱表示消費者已註冊了主題目標,但在消息到達目標時該消費者能夠處於非活動狀態。當消費者再次處於活動狀態時,將會接收該消息。若是消費者均沒有註冊某個主題目標,該主題只保留註冊了長期訂閱的非活動消費者的消息。與PTP消息傳遞模型不一樣,pub/sub消息傳遞模型容許多個主題訂閱者接收同一條消息。JMS一直保留消息,直至全部主題訂閱者都接收到消息爲止。pub/sub消息傳遞模型基本上是一個推模型。在該模型中,消息會自動廣播,消息消費者無須經過主動請求或輪詢主題的方法來得到新的消息。spring
1.3 兩種模型方式比較apache
JMS定義了五種不一樣的消息正文格式,以及調用的消息類型,容許你發送並接收以一些不一樣形式的數據,提供現有消息格式的一些級別的兼容性。
(1)StreamMessage -- Java原始值的數據流
(2)MapMessage--一套名稱-值對
(3)TextMessage--一個字符串對象
(4)ObjectMessage--一個序列化的 Java對象
(5)BytesMessage--一個字節的數據流windows
一、打開瀏覽器,訪問網址activemq.apache.org,以下圖所示:瀏覽器
二、下載最新的版本,當前最新版本爲5.15.5,根據ActiveMQ須要安裝的操做系統選擇性下載對應的版本,這裏我選擇Windows版本,而後點擊下載ZIP包,以下圖所示:安全
三、下載完成之後,將zip文件解壓到D盤下,解壓後的目錄結構以下圖所示:服務器
四、在啓動ActiveMQ前,首先要確保服務器上已經安裝和配置好JDK,而且JDK的版本要知足ActiveMQ的要求,以下圖所示:session
五、接下來咱們進入到D:\apache-activemq-5.15.5\bin,以下圖所示:
六、根據服務器上操做系統的版本,選擇進入到win32仍是win64,這裏選擇進入win64目錄,而後雙擊activemq.bat,這時activemq就啓動起來了,成功啓動之後打印的日誌以下圖所示:
七、打開瀏覽器,輸入http://localhost:8161/admin/ ,彈出一個windows安全提示框,提示輸入activemq的用戶名和密碼,以下圖所示:
八、接下來咱們打開D:\apache-activemq-5.15.5\conf這個目錄,找到jetty-realm.properties文件(該文件保存着用戶名和密碼信息),以下圖所示:
九、打開該文件,找到文件的末尾,格式是 用戶名: 密碼,用戶角色 ,以下圖所示:
十、角色信息的定義放在D:\apache-activemq-5.15.5\conf下的jetty.xml文件中,以下圖所示:
十一、 咱們知道了角色定義的位置,角色對應的用戶名和密碼後,咱們就可使用默認的用戶名admin和默認的密碼admin來登陸系統,以下圖所示:
十二、 登陸成功之後,就能夠看到activemq的主頁了,以下圖所示:
1.1 idea建立maven項目
建立後項目結構以下:
1.2 pom.xml文件添加依賴
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.wxc</groupId> <artifactId>activemq-service</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <!-- 去掉scope做用域,使用默認的compile,編譯、測試、運行都有效的做用域 --> <!--<scope>test</scope>--> </dependency> <!-- mq消息集成 --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.15.0</version> </dependency> </dependencies> </project>
1.3 建立測試類
com.wxc.test包下新建TestService.java
其中testQueueProducer方法爲隊列類型,testTopicProducer方法爲發佈/訂閱類型,其中建立步驟以下:
第一步:建立ConnectionFactory對象,須要指定服務端ip及端口號。
第二步:使用ConnectionFactory對象建立一個Connection對象。
第三步:開啓鏈接,調用Connection對象的start方法。
第四步:使用Connection對象建立一個Session對象。
第五步:使用Session對象建立一個Destination對象(topic、queue),此處建立一個Queue對象。
第六步:使用Session對象建立一個Producer對象。
第七步:建立一個Message對象,建立一個TextMessage對象。
第八步:使用Producer對象發送消息。
第九步:關閉資源。
package com.wxc.test; import org.apache.activemq.ActiveMQConnectionFactory; import org.junit.Test; import javax.jms.*; public class TestService { @Test public void testQueueProducer() throws Exception { // 第一步:建立ConnectionFactory對象,須要指定服務端ip及端口號。 //brokerURL服務器的ip及端口號 //8161是後臺管理系統,61616是給java用的tcp端口 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.101.6:61616"); // 第二步:使用ConnectionFactory對象建立一個Connection對象。 Connection connection = connectionFactory.createConnection(); // 第三步:開啓鏈接,調用Connection對象的start方法。 connection.start(); // 第四步:使用Connection對象建立一個Session對象。 //第一個參數:是否開啓事務。true:開啓事務,第二個參數忽略。 //第二個參數:當第一個參數爲false時,纔有意義。消息的應答模式。一、自動應答二、手動應答。通常是自動應答。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session對象建立一個Destination對象(topic、queue),此處建立一個Queue對象。 //參數:隊列的名稱。 Queue queue = session.createQueue("test-queue"); // 第六步:使用Session對象建立一個Producer對象。 MessageProducer producer = session.createProducer(queue); // 第七步:建立一個Message對象,建立一個TextMessage對象。 /*TextMessage message = new ActiveMQTextMessage(); message.setText("hello activeMq,this is my first test.");*/ TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test."); // 第八步:使用Producer對象發送消息。 producer.send(textMessage); // 第九步:關閉資源。 producer.close(); session.close(); connection.close(); } @Test public void testTopicProducer() throws Exception { // 第一步:建立ConnectionFactory對象,須要指定服務端ip及端口號。 // brokerURL服務器的ip及端口號 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.101.6:61616"); // 第二步:使用ConnectionFactory對象建立一個Connection對象。 Connection connection = connectionFactory.createConnection(); // 第三步:開啓鏈接,調用Connection對象的start方法。 connection.start(); // 第四步:使用Connection對象建立一個Session對象。 // 第一個參數:是否開啓事務。true:開啓事務,第二個參數忽略。 // 第二個參數:當第一個參數爲false時,纔有意義。消息的應答模式。一、自動應答二、手動應答。通常是自動應答。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session對象建立一個Destination對象(topic、queue),此處建立一個topic對象。 // 參數:話題的名稱。 Topic topic = session.createTopic("test-topic"); // 第六步:使用Session對象建立一個Producer對象。 MessageProducer producer = session.createProducer(topic); // 第七步:建立一個Message對象,建立一個TextMessage對象。 /* * TextMessage message = new ActiveMQTextMessage(); message.setText( * "hello activeMq,this is my first test."); */ TextMessage textMessage = session.createTextMessage("hello activeMq,this is my topic test"); // 第八步:使用Producer對象發送消息。 producer.send(textMessage); // 第九步:關閉資源。 producer.close(); session.close(); connection.close(); } }
舒適提示:8161端口是後臺管理系統,61616端口是給java用的tcp端口
2.1 idea建立maven項目
建立後項目結構以下:
2.2 pom.xml文件添加依賴
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.wxc</groupId> <artifactId>activemq-customer</artifactId> <version>1.0-SNAPSHOT</version> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>6</source> <target>6</target> </configuration> </plugin> </plugins> </build> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <!-- 去掉scope做用域,使用默認的compile,編譯、測試、運行都有效的做用域 --> <!--<scope>test</scope>--> </dependency> <!-- mq消息集成 --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.15.0</version> </dependency> </dependencies> </project>
2.3 新建測試類
com.wxc.test包下新建TestCustomer.java
testQueueConsumer方法是測試隊列方式,testTopicConsumer方法是測試發佈/訂閱方式,建立步驟以下:
消費者:接收消息。
第一步:建立一個ConnectionFactory對象。
第二步:從ConnectionFactory對象中得到一個Connection對象。
第三步:開啓鏈接。調用Connection對象的start方法。
第四步:使用Connection對象建立一個Session對象。
第五步:使用Session對象建立一個Destination對象。和發送端保持一致queue,而且隊列的名稱一致。
第六步:使用Session對象建立一個Consumer對象。
第七步:接收消息。
第八步:打印消息。
第九步:關閉資源
package com.wxc.test; import org.apache.activemq.ActiveMQConnectionFactory; import org.junit.Test; import javax.jms.*; public class TestCustomer { @Test public void testQueueConsumer() throws Exception { // 第一步:建立一個ConnectionFactory對象。 //8161是後臺管理系統,61616是給java用的tcp端口 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.101.6:61616"); // 第二步:從ConnectionFactory對象中得到一個Connection對象。 Connection connection = connectionFactory.createConnection(); // 第三步:開啓鏈接。調用Connection對象的start方法。 connection.start(); // 第四步:使用Connection對象建立一個Session對象。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session對象建立一個Destination對象。和發送端保持一致queue,而且隊列的名稱一致。 Queue queue = session.createQueue("test-queue"); // 第六步:使用Session對象建立一個Consumer對象。 MessageConsumer consumer = session.createConsumer(queue); // 第七步:接收消息。 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; String text = null; //取消息的內容 text = textMessage.getText(); // 第八步:打印消息。 System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } }); //等待鍵盤輸入 System.in.read(); // 第九步:關閉資源 consumer.close(); session.close(); connection.close(); } @Test public void testTopicConsumer() throws Exception { // 第一步:建立一個ConnectionFactory對象。 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.101.6:61616"); // 第二步:從ConnectionFactory對象中得到一個Connection對象。 Connection connection = connectionFactory.createConnection(); // 第三步:開啓鏈接。調用Connection對象的start方法。 connection.start(); // 第四步:使用Connection對象建立一個Session對象。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session對象建立一個Destination對象。和發送端保持一致topic,而且話題的名稱一致。 Topic topic = session.createTopic("test-topic"); // 第六步:使用Session對象建立一個Consumer對象。 MessageConsumer consumer = session.createConsumer(topic); // 第七步:接收消息。 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; String text = null; // 取消息的內容 text = textMessage.getText(); // 第八步:打印消息。 System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } }); System.out.println("topic的消費端03。。。。。"); // 等待鍵盤輸入 System.in.read(); // 第九步:關閉資源 consumer.close(); session.close(); connection.close(); } }
3.1 idea建立maven項目
建立後項目結構以下:
3.2 pom.xml文件添加依賴
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.wxc</groupId> <artifactId>activemq-customer2</artifactId> <version>1.0-SNAPSHOT</version> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>6</source> <target>6</target> </configuration> </plugin> </plugins> </build> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <!-- 去掉scope做用域,使用默認的compile,編譯、測試、運行都有效的做用域 --> <!--<scope>test</scope>--> </dependency> <!-- mq消息集成 --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.15.0</version> </dependency> </dependencies> </project>
3.3 新建測試類
com.wxc.test包下新建TestCustomer.java
testQueueConsumer方法是測試隊列方式,testTopicConsumer方法是測試發佈/訂閱方式,建立步驟以下:
消費者:接收消息。
第一步:建立一個ConnectionFactory對象。
第二步:從ConnectionFactory對象中得到一個Connection對象。
第三步:開啓鏈接。調用Connection對象的start方法。
第四步:使用Connection對象建立一個Session對象。
第五步:使用Session對象建立一個Destination對象。和發送端保持一致queue,而且隊列的名稱一致。
第六步:使用Session對象建立一個Consumer對象。
第七步:接收消息。
第八步:打印消息。
第九步:關閉資源
package com.wxc.test; import org.apache.activemq.ActiveMQConnectionFactory; import org.junit.Test; import javax.jms.*; public class TestCustomer { @Test public void testQueueConsumer() throws Exception { // 第一步:建立一個ConnectionFactory對象。 //8161是後臺管理系統,61616是給java用的tcp端口 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.101.6:61616"); // 第二步:從ConnectionFactory對象中得到一個Connection對象。 Connection connection = connectionFactory.createConnection(); // 第三步:開啓鏈接。調用Connection對象的start方法。 connection.start(); // 第四步:使用Connection對象建立一個Session對象。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session對象建立一個Destination對象。和發送端保持一致queue,而且隊列的名稱一致。 Queue queue = session.createQueue("test-queue"); // 第六步:使用Session對象建立一個Consumer對象。 MessageConsumer consumer = session.createConsumer(queue); // 第七步:接收消息。 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; String text = null; //取消息的內容 text = textMessage.getText(); // 第八步:打印消息。 System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } }); //等待鍵盤輸入 System.in.read(); // 第九步:關閉資源 consumer.close(); session.close(); connection.close(); } @Test public void testTopicConsumer() throws Exception { // 第一步:建立一個ConnectionFactory對象。 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.101.6:61616"); // 第二步:從ConnectionFactory對象中得到一個Connection對象。 Connection connection = connectionFactory.createConnection(); // 第三步:開啓鏈接。調用Connection對象的start方法。 connection.start(); // 第四步:使用Connection對象建立一個Session對象。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session對象建立一個Destination對象。和發送端保持一致topic,而且話題的名稱一致。 Topic topic = session.createTopic("test-topic"); // 第六步:使用Session對象建立一個Consumer對象。 MessageConsumer consumer = session.createConsumer(topic); // 第七步:接收消息。 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; String text = null; // 取消息的內容 text = textMessage.getText(); // 第八步:打印消息。 System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } }); System.out.println("topic的消費端03。。。。。"); // 等待鍵盤輸入 System.in.read(); // 第九步:關閉資源 consumer.close(); session.close(); connection.close(); } }
(1)建立了兩個客戶端的鏈接,是用於測試過程當中體現隊列方式只能被一個消費者接收,而發佈/訂閱方式能夠被多個消費者同時收到
(2)8161端口是後臺管理系統,61616端口是給java用的tcp端口
5.1 隊列方式
運行兩個消費者端
運行服務者端
數據結果以下:
因此驗證了隊列方式只能有一個消費者端接收穫得,且當客戶端未運行時,服務器已經發送信息了,那麼ActivieMQ會在客戶端啓動時候,傳送數據給它
5.2 發佈/訂閱方式
運行兩個消費者端
運行服務者端
數據結果以下:
因此驗證了發佈/訂閱方式能夠多個消費者端接收穫得,且當客戶端未運行時,服務器已經發送信息了,那麼ActivieMQ會在客戶端啓動時候,傳送數據給它
第一步:引用相關的jar包
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context-support</artifactId> </dependency>
第二步:配置Activemq整合spring。配置ConnectionFactory
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd"> <!-- 真正能夠產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.25.168:61616" /> </bean> <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目標ConnectionFactory對應真實的能夠產生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="targetConnectionFactory" /> </bean> </beans>
第三步:配置生產者。
使用JMSTemplate對象。發送消息。
第四步:在spring容器中配置Destination
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd"> <!-- 真正能夠產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.25.168:61616" /> </bean> <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目標ConnectionFactory對應真實的能夠產生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="targetConnectionFactory" /> </bean> <!-- 配置生產者 --> <!-- Spring提供的JMS工具類,它能夠進行消息發送、接收等 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 這個connectionFactory對應的是咱們定義的Spring提供的那個ConnectionFactory對象 --> <property name="connectionFactory" ref="connectionFactory" /> </bean> <!--這個是隊列目的地,點對點的 --> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>spring-queue</value> </constructor-arg> </bean> <!--這個是主題目的地,一對多的 --> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="topic" /> </bean> </beans>
2.1 發送消息
第一步:初始化一個spring容器
第二步:從容器中得到JMSTemplate對象。
第三步:從容器中得到一個Destination對象
第四步:使用JMSTemplate對象發送消息,須要知道Destination
@Test public void testQueueProducer() throws Exception { // 第一步:初始化一個spring容器 ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml"); // 第二步:從容器中得到JMSTemplate對象。 JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class); // 第三步:從容器中得到一個Destination對象 Queue queue = (Queue) applicationContext.getBean("queueDestination"); // 第四步:使用JMSTemplate對象發送消息,須要知道Destination jmsTemplate.send(queue, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { TextMessage textMessage = session.createTextMessage("spring activemq test"); return textMessage; } }); }
2.2 接收消息
Taotao-search-Service中接收消息。
第一步:把Activemq相關的jar包添加到工程中
第二步:建立一個MessageListener的實現類。
public class MyMessageListener implements MessageListener { @Override public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; //取消息內容 String text = textMessage.getText(); System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } }
第三步:配置spring和Activemq整合。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd"> <!-- 真正能夠產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.25.168:61616" /> </bean> <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目標ConnectionFactory對應真實的能夠產生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="targetConnectionFactory" /> </bean> <!--這個是隊列目的地,點對點的 --> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>spring-queue</value> </constructor-arg> </bean> <!--這個是主題目的地,一對多的 --> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="topic" /> </bean> <!-- 接收消息 --> <!-- 配置監聽器 --> <bean id="myMessageListener" class="com.taotao.search.listener.MyMessageListener" /> <!-- 消息監聽容器 --> <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="queueDestination" /> <property name="messageListener" ref="myMessageListener" /> </bean> </beans>
第四步:測試代碼。
@Test public void testQueueConsumer() throws Exception { //初始化spring容器 ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml"); //等待 System.in.read(); }
連接:https://pan.baidu.com/s/10jknviW5p7MJr3FKSjvYjQ
提取碼:waeh