ActiveMQ學習總結------原生實戰操做(下)03

本篇將繼續延續上一篇的內容,做爲知識補充篇,爲接下來咱們學習spring整合ActiveMQ打好基礎

本篇主要學習內容:html

  1.ActiveMQ 隊列服務監聽java

  2.ActiveMQ Topic模型spring


 

回顧下上一篇ActiveMQ學習總結咱們學習到了:

  1.ActiveMQ術語及API介紹apache

  2.ActiveMQ 文本消息處理session

  3.ActiveMQ 對象消息處理多線程

相信大如今對ActiveMQ的一些簡單操做已經很輕鬆掌握了tcp

上一篇文章地址:https://www.cnblogs.com/arebirth/p/activemq02.htmlide


 

 

一 ActiveMQ實現隊列服務監聽

在咱們上一篇的練習中,全部的消費者都是接收一次消息即斷開鏈接,這樣是否是很不方便。學習

試想一下,若是咱們的provider在consumer接收完第一條消息後又繼續發送了一條消息,那麼consumer已經斷開鏈接了,是否是就不能鏈接不間斷的實時獲取消息?測試

解決方案:

  很容易,用咱們的隊列服務監聽便可

 

*:根據上一章的學習,你們對環境搭建使用配置,確定都已經至關清楚了,這裏就不過多闡述,直接進行代碼實戰

 

1 消息生產者

相比之下,我麼你的生產者照以前是沒有任何變化的,主要的變化仍是在cosumer身上

package cn.arebirth.mq; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class ActiveMQQueueListenerProducer { public static void sendTextActiveMq(String txt) { //定義連接工廠
        ConnectionFactory connectionFactory = null; //定義連接對象
        Connection connection = null; //定義會話
        Session session = null; //目的地
        Destination destination = null; //定義消息的發送者
        MessageProducer producer = null; //定義消息
        Message message = null; try { //建立連接工廠
            connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616"); //建立連接誒對象
            connection = connectionFactory.createConnection(); //啓動連接
 connection.start(); //建立會話
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立目的地
            destination = session.createQueue("queue-listener"); //建立消息生產者
            producer = session.createProducer(destination); //建立消息對象
            message = session.createTextMessage(txt); //發送消息
 producer.send(message); } catch (Exception ex) { ex.printStackTrace(); } finally { //回收資源
            if (producer != null) { try { producer.close(); } catch (JMSException e) { e.printStackTrace(); } } if (session != null) { try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }

 

2 消息消費者

package cn.arebirth.mq; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class ActiveMQQueueListenerConsumer { public static void receiveTextActiveMq() { // 定義連接工廠
        ConnectionFactory connectionFactory = null; // 定義連接對象
        Connection connection = null; // 定義會話
        Session session = null; // 目的地
        Destination destination = null; // 定義消息的發送者
        MessageConsumer consumer = null; // 定義消息
        Message message = null; try { //建立連接工廠
            connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616"); //建立連接對象
            connection = connectionFactory.createConnection(); //啓動連接
 connection.start(); //建立會話
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立目的地
            destination = session.createQueue("queue-listener"); //建立消息消費者
            consumer = session.createConsumer(destination); //隊列服務監聽
            consumer.setMessageListener(new MessageListener() { //ActiveMQ回調方法。經過該方法將消息傳遞到consumer
 @Override public void onMessage(Message message) { //處理消息
                    String msg = null; try { msg = ((TextMessage) message).getText(); } catch (JMSException e) { e.printStackTrace(); } System.out.println("Producer say:" + msg); } }); } catch (Exception ex) { ex.printStackTrace(); } } }

 

3 測試

3.1 provider測試

package cn.arebirth.mq; public class ProducerTest { public static void main(String[] args) { ActiveMQQueueListenerProducer.sendTextActiveMq("Hello,consumer!"); } }

觀察咱們的控制檯能夠發現已經成功發佈到隊列

 

 

 

3.2 consumer測試

package cn.arebirth.mq; public class ConsumerTest { public static void main(String[] args) { ActiveMQQueueListenerConsumer.receiveTextActiveMq(); } }

咱們運行後能夠發現,它接收到了消息,可是它的進程並無關閉,

 

 

咱們用provider繼續發佈一條消息,看看consumer能不能接收到

 

 

能夠看到,consumer持續在後臺監聽咱們發佈的消息,

 

 

 

 

 

 

經過上面代碼,不難發現,provider沒有任何改動,只是consumer修改了一部分

經過調用匿名內部類的方法來實現持續監聽

consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { } }

注意:由於涉及到隊列持續監聽,因此咱們不能在finally處給資源回收,不然還在監聽狀態,資源都回收沒了,也就無從監聽啦。

 


 

 

二 Topic模型

在本系列文章第一篇也有介紹過一些Topic模型的概念,那麼這裏咱們將以原理+實戰的方式來帶領你們掌握

 

1 Publish/Subscribe處理模式(Topic)

消息生產者(發佈)消息到topic中,同時有多個消息消費者(訂閱)消費該消息。

和點對點方式不一樣,發佈到Topic的消息會被全部的訂閱者消費,而點對點的只能是指定的消費者去消費

當生產者發佈消息,無論是否有消費者,都不會保存消息,也就是說它是發完就啥也無論了那種,

因此要注意:必定要有消費者,而後在有生產者,不然生產者不發完消息什麼也無論了,你消費者在生產者以後纔有,那麼你是接收不到消息的。

 

接下來咱們就以實戰的方式鼓搗下。

 

2 建立生產者

package cn.arebirth.mq; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class ActiveMQTopicProducer { public static void sendTextActiveMQ(String txt){ //定義連接工廠
        ConnectionFactory connectionFactory = null; //定義連接對象
        Connection connection = null; //定義會話
        Session session = null; //目的地
        Destination destination = null; //定義消息的發送者
        MessageProducer producer = null; //定義消息
        Message message = null; try { //建立連接工廠
            connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616"); //建立連接誒對象
            connection = connectionFactory.createConnection(); //啓動連接
 connection.start(); //建立會話
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立目的地
            destination = session.createTopic("topic-test"); //建立消息生產者
            producer = session.createProducer(destination); //建立消息對象
            message = session.createTextMessage(txt); //發送消息
 producer.send(message); } catch (Exception ex) { ex.printStackTrace(); } finally { //回收資源
            if (producer != null) { try { producer.close(); } catch (JMSException e) { e.printStackTrace(); } } if (session != null) { try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }

咱們能夠發現,在建立目的地destination的時候代碼有了變更

destination = session.createTopic("topic-test");

變成了createTopic,對這就是topic模式了。

 

3 建立消費者

package cn.arebirth.mq; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class ActiveMQTopicConsumer implements Runnable { public static void receiveTextActiveMQ(String threadName) { // 定義連接工廠
        ConnectionFactory connectionFactory = null; // 定義連接對象
        Connection connection = null; // 定義會話
        Session session = null; // 目的地
        Destination destination = null; // 定義消息的發送者
        MessageConsumer consumer = null; // 定義消息
        Message message = null; try { //建立連接工廠
            connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616"); //建立連接對象
            connection = connectionFactory.createConnection(); //啓動連接
 connection.start(); //建立會話
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立目的地
            destination = session.createTopic("topic-test"); //建立消息的消費者
            consumer = session.createConsumer(destination); //服務監聽
            consumer.setMessageListener(new MessageListener() { //ActiveMQ回調方法。經過該方法將消息傳遞到consumer
 @Override public void onMessage(Message message) { //處理消息
                    String msg = null; try { msg = ((TextMessage) message).getText(); } catch (JMSException e) { e.printStackTrace(); } System.out.println(threadName + "--Producer say:" + msg); } }); } catch (Exception ex) { ex.printStackTrace(); } } @Override public void run() { receiveTextActiveMQ(Thread.currentThread().getName()); } }

 

咱們能夠發現,在建立目的地destination的時候代碼有了變更

destination = session.createTopic("topic-test");

還有實現了Runnable這個是爲了一會測試的時候,多線程啓動,看效果,是否多個都會接受到,(若是看着糊塗的話,你也能夠去掉線程的部分,單獨複製多個對象,並啓動,效果也是同樣的)

 

4 測試(要先啓動消費者,不然消費者是接收不到消息的!固然,你本身能夠試一下

4.1 測試消費者

package cn.arebirth.mq; public class ConsumerTest { public static void main(String[] args) { ActiveMQTopicConsumer a1 = new ActiveMQTopicConsumer(); Thread t1 = new Thread(a1,"a1"); ActiveMQTopicConsumer a2 = new ActiveMQTopicConsumer(); Thread t2 = new Thread(a2,"a2"); ActiveMQTopicConsumer a3 = new ActiveMQTopicConsumer(); Thread t3 = new Thread(a3,"a3"); t1.start(); t2.start(); t3.start(); } }

 

能夠看到,咱們的消費者已經啓動了,三個線程。並以監聽服務的方式啓動

 

 

4.2 測試生產者

package cn.arebirth.mq; public class ProducerTest { public static void main(String[] args) { ActiveMQTopicProducer.sendTextActiveMQ("hello,topic"); } }

 

能夠看到,在topics下面,咱們發佈的內容已經有記錄了

 

 

 

而後咱們在看下,咱們的consumer

 

 

 

能夠發現,三個consumer都已經接收到了

 

ps:

  若是你對ActiveMQ原理性的東西感到困惑,能夠看下咱們前面的文章:https://www.cnblogs.com/arebirth/p/activemq01.html

 

原文出處:https://www.cnblogs.com/arebirth/p/activemq03.html

相關文章
相關標籤/搜索