ActiveMQ是Apache出品的,很是流行的消息中間件,能夠說要掌握消息中間件,須要從ActiveMQ開始。首先去官網下載:ActiveMQ官網 html
1.1,ActiveMQ目錄 java
binweb |
存放的是ActiveMQ的啓動腳本activemq.bat。apache |
conf瀏覽器 |
裏面是配置文件,重點關注的是activemq.xml、jetty.xml、jetty-realm.properties。在登陸ActiveMQ Web控制檯須要用戶名、密碼信息、在JMS CLIENT和ActiveMQ進行何種協議的鏈接、端口等這些信息都在上面的配置文件中能夠體現。session |
data併發 |
目錄下是ActiveMQ進行消息持久化存放的地方,默認採用的是kahadb,固然咱們能夠採用leveldb,或者採用JDBC存儲到MySQL,或者乾脆不使用持久化機制。app |
webappswebapp |
注意ActiveMQ自帶Jetty提供Web管控臺。ide |
lib |
中ActiveMQ爲咱們提供了分功能的JAR包。 |
1.2,ActiveMQ啓動
CMD進入ActiveMQ所在的bin目錄,輸入ActiveMQ start便可啓動ActiveMQ,這裏必須已經安裝JDK,須要注意JDK版本號。
瀏覽器輸入localhost:8161便可訪問ActiveMQ web後臺管理
生產的消息查看方式:
Number Of Pending Messages |
還有多少條消息沒有被消費,其實是表示消息的積壓程度,就是P-C |
Number Of Consumers |
在該隊列上還有多少消費者在等待接受消息 |
Messages Dequeued |
消費了多少條消息,記作C |
Messages Enqueued |
生產了多少條消息,記作P |
ActiveMQ 各個版本所依賴的JDK版本:
能夠經過查看文件 activemq-all-*.jar包裏面的\META-INF\MANIFEST.MF 屬性值 Build-Jdk
1.3,ActiveMQ配置文件
jetty.xml |
Jetty web容器的配置文件,裏面能夠修改ActiveMQ web後臺的端口號。 |
jetty-realm.properties |
這裏保存了ActiveMQ web控制檯的用戶名、密碼。 |
activemq.xml |
內容太多,單獨說。 |
pom.xml中添加ActiveMQ依賴:
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.14.5</version> </dependency> |
ActiveMQ接受/發送消息流程圖:
2.1,ActiveMQLinkUtil幫助類:
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ActiveMQLinkUtil { private static final Logger logger = LoggerFactory.getLogger(ActiveMQLinkUtil.class); //ActiveMQ 的默認用戶名 private static String USERNAME = ActiveMQConnection.DEFAULT_USER; //ActiveMQ 的默認登陸密碼 private static String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; //ActiveMQ 的連接地址 private static String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL; //連接工廠對象 private static ConnectionFactory connectionFactory = null; //連接對象 private static Connection connection = null; //會話對象 private static Session session = null; /** * * @方法名: initConnection * @描述: 初始化Connection,並返回Session對象,默認開啓事務和自動簽收 * @return Session對象 */ public static Session initConnection(){ try { String parameters = String.format("鏈接參數:userName=%s, pwd=%s, url=%s", USERNAME, PASSWORD, BROKEN_URL); logger.info(parameters); //第一步:建立一個連接工廠 connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL); //第二步:從工廠中建立一個連接 connection = connectionFactory.createConnection(); //開啓連接(默認是關閉的) connection.start(); //第三步:建立session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); logger.info("Init Connection success"); } catch (JMSException e) { e.printStackTrace(); } return session; } /** * * @方法名: initConnection * @描述: 初始化Connection * @param transactionState 是否開啓事務 * @param model 簽收模式 * @return Session對象 */ public static Session initConnection(Boolean transactionState, int model){ try { String parameters = String.format("鏈接參數:userName=%s, pwd=%s, url=%s", USERNAME, PASSWORD, BROKEN_URL); logger.info(parameters); connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL); connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(transactionState, model); logger.info("Init Connection success"); } catch (JMSException e) { e.printStackTrace(); } return session; } /** * * @方法名: updateLinkParameter * @描述: 修改默認ActiveMQ的鏈接屬性 * @param userName * @param pwd * @param url */ public static void updateLinkParameter(String userName, String pwd , String url){ if(!StringUtils.isBlank(userName)){ USERNAME = userName; } if(!StringUtils.isBlank(pwd)){ PASSWORD = pwd; } if(!StringUtils.isBlank(url)){ BROKEN_URL = url; } } /** * * @方法名: closeConnection * @描述: 關閉鏈接 */ public static void closeConnection(){ if(connection != null){ try { connection.close(); logger.info("close Connection success"); } catch (JMSException e) { e.printStackTrace(); } } } }
2.2,建立生產者
package com.zender.activemq; import javax.jms.DeliveryMode; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; /** * * @類名稱:ActiveMQProducter * @類描述:生產者 */ public class ActiveMQProducter { //會話對象 private static Session session = null; public static void sendMessages(String name){ try { session = ActiveMQLinkUtil.initConnection(); if(session != null){ //第四步:建立一個消息目標,在PTP模式下是Queue,pub/sub模式下是topic Queue queue = session.createQueue(name); //第五步:建立一個消息生產者 MessageProducer messageProducer = session.createProducer(queue); //第六步:設置持久化方式,默認設置爲PERSISTENT(持久性) messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); for (int i = 0; i < 5; i++) { Thread.sleep(1000); //第七步:建立文本消息,併發送 TextMessage textMessage = session.createTextMessage("消息內容" + (i + 1 )); System.out.println("消息內容:" + textMessage.getText()); //使用生產者,發送消息 messageProducer.send(textMessage); //提交事務 session.commit(); } } } catch (Exception e) { e.printStackTrace(); } finally { //第八步:釋放鏈接 ActiveMQLinkUtil.closeConnection(); } } public static void main(String[] args) { ActiveMQProducter.sendMessages("Zender-MQ"); } }
2.3,建立消費者
import javax.jms.MessageConsumer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; /** * * @類名稱:ActiveMQComsumer * @類描述:消費者 */ public class ActiveMQComsumer { //會話對象 private static Session session = null; public static void getMessages(String name){ try { session = ActiveMQLinkUtil.initConnection(); if(session != null){ //建立一個消息隊列 Queue queue = session.createQueue(name); //建立一個消息消費者 MessageConsumer messageConsumer = session.createConsumer(queue); for (int i = 0; i < 5; i++) { Thread.sleep(1000); //獲取消息 TextMessage msg = (TextMessage) messageConsumer.receive(); //確認接到消息,這條代碼能夠不寫,上面設置了自動消息確認 //msg.acknowledge(); System.out.println("消息內容:" + msg.getText()); } } } catch (Exception e) { e.printStackTrace(); } finally { ActiveMQLinkUtil.closeConnection(); } } public static void main(String[] args) { ActiveMQComsumer.getMessages("Zender-MQ"); } }
2.4,運行,查看結果
先運行生產者,再運行消費者。
生產者日誌:
消費者日誌:
在經過Connection建立Session的時候,須要設置2個參數,一個是否支持事務,另外一個是簽收的模式。重點說一下簽收模式,ActiveMQ支持一下三種模式:
AUTO_ACKNOWLEDGE |
表示在消費者receive消息的時候自動的簽收 |
CLIENT_ACKNOWLEDGE |
表示消費者receive消息後必須手動的調用acknowledge()方法進行簽收 |
DUPS_OK_ACKNOWLEDGE |
籤不簽收無所謂了,只要消費者可以容忍重複的消息接受,固然這樣會下降Session的開銷 |
在實際中,咱們應該採用CLIENT_ACKNOWLEDGE這種簽收模式,採用手動的方式較自動的方式可能更好些,由於接收到了消息,並不意味着成功的處理了消息,假設咱們採用手動簽收的方式,只有在消息成功處理的前提下才進行簽收,那麼只要消息處理失敗,那麼消息還有效,仍然會繼續消費,直至成功處理。
JMS Selectors,即消息選擇器,前面介紹過消息的組成部分,其中談到消息對象有消息屬性,用於消息選擇器。咱們來看一段代碼:
生產者:
package com.zender.activemq.selectors; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import com.zender.activemq.ActiveMQLinkUtil; /** * * @類名稱:ActiveMQSelectorsProducter * @類描述:生產者-JSM選擇器-消息選擇器 */ public class ActiveMQSelectorsProducter { //會話對象 private static Session session = null; public static void sendMessages(String name){ try { session = ActiveMQLinkUtil.initConnection(false, Session.CLIENT_ACKNOWLEDGE); if(session != null){ //建立一個消息目標 Queue queue = session.createQueue(name); //建立一個消息生產者 MessageProducer messageProducer = session.createProducer(queue); for (int i = 0; i < 10; i++) { //建立消息,併發送 TextMessage textMessage = session.createTextMessage("消息A-"+i); textMessage.setStringProperty("JMSXtestId", "a"); //使用生產者,發送消息 messageProducer.send(textMessage); TextMessage textMessage1 = session.createTextMessage("消息B-"+i); textMessage1.setStringProperty("JMSXtestId", "b"); messageProducer.send(textMessage1); } } } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { ActiveMQSelectorsProducter.sendMessages("SelectorsDemo"); } }
消費者:
import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import com.zender.activemq.ActiveMQLinkUtil; /** * * @類名稱:ActiveMQSelectorsComsumer * @類描述:消費者-JSM選擇器-消息選擇器 */ public class ActiveMQSelectorsComsumer { //會話對象 private static Session session = null; @SuppressWarnings("unchecked") public static void getMessages(String name){ try { session = ActiveMQLinkUtil.initConnection(false, Session.CLIENT_ACKNOWLEDGE); if(session != null){ //建立一個消息隊列 Queue queue = session.createQueue(name); //建立一個消息消費者 MessageConsumer messageConsumer = session.createConsumer(queue, "JMSXtestId='a'"); //獲取消息 messageConsumer.setMessageListener(new MessageListener(){ @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println(textMessage.getText()); //簽收 textMessage.acknowledge(); } catch (JMSException e) { e.printStackTrace(); } } }); } } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { ActiveMQSelectorsComsumer.getMessages("SelectorsDemo"); } }
結果:
一個生產了20個消息,這邊只消費了消息A的消息,消息B的並無被消費。