rabbitmq 是spring所在公司Pivotal本身的產品 是基於AMQP高級隊列協議的消息中間件 採用erlang開發 所以安裝須要erlang環境 具體安裝根據本身的環境 由於跟spring有共同的血緣關係 因此spring 全家桶對其的支持應該是至關完善的java
通常消息隊列 都是生產者將消息發送到隊列 消費者監聽隊列進行消費 rabbitmq 一個虛擬主機(默認 /)持有一個或者多個交換機(Exchange) 用戶只能在虛擬主機的粒度進行權限控制 交換機根據必定的策略(RoutingKey)綁定(Binding)到隊列(Queue)上 這樣生產者和隊列就沒有直接聯繫 而是將消息發送的交換機 交換機再把消息轉發到對應綁定的隊列上 此處須要詳細熟悉rabbitmq的工做流程 不清楚能夠找相關資料進行學習 git
上面說了 Exchange 做爲rabbitmq的一個獨特的重要的概念 這裏有必要着重強調一下 咱們從 spring對rabbitmq的封裝來解讀一下這個東西 web
package org.springframework.amqp.core; /** * Constants for the standard Exchange type names. * * @author Mark Fisher * @author Gary Russell */ public abstract class ExchangeTypes { public static final String DIRECT = "direct"; public static final String TOPIC = "topic"; public static final String FANOUT = "fanout"; public static final String HEADERS = "headers"; public static final String SYSTEM = "system"; /** * The constant to represent {@code x-delayed-message} exchange mode. * @deprecated since 1.6.4, it's not a user-available exchange type, * the delayed {@code boolean} is used for that. */ @Deprecated public static final String DELAYED = "x-delayed-message"; }
上面是 交換機類型的定義類 說明了6種交換機類型 最後一種由於即將棄用 因此是五種 咱們經常使用的有四種 下面這個建造類說明了一切spring
package org.springframework.amqp.core; import java.util.Map; /** * Builder providing a fluent API for building {@link Exchange}s. * * @author Gary Russell * @since 1.6 * */ public final class ExchangeBuilder extends AbstractBuilder { private final String name; private final String type; private boolean durable; private boolean autoDelete; private boolean internal; private boolean delayed; /** * Construct an instance of the appropriate type. * @param name the exchange name * @param type the type name * @see ExchangeTypes * @since 1.6.7 */ public ExchangeBuilder(String name, String type) { this.name = name; this.type = type; } /** * Return a {@link DirectExchange} builder. * @param name the name. * @return the builder. */ public static ExchangeBuilder directExchange(String name) { return new ExchangeBuilder(name, ExchangeTypes.DIRECT); } /** * Return a {@link TopicExchange} builder. * @param name the name. * @return the builder. */ public static ExchangeBuilder topicExchange(String name) { return new ExchangeBuilder(name, ExchangeTypes.TOPIC); } /** * Return a {@link FanoutExchange} builder. * @param name the name. * @return the builder. */ public static ExchangeBuilder fanoutExchange(String name) { return new ExchangeBuilder(name, ExchangeTypes.FANOUT); } /** * Return a {@link HeadersExchange} builder. * @param name the name. * @return the builder. */ public static ExchangeBuilder headersExchange(String name) { return new ExchangeBuilder(name, ExchangeTypes.HEADERS); } /** * Set the auto delete flag. * @return the builder. */ public ExchangeBuilder autoDelete() { this.autoDelete = true; return this; } /** * Set the durable flag to true. * @return the builder. * @deprecated - in 2.0, durable will be true by default * @see #durable(boolean) */ @Deprecated public ExchangeBuilder durable() { this.durable = true; return this; } /** * Set the durable flag. * @param durable the durable flag (default false). * @return the builder. * @see #durable */ public ExchangeBuilder durable(boolean durable) { this.durable = durable; return this; } /** * Add an argument. * @param key the argument key. * @param value the argument value. * @return the builder. */ public ExchangeBuilder withArgument(String key, Object value) { getOrCreateArguments().put(key, value); return this; } /** * Add the arguments. * @param arguments the arguments map. * @return the builder. */ public ExchangeBuilder withArguments(Map<String, Object> arguments) { this.getOrCreateArguments().putAll(arguments); return this; } /** * Set the internal flag. * @return the builder. */ public ExchangeBuilder internal() { this.internal = true; return this; } /** * Set the delayed flag. * @return the builder. */ public ExchangeBuilder delayed() { this.delayed = true; return this; } public Exchange build() { AbstractExchange exchange; if (ExchangeTypes.DIRECT.equals(this.type)) { exchange = new DirectExchange(this.name, this.durable, this.autoDelete, getArguments()); } else if (ExchangeTypes.TOPIC.equals(this.type)) { exchange = new TopicExchange(this.name, this.durable, this.autoDelete, getArguments()); } else if (ExchangeTypes.FANOUT.equals(this.type)) { exchange = new FanoutExchange(this.name, this.durable, this.autoDelete, getArguments()); } else if (ExchangeTypes.HEADERS.equals(this.type)) { exchange = new HeadersExchange(this.name, this.durable, this.autoDelete, getArguments()); } else { throw new IllegalStateException("Invalid type: " + this.type); } exchange.setInternal(this.internal); exchange.setDelayed(this.delayed); return exchange; } }
這四種的說明springboot
在熟悉了相關概念後咱們開始搞一搞這個東西 首先你要安裝好rabbitmq 相關方法資料不少 此處不表 在本機安裝好 並啓用了管理頁面後打開 localhost:15672 會顯示一個管理頁面 以下 能夠進行一些可視化操做app
新建springboot工程 springboot 版本 1.5.10 依賴以下dom
<dependencies> <!--amqp rabbitmq 依賴必須 必須--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!--springboot單元測試 選--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!--springboot健康監控 選--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <!--web支持 選--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> </dependencies>
application.yml 配置文件 rabbitmq 相關:spring-boot
spring: rabbitmq: username: rabbitAdmin password: 123456789 # 支持發佈確認 publisher-confirms: true # 支持發佈返回 publisher-returns: true listener: simple: # 採用手動應答 acknowledge-mode: manual # 當前監聽容器數 concurrency: 1 # 最大數 max-concurrency: 1 # 是否支持重試 retry: enabled: true # 日誌配置 logging: config: classpath:logback.xml
定製模版類 聲明交換機 隊列 綁定交換機到隊列單元測試
這裏 聲明瞭Direct 交換機 並經過路由鍵綁定到一個隊列中 來測試Direct模式學習
聲明瞭Fanout交換機 並綁定到2個隊列 來測試廣播模式
package cn.felord.message.config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.annotation.Resource; /** * 隊列配置. * * @author dax. * @version v1.0 * @since 2018 /2/23 14:28 */ @Configuration public class RabbitConfig { @Resource private RabbitTemplate rabbitTemplate; /** * 定製化amqp模版 可根據須要定製多個 * * * 此處爲模版類定義 Jackson消息轉換器 * ConfirmCallback接口用於實現消息發送到RabbitMQ交換器後接收ack回調 即消息發送到exchange ack * ReturnCallback接口用於實現消息發送到RabbitMQ 交換器,但無相應隊列與交換器綁定時的回調 即消息發送不到任何一個隊列中 ack * * @return the amqp template */ // @Primary @Bean public AmqpTemplate amqpTemplate() { Logger log = LoggerFactory.getLogger(RabbitTemplate.class); // 使用jackson 消息轉換器 rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); rabbitTemplate.setEncoding("UTF-8"); // 開啓returncallback yml 須要 配置 publisher-returns: true rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { String correlationId = message.getMessageProperties().getCorrelationIdString(); log.debug("消息:{} 發送失敗, 應答碼:{} 緣由:{} 交換機: {} 路由鍵: {}", correlationId, replyCode, replyText, exchange, routingKey); }); // 消息確認 yml 須要配置 publisher-returns: true rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { log.debug("消息發送到exchange成功,id: {}", correlationData.getId()); } else { log.debug("消息發送到exchange失敗,緣由: {}", cause); } }); return rabbitTemplate; } /* ----------------------------------------------------------------------------Direct exchange test--------------------------------------------------------------------------- */ /** * 聲明Direct交換機 支持持久化. * * @return the exchange */ @Bean("directExchange") public Exchange directExchange() { return ExchangeBuilder.directExchange("DIRECT_EXCHANGE").durable(true).build(); } /** * 聲明一個隊列 支持持久化. * * @return the queue */ @Bean("directQueue") public Queue directQueue() { return QueueBuilder.durable("DIRECT_QUEUE").build(); } /** * 經過綁定鍵 將指定隊列綁定到一個指定的交換機 . * * @param queue the queue * @param exchange the exchange * @return the binding */ @Bean public Binding directBinding(@Qualifier("directQueue") Queue queue, @Qualifier("directExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("DIRECT_ROUTING_KEY").noargs(); } /* ----------------------------------------------------------------------------Fanout exchange test--------------------------------------------------------------------------- */ /** * 聲明 fanout 交換機. * * @return the exchange */ @Bean("fanoutExchange") public FanoutExchange fanoutExchange() { return (FanoutExchange) ExchangeBuilder.fanoutExchange("FANOUT_EXCHANGE").durable(true).build(); } /** * Fanout queue A. * * @return the queue */ @Bean("fanoutQueueA") public Queue fanoutQueueA() { return QueueBuilder.durable("FANOUT_QUEUE_A").build(); } /** * Fanout queue B . * * @return the queue */ @Bean("fanoutQueueB") public Queue fanoutQueueB() { return QueueBuilder.durable("FANOUT_QUEUE_B").build(); } /** * 綁定隊列A 到Fanout 交換機. * * @param queue the queue * @param fanoutExchange the fanout exchange * @return the binding */ @Bean public Binding bindingA(@Qualifier("fanoutQueueA") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) { return BindingBuilder.bind(queue).to(fanoutExchange); } /** * 綁定隊列B 到Fanout 交換機. * * @param queue the queue * @param fanoutExchange the fanout exchange * @return the binding */ @Bean public Binding bindingB(@Qualifier("fanoutQueueB") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) { return BindingBuilder.bind(queue).to(fanoutExchange); } }
編寫監聽器 來監聽隊列消息
package cn.felord.message.comsumer; import com.rabbitmq.client.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; /** * 監聽器. * * @author dax. * @version v1.0 * @since 2018 /2/24 9:36 */ @Component public class Receiver { private static final Logger log= LoggerFactory.getLogger(Receiver.class); /** * FANOUT廣播隊列監聽一. * * @param message the message * @param channel the channel * @throws IOException the io exception 這裏異常須要處理 */ @RabbitListener(queues = {"FANOUT_QUEUE_A"}) public void on(Message message, Channel channel) throws IOException { channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); log.debug("FANOUT_QUEUE_A "+new String(message.getBody())); } /** * FANOUT廣播隊列監聽二. * * @param message the message * @param channel the channel * @throws IOException the io exception 這裏異常須要處理 */ @RabbitListener(queues = {"FANOUT_QUEUE_B"}) public void t(Message message, Channel channel) throws IOException { channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); log.debug("FANOUT_QUEUE_B "+new String(message.getBody())); } /** * DIRECT模式. * * @param message the message * @param channel the channel * @throws IOException the io exception 這裏異常須要處理 */ @RabbitListener(queues = {"DIRECT_QUEUE"}) public void message(Message message, Channel channel) throws IOException { channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); log.debug("DIRECT "+new String (message.getBody())); } }
編寫 發送消息接口 來進行測試
package cn.felord.message.controller; import cn.felord.message.bean.ResponseEntity; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.util.UUID; /** * 消息接口. * * @author dax. * @version v1.0 * @since 2018 /2/23 17:27 */ @RestController @RequestMapping("/rabbit") public class SendController { @Resource private RabbitTemplate rabbitTemplate; /** * 測試廣播模式. * * @param p the p * @return the response entity */ @RequestMapping("/fanout") public ResponseEntity send(String p) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("FANOUT_EXCHANGE", "", p, correlationData); return ResponseEntity.ok(); } /** * 測試Direct模式. * * @param p the p * @return the response entity */ @RequestMapping("/direct") public ResponseEntity direct(String p) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("DIRECT_EXCHANGE", "DIRECT_ROUTING_KEY", p, correlationData); return ResponseEntity.ok(); } }
測試廣播模式
控制檯輸出
一樣 本身能夠測試Direct模式 能夠打開rabbitmq控制檯進行追蹤 相關運行信息
配套源碼 :https://gitee.com/felord/springboot-message
下一篇會在 springboot中實現 rabbitmq死信隊列