ActiveMQ是一個消息隊列應用服務器(推送服務器)。支持JMS規範。html
全稱:Java Message Service ,即爲Java消息服務,是一套java消息服務的API標準。(標準即接口)前端
實現了JMS標準的系統,稱之爲JMS Provider。java
消息隊列是在消息的傳輸過程當中保存消息的容器,提供一種不一樣進程或者同一進程不一樣線程直接通信的方式。node
Producer:消息生產者,負責產生和發送消息到 Broker;mysql
Broker:消息處理中心。負責消息存儲、確認、重試等,通常其中會包含多個 queue;web
Consumer:消息消費者,負責從 Broker 中獲取消息,並進行相應處理;spring
(1)、ActiveMQsql
ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個徹底支持JMS1.1和J2EE 1.4規範的 JMS Provider實現。數據庫
(2)、RabbitMQapache
RabbitMQ是一個在AMQP基礎上完成的,可複用的企業消息系統。他遵循Mozilla Public License開源協議。開發語言爲Erlang。
(3)、RocketMQ
由阿里巴巴定義開發的一套消息隊列應用服務。
(1)實現兩個不一樣應用(程序)之間的消息通信。
(2)實現同一個應用,不一樣模塊之間的消息通信。(確保數據發送的穩定性)
ActiveMQ官網地址: http://activemq.apache.org
ActiveMQ下載地址:http://activemq.apache.org/download-archives.html
--可供下載的歷史版本
--說明:
ActiveMQ 5.10.x以上版本必須使用JDK1.8才能正常使用。
ActiveMQ 5.9.x及如下版本使用JDK1.7便可正常使用。
|
--根據操做系統,選擇下載版本。(本教程下載Linux版本)
|
(1)支持多語言、多協議客戶端。語言: Java,C,C++,C#,Ruby,Perl,Python,PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
(2)對Spring的支持,ActiveMQ能夠很容易整合到Spring的系統裏面去。
(3)支持高可用、高性能的集羣模式。
使用ActiveMQ實現消息隊列模型。
(1)搭建ActiveMQ消息服務器。
(2)建立一個java項目。
(3)建立消息生產者,發送消息。
(4)建立消息消費者,接收消息。
--說明:確保已經安裝了jdk
|
(1)解壓到/usr/local目錄下
[root@node07192 ~]# tar -zxvf apache-activemq-5.9.0-bin.tar.gz -C /usr/local |
(2)修更名稱爲activemq
[root@node07192 ~]# cd /usr/local/ [root@node07192 local]# mv apache-activemq-5.9.0/ activemq |
--說明:ActiveMQ是免安裝軟件,解壓便可啓動服務。
[root@node07192 local]# cd activemq/bin [root@node07192 bin]# ./activemq start |
--查看ActiveMQ啓動狀態
[root@node07192 bin]# ./activemq status
|
--訪問管理控制檯的服務端口,默認爲:8161
[root@node07192 bin]# cd ../conf [root@node07192 conf]# vim jetty.xml
|
--默認的用戶名、密碼均爲amdin
[root@node07192 conf]# vim users.properties
|
--注意:防火牆是沒有配置該服務的端口的。
所以,要訪問該服務,必須在防火牆中配置。
(1)修改防火牆,開放8161端口
[root@node07192 conf]# vim /etc/sysconfig/iptables |
(2)重啓防火牆
[root@node07192 conf]# service iptables restart |
(3)登陸管理控制檯
--登錄,用戶名、密碼均爲admin
|
--控制檯主界面
|
--搭建ActiveMQ服務器成功!!!
--導包說明:
ActiveMQ的解壓包中,提供了運行ActiveMQ的全部jar。
|
--建立項目
|
--說明:ActiveMQ是實現了JMS規範的。在實現消息服務的時候,必須基於API接口規範。
下述API都是接口類型,定義在javax.jms包中,是JMS標準接口定義。ActiveMQ徹底實現這一套api標準。
連接工廠, 用於建立連接的工廠類型。
連接,用於創建訪問ActiveMQ鏈接的類型, 由連接工廠建立。
會話, 一次持久有效、有狀態的訪問,由連接建立。
目的地, 即本次訪問ActiveMQ消息隊列的地址,由Session會話建立。
(1)interface Queue extends Destination
(2)Queue:隊列模型,只有一個消費者。消息一旦被消費,默認刪除。
(3)Topic:主題訂閱中的消息,會發送給全部的消費者同時處理。
消息,在消息傳遞過程當中數據載體對象,是全部消息【文本消息TextMessage,對象消息ObjectMessage等】具體類型的頂級接口,能夠經過會話建立或經過會話從ActiveMQ服務中獲取。
消息生成者, 在一次有效會話中, 用於發送消息給ActiveMQ服務的工具,由Session會話建立。
消息消費者【消息訂閱者,消息處理者】, 在一次有效會話中, 用於ActiveMQ服務中獲取消息的工具,由Session會話建立。
咱們定義的消息生產者和消費者,都是基於上面API實現的。
package cn.gzsxt.mq.producer;
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
public class MyProducer {
// 定義連接工廠 ConnectionFactory connectionFactory = null; // 定義連接 Connection connection = null; // 定義會話 Session session = null; // 定義目的地 Destination destination = null; // 定義消息生成者 MessageProducer producer = null; // 定義消息 Message message = null;
public void sendToMQ(){
try{
/* * 建立連接工廠 * ActiveMQConnectionFactory - 由ActiveMQ實現的ConnectionFactory接口實現類. * 構造方法: public ActiveMQConnectionFactory(String userName, String password, String brokerURL) * userName - 訪問ActiveMQ服務的用戶名, 用戶名能夠經過jetty-realm.properties配置文件配置. * password - 訪問ActiveMQ服務的密碼, 密碼能夠經過jetty-realm.properties配置文件配置. * brokerURL - 訪問ActiveMQ服務的路徑地址. 路徑結構爲 - 協議名://主機地址:端口號 * 此連接基於TCP/IP協議. */ connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.23.13:61616");
// 建立連接對象 connection = connectionFactory.createConnection(); // 啓動連接 connection.start();
/* * 建立會話對象 * 方法 - connection.createSession(boolean transacted, int acknowledgeMode); * transacted - 是否使用事務, 可選值爲true|false * true - 使用事務, 當設置此變量值, 則acknowledgeMode參數無效, 建議傳遞的acknowledgeMode參數值爲 * Session.SESSION_TRANSACTED * false - 不使用事務, 設置此變量值, 則acknowledgeMode參數必須設置. * acknowledgeMode - 消息確認機制, 可選值爲: * Session.AUTO_ACKNOWLEDGE - 自動確認消息機制 * Session.CLIENT_ACKNOWLEDGE - 客戶端確認消息機制 * Session.DUPS_OK_ACKNOWLEDGE - 有副本的客戶端確認消息機制 */ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 建立目的地, 目的地命名即隊列命名, 消息消費者須要經過此命名訪問對應的隊列 destination = session.createQueue("test-mq");
// 建立消息生成者, 建立的消息生成者與某目的地對應, 即方法參數目的地. producer = session.createProducer(destination);
// 建立消息對象, 建立一個文本消息, 此消息對象中保存要傳遞的文本數據. message = session.createTextMessage("hello,activeme");
// 發送消息 producer.send(message); System.out.println("消息發送成功!"); }catch(Exception e){ e.printStackTrace(); System.out.println("訪問ActiveMQ服務發生錯誤!!"); }finally{ try { // 回收消息發送者資源 if(null != producer) producer.close(); } catch (JMSException e) { e.printStackTrace(); } try { // 回收會話資源 if(null != session) session.close(); } catch (JMSException e) { e.printStackTrace(); } try { // 回收連接資源 if(null != connection) connection.close(); } catch (JMSException e) { e.printStackTrace(); } } }
}
|
--添加junit類庫,快捷鍵ctrl+1
package cn.gzsxt.mq.test;
import org.junit.Test;
import cn.gzsxt.mq.producer.MyProducer;
public class MessageTest {
@Test public void sendToMQ(){ MyProducer producer = new MyProducer(); producer.sendToMQ(); } } |
(1)設置防火牆,配置61616端口。注意修改以後重啓防火牆。
(2)測試結果:
--查看控制檯
|
--查看ActiveMQ管理控制界面
|
--消息發送成功!!!
package cn.gzsxt.mq.consumer;
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/** * @ClassName:MyConsumer * @Description: 消息消費者代碼 */ public class MyConsumer {
// 定義連接工廠 ConnectionFactory connectionFactory = null; // 定義連接 Connection connection = null; // 定義會話 Session session = null; // 定義目的地 Destination destination = null; // 定義消息消費者 MessageConsumer consumer = null; // 定義消息 Message message = null;
public void recieveFromMQ(){
try{
/* * 建立連接工廠 * ActiveMQConnectionFactory - 由ActiveMQ實現的ConnectionFactory接口實現類. * 構造方法: public ActiveMQConnectionFactory(String userName, String password, String brokerURL) * userName - 訪問ActiveMQ服務的用戶名, 用戶名能夠經過jetty-realm.properties配置文件配置. * password - 訪問ActiveMQ服務的密碼, 密碼能夠經過jetty-realm.properties配置文件配置. * brokerURL - 訪問ActiveMQ服務的路徑地址. 路徑結構爲 - 協議名://主機地址:端口號 * 此連接基於TCP/IP協議. */ connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.23.13:61616");
// 建立連接對象 connection = connectionFactory.createConnection(); // 啓動連接 connection.start();
/* * 建立會話對象 * 方法 - connection.createSession(boolean transacted, int acknowledgeMode); * transacted - 是否使用事務, 可選值爲true|false * true - 使用事務, 當設置此變量值, 則acknowledgeMode參數無效, 建議傳遞的acknowledgeMode參數值爲 * Session.SESSION_TRANSACTED * false - 不使用事務, 設置此變量值, 則acknowledgeMode參數必須設置. * acknowledgeMode - 消息確認機制, 可選值爲: * Session.AUTO_ACKNOWLEDGE - 自動確認消息機制 * Session.CLIENT_ACKNOWLEDGE - 客戶端確認消息機制 * Session.DUPS_OK_ACKNOWLEDGE - 有副本的客戶端確認消息機制 */ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 建立目的地, 目的地命名即隊列命名, 消息消費者須要經過此命名訪問對應的隊列 destination = session.createQueue("test-mq");
// 建立消息消費者, 建立的消息消費者與某目的地對應, 即方法參數目的地. consumer = session.createConsumer(destination);
// 從ActiveMQ服務中獲取消息 message = consumer.receive();
TextMessage tMsg = (TextMessage) message;
System.out.println("從MQ中獲取的消息是:"+tMsg.getText());
}catch(Exception e){ e.printStackTrace(); System.out.println("訪問ActiveMQ服務發生錯誤!!");
}finally{ try { // 回收消息消費者資源 if(null != consumer) consumer.close(); } catch (JMSException e) { e.printStackTrace(); } try { // 回收會話資源 if(null != session) session.close(); } catch (JMSException e) { e.printStackTrace(); } try { // 回收連接資源 if(null != connection) connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } |
@Test public void recieveFromMQ(){ MyConsumer consumer = new MyConsumer(); consumer.recieveFromMQ(); } |
--查看Eclipse控制檯
|
--查看ActiveMQ管理控制界面
|
--消息被消費了,測試成功!!!
問題:在前面的示例中,咱們發現消費者每次只能消費一條消息。當隊列中有多條消息的時候,咱們須要屢次運行消費者,才能消費完這些消息。很麻煩!!!!如何解決這個問題呢?咱們但願一次將全部的消息所有接收。
答:使用ActiveMQ監聽器來監聽隊列,持續消費消息。
(1)建立一個監聽器對象。
(2)修改消費者代碼,加載監聽器。
--說明:自定義監聽器須要實現MessageListener接口
package cn.gzsxt.mq.listener;
import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage;
public class MyListener implements MessageListener{
@Override public void onMessage(Message message) {
if(null!=message){ TextMessage tMsg = (TextMessage) message;
try { System.out.println("從MQ中獲取的消息是:"+tMsg.getText()); } catch (JMSException e) { e.printStackTrace(); } } } } |
--說明:監聽器須要持續加載,所以消費程序不能結束。
這裏咱們使用輸入流阻塞消費線程結束。(實際開發中,使用web項目加載)
package cn.gzsxt.mq.consumer;
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import cn.gzsxt.mq.listener.MyListener;
/** * @ClassName:MyConsumer * @Description: 消息消費者代碼 */ public class MyConsumer {
// 定義連接工廠 ConnectionFactory connectionFactory = null; // 定義連接 Connection connection = null; // 定義會話 Session session = null; // 定義目的地 Destination destination = null; // 定義消息消費者 MessageConsumer consumer = null; // 定義消息 Message message = null;
public Message recieveFromMQ(){
try{
/* * 建立連接工廠 * ActiveMQConnectionFactory - 由ActiveMQ實現的ConnectionFactory接口實現類. * 構造方法: public ActiveMQConnectionFactory(String userName, String password, String brokerURL) * userName - 訪問ActiveMQ服務的用戶名, 用戶名能夠經過jetty-realm.properties配置文件配置. * password - 訪問ActiveMQ服務的密碼, 密碼能夠經過jetty-realm.properties配置文件配置. * brokerURL - 訪問ActiveMQ服務的路徑地址. 路徑結構爲 - 協議名://主機地址:端口號 * 此連接基於TCP/IP協議. */ connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.23.13:61616");
// 建立連接對象 connection = connectionFactory.createConnection(); // 啓動連接 connection.start();
/* * 建立會話對象 * 方法 - connection.createSession(boolean transacted, int acknowledgeMode); * transacted - 是否使用事務, 可選值爲true|false * true - 使用事務, 當設置此變量值, 則acknowledgeMode參數無效, 建議傳遞的acknowledgeMode參數值爲 * Session.SESSION_TRANSACTED * false - 不使用事務, 設置此變量值, 則acknowledgeMode參數必須設置. * acknowledgeMode - 消息確認機制, 可選值爲: * Session.AUTO_ACKNOWLEDGE - 自動確認消息機制 * Session.CLIENT_ACKNOWLEDGE - 客戶端確認消息機制 * Session.DUPS_OK_ACKNOWLEDGE - 有副本的客戶端確認消息機制 */ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 建立目的地, 目的地命名即隊列命名, 消息消費者須要經過此命名訪問對應的隊列 destination = session.createQueue("test-mq");
// 建立消息消費者, 建立的消息消費者與某目的地對應, 即方法參數目的地. consumer = session.createConsumer(destination);
// // 從ActiveMQ服務中獲取消息 // message = consumer.receive(); // // TextMessage tMsg = (TextMessage) message; // // System.out.println("從MQ中獲取的消息是:"+tMsg.getText()); //加載監聽器 consumer.setMessageListener(new MyListener()); //監聽器須要持續加載,這裏咱們使用輸入流阻塞當前線程結束。 System.in.read();
}catch(Exception e){ e.printStackTrace(); System.out.println("訪問ActiveMQ服務發生錯誤!!");
}finally{ try { // 回收消息消費者資源 if(null != consumer) consumer.close(); } catch (JMSException e) { e.printStackTrace(); } try { // 回收會話資源 if(null != session) session.close(); } catch (JMSException e) { e.printStackTrace(); } try { // 回收連接資源 if(null != connection) connection.close(); } catch (JMSException e) { e.printStackTrace(); } } return message; } } |
(1)屢次運行生產者,發送多條消息到隊列中。
|
(2)運行消費者。觀察結果
--查看Eclipse控制檯,一次消費了3條消息
|
--查看ActiveMQ管理控制界面,全部消息都被消費了!
|
--測試成功!!!
問題:在入門示例中,只能向一個消費者發送消息。可是有一些場景,需求有多個消費者都能接收到消息,好比:美團APP天天的消息推送。該如何實現呢?
答:ActiveMQ是經過不一樣的服務模式來解決這個問題的。
因此,要搞清楚這個問題,必須知道ActiveMQ有哪些應用模式。
--消息模型
|
消息生產者生產消息發送到queue中,而後消息消費者從queue中取出而且消費消息。
消息被消費之後,queue中再也不有存儲,因此消息消費者不可能消費到已經被消費的消息。
Queue支持存在多個消費者,可是對一個消息而言,只會有一個消費者能夠消費、其它的則不能消費此消息了。
當消費者不存在時,消息會一直保存,直到有消費消費
咱們的入門示例,就是採用的這種PTP服務模式。
--消息模型
|
消息生產者(發佈)將消息發佈到topic中,同時有多個消息消費者(訂閱)消費該消息。
和點對點方式不一樣,發佈到topic的消息會被全部訂閱者消費。
當生產者發佈消息,不論是否有消費者。都不會保存消息
因此,主題訂閱模式下,必定要先有消息的消費者(訂閱者),後有消息的生產者(發佈者)。
咱們前面已經實現了PTP模式,下面咱們來實現TOPIC模式。
(1)搭建ActiveMQ消息服務器。(已實現)
(2)建立主題訂閱者。
(3)建立主題發佈者。
--說明:主題訂閱模式下,能夠有多個訂閱者。咱們這裏用多線程來模擬。
配置步驟:
(1)建立訂閱者(線程類)。
(2)修改測試類。
(3)查看測試結果。
package cn.gzsxt.mq.subscribe;
import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicSession; import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
public class MySubscirber implements Runnable{
TopicConnectionFactory factory = null; TopicConnection connection = null; TopicSession session = null; Topic topic = null; TopicSubscriber subscriber = null; Message message =null;
@Override public void run() { try{ // 建立連接工廠 factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.23.13:61616"); // 經過工廠建立一個鏈接 connection = factory.createTopicConnection(); // 啓動鏈接 connection.start(); // 建立一個session會話 session = connection.createTopicSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 建立一個消息隊列 topic = session.createTopic("gzsxt.topic"); // 建立消息製做者 subscriber = session.createSubscriber(topic); message = subscriber.receive(); if(null!=message){ TextMessage tMsg = (TextMessage) message; System.out.println(Thread.currentThread().getName()+"訂閱的內容是:"+tMsg.getText()); } } catch (Exception e) { e.printStackTrace(); System.out.println("消息訂閱異常"); } finally { // 關閉釋放資源 if (session != null) { try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if (session != null) { try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } } |
--說明:junit單元測試,不支持多線程測試。
因此,這裏咱們在測試類的main方法中測試。
--修改MessageTest類,新增main方法。
public static void main(String[] args) { MySubscirber subscirber = new MySubscirber(); Thread t1 = new Thread(subscirber); Thread t2 = new Thread(subscirber); t1.start(); t2.start(); } |
--查看AcitveMQ管理界面
|
--測試成功!!!
-配置步驟說明:
(1)建立發佈者
(2)修改測試類測試
(3)查看測試結果
package cn.gzsxt.mq.topic;
import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSession;
import org.apache.activemq.ActiveMQConnectionFactory;
public class MyPublisher {
TopicConnectionFactory factory = null; TopicConnection connection = null; TopicSession session = null; Topic topic = null; TopicPublisher publisher = null; Message message =null;
public void publishTopic(){ try { // 建立連接工廠 factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.23.13:61616"); // 經過工廠建立一個鏈接 connection = factory.createTopicConnection(); // 啓動鏈接 connection.start(); // 建立一個session會話 session = connection.createTopicSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 建立一個消息隊列 topic = session.createTopic("gzsxt.topic"); // 建立主題發佈者 publisher = session.createPublisher(topic); // 建立消息 message = session.createTextMessage("hello,topic"); // 發佈消息 publisher.publish(message); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); }finally { // 關閉釋放資源 if (publisher != null) { try { publisher.close(); } catch (JMSException e) { e.printStackTrace(); } } if (session != null) { try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } } |
@Test public void publishTopic(){ MyPublisher publisher = new MyPublisher(); publisher.publishTopic(); } |
--查看Eclipse控制。發現訂閱者消費到了消息
|
--查看ActiveMQ主界面
|
--消息被消費了。測試成功!
(1)Topic模式可以實現多個訂閱者同時消費消息。
(2)Topic主題模式下,消息不會保存,只有在線的訂閱者纔會接收到消息。
一般能夠用來解決公共消息推送的相關業務。
問題:當隊列中有未被消費的消息時,咱們從新啓動ActiveMQ服務器後,發現消息仍然在隊列中。消息時如何保持的呢?
答:ActiveMQ是支持持久化的,能夠永久保存消息。
消息是保存在內存中的。當內存空間不足,或者ActiveMQ服務關閉的時候,消息會被持久化到磁盤上。
被消費的時候,再加載到內存空間中。
--說明:ActiveMQ持久化方式在/conf/activemq.xml中指定
[root@node07192 conf]# vim activemq.xml |
是ActiveMQ默認的持久化策略。不會保存已經被消費過的消息。
|
--消息存儲位置
|
--說明:5.3版本以前,如今已通過時,不考慮。
ActiveMQ將數據持久化到數據庫中。可使用任意的數據庫。
本教程中使用MySQL數據庫。
(1)建立數據庫。
(2)添加數據庫鏈接jar依賴到ActiveMQ服務器。
(3)修改ActiveMQ配置,建立數據源。
(4)修改ActiveMQ配置,修改持久化方式爲jdbc
數據庫最好不要跟ActiveMQ服務器在同一臺機器。
由於當cpu線程資源不足時,往隊列中寫入消息時,若是數據庫上一次持久化還沒結束,容易形成線程阻塞。
|
--配置數據源時,是支持鏈接池的。咱們這裏使用dbcp2做爲鏈接池。
將jdbc驅動、dbcp2的jar上傳到/lib/目錄下。
|
--在<broker>節點外,建立數據源節點
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://192.168.7.149:3306/activemq?relaxAutoCommit=true"/> <property name="username" value="root"/> <property name="password" value="gzsxt"/> <property name="maxActive" value="200"/> <property name="poolPreparedStatements" value="true"/> </bean> |
--數據源節點位置以下以下
|
--在<broker>節點內部,註釋kahadb方式,添加jdbc方式
添加以下配置:
<persistenceAdapter> <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="true」/> </persistenceAdapter> |
--注意:註釋<kahaDB>節點
|
(1)從新啓動ActiveMQ
[root@node07192 bin]# ./activemq restart |
(2)查看數據庫,發現生成了三張表
|
(3)運行入門示例中的測試類,往隊列中寫入一條消息
--數據庫表activemq_msgs中,新寫入了一條數據
|
--配置成功!!!
數據表名稱 |
做用 |
activemq_msgs |
存儲消息,Queue和Topic都存儲在這個表中 |
activemq_acks |
用於存儲訂閱關係。訂閱模式下有效 |
activemq_lock |
集羣模式下,存儲主從節點關係 |
Jdbc持久化方式,只要Mysql數據庫穩定運行,就能保證隊列中消息的安全。
安全級別高,可是效率低。
所以,在實際開發中,除非是像銀行這類對數據安全極高的業務,咱們通常都是使用默認持久化方式kahadb。
咱們判斷一個程序的優劣,有一個很重要的指標:高內聚、低耦合。
高內聚:同一個模塊中,功能是高度緊密的。
低耦合:各模塊之間,業務儘可能不要交叉。
可是有一些業務功能,必須涉及到兩個不一樣的業務,那咱們就要想辦法,儘可能將它們解耦開來。
以咱們前面學習的solr爲例。咱們知道solr的數據來自數據庫。這就意味着,當數據庫中的商品發生變化時,咱們須要同步更新索引庫。
這個時候咱們就可使用消息隊列模型來解耦添加添加業務和同步索引庫業務。
|
--後面的電商項目中,會重點講解這個應用場景!!!
|
訂單處理,就能夠由前端應用將訂單信息放到隊列,後端應用從隊列裏依次得到消息處理,高峯時的大量訂單能夠積壓在隊列裏慢慢處理掉。因爲同步一般意味着阻塞,而大量線程的阻塞會下降計算機的性能。
日誌處理是指將消息隊列用在日誌處理中,好比Kafka的應用,解決大量日誌傳輸的問題。架構簡化以下:
|
須要:當咱們在網站註冊的時候,有時候須要認證郵箱或者手機號,這個時候保存數據到數據庫以前,須要先等待認證結束。若是說認證程序耗時比較大,會影響影響用戶註冊的業務。
這個時候,咱們可使用消息隊列模型,將同步執行的業務,經過隊列,變成異步處理
|
(1)在保存數據到數據庫的時候,只須要將用戶的郵箱寫入隊列,不須要等待郵箱認證程序執行結束,才把數據保存到數據庫。
(2)認證程序,經過監聽隊列,從中獲取用戶的郵箱地址,發送認證連接。
Spring已經整合了jms規範了(spring-jms.jar),而ActiveMQ是實現了jms規範的。這就意味着Spring整合ActiveMQ是很是方便的。
而且Spring-jms,提供了一個JmsTemplate類,用來簡化消息讀寫的業務代碼。Spring整合ActivMQ以後,就可使用該類,簡化開發!!!
使用Spring整合ActiveMQ,模擬限時搶購下的流量削峯問題。
(1)搭建環境。(建立項目,導入jar包)
(2)spring整合SpringMVC。
(3)spring整合ActiveMQ
--注意:maven建立web項目時,默認建立web.xml文件。
在/WEB-INF/目錄下,手動建立一個web.xml文件。
|
導包說明:
Spring核心包+AOP
common-logging
activemq核心包
spring整合jms包
jsp相關依賴
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>cn.gzsxt.activemq</groupId> <artifactId>activemq-demo-02-spring</artifactId> <version>1.0</version> <packaging>war</packaging>
<dependencies> <!-- ActiveMQ客戶端完整jar包依賴 --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.9.0</version> </dependency>
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.9.0</version> </dependency>
<!-- Spring-JMS插件相關jar包依賴 --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.1.6.RELEASE</version> </dependency> <!-- Spring框架上下文jar包依賴 --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>4.1.6.RELEASE</version> </dependency> <!-- SpringMVC插件jar包依賴 --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-webmvc</artifactId> <version>4.1.6.RELEASE</version> </dependency> <!-- jsp相關 --> <dependency> <groupId>jstl</groupId> <artifactId>jstl</artifactId> <version>1.2</version> </dependency> <dependency> <groupId>javax.servlet</groupId> <artifactId>servlet-api</artifactId> <version>2.5</version> <scope>provided</scope> </dependency> <dependency> <groupId>javax.servlet</groupId> <artifactId>jsp-api</artifactId> <version>2.0</version> <scope>provided</scope> </dependency> </dependencies> <build> <plugins> <!-- 配置Tomcat插件 --> <plugin> <groupId>org.apache.tomcat.maven</groupId> <artifactId>tomcat7-maven-plugin</artifactId> <version>2.2</version> <configuration> <port>9099</port> <path>/</path> </configuration> </plugin> </plugins> </build> </project> |
<?xml version="1.0" encoding="UTF-8"?> <web-app version="2.5" xmlns="http://java.sun.com/xml/ns/javaee" xmlns:xml="http://www.w3.org/XML/1998/namespace" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_2_5.xsd "> <!-- 編碼過濾器 --> <filter> <filter-name>characterEncodingFilter</filter-name> <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class> <init-param> <param-name>encoding</param-name> <param-value>utf-8</param-value> </init-param> </filter> <filter-mapping> <filter-name>characterEncodingFilter</filter-name> <url-pattern>/*</url-pattern> </filter-mapping>
<!-- 配置springmvc核心控制器 --> <servlet> <servlet-name>dispatcherServlet</servlet-name> <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<init-param> <param-name>contextConfigLocation</param-name> <param-value>classpath:springmvc.xml</param-value> </init-param>
<load-on-startup>1</load-on-startup> </servlet> <servlet-mapping> <servlet-name>dispatcherServlet</servlet-name> <url-pattern>*.action</url-pattern> </servlet-mapping>
</web-app> |
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:mvc="http://www.springframework.org/schema/mvc" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd">
<context:component-scan base-package="cn.gzsxt.controller" /> <mvc:annotation-driven />
</beans>
|
--訂單頁面order.jsp
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> <html> <head> <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> <title>Insert title here</title> </head> <body> <form action="/save.action" method="post"> 用戶編號:<input type="text" name="userid"><br> 訂單金額:<input type="text" name="price"><br> <input type="submit" value="提交"> </form> </body> </html> |
--成功頁面success.jsp
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> <html> <head> <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> <title>Insert title here</title> </head> <body> 訂單提交成功!!!請稍後去結算中心支付。。。 </body> </html> |
--建立訂單Order類
package cn.gzsxt.jms.pojo;
public class Order {
private Integer id;
private Integer userid;
private float price;
public Order() { super(); }
public Integer getId() { return id; }
public void setId(Integer id) { this.id = id; }
public Integer getUserid() { return userid; }
public void setUserid(Integer userid) { this.userid = userid; }
public float getPrice() { return price; }
public void setPrice(float price) { this.price = price; }
} |
--建立OrderController類
package cn.gzsxt.jms.controller;
import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping;
import cn.gzsxt.jms.pojo.Order;
@Controller public class OrderController {
@RequestMapping("/save.action") public String save(Order order){
System.out.println("當前提交的訂單用戶是:"+order.getUserid()+",訂單金額:"+order.getPrice());
return "/success.jsp"; } } |
--以tomcat插件啓動項目,訪問訂單業務,提交訂單
|
--整合springmvc成功!!!
整合步驟說明:
(1)搭建ActiveMQ服務器。(已實現)
(2)建立消息生產者
(3)建立消息消費者
(4)spring整合activemq
--說明:在這裏,咱們注入JmsTemplate類,來簡化代碼
package cn.gzsxt.jms.producer;
import javax.jms.JMSException; import javax.jms.Message; import javax.jms.ObjectMessage; import javax.jms.Session;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Component;
import cn.gzsxt.jms.pojo.Order;
@Component public class OrderProducer {
@Autowired private JmsTemplate jmsTemplate;
//注意:內部類調用外部類屬性,須要用final修飾 public void sendToMQ(final Order order){ //指定隊列名稱 order-mq jmsTemplate.send("order-mq", new MessageCreator() {
@Override public Message createMessage(Session session) throws JMSException { //ActiveMQ處理對象消息時,對象須要實現序列化 Message message = session.createObjectMessage(order);
return message; } }); } } |
--注意事項
(1)ActiveMQ處理對象時,對象必須實現序列化
--修改Order類,實現序列化接口
|
(2)匿名內部類訪問外部類屬性,該屬性須要用final修飾。
--這裏使用監聽器模式
package cn.gzsxt.jms.listener;
import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.ObjectMessage;
import org.springframework.stereotype.Component;
import cn.gzsxt.jms.pojo.Order;
@Component public class OrderListener implements MessageListener{
@Override public void onMessage(Message message) {
if(null!=message){ ObjectMessage oMsg = (ObjectMessage) message;
try { Order order = (Order) oMsg.getObject(); System.out.println("當前提交的訂單用戶是:"+order.getUserid()+",訂單金額:"+order.getPrice()); /* * 僞代碼: * * orderDao.save(order); */ } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } |
--建立spring-jms.xml文件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.1.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
<!-- 1、配置activemq鏈接工程 使用鏈接池好處:連接只須要初始化一次,每次要使用的時候,直接從鏈接池獲取,用完以後還給鏈接池。省去了每次建立、銷燬鏈接的時間。 --> <bean name="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"> <property name="connectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.23.12:61616"/> <property name="userName" value="admin"/> <property name="password" value="admin"/> </bean> </property> <property name="maxConnections" value="20"></property> </bean>
<!-- 2、spring整合activemq連接工廠 能夠緩存session。 --> <bean name="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <property name="targetConnectionFactory" ref="pooledConnectionFactory"></property> <property name="sessionCacheSize" value="5"></property> </bean>
<!-- 3、spring整合消息操做對象JmsTemplate 使用jmsTemplate能夠簡化代碼,不須要本身去建立消息的發送對象。 --> <bean name="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="cachingConnectionFactory"></property> </bean>
<!-- 4、spring加載監聽器 acknowledge="auto" 表示消息獲取以後,自動出隊列 container-type 表示的容器的類型 default|simple default:支持session緩存。 --> <jms:listener-container acknowledge="auto" container-type="default" destination-type="queue" connection-factory="cachingConnectionFactory"> <!-- 指定監聽器 destination="order-mq" 指定監聽的是哪個隊列 ref="orderListener" 指定監聽器對象 使用註解的時候,對象的名稱是類名首字母小寫 --> <jms:listener destination="order-mq" ref="orderListener"/> </jms:listener-container>
</beans> |
<!-- 配置springmvc核心控制器 --> <servlet> <servlet-name>dispatcherServlet</servlet-name> <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<init-param> <param-name>contextConfigLocation</param-name> <!-- <param-value>classpath:springmvc.xml</param-value> --> <param-value>classpath:spring*.xml</param-value> </init-param>
<load-on-startup>1</load-on-startup> </servlet> <servlet-mapping> <servlet-name>dispatcherServlet</servlet-name> <url-pattern>*.action</url-pattern> </servlet-mapping> |
--注入OrderProducer,修改業務邏輯
@Controller public class OrderController { @Autowired private OrderProducer producer;
@RequestMapping("/save.action") public String save(Order order){
// System.out.println("當前提交的訂單用戶是:"+order.getUserid()+",訂單金額:"+order.getPrice());
producer.sendToMQ(order);
return "/success.jsp"; } } |
--查看Eclipse控制檯
|
--查看ActiveMQ控制檯
|
--整合成功!!!