ActiveMQ介紹及Spring整合實例

前言html

ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個徹底支持JMS1.1和J2EE 1.4規範的 JMS Provider實現。主要解決進場通信,應用耦合,流量削鋒。實現高性能,高可用,可伸縮和最終一致性架構。異步消息是一個應用程序向另一個應用程序間接發送消息的一種方式,這種方式無需等待對方的相應。好比用戶註冊以後發送短信,秒殺程序的流量削鋒,聊天室的消息通信,複雜耗時的程序的而無須當即返回的結果的程序之間的通信。java

消息模型

ActiveMQ遵循了JMS規範,有兩種消息模型,點對點和發佈訂閱模型。spring

點對點:在點對點模型中,每一條消息都會有一個發送者和一個接受者。當消息代理獲得消息時,它將消息放入一個隊列中。當接收者請求隊列中的下一條消息時,消息會從隊列中取出,並投遞給接收者。由於消息投遞後會從隊列刪除,這樣就能夠保證消息只能投遞給一個接收者。如圖:
image
儘管消息隊列中每一條消息只被投遞給一個接收者,可是並不意味着只能使用一個接收者從隊列中獲取消息。一般可使用幾個接收者來處理隊列中的消息,有點像銀行辦理業務,本身的業務就是一個消息,選取一個空閒的窗口辦理業務。apache

發佈-訂閱模型:消息會發生給一個主題。與隊列相似,多個接收者均可以監聽一個主題。可是,與隊列不一樣的是,消息再也不是隻投遞給一個接收者,而是主題的全部訂閱者都會接收到此消息的副本。如圖:
image
有點像雜誌的發行商和雜誌訂閱者的關係。session

ActiveMQ下載

下載地址http://activemq.apache.org/download.html
注意:Java Runtime Environment (JRE) JRE 1.7 (1.6 for version <=5.10.0)
5.10以上的版本須要java1.7以上的環境。
安裝後在http://localhost:8161 (本機)可查看控制檯
下面來代碼整合。架構

maven配置

<dependencies>
     <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <version>${spring.version}</version>
    </dependency>
     <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-messaging</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-tx</artifactId>
        <version>${spring.version}</version>
    </dependency>


    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.14.4</version>
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>${junit.version}</version>
        <scope>test</scope>
    </dependency>
      <!--spring單元測試依賴 -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>${spring.version}</version>
            <scope>test</scope>
        </dependency>


  </dependencies>

實體類

package com.study.entity;

import java.io.Serializable;

public class TestMessage implements Serializable{

    private static final long serialVersionUID = -2132582539599141027L;

    private Integer id;
    private String msg;
    public Integer getId() {
        return id;
    }
    public void setId(Integer id) {
        this.id = id;
    }
    public String getMsg() {
        return msg;
    }
    public void setMsg(String msg) {
        this.msg = msg;
    }
    @Override
    public String toString() {
        return "TestMessage [id=" + id + ", msg=" + msg + "]";
    }


}

ActiveMQ配置

ActiveMQ有本身的spring配置文件命名空間。異步

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans   
        http://www.springframework.org/schema/beans/spring-beans-4.3.xsd   
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context-4.3.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms-4.3.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.14.4.xsd">

        <context:component-scan base-package="com.study"></context:component-scan>
        <!-- 聲明鏈接工廠 -->
        <amq:connectionFactory id="targetConnectionFactory" brokerURL="tcp://localhost:61616" userName="admin" password="admin" trustAllPackages="true" />

        <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->
        <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
            <!-- 目標ConnectionFactory對應真實的能夠產生JMS Connection的ConnectionFactory -->
            <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
        </bean>

        <!-- Spring提供的JMS工具類,它能夠進行消息發送、接收等 -->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <!-- 這個connectionFactory對應的是咱們定義的Spring提供的那個ConnectionFactory對象 -->
            <property name="connectionFactory" ref="connectionFactory"/>
        </bean>

        <!-- 消息監聽器容器 -->
        <jms:listener-container connection-factory="connectionFactory">
            <jms:listener destination="study.queue.simple" ref="consumerService" method="receiveMessage"/>
            <jms:listener destination="study.queue.simple" ref="consumerService" method="receiveMessage2"/>
            <jms:listener destination="study.queue.object" ref="consumerService" method="receiveObject" />
        </jms:listener-container>

        <jms:listener-container connection-factory="connectionFactory" destination-type="topic">
            <jms:listener destination="study.topic.simple" ref="consumerService" method="receiveTopicMessage"/>
            <jms:listener destination="study.topic.simple" ref="consumerService" method="receiveTopicMessage2"/>
        </jms:listener-container>

</beans>

Spring提供了JmsTemplate來簡化JMS的開發。因此須要聲明JmsTemplate的bean,已經所依賴的connectionFactory。
上面brokerURL需填寫activeMQ安裝的地址,由於我是本地安裝,因此這麼寫。消息監聽器容器中聲明瞭接收者其中destination聲明一個queue或者topic名稱。ref聲明接收者的類 ,method聲明方法。若是該類繼承了MessageListener則能夠不用寫method方法,會默認調用onMessage()方法來接收消息。maven

Producer代碼

package com.study.producer;

import com.study.entity.TestMessage;


public interface ProducerService {
    /**
     * 發送queue文本消息
     * @param destination
     * @param message
     */
    public void sendMessage(String destinationName,String message);
    /**
     * 發送queue對象消息
     * @param destinationName
     * @param testMessage
     */
    public void sendMessage(String destinationName,TestMessage testMessage);
    /**
     * 發送topic文本消息
     * @param destinationName
     * @param message
     */
    public void sendTopicMessage(String destinationName, String message);

}
package com.study.producer.impl;

import javax.annotation.Resource;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

import com.study.entity.TestMessage;
import com.study.producer.ProducerService;

@Component
public class ProducerServiceImpl implements ProducerService{

    @Resource
    private JmsTemplate jmsTemplate;


    public void sendMessage(String destinationName,  final String message) {
        System.out.println("生產方發送字符串消息:"+message);
        jmsTemplate.send(destinationName, new MessageCreator() {

            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        });

    }

    public void sendMessage(String destinationName, final TestMessage testMessage) {
        System.out.println("生產方發送對象消息:"+testMessage);

        /*jmsTemplate.send(destinationName, new MessageCreator() {

            public Message createMessage(Session session) throws JMSException {
                return session.createObjectMessage(testMessage);
            }
        });*/
        jmsTemplate.convertAndSend(destinationName, testMessage);
    }

    public void sendTopicMessage(String destinationName, String message){
        System.out.println("生產方發送文本消息:"+message);
        jmsTemplate.setPubSubDomain(true);
        jmsTemplate.convertAndSend(destinationName, message);
    }

}

上面代碼中使用了jmsTemplate.send()和jmsTemplate.convertAndSend()方法。JMS在發送消息的時候,傳送的對象是Message對象,MessageCreator提付了自定義消息與Message的轉換。而convertAndSend()會內置消息轉換器,因此不用傳入MessageCreator做爲參數。默認的消息轉換器是SimpleMessageConverter,能夠本身實現或者使用spring提供的其餘轉換器來配置消息的轉換。tcp

Consumer代碼

package com.study.consumer;

import com.study.entity.TestMessage;

public interface ConsumerService {
    /**
     * 消費方 接受字符串
     * @param message
     */
    public void receiveMessage(String message);
    /**
     * 消費方 接受對象
     * @param message
     */
    public void receiveObject(TestMessage testMessage);

    /**
     * 消費方 topic 接受字符串
     * @param message
     */
    public void receiveTopicMessage(String message);

    /**
     * 消費方 topic 接受字符串
     * @param message
     */
    public void receiveTopicMessage2(String message);

}
package com.study.consumer.impl;

import org.springframework.stereotype.Component;

import com.study.consumer.ConsumerService;
import com.study.entity.TestMessage;
@Component("consumerService")
public class ConsumerServiceImpl implements ConsumerService{


    public void receiveMessage(String message) {
        System.out.println("消費方接收消息1:"+message);
    }
    public void receiveMessage2(String message) {
        System.out.println("消費方接收消息2:"+message);
    }

    public void receiveObject(TestMessage testMessage) {
        System.out.println("消費方接收對象:"+ testMessage);
    }


    public void receiveTopicMessage(String message) {
        System.out.println("topic消費方接收消息1:"+message);
    }
    public void receiveTopicMessage2(String message) {
        System.out.println("topic消費方接收消息2:"+message);

    }

}

將接受的類型放入接收者方法參數裏面就可直接獲取到消息,而且完成轉換。ide

測試類

package com.test;

import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@ContextConfiguration(locations = { "classpath:activemq.xml" })  
@RunWith(SpringJUnit4ClassRunner.class)  
public class SpringTestCase extends AbstractJUnit4SpringContextTests {

}
package com.test;

import javax.annotation.Resource;

import org.junit.Test;

import com.study.entity.TestMessage;
import com.study.producer.ProducerService;

public class ActivemqTest extends SpringTestCase{

    @Resource
    private ProducerService producerService;

    @Test
    public void simpleSend(){
        producerService.sendMessage("study.queue.simple", "簡單文本消息測試");
    }

    @Test
    public void ObjectSend(){
        TestMessage testMessage = new TestMessage();
        testMessage.setId(1);
        testMessage.setMsg("對象消息。。。");
        producerService.sendMessage("study.queue.object", testMessage);
    }

    @Test
    public void simpleTopicSend(){
        producerService.sendTopicMessage("study.topic.simple", "topic簡單文本消息測試");
    }
}

測試simpleSend()方法。發送一條文本消息,發送的目的是study.queue.simple。從咱們的avtivemq.xml配置文件中能夠看到

<jms:listener destination="study.queue.simple" ref="consumerService" method="receiveMessage"/>
<jms:listener destination="study.queue.simple" ref="consumerService" method="receiveMessage2"/>

study.queue.simple有兩個接受者,而jmsTemplate發送默認是點對點模型,因此是receiveMessage與receiveMessage2隨機獲取到消息。屢次執行simpleSend()查看結果:
image
image
好吧,我試了好幾回,剛開始一直是2

測試ObjectSend() 方法發送了咱們的自定義對象。發送目的是study.queue.object執行結果如圖:
image
能夠看到接收方直接獲取到對象。

測試simpleTopicSend(),發送目的是study.topic.simple,配置文件中接收方配置

<jms:listener-container connection-factory="connectionFactory" destination-type="topic">
    <jms:listener destination="study.topic.simple" ref="consumerService" method="receiveTopicMessage"/>
    <jms:listener destination="study.topic.simple" ref="consumerService" method="receiveTopicMessage2"/>
</jms:listener-container>

並且在發送的時候聲明瞭發佈訂閱消息模型

jmsTemplate.setPubSubDomain(true);
jmsTemplate.convertAndSend(destinationName, message);

接收者有兩個,而且是發佈訂閱模型,因此這兩個接收者都應該接收到消息,執行結果如圖:
image

相關文章
相關標籤/搜索