說在前面java
管理請求 GET_BROKER_RUNTIME_INFO 獲取broker運行時信息apache
源碼解析緩存
進入這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#getBrokerRuntimeInfo查詢broker運行時信息微信
private RemotingCommand getBrokerRuntimeInfo(ChannelHandlerContext ctx, RemotingCommand request) { final RemotingCommand response = RemotingCommand.createResponseCommand(null); HashMap<String, String> runtimeInfo = this.prepareRuntimeInfo(); KVTable kvTable = new KVTable(); kvTable.setTable(runtimeInfo); byte[] body = kvTable.encode(); response.setBody(body); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; }
進入這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#prepareRuntimeInfo組裝運行時信息異步
private HashMap<String, String> prepareRuntimeInfo() { HashMap<String, String> runtimeInfo = this.brokerController.getMessageStore().getRuntimeInfo(); // rocketmq版本號 runtimeInfo.put("brokerVersionDesc", MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION)); runtimeInfo.put("brokerVersion", String.valueOf(MQVersion.CURRENT_VERSION)); // 昨天存儲的消息數量 runtimeInfo.put("msgPutTotalYesterdayMorning", String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalYesterdayMorning())); // 今天存儲的消息數量 runtimeInfo.put("msgPutTotalTodayMorning", String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalTodayMorning())); // 如今存儲的消息數量 runtimeInfo.put("msgPutTotalTodayNow", String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalTodayNow())); // 昨天消費的消息數量 runtimeInfo.put("msgGetTotalYesterdayMorning", String.valueOf(this.brokerController.getBrokerStats().getMsgGetTotalYesterdayMorning())); // 今天消費的消息數量 runtimeInfo.put("msgGetTotalTodayMorning", String.valueOf(this.brokerController.getBrokerStats().getMsgGetTotalTodayMorning())); // 如今消費的消息數量 runtimeInfo.put("msgGetTotalTodayNow", String.valueOf(this.brokerController.getBrokerStats().getMsgGetTotalTodayNow())); // 發送消息線程池隊列大小 runtimeInfo.put("sendThreadPoolQueueSize", String.valueOf(this.brokerController.getSendThreadPoolQueue().size())); // 發送消息線程池隊列容量大小10000 runtimeInfo.put("sendThreadPoolQueueCapacity", String.valueOf(this.brokerController.getBrokerConfig().getSendThreadPoolQueueCapacity())); // 拉去消息線程池隊列大小 runtimeInfo.put("pullThreadPoolQueueSize", String.valueOf(this.brokerController.getPullThreadPoolQueue().size())); // 拉去消息線程池隊列容量大小100000 runtimeInfo.put("pullThreadPoolQueueCapacity", String.valueOf(this.brokerController.getBrokerConfig().getPullThreadPoolQueueCapacity())); // 查詢線程池隊列大小 runtimeInfo.put("queryThreadPoolQueueSize", String.valueOf(this.brokerController.getQueryThreadPoolQueue().size())); // 查詢線程池隊列容量大小20000 runtimeInfo.put("queryThreadPoolQueueCapacity", String.valueOf(this.brokerController.getBrokerConfig().getQueryThreadPoolQueueCapacity())); // 在commitLog中可是還沒有分配隊列的字節數=commitLog的maxOffset-再次存儲消息的fromOffset =》 runtimeInfo.put("dispatchBehindBytes", String.valueOf(this.brokerController.getMessageStore().dispatchBehindBytes())); // 緩存鎖定時間 runtimeInfo.put("pageCacheLockTimeMills", String.valueOf(this.brokerController.getMessageStore().lockTimeMills())); // 發送線程池隊列頭部元素等待時間=如今時間-隊列頭部元素建立時間 runtimeInfo.put("sendThreadPoolQueueHeadWaitTimeMills", String.valueOf(this.brokerController.headSlowTimeMills4SendThreadPoolQueue())); // 拉取消息線程池隊列頭部元素等待時間=如今時間-隊列頭部元素建立時間 runtimeInfo.put("pullThreadPoolQueueHeadWaitTimeMills", String.valueOf(this.brokerController.headSlowTimeMills4PullThreadPoolQueue())); // 查詢消息線程池隊列頭部元素等待時間=如今時間-隊列頭部元素建立時間 runtimeInfo.put("queryThreadPoolQueueHeadWaitTimeMills", String.valueOf(this.brokerController.headSlowTimeMills4QueryThreadPoolQueue())); // 最先消息存儲的時間 runtimeInfo.put("earliestMessageTimeStamp", String.valueOf(this.brokerController.getMessageStore().getEarliestMessageTime())); // 開始接收發送請求的時間 runtimeInfo.put("startAcceptSendRequestTimeStamp", String.valueOf(this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp())); if (this.brokerController.getMessageStore() instanceof DefaultMessageStore) { DefaultMessageStore defaultMessageStore = (DefaultMessageStore) this.brokerController.getMessageStore(); // 保存異步刷新消息到磁盤以前存儲消息的緩衝區大小 runtimeInfo.put("remainTransientStoreBufferNumbs", String.valueOf(defaultMessageStore.remainTransientStoreBufferNumbs())); if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { // 最多消息可提交的消息大小=最大可寫的位置-已提交的位置 runtimeInfo.put("remainHowManyDataToCommit", MixAll.humanReadableByteCount(defaultMessageStore.getCommitLog().remainHowManyDataToCommit(), false)); } // 最多消息可刷新的消息大小=最大偏移量0-消息刷新位置 runtimeInfo.put("remainHowManyDataToFlush", MixAll.humanReadableByteCount(defaultMessageStore.getCommitLog().remainHowManyDataToFlush(), false)); } java.io.File commitLogDir = new java.io.File(this.brokerController.getMessageStoreConfig().getStorePathRootDir()); if (commitLogDir.exists()) { // commitLog文件夾的容量大小=總大小-可用容量 runtimeInfo.put("commitLogDirCapacity", String.format("Total : %s, Free : %s.", MixAll.humanReadableByteCount(commitLogDir.getTotalSpace(), false), MixAll.humanReadableByteCount(commitLogDir.getFreeSpace(), false))); } return runtimeInfo; }
往上返回到這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#getBrokerRuntimeInfo結束this
說在最後spa
本次解析僅表明我的觀點,僅供參考。線程
加入技術微信羣code
釘釘技術羣orm