《RabbitMQ》如何保證消息不被重複消費

一 重複消息

爲何會出現消息重複?消息重複的緣由有兩個:1.生產時消息重複,2.消費時消息重複。java

1.1 生產時消息重複

因爲生產者發送消息給MQ,在MQ確認的時候出現了網絡波動,生產者沒有收到確認,實際上MQ已經接收到了消息。這時候生產者就會從新發送一遍這條消息。redis

生產者中若是消息未被確認,或確認失敗,咱們可使用定時任務+(redis/db)來進行消息重試。spring

@Component
@Slf4J
public class SendMessage {
    @Autowired
    private MessageService messageService;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 最大投遞次數
    private static final int MAX_TRY_COUNT = 3;

    /**
     * 每30s拉取投遞失敗的消息, 從新投遞
     */
    @Scheduled(cron = "0/30 * * * * ?")
    public void resend() {
        log.info("開始執行定時任務(從新投遞消息)");

        List<MsgLog> msgLogs = messageService.selectTimeoutMsg();
        msgLogs.forEach(msgLog -> {
            String msgId = msgLog.getMsgId();
            if (msgLog.getTryCount() >= MAX_TRY_COUNT) {
                messageService.updateStatus(msgId, Constant.MsgLogStatus.DELIVER_FAIL);
                log.info("超過最大重試次數, 消息投遞失敗, msgId: {}", msgId);
            } else {
                messageService.updateTryCount(msgId, msgLog.getNextTryTime());// 投遞次數+1

                CorrelationData correlationData = new CorrelationData(msgId);
                rabbitTemplate.convertAndSend(msgLog.getExchange(), msgLog.getRoutingKey(), MessageHelper.objToMsg(msgLog.getMsg()), correlationData);// 從新投遞

                log.info("第 " + (msgLog.getTryCount() + 1) + " 次從新投遞消息");
            }
        });

        log.info("定時任務執行結束(從新投遞消息)");
    }
}

1.2消費時消息重複

消費者消費成功後,再給MQ確認的時候出現了網絡波動,MQ沒有接收到確認,爲了保證消息被消費,MQ就會繼續給消費者投遞以前的消息。這時候消費者就接收到了兩條同樣的消息。數據庫

修改消費者,模擬異常json

@RabbitListener(queuesToDeclare = @Queue(value = "javatrip", durable = "true"))
public void receive(String message, @Headers Map<String,Object> headers, Channel channel) throws Exception{

    System.out.println("重試"+System.currentTimeMillis());
    System.out.println(message);
    int i = 1 / 0;
}

配置yml重試策略網絡

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 開啓消費者進行重試
          max-attempts: 5 # 最大重試次數
          initial-interval: 3000 # 重試時間間隔

因爲重複消息是因爲網絡緣由形成的,所以不可避免重複消息。可是咱們須要保證消息的冪等性app

二 如何保證消息冪等性

讓每一個消息攜帶一個全局的惟一ID,便可保證消息的冪等性,具體消費過程爲:dom

  1. 消費者獲取到消息後先根據id去查詢redis/db是否存在該消息
  2. 若是不存在,則正常消費,消費完畢後寫入redis/db
  3. 若是存在,則證實消息被消費過,直接丟棄。

生產者ui

@PostMapping("/send")
public void sendMessage(){

    JSONObject jsonObject = new JSONObject();
    jsonObject.put("message","Java旅途");
    String json = jsonObject.toJSONString();
    Message message = MessageBuilder.withBody(json.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("UTF-8").setMessageId(UUID.randomUUID()+"").build();
    amqpTemplate.convertAndSend("javatrip",message);
}

消費者code

@Component
@RabbitListener(queuesToDeclare = @Queue(value = "javatrip", durable = "true"))
public class Consumer {

    @RabbitHandler
    public void receiveMessage(Message message) throws Exception {

        Jedis jedis = new Jedis("localhost", 6379);

        String messageId = message.getMessageProperties().getMessageId();
        String msg = new String(message.getBody(),"UTF-8");
        System.out.println("接收到的消息爲:"+msg+"==消息id爲:"+messageId);

        String messageIdRedis = jedis.get("messageId");

        if(messageId == messageIdRedis){
            return;
        }
        JSONObject jsonObject = JSONObject.parseObject(msg);
        String email = jsonObject.getString("message");
        jedis.set("messageId",messageId);
    }
}

若是須要存入db的話,能夠直接將這個ID設爲消息的主鍵,下次若是獲取到重複消息進行消費時,因爲數據庫主鍵的惟一性,則會直接拋出異常。

> 若是以爲文章不錯,歡迎點贊、留言
> 關注公衆號《Java旅途》,每日推送精品文章
相關文章
相關標籤/搜索