ActiveMQ基礎學習

參考資料

ActiveMQ官網java

ActiveMQ簡介

ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個徹底支持JMS1.1和J2EE 1.4規範的 JMS Provider實現,儘管JMS規範出臺已是好久的事情了,可是JMS在當今的J2EE應用中間仍然扮演着特殊的地位。apache

JMS關鍵接口

ConnectionFactory:用於建立鏈接到消息中間件的鏈接工廠。
Connection:表明了應用程序和服務之間的鏈接通路。
Destination:指消息發佈的地點,包括隊列模式和主題模式。
Session:表示一個單線程的上下文,用於發送和接受消息。
MessageConsumer:由會話建立,用於接受發送到目的的消息。
MessageProducer:由會話建立,用於發送消息。
Message:是在消費者和生產者之間傳遞的對象,消息頭,一組消息屬性,和一個消息體。session

ActivMQ依賴

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.9.0</version>
</dependency>

隊列消息

發送隊列模型消息tcp

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class MessageProducer {
    //定義ActivMQ的鏈接地址
    private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
    //定義發送消息的隊列名稱
    private static final String QUEUE_NAME = "MyMessage";

    public static void main(String[] args) throws JMSException {
        //建立鏈接工廠
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
       //建立鏈接
        Connection connection = activeMQConnectionFactory.createConnection();
        //打開鏈接
        connection.start();
        //建立會話
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //建立隊列目標
        Destination destination = session.createQueue(QUEUE_NAME);
        //建立一個生產者
        javax.jms.MessageProducer producer = session.createProducer(destination);
        //建立模擬100個消息
        for (int i = 1 ; i <= 100 ; i++){
            TextMessage message = session.createTextMessage("我發送message:" + i);
            //發送消息
            producer.send(message);
            //在本地打印消息
            System.out.println("我如今發的消息是:" + message.getText());
        }
        //關閉鏈接
        connection.close();
    }
}

接收隊列模型消息ide

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class MessageConsumer {
    //定義ActivMQ的鏈接地址
    private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
    //定義發送消息的隊列名稱
    private static final String QUEUE_NAME = "MyMessage";
    public static void main(String[] args) throws JMSException {
        //建立鏈接工廠
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //建立鏈接
        Connection connection = activeMQConnectionFactory.createConnection();
        //打開鏈接
        connection.start();
        //建立會話
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //建立隊列目標
        Destination destination = session.createQueue(QUEUE_NAME);
        //建立消費者
        javax.jms.MessageConsumer consumer = session.createConsumer(destination);
        //建立消費的監聽
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("獲取消息:" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

注:隊列模型的消息,只會被一個消費者所消費,而不會被共享。線程

主題消息

發送主題模型消息code

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class MessageTopicProducer {
    //定義ActivMQ的鏈接地址
    private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
    //定義發送消息的主題名稱
    private static final String TOPIC_NAME = "MyTopicMessage";

    public static void main(String[] args) throws JMSException {
        //建立鏈接工廠
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //建立鏈接
        Connection connection = activeMQConnectionFactory.createConnection();
        //打開鏈接
        connection.start();
        //建立會話
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //建立隊列目標
        Destination destination = session.createTopic(TOPIC_NAME);
        //建立一個生產者
        javax.jms.MessageProducer producer = session.createProducer(destination);
        //建立模擬100個消息
        for (int i = 1; i <= 100; i++) {
            TextMessage message = session.createTextMessage("當前message是(主題模型):" + i);
            //發送消息
            producer.send(message);
            //在本地打印消息
            System.out.println("我如今發的消息是:" + message.getText());
        }
        //關閉鏈接
        connection.close();
    }
}

接收主題模型消息component

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class MessageTopicConsumer {
    //定義ActivMQ的鏈接地址
    private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
    //定義發送消息的隊列名稱
    private static final String TOPIC_NAME = "MyTopicMessage";
    public static void main(String[] args) throws JMSException {
        //建立鏈接工廠
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //建立鏈接
        Connection connection = activeMQConnectionFactory.createConnection();
        //打開鏈接
        connection.start();
        //建立會話
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //建立隊列目標
        Destination destination = session.createTopic(TOPIC_NAME);
        //建立消費者
        javax.jms.MessageConsumer consumer = session.createConsumer(destination);
        //建立消費的監聽
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("獲取消息:" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

注:消費者要先訂閱主題,這樣生產者發送的主題模型消息才能被消費者消費。xml

相關文章
相關標籤/搜索