ActiveMQ使用示例之Topic

 非持久的Topic消息示例
服務器

對於非持久的Topic消息的發送
基本跟前面發送隊列信息是同樣的,只是把建立Destination的地方,由建立隊列替換成建立Topic,例如:session

Destination destination = session.createTopic("MyTopic");

對於非持久的Topic消息的接收
1:必需要接收方在線,而後客戶端再發送信息,接收方纔能接收到消息
2:一樣把建立Destination的地方,由建立隊列替換成建立Topic,例如:tcp

Destination destination = session.createTopic("MyTopic");

3:因爲不知道客戶端發送多少信息,所以改爲while循環的方式了,例如:spa

Message message = consumer.receive(); while(message!=null) {   TextMessage txtMsg = (TextMessage)message;   System.out.println("收到消 息:" + txtMsg.getText());   message = consumer.receive(1000L); }

消息的生產者:線程

public class NoPersistenceSender {

    //默認鏈接用戶名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默認鏈接密碼
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默認鏈接地址
    private static final String BROKEURL = "tcp://192.168.0.129:61616";
    //發送的消息數量
    private static final int SENDNUM = 10;

    public static void main(String[] args) {
        //鏈接工廠
        ConnectionFactory connectionFactory;
        //鏈接
        Connection connection = null;
        //會話 接受或者發送消息的線程
        Session session;
        //消息的目的地
        Destination destination;
        //消息生產者
        MessageProducer messageProducer;
        //實例化鏈接工廠(鏈接到ActiveMQ服務器)
        connectionFactory = new ActiveMQConnectionFactory(NoPersistenceSender.USERNAME,
                NoPersistenceSender.PASSWORD, NoPersistenceSender.BROKEURL);

        try {
            //經過鏈接工廠獲取鏈接
            connection = connectionFactory.createConnection();
            //啓動鏈接
            connection.start();
            //建立session
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            //建立一個名稱爲MyTopic的消息隊列(生產者生成的消息放在哪)
            destination = session.createTopic("MyTopic");
            //建立消息生產者
            messageProducer = session.createProducer(destination);
            //發送消息
            sendMessage(session, messageProducer);

            session.commit();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }

    }

    /**
     * 發送消息
     *
     * @param session
     * @param messageProducer 消息生產者
     * @throws Exception
     */
    public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception {
        for (int i = 0; i < NoPersistenceSender.SENDNUM; i++) {
            //建立一條文本消息
            TextMessage message = session.createTextMessage("ActiveMQ 發送消息" + i);
            System.out.println("發送消息:Activemq 發送消息" + i);
            //經過消息生產者發出消息
            messageProducer.send(message);
        }

    }
}

消息的消費者:code

public class NoPersistenceReceiver {

    //默認鏈接用戶名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默認鏈接密碼
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默認鏈接地址
    private static final String BROKEURL = "tcp://192.168.0.129:61616";

    public static void main(String[] args) {
        ConnectionFactory connectionFactory;//鏈接工廠
        Connection connection = null;//鏈接

        Session session;//會話 接受或者發送消息的線程
        Destination destination;//消息的目的地

        MessageConsumer messageConsumer;//消息的消費者

        //實例化鏈接工廠(鏈接到ActiveMQ服務器)
        connectionFactory = new ActiveMQConnectionFactory(NoPersistenceReceiver.USERNAME,
                NoPersistenceReceiver.PASSWORD, NoPersistenceReceiver.BROKEURL);

        try {
            //經過鏈接工廠獲取鏈接
            connection = connectionFactory.createConnection();
            //啓動鏈接
            connection.start();
            //建立session
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            //生產者將消息發送到MyTopic,因此消費者要到MyTopic去取
            destination = session.createTopic("MyTopic");
            //建立消息消費者
            messageConsumer = session.createConsumer(destination);

            //Message message = messageConsumer.receive();
            //while (message != null) {
                //TextMessage txtMsg = (TextMessage) message;
                //System.out.println("收到消息:" + txtMsg.getText());
                //message = messageConsumer.receive(1000L);
                //session.commit();
            //}

            Message message = messageConsumer.receive();
            while (message != null) {
                TextMessage txtMsg = (TextMessage) message;
                System.out.println("收到消 息:" + txtMsg.getText());
                //沒這句有錯
                message = messageConsumer.receive(1000L);
            }
            session.commit();
            session.close();
            connection.close();

        } catch (JMSException e) {
            e.printStackTrace();
        }

    }
}

首先運行運行生產者(消費者處於費運行狀態),而後運行消費者:中間件

 此時再次運行一下生產者(消費者處於開啓狀態)blog

 結論:必需要接收方在線,而後客戶端再發送信息,接收方纔能接收到消息
隊列

 持久的Topic消息示例
get

 對於持久的Topic消息的發送

1:要用持久化訂閱,發送消息者要用 DeliveryMode.PERSISTENT 模式發現,在鏈接以前設定
2:必定要設置完成後,再start 這個 connection
 

對於持久的Topic消息的接收

1:須要在鏈接上設置消費者id,用來識別消費者
2:須要建立TopicSubscriber來訂閱
3:要設置好了事後再start 這個 connection
4:必定要先運行一次,等於向消息服務中間件註冊這個消費者,而後再運行客戶端發送信息,這個時候,
不管消費者是否在線,都會接收到,不在線的話,下次鏈接的時候,會把沒有收過的消息都接收下來

生產者:

public class PersistenceSender {

    //默認鏈接用戶名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默認鏈接密碼
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默認鏈接地址
    private static final String BROKEURL = "tcp://192.168.0.129:61616";
    //發送的消息數量
    private static final int SENDNUM = 10;

    public static void main(String[] args) {
        //鏈接工廠
        ConnectionFactory connectionFactory;
        //鏈接
        Connection connection = null;
        //會話 接受或者發送消息的線程
        Session session;
        //消息的目的地
        Destination destination;
        //消息生產者
        MessageProducer messageProducer;
        //實例化鏈接工廠(鏈接到ActiveMQ服務器)
        connectionFactory = new ActiveMQConnectionFactory(PersistenceSender.USERNAME,
                PersistenceSender.PASSWORD, PersistenceSender.BROKEURL);

        try {
            //經過鏈接工廠獲取鏈接
            connection = connectionFactory.createConnection();
            //建立session
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            //建立一個名稱爲MyTopic的消息隊列(生產者生成的消息放在哪)
            destination = session.createTopic("MyTopic1");
            //建立消息生產者
            messageProducer = session.createProducer(destination);
            messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
            //啓動鏈接
            connection.start();
            //發送消息
            sendMessage(session, messageProducer);

            session.commit();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 發送消息
     *
     * @param session
     * @param messageProducer 消息生產者
     * @throws Exception
     */
    public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception {
        for (int i = 0; i < PersistenceSender.SENDNUM; i++) {
            //建立一條文本消息
            TextMessage message = session.createTextMessage("ActiveMQ 發送消息" + i);
            System.out.println("發送消息:Activemq 發送消息" + i);
            //經過消息生產者發出消息
            messageProducer.send(message);
        }
    }
}

消費者:

public class PersistenceReceiver {

    //默認鏈接用戶名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默認鏈接密碼
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默認鏈接地址
    private static final String BROKEURL = "tcp://192.168.0.129:61616";

    public static void main(String[] args) {
        ConnectionFactory connectionFactory;//鏈接工廠
        Connection connection = null;//鏈接

        Session session;//會話 接受或者發送消息的線程
        Topic topic;//消息的目的地

        //實例化鏈接工廠(鏈接到ActiveMQ服務器)
        connectionFactory = new ActiveMQConnectionFactory(PersistenceReceiver.USERNAME,
                PersistenceReceiver.PASSWORD, PersistenceReceiver.BROKEURL);

        try {
            //經過鏈接工廠獲取鏈接
            connection = connectionFactory.createConnection();
            connection.setClientID("winner_0715");
            //建立session
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            //生產者將消息發送到MyTopic,因此消費者要到MyTopic去取
            topic = session.createTopic("MyTopic1");
            //建立消息消費者
            TopicSubscriber consumer = session.createDurableSubscriber(topic, "t1");

            //啓動鏈接
            connection.start();

            Message message = consumer.receive();
            while (message != null) {
                TextMessage txtMsg = (TextMessage) message;
                System.out.println("收到消 息:" + txtMsg.getText());
                //沒這句有錯
                message = consumer.receive(1000L);
            }
            session.commit();
            session.close();
            connection.close();

        } catch (JMSException e) {
            e.printStackTrace();
        }

    }

}

消費者須要先運行一次,註冊~

 

由於是持久消息,因此還會有別的訂閱者,因此是0

關於持久化和非持久化消息

持久化消息
這是 ActiveMQ 的默認傳送模式,此模式保證這些消息只被傳送一次和成 功使用一次。對於這些消息,可靠性是優先考慮的因素。可靠性的另外一個重要方面是確保持久性消
息傳送至目標後,消息服務在向消費者傳送它們以前不會丟失這些消息。這意味着在持久性消息傳送至目標時,消息服務將其放入持久性數據存儲。若是消息服務因爲某種緣由致使失敗,它能夠恢復此消息並將此消息傳送至相應的消費者。雖然這樣增長了消息傳送的開銷,但卻增長了可靠性。

非持久化消息
保證這些消息最多被傳送一次。對於這些消息,可靠性並不是主要的考慮因素。 此模式並不要求持久性的數據存儲,也不保證消息服務因爲某種緣由致使失敗後消息不會丟失。 有兩種方法指定傳送模式:
1.使用setDeliveryMode 方法,這樣全部的消息都採用此傳送模式; 如:
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
2.使用send 方法爲每一條消息設置傳送模式

相關文章
相關標籤/搜索