RocketMQ Source Code Analysis: Request Handling, Resetting the Consumer Client's Offset

Preface

Request handling: resetting the consumer client's offset.

 

Source code analysis

Step into this method: org.apache.rocketmq.client.impl.ClientRemotingProcessor#resetOffset

public RemotingCommand resetOffset(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final ResetOffsetRequestHeader requestHeader =
        (ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);
    log.info("invoke reset offset operation from broker. brokerAddr={}, topic={}, group={}, timestamp={}",
        RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(),
        requestHeader.getTimestamp());
    Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();
    if (request.getBody() != null) {
        ResetOffsetBody body = ResetOffsetBody.decode(request.getBody(), ResetOffsetBody.class);
        offsetTable = body.getOffsetTable();
    }
    // Reset the offset =>
    this.mqClientFactory.resetOffset(requestHeader.getTopic(), requestHeader.getGroup(), offsetTable);
    return null;
}

Step into this method to reset the offset: org.apache.rocketmq.client.impl.factory.MQClientInstance#resetOffset

public void resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {
    DefaultMQPushConsumerImpl consumer = null;
    try {
        // Look up the consumer registered for this consumer group
        MQConsumerInner impl = this.consumerTable.get(group);
        if (impl != null && impl instanceof DefaultMQPushConsumerImpl) {
            consumer = (DefaultMQPushConsumerImpl) impl;
        } else {
            log.info("[reset-offset] consumer dose not exist. group={}", group);
            return;
        }
        // Suspend the consumer
        consumer.suspend();

        ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable();
        for (Map.Entry<MessageQueue, ProcessQueue> entry : processQueueTable.entrySet()) {
            MessageQueue mq = entry.getKey();
            if (topic.equals(mq.getTopic()) && offsetTable.containsKey(mq)) {
                ProcessQueue pq = entry.getValue();
                // Mark the process queue as dropped
                pq.setDropped(true);
                // Clear the process queue =>
                pq.clear();
            }
        }

        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
        }

        Iterator<MessageQueue> iterator = processQueueTable.keySet().iterator();
        while (iterator.hasNext()) {
            MessageQueue mq = iterator.next();
            // Look up the target offset for this message queue
            Long offset = offsetTable.get(mq);
            if (topic.equals(mq.getTopic()) && offset != null) {
                try {
                    // Update the consume offset =>
                    consumer.updateConsumeOffset(mq, offset);
                    // Remove the message queue that is no longer needed =>
                    consumer.getRebalanceImpl().removeUnnecessaryMessageQueue(mq, processQueueTable.get(mq));
                    iterator.remove();
                } catch (Exception e) {
                    log.warn("reset offset failed. group={}, {}", group, mq, e);
                }
            }
        }
    } finally {
        if (consumer != null) {
            // Resume the consumer =>
            consumer.resume();
        }
    }
}
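
The second loop in resetOffset removes entries through the key-set iterator while it scans the table. The following is a minimal sketch of that "scan, act, iterator.remove()" pattern only. The class ResetLoopSketch, the method applyReset and the "topic@queueId" string keys are hypothetical stand-ins for MessageQueue and ProcessQueue, not RocketMQ types.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for the reset loop; queues are plain "topic@queueId" strings here.
class ResetLoopSketch {

    // Removes every queue of the given topic that has a target offset
    // and returns the offsets that were applied.
    static Map<String, Long> applyReset(String topic,
                                        ConcurrentMap<String, String> processQueueTable,
                                        Map<String, Long> offsetTable) {
        Map<String, Long> applied = new HashMap<>();
        Iterator<String> iterator = processQueueTable.keySet().iterator();
        while (iterator.hasNext()) {
            String mq = iterator.next();
            Long offset = offsetTable.get(mq);
            if (mq.startsWith(topic + "@") && offset != null) {
                applied.put(mq, offset); // stands in for updateConsumeOffset(mq, offset)
                iterator.remove();       // drop the entry so a later rebalance can re-create it
            }
        }
        return applied;
    }
}
```

Queues of other topics, and queues without an entry in the offset table, stay in the table untouched.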

Step into this method to clear the process queue: org.apache.rocketmq.client.impl.consumer.ProcessQueue#clear

public void clear() {
    try {
        this.lockTreeMap.writeLock().lockInterruptibly();
        try {
            this.msgTreeMap.clear();
            // Messages currently being consumed in order
            this.consumingMsgOrderlyTreeMap.clear();
            this.msgCount.set(0);
            this.msgSize.set(0);
            this.queueOffsetMax = 0L;
        } finally {
            this.lockTreeMap.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("rollback exception", e);
    }
}
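
clear() takes the write lock of a read-write lock before it touches the tree map. As a rough illustration of that locking discipline, here is a simplified sketch. MiniProcessQueue and its members are hypothetical names, and the real ProcessQueue tracks more state than this.

```java
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, simplified stand-in for ProcessQueue: queue offset -> message body.
class MiniProcessQueue {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final TreeMap<Long, String> msgTreeMap = new TreeMap<>();
    private final AtomicLong msgCount = new AtomicLong();

    void put(long offset, String body) {
        lock.writeLock().lock();
        try {
            // Count only offsets that were not present before
            if (msgTreeMap.put(offset, body) == null) {
                msgCount.incrementAndGet();
            }
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Writers take the exclusive lock, so no reader observes a half-cleared state.
    void clear() {
        lock.writeLock().lock();
        try {
            msgTreeMap.clear();
            msgCount.set(0);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Readers share the read lock and can run concurrently with each other.
    boolean hasMessages() {
        lock.readLock().lock();
        try {
            return !msgTreeMap.isEmpty();
        } finally {
            lock.readLock().unlock();
        }
    }

    long count() {
        return msgCount.get();
    }
}
```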

Step into this method to update the consume offset: org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#updateOffset

@Override
public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
    if (mq != null) {
        AtomicLong offsetOld = this.offsetTable.get(mq);
        if (null == offsetOld) {
            offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
        }

        if (null != offsetOld) {
            if (increaseOnly) {
                MixAll.compareAndIncreaseOnly(offsetOld, offset);
            } else {
                offsetOld.set(offset);
            }
        }
    }
}
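
The two branches of updateOffset behave differently when the new offset is smaller than the stored one. A reset usually moves the offset backwards, which only the increaseOnly == false branch allows. The sketch below shows the difference with a compare-and-set loop. OffsetUpdateSketch is a hypothetical class, and its compareAndIncreaseOnly is my own assumption of what a helper with that name does, not a copy of the MixAll code.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the two update modes; not the RocketMQ implementation.
class OffsetUpdateSketch {

    // Raises target to value only if value is larger; retries when another thread wins the CAS.
    static boolean compareAndIncreaseOnly(AtomicLong target, long value) {
        long prev = target.get();
        while (value > prev) {
            if (target.compareAndSet(prev, value)) {
                return true;
            }
            prev = target.get();
        }
        return false;
    }

    // increaseOnly = true never moves the offset backwards; false overwrites unconditionally.
    static void update(AtomicLong stored, long offset, boolean increaseOnly) {
        if (increaseOnly) {
            compareAndIncreaseOnly(stored, offset);
        } else {
            stored.set(offset);
        }
    }
}
```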

Step into this method to remove the message queue that is no longer needed: org.apache.rocketmq.client.impl.consumer.RebalancePullImpl#removeUnnecessaryMessageQueue, for defaultMQPullConsumerImpl

@Override
public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
    // Persist the message queue's offset =>
    this.defaultMQPullConsumerImpl.getOffsetStore().persist(mq);
    // Remove the message queue's offset =>
    this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq);
    return true;
}

Step into this method to persist the message queue's offset: org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#persist

@Override
public void persist(MessageQueue mq) {
    AtomicLong offset = this.offsetTable.get(mq);
    if (offset != null) {
        try {
            // Update the consume offset on the broker =>
            this.updateConsumeOffsetToBroker(mq, offset.get());
            log.info("[persist] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
                this.groupName,
                this.mQClientFactory.getClientId(),
                mq,
                offset.get());
        } catch (Exception e) {
            log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
        }
    }
}

Step into this method to update the consume offset on the broker: org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#updateConsumeOffsetToBroker(org.apache.rocketmq.common.message.MessageQueue, long, boolean)

@Override
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
    MQBrokerException, InterruptedException, MQClientException {
    // Find the broker =>
    FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
    if (null == findBrokerResult) {
        // Refresh the topic route info from the name server =>
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        // Look up the broker address again =>
        findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
    }

    if (findBrokerResult != null) {
        UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
        requestHeader.setTopic(mq.getTopic());
        requestHeader.setConsumerGroup(this.groupName);
        requestHeader.setQueueId(mq.getQueueId());
        requestHeader.setCommitOffset(offset);

        if (isOneway) {
            // Update the consumer offset with a one-way request =>
            this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
                findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
        } else {
            // Update the consumer offset =>
            this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
                findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
        }
    } else {
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }
}

Step into this method to find the broker: org.apache.rocketmq.client.impl.factory.MQClientInstance#findBrokerAddressInAdmin

public FindBrokerResult findBrokerAddressInAdmin(final String brokerName) {
    String brokerAddr = null;
    boolean slave = false;
    boolean found = false;

    // Look up the address table of this broker
    HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
    if (map != null && !map.isEmpty()) {
        for (Map.Entry<Long, String> entry : map.entrySet()) {
            Long id = entry.getKey();
            brokerAddr = entry.getValue();
            if (brokerAddr != null) {
                found = true;
                if (MixAll.MASTER_ID == id) {
                    slave = false;
                } else {
                    slave = true;
                }
                break;

            }
        } // end of for
    }

    if (found) {
        // =>
        return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));
    }

    return null;
}

Step into this method to refresh the topic route info from the name server: org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String, boolean, org.apache.rocketmq.client.producer.DefaultMQProducer)

public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
    DefaultMQProducer defaultMQProducer) {
    try {
        if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                TopicRouteData topicRouteData;
                if (isDefault && defaultMQProducer != null) {
                    // Get the default topic route info =>
                    topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                        1000 * 3);
                    if (topicRouteData != null) {
                        // Walk the queue data
                        for (QueueData data : topicRouteData.getQueueDatas()) {
                            // Cap the read/write queue count (4 by default)
                            int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                            data.setReadQueueNums(queueNums);
                            data.setWriteQueueNums(queueNums);
                        }
                    }
                } else {
                    // Get the topic route info =>
                    topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                }
                if (topicRouteData != null) {
                    TopicRouteData old = this.topicRouteTable.get(topic);
                    // Check whether the topic route has changed =>
                    boolean changed = topicRouteDataIsChange(old, topicRouteData);
                    if (!changed) {
                        // Check whether the route info needs updating anyway
                        changed = this.isNeedUpdateTopicRouteInfo(topic);
                    } else {
                        log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
                    }

                    if (changed) {
                        TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();

                        for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                            // Update the broker address table
                            this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                        }

                        // Update Pub info
                        {
                            // Convert the topic route info into topic publish info =>
                            TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                            publishInfo.setHaveTopicRouterInfo(true);
                            // Iterate over the producers
                            Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                            while (it.hasNext()) {
                                Entry<String, MQProducerInner> entry = it.next();
                                MQProducerInner impl = entry.getValue();
                                if (impl != null) {
                                    // Update the topic publish info =>
                                    impl.updateTopicPublishInfo(topic, publishInfo);
                                }
                            }
                        }

                        // Update sub info
                        {
                            // Build the message queue subscription info
                            Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                            // Iterate over the consumers
                            Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
                            while (it.hasNext()) {
                                Entry<String, MQConsumerInner> entry = it.next();
                                MQConsumerInner impl = entry.getValue();
                                if (impl != null) {
                                    // Update the topic subscription info =>
                                    impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                                }
                            }
                        }
                        log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
                        this.topicRouteTable.put(topic, cloneTopicRouteData);
                        return true;
                    }
                } else {
                    log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
                }
            } catch (Exception e) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
                    log.warn("updateTopicRouteInfoFromNameServer Exception", e);
                }
            } finally {
                this.lockNamesrv.unlock();
            }
        } else {
            log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);
        }
    } catch (InterruptedException e) {
        log.warn("updateTopicRouteInfoFromNameServer Exception", e);
    }

    return false;
}
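
The whole route refresh runs inside a tryLock with a timeout, and the method returns false when the lock cannot be acquired in time. A minimal sketch of that guard follows. TimedLockSketch and runWithLock are hypothetical names, and the Supplier stands in for the refresh logic.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Hypothetical sketch of a "try to lock for a bounded time, otherwise give up" guard.
class TimedLockSketch {
    private final Lock lock = new ReentrantLock();

    // Runs task under the lock and returns its result, or returns false
    // when the lock cannot be acquired within timeoutMillis.
    boolean runWithLock(long timeoutMillis, Supplier<Boolean> task) {
        try {
            if (lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
                try {
                    return task.get();
                } finally {
                    // Always release, even if the task throws
                    lock.unlock();
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag for callers further up the stack
            Thread.currentThread().interrupt();
        }
        return false;
    }
}
```

A caller cannot tell "lock timed out" from "task returned false" with this shape, which matches the boolean result of the method above.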

Step into this method to get the default topic route info: org.apache.rocketmq.client.impl.MQClientAPIImpl#getTopicRouteInfoFromNameServer(java.lang.String, long, boolean)

public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
    boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
    GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
    requestHeader.setTopic(topic);

    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);

    // Fetch the topic route info synchronously =>
    RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.TOPIC_NOT_EXIST: {
            if (allowTopicNotExist && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
                log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
            }

            break;
        }
        case ResponseCode.SUCCESS: {
            byte[] body = response.getBody();
            if (body != null) {
                return TopicRouteData.decode(body, TopicRouteData.class);
            }
        }
        default:
            break;
    }

    throw new MQClientException(response.getCode(), response.getRemark());
}

Step into this method to fetch the topic route info synchronously: org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeSync, which was covered earlier.

 

Returning up to this method to get the topic route info: org.apache.rocketmq.client.impl.MQClientAPIImpl#getTopicRouteInfoFromNameServer(java.lang.String, long), which was covered above.

 

Returning up to this method to check whether the topic route has changed: org.apache.rocketmq.client.impl.factory.MQClientInstance#topicRouteDataIsChange

private boolean topicRouteDataIsChange(TopicRouteData olddata, TopicRouteData nowdata) {
    if (olddata == null || nowdata == null)
        return true;
    TopicRouteData old = olddata.cloneTopicRouteData();
    TopicRouteData now = nowdata.cloneTopicRouteData();
    Collections.sort(old.getQueueDatas());
    Collections.sort(old.getBrokerDatas());
    Collections.sort(now.getQueueDatas());
    Collections.sort(now.getBrokerDatas());
    return !old.equals(now);
}
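
topicRouteDataIsChange clones both sides, sorts the clones, and only then compares them, so that a mere difference in list order does not count as a change and the originals are never reordered. A minimal sketch of the same idea on plain string lists follows. RouteCompareSketch and isChanged are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: order-insensitive change detection on copies of the inputs.
class RouteCompareSketch {

    // Returns true when the two lists differ once ordering is ignored.
    // A null on either side counts as "changed".
    static boolean isChanged(List<String> oldData, List<String> newData) {
        if (oldData == null || newData == null) {
            return true;
        }
        // Sort copies so the caller's lists keep their original order
        List<String> oldCopy = new ArrayList<>(oldData);
        List<String> newCopy = new ArrayList<>(newData);
        Collections.sort(oldCopy);
        Collections.sort(newCopy);
        return !oldCopy.equals(newCopy);
    }
}
```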

Returning up to this method to update the consumer offset with a one-way request: org.apache.rocketmq.client.impl.MQClientAPIImpl#updateConsumerOffsetOneway

public void updateConsumerOffsetOneway(
    final String addr,
    final UpdateConsumerOffsetRequestHeader requestHeader,
    final long timeoutMillis
) throws RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException,
    InterruptedException {
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);

    // =>
    this.remotingClient.invokeOneway(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
}

Step into this method: org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeOneway, which was covered earlier.

 

Returning up to this method to update the consumer offset: org.apache.rocketmq.client.impl.MQClientAPIImpl#updateConsumerOffset

public void updateConsumerOffset(
    final String addr,
    final UpdateConsumerOffsetRequestHeader requestHeader,
    final long timeoutMillis
) throws RemotingException, MQBrokerException, InterruptedException {
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);

    // =>
    RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
        request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            return;
        }
        default:
            break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark());
}

Step into this method: org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeSync, which was covered earlier.

 

Returning up to this method to remove the message queue's offset: org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#removeOffset

public void removeOffset(MessageQueue mq) {
    if (mq != null) {
        this.offsetTable.remove(mq);
        log.info("remove unnecessary messageQueue offset. group={}, mq={}, offsetTableSize={}", this.groupName, mq,
            offsetTable.size());
    }
}

Returning up to this method: org.apache.rocketmq.client.impl.consumer.RebalancePushImpl#removeUnnecessaryMessageQueue, for defaultMQPushConsumerImpl. Because resetOffset works on a DefaultMQPushConsumerImpl, this push variant is the one on the reset path.

@Override
public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
    // Persist the message queue's offset
    this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
    // Remove the message queue's offset =>
    this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
    // Only for orderly consumption in clustering message model
    if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
        && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
        try {
            if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {
                try {
                    // Unlock with a delay =>
                    return this.unlockDelay(mq, pq);
                } finally {
                    pq.getLockConsume().unlock();
                }
            } else {
                log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}",
                    mq,
                    pq.getTryUnlockTimes());

                pq.incTryUnlockTimes();
            }
        } catch (Exception e) {
            log.error("removeUnnecessaryMessageQueue Exception", e);
        }

        return false;
    }
    return true;
}

Step into this method to persist the message queue's offset: org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#persist, which was covered earlier.

 

Returning up to this method to remove the message queue's offset: org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#removeOffset, which was covered earlier.

 

Step into this method to unlock with a delay: org.apache.rocketmq.client.impl.consumer.RebalancePushImpl#unlockDelay

private boolean unlockDelay(final MessageQueue mq, final ProcessQueue pq) {
    // Does the process queue still hold temporary messages? =>
    if (pq.hasTempMessage()) {
        log.info("[{}]unlockDelay, begin {} ", mq.hashCode(), mq);
        this.defaultMQPushConsumerImpl.getmQClientFactory().getScheduledExecutorService().schedule(new Runnable() {
            @Override
            public void run() {
                log.info("[{}]unlockDelay, execute at once {}", mq.hashCode(), mq);
                // Unlock the message queue =>
                RebalancePushImpl.this.unlock(mq, true);
            }
        }, UNLOCK_DELAY_TIME_MILLS, TimeUnit.MILLISECONDS);
    } else {
        this.unlock(mq, true);
    }
    return true;
}
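
unlockDelay either unlocks right away or hands the unlock to a scheduled executor, depending on whether the process queue still holds messages. The sketch below shows that "run now or run later" decision in isolation. DelayedUnlockSketch, its parameters and the returned future are hypothetical and only for illustration; the original method returns a boolean.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of "unlock immediately, or defer while work is still pending".
class DelayedUnlockSketch {

    // Runs unlock inline when nothing is pending, otherwise schedules it after delayMillis.
    // Returns the scheduled future, or null when the action already ran inline.
    static ScheduledFuture<?> unlockDelay(boolean hasPending, long delayMillis,
                                          ScheduledExecutorService scheduler, Runnable unlock) {
        if (hasPending) {
            return scheduler.schedule(unlock, delayMillis, TimeUnit.MILLISECONDS);
        }
        unlock.run();
        return null;
    }
}
```

A scheduler for trying this out can be created with Executors.newSingleThreadScheduledExecutor() and should be shut down afterwards.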

Step into this method to check whether the process queue holds temporary messages: org.apache.rocketmq.client.impl.consumer.ProcessQueue#hasTempMessage

public boolean hasTempMessage() {
    try {
        this.lockTreeMap.readLock().lockInterruptibly();
        try {
            return !this.msgTreeMap.isEmpty();
        } finally {
            this.lockTreeMap.readLock().unlock();
        }
    } catch (InterruptedException e) {
    }

    return true;
}

Step into this method to unlock the message queue: org.apache.rocketmq.client.impl.consumer.RebalanceImpl#unlock

public void unlock(final MessageQueue mq, final boolean oneway) {
    // Find the broker address by brokerName for subscription =>
    FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
    if (findBrokerResult != null) {
        UnlockBatchRequestBody requestBody = new UnlockBatchRequestBody();
        requestBody.setConsumerGroup(this.consumerGroup);
        requestBody.setClientId(this.mQClientFactory.getClientId());
        requestBody.getMqSet().add(mq);

        try {
            // Unlock the message queues in batch, with a 1s timeout
            this.mQClientFactory.getMQClientAPIImpl().unlockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000, oneway);
            log.warn("unlock messageQueue. group:{}, clientId:{}, mq:{}",
                this.consumerGroup,
                this.mQClientFactory.getClientId(),
                mq);
        } catch (Exception e) {
            log.error("unlockBatchMQ exception, " + mq, e);
        }
    }
}

Step into this method to find the broker address by brokerName for subscription: org.apache.rocketmq.client.impl.factory.MQClientInstance#findBrokerAddressInSubscribe

public FindBrokerResult findBrokerAddressInSubscribe(
    final String brokerName,
    final long brokerId,
    final boolean onlyThisBroker
) {
    String brokerAddr = null;
    boolean slave = false;
    boolean found = false;

    // Get the cached broker addresses
    HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
    if (map != null && !map.isEmpty()) {
        brokerAddr = map.get(brokerId);
        slave = brokerId != MixAll.MASTER_ID;
        found = brokerAddr != null;

        if (!found && !onlyThisBroker) {
            Entry<Long, String> entry = map.entrySet().iterator().next();
            brokerAddr = entry.getValue();
            slave = entry.getKey() != MixAll.MASTER_ID;
            found = true;
        }
    }

    if (found) {
        return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));
    }

    return null;
}
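
The onlyThisBroker flag decides whether the lookup may fall back to another broker of the same name. unlock passes MixAll.MASTER_ID with onlyThisBroker set to true, so it gets either the master's address or nothing. The sketch below isolates that rule. BrokerLookupSketch and findInSubscribe are hypothetical names, and the sketch returns only the address rather than a FindBrokerResult.

```java
import java.util.Map;

// Hypothetical sketch of the "exact broker id, with optional fallback" lookup.
class BrokerLookupSketch {

    // Returns the address registered for brokerId. If none is registered and
    // onlyThisBroker is false, falls back to any address in the map.
    // Returns null when nothing suitable is found.
    static String findInSubscribe(Map<Long, String> addrs, long brokerId, boolean onlyThisBroker) {
        if (addrs == null || addrs.isEmpty()) {
            return null;
        }
        String addr = addrs.get(brokerId);
        if (addr == null && !onlyThisBroker) {
            addr = addrs.values().iterator().next();
        }
        return addr;
    }
}
```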

Step into this method to unlock the message queues in batch, with a 1s timeout: org.apache.rocketmq.client.impl.MQClientAPIImpl#unlockBatchMQ

public void unlockBatchMQ(
    final String addr,
    final UnlockBatchRequestBody requestBody,
    final long timeoutMillis,
    final boolean oneway
) throws RemotingException, MQBrokerException, InterruptedException {
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, null);

    // JSON-encode the request body
    request.setBody(requestBody.encode());

    if (oneway) {
        // One-way request =>
        this.remotingClient.invokeOneway(addr, request, timeoutMillis);
    } else {
        // Synchronous call
        RemotingCommand response = this.remotingClient
            .invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
        switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                return;
            }
            default:
                break;
        }

        throw new MQBrokerException(response.getCode(), response.getRemark());
    }
}

Step into this method for the one-way request: org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeOneway, which was covered earlier.

 

Step into this method for the synchronous call: org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeSync, which was covered earlier.

 

Returning up to this method to resume the consumer: org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#resume

public void resume() {
    this.pause = false;
    // Rebalance =>
    doRebalance();
    log.info("resume this consumer, {}", this.defaultMQPushConsumer.getConsumerGroup());
}

Step into this method to rebalance: org.apache.rocketmq.client.impl.consumer.RebalanceImpl#doRebalance, which was covered earlier.

Returning up to this method: org.apache.rocketmq.client.impl.ClientRemotingProcessor#resetOffset. This is where the walkthrough ends.

 

Closing remarks

This analysis reflects only my own views and is for reference only.

 
