ActiveMQ結合Spring收發消息

直接使用 ActiveMQ 的方式須要重複寫不少代碼,且不利於管理,Spring 提供了一種更加簡便的方式————Spring JMS ,經過它能夠更加方便地使用 ActiveMQ。spring

Maven 依賴sql

結合Spring使用ActiveMQ的依賴以下:apache

<!-- Spring JMS -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>${spring.version}</version>
</dependency>
<!-- xbean 如<amq:connectionFactory /> -->
<dependency>
    <groupId>org.apache.xbean</groupId>
    <artifactId>xbean-spring</artifactId>
    <version>3.16</version>
</dependency>
<!-- ActiiveMQ -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-core</artifactId>
    <version>5.7.0</version>
</dependency>
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
    <version>5.7.0</version>
</dependency>

ActiveMQ.xml 文件服務器

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.12.1.xsd">
    <!-- ActiveMQ 鏈接工廠 -->
    <amq:connectionFactory id="amqConnectionFactory"
                           brokerURL="tcp://localhost:61616"
                           userName="admin"
                           password="admin" />
    <!-- 提升效率,配置JMS鏈接工廠 -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <constructor-arg ref="amqConnectionFactory" />
        <property name="sessionCacheSize" value="100" />
    </bean>
    <!-- 定義消息隊列(Queue)-->
   <!-- <bean id="QueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <!– 設置消息隊列的名字 –>
        <constructor-arg value="Queue-zy"/>
    </bean>-->
    <!--定義主題(Topic)-->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="Topic-zy"/>
    </bean>
    <!-- 配置JMS模板(Queue),Spring提供的JMS工具類,利用它發送、接收消息。 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="defaultDestination" ref="topicDestination" />
        <property name="receiveTimeout" value="10000" />
        <!-- true是topic,false是queue,默認是false -->
        <property name="pubSubDomain" value="true" />
    </bean>
    <!-- 配置消息隊列監聽者(Queue or Topic) -->
    <bean id="messageListener" class="com.service.TopicMessageListener" />
    <!-- 顯示注入消息監聽容器,配置鏈接工廠,監聽的目標是QueueDestination,監聽器是上面定義的監聽器 -->
    <bean id="ListenerContainer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="topicDestination" />
        <property name="messageListener" ref="messageListener" />
    </bean>
</beans>

配置 connectionFactorysession

connectionFactory 是 Spring 用於建立到 JMS 服務器連接的,Spring 提供了多種 connectionFactory。架構

<!-- ActiveMQ 鏈接工廠 -->
<amq:connectionFactory id="amqConnectionFactory"
                       brokerURL="tcp://localhost:61616"
                       userName="admin"
                       password="admin" />
<!-- 提升效率,配置JMS鏈接工廠 -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <constructor-arg ref="amqConnectionFactory" />
    <property name="sessionCacheSize" value="100" />
</bean>

配置Queue併發

<bean id="QueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
       <!-- 設置消息隊列的名字 -->
       <constructor-arg value="Queue-zy"/>
</bean>

配置Topicapp

<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="Topic-zy"/>
</bean>

配置JMS消息模板——jmsTemplate異步

<!-- 配置JMS模板,Spring提供的JMS工具類,利用它發送、接收消息-->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="defaultDestination" ref="QueueDestination" />
    <!--<property name="defaultDestination" ref="topicDestination" />-->
    <property name="receiveTimeout" value="10000" />
    <property name="pubSubDomain" value="false" /><!-- true是topic,false是queue,默認是false -->
</bean>

最後,在 applicationContext.xml 中引入配置好的 ActiveMQ.xmltcp

<import resource="ActiveMQ.xml" />

以上就是配置文件相關的,下面是具體的業務代碼。

消息生產者服務

@Service
public class ProducerService {
    @Autowired
    private JmsTemplate jmsTemplate;
    //使用默認目的地
    public void sendMessageDefault(final String msg){
        Destination destination = jmsTemplate.getDefaultDestination();
        System.out.println("向隊列: " + destination + " 成功發送一條消息");
        jmsTemplate.send(new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(msg);
            }
        });
    }
    //可指定目的地
    public void sendMessage(Destination destination,final String msg){
        jmsTemplate.send(destination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(msg);
            }
        });
    }
}

消息消費者服務

@Service
public class ConsumerService {
    @Autowired
    private JmsTemplate jmsTemplate;
    //從指定的Destination接收消息
    public TextMessage recive(Destination destination){
        TextMessage message = (TextMessage) jmsTemplate.receive(destination);
        try {
            System.out.println("從隊列" + destination.toString() + "收到了消息" + message.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
        return message;
    }
    //從默認的Destination接收消息
    public void reciveDefault(){
        Destination destination = jmsTemplate.getDefaultDestination();
        jmsTemplate.setReceiveTimeout(5000);
        while(true){
            TextMessage message = (TextMessage) jmsTemplate.receive(destination);
            try {
                //這裏仍是同一個消費者
                System.out.println("消費者  從目的地 " + destination.toString() + " 收到了消息" + message.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}

生產者

直接在 main 方法中獲取 ApplicationContext 運行,便於測試。

@Component
public class MsgProducer {
    @Autowired
    private ProducerService producerService;
    public void send(){
        System.out.println("生產者開始發送消息:");
        for(int i = 1; i < 11; i++){
            String msg = "生產者發出的消息";
            producerService.sendMessageDefault(msg + "-----" + i);
        }
    }
    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("classpath:/applicationContext.xml");
        MsgProducer msgProducer = context.getBean(MsgProducer.class);
        msgProducer.send();
    }
}

消費者

@Component
public class MsgConsumer {
    @Autowired
    private ConsumerService consumerService;
    public void recive(){
        System.out.println("消費者 1 開始接收消息:");
        consumerService.reciveDefault();
    }
    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("classpath:/applicationContext.xml");
        MsgConsumer msgConsumer = context.getBean(MsgConsumer.class);
        msgConsumer.recive();
    }
}

接下來就能夠啓動項目。一樣是使用兩種方式測試。

第一種方式————點對點(Queue)

同步的方式

先啓動生產者發送10條消息, 再啓動消費者,能夠看到控制檯顯示成功收到10條消息。

 

 

 

 

 

 

 

 

 

 

異步監聽的方式

經過監聽器便可實現異步接收消息的效果,而不是像上面使用 while() 輪詢同步的方式。

項目中通常都是使用異步監聽的方式,在 A 服務中發送了一條消息,B 服務能夠利用消息監聽器監聽,當收到消息後,進行相應的操做。

消息監聽器(3種)

經過繼承 JMS 中的 MessageListener 接口,實現 onMessage() 方法,就能夠自定義監聽器。這是最基本的監聽器。(可根據業務實現自定義的功能)

另外spring也給咱們提供了其餘類型的消息監聽器,好比 SessionAwareMessageListener,它的做用不只能夠接收消息,還能夠發送一條消息通知對方表示本身收到了消息。(還有一種是 MessageListenerAdapter)

一個簡單的自定義監聽器以下:收到消息後打印消息

public class QueueMessageListener implements MessageListener {
    public void onMessage(Message message) {
        //若是有消息
        TextMessage tmessage = (TextMessage) message;
        try {
            if(tmessage != null){
                System.out.println("監聽器監聽消息:"+tmessage.getText());
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

在 ActiveMQ.xml 中引入消息監聽器:

<!-- 配置消息隊列監聽者(Queue) -->
    <bean id="queueMessageListener" class="com.service.QueueMessageListener" />
 <!-- 顯示注入消息監聽容器,配置鏈接工廠,監聽的目標是QueueDestination 或 topicDestination,監聽器是上面自定義的監聽器 -->
    <bean id="queueListenerContainer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="QueueDestination" />
        <!--<property name="destination" ref="topicDestination" />-->
        <property name="messageListener" ref="queueMessageListener" />
    </bean>

能夠看到,當使用消息監聽器以後,每發送一條消息立馬就會被監聽到:

 

 

 

 

 

第二種方式————發佈/訂閱(Topic)

同步的方式

相似點對點中同步的方式,只是每一個消費者都能收到生產者發出的所有消息,再也不贅述。

異步監聽的方式

啓動兩個監聽器(兩個消費者),對消息進行異步監聽。看是否各自能收到生產者發送的消息。

<!-- 配置兩個監聽器 -->
<bean id="messageListener" class="com.service.TopicMessageListener" />
<bean id="messageListener2" class="com.service.TopicMessageListener2" />

 

 

 

 

 

能夠看到,每一個監聽器各自都收到了生產者發送的10條消息。

歡迎工做一到五年的Java工程師朋友們加入Java架構開發: 855835163 羣內提供免費的Java架構學習資料(裏面有高可用、高併發、高性能及分佈式、Jvm性能調優、Spring源碼,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多個知識點的架構資料)合理利用本身每一分每一秒的時間來學習提高本身,不要再用"沒有時間「來掩飾本身思想上的懶惰!趁年輕,使勁拼,給將來的本身一個交代!

相關文章
相關標籤/搜索