ActiveMQ入門

首先咱們應該先了解J2EE中的一個重要規範:JMS(The Java Message Service)Java消息服務。而JMS的客戶端之間能夠經過JMS服務進行異步的消息傳輸。它主要有兩種模型:點對點和發佈訂閱模型。java

點對點的模型特色:spring

  • 每一個消息只有一個消費者(Consumer)(即一旦被消費,消息就再也不在消息隊列中)。
  • 發送者和接收者之間在時間上沒有依賴性,也就是說當發送者發送了消息以後,無論接收者有沒有正在運行,它不會影響到消息被髮送到隊列。
  • 接收者在成功接收消息以後需向隊列應答成功。

發佈訂閱模型特色:apache

  • 每一個消息能夠有多個消費者
  • 發佈者和訂閱者之間有時間上的依賴性。針對某個主題(Topic)的訂閱者,它必須建立一個訂閱者以後,才能消費發佈者的消息,並且爲了消費消息,訂閱者必須保持運行的狀態。
  • 爲了緩和這樣嚴格的時間相關性,JMS容許訂閱者建立一個可持久化的訂閱。這樣,即便訂閱者沒有被激活(運行),它也能接收到發佈者的消息。

JMS還定義了五種不一樣的消息正文格式,以及調用的消息類型,容許你發送並接收以一些不一樣形式的數據,提供現有消息格式的一些級別的兼容性。瀏覽器

  • StreamMessage -- Java原始值的數據流
  • MapMessage--一套名稱-值對
  • TextMessage--一個字符串對象
  • ObjectMessage--一個序列化的 Java對象
  • BytesMessage--一個字節的數據流

其中咱們用的最多的就是TextMessage字符串對象服務器

消息中間件做爲JMS的實現,在J2EE的企業應用中扮演着特殊的角色。ActiveMQ是一個易於使用的消息中間件。做爲JMS的實現,消息中間件的使用步驟都大同小異,下面咱們以ActiveMQ爲例來介紹一下其使用。session

2、ActiveMQ 概念

  • Broker,消息代理,表示消息隊列服務器實體,接受客戶端鏈接,提供消息通訊的核心服務。
  • Producer,消息生產者,業務的發起方,負責生產消息並傳輸給 Broker 。
  • Consumer,消息消費者,業務的處理方,負責從 Broker 獲取消息並進行業務邏輯處理。
  • Topic,主題,發佈訂閱模式下的消息統一聚集地,不一樣生產者向 Topic 發送消息,由 Broker 分發到不一樣的訂閱者,實現消息的廣播。
  • Queue,隊列,點對點模式下特定生產者向特定隊列發送消息,消費者訂閱特定隊列接收消息並進行業務邏輯處理。
  • Message,消息體,根據不一樣通訊協議定義的固定格式進行編碼的數據包,來封裝業務 數據,實現消息的傳輸。

2、ActiveMQ的基本使用

ActiveMQ的安裝:
ActiveMQ安裝很簡單,只需從其官網下載,解壓並進入bin目錄app

把文件%activemq home%/conf/activemq.xml和%activemq home%/conf/jetty.xml文件裏全部0.0.0.0替換成127.0.0.1異步

使用啓動命令運行就行了。而後咱們打開瀏覽器進入管理後臺http://localhost:8161/admin/
輸入默認的帳號密碼:admin。就能夠看到管控臺了。tcp


而後咱們使用java操做ActiveMQ,mq的使用基本都須要建立鏈接、session、Destination這麼幾步,咱們直接看代碼吧。
點對點的模型:
生產者代碼:ide

@Test
    public void testQueueProducer() throws Exception {
        //一、建立一個鏈接工廠對象,須要指定服務的ip及端口。
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        //二、使用工廠對象建立一個Connection對象。
        Connection connection = connectionFactory.createConnection();
        //三、開啓鏈接,調用Connection對象的start方法。
        connection.start();
        //四、建立一個Session對象。
        //第一個參數:是否開啓事務。若是true開啓事務,第二個參數無心義。通常不開啓事務false。
        //第二個參數:應答模式。自動應答或者手動應答。通常自動應答。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //五、使用Session對象建立一個Destination對象。兩種形式queue、topic,如今應該使用queue
        Queue queue = session.createQueue("test-queue");
        //六、使用Session對象建立一個Producer對象。
        MessageProducer producer = session.createProducer(queue);
        //七、建立一個Message對象,可使用TextMessage。
        /*TextMessage textMessage = new ActiveMQTextMessage();
        textMessage.setText("hello Activemq");*/
        TextMessage textMessage = session.createTextMessage("hello activemq");
        //八、發送消息
        producer.send(textMessage);
        //九、關閉資源
        producer.close();
        session.close();
        connection.close();
    }

消費者代碼:

@Test
    public void testQueueConsumer() throws Exception {
        //建立一個ConnectionFactory對象鏈接MQ服務器
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        //建立一個鏈接對象
        Connection connection = connectionFactory.createConnection();
        //開啓鏈接
        connection.start();
        //使用Connection對象建立一個Session對象
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //建立一個Destination對象。queue對象
        Queue queue = session.createQueue("spring-queue");
        //使用Session對象建立一個消費者對象。
        MessageConsumer consumer = session.createConsumer(queue);
        //接收消息
        consumer.setMessageListener(new MessageListener() {
            
            @Override
            public void onMessage(Message message) {
                //打印結果
                TextMessage textMessage = (TextMessage) message;
                String text;
                try {
                    text = textMessage.getText();
                    System.out.println(text);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
                
            }
        });
        //等待接收消息
        System.in.read();
        //關閉資源
        consumer.close();
        session.close();
        connection.close();
    }


發佈訂閱的模型:
生產者代碼:

@Test
    public void testTopicProducer() throws Exception {
        //一、建立一個鏈接工廠對象,須要指定服務的ip及端口。
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        //二、使用工廠對象建立一個Connection對象。
        Connection connection = connectionFactory.createConnection();
        //三、開啓鏈接,調用Connection對象的start方法。
        connection.start();
        //四、建立一個Session對象。
        //第一個參數:是否開啓事務。若是true開啓事務,第二個參數無心義。通常不開啓事務false。
        //第二個參數:應答模式。自動應答或者手動應答。通常自動應答。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //五、使用Session對象建立一個Destination對象。兩種形式queue、topic,如今應該使用topic
        Topic topic = session.createTopic("test-topic");
        //六、使用Session對象建立一個Producer對象。
        MessageProducer producer = session.createProducer(topic);
        //七、建立一個Message對象,可使用TextMessage。
        /*TextMessage textMessage = new ActiveMQTextMessage();
        textMessage.setText("hello Activemq");*/
        TextMessage textMessage = session.createTextMessage("topic message");
        //八、發送消息
        producer.send(textMessage);
        //九、關閉資源
        producer.close();
        session.close();
        connection.close();
    }

消費者代碼:

@Test
    public void testTopicConsumer() throws Exception {
        //建立一個ConnectionFactory對象鏈接MQ服務器
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        //建立一個鏈接對象
        Connection connection = connectionFactory.createConnection();
        //開啓鏈接
        connection.start();
        //使用Connection對象建立一個Session對象
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //建立一個Destination對象。topic對象
        Topic topic = session.createTopic("test-topic");
        //使用Session對象建立一個消費者對象。
        MessageConsumer consumer = session.createConsumer(topic);
        //接收消息
        consumer.setMessageListener(new MessageListener() {

            @Override
            public void onMessage(Message message) {
                //打印結果
                TextMessage textMessage = (TextMessage) message;
                String text;
                try {
                    text = textMessage.getText();
                    System.out.println(text);
                } catch (JMSException e) {
                    e.printStackTrace();
                }

            }
        });
        System.out.println("topic消費者3啓動。。。。");
        //等待接收消息
        System.in.read();
        //關閉資源
        consumer.close();
        session.close();
        connection.close();
    }

發佈訂閱模型具備嚴格的時間相關性,若是沒有訂閱者的話,發佈者發佈的內容就被浪費掉了。

3、ActiveMQ與Spring整合

Spring提供了JMSTemplate,極大地便利了MQ的使用,咱們只須要提早在配置文件中配置相關的JMS配置,就可在代碼中直接使用。
Spring配置文件:

<!-- 真正能夠產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://127.0.0.1:61616" />
    </bean>
    <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目標ConnectionFactory對應真實的能夠產生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>
    <!-- 配置生產者 -->
    <!-- Spring提供的JMS工具類,它能夠進行消息發送、接收等 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 這個connectionFactory對應的是咱們定義的Spring提供的那個ConnectionFactory對象 -->
        <property name="connectionFactory" ref="connectionFactory" />
    </bean>
    <!--這個是隊列(Queue)目的地,點對點的 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>spring-queue</value>
        </constructor-arg>
    </bean>
    <!--這個是主題(Topic)目的地,一對多的 -->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="itemAddTopic" />
    </bean>
    
    <!-- 接收消息配置 -->
    <!-- 配置監聽器 -->
    <bean id="myMessageListener" class="cn.e3mall.search.listener.MyMessageListener" />
    <!-- 消息監聽容器 -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="queueDestination" />
        <property name="messageListener" ref="myMessageListener" />
    </bean>

配置好後可在代碼中使用如下方法
生產者代碼:

@Test
    public void testQueueProducer() throws Exception {
        // 第一步:初始化一個spring容器
        ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml");
        // 第二步:從容器中得到JMSTemplate對象。
        JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class);
        // 第三步:從容器中得到一個Destination對象
        Queue queue = (Queue) applicationContext.getBean("queueDestination");
        // 第四步:使用JMSTemplate對象發送消息,須要知道Destination
        jmsTemplate.send(queue, new MessageCreator() {
            
            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage textMessage = session.createTextMessage("spring activemq test");
                return textMessage;
            }
        });
    }

消費者代碼:

public class MyMessageListener implements MessageListener {
    //繼承MessageListener接口並從新它的onMessage方法
    @Override
    public void onMessage(Message message) {
        
        try {
            TextMessage textMessage = (TextMessage) message;
            //取消息內容
            String text = textMessage.getText();
            System.out.println(text);
        } catch (JMSException e) {
            e.printStackTrace();
        }

消費者的使用是在Spring容器中注入一個監聽器,因此咱們須要在配置文件中配置它,它隨着Spring的啓動而啓動,而且實時監聽。當有消息向它發送時,他會當即進行邏輯處理。

相關文章
相關標籤/搜索