一步一步Spring整合JMS

1.1 JMS簡介

    JMS的全稱是Java Message Service,即Java消息服務。它主要用於在生產者和消費者之間進行消息傳遞,生產者負責產生消息,而消費者負責接收消息。把它應用到實際的業務需求中的話咱們能夠在特定的時候利用生產者生成一消息,並進行發送,對應的消費者在接收到對應的消息後去完成對應的業務邏輯。對於消息的傳遞有兩種類型,一種是點對點的,即一個生產者和一個消費者一一對應;另外一種是發佈/訂閱模式,即一個生產者產生消息並進行發送後,能夠由多個消費者進行接收。html

1.2 Spring整合JMS

    對JMS作了一個簡要介紹以後,接下來就講一下Spring整合JMS的具體過程。JMS只是一個標準,真正在使用它的時候咱們須要有它的具體實現,這裏咱們就使用Apache的activeMQ來做爲它的實現。所使用的依賴利用Maven來進行管理,具體依賴以下:java

<dependencies>  
    <dependency>  
        <groupId>junit</groupId>  
        <artifactId>junit</artifactId>  
        <version>4.10</version>  
        <scope>test</scope>  
    </dependency>  
    <dependency>  
        <groupId>org.springframework</groupId>  
        <artifactId>spring-context</artifactId>  
        <version>${spring-version}</version>  
    </dependency>  
    <dependency>  
        <groupId>org.springframework</groupId>  
        <artifactId>spring-jms</artifactId>  
        <version>${spring-version}</version>  
    </dependency>  
    <dependency>  
        <groupId>org.springframework</groupId>  
        <artifactId>spring-test</artifactId>  
        <version>${spring-version}</version>  
    </dependency>  
    <dependency>  
        <groupId>javax.annotation</groupId>  
        <artifactId>jsr250-api</artifactId>  
        <version>1.0</version>  
    </dependency>  
    <dependency>  
        <groupId>org.apache.activemq</groupId>  
        <artifactId>activemq-core</artifactId>  
        <version>5.7.0</version>  
    </dependency>  
</dependencies>

1.2.1 activeMQ準備

       既然是使用的apache的activeMQ做爲JMS的實現,那麼首先咱們應該到apache官網上下載activeMQ(http://activemq.apache.org/download.html),進行解壓後運行其bin目錄下面的activemq.bat文件啓動activeMQ。spring

1.2.2 配置ConnectionFactory

    ConnectionFactory是用於產生到JMS服務器的連接的,Spring爲咱們提供了多個ConnectionFactory,有SingleConnectionFactory和CachingConnectionFactory。SingleConnectionFactory對於創建JMS服務器連接的請求會一直返回同一個連接,而且會忽略Connection的close方法調用。CachingConnectionFactory繼承了SingleConnectionFactory,因此它擁有SingleConnectionFactory的全部功能,同時它還新增了緩存功能,它能夠緩存Session、MessageProducer和MessageConsumer。這裏咱們使用SingleConnectionFactory來做爲示例。apache

<bean id="connectionFactory" 
    class="org.springframework.jms.connection.SingleConnectionFactory"/>

    這樣就定義好產生JMS服務器連接的ConnectionFactory了嗎?答案是非也。Spring提供的ConnectionFactory只是Spring用於管理ConnectionFactory的,真正產生到JMS服務器連接的ConnectionFactory還得是由JMS服務廠商提供,而且須要把它注入到Spring提供的ConnectionFactory中。咱們這裏使用的是ActiveMQ實現的JMS,因此在咱們這裏真正的能夠產生Connection的就應該是由ActiveMQ提供的ConnectionFactory。因此定義一個ConnectionFactory的完整代碼應該以下所示:api

<!-- 真正能夠產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供-->  
<bean id="targetConnectionFactory" 
    class="org.apache.activemq.ActiveMQConnectionFactory">  
    <property name="brokerURL" value="tcp://localhost:61616"/>  
</bean>  
  
<!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->  
<bean id="connectionFactory" 
    class="org.springframework.jms.connection.SingleConnectionFactory">  
    <!-- 目標ConnectionFactory對應真實的能夠產生JMS Connection的ConnectionFactory -->  
    <property name="targetConnectionFactory" ref="targetConnectionFactory"/>  
</bean>

    ActiveMQ爲咱們提供了一個PooledConnectionFactory,經過往裏面注入一個ActiveMQConnectionFactory能夠用來將Connection、Session和MessageProducer池化,這樣能夠大大的減小咱們的資源消耗。當使用PooledConnectionFactory時,咱們在定義一個ConnectionFactory時應該是以下定義:緩存

<!-- 真正能夠產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供-->  
<bean id="targetConnectionFactory" 
    class="org.apache.activemq.ActiveMQConnectionFactory">  
    <property name="brokerURL" value="tcp://localhost:61616"/>  
</bean>  
  
<bean id="pooledConnectionFactory" 
    class="org.apache.activemq.pool.PooledConnectionFactory">  
    <property name="connectionFactory" ref="targetConnectionFactory"/>  
    <property name="maxConnections" value="10"/>  
</bean>  
  
<bean id="connectionFactory" 
    class="org.springframework.jms.connection.SingleConnectionFactory">  
    <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>  
</bean>

1.2.3 配置生產者

    配置好ConnectionFactory以後咱們就須要配置生產者。生產者負責產生消息併發送到JMS服務器,這一般對應的是咱們的一個業務邏輯服務實現類。可是咱們的服務實現類是怎麼進行消息的發送的呢?這一般是利用Spring爲咱們提供的JmsTemplate類來實現的,因此配置生產者其實最核心的就是配置進行消息發送的JmsTemplate。對於消息發送者而言,它在發送消息的時候要知道本身該往哪裏發,爲此,咱們在定義JmsTemplate的時候須要往裏面注入一個Spring提供的ConnectionFactory對象。服務器

<!-- Spring提供的JMS工具類,它能夠進行消息發送、接收等 -->  
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
    <!-- 這個connectionFactory對應的是咱們定義的Spring提供的那個ConnectionFactory對象 -->  
    <property name="connectionFactory" ref="connectionFactory"/>  
</bean>

    在真正利用JmsTemplate進行消息發送的時候,咱們須要知道消息發送的目的地,即destination。在Jms中有一個用來表示目的地的Destination接口,它裏面沒有任何方法定義,只是用來作一個標識而已。當咱們在使用JmsTemplate進行消息發送時沒有指定destination的時候將使用默認的Destination。默認Destination能夠經過在定義jmsTemplate bean對象時經過屬性defaultDestination或defaultDestinationName來進行注入,defaultDestinationName對應的就是一個普通字符串。在ActiveMQ中實現了兩種類型的Destination,一個是點對點的ActiveMQQueue,另外一個就是支持訂閱/發佈模式的ActiveMQTopic。在定義這兩種類型的Destination時咱們均可以經過一個name屬性來進行構造,如:session

<!--這個是隊列目的地,點對點的-->  
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">  
    <constructor-arg>  
        <value>queue</value>  
    </constructor-arg>  
</bean>  
<!--這個是主題目的地,一對多的-->  
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">  
    <constructor-arg value="topic"/>  
</bean>

假設咱們定義了一個ProducerService,裏面有一個向Destination發送純文本消息的方法sendMessage,那麼咱們的代碼就大概是這個樣子:併發

package com.tiantian.springintejms.service.impl;  
   
import javax.annotation.Resource;  
import javax.jms.Destination;  
import javax.jms.JMSException;  
import javax.jms.Message;  
import javax.jms.Session;  
   
import org.springframework.jms.core.JmsTemplate;  
import org.springframework.jms.core.MessageCreator;  
import org.springframework.stereotype.Component;  
   
import com.tiantian.springintejms.service.ProducerService;  
   
@Component  
public class ProducerServiceImpl implements ProducerService {  
   
    private JmsTemplate jmsTemplate;  
      
    public void sendMessage(Destination destination, final String message) {  
        System.out.println("---------------生產者發送消息-----------------");  
        System.out.println("---------------生產者發了一個消息:" + message);  
        jmsTemplate.send(destination, new MessageCreator() {  
            public Message createMessage(Session session) throws JMSException {  
                return session.createTextMessage(message);  
            }  
        });  
    }   
  
    public JmsTemplate getJmsTemplate() {  
        returnjmsTemplate;  
    }   
  
    @Resource  
    public void setJmsTemplate(JmsTemplate jmsTemplate) {  
        this.jmsTemplate = jmsTemplate;  
    }  
}

咱們能夠看到在sendMessage方法體裏面咱們是經過jmsTemplate來發送消息到對應的Destination的。到此,咱們生成一個簡單的文本消息並把它發送到指定目的地Destination的生產者就配置好了。app

1.2.4 配置消費者

    生產者往指定目的地Destination發送消息後,接下來就是消費者對指定目的地的消息進行消費了。那麼消費者是如何知道有生產者發送消息到指定目的地Destination了呢?這是經過Spring爲咱們封裝的消息監聽容器MessageListenerContainer實現的,它負責接收信息,並把接收到的信息分發給真正的MessageListener進行處理。每一個消費者對應每一個目的地都須要有對應的MessageListenerContainer。對於消息監聽容器而言,除了要知道監聽哪一個目的地以外,還須要知道到哪裏去監聽,也就是說它還須要知道去監聽哪一個JMS服務器,這是經過在配置MessageConnectionFactory的時候往裏面注入一個ConnectionFactory來實現的。因此咱們在配置一個MessageListenerContainer的時候有三個屬性必須指定,一個是表示從哪裏監聽的ConnectionFactory;一個是表示監聽什麼的Destination;一個是接收到消息之後進行消息處理的MessageListener。Spring一共爲咱們提供了兩種類型的MessageListenerContainer,SimpleMessageListenerContainer和DefaultMessageListenerContainer。

    SimpleMessageListenerContainer會在一開始的時候就建立一個會話session和消費者Consumer,而且會使用標準的JMS MessageConsumer.setMessageListener()方法註冊監聽器讓JMS提供者調用監聽器的回調函數。它不會動態的適應運行時須要和參與外部的事務管理。兼容性方面,它很是接近於獨立的JMS規範,但通常不兼容Java EE的JMS限制。

大多數狀況下咱們仍是使用的DefaultMessageListenerContainer,跟SimpleMessageListenerContainer相比,DefaultMessageListenerContainer會動態的適應運行時須要,而且可以參與外部的事務管理。它很好的平衡了對JMS提供者要求低、先進功能如事務參與和兼容Java EE環境。

定義處理消息的MessageListener

    要定義處理消息的MessageListener咱們只須要實現JMS規範中的MessageListener接口就能夠了。MessageListener接口中只有一個方法onMessage方法,當接收到消息的時候會自動調用該方法。

package com.tiantian.springintejms.listener;  
   
import javax.jms.JMSException;  
import javax.jms.Message;  
import javax.jms.MessageListener;  
import javax.jms.TextMessage;  
   
public class ConsumerMessageListener implements MessageListener {  
   
    public void onMessage(Message message) {  
        //這裏咱們知道生產者發送的就是一個純文本消息,因此這裏能夠直接進行強制轉換  
        TextMessage textMsg = (TextMessage) message;  
        System.out.println("接收到一個純文本消息。");  
        try {  
            System.out.println("消息內容是:" + textMsg.getText());  
        } catch (JMSException e) {  
            e.printStackTrace();  
        }  
    }  
   
}

    有了MessageListener以後咱們就能夠在Spring的配置文件中配置一個消息監聽容器了。

<!--這個是隊列目的地-->  
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">  
    <constructor-arg>  
        <value>queue</value>  
    </constructor-arg>  
</bean>  
<!-- 消息監聽器 -->  
<bean id="consumerMessageListener" 
    class="com.tiantian.springintejms.listener.ConsumerMessageListener"/>      
  
<!-- 消息監聽容器 -->  
<bean id="jmsContainer" 
    class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
    <property name="connectionFactory" ref="connectionFactory" />  
    <property name="destination" ref="queueDestination" />  
    <property name="messageListener" ref="consumerMessageListener" />  
</bean>

咱們能夠看到咱們定義了一個名叫queue的ActiveMQQueue目的地,咱們的監聽器就是監聽了發送到這個目的地的消息。

       至此咱們的生成者和消費者都配置完成了,這也就意味着咱們的整合已經完成了。這個時候完整的Spring的配置文件應該是這樣的:

<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans"  
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:context="http://www.springframework.org/schema/context"  
    xmlns:jms="http://www.springframework.org/schema/jms"  
    xsi:schemaLocation="http://www.springframework.org/schema/beans  
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd  
    http://www.springframework.org/schema/context  
    http://www.springframework.org/schema/context/spring-context-3.0.xsd  
    http://www.springframework.org/schema/beans 
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd  
    http://www.springframework.org/schema/jms 
    http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">  
   
    <context:component-scan base-package="com.tiantian" />  
   
    <!-- Spring提供的JMS工具類,它能夠進行消息發送、接收等 -->  
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
        <!-- 這個connectionFactory對應的是咱們定義的Spring提供的那個ConnectionFactory對象 -->  
        <property name="connectionFactory" ref="connectionFactory"/>  
    </bean>  
      
    <!-- 真正能夠產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供-->  
    <bean id="targetConnectionFactory" 
        class="org.apache.activemq.ActiveMQConnectionFactory">  
        <property name="brokerURL" value="tcp://localhost:61616"/>  
    </bean>  
      
    <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->  
    <bean id="connectionFactory" 
        class="org.springframework.jms.connection.SingleConnectionFactory">  
        <!-- 目標ConnectionFactory對應真實的能夠產生JMS Connection的ConnectionFactory -->  
        <property name="targetConnectionFactory" ref="targetConnectionFactory"/>  
    </bean>  
      
    <!--這個是隊列目的地-->  
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">  
        <constructor-arg>  
            <value>queue</value>  
        </constructor-arg>  
    </bean>  
    <!-- 消息監聽器 -->  
    <bean id="consumerMessageListener" 
        class="com.tiantian.springintejms.listener.ConsumerMessageListener"/>  
    <!-- 消息監聽容器 -->  
    <bean id="jmsContainer"  
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
        <property name="connectionFactory" ref="connectionFactory" />  
        <property name="destination" ref="queueDestination" />  
        <property name="messageListener" ref="consumerMessageListener" />  
    </bean>  
</beans>

    接着咱們來測試一下,看看咱們的整合是否真的成功了,測試代碼以下:

package com.tiantian.springintejms.test;  
   
import javax.jms.Destination;  
   
import org.junit.Test;  
import org.junit.runner.RunWith;  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.beans.factory.annotation.Qualifier;  
import org.springframework.test.context.ContextConfiguration;  
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;  
import com.tiantian.springintejms.service.ProducerService;  
   
@RunWith(SpringJUnit4ClassRunner.class)  
@ContextConfiguration("/applicationContext.xml")  
public class ProducerConsumerTest {  
   
    @Autowired  
    private ProducerService producerService;  
    @Autowired  
    @Qualifier("queueDestination")  
    private Destination destination;  
      
    @Test  
    public void testSend() {  
        for (int i=0; i<2; i++) {  
            producerService.sendMessage(destination, "你好,生產者!這是消息:" + (i+1));  
        }  
    }  
      
}

    在上面的測試代碼中咱們利用生產者發送了兩個消息,正常來講,消費者應該能夠接收到這兩個消息。運行測試代碼後控制檯輸出以下:

    看,控制檯已經進行了正確的輸出,這說明咱們的整合確實是已經成功了。


    拓展:關於JMS的其餘部分知識,請至原博客的後續博客查看。

相關文章
相關標籤/搜索