本篇主要學習內容:html
1.ActiveMQ 隊列服務監聽java
2.ActiveMQ Topic模型spring
1.ActiveMQ術語及API介紹apache
2.ActiveMQ 文本消息處理session
3.ActiveMQ 對象消息處理多線程
相信大如今對ActiveMQ的一些簡單操做已經很輕鬆掌握了tcp
上一篇文章地址:https://www.cnblogs.com/arebirth/p/activemq02.htmlide
在咱們上一篇的練習中,全部的消費者都是接收一次消息即斷開鏈接,這樣是否是很不方便。學習
試想一下,若是咱們的provider在consumer接收完第一條消息後又繼續發送了一條消息,那麼consumer已經斷開鏈接了,是否是就不能鏈接不間斷的實時獲取消息?測試
解決方案:
很容易,用咱們的隊列服務監聽便可
注*:根據上一章的學習,你們對環境搭建使用配置,確定都已經至關清楚了,這裏就不過多闡述,直接進行代碼實戰
相比之下,我麼你的生產者照以前是沒有任何變化的,主要的變化仍是在cosumer身上
package cn.arebirth.mq; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class ActiveMQQueueListenerProducer { public static void sendTextActiveMq(String txt) { //定義連接工廠 ConnectionFactory connectionFactory = null; //定義連接對象 Connection connection = null; //定義會話 Session session = null; //目的地 Destination destination = null; //定義消息的發送者 MessageProducer producer = null; //定義消息 Message message = null; try { //建立連接工廠 connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616"); //建立連接誒對象 connection = connectionFactory.createConnection(); //啓動連接 connection.start(); //建立會話 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立目的地 destination = session.createQueue("queue-listener"); //建立消息生產者 producer = session.createProducer(destination); //建立消息對象 message = session.createTextMessage(txt); //發送消息 producer.send(message); } catch (Exception ex) { ex.printStackTrace(); } finally { //回收資源 if (producer != null) { try { producer.close(); } catch (JMSException e) { e.printStackTrace(); } } if (session != null) { try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }
package cn.arebirth.mq; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class ActiveMQQueueListenerConsumer { public static void receiveTextActiveMq() { // 定義連接工廠 ConnectionFactory connectionFactory = null; // 定義連接對象 Connection connection = null; // 定義會話 Session session = null; // 目的地 Destination destination = null; // 定義消息的發送者 MessageConsumer consumer = null; // 定義消息 Message message = null; try { //建立連接工廠 connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616"); //建立連接對象 connection = connectionFactory.createConnection(); //啓動連接 connection.start(); //建立會話 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立目的地 destination = session.createQueue("queue-listener"); //建立消息消費者 consumer = session.createConsumer(destination); //隊列服務監聽 consumer.setMessageListener(new MessageListener() { //ActiveMQ回調方法。經過該方法將消息傳遞到consumer @Override public void onMessage(Message message) { //處理消息 String msg = null; try { msg = ((TextMessage) message).getText(); } catch (JMSException e) { e.printStackTrace(); } System.out.println("Producer say:" + msg); } }); } catch (Exception ex) { ex.printStackTrace(); } } }
3.1 provider測試
package cn.arebirth.mq; public class ProducerTest { public static void main(String[] args) { ActiveMQQueueListenerProducer.sendTextActiveMq("Hello,consumer!"); } }
觀察咱們的控制檯能夠發現已經成功發佈到隊列
3.2 consumer測試
package cn.arebirth.mq; public class ConsumerTest { public static void main(String[] args) { ActiveMQQueueListenerConsumer.receiveTextActiveMq(); } }咱們運行後能夠發現,它接收到了消息,可是它的進程並無關閉,
咱們用provider繼續發佈一條消息,看看consumer能不能接收到
能夠看到,consumer持續在後臺監聽咱們發佈的消息,
經過上面代碼,不難發現,provider沒有任何改動,只是consumer修改了一部分
經過調用匿名內部類的方法來實現持續監聽
consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { } }
注意:由於涉及到隊列持續監聽,因此咱們不能在finally處給資源回收,不然還在監聽狀態,資源都回收沒了,也就無從監聽啦。
在本系列文章第一篇也有介紹過一些Topic模型的概念,那麼這裏咱們將以原理+實戰的方式來帶領你們掌握
和點對點方式不一樣,發佈到Topic的消息會被全部的訂閱者消費,而點對點的只能是指定的消費者去消費
當生產者發佈消息,無論是否有消費者,都不會保存消息,也就是說它是發完就啥也無論了那種,
因此要注意:必定要有消費者,而後在有生產者,不然生產者不發完消息什麼也無論了,你消費者在生產者以後纔有,那麼你是接收不到消息的。
接下來咱們就以實戰的方式鼓搗下。
2 建立生產者
package cn.arebirth.mq; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class ActiveMQTopicProducer { public static void sendTextActiveMQ(String txt){ //定義連接工廠 ConnectionFactory connectionFactory = null; //定義連接對象 Connection connection = null; //定義會話 Session session = null; //目的地 Destination destination = null; //定義消息的發送者 MessageProducer producer = null; //定義消息 Message message = null; try { //建立連接工廠 connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616"); //建立連接誒對象 connection = connectionFactory.createConnection(); //啓動連接 connection.start(); //建立會話 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立目的地 destination = session.createTopic("topic-test"); //建立消息生產者 producer = session.createProducer(destination); //建立消息對象 message = session.createTextMessage(txt); //發送消息 producer.send(message); } catch (Exception ex) { ex.printStackTrace(); } finally { //回收資源 if (producer != null) { try { producer.close(); } catch (JMSException e) { e.printStackTrace(); } } if (session != null) { try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }
咱們能夠發現,在建立目的地destination的時候代碼有了變更
destination = session.createTopic("topic-test");
變成了createTopic,對這就是topic模式了。
package cn.arebirth.mq; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class ActiveMQTopicConsumer implements Runnable { public static void receiveTextActiveMQ(String threadName) { // 定義連接工廠 ConnectionFactory connectionFactory = null; // 定義連接對象 Connection connection = null; // 定義會話 Session session = null; // 目的地 Destination destination = null; // 定義消息的發送者 MessageConsumer consumer = null; // 定義消息 Message message = null; try { //建立連接工廠 connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616"); //建立連接對象 connection = connectionFactory.createConnection(); //啓動連接 connection.start(); //建立會話 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立目的地 destination = session.createTopic("topic-test"); //建立消息的消費者 consumer = session.createConsumer(destination); //服務監聽 consumer.setMessageListener(new MessageListener() { //ActiveMQ回調方法。經過該方法將消息傳遞到consumer @Override public void onMessage(Message message) { //處理消息 String msg = null; try { msg = ((TextMessage) message).getText(); } catch (JMSException e) { e.printStackTrace(); } System.out.println(threadName + "--Producer say:" + msg); } }); } catch (Exception ex) { ex.printStackTrace(); } } @Override public void run() { receiveTextActiveMQ(Thread.currentThread().getName()); } }
咱們能夠發現,在建立目的地destination的時候代碼有了變更
destination = session.createTopic("topic-test");
還有實現了Runnable這個是爲了一會測試的時候,多線程啓動,看效果,是否多個都會接受到,(若是看着糊塗的話,你也能夠去掉線程的部分,單獨複製多個對象,並啓動,效果也是同樣的)
4.1 測試消費者
package cn.arebirth.mq; public class ConsumerTest { public static void main(String[] args) { ActiveMQTopicConsumer a1 = new ActiveMQTopicConsumer(); Thread t1 = new Thread(a1,"a1"); ActiveMQTopicConsumer a2 = new ActiveMQTopicConsumer(); Thread t2 = new Thread(a2,"a2"); ActiveMQTopicConsumer a3 = new ActiveMQTopicConsumer(); Thread t3 = new Thread(a3,"a3"); t1.start(); t2.start(); t3.start(); } }
能夠看到,咱們的消費者已經啓動了,三個線程。並以監聽服務的方式啓動
4.2 測試生產者
package cn.arebirth.mq; public class ProducerTest { public static void main(String[] args) { ActiveMQTopicProducer.sendTextActiveMQ("hello,topic"); } }
能夠看到,在topics下面,咱們發佈的內容已經有記錄了
而後咱們在看下,咱們的consumer
能夠發現,三個consumer都已經接收到了
ps:
若是你對ActiveMQ原理性的東西感到困惑,能夠看下咱們前面的文章:https://www.cnblogs.com/arebirth/p/activemq01.html
原文出處:https://www.cnblogs.com/arebirth/p/activemq03.html