本文主要研究一下rocketmq的ExpressionForRetryMessageFilterjava
rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/MessageFilter.javagit
public interface MessageFilter { /** * match by tags code or filter bit map which is calculated when message received * and stored in consume queue ext. * * @param tagsCode tagsCode * @param cqExtUnit extend unit of consume queue */ boolean isMatchedByConsumeQueue(final Long tagsCode, final ConsumeQueueExt.CqExtUnit cqExtUnit); /** * match by message content which are stored in commit log. * <br>{@code msgBuffer} and {@code properties} are not all null.If invoked in store, * {@code properties} is null;If invoked in {@code PullRequestHoldService}, {@code msgBuffer} is null. * * @param msgBuffer message buffer in commit log, may be null if not invoked in store. * @param properties message properties, should decode from buffer if null by yourself. */ boolean isMatchedByCommitLog(final ByteBuffer msgBuffer, final Map<String, String> properties); }
rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.javagithub
public class ExpressionMessageFilter implements MessageFilter { protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.FILTER_LOGGER_NAME); protected final SubscriptionData subscriptionData; protected final ConsumerFilterData consumerFilterData; protected final ConsumerFilterManager consumerFilterManager; protected final boolean bloomDataValid; public ExpressionMessageFilter(SubscriptionData subscriptionData, ConsumerFilterData consumerFilterData, ConsumerFilterManager consumerFilterManager) { this.subscriptionData = subscriptionData; this.consumerFilterData = consumerFilterData; this.consumerFilterManager = consumerFilterManager; if (consumerFilterData == null) { bloomDataValid = false; return; } BloomFilter bloomFilter = this.consumerFilterManager.getBloomFilter(); if (bloomFilter != null && bloomFilter.isValid(consumerFilterData.getBloomFilterData())) { bloomDataValid = true; } else { bloomDataValid = false; } } @Override public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) { if (null == subscriptionData) { return true; } if (subscriptionData.isClassFilterMode()) { return true; } // by tags code. if (ExpressionType.isTagType(subscriptionData.getExpressionType())) { if (tagsCode == null) { return true; } if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) { return true; } return subscriptionData.getCodeSet().contains(tagsCode.intValue()); } else { // no expression or no bloom if (consumerFilterData == null || consumerFilterData.getExpression() == null || consumerFilterData.getCompiledExpression() == null || consumerFilterData.getBloomFilterData() == null) { return true; } // message is before consumer if (cqExtUnit == null || !consumerFilterData.isMsgInLive(cqExtUnit.getMsgStoreTime())) { log.debug("Pull matched because not in live: {}, {}", consumerFilterData, cqExtUnit); return true; } byte[] filterBitMap = cqExtUnit.getFilterBitMap(); BloomFilter bloomFilter = this.consumerFilterManager.getBloomFilter(); if (filterBitMap == null || !this.bloomDataValid || filterBitMap.length * Byte.SIZE != consumerFilterData.getBloomFilterData().getBitNum()) { return true; } BitsArray bitsArray = null; try { bitsArray = BitsArray.create(filterBitMap); boolean ret = bloomFilter.isHit(consumerFilterData.getBloomFilterData(), bitsArray); log.debug("Pull {} by bit map:{}, {}, {}", ret, consumerFilterData, bitsArray, cqExtUnit); return ret; } catch (Throwable e) { log.error("bloom filter error, sub=" + subscriptionData + ", filter=" + consumerFilterData + ", bitMap=" + bitsArray, e); } } return true; } @Override public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) { if (subscriptionData == null) { return true; } if (subscriptionData.isClassFilterMode()) { return true; } if (ExpressionType.isTagType(subscriptionData.getExpressionType())) { return true; } ConsumerFilterData realFilterData = this.consumerFilterData; Map<String, String> tempProperties = properties; // no expression if (realFilterData == null || realFilterData.getExpression() == null || realFilterData.getCompiledExpression() == null) { return true; } if (tempProperties == null && msgBuffer != null) { tempProperties = MessageDecoder.decodeProperties(msgBuffer); } Object ret = null; try { MessageEvaluationContext context = new MessageEvaluationContext(tempProperties); ret = realFilterData.getCompiledExpression().evaluate(context); } catch (Throwable e) { log.error("Message Filter error, " + realFilterData + ", " + tempProperties, e); } log.debug("Pull eval result: {}, {}, {}", ret, realFilterData, tempProperties); if (ret == null || !(ret instanceof Boolean)) { return false; } return (Boolean) ret; } }
rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.javaexpress
public class ExpressionForRetryMessageFilter extends ExpressionMessageFilter { public ExpressionForRetryMessageFilter(SubscriptionData subscriptionData, ConsumerFilterData consumerFilterData, ConsumerFilterManager consumerFilterManager) { super(subscriptionData, consumerFilterData, consumerFilterManager); } @Override public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) { if (subscriptionData == null) { return true; } if (subscriptionData.isClassFilterMode()) { return true; } boolean isRetryTopic = subscriptionData.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX); if (!isRetryTopic && ExpressionType.isTagType(subscriptionData.getExpressionType())) { return true; } ConsumerFilterData realFilterData = this.consumerFilterData; Map<String, String> tempProperties = properties; boolean decoded = false; if (isRetryTopic) { // retry topic, use original filter data. // poor performance to support retry filter. if (tempProperties == null && msgBuffer != null) { decoded = true; tempProperties = MessageDecoder.decodeProperties(msgBuffer); } String realTopic = tempProperties.get(MessageConst.PROPERTY_RETRY_TOPIC); String group = subscriptionData.getTopic().substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length()); realFilterData = this.consumerFilterManager.get(realTopic, group); } // no expression if (realFilterData == null || realFilterData.getExpression() == null || realFilterData.getCompiledExpression() == null) { return true; } if (!decoded && tempProperties == null && msgBuffer != null) { tempProperties = MessageDecoder.decodeProperties(msgBuffer); } Object ret = null; try { MessageEvaluationContext context = new MessageEvaluationContext(tempProperties); ret = realFilterData.getCompiledExpression().evaluate(context); } catch (Throwable e) { log.error("Message Filter error, " + realFilterData + ", " + tempProperties, e); } log.debug("Pull eval result: {}, {}, {}", ret, realFilterData, tempProperties); if (ret == null || !(ret instanceof Boolean)) { return false; } return (Boolean) ret; } }
MessageFilter定義了isMatchedByConsumeQueue、isMatchedByCommitLog方法;ExpressionMessageFilter實現了MessageFilter接口,其isMatchedByConsumeQueue方法利用了bloomFilter進行判斷;isMatchedByCommitLog方法主要是經過realFilterData.getCompiledExpression().evaluate(context)來獲取結果;ExpressionForRetryMessageFilter繼承了ExpressionMessageFilter,它覆蓋了isMatchedByCommitLog方法,裏頭會使用subscriptionData.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)來判斷是不是isRetryTopicapache