本文主要研究一下rocketmq的pullFromWhichNodeTable。
rocketmq-all-4.6.0-source-release/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
public class PullAPIWrapper { private final InternalLogger log = ClientLogger.getLog(); private final MQClientInstance mQClientFactory; private final String consumerGroup; private final boolean unitMode; private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable = new ConcurrentHashMap<MessageQueue, AtomicLong>(32); private volatile boolean connectBrokerByUser = false; private volatile long defaultBrokerId = MixAll.MASTER_ID; private Random random = new Random(System.currentTimeMillis()); private ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>(); //...... public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult, final SubscriptionData subscriptionData) { PullResultExt pullResultExt = (PullResultExt) pullResult; this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId()); if (PullStatus.FOUND == pullResult.getPullStatus()) { ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary()); List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer); List<MessageExt> msgListFilterAgain = msgList; if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) { msgListFilterAgain = new ArrayList<MessageExt>(msgList.size()); for (MessageExt msg : msgList) { if (msg.getTags() != null) { if (subscriptionData.getTagsSet().contains(msg.getTags())) { msgListFilterAgain.add(msg); } } } } if (this.hasHook()) { FilterMessageContext filterMessageContext = new FilterMessageContext(); filterMessageContext.setUnitMode(unitMode); filterMessageContext.setMsgList(msgListFilterAgain); this.executeHook(filterMessageContext); } for (MessageExt msg : msgListFilterAgain) { String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (Boolean.parseBoolean(traFlag)) { msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX)); } MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET, 
Long.toString(pullResult.getMinOffset())); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET, Long.toString(pullResult.getMaxOffset())); } pullResultExt.setMsgFoundList(msgListFilterAgain); } pullResultExt.setMessageBinary(null); return pullResult; } public void updatePullFromWhichNode(final MessageQueue mq, final long brokerId) { AtomicLong suggest = this.pullFromWhichNodeTable.get(mq); if (null == suggest) { this.pullFromWhichNodeTable.put(mq, new AtomicLong(brokerId)); } else { suggest.set(brokerId); } } public long recalculatePullFromWhichNode(final MessageQueue mq) { if (this.isConnectBrokerByUser()) { return this.defaultBrokerId; } AtomicLong suggest = this.pullFromWhichNodeTable.get(mq); if (suggest != null) { return suggest.get(); } return MixAll.MASTER_ID; } public boolean isConnectBrokerByUser() { return connectBrokerByUser; } //...... }
recalculatePullFromWhichNode方法在connectBrokerByUser為true時直接返回defaultBrokerId(預設為MixAll.MASTER_ID),不然從pullFromWhichNodeTable取對應的brokerId,取不到則返回MixAll.MASTER_ID。
rocketmq-all-4.6.0-source-release/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullResultExt.java
/**
 * A {@link PullResult} that additionally carries the broker id suggested for
 * the next pull and the still-undecoded message bytes.
 */
public class PullResultExt extends PullResult {
    /** Broker id suggested for the next pull; fixed at construction. */
    private final long suggestWhichBrokerId;
    /** Raw message bytes; replaceable through {@link #setMessageBinary(byte[])}. */
    private byte[] messageBinary;

    /**
     * Creates a result, forwarding the first five arguments to
     * {@link PullResult} and keeping the last two on this instance.
     *
     * @param pullStatus           status passed to the superclass
     * @param nextBeginOffset      next begin offset passed to the superclass
     * @param minOffset            min offset passed to the superclass
     * @param maxOffset            max offset passed to the superclass
     * @param msgFoundList         message list passed to the superclass
     * @param suggestWhichBrokerId broker id suggested for the next pull
     * @param messageBinary        raw message bytes (stored as given, not copied)
     */
    public PullResultExt(
        PullStatus pullStatus,
        long nextBeginOffset,
        long minOffset,
        long maxOffset,
        List<MessageExt> msgFoundList,
        final long suggestWhichBrokerId,
        final byte[] messageBinary) {
        super(pullStatus, nextBeginOffset, minOffset, maxOffset, msgFoundList);
        this.messageBinary = messageBinary;
        this.suggestWhichBrokerId = suggestWhichBrokerId;
    }

    /**
     * @return the broker id suggested for the next pull
     */
    public long getSuggestWhichBrokerId() {
        return this.suggestWhichBrokerId;
    }

    /**
     * @return the stored raw message bytes (the same array, not a copy)
     */
    public byte[] getMessageBinary() {
        return this.messageBinary;
    }

    /**
     * @param messageBinary raw message bytes to store; stored as given
     */
    public void setMessageBinary(byte[] messageBinary) {
        this.messageBinary = messageBinary;
    }
}
rocketmq-all-4.6.0-source-release/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
public class MQClientAPIImpl { private final static InternalLogger log = ClientLogger.getLog(); private static boolean sendSmartMsg = Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg", "true")); //...... private PullResult processPullResponse( final RemotingCommand response) throws MQBrokerException, RemotingCommandException { PullStatus pullStatus = PullStatus.NO_NEW_MSG; switch (response.getCode()) { case ResponseCode.SUCCESS: pullStatus = PullStatus.FOUND; break; case ResponseCode.PULL_NOT_FOUND: pullStatus = PullStatus.NO_NEW_MSG; break; case ResponseCode.PULL_RETRY_IMMEDIATELY: pullStatus = PullStatus.NO_MATCHED_MSG; break; case ResponseCode.PULL_OFFSET_MOVED: pullStatus = PullStatus.OFFSET_ILLEGAL; break; default: throw new MQBrokerException(response.getCode(), response.getRemark()); } PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class); return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(), responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody()); } //...... }
rocketmq-all-4.6.0-source-release/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java
/**
 * Custom header of a pull-message response (excerpt; accessors and other
 * members are elided).
 *
 * <p>In the excerpts of this file, {@code MQClientAPIImpl.processPullResponse}
 * reads all four values when building a {@code PullResultExt}, and
 * {@code PullMessageProcessor.processRequest} sets {@code suggestWhichBrokerId}.
 */
public class PullMessageResponseHeader implements CommandCustomHeader {
    // NOTE(review): @CFNotNull presumably marks a field as required when the
    // header is validated; confirm against the annotation's definition.

    /** Broker id the consumer is advised to pull from next. */
    @CFNotNull
    private Long suggestWhichBrokerId;
    @CFNotNull
    private Long nextBeginOffset;
    @CFNotNull
    private Long minOffset;
    @CFNotNull
    private Long maxOffset;

    //......
}
rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
/**
 * Request processor for pull-message requests (excerpt). Only the part of
 * {@code processRequest} that chooses the broker id suggested back to the
 * consumer is shown; the surrounding code is elided.
 */
public class PullMessageProcessor implements NettyRequestProcessor {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    private final BrokerController brokerController;
    private List<ConsumeMessageHook> consumeMessageHookList;

    //......

    /**
     * Handles one pull request. The visible part fills in
     * {@code suggestWhichBrokerId} on the response header; {@code response},
     * {@code responseHeader}, {@code getMessageResult} and
     * {@code subscriptionGroupConfig} are created in the elided code above.
     */
    private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
        throws RemotingCommandException {
        //......

        // First guess: the group's configured "consume slowly" broker when the
        // store suggests pulling from a slave, otherwise the master.
        // NOTE(review): within the visible code this value is always
        // overwritten by the final if/else below.
        if (getMessageResult.isSuggestPullingFromSlave()) {
            responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
        } else {
            responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
        }

        switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
            case ASYNC_MASTER:
            case SYNC_MASTER:
                break;
            case SLAVE:
                // A slave with slave-read disabled answers PULL_RETRY_IMMEDIATELY
                // and points the consumer at the master.
                if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
                    response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                    responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
                }
                break;
        }

        // Final decision; with slave-read disabled the suggestion is always the master.
        if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
            // consume too slow ,redirect to another machine
            if (getMessageResult.isSuggestPullingFromSlave()) {
                responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
            }
            // consume ok
            else {
                responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
            }
        } else {
            responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
        }

        //......
    }

    //......
}
rocketmq-all-4.6.0-source-release/common/src/main/java/org/apache/rocketmq/common/subscription/SubscriptionGroupConfig.java
/**
 * Configuration of one subscription (consumer) group (excerpt; accessors and
 * other members are elided).
 */
public class SubscriptionGroupConfig {
    private String groupName;
    private boolean consumeEnable = true;
    private boolean consumeFromMinEnable = true;
    private boolean consumeBroadcastEnable = true;
    private int retryQueueNums = 1;
    private int retryMaxTimes = 16;
    // Defaults to the master id. In the PullMessageProcessor excerpt this is
    // the broker suggested when slave-read is enabled and the store does not
    // suggest pulling from a slave.
    private long brokerId = MixAll.MASTER_ID;
    // Defaults to 1. In the PullMessageProcessor excerpt this is the broker
    // suggested when the store suggests pulling from a slave.
    private long whichBrokerWhenConsumeSlowly = 1;
    private boolean notifyConsumerIdsChangedEnable = true;

    //......
}
recalculatePullFromWhichNode方法在connectBrokerByUser為true時直接返回defaultBrokerId(預設為MixAll.MASTER_ID),不然從pullFromWhichNodeTable取對應的brokerId,取不到則返回MixAll.MASTER_ID。