springboot集成activeMq

一,activeMq安裝

1,下載activeMq,放置到電腦某個目錄;

2,直接運行 /bin/activemq.bat 腳本,便可啓動activeMq;

3,訪問網址 localhost:8161,便可查看activeMq網頁界面;

4,activeMq目錄中有一些例子,能夠參考;

二,springboot集成activeMQ

1,添加依賴

<!-- mq begin -->
        <!-- Spring Boot starter for ActiveMQ/JMS support. No version is given here;
             presumably it is inherited from the Spring Boot parent/BOM (confirm). -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

        <!-- ActiveMQ connection pool, used by the spring.activemq.pool.* settings.
             The explicit version below is intentionally left commented out. -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <!-- <version>5.7.0</version> -->
        </dependency>

        <!-- mq end -->

2,添加activeMQ配置信息

# ActiveMQ broker address. Note that the client (TCP) port is 61616.
# Comments must be on their own line: in a .properties file everything after
# '=' up to the end of the line, including a trailing "# ...", is the value.
spring.activemq.broker-url=tcp://localhost:61616
#spring.activemq.broker-url=failover:(tcp://localhost:61616,tcp://localhost:61617)
spring.activemq.close-timeout=5000
spring.activemq.in-memory=false
#spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=100
spring.activemq.send-timeout=3000
# Needed when topic messages go through the default (global) JMS domain setting
#spring.jms.pub-sub-domain=true
# Trust all packages when deserializing ObjectMessage payloads.
# SECURITY: this disables the deserialization allow-list; for anything beyond a
# local demo prefer listing packages explicitly, e.g.
#spring.activemq.packages.trusted=org.spring.web.entity
spring.activemq.packages.trust-all=true
# Queue and topic names. The key "queque.police" is spelled this way on purpose:
# it must match the ${queque.police} placeholders used in the Java code.
queque.police=queque.police
queue.edu=queue.edu
topic.pbc=topic.pbc
topic.veh=topic.veh

3,配置類

package org.spring.web.component;

import javax.jms.Queue;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.core.JmsMessagingTemplate;

/**
 * MQ configuration class.
 *
 * <p>Rather than relying on the single global {@code spring.jms.pub-sub-domain}
 * property, this class registers one listener container factory per JMS domain
 * so that queue and topic messaging can be used side by side. Listeners pick a
 * factory through the {@code containerFactory} attribute of {@code @JmsListener}.
 */
@Configuration
public class MqConfig {

    /*
     * Disabled: explicit connection factory definition.
     *
     * @Bean
     * public ActiveMQConnectionFactory connectionFactory() {
     *     return new ActiveMQConnectionFactory("admin", "admin", "tcp://localhost:61616");
     * }
     */

    /**
     * Listener container factory for the publish/subscribe (topic) domain.
     *
     * @param connectionFactory the ActiveMQ connection factory injected by Spring
     * @return a container factory with pub-sub mode switched on
     */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory topicFactory = newListenerFactory(connectionFactory, true);
        // Diagnostic output showing which connection factory was injected.
        System.out.println("注入的connectionFactory>>>>>" + connectionFactory.getUserName());
        return topicFactory;
    }

    /**
     * Listener container factory for the point-to-point (queue) domain.
     *
     * @param connectionFactory the ActiveMQ connection factory injected by Spring
     * @return a container factory with pub-sub mode switched off
     */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory) {
        return newListenerFactory(connectionFactory, false);
    }

    /** Builds a listener container factory bound to the given connection factory and domain. */
    private DefaultJmsListenerContainerFactory newListenerFactory(
            ActiveMQConnectionFactory connectionFactory, boolean pubSubDomain) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setPubSubDomain(pubSubDomain);
        return factory;
    }

    /*
     * Disabled: explicit messaging template definition.
     *
     * @Bean
     * public JmsMessagingTemplate jmsMessagingTemplate(ActiveMQConnectionFactory connectionFactory) {
     *     return new JmsMessagingTemplate(connectionFactory);
     * }
     */
}

4,注入mqBeans

package org.spring.web.mq;

import javax.jms.Queue;
import javax.jms.Topic;

import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;

/**
 * Exposes the JMS destinations used by the application as Spring beans and
 * enables {@code @JmsListener} processing via {@code @EnableJms}.
 *
 * <p>Destination names are read from the configuration keys
 * {@code queque.police}, {@code queue.edu}, {@code topic.pbc} and
 * {@code topic.veh}.
 *
 * <p>Project: spring-web. Created by john on 2018-07-30 07:30:18;
 * last modified by john on 2018-07-30 07:30:18.
 */
@Configuration
@EnableJms
public class MqBans {

    /** Physical name of the police queue. */
    @Value("${queque.police}")
    private String policeQueueName;

    /** Physical name of the edu queue. */
    @Value("${queue.edu}")
    private String eduQueueName;

    /** Physical name of the pbc topic. */
    @Value("${topic.pbc}")
    private String pbcTopicName;

    /** Physical name of the veh topic. */
    @Value("${topic.veh}")
    private String vehTopicName;

    /** @return the police queue destination */
    @Bean
    public Queue policeQueue() {
        return new ActiveMQQueue(policeQueueName);
    }

    /** @return the edu queue destination */
    @Bean
    public Queue eduQueue() {
        return new ActiveMQQueue(eduQueueName);
    }

    /** @return the pbc topic destination */
    @Bean
    public Topic pbcTopic() {
        return new ActiveMQTopic(pbcTopicName);
    }

    /** @return the veh topic destination */
    @Bean
    public Topic vehTopic() {
        return new ActiveMQTopic(vehTopicName);
    }

}

5,生產者

package org.spring.web.mq;
import javax.jms.Queue;
import javax.jms.Topic;

import org.spring.web.entity.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;

/**
 * Demo message producer.
 *
 * <p>Runs once at application startup (as a {@link CommandLineRunner}), sending
 * sample text messages to the police queue and one text message to the pbc topic.
 */
@Component
public class Producer implements CommandLineRunner {

    /** Number of sample messages sent to the police queue at startup. */
    private static final int QUEUE_MESSAGE_COUNT = 1;

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    @Autowired
    private Queue policeQueue;

    @Autowired
    private Topic pbcTopic;

    // Not referenced by any method in this class; kept so the bean's
    // dependencies stay exactly as before.
    @Autowired
    private Topic vehTopic;

    /**
     * Sends the startup sample messages.
     *
     * @param args command-line arguments (ignored)
     */
    @Override
    public void run(String... args) throws Exception {
        for (int i = 0; i < QUEUE_MESSAGE_COUNT; i++) {
            send("Sample message" + i);
        }
        sendTopic("topic send");
    }

    /**
     * Sends a text message to the police queue.
     *
     * @param msg the message payload
     */
    public void send(String msg) {
        this.jmsMessagingTemplate.convertAndSend(this.policeQueue, msg);
        System.out.println("Message was sent to the policeQueue");
    }

    /**
     * Publishes a text message to the pbc topic.
     *
     * @param msg the message payload
     */
    public void sendTopic(String msg) {
        // Fixed wording: this is the producer sending, not the consumer.
        System.out.println("生產者發送topic消息");
        this.jmsMessagingTemplate.convertAndSend(this.pbcTopic, msg);
        System.out.println("Message was sent to the pbcTopic");
    }

}

6,消費者

package org.spring.web.mq;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;

import org.spring.web.entity.User;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

/**
 * Demo message consumers.
 *
 * <p>Queue listeners use the {@code jmsListenerContainerQueue} container factory
 * and topic listeners use {@code jmsListenerContainerTopic}, so both kinds of
 * destination are consumed within the same application.
 */
@Component
public class Consumer {

    /**
     * First listener on the police queue; its return value is forwarded to
     * {@code out.queue} through {@code @SendTo}.
     *
     * @param text the received message body
     * @return the reply message sent on to {@code out.queue}
     */
    @JmsListener(destination = "${queque.police}", containerFactory = "jmsListenerContainerQueue")
    @SendTo("out.queue")
    public String receiveQueue(String text) {
        System.out.println("police消費者1接受到的消息");
        System.out.println(text);
        return "sample.queue接受的消息>>>>>" + text;
    }

    /**
     * Second listener on the same police queue.
     *
     * @param text the received message body
     */
    @JmsListener(destination = "${queque.police}", containerFactory = "jmsListenerContainerQueue")
    public void receiveQueue2(String text) {
        System.out.println("police消費者2接受到的消息");
        System.out.println(text);
    }

    /**
     * Listener on {@code out.queue}, the destination named in the
     * {@code @SendTo} of {@link #receiveQueue(String)}.
     *
     * @param text the received message body
     */
    @JmsListener(destination = "out.queue", containerFactory = "jmsListenerContainerQueue")
    public void receiveOutQueue(String text) {
        System.out.println("out.Queue接受到的信息" + text);
    }

    /**
     * First subscriber on the pbc topic.
     *
     * @param text the received message body
     */
    @JmsListener(destination = "${topic.pbc}", containerFactory = "jmsListenerContainerTopic")
    public void receiveTopic(String text) {
        System.out.println("pbc1消費者接受的信息");
        System.out.println(text);
    }

    /**
     * Second subscriber on the pbc topic.
     *
     * @param text the received message body
     */
    @JmsListener(destination = "${topic.pbc}", containerFactory = "jmsListenerContainerTopic")
    public void receiveTopic2(String text) {
        System.out.println("pbc2消費者接受的信息" + text);
    }

    /**
     * Subscriber on the veh topic that expects a {@code User} carried in an
     * {@link ObjectMessage}.
     *
     * <p>If the message is not an {@code ObjectMessage}, or its payload cannot be
     * read or is not a {@code User}, a default-constructed {@code User} is printed.
     *
     * @param m the raw JMS message
     */
    @JmsListener(destination = "${topic.veh}", containerFactory = "jmsListenerContainerTopic")
    public void receiveTopicUser(Message m) {
        System.out.println("接受到的對象信息message>>>>" + m);
        User user = new User();
        if (m instanceof ObjectMessage) {
            ObjectMessage objectMessage = (ObjectMessage) m;
            System.out.println("objectMessage>>>>>>>" + objectMessage);
            try {
                Object payload = objectMessage.getObject();
                if (payload == null || payload instanceof User) {
                    user = (User) payload;
                } else {
                    // Guard the cast: any other payload type would otherwise
                    // fail with a ClassCastException inside the listener.
                    System.out.println("Unexpected ObjectMessage payload type: "
                            + payload.getClass().getName());
                }
            } catch (JMSException e) {
                // Best effort: report the failure and fall through with the default User.
                e.printStackTrace();
            }
        }

        System.out.println("消費者接受的用戶對象信息" + user);
    }

}

 

 

總結:

 1,activeMq的端口號是61616;

 2,若使用默認的監聽容器工廠接收topic消息,須要配置 spring.jms.pub-sub-domain=true;本文經過自定義containerFactory設置pubSubDomain,所以該配置能夠保持註釋狀態;

 3,queue若是沒有消費者,會將信息存儲到queue中;

 4,主方法啓動的時候,消費者的監聽已經生效;

 5,發送的消息爲對象的時候,須要將對象序列化;消費者接受對象信息的時候須要使用ObjectMessage進行轉化;

 6,使用JmsListener註解中的containerFactory屬性,能夠爲每一個監聽器單獨指定pub-sub-domain(pubSubDomain)設置,實現同時接收queue和topic;

 7,queue爲點對點模式;topic爲發佈訂閱模式;

相關文章
相關標籤/搜索