Java消息隊列-Spring整合ActiveMq

一、概述


 

  首先和你們一塊兒回顧一下Java 消息服務,在我以前的博客《Java消息隊列-JMS概述》中,我爲你們分析了:html

  1. 消息服務:一箇中間件,用於解決兩個活多個程序之間的耦合,底層由Java 實現。
  2. 優點:異步、可靠
  3. 消息模型:點對點,發佈/訂閱
  4. JMS中的對象

  而後在另外一篇博客《Java消息隊列-ActiveMq實戰》中,和你們一塊兒從0到1的開啓了一個ActiveMq 的項目,在項目開發的過程當中,咱們對ActiveMq有了必定的瞭解:  java

  1. 多種語言和協議編寫客戶端。語言: Java, C, C++, C#, Ruby, Perl, Python, PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
  2. 徹底支持JMS1.1和J2EE 1.4規範 (持久化,XA消息,事務)
  3. 對Spring的支持,ActiveMQ能夠很容易內嵌到使用Spring的系統裏面去,並且也支持Spring2.0的特性
  4. 經過了常見J2EE服務器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的測試,其中經過JCA 1.5 resource adaptors的配置,可讓ActiveMQ能夠自動的部署到任何兼容J2EE 1.4 商業服務器上
  5. 支持多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
  6. 支持經過JDBC和journal提供高速的消息持久化
  7. 從設計上保證了高性能的集羣,客戶端-服務器,點對點
  8. 支持Ajax
  9. 支持與Axis的整合
  10. 能夠很容易得調用內嵌JMS provider,進行測試

  在接下來的這篇博客中,我會和你們一塊兒來整合Spring 和ActiveMq,這篇博文,咱們基於Spring+JMS+ActiveMQ+Tomcat,實現了Point-To-Point的異步隊列消息和PUB/SUB(發佈/訂閱)模型,簡單實例,不包含任何業務。web

 

二、目錄結構

 


  2.1 項目目錄

      IDE選擇了IDEA(建議你們使用),爲了不下載jar 的各類麻煩,底層使用maven搭建了一個項目,整合了Spring 和ActiveMqspring

   

   

    2.2 pom.xml

  View Code

    由於這裏pom.xml 文件有點長,就不展開了。apache

    咱們能夠看到其實依賴也就幾個,一、Spring 核心依賴 二、ActiveMq core和pool(這裏若是同窗們選擇導入jar,能夠直接導入咱們上一篇博客中說道的那個activemq-all 這個jar包)三、java servlet 相關依賴後端

    這裏面咱們選擇的ActiveMq pool 的依賴版本會和以後的dtd 有關係,須要版本對應,因此同窗們等下配置activemq 文件的時候,須要注意dtd 版本選擇spring-mvc

 

    2.3 web.xml

    web.xml 也大同小異,指定Spring 配置文件,springMvc 命名,編碼格式tomcat

複製代碼
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://java.sun.com/xml/ns/javaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
          http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
         version="3.0">

  <display-name>Archetype Created Web Application</display-name>

  <!-- 加載spring的配置文件,例如hibernate、jms等集成 -->
  <context-param>
    <param-name>contextConfigLocation</param-name>
    <param-value>
      classpath:applicationContext*.xml;
    </param-value>
  </context-param>

  <listener>
    <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
  </listener>

  <servlet>
    <servlet-name>springMVC</servlet-name>
    <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
    <init-param>
      <param-name>contextConfigLocation</param-name>
      <param-value>classpath:spring-mvc.xml</param-value>
    </init-param>
    <load-on-startup>1</load-on-startup>
  </servlet>
  <servlet-mapping>
    <servlet-name>springMVC</servlet-name>
    <url-pattern>/</url-pattern>
  </servlet-mapping>

  <!-- 處理編碼格式 -->
  <filter>
    <filter-name>characterEncodingFilter</filter-name>
    <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
    <init-param>
      <param-name>encoding</param-name>
      <param-value>UTF-8</param-value>
    </init-param>
    <init-param>
      <param-name>forceEncoding</param-name>
      <param-value>true</param-value>
    </init-param>
  </filter>
  <filter-mapping>
    <filter-name>characterEncodingFilter</filter-name>
    <url-pattern>/*</url-pattern>
  </filter-mapping>

</web-app>
複製代碼

 

 

    2.4 SpringMvc 和applicationContext.xml

      這裏面的SpringMVC沒什麼特別,有須要的同窗能夠參考一下:服務器

  View Code

      applicationContext.xml 主要使用來裝載Bean,咱們項目中並無什麼特別的Java Bean,所以只用來指出包掃描路徑:session

  View Code

 

   

    2.5 applicationContext-ActiveMQ.xml

複製代碼
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:mvc="http://www.springframework.org/schema/mvc"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-4.1.xsd
        http://www.springframework.org/schema/mvc
        http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms-4.1.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.12.1.xsd"
>

    <context:component-scan base-package="com.Jayce" />
    <mvc:annotation-driven />

    <amq:connectionFactory id="amqConnectionFactory"
                           brokerURL="tcp://192.168.148.128:61616"
                           userName="admin"
                           password="admin" />

    <!-- 配置JMS鏈接工長 -->
    <bean id="connectionFactory"
          class="org.springframework.jms.connection.CachingConnectionFactory">
        <constructor-arg ref="amqConnectionFactory" />
        <property name="sessionCacheSize" value="100" />
    </bean>

    <!-- 定義消息隊列(Queue) -->
    <bean id="demoQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <!-- 設置消息隊列的名字 -->
        <constructor-arg>
            <value>Jaycekon</value>
        </constructor-arg>
    </bean>

    <!-- 配置JMS模板(Queue),Spring提供的JMS工具類,它發送、接收消息。 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="defaultDestination" ref="demoQueueDestination" />
        <property name="receiveTimeout" value="10000" />
        <!-- true是topic,false是queue,默認是false,此處顯示寫出false -->
        <property name="pubSubDomain" value="false" />
    </bean>


    <!-- 配置消息隊列監聽者(Queue) -->
    <bean id="queueMessageListener" class="com.Jayce.Filter.QueueMessageListener" />

    <!-- 顯示注入消息監聽容器(Queue),配置鏈接工廠,監聽的目標是demoQueueDestination,監聽器是上面定義的監聽器 -->
    <bean id="queueListenerContainer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="demoQueueDestination" />
        <property name="messageListener" ref="queueMessageListener" />
    </bean>

</beans>
複製代碼

 

       這裏和你們講解一下這個配置文件,若是你們可以從上述配置文件中看懂,能夠跳過。同窗們也能夠在ActiveMQ官網中的查看。

       一、ActiveMq 中的DTD,咱們在聲明相關配置以前,咱們須要先導入ActiveMq 中的DTD,否則Spring 並不理解咱們的標籤是什麼意思。

         http://activemq.apache.org/schema/core/activemq-core-5.12.1.xsd

        咱們在pom.xml 文件中有配置了activemq 的版本依賴咱們這裏的版本,須要和依賴的版本同樣,否則是找不到相關的dtd

       二、amq:connectionFactory:很直白的一個配置項,用於配置咱們連接工廠的地址和用戶名密碼,這裏須要注意的是選擇tcp鏈接而不是http鏈接

       三、jmsTemplate:比較重要的一個配置,這裏指定了鏈接工廠,默認消息發送目的地,還有鏈接時長,發佈消息的方式

 

三、項目結構


  3.1 ProducerService

複製代碼
package com.Jayce.Service;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

/**
 * Created by Administrator on 2017/1/5.
 */
@Service
public class ProducerService {

    @Resource(name="jmsTemplate")
    private JmsTemplate jmsTemplate;

    public void sendMessage(Destination destination,final String msg){
        System.out.println(Thread.currentThread().getName()+" 向隊列"+destination.toString()+"發送消息---------------------->"+msg);
        jmsTemplate.send(destination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(msg);
            }
        });
    }

    public void sendMessage(final String msg){
        String destination = jmsTemplate.getDefaultDestinationName();
        System.out.println(Thread.currentThread().getName()+" 向隊列"+destination+"發送消息---------------------->"+msg);
        jmsTemplate.send(new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(msg);
            }
        });
    }
}
複製代碼

 

     將消息生產者作成一個服務,當咱們須要發送消息的時候,只須要調用ProducerService實例中的sendMessage 方法就能夠向默認目的發送一個消息。

    這裏提供了兩個發送方式,一個是發送到默認的目的地,一個是根據目的地發送消息。

    有興趣的同窗能夠和我上一篇文章《ActiveMq實戰》中ActiveMq 發送消息的方式對比一下,能夠發現一些不一樣。

 

 

   3.2 ConsumerService

複製代碼
package com.Jayce.Service;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.TextMessage;

/**
 * Created by Administrator on 2017/1/5.
 */
@Service
public class ConsumerService {
    @Resource(name="jmsTemplate")
    private JmsTemplate jmsTemplate;

    public TextMessage receive(Destination destination){
        TextMessage textMessage = (TextMessage) jmsTemplate.receive(destination);
        try{
            System.out.println("從隊列" + destination.toString() + "收到了消息:\t"
                    + textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
        return textMessage;
    }
}
複製代碼

 

 

     由於咱們項目中並無什麼業務,因此的話對消息的處理也就是打印輸出。咱們只須要調用jmsTemplate中的 receive 方法,就能夠從裏面獲取到一條消息。

     再和咱們上一篇博客對比一下,上一篇博客中,咱們接受到信息以後須要手動確認事務,這樣ActiveMQ中才會肯定這條消息已經被正確讀取了。而整合了Spring以後,事務將由Spring 來管理。

 

   3.3 MessageController

複製代碼
package com.Jayce.Controller;

import com.Jayce.Service.ConsumerService;
import com.Jayce.Service.ProducerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.TextMessage;

/**
 * Created by Administrator on 2017/1/5.
 */
@Controller
public class MessageController {
    private Logger logger = LoggerFactory.getLogger(MessageController.class);
    @Resource(name = "demoQueueDestination")
    private Destination destination;

    //隊列消息生產者
    @Resource(name = "producerService")
    private ProducerService producer;

    //隊列消息消費者
    @Resource(name = "consumerService")
    private ConsumerService consumer;

    @RequestMapping(value = "/SendMessage", method = RequestMethod.POST)
    @ResponseBody
    public void send(String msg) {
        logger.info(Thread.currentThread().getName()+"------------send to jms Start");
        producer.sendMessage(msg);
        logger.info(Thread.currentThread().getName()+"------------send to jms End");
    }

    @RequestMapping(value= "/ReceiveMessage",method = RequestMethod.GET)
    @ResponseBody
    public Object receive(){
        logger.info(Thread.currentThread().getName()+"------------receive from jms Start");
        TextMessage tm = consumer.receive(destination);
        logger.info(Thread.currentThread().getName()+"------------receive from jms End");
        return tm;
    }

}
複製代碼

    控制層裏面須要注入咱們的生產者和消費者(實際開發中,生產者和消費者確定不會在同一個項目中的,否則就消息服務這個東西就沒有意義了)。

    如今服務層和控制層都好了,接下來咱們就進行一個簡單的測試

 

四、項目測試


  4.1 啓動ActiveMq

      先肯定你的ActiveMQ服務已經開啓。

    

 

 

  4.2 啓動項目

    項目使用了Tomcat 插件,避免了本地再下載Tomcat的麻煩,有須要的同窗可使用一下。

複製代碼
<plugins>
      <plugin>
        <groupId>org.apache.tomcat.maven</groupId>
        <artifactId>tomcat7-maven-plugin</artifactId>
        <configuration>
          <port>8080</port>
          <path>/</path>
        </configuration>
      </plugin>
</plugins>
複製代碼

 

 

 

  4.3 發送消息

  這裏用了Chrome 的一個插件PostMan 有興趣的同窗能夠了解一下,在Chrome 拓展程序中能夠找到,避免了後端的同窗去弄頁面!

    

    咱們發送了一個post 請求以後,看一下服務器的效果:

    咱們能夠看到,已經向隊列發送了一條消息。咱們看一下ActiveMq如今的狀態:

    咱們能夠看到,一條消息已經成功發送到了ActiveMq中。

 

 

  4.4 接收消息

    使用get請求訪問服務器後臺:

  

 

     服務的輸出:

 

     ActiveMq服務器狀態:

    咱們能夠看到,消費者已經消費了一條信息,而且沒有斷開與ActiveMq之間的連接。

  

  4.5 監聽器

    在實際項目中,咱們不多會本身手動去獲取消息,若是須要手動去獲取消息,那就沒有必要使用到ActiveMq了,能夠用一個Redis 就足夠了。

    不能手動去獲取消息,那麼咱們就能夠選擇使用一個監聽器來監聽是否有消息到達,這樣子能夠很快的完成對消息的處理。

   4.5.1 applicationContext-ActiveMQ.xml 配置

      在上面的配置文件中,咱們已經默認的添加了這段監聽器的配置文件,若是同窗們不想使用這個監聽器,能夠直接註釋掉。

複製代碼
    <!-- 配置消息隊列監聽者(Queue) -->
    <bean id="queueMessageListener" class="com.Jayce.Filter.QueueMessageListener" />

    <!-- 顯示注入消息監聽容器(Queue),配置鏈接工廠,監聽的目標是demoQueueDestination,監聽器是上面定義的監聽器 -->
    <bean id="queueListenerContainer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="demoQueueDestination" />
        <property name="messageListener" ref="queueMessageListener" />
    </bean>
複製代碼

 

 

   4.5.2 MessageListener

      咱們須要建立一個類實現MessageListener 接口:

複製代碼
package com.Jayce.Filter;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * Created by Administrator on 2017/1/5.
 */
public class QueueMessageListener implements MessageListener {
    public void onMessage(Message message) {
        TextMessage tm = (TextMessage) message;
        try {
            System.out.println("QueueMessageListener監聽到了文本消息:\t"
                    + tm.getText());
            //do something ...
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
複製代碼

 

   實現接口的onMessage 方法,咱們將須要的業務操做在裏面解決,這樣子,就完成了咱們生產者-中間件-消費者,這樣一個解耦的操做了。 

 

   4.5.3 測試

    和上面同樣,使用postMan 發送post請求,咱們能夠看到控制檯裏面,消息立刻就能打印出來:

   

    再看看ActiveMQ服務器的狀態:

  咱們能夠看到,使用監聽器的效果,和手動接收消息的效果是同樣的。

  這樣子一整個項目下來,咱們已經成功的整合了Spring和ActiveMQ。

 

  4.6 壓力測試

    這裏其實也算不上什麼壓力測試,在配置pom.xml文件的時候,你們有看到一個 commons-httpclient 的依賴,接下來咱們使用httpClient 不停的想服務器發送消息,看一下服務器解決消息的速度如何:

複製代碼
package com.Jaycekon.test;

import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.methods.PostMethod;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by Administrator on 2017/1/5.
 */
public class Client {

    @Test
    public void test() {
        HttpClient httpClient = new HttpClient();
        new Thread(new Sender(httpClient)).start();

    }

}

class Sender implements Runnable {
    public static AtomicInteger count = new AtomicInteger(0);
    HttpClient httpClient;

    public Sender(HttpClient client) {
        httpClient = client;
    }

    public void run() {
            try {
                System.out.println(Thread.currentThread().getName()+"---Send message-"+count.getAndIncrement());
                PostMethod post = new PostMethod("http://127.0.0.1:8080/SendMessage");
                post.addParameter("msg", "Hello world!");
                httpClient.executeMethod(post);
                System.out.println(Thread.currentThread().getName()+"---Send message Success-"+count.getAndIncrement());

            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
複製代碼

 

     這裏面用了HttpClient 來向服務器發送Post 請求,而後計數輸出,有興趣的同窗能夠本身測試一下,能夠多開幾個線程,這裏只開了一個線程。

相關文章
相關標籤/搜索