一、什麼是ActiveMQ
html
1 ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個徹底支持JMS1.1和J2EE 1.4規範的 JMS Provider實現,儘管JMS規範出臺已是好久的事情了,可是JMS在當今的J2EE應用中間仍然扮演着特殊的地位。 2 主要特色: 3 1). 多種語言和協議編寫客戶端。語言: Java, C, C++, C#, Ruby, Perl, Python, PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP 4 2). 徹底支持JMS1.1和J2EE 1.4規範 (持久化,XA消息,事務)。 5 3.) 對Spring的支持,ActiveMQ能夠很容易內嵌到使用Spring的系統裏面去,並且也支持Spring2.0的特性。 6 4.) 經過了常見J2EE服務器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的測試,其中經過JCA 1.5 resource adaptors的配置,可讓ActiveMQ能夠自動的部署到任何兼容J2EE 1.4 商業服務器上。 7 5). 支持多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA。 8 6). 支持經過JDBC和journal提供高速的消息持久化。 9 7). 從設計上保證了高性能的集羣,客戶端-服務器,點對點。 10 8). 支持Ajax。 11 9). 支持與Axis的整合。 12 10). 能夠很容易得調用內嵌JMS provider,進行測試。
二、JMS介紹:java
1 1)、JMS的全稱是Java Message Service,即Java消息服務。用於在兩個應用程序之間,或分佈式系統中發送消息,進行異步通訊。 2 3 2)、它主要用於在生產者和消費者之間進行消息傳遞,生產者負責產生消息,而消費者負責接收消息。把它應用到實際的業務需求中的話咱們能夠在特定的時候利用生產者生成一消息,並進行發送,對應的消費者在接收到對應的消息後去完成對應的業務邏輯。
三、ActiveMQ的兩種消息形式。linux
1 1)、對於消息的傳遞有兩種類型。 2 a)、一種是點對點的,即一個生產者和一個消費者一一對應。 3 b)、另外一種是發佈/訂閱模式,即一個生產者產生消息並進行發送後,能夠由多個消費者進行接收。 4 5 2)、JMS定義了五種不一樣的消息正文格式,以及調用的消息類型,容許你發送並接收以一些不一樣形式的數據,提供現有消息格式的一些級別的兼容性。 6 a)、 StreamMessage -- Java原始值的數據流。 7 b)、 MapMessage--一套名稱-值對。 8 c)、 TextMessage--一個字符串對象。 9 d)、 ObjectMessage--一個序列化的 Java對象。 10 e)、 BytesMessage--一個字節的數據流。
四、ActiveMQ的安裝。官方網址:http://activemq.apache.org/web
因爲ActiveMQ是java開發的,因此須要先安裝jdk(注意:安裝jdk,須要jdk1.7以上版本)的哦。這裏使用的是apache-activemq-5.12.0-bin.tar.gz版本的。spring
開始進行解壓縮操做。macos
1 [root@localhost package]# ls 2 apache-activemq-5.12.0-bin.tar.gz apache-activemq-5.12.0-bin.zip apache-tomcat-7.0.47.tar.gz IK Analyzer 2012FF_hf1 IK Analyzer 2012FF_hf1.rar jdk-7u55-linux-i586.tar.gz solr-4.10.3.tgz.tgz zookeeper-3.4.6.tar.gz 3 [root@localhost package]# tar -zxvf apache-activemq-5.12.0-bin.tar.gz -C /home/hadoop/soft/
解壓縮完之後進入bin目錄。開始進行啓動操做。apache
啓動:[root@localhost bin]# ./activemq start tomcat
中止:[root@localhost bin]# ./activemq stop服務器
查看狀態:[root@localhost bin]# ./activemq status網絡
1 [root@localhost soft]# cd apache-activemq-5.12.0/ 2 [root@localhost apache-activemq-5.12.0]# ls 3 activemq-all-5.12.0.jar bin conf data docs examples lib LICENSE NOTICE README.txt webapps webapps-demo 4 [root@localhost apache-activemq-5.12.0]# ll 5 total 9384 6 -rwxr-xr-x. 1 root root 9524668 Aug 10 2015 activemq-all-5.12.0.jar 7 drwxr-xr-x. 5 root root 4096 Sep 15 00:39 bin 8 drwxr-xr-x. 2 root root 4096 Sep 15 00:39 conf 9 drwxr-xr-x. 2 root root 4096 Sep 15 00:39 data 10 drwxr-xr-x. 2 root root 4096 Sep 15 00:39 docs 11 drwxr-xr-x. 8 root root 4096 Sep 15 00:39 examples 12 drwxr-xr-x. 6 root root 4096 Sep 15 00:39 lib 13 -rw-r--r--. 1 root root 40580 Aug 10 2015 LICENSE 14 -rw-r--r--. 1 root root 3334 Aug 10 2015 NOTICE 15 -rw-r--r--. 1 root root 2610 Aug 10 2015 README.txt 16 drwxr-xr-x. 7 root root 4096 Sep 15 00:39 webapps 17 drwxr-xr-x. 3 root root 4096 Sep 15 00:39 webapps-demo 18 [root@localhost apache-activemq-5.12.0]# cd bin/ 19 [root@localhost bin]# ls 20 activemq activemq-diag activemq.jar env linux-x86-32 linux-x86-64 macosx wrapper.jar 21 [root@localhost bin]# ./activemq start 22 INFO: Loading '/home/hadoop/soft/apache-activemq-5.12.0//bin/env' 23 INFO: Using java '/home/hadoop/soft/jdk1.7.0_55/bin/java' 24 INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details 25 INFO: pidfile created : '/home/hadoop/soft/apache-activemq-5.12.0//data/activemq.pid' (pid '9318') 26 [root@localhost bin]# ./activemq status 27 INFO: Loading '/home/hadoop/soft/apache-activemq-5.12.0//bin/env' 28 INFO: Using java '/home/hadoop/soft/jdk1.7.0_55/bin/java' 29 ActiveMQ is running (pid '9318') 30 [root@localhost bin]#
而後你能夠訪問後臺管理界面,帳號和密碼默認都是admin的。訪問地址:http://192.168.110.142:8161/admin
Home是當前的歡迎頁,Queues是點到點形式,Topics是發佈訂閱模式,Subscribers話題消息的發佈與訂閱,Connections客戶端連接,Network當前網絡的連接狀態,Scheduled計劃任務,Send能夠測試發送消息。
五、ActiveMQ的使用方法,JMS消息發送模式。
注意:
1)、在點對點或隊列模型下,一個生產者向一個特定的隊列發佈消息,一個消費者從該隊列中讀取消息。這裏,生產者知道消費者的隊列,並直接將消息發送到消費者的隊列。這種模式被歸納爲:只有一個消費者將得到消息。生產者不須要在接收者消費該消息期間處於運行狀態,接收者也一樣不須要在消息發送時處於運行狀態。每個成功處理的消息都由接收者簽收。
2)、發佈者/訂閱者模型支持向一個特定的消息主題發佈消息。0或多個訂閱者可能對接收來自特定消息主題的消息感興趣。在這種模型下,發佈者和訂閱者彼此不知道對方。這種模式比如是匿名公告板。這種模式被歸納爲:多個消費者能夠得到消息,在發佈者和訂閱者之間存在時間依賴性。發佈者須要創建一個訂閱(subscription),以便客戶可以購訂閱。訂閱者必須保持持續的活動狀態以接收消息,除非訂閱者創建了持久的訂閱。在那種狀況下,在訂閱者未鏈接時發佈的消息將在訂閱者從新鏈接時從新發布。
六、JMS應用程序接口。
1 1)、ConnectionFactory 接口(鏈接工廠) 2 用戶用來建立到JMS提供者的鏈接的被管對象。JMS客戶經過可移植的接口訪問鏈接,這樣當下層的實現改變時,代碼不須要進行修改。 管理員在JNDI名字空間中配置鏈接工廠,這樣,JMS客戶纔可以查找到它們。根據消息類型的不一樣,用戶將使用隊列鏈接工廠,或者主題鏈接工廠。 3 2)、Connection 接口(鏈接) 4 鏈接表明了應用程序和消息服務器之間的通訊鏈路。在得到了鏈接工廠後,就能夠建立一個與JMS提供者的鏈接。根據不一樣的鏈接類型,鏈接容許用戶建立會話,以發送和接收隊列和主題到目標。 5 3)、Destination 接口(目標) 6 目標是一個包裝了消息目標標識符的被管對象,消息目標是指消息發佈和接收的地點,或者是隊列,或者是主題。JMS管理員建立這些對象,而後用戶經過JNDI發現它們。和鏈接工廠同樣,管理員能夠建立兩種類型的目標,點對點模型的隊列,以及發佈者/訂閱者模型的主題。 7 4)、MessageConsumer 接口(消息消費者) 8 由會話建立的對象,用於接收發送到目標的消息。消費者能夠同步地(阻塞模式),或異步(非阻塞)接收隊列和主題類型的消息。 9 5)、MessageProducer 接口(消息生產者) 10 由會話建立的對象,用於發送消息到目標。用戶能夠建立某個目標的發送者,也能夠建立一個通用的發送者,在發送消息時指定目標。 11 6)、Message 接口(消息) 12 是在消費者和生產者之間傳送的對象,也就是說從一個應用程序創送到另外一個應用程序。一個消息有三個主要部分: 13 消息頭(必須):包含用於識別和爲消息尋找路由的操做設置。 14 一組消息屬性(可選):包含額外的屬性,支持其餘提供者和用戶的兼容。能夠建立定製的字段和過濾器(消息選擇器)。 15 一個消息體(可選):容許用戶建立五種類型的消息(文本消息,映射消息,字節消息,流消息和對象消息)。 16 消息接口很是靈活,並提供了許多方式來定製消息的內容。 17 7)、Session 接口(會話) 18 表示一個單線程的上下文,用於發送和接收消息。因爲會話是單線程的,因此消息是連續的,就是說消息是按照發送的順序一個一個接收的。會話的好處是它支持事務。若是用戶選擇了事務支持,會話上下文將保存一組消息,直到事務被提交才發送這些消息。在提交事務以前,用戶可使用回滾操做取消這些消息。一個會話容許用戶建立消息生產者來發送消息,建立消息消費者來接收消息。
七、如何使用java操做activeMQ呢,把ActiveMQ依賴的jar包添加到工程中。
使用maven工程,則添加jar包的依賴:
1 <dependency> 2 <groupId>org.apache.activemq</groupId> 3 <artifactId>activemq-all</artifactId> 4 <version>5.11.2</version> 5 </dependency>
而後你就能夠愉快得開發了。是否是很開森呢。
八、ActiveMQ點對點模式(point-to-point)。
ActiveMq的點對點生產者。
1 package com.taotao.activemq; 2 3 import javax.jms.Connection; 4 import javax.jms.ConnectionFactory; 5 import javax.jms.JMSException; 6 import javax.jms.MessageProducer; 7 import javax.jms.Queue; 8 import javax.jms.Session; 9 import javax.jms.TextMessage; 10 11 import org.apache.activemq.ActiveMQConnectionFactory; 12 import org.apache.activemq.command.ActiveMQTextMessage; 13 import org.junit.Test; 14 15 /** 16 * 17 * @ClassName: ActiveMqMain.java 18 * @author: biehl 19 * @since: 2019年9月15日 下午4:44:57 20 * @Copyright: ©2019 biehl 版權全部 21 * @version: 0.0.1 22 * @Description: 23 */ 24 public class ActiveMqMain { 25 26 // activeMq得點對點生產者 27 @Test 28 public void queueProducer() throws JMSException { 29 // 一、建立一個鏈接工廠對象ConnectionFactory對象。須要指定mq服務得ip以及端口號61616。 30 String brokerURL = "tcp://192.168.110.142:61616"; 31 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL); 32 // 二、使用ConnectionFactory建立一個鏈接Connection對象。 33 Connection connection = connectionFactory.createConnection(); 34 // 三、開啓鏈接。調用Connection對象得start方法。 35 connection.start(); 36 // 四、使用Connection對象建立一個Session對象。 37 // 參數一是否開啓事務,通常不開啓事務,保證數據得最終一致性,可使用消息隊列實現數據最終一致性。若是第一個參數爲true,第二個參數自動忽略 38 // 參數二是消息得應答模式。兩種模式,自動應答和手動應答。通常使用自動應答。 39 boolean transacted = false;// 不開啓事務 40 int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;// 1 41 Session session = connection.createSession(transacted, acknowledgeMode); 42 // 五、使用Session對象建立一個Destination對象。兩種形式queue、topic。如今應該使用queue。 43 String queueName = "queue1";// 當前消息隊列得名稱 44 Queue queue = session.createQueue(queueName); 45 // 六、使用Session對象建立一個Producer對象。 46 // interface Queue extends Destination。destination是一個接口。 47 MessageProducer producer = session.createProducer(queue); 48 // 七、建立一個TextMessage對象。 49 // 建立TextMessage方式一 50 // TextMessage textMessage = new ActiveMQTextMessage(); 51 // textMessage.setText("hello activeMq......"); 52 // 方式二 53 TextMessage textMessage = session.createTextMessage("hello activeMq......"); 54 // 八、發送消息。 55 producer.send(textMessage); 56 // 九、關閉資源。 57 producer.close();// 關閉producer 58 session.close();// 關閉session 59 connection.close();// 關閉connection 60 } 61 62 }
ActiveMQ的點對點消息生產成功之後,能夠在ActiveMQ提供的web界面能夠看到一些信息。
activeMq的點對點消費者。
1 package com.taotao.activemq; 2 3 import java.io.IOException; 4 5 import javax.jms.Connection; 6 import javax.jms.ConnectionFactory; 7 import javax.jms.JMSException; 8 import javax.jms.Message; 9 import javax.jms.MessageConsumer; 10 import javax.jms.MessageListener; 11 import javax.jms.MessageProducer; 12 import javax.jms.Queue; 13 import javax.jms.Session; 14 import javax.jms.TextMessage; 15 16 import org.apache.activemq.ActiveMQConnectionFactory; 17 import org.apache.activemq.command.ActiveMQTextMessage; 18 import org.junit.Test; 19 20 /** 21 * 22 * @ClassName: ActiveMqMain.java 23 * @author: biehl 24 * @since: 2019年9月15日 下午4:44:57 25 * @Copyright: ©2019 biehl 版權全部 26 * @version: 0.0.1 27 * @Description: 28 */ 29 public class ActiveMqMain { 30 31 // activeMq的點對點生產者 32 @Test 33 public void queueProducer() throws JMSException { 34 // 一、建立一個鏈接工廠對象ConnectionFactory對象。須要指定mq服務得ip以及端口號61616。 35 String brokerURL = "tcp://192.168.110.142:61616"; 36 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL); 37 // 二、使用ConnectionFactory建立一個鏈接Connection對象。 38 Connection connection = connectionFactory.createConnection(); 39 // 三、開啓鏈接。調用Connection對象得start方法。 40 connection.start(); 41 // 四、使用Connection對象建立一個Session對象。 42 // 參數一是否開啓事務,通常不開啓事務,保證數據得最終一致性,可使用消息隊列實現數據最終一致性。若是第一個參數爲true,第二個參數自動忽略 43 // 參數二是消息得應答模式。兩種模式,自動應答和手動應答。通常使用自動應答。 44 boolean transacted = false;// 不開啓事務 45 int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;// 1 46 Session session = connection.createSession(transacted, acknowledgeMode); 47 // 五、使用Session對象建立一個Destination對象。兩種形式queue、topic。如今應該使用queue。 48 String queueName = "queue1";// 當前消息隊列得名稱 49 Queue queue = session.createQueue(queueName); 50 // 六、使用Session對象建立一個Producer對象。 51 // interface Queue extends Destination。destination是一個接口。 52 MessageProducer producer = session.createProducer(queue); 53 // 七、建立一個TextMessage對象。 54 // 建立TextMessage方式一 55 // TextMessage textMessage = new ActiveMQTextMessage(); 56 // textMessage.setText("hello activeMq......"); 57 // 方式二 58 TextMessage textMessage = session.createTextMessage("hello activeMq......"); 59 // 八、發送消息。 60 producer.send(textMessage); 61 // 九、關閉資源。 62 producer.close();// 關閉producer 63 session.close();// 關閉session 64 connection.close();// 關閉connection 65 } 66 67 // activeMq的點對點消費者 68 @Test 69 public void queueConsumer() throws JMSException { 70 // 一、建立一個鏈接工廠ConnectionFactory 對象 71 String brokerURL = "tcp://192.168.110.142:61616"; 72 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL); 73 // 二、使用鏈接工廠對象建立一個鏈接 74 Connection connection = connectionFactory.createConnection(); 75 // 三、開啓鏈接 76 connection.start(); 77 // 四、使用鏈接對象建立一個Session對象 78 boolean transacted = false;// 關閉事務 79 int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;// 自動響應 80 Session session = connection.createSession(transacted, acknowledgeMode); 81 // 五、使用Session建立一個Destination,Destination應該和消息的發送端一致的。 82 String queueName = "queue1"; 83 Queue queue = session.createQueue(queueName); 84 // 六、使用Session建立一個Consumer對象。 85 MessageConsumer consumer = session.createConsumer(queue); 86 // 七、向Consumer對象中設置一個MessageListener對象,用來接受消息。 87 // 匿名內部類,new 接口,後面加上{},至關於實現了這個接口的實現類。而後建立這個實現類的對象listener。 88 MessageListener listener = new MessageListener() { 89 90 @Override 91 public void onMessage(Message message) { 92 // 接受事件的。當消息到達就能夠在這裏接受到消息了的。 93 // 八、取出消息的內容。 94 if (message instanceof TextMessage) { 95 TextMessage textMessage = (TextMessage) message; 96 // 九、打印消息內容。 97 try { 98 String text = textMessage.getText(); 99 System.out.println(text); 100 } catch (JMSException e) { 101 e.printStackTrace(); 102 } 103 } 104 } 105 }; 106 consumer.setMessageListener(listener); 107 108 // 關閉資源之前,系統等待,等待接受消息。 109 /*while (true) { 110 try { 111 Thread.sleep(100); 112 } catch (InterruptedException e) { 113 e.printStackTrace(); 114 } 115 }*/ 116 117 // 等待鍵盤輸入。纔回接着向下執行的。 118 try { 119 System.in.read(); 120 } catch (IOException e) { 121 e.printStackTrace(); 122 } 123 124 125 // 十、關閉資源。 126 consumer.close();// 關閉consumer 127 session.close();// 關閉session 128 connection.close();// 關閉connection 129 } 130 131 }
執行了activeMq的點對點消費者。能夠在界面看到變化。能夠看到有一個消費者,而後生產了7條消息,7條消息進隊和7條消息出隊。
九、ActiveMQ發佈訂閱模式(publish/subscribe)。
消費者有兩種消費方法(這裏使用異步消費):
a、同步消費。經過調用消費者的receive方法從目的地中顯式提取消息。receive方法能夠一直阻塞到消息到達。
b、異步消費。客戶能夠爲消費者註冊一個消息監聽器,以定義在消息到達時所採起的動做。
實現MessageListener接口,在MessageListener()方法中實現消息的處理邏輯。
1 package com.taotao.activemq; 2 3 import java.io.IOException; 4 5 import javax.jms.Connection; 6 import javax.jms.ConnectionFactory; 7 import javax.jms.JMSException; 8 import javax.jms.Message; 9 import javax.jms.MessageConsumer; 10 import javax.jms.MessageListener; 11 import javax.jms.MessageProducer; 12 import javax.jms.Session; 13 import javax.jms.TextMessage; 14 import javax.jms.Topic; 15 16 import org.apache.activemq.ActiveMQConnectionFactory; 17 import org.junit.Test; 18 19 /** 20 * Active的發佈訂閱模式 21 * 22 * @ClassName: ActiveMqTopics.java 23 * @author: biehl 24 * @since: 2019年9月19日 上午10:51:14 25 * @Copyright: ©2019 biehl 版權全部 26 * @version: 0.0.1 27 * @Description: 28 */ 29 public class ActiveMqTopics { 30 31 // 發佈訂閱模式,生產者。topic生產者生產消息默認不持久化客戶端的。 32 @Test 33 public void topicProducer() { 34 try { 35 // 一、建立一個鏈接工廠對象。須要指定mq服務的ip地址以及端口號61616 36 String brikerURL = "tcp://192.168.110.142:61616"; 37 // 建立ConnectionFactory接口對象,實現類ActiveMQConnectionFactory 38 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brikerURL); 39 40 // 二、建立Connection鏈接 41 Connection connection = connectionFactory.createConnection(); 42 43 // 三、開啓鏈接,調用Connection的start方法。 44 connection.start(); 45 46 // 四、建立Session,使用Connection對象建立一個session 47 // 參數一是否開啓事務,通常不開啓事務,保證數據得最終一致性,可使用消息隊列實現數據最終一致性。若是第一個參數爲true,第二個參數自動忽略 48 boolean transacted = false; 49 // 參數二是消息得應答模式。兩種模式,自動應答和手動應答。通常使用自動應答。 50 int acknowledgeMode = Session.AUTO_ACKNOWLEDGE; 51 Session session = connection.createSession(transacted, acknowledgeMode); 52 53 // 五、建立Destination,應該使用topic,區別於點對點的queue 54 String topicName = "topic01"; 55 Topic topic = session.createTopic(topicName); 56 57 // 六、建立一個Producer對象 58 // interface Topic extends Destination. 59 // Destination是一個接口,Topic接口繼承Destination這個接口。 60 MessageProducer producer = session.createProducer(topic); 61 62 // 七、建立一個TextMessage對象 63 String message = null; 64 TextMessage textMessage = null; 65 for (int i = 0; i < 100; i++) { 66 message = i + " ActiveMQ topics......"; 67 textMessage = session.createTextMessage(message); 68 69 // 八、發送消息 70 producer.send(textMessage); 71 } 72 73 // 九、關閉資源 74 producer.close();// 關閉producer 75 session.close();// 關閉session 76 connection.close();// 關閉connection 77 } catch (JMSException e) { 78 e.printStackTrace(); 79 } 80 } 81 82 // 發佈訂閱模式,消費者必須一直等待生產者生產的消息,由於發佈訂閱模式不持久化。 83 @Test 84 public void topicConsumer() { 85 try { 86 // 一、建立一個鏈接工廠對象 87 String brokerURL = "tcp://192.168.110.142:61616"; 88 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL); 89 90 // 二、使用鏈接工廠對象建立一個鏈接 91 Connection connection = connectionFactory.createConnection(); 92 93 // 三、開啓鏈接 94 connection.start(); 95 96 // 四、使用鏈接對象建立一個Session對象 97 // 參數一是否開啓事務,通常不開啓事務,保證數據得最終一致性,可使用消息隊列實現數據最終一致性。若是第一個參數爲true,第二個參數自動忽略 98 boolean transacted = false; 99 // 參數二是消息得應答模式。兩種模式,自動應答和手動應答。通常使用自動應答。 100 int acknowledgeMode = Session.AUTO_ACKNOWLEDGE; 101 Session session = connection.createSession(transacted, acknowledgeMode); 102 103 // 五、使用session建立destination,注意,destination應該和消息的發送端一致的。 104 String topicName = "topic01"; 105 Topic topic = session.createTopic(topicName); 106 107 // 六、使用session建立一個consumer對象 108 MessageConsumer consumer = session.createConsumer(topic); 109 110 // 七、向Consumer對象中設置一個MessageListener對象,用來接受消息。 111 // 匿名內部類,new 接口,後面加上{},至關於實現了這個接口的實現類。而後建立這個實現類的對象listener。 112 MessageListener listener = new MessageListener() { 113 // 接受事件的。當消息到達就能夠在這裏接受到消息了的。 114 // 八、取出消息的內容。 115 @Override 116 public void onMessage(Message message) { 117 if (message instanceof TextMessage) { 118 TextMessage textMessage = (TextMessage) message; 119 // 九、打印消息內容。 120 try { 121 String text = textMessage.getText(); 122 System.out.println(text); 123 } catch (JMSException e) { 124 e.printStackTrace(); 125 } 126 } 127 } 128 }; 129 consumer.setMessageListener(listener); 130 131 // 啓動三次,模擬是三個消費者 132 System.out.println("消費者1......."); 133 // System.out.println("消費者2......."); 134 // System.out.println("消費者3......."); 135 136 // 等待鍵盤輸入。纔回接着向下執行的。 137 try { 138 System.in.read(); 139 } catch (IOException e) { 140 e.printStackTrace(); 141 } 142 143 // 九、關閉資源 144 consumer.close();// 關閉producer 145 session.close();// 關閉session 146 connection.close();// 關閉connection 147 } catch (JMSException e) { 148 e.printStackTrace(); 149 } 150 151 } 152 153 }
執行了activeMq的發佈訂閱模式。能夠在界面看到變化。能夠看到有三個消費者,而後生產了201條消息,201條消息進隊和603條消息出隊。
十、ActiveMQ與Spring整合以下所示:
在pom.xml配置文件中引入本身的依賴的jar包。
1 <dependency> 2 <groupId>org.springframework</groupId> 3 <artifactId>spring-jms</artifactId> 4 </dependency> 5 <dependency> 6 <groupId>org.springframework</groupId> 7 <artifactId>spring-context-support</artifactId> 8 </dependency>
在配置文件applicationContext-activemq.xml裏面配置ConnectionFactory。以下所示:
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:context="http://www.springframework.org/schema/context" 4 xmlns:p="http://www.springframework.org/schema/p" 5 xmlns:aop="http://www.springframework.org/schema/aop" 6 xmlns:tx="http://www.springframework.org/schema/tx" 7 xmlns:jms="http://www.springframework.org/schema/jms" 8 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 9 xsi:schemaLocation="http://www.springframework.org/schema/beans 10 http://www.springframework.org/schema/beans/spring-beans-4.0.xsd 11 http://www.springframework.org/schema/context 12 http://www.springframework.org/schema/context/spring-context-4.0.xsd 13 http://www.springframework.org/schema/aop 14 http://www.springframework.org/schema/aop/spring-aop-4.0.xsd 15 http://www.springframework.org/schema/tx 16 http://www.springframework.org/schema/tx/spring-tx-4.0.xsd 17 http://www.springframework.org/schema/jms 18 http://www.springframework.org/schema/jms/spring-jms-4.0.xsd 19 http://www.springframework.org/schema/util 20 http://www.springframework.org/schema/util/spring-util-4.0.xsd"> 21 22 23 <!-- 真正能夠產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 --> 24 <bean id="targetConnectionFactory" 25 class="org.apache.activemq.ActiveMQConnectionFactory"> 26 <property name="brokerURL" 27 value="tcp://192.168.110.142:61616" /> 28 </bean> 29 30 <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> 31 <bean id="connectionFactory" 32 class="org.springframework.jms.connection.SingleConnectionFactory"> 33 <!-- 目標ConnectionFactory對應真實的能夠產生JMS Connection的ConnectionFactory --> 34 <property name="targetConnectionFactory" 35 ref="targetConnectionFactory" /> 36 </bean> 37 </beans>
開始配置生產者的spring配置。
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:context="http://www.springframework.org/schema/context" 4 xmlns:p="http://www.springframework.org/schema/p" 5 xmlns:aop="http://www.springframework.org/schema/aop" 6 xmlns:tx="http://www.springframework.org/schema/tx" 7 xmlns:jms="http://www.springframework.org/schema/jms" 8 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 9 xsi:schemaLocation="http://www.springframework.org/schema/beans 10 http://www.springframework.org/schema/beans/spring-beans-4.0.xsd 11 http://www.springframework.org/schema/context 12 http://www.springframework.org/schema/context/spring-context-4.0.xsd 13 http://www.springframework.org/schema/aop 14 http://www.springframework.org/schema/aop/spring-aop-4.0.xsd 15 http://www.springframework.org/schema/tx 16 http://www.springframework.org/schema/tx/spring-tx-4.0.xsd 17 http://www.springframework.org/schema/jms 18 http://www.springframework.org/schema/jms/spring-jms-4.0.xsd 19 http://www.springframework.org/schema/util 20 http://www.springframework.org/schema/util/spring-util-4.0.xsd"> 21 22 23 <!-- 1、真正能夠產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 --> 24 <bean id="targetConnectionFactory" 25 class="org.apache.activemq.ActiveMQConnectionFactory"> 26 <property name="brokerURL" 27 value="tcp://192.168.110.142:61616" /> 28 </bean> 29 30 <!-- 2、Spring用於管理真正的ConnectionFactory的ConnectionFactory --> 31 <bean id="connectionFactory" 32 class="org.springframework.jms.connection.SingleConnectionFactory"> 33 <!-- 目標ConnectionFactory對應真實的能夠產生JMS Connection的ConnectionFactory --> 34 <!-- 給屬性targetConnectionFactory傳值 --> 35 <property name="targetConnectionFactory" 36 ref="targetConnectionFactory" /> 37 </bean> 38 39 <!-- 3、開始配置生產者配置 --> 40 <!-- 配置生產者 --> 41 <!-- Spring提供的JMS工具類,它能夠進行消息發送、接收等 --> 42 <bean id="jmsTemplate" 43 class="org.springframework.jms.core.JmsTemplate"> 44 <!-- 這個connectionFactory對應的是咱們定義的Spring提供的那個ConnectionFactory對象 --> 45 <!-- 給屬性connectionFactory傳值 --> 46 <property name="connectionFactory" ref="connectionFactory" /> 47 </bean> 48 49 <!-- 4、配置消息的Destination對象 --> 50 <!-- 點對點模式 --> 51 <!-- 這個是隊列目的地,點對點的。 --> 52 <bean id="queueDestination" 53 class="org.apache.activemq.command.ActiveMQQueue"> 54 <constructor-arg> 55 <!-- 給ActiveMQQueue構造參數傳遞一個值爲queue --> 56 <value>queue</value> 57 </constructor-arg> 58 </bean> 59 60 <!-- 發佈訂閱模式 --> 61 <!-- 這個是主題目的地,一對多的。 --> 62 <bean id="topicDestination" 63 class="org.apache.activemq.command.ActiveMQTopic"> 64 <!-- 給ActiveMQTopic構造參數傳遞一個值爲topic --> 65 <constructor-arg value="topic" /> 66 </bean> 67 68 </beans>
生產者測試代碼以下所示:
能夠根據以前的消費者測試一下,消息的消費。
1 package com.taotao.activemq; 2 3 import javax.jms.Destination; 4 import javax.jms.JMSException; 5 import javax.jms.Message; 6 import javax.jms.Session; 7 import javax.jms.TextMessage; 8 9 import org.junit.Test; 10 import org.springframework.context.ApplicationContext; 11 import org.springframework.context.support.ClassPathXmlApplicationContext; 12 import org.springframework.jms.core.JmsTemplate; 13 import org.springframework.jms.core.MessageCreator; 14 15 /** 16 * 17 * @ClassName: SpringActiveMQ.java 18 * @author: biehl 19 * @since: 2019年9月19日 下午7:01:43 20 * @Copyright: ©2019 biehl 版權全部 21 * @version: 0.0.1 22 * @Description: 23 */ 24 public class SpringActiveMQ { 25 26 // 使用spring與activemq整合,是喲個jmsTemplate發送消息 27 @Test 28 public void jmsTemplateProducer() { 29 // 一、初始化spring容器 30 ApplicationContext applicationContext = new ClassPathXmlApplicationContext( 31 "classpath:/spring/applicationContext-activemq.xml"); 32 // 二、從容器中得到jmsTemplate對象。根據類型獲取到bean的對象 33 JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class); 34 // 三、從容器中得到Destination對象。根據名稱獲取到bean的對象 35 Destination destination = (Destination) applicationContext.getBean("queueDestination"); 36 37 // 四、發送消息 38 jmsTemplate.send(destination, new MessageCreator() { 39 40 @Override 41 public Message createMessage(Session session) throws JMSException { 42 // 定義一個消息 43 String message = "hello activeMq......"; 44 // 發送消息 45 TextMessage textMessage = session.createTextMessage(message); 46 return textMessage; 47 } 48 }); 49 } 50 51 }
效果以下所示:
開始配置消費者的spring配置。
1)、注意:那麼消費者是經過Spring爲咱們封裝的消息監聽容器MessageListenerContainer實現的,它負責接收信息,並把接收到的信息分發給真正的MessageListener進行處理。每一個消費者對應每一個目的地都須要有對應的MessageListenerContainer。
2)、對於消息監聽容器而言,除了要知道監聽哪一個目的地以外,還須要知道到哪裏去監聽,也就是說它還須要知道去監聽哪一個JMS服務器,這是經過在配置MessageConnectionFactory的時候往裏面注入一個ConnectionFactory來實現的。
3)、因此在配置一個MessageListenerContainer的時候有三個屬性必須指定:
a、一個是表示從哪裏監聽的ConnectionFactory
b、一個是表示監聽什麼的Destination;
c、一個是接收到消息之後進行消息處理的MessageListener。
4)、經常使用的MessageListenerContainer實現類是DefaultMessageListenerContainer。
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:context="http://www.springframework.org/schema/context" 4 xmlns:p="http://www.springframework.org/schema/p" 5 xmlns:aop="http://www.springframework.org/schema/aop" 6 xmlns:tx="http://www.springframework.org/schema/tx" 7 xmlns:jms="http://www.springframework.org/schema/jms" 8 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 9 xsi:schemaLocation="http://www.springframework.org/schema/beans 10 http://www.springframework.org/schema/beans/spring-beans-4.0.xsd 11 http://www.springframework.org/schema/context 12 http://www.springframework.org/schema/context/spring-context-4.0.xsd 13 http://www.springframework.org/schema/aop 14 http://www.springframework.org/schema/aop/spring-aop-4.0.xsd 15 http://www.springframework.org/schema/tx 16 http://www.springframework.org/schema/tx/spring-tx-4.0.xsd 17 http://www.springframework.org/schema/jms 18 http://www.springframework.org/schema/jms/spring-jms-4.0.xsd 19 http://www.springframework.org/schema/util 20 http://www.springframework.org/schema/util/spring-util-4.0.xsd"> 21 22 23 <!-- 1、真正能夠產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 --> 24 <bean id="targetConnectionFactory" 25 class="org.apache.activemq.ActiveMQConnectionFactory"> 26 <property name="brokerURL" 27 value="tcp://192.168.110.142:61616" /> 28 </bean> 29 30 <!-- 2、Spring用於管理真正的ConnectionFactory的ConnectionFactory --> 31 <bean id="connectionFactory" 32 class="org.springframework.jms.connection.SingleConnectionFactory"> 33 <!-- 目標ConnectionFactory對應真實的能夠產生JMS Connection的ConnectionFactory --> 34 <!-- 給屬性targetConnectionFactory傳值 --> 35 <property name="targetConnectionFactory" 36 ref="targetConnectionFactory" /> 37 </bean> 38 39 <!-- 3、配置消息的Destination對象。接受消息的目的地。 --> 40 <!-- 點對點模式 --> 41 <!-- 這個是隊列目的地,點對點的。 --> 42 <bean id="queueDestination" 43 class="org.apache.activemq.command.ActiveMQQueue"> 44 <constructor-arg> 45 <!-- 給ActiveMQQueue構造參數傳遞一個值爲queue --> 46 <value>queue</value> 47 </constructor-arg> 48 </bean> 49 50 <!-- 發佈訂閱模式 --> 51 <!-- 這個是主題目的地,一對多的。 --> 52 <bean id="topicDestination" 53 class="org.apache.activemq.command.ActiveMQTopic"> 54 <!-- 給ActiveMQTopic構造參數傳遞一個值爲topic --> 55 <constructor-arg value="topic" /> 56 </bean> 57 58 <!-- 4、配置消息接收者 --> 59 <!-- 配置一個監聽器 --> 60 <bean id="activeMqMessageListener" 61 class="com.taotao.search.listener.ActiveMqMessageListener" /> 62 63 <!-- 配置監聽容器 --> 64 <bean 65 class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 66 <!-- 屬性設置 --> 67 <!-- 一個是表示從哪裏監聽的ConnectionFactory --> 68 <property name="connectionFactory" ref="connectionFactory" /> 69 <!-- 一個是表示監聽什麼的Destination --> 70 <property name="destination" ref="queueDestination" /> 71 <!-- 一個是接收到消息之後進行消息處理的MessageListener --> 72 <property name="messageListener" ref="activeMqMessageListener" /> 73 </bean> 74 75 76 </beans>
而後能夠寫消息監聽器,用來監聽生產者生產的消息,以便實現本身的業務邏輯。
1 package com.taotao.search.listener; 2 3 import java.text.SimpleDateFormat; 4 import java.util.Date; 5 6 import javax.jms.JMSException; 7 import javax.jms.Message; 8 import javax.jms.MessageListener; 9 import javax.jms.TextMessage; 10 11 /** 12 * 接受ActiveMQ發送的消息. 13 * 14 * @ClassName: ActiveMqMessageListener.java 15 * @author: biehl 16 * @since: 2019年9月19日 下午7:55:24 17 * @Copyright: ©2019 biehl 版權全部 18 * @version: 0.0.1 19 * @Description: 20 */ 21 public class ActiveMqMessageListener implements MessageListener { 22 23 @Override 24 public void onMessage(Message message) { 25 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss"); 26 System.out.println("監聽生產者生產的消息,消費者進行消息消費......."); 27 // 消息到了onMessage就接受到了消息 28 if (message instanceof TextMessage) { 29 TextMessage textMessage = (TextMessage) message; 30 try { 31 String text = textMessage.getText(); 32 System.out.println(sdf.format(new Date()) + " : " + text); 33 } catch (JMSException e) { 34 e.printStackTrace(); 35 } 36 } 37 } 38 39 }
因爲這裏只是簡單的測試,若是是正式項目的話,直接加載這個配置文件,而後就能夠進行消息的監聽消費,我這裏只是加載一下這個配置文件便可。
1 package com.taotao.search.service; 2 3 import java.io.IOException; 4 5 import org.springframework.context.ApplicationContext; 6 import org.springframework.context.support.ClassPathXmlApplicationContext; 7 8 /** 9 * 10 * @ClassName: ActiveMqConsumer.java 11 * @author: biehl 12 * @since: 2019年9月19日 下午8:10:55 13 * @Copyright: ©2019 biehl 版權全部 14 * @version: 0.0.1 15 * @Description: 16 */ 17 public class ActiveMqConsumer { 18 19 // 啓動spring容器。就能夠實現監聽生產者發送消息,消費者消費小的目的地。 20 public static void main(String[] args) { 21 // 初始化spring容器 22 ApplicationContext applicationContext = new ClassPathXmlApplicationContext( 23 "classpath:/spring/applicationContext-activemq.xml"); 24 System.out.println("spring容器加載完畢,開始監聽生產者生產的消息......."); 25 try { 26 System.in.read(); 27 } catch (IOException e) { 28 e.printStackTrace(); 29 } 30 } 31 }
實現效果以下所示:
控制檯打印以下所示,只要你生產消息,這裏就能夠進行消息的消費。
待續......
原文出處:https://www.cnblogs.com/biehongli/p/11522793.html