Springboot 配置RabbitMQ文檔

簡介
RabbitMQ是實現AMQP(高級消息隊列協議)的消息中間件的一種,用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗spring

概念:
  生產者 消息的產生方,負責將消息推送到消息隊列
  消費者 消息的最終接受方,負責監聽隊列中的對應消息,消費消息
  隊列 消息的寄存器,負責存放生產者發送的消息
  交換機 負責根據必定規則分發生產者產生的消息
  綁定 完成交換機和隊列之間的綁定
模式:
  direct:直連模式,用於實例間的任務分發
  topic:話題模式,經過可配置的規則分發給綁定在該exchange上的隊列
  headers:適用規則複雜的分發,用headers裏的參數表達規則
  fanout:分發給全部綁定到該exchange上的隊列,忽略routing key

SpringBoot集成RabbitMQ
1、引入maven依賴app

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
  <version>1.5.2.RELEASE</version>
</dependency>

2、配置application.propertiesmaven

# rabbitmq
spring.rabbitmq.host = dev-mq.a.pa.com
spring.rabbitmq.port = 5672
spring.rabbitmq.username = admin
spring.rabbitmq.password = admin
spring.rabbitmq.virtualHost = /message-test/

3、編寫AmqpConfiguration配置文件分佈式

package message.test.configuration;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AmqpConfiguration {
/**
 * 消息編碼
 */
  public static final String MESSAGE_ENCODING = "UTF-8";
  public static final String EXCHANGE_ISSUE = "exchange_message_issue";
  public static final String QUEUE_ISSUE_USER = "queue_message_issue_user";
  public static final String QUEUE_ISSUE_ALL_USER = "queue_message_issue_all_user";
  public static final String QUEUE_ISSUE_ALL_DEVICE = "queue_message_issue_all_device";
  public static final String QUEUE_ISSUE_CITY = "queue_message_issue_city";
  public static final String ROUTING_KEY_ISSUE_USER = "routing_key_message_issue_user";
  public static final String ROUTING_KEY_ISSUE_ALL_USER = "routing_key_message_issue_all_user";
  public static final String ROUTING_KEY_ISSUE_ALL_DEVICE = "routing_key_message_issue_all_device";
  public static final String ROUTING_KEY_ISSUE_CITY = "routing_key_message_issue_city";
  public static final String EXCHANGE_PUSH = "exchange_message_push";
  public static final String QUEUE_PUSH_RESULT = "queue_message_push_result";

  @Autowired
  private RabbitProperties rabbitProperties;

  @Bean
  public Queue issueUserQueue() {
    return new Queue(QUEUE_ISSUE_USER);
  }

  @Bean
  public Queue issueAllUserQueue() {
    return new Queue(QUEUE_ISSUE_ALL_USER);
  }

  @Bean
  public Queue issueAllDeviceQueue() {
    return new Queue(QUEUE_ISSUE_ALL_DEVICE);
  }

  @Bean
  public Queue issueCityQueue() {
    return new Queue(QUEUE_ISSUE_CITY);
  }

  @Bean
  public Queue pushResultQueue() {
    return new Queue(QUEUE_PUSH_RESULT);
  }

  @Bean
  public DirectExchange issueExchange() {
    return new DirectExchange(EXCHANGE_ISSUE);
  }

  @Bean
  public DirectExchange pushExchange() {
    // 參數1:隊列
    // 參數2:是否持久化
    // 參數3:是否自動刪除
    return new DirectExchange(EXCHANGE_PUSH, true, true);
  }

  @Bean
  public Binding issueUserQueueBinding(@Qualifier("issueUserQueue") Queue queue,
        @Qualifier("issueExchange") DirectExchange exchange) {
      return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_USER);
  }

  @Bean
  public Binding issueAllUserQueueBinding(@Qualifier("issueAllUserQueue") Queue queue,
        @Qualifier("issueExchange") DirectExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_ALL_USER);
  }

  @Bean
  public Binding issueAllDeviceQueueBinding(@Qualifier("issueAllDeviceQueue") Queue queue,
        @Qualifier("issueExchange") DirectExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_ALL_DEVICE);
  }

  @Bean
  public Binding issueCityQueueBinding(@Qualifier("issueCityQueue") Queue queue,
        @Qualifier("issueExchange") DirectExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_CITY);
  }

  @Bean
  public Binding pushResultQueueBinding(@Qualifier("pushResultQueue") Queue queue,
        @Qualifier("pushExchange") DirectExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).withQueueName();
  }

  @Bean
  public ConnectionFactory defaultConnectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setHost(rabbitProperties.getHost());
    connectionFactory.setPort(rabbitProperties.getPort());
    connectionFactory.setUsername(rabbitProperties.getUsername());
    connectionFactory.setPassword(rabbitProperties.getPassword());
    connectionFactory.setVirtualHost(rabbitProperties.getVirtualHost());
    return connectionFactory;
  }

  @Bean
  public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
        @Qualifier("defaultConnectionFactory") ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    return factory;
  }

  @Bean
  public AmqpTemplate rabbitTemplate(@Qualifier("defaultConnectionFactory") ConnectionFactory connectionFactory) 
  {
    return new RabbitTemplate(connectionFactory);
  }
}

3、編寫生產者spring-boot

body = JSON.toJSONString(issueMessage).getBytes(AmqpConfiguration.MESSAGE_ENCODING);
 rabbitTemplate.convertAndSend(AmqpConfiguration.EXCHANGE_ISSUE,
                        AmqpConfiguration.ROUTING_KEY_ISSUE_USER, body);

4、編寫消費者ui

@RabbitListener(queues = AmqpConfiguration.QUEUE_PUSH_RESULT)
public void handlePushResult(@Payload byte[] data, Channel channel,
        @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        
}
相關文章
相關標籤/搜索