說在前面apache
默認請求處理器,獲取broker集羣信息,從namesrv中獲取全部的broker列表,刪除namesrv中的topic、獲取集羣的全部topic、從namesrv中獲取系統的topic列表、獲取單元化模式非%RETRY%開頭的topic、獲取單元化模式消費失敗重試的%RETRY%開頭的topic微信
源碼解析app
進入這個方法,獲取broker集羣信息,org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getBrokerClusterInfothis
private RemotingCommand getBrokerClusterInfo(ChannelHandlerContext ctx, RemotingCommand request) {final RemotingCommand response = RemotingCommand.createResponseCommand(null);byte[] content = this.namesrvController.getRouteInfoManager().getAllClusterInfo();response.setBody(content);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response; }
進入這個方法,org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getAllClusterInfocode
public byte[] getAllClusterInfo() {ClusterInfo clusterInfoSerializeWrapper = new ClusterInfo();// broker數據clusterInfoSerializeWrapper.setBrokerAddrTable(this.brokerAddrTable);// broker集羣clusterInfoSerializeWrapper.setClusterAddrTable(this.clusterAddrTable);return clusterInfoSerializeWrapper.encode(); }
往上返回到這個方法,從namesrv中獲取全部的broker列表,org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getAllTopicListFromNameserverserver
private RemotingCommand getAllTopicListFromNameserver(ChannelHandlerContext ctx, RemotingCommand request) {final RemotingCommand response = RemotingCommand.createResponseCommand(null);// 從topic隊列信息中獲取全部的topicbyte[] body = this.namesrvController.getRouteInfoManager().getAllTopicList();response.setBody(body);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response; }
進入這個方法,從topic隊列信息中獲取全部的topic,org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getAllTopicListblog
public byte[] getAllTopicList() {TopicList topicList = new TopicList();try {try {this.lock.readLock().lockInterruptibly();topicList.getTopicList().addAll(this.topicQueueTable.keySet());} finally {this.lock.readLock().unlock();}} catch (Exception e) {log.error("getAllTopicList Exception", e);}return topicList.encode(); }
往上返回到這個方法,刪除namesrv中的topic,org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#deleteTopicInNamesrv隊列
private RemotingCommand deleteTopicInNamesrv(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);final DeleteTopicInNamesrvRequestHeader requestHeader =(DeleteTopicInNamesrvRequestHeader) request.decodeCommandCustomHeader(DeleteTopicInNamesrvRequestHeader.class);// 從topic消息隊列信息中刪除topic相關的信息this.namesrvController.getRouteInfoManager().deleteTopic(requestHeader.getTopic());response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response; }
進入這個方法,從topic消息隊列信息中刪除topic相關的信息,org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#deleteTopicrem
public void deleteTopic(final String topic) {try {try {this.lock.writeLock().lockInterruptibly();this.topicQueueTable.remove(topic);} finally {this.lock.writeLock().unlock();}} catch (Exception e) {log.error("deleteTopic Exception", e);} }
往上返回到這個方法,獲取集羣的全部topic,org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getTopicsByClusterget
private RemotingCommand getTopicsByCluster(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);final GetTopicsByClusterRequestHeader requestHeader =(GetTopicsByClusterRequestHeader) request.decodeCommandCustomHeader(GetTopicsByClusterRequestHeader.class);// 根據broker集羣地址獲取topicsbyte[] body = this.namesrvController.getRouteInfoManager().getTopicsByCluster(requestHeader.getCluster());response.setBody(body);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response; }
進入這個方法,根據broker集羣地址獲取topics,org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getTopicsByCluster
public byte[] getTopicsByCluster(String cluster) {TopicList topicList = new TopicList();try {try {this.lock.readLock().lockInterruptibly();// 獲取集羣的brokerSet<String> brokerNameSet = this.clusterAddrTable.get(cluster);for (String brokerName : brokerNameSet) {Iterator<Entry<String, List<QueueData>>> topicTableIt =this.topicQueueTable.entrySet().iterator();while (topicTableIt.hasNext()) {Entry<String, List<QueueData>> topicEntry = topicTableIt.next();String topic = topicEntry.getKey();List<QueueData> queueDatas = topicEntry.getValue();for (QueueData queueData : queueDatas) {if (brokerName.equals(queueData.getBrokerName())) {// 從topic的消息隊列中獲取topictopicList.getTopicList().add(topic);break;}}}}} finally {this.lock.readLock().unlock();}} catch (Exception e) {log.error("getAllTopicList Exception", e);}return topicList.encode(); }
往上返回到這個方法,從namesrv中獲取系統的topic列表,org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getSystemTopicListFromNs
private RemotingCommand getSystemTopicListFromNs(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);byte[] body = this.namesrvController.getRouteInfoManager().getSystemTopicList();response.setBody(body);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response; }
進入這個方法,org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getSystemTopicList
public byte[] getSystemTopicList() {TopicList topicList = new TopicList();try {try {this.lock.readLock().lockInterruptibly();for (Map.Entry<String, Set<String>> entry : clusterAddrTable.entrySet()) {// 這裏有點疑問,broker集羣的clusterName、brokerName添加到topic列表中是什麼意思,系統topic就是這些名字嗎topicList.getTopicList().add(entry.getKey());topicList.getTopicList().addAll(entry.getValue());}if (brokerAddrTable != null && !brokerAddrTable.isEmpty()) {Iterator<String> it = brokerAddrTable.keySet().iterator();while (it.hasNext()) {BrokerData bd = brokerAddrTable.get(it.next());HashMap<Long, String> brokerAddrs = bd.getBrokerAddrs();if (brokerAddrs != null && !brokerAddrs.isEmpty()) {Iterator<Long> it2 = brokerAddrs.keySet().iterator();// 從broker數據中獲取broker地址topicList.setBrokerAddr(brokerAddrs.get(it2.next()));break;}}}} finally {this.lock.readLock().unlock();}} catch (Exception e) {log.error("getAllTopicList Exception", e);}return topicList.encode(); }
往上返回到這個方法,獲取單元化模式非%RETRY%開頭的topic,org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getUnitTopicList
private RemotingCommand getUnitTopicList(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);// =》byte[] body = this.namesrvController.getRouteInfoManager().getUnitTopics();response.setBody(body);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response; }
進入這個方法,org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getUnitTopics
public byte[] getUnitTopics() {TopicList topicList = new TopicList();try {try {this.lock.readLock().lockInterruptibly();Iterator<Entry<String, List<QueueData>>> topicTableIt =this.topicQueueTable.entrySet().iterator();while (topicTableIt.hasNext()) {Entry<String, List<QueueData>> topicEntry = topicTableIt.next();String topic = topicEntry.getKey();List<QueueData> queueDatas = topicEntry.getValue();if (queueDatas != null && queueDatas.size() > 0&& TopicSysFlag.hasUnitFlag(queueDatas.get(0).getTopicSynFlag())) {topicList.getTopicList().add(topic);}}} finally {this.lock.readLock().unlock();}} catch (Exception e) {log.error("getAllTopicList Exception", e);}return topicList.encode(); }
往上返回到這個方法,獲取單元化模式消費失敗重試的%RETRY%開頭的topic,org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getHasUnitSubTopicList
private RemotingCommand getHasUnitSubTopicList(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);byte[] body = this.namesrvController.getRouteInfoManager().getHasUnitSubTopicList();response.setBody(body);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response; }
進入這個方法,org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getHasUnitSubTopicList
public byte[] getHasUnitSubTopicList() {TopicList topicList = new TopicList();try {try {this.lock.readLock().lockInterruptibly();Iterator<Entry<String, List<QueueData>>> topicTableIt =this.topicQueueTable.entrySet().iterator();while (topicTableIt.hasNext()) {Entry<String, List<QueueData>> topicEntry = topicTableIt.next();String topic = topicEntry.getKey();List<QueueData> queueDatas = topicEntry.getValue();if (queueDatas != null && queueDatas.size() > 0&& TopicSysFlag.hasUnitSubFlag(queueDatas.get(0).getTopicSynFlag())) {topicList.getTopicList().add(topic);}}} finally {this.lock.readLock().unlock();}} catch (Exception e) {log.error("getAllTopicList Exception", e);}return topicList.encode(); }
說在最後
本次解析僅表明我的觀點,僅供參考。
加入技術微信羣
釘釘技術羣