使用JMX監控Kafka

監控數據源

JMX RMI方式啓動Broker,Consumer,Producer

-ea -Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false
-Dcom.sun.management.jmxremote.port=9996

通過JMX RMI方式連接

service:jmx:rmi:///jndi/rmi://127.0.0.1:9996/jmxrmi

監控數據

broker

bean name: kafka:type=kafka.SocketServerStats(每次啓動都會清空這部分數據)

def getProduceRequestsPerSecond: Double
def getFetchRequestsPerSecond: Double
def getAvgProduceRequestMs: Double
def getMaxProduceRequestMs: Double
def getAvgFetchRequestMs: Double
def getMaxFetchRequestMs: Double
def getBytesReadPerSecond: Double
def getBytesWrittenPerSecond: Double
def getNumFetchRequests: Long
def getNumProduceRequests: Long
def getTotalBytesRead: Long
def getTotalBytesWritten: Long
def getTotalFetchRequestMs: Long
def getTotalProduceRequestMs: Long

bean name: kafka:type=kafka.BrokerAllTopicStat(每次啓動都會清空這部分數據)
bean name: kafka:type=kafka.BrokerTopicStat.topic(每次啓動都會清空這部分數據)

def getMessagesIn: Long  寫入消息的數量
def getBytesIn: Long   寫入的byte數量
def getBytesOut: Long   讀出byte的數量
def getFailedProduceRequest: Long   失敗的生產數量
def getFailedFetchRequest: Long  失敗的讀取操作數量

不是太重要的屬性

bean name: kafka:type=kafka.LogFlushStats

def getFlushesPerSecond: Double
def getAvgFlushMs: Double
def getTotalFlushMs: Long
def getMaxFlushMs: Double
def getNumFlushes: Long

bean name: kafka:type=logs.topic-pattern

def getName: String    監控項目的名字,格式  topic+"-"+分區ID,比如 guoguo_t_1-0,guoguo_t_1-1
def getSize: Long 持久化文件的大小
def getNumberOfSegments: Int  持久化文件的數量
def getCurrentOffset: Long   基於當前寫入kafka的文件的byte偏移量
def getNumAppendedMessages: Long    追加數據,每次重啓清空

其它需要監控的數據項目:

java堆(堆的內存使用情況,非堆的內存使用情況等)
GC信息(GC次數,GC總時間等)

consumer


消費者的狀態
bean name: kafka:type=kafka.ConsumerStats

def getPartOwnerStats: String
比如:guoguo_t_1: [
  {
       0-1,  //  broker+分區的信息
       fetchoffset: 58246,  取的offset,已經消費的offset
      consumeroffset: 58246
   }{ 0-0,  fetchoffset: 2138747,consumeroffset: 2138747}]
def getConsumerGroup: String    消費者的組,比如guoguo_group_1
def getOffsetLag(topic: String, brokerId: Int, partitionId: Int): Long  有多少byte消息沒有讀取
def getConsumedOffset(topic: String, brokerId: Int, partitionId: Int): Long 已經消費了多少byte的數據
def getLatestOffset(topic: String, brokerId: Int, partitionId: Int): Long

bean name: kafka:type=kafka.ConsumerAllTopicStat (所有topic的數據的彙總,重啓數據也會被清空)

bean name: kafka:type=kafka.ConsumerTopicStat.topic(重啓數據也會被清空)

def getMessagesPerTopic: Long
def getBytesPerTopic: Long

bean name: kafka:type=kafka.SimpleConsumerStats

def getFetchRequestsPerSecond: Double 每秒鐘發起的取數據請求數
def getAvgFetchRequestMs: Double 平均取數據請求用的ms數
def getMaxFetchRequestMs: Double 最大取數據請求用的ms數
def getNumFetchRequests: Long 取數據請求的數量
def getConsumerThroughput: Double 消費者的吞吐量,字節每秒

Producer

bean name: kafka:type=kafka.KafkaProducerStats

def getProduceRequestsPerSecond: Double
def getAvgProduceRequestMs: Double
def getMaxProduceRequestMs: Double
def getNumProduceRequests: Long

bean name: kafka.producer.Producer:type=AsyncProducerStats

def getAsyncProducerEvents: Int (發出消息數量,與所有消費者的getMessagesPerTopic值相差不應太大)
def getAsyncProducerDroppedEvents: Int

Demo程序

package com.campaign.kafka
 
import javax.management._
import kafka.log.LogStatsMBean
import kafka.network.SocketServerStatsMBean
import kafka.server.BrokerTopicStatMBean
import javax.management.openmbean.CompositeData
import java.lang.management.{MemoryUsage, GarbageCollectorMXBean}
import javax.management.remote.{JMXConnector, JMXConnectorFactory, JMXServiceURL}
 
 
/**
 * Created by jiaguotian on 14-1-13.
 *
 * Demo JMX client. Connects to a JVM over RMI and prints, in this order:
 * the MBean domains, every registered MBean, the Kafka socket-server
 * statistics, JVM memory usage, garbage-collector statistics and finally the
 * statistics of every MBean registered as `kafka:type=...`.
 */
object RmiMonitor {

  /** Address of the JMX endpoint to inspect. */
  private val JmxUrl = "service:jmx:rmi:///jndi/rmi://127.0.0.1:9999/jmxrmi"

  /** Line printed between report sections. */
  private val Separator = "-------------------------------"

  /**
   * Runs the whole report against the endpoint in [[JmxUrl]].
   *
   * @param args ignored
   */
  def main(args: Array[String]): Unit = {
    val connector: JMXConnector = JMXConnectorFactory.connect(new JMXServiceURL(JmxUrl))
    // Close the connector even when one of the remote calls below throws,
    // so the client side of the RMI connection is not leaked.
    try {
      val connection: MBeanServerConnection = connector.getMBeanServerConnection
      printDomains(connection)
      println(Separator)
      printAllMBeans(connection)
      println(Separator)
      printSocketServerStats(connection)
      println(Separator)
      printMemoryUsage(connection)
      println(Separator)
      printGarbageCollectors(connection)
      printKafkaMBeans(connection)
    } finally {
      connector.close()
    }
  }

  /** Prints one right-aligned `label:  value` line. */
  private def printField(label: String, value: Any): Unit =
    println("%25s:  %s".format(label, value))

  /** Builds a client-side proxy that forwards calls on `iface` to the remote MBean `name`. */
  private def proxy[T](connection: MBeanServerConnection, name: ObjectName, iface: Class[T]): T =
    MBeanServerInvocationHandler.newProxyInstance(connection, name, iface, true)

  /** Returns the given MBean instances ordered by implementation class name. */
  private def sortedByClassName(found: java.util.Set[ObjectInstance]): Array[ObjectInstance] =
    found.toArray(new Array[ObjectInstance](0))
      .sortWith((a, b) => a.getClassName.compareTo(b.getClassName) < 0)

  /** Prints every MBean domain known to the remote server. */
  private def printDomains(connection: MBeanServerConnection): Unit = {
    println("domains:")
    for (domain <- connection.getDomains) {
      printField("domain", domain)
    }
  }

  /** Prints `className objectName` for every registered MBean. */
  private def printAllMBeans(connection: MBeanServerConnection): Unit = {
    for (instance <- sortedByClassName(connection.queryMBeans(null, null))) {
      println("%s %s".format(instance.getClassName, instance.getObjectName))
    }
  }

  /** Prints the statistics of the `kafka:type=kafka.SocketServerStats` MBean. */
  private def printSocketServerStats(connection: MBeanServerConnection): Unit = {
    val name: ObjectName = ObjectName.getInstance("kafka:type=kafka.SocketServerStats")
    val stats: SocketServerStatsMBean = proxy(connection, name, classOf[SocketServerStatsMBean])
    println(name.getCanonicalKeyPropertyListString)
    printField("AvgFetchRequestMs", stats.getAvgFetchRequestMs)
    printField("AvgProduceRequestMs", stats.getAvgProduceRequestMs)
    printField("BytesReadPerSecond", stats.getBytesReadPerSecond)
    printField("BytesWrittenPerSecond", stats.getBytesWrittenPerSecond)
    printField("FetchRequestsPerSecond", stats.getFetchRequestsPerSecond)
    printField("MaxFetchRequestMs", stats.getMaxFetchRequestMs)
    printField("MaxProduceRequestMs", stats.getMaxProduceRequestMs)
    printField("NumFetchRequests", stats.getNumFetchRequests)
    printField("NumProduceRequests", stats.getNumProduceRequests)
    printField("ProduceRequestsPerSecond", stats.getProduceRequestsPerSecond)
  }

  /**
   * Lists the attributes of every MBean matching `java.lang:type=Memory*` and
   * expands each `CompositeData` attribute as a `MemoryUsage`.
   */
  private def printMemoryUsage(connection: MBeanServerConnection): Unit = {
    val names: Array[ObjectName] = connection
      .queryNames(ObjectName.getInstance("java.lang:type=Memory*"), null)
      .toArray(new Array[ObjectName](0))
    for (name <- names) {
      println(name.toString)
      for (attr <- connection.getMBeanInfo(name).getAttributes) {
        println(attr.getName + " " + attr.getType)
        if (attr.getType == "javax.management.openmbean.CompositeData") {
          connection.getAttribute(name, attr.getName) match {
            case data: CompositeData =>
              val usage: MemoryUsage = MemoryUsage.from(data)
              printField("Committed", usage.getCommitted)
              printField("Init", usage.getInit)
              printField("Max", usage.getMax)
              printField("Used", usage.getUsed)
            case _ =>
              // The attribute has no value right now (null); MemoryUsage.from
              // would hand back null and the getters above would fail.
          }
        }
      }
    }
  }

  /** Prints the statistics of every `java.lang:type=GarbageCollector` MBean. */
  private def printGarbageCollectors(connection: MBeanServerConnection): Unit = {
    val names: Array[ObjectName] = connection
      .queryNames(ObjectName.getInstance("java.lang:type=GarbageCollector,name=*"), null)
      .toArray(new Array[ObjectName](0))
    for (name <- names) {
      val gc: GarbageCollectorMXBean = proxy(connection, name, classOf[GarbageCollectorMXBean])
      printField("Name", gc.getName)
      // Join the names: formatting the array itself only prints its identity
      // ("[Ljava.lang.String;@...").
      printField("MemoryPoolNames", gc.getMemoryPoolNames.mkString(", "))
      printField("ObjectName", gc.getObjectName)
      printField("Class", gc.getClass)
      printField("CollectionCount", gc.getCollectionCount)
      printField("CollectionTime", gc.getCollectionTime)
    }
  }

  /**
   * Prints every MBean registered as `kafka:type=...`, followed by its
   * statistics when the implementation class is one this demo knows about.
   */
  private def printKafkaMBeans(connection: MBeanServerConnection): Unit = {
    val found = connection.queryMBeans(new ObjectName("kafka", "type", "*"), null)
    for (instance <- sortedByClassName(found)) {
      // The name reported by the server is used as-is; there is no need to
      // take it apart and rebuild it before creating a proxy.
      val objectName: ObjectName = instance.getObjectName
      println(instance.getClassName + " " + objectName)

      instance.getClassName match {
        case "kafka.log.LogStats" =>
          val log: LogStatsMBean = proxy(connection, objectName, classOf[LogStatsMBean])
          printField("CurrentOffset", log.getCurrentOffset)
          printField("Name", log.getName())
          printField("NumAppendedMessages", log.getNumAppendedMessages)
          printField("NumberOfSegments", log.getNumberOfSegments)
          printField("Size", log.getSize())

        // NOTE(review): the original matched only "kafka.message.LogFlushStats";
        // the package is presumably different in practice, so any package is
        // accepted here - confirm against the broker's MBean list.
        case className if className.endsWith(".LogFlushStats") =>
          // This bean is not a LogStatsMBean. Read its attributes by name
          // (getter name minus the "get" prefix) instead of through a proxy.
          val flushAttributes =
            Seq("FlushesPerSecond", "AvgFlushMs", "TotalFlushMs", "MaxFlushMs", "NumFlushes")
          for (attribute <- flushAttributes) {
            printField(attribute, connection.getAttribute(objectName, attribute))
          }

        case "kafka.network.SocketServerStats" =>
          val stats: SocketServerStatsMBean =
            proxy(connection, objectName, classOf[SocketServerStatsMBean])
          printField("BytesReadPerSecond", stats.getBytesReadPerSecond)
          printField("AvgFetchRequestMs", stats.getAvgFetchRequestMs)
          printField("AvgProduceRequestMs", stats.getAvgProduceRequestMs)
          printField("BytesWrittenPerSecond", stats.getBytesWrittenPerSecond)
          printField("FetchRequestsPerSecond", stats.getFetchRequestsPerSecond)
          printField("MaxFetchRequestMs", stats.getMaxFetchRequestMs)
          printField("MaxProduceRequestMs", stats.getMaxProduceRequestMs)
          printField("NumFetchRequests", stats.getNumFetchRequests)
          printField("NumProduceRequests", stats.getNumProduceRequests)
          printField("ProduceRequestsPerSecond", stats.getProduceRequestsPerSecond)
          printField("TotalBytesRead", stats.getTotalBytesRead)

        case "kafka.server.BrokerTopicStat" =>
          val topic: BrokerTopicStatMBean =
            proxy(connection, objectName, classOf[BrokerTopicStatMBean])
          printField("BytesIn", topic.getBytesIn)
          printField("BytesOut", topic.getBytesOut)
          printField("FailedFetchRequest", topic.getFailedFetchRequest)
          printField("FailedProduceRequest", topic.getFailedProduceRequest)
          printField("MessagesIn", topic.getMessagesIn)

        case _ => // no detailed report for other bean types
      }
    }
  }
}

輸出結果

domains:
                   domain:  JMImplementation
                   domain:  com.sun.management
                   domain:  kafka
                   domain:  java.nio
                   domain:  java.lang
                   domain:  java.util.logging
-------------------------------
com.sun.management.UnixOperatingSystem java.lang:type=OperatingSystem
javax.management.MBeanServerDelegate JMImplementation:type=MBeanServerDelegate
kafka.log.LogStats kafka:type=kafka.logs.guoguo_t_1-1
kafka.log.LogStats kafka:type=kafka.logs.guoguo_t_1-0
kafka.network.SocketServerStats kafka:type=kafka.SocketServerStats
kafka.utils.Log4jController kafka:type=kafka.Log4jController
sun.management.ClassLoadingImpl java.lang:type=ClassLoading
sun.management.CompilationImpl java.lang:type=Compilation
sun.management.GarbageCollectorImpl java.lang:type=GarbageCollector,name=ConcurrentMarkSweep
sun.management.GarbageCollectorImpl java.lang:type=GarbageCollector,name=ParNew
sun.management.HotSpotDiagnostic com.sun.management:type=HotSpotDiagnostic
sun.management.ManagementFactoryHelper$1 java.nio:type=BufferPool,name=direct
sun.management.ManagementFactoryHelper$1 java.nio:type=BufferPool,name=mapped
sun.management.ManagementFactoryHelper$PlatformLoggingImpl java.util.logging:type=Logging
sun.management.MemoryImpl java.lang:type=Memory
sun.management.MemoryManagerImpl java.lang:type=MemoryManager,name=CodeCacheManager
sun.management.MemoryPoolImpl java.lang:type=MemoryPool,name=Par Survivor Space
sun.management.MemoryPoolImpl java.lang:type=MemoryPool,name=CMS Perm Gen
sun.management.MemoryPoolImpl java.lang:type=MemoryPool,name=Par Eden Space
sun.management.MemoryPoolImpl java.lang:type=MemoryPool,name=Code Cache
sun.management.MemoryPoolImpl java.lang:type=MemoryPool,name=CMS Old Gen
sun.management.RuntimeImpl java.lang:type=Runtime
sun.management.ThreadImpl java.lang:type=Threading
-------------------------------
type=kafka.SocketServerStats
     getAvgFetchRequestMs:  0.0
   getAvgProduceRequestMs:  0.0
    getBytesReadPerSecond:  0.0
 getBytesWrittenPerSecond:  0.0
getFetchRequestsPerSecond:  -0.0
     getMaxFetchRequestMs:  0.0
   getMaxProduceRequestMs:  0.0
      getNumFetchRequests:  0
    getNumProduceRequests:  0
getProduceRequestsPerSecond:  -0.0
-------------------------------
java.lang:type=Memory
HeapMemoryUsage javax.management.openmbean.CompositeData
             getCommitted:  3194421248
                  getInit:  3221225472
                   getMax:  3194421248
                  getUsed:  163302248
NonHeapMemoryUsage javax.management.openmbean.CompositeData
             getCommitted:  24313856
                  getInit:  24313856
                   getMax:  136314880
                  getUsed:  14854816
ObjectPendingFinalizationCount int
Verbose boolean
ObjectName javax.management.ObjectName
-------------------------------
                  getName:  ParNew
       getMemoryPoolNames:  [Ljava.lang.String;@23652209
            getObjectName:  java.lang:type=GarbageCollector,name=ParNew
                 getClass:  class com.sun.proxy.$Proxy1
       getCollectionCount:  0
        getCollectionTime:  0
                  getName:  ConcurrentMarkSweep
       getMemoryPoolNames:  [Ljava.lang.String;@2c61bbb7
            getObjectName:  java.lang:type=GarbageCollector,name=ConcurrentMarkSweep
                 getClass:  class com.sun.proxy.$Proxy1
       getCollectionCount:  0
        getCollectionTime:  0
kafka.log.LogStats kafka:type=kafka.logs.guoguo_t_1-1
            CurrentOffset:  5519897
                     Name:  guoguo_t_1-1
      NumAppendedMessages:  0
         NumberOfSegments:  1
                     Size:  5519897
kafka.log.LogStats kafka:type=kafka.logs.guoguo_t_1-0
            CurrentOffset:  7600338
                     Name:  guoguo_t_1-0
      NumAppendedMessages:  0
         NumberOfSegments:  1
                     Size:  7600338
kafka.network.SocketServerStats kafka:type=kafka.SocketServerStats
       BytesReadPerSecond:  0.0
        AvgFetchRequestMs:  0.0
      AvgProduceRequestMs:  0.0
    BytesWrittenPerSecond:  0.0
   FetchRequestsPerSecond:  -0.0
        MaxFetchRequestMs:  0.0
      MaxProduceRequestMs:  0.0
         NumFetchRequests:  0
       NumProduceRequests:  0
 ProduceRequestsPerSecond:  -0.0
           TotalBytesRead:  0
kafka.utils.Log4jController kafka:type=kafka.Log4jController
相關文章
相關標籤/搜索