深刻淺出 JMS(七) - ActiveMQ 與 Spring 整合

深刻淺出 JMS(七) - ActiveMQ 與 Spring 整合

1、與spring整合實現ptp的同步接收消息

(1)config.properties

## ActiveMQ Config
activemq.brokerURL=tcp\://127.0.0.1\:61616
activemq.userName=admin
activemq.password=password
activemq.pool.maxConnection=10
activemq.queue=mailqueue
activemq.topic=mailtopic

(2)pom.xml

<!-- https://mvnrepository.com/artifact/org.springframework/spring-jms -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>4.3.7.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-pool -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
    <version>5.15.3</version>
</dependency>

(3)spring-jms.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xmlns:amq="http://activemq.apache.org/schema/core"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
    http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
    http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd">

    <!-- ActiveMQConnectionFactory就是JMS中負責建立到ActiveMQ鏈接的工廠類 -->
    <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory" >
        <property name="brokerURL" value="${activemq.brokerURL}"/>
        <property name="userName" value="${activemq.userName}"/>
        <property name="password" value="${activemq.password}"/>
    </bean>

    <!-- 建立鏈接池 -->
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
          destroy-method="stop">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="maxConnections" value="${activemq.pool.maxConnection}"/>
    </bean>
    <!-- Spring 爲咱們提供了多個 ConnectionFactory,有 SingleConnectionFactory 和 CachingConnectionFactory -->
    <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
    </bean>

    <!-- Spring 提供的 JMS 工具類,它能夠進行消息發送、接收等 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 這個 connectionFactory 對應的是咱們定義的 Spring 提供的那個 ConnectionFactory 對象 -->
        <property name="connectionFactory" ref="cachingConnectionFactory"/>
        <property name="defaultDestination" ref="queueDestination"/>
    </bean>

    <!--定義消息隊列(Queue)-->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <!-- 設置消息隊列的名字 -->
        <constructor-arg index="0" value="${activemq.queue}"/>
    </bean>
</beans>

ConnectionFactory 是用於產生到 JMS 服務器的連接的,Spring 爲咱們提供了多個 ConnectionFactory,有 SingleConnectionFactory 和 CachingConnectionFactory。java

  • SingleConnectionFactory :對於創建 JMS 服務器連接的請求會一直返回同一個連接,而且會忽略 Connection 的 close 方法調用。
  • CachingConnectionFactory :繼承了 SingleConnectionFactory,因此它擁有 SingleConnectionFactory 的全部功能,同時它還新增了緩存功能,它能夠緩存 Session、MessageProducer 和 MessageConsumer。這裏咱們使用 CachingConnectionFactory 來做爲示例。

(4)生產者

import com.alibaba.fastjson.JSONObject;
import com.github.binarylei.jms.spring.core.Mail;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;

/**
 * @author: leigang
 * @version: 2018-04-03
 */
@Service("mqProducer")
public class MQProducer {

    @Autowired
    private JmsTemplate jmsTemplate;

    public void sendMail(Mail mail) {
        jmsTemplate.send((session) -> {
            return session.createTextMessage(JSONObject.toJSONString(mail));
        });
    }
}

(5)消費者

import com.alibaba.fastjson.JSONObject;
import com.github.binarylei.jms.spring.core.Mail;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;

import javax.jms.Destination;

/**
 * @author: leigang
 * @version: 2018-04-03
 */
@Service("mqCustumer")
public class MQCustumer {

    @Autowired
    private JmsTemplate jmsTemplate;

    @Autowired
    private Destination destination;

    public void sendMail() {
        String json = (String) jmsTemplate.receiveAndConvert(destination);
        Mail mail = (Mail) JSONObject.parseObject(json, Mail.class);
        System.out.println(mail);
    }
}

2、PTP 的異步調用

咱們在 spring 中直接配置異步接收消息的監聽器,這樣就至關於在 spring 中配置了消費者,在接受消息的時候就沒必要要啓動消費者了。git

spring-jms.xml:github

<!--消費者,監聽-->
<bean id="messageListener" class="com.github.binarylei.jms.spring.demo1.MessageListener"/>
<bean id="messageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="queueDestination"/>
    <property name="messageListener" ref="messageListener"/>
</bean>

生產者往指定目的地 Destination 發送消息後,接下來就是消費者對指定目的地的消息進行消費了。那麼消費者是如何知道有生產者發送消息到指定目的地 Destination了呢?這是經過 Spring 爲咱們封裝的消息監聽容器 MessageListenerContainer 實現的,它負責接收信息,並把接收到的信息分發給真正的 MessageListener 進行處理。spring

每一個消費者對應每一個目的地都須要有對應的 MessageListenerContainer。對於消息監聽容器而言,除了要知道監聽哪一個目的地以外,還須要知道到哪裏去監聽,也就是說它還須要知道去監聽哪一個 JMS 服務器,這是經過在配置 MessageConnectionFactory 的時候往裏面注入一個 ConnectionFactory 來實現的。因此咱們在配置一個 MessageListenerContainer 的時候有三個屬性必須指定,一個是表示從哪裏監聽的 ConnectionFactory;一個是表示監聽什麼的 Destination;一個是接收到消息之後進行消息處理的 MessageListener。apache

Spring 一共爲咱們提供了兩種類型的 MessageListenerContainer:json

  • SimpleMessageListenerContainer :SimpleMessageListenerContainer 會在一開始的時候就建立一個會話 session 和消費者 Consumer,而且會使用標準的 JMS MessageConsumer.setMessageListener() 方法註冊監聽器讓 JMS 提供者調用監聽器的回調函數。它不會動態的適應運行時須要和參與外部的事務管理。兼容性方面,它很是接近於獨立的 JMS 規範,但通常不兼容 Java EE 的 JMS 限制。
  • DefaultMessageListenerContainer :在大多數狀況下咱們仍是使用的 DefaultMessageListenerContainer,跟SimpleMessageListenerContainer 相比,DefaultMessageListenerContainer 會動態的適應運行時須要,而且可以參與外部的事務管理。它很好的平衡了對 JMS 提供者要求低、先進功能如事務參與和兼容 Java EE 環境。

消息監聽器:緩存

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;

/**
 * @author: leigang
 * @version: 2018-04-03
 */
public class MessageListener implements javax.jms.MessageListener  {

    @Override
    public void onMessage(Message message) {
        TextMessage msg = (TextMessage) message;
        try {
            System.out.println("消息:" + msg.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

3、發佈訂閱 同步接收

在 spring-jms.xml 中將 ActiveMQTopic 生成 Topic,其它沒什麼變化:服務器

<!--這個是隊列目的地,發佈訂閱-->  
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">  
    <constructor-arg index="0" value="spring-Topic"/> 
</bean>

天天用心記錄一點點。內容也許不重要,但習慣很重要!session

相關文章
相關標籤/搜索