[TOC]html
最近在搭一套基於SpringBoot的項目,用到了ssm+mysql+rabbitmq+redis。除了rabbitmq以外,其餘幾個都很快整合好了,惟獨rabbitmq找了很多資料,才最終整合好,達到了預期。特此將過程記錄下來,供參考。java
整合流程中的代碼都爲整合的關鍵配置及其使用。至於SpringBoot的基本配置,請參考Spring Boot Quick Start。mysql
<!-- rabbit-mq --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
spring: rabbitmq: host: localhost port: 5672 username: guest password: guest virtual-host: / listener: simple: acknowledge-mode: manual # 手動應答 concurrency: 5 # 消費端最小併發數 max-concurrency: 10 # 消費端最大併發數 prefetch: 5 # 一次請求中預處理的消息數量 cache: channel: size: 50 # 緩存的channel數量 ### 自定義配置 mq: defaultExchange: amqpExchange # 默認交換器 queue: queue # 隊列名 routeKey: queue_key # 路由key
import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; @Component @ConfigurationProperties(prefix = "mq") public class MQProperties { private String defaultExchange; private String routeKey; private String queue; public String getDefaultExchange() { return defaultExchange; } public void setDefaultExchange(String defaultExchange) { this.defaultExchange = defaultExchange; } public String getRouteKey() { return routeKey; } public void setRouteKey(String routeKey) { this.routeKey = routeKey; } public String getQueue() { return queue; } public void setQueue(String queue) { this.queue = queue; } }
import com.switchvov.rabbitmq.constant.MQProperties; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration @EnableRabbit public class RabbitMQConfig { @Autowired private MQProperties mqProperties; @Bean public Queue queue() { boolean durable = true; boolean exclusive = false; boolean autoDelete = false; return new Queue(mqProperties.getQueue(), durable, exclusive, autoDelete); } @Bean public DirectExchange defaultExchange() { boolean durable = true; boolean autoDelete = false; return new DirectExchange(mqProperties.getDefaultExchange(), durable, autoDelete); } @Bean public Binding binding() { return BindingBuilder.bind(queue()) .to(defaultExchange()) .with(mqProperties.getRouteKey()); } }
import com.switchvov.rabbitmq.constant.MQProperties; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class RabbitMQTest { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private MQProperties mqProperties; @Test public void testSendMessage() { rabbitTemplate.convertAndSend(mqProperties.getDefaultExchange(), mqProperties.getRouteKey(), "發送了一條信息"); } }
import com.switchvov.rabbitmq.common.RabbitMQUtils; import com.rabbitmq.client.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Service; @Service public class RabbitMQService { private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQService.class); @RabbitListener(queues = "${mq.queue}") public void receive(String payload, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) { LOGGER.info("消費內容爲:{}", payload); RabbitMQUtils.askMessage(channel, tag, LOGGER); } }
import com.rabbitmq.client.Channel; import org.slf4j.Logger; import java.io.IOException; public final class RabbitMQUtils { public static void askMessage(Channel channel, long tag, final Logger logger) { askMessage(channel, tag, logger, false); } public static void askMessage(Channel channel, long tag, final Logger logger, boolean multiple) { try { channel.basicAck(tag, multiple); } catch (IOException e) { logger.error("RabbitMQ,IO異常,異常緣由爲:{}", e.getMessage()); } } public static void rejectMessage(Channel channel, long tag, final Logger logger) { rejectMessage(channel, tag, logger, false, false); } public static void rejectAndBackMQ(Channel channel, long tag, final Logger logger) { rejectMessage(channel, tag, logger, false, true); } public static void rejectMessage(Channel channel, long tag, final Logger logger, boolean multiple, boolean request) { try { channel.basicNack(tag, multiple, request); } catch (IOException e) { logger.error("RabbitMQ,IO異常,異常緣由爲:{}", e.getMessage()); } } }
RabbitMQ消息隊列(一): Detailed Introduction 詳細介紹redis
Spring Boot中使用RabbitMQspring
queue
、exchange
、binding
spring boot / cloud (十九) 併發消費消息,如何保證入庫的數據是最新的?架構
分享並記錄所學所見