簡介
RabbitMQ是實現AMQP(高級消息隊列協議)的消息中間件的一種,用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗spring
概念: 生產者 消息的產生方,負責將消息推送到消息隊列 消費者 消息的最終接受方,負責監聽隊列中的對應消息,消費消息 隊列 消息的寄存器,負責存放生產者發送的消息 交換機 負責根據必定規則分發生產者產生的消息 綁定 完成交換機和隊列之間的綁定 模式: direct:直連模式,用於實例間的任務分發 topic:話題模式,經過可配置的規則分發給綁定在該exchange上的隊列 headers:適用規則複雜的分發,用headers裏的參數表達規則 fanout:分發給全部綁定到該exchange上的隊列,忽略routing key
SpringBoot集成RabbitMQ
1、引入maven依賴app
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>1.5.2.RELEASE</version> </dependency>
2、配置application.propertiesmaven
# rabbitmq spring.rabbitmq.host = dev-mq.a.pa.com spring.rabbitmq.port = 5672 spring.rabbitmq.username = admin spring.rabbitmq.password = admin spring.rabbitmq.virtualHost = /message-test/
3、編寫AmqpConfiguration配置文件分佈式
package message.test.configuration; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.amqp.RabbitProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class AmqpConfiguration { /** * 消息編碼 */ public static final String MESSAGE_ENCODING = "UTF-8"; public static final String EXCHANGE_ISSUE = "exchange_message_issue"; public static final String QUEUE_ISSUE_USER = "queue_message_issue_user"; public static final String QUEUE_ISSUE_ALL_USER = "queue_message_issue_all_user"; public static final String QUEUE_ISSUE_ALL_DEVICE = "queue_message_issue_all_device"; public static final String QUEUE_ISSUE_CITY = "queue_message_issue_city"; public static final String ROUTING_KEY_ISSUE_USER = "routing_key_message_issue_user"; public static final String ROUTING_KEY_ISSUE_ALL_USER = "routing_key_message_issue_all_user"; public static final String ROUTING_KEY_ISSUE_ALL_DEVICE = "routing_key_message_issue_all_device"; public static final String ROUTING_KEY_ISSUE_CITY = "routing_key_message_issue_city"; public static final String EXCHANGE_PUSH = "exchange_message_push"; public static final String QUEUE_PUSH_RESULT = "queue_message_push_result"; @Autowired private RabbitProperties rabbitProperties; @Bean public Queue issueUserQueue() { return new Queue(QUEUE_ISSUE_USER); } @Bean public Queue issueAllUserQueue() { return new Queue(QUEUE_ISSUE_ALL_USER); } @Bean public Queue issueAllDeviceQueue() { return new Queue(QUEUE_ISSUE_ALL_DEVICE); } @Bean public Queue issueCityQueue() { return new Queue(QUEUE_ISSUE_CITY); } @Bean public Queue pushResultQueue() { return new Queue(QUEUE_PUSH_RESULT); } @Bean public DirectExchange issueExchange() { return new DirectExchange(EXCHANGE_ISSUE); } @Bean public DirectExchange pushExchange() { // 參數1:隊列 // 參數2:是否持久化 // 參數3:是否自動刪除 return new DirectExchange(EXCHANGE_PUSH, true, true); } @Bean public Binding issueUserQueueBinding(@Qualifier("issueUserQueue") Queue queue, @Qualifier("issueExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_USER); } @Bean public Binding issueAllUserQueueBinding(@Qualifier("issueAllUserQueue") Queue queue, @Qualifier("issueExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_ALL_USER); } @Bean public Binding issueAllDeviceQueueBinding(@Qualifier("issueAllDeviceQueue") Queue queue, @Qualifier("issueExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_ALL_DEVICE); } @Bean public Binding issueCityQueueBinding(@Qualifier("issueCityQueue") Queue queue, @Qualifier("issueExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_CITY); } @Bean public Binding pushResultQueueBinding(@Qualifier("pushResultQueue") Queue queue, @Qualifier("pushExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).withQueueName(); } @Bean public ConnectionFactory defaultConnectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(rabbitProperties.getHost()); connectionFactory.setPort(rabbitProperties.getPort()); connectionFactory.setUsername(rabbitProperties.getUsername()); connectionFactory.setPassword(rabbitProperties.getPassword()); connectionFactory.setVirtualHost(rabbitProperties.getVirtualHost()); return connectionFactory; } @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory( @Qualifier("defaultConnectionFactory") ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); return factory; } @Bean public AmqpTemplate rabbitTemplate(@Qualifier("defaultConnectionFactory") ConnectionFactory connectionFactory) { return new RabbitTemplate(connectionFactory); } }
3、編寫生產者spring-boot
body = JSON.toJSONString(issueMessage).getBytes(AmqpConfiguration.MESSAGE_ENCODING); rabbitTemplate.convertAndSend(AmqpConfiguration.EXCHANGE_ISSUE, AmqpConfiguration.ROUTING_KEY_ISSUE_USER, body);
4、編寫消費者ui
@RabbitListener(queues = AmqpConfiguration.QUEUE_PUSH_RESULT) public void handlePushResult(@Payload byte[] data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { }