說在前面
請求處理 重置消費者client的offset
源碼解析
進入這個方法,org.apache.rocketmq.client.impl.ClientRemotingProcessor#resetOffset
public RemotingCommand resetOffset(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final ResetOffsetRequestHeader requestHeader =(ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);log.info("invoke reset offset operation from broker. brokerAddr={}, topic={}, group={}, timestamp={}",RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(),requestHeader.getTimestamp());Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();if (request.getBody() != null) {ResetOffsetBody body = ResetOffsetBody.decode(request.getBody(), ResetOffsetBody.class);offsetTable = body.getOffsetTable();}// 重置offset=》this.mqClientFactory.resetOffset(requestHeader.getTopic(), requestHeader.getGroup(), offsetTable);return null; }
進入這個方法,重置offset,org.apache.rocketmq.client.impl.factory.MQClientInstance#resetOffset
public void resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {DefaultMQPushConsumerImpl consumer = null;try {// 獲取消費組的消費者MQConsumerInner impl = this.consumerTable.get(group);if (impl != null && impl instanceof DefaultMQPushConsumerImpl) {consumer = (DefaultMQPushConsumerImpl) impl;} else {log.info("[reset-offset] consumer dose not exist. group={}", group);return;}// 消費者暫停consumer.suspend();ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable();for (Map.Entry<MessageQueue, ProcessQueue> entry : processQueueTable.entrySet()) {MessageQueue mq = entry.getKey();if (topic.equals(mq.getTopic()) && offsetTable.containsKey(mq)) {ProcessQueue pq = entry.getValue();// 處理隊列刪除pq.setDropped(true);// 清空處理隊列=》pq.clear();}}try {TimeUnit.SECONDS.sleep(10);} catch (InterruptedException e) {}Iterator<MessageQueue> iterator = processQueueTable.keySet().iterator();while (iterator.hasNext()) {MessageQueue mq = iterator.next();// 獲取處理隊列中的消息隊列的offsetLong offset = offsetTable.get(mq);if (topic.equals(mq.getTopic()) && offset != null) {try {// 更新消費的offset=》consumer.updateConsumeOffset(mq, offset);// 刪除不須要的消息隊列=》consumer.getRebalanceImpl().removeUnnecessaryMessageQueue(mq, processQueueTable.get(mq));iterator.remove();} catch (Exception e) {log.warn("reset offset failed. group={}, {}", group, mq, e);}}}} finally {if (consumer != null) {// 消費者暫停取消=》consumer.resume();}} }
進入這個方法,清空處理隊列,org.apache.rocketmq.client.impl.consumer.ProcessQueue#clear
public void clear() {try {this.lockTreeMap.writeLock().lockInterruptibly();try {this.msgTreeMap.clear();// 消息排序this.consumingMsgOrderlyTreeMap.clear();this.msgCount.set(0);this.msgSize.set(0);this.queueOffsetMax = 0L;} finally {this.lockTreeMap.writeLock().unlock();}} catch (InterruptedException e) {log.error("rollback exception", e);} }
進入這個方法,更新消費的offset,org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#updateOffset
/**
 * Records {@code offset} for {@code mq} in the local offset table.
 *
 * <p>If the queue has no entry yet, one is created with {@code offset}. Otherwise the
 * existing value is either overwritten or, when {@code increaseOnly} is set, updated via
 * {@code MixAll.compareAndIncreaseOnly}.
 *
 * @param mq           queue to update; {@code null} is ignored
 * @param offset       new offset value
 * @param increaseOnly whether to use the increase-only update instead of a plain set
 */
@Override
public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
    if (mq == null) {
        return;
    }

    AtomicLong current = this.offsetTable.get(mq);
    if (current == null) {
        // putIfAbsent returns null when our value was installed, or the value that was
        // already present.
        current = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
    }
    if (current == null) {
        // Our fresh entry already carries the requested offset.
        return;
    }

    if (!increaseOnly) {
        current.set(offset);
    } else {
        MixAll.compareAndIncreaseOnly(current, offset);
    }
}
進入這個方法,刪除不需要的消息隊列,org.apache.rocketmq.client.impl.consumer.RebalancePullImpl#removeUnnecessaryMessageQueue,defaultMQPullConsumerImpl
@Overridepublic boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {// 消息隊列持久化=》this.defaultMQPullConsumerImpl.getOffsetStore().persist(mq);// 刪除消息隊列=》this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq);return true; }
進入這個方法,消息隊列持久化,org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#persist
@Overridepublic void persist(MessageQueue mq) {AtomicLong offset = this.offsetTable.get(mq);if (offset != null) {try {// 更新broker消費的offset=》this.updateConsumeOffsetToBroker(mq, offset.get());log.info("[persist] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",this.groupName,this.mQClientFactory.getClientId(),mq,offset.get());} catch (Exception e) {log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);}} }
進入這個方法,更新broker消費的offset,org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#updateConsumeOffsetToBroker(org.apache.rocketmq.common.message.MessageQueue, long, boolean)
@Overridepublic void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,MQBrokerException, InterruptedException, MQClientException {// 查詢broker=》FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());if (null == findBrokerResult) {// 從namesrv更新topic路由信息=》this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());// 查找broker的地址 =》findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());}if (findBrokerResult != null) {UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();requestHeader.setTopic(mq.getTopic());requestHeader.setConsumerGroup(this.groupName);requestHeader.setQueueId(mq.getQueueId());requestHeader.setCommitOffset(offset);if (isOneway) {// 單途更新消費者的offset=》this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);} else {// 更新消費者的offset=》this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);}} else {throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);} }
進入這個方法,查詢broker,org.apache.rocketmq.client.impl.factory.MQClientInstance#findBrokerAddressInAdmin
public FindBrokerResult findBrokerAddressInAdmin(final String brokerName) {String brokerAddr = null;boolean slave = false;boolean found = false;// 查詢broker的地址列表HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);if (map != null && !map.isEmpty()) {for (Map.Entry<Long, String> entry : map.entrySet()) {Long id = entry.getKey();brokerAddr = entry.getValue();if (brokerAddr != null) {found = true;if (MixAll.MASTER_ID == id) {slave = false;} else {slave = true;}break;}} // end of for}if (found) {// =》return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));}return null; }
進入這個方法, 從namesrv更新topic路由信息,org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String, boolean, org.apache.rocketmq.client.producer.DefaultMQProducer)
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,DefaultMQProducer defaultMQProducer) {try {if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {try {TopicRouteData topicRouteData;if (isDefault && defaultMQProducer != null) {// 獲取默認的topic路由信息 =》topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),1000 * 3);if (topicRouteData != null) {// 獲取隊列信息for (QueueData data : topicRouteData.getQueueDatas()) {// 讀寫隊列最大數量4int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());data.setReadQueueNums(queueNums);data.setWriteQueueNums(queueNums);}}} else {// 獲取topic路由信息=》topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);}if (topicRouteData != null) {TopicRouteData old = this.topicRouteTable.get(topic);// 判斷topic路由是否改變=》boolean changed = topicRouteDataIsChange(old, topicRouteData);if (!changed) {// 須要更新路由信息changed = this.isNeedUpdateTopicRouteInfo(topic);} else {log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);}if (changed) {TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();for (BrokerData bd : topicRouteData.getBrokerDatas()) {// 更新broker的地址列表this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());}// Update Pub info{// topic路由信息轉換成topic發佈信息=》TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);publishInfo.setHaveTopicRouterInfo(true);// 遍歷生產者信息Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();while (it.hasNext()) {Entry<String, MQProducerInner> entry = it.next();MQProducerInner impl = entry.getValue();if (impl != null) {// 更新topic發佈信息=》impl.updateTopicPublishInfo(topic, publishInfo);}}}// Update sub info{// 獲取消息隊列訂閱信息Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);// 遍歷消費者Iterator<Entry<String, 
MQConsumerInner>> it = this.consumerTable.entrySet().iterator();while (it.hasNext()) {Entry<String, MQConsumerInner> entry = it.next();MQConsumerInner impl = entry.getValue();if (impl != null) {// 更新topic的訂閱信息=》impl.updateTopicSubscribeInfo(topic, subscribeInfo);}}}log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);this.topicRouteTable.put(topic, cloneTopicRouteData);return true;}} else {log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);}} catch (Exception e) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {log.warn("updateTopicRouteInfoFromNameServer Exception", e);}} finally {this.lockNamesrv.unlock();}} else {log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);}} catch (InterruptedException e) {log.warn("updateTopicRouteInfoFromNameServer Exception", e);}return false; }
進入這個方法,獲取默認的topic路由信息,org.apache.rocketmq.client.impl.MQClientAPIImpl#getTopicRouteInfoFromNameServer(java.lang.String, long, boolean)
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();requestHeader.setTopic(topic);RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);// 同步獲取topic的路由信息=》RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);assert response != null;switch (response.getCode()) {case ResponseCode.TOPIC_NOT_EXIST: {if (allowTopicNotExist && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);}break;}case ResponseCode.SUCCESS: {byte[] body = response.getBody();if (body != null) {return TopicRouteData.decode(body, TopicRouteData.class);}}default:break;}throw new MQClientException(response.getCode(), response.getRemark()); }
進入這個方法,同步獲取topic的路由信息,org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeSync 前面介紹過了。
往上返回到這個方法,獲取topic路由信息,org.apache.rocketmq.client.impl.MQClientAPIImpl#getTopicRouteInfoFromNameServer(java.lang.String, long) 上面介紹過了。
往上返回到這個方法,判斷topic路由是否改變,org.apache.rocketmq.client.impl.factory.MQClientInstance#topicRouteDataIsChange
/**
 * Tells whether two topic routes differ.
 *
 * <p>Both routes are cloned and their queue and broker lists sorted before comparison, so
 * the inputs are not modified and list order does not matter.
 *
 * @param olddata previously known route, may be {@code null}
 * @param nowdata freshly fetched route, may be {@code null}
 * @return {@code true} if either argument is {@code null} or the sorted clones are not equal
 */
private boolean topicRouteDataIsChange(TopicRouteData olddata, TopicRouteData nowdata) {
    if (olddata == null || nowdata == null) {
        return true;
    }

    // Sort clones, not the originals.
    final TopicRouteData previous = olddata.cloneTopicRouteData();
    Collections.sort(previous.getQueueDatas());
    Collections.sort(previous.getBrokerDatas());

    final TopicRouteData current = nowdata.cloneTopicRouteData();
    Collections.sort(current.getQueueDatas());
    Collections.sort(current.getBrokerDatas());

    final boolean same = previous.equals(current);
    return !same;
}
往上返回到這個方法,單途更新消費者的offset,org.apache.rocketmq.client.impl.MQClientAPIImpl#updateConsumerOffsetOneway
public void updateConsumerOffsetOneway(final String addr,final UpdateConsumerOffsetRequestHeader requestHeader,final long timeoutMillis) throws RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException,InterruptedException {RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);// =》this.remotingClient.invokeOneway(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis); }
進入這個方法,org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeOneway前面介紹過了。
往上返回到這個方法,更新消費者的offset,org.apache.rocketmq.client.impl.MQClientAPIImpl#updateConsumerOffset
public void updateConsumerOffset(final String addr,final UpdateConsumerOffsetRequestHeader requestHeader,final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);// =》RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),request, timeoutMillis);assert response != null;switch (response.getCode()) {case ResponseCode.SUCCESS: {return;}default:break;}throw new MQBrokerException(response.getCode(), response.getRemark()); }
進入到這個方法,org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeSync前面介紹過了。
往上返回到這個方法,刪除消息隊列,org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#removeOffset
/**
 * Removes the locally stored offset of {@code mq} and logs the remaining table size.
 *
 * @param mq queue to forget; {@code null} is ignored
 */
public void removeOffset(MessageQueue mq) {
    if (mq == null) {
        return;
    }

    this.offsetTable.remove(mq);
    log.info("remove unnecessary messageQueue offset. group={}, mq={}, offsetTableSize={}",
        this.groupName,
        mq,
        offsetTable.size());
}
往上返回到這個方法,org.apache.rocketmq.client.impl.consumer.RebalancePushImpl#removeUnnecessaryMessageQueue,defaultMQPushConsumerImpl
@Overridepublic boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {// 持久化消息隊列this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);// 刪除消息隊列的offset=》this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);// 若是是有序消費,消息類型是集羣消費if (this.defaultMQPushConsumerImpl.isConsumeOrderly()&& MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {try {if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {try {// 解鎖延遲=》return this.unlockDelay(mq, pq);} finally {pq.getLockConsume().unlock();}} else {log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}",mq,pq.getTryUnlockTimes());pq.incTryUnlockTimes();}} catch (Exception e) {log.error("removeUnnecessaryMessageQueue Exception", e);}return false;}return true; }
進入這個方法,持久化消息隊列,org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#persist 前面介紹過了。
往上返回到這個方法, 刪除消息隊列的offset,org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#removeOffset 前面介紹過了。
進入這個方法,解鎖延遲,org.apache.rocketmq.client.impl.consumer.RebalancePushImpl#unlockDelay
private boolean unlockDelay(final MessageQueue mq, final ProcessQueue pq) {// 處理隊列中有臨時消息=》if (pq.hasTempMessage()) {log.info("[{}]unlockDelay, begin {} ", mq.hashCode(), mq);this.defaultMQPushConsumerImpl.getmQClientFactory().getScheduledExecutorService().schedule(new Runnable() {@Overridepublic void run() {log.info("[{}]unlockDelay, execute at once {}", mq.hashCode(), mq);// 解鎖消息隊列=》RebalancePushImpl.this.unlock(mq, true);}}, UNLOCK_DELAY_TIME_MILLS, TimeUnit.MILLISECONDS);} else {this.unlock(mq, true);}return true; }
進入這個方法,處理隊列中有臨時消息,org.apache.rocketmq.client.impl.consumer.ProcessQueue#hasTempMessage
/**
 * Tells whether this process queue still holds messages.
 *
 * <p>The check runs under the tree-map read lock. If the thread is interrupted while
 * waiting for the lock, the interrupt status is restored and {@code true} is returned as
 * the conservative answer.
 *
 * @return {@code true} if {@code msgTreeMap} is non-empty or the lock wait was interrupted
 */
public boolean hasTempMessage() {
    try {
        this.lockTreeMap.readLock().lockInterruptibly();
        try {
            return !this.msgTreeMap.isEmpty();
        } finally {
            this.lockTreeMap.readLock().unlock();
        }
    } catch (InterruptedException e) {
        // Keep the interruption visible to the caller instead of swallowing it.
        Thread.currentThread().interrupt();
    }

    return true;
}
進入這個方法,解鎖消息隊列,org.apache.rocketmq.client.impl.consumer.RebalanceImpl#unlock
public void unlock(final MessageQueue mq, final boolean oneway) {// 按brokerName查詢broker地址在訂閱信息中=》FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);if (findBrokerResult != null) {UnlockBatchRequestBody requestBody = new UnlockBatchRequestBody();requestBody.setConsumerGroup(this.consumerGroup);requestBody.setClientId(this.mQClientFactory.getClientId());requestBody.getMqSet().add(mq);try {// 解鎖批量消息隊列,1s超時this.mQClientFactory.getMQClientAPIImpl().unlockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000, oneway);log.warn("unlock messageQueue. group:{}, clientId:{}, mq:{}",this.consumerGroup,this.mQClientFactory.getClientId(),mq);} catch (Exception e) {log.error("unlockBatchMQ exception, " + mq, e);}} }
進入這個方法,按brokerName查詢broker地址在訂閱信息中,org.apache.rocketmq.client.impl.factory.MQClientInstance#findBrokerAddressInSubscribe
public FindBrokerResult findBrokerAddressInSubscribe(final String brokerName,final long brokerId,final boolean onlyThisBroker) {String brokerAddr = null;boolean slave = false;boolean found = false;// 獲取broker的緩存信息HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);if (map != null && !map.isEmpty()) {brokerAddr = map.get(brokerId);slave = brokerId != MixAll.MASTER_ID;found = brokerAddr != null;if (!found && !onlyThisBroker) {Entry<Long, String> entry = map.entrySet().iterator().next();brokerAddr = entry.getValue();slave = entry.getKey() != MixAll.MASTER_ID;found = true;}}if (found) {return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));}return null; }
進入這個方法,解鎖批量消息隊列,1s超時,org.apache.rocketmq.client.impl.MQClientAPIImpl#unlockBatchMQ
public void unlockBatchMQ(final String addr,final UnlockBatchRequestBody requestBody,final long timeoutMillis,final boolean oneway) throws RemotingException, MQBrokerException, InterruptedException {RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, null);// json編碼request.setBody(requestBody.encode());if (oneway) {// 單途請求=》this.remotingClient.invokeOneway(addr, request, timeoutMillis);} else {// 同步執行RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);switch (response.getCode()) {case ResponseCode.SUCCESS: {return;}default:break;}throw new MQBrokerException(response.getCode(), response.getRemark());} }
進入這個方法,單途請求,org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeOneway 前面介紹過了。
進入這個方法,同步執行,org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeSync 前面介紹過了。
往上返回到這個方法,消費者暫停取消,org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#resume
public void resume() {this.pause = false;// 負載均衡=》doRebalance();log.info("resume this consumer, {}", this.defaultMQPushConsumer.getConsumerGroup()); }
進入這個方法,負載均衡,org.apache.rocketmq.client.impl.consumer.RebalanceImpl#doRebalance 前面介紹過了。
往上返回到這個方法,org.apache.rocketmq.client.impl.ClientRemotingProcessor#resetOffset結束。
說在最後
本次解析僅代表個人觀點,僅供參考。
加入技術微信羣
釘釘技術羣