在Spring中使用ActiveMQ發送郵件

轉自http://dev.ymeng.net/spring-activemq-mail.html 

項目的後臺要求在更改密碼後發送郵件通知用戶,爲了不發送郵件時程序對用戶操做的阻塞,以前中文版中使用了線程來發送郵件,而在英文版中,我決定使用JMS來異步發送郵件,讓用戶更改密碼的操做和發送郵件的操做更進一步解耦,也在實際環境中試試JMS。 

  咱們的環境是Spring 2.5, Tomcat 5.5,使用ActiveMQ來實現JMS傳送和接收。 

  首先,咱們在Spring中加入ActiveMQ Broker的配置: 


     <bean id="broker" 
        class="org.apache.activemq.xbean.BrokerFactoryBean"> 
        <property name="config" 
            value="classpath:activemq.xml" /> 
        <property name="start" 
            value="true" /> 
    </bean> 

  咱們在此處配置了BrokerFactoryBean,此Bean實如今Spring中配置嵌入式Broker,而且支持XBean方式的配置。Broker的配置文件由config屬性指定,此處定義配置文件位於classpath中的activemq.xml。 


  接下來咱們須要建立Broker的配置文件activemq.xml。其實咱們不須要從頭配置,展開ActiveMQ的jar包,在org.apache.activemq.xbean下,就有一個activemq.xml,咱們將其拷貝到WEB-INF/classes/目錄下,並進行修改。 

下面是activemq.xml的內容: 


<beans> 
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" /> 

    <broker useJmx="false" 
        persistent="false" 
        xmlns="http://activemq.apache.org/schema/core"> 

        <transportconnectors> 
            <transportconnector uri="tcp://localhost:61636" /> 
        </transportconnectors> 

        <networkconnectors></networkconnectors> 
    </broker> 
</beans> 

  在broker中,咱們指定了不開啓JMX,而且不使用持久化(persistent=」false」)。 
  若是不對消息進行持久化存儲,在容器或者JVM關閉、重啓,或者崩潰後,全部的消息都將丟失,在咱們的業務中,對於發送密碼更改通知郵件,並不是是重要的功能,因此咱們選擇不使用持久化存儲,但對於不一樣業務邏輯,可能會須要進行持久化存儲。ActiveMQ提供的持久化存儲方案能夠將消息存儲到文件系統、數據庫等。 

  要在Broker中開啓持久化存儲,須要設置persistent爲true,而且對其子節點persistenceAdapter, journaledJDBC進行配置。ActiveMQ jar包中的activemq.xml有被註釋掉的示例,能夠參考。 

  接着咱們在Spring中配置JMS Connection Factory。 


     <bean id="jmsFactory" 
        class="org.apache.activemq.ActiveMQConnectionFactory"> 
        <property name="brokerURL" 
            value="tcp://localhost:61636" /> 
    </bean> 

  注意其中的borkerURL,應該是你在activemq.xml中transportconnector節點的uri屬性,這表示JMS Server的監聽地址。 

  配置消息發送目的地: 


     <bean id="topicDestination" 
        class="org.apache.activemq.command.ActiveMQTopic"> 
        <constructor -arg value="MY.topic" /> 
    </bean> 

    <bean id="queueDestination" 
        class="org.apache.activemq.command.ActiveMQQueue"> 
        <constructor -arg value="MY.queue" /> 
    </bean> 

  在JMS中,目的地有兩種:主題(topic)和隊列(queue)。二者的區別是:當一個主題目的地中被放入了一個消息後,全部的訂閱者都會收到通知;而對於隊列,僅有一個「訂閱者」會收到這個消息,隊列中的消息一旦被處理,就不會存在於隊列中。顯然,對於郵件發送程序來講,使用隊列纔是正確的選擇,而使用主題時,可能會發送多封相同的郵件。 
  Topic和Queue只是JMS內部以及其處理方式的不一樣,對於消息發送方和接收方來講,徹底沒有代碼上的區別。 

  配置Spring中消息發送的JMS Template: 


     <bean id="producerJmsTemplate" 
        class="org.springframework.jms.core.JmsTemplate"> 
        <property name="connectionFactory"> 
            <bean class="org.springframework.jms.connection.SingleConnectionFactory"> 
                <property name="targetConnectionFactory" 
                    ref="jmsFactory" /> 
            </bean> 
        </property> 
        <property name="defaultDestination" 
            ref="queueDestination" /> 
        <property name="messageConverter" 
            ref="userMessageConverter" /> 
    </bean> 

  注意此處的defaultDestination使用的是基於Queue的目的地。 

  在實際的消息發送中,郵件內容須要用到User.username, User.password, User.email, User.fullname,顯示若是咱們直接發送User對象到消息隊列,接收的時候也能直接取出User對象,那麼在郵件發送程序中操做就會方便許多,因此在些處,咱們定義了messageConverter屬性,他指定了發送消息時使用的消息轉換bean,這樣,在直接發送User到JMS隊列時,Spring會自動幫咱們進行轉換,下面是Converter的配置和代碼: 


     <bean id="userMessageConverter" 
        class="com.tiandinet.jms.sample.UserMessageConverter" /> 

  此轉換器一樣也會使用在消息接收中,將接收到的消息轉換爲User對象。 


package com.tiandinet.jms.sample; 

import javax.jms.JMSException; 
import javax.jms.Message; 
import javax.jms.ObjectMessage; 
import javax.jms.Session; 

import org.apache.activemq.command.ActiveMQObjectMessage; 
import org.apache.commons.logging.Log; 
import org.apache.commons.logging.LogFactory; 
import org.springframework.jms.support.converter.MessageConverter; 

import com.tiandinet.jms.sample.User; 

/** 
* Converte User message. 

* @author Yangtze 
*/ 
public class UserMessageConverter implements MessageConverter { 

    private static transient Log logger = LogFactory.getLog(UserMessageConverter.class); 

    /** 
     * {@inheritDoc} 
     * 
     * @see org.springframework.jms.support.converter.MessageConverter 
     * #fromMessage(javax.jms.Message) 
     */ 
    public Object fromMessage(Message message) throws JMSException { 
        if (logger.isDebugEnabled()) { 
            logger.debug("Receive JMS message: " + message); 
        } 

        if (message instanceof ObjectMessage) { 
            ObjectMessage oMsg = (ObjectMessage) message; 

            if (oMsg instanceof ActiveMQObjectMessage) { 
                ActiveMQObjectMessage aMsg = (ActiveMQObjectMessage) oMsg; 

                try { 
                    User user = (User) aMsg.getObject(); 

                    return user; 
                } catch (Exception e) { 
                    logger.error("Message:[" + message + "] is not a instance of User."); 
                    throw new JMSException("Message:[" + message + "] is not a instance of User."); 
                } 
            } else { 
                logger.error("Message:[" + message + "] is not " 
    + "a instance of ActiveMQObjectMessage[User]."); 
                throw new JMSException("Message:[" + message + "] is not " 
    + "a instance of ActiveMQObjectMessage[User]."); 
            } 
        } else { 
            logger.error("Message:[" + message + "] is not a instance of ObjectMessage."); 
            throw new JMSException("Message:[" + message + "] is not a instance of ObjectMessage."); 
        } 
    } 

    /** 
     * {@inheritDoc} 
     * 
     * @see org.springframework.jms.support.converter.MessageConverter#toMessage(java.lang.Object, 
     *      javax.jms.Session) 
     */ 
    public Message toMessage(Object obj, Session session) throws JMSException { 
        if (logger.isDebugEnabled()) { 
            logger.debug("Convert User object to JMS message: " + obj); 
        } 

        if (obj instanceof User) { 
            ActiveMQObjectMessage msg = (ActiveMQObjectMessage) session.createObjectMessage(); 
            msg.setObject((User) obj); 

            return msg; 
        } else { 
            logger.error("Object:[" + obj + "] is not a instance of User."); 
            throw new JMSException("Object:[" + obj + "] is not a instance of User."); 
        } 
    } 


  此程序實現了MessageConverter接口,並實現其中的fromMessage和toMessage方法,分別實現轉換接收到的消息爲User對象和轉換User對象到消息。 
咱們在程序中使用的是ActiveMQObjectMessage,它是ActiveMQ中對javax.jms.ObjectMessage的一個實現。 

  此時,咱們已經完成了JMS Connection Factory和用於發送JMS消息的JMS Template配置,接下來,應該編寫發送消息的Bean了,代碼以下: 

package com.tiandinet.jms.sample; 

import org.springframework.jms.core.JmsTemplate; 

import com.tiandinet.jms.sample.User; 

/** 
* Send user's login information mail via JMS. 

* @author Yangtze 
*/ 
public class UserMessageProducerImpl implements IUserMessageProducer { 

    private JmsTemplate jmsTemplate; 

    /** 
     * {@inheritDoc} 
     * 
     * @see com.tiandinet.jms.sample.IUserMessageProducer 
     * #sendUserLoginInformationMail(com.tiandinet.jms.sample.User) 
     */ 
    public void sendUserLoginInformationMail(User user) { 
        getJmsTemplate().convertAndSend(user); 
    } 

    /** 
     * Return the jmsTemplate. 
     * 
     * @return the jmsTemplate 
     */ 
    public final JmsTemplate getJmsTemplate() { 
        return jmsTemplate; 
    } 

    /** 
     * Set the jmsTemplate. 
     * 
     * @param jmsTemplate 
     *            the jmsTemplate to set 
     */ 
    public final void setJmsTemplate(JmsTemplate jmsTemplate) { 
        this.jmsTemplate = jmsTemplate; 
    } 



  代碼很簡單,sendUserLoginInformationMail方法是惟一咱們須要編寫的,調用JMSTemplate的convertAndSend方法,Spring會本身調用咱們以前配置的converter來轉換咱們發送的User對象。 
  將此Java在Spring中進行配置,而後在Controller中進行調用便可實現發送User對象到JMS。 

  到此爲止,咱們已經實現了消息的發送,如今咱們來實現消息的接收。 

  相對於發送,消息的接收的配置要相對簡短些,咱們使用MDP(Message Drive POJO)來實現消息的異步接收。咱們須要實現javax.jms.MessageListener接口的void onMessage(Message message)方法來接收消息。不過咱們可使用Spring中提供的MessageListenerAdapter來簡化接收消息的代碼。 
  咱們先寫處理消息的接口和實現類: 

package com.tiandinet.jms.sample; 

import javax.jms.JMSException; 
import javax.jms.ObjectMessage; 

import com.tiandinet.jms.sample.User; 

/** 
* JMS message handler. 

* Yangtze 
*/ 
public interface IMessageConsumer { 

    /** 
     * Handle the user message. 
     * 
     * @param user 
     *            User 
     * @throws JMSException 
     *             exception 
     */ 
    void handleMessage(User user) throws JMSException; 

package com.tiandinet.jms.sample; 

import javax.jms.JMSException; 

import org.apache.commons.logging.Log; 
import org.apache.commons.logging.LogFactory; 

import com.tiandinet.jms.sample.User; 
import com.tiandinet.jms.sample.IMailService; 

/** 
* JMS message handler - for User Message. 

* Yangtze 
*/ 
public class UserMessageConsumerImpl implements IMessageConsumer { 

    private static transient Log logger = LogFactory.getLog(UserMessageConsumerImpl.class); 

    private IMailService mailService; 

    /** 
     * {@inheritDoc} 
     * 
     * @see com.tiandinet.jms.sample.IMessageConsumer 
* #handleMessage(com.tiandinet.jms.sample.User) 
     */ 
    public void handleMessage(User user) throws JMSException { 
        if (logger.isDebugEnabled()) { 
            logger.debug("Receive a User object from ActiveMQ: " + user.toString()); 
        } 

        mailService.sendUserLoginInforMail(user); 
    } 



  配置message listener 

     <bean id="messageListener" 
        class="org.springframework.jms.listener.adapter.MessageListenerAdapter"> 
        <constructor -arg> 
            <bean class="com.tiandinet.jms.sample.UserMessageConsumerImpl"> 
                <property name="mailService" 
                    ref="mailService" /> 
            </bean> 
        </constructor> 
        <property name="defaultListenerMethod" 
            value="handleMessage" /> 
        <property name="messageConverter" 
            ref="userMessageConverter" /> 
    </bean> 

  其中的mailService便是咱們的郵件發送類,其sendUserLoginInforMail方法實現了郵件發送的功能。 
消息偵聽適配器defaultListenerMethod屬性指定Spring在收到消息後調用的方法,此處爲handleMessage,Spring會根據收到的消息User對象,調用handleMessage(User user)方法。 

  配置消息偵聽容器,並指定咱們定義的消息偵聽器。 

     <bean id="listenerContainer" 
        class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 
        <property name="concurrentConsumers" 
            value="5" /> 
        <property name="connectionFactory" 
            ref="jmsFactory" /> 
        <property name="destination" 
            ref="queueDestination" /> 
        <property name="messageListener" 
            ref="messageListener" /> 
    </bean> 
相關文章
相關標籤/搜索