ActiveMQ學習之java代碼調用ActiveMQ主題

1、基本概念:

        ActiveMQ中共分爲兩種:queue和topicjava

                

            queue:在點對點消息傳遞域中,目的地被稱爲隊列(一對一)spring

            topic:在發佈訂閱消息中,目的地被稱爲主題(一對多)apache

                        特色:一、生產者將消息發佈到topic中,每一個消息能夠有多個消費者,屬於一對多的關係springboot

                                    二、生產者和消費者有時間上的相關性,訂閱某個主題的消費者只能消費自他訂閱之後發佈到消息session

                                    三、生產者生產消息時,topic是不保存消息它是無狀態不落地的,假如無人訂閱就生產消息即生產了一條廢消息,因此通常先啓動消費者,再啓動生產者;maven

2、建立maven工程,並引入依賴,這裏我建立的springboot項目,因此引入的依賴以下:

          依賴:            

<!--activemq-->
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-all</artifactId>
      <version>5.15.9</version>
    </dependency>
 

3、主題測試

        消息生產者

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @ProjectName: springbootActiveMQ
 * @Package: cn.**.test
 * @Author: huat
 * @Date: 2020/1/4 9:29
 * @Version: 1.0
 */
public class ActiveMQTopicTest {
    //url路徑
    private static final String ACTRIVE_URL="tcp://192.168.44.135:61616";
    //主題名稱
    private static final String TOPIC_NAME = "topic01";
    public static void main(String[] args) {
        //一、建立鏈接工廠
        //若是帳號密碼沒有修改的話,帳號密碼默認均爲admin
        ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL);
        //若是帳號密碼修改的話
        //第一個參數爲帳號,第二個爲密碼,第三個爲請求的url
        //ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL);
        try {
            //二、經過鏈接工廠獲取鏈接
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
            //三、建立session會話
            //裏面會有兩個參數,第一個爲事物,第二個是簽收
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //四、建立目的地(具體是隊列仍是主題),這裏是建立主題
            Topic topic=session.createTopic(TOPIC_NAME);
            //五、建立消息生產者,主題模式
            MessageProducer messageProducer = session.createProducer(topic);
            //六、經過messageProducer生產三條消息發送到MQ消息主題中
            for (int i=0;i<3;i++){
                //七、建立消息
                TextMessage textMessage = session.createTextMessage("msg----->" + i);//建立一個文本消息
                //八、經過messageProducer發送給mq
                messageProducer.send(textMessage);
            }
            messageProducer.close();
            session.close();
            connection.close();
            System.out.println("消息發送成功");
        } catch (JMSException e) {
            e.printStackTrace();
        }

    }
}

        消息消費者

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @ProjectName: springbootActiveMQ
 * @Package: cn.**.test
 * @Author: huat
 * @Date: 2020/1/4 9:43
 * @Version: 1.0
 */
public class ActiveMQTopicConsumer {
    //url路徑
    private static final String ACTRIVE_URL="tcp://192.168.44.135:61616";
    //主題名稱
    private static final String TOPIC_NAME = "topic01";

    public static void main(String[] args) {
        //一、建立鏈接工廠
        //若是帳號密碼沒有修改的話,帳號密碼默認均爲admin
        ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL);
        //若是帳號密碼修改的話
        //第一個參數爲帳號,第二個爲密碼,第三個爲請求的url
        //ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL);
        try {
            //二、經過鏈接工廠獲取鏈接
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
            //三、建立session會話
            //裏面會有兩個參數,第一個爲事物,第二個是簽收
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //四、這裏接受的topic的名稱要和發送者的一致
            Topic topic = session.createTopic(TOPIC_NAME);
            //五、建立消費者
            MessageConsumer consumer = session.createConsumer(topic);
            //六、經過監聽的方式消費消息
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    //若是message不等於null而且屬於TextMessage類型(由於消息發送的類型是TextMessage,因此這裏判斷是不是這個類型)
                    if(null!=message&&message instanceof TextMessage){
                        TextMessage textMessage=(TextMessage)message;
                        try {
                            System.out.println(textMessage.getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            //七、保證控制檯一直在運行
            System.in.read();
            //八、閉資源
            consumer.close();
            session.close();
            connection.close();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
相關文章
相關標籤/搜索