-ea -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9996
service:jmx:rmi:///jndi/rmi://127.0.0.1:9998/jmxrmi
bean name: kafka:type=kafka.SocketServerStats(每次啓動都會清空這部分數據)
def getProduceRequestsPerSecond: Double
def getFetchRequestsPerSecond: Double
def getAvgProduceRequestMs: Double
def getMaxProduceRequestMs: Double
def getAvgFetchRequestMs: Double
def getMaxFetchRequestMs: Double
def getBytesReadPerSecond: Double
def getBytesWrittenPerSecond: Double
def getNumFetchRequests: Long
def getNumProduceRequests: Long
def getTotalBytesRead: Long
def getTotalBytesWritten: Long
def getTotalFetchRequestMs: Long
def getTotalProduceRequestMs: Long
bean name: kafka:type=kafka.BrokerAllTopicStat(每次啓動都會清空這部分數據)
bean name: kafka:type=kafka.BrokerTopicStat.topic(每次啓動都會清空這部分數據)
def getMessagesIn: Long 寫入消息的數量
def getBytesIn: Long 寫入的byte數量
def getBytesOut: Long 讀出byte的數量
def getFailedProduceRequest: Long 失敗的生產數量
def getFailedFetchRequest: Long 失敗的讀取操作數量
不是太重要的屬性
bean name: kafka:type=kafka.LogFlushStats
def getFlushesPerSecond: Double
def getAvgFlushMs: Double
def getTotalFlushMs: Long
def getMaxFlushMs: Double
def getNumFlushes: Long
bean name: kafka:type=logs.topic-pattern
def getName: String 監控項目的名字,格式 topic+"-"+分區ID,好比 guoguo_t_1-0,guoguo_t_1-1
def getSize: Long 持久化文件的大小
def getNumberOfSegments: Int 持久化文件的數量
def getCurrentOffset: Long 基於當前寫入kafka的文件的byte偏移量
def getNumAppendedMessages: Long 追加數據,每次重啓清空
其它的須要監控的數據項目:
java堆(堆的內存使用狀況,非堆的內存使用狀況等)
GC信息(GC次數,GC總時間等)
消費者的狀態
bean name: kafka:type=kafka.ConsumerStats
def getPartOwnerStats: String 好比:guoguo_t_1: [ { 0-1, // broker+分區的信息 fetchoffset: 58246, // 已經讀取的offset consumeroffset: 58246 // 已經消費的offset }{ 0-0, fetchoffset: 2138747, consumeroffset: 2138747 } ]
def getConsumerGroup: String 消費者的組,好比guoguo_group_1
def getOffsetLag(topic: String, brokerId: Int, partitionId: Int): Long 有多少byte消息沒有讀取
def getConsumedOffset(topic: String, brokerId: Int, partitionId: Int): Long 已經消費了多少byte的數據
def getLatestOffset(topic: String, brokerId: Int, partitionId: Int): Long
bean name: kafka:type=kafka.ConsumerAllTopicStat (全部topic的數據的彙總,重啓數據也會被清空)
kafka:type=kafka.ConsumerTopicStat.topic(重啓數據也會被清空)
def getMessagesPerTopic: Long
def getBytesPerTopic: Long
bean name: kafka:type=kafka.SimpleConsumerStats
def getFetchRequestsPerSecond: Double 每秒種發起的取數據請求數
def getAvgFetchRequestMs: Double 平均取數據請求用的ms數
def getMaxFetchRequestMs: Double 最大取數據請求用的ms數
def getNumFetchRequests: Long 取數據請求的數量
def getConsumerThroughput: Double 消費者的吞吐量,字節每秒
Producer
bean name: kafka:type=kafka.KafkaProducerStats
def getProduceRequestsPerSecond: Double
def getAvgProduceRequestMs: Double
def getMaxProduceRequestMs: Double
def getNumProduceRequests: Long
bean name: kafka.producer.Producer:type=AsyncProducerStats
def getAsyncProducerEvents: Int (發出消息數量,與全部消費者的getMessagesPerTopic值相關不該太大)
def getAsyncProducerDroppedEvents: Int
package com.campaign.kafka

import javax.management._
import kafka.log.LogStatsMBean
import kafka.network.SocketServerStatsMBean
import kafka.server.BrokerTopicStatMBean
import javax.management.openmbean.CompositeData
import java.lang.management.{MemoryUsage, GarbageCollectorMXBean}
import javax.management.remote.{JMXConnector, JMXConnectorFactory, JMXServiceURL}

/**
 * Connects to a Kafka broker's JMX endpoint over RMI and dumps monitoring data
 * to stdout: the JMX domains, every registered MBean, the socket-server stats,
 * JVM heap/non-heap usage, GC counters, and all kafka-domain MBeans dispatched
 * on their implementation class.
 *
 * Created by jiaguotian on 14-1-13.
 */
object RmiMonitor {

  /** Splits a canonical ObjectName of the shape "domain:key=value". */
  private val TypeValuePattern = "(.*):(.*)=(.*)".r

  /** Prints all attributes of a socket-server stats proxy, one per line. */
  private def printSocketServerStats(bean: SocketServerStatsMBean) {
    println("%25s: %s".format("AvgFetchRequestMs", bean.getAvgFetchRequestMs))
    println("%25s: %s".format("AvgProduceRequestMs", bean.getAvgProduceRequestMs))
    println("%25s: %s".format("BytesReadPerSecond", bean.getBytesReadPerSecond))
    println("%25s: %s".format("BytesWrittenPerSecond", bean.getBytesWrittenPerSecond))
    println("%25s: %s".format("FetchRequestsPerSecond", bean.getFetchRequestsPerSecond))
    println("%25s: %s".format("MaxFetchRequestMs", bean.getMaxFetchRequestMs))
    println("%25s: %s".format("MaxProduceRequestMs", bean.getMaxProduceRequestMs))
    println("%25s: %s".format("NumFetchRequests", bean.getNumFetchRequests))
    println("%25s: %s".format("NumProduceRequests", bean.getNumProduceRequests))
    println("%25s: %s".format("ProduceRequestsPerSecond", bean.getProduceRequestsPerSecond))
    println("%25s: %s".format("TotalBytesRead", bean.getTotalBytesRead))
  }

  /** Prints all attributes of a per-log stats proxy, one per line. */
  private def printLogStats(bean: LogStatsMBean) {
    println("%25s: %s".format("CurrentOffset", bean.getCurrentOffset))
    println("%25s: %s".format("Name", bean.getName))
    println("%25s: %s".format("NumAppendedMessages", bean.getNumAppendedMessages))
    println("%25s: %s".format("NumberOfSegments", bean.getNumberOfSegments))
    println("%25s: %s".format("Size", bean.getSize))
  }

  def main(args: Array[String]) {
    val jmxUrl = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:9999/jmxrmi")
    val connector: JMXConnector = JMXConnectorFactory.connect(jmxUrl)
    try {
      val conn: MBeanServerConnection = connector.getMBeanServerConnection

      // 1. Every JMX domain exposed by the remote JVM.
      println("domains:")
      for (domain <- conn.getDomains) {
        println("%25s: %s".format("domain", domain))
      }
      println("-------------------------------")

      // 2. Every registered MBean, sorted by implementation class name.
      val allBeans = conn.queryMBeans(null, null)
        .toArray(new Array[ObjectInstance](0))
        .sortWith(_.getClassName < _.getClassName)
      for (instance <- allBeans) {
        println("%s %s".format(instance.getClassName, instance.getObjectName))
      }
      println("-------------------------------")

      // 3. Kafka socket-server statistics through a typed MBean proxy.
      val socketStatsName = ObjectName.getInstance("kafka:type=kafka.SocketServerStats")
      val socketStats = MBeanServerInvocationHandler.newProxyInstance(
        conn, socketStatsName, classOf[SocketServerStatsMBean], true)
      println(socketStatsName.getCanonicalKeyPropertyListString)
      printSocketServerStats(socketStats)
      println("-------------------------------")

      // 4. Heap / non-heap usage. The interesting attributes are CompositeData
      //    and are decoded through MemoryUsage.from; other attributes are only
      //    listed by name and type.
      val memoryNames = conn.queryNames(
        ObjectName.getInstance("java.lang:type=Memory*"), null)
      for (name <- memoryNames.toArray(new Array[ObjectName](0))) {
        println(name.toString)
        for (attr <- conn.getMBeanInfo(name).getAttributes) {
          println(attr.getName + " " + attr.getType)
          attr.getType match {
            case "javax.management.openmbean.CompositeData" =>
              val usage = MemoryUsage.from(
                conn.getAttribute(name, attr.getName).asInstanceOf[CompositeData])
              println("%25s: %s".format("Committed", usage.getCommitted))
              println("%25s: %s".format("Init", usage.getInit))
              println("%25s: %s".format("Max", usage.getMax))
              println("%25s: %s".format("Used", usage.getUsed))
            case _ => // non-composite attribute: name/type already printed above
          }
        }
      }
      println("-------------------------------")

      // 5. Garbage-collector counters.
      val gcNames = conn.queryNames(
        ObjectName.getInstance("java.lang:type=GarbageCollector,name=*"), null)
      for (name <- gcNames.toArray(new Array[ObjectName](0))) {
        val gc = MBeanServerInvocationHandler.newProxyInstance(
          conn, name, classOf[GarbageCollectorMXBean], true)
        println("%25s: %s".format("Name", gc.getName))
        // fix: mkString instead of the array's default toString ("[Ljava.lang.String;@...")
        println("%25s: %s".format("MemoryPoolNames", gc.getMemoryPoolNames.mkString(", ")))
        println("%25s: %s".format("ObjectName", gc.getObjectName))
        println("%25s: %s".format("Class", gc.getClass))
        println("%25s: %s".format("CollectionCount", gc.getCollectionCount))
        println("%25s: %s".format("CollectionTime", gc.getCollectionTime))
      }

      // 6. All kafka-domain MBeans, dispatched on implementation class.
      val kafkaBeans = conn.queryMBeans(new ObjectName("kafka", "type", "*"), null)
        .toArray(new Array[ObjectInstance](0))
        .sortWith(_.getClassName < _.getClassName)
      for (instance <- kafkaBeans) {
        val objectName = instance.getObjectName
        println(instance.getClassName + " " + objectName)
        objectName.getCanonicalName match {
          case TypeValuePattern(domain, key, value) =>
            val oName = new ObjectName(domain, key, value)
            instance.getClassName match {
              case "kafka.log.LogStats" =>
                printLogStats(MBeanServerInvocationHandler.newProxyInstance(
                  conn, oName, classOf[LogStatsMBean], true))
              case "kafka.message.LogFlushStats" =>
                // NOTE(review): proxies LogStatsMBean — copied from the case above.
                // LogFlushStats exposes flush metrics (FlushesPerSecond, AvgFlushMs, ...),
                // so reading log attributes here will most likely fail at runtime; this
                // should use the LogFlushStats MBean interface — confirm against the
                // kafka version in use before changing.
                printLogStats(MBeanServerInvocationHandler.newProxyInstance(
                  conn, oName, classOf[LogStatsMBean], true))
              case "kafka.network.SocketServerStats" =>
                printSocketServerStats(MBeanServerInvocationHandler.newProxyInstance(
                  conn, oName, classOf[SocketServerStatsMBean], true))
              case "kafka.server.BrokerTopicStat" =>
                val bean = MBeanServerInvocationHandler.newProxyInstance(
                  conn, oName, classOf[BrokerTopicStatMBean], true)
                println("%25s: %s".format("BytesIn", bean.getBytesIn))
                println("%25s: %s".format("BytesOut", bean.getBytesOut))
                println("%25s: %s".format("FailedFetchRequest", bean.getFailedFetchRequest))
                println("%25s: %s".format("FailedProduceRequest", bean.getFailedProduceRequest))
                println("%25s: %s".format("MessagesIn", bean.getMessagesIn))
              case _ => // unknown kafka bean: class/name already printed above
            }
          case _ => // canonical name did not match "domain:key=value"
        }
      }
    } finally {
      connector.close() // fix: the JMX connector was never closed (resource leak)
    }
  }
}
輸出結果
domains: domain: JMImplementation domain: com.sun.management domain: kafka domain: java.nio domain: java.lang domain: java.util.logging ------------------------------- com.sun.management.UnixOperatingSystem java.lang:type=OperatingSystem javax.management.MBeanServerDelegate JMImplementation:type=MBeanServerDelegate kafka.log.LogStats kafka:type=kafka.logs.guoguo_t_1-1 kafka.log.LogStats kafka:type=kafka.logs.guoguo_t_1-0 kafka.network.SocketServerStats kafka:type=kafka.SocketServerStats kafka.utils.Log4jController kafka:type=kafka.Log4jController sun.management.ClassLoadingImpl java.lang:type=ClassLoading sun.management.CompilationImpl java.lang:type=Compilation sun.management.GarbageCollectorImpl java.lang:type=GarbageCollector,name=ConcurrentMarkSweep sun.management.GarbageCollectorImpl java.lang:type=GarbageCollector,name=ParNew sun.management.HotSpotDiagnostic com.sun.management:type=HotSpotDiagnostic sun.management.ManagementFactoryHelper$1 java.nio:type=BufferPool,name=direct sun.management.ManagementFactoryHelper$1 java.nio:type=BufferPool,name=mapped sun.management.ManagementFactoryHelper$PlatformLoggingImpl java.util.logging:type=Logging sun.management.MemoryImpl java.lang:type=Memory sun.management.MemoryManagerImpl java.lang:type=MemoryManager,name=CodeCacheManager sun.management.MemoryPoolImpl java.lang:type=MemoryPool,name=Par Survivor Space sun.management.MemoryPoolImpl java.lang:type=MemoryPool,name=CMS Perm Gen sun.management.MemoryPoolImpl java.lang:type=MemoryPool,name=Par Eden Space sun.management.MemoryPoolImpl java.lang:type=MemoryPool,name=Code Cache sun.management.MemoryPoolImpl java.lang:type=MemoryPool,name=CMS Old Gen sun.management.RuntimeImpl java.lang:type=Runtime sun.management.ThreadImpl java.lang:type=Threading ------------------------------- type=kafka.SocketServerStats getAvgFetchRequestMs: 0.0 getAvgProduceRequestMs: 0.0 getBytesReadPerSecond: 0.0 getBytesWrittenPerSecond: 0.0 getFetchRequestsPerSecond: -0.0 getMaxFetchRequestMs: 
0.0 getMaxProduceRequestMs: 0.0 getNumFetchRequests: 0 getNumProduceRequests: 0 getProduceRequestsPerSecond: -0.0 ------------------------------- java.lang:type=Memory HeapMemoryUsage javax.management.openmbean.CompositeData getCommitted: 3194421248 getInit: 3221225472 getMax: 3194421248 getUsed: 163302248 NonHeapMemoryUsage javax.management.openmbean.CompositeData getCommitted: 24313856 getInit: 24313856 getMax: 136314880 getUsed: 14854816 ObjectPendingFinalizationCount int Verbose boolean ObjectName javax.management.ObjectName ------------------------------- getName: ParNew getMemoryPoolNames: [Ljava.lang.String;@23652209 getObjectName: java.lang:type=GarbageCollector,name=ParNew getClass: class com.sun.proxy.$Proxy1 getCollectionCount: 0 getCollectionTime: 0 getName: ConcurrentMarkSweep getMemoryPoolNames: [Ljava.lang.String;@2c61bbb7 getObjectName: java.lang:type=GarbageCollector,name=ConcurrentMarkSweep getClass: class com.sun.proxy.$Proxy1 getCollectionCount: 0 getCollectionTime: 0 kafka.log.LogStats kafka:type=kafka.logs.guoguo_t_1-1 CurrentOffset: 5519897 Name: guoguo_t_1-1 NumAppendedMessages: 0 NumberOfSegments: 1 Size: 5519897 kafka.log.LogStats kafka:type=kafka.logs.guoguo_t_1-0 CurrentOffset: 7600338 Name: guoguo_t_1-0 NumAppendedMessages: 0 NumberOfSegments: 1 Size: 7600338 kafka.network.SocketServerStats kafka:type=kafka.SocketServerStats BytesReadPerSecond: 0.0 AvgFetchRequestMs: 0.0 AvgProduceRequestMs: 0.0 BytesWrittenPerSecond: 0.0 FetchRequestsPerSecond: -0.0 MaxFetchRequestMs: 0.0 MaxProduceRequestMs: 0.0 NumFetchRequests: 0 NumProduceRequests: 0 ProduceRequestsPerSecond: -0.0 TotalBytesRead: 0 kafka.utils.Log4jController kafka:type=kafka.Log4jController