源碼下載:http://git.oschina.net/zhengweishan/JMS_Study_Demohtml
我使用的是ActiveMQ 5.13.3 Release的Windows版,官網最新版是ActiveMQ 5.13.4 Release,你們能夠自行下載,下載地址。java
須要注意的是,開發時候,要將apache-activemq-5.13.3-bin.zip解壓縮后里面的activemq-all-5.13.3.jar包加入到classpath下面,這個包包含了全部jms接口api的實現。git
項目截圖:apache
JMS 公共 ----------點對點域 ----------發佈/訂閱域api
ConnectionFactory ---------- QueueConnectionFactory ---------- TopicConnectionFactory服務器
Connection ---------- QueueConnection ---------- TopicConnectionsession
Destination ---------- Queue ---------- Topiceclipse
Session ---------- QueueSession ---------- TopicSessionspa
MessageProducer ---------- QueueSender ---------- TopicPublisher.net
MessageConsumer ---------- QueueReceiver ---------- TopicSubscriber
(1)、點對點方式(point-to-point)
點對點的消息發送方式主要創建在 Message Queue,Sender,reciever上,Message Queue 存貯消息,Sneder 發送消息,receive接收消息.具體點就是Sender Client發送Message Queue ,而 receiver Cliernt從Queue中接收消息和"發送消息已接受"到Quere,確認消息接收。消息發送客戶端與接收客戶端沒有時間上的依賴,發送客戶端能夠在任什麼時候刻發送信息到Queue,而不須要知道接收客戶端是否是在運行
(2)、發佈/訂閱 方式(publish/subscriber Messaging)
發佈/訂閱方式用於多接收客戶端的方式.做爲發佈訂閱的方式,可能存在多個接收客戶端,而且接收端客戶端與發送客戶端存在時間上的依賴。一個接收端只能接收他建立之後發送客戶端發送的信息。做爲subscriber ,在接收消息時有兩種方法,destination的receive方法,和實現message listener 接口的onMessage 方法。
發送消息的基本步驟:
(1)、建立鏈接使用的工廠類JMS ConnectionFactory
(2)、使用管理對象JMS ConnectionFactory創建鏈接Connection,並啓動
(3)、使用鏈接Connection 創建會話Session
(4)、使用會話Session和管理對象Destination建立消息生產者MessageSender
(5)、使用消息生產者MessageSender發送消息
消息接收者從JMS接受消息的步驟
(1)、建立鏈接使用的工廠類JMS ConnectionFactory
(2)、使用管理對象JMS ConnectionFactory創建鏈接Connection,並啓動
(3)、使用鏈接Connection 創建會話Session
(4)、使用會話Session和管理對象Destination建立消息接收者MessageReceiver
(5)、使用消息接收者MessageReceiver接受消息,須要用setMessageListener將MessageListener接口綁定到MessageReceiver消息接收者必須實現了MessageListener接口,須要定義onMessage事件方法。
package com.active.mq.demo; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class MQConnectionFactory { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默認鏈接用戶名 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默認鏈接密碼 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默認鏈接地址 private static ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);//鏈接工廠 /** * 經過鏈接工廠獲取鏈接 * [@return](http://my.oschina.net/u/556800) */ public static Connection getConnection(){ Connection connection = null; try { connection = connectionFactory.createConnection(); } catch (JMSException e) { e.printStackTrace(); } return connection; } } package com.active.mq.demo; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; public class JMSConsumer { public static void main(String[] args) { Connection connection = null;//鏈接 Session session = null;//會話 接受或者發送消息的線程 Destination destination;//消息的目的地 MessageConsumer messageConsumer;//消息的消費者 try { //經過鏈接工廠獲取鏈接 connection = MQConnectionFactory.getConnection(); //啓動鏈接 connection.start(); //建立session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立一個鏈接HelloWorld的消息隊列 destination = session.createQueue("HelloWorld"); //建立消息消費者 messageConsumer = session.createConsumer(destination); while (true) { TextMessage textMessage = (TextMessage) messageConsumer.receive(100000); if(textMessage != null){ System.out.println("收到的消息:" + textMessage.getText()); }else { break; } } //提交回話 session.commit(); } catch (JMSException e) { e.printStackTrace(); }finally{ if(connection != null){ try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } if(session !=null){ try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } } } } package com.active.mq.demo; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; public class JMSProducer { //發送的消息數量 private static final int SENDNUM = 10; public static void main(String[] args) { //鏈接 Connection connection = null; //會話 接受或者發送消息的線程 Session session = null; //消息的目的地 Destination destination; //消息生產者 MessageProducer messageProducer; try { //經過鏈接工廠獲取鏈接 connection = MQConnectionFactory.getConnection(); //啓動鏈接 connection.start(); //建立session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); //建立一個名稱爲HelloWorld的消息隊列 destination = session.createQueue("HelloWorld"); //建立消息生產者 messageProducer = session.createProducer(destination); //發送消息 sendMessage(session, messageProducer); //提交回話 session.commit(); } catch (Exception e) { e.printStackTrace(); }finally{ if(connection != null){ try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } if(session !=null){ try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } } } /** * 發送消息 * [@param](http://my.oschina.net/u/2303379) session * [@param](http://my.oschina.net/u/2303379) messageProducer 消息生產者 * [@throws](http://my.oschina.net/throws) Exception */ public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{ for (int i = 0; i < JMSProducer.SENDNUM; i++) { //建立一條文本消息 TextMessage message = session.createTextMessage("發送JMS消息第" + (i + 1) + "條"); System.out.println("發送消息:Activemq 發送JMS消息" + (i + 1)); //經過消息生產者發出消息 messageProducer.send(message); } } }
在獲取工廠類中加入以下代碼:
private static QueueConnectionFactory queueConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL); /** * 經過鏈接工廠獲取鏈接(Queue方式) * [@return](http://my.oschina.net/u/556800) */ public static QueueConnection getQueueConnection(){ QueueConnection connection = null; try { connection = queueConnectionFactory.createQueueConnection(); } catch (JMSException e) { e.printStackTrace(); } return connection; } //消息生產者 package com.active.mq.demo; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueSender; import javax.jms.QueueSession; import javax.jms.Session; public class QueueProducer { private static final int SEND_NUM = 10; public static void main(String[] args) { QueueConnection queueConnection = null; QueueSession queueSession = null; try { // 經過工廠建立一個鏈接 queueConnection = MQConnectionFactory.getQueueConnection(); // 啓動鏈接 queueConnection.start(); // 建立一個session會話 queueSession = queueConnection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 建立一個消息隊列 Queue queue = queueSession.createQueue("QueueMsgDemo"); // 建立消息發送者 QueueSender sender = queueSession.createSender(queue); // 設置持久化模式 sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT); sendMessage(queueSession, sender); // 提交會話 queueSession.commit(); } catch (Exception e) { e.printStackTrace(); }finally { // 關閉釋放資源 if (queueSession != null) { try { queueSession.close(); } catch (JMSException e) { e.printStackTrace(); } } if (queueConnection != null) { try { queueConnection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } public static void sendMessage(QueueSession session, QueueSender sender) throws Exception { for (int i = 0; i < SEND_NUM; i++) { String message = "發送queue消息第" + (i + 1) + "條"; //建立一個Map集合信息 MapMessage map = session.createMapMessage(); map.setString("text", message); map.setLong("time", System.currentTimeMillis()); System.out.println("ActiveMQ 發送queue消息:"+(i + 1)); sender.send(map); } } } //消費者 package com.active.mq.demo; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueReceiver; import javax.jms.QueueSession; import javax.jms.Session; public class QueueConsumer { public static void main(String[] args) { QueueConnection queueConnection = null; QueueSession queueSession = null; try { // 經過工廠建立一個鏈接 queueConnection = MQConnectionFactory.getQueueConnection(); // 啓動鏈接 queueConnection.start(); // 建立一個session會話 queueSession = queueConnection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 建立一個消息隊列 Queue queue = queueSession.createQueue("QueueMsgDemo"); // 建立消息接收者 QueueReceiver receiver = queueSession.createReceiver(queue); receiver.setMessageListener(new MessageListener() { public void onMessage(Message msg) { if (msg != null) { MapMessage map = (MapMessage) msg; try { System.out.println(map.getLong("time") + "接收到消息#" + map.getString("text")); } catch (JMSException e) { e.printStackTrace(); } } } }); // 休眠100ms再關閉 Thread.sleep(1000 * 100); // 提交會話 queueSession.commit(); } catch (Exception e) { e.printStackTrace(); } finally { // 關閉釋放資源 if (queueSession != null) { try { queueSession.close(); } catch (JMSException e) { e.printStackTrace(); } } if (queueConnection != null) { try { queueConnection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }
在獲取工廠類中加入以下代碼:
private static TopicConnectionFactory topicConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL); /** * 經過鏈接工廠獲取鏈接(Topic方式) * @return */ public static TopicConnection getTopicConnection(){ TopicConnection topicConnection = null; try { topicConnection = topicConnectionFactory.createTopicConnection(); } catch (JMSException e) { e.printStackTrace(); } return topicConnection; } //生產者 package com.active.mq.demo; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicPublisher; import javax.jms.TopicSession; public class TopicProducer { private static final int SEND_NUM = 10; public static void main(String[] args) { TopicConnection connection = null; TopicSession session = null; try { // 經過工廠建立一個鏈接 connection = MQConnectionFactory.getTopicConnection(); // 啓動鏈接 connection.start(); // 建立一個session會話 session = connection.createTopicSession(true, Session.AUTO_ACKNOWLEDGE); // 建立一個消息隊列 Topic topic = session.createTopic("TopicDemo"); // 建立消息發送者 TopicPublisher publisher = session.createPublisher(topic); // 設置持久化模式 publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT); sendMessage(session, publisher); // 提交會話 session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { // 關閉釋放資源 if (session != null) { try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } public static void sendMessage(TopicSession session, TopicPublisher publisher) throws Exception { for (int i = 0; i < SEND_NUM; i++) { String message = "發送Topic消息第" + (i + 1) + "條"; MapMessage map = session.createMapMessage(); map.setString("text", message); map.setLong("time", System.currentTimeMillis()); System.out.println("ActiveMQ 發送Topic消息:"+(i + 1)); publisher.send(map); } } } //消費者 package com.active.mq.demo; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; public class TopicConsumer { public static void main(String[] args) { TopicConnection connection = null; TopicSession session = null; try { // 經過工廠建立一個鏈接 connection = MQConnectionFactory.getTopicConnection(); // 啓動鏈接 connection.start(); // 建立一個session會話 session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 建立一個消息隊列 Topic topic = session.createTopic("TopicDemo"); // 建立消息消費者 TopicSubscriber subscriber = session.createSubscriber(topic); subscriber.setMessageListener(new MessageListener() { public void onMessage(Message msg) { if (msg != null) { MapMessage map = (MapMessage) msg; try { System.out.println(map.getLong("time") + "Topic接收消息#" + map.getString("text")); } catch (JMSException e) { e.printStackTrace(); } } } }); // 休眠100ms再關閉 Thread.sleep(1000 * 100); // 提交會話 session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { // 關閉釋放資源 if (session != null) { try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }
以使用JMS方式發送接收消息爲例說明 一、首先,啓動ActiveMQ 二、運行發送者,eclipse控制檯輸出,以下圖:
三、查看ActiveMQ服務器,Queues內容以下:
咱們能夠看到建立了一個名稱爲HelloWorld的消息隊列,隊列中有10條消息未被消費,咱們也能夠經過Browse查看是哪些消息,若是這些隊列中的消息,被刪除,消費者則沒法消費。
四、運行一下消費者,eclipse控制檯打印消息,以下:
五、咱們在查看一下ActiveMQ服務器,Queues內容以下:
咱們能夠看到HelloWorld的消息隊列發生變化,多一個消息者,隊列中的10條消息被消費了,點擊Browse查看,已經爲空了。 點擊Active Consumers,咱們能夠看到這個消費者的詳細信息。
實例到此就結束了,你們能夠本身多看點ActiveMQ服務器的內容,進一步熟悉ActiveMQ。