RabbitMQ基於Spring AMQP的Java Config 簡單配置實例

      本實例針對 Direct exchange、Fanout exchange、Topic exchange 三種路由形式,嘗試對消費者和生產者進行集中實現,若有不對之處煩請讀者指出。

一、項目基於Maven,在pom.xml中引入相關依賴。

<dependency>
   <groupId>com.rabbitmq</groupId>
   <artifactId>amqp-client</artifactId>
   <version>${rabbitmq.version}</version>
</dependency>
<dependency>
   <groupId>org.springframework.amqp</groupId>
   <artifactId>spring-rabbit</artifactId>
   <version>${spring-rabbit.version}</version>
</dependency>

二、RabbitMQ的Java Config 部分

package com.my.rabbitmq.appconfig;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.springframework.amqp.core.AbstractExchange;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Binding.DestinationType;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.AbstractMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.my.rabbitmq.util.FastJsonMessageConverter;
import com.my.rabbitmq.consumer.RabbitMQConsumer;
import com.my.rabbitmq.producer.RabbitMQProducer;

/**
 * Spring Java configuration for RabbitMQ.
 *
 * <p>All settings are read once, at class-initialisation time, from the classpath
 * resource {@code rabbitmq.properties}. The class declares separate producer and
 * consumer beans (connection factory, template, exchange, queue, binding) plus the
 * listener container that feeds {@link RabbitMQConsumer}.
 *
 * <p>The exchange type is chosen by the {@code rabbitmq.exchangeType} property: a value
 * containing {@code "F"} yields a fanout exchange, one containing {@code "T"} a topic
 * exchange, and anything else (including a missing value) a direct exchange.
 *
 * @author yuanzi
 * @time 2017-08-23 14:46:01
 */
@Configuration
public class RabbitmqConfig {

	private static final Logger log = LogManager.getLogger(RabbitmqConfig.class);

	/** Name of the classpath resource holding the RabbitMQ settings. */
	private static final String CONFIG_RESOURCE = "rabbitmq.properties";

	/** Settings loaded from {@link #CONFIG_RESOURCE}; populated once and only read afterwards. */
	private static final Properties rabbitmqProperties = loadProperties();

	/**
	 * Loads the RabbitMQ settings from the classpath.
	 *
	 * <p>Fails fast instead of continuing with an empty configuration: a missing or
	 * unreadable file would otherwise only surface later as an unrelated-looking
	 * error while the beans are being built.
	 *
	 * @return the loaded properties
	 * @throws IllegalStateException if the resource is missing or cannot be read
	 */
	private static Properties loadProperties() {
		Properties properties = new Properties();
		// try-with-resources guarantees the stream is closed after loading.
		try (InputStream in = RabbitmqConfig.class.getClassLoader().getResourceAsStream(CONFIG_RESOURCE)) {
			if (in == null) {
				throw new IllegalStateException("Classpath resource not found: " + CONFIG_RESOURCE);
			}
			properties.load(in);
		} catch (IOException e) {
			throw new IllegalStateException("Failed to read classpath resource: " + CONFIG_RESOURCE, e);
		}
		return properties;
	}

	/**
	 * Builds a connection factory from the host, port, username and password settings.
	 * Shared by the producer and consumer factory beans, which are configured identically.
	 *
	 * @return a new, fully configured {@link CachingConnectionFactory}
	 * @throws IllegalStateException if {@code rabbitmq.port} is not set
	 * @throws NumberFormatException if {@code rabbitmq.port} is not an integer
	 */
	private static CachingConnectionFactory newConnectionFactory() {
		String port = rabbitmqProperties.getProperty("rabbitmq.port");
		if (port == null) {
			throw new IllegalStateException("Missing required property: rabbitmq.port");
		}
		CachingConnectionFactory connectionFactory = new CachingConnectionFactory(
				rabbitmqProperties.getProperty("rabbitmq.host"));
		// Properties keeps trailing whitespace in values, which Integer.parseInt rejects.
		connectionFactory.setPort(Integer.parseInt(port.trim()));
		connectionFactory.setUsername(rabbitmqProperties.getProperty("rabbitmq.username"));
		connectionFactory.setPassword(rabbitmqProperties.getProperty("rabbitmq.password"));
		return connectionFactory;
	}

	/**
	 * Creates a durable, non-auto-delete exchange whose type is selected by the
	 * {@code rabbitmq.exchangeType} property, and logs which type was chosen.
	 *
	 * @param beanLabel    prefix for the log line (the bean being built)
	 * @param roleLabel    role text for the log line (producer or consumer)
	 * @param exchangeName name of the exchange to declare
	 * @return a fanout, topic or direct exchange
	 */
	private static AbstractExchange newExchange(String beanLabel, String roleLabel, String exchangeName) {
		String exchangeType = rabbitmqProperties.getProperty("rabbitmq.exchangeType");
		if (exchangeType == null) {
			// Unknown values already fall through to direct; treat a missing value the same way.
			log.warn("rabbitmq.exchangeType is not set; falling back to a direct exchange");
			exchangeType = "";
		}
		if (exchangeType.contains("F")) {
			log.info(beanLabel + "-Fanout " + roleLabel + ":廣播路由");
			return new FanoutExchange(exchangeName, true, false);
		}
		if (exchangeType.contains("T")) {
			log.info(beanLabel + "-Topic " + roleLabel + ":多路廣播路由");
			return new TopicExchange(exchangeName, true, false);
		}
		log.info(beanLabel + "-Direct " + roleLabel + ":直接路由");
		return new DirectExchange(exchangeName, true, false);
	}

	/**
	 * Connection factory used by the producer side.
	 *
	 * @return the producer connection factory
	 */
	@Bean(name = "ProducerConnectionFactory")
	public ConnectionFactory ProducerConnectionFactory() throws IOException {
		return newConnectionFactory();
	}

	/**
	 * Connection factory used by the consumer side.
	 *
	 * @return the consumer connection factory
	 */
	@Bean(name = "ConsumerConnectionFactory")
	public ConnectionFactory ConsumerConnectionFactory() throws IOException {
		return newConnectionFactory();
	}

	/**
	 * RabbitAdmin bound to the consumer connection factory.
	 *
	 * <p>Original author's note: when several connection factories are configured, a
	 * RabbitAdmin has to be declared explicitly, otherwise exchanges, queues, etc. are
	 * not registered automatically on the RabbitMQ server.
	 *
	 * @return the admin instance
	 */
	@Bean
	public RabbitAdmin rabbitAdmin() throws IOException {
		return new RabbitAdmin(ConsumerConnectionFactory());
	}

	/**
	 * Template used for publishing: producer connection factory, producer exchange as the
	 * default exchange, and the JSON message converter for outgoing payloads.
	 *
	 * @return the producer template
	 */
	@Bean(name = "ProducerRabbitTemplate")
	public RabbitTemplate ProducerRabbitTemplate() throws IOException {
		RabbitTemplate producerRabbitTemplate = new RabbitTemplate();
		producerRabbitTemplate.setConnectionFactory(ProducerConnectionFactory());
		producerRabbitTemplate.setExchange(rabbitmqProperties.getProperty("rabbitmq.producerExchange"));
		// The converter defines the wire format of messages sent through this template.
		producerRabbitTemplate.setMessageConverter(ProducerMessageConverter());
		return producerRabbitTemplate;
	}

	/**
	 * Template for the consumer side, using the consumer connection factory and the
	 * consumer exchange as its default exchange.
	 *
	 * @return the consumer template
	 */
	@Bean(name = "ConsumerRabbitTemplate")
	public RabbitTemplate ConsumerRabbitTemplate() throws IOException {
		RabbitTemplate consumerRabbitTemplate = new RabbitTemplate(ConsumerConnectionFactory());
		consumerRabbitTemplate.setExchange(rabbitmqProperties.getProperty("rabbitmq.consumerExchange"));
		return consumerRabbitTemplate;
	}

	/**
	 * Exchange the producer publishes to; its type follows {@code rabbitmq.exchangeType}.
	 *
	 * @return the producer exchange
	 */
	@Bean(name = "ProducerExchange")
	public AbstractExchange ProducerExchange() throws IOException {
		return newExchange("ProducerExchange", "生產者",
				rabbitmqProperties.getProperty("rabbitmq.producerExchange"));
	}

	/**
	 * Exchange the consumer queue is bound to; its type follows {@code rabbitmq.exchangeType}.
	 *
	 * @return the consumer exchange
	 */
	@Bean(name = "ConsumerExchange")
	public AbstractExchange ConsumerExchange() throws IOException {
		return newExchange("ConsumerExchange", "消費者",
				rabbitmqProperties.getProperty("rabbitmq.consumerExchange"));
	}

	/**
	 * Producer queue. Constructor arguments after the name are: durable (survives a
	 * broker restart), exclusive (single connection, removed when it closes) and
	 * auto-delete (removed when the last consumer unsubscribes).
	 *
	 * @return a durable, non-exclusive, non-auto-delete queue
	 */
	@Bean(name = "ProducerQueue")
	public Queue ProducerQueue() throws IOException {
		return new Queue(rabbitmqProperties.getProperty("rabbitmq.producerQueue"), true, false, false);
	}

	/**
	 * Consumer queue. Constructor arguments after the name are: durable (survives a
	 * broker restart), exclusive (single connection, removed when it closes) and
	 * auto-delete (removed when the last consumer unsubscribes).
	 *
	 * @return a durable, non-exclusive, non-auto-delete queue
	 */
	@Bean(name = "ConsumerQueue")
	public Queue ConsumerQueue() throws IOException {
		return new Queue(rabbitmqProperties.getProperty("rabbitmq.consumerQueue"), true, false, false);
	}

	/**
	 * Message converter used by the producer template.
	 *
	 * @return a new {@link FastJsonMessageConverter}
	 */
	@Bean(name = "jsonMessageConverter")
	public AbstractMessageConverter ProducerMessageConverter() {
		return new FastJsonMessageConverter();
	}

	/**
	 * Binds the producer queue to the producer exchange with the producer routing key.
	 *
	 * @return the producer binding (no extra binding arguments)
	 */
	@Bean(name = "ProducerBinding")
	public Binding ProducerBinding() {
		return new Binding(rabbitmqProperties.getProperty("rabbitmq.producerQueue"), DestinationType.QUEUE,
				rabbitmqProperties.getProperty("rabbitmq.producerExchange"),
				rabbitmqProperties.getProperty("rabbitmq.producerRoutingKey"), null);
	}

	/**
	 * Binds the consumer queue to the consumer exchange with the consumer routing key.
	 *
	 * @return the consumer binding (no extra binding arguments)
	 */
	@Bean(name = "ConsumerBinding")
	public Binding ConsumerBinding() {
		return new Binding(rabbitmqProperties.getProperty("rabbitmq.consumerQueue"), DestinationType.QUEUE,
				rabbitmqProperties.getProperty("rabbitmq.consumerExchange"),
				rabbitmqProperties.getProperty("rabbitmq.consumerRoutingKey"), null);
	}

	/**
	 * Producer bean, constructed with the routing key it uses when sending
	 * ({@code rabbitmq.producerSendRoutingKey}).
	 *
	 * @return the producer
	 */
	@Bean(name = "RabbitMQProducer")
	public RabbitMQProducer RabbitMQProducer() {
		return new RabbitMQProducer(rabbitmqProperties.getProperty("rabbitmq.producerSendRoutingKey"));
	}

	/**
	 * Consumer bean that receives messages from the listener container.
	 *
	 * @return the consumer
	 */
	@Bean(name = "RabbitMQConsumer")
	public RabbitMQConsumer RabbitMQConsumer() {
		return new RabbitMQConsumer();
	}

	/**
	 * Listener container: subscribes to the consumer queue over the consumer connection
	 * factory and hands each arriving message to {@link #RabbitMQConsumer()}.
	 *
	 * @return the configured listener container
	 */
	@Bean(name = "ListenerContainer")
	public AbstractMessageListenerContainer MessageListenerContainer() throws IOException {
		SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
		simpleMessageListenerContainer.setConnectionFactory(ConsumerConnectionFactory());
		simpleMessageListenerContainer.setQueueNames(rabbitmqProperties.getProperty("rabbitmq.consumerQueue"));
		simpleMessageListenerContainer.setMessageListener(RabbitMQConsumer());
		return simpleMessageListenerContainer;
	}

}

三、rabbitmq.properties 文件

#單一交換機配置參數
#rabbitmq server地址
rabbitmq.host = 127.0.0.1
#rabbitmq 端口
rabbitmq.port = 5672
#生產者隊列名稱
rabbitmq.producerQueue = d_queue_producer
#生產者RoutingKey
rabbitmq.producerRoutingKey = d_key_producer
#消費者隊列名稱
rabbitmq.consumerQueue = d_queue_consumer
#消費者RoutingKey
rabbitmq.consumerRoutingKey = d_key_consumer
#交換機類型 D爲單一傳播交換機類型,F爲廣播交換機類型,T爲多路廣播交換機類型
rabbitmq.exchangeType = D
#生產者須要綁定的交換機
rabbitmq.producerExchange = d_exchange
#消費者須要綁定的交換機
rabbitmq.consumerExchange = d_exchange
#rabbitmq用戶名稱
rabbitmq.username = guest
#rabbitmq用戶密碼
rabbitmq.password = guest
#rabbitmq生產者發送消息時的RoutingKey
rabbitmq.producerSendRoutingKey = d_key_consumer
---------------------------------------------分割線----------------------------------------------
#廣播交換機配置參數,廣播交換機下routingKey失效
#rabbitmq server地址
rabbitmq.host = 127.0.0.1
#rabbitmq 端口
rabbitmq.port = 5672
#生產者隊列名稱
rabbitmq.producerQueue = f_queue_producer
#生產者RoutingKey
rabbitmq.producerRoutingKey = f_key_producer
#消費者隊列名稱
rabbitmq.consumerQueue = f_queue_consumer
#消費者RoutingKey
rabbitmq.consumerRoutingKey = f_key_consumer
#交換機類型 D爲單一傳播交換機類型,F爲廣播交換機類型,T爲多路廣播交換機類型
rabbitmq.exchangeType = F
#生產者須要綁定的交換機
rabbitmq.producerExchange = f_exchange
#消費者須要綁定的交換機
rabbitmq.consumerExchange = f_exchange
#rabbitmq用戶名稱
rabbitmq.username = guest
#rabbitmq用戶密碼
rabbitmq.password = guest
#rabbitmq生產者發送消息時的RoutingKey
rabbitmq.producerSendRoutingKey = f_test

---------------------------------------------分割線----------------------------------------------
#多路廣播交換機配置參數
#配置routingKey時,「#」表示0個或若干個關鍵字,「*」表示一個關鍵字。例如「log.*」能與「log.warn」匹配,但無法
#與「log.warn.timeout」匹配;而「log.#」能與上述二者匹配
#rabbitmq server地址
rabbitmq.host = 127.0.0.1
#rabbitmq 端口
rabbitmq.port = 5672
#生產者隊列名稱
rabbitmq.producerQueue = t_queue_producer
#生產者RoutingKey
rabbitmq.producerRoutingKey = t_key_producer
#消費者隊列名稱
rabbitmq.consumerQueue = t_queue_consumer
#消費者RoutingKey
rabbitmq.consumerRoutingKey = #.test
#交換機類型 D爲單一傳播交換機類型,F爲廣播交換機類型,T爲多路廣播交換機類型
rabbitmq.exchangeType = T
#生產者須要綁定的交換機
rabbitmq.producerExchange = t_exchange2
#消費者須要綁定的交換機
rabbitmq.consumerExchange = t_exchange2
#rabbitmq用戶名稱
rabbitmq.username = guest
#rabbitmq用戶密碼
rabbitmq.password = guest
#rabbitmq生產者發送消息時的RoutingKey
rabbitmq.producerSendRoutingKey = t.test

四、生產者代碼

package com.my.rabbitmq.producer;

import javax.annotation.Resource;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

/**
 * Sends messages to RabbitMQ through the injected producer template.
 *
 * <p>The template is injected by name ({@code ProducerRabbitTemplate}), so instances are
 * expected to be managed by the container that performs {@code @Resource} injection.
 *
 * @author yuanzi
 * @time 2017-08-01 15:16:08
 */
public class RabbitMQProducer {

	private static Logger log = LogManager.getLogger(RabbitMQProducer.class);

	/** Routing key attached to every message sent by this producer. */
	private String routingKey;

	/** Template used for publishing; populated by resource injection. */
	@Resource(name = "ProducerRabbitTemplate")
	private RabbitTemplate rabbitTemplate;

	/**
	 * @param routingKey routing key to use for all outgoing messages
	 */
	public RabbitMQProducer(String routingKey) {
		this.routingKey = routingKey;
	}

	/**
	 * Converts {@code message} with the template's message converter, publishes it with
	 * this producer's routing key, and then logs the payload at info level.
	 *
	 * @param message payload to send
	 */
	public void sendDataToQueue(Object message) {
		final String key = this.routingKey;
		this.rabbitTemplate.convertAndSend(key, message);

		// The log line is built only after the send call has returned.
		final String logLine = "rabbitMQ producer send message:" + message;
		log.info(logLine);
	}
}

五、消費者代碼

package com.my.rabbitmq.consumer;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
/**
 * 
 * @description RabbitMQ消息接收
 *
 * @author yuanzi
 * @time 2017年8月23日 下午3:38:03
 */
public class RabbitMQConsumer implements MessageListener {

	private static Logger log = LogManager.getLogger(RabbitMQConsumer.class);

	public void onMessage(Message message) {
		String data = new String(message.getBody());
		log.info("rabbitMQ consumer receive message:" + data);
	}
}

六、消息轉換器的配置 

package com.my.rabbitmq.util;

import java.io.UnsupportedEncodingException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;

import net.sf.json.JSONObject;

/**
 * Message converter that wraps an outgoing payload in a JSON object of the form
 * {@code {"data": <payload>}} and encodes it as UTF-8.
 *
 * <p>Only the outbound direction is implemented; {@link #fromMessage(Message)} always
 * returns {@code null}.
 *
 * @author yuanzi
 * @time 2017-08-01 15:13:30
 */
public class FastJsonMessageConverter extends AbstractMessageConverter {

    /** Name of the charset used for the message body and reported as content encoding. */
    public static final String DEFAULT_CHARSET = "UTF-8";

    /**
     * Builds the AMQP message for {@code message}.
     *
     * <p>The payload is stored under the key {@code "data"} in a new JSON object, the
     * object's string form is encoded as UTF-8, and the content length, content type
     * (JSON) and content encoding are recorded on {@code messageProperties}.
     *
     * @param message           payload to wrap
     * @param messageProperties properties to populate and attach to the result
     * @return the message carrying the encoded JSON body
     */
    @Override
    protected Message createMessage(Object message, MessageProperties messageProperties) {
        JSONObject json = new JSONObject();
        json.put("data", message);

        // The Charset overload cannot fail with a checked exception, unlike the
        // charset-name overload; UTF_8 matches DEFAULT_CHARSET.
        byte[] bytes = json.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);

        messageProperties.setContentLength(bytes.length);
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        messageProperties.setContentEncoding(DEFAULT_CHARSET);

        return new Message(bytes, messageProperties);
    }

    /**
     * Inbound conversion is not implemented.
     *
     * @param message the received message (ignored)
     * @return always {@code null}
     */
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        return null;
    }

}
相關文章
相關標籤/搜索