ActiveMQ+Spring工程建立詳解(附工程文件)

ActiveMQ是Apache所提供的一個開源的消息系統,徹底採用Java來實現,所以,它能很好地支持J2EE提出的JMS(Java Message Service,即Java消息服務)規範。JMS是一組Java應用程序接口,它提供消息的建立、發送、讀取等一系列服務。JMS提供了一組公共應用程序接口和響應的語法,相似於Java數據庫的統一訪問接口JDBC,它是一種與廠商無關的API,使得Java程序可以與不一樣廠商的消息組件很好地進行通訊。html

JMS支持兩種消息發送和接收模型。一種稱爲P2P(Ponit to Point)模型,即採用點對點的方式發送消息。P2P模型是基於隊列的,消息生產者發送消息到隊列,消息消費者從隊列中接收消息,隊列的存在使得消息的異步傳輸稱爲可能,P2P模型在點對點的狀況下進行消息傳遞時採用。java

這裏寫圖片描述

另外一種稱爲Pub/Sub(Publish/Subscribe,即發佈-訂閱)模型,發佈-訂閱模型定義瞭如何向一個內容節點發布和訂閱消息,這個內容節點稱爲topic(主題)。主題能夠認爲是消息傳遞的中介,消息發佈這將消息發佈到某個主題,而消息訂閱者則從主題訂閱消息。主題使得消息的訂閱者與消息的發佈者互相保持獨立,不須要進行接觸便可保證消息的傳遞,發佈-訂閱模型在消息的一對多廣播時採用。git

這裏寫圖片描述

ActiveMQ的安裝

下載最新的安裝包apache-activemq-5.13.2-bin.tar.gzgithub

下載以後解壓: tar -zvxf apache-activemq-5.13.2-bin.tar.gzweb

ActiveMQ目錄內容有:spring

  • bin目錄包含ActiveMQ的啓動腳本sql

  • conf目錄包含ActiveMQ的全部配置文件數據庫

  • data目錄包含日誌文件和持久性消息數據apache

  • example: ActiveMQ的示例微信

  • lib: ActiveMQ運行所須要的lib

  • webapps: ActiveMQ的web控制檯和一些相關的demo

ActiveMQ的默認服務端口爲61616,這個能夠在conf/activemq.xml配置文件中修改:

<transportConnectors>
    <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>

JMS的規範流程

  1. 得到JMS connection factory. 經過咱們提供特定環境的鏈接信息來構造factory。

  2. 利用factory構造JMS connection

  3. 啓動connection

  4. 經過connection建立JMS session.

  5. 指定JMS destination.

  6. 建立JMS producer或者建立JMS message並提供destination.

  7. 建立JMS consumer或註冊JMS message listener.

  8. 發送和接收JMS message.

  9. 關閉全部JMS資源,包括connection, session, producer, consumer等。

案例(整合Spring)

pom.xml

<dependency>
  <groupId>junit</groupId>
  <artifactId>junit</artifactId>
  <version>4.11</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-all</artifactId>
  <version>5.11.1</version>
</dependency>
<dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-jms</artifactId>
  <version>4.1.4.RELEASE</version>
</dependency>
<dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-test</artifactId>
  <version>4.1.4.RELEASE</version>
</dependency>

Queue類型消息

spring配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd">
  <!-- 配置JMS鏈接工廠 -->
  <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <!-- ActiveMQ服務的地址和端口-->
    <property name="brokerURL" value="failover:(tcp://192.168.147.131:61616)" />
  </bean>
  <!-- 定義消息隊列(Queue) -->
  <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
    <!-- 設置消息隊列的名字 -->
    <constructor-arg>
      <value>testSpringQueue</value>
    </constructor-arg>
  </bean>
<!-- 定義消息發佈(Pub/Sub) -->
<!--     <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> -->
<!--         <constructor-arg> -->
<!--             <value>topic</value> -->
<!--         </constructor-arg> -->
<!--     </bean> -->
  <!-- 配置JMS模板(Queue),Spring提供的JMS工具類,它發送、接收消息。 -->
  <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="defaultDestination" ref="queueDestination" />
    <property name="receiveTimeout" value="10000" />
  </bean>
</beans>

推送代碼

package com.mq.spring.queue;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import javax.annotation.Resource;
import javax.jms.*;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations={"classpath:spring-mq-queue.xml"})
public class QueueSender {
  @Resource
  private JmsTemplate jmsTemplate;
  @Test
  public void send(){
    sendMqMessage(null,"spring activemq queue type message !");
  }
  /**
   * 說明:發送的時候若是這裏沒有顯示的指定destination.將用spring xml中配置的destination
   * @param destination
   * @param message
   */
  public void sendMqMessage(Destination destination, final String message){
    if(null == destination){
      destination = jmsTemplate.getDefaultDestination();
    }
    jmsTemplate.send(destination, new MessageCreator() {
      public Message createMessage(Session session) throws JMSException {
        return session.createTextMessage(message);
      }
    });
    System.out.println("spring send message...");
  }
  public void setJmsTemplate(JmsTemplate jmsTemplate) {
    this.jmsTemplate = jmsTemplate;
  }
}

消費代碼

package com.mq.spring.queue;
import org.junit.Test;
import javax.jms.*;
import org.junit.runner.RunWith;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import javax.annotation.Resource;
import javax.jms.Message;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations={"classpath:spring-mq-queue.xml"})
public class QueueReceiver {
  @Resource
  private JmsTemplate jmsTemplate;
  @Test
  public void receiveMqMessage(){
    Destination destination = jmsTemplate.getDefaultDestination();
    receive(destination);
  }
  /**
   * 接受消息
   */
  public void receive(Destination destination) {
    TextMessage tm = (TextMessage) jmsTemplate.receive(destination);
    try {
      System.out.println("從隊列" + destination.toString() + "收到了消息:\t" + tm.getText());
    } catch (JMSException e) {
      e.printStackTrace();
    }
  }
  public void setJmsTemplate(JmsTemplate jmsTemplate) {
    this.jmsTemplate = jmsTemplate;
  }
}

說明:上面的生產者和消費者使用同一套配置文件,使用獨立的程序去接收消息,spring jms也提供了消息監聽處理.接下來咱們換成監聽式消費

配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd">
  <!-- 配置JMS鏈接工廠 -->
  <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="failover:(tcp://192.168.147.131:61616)" />
  </bean>
  <!-- 定義消息隊列(Queue) -->
  <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
    <!-- 設置消息隊列的名字 -->
    <constructor-arg>
      <value>testSpringQueue</value>
    </constructor-arg>
  </bean>
  <!-- 配置消息隊列監聽者(Queue) -->
  <bean id="consumerMessageListener" class="com.mq.spring.queue.ConsumerMessageListener" />
  <!-- 消息監聽容器(Queue),配置鏈接工廠,監聽的隊列是testSpringQueue,監聽器是上面定義的監聽器 -->
  <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="destination" ref="queueDestination" />
    <property name="messageListener" ref="consumerMessageListener" />
  </bean>
</beans>

監聽器代碼

public class ConsumerMessageListener implements MessageListener{
  @Override
  public void onMessage(Message message) {
    TextMessage tm = (TextMessage) message;
    try {
      System.out.println("ConsumerMessageListener收到了文本消息:\t" + tm.getText());
    } catch (JMSException e) {
      e.printStackTrace();
    }
  }
}

這樣咱們的消息消費就能夠在監聽器中處理消費了.生產的代碼不變,修改發送者的消息體內容,執行生產程序

Topic類型消息

在使用 Spring JMS的時候,主題( Topic)和隊列消息的主要差別體如今JmsTemplate中 "pubSubDomain"是否設置爲 True。若是爲 True,則是 Topic;若是是false或者默認,則是 queue。

<property name="pubSubDomain" value="true" />

topic類型消費配置文件說明

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd">
  <!-- 配置JMS鏈接工廠 -->
  <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="failover:(tcp://192.168.147.131:61616)" />
  </bean>
  <!-- 定義消息Destination -->
  <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
    <!-- 設置消息隊列的名字 -->
    <constructor-arg>
      <value>testSpringTopic</value>
    </constructor-arg>
  </bean>
  <!-- 配置JMS模板(Queue),Spring提供的JMS工具類,它發送、接收消息。 -->
  <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="defaultDestination" ref="destination" />
    <property name="receiveTimeout" value="10000" />
  </bean>
  <!-- 配置消息消費監聽者 -->
  <bean id="consumerMessageListener" class="com.mq.spring.topic.ConsumerMessageListener" />
  <!-- 消息監聽容器,配置鏈接工廠,監聽器是上面定義的監聽器 -->
  <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="destination" ref="destination" />
    <!--主題(Topic)和隊列消息的主要差別體如今JmsTemplate中"pubSubDomain"是否設置爲True。若是爲True,則是Topic;若是是false或者默認,則是queue-->
    <property name="pubSubDomain" value="true" />
    <property name="messageListener" ref="consumerMessageListener" />
  </bean>
</beans>

生產者代碼

package com.mq.spring.topic;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations={"classpath:spring-mq-topic.xml"})
public class TopicSender {
  @Resource
  private JmsTemplate jmsTemplate;
  @Test
  public void send(){
    sendMqMessage(null,"spring activemq topic type message[with listener] !");
  }
  /**
   * 說明:發送的時候若是這裏沒有顯示的指定destination.將用spring xml中配置的destination
   * @param destination
   * @param message
   */
  public void sendMqMessage(Destination destination, final String message){
    if(null == destination){
      destination = jmsTemplate.getDefaultDestination();
    }
    jmsTemplate.send(destination, new MessageCreator() {
      public Message createMessage(Session session) throws JMSException {
        return session.createTextMessage(message);
      }
    });
    System.out.println("spring send message...");
  }
  public void setJmsTemplate(JmsTemplate jmsTemplate) {
    this.jmsTemplate = jmsTemplate;
  }
}

監聽器代碼

public class ConsumerMessageListener implements MessageListener{
  @Override
  public void onMessage(Message message) {
    TextMessage tm = (TextMessage) message;
    try {
      System.out.println("ConsumerMessageListener收到了文本消息:\t" + tm.getText());
    } catch (JMSException e) {
      e.printStackTrace();
    }
  }
}

參考資料

更多內容能夠關注微信公衆號,或者訪問AppZone網站

http://7xp64w.com1.z0.glb.clouddn.com/qrcode_for_gh_3e33976a25c9_258.jpg

相關文章
相關標籤/搜索