ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個徹底支持JMS1.1和J2EE 1.4規範的 JMS Provider實現,儘管JMS規範出臺已是好久的事情了,可是JMS在當今的J2EE應用中間仍然扮演着特殊的地位。java
JMS即Java消息服務(Java Message Service)應用程序接口,是一個Java平臺中關於面向消息中間件(MOM)的API,用於在兩個應用程序之間,或分佈式系統中發送消息,進行異步通訊。Java消息服務是一個與具體平臺無關的API,絕大多數MOM提供商都對JMS提供支持。git
<dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>${active-mq-version}</version> </dependency> <!-- https://mvnrepository.com/artifact/junit/junit --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> </dependencies>
下面咱們就開始消息的收發,先來探討一下點對點的消息收發:github
一、消息的發送數據庫
原理上咱們經過ConnectionFactory會話工廠 ---> 建立一個會話鏈接 ---> 經過會話鏈接建立一個會話線程 ---> 經過會話線程建立一個消息隊列 ---> 經過會話線程和消息隊列建立一個消息生產者(MessageProducer) ---> 最後由消息生產者進行消息的發送。apache
private static ConnectionFactory factory;//會話鏈接工廠 private static Connection connection;//會話鏈接 private static Session session;//會話接收或發送消息線程 private static Destination destination;//消息隊列 private static MessageProducer messageProducer;//消息發送者
經過ActiveMQConnectionFactory建立會話工廠瀏覽器
factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);//建立會話鏈接工廠
設置會話鏈接額度地址和用戶名密碼在(這裏我使用的是默認的地址和用戶名密碼)服務器
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
下面封裝了一下上面的過程:session
/** * @Description 發送消息 * @param queryName 消息隊列名稱 * @param msg 消息內容 * @return * * @author 高尚 * @version 1.0 * @date 建立時間:2018年1月31日 下午3:54:22 */ public static boolean producerSendQueryMessage(final String queryName, final String msg){ boolean flag = true; try { connection = factory.createConnection();//建立會話鏈接 connection.start();//啓動會話鏈接 session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//建立會話線程 destination = session.createQueue(queryName);//建立消息隊列 messageProducer = session.createProducer(destination);//建立會話生成者 Message message = session.createTextMessage(msg);//建立消息對象 messageProducer.send(message);//發送消息 session.commit(); } catch (JMSException e) { e.printStackTrace(); flag = false; }finally { if(null != connection){ try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } return flag; }
這裏單獨說一下connection.createSession生成會話線程:異步
Session session = connection.createSession(paramA,paramB); paramA是設置事務,paramB是設置acknowledgment mode paramA 取值有: 一、true:支持事務 爲true時:paramB的值忽略, acknowledgment mode被jms服務器設置爲SESSION_TRANSACTED 。 二、false:不支持事務 爲false時:paramB的值可爲Session.AUTO_ACKNOWLEDGE、Session.CLIENT_ACKNOWLEDGE、DUPS_OK_ACKNOWLEDGE其中一個。 paramB 取值有: 1、Session.AUTO_ACKNOWLEDGE:爲自動確認,客戶端發送和接收消息不須要作額外的工做。 2、Session.CLIENT_ACKNOWLEDGE:爲客戶端確認。客戶端接收到消息後,必須調用javax.jms.Message的acknowledge方法。jms服務器纔會刪除消息。 3、DUPS_OK_ACKNOWLEDGE:容許副本的確認模式。一旦接收方應用程序的方法調用從處理消息處返回,會話對象就會確認消息的接收;並且容許重複確認。在須要考慮資源使用時,這種模式很是有效。 四、SESSION_TRANSACTED
到這裏咱們的消息發送模塊就封裝完成,下面咱們看一下消息的接收:分佈式
原理上咱們經過ConnectionFactory會話工廠 ---> 建立一個會話鏈接 ---> 經過會話鏈接建立一個會話線程 ---> 經過會話線程建立一個消息隊列 ---> 經過會話線程和消息隊列建立一個消息消費者(MessageConsumer) ---> 最後由消息消費者進行消息獲取
/** * @Description 接收消息隊列中的消息 * @param queryName 消息隊列名稱 * @return * * @author 高尚 * @version 1.0 * @date 建立時間:2018年1月31日 下午4:24:14 */ public static void consumerGetQueryMessage(final String queryName){ try { connection = factory.createConnection();//建立會話鏈接 connection.start();//啓動會話鏈接 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue(queryName); messageConsumer = session.createConsumer(destination); while(true){ TextMessage message = (TextMessage) messageConsumer.receive(); if(null != message){ System.out.println(queryName+"發送消息:"+message.getText()); } } } catch (Exception e) { // TODO: handle exception System.out.println("消費消息異常"); }finally { if(null != connection){ try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } }
最後單獨說一下messageConsumer.receive方法:
messageConsumer.receive();//一直等到消息 messageConsumer.receive(1000);//等到消息1秒鐘 messageConsumer.receiveNoWait();//不等待消息
固然這樣的接收消息方式是否是感受很low,固然咱們還能夠經過監聽器來實現消息接收的監聽(MessageListener),咱們實現MessageListener接口,自定義消息接收監聽器:
/** * @Description 消息監聽器 * @author 高尚 * @version 1.0 * @date 建立時間:2018年1月31日 下午4:52:36 */ public class MsgListener implements MessageListener { /** * 監聽的會話隊列 */ private static String queryName; @Override public void onMessage(Message msg) { TextMessage textMsg = (TextMessage) msg; try { if(null != textMsg){ System.out.println("【" + queryName + "】發送的消息:" + textMsg.getText()); } } catch (JMSException e) { e.printStackTrace(); System.out.println("獲取會話消息異常"); } } public MsgListener(String queryName) { super(); // TODO Auto-generated constructor stub this.queryName = queryName; } }
而後咱們須要簡單修改一個消息接收策略:
/** * @Description 經過Listener接收消息隊列中的消息 * @param queryName 消息隊列名稱 * @return * * @author 高尚 * @version 1.0 * @date 建立時間:2018年1月31日 下午4:24:14 */ public static void consumerGetQueryMessageListener(String queryName) { try { connection = factory.createConnection();//建立會話鏈接 connection.start();//啓動會話鏈接 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue(queryName); messageConsumer = session.createConsumer(destination); MsgListener msgListener = new MsgListener(queryName); messageConsumer.setMessageListener(msgListener); while(true){ Thread.sleep(10000); } } catch (Exception e) { // TODO: handle exception System.out.println("消費消息異常"); }finally { if(null != connection){ try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } }
最後咱們一塊兒看一下消息的發佈和訂閱,首先消息的發佈和訂閱與點點的消息收發基本一致,不一樣點在於會話線程的建立:
destination = session.createTopic(queryName);//建立消息發佈線程
其餘部分一致,沒有什麼難度,這裏再也不闡述,你們能夠自行測試。固然我也爲你們提供了測試參考代碼:https://github.com/hpugs/ActiveMQ
到這裏關於ActiveMQ入門部分就和你們探討完畢,若有錯誤還有指教。謝謝