RocketMq中的traceId重複問題

背景

最近發現生產上mq的traceId有重複現象,理論上不一樣消息消費,tracdId不該該相同,但爲何有必定的機率會出現呢?

查詢代碼以下:java

protected ConsumeStatus consumeMsgSingle(MessageExt ext) {
       log.debug("AbstractMessageListener-consumeMessage() msgId:{}, body:{}", ext.getMsgId(), new String(ext.getBody()));
       String message = new String(ext.getBody());
       //獲取到key
       String key = RocketMQUtils.concatKey(ext.getTopic(), ext.getTags());
       //根據key從handleMap裏獲取到咱們的處理類
       MessageProcessor messageProcessor = handleMap.get(key);
       if (Objects.isNull(messageProcessor)) {
           messageProcessor = handleMap.get(ext.getTopic());
       }
       Optional.ofNullable(messageProcessor).orElseThrow(() -> new RRException(String.format("未找到消息處理類, topic:%s, tag:%s", ext.getTopic(), ext.getTags())));
       Object obj = null;
       try {
           //將String類型的message反序列化成對應的對象。
           obj = messageProcessor.transferMessage(message);
           if (obj instanceof MqMetaInfo) {
               MqMetaInfo meta = (MqMetaInfo) obj;
               MqMetaInfoConverter.fromExt(meta, ext);
           }
           generateMDC(ext);
       } catch (Exception e) {
           StringBuilder errMsg = new StringBuilder("對象反序列化失敗, ")
                   .append("messageId:     ")
                   .append(ext.getMsgId()).append("\n")
                   .append("msgBody:       ")
                   .append(new String(ext.getBody())).append("\n")
                   .append("messageExt     ")
                   .append(ext).append("\n")
                   .append("stackTrace:    ")
                   .append(JSON.toJSONString(e.getStackTrace()));

           log.error("AbstractMessageListener-consumeMessage() error:{}, msgId:{},  message:{}, errMsg:{}"
                   , e, ext.getMsgId(), new String(ext.getBody()), errMsg.toString());
           throw new RRException(errMsg.toString());
       }
       //處理消息
       boolean result = messageProcessor.handleMessage(obj);
       if (!result) {
           if (ext.getReconsumeTimes() > Integer.MAX_VALUE) {
               return ConsumeStatus.SUCCESS;
           }
           return ConsumeStatus.FAIL;
       }
       return ConsumeStatus.SUCCESS;
   }

generateMDC方法以下:
app

緣由分析

能夠看到若是message中有traceId,則把traceId關聯到該線程,並打印出來。但發現最終該方法執行完成後未作清理traceId的動做,即RocketMq的消費者用的是線程池,而線程回收後traceId依然綁定在該線程上,若是下次有消息過來消費則會有一樣traceId出現

重現

消費者

@Slf4j
@Service(value = "multiConsumerDemoProcessor")
public class MultiConsumerDemoProcessor implements MessageProcessor<String> {

    @Override
    public boolean handleMessage(String orderNo) {
        log.info("開始消費:{}", orderNo);
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return true;
    }

    @Override
    public Class<String> getClazz() {
        return null;
    }

    @Override
    public String transferMessage(String message) {
        return message;
    }
}

生產者

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");

        producer.setNamesrvAddr("ip");
        producer.start();

        for (int i = 0; i < 10; i++)
            try {
                {
                    Message msg = new Message("multi-consumer-demo",
                        "demo",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                }

            } catch (Exception e) {
                e.printStackTrace();
            }

        //producer.shutdown();
    }
}
運行結果:


能夠看到traceId是有重複的ide

解決

加上finally語句,釋放traceId
ui

解決結果

相關文章
相關標籤/搜索