Foreword
Client management: unregistering a client
Source code analysis
Start with the method that unregisters a client, org.apache.rocketmq.broker.processor.ClientManageProcessor.unregisterClient(ChannelHandlerContext, RemotingCommand)
public RemotingCommand unregisterClient(ChannelHandlerContext ctx, RemotingCommand request)
    throws RemotingCommandException {
    final RemotingCommand response =
        RemotingCommand.createResponseCommand(UnregisterClientResponseHeader.class);
    final UnregisterClientRequestHeader requestHeader =
        (UnregisterClientRequestHeader) request.decodeCommandCustomHeader(UnregisterClientRequestHeader.class);

    ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
        ctx.channel(),
        requestHeader.getClientID(),
        request.getLanguage(),
        request.getVersion());

    {
        final String group = requestHeader.getProducerGroup();
        if (group != null) {
            // Unregister the producer (traced below).
            this.brokerController.getProducerManager().unregisterProducer(group, clientChannelInfo);
        }
    }

    {
        final String group = requestHeader.getConsumerGroup();
        if (group != null) {
            // Look up the subscription config of the consumer group (traced below).
            SubscriptionGroupConfig subscriptionGroupConfig =
                this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(group);
            // Whether to notify consumers when the group's client IDs change; on by default.
            boolean isNotifyConsumerIdsChangedEnable = true;
            if (null != subscriptionGroupConfig) {
                isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
            }
            // Unregister the consumer (traced below).
            this.brokerController.getConsumerManager().unregisterConsumer(group, clientChannelInfo, isNotifyConsumerIdsChangedEnable);
        }
    }

    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}
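The shape of unregisterClient is easier to see once the broker types are stripped away: the request header carries an optional producer group and an optional consumer group, and each one is handled independently when it is present. Below is a minimal sketch of that dispatch. The Header class and the dispatch method are hypothetical stand-ins written for this article, not RocketMQ classes, and the "actions" are only strings that record what would be called.

```java
import java.util.ArrayList;
import java.util.List;

class UnregisterDispatchSketch {
    // Hypothetical stand-in for the request header: both group fields may be null.
    static final class Header {
        final String clientId;
        final String producerGroup;
        final String consumerGroup;

        Header(String clientId, String producerGroup, String consumerGroup) {
            this.clientId = clientId;
            this.producerGroup = producerGroup;
            this.consumerGroup = consumerGroup;
        }
    }

    // Records which unregister step would run for a given header.
    static List<String> dispatch(Header header) {
        List<String> actions = new ArrayList<>();
        if (header.producerGroup != null) {
            actions.add("unregisterProducer:" + header.producerGroup);
        }
        if (header.consumerGroup != null) {
            actions.add("unregisterConsumer:" + header.consumerGroup);
        }
        return actions;
    }

    public static void main(String[] args) {
        // A client that was only a producer triggers only the producer step.
        System.out.println(dispatch(new Header("c1", "pg", null)));
        // A client with both roles triggers both steps, producer first.
        System.out.println(dispatch(new Header("c2", "pg", "cg")));
    }
}
```

Note that the two checks are not an if/else: a single request can unregister the same client from a producer group and a consumer group in one call.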
Step into the method that unregisters a producer, org.apache.rocketmq.broker.client.ProducerManager.unregisterProducer(String, ClientChannelInfo)
public void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) {
    try {
        if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                // Get the channel table of the clients in this producer group.
                HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
                if (null != channelTable && !channelTable.isEmpty()) {
                    // Remove this client's channel entry.
                    ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel());
                    if (old != null) {
                        log.info("unregister a producer[{}] from groupChannelTable {}", group,
                            clientChannelInfo.toString());
                    }

                    if (channelTable.isEmpty()) {
                        // No channels left: remove the producer group's entry as well.
                        this.groupChannelTable.remove(group);
                        log.info("unregister a producer group[{}] from groupChannelTable", group);
                    }
                }
            } finally {
                this.groupChannelLock.unlock();
            }
        } else {
            log.warn("ProducerManager unregisterProducer lock timeout");
        }
    } catch (InterruptedException e) {
        log.error("", e);
    }
}
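The pattern in unregisterProducer is a plain HashMap guarded by a lock that is acquired with a timeout, plus cleanup of the outer entry once the inner table becomes empty. Here is a minimal, self-contained sketch of that pattern. Everything in it is an assumption made for illustration: strings stand in for Channel and ClientChannelInfo, the 3000 ms timeout is an assumed value, and the boolean return values are added so the result can be observed. Unlike the method above, this sketch restores the thread's interrupt flag instead of only logging the exception.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class ProducerTableSketch {
    // Assumed timeout value, chosen only for this sketch.
    private static final long LOCK_TIMEOUT_MILLIS = 3000L;

    private final Lock groupChannelLock = new ReentrantLock();
    // group -> (channel id -> client id); strings stand in for the broker types.
    private final Map<String, Map<String, String>> groupChannelTable = new HashMap<>();

    boolean register(String group, String channelId, String clientId) {
        if (!acquire()) {
            return false;
        }
        try {
            groupChannelTable.computeIfAbsent(group, g -> new HashMap<>()).put(channelId, clientId);
            return true;
        } finally {
            groupChannelLock.unlock();
        }
    }

    // Returns true only if the channel was actually present and removed.
    boolean unregister(String group, String channelId) {
        if (!acquire()) {
            return false;
        }
        try {
            Map<String, String> channelTable = groupChannelTable.get(group);
            if (channelTable == null) {
                return false;
            }
            boolean removed = channelTable.remove(channelId) != null;
            if (channelTable.isEmpty()) {
                // Last channel gone: drop the group entry too.
                groupChannelTable.remove(group);
            }
            return removed;
        } finally {
            groupChannelLock.unlock();
        }
    }

    boolean containsGroup(String group) {
        if (!acquire()) {
            return false;
        }
        try {
            return groupChannelTable.containsKey(group);
        } finally {
            groupChannelLock.unlock();
        }
    }

    // Tries to take the lock within the timeout; gives up instead of blocking forever.
    private boolean acquire() {
        try {
            return groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ProducerTableSketch table = new ProducerTableSketch();
        table.register("pg", "ch1", "client-1");
        System.out.println(table.unregister("pg", "ch1"));
        System.out.println(table.containsGroup("pg"));
    }
}
```

The timed tryLock means a stuck lock holder costs the caller a bounded wait and a skipped update rather than a blocked request thread, which is the trade-off the warn log in the original method reflects.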
Step into the method that looks up the consumer group's subscription config, org.apache.rocketmq.broker.subscription.SubscriptionGroupManager.findSubscriptionGroupConfig(String)
public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) {
    // Look up the group's subscription config in the in-memory table.
    SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.get(group);
    if (null == subscriptionGroupConfig) {
        // Create it if auto-creation is enabled or the group is a system consumer group.
        if (brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup() || MixAll.isSysConsumerGroup(group)) {
            subscriptionGroupConfig = new SubscriptionGroupConfig();
            subscriptionGroupConfig.setGroupName(group);
            SubscriptionGroupConfig preConfig = this.subscriptionGroupTable.putIfAbsent(group, subscriptionGroupConfig);
            if (null == preConfig) {
                log.info("auto create a subscription group, {}", subscriptionGroupConfig.toString());
            }
            // Bump the data version.
            this.dataVersion.nextVersion();
            // Persist the table.
            this.persist();
        }
    }

    return subscriptionGroupConfig;
}
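The lookup above is a "get, or create with putIfAbsent" pattern. The sketch below shows the idea in isolation, with assumptions clearly marked: a String stands in for SubscriptionGroupConfig, an AtomicLong stands in for the data version, and persistence is left out. It also makes one deliberate variation: the quoted method bumps the version and persists whenever it takes the creation branch and returns the object it just built, whereas this sketch bumps the version only when its entry was actually inserted and returns whichever value ended up in the table.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

class SubscriptionLookupSketch {
    // group -> config; a String stands in for the real config object.
    private final ConcurrentMap<String, String> table = new ConcurrentHashMap<>();
    // Stand-in for the data version counter.
    private final AtomicLong version = new AtomicLong();
    private final boolean autoCreate;

    SubscriptionLookupSketch(boolean autoCreate) {
        this.autoCreate = autoCreate;
    }

    // Returns the existing config, creates one if allowed, or returns null.
    String find(String group) {
        String config = table.get(group);
        if (config == null && autoCreate) {
            String created = "config-of-" + group;
            String previous = table.putIfAbsent(group, created);
            if (previous == null) {
                // This call won the race and inserted the entry.
                version.incrementAndGet();
                config = created;
            } else {
                // Another caller inserted first: use its value.
                config = previous;
            }
        }
        return config;
    }

    long version() {
        return version.get();
    }

    public static void main(String[] args) {
        SubscriptionLookupSketch lookup = new SubscriptionLookupSketch(true);
        System.out.println(lookup.find("cg"));
        System.out.println(lookup.version());
    }
}
```

With auto-creation switched off the method returns null for an unknown group, which is why unregisterClient keeps its default of true for isNotifyConsumerIdsChangedEnable when no config is found.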
Back up one level and step into the method that unregisters a consumer, org.apache.rocketmq.broker.client.ConsumerManager.unregisterConsumer(String, ClientChannelInfo, boolean)
public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo,
    boolean isNotifyConsumerIdsChangedEnable) {
    ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
    if (null != consumerGroupInfo) {
        // Unregister the client's channel (traced below).
        consumerGroupInfo.unregisterChannel(clientChannelInfo);
        if (consumerGroupInfo.getChannelInfoTable().isEmpty()) {
            // No channels left: remove the consumer group's entry.
            ConsumerGroupInfo remove = this.consumerTable.remove(group);
            if (remove != null) {
                log.info("unregister consumer ok, no any connection, and remove consumer group, {}", group);
                // Fire the UNREGISTER event on the change listener (traced below).
                this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, group);
            }
        }
        // Notify consumers of the change if notification is enabled (it is by default).
        if (isNotifyConsumerIdsChangedEnable) {
            // Fire the CHANGE event with the group's remaining channels (traced below).
            this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
        }
    }
}
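The order of events in unregisterConsumer is worth spelling out: remove the channel, drop the group if it is now empty and fire UNREGISTER, then fire CHANGE if notification is enabled. A minimal sketch of that sequence follows. It rests on assumptions: strings replace the channel and client types, and the listener is replaced by a list that records event names, so nothing is actually sent to any consumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ConsumerTableSketch {
    // group -> (channel id -> client id); strings stand in for the broker types.
    private final Map<String, Map<String, String>> consumerTable = new ConcurrentHashMap<>();
    // Records fired events in order, in place of a real listener.
    final List<String> events = new ArrayList<>();

    void register(String group, String channelId, String clientId) {
        consumerTable.computeIfAbsent(group, g -> new ConcurrentHashMap<>()).put(channelId, clientId);
    }

    void unregister(String group, String channelId, boolean notifyChange) {
        Map<String, String> channels = consumerTable.get(group);
        if (channels == null) {
            return; // unknown group: nothing to do, no events
        }
        channels.remove(channelId);
        if (channels.isEmpty() && consumerTable.remove(group) != null) {
            // The group lost its last connection.
            events.add("UNREGISTER:" + group);
        }
        if (notifyChange) {
            // Fired even when the group was just removed, with zero remaining channels.
            events.add("CHANGE:" + group + ":" + channels.size());
        }
    }

    public static void main(String[] args) {
        ConsumerTableSketch table = new ConsumerTableSketch();
        table.register("cg", "ch1", "client-1");
        table.register("cg", "ch2", "client-2");
        table.unregister("cg", "ch1", true);
        table.unregister("cg", "ch2", true);
        System.out.println(table.events);
    }
}
```

One detail carried over from the method above: when the last consumer leaves, the CHANGE event still fires after UNREGISTER, just with an empty channel list.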
Step into the method that unregisters the channel, org.apache.rocketmq.broker.client.ConsumerGroupInfo.unregisterChannel(ClientChannelInfo)
public void unregisterChannel(final ClientChannelInfo clientChannelInfo) {
    ClientChannelInfo old = this.channelInfoTable.remove(clientChannelInfo.getChannel());
    if (old != null) {
        log.info("unregister a consumer[{}] from consumerGroupInfo {}", this.groupName, old.toString());
    }
}
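Step into the method where the consumer change listener handles the UNREGISTER event, org.apache.rocketmq.broker.client.DefaultConsumerIdsChangeListener.handle(ConsumerGroupEvent, String, Object...). This method was covered earlier.
Step into the method that handles the consumer group CHANGE event, org.apache.rocketmq.broker.client.DefaultConsumerIdsChangeListener.handle(ConsumerGroupEvent, String, Object...). This method was also covered earlier.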
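Both calls land on the same handle method because its last parameter is a varargs Object array: the UNREGISTER call passes no extra arguments, while the CHANGE call passes the list of channels. The sketch below illustrates only that calling convention. The Event enum, the handle method, and the strings it returns are hypothetical placeholders written for this article; they do not reproduce what the broker's listener actually does in each branch.

```java
import java.util.Arrays;
import java.util.List;

class ListenerSketch {
    // Only the two events that appear in this walkthrough.
    enum Event { CHANGE, UNREGISTER }

    // Returns a description of what would happen; the branch bodies are placeholders.
    static String handle(Event event, String group, Object... args) {
        switch (event) {
            case CHANGE:
                // CHANGE expects the channel list as its first extra argument.
                if (args == null || args.length < 1 || !(args[0] instanceof List)) {
                    return "ignored";
                }
                List<?> channels = (List<?>) args[0];
                return "notify " + channels.size() + " channel(s) in " + group;
            case UNREGISTER:
                // UNREGISTER needs no extra arguments.
                return "cleanup " + group;
            default:
                return "no-op";
        }
    }

    public static void main(String[] args) {
        System.out.println(handle(Event.UNREGISTER, "cg"));
        System.out.println(handle(Event.CHANGE, "cg", Arrays.asList("ch1", "ch2")));
    }
}
```

The cost of a varargs signature like this is that the compiler cannot check the extra arguments, so each branch has to validate and cast what it receives.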
Back up to org.apache.rocketmq.broker.processor.ClientManageProcessor.unregisterClient(ChannelHandlerContext, RemotingCommand), which ends here.
Closing remarks
This analysis reflects only my personal views and is for reference only.