ActiveMQ學習之消息持久化概念

1、消息三大屬性:

        一、消息頭java

        二、消息體spring

        三、消息屬性apache

2、消息類型

        一、TextMessage   springboot

        二、MapMessage    session

        三、ObjectMessage    tcp

        四、BytesMessage    ide

        五、StreamMessageurl

3、消息持久化

        1、參數說明

                   一、非持久化spa

                          messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);code

                           非持久化,當mq宕機後消息不存在

                    二、持久化(消息默認是持久化)

                          messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

                          持久化,當mq宕機後消息存在

       2、持久化消息

                   一、非持久化隊列

                        生產者

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @ProjectName: springbootActiveMQ
 * @Package: cn.**.test
 * @Author: huat
 * @Date: 2020/1/2 17:04
 * @Version: 1.0
 */
public class ActiveMQTest {
    //url路徑
    private static final String ACTRIVE_URL="tcp://192.168.44.135:61616";
    //隊列名稱
    private static final String QUEUE_NAME="queue01";
    //主題名稱
    private static final String TOPIC_NAME = "topic01";

    public static void main(String[] args) {
        //一、建立鏈接工廠
        //若是帳號密碼沒有修改的話,帳號密碼默認均爲admin
        ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL);
        //若是帳號密碼修改的話
        //第一個參數爲帳號,第二個爲密碼,第三個爲請求的url
        //ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL);
        try {
            //二、經過鏈接工廠獲取鏈接
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
            //三、建立session會話
            //裏面會有兩個參數,第一個爲事物,第二個是簽收
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //四、建立目的地(具體是隊列仍是主題),這裏是建立隊列
            Queue queue=session.createQueue(QUEUE_NAME);
            //五、建立消息生產者,隊列模式
            MessageProducer messageProducer = session.createProducer(queue);
            //六、經過messageProducer生產三條消息發送到MQ消息隊列中
            for (int i=0;i<3;i++){
                //七、建立消息
                TextMessage textMessage = session.createTextMessage("msg----->" + i);//建立一個文本消息
                //消息屬性
                textMessage.setStringProperty("c01","vip");
                //八、經過messageProducer發送給mq
                messageProducer.send(textMessage);
                //九、數據非持久化
                messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            }
            messageProducer.close();
            session.close();
            connection.close();
            System.out.println("消息發送成功");
        } catch (JMSException e) {
            e.printStackTrace();
        }

    }
}

                        消費者

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @ProjectName: springbootActiveMQ
 * @Package: cn.**.test
 * @Author: huat
 * @Date: 2020/1/3 8:47
 * @Version: 1.0
 */
public class ActiveMQConsumer {
    //url路徑
    private static final String ACTRIVE_URL="tcp://192.168.44.135:61616";
    //隊列名稱
    private static final String QUEUE_NAME="queue01";

    public static void main(String[] args) {
        //一、建立鏈接工廠
        //若是帳號密碼沒有修改的話,帳號密碼默認均爲admin
        ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL);
        //若是帳號密碼修改的話
        //第一個參數爲帳號,第二個爲密碼,第三個爲請求的url
        //ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL);
        try {
            //二、經過鏈接工廠獲取鏈接
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
            //三、建立session會話
            //裏面會有兩個參數,第一個爲事物,第二個是簽收
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //四、這裏接受的queue的名稱要和發送者的一致
            Queue queue = session.createQueue(QUEUE_NAME);
            //五、建立消費者
            MessageConsumer consumer = session.createConsumer(queue);
            //六、經過監聽的方式消費消息
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    //若是message不等於null而且屬於TextMessage類型(由於消息發送的類型是TextMessage,因此這裏判斷是不是這個類型)
                    if(null!=message&&message instanceof TextMessage){
                        TextMessage textMessage=(TextMessage)message;
                        try {
                            System.out.println(textMessage.getText());
                            //獲取消息屬性
                            System.out.println("消息屬性--->"+textMessage.getStringProperty("c01"));
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            //七、保證控制檯一直在運行
            System.in.read();
            //八、閉資源
            consumer.close();
            session.close();
            connection.close();
        }catch (Exception e){
            e.printStackTrace();
        }
    }

}

                   二、非持久化主題

                        生產者

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @ProjectName: springbootActiveMQ
 * @Package: cn.**.test
 * @Author: huat
 * @Date: 2020/1/4 9:29
 * @Version: 1.0
 */
public class ActiveMQTopicTest {
    //url路徑
    private static final String ACTRIVE_URL="tcp://192.168.44.135:61616";
    //主題名稱
    private static final String TOPIC_NAME = "topic01";
    public static void main(String[] args) {
        //一、建立鏈接工廠
        //若是帳號密碼沒有修改的話,帳號密碼默認均爲admin
        ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL);
        //若是帳號密碼修改的話
        //第一個參數爲帳號,第二個爲密碼,第三個爲請求的url
        //ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL);
        try {
            //二、經過鏈接工廠獲取鏈接
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
            //三、建立session會話
            //裏面會有兩個參數,第一個爲事物,第二個是簽收
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //四、建立目的地(具體是隊列仍是主題),這裏是建立主題
            Topic topic=session.createTopic(TOPIC_NAME);
            //五、建立消息生產者,主題模式
            MessageProducer messageProducer = session.createProducer(topic);
            //六、經過messageProducer生產三條消息發送到MQ消息主題中
            for (int i=0;i<3;i++){
                //七、建立消息
                TextMessage textMessage = session.createTextMessage("msg----->" + i);//建立一個文本消息
                //八、經過messageProducer發送給mq
                messageProducer.send(textMessage);
            }
            messageProducer.close();
            session.close();
            connection.close();
            System.out.println("消息發送成功");
        } catch (JMSException e) {
            e.printStackTrace();
        }

    }
}

                        消費者

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @ProjectName: springbootActiveMQ
 * @Package: cn.**.test
 * @Author: huat
 * @Date: 2020/1/4 9:43
 * @Version: 1.0
 */
public class ActiveMQTopicConsumer {
    //url路徑
    private static final String ACTRIVE_URL="tcp://192.168.44.135:61616";
    //主題名稱
    private static final String TOPIC_NAME = "topic01";

    public static void main(String[] args) {
        //一、建立鏈接工廠
        //若是帳號密碼沒有修改的話,帳號密碼默認均爲admin
        ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL);
        //若是帳號密碼修改的話
        //第一個參數爲帳號,第二個爲密碼,第三個爲請求的url
        //ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL);
        try {
            //二、經過鏈接工廠獲取鏈接
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
            //三、建立session會話
            //裏面會有兩個參數,第一個爲事物,第二個是簽收
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //四、這裏接受的queue的名稱要和發送者的一致
            Topic topic = session.createTopic(TOPIC_NAME);
            //五、建立消費者
            MessageConsumer consumer = session.createConsumer(topic);
            //六、經過監聽的方式消費消息
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    //若是message不等於null而且屬於TextMessage類型(由於消息發送的類型是TextMessage,因此這裏判斷是不是這個類型)
                    if(null!=message&&message instanceof TextMessage){
                        TextMessage textMessage=(TextMessage)message;
                        try {
                            System.out.println(textMessage.getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            //七、保證控制檯一直在運行
            System.in.read();
            //八、閉資源
            consumer.close();
            session.close();
            connection.close();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

                   三、持久化隊列

                        生產者

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @ProjectName: springbootActiveMQ
 * @Package: cn.**.test
 * @Author: huat
 * @Date: 2020/1/2 17:04
 * @Version: 1.0
 */
public class ActiveMQTest {
    //url路徑
    private static final String ACTRIVE_URL="tcp://192.168.44.135:61616";
    //隊列名稱
    private static final String QUEUE_NAME="queue01";
    //主題名稱
    private static final String TOPIC_NAME = "topic01";

    public static void main(String[] args) {
        //一、建立鏈接工廠
        //若是帳號密碼沒有修改的話,帳號密碼默認均爲admin
        ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL);
        //若是帳號密碼修改的話
        //第一個參數爲帳號,第二個爲密碼,第三個爲請求的url
        //ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL);
        try {
            //二、經過鏈接工廠獲取鏈接
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
            //三、建立session會話
            //裏面會有兩個參數,第一個爲事物,第二個是簽收
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //四、建立目的地(具體是隊列仍是主題),這裏是建立隊列
            Queue queue=session.createQueue(QUEUE_NAME);
            //五、建立消息生產者,隊列模式
            MessageProducer messageProducer = session.createProducer(queue);
            //六、經過messageProducer生產三條消息發送到MQ消息隊列中
            for (int i=0;i<3;i++){
                //七、建立消息
                TextMessage textMessage = session.createTextMessage("msg----->" + i);//建立一個文本消息
                //消息屬性
                textMessage.setStringProperty("c01","vip");
                //八、經過messageProducer發送給mq
                messageProducer.send(textMessage);
                //九、數據持久化
                messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
            }
            messageProducer.close();
            session.close();
            connection.close();
            System.out.println("消息發送成功");
        } catch (JMSException e) {
            e.printStackTrace();
        }

    }
}

                        消費者

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @ProjectName: springbootActiveMQ
 * @Package: cn.**.test
 * @Author: huat
 * @Date: 2020/1/3 8:47
 * @Version: 1.0
 */
public class ActiveMQConsumer {
    //url路徑
    private static final String ACTRIVE_URL="tcp://192.168.44.135:61616";
    //隊列名稱
    private static final String QUEUE_NAME="queue01";

    public static void main(String[] args) {
        //一、建立鏈接工廠
        //若是帳號密碼沒有修改的話,帳號密碼默認均爲admin
        ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL);
        //若是帳號密碼修改的話
        //第一個參數爲帳號,第二個爲密碼,第三個爲請求的url
        //ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL);
        try {
            //二、經過鏈接工廠獲取鏈接
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
            //三、建立session會話
            //裏面會有兩個參數,第一個爲事物,第二個是簽收
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //四、這裏接受的queue的名稱要和發送者的一致
            Queue queue = session.createQueue(QUEUE_NAME);
            //五、建立消費者
            MessageConsumer consumer = session.createConsumer(queue);
            //六、經過監聽的方式消費消息
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    //若是message不等於null而且屬於TextMessage類型(由於消息發送的類型是TextMessage,因此這裏判斷是不是這個類型)
                    if(null!=message&&message instanceof TextMessage){
                        TextMessage textMessage=(TextMessage)message;
                        try {
                            System.out.println(textMessage.getText());
                            //獲取消息屬性
                            System.out.println("消息屬性--->"+textMessage.getStringProperty("c01"));
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            //七、保證控制檯一直在運行
            System.in.read();
            //八、閉資源
            consumer.close();
            session.close();
            connection.close();
        }catch (Exception e){
            e.printStackTrace();
        }
    }

}

                   四、持久化主題

                            生產者

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @ProjectName: springbootActiveMQ
 * @Package: cn.**.test
 * @Author: huat
 * @Date: 2020/1/4 9:29
 * @Version: 1.0
 */
public class ActiveMQTopicTest {
    //url路徑
    private static final String ACTRIVE_URL="tcp://192.168.44.135:61616";
    //主題名稱
    private static final String TOPIC_NAME = "topic01";
    public static void main(String[] args) {
        //一、建立鏈接工廠
        //若是帳號密碼沒有修改的話,帳號密碼默認均爲admin
        ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL);
        //若是帳號密碼修改的話
        //第一個參數爲帳號,第二個爲密碼,第三個爲請求的url
        //ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL);
        try {
            //二、經過鏈接工廠獲取鏈接
            Connection connection = activeMQConnectionFactory.createConnection();

            //三、建立session會話
            //裏面會有兩個參數,第一個爲事物,第二個是簽收
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //四、建立目的地(具體是隊列仍是主題),這裏是建立主題
            Topic topic=session.createTopic(TOPIC_NAME);
            //五、建立消息生產者,主題模式
            MessageProducer messageProducer = session.createProducer(topic);
            //持久化數據
            messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
            connection.start();
            //六、經過messageProducer生產三條消息發送到MQ消息主題中
            for (int i=0;i<3;i++){
                //七、建立消息
                TextMessage textMessage = session.createTextMessage("msg----->" + i);//建立一個文本消息
                //八、經過messageProducer發送給mq
                messageProducer.send(textMessage);
            }
            messageProducer.close();
            session.close();
            connection.close();
            System.out.println("消息發送成功");
        } catch (JMSException e) {
            e.printStackTrace();
        }

    }
}

                            消費者

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @ProjectName: springbootActiveMQ
 * @Package: cn.bdqn.test
 * @Author: huat
 * @Date: 2020/1/4 9:43
 * @Version: 1.0
 */
public class ActiveMQTopicConsumer {
    //url路徑
    private static final String ACTRIVE_URL="tcp://192.168.44.135:61616";
    //主題名稱
    private static final String TOPIC_NAME = "topic01";

    public static void main(String[] args) {
        //一、建立鏈接工廠
        //若是帳號密碼沒有修改的話,帳號密碼默認均爲admin
        ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL);
        //若是帳號密碼修改的話
        //第一個參數爲帳號,第二個爲密碼,第三個爲請求的url
        //ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL);
        try {
            //二、經過鏈接工廠獲取鏈接
            Connection connection = activeMQConnectionFactory.createConnection();
            //訂閱名稱
            connection.setClientID("test01");

            //三、建立session會話
            //裏面會有兩個參數,第一個爲事物,第二個是簽收
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //四、這裏接受的queue的名稱要和發送者的一致
            Topic topic = session.createTopic(TOPIC_NAME);
            //五、持久化的訂閱者,第一個參數爲訂閱主題名稱,第二個爲備註
            TopicSubscriber topicSubscriber=session.createDurableSubscriber(topic,"remakr");
           //六、啓動鏈接
            connection.start();
            //receive等待消息,不限制時間
            Message message = topicSubscriber.receive();
            while (null!=message){
                TextMessage textMessage=(TextMessage)message;
                System.out.println("******------>"+textMessage.getText());
                //等待五秒鐘
                 message=topicSubscriber.receive(5000L);
            }
            session.close();
            connection.close();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

    主題持久化注意事項

            一、若是消費者註冊關閉後,消息提供者在消費者關閉後在發送消息,消費者再打開,會收到消息提供者發送的消息。

            二、若是在消費者沒有註冊以前,消息提供者發送消息,消費者不會收到以前的消息

      總結:

            一、必定要先運行一次消費者,等同於向MQ註冊,相似我訂閱了這個主題

             二、而後在運行消費者發送消息

              三、不管消費者是否在線,都會收到消息,不在線的話,下次鏈接時會把沒有收到過的消息所有接受。

相關文章
相關標籤/搜索