rocketmq源碼解析默認請求處理器②

說在前面

默認請求處理器,註冊broker

 

源碼解析

進入這個方法,註冊broker,org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#registerBrokerWithFilterServer







public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();final RegisterBrokerRequestHeader requestHeader =(RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);if (!checksum(ctx, request, requestHeader)) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("crc32 not match");return response;}RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();if (request.getBody() != null) {try {registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), requestHeader.isCompressed());} catch (Exception e) {throw new RemotingCommandException("Failed to decode RegisterBrokerBody", e);}} else {registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0));registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestamp(0);}//        =》RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(requestHeader.getClusterName(),requestHeader.getBrokerAddr(),requestHeader.getBrokerName(),requestHeader.getBrokerId(),requestHeader.getHaServerAddr(),registerBrokerBody.getTopicConfigSerializeWrapper(),registerBrokerBody.getFilterServerList(),ctx.channel());responseHeader.setHaServerAddr(result.getHaServerAddr());responseHeader.setMasterAddr(result.getMasterAddr());//        獲取ORDER_TOPIC_CONFIG命名空間的配置byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);response.setBody(jsonValue);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;    }

進入這個方法,org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker








public RegisterBrokerResult registerBroker(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList,final Channel channel) {RegisterBrokerResult result = new RegisterBrokerResult();try {try {//                同步註冊this.lock.writeLock().lockInterruptibly();//                獲取broker的集羣地址Set<String> brokerNames = this.clusterAddrTable.get(clusterName);if (null == brokerNames) {brokerNames = new HashSet<String>();this.clusterAddrTable.put(clusterName, brokerNames);}brokerNames.add(brokerName);boolean registerFirst = false;//               獲取指定brokerName的broker數據BrokerData brokerData = this.brokerAddrTable.get(brokerName);if (null == brokerData) {registerFirst = true;brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());this.brokerAddrTable.put(brokerName, brokerData);}String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);registerFirst = registerFirst || (null == oldAddr);//                若是broker是masterif (null != topicConfigWrapper&& MixAll.MASTER_ID == brokerId) {if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())|| registerFirst) {//                        獲取topic配置ConcurrentMap<String, TopicConfig> tcTable =topicConfigWrapper.getTopicConfigTable();if (tcTable != null) {for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {//                                建立或更新消息隊列數據=》this.createAndUpdateQueueData(brokerName, entry.getValue());}}}}//                存活的broker列表BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,new BrokerLiveInfo(System.currentTimeMillis(),topicConfigWrapper.getDataVersion(),channel,haServerAddr));if (null == prevBrokerLiveInfo) {log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);}//                過濾的broker列表if (filterServerList != null) {if 
(filterServerList.isEmpty()) {this.filterServerTable.remove(brokerAddr);} else {this.filterServerTable.put(brokerAddr, filterServerList);}}//                若是broker不是masterif (MixAll.MASTER_ID != brokerId) {//                    獲取broker master地址String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);if (masterAddr != null) {BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);if (brokerLiveInfo != null) {result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());result.setMasterAddr(masterAddr);}}}} finally {this.lock.writeLock().unlock();}} catch (Exception e) {log.error("registerBroker Exception", e);}return result;    }

進入這個方法,建立或更新消息隊列數據,org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#createAndUpdateQueueData



private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {QueueData queueData = new QueueData();queueData.setBrokerName(brokerName);queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());queueData.setReadQueueNums(topicConfig.getReadQueueNums());queueData.setPerm(topicConfig.getPerm());queueData.setTopicSynFlag(topicConfig.getTopicSysFlag());//        查詢topic的隊列List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());if (null == queueDataList) {queueDataList = new LinkedList<QueueData>();queueDataList.add(queueData);this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData);} else {boolean addNewOne = true;Iterator<QueueData> it = queueDataList.iterator();while (it.hasNext()) {QueueData qd = it.next();if (qd.getBrokerName().equals(brokerName)) {if (qd.equals(queueData)) {//                        topic存在的隊列和要保存的隊列同樣,不須要添加addNewOne = false;} else {log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd,queueData);//                        若是topic存在的隊列和要保存的隊列不一致,就刪除存在的隊列it.remove();}}}//            須要添加if (addNewOne) {queueDataList.add(queueData);}}    }

進入這個方法,org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#registerBroker





public RemotingCommand registerBroker(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();final RegisterBrokerRequestHeader requestHeader =(RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);if (!checksum(ctx, request, requestHeader)) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("crc32 not match");return response;}TopicConfigSerializeWrapper topicConfigWrapper;if (request.getBody() != null) {topicConfigWrapper = TopicConfigSerializeWrapper.decode(request.getBody(), TopicConfigSerializeWrapper.class);} else {topicConfigWrapper = new TopicConfigSerializeWrapper();topicConfigWrapper.getDataVersion().setCounter(new AtomicLong(0));topicConfigWrapper.getDataVersion().setTimestamp(0);}//        =》RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(requestHeader.getClusterName(),requestHeader.getBrokerAddr(),requestHeader.getBrokerName(),requestHeader.getBrokerId(),requestHeader.getHaServerAddr(),topicConfigWrapper,null,ctx.channel());responseHeader.setHaServerAddr(result.getHaServerAddr());responseHeader.setMasterAddr(result.getMasterAddr());//        獲取ORDER_TOPIC_CONFIG命名空間的配置byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);response.setBody(jsonValue);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;    }

進入這個方法,org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker前面介紹過了。

往上返回到這個方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#registerBroker結束。

 

說在最後

本次解析僅代表個人觀點,僅供參考。

 

加入技術微信羣

釘釘技術羣

相關文章
相關標籤/搜索