【Kafka Source Code】The Kafka Startup Process

Normally we start Kafka from the command line, but the command ultimately just invokes the main method in the code, so we will focus on the entry class, Kafka. After downloading the source, we can also start Kafka by running the main method in Kafka.scala directly (the startup argument, i.e. the path to server.properties, must be supplied). Because Kafka depends on ZooKeeper, we need to start ZooKeeper first, point server.properties at the ZK address, and then start the broker.
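In other words, starting from an IDE amounts to invoking that main method with the config path as its single argument, roughly like this (the path is illustrative; point it at your own server.properties):

kafka.Kafka.main(Array("config/server.properties"))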

Now let's look at the main() method:

def main(args: Array[String]): Unit = {
    try {
      val serverProps = getPropsFromArgs(args)
      val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)

      // attach shutdown handler to catch control-c
      Runtime.getRuntime().addShutdownHook(new Thread() {
        override def run() = {
          kafkaServerStartable.shutdown
        }
      })

      kafkaServerStartable.startup
      kafkaServerStartable.awaitShutdown
    }
    catch {
      case e: Throwable =>
        fatal(e)
        System.exit(1)
    }
    System.exit(0)
  }

Let's take this apart step by step. First, getPropsFromArgs(args): this line is straightforward; it reads the settings we put in the configuration file and assigns them to serverProps.
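Conceptually, this step boils down to loading a java.util.Properties from the path passed on the command line. A minimal sketch of the idea (the real getPropsFromArgs does more argument validation; loadProps below is an illustrative helper, not the actual Kafka code):

import java.io.FileInputStream
import java.util.Properties

// Illustrative helper: load server.properties from the given path
def loadProps(path: String): Properties = {
  val props = new Properties()
  val in = new FileInputStream(path)
  try props.load(in) finally in.close()
  props
}

val serverProps = loadProps(args(0))

Back in main, the second step is KafkaServerStartable.fromProps(serverProps):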

object KafkaServerStartable {
  def fromProps(serverProps: Properties) = {
    KafkaMetricsReporter.startReporters(new VerifiableProperties(serverProps))
    new KafkaServerStartable(KafkaConfig.fromProps(serverProps))
  }
}

This mainly starts an internal monitoring service (internal state monitoring).
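Roughly speaking, startReporters reads the reporter class names from the kafka.metrics.reporters property and instantiates them reflectively. A simplified sketch of that idea, assuming the 0.10-era KafkaMetricsReporter trait with its init(VerifiableProperties) method (the real code goes through KafkaMetricsConfig and CoreUtils.createObject, and also registers MBeans):

import kafka.metrics.KafkaMetricsReporter
import kafka.utils.VerifiableProperties

// Read the comma-separated reporter class names (kafka.metrics.reporters)
val reporterNames = Option(serverProps.getProperty("kafka.metrics.reporters"))
  .map(_.split(",").map(_.trim).filter(_.nonEmpty).toList)
  .getOrElse(Nil)

// Instantiate each reporter via its no-arg constructor and initialize it
val reporters = reporterNames.map { className =>
  Class.forName(className).getDeclaredConstructor().newInstance()
    .asInstanceOf[KafkaMetricsReporter]
}
reporters.foreach(_.init(new VerifiableProperties(serverProps)))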

Next comes a shutdown hook, a pattern common in Java programs: when the JVM shuts down, it runs cleanup logic so the process exits safely. After that comes the centerpiece of startup: kafkaServerStartable.startup. Stepping into it, we can see it simply delegates to the startup method of KafkaServer, so let's focus on that method (it is fairly long):

def startup() {
    try {
      info("starting")

      if(isShuttingDown.get)
        throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")

      if(startupComplete.get)
        return

      val canStartup = isStartingUp.compareAndSet(false, true)
      if (canStartup) {
        metrics = new Metrics(metricConfig, reporters, kafkaMetricsTime, true)

        brokerState.newState(Starting)

        /* start scheduler */
        kafkaScheduler.startup()

        /* setup zookeeper */
        zkUtils = initZk()

        /* start log manager */
        logManager = createLogManager(zkUtils.zkClient, brokerState)
        logManager.startup()

        /* generate brokerId */
        config.brokerId =  getBrokerId
        this.logIdent = "[Kafka Server " + config.brokerId + "], "

        socketServer = new SocketServer(config, metrics, kafkaMetricsTime)
        socketServer.startup()

        /* start replica manager */
        replicaManager = new ReplicaManager(config, metrics, time, kafkaMetricsTime, zkUtils, kafkaScheduler, logManager,
          isShuttingDown)
        replicaManager.startup()

        /* start kafka controller */
        kafkaController = new KafkaController(config, zkUtils, brokerState, kafkaMetricsTime, metrics, threadNamePrefix)
        kafkaController.startup()

        /* start group coordinator */
        groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, kafkaMetricsTime)
        groupCoordinator.startup()

        /* Get the authorizer and initialize it if one is specified.*/
        authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>
          val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
          authZ.configure(config.originals())
          authZ
        }

        /* start processing requests */
        apis = new KafkaApis(socketServer.requestChannel, replicaManager, groupCoordinator,
          kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer)
        requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
        brokerState.newState(RunningAsBroker)

        Mx4jLoader.maybeLoad()

        /* start dynamic config manager */
        dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config),
                                                           ConfigType.Client -> new ClientIdConfigHandler(apis.quotaManagers))

        // Apply all existing client configs to the ClientIdConfigHandler to bootstrap the overrides
        // TODO: Move this logic to DynamicConfigManager
        AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.Client).foreach {
          case (clientId, properties) => dynamicConfigHandlers(ConfigType.Client).processConfigChanges(clientId, properties)
        }

        // Create the config manager. start listening to notifications
        dynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers)
        dynamicConfigManager.startup()

        /* tell everyone we are alive */
        val listeners = config.advertisedListeners.map {case(protocol, endpoint) =>
          if (endpoint.port == 0)
            (protocol, EndPoint(endpoint.host, socketServer.boundPort(protocol), endpoint.protocolType))
          else
            (protocol, endpoint)
        }
        kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils, config.rack,
          config.interBrokerProtocolVersion)
        kafkaHealthcheck.startup()

        // Now that the broker id is successfully registered via KafkaHealthcheck, checkpoint it
        checkpointBrokerId(config.brokerId)

        /* register broker metrics */
        registerStats()

        shutdownLatch = new CountDownLatch(1)
        startupComplete.set(true)
        isStartingUp.set(false)
        AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString)
        info("started")
      }
    }
    catch {
      case e: Throwable =>
        fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
        isStartingUp.set(false)
        shutdown()
        throw e
    }
  }

First it checks whether the server is currently shutting down or has already completed startup; either state throws an exception immediately. Then comes a CAS operation on isStartingUp, which prevents concurrent threads from racing to start the server and decides whether this call may proceed. If it may, the actual startup sequence begins.
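The guard is easy to reproduce in isolation. Here is a minimal sketch of the same AtomicBoolean pattern (class and field names are illustrative; the real code only clears isStartingUp on success or in the catch block):

import java.util.concurrent.atomic.AtomicBoolean

class GuardedServer {
  private val isStartingUp = new AtomicBoolean(false)
  private val startupComplete = new AtomicBoolean(false)

  def startup(): Unit = {
    if (startupComplete.get) return
    // Only the first thread to flip false -> true runs the startup work
    if (isStartingUp.compareAndSet(false, true)) {
      // ... start subsystems ...
      startupComplete.set(true)
      isStartingUp.set(false)
    }
  }
}

With the guard passed, the startup sequence proceeds as follows: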

  • Construct the Metrics instance
  • Set the broker state to Starting
  • Start the scheduler: kafkaScheduler.startup()
  • Construct zkUtils: start a ZooKeeper client using the ZK connection info from the configuration
  • Start the log manager: it reads configuration from ZK, including __consumer_offsets and __system.topic__. The key part is scheduling background tasks that delete logs matching the retention criteria (cleanupLogs), flush dirty logs to disk (flushDirtyLogs), and checkpoint the recovery point of every log to a text file, so the broker does not have to recover all log files on startup (checkpointRecoveryPointOffsets). The LogManager.startup method below shows how these tasks are scheduled:
/**
   *  Start the background threads to flush logs and do log cleanup
   */
  def startup() {
    /* Schedule the cleanup task to delete old logs */
    if(scheduler != null) {
      info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
      scheduler.schedule("kafka-log-retention", 
                         cleanupLogs, 
                         delay = InitialTaskDelayMs, 
                         period = retentionCheckMs, 
                         TimeUnit.MILLISECONDS)
      info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
      scheduler.schedule("kafka-log-flusher", 
                         flushDirtyLogs, 
                         delay = InitialTaskDelayMs, 
                         period = flushCheckMs, 
                         TimeUnit.MILLISECONDS)
      scheduler.schedule("kafka-recovery-point-checkpoint",
                         checkpointRecoveryPointOffsets,
                         delay = InitialTaskDelayMs,
                         period = flushCheckpointMs,
                         TimeUnit.MILLISECONDS)
    }
    if(cleanerConfig.enableCleaner)
      cleaner.startup()
  }
  • Next, obtain the brokerId
  • Start the NIO socket server
  • Start the replica manager: this also starts the ISR expiration thread
  • Start the Kafka controller: register the ZK session expiration listener and kick off controller leader election
  • Start the group coordinator
  • Set up the authorizer, if one is configured
  • Start the request handler threads and begin processing requests
  • Start the dynamic config manager, which watches for ZK node data changes and propagates them to all brokers
  • Start the health check: currently this just registers the broker node in ZK; a broker that registers successfully is alive, otherwise it is considered dead
  • Register broker metrics and app info
  • Mark startup as complete
  • Wait on the shutdown latch: when shutdownLatch reaches 0, the broker shuts down (see the sketch after this list)
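The wait-for-shutdown handshake in KafkaServerStartable is just a CountDownLatch. A minimal sketch of the pattern (names are illustrative, not the exact Kafka code):

import java.util.concurrent.CountDownLatch

class StartableServer {
  @volatile private var shutdownLatch: CountDownLatch = _

  def startup(): Unit = {
    shutdownLatch = new CountDownLatch(1)
    // ... start all subsystems ...
  }

  // Called from the shutdown hook; releases the thread blocked in awaitShutdown
  def shutdown(): Unit = {
    // ... stop all subsystems ...
    shutdownLatch.countDown()
  }

  // main() blocks here until shutdown() runs, then the process can exit
  def awaitShutdown(): Unit = shutdownLatch.await()
}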