本實例嘗試針對 Direct exchange、Fanout exchange、Topic exchange 三種路由形式,對消費者和生產者進行集中實現,若有不對之處煩請讀者指出。
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>${rabbitmq.version}</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>${spring-rabbit.version}</version> </dependency>
package com.my.rabbitmq.appconfig;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.springframework.amqp.core.AbstractExchange;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Binding.DestinationType;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.AbstractMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.my.rabbitmq.util.FastJsonMessageConverter;
import com.my.rabbitmq.consumer.RabbitMQConsumer;
import com.my.rabbitmq.producer.RabbitMQProducer;

/**
 * Spring configuration that wires the RabbitMQ producer and consumer beans.
 *
 * <p>Every setting is read from the classpath resource {@code rabbitmq.properties}.
 * The value of {@code rabbitmq.exchangeType} selects the exchange implementation:
 * a value containing {@code "F"} yields a fanout exchange, otherwise a value
 * containing {@code "T"} yields a topic exchange, and anything else yields a
 * direct exchange.
 *
 * @author yuanzi
 */
@Configuration
public class RabbitmqConfig {

    private static final Logger log = LogManager.getLogger(RabbitmqConfig.class);

    /** Name of the classpath resource holding the RabbitMQ settings. */
    private static final String CONFIG_RESOURCE = "rabbitmq.properties";

    /** Settings loaded once when the class is initialised. */
    private static final Properties rabbitmqProperties = loadProperties();

    /**
     * Reads {@link #CONFIG_RESOURCE} from the classpath and closes the stream afterwards.
     *
     * @return the loaded settings
     * @throws IllegalStateException if the resource is missing or cannot be read; failing
     *         here gives a clear message instead of an obscure error in a later bean method
     */
    private static Properties loadProperties() {
        Properties properties = new Properties();
        try (InputStream in = RabbitmqConfig.class.getClassLoader().getResourceAsStream(CONFIG_RESOURCE)) {
            if (in == null) {
                throw new IllegalStateException(CONFIG_RESOURCE + " was not found on the classpath");
            }
            properties.load(in);
        } catch (IOException e) {
            throw new IllegalStateException("Failed to read " + CONFIG_RESOURCE, e);
        }
        return properties;
    }

    /** Builds a connection factory from the host, port, username and password settings. */
    private static CachingConnectionFactory newConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(
                rabbitmqProperties.getProperty("rabbitmq.host"));
        connectionFactory.setPort(Integer.parseInt(rabbitmqProperties.getProperty("rabbitmq.port")));
        connectionFactory.setUsername(rabbitmqProperties.getProperty("rabbitmq.username"));
        connectionFactory.setPassword(rabbitmqProperties.getProperty("rabbitmq.password"));
        return connectionFactory;
    }

    /**
     * Creates the exchange selected by {@code rabbitmq.exchangeType} and logs which kind was chosen.
     *
     * @param exchangeName name of the exchange to declare
     * @param beanLabel    bean name used as the prefix of the log line
     * @param roleLabel    role text (producer / consumer) used inside the log line
     * @return a fanout, topic or direct exchange, created with the flags {@code (true, false)}
     * @throws IllegalStateException if {@code rabbitmq.exchangeType} is not configured
     */
    private static AbstractExchange newExchange(String exchangeName, String beanLabel, String roleLabel) {
        String exchangeType = rabbitmqProperties.getProperty("rabbitmq.exchangeType");
        if (exchangeType == null) {
            throw new IllegalStateException(
                    "Missing required property rabbitmq.exchangeType in " + CONFIG_RESOURCE);
        }
        if (exchangeType.contains("F")) {
            FanoutExchange fanoutExchange = new FanoutExchange(exchangeName, true, false);
            log.info(beanLabel + "-Fanout " + roleLabel + ":廣播路由");
            return fanoutExchange;
        }
        if (exchangeType.contains("T")) {
            TopicExchange topicExchange = new TopicExchange(exchangeName, true, false);
            log.info(beanLabel + "-Topic " + roleLabel + ":多路廣播路由");
            return topicExchange;
        }
        // Any other value (e.g. "D") falls back to a direct exchange.
        DirectExchange directExchange = new DirectExchange(exchangeName, true, false);
        log.info(beanLabel + "-Direct " + roleLabel + ":直接路由");
        return directExchange;
    }

    /** Connection factory used by the producer; carries the basic connection settings. */
    @Bean(name = "ProducerConnectionFactory")
    public ConnectionFactory ProducerConnectionFactory() throws IOException {
        return newConnectionFactory();
    }

    /** Connection factory used by the consumer; carries the basic connection settings. */
    @Bean(name = "ConsumerConnectionFactory")
    public ConnectionFactory ConsumerConnectionFactory() throws IOException {
        return newConnectionFactory();
    }

    /**
     * RabbitAdmin bound to the consumer connection factory.
     *
     * <p>Author's note: when several connection factories are configured, a RabbitAdmin has to
     * be declared explicitly, otherwise exchanges and queues are not registered automatically
     * on the RabbitMQ server.
     */
    @Bean
    public RabbitAdmin rabbitAdmin() throws IOException {
        return new RabbitAdmin(ConsumerConnectionFactory());
    }

    /** Template used by the producer; targets the producer exchange and wraps payloads via the message converter. */
    @Bean(name = "ProducerRabbitTemplate")
    public RabbitTemplate ProducerRabbitTemplate() throws IOException {
        RabbitTemplate producerRabbitTemplate = new RabbitTemplate();
        producerRabbitTemplate.setConnectionFactory(ProducerConnectionFactory());
        producerRabbitTemplate.setExchange(rabbitmqProperties.getProperty("rabbitmq.producerExchange"));
        // The converter defines the format of the messages that are sent.
        producerRabbitTemplate.setMessageConverter(ProducerMessageConverter());
        return producerRabbitTemplate;
    }

    /** Template used by the consumer; targets the consumer exchange. */
    @Bean(name = "ConsumerRabbitTemplate")
    public RabbitTemplate ConsumerRabbitTemplate() throws IOException {
        RabbitTemplate consumerRabbitTemplate = new RabbitTemplate(ConsumerConnectionFactory());
        consumerRabbitTemplate.setExchange(rabbitmqProperties.getProperty("rabbitmq.consumerExchange"));
        return consumerRabbitTemplate;
    }

    /** Exchange the producer publishes to; its kind follows {@code rabbitmq.exchangeType}. */
    @Bean(name = "ProducerExchange")
    public AbstractExchange ProducerExchange() throws IOException {
        return newExchange(rabbitmqProperties.getProperty("rabbitmq.producerExchange"),
                "ProducerExchange", "生產者");
    }

    /** Exchange the consumer queue is bound to; its kind follows {@code rabbitmq.exchangeType}. */
    @Bean(name = "ConsumerExchange")
    public AbstractExchange ConsumerExchange() throws IOException {
        // The log prefix used to read "ProducerExchange" here, which made the two beans
        // indistinguishable in the log output.
        return newExchange(rabbitmqProperties.getProperty("rabbitmq.consumerExchange"),
                "ConsumerExchange", "消費者");
    }

    /**
     * Producer queue. Constructor arguments: queue name, whether the queue survives a RabbitMQ
     * restart, whether it is used by a single connection only and removed when that connection
     * closes, and whether it is removed once the last consumer unsubscribes.
     */
    @Bean(name = "ProducerQueue")
    public Queue ProducerQueue() throws IOException {
        return new Queue(rabbitmqProperties.getProperty("rabbitmq.producerQueue"), true, false, false);
    }

    /**
     * Consumer queue. Constructor arguments: queue name, whether the queue survives a RabbitMQ
     * restart, whether it is used by a single connection only and removed when that connection
     * closes, and whether it is removed once the last consumer unsubscribes.
     */
    @Bean(name = "ConsumerQueue")
    public Queue ConsumerQueue() throws IOException {
        return new Queue(rabbitmqProperties.getProperty("rabbitmq.consumerQueue"), true, false, false);
    }

    /** Message converter used by the producer template. */
    @Bean(name = "jsonMessageConverter")
    public AbstractMessageConverter ProducerMessageConverter() {
        return new FastJsonMessageConverter();
    }

    /** Binds the producer queue to the producer exchange with the producer routing key. */
    @Bean(name = "ProducerBinding")
    public Binding ProducerBinding() {
        return new Binding(rabbitmqProperties.getProperty("rabbitmq.producerQueue"),
                DestinationType.QUEUE,
                rabbitmqProperties.getProperty("rabbitmq.producerExchange"),
                rabbitmqProperties.getProperty("rabbitmq.producerRoutingKey"),
                null);
    }

    /** Binds the consumer queue to the consumer exchange with the consumer routing key. */
    @Bean(name = "ConsumerBinding")
    public Binding ConsumerBinding() {
        return new Binding(rabbitmqProperties.getProperty("rabbitmq.consumerQueue"),
                DestinationType.QUEUE,
                rabbitmqProperties.getProperty("rabbitmq.consumerExchange"),
                rabbitmqProperties.getProperty("rabbitmq.consumerRoutingKey"),
                null);
    }

    /** Producer instance, configured with the routing key it uses when sending messages. */
    @Bean(name = "RabbitMQProducer")
    public RabbitMQProducer RabbitMQProducer() {
        return new RabbitMQProducer(rabbitmqProperties.getProperty("rabbitmq.producerSendRoutingKey"));
    }

    /** Consumer instance that receives the messages. */
    @Bean(name = "RabbitMQConsumer")
    public RabbitMQConsumer RabbitMQConsumer() {
        return new RabbitMQConsumer();
    }

    /** Listener container that notifies the consumer when a message arrives on the consumer queue. */
    @Bean(name = "ListenerContainer")
    public AbstractMessageListenerContainer MessageListenerContainer() throws IOException {
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
        simpleMessageListenerContainer.setConnectionFactory(ConsumerConnectionFactory());
        simpleMessageListenerContainer.setQueueNames(rabbitmqProperties.getProperty("rabbitmq.consumerQueue"));
        simpleMessageListenerContainer.setMessageListener(RabbitMQConsumer());
        return simpleMessageListenerContainer;
    }
}
#單一交換機配置參數
#rabbitmq server地址
rabbitmq.host = 127.0.0.1
#rabbitmq 端口
rabbitmq.port = 5672
#生產者隊列名稱
rabbitmq.producerQueue = d_queue_producer
#生產者RoutingKey
rabbitmq.producerRoutingKey = d_key_producer
#消費者隊列名稱
rabbitmq.consumerQueue = d_queue_consumer
#消費者RoutingKey
rabbitmq.consumerRoutingKey = d_key_consumer
#交換機類型:D 爲單一傳播(Direct)交換機類型,F 爲廣播(Fanout)交換機類型,T 爲多路廣播(Topic)交換機類型
rabbitmq.exchangeType = D
#生產者需要綁定的交換機
rabbitmq.producerExchange = d_exchange
#消費者需要綁定的交換機
rabbitmq.consumerExchange = d_exchange
#rabbitmq用戶名稱
rabbitmq.username = guest
#rabbitmq用戶密碼
rabbitmq.password = guest
#rabbitmq生產者發送消息時的RoutingKey
rabbitmq.producerSendRoutingKey = d_key_consumer
---------------------------------------------分割線----------------------------------------------
#廣播交換機配置參數,廣播交換機下routingKey失效
#rabbitmq server地址
rabbitmq.host = 127.0.0.1
#rabbitmq 端口
rabbitmq.port = 5672
#生產者隊列名稱
rabbitmq.producerQueue = f_queue_producer
#生產者RoutingKey
rabbitmq.producerRoutingKey = f_key_producer
#消費者隊列名稱
rabbitmq.consumerQueue = f_queue_consumer
#消費者RoutingKey
rabbitmq.consumerRoutingKey = f_key_consumer
#交換機類型:D 爲單一傳播(Direct)交換機類型,F 爲廣播(Fanout)交換機類型,T 爲多路廣播(Topic)交換機類型
rabbitmq.exchangeType = F
#生產者需要綁定的交換機
rabbitmq.producerExchange = f_exchange
#消費者需要綁定的交換機
rabbitmq.consumerExchange = f_exchange
#rabbitmq用戶名稱
rabbitmq.username = guest
#rabbitmq用戶密碼
rabbitmq.password = guest
#rabbitmq生產者發送消息時的RoutingKey
rabbitmq.producerSendRoutingKey = f_test
---------------------------------------------分割線----------------------------------------------
#多路廣播交換機配置參數
#配置 routingKey 時,「#」表示 0 個或多個關鍵字,「*」表示一個關鍵字。例如「log.*」能與「log.warn」匹配,但無法
#與「log.warn.timeout」匹配;而「log.#」能與上述兩者匹配
#rabbitmq server地址
rabbitmq.host = 127.0.0.1
#rabbitmq 端口
rabbitmq.port = 5672
#生產者隊列名稱
rabbitmq.producerQueue = t_queue_producer
#生產者RoutingKey
rabbitmq.producerRoutingKey = t_key_producer
#消費者隊列名稱
rabbitmq.consumerQueue = t_queue_consumer
#消費者RoutingKey
rabbitmq.consumerRoutingKey = #.test
#交換機類型:D 爲單一傳播(Direct)交換機類型,F 爲廣播(Fanout)交換機類型,T 爲多路廣播(Topic)交換機類型
rabbitmq.exchangeType = T
#生產者需要綁定的交換機
rabbitmq.producerExchange = t_exchange2
#消費者需要綁定的交換機
rabbitmq.consumerExchange = t_exchange2
#rabbitmq用戶名稱
rabbitmq.username = guest
#rabbitmq用戶密碼
rabbitmq.password = guest
#rabbitmq生產者發送消息時的RoutingKey
rabbitmq.producerSendRoutingKey = t.test
package com.my.rabbitmq.producer;

import javax.annotation.Resource;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

/**
 * Publishes messages to RabbitMQ through the injected producer template.
 *
 * <p>The routing key is fixed at construction time and applied to every message.
 *
 * @author yuanzi
 */
public class RabbitMQProducer {

    private static Logger log = LogManager.getLogger(RabbitMQProducer.class);

    /** Template injected by name; it is not set by the constructor. */
    @Resource(name = "ProducerRabbitTemplate")
    private RabbitTemplate rabbitTemplate;

    /** Routing key attached to every outgoing message. */
    private String routingKey;

    /**
     * @param routingKey routing key used for every message sent by this producer
     */
    public RabbitMQProducer(String routingKey) {
        this.routingKey = routingKey;
    }

    /**
     * Converts {@code message} with the template's converter, sends it with the
     * configured routing key, then logs the payload.
     *
     * @param message payload to send
     */
    public void sendDataToQueue(Object message) {
        String targetKey = this.routingKey;
        this.rabbitTemplate.convertAndSend(targetKey, message);

        String logLine = "rabbitMQ producer send message:" + message;
        log.info(logLine);
    }
}
package com.my.rabbitmq.consumer;

import java.nio.charset.StandardCharsets;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

/**
 * Receives RabbitMQ messages and logs their body as text.
 *
 * @author yuanzi
 */
public class RabbitMQConsumer implements MessageListener {

    private static final Logger log = LogManager.getLogger(RabbitMQConsumer.class);

    /**
     * Logs the body of the received message.
     *
     * <p>The body is decoded explicitly as UTF-8 rather than with the platform default
     * charset, so non-ASCII payloads are read the same way on every host.
     *
     * @param message the delivered message
     */
    @Override
    public void onMessage(Message message) {
        String data = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("rabbitMQ consumer receive message:" + data);
    }
}
package com.my.rabbitmq.util;

import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;

import net.sf.json.JSONObject;

/**
 * Wraps outgoing payloads in a JSON envelope of the form {@code {"data": <payload>}}.
 *
 * <p>Only the outbound direction is implemented; {@link #fromMessage(Message)} is a stub.
 *
 * @author yuanzi
 */
public class FastJsonMessageConverter extends AbstractMessageConverter {

    /** Name of the charset used to encode the JSON text; also sent as the content encoding. */
    public static final String DEFAULT_CHARSET = "UTF-8";

    /** Charset object resolved once from {@link #DEFAULT_CHARSET}, so the two cannot drift apart. */
    private static final Charset CHARSET = Charset.forName(DEFAULT_CHARSET);

    /**
     * Builds the AMQP message: puts {@code message} under the key {@code "data"} of a JSON
     * object, encodes the JSON text with {@link #DEFAULT_CHARSET}, and records the content
     * length, content type and content encoding on {@code messageProperties}.
     *
     * @param message           payload to wrap
     * @param messageProperties properties to update and attach to the created message
     * @return the message carrying the encoded JSON bytes
     */
    @Override
    protected Message createMessage(Object message, MessageProperties messageProperties) {
        JSONObject json = new JSONObject();
        json.put("data", message);

        // Encoding through a Charset object cannot raise UnsupportedEncodingException,
        // so no unreachable catch block is needed.
        byte[] bytes = json.toString().getBytes(CHARSET);

        messageProperties.setContentLength(bytes.length);
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        messageProperties.setContentEncoding(DEFAULT_CHARSET);
        return new Message(bytes, messageProperties);
    }

    /**
     * Not implemented: inbound conversion is unsupported and this always returns {@code null}.
     *
     * @param message the received message (ignored)
     * @return always {@code null}
     */
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        return null;
    }
}