JMS學習之路(一):整合activeMQ到SpringMVC

 

JMS的全稱是Java Message Service,即Java消息服務。它主要用於在生產者和消費者之間進行消息傳遞,生產者負責產生消息,而消費者負責接收消息。把它應用到實際的業務需求中的話咱們能夠在特定的時候利用生產者生成一消息,並進行發送,對應的消費者在接收到對應的消息後去完成對應的業務邏輯。對於消息的傳遞有兩種類型,一種是點對點的,即一個生產者和一個消費者一一對應;另外一種是發佈/訂閱模式,即一個生產者產生消息並進行發送後,能夠由多個消費者進行接收。java

 

整合activeMQ到springmvc項目也很簡單web

只須要增長以下的maven依賴便可,初學者請直接添加all這個jar,不然若是jar包衝突會影響信心的spring

<dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.13.2</version>
        </dependency>
        <dependency>  
            <groupId>org.springframework</groupId>  
            <artifactId>spring-jms</artifactId>  
            <version>${spring.version}</version>  
        </dependency>

 

開發步驟:數據庫

① 添加好maven依賴後,請到 http://activemq.apache.org/ 下載最新版的activeMQ, 我這裏下載的是5.13.2,下載解壓後執行bin中的activemq.bat進行啓動apache

② 理解JMS工做原理json

  1.首先配置連接信息緩存

         和操做數據庫同樣,咱們要根據數據庫地址和連接信息,來配置datasource同樣,activemq類同,底層首先須要由activemq廠商提供的驅動,根據具體地址,封裝一個ConnectionFactory, 這是最基本的配置, 同時因爲是由Spring進行統一管理, 因此須要將ConnectionFactory注入到sping。jms中的ConnectionFactory。這樣連接就配置好了。服務器

<!-- 真正能夠產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616" />
    </bean>

    <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory"
        class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目標ConnectionFactory對應真實的能夠產生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>

     同理 spring.jms 提供了 SingleConnectionFactory和CachingConnectionFactory,這裏咱們使用的SingleConnectionFactory,根據意思也知道得八九不離十了,session

SingleConnectionFactory就是每次請求都返回同一個連接,從啓動開始就一直打開,不會關閉。 而CachingConnectionFactory就和數據庫的連接池同樣,能夠緩存不少信息,在用戶不少的狀況下,同等狀況下會增長效率。併發

     同時activeMQ也提供了PooledConnectionFactory,這樣也能夠緩存不少信息,減小資源的使用,配置以下

<!-- 使用pool進行連接 -->
    <!-- 真正能夠產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供-->
<!--     <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> -->
<!--         <property name="brokerURL" value="tcp://localhost:61616"/> -->
<!--     </bean> -->
    
<!--     <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"> -->
<!--         <property name="connectionFactory" ref="targetConnectionFactory"/> -->
<!--         <property name="maxConnections" value="10"/> -->
<!--     </bean> -->
    
<!--     <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> -->
<!--         <property name="targetConnectionFactory" ref="pooledConnectionFactory"/> -->
<!--     </bean> -->

      2。配置好ConnectionFactory以後咱們就須要配置生產者。生產者負責產生消息併發送到JMS服務器,這一般對應的是咱們的一個業務邏輯服務實現類。可是咱們的服務實現類是怎麼進行消息的發送的呢?這一般是利用Spring爲咱們提供的JmsTemplate類來實現的,因此配置生產者其實最核心的就是配置進行消息發送的JmsTemplate。對於消息發送者而言,它在發送消息的時候要知道本身該往哪裏發,爲此,咱們在定義JmsTemplate的時候須要往裏面注入一個Spring提供的ConnectionFactory對象。

<!-- Spring提供的JMS工具類,它能夠進行消息發送、接收等 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 這個connectionFactory對應的是咱們定義的Spring提供的那個ConnectionFactory對象 -->
        <property name="connectionFactory" ref="connectionFactory" />
    </bean>

      在發送消息的時候,除了知道activemq的地址外,還須要讓發送者知道具體發給誰?

      activemq提供了兩種模式,一個是點對點的 ,一個是發送給多個訂閱者這種訂閱模式

    <!--這個是隊列目的地 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>queue</value>
        </constructor-arg>
    </bean>
    
    <!--這個是主題目的地,一對多的-->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="topic"/>
    </bean>

 

這樣發送者相關配置就已經完成了。

③ 發送者發送消息的服務類: 很簡單,調用上面配置好的jmsTemplate。send方法就行了

package xiaochangwei.zicp.net.jms;

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
 

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;
 
@Service
public class ProducerServiceImpl implements ProducerService {
 
    @Resource
    private JmsTemplate jmsTemplate;
    
    public void sendMessage(Destination destination, final String message) {
        jmsTemplate.send(destination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        });
    } 
}

 

至此發送者相關的都已經所有完成了,下面配置接受者

④  接收者配置,接收者和發送者的連接信息都是同樣的,否則收不到信息是不, 除了連接信息,要知道何時有消息發送過來,接收者這邊就要實現一個消息監聽器,    

     當監聽到消息後,進行相應的業務處理,每一個目的地都有一個MessageListenerContainer,配置MessageListenerContainer須要連接信息,目的地信息,和接受者的消息監聽器

<!-- 消息監聽器 -->
    <bean id="consumerMessageListener"
        class="xiaochangwei.zicp.net.jms.ConsumerMessageListener" />
    <!-- 消息監聽容器 -->
    <bean id="jmsContainer"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="queueDestination" />
        <property name="messageListener" ref="consumerMessageListener" />
    </bean>

 

 

這裏咱們進行一個業務模擬

A系統經過activemq發送命令串給B系統, B系統收到命令後,根據約定的規則進行解析,並進行業務處理

 

A系統調用ProducerServiceImpl中的 sendMessage 進行消息發送,爲減小傳遞量,均採用json發送,代碼以下:

package xiaochangwei.zicp.net.web.controller;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import javax.jms.Destination;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import xiaochangwei.zicp.net.common.tools.JsonUtil;
import xiaochangwei.zicp.net.entity.TestEntity;
import xiaochangwei.zicp.net.jms.ProducerService;

@Controller
@RequestMapping("jms")
public class JmsTestController {

    @Autowired
    private ProducerService producerService;
    @Autowired
    @Qualifier("queueDestination")
    private Destination destination;

    @RequestMapping("test")
    public @ResponseBody String testSend() throws Exception {
        
        //系統業務須要, 須要更新用戶表中信息,根據id更新name
        List<TestEntity> list = new LinkedList<TestEntity>();
        
        TestEntity en = new TestEntity();
        en.setId(100);
        en.setName("name1");
        list.add(en);
        
        TestEntity en2 = new TestEntity();
        en2.setId(1002);
        en2.setName("name2");
        list.add(en2);
        
        Map<String,Object> mapEntity = new HashMap<String, Object>();
        mapEntity.put("user", list);
        
        Map<String,Object> map = new HashMap<String, Object>();
        map.put("update", mapEntity);
        
        System.out.println("發送方發送內容爲:" + JsonUtil.object2String(map));
        //發送更新數據請求 
        producerService.sendMessage(destination, JsonUtil.object2String(map));
        
        return "jms exute complete";
    }
}

 

接收者根據約定,知道收到的消息都是json字符串,因此直接按text處理

package xiaochangwei.zicp.net.jms;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class ConsumerMessageListener implements MessageListener {
 
    public void onMessage(Message message) {
        TextMessage textMsg = (TextMessage) message;
        try {
            System.out.println("接收者受到消息:" + textMsg.getText());
            System.out.println("開始進行解析並調用service執行....");
            
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
 
}

 

這裏沒有具體的去解析並執行具體操做,只是打印出來而已,只確認發送方發送的內容和接受方接收到的內容一致便可。

能夠看到發送方和接收方內容一直

解釋下這段json想表達的意思: 更新(update)用戶信息(user),把id=100的用戶 name更新爲name1 ........

 

整合就講到這裏,後續會講到消息監聽器,轉換器等進階知識,感興趣的朋友能夠持續關注

相關文章
相關標籤/搜索