說在前面apache
管理請求 REGISTER_FILTER_SERVER 註冊過濾的server緩存
源碼解析微信
進入這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#registerFilterServer註冊過濾的serverthis
private RemotingCommand registerFilterServer(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterFilterServerResponseHeader.class); final RegisterFilterServerResponseHeader responseHeader = (RegisterFilterServerResponseHeader) response.readCustomHeader(); final RegisterFilterServerRequestHeader requestHeader = (RegisterFilterServerRequestHeader) request.decodeCommandCustomHeader(RegisterFilterServerRequestHeader.class); // =》 this.brokerController.getFilterServerManager().registerFilterServer(ctx.channel(), requestHeader.getFilterServerAddr()); responseHeader.setBrokerId(this.brokerController.getBrokerConfig().getBrokerId()); responseHeader.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName()); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; }
進入這個方法org.apache.rocketmq.broker.filtersrv.FilterServerManager#registerFilterServercode
public void registerFilterServer(final Channel channel, final String filterServerAddr) { // 那channel從緩存中查詢出過濾的server FilterServerInfo filterServerInfo = this.filterServerTable.get(channel); if (filterServerInfo != null) { filterServerInfo.setLastUpdateTimestamp(System.currentTimeMillis()); } else { filterServerInfo = new FilterServerInfo(); filterServerInfo.setFilterServerAddr(filterServerAddr); filterServerInfo.setLastUpdateTimestamp(System.currentTimeMillis()); this.filterServerTable.put(channel, filterServerInfo); log.info("Receive a New Filter Server<{}>", filterServerAddr); } }
往上返回到這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#registerFilterServer結束server
說在最後blog
本次解析僅表明我的觀點,僅供參考。get
加入技術微信羣源碼
釘釘技術羣io