JSM-ActiveMQ

1,消息監聽容器java

對於消息監聽容器而言,除了要知道監聽哪一個目的地以外,還須要知道到哪裏去監聽,也就是說它還須要知道去監聽哪一個JMS服務器,這是經過在配置MessageConnectionFactory的時候往裏面注入一個ConnectionFactory來實現的。因此咱們在配置一個MessageListenerContainer的時候有三個屬性必須指定,一個是表示從哪裏監聽的ConnectionFactory;一個是表示監聽什麼的Destination;一個是接收到消息之後進行消息處理的MessageListener。Spring一共爲咱們提供了兩種類型的MessageListenerContainer,SimpleMessageListenerContainer和DefaultMessageListenerContainer。spring

SimpleMessageListenerContainer會在一開始的時候就建立一個會話session和消費者Consumer,而且會使用標準的JMS MessageConsumer.setMessageListener()方法註冊監聽器讓JMS提供者調用監聽器的回調函數。它不會動態的適應運行時須要和參與外部的事務管理。兼容性方面,它很是接近於獨立的JMS規範,但通常不兼容Java EE的JMS限制。apache

大多數狀況下咱們仍是使用的DefaultMessageListenerContainer,跟SimpleMessageListenerContainer相比,DefaultMessageListenerContainer會動態的適應運行時須要,而且可以參與外部的事務管理。它很好的平衡了對JMS提供者要求低、先進功能如事務參與和兼容Java EE環境。數組

2.消息監聽器服務器

spring整合JMS的應用中咱們在定義消息監聽器的時候一共能夠定義三種類型的消息監聽器,分別是MessageListener、SessionAwareMessageListener和MessageListenerAdapter。下面就分別來介紹一下這幾種類型的區別。session

MessageListener是最原始的消息監聽器,它是JMS規範中定義的一個接口。其中定義了一個用於處理接收到的消息的onMessage方法,該方法只接收一個Message參數。函數

import javax.jms.JMSException;  
import javax.jms.Message;  
import javax.jms.MessageListener;  
import javax.jms.TextMessage;  
   
public class ConsumerMessageListener implements MessageListener {  
   
    public void onMessage(Message message) {  
        //這裏咱們知道生產者發送的就是一個純文本消息,因此這裏能夠直接進行強制轉換,或者直接把onMessage方法的參數改爲Message的子類TextMessage  
        TextMessage textMsg = (TextMessage) message;  
        System.out.println("接收到一個純文本消息。");  
        try {  
            System.out.println("消息內容是:" + textMsg.getText());  
        } catch (JMSException e) {  
            e.printStackTrace();  
        }  
    }  
   
}

SessionAwareMessageListener是Spring爲咱們提供的,它不是標準的JMS MessageListener。MessageListener的設計只是純粹用來接收消息的,假如咱們在使用MessageListener處理接收到的消息時咱們須要發送一個消息通知對方咱們已經收到這個消息了,那麼這個時候咱們就須要在代碼裏面去從新獲取一個Connection或Session。SessionAwareMessageListener的設計就是爲了方便咱們在接收到消息後發送一個回覆的消息,它一樣爲咱們提供了一個處理接收到的消息的onMessage方法,可是這個方法能夠同時接收兩個參數,一個是表示當前接收到的消息Message,另外一個就是能夠用來發送消息的Session對象。測試

public void onMessage(TextMessage message, Session session) throws JMSException {  
        System.out.println("收到一條消息");  
        System.out.println("消息內容是:" + message.getText());  
        MessageProducer producer = session.createProducer(destination);  
        Message textMessage = session.createTextMessage("ConsumerSessionAwareMessageListener。。。");  
        producer.send(textMessage);  
    }

MessageListenerAdapter類實現了MessageListener接口和SessionAwareMessageListener接口,它的主要做用是將接收到的消息進行類型轉換,而後經過反射的形式把它交給一個普通的Java類進行處理。 其另一個主要的功能是能夠自動的發送返回消息spa

       MessageListenerAdapter會把接收到的消息作以下轉換:.net

       TextMessage轉換爲String對象;

       BytesMessage轉換爲byte數組;

       MapMessage轉換爲Map對象;

       ObjectMessage轉換爲對應的Serializable對象。

<bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">  
    <property name="delegate">  
        <bean class="com.tiantian.springintejms.listener.ConsumerListener"/>  
    </property>  
    <property name="defaultListenerMethod" value="receiveMessage"/>  
</bean>
//或者
 <bean id="messageListenerAdapter"  class="org.springframework.jms.listener.adapter.MessageListenerAdapter">  
        <constructor-arg>  
            <bean class="com.tiantian.springintejms.listener.ConsumerListener"/>  
        </constructor-arg> 
        <property name="defaultListenerMethod" value="receiveMessage"/>  
    </bean>

  消息回覆:

(1),能夠經過發送的Message的setJMSReplyTo方法指定該消息對應的回覆消息的目的地。 這裏咱們把咱們的生產者發送消息的代碼作一下修改,在發送消息以前先指定該消息對應的回覆目的地爲一個叫responseQueue的隊列目的地,具體代碼以下所示:

package com.tiantian.springintejms.service.impl;  
       
    import javax.jms.Destination;  
    import javax.jms.JMSException;  
    import javax.jms.Message;  
    import javax.jms.Session;  
    import javax.jms.TextMessage;  
       
    import org.springframework.beans.factory.annotation.Autowired;  
    import org.springframework.beans.factory.annotation.Qualifier;  
    import org.springframework.jms.core.JmsTemplate;  
    import org.springframework.jms.core.MessageCreator;  
    import org.springframework.stereotype.Component;  
       
    import com.tiantian.springintejms.service.ProducerService;  
       
    @Component  
    public class ProducerServiceImpl implements ProducerService {   
      
        @Autowired  
        private JmsTemplate jmsTemplate;  
      
        @Autowired  
        @Qualifier("responseQueue")  
        private Destination responseDestination;  
          
        public void sendMessage(Destination destination, final String message) {  
            System.out.println("---------------生產者發送消息-----------------");  
            System.out.println("---------------生產者發了一個消息:" + message);  
            jmsTemplate.send(destination, new MessageCreator() {  
                public Message createMessage(Session session) throws JMSException {  
                    TextMessage textMessage = session.createTextMessage(message);  
                    textMessage.setJMSReplyTo(responseDestination);  
                    return textMessage;  
                }  
            });  
        }  
       
    }

xml配置文件:

<!-- 用於測試消息回覆的 -->  
    <bean id="responseQueue" class="org.apache.activemq.command.ActiveMQQueue">  
        <constructor-arg>  
            <value>responseQueue</value>  
        </constructor-arg>  
    </bean>  
      
    <!-- responseQueue對應的監聽器 -->  
    <bean id="responseQueueListener" class="com.tiantian.springintejms.listener.ResponseQueueListener"/>  
      
    <!-- responseQueue對應的監聽容器 -->  
    <bean id="responseQueueMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
        <property name="connectionFactory" ref="connectionFactory"/>  
        <property name="destination" ref="responseQueue"/>  
        <property name="messageListener" ref="responseQueueListener"/>  
    </bean>

      ResponseQueueListener的定義以下所示:  

public class ResponseQueueListener implements MessageListener {  
       
        public void onMessage(Message message) {  
            if (message instanceof TextMessage) {  
                TextMessage textMessage = (TextMessage) message;  
                try {  
                    System.out.println("接收到發送到responseQueue的一個文本消息,內容是:" + textMessage.getText());  
                } catch (JMSException e) {  
                    e.printStackTrace();  
                }  
            }  
        }  
       
    }

生產者發送消息被MessageListenerAdapter處理以後,MessageListenerAdapter把監聽器的返回內容封裝成一個Message,往原Message經過setJMSReplyTo方法指定的回覆目的地發送了一個消息。對於MessageListenerAdapter對應的監聽器處理方法返回的是一個null值或者返回類型是void的狀況,MessageListenerAdapter是不會自動進行消息的回覆的。

(2)經過MessageListenerAdapter的defaultResponseDestination屬性來指定。在定義MessageListenerAdapter的時候經過其defaultResponseDestination屬性指定其默認的回覆目的地是「defaultResponseQueue」,並定義defaultResponseQueue對應的消息監聽器和消息監聽容器。

<!-- 消息監聽適配器 -->  
<bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">  
    <!-- <constructor-arg>  
        <bean class="com.tiantian.springintejms.listener.ConsumerListener"/>  
    </constructor-arg> -->  
    <property name="delegate">  
        <bean class="com.tiantian.springintejms.listener.ConsumerListener"/>  
    </property>  
    <property name="defaultListenerMethod" value="receiveMessage"/>  
    <property name="defaultResponseDestination" ref="defaultResponseQueue"/>  
</bean>  
  
<!-- 消息監聽適配器對應的監聽容器 -->  
<bean id="messageListenerAdapterContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
    <property name="connectionFactory" ref="connectionFactory"/>  
    <property name="destination" ref="adapterQueue"/>  
    <property name="messageListener" ref="messageListenerAdapter"/><!-- 使用MessageListenerAdapter來做爲消息監聽器 -->  
</bean>  
  
!-- 默認的消息回覆隊列 -->  
<bean id="defaultResponseQueue" class="org.apache.activemq.command.ActiveMQQueue">  
    <constructor-arg>  
        <value>defaultResponseQueue</value>  
    </constructor-arg>  
</bean>  
  
<!-- defaultResponseQueue對應的監聽器 -->  
<bean id="defaultResponseQueueListener" class="com.tiantian.springintejms.listener.DefaultResponseQueueListener"/>  
  
<!-- defaultResponseQueue對應的監聽容器 -->  
<bean id="defaultResponseQueueMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
    <property name="connectionFactory" ref="connectionFactory"/>  
    <property name="destination" ref="defaultResponseQueue"/>  
    <property name="messageListener" ref="defaultResponseQueueListener"/>  
</bean>

當兩種方式都指定了消息的回覆目的地的時候使用發送消息的setJMSReplyTo方法指定的目的地將具備較高的優先級,MessageListenerAdapter將只往該方法指定的消息回覆目的地發送回覆消息。

相關文章
相關標籤/搜索
本站公眾號
   歡迎關注本站公眾號,獲取更多信息