rocketmq源碼解析之管理請求獲取broker運行時信息

說在前面java

管理請求 GET_BROKER_RUNTIME_INFO 獲取broker運行時信息apache

 

源碼解析緩存

進入這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#getBrokerRuntimeInfo查詢broker運行時信息微信

private RemotingCommand getBrokerRuntimeInfo(ChannelHandlerContext ctx, RemotingCommand request) {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    HashMap<String, String> runtimeInfo = this.prepareRuntimeInfo();
    KVTable kvTable = new KVTable();
    kvTable.setTable(runtimeInfo);
    byte[] body = kvTable.encode();
    response.setBody(body);
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}

進入這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#prepareRuntimeInfo組裝運行時信息異步

private HashMap<String, String> prepareRuntimeInfo() {
        HashMap<String, String> runtimeInfo = this.brokerController.getMessageStore().getRuntimeInfo();
//        rocketmq版本號
        runtimeInfo.put("brokerVersionDesc", MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION));
        runtimeInfo.put("brokerVersion", String.valueOf(MQVersion.CURRENT_VERSION));
//        昨天存儲的消息數量
        runtimeInfo.put("msgPutTotalYesterdayMorning",
            String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalYesterdayMorning()));
//        今天存儲的消息數量
        runtimeInfo.put("msgPutTotalTodayMorning", String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalTodayMorning()));
//        如今存儲的消息數量
        runtimeInfo.put("msgPutTotalTodayNow", String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalTodayNow()));
//        昨天消費的消息數量
        runtimeInfo.put("msgGetTotalYesterdayMorning",
            String.valueOf(this.brokerController.getBrokerStats().getMsgGetTotalYesterdayMorning()));
//        今天消費的消息數量
        runtimeInfo.put("msgGetTotalTodayMorning", String.valueOf(this.brokerController.getBrokerStats().getMsgGetTotalTodayMorning()));
//        如今消費的消息數量
        runtimeInfo.put("msgGetTotalTodayNow", String.valueOf(this.brokerController.getBrokerStats().getMsgGetTotalTodayNow()));
//        發送消息線程池隊列大小
        runtimeInfo.put("sendThreadPoolQueueSize", String.valueOf(this.brokerController.getSendThreadPoolQueue().size()));
//        發送消息線程池隊列容量大小10000
        runtimeInfo.put("sendThreadPoolQueueCapacity",
            String.valueOf(this.brokerController.getBrokerConfig().getSendThreadPoolQueueCapacity()));
//        拉去消息線程池隊列大小
        runtimeInfo.put("pullThreadPoolQueueSize", String.valueOf(this.brokerController.getPullThreadPoolQueue().size()));
//        拉去消息線程池隊列容量大小100000
        runtimeInfo.put("pullThreadPoolQueueCapacity",
            String.valueOf(this.brokerController.getBrokerConfig().getPullThreadPoolQueueCapacity()));
//        查詢線程池隊列大小
        runtimeInfo.put("queryThreadPoolQueueSize", String.valueOf(this.brokerController.getQueryThreadPoolQueue().size()));
//        查詢線程池隊列容量大小20000
        runtimeInfo.put("queryThreadPoolQueueCapacity",
            String.valueOf(this.brokerController.getBrokerConfig().getQueryThreadPoolQueueCapacity()));
//        在commitLog中可是還沒有分配隊列的字節數=commitLog的maxOffset-再次存儲消息的fromOffset =》
        runtimeInfo.put("dispatchBehindBytes", String.valueOf(this.brokerController.getMessageStore().dispatchBehindBytes()));
//       緩存鎖定時間
        runtimeInfo.put("pageCacheLockTimeMills", String.valueOf(this.brokerController.getMessageStore().lockTimeMills()));
//        發送線程池隊列頭部元素等待時間=如今時間-隊列頭部元素建立時間
        runtimeInfo.put("sendThreadPoolQueueHeadWaitTimeMills", String.valueOf(this.brokerController.headSlowTimeMills4SendThreadPoolQueue()));
//        拉取消息線程池隊列頭部元素等待時間=如今時間-隊列頭部元素建立時間
        runtimeInfo.put("pullThreadPoolQueueHeadWaitTimeMills", String.valueOf(this.brokerController.headSlowTimeMills4PullThreadPoolQueue()));
//        查詢消息線程池隊列頭部元素等待時間=如今時間-隊列頭部元素建立時間
        runtimeInfo.put("queryThreadPoolQueueHeadWaitTimeMills", String.valueOf(this.brokerController.headSlowTimeMills4QueryThreadPoolQueue()));
//        最先消息存儲的時間
        runtimeInfo.put("earliestMessageTimeStamp", String.valueOf(this.brokerController.getMessageStore().getEarliestMessageTime()));
//        開始接收發送請求的時間
        runtimeInfo.put("startAcceptSendRequestTimeStamp", String.valueOf(this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp()));
        if (this.brokerController.getMessageStore() instanceof DefaultMessageStore) {
            DefaultMessageStore defaultMessageStore = (DefaultMessageStore) this.brokerController.getMessageStore();
//            保存異步刷新消息到磁盤以前存儲消息的緩衝區大小
            runtimeInfo.put("remainTransientStoreBufferNumbs", String.valueOf(defaultMessageStore.remainTransientStoreBufferNumbs()));
            if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
//                最多消息可提交的消息大小=最大可寫的位置-已提交的位置
                runtimeInfo.put("remainHowManyDataToCommit", MixAll.humanReadableByteCount(defaultMessageStore.getCommitLog().remainHowManyDataToCommit(), false));
            }
//            最多消息可刷新的消息大小=最大偏移量0-消息刷新位置
            runtimeInfo.put("remainHowManyDataToFlush", MixAll.humanReadableByteCount(defaultMessageStore.getCommitLog().remainHowManyDataToFlush(), false));
        }

        java.io.File commitLogDir = new java.io.File(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
        if (commitLogDir.exists()) {
//            commitLog文件夾的容量大小=總大小-可用容量
            runtimeInfo.put("commitLogDirCapacity", String.format("Total : %s, Free : %s.", MixAll.humanReadableByteCount(commitLogDir.getTotalSpace(), false), MixAll.humanReadableByteCount(commitLogDir.getFreeSpace(), false)));
        }

        return runtimeInfo;
    }

往上返回到這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#getBrokerRuntimeInfo結束this

 

說在最後spa

本次解析僅表明我的觀點,僅供參考。線程

 

加入技術微信羣code

釘釘技術羣orm

相關文章
相關標籤/搜索