分佈式事務:消息可靠發送

接上文分佈式事務:基於可靠消息服務介紹了總體中間件的設計思路,有些內容沒有展開。故此,本文詳細講解下如何將消息可靠發送到Rabbitmq。git

在上文簡單提到了如何將消息進行可靠發送,由於shine-mq是無縫集成spring-boot-starter的,因此rabbitmq的操做也是基於spring的rabbitTemplate來完成的。github

rabbitTemplate提供了setConfirmCallback方法,能夠在消息發送到RabbitMQ交換器後,進行ack的回調。spring

rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    ...
    //消息發送到RabbitMQ交換器後接收ack回調
    //若是發送到交換器成功,可是沒有匹配的隊列(好比說取消了綁定),ack返回值爲仍是true(這裏是一個坑,後面仔細講解)
    if (ack) {
        log.info("The message has been successfully delivered to the queue, correlationData:{}", correlationData);
        ...
    }
    ...
});
複製代碼

在此以前還須要設置CachingConnectionFactory緩存

//設置生成者確認機制
rabbitConnectionFactory.setPublisherConfirms(true);
複製代碼

若是還須要設置setReturnCallback(消息發送到RabbitMQ交換器,但無相應queue時的回調),那就還須要設置rabbitTemplatebash

//使用return-callback時必須設置mandatory爲true
rabbitTemplate.setMandatory(true);
複製代碼

這裏須要知道的是ReturnCallback比ConfirmCallback先回調。服務器

上面提到了setConfirmCallback返回ack的一個坑,就是當消息成功發送到交換器,可是沒有匹配的隊列,就會觸發 ReturnCallback 回調,並且消息也丟失了,最致命的是setConfirmCallback回調返回的ack倒是true,若是單靠這個ack來判斷消息是否成功到達mq,就有必定機率形成消息丟失。網絡

要解決的話,能夠在setReturnCallback作一個緩存,由於上面的狀況會先觸發 ReturnCallback 回調,咱們緩存這個狀態,在setConfirmCallback回調的時候,結合ack和以前緩存的狀態進行判斷是否真的發送成功。app

下面是shine-mq實現的源碼:異步

//消息發送到RabbitMQ交換器後接收ack回調
    rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
        if (correlationData != null) {
            log.info("ConfirmCallback ack: {} correlationData: {} cause: {}", ack, correlationData, cause);
            String msgId = correlationData.getId();
            CorrelationDataExt ext = (CorrelationDataExt) correlationData;
            Coordinator coordinator = (Coordinator) applicationContext.getBean(ext.getCoordinator());
            coordinator.confirmCallback(correlationData, ack);
            // 若是發送到交換器成功,可是沒有匹配的隊列(好比說取消了綁定),ack返回值爲仍是true(這裏是一個坑,須要注意)
            if (ack && !coordinator.getReturnCallback(msgId)) {
                log.info("The message has been successfully delivered to the queue, correlationData:{}", correlationData);
                coordinator.delReady(msgId);
            } else {
                //失敗了判斷重試次數,重試次數大於0則繼續發送
                if (ext.getMaxRetries() > 0) {
                    try {
                        rabbitmqFactory.setCorrelationData(msgId, ext.getCoordinator(), ext.getMessage(),
                                ext.getMaxRetries() - 1);
                        rabbitmqFactory.getTemplate().send(ext.getMessage(), 0, 0, SendTypeEnum.DISTRIBUTED);
                    } catch (Exception e) {
                        log.error("Message retry failed to send, message:{} exception: ", ext.getMessage(), e);
                    }
                } else {
                    log.error("Message delivery failed, msgId: {}, cause: {}", msgId, cause);
                }
            }
            coordinator.delReturnCallback(msgId);
        }
    });
    //使用return-callback時必須設置mandatory爲true
    rabbitTemplate.setMandatory(true);
    //消息發送到RabbitMQ交換器,但無相應queue時的回調
    rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
        String messageId = message.getMessageProperties().getMessageId();
        String coordinatorName = messageId.split(MqConstant.SPLIT)[0];
        Coordinator coordinator = (Coordinator) applicationContext.getBean(coordinatorName);
        coordinator.setReturnCallback(messageId);
        log.error("ReturnCallback exception, no matching queue found. message id: {}, replyCode: {}, replyText: {},"
                + "exchange: {}, routingKey: {}", messageId, replyCode, replyText, exchange, routingKey);
    });
複製代碼

熟悉Rabbitmq的同窗可能知道,Rabbitmq有兩種機制來實現消息的可靠發送。分佈式

  • 經過事務機制,這個上篇文章分析過,在這個模式下,rabbitmq的效率很低,不適合。
  • Confirm模式,這個模式下會有三種方式,分別是:
    • 普通confirm模式:每發送一條消息後,調用waitForConfirms()方法,等待服務器端confirm。其實是一種串行confirm了。
    • 批量confirm模式:每發送一批消息後,調用waitForConfirms()方法,等待服務器端confirm
    • 異步confirm模式:提供一個回調方法,服務端confirm了一條或者多條消息後Client端會回調這個方法。(rabbitTemplate設置回調就是這個模式)

因此咱們知道了rabbitTemplate提供的確認機制是一種異步機制,並不能同步的發現問題,也就是說在極端的網絡條件下是會出現消息丟失的。

因此shine-mq經過增長一個Coordinator(協調者)來實現。Coordinator會保存2個狀態,一個是prepare(攜帶回查id),這個狀態在前文說過是用來保證上游服務的任務狀態的。

而另外一個狀態ready,就是來保證消息的可靠投遞。

首先shine-mq是使用@DistributedTrans來開啓。在這個註解的切面裏,先持久化ready狀態。

@Around(value = "@annotation(trans)")
public void around(ProceedingJoinPoint pjp, DistributedTrans trans) throws Throwable {
    ...
    try {
        EventMessage message = new EventMessage(exchange, routeKey, SendTypeEnum.DISTRIBUTED.toString(), checkBackId,
                coordinatorName, msgId);
        //將消息持久化
        coordinator.setReady(msgId, checkBackId.toString(), message);
        rabbitmqFactory.setCorrelationData(msgId, coordinatorName, message, null);
        rabbitmqFactory.addDLX(exchange, exchange, routeKey, null, null);
        if (flag) {
            rabbitmqFactory.add(MqConstant.DEAD_LETTER_QUEUE, MqConstant.DEAD_LETTER_EXCHANGE,
                    MqConstant.DEAD_LETTER_ROUTEKEY, null, null);
            flag = false;
        }
        rabbitmqFactory.getTemplate().send(message, 0, 0, SendTypeEnum.DISTRIBUTED);
    } catch (Exception e) {
        log.error("Message failed to be sent : ", e);
        throw e;
    }
}
複製代碼

而後在回調中刪除該狀態:

//消息發送到RabbitMQ交換器後接收ack回調
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    if (correlationData != null) {
        log.info("ConfirmCallback ack: {} correlationData: {} cause: {}", ack, correlationData, cause);
        String msgId = correlationData.getId();
        CorrelationDataExt ext = (CorrelationDataExt) correlationData;
        Coordinator coordinator = (Coordinator) applicationContext.getBean(ext.getCoordinator());
        //能夠自定義實現的回調
        coordinator.confirmCallback(correlationData, ack);
        // 若是發送到交換器成功,可是沒有匹配的隊列(好比說取消了綁定),ack返回值爲仍是true(這裏是一個坑,須要注意)
        if (ack && !coordinator.getReturnCallback(msgId)) {
            log.info("The message has been successfully delivered to the queue, correlationData:{}", correlationData);
            //刪除ready狀態
            coordinator.delStatus(msgId);
        } else {
            ...
        }
    }
});
複製代碼

由於存儲ready是在上游服務任務執行以後的,因此只要有超時的ready記錄未被清理掉,daemon(守護線程)只管撈起來進行重發就行,由於Mq的可靠性投遞就已經要求下游服務是須要保證冪等性了。

最後還有個極端的狀況,就是ready消息存儲的時候由於網絡抖動該消息丟失了,這時候也沒有關係,由於有prepare狀態會進行回查,該狀態只有在ready存儲後纔會觸發刪除。

若是對你有幫助,那就幫忙點個星星把 ^.^

github地址:github.com/7le/shine-m…


Github 不要吝嗇你的star ^.^ 更多精彩 戳我

相關文章
相關標籤/搜索