說在前面apache
默認請求處理器,獲取單元化模式消費失敗重試的%RETRY%開頭的topic微信
源碼解析this
進入這個方法,獲取單元化模式消費失敗重試的%RETRY%開頭的topic,org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getHasUnitSubUnUnitTopicListcode
private RemotingCommand getHasUnitSubUnUnitTopicList(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);byte[] body = this.namesrvController.getRouteInfoManager().getHasUnitSubUnUnitTopicList();response.setBody(body);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response; }
進入這個方法,org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getHasUnitSubUnUnitTopicListorm
public byte[] getHasUnitSubUnUnitTopicList() {TopicList topicList = new TopicList();try {try {this.lock.readLock().lockInterruptibly();Iterator<Entry<String, List<QueueData>>> topicTableIt =this.topicQueueTable.entrySet().iterator();while (topicTableIt.hasNext()) {Entry<String, List<QueueData>> topicEntry = topicTableIt.next();String topic = topicEntry.getKey();List<QueueData> queueDatas = topicEntry.getValue();if (queueDatas != null && queueDatas.size() > 0&& !TopicSysFlag.hasUnitFlag(queueDatas.get(0).getTopicSynFlag())&& TopicSysFlag.hasUnitSubFlag(queueDatas.get(0).getTopicSynFlag())) {topicList.getTopicList().add(topic);}}} finally {this.lock.readLock().unlock();}} catch (Exception e) {log.error("getAllTopicList Exception", e);}return topicList.encode();}}
往上返回到這個方法,更新namesrv配置,org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#updateConfigblog
private RemotingCommand updateConfig(ChannelHandlerContext ctx, RemotingCommand request) {log.info("updateConfig called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));final RemotingCommand response = RemotingCommand.createResponseCommand(null);byte[] body = request.getBody();if (body != null) {String bodyStr;try {bodyStr = new String(body, MixAll.DEFAULT_CHARSET);} catch (UnsupportedEncodingException e) {log.error("updateConfig byte array to string error: ", e);response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("UnsupportedEncodingException " + e);return response;}if (bodyStr == null) {log.error("updateConfig get null body!");response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("string2Properties error");return response;}Properties properties = MixAll.string2Properties(bodyStr);if (properties == null) {log.error("updateConfig MixAll.string2Properties error {}", bodyStr);response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("string2Properties error");return response;}this.namesrvController.getConfiguration().update(properties);}response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response; }
往上返回到這個方法,獲取namesrv配置,org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getConfigget
private RemotingCommand getConfig(ChannelHandlerContext ctx, RemotingCommand request) {final RemotingCommand response = RemotingCommand.createResponseCommand(null);String content = this.namesrvController.getConfiguration().getAllConfigsFormatString();if (content != null && content.length() > 0) {try {response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET));} catch (UnsupportedEncodingException e) {log.error("getConfig error, ", e);response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("UnsupportedEncodingException " + e);return response;}}response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response; }
說在最後源碼
本次解析僅表明我的觀點,僅供參考。string
加入技術微信羣it
釘釘技術羣