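Preface
The admin request GET_PRODUCER_CONNECTION_LIST retrieves producer connection information.
Source code analysis
Step into org.apache.rocketmq.broker.processor.AdminBrokerProcessor#getProducerConnectionList, which retrieves the producer connection information for a producer group.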
private RemotingCommand getProducerConnectionList(ChannelHandlerContext ctx, RemotingCommand request)
    throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final GetProducerConnectionListRequestHeader requestHeader =
        (GetProducerConnectionListRequestHeader) request.decodeCommandCustomHeader(GetProducerConnectionListRequestHeader.class);

    // Build the producer connection info that becomes the response body
    ProducerConnection bodydata = new ProducerConnection();
    // Look up this producer group's connection list in the cached table =>
    HashMap<Channel, ClientChannelInfo> channelInfoHashMap =
        this.brokerController.getProducerManager().getGroupChannelTable().get(requestHeader.getProducerGroup());
    if (channelInfoHashMap != null) {
        // Iterate over the channelInfo entries
        Iterator<Map.Entry<Channel, ClientChannelInfo>> it = channelInfoHashMap.entrySet().iterator();
        while (it.hasNext()) {
            ClientChannelInfo info = it.next().getValue();
            // Build one producer connection per channel
            Connection connection = new Connection();
            connection.setClientId(info.getClientId());
            connection.setLanguage(info.getLanguage());
            connection.setVersion(info.getVersion());
            connection.setClientAddr(RemotingHelper.parseChannelRemoteAddr(info.getChannel()));
            bodydata.getConnectionSet().add(connection);
        }

        byte[] body = bodydata.encode();
        response.setBody(body);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

    // No entry for this producer group: report an error
    response.setCode(ResponseCode.SYSTEM_ERROR);
    response.setRemark("the producer group[" + requestHeader.getProducerGroup() + "] not exist");
    return response;
}
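The control flow of this handler can be reduced to a small, broker-independent sketch: look the group up in a table, return an error if it is missing, otherwise turn every channel entry into a connection record. Everything in the block below (ConnectionListSketch, ClientInfo, Response, the String channel ids and the numeric codes) is a hypothetical stand-in chosen for illustration, not a RocketMQ type.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-ins for the broker types; not RocketMQ classes.
final class ConnectionListSketch {
    static final int SUCCESS = 0;       // illustrative success code
    static final int SYSTEM_ERROR = 1;  // illustrative error code

    static final class ClientInfo {
        final String clientId;
        final String remoteAddr;

        ClientInfo(String clientId, String remoteAddr) {
            this.clientId = clientId;
            this.remoteAddr = remoteAddr;
        }
    }

    static final class Response {
        final int code;
        final String remark;
        final Set<String> connections; // each entry is "clientId@remoteAddr"

        Response(int code, String remark, Set<String> connections) {
            this.code = code;
            this.remark = remark;
            this.connections = connections;
        }
    }

    // groupTable maps group name -> (channel id -> client info).
    static Response getProducerConnectionList(Map<String, Map<String, ClientInfo>> groupTable,
                                              String group) {
        Map<String, ClientInfo> channels = groupTable.get(group);
        if (channels == null) {
            // Same shape as the handler's fallback branch: error code plus remark.
            return new Response(SYSTEM_ERROR,
                "the producer group[" + group + "] not exist", new HashSet<>());
        }
        Set<String> connections = new HashSet<>();
        for (ClientInfo info : channels.values()) {
            connections.add(info.clientId + "@" + info.remoteAddr);
        }
        return new Response(SUCCESS, null, connections);
    }

    public static void main(String[] args) {
        Map<String, Map<String, ClientInfo>> table = new HashMap<>();
        Map<String, ClientInfo> channels = new HashMap<>();
        channels.put("ch-1", new ClientInfo("client-a", "10.0.0.1:5000"));
        table.put("groupA", channels);

        System.out.println(getProducerConnectionList(table, "groupA").connections);
        System.out.println(getProducerConnectionList(table, "missing").remark);
    }
}
```

Note that an unknown group is reported as an error rather than as an empty connection set, which matches the SYSTEM_ERROR branch of the handler above.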
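Step into org.apache.rocketmq.broker.client.ProducerManager#getGroupChannelTable, which returns a copy of the table holding the channel information of every producer group. The caller then looks up a single producer group's channel information in that table by group name.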
public HashMap<String, HashMap<Channel, ClientChannelInfo>> getGroupChannelTable() {
    HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> newGroupChannelTable =
        new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
    try {
        // Wait a bounded time for the lock; if it cannot be acquired, the copy stays empty
        if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                // Copy the group -> channel table while holding the lock
                newGroupChannelTable.putAll(groupChannelTable);
            } finally {
                groupChannelLock.unlock();
            }
        }
    } catch (InterruptedException e) {
        log.error("", e);
    }
    return newGroupChannelTable;
}
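The pattern in this method, copying a shared map under a lock acquired with a timeout, can be tried in isolation. The sketch below is a minimal illustration under stated assumptions: GroupTableSnapshotSketch, register, snapshot, the String-based channel ids and the 3000 ms timeout value are all hypothetical, and unlike the method above it restores the interrupt flag instead of logging. It also shows a property that follows from putAll: the copy is shallow, so the outer map is independent but the inner per-group maps are still shared with the live table.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of the "snapshot under tryLock" pattern; not RocketMQ code.
final class GroupTableSnapshotSketch {
    private static final long LOCK_TIMEOUT_MILLIS = 3000; // assumed value for illustration

    private final Lock lock = new ReentrantLock();
    // group name -> (channel id -> client id)
    private final Map<String, Map<String, String>> table = new HashMap<>();

    void register(String group, String channelId, String clientId) {
        lock.lock();
        try {
            table.computeIfAbsent(group, g -> new HashMap<>()).put(channelId, clientId);
        } finally {
            lock.unlock();
        }
    }

    // Returns a shallow copy of the table, or an empty map if the lock
    // could not be acquired within the timeout.
    Map<String, Map<String, String>> snapshot() {
        Map<String, Map<String, String>> copy = new HashMap<>();
        try {
            if (lock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    copy.putAll(table); // copies references to the inner maps
                } finally {
                    lock.unlock();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // variation: preserve the interrupt status
        }
        return copy;
    }

    public static void main(String[] args) {
        GroupTableSnapshotSketch manager = new GroupTableSnapshotSketch();
        manager.register("groupA", "ch-1", "client-a");
        Map<String, Map<String, String>> snap = manager.snapshot();

        manager.register("groupB", "ch-2", "client-b"); // new group: not visible in snap
        manager.register("groupA", "ch-3", "client-c"); // existing group: visible via shared inner map

        System.out.println(snap.containsKey("groupB"));
        System.out.println(snap.get("groupA").size());
    }
}
```

A caller that iterates an inner map from such a snapshot is therefore still reading a structure that other threads may modify; a deep copy would avoid that at the cost of extra allocation.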
End
Closing remarks
This analysis reflects only my own views and is offered for reference only.