RabbitMQ消息路由失敗的處理方案(回調與備份交換機AE)

2021-02-18. NC.
git


咱們知道,消息在RabbitMQ的整個生命週期是生產者投遞消息ExchangeExchange根據路由鍵消息路由到合適的QueueQueue再將消息推(或消費者主動拉)給消費者
web

在這個過程中,Exchange根據路由鍵將消息路由到合適的Queue的過程,可能發生諸如spring

  1. Exchange沒有任何Queue與其綁定,
  2. 或者根據消息的路由鍵,沒有任何一個合適的Queue來投遞消息,

從而致使消息路由失敗。對於這些路由失敗的消息應該如何處理呢?有兩種方式:微信

  1. 將消息返回給投遞該條消息的生產者。
  2. 使用備份交換機 alternate-exchange(AE)。

方式1:將消息返回給投遞該條消息的生產者

  • 配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=futao
spring.rabbitmq.password=123456789
spring.rabbitmq.virtual-host=/tech-sharing

# 當exchange沒法找到任何一個合適的queue時,將消息return給生產者
spring.rabbitmq.template.mandatory=true
# 必須設置爲true,不然消息消息路由失敗也沒法觸發Return回調
spring.rabbitmq.publisher-returns=true
  • 交換機定義與消息發送
@Slf4j
@Component
public class NoMatchQueue {

    /**
     * 交換機名稱
     */

    public static final String EXCHANGE_NAME = "X_NO_MATCH_QUEUE";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void send() {
        log.info("發送消息");
        Order order = new Order(1, BigDecimal.TEN, OrderStatusEnum.UN_PAY.getStatus());
        Message message = MessageBuilder
                .withBody(JSON.toJSONString(order).getBytes(StandardCharsets.UTF_8))
                .setContentEncoding(StandardCharsets.UTF_8.displayName())
                .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                .build();
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, "", message);
    }
}

@Configuration
class ExchangeDeclare {
    /**
     * 只定義一個交換機,可是不綁定任何Queue,因此發送到該Exchange的消息都會路由失敗
     *
     * @return
     */

    @Bean
    public Exchange noMatchQueueExchange() {
        return ExchangeBuilder
                .topicExchange(NoMatchQueue.EXCHANGE_NAME)
                .durable(true)
                .build();
    }
}
  • 設置回調函數
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        log.error("消息被退回:{}", returnedMessage);
    }
});
  • 消息被退回:且能夠看到緣由是沒法路由app

方式2:使用備份交換機

使用方式1須要咱們在程序中進行編碼設置回調函數監聽,增長了生產者代碼的複雜性,那麼爲了消息不丟失還有沒有其餘方式來處理路由失敗的消息呢:答案是使用備份交換機ide

  • 相較於使用回調函數,使用備份交換機只須要給交換機綁定一個備份交換機便可,當消息路由失敗以後,消息將投遞到備份交換機,再由備份交換機路由消息到備份隊列。這樣咱們只須要關注這個備份隊列就能知道/獲取到路由失敗的消息。一般狀況下備份交換的Type應該設置爲 fanout
  • 配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=futao
spring.rabbitmq.password=123456789
spring.rabbitmq.virtual-host=/tech-sharing

# 當exchange沒法找到任何一個合適的queue時,將消息return給生產者
spring.rabbitmq.template.mandatory=false
# 必須設置爲true,不然消息消息路由失敗也沒法觸發Return回調
spring.rabbitmq.publisher-returns=false
  • 注意: 使用備份交換機模式,mandatory將無效,即就算mandatory設置爲false,路由失敗的消息一樣會被投遞到綁定的備份交換機。
  • 正常業務交換機(不綁定隊列,使得消息必定會路由失敗)
/**
 * 業務交換機
 *
 * @return
 */

@Bean
public Exchange noMatchQueueExchange() {
    return ExchangeBuilder
            .topicExchange(NoMatchQueueAlternateExchange.EXCHANGE_NAME)
            .durable(true)
            // 綁定備份交換機
            .alternate(X_ALTERNATE)
            .build();
}
  • 備份交換機/隊列/綁定

/**
 * 備份隊列
 *
 * @return
 */

@Bean
public Queue alternateQueue() {
    return QueueBuilder
            .durable("Q_ALTERNATE")
            .build();
}

/**
 * 備份交換機
 *
 * @return
 */

@Bean
public Exchange alternateExchange() {
    return ExchangeBuilder
            .fanoutExchange(X_ALTERNATE)
            .durable(true)
            .build();
}

/**
 * 備份綁定
 *
 * @param alternateExchange
 * @param alternateQueue
 * @return
 */

@Bean
public Binding alternateBinding(Exchange alternateExchange, Queue alternateQueue) {
    return BindingBuilder
            .bind(alternateQueue)
            .to(alternateExchange)
            .with("")
            .noargs();
}
  • 消息投遞
/**
 * 正常業務交換機
 */

public static final String EXCHANGE_NAME = "X_NO_MATCH_QUEUE_ALTERNATE";

@Autowired
private RabbitTemplate rabbitTemplate;

/**
 * 發送消息
 */

@PostConstruct
public void send() {
    log.info("發送消息");
    Order order = new Order(1, BigDecimal.TEN, OrderStatusEnum.UN_PAY.getStatus());
    Message message = MessageBuilder
            .withBody(JSON.toJSONString(order).getBytes(StandardCharsets.UTF_8))
            .setContentEncoding(StandardCharsets.UTF_8.displayName())
            .setContentType(MessageProperties.CONTENT_TYPE_JSON)
            .build();
    rabbitTemplate.convertAndSend(EXCHANGE_NAME, "", message);
}
  • 結果是消息被路由到備份交換機的備份隊列
  • 且: 若是你同時使用了兩種方式,即(mandatory爲true+Listener監聽)和(備份交換機AlternateExchange),消息將只會路由到備份交換機,不會Return回生產者。


# 在原生RabbitMQ-client中演示這一過程:
@Slf4j
public class AeTest {
    /**
     * 獲取Channel
     */

    private static final Channel CHANNEL = MqChannelUtils.getChannel();
    /**
     * 備份交換機
     */

    private static final String X_AE = "X_AE";
    /**
     * 備份交換機綁定的隊列
     */

    private static final String Q_AE = "Q_AE";

    /**
     * 正常業務的交換機
     */

    private static final String X_1 = "X_1";

    public static void main(String[] args) throws IOException {
        // 定義備份交換機-其實也是一個正常的交換機
        CHANNEL.exchangeDeclare(X_AE, BuiltinExchangeType.FANOUT, true);
        // 定義備份隊列
        CHANNEL.queueDeclare(Q_AE, truefalsefalsenull);
        // 綁定備份
        CHANNEL.queueBind(Q_AE, X_AE, "");

        HashMap<String, Object> arguments = new HashMap<>();
        // 綁定的備份交換機
        arguments.put("alternate-exchange", X_AE);
        // 定義交換機
        CHANNEL.exchangeDeclare(X_1, BuiltinExchangeType.TOPIC, falsefalse, arguments);

        // 添加監聽器,看看是否還會return消息
        CHANNEL.addReturnListener(new ReturnCallback() {
            @Override
            public void handle(Return returnMessage) {
                log.error("消息被退回{}", returnMessage);
            }
        });

        // 嘗試向交換機發送消息(沒法路由)- mandatory參數無效
        CHANNEL.basicPublish(X_1, ""falsefalse,
                new AMQP.BasicProperties(), "阿依古麗".getBytes(StandardCharsets.UTF_8));
    }
}
  • 兩個交換機,正常的交換機X_1和備份交換機X_AE函數

  • 備份交換機綁定的隊列已經接收到了路由失敗的消息spring-boot

  • 其餘要注意的點:flex

    • 備份交換機的Type設置爲fanout比較合適,這樣能夠忽略RoutingKey,避免備份交換機又路由失敗。
    • 被投遞到備份交換機的RoutingKey爲消息投遞到MQ時的原始RoutingKey,不會變,這一點在其餘場景下也是同樣的。
    • 使用備份交換機模式,mandatory將無效,即就算mandatory設置爲false,路由失敗的消息一樣會被投遞到綁定的備份交換機。

# 源代碼

https://gitee.com/FutaoSmile/tech-sharing-mq

往期推薦


你可知道publisherReturns參數在spring-boot-starter-amqp中的做用?

SpringBoot RabbitMQ實現消息可靠投遞

RabbitMQ死信隊列在SpringBoot中的使用

使用RabbitMQ實現未支付訂單在30分鐘後自動過時

SpringBoot如何作到自動幫咱們建立RabbitMQ的Queue和Exchange的?


歡迎在評論區留下你看文章時的思考,及時說出,有助於加深記憶和理解,還能和像你同樣也喜歡這個話題的讀者相遇~

本文分享自微信公衆號 - 喜歡天文(AllUnderControl)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索