本文主要研究一下 RocketMQ 的 updateConsumeOffsetToBroker
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
public class RemoteBrokerOffsetStore implements OffsetStore { //...... /** * Update the Consumer Offset synchronously, once the Master is off, updated to Slave, * here need to be optimized. */ @Override public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); if (null == findBrokerResult) { this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); } if (findBrokerResult != null) { UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader(); requestHeader.setTopic(mq.getTopic()); requestHeader.setConsumerGroup(this.groupName); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setCommitOffset(offset); if (isOneway) { this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway( findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5); } else { this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset( findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5); } } else { throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); } } //...... }
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
public class MQClientInstance { //...... public FindBrokerResult findBrokerAddressInAdmin(final String brokerName) { String brokerAddr = null; boolean slave = false; boolean found = false; HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName); if (map != null && !map.isEmpty()) { for (Map.Entry<Long, String> entry : map.entrySet()) { Long id = entry.getKey(); brokerAddr = entry.getValue(); if (brokerAddr != null) { found = true; if (MixAll.MASTER_ID == id) { slave = false; } else { slave = true; } break; } } // end of for } if (found) { return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr)); } return null; } //...... }