RocketMQ 消息的 tag

tag 的使用場景:不同的消費組訂閱同一 topic 下不同的 tag,各自拉取並消費不同的消息,從而在 topic 內部對消息進行隔離。

producer 發送消息,指定 tag

// Encode the body first, then build a message for topic "topic-zhang"
// tagged "TagA" with key "key-zhang", and send it through the producer.
byte[] payload = ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET);
Message msg = new Message(
    "topic-zhang", // topic
    "TagA",        // tag
    "key-zhang",   // key
    payload        // message body
);
SendResult sendResult = producer.send(msg);

consumer 訂閱 topic,指定 tag

// Subscribe to "topic-zhang" with a tag expression naming three tags
// (TagA, TagB, TagC) joined by "||".
consumer.subscribe("topic-zhang", "TagA||TagB||TagC");

broker 存儲 consumer 訂閱的 tag 信息

// org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData
// Per the surrounding text, holds the tag strings the consumer subscribed to; starts empty.
private Set<String> tagsSet = new HashSet<String>();
// Integer codes kept alongside the tags; starts empty. Presumably the set returned by
// getCodeSet(), which ExpressionMessageFilter#isMatchedByConsumeQueue checks with
// contains(tagsCode.intValue()) -- confirm against the full class.
private Set<Integer> codeSet = new HashSet<Integer>();

broker 計算 tag 的 hashCode,直接取字符串的 hashCode 值

// org.apache.rocketmq.store.CommitLog#checkMessageAndReturnSize
// tagsCode stays 0 when the message has no tags property or the property is empty.
long tagsCode = 0;
String tags = propertiesMap.get(MessageConst.PROPERTY_TAGS);
if (tags != null && tags.length() > 0) {
    // Turn the tag string into its numeric code; the tagsString2tagsCode shown in this
    // article returns tags.hashCode().
    tagsCode = MessageExtBrokerInner.tagsString2tagsCode(MessageExt.parseTopicFilterType(sysFlag), tags);
}

/**
 * Converts a tag string into its numeric tags code.
 *
 * @param filter topic filter type; not consulted by this implementation
 * @param tags   tag string of the message, may be {@code null} or empty
 * @return {@code 0} when {@code tags} is {@code null} or empty, otherwise
 *         {@code tags.hashCode()} widened to {@code long}
 */
public static long tagsString2tagsCode(final TopicFilterType filter, final String tags) {
    final boolean hasTags = tags != null && !tags.isEmpty();
    return hasTags ? tags.hashCode() : 0;
}

broker 存儲消息 tag 的 hashCode 值:

consumeQueue 中一個 entry 佔 20 字節:8 字節 commitLog 物理偏移,4 字節消息大小,8 字節 tag 的 hashCode 值

consumer 拉取消息時,broker 根據 consumer 提供的 offset 去遍歷 consumeQueue,檢查 tag 的 hashCode 值是否滿足 consumer 的訂閱信息,以此對消息進行過濾

// org.apache.rocketmq.store.DefaultMessageStore#getMessage
// Skip the current consume-queue entry when a filter is set and rejects it.
// The filter is handed null in place of tagsCode when isTagsCodeLegal is false,
// and null in place of cqExtUnit when extRet is false.
if (messageFilter != null
    && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
    // Report NO_MATCHED_MESSAGE only while the result's total buffer size is still 0.
    if (getResult.getBufferTotalSize() == 0) {
        status = GetMessageStatus.NO_MATCHED_MESSAGE;
    }

    continue;
}

// org.apache.rocketmq.broker.filter.ExpressionMessageFilter#isMatchedByConsumeQueue
/**
 * Decides from consume-queue data alone whether an entry may match the subscription.
 * Every case that cannot be decided here returns {@code true}.
 *
 * @param tagsCode  tags code of the entry, or {@code null} when none is available
 * @param cqExtUnit extended consume-queue unit of the entry, or {@code null}
 * @return {@code false} only when the tag code set does not contain the code, or the
 *         bloom filter reports a miss; {@code true} in every other case
 */
public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
    // No subscription to check against, or class-filter mode: let the entry through.
    if (subscriptionData == null || subscriptionData.isClassFilterMode()) {
        return true;
    }

    // Tag expression: match on a missing code, a subscribe-all expression,
    // or a code contained in the subscription's code set.
    if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
        return tagsCode == null
            || subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)
            || subscriptionData.getCodeSet().contains(tagsCode.intValue());
    }

    // Any other expression type: without an expression, its compiled form and
    // bloom filter data there is nothing to test here.
    final boolean filterIncomplete = consumerFilterData == null
        || consumerFilterData.getExpression() == null
        || consumerFilterData.getCompiledExpression() == null
        || consumerFilterData.getBloomFilterData() == null;
    if (filterIncomplete) {
        return true;
    }

    // No extended unit, or isMsgInLive rejects the entry's store time.
    final boolean notInLive = cqExtUnit == null
        || !consumerFilterData.isMsgInLive(cqExtUnit.getMsgStoreTime());
    if (notInLive) {
        log.debug("Pull matched because not in live: {}, {}", consumerFilterData, cqExtUnit);
        return true;
    }

    final byte[] bitMapBytes = cqExtUnit.getFilterBitMap();
    final BloomFilter bloom = this.consumerFilterManager.getBloomFilter();

    // The stored bit map is usable only if present, bloom data is flagged valid,
    // and its bit length equals the bit number of the consumer's bloom filter data.
    final boolean bitMapUsable = bitMapBytes != null
        && this.bloomDataValid
        && bitMapBytes.length * Byte.SIZE == consumerFilterData.getBloomFilterData().getBitNum();
    if (!bitMapUsable) {
        return true;
    }

    BitsArray bits = null;
    try {
        bits = BitsArray.create(bitMapBytes);
        final boolean hit = bloom.isHit(consumerFilterData.getBloomFilterData(), bits);
        log.debug("Pull {} by bit map:{}, {}, {}", hit, consumerFilterData, bits, cqExtUnit);
        return hit;
    } catch (Throwable e) {
        // A failure during the bloom check is logged and treated as a match.
        log.error("bloom filter error, sub=" + subscriptionData
            + ", filter=" + consumerFilterData + ", bitMap=" + bits, e);
        return true;
    }
}
相關文章
相關標籤/搜索