rocketmq源碼解析client管理取消註冊client

說在前面apache

client管理 取消註冊client緩存

 

源碼解析微信

進入這個方法,取消註冊client,org.apache.rocketmq.broker.processor.ClientManageProcessor.unregisterClient(ChannelHandlerContext, RemotingCommand)this



public RemotingCommand unregisterClient(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {final RemotingCommand response =RemotingCommand.createResponseCommand(UnregisterClientResponseHeader.class);final UnregisterClientRequestHeader requestHeader =(UnregisterClientRequestHeader) request.decodeCommandCustomHeader(UnregisterClientRequestHeader.class);ClientChannelInfo clientChannelInfo = new ClientChannelInfo(ctx.channel(),requestHeader.getClientID(),request.getLanguage(),request.getVersion());{final String group = requestHeader.getProducerGroup();if (group != null) {//                取消註冊生產者=》this.brokerController.getProducerManager().unregisterProducer(group, clientChannelInfo);}}{final String group = requestHeader.getConsumerGroup();if (group != null) {//                獲取消費組的訂閱配置信息=》SubscriptionGroupConfig subscriptionGroupConfig =this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(group);//                消費組訂閱信息發生改變了是否須要通知消費者,默認是須要通知boolean isNotifyConsumerIdsChangedEnable = true;if (null != subscriptionGroupConfig) {isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();}//                取消註冊消費者=》this.brokerController.getConsumerManager().unregisterConsumer(group, clientChannelInfo, isNotifyConsumerIdsChangedEnable);}}response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;    }

進入這個方法,取消註冊生產者,org.apache.rocketmq.broker.client.ProducerManager.unregisterProducer(String, ClientChannelInfo)code

public void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) {try {if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {try {//                    獲取消費組的client的channel信息HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);if (null != channelTable && !channelTable.isEmpty()) {//                        刪除channel信息ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel());if (old != null) {log.info("unregister a producer[{}] from groupChannelTable {}", group,clientChannelInfo.toString());}if (channelTable.isEmpty()) {//                            刪除消費組的channel信息this.groupChannelTable.remove(group);log.info("unregister a producer group[{}] from groupChannelTable", group);}}} finally {this.groupChannelLock.unlock();}} else {log.warn("ProducerManager unregisterProducer lock timeout");}} catch (InterruptedException e) {log.error("", e);}    }

進入這個方法,獲取消費組的訂閱配置信息,org.apache.rocketmq.broker.subscription.SubscriptionGroupManager.findSubscriptionGroupConfig(String)blog

public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) {//        從緩存中獲取組的訂閱信息SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.get(group);if (null == subscriptionGroupConfig) {//            自動建立消費組或者是系統自用的消費組if (brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup() || MixAll.isSysConsumerGroup(group)) {subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(group);SubscriptionGroupConfig preConfig = this.subscriptionGroupTable.putIfAbsent(group, subscriptionGroupConfig);if (null == preConfig) {log.info("auto create a subscription group, {}", subscriptionGroupConfig.toString());}//                更新數據的版本號this.dataVersion.nextVersion();//                持久化=》this.persist();}}return subscriptionGroupConfig;    }

往上返回進入到這個方法,取消註冊消費者,org.apache.rocketmq.broker.client.ConsumerManager.unregisterConsumer(String, ClientChannelInfo, boolean)事件

public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo,boolean isNotifyConsumerIdsChangedEnable) {ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);if (null != consumerGroupInfo) {//            取消註冊channel信息=》consumerGroupInfo.unregisterChannel(clientChannelInfo);if (consumerGroupInfo.getChannelInfoTable().isEmpty()) {//                刪除組的消費組信息ConsumerGroupInfo remove = this.consumerTable.remove(group);if (remove != null) {log.info("unregister consumer ok, no any connection, and remove consumer group, {}", group);//                    消費者改變監聽器執行取消註冊事件=》this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, group);}}//            消費組的訂閱信息發生改變了是否須要通知消費者,默認是if (isNotifyConsumerIdsChangedEnable) {//                執行消費組改變事件=》this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());}}    }

進入這個方法,取消註冊channel信息,org.apache.rocketmq.broker.client.ConsumerGroupInfo.unregisterChannel(ClientChannelInfo)ip

public void unregisterChannel(final ClientChannelInfo clientChannelInfo) {ClientChannelInfo old = this.channelInfoTable.remove(clientChannelInfo.getChannel());if (old != null) {log.info("unregister a consumer[{}] from consumerGroupInfo {}", this.groupName, old.toString());}    }

進入這個方法,消費者改變監聽器執行取消註冊事件,org.apache.rocketmq.broker.client.DefaultConsumerIdsChangeListener.handle(ConsumerGroupEvent, String, Object...) 這個方法前面介紹過了。rem

 

進入這個方法,執行消費組改變事件,org.apache.rocketmq.broker.client.DefaultConsumerIdsChangeListener.handle(ConsumerGroupEvent, String, Object...) 這個方法前面介紹你過了。get

 

往上返回到這個方法,org.apache.rocketmq.broker.processor.ClientManageProcessor.unregisterClient(ChannelHandlerContext, RemotingCommand)結束。

 

說在最後

本次解析僅表明我的觀點,僅供參考。

 

加入技術微信羣

釘釘技術羣

相關文章
相關標籤/搜索