Spring Boot RabbitMQ實踐

背景

咱們如今有兩個主要的系統一個是活動系統一個是獎品系統,活動系統會調用獎品系統發放獎勵。git

最開始兩個之間只經過http直接調用,優勢:開發成本低,沒有多餘組件引入;發放獎勵實時返回;活動系統不須要管獎品是否還有剩餘庫存。缺點:這樣就致使上游活動系統強依賴於下游的獎品系統,若是一旦獎品系統掛掉,咱們活動系統也就不可用了;這裏還有個bug,在調用獎品系統發放獎勵,獎品系統發放成功了,可是活動系統請求超時了,就會致使提示客戶的沒有獎品了,可是實際獎品又發放了。github

訪問量上來後發直接走http確定是不行的,因此引入了MQ將將兩個系統隔離開,優勢:全部發放流程異步執行,活動系統響應更快了;這兩個系統就變成弱引用關係,即便獎品系統掛掉,活動系統仍能正常運行;不會出現上面說的bug了;缺點:發放獎勵將會有延遲;引入MQ增長了項目複雜度,咱們必須去考慮消息的丟失,重複消費等問題;活動系統須要知道獎品的庫存狀況。redis

解決方案

針對上面使用MQ發放獎勵會遇到的問題,咱們能夠經過面的方案來解決。spring

消息的丟失問題

在數據庫建立一張異常消息表。數據庫

  • 在發消息的時候若是出現異常,直接將消息記錄到異常消息表,等待後臺跑批,進行補償發放。
  • 在發消息的時候,若是發送消息的ack回調沒沒有發送成功,將進行消息重發,若是重發3次仍是失敗,該消息就記錄到異常消息表,等待後臺跑批,進行補償發放。消息的重複發送可使用RabbitMQ的ConfirmCallback、ReturnCallback機制來實現。
  • 在消費端處理消息(調用獎品系統發放獎勵)的時候,若是出現異常也將消息放到異常消息表中,等待後臺跑批,進行補償發放。若是將異常消息保存到數據庫時發生了異常,則將消息放到死信隊列,等待後臺跑批,進行補償發放。

這樣子雖然仍是不能徹底杜絕消息丟失,可是絕大部分狀況下是沒有問題的。dom

重複消費問題

爲每一個消息生成業務流水號,將流水號和發放裏的參數一塊兒發送到獎品系統,獎品系統在發放獎勵的時候先判斷這個流水號是否存在,存在就表示該獎品已經發過來直接返回發放成功,若是沒有就進行發放獎勵操做。異步

活動系統須要知道獎品的庫存狀況。

咱們在配置活動的時候會將獎品的庫存放到咱們活動系統,在發MQ消息以前回去判斷是否有剩餘庫存,若是沒有直接返回獎勵領完了,若是有才回去發MQ消息。扣減庫存能夠參考基於redis實現的扣減庫存ide

活動流程圖

下面是引入MQ事後咱們系統的流程圖 MQ解耦系統間的依賴關係spring-boot

生產者.png

消費者.png

生產者端實現

/**
 * Rabbit 發送消息
 *
 * @author yuhao.wang
 */
@Service
public class RabbitSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback, InitializingBean {
    private final Logger logger = LoggerFactory.getLogger(RabbitSender.class);

    /**
     * Rabbit MQ 客戶端
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 系統配置
     */
    @Autowired
    private SystemConfig systemConfig;

    /**
     * 發送MQ消息
     *
     * @param exchangeName 交換機名稱
     * @param routingKey   路由名稱
     * @param message      發送消息體
     */
    public void sendMessage(String exchangeName, String routingKey, Object message) {
        Assert.notNull(message, "message 消息體不能爲NULL");
        Assert.notNull(exchangeName, "exchangeName 不能爲NULL");
        Assert.notNull(routingKey, "routingKey 不能爲NULL");

        // 獲取CorrelationData對象
        CorrelationData correlationData = this.correlationData(message);
        correlationData.setExchange(exchangeName);
        correlationData.setRoutingKey(routingKey);
        correlationData.setMessage(message);

        logger.info("發送MQ消息,消息ID:{},消息體:{}, exchangeName:{}, routingKey:{}",
                correlationData.getId(), JSON.toJSONString(message), exchangeName, routingKey);
        // 發送消息
        this.convertAndSend(exchangeName, routingKey, message, correlationData);
    }

    /**
     * 用於實現消息發送到RabbitMQ交換器後接收ack回調。
     * 若是消息發送確認失敗就進行重試。
     *
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(org.springframework.amqp.rabbit.support.CorrelationData correlationData, boolean ack, String cause) {
        // 消息回調確認失敗處理
        if (!ack && correlationData instanceof CorrelationData) {
            CorrelationData correlationDataExtends = (CorrelationData) correlationData;

            //消息發送失敗,就進行重試,重試事後還不能成功就記錄到數據庫
            if (correlationDataExtends.getRetryCount() < systemConfig.getMqRetryCount()) {
                logger.info("MQ消息發送失敗,消息重發,消息ID:{},重發次數:{},消息體:{}", correlationDataExtends.getId(),
                        correlationDataExtends.getRetryCount(), JSON.toJSONString(correlationDataExtends.getMessage()));

                // 將重試次數加一
                correlationDataExtends.setRetryCount(correlationDataExtends.getRetryCount() + 1);

                // 重發發消息
                this.convertAndSend(correlationDataExtends.getExchange(), correlationDataExtends.getRoutingKey(),
                        correlationDataExtends.getMessage(), correlationDataExtends);
            } else {
                //消息重試發送失敗,將消息放到數據庫等待補發
                logger.warn("MQ消息重發失敗,消息入庫,消息ID:{},消息體:{}", correlationData.getId(),
                        JSON.toJSONString(correlationDataExtends.getMessage()));

                // TODO 保存消息到數據庫
            }
        } else {
            logger.info("消息發送成功,消息ID:{}", correlationData.getId());
        }
    }

    /**
     * 用於實現消息發送到RabbitMQ交換器,但無相應隊列與交換器綁定時的回調。
     * 基本上來講線上不可能出現這種狀況,除非手動將已經存在的隊列刪掉,不然在測試階段確定能測試出來。
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        logger.error("MQ消息發送失敗,replyCode:{}, replyText:{},exchange:{},routingKey:{},消息體:{}",
                replyCode, replyText, exchange, routingKey, JSON.toJSONString(message.getBody()));

        // TODO 保存消息到數據庫
    }

    /**
     * 消息相關數據(消息ID)
     *
     * @param message
     * @return
     */
    private CorrelationData correlationData(Object message) {

        return new CorrelationData(UUID.randomUUID().toString(), message);
    }

    /**
     * 發送消息
     *
     * @param exchange        交換機名稱
     * @param routingKey      路由key
     * @param message         消息內容
     * @param correlationData 消息相關數據(消息ID)
     * @throws AmqpException
     */
    private void convertAndSend(String exchange, String routingKey, final Object message, CorrelationData correlationData) throws AmqpException {
        try {
            rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
        } catch (Exception e) {
            logger.error("MQ消息發送異常,消息ID:{},消息體:{}, exchangeName:{}, routingKey:{}",
                    correlationData.getId(), JSON.toJSONString(message), exchange, routingKey, e);

            // TODO 保存消息到數據庫
        }
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }
}

生產者端使用ConfirmCallback和ReturnCallback回調機制,最大限度的保證消息不丟失,對原有CorrelationData類進行擴展,來實現消息的重發,具體請看源碼。測試

消費者端實現

/**
 * 發放優惠券的MQ處理
 *
 * @author yuhao.wang
 */
@Service
@ConditionalOnClass({RabbitTemplate.class})
public class SendMessageListener {

    private final Logger logger = LoggerFactory.getLogger(SendMessageListener.class);

    @RabbitListener(queues = RabbitConstants.QUEUE_NAME_SEND_COUPON)
    public void process(SendMessage sendMessage, Channel channel, Message message) throws Exception {
        logger.info("[{}]處理髮放優惠券獎勵消息隊列接收數據,消息ID:{},消息體:{}", RabbitConstants.QUEUE_NAME_SEND_COUPON,
                message.getMessageProperties().getCorrelationIdString(), JSON.toJSONString(sendMessage));

        try {
            // 參數校驗
            Assert.notNull(sendMessage, "sendMessage 消息體不能爲NULL");

            // TODO 處理消息

            // 確認消息已經消費成功
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            logger.error("MQ消息處理異常,消息ID:{},消息體:{}", message.getMessageProperties().getCorrelationIdString(),
                    JSON.toJSONString(sendMessage), e);

            try {
                // TODO 保存消息到數據庫

                // 確認消息已經消費成功
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (Exception e1) {
                logger.error("保存異常MQ消息到數據庫異常,放到死性隊列,消息ID:{}", message.getMessageProperties().getCorrelationIdString());
                // 確認消息將消息放到死信隊列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            }
        }
    }
}

消費者端主要作了消息消費失敗的容錯處理。

源碼

https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases

spring-boot-student-rabbitmq 工程

相關文章
相關標籤/搜索