消息隊列之 ActiveMQ

簡介

ActiveMQ 特色

ActiveMQ 是由 Apache 出品的一款開源消息中間件,旨在爲應用程序提供高效、可擴展、穩定、安全的企業級消息通訊。 它的設計目標是提供標準的、面向消息的、多語言的應用集成消息通訊中間件。ActiveMQ 實現了 JMS 1.1 並提供了不少附加的特性,好比 JMX 管理、主從管理、消息組通訊、消息優先級、延遲接收消息、虛擬接收者、消息持久化、消息隊列監控等等。其主要特性有:html

  1. 支持包括 Java、C、C++、C#、Ruby、Perl、Python、PHP 等多種語言的客戶端和協議。協議包含 OpenWire、Stomp、AMQP、MQTT 。
  2. 提供了像消息組通訊、消息優先級、延遲接收消息、虛擬接收者、消息持久化之類的高級特性
  3. 徹底支持 JMS 1.1 和 J2EE 1.4規範(包括持久化、分佈式事務消息、事務)
  4. 對 Spring 框架的支持,ActiveMQ 能夠經過 Spring 的配置文件方式很容易嵌入到 Spring 應用中
  5. 經過了常見的 J2EE 服務器測試,好比 TomEE、Geronimo、JBoss、GlassFish、WebLogic
  6. 鏈接方式的多樣化,ActiveMQ 提供了多種鏈接模式,例如 in-VM、TCP、SSL、NIO、UDP、多播、JGroups、JXTA
  7. 支持經過使用 JDBC 和 journal 實現消息的快速持久化
  8. 爲高性能集羣、客戶端-服務器、點對點通訊等場景而設計
  9. 提供了技術和語言中立的 REST API 接口
  10. 支持 Ajax 方式調用 ActiveMQ
  11. ActiveMQ 能夠輕鬆地與 CXF、Axis 等 Web Service 技術整合,以提供可靠的消息傳遞
  12. 可用做爲內存中的 JMS 提供者,很是適合 JMS 單元測試

基本概念

由於 ActiveMQ 是完整支持 JMS 1.1 的,因此從 Java 使用者的角度其基本概念與 JMS 1.1 規範是一致的。java

消息傳送模型
  1. 點對點模型(Point to Point) 使用隊列(Queue)做爲消息通訊載體,知足生產者與消費者模式,一條消息只能被一個消費者使用,未被消費的消息在隊列中保留直到被消費或超時。程序員

  2. 發佈訂閱模型(Pub/Sub) 使用主題做爲消息通訊載體,相似於廣播模式,發佈者發佈一條消息,該消息經過主題傳遞給全部的訂閱者,在一條消息廣播以後才訂閱的用戶則是收不到該條消息的。web

基本組件

ActiveMQ 使用時包含的基本組件各與 JMS 是相同的:spring

  1. Broker,消息代理,表示消息隊列服務器實體,接受客戶端鏈接,提供消息通訊的核心服務。
  2. Producer,消息生產者,業務的發起方,負責生產消息並傳輸給 Broker 。
  3. Consumer,消息消費者,業務的處理方,負責從 Broker 獲取消息並進行業務邏輯處理。
  4. Topic,主題,發佈訂閱模式下的消息統一聚集地,不一樣生產者向 Topic 發送消息,由 Broker 分發到不一樣的訂閱者,實現消息的廣播。
  5. Queue,隊列,點對點模式下特定生產者向特定隊列發送消息,消費者訂閱特定隊列接收消息並進行業務邏輯處理。
  6. Message,消息體,根據不一樣通訊協議定義的固定格式進行編碼的數據包,來封裝業務 數據,實現消息的傳輸。

因爲這些概念在 JMS 中已介紹過,這裏再也不詳細介紹。數據庫

鏈接器

ActiveMQ Broker 的主要做用是爲客戶端應用提供一種通訊機制,爲此 ActiveMQ 提供了一種鏈接機制,並用鏈接器(connector)來描述這種鏈接機制。ActiveMQ 中鏈接器有兩種,一種是用於客戶端與消息代理服務器(client-to-broker)之間通訊的傳輸鏈接器(transport connector),一種是用於消息代理服務器之間(broker-to-broker)通訊的網絡鏈接器(network connector)。connector 使用 URI(統一資源定位符)來表示,URI 格式爲: <schema name>:<hierarchical part>[?<query>][#<fragment>] schema name 表示協議, 例如:foo://username:password@example.com:8042/over/there/index.dtb?type=animal&name=narwhal#noseapache

其中 schema name 部分是 foo,hierarchical part 是 username:password@example.com:8042/over/there/index.dtb,query 是 type=animal&name=narwhal,fragment 是 nose。緩存

  1. 傳輸鏈接器 爲了交換消息,消息生產者和消息消費者(統稱爲客戶端)都須要鏈接到消息代理服務器,這種客戶端和消息代理服務器之間的通訊就是經過傳輸鏈接器(Transport connectors)完成的。不少狀況下用戶鏈接消息代理時的需求側重點不一樣,有的更關注性能,有的更注重安全性,所以 ActiveMQ 提供了一系列l鏈接協議供選擇,來覆蓋這些使用場景。從消息代理的角度看,傳輸鏈接器就是用來處理和監聽客戶端鏈接的,查看 ActiveMQ demo 的配置文件(/examples/conf/activemq-demo.xml),傳輸鏈接的相關配置以下:
<transportConnectors>
            <transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/>
            <transportConnector name="ssl" uri="ssl://localhost:61617"/>
            <transportConnector name="stomp" uri="stomp://localhost:61613"/>
            <transportConnector name="ws" uri="ws://localhost:61614/" />
        </transportConnectors>
複製代碼

傳輸鏈接器定義在<transportConnectors>元素中,一個<transportConnector>元素定義一個特定的鏈接器,一個鏈接器必須有本身惟一的名字和 URI 屬性,但discoveryUri屬性是可選的。目前在 ActiveMQ 最新的5.15版本中經常使用的傳輸鏈接器鏈接協議有:vm、tcp、udp、multicast、nio、ssl、http、https、websocket、amqp、mqtt、stomp 等等安全

  • vm,容許客戶端和消息服務器直接在 VM 內部通訊,採用的鏈接不是 Socket 鏈接,而是直接的虛擬機本地方法調用,從而避免網絡傳輸的開銷。應用場景僅限於服務器和客戶端在同一 JVM 中。
  • tcp,客戶端經過 TCP 鏈接到遠程的消息服務器。
  • udp,客戶端經過 UDP 鏈接到遠程的消息服務器。
  • multicast,容許使用組播傳輸的方式鏈接到消息服務器。
  • nio,nio 和 tcp 的做用是同樣的,只不過 nio 使用了 java 的 NIO包,這可能在某些場景下可提供更好的性能。
  • ssl,ssl 容許用戶在 TCP 的基礎上使用 SSL 。
  • http 和 https,容許客戶端使用 REST 或 Ajax 的方式進行鏈接,這意味着能夠直接使用 Javascript 向 ActiveMQ 發送消息。
  • websocket,容許客戶端經過 HTML5 中的 WebSocket 方式鏈接到消息服務器。
  • amqp,5.8版本開始支持。
  • mqtt、stomp,5.6版本開始支持。

每一個協議的具體配置見官網(http://activemq.apache.org/uri-protocols.html )。除了以上這些基本協議以外 ActiveMQ 還支持一些高級協議也能夠經過 URI 的方式進行配置,好比 Failover 和 Fanout 。bash

  • Failover 是一種從新鏈接的機制,工做於上面介紹的鏈接協議的上層,用於創建可靠的傳輸。其配置語法容許制定任意多個複合的 URI ,它會自動選擇其中的一個 URI 來嘗試創建鏈接,若是該鏈接沒有成功,則會繼續選擇其它的 URI 來嘗試。配置語法例如:failover:(tcp://localhost:61616,tcp://remotehost:61616)?initialReconnectDelay=100
  • Fanout 是一種從新鏈接和複製的機制,它也工做於其它鏈接的上層,採用複製的方式把消息複製到多個消息服務器。配置語法例如:fanout:(tcp://localhost:61629,tcp://localhost:61639,tcp://localhost:61649)
  1. 網絡鏈接器 不少狀況下,咱們要處理的數據多是海量的,這種場景單臺服務器很難支撐,這就要用到集羣功能,爲此 ActiveMQ 提供了網絡鏈接的模式,簡單說就是經過把多個消息服務器實例鏈接在一塊兒做爲一個總體對外提供服務,從而提升總體對外的消息服務能力。經過這種方式鏈接在一塊兒的服務器實例之間可共享隊列和消費者列表,從而達到分佈式隊列的目的,網絡鏈接器就是用來配置服務器之間的通訊。

使用網絡鏈接器的簡單場景
)

如圖所示,服務器 S1 和 S2 經過 NewworkConnector 相連,生產者 P1 發送的消息,消費者 C3 和 C4 均可以接收到,而生產者 P3 發送的消息,消費者 C1 和 C2 也能夠接收到。要使用網絡鏈接器的功能須要在服務器 S1 的 activemq.xml 中的 broker 節點下添加以下配置(假設192.168.11.23:61617 爲 S2 的地址):

<networkConnectors>      
          <networkConnector uri="static:(tcp://192.168.11.23:61617)"/>    
</networkConnectors>
複製代碼

若是隻是這樣,S1 能夠將消息發送到 S2,但這只是單方向的通訊,發送到 S2 上的的消息還不能發送到 S1 上。若是想 S1 也收到從 S2 發來的消息須要在 S2 的 activemq.xml 中的 broker 節點下也添加以下配置(假設192.168.11.45:61617爲 S1 的地址):

<networkConnectors>      
          <networkConnector uri="static:(tcp://192.168.11.45:61617)"/>    
</networkConnectors>
複製代碼

這樣,S1和S2就能夠雙向通訊了。目前在 ActiveMQ 最新的5.15版本中經常使用的網絡鏈接器協議有 static 和 multicast 兩種。

  • static,靜態協議,用於爲一個網絡中多個代理建立靜態配置,這種配置協議支持複合的 URI (即包含其餘 URI 的 URI)。例如static://(tcp://ip:61616,tcp://ip2:61616)
  • multicast,多點傳送協議,消息服務器會廣播本身的服務,也會定位其餘代理。這種方式用於服務器之間實現動態識別,而不是配置靜態的 IP 組。

對這塊感興趣的話能夠看官方文檔:http://activemq.apache.org/networks-of-brokers.html

消息存儲

JMS 規範中消息的分發方式有兩種:非持久化和持久化。對於非持久化消息 JMS 實現者須保證盡最大努力分發消息,但消息不會持久化存儲;而持久化方式分發的消息則必須進行持久化存儲。非持久化消息經常使用於發送通知或實時數據,當你比較看重系統性能而且即便丟失一些消息並不影響業務正常運做時可選擇非持久化消息。持久化消息被髮送到消息服務器後若是當前消息的消費者並無運行則該消息繼續存在,只有等到消息被處理並被消息消費者確認以後,消息纔會從消息服務器中刪除。

對以上這兩種方式 ActiveMQ 都支持,而且還支持經過緩存在內存中的中間狀態消息的方式來恢復消息。歸納起來看 ActiveMQ 的消息存儲有三種:存儲到內存、存儲到文件、存儲到數據庫。具體使用上 ActiveMQ 提供了一個插件式的消息存儲機制,相似於消息的多點傳播,主要實現了以下幾種:

  • AMQ,是 ActiveMQ 5.0及之前版本默認的消息存儲方式,它是一個基於文件的、支持事務的消息存儲解決方案。 在此方案下消息自己以日誌的形式實現持久化,存放在 Data Log 裏。而且還對日誌裏的消息作了引用索引,方便快速取回消息。
  • KahaDB,也是一種基於文件並具備支持事務的消息存儲方式,從5.3開始推薦使用 KahaDB 存儲消息,它提供了比 AMQ 消息存儲更好的可擴展性和可恢復性。
  • JDBC,基於 JDBC 方式將消息存儲在數據庫中,將消息存到數據庫相對來講比較慢,因此 ActiveMQ 建議結合 journal 來存儲,它使用了快速的緩存寫入技術,大大提升了性能。
  • 內存存儲,是指將全部要持久化的消息放到內存中,由於這裏沒有動態的緩存,因此須要注意設置消息服務器的 JVM 和內存大小。
  • LevelDB,5.6版本以後推出了 LevelDB 的持久化引擎,它使用了自定義的索引代替經常使用的 BTree 索引,其持久化性能高於 KahaDB,雖然默認的持久化方式仍是 KahaDB,可是 LevelDB 將是趨勢。在5.9版本還提供了基於 LevelDB 和 Zookeeper 的數據複製方式,做爲 Master-Slave 方式的首選數據複製方案。

工程實例

Java 訪問 ActiveMQ 實例

JMS 規範中傳遞消息的方式有兩種,一種是點對點模型的隊列(Queue)方式,另外一種是發佈訂閱模型的主題(Topic)方式。下面看下用 ActiveMQ 以主題方式傳遞消息的 Java 示例。

引入依賴

Java 工程中須要引入 ActiveMQ 包的依賴,jar 包版本同你安裝 ActiveMQ 版本一致便可:

<dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.15.2</version>
    </dependency>
複製代碼
消息生產者
package org.study.mq.activeMQ;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class TopicPublisher {

    /**
     * 默認用戶名
     */
    public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    /**
     * 默認密碼
     */
    public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    /**
     * 默認鏈接地址
     */
    public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    public static void main(String[] args) {
        //建立鏈接工廠
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
        try {
            //建立鏈接
            Connection connection = connectionFactory.createConnection();
            //開啓鏈接
            connection.start();
            //建立會話,不須要事務
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //建立 Topic,用做消費者訂閱消息
            Topic myTestTopic = session.createTopic("activemq-topic-test1");
            //消息生產者
            MessageProducer producer = session.createProducer(myTestTopic);

            for (int i = 1; i <= 3; i++) {
                TextMessage message = session.createTextMessage("發送消息 " + i);
                producer.send(myTestTopic, message);
            }

            //關閉資源
            session.close();
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
複製代碼

在 Topic 模式中消息生產者是用於發佈消息的,絕大部分代碼與 Queue 模式中類似,不一樣的是本例中基於 Session 建立的是主題(Topic),該主題做爲消費者消費消息的目的地。

消息消費者
package org.study.mq.activeMQ;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class TopicSubscriber {

    /**
     * 默認用戶名
     */
    public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    /**
     * 默認密碼
     */
    public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    /**
     * 默認鏈接地址
     */
    public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    public static void main(String[] args) {
        //建立鏈接工廠
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
        try {
            //建立鏈接
            Connection connection = connectionFactory.createConnection();
            //開啓鏈接
            connection.start();
            //建立會話,不須要事務
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //建立 Topic
            Topic myTestTopic = session.createTopic("activemq-topic-test1");

            MessageConsumer messageConsumer = session.createConsumer(myTestTopic);
            messageConsumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        System.out.println("消費者1 接收到消息:" + ((TextMessage) message).getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });

            MessageConsumer messageConsumer2 = session.createConsumer(myTestTopic);
            messageConsumer2.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        System.out.println("消費者2 接收到消息:" + ((TextMessage) message).getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });

            MessageConsumer messageConsumer3 = session.createConsumer(myTestTopic);
            messageConsumer3.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        System.out.println("消費者3 接收到消息:" + ((TextMessage) message).getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });

            //讓主線程休眠100秒,使消息消費者對象能繼續存活一段時間從而能監聽到消息
            Thread.sleep(100 * 1000);
            //關閉資源
            session.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
複製代碼

爲了展現主題模式中消息廣播給多個訂閱者的功能,這裏建立了三個消費者對象並訂閱了同一個主題,比較特殊的是最後讓主線程休眠了一段時間,這麼作的目的是讓消費者對象能繼續存活,從而使控制檯能打印出監聽到的消息內容。

啓動 ActiveMQ 服務器

在 ActiveMQ 的 bin 目錄下直接執行activemq start即啓動了 ActiveMQ

運行 TopicSubscriber

須要先運行 TopicSubscriber 類的 main 方法,這樣發佈者發佈消息的時候訂閱者才能接收到消息,若是將執行順序倒過來則消息先發布出去但沒有任何訂閱者在運行,則看不到消息被消費了。

運行 TopicPublisher

接着運行 TopicPublisher 類的 main 方法,向主題中發佈3條消息,而後能夠在 TopicSubscriber 後臺看到接收到的消息內容:

消費者接收到消息

Spring 整合 ActiveMQ

在實際項目中若是使用原生的 ActiveMQ API 開發顯然比較囉嗦,這中間建立鏈接工廠、建立鏈接之類代碼徹底能夠抽取出來由框架統一作,這些事情 Spring 也想到了並幫咱們作了。ActiveMQ 徹底支持基於 Spring 的方式 配置 JMS 客戶端和服務器,下面的例子展現一下在 Spring 中如何使用隊列模式和主題模式傳遞消息。

引入依賴
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.15.2</version>
</dependency>

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>4.3.10.RELEASE</version>
</dependency>

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
    <version>5.15.0</version>
</dependency>
複製代碼

工程中除了 activemq 的包以外還要添加 Spring 支持 JMS 的包。因爲 connection、session、producer 的建立會消耗大量系統資源,爲此這裏使用 鏈接池 來複用這些資源,因此還要添加 activemq-pool 的依賴。

Spring 配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">
    <context:component-scan base-package="org.study.mq.activeMQ.spring"/>

    <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL">
                    <value>tcp://localhost:61616</value>
                </property>
            </bean>
        </property>
        <property name="maxConnections" value="100"></property>
    </bean>
    <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="jmsFactory"/>
        <property name="sessionCacheSize" value="1"/>
    </bean>
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="cachingConnectionFactory"/>
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
        </property>
    </bean>

    <bean id="testQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg name="name" value="spring-queue"/>
    </bean>
    <bean id="testTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg index="0" value="spring-topic"/>
    </bean>

    <bean id="queueListener" class="org.study.mq.activeMQ.spring.QueueListener"/>
    <bean id="topic1Listener" class="org.study.mq.activeMQ.spring.Topic1Listener"/>
    <bean id="topic2Listener" class="org.study.mq.activeMQ.spring.Topic2Listener"/>

    <bean id="queueContainer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="cachingConnectionFactory"/>
        <property name="destination" ref="testQueue"/>
        <property name="messageListener" ref="queueListener"/>
    </bean>
    <bean id="topic1Container"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="cachingConnectionFactory"/>
        <property name="destination" ref="testTopic"/>
        <property name="messageListener" ref="topic1Listener"/>
    </bean>
    <bean id="topic2Container"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="cachingConnectionFactory"/>
        <property name="destination" ref="testTopic"/>
        <property name="messageListener" ref="topic2Listener"/>
    </bean>

</beans>
複製代碼

下面的項目示例中的 Java 代碼採用註解的方式,這也是如今不少程序員的習慣用法,因此在配置文件一開始定義註解掃描包路徑org.study.mq.activeMQ.spring,您能夠根據本身實際狀況修改包名稱,本例中的全部 Java 代碼都放在該包之下。

接下來定義了一個 JMS 工廠 bean,採用的是池化鏈接工廠類org.apache.activemq.pool.PooledConnectionFactory,實際就是對內部的 ActiveMQ 鏈接工廠增長了鏈接池的功能,從其內部配置能夠看到就是對org.apache.activemq.ActiveMQConnectionFactory的功能封裝,而ActiveMQConnectionFactory類則比較熟悉了,就是上面 Java 訪問 ActiveMQ 示例一開始建立鏈接工廠時使用的類。brokerURL 屬性配置的就是鏈接服務器的協議和服務器地址。接下來的 cachingConnectionFactory 是實際項目代碼中經常使用的,對鏈接工廠的又一層加強,使用鏈接的緩存功能以提高效率,讀者可酌情選擇使用。

jmsTemplate 就是 Spring 解決 JMS 訪問時冗長重複代碼的方案,它須要配置的兩個主要屬性是 connectionFactory 和 messageConverter,經過 connectionFactory 獲取鏈接、會話等對象, messageConverter 則是配置消息轉換器,由於一般消息在發送前和接收後都須要進行一個前置和後置處理,轉換器便進行這個工做。這樣實際代碼直接經過 jmsTemplate 來發送和接收消息,而每次發送接收消息時建立鏈接工廠、建立鏈接、建立會話等工做都由 Spring 框架作了。

有了 JMS 模板還須要知道隊列和主題做爲實際發送和接收消息的目的地,因此接下來定義了 testQueue 和 testTopic 做爲兩種模式的示例。而異步接收消息時則須要提供 MessageListener 的實現類,因此定義了 queueListener 做爲隊列模式下異步接收消息的監聽器,topic1Listener 和 topic2Listener 做爲主題模式下異步接收消息的監聽器,主題模式用兩個監聽器是爲了演示多個消費者時都能收到消息。最後的 queueContainer、topic1Container、topic2Container 用於將消息監聽器綁定到具體的消息目的地上。

消息服務類

下面是使用 JMS 模板處理消息的消息服務類

package org.study.mq.activeMQ.spring;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import javax.jms.*;

@Service
public class MessageService {

    @Resource(name = "jmsTemplate")
    private JmsTemplate jmsTemplate;

    @Resource(name = "testQueue")
    private Destination testQueue;

    @Resource(name = "testTopic")
    private Destination testTopic;

    //向隊列發送消息
    public void sendQueueMessage(String messageContent) {
        jmsTemplate.send(testQueue, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage msg = session.createTextMessage();
                // 設置消息內容
                msg.setText(messageContent);
                return msg;
            }
        });

    }

    //向主題發送消息
    public void sendTopicMessage(String messageContent) {
        jmsTemplate.send(testTopic, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage msg = session.createTextMessage();
                // 設置消息內容
                msg.setText(messageContent);
                return msg;
            }
        });

    }
}
複製代碼

@Service 將該類聲明爲一個服務,實際項目中不少服務代碼也相似。經過 Resource 註解直接將上面配置文件中定義的 jmsTemplate 引入到 MessageService 類中就能夠直接使用了,testQueue 和 testTopic 也是相似,服務類中直接引入配置文件中定義好的隊列和主題。重點是下面的兩個發送消息的方法,sendQueueMessage 向隊列發送消息,sendTopicMessage 向主題發送消息,兩種模式都使用了 jmsTemplate 的 send 方法,send 方法第1個參數是javax.jms.Destination類型,表示消息目的地。因爲javax.jms.Queuejavax.jms.Topic都繼承了javax.jms.Destination接口,因此該方法對隊列模式和主題模式都適用。send 方法的第2個參數是org.springframework.jms.core.MessageCreator,這裏使用了匿名內部類的方式建立對象,從支持的 Session 對象中建立文本消息,這樣就能夠發送消息了。能夠看到不管是隊列仍是主題,經過 Spring 框架來發送消息的代碼比以前的 Java 代碼示例簡潔了不少。

消息監聽器類
package org.study.mq.activeMQ.spring;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class QueueListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            try {
                TextMessage txtMsg = (TextMessage) message;
                String messageStr = txtMsg.getText();
                System.out.println("隊列監聽器接收到文本消息:" + messageStr);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        } else {
            throw new IllegalArgumentException("只支持 TextMessage 類型消息!");
        }
    }
}
複製代碼

隊列消息監聽器在收到消息時校驗是不是文本消息類型,是的話則打印出內容。

package org.study.mq.activeMQ.spring;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class Topic1Listener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            try {
                TextMessage txtMsg = (TextMessage) message;
                String messageStr = txtMsg.getText();
                System.out.println("主題監聽器1 接收到文本消息:" + messageStr);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        } else {
            throw new IllegalArgumentException("只支持 TextMessage 類型消息!");
        }
    }
}
複製代碼
package org.study.mq.activeMQ.spring;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class Topic2Listener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            try {
                TextMessage txtMsg = (TextMessage) message;
                String messageStr = txtMsg.getText();
                System.out.println("主題監聽器2 接收到文本消息:" + messageStr);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        } else {
            throw new IllegalArgumentException("只支持 TextMessage 類型消息!");
        }
    }
}
複製代碼

主題監聽器的代碼與隊列監聽器相似,只是打印時經過不一樣字符串表示當前是不一樣監聽器接收的消息。

啓動應用

爲了演示例子,寫了一個 StartApplication 類,在 main 方法中加載 Spring ,獲取到 MessageService 服務以後調用 sendQueueMessage 和 sendTopicMessage 方法發送消息。

package org.study.mq.activeMQ.spring;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class StartApplication {
    public static void main(String[] args) {
        ApplicationContext ctx = new ClassPathXmlApplicationContext("spring-context.xml");
        MessageService messageService = (MessageService) ctx.getBean("messageService");

        messageService.sendQueueMessage("個人測試消息1");
        messageService.sendTopicMessage("個人測試消息2");
        messageService.sendTopicMessage("個人測試消息3");
    }

}
複製代碼

啓動好 activeMQ 服務以後運行 StartApplication 類,在控制檯看到接收到文本消息:

接收到文本消息

隊列監聽器監聽到了一條消息,兩個主題監聽器分別監聽到了兩條消息。

相關文章
相關標籤/搜索