消息隊列之異步消息的基本概念以及ActiveMQ整合Spring的經常使用用法介紹 | 掘金技術徵文

一 簡介

(1)異步消息:

所謂異步消息,跟RMI遠程調用、webservice調用是相似的,異步消息也是用於應用程序之間的通訊。可是它們之間的區別是:javascript

  • RMI、Hession/Burlap、webservice等遠程調用機制是同步的。也就是說,當客戶端調用遠程方法時,客戶端必須等到遠程方法響應後才能繼續執行
  • 異步消息,顧名思義消息是異步發送,消息發送者不須要等待消息消費者處理消息,甚至不須要等待消息投遞完成就能夠繼續發送消息。這是由於消息發送者默認消息接收最終能夠收到這條消息並進行處理

(2)Java消息服務(JMS):

Java消息服務(Java Message Service,即:JMS)是Java中關於面向消息中間件(MOM)的API,用於在兩個應用程序之間或分佈式系統中發送消息,進行異步通訊。Java消息服務是一個與具體平臺無關的API,絕大多數MOM提供商都對JMS提供支持php

JMS是Java平臺上有關面向消息中間件(MOM)的技術規範,它便於消息系統中的Java應用程序進行消息交換,而且經過提供標準的產生、發送、接收消息的接口簡化企業應用的開發,其做用相似於JDBChtml

(3)消息代理(message broker)和目的地(destination):

在異步消息中有兩個重要的概念,分別是:消息代理和目的地java

當咱們在一個應用中發送一條消息時,會將該消息移交給一個消息代理(PS:通常是一些消息中間件,如:ActiveMQ )。在這裏,消息代理就相似於郵局,消息代理能夠確保消息被投遞到指定的目的地,同時解放消息發送者,使其可以繼續進行其餘業務web

一樣,每條異步消息在被消息發送者發送時都要指定一個目的地(PS:用於區別不一樣類型的消息),而後消息接收者就能夠根據本身的業務需求從指定的目的地(PS:消息仍是在消息中間件存放,目的是是用於區別不一樣類型的消息)獲取本身所需的消息並進行處理spring

(4)隊列(queue)與主題(topic):

i)隊列(queue):

隊列也便是:點對點消息模型apache

在點對點模型中,每條消息分別只有一個發送者和接收者。也就是說,當消息代理獲得發送者發送的消息時,它會將該消息放入到一個隊列中。當某一個消息接收者(PS:同一目的地的消息接收者可能存在多個)請求隊列中的下一條消息時,消息會從隊列中取出並投遞給該接收者。以後該條消息將會從隊列中刪除,這樣就能夠保證一條消息只投遞給一個接收者數組

點對點消息模型

ii)主題(topic):

主題也便是:發佈——訂閱消息模型緩存

在發佈——訂閱消息模型中,每條消息能夠由多個接收者接收。也就是說,消息再也不是隻投遞給一個接收者,而是主題的全部訂閱者都會收到該消息的副本網絡

發佈——訂閱消息模型

(5)異步消息的優勢:

i)無需等待:

在同步通訊中,若是客戶端與遠程服務頻繁通訊,或者遠程服務響應很慢,就會對客戶端應用的性能帶來負面影響

當使用JMS發送消息時,客戶端沒必要等待消息被處理,甚至是被投遞,客戶端只須要將消息發送給消息代理,就能夠確信消息會被投遞給相應的目的地

由於不須要等待,因此客戶端能夠繼續執行其餘任務,這種方式能夠有效的節省時間,客戶端的性能可以獲得極大的提升

ii)面向消息與解耦:

在同步通訊中,客戶端經過服務接口與遠程服務相耦合,若是服務的接口發生變化,那麼此服務的全部客戶端都須要作相應的改變

當使用JMS發送消息時,發送異步消息是以數據爲中心的。這意味着客戶端並無與特定的方法簽名綁定,任何能夠處理數據的隊列或主題訂閱者均可以處理由客戶端發送的消息,而客戶端沒必要了解遠程服務的任何規範

iii)位置獨立:

同步RPC服務一般須要網絡地址來定位。這意味着客戶端沒法靈活地適應網絡拓撲的改變。若是服務的IP地址改變了,或者服務被配置爲監聽其餘端口,客戶端必須進行相應的調整,不然沒法訪問服務。

與之相反,消息客戶端沒必要知道誰會處理它們的消息,或者服務的位置在哪裏。客戶端只須要了解須要經過哪一個隊列或主題來發送消息。所以,只服務可以從隊列或主題中獲取便可,消息客戶端根本不須要關注服務來自哪裏

在點對點模型中,能夠利用這種位置的獨立性來建立消息服務集羣。若是客戶端不知道服務的位置,而且服務的惟一要求就是能夠訪問消息代理,那麼咱們就能夠配置多個服務從同一個隊列中接收消息。若是服務過載,處理能力不足,咱們只須要添加一些新的的服務(接收者)實例來監聽相同的隊列便可平滑加強其處理能力

在發佈一訂閱模型中,位置獨立性會產生另外一種有趣的效應。多個服務能夠訂閱同一個主題,接收相同消息的副本。可是每個服務對消息的處理方式卻可能不一樣。例如,假設咱們有一組能夠共同處理描述新員工信息的消息。一個服務可能會在工資系統中增長該員工,另外一個服務則會將新員工增長到公司交流羣中,同時還有一個服務爲新員工分配內網系統的訪問權限。在這裏,每個服務都是基於相同的數據(都是從同一個主題接收而來),可是卻各自對數據進行了不一樣的處理

在上面的內容中,我介紹了一些關於異步消息的基本概念。下面我將介紹基於ActiveMQ框架的JMS消息的發送與接收以及ActiveMQ在Spring框架中的一些經常使用用法

二 基於JMS的消息發送與接收

(1)ActiveMQ的下載與啓動:

在正式開始編寫測試實例以前,咱們須要作的是ActiveMQ的下載。其官方下載地址是:activemq.apache.org/download.ht…

而後運行:apache-activemq-5.14.1/bin/win64/activemq.bat ,接着保持控制檯窗口不關閉,訪問:http://127.0.0.1:8161/admin/

注:默認帳號密碼是:admin/admin


此時,咱們能夠經過訪問菜單中的「 Queues」或者「 Topics」來實時監控隊列或者主題類型的消息使用狀況


固然,此時由於沒有任何消息存在,因此界面是空白的

(2)使用JMS發送消息:

package cn.zifangsky.test.base;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/** * 消息生產者 * @author zifangsky * */
public class JMSProducer {

    //默認鏈接用戶名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默認鏈接密碼
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默認鏈接地址
    private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    public static void main(String[] args) {
        //鏈接工廠
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);

        try {
            //鏈接
            Connection connection = connectionFactory.createConnection();
            //啓動鏈接
            connection.start();
            //建立session
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            //消息目的地
            Destination destination = session.createQueue("hello");
            //消息生產者
            MessageProducer producer = session.createProducer(destination);

            //發送消息
            for(int i=0;i<10;i++){
                //建立一條文本消息
                TextMessage message = session.createTextMessage("ActiveMQ:這是第 " + i + " 條消息");
                //生產者發送消息
                producer.send(message);
            }

            session.commit();
            session.close();
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }

    }

}複製代碼

運行上面的代碼以後能夠發現ActiveMQ的隊列監控界面出現了變化:


很顯然,生成了10條目的地爲「hello」的未被消費的消息

(3)使用JMS接收消息:

package cn.zifangsky.test.base;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/** * 消息消費者 * @author zifangsky * */
public class JMSConsumer {

    //默認鏈接用戶名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默認鏈接密碼
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默認鏈接地址
    private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    public static void main(String[] args) {
        //鏈接工廠
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
        try {
            //鏈接
            Connection connection = connectionFactory.createConnection();
            //啓動鏈接
            connection.start();
            //建立session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //消息目的地
            Destination destination = session.createQueue("hello");
            //消息消費者
            MessageConsumer consumer = session.createConsumer(destination);
            while(true){
                TextMessage message = (TextMessage) consumer.receive();
                if(message != null){
                    System.out.println("接收到消息: " + message.getText());
                }else{
                    break;
                }
            }
            session.close();
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }


    }

}複製代碼

運行代碼以後,輸出以下:

接收到消息: ActiveMQ:這是第 0 條消息
接收到消息: ActiveMQ:這是第 1 條消息
接收到消息: ActiveMQ:這是第 2 條消息
接收到消息: ActiveMQ:這是第 3 條消息
接收到消息: ActiveMQ:這是第 4 條消息
接收到消息: ActiveMQ:這是第 5 條消息
接收到消息: ActiveMQ:這是第 6 條消息
接收到消息: ActiveMQ:這是第 7 條消息
接收到消息: ActiveMQ:這是第 8 條消息
接收到消息: ActiveMQ:這是第 9 條消息

固然,此時觀察ActiveMQ的隊列監控界面,能夠發現這10條消息已經被消費了

注:上面的代碼很簡單,而且其思路跟JDBC很相似,所以這裏就不作過多解釋了

三 Spring框架整合ActiveMQ的常見用法

若是寫過不少的JDBC代碼的話,能夠發現使用基本的JMS來發送和接收消息就跟JDBC代碼同樣,須要每次寫不少冗長重複的代碼。

針對如何消除冗長和重複的JMS代碼,Spring給出的解決方案是JmsTemplate。JmsTemplate能夠建立鏈接、得到會話以及發送和接收消息。這使得咱們能夠專一於構建要發送的消息或者處理接收到的消息

另外,JmsTemplate能夠處理全部拋出的笨拙的JMsException異常。若是在使用JmsTemplate時拋出JMsException異常,JmsTemplate將捕獲該異常,而後拋出一個非檢查型異常,該異常是Spring自帶的JmsException異常的子類

(1)一個簡單的隊列類型的消息發送和接收實例:

i)activemq.properties:

activemq.ip=127.0.0.1
activemq.username=admin
activemq.passwd=admin複製代碼

這個文件配置了ActiveMQ的地址以及認證的帳號密碼

ii)在Spring的配置文件中加載上面的配置文件:

<bean id="configProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="locations">
            <list>
                <value>classpath:jdbc.properties</value>
                <value>classpath:activemq.properties</value>
            </list>
        </property>
    </bean>

    <bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PreferencesPlaceholderConfigurer">
            <property name="properties" ref="configProperties" />  
    </bean>複製代碼

iii)ActiveMQ的配置文件context_activemq.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.14.1.xsd">

    <context:component-scan base-package="cn.zifangsky.activemq" />

    <!-- ActiveMQ 鏈接工廠 -->
    <!-- <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://${activemq.ip}:61616" userName="${activemq.username}" password="${activemq.passwd}" /> -->
    <bean id="amqConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://${activemq.ip}:61616"/>
        <property name="userName" value="${activemq.username}" />
        <property name="password" value="${activemq.passwd}" />
        <!-- <property name="trustAllPackages" value="true"/> -->
        <property name="trustedPackages">
            <list>
                <value>java.lang</value>
                <value>javax.security</value>
                <value>java.util</value>
                <value>org.apache.activemq</value>
                <value>cn.zifangsky.activemq</value>
                <value>cn.zifangsky.model</value>
            </list>
        </property>
    </bean>

    <!-- Spring Caching鏈接工廠 -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="amqConnectionFactory" />
        <!-- Session緩存數量 -->
        <property name="sessionCacheSize" value="100" />
    </bean>

    <!-- 定義Queue類型的JmsTemplate -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="connectionFactory" />
        <!-- 非pub/sub模型(發佈/訂閱),即:隊列模型 -->
        <property name="pubSubDomain" value="false" />
    </bean>

    <!-- 定義Queue監聽器 -->
    <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
        <jms:listener destination="test.queue" ref="testQueueReceiver1"/>
        <jms:listener destination="test.queue" ref="testQueueReceiver2"/>
    </jms:listener-container>

</beans>複製代碼

固然,在web.xml中須要加載該配置文件才行:

<context-param>
        <param-name>contextConfigLocation</param-name>
        <param-value>
            classpath:context/context.xml
            classpath:context/context_*.xml
        </param-value>
    </context-param>複製代碼

在上面的context_activemq.xml文件中,首先是定義了自動掃描cn.zifangsky.activemq 這個包下面的註解,在後面配置的兩個接收者:testQueueReceiver一、testQueueReceiver2 的bean就是這樣被加載進來的

接着,amqConnectionFactory這個bean配置了ActiveMQ的鏈接參數(PS:經過配置文件加載進來),以及可信任的能夠被序列化的類的包路徑

再日後,jmsQueueTemplate這個bean配置了一個JmsTemplate的實例,固然這裏定義的是一個隊列模型

最後,使用jms:listener-container配置了兩個消息監聽器,其監聽的目的地都是「test.queue」,處理的接收者分別是:testQueueReceiver1 和 testQueueReceiver2

iv)消息發送者:

package cn.zifangsky.activemq.producer;

import javax.annotation.Resource;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

@Component("queueSender")
public class QueueSender {

    @Resource(name="jmsQueueTemplate")
    private JmsTemplate jmsTemplate;

    /** * 發送一條消息到指定隊列 * @param queueName 隊列名稱 * @param message 消息內容 */
    public void send(String queueName,final String message){
        jmsTemplate.send(queueName, new MessageCreator() {

            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        });
    }

}複製代碼

從上面的代碼能夠看出,這裏僅僅只是使用JmsTemplate的send( )方法建立了一條文本消息

v)兩個消息接收者:

QueueReceiver1.java:

package cn.zifangsky.activemq.consumer;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.stereotype.Component;

@Component("testQueueReceiver1")
public class QueueReceiver1 implements MessageListener{

    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("QueueReceiver1收到消息: " + ((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }

    }

}複製代碼

QueueReceiver2.java:

package cn.zifangsky.activemq.consumer;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.stereotype.Component;

@Component("testQueueReceiver2")
public class QueueReceiver2 implements MessageListener{

    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("QueueReceiver2收到消息: " + ((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }

    }

}複製代碼

vi)測試:

package cn.zifangsky.test.springjms;

import javax.annotation.Resource;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import cn.zifangsky.activemq.producer.QueueSender;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations={"classpath:/context/context.xml","classpath:/context/context_activemq.xml"})
public class TestQueue {
    private final String QUEUENAME = "test.queue";

    @Resource(name="queueSender")
    private QueueSender queueSender;

    @Test
    public void test(){
        for(int i=0;i<10;i++){
            queueSender.send(QUEUENAME, "Hi,這是第 " + (i+1) + " 條消息!");
        }
    }

}複製代碼

運行這個單元測試方法以後,能夠發現輸出結果以下:

QueueReceiver2收到消息: Hi,這是第 1 條消息!
QueueReceiver1收到消息: Hi,這是第 2 條消息!
QueueReceiver2收到消息: Hi,這是第 3 條消息!
QueueReceiver1收到消息: Hi,這是第 4 條消息!
QueueReceiver2收到消息: Hi,這是第 5 條消息!
QueueReceiver1收到消息: Hi,這是第 6 條消息!
QueueReceiver2收到消息: Hi,這是第 7 條消息!
QueueReceiver1收到消息: Hi,這是第 8 條消息!
QueueReceiver2收到消息: Hi,這是第 9 條消息!
QueueReceiver1收到消息: Hi,這是第 10 條消息!

從上面的輸出結果能夠看出,隊列類型的消息只能被某一個接收者接收並處理

(2)簡化代碼:

上面的例子很顯然在發送和接收消息的時候寫的代碼要比純粹的JMS要少不少,那麼是否是就真的沒有更簡潔的代碼了呢?

答案固然是否,第一是在發送消息的時候使用了JmsTemplate的send( ) 方法來發送消息。其實,除了send( )方法,JmsTemplate還提供了convertAndSend( )方法,與send( ) 方法不一樣的是,convertAndSend( )方法並不須要MessageCreator做爲參數。這是由於convertAndSend( )方法會使用內置的消息轉換器(message converter)爲咱們建立消息

i)改寫jmsQueueTemplate這個bean添加默認的目的地:

<!-- 定義Queue類型的JmsTemplate -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="connectionFactory" />
        <!-- 非pub/sub模型(發佈/訂閱),即:隊列模型 -->
        <property name="pubSubDomain" value="false" />
        <property name="defaultDestinationName" value="test.queue"/>
    </bean>複製代碼

ii)改寫消息發送者的send()方法:

改寫以後的方法以下所示:

/** * 發送一條消息到指定隊列 * @param message 消息內容 */
    public void send(final String message){
        jmsTemplate.convertAndSend(message);
    }複製代碼

除了上面使用的內置的消息轉換器以外,Spring還爲通用的轉換任務提供了多個消息轉換器(org.springframework.jms.support.converter包中)

消息轉換器 功能
MappingJackson2MessageConverter 使用Jackson2 JSON庫實現消息與JSON格式的相互轉換
MarshallingMessageConverter 使用JAXB庫實現消息與XML格式之間的相互轉換
SimpleMessageConverter 實現String與TextMessage之間的相互轉換、字節數組與BytesMessage之間的相互轉換、Map與MapMessage之間的相互轉換以及Serializable對象與ObjectMessage之間的相互轉換(PS:對象的序列化與反序列化)

注:默認狀況下,JmsTemplate會在convertAndSend( )方法中使用SimpleMessageConverter這個消息轉換器。若是須要手動執行消息轉化器的話,能夠這樣修改:

<bean id="jmsMessageConverter" class="org.springframework.jms.support.converter.MappingJackson2MessageConverter" />

    <bean id="jmsQueueTemplate2" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="connectionFactory" />
        <property name="pubSubDomain" value="false" />
        <property name="messageConverter" ref="jmsMessageConverter" />
    </bean>複製代碼

好了轉回正題,針對第一個實例的簡化還能夠作其餘的工做。好比:消息接收者在處理消息的時候實現了一個MessageListener接口,同時複寫了onMessage(Message message) 方法。那麼咱們是否能夠將之簡化,改寫成一個普通的POJO呢?

i)改寫QueueReceiver1.java變成一個普通的POJO:

package cn.zifangsky.activemq.consumer;

import org.springframework.stereotype.Component;

@Component("testQueueReceiver1")
public class QueueReceiver1{

    public void handle(String str){
        System.out.println("QueueReceiver1收到消息: " + str);
    }

}複製代碼

從上面的代碼能夠看出,這裏僅僅只定義了一個普通的handle(String str) 方法,徹底看不出來任何JMS的痕跡

ii)修改隊列消息監聽器:

<!-- 定義Queue監聽器 -->
    <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
        <jms:listener destination="test.queue" ref="testQueueReceiver1" method="handle"/>
        <jms:listener destination="test.queue" ref="testQueueReceiver2"/>
    </jms:listener-container>複製代碼

這裏只改寫了第一個監聽相關配置,手動指定了針對接收到的消息的處理方法。固然,Spring會自動完成消息格式的轉化

再次運行單元測試:

@Test
    public void test(){
        for(int i=0;i<10;i++){
            queueSender.send("Hi,這是第 " + (i+1) + " 條消息!");
        }
    }複製代碼

輸出略

(3)發佈——訂閱類型的消息發送和接收實例:

明白了上面的隊列類型的消息發送與接收,那麼定義一個發佈——訂閱類型的消息就很簡單了,只須要把JmsTemplate的類型改爲「pubSubDomain」類型便可:

i)context_activemq.xml文件裏面的配置:

添加如下內容:

<!-- 定義Topic類型的JmsTemplate -->
    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="connectionFactory" />
        <!-- pub/sub模型(發佈/訂閱) -->
        <property name="pubSubDomain" value="true" />
        <property name="defaultDestinationName" value="test.topic"/>
    </bean>

    <!-- 定義Topic監聽器 -->
    <jms:listener-container destination-type="topic" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
        <jms:listener destination="test.topic" ref="testTopicReceiver1" method="handle"/>
        <jms:listener destination="test.topic" ref="testTopicReceiver2" method="handle"/>
    </jms:listener-container>複製代碼

ii)消息發送者:

package cn.zifangsky.activemq.producer;

import javax.annotation.Resource;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

@Component("topicSender")
public class TopicSender {

    @Resource(name="jmsTopicTemplate")
    private JmsTemplate jmsTemplate;

    /** * 發送一條消息到指定隊列 * @param message 消息內容 */
    public void send(final String message){
        jmsTemplate.convertAndSend(message);
    }
}複製代碼

iii)兩個消息接收者:

package cn.zifangsky.activemq.consumer;

import org.springframework.stereotype.Component;

@Component("testTopicReceiver1")
public class TopicReceiver1{

    public void handle(String str){
        System.out.println("TopicReceiver1收到消息: " + str);
    }

}複製代碼

另外一個接收者代碼跟上面類似,請自行完成,略

iv)單元測試:

package cn.zifangsky.test.springjms;

import javax.annotation.Resource;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import cn.zifangsky.activemq.producer.TopicSender;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations={"classpath:/context/context.xml","classpath:/context/context_activemq.xml"})
public class TestTopic {

    @Resource(name="topicSender")
    private TopicSender topicSender;

    @Test
    public void test(){
        for(int i=0;i<5;i++){
            topicSender.send("Hi,這是第 " + (i+1) + " 條消息!");
        }
    }

}複製代碼

輸出以下:

TopicReceiver2收到消息: Hi,這是第 1 條消息!
TopicReceiver1收到消息: Hi,這是第 1 條消息!
TopicReceiver1收到消息: Hi,這是第 2 條消息!
TopicReceiver1收到消息: Hi,這是第 3 條消息!
TopicReceiver1收到消息: Hi,這是第 4 條消息!
TopicReceiver2收到消息: Hi,這是第 2 條消息!
TopicReceiver2收到消息: Hi,這是第 3 條消息!
TopicReceiver2收到消息: Hi,這是第 4 條消息!
TopicReceiver2收到消息: Hi,這是第 5 條消息!
TopicReceiver1收到消息: Hi,這是第 5 條消息!

從上面的輸出內容能夠看出,發佈——訂閱類型的消息每一個接收者都會接收到一份消息的副本

(4)發送和接收對象類型的消息:

若是咱們想要發送和接收對象類型的消息,而不是普通的文本消息。其實,由於Spring提供了默認的消息轉換器——SimpleMessageConverter。因此咱們只須要像發送文本消息那樣發送對象消息,關於對象的序列化和反序列化這些步驟Spring會自動幫咱們完成

i)修改context_activemq.xml文件:

添加如下內容:

<bean id="jmsQueueTemplate2" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="connectionFactory" />
        <!-- 非pub/sub模型(發佈/訂閱),即:隊列模型 -->
        <property name="pubSubDomain" value="false" />
        <property name="defaultDestinationName" value="object.queue"/>
    </bean>複製代碼

這裏,定義了新的JmsTemplate,其默認的目的地是:object.queue

修改隊列監聽器,添加:

<!-- 定義Queue監聽器 -->
    <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
        ...
        <jms:listener destination="object.queue" ref="testQueueReceiver3" method="handle"/>
    </jms:listener-container>複製代碼

ii)消息發送者:

package cn.zifangsky.activemq.producer;

import javax.annotation.Resource;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

import cn.zifangsky.model.User;

@Component("queueSender2")
public class QueueSender2 {

    @Resource(name="jmsQueueTemplate2")
    private JmsTemplate jmsTemplate;

    /** * 發送一條消息到指定隊列 * @param user 一個User類型的實體 */
    public void send(final User user){
        jmsTemplate.convertAndSend(user);
    }

}複製代碼

能夠看出,這裏的方法參數就不是普通的文本了,而是一個能夠被序列化的對象

注:User.java:

package cn.zifangsky.model;

import java.io.Serializable;

public class User implements Serializable{
    private static final long serialVersionUID = 1L;
    private Long id;
    private String username;
    private String password;

    public User() {

    }

    public User(Long id, String username, String password) {
        this.id = id;
        this.username = username;
        this.password = password;
    }

    (getter和setter方法略)

    @Override
    public String toString() {
        return "User [id=" + id + ", username=" + username + ", password=" + password + "]";
    }

}複製代碼

iii)消息接收者:

package cn.zifangsky.activemq.consumer;

import javax.annotation.Resource;

import org.springframework.stereotype.Component;

import cn.zifangsky.model.User;

@Component("testQueueReceiver3")
public class QueueReceiver3{

    public void handle(User user){
        System.out.println("接收到消息: " + user);
    }
}複製代碼

能夠看到,這裏的handle方法的參數時User類型。固然這個User對象是Spring將消息進行反序列化後生成的

iv)測試:

@Resource(name="queueSender2")
    private QueueSender2 queueSender2;

    @Test
    public void testObject(){
        User u = new User((long) 1,"test","123456");

        queueSender2.send(u);
    }複製代碼

輸出以下:

接收到消息: User [id=1, username=test, password=123456]

能夠看出,咱們實際須要作的工做仍是不多的,不少繁瑣的步驟都由Spring在後臺自動完成了

(5)消息被接收到以後進行回覆:

有時,爲了保障消息的可靠性,一般須要在接收到消息以後給某個消息目的地發送一條確認收到的回覆消息。固然,要實現這個功能也很簡單,只須要在收到消息以後調用某個消息發送者發送一條確認消息便可

好比上面的QueueReceiver3能夠改爲這樣:

package cn.zifangsky.activemq.consumer;

import javax.annotation.Resource;

import org.springframework.stereotype.Component;

import cn.zifangsky.activemq.producer.QueueSender;
import cn.zifangsky.model.User;

@Component("testQueueReceiver3")
public class QueueReceiver3{

    @Resource(name="queueSender")
    private QueueSender queueSender;

    public void handle(User user){
        System.out.println("接收到消息: " + user);
        queueSender.send("QueueReceiver3已經收到Object類型的消息。。。");
    }

}複製代碼

這樣就能夠在收到消息以後,使用QueueSender 這個發送者給「test.queue」這個消息目的地發送一條確認消息了(PS:實際狀況的處理可能會比這裏稍微複雜一點,這裏爲了測試只是發送了一條文本消息)

注:使用單元測試的時候最後一條消息可能不會打印出來,由於這次單元測試的生命週期結束以後程序就自動中止了。解決辦法能夠是手動執行一下第一個實例中的那個單元測試,或者啓動這個web項目就能夠看到效果了

(6)設置多個並行的消息監聽器:

在前面介紹隊列類型的監聽器的時候爲了驗證一條隊列裏的消息只能被一個接收者接收,所以添加了兩個功能徹底同樣的接收者:testQueueReceiver1和testQueueReceiver2

其實在實際開發中,爲了提升系統的消息處理能力咱們徹底不必像這樣定義多個功能同樣的消息接收者,相反咱們只須要在配置監聽器的時候使用concurrency這個屬性配置多個並行的監聽器便可。好比像這樣:

<jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" concurrency="5" acknowledge="auto">
        <jms:listener destination="test.queue" ref="testQueueReceiver1" method="handle"/>
        <jms:listener destination="object.queue" ref="testQueueReceiver3" method="handle"/>
    </jms:listener-container>複製代碼

若是concurrency屬性設置一個固定的數字則表示每一個消息監聽器都會被同時啓動指定個數的徹底同樣的並行監聽器來監聽消息並轉發給消息接收者處理。固然,除了指定固定的數字以外,咱們還能夠手動指定一個監聽器的數目區間,好比:concurrency=」3-5″ ,表示最少打開3個監聽器,最多打開5個監聽器。消息少時會少打開幾個,消息多時會多打開幾個,這一過程會自動完成而不須要咱們作其餘額外的工做

基於ActiveMQ的異步消息的一些經常使用用法基本上就是這些了。固然,還有一些其餘的內容在這裏沒有介紹到,好比:導出基於JMS的服務、使用AMQP實現消息功能等。限於文章篇幅,這些內容暫且讓我放在其餘文章單獨介紹吧

參考:

相關文章
相關標籤/搜索