Spring整合JMS(二)——三種消息監聽器

  原文地址:http://haohaoxuexi.iteye.com/blog/1893676java

  

1.3     消息監聽器MessageListener

       在Spring整合JMS的應用中咱們在定義消息監聽器的時候一共能夠定義三種類型的消息監聽器,分別是MessageListener、SessionAwareMessageListener和MessageListenerAdapter。下面就分別來介紹一下這幾種類型的區別。spring

1.3.1  MessageListener

  MessageListener是最原始的消息監聽器,它是JMS規範中定義的一個接口。apache

  其中定義了一個用於處理接收到的消息的onMessage方法,該方法只接收一個Message參數。咱們前面在講配置消費者的時候用的消息監聽器就是MessageListener,代碼以下:數組

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
 
public class ConsumerMessageListener implements MessageListener {
 
    public void onMessage(Message message) {
        //這裏咱們知道生產者發送的就是一個純文本消息,因此這裏能夠直接進行強制轉換,或者直接把onMessage方法的參數改爲Message的子類TextMessage
        TextMessage textMsg = (TextMessage) message;
        System.out.println("接收到一個純文本消息。");
        try {
            System.out.println("消息內容是:" + textMsg.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
 
}

  

1.3.2  SessionAwareMessageListener

    MessageListener的設計只是純粹用來接收消息的,假如咱們在使用MessageListener處理接收到的消息時咱們須要發送一個消息通知對方咱們已經收到這個消息了,那麼這個時候咱們就須要在代碼裏面去從新獲取一個Connection或Session。                                                   SessionAwareMessageListener的設計就是爲了方便咱們在接收到消息後發送一個回覆的消息,它一樣爲咱們提供了一個處理接收到的消息的onMessage方法,可是這個方法能夠同時接收兩個參數,一個是表示當前接收到的消息Message,另外一個就是能夠用來發送消息的Session對象。先來看一段代碼:session

  個人理解,不是原文中的內容:直接能夠獲取一個鏈接,好處是減小資源的消耗。和JDBC同樣,查詢自己並不耗時,耗時和耗資源的是創建鏈接,這也是爲何在存在大量查詢時使用批量查詢要比一次查詢一條快不少,只須要創建一次鏈接。app

package com.tiantian.springintejms.listener;
 
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
 
import org.springframework.jms.listener.SessionAwareMessageListener;
 
public class ConsumerSessionAwareMessageListener implements
        SessionAwareMessageListener<TextMessage> {
 
    private Destination destination;
    
    public void onMessage(TextMessage message, Session session) throws JMSException {
        System.out.println("收到一條消息");
        System.out.println("消息內容是:" + message.getText());
        MessageProducer producer = session.createProducer(destination);
        Message textMessage = session.createTextMessage("ConsumerSessionAwareMessageListener。。。");
        producer.send(textMessage);
    }
 
    public Destination getDestination() {
        returndestination;
    }
 
    public void setDestination(Destination destination) {
        this.destination = destination;
    }
 
}

  在上面代碼中咱們定義了一個SessionAwareMessageListener,在這個Listener中咱們在接收到了一個消息以後,利用對應的Session建立了一個到destination的生產者和對應的消息,而後利用建立好的生產者發送對應的消息。tcp

       接着咱們在Spring的配置文件中配置該消息監聽器將處理來自一個叫sessionAwareQueue的目的地的消息,而且往該MessageListener中經過set方法注入其屬性destination的值爲queueDestination。這樣當咱們的SessionAwareMessageListener接收到消息以後就會往queueDestination發送一個消息。工具

  配置文件的內容以下:測試

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
     http://www.springframework.org/schema/context
     http://www.springframework.org/schema/context/spring-context-3.0.xsd
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">
 
    <context:component-scan base-package="com.tiantian" /> 
    <!-- Spring提供的JMS工具類,它能夠進行消息發送、接收等 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 這個connectionFactory對應的是咱們定義的Spring提供的那個ConnectionFactory對象 -->
        <property name="connectionFactory" ref="connectionFactory"/>
    </bean>
    
    <!-- 真正能夠產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供-->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"/>
    </bean>
    
    <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目標ConnectionFactory對應真實的能夠產生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
    </bean>
    
    <!--這個是隊列目的地-->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>queue</value>
        </constructor-arg>
    </bean>
    <!--這個是sessionAwareQueue目的地-->
    <bean id="sessionAwareQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>sessionAwareQueue</value>
        </constructor-arg>
    </bean>
    <!-- 消息監聽器 -->
    <bean id="consumerMessageListener" class="com.tiantian.springintejms.listener.ConsumerMessageListener"/>
    <!-- 能夠獲取session的MessageListener -->
    <bean id="consumerSessionAwareMessageListener" class="com.tiantian.springintejms.listener.ConsumerSessionAwareMessageListener">
        <property name="destination" ref="queueDestination"/>
    </bean>
    <!-- 消息監聽容器 -->
    <bean id="jmsContainer"        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="queueDestination" />
        <property name="messageListener" ref="consumerMessageListener" />
    </bean>
    
    <bean id="sessionAwareListenerContainer"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="sessionAwareQueue" />
        <property name="messageListener" ref="consumerSessionAwareMessageListener" />
    </bean>
</beans>

  個人理解,原文中沒有:sessionAwareListenerContainer中制定的隊列是sessionAwareQuene,也就是consumerSessionAwareMessageListener讀取的是sessionAwareQuene中的消息。consumerSessionAwareMessageListener的destination是queneDestination,說明consumerSessionAwareMessageListener將返回消息送到的隊列是queneDestination。一個接收管道和一個返回管道。this

   接着咱們來作一個測試,測試代碼以下:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("/applicationContext.xml")
public class ProducerConsumerTest {
 
    @Autowired
    private ProducerService producerService;
    @Autowired
    @Qualifier("sessionAwareQueue")
    private Destination sessionAwareQueue;
    
    @Test
    public void testSessionAwareMessageListener() {
        producerService.sendMessage(sessionAwareQueue, "測試SessionAwareMessageListener");
    }
    
}

  在上述測試代碼中,咱們經過前面定義好的生產者往咱們定義好的SessionAwareMessageListener監聽的sessionAwareQueue發送了一個消息。程序運行以後控制檯輸出以下:

   

       這說明咱們已經成功的往sessionAwareQueue發送了一條純文本消息,消息會被ConsumerSessionAwareMessageListener的onMessage方法進行處理,在onMessage方法中ConsumerSessionAwareMessageListener就是簡單的把接收到的純文本信息的內容打印出來了,以後再往queueDestination發送了一個純文本消息,消息內容是「ConsumerSessionAwareMessageListener…」,該消息隨後就被ConsumerMessageListener處理了,根據咱們的定義,在ConsumerMessageListener中也只是簡單的打印了一下接收到的消息內容。

  

1.3.3  MessageListenerAdapter

  MessageListenerAdapter類實現了MessageListener接口和SessionAwareMessageListener接口,它的主要做用是將接收到的消息進行類型轉換,而後經過反射的形式把它交給一個普通的Java類進行處理。

       MessageListenerAdapter會把接收到的消息作以下轉換:

       TextMessage轉換爲String對象;

       BytesMessage轉換爲byte數組;

       MapMessage轉換爲Map對象;

       ObjectMessage轉換爲對應的Serializable對象。

       既然前面說了MessageListenerAdapter會把接收到的消息作一個類型轉換,而後利用反射把它交給真正的目標處理器——一個普通的Java類進行處理(若是真正的目標處理器是一個MessageListener或者是一個SessionAwareMessageListener,那麼Spring將直接使用接收到的Message對象做爲參數調用它們的onMessage方法,而不會再利用反射去進行調用),那麼咱們在定義一個MessageListenerAdapter的時候就須要爲它指定這樣一個目標類。這個目標類咱們能夠經過MessageListenerAdapter的構造方法參數指定,如:

<!-- 消息監聽適配器 -->
    <bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
        <constructor-arg>
            <bean class="com.tiantian.springintejms.listener.ConsumerListener"/>
        </constructor-arg>
    </bean>

  也能夠經過它的delegate屬性來指定,如:

<!-- 消息監聽適配器 -->
    <bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
        <property name="delegate">
            <bean class="com.tiantian.springintejms.listener.ConsumerListener"/>
        </property>
        <property name="defaultListenerMethod" value="receiveMessage"/>
    </bean>

  前面說了若是咱們指定的這個目標處理器是一個MessageListener或者是一個SessionAwareMessageListener的時候Spring將直接利用接收到的Message對象做爲方法參數調用它們的onMessage方法。可是若是指定的目標處理器是一個普通的Java類時Spring將利用Message進行了類型轉換以後的對象做爲參數經過反射去調用真正的目標處理器的處理方法,那麼Spring是如何知道該調用哪一個方法呢?

  這是經過MessageListenerAdapter的defaultListenerMethod屬性來決定的,當咱們沒有指定該屬性時,Spring會默認調用目標處理器的handleMessage方法。

       接下來咱們來看一個示例,假設咱們有一個普通的Java類ConsumerListener,其對應有兩個方法,handleMessage和receiveMessage,其代碼以下: 

package com.tiantian.springintejms.listener;
 
public class ConsumerListener {
 
    public void handleMessage(String message) {
        System.out.println("ConsumerListener經過handleMessage接收到一個純文本消息,消息內容是:" + message);
    }
    
    public void receiveMessage(String message) {
        System.out.println("ConsumerListener經過receiveMessage接收到一個純文本消息,消息內容是:" + message);
    }
    
}

  假設咱們要把它做爲一個消息監聽器來監聽發送到adapterQueue的消息,這個時候咱們就能夠定義一個對應的MessageListenerAdapter來把它當作一個MessageListener使用。

    <!-- 消息監聽適配器 -->
    <bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
        <property name="delegate">
            <bean class="com.tiantian.springintejms.listener.ConsumerListener"/>
        </property>
        <property name="defaultListenerMethod" value="receiveMessage"/>
    </bean>

    固然,有了MessageListener以後咱們還須要配置其對應的MessageListenerContainer,這裏配置以下

    <!-- 消息監聽適配器對應的監聽容器 -->
    <bean id="messageListenerAdapterContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="adapterQueue"/>
        <property name="messageListener" ref="messageListenerAdapter"/><!-- 使用MessageListenerAdapter來做爲消息監聽器 -->
    </bean>

  在上面的MessageListenerAdapter中咱們指定了其defaultListenerMethod屬性的值爲receiveMessage,因此當MessageListenerAdapter接收到消息以後會自動的調用咱們指定的ConsumerListener的receiveMessage方法。

       針對於上述代碼咱們定義測試代碼以下:

package com.tiantian.springintejms.test;
 
import javax.jms.Destination;
 import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 
import com.tiantian.springintejms.service.ProducerService;
 
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("/applicationContext.xml")
public class ProducerConsumerTest {

    @Autowired
    @Qualifier("adapterQueue")
    private Destination adapterQueue;

    @Test
    public void testMessageListenerAdapter() {
        producerService.sendMessage(adapterQueue, "測試MessageListenerAdapter");
    }
    
}

   這時候咱們會看到控制檯輸出以下:

  若是咱們不指定MessageListenerAdapter的defaultListenerMethod屬性,那麼在運行上述代碼時控制檯會輸出以下結果:

 

  MessageListenerAdapter除了會自動的把一個普通Java類當作MessageListener來處理接收到的消息以外,其另一個主要的功能是能夠自動的發送返回消息

     當咱們用於處理接收到的消息的方法的返回值不爲空的時候,Spring會自動將它封裝爲一個JMS Message,而後自動進行回覆。

  那麼這個時候這個回覆消息將發送到哪裏呢?這主要有兩種方式能夠指定。       

  第一,能夠經過發送的Message的setJMSReplyTo方法指定該消息對應的回覆消息的目的地。這裏咱們把咱們的生產者發送消息的代碼作一下修改,在發送消息以前先指定該消息對應的回覆目的地爲一個叫responseQueue的隊列目的地,具體代碼以下所示:

package com.tiantian.springintejms.service.impl;
 
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
 
import com.tiantian.springintejms.service.ProducerService;
 
@Component
public class ProducerServiceImpl implements ProducerService { 

    @Autowired
    private JmsTemplate jmsTemplate;

    @Autowired
    @Qualifier("responseQueue")
    private Destination responseDestination;
    
    public void sendMessage(Destination destination, final String message) {
        System.out.println("---------------生產者發送消息-----------------");
        System.out.println("---------------生產者發了一個消息:" + message);
        jmsTemplate.send(destination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                TextMessage textMessage = session.createTextMessage(message);
                textMessage.setJMSReplyTo(responseDestination); return textMessage;
            }
        });
    }
 
}

  接着定義一個叫responseQueue的隊列目的地及其對應的消息監聽器和監聽容器。

    <!-- 用於測試消息回覆的 -->
    <bean id="responseQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>responseQueue</value>
        </constructor-arg>
    </bean>

    <!-- responseQueue對應的監聽器 -->
    <bean id="responseQueueListener" class="com.tiantian.springintejms.listener.ResponseQueueListener"/>

    <!-- responseQueue對應的監聽容器 -->
    <bean id="responseQueueMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="responseQueue"/>
        <property name="messageListener" ref="responseQueueListener"/>
    </bean>

   ResponseQueueListener的定義以下所示:

public class ResponseQueueListener implements MessageListener {
 
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            try {
                System.out.println("接收到發送到responseQueue的一個文本消息,內容是:" + textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
 
}

  接着把咱們接收消息的ConsumerListener的receiveMessage方法改成以下:

    /**
     * 當返回類型是非null時MessageListenerAdapter會自動把返回值封裝成一個Message,而後進行回覆
     * @param message
     * @return
     */
    public String receiveMessage(String message) {
        System.out.println("ConsumerListener經過receiveMessage接收到一個純文本消息,消息內容是:" + message);
        return "這是ConsumerListener對象的receiveMessage方法的返回值。";
    }

  咱們能夠看到在上述負責接收消息的receiveMessage方法有一個非空的返回值。

       接着咱們運行咱們的測試代碼,利用生產者往咱們定義好的MessageListenerAdapter負責處理的adapterQueue目的地發送一個消息。測試代碼以下所示:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("/applicationContext.xml")
public class ProducerConsumerTest {
 
    @Autowired
    private ProducerService producerService;

    @Qualifier("adapterQueue")
    @Autowired
    private Destination adapterQueue;   

    @Test
    public void testMessageListenerAdapter() {
        producerService.sendMessage(adapterQueue, "測試MessageListenerAdapter");
    }
    
}

  運行上述測試代碼以後,控制檯輸出以下:

  這說明咱們的生產者發送消息被MessageListenerAdapter處理以後,MessageListenerAdapter確實把監聽器的返回內容封裝成一個Message往原Message經過setJMSReplyTo方法指定的回覆目的地發送了一個消息。對於MessageListenerAdapter對應的監聽器處理方法返回的是一個null值或者返回類型是void的狀況,MessageListenerAdapter是不會自動進行消息的回覆的,有興趣的網友能夠本身測試一下。

       第二,經過MessageListenerAdapter的defaultResponseDestination屬性來指定。這裏咱們也來作一個測試,首先維持生產者發送消息的代碼不變,即發送消息前不經過Message的setJMSReplyTo方法指定消息的回覆目的地;接着咱們在定義MessageListenerAdapter的時候經過其defaultResponseDestination屬性指定其默認的回覆目的地是「defaultResponseQueue」,並定義defaultResponseQueue對應的消息監聽器和消息監聽容器。

  

    <!-- 消息監聽適配器 -->
    <bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
        <!-- <constructor-arg>
            <bean class="com.tiantian.springintejms.listener.ConsumerListener"/>
        </constructor-arg> -->
        <property name="delegate">
            <bean class="com.tiantian.springintejms.listener.ConsumerListener"/>
        </property>
        <property name="defaultListenerMethod" value="receiveMessage"/>
        <property name="defaultResponseDestination" ref="defaultResponseQueue"/>
    </bean>

    <!-- 消息監聽適配器對應的監聽容器 -->
    <bean id="messageListenerAdapterContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="adapterQueue"/>
        <property name="messageListener" ref="messageListenerAdapter"/><!-- 使用MessageListenerAdapter來做爲消息監聽器 -->
    </bean>

<!-- 默認的消息回覆隊列 -->
    <bean id="defaultResponseQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>defaultResponseQueue</value>
        </constructor-arg>
    </bean>

    <!-- defaultResponseQueue對應的監聽器 -->
    <bean id="defaultResponseQueueListener" class="com.tiantian.springintejms.listener.DefaultResponseQueueListener"/>

    <!-- defaultResponseQueue對應的監聽容器 -->
    <bean id="defaultResponseQueueMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="defaultResponseQueue"/>
        <property name="messageListener" ref="defaultResponseQueueListener"/>
    </bean>

  DefaultResponseQueueListener的代碼以下所示:

package com.tiantian.springintejms.listener;
 
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
 
public class DefaultResponseQueueListener implements MessageListener {
 
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            try {
                System.out.println("DefaultResponseQueueListener接收到發送到defaultResponseQueue的一個文本消息,內容是:" + textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
 
}

  這時候運行以下測試代碼:

    @Test
    public void testMessageListenerAdapter() {
        producerService.sendMessage(adapterQueue, "測試MessageListenerAdapter");
    }

  控制檯將輸出以下內容:

  這說明MessageListenerAdapter會自動把真正的消息處理器返回的非空內容封裝成一個Message發送回覆消息到經過defaultResponseDestination屬性指定的默認消息回覆目的地。

       既然咱們能夠經過兩種方式來指定MessageListenerAdapter自動發送回覆消息的目的地,那麼當咱們兩種方式都指定了並且它們的目的地還不同的時候會怎麼發送呢?是兩個都發仍是隻發其中的一個呢?關於這部分的測試我這裏就不贅述了,有興趣的網友能夠本身進行。這裏我能夠直接的告訴你們,當兩種方式都指定了消息的回覆目的地的時候使用發送消息的setJMSReplyTo方法指定的目的地將具備較高的優先級,MessageListenerAdapter將只往該方法指定的消息回覆目的地發送回覆消息。

相關文章
相關標籤/搜索