Spring Boot 集成 RabbitMQ

import lombok.extern.java.Log;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * @author 周志偉
 * @projectname 項目名稱: ${project_name}
 * @classname: RabbitConfig
 * @description:
 * @date 2018/6/22:10:00
 */
@Configuration
@Log
public class RabbitConfig {
    @Qualifier("firstConnectionFactory")
    @Autowired
    public ConnectionFactory connectionFactory;

    @Autowired
    public MQProperties mqProperties;


    @Bean(name="firstRabbitTemplate")
    public RabbitTemplate firstRabbitTemplate(){
        log.info("########################初始化rabitTemplate################################");
        RabbitTemplate firstRabbitTemplate = new RabbitTemplate(connectionFactory);
        return firstRabbitTemplate;
    }


    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }


    @Bean
    public TopicExchange defaultExchange() {
        return new TopicExchange(mqProperties.getCar_exchange());
    }
package com.car.config;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that builds the RabbitMQ connection factory bean named
 * {@code firstConnectionFactory} from {@code spring.rabbitmq.car_*} properties.
 *
 * @author 周志偉
 * @projectname project name: ${project_name}
 * @classname: ConnectionFactoryConfig
 * @description: initializes the connection source
 * @date 2018/6/22:11:30
 */
@Configuration
public class ConnectionFactoryConfig {

    /**
     * Builds a {@link CachingConnectionFactory} from the injected property values.
     *
     * @param host        value of {@code spring.rabbitmq.car_host}
     * @param port        value of {@code spring.rabbitmq.car_port}
     * @param username    value of {@code spring.rabbitmq.car_username}
     * @param password    value of {@code spring.rabbitmq.car_password}
     * @param virtualhost value of {@code spring.rabbitmq.car_vhost}
     * @return the configured connection factory
     */
    @Bean(name="firstConnectionFactory")
    public ConnectionFactory firstConnectionFactory(
            @Value("${spring.rabbitmq.car_host}") String host,
            @Value("${spring.rabbitmq.car_port}") int port,
            @Value("${spring.rabbitmq.car_username}") String username,
            @Value("${spring.rabbitmq.car_password}") String password,
            @Value("${spring.rabbitmq.car_vhost}") String virtualhost) {
        final CachingConnectionFactory cachingFactory = new CachingConnectionFactory();

        // Broker endpoint.
        cachingFactory.setHost(host);
        cachingFactory.setPort(port);
        cachingFactory.setVirtualHost(virtualhost);

        // Credentials.
        cachingFactory.setUsername(username);
        cachingFactory.setPassword(password);

        return cachingFactory;
    }
}
package com.car.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

/**
 * Spring configuration that declares the car queue and binds it to the topic exchange
 * inherited from {@link RabbitConfig}.
 *
 * <p>{@code @Configuration} is already a component stereotype, so no additional
 * {@code @Component} annotation is needed on this class.
 *
 * <p>NOTE(review): this class extends {@link RabbitConfig}, which is itself annotated with
 * {@code @Configuration}; the inherited {@code @Bean} methods may therefore be registered
 * a second time through this subclass. Confirm this is intended.
 *
 * @author 周志偉
 * @projectname project name: ${project_name}
 * @classname: RabbitMQ_Queue
 * @description: creates the queue
 * @date 2018/6/22:13:09
 */
@Configuration
public class RabbitMQQueue extends RabbitConfig{
    private static final Logger logger = LoggerFactory.getLogger(RabbitMQQueue.class);

    /** Logs a marker line when the configuration object is instantiated. */
    public RabbitMQQueue(){
        logger.info("############### RabbitMQ Queue 建立 ###############");
    }



    /**
     * Declares the queue named by {@code mqProperties.getCar_queuename()}.
     *
     * @return the queue definition
     */
    @Bean
    public Queue CreationCarQueue() {
        logger.info("建立隊列:{}", mqProperties.getCar_queuename());
        return new Queue(mqProperties.getCar_queuename());
    }



    /**
     * Binds the car queue to the default exchange using the routing key from
     * {@code mqProperties.getCar_routingkey()}.
     *
     * @return the binding definition
     */
    @Bean
    public Binding binding() {
        logger.info("綁定隊列,exchange:{},routingKey:{},queueName:{}",
                mqProperties.getCar_exchange(),
                mqProperties.getCar_routingkey(),
                mqProperties.getCar_queuename());
        return BindingBuilder
                .bind(CreationCarQueue())
                .to(defaultExchange())
                .with(mqProperties.getCar_routingkey());
    }
}
package com.car.modules.MqMsg.mq;


import com.carloan.feign.info.InserttheorderServiceFeign;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;


/**
 * RabbitMQ listener component for the queue named by the
 * {@code spring.rabbitmq.car_queuename} property.
 *
 * @author 周志偉
 * @projectname project name: ${project_name}
 * @classname: ReqCarMqMsg
 * @description: listener configuration
 * @date 2018/5/28:10:50
 */

@Component
public class ReqCarMqMsg{

    private static final Logger logger = LoggerFactory.getLogger(ReqCarMqMsg.class);

    /**
     * Decodes the received payload as UTF-8 text and logs it.
     *
     * <p>Any exception raised while handling the message is logged and not rethrown.
     *
     * @param bytes raw message body
     * @throws UnsupportedEncodingException never thrown by this implementation; the clause
     *         is retained only so the method signature stays unchanged
     */
    @RabbitListener(queues = "${spring.rabbitmq.car_queuename}", containerFactory = "rabbitListenerContainerFactory")
    public void process(@Payload byte[] bytes) throws UnsupportedEncodingException {
        try {
            // A Charset constant avoids the by-name lookup and its checked exception.
            String result = new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
            logger.info("監聽到消息==========={}", result);
        } catch (Exception e){
            // The throwable is passed as the trailing argument (no placeholder) so the
            // logger records its stack trace.
            logger.error("監聽車貸門店Mq: insuranceTrial Exception error", e);
        }
    }
}
package com.car.modules.MqMsg.mq;

import com.car.config.MQProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

import java.util.UUID;

/**
 * Component that publishes messages through the {@code firstRabbitTemplate} bean.
 *
 * @author 周志偉
 * @projectname project name: ${project_name}
 * @classname: PushMq
 * @description: pushes messages
 * @date 2018/6/25:9:52
 */
@Component
public class PushMq {
    private static final Logger logger = LoggerFactory.getLogger(PushMq.class);

    /** Template injected by qualifier {@code firstRabbitTemplate}. */
    @Qualifier("firstRabbitTemplate")
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /** MQ settings holder; supplies the exchange name and the outgoing routing key. */
    @Autowired
    public MQProperties mqProperties;

    /**
     * Sends {@code message} to the exchange from {@code mqProperties.getCar_exchange()} with
     * the routing key from {@code mqProperties.getCar_put_routingkey()}, attaching a
     * freshly generated UUID as correlation data.
     *
     * <p>Failures are logged and not rethrown, so callers are not informed when a send fails.
     *
     * @param message the payload to publish
     */
    public void pushmessage(String message){
        try{
            CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
            rabbitTemplate.convertAndSend(
                    mqProperties.getCar_exchange(),
                    mqProperties.getCar_put_routingkey(),
                    message,
                    correlationId);
            logger.info("推送消息:{}", message);
        }catch (Exception e){
            // The throwable is passed as the trailing argument (no placeholder) so the
            // logger records its stack trace.
            logger.error("推送異常", e);
        }

    }
}
相關文章
相關標籤/搜索