通常來講,我們是通過命令來啓動Kafka,但是命令的本質還是調用代碼中的main方法,所以我們重點看下啓動類Kafka。源碼下載下來以後,我們也可以直接運行Kafka.scala中的main方法(需要指定啓動參數,也就是server.properties的位置)來啓動Kafka。因為Kafka依賴ZooKeeper,所以我們需要提前啓動ZooKeeper,然後在server.properties中指定zk地址後,再啓動Kafka。
下面我們首先看一下main()方法:
/**
 * Broker entry point: loads the server properties named by the command-line
 * arguments, builds the startable server from them, starts it and then waits
 * for it to shut down.
 *
 * The process exits with status 1 if anything is thrown along the way, and
 * with status 0 once the server has shut down normally.
 *
 * @param args command-line arguments, handed unchanged to getPropsFromArgs
 */
def main(args: Array[String]): Unit = {
  try {
    val props = getPropsFromArgs(args)
    val server = KafkaServerStartable.fromProps(props)

    // Register a JVM shutdown hook so that control-c still shuts the server down.
    val shutdownHook = new Thread() {
      override def run(): Unit = server.shutdown
    }
    Runtime.getRuntime().addShutdownHook(shutdownHook)

    server.startup
    server.awaitShutdown
  } catch {
    case e: Throwable =>
      fatal(e)
      System.exit(1)
  }
  System.exit(0)
}
我們慢慢來分析下,首先是getPropsFromArgs(args),這一行很明確,就是從配置文件中讀取我們配置的內容,然後賦值給serverProps。第二步,KafkaServerStartable.fromProps(serverProps):
object KafkaServerStartable {

  /**
   * Builds a KafkaServerStartable from raw server properties.
   *
   * The metrics reporters are started first, from the same properties wrapped
   * in a VerifiableProperties; only afterwards are the properties turned into
   * a KafkaConfig and handed to the new KafkaServerStartable.
   *
   * @param serverProps the broker properties
   * @return a new KafkaServerStartable configured from serverProps
   */
  def fromProps(serverProps: Properties) = {
    val verifiableProps = new VerifiableProperties(serverProps)
    KafkaMetricsReporter.startReporters(verifiableProps)

    val serverConfig = KafkaConfig.fromProps(serverProps)
    new KafkaServerStartable(serverConfig)
  }
}
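這塊主要是通過KafkaMetricsReporter.startReporters啓動了一個內部的監控服務(內部狀態監控),然後根據KafkaConfig.fromProps(serverProps)得到的配置建立KafkaServerStartable實例。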
接下來是一個在Java中常見的鉤子函數(shutdown hook),在關閉時會啓動一些銷燬程序,保證程序安全關閉。之後就是我們啓動的重頭戲了:kafkaServerStartable.startup。跟進去可以很清楚地看到,裏面調用的是KafkaServer中的startup方法,下面我們重點看下這個方法(比較長):
/**
 * Starts this server by bringing its components up one after another.
 *
 * Pre-checks:
 *  - throws IllegalStateException if isShuttingDown is set;
 *  - returns immediately if startupComplete is already set.
 *
 * The actual startup only runs when isStartingUp.compareAndSet(false, true)
 * succeeds; a caller that loses that compare-and-set falls through without
 * doing anything.
 *
 * Components are created and started in this order: metrics, scheduler,
 * ZooKeeper utilities, log manager, broker id, socket server, replica manager,
 * controller, group coordinator, optional authorizer, request handling
 * (KafkaApis + handler pool), dynamic config manager, health check.
 *
 * On any Throwable the error is logged, isStartingUp is reset, shutdown() is
 * called and the original throwable is rethrown.
 */
def startup() {
  try {
    info("starting")

    // Refuse to start while a shutdown is still in progress.
    if(isShuttingDown.get)
      throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")

    // Already started: nothing to do.
    if(startupComplete.get)
      return

    // Only the caller that flips isStartingUp from false to true proceeds.
    val canStartup = isStartingUp.compareAndSet(false, true)
    if (canStartup) {
      metrics = new Metrics(metricConfig, reporters, kafkaMetricsTime, true)

      brokerState.newState(Starting)

      /* start scheduler */
      kafkaScheduler.startup()

      /* setup zookeeper */
      zkUtils = initZk()

      /* start log manager */
      logManager = createLogManager(zkUtils.zkClient, brokerState)
      logManager.startup()

      /* generate brokerId */
      config.brokerId = getBrokerId
      // From here on log lines are prefixed with the resolved broker id.
      this.logIdent = "[Kafka Server " + config.brokerId + "], "

      /* start socket server */
      socketServer = new SocketServer(config, metrics, kafkaMetricsTime)
      socketServer.startup()

      /* start replica manager */
      replicaManager = new ReplicaManager(config, metrics, time, kafkaMetricsTime, zkUtils, kafkaScheduler, logManager, isShuttingDown)
      replicaManager.startup()

      /* start kafka controller */
      kafkaController = new KafkaController(config, zkUtils, brokerState, kafkaMetricsTime, metrics, threadNamePrefix)
      kafkaController.startup()

      /* start group coordinator */
      groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, kafkaMetricsTime)
      groupCoordinator.startup()

      /* Get the authorizer and initialize it if one is specified.*/
      // A null or empty authorizerClassName yields None, i.e. no authorizer.
      authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>
        val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
        authZ.configure(config.originals())
        authZ
      }

      /* start processing requests */
      apis = new KafkaApis(socketServer.requestChannel, replicaManager, groupCoordinator, kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer)
      requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
      brokerState.newState(RunningAsBroker)

      Mx4jLoader.maybeLoad()

      /* start dynamic config manager */
      // One handler per config type: topic-level and client-id-level overrides.
      dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config), ConfigType.Client -> new ClientIdConfigHandler(apis.quotaManagers))

      // Apply all existing client configs to the ClientIdConfigHandler to bootstrap the overrides
      // TODO: Move this logic to DynamicConfigManager
      AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.Client).foreach {
        case (clientId, properties) => dynamicConfigHandlers(ConfigType.Client).processConfigChanges(clientId, properties)
      }

      // Create the config manager. start listening to notifications
      dynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers)
      dynamicConfigManager.startup()

      /* tell everyone we are alive */
      // An advertised listener with port 0 is replaced by one carrying the port
      // the socket server actually bound for that protocol.
      val listeners = config.advertisedListeners.map {case(protocol, endpoint) =>
        if (endpoint.port == 0)
          (protocol, EndPoint(endpoint.host, socketServer.boundPort(protocol), endpoint.protocolType))
        else
          (protocol, endpoint)
      }
      kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils, config.rack, config.interBrokerProtocolVersion)
      kafkaHealthcheck.startup()

      // Now that the broker id is successfully registered via KafkaHealthcheck, checkpoint it
      checkpointBrokerId(config.brokerId)

      /* register broker metrics */
      registerStats()

      // Mark startup as finished and release the "starting up" flag.
      shutdownLatch = new CountDownLatch(1)
      startupComplete.set(true)
      isStartingUp.set(false)
      AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString)
      info("started")
    }
  }
  catch {
    case e: Throwable =>
      fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
      // Reset the flag before calling shutdown(), then propagate the failure.
      isStartingUp.set(false)
      shutdown()
      throw e
  }
}
首先判斷目前是否正在關閉中或者已經啓動了:如果正在關閉中,直接拋出IllegalStateException;如果已經啓動完成,則直接返回。然後是一個CAS操作isStartingUp.compareAndSet(false, true),防止多個線程併發執行啓動,並以此判斷是否可以啓動。如果可以啓動,就開始我們的啓動過程。
/**
 * Registers the log manager's periodic background tasks and starts the log
 * cleaner when it is enabled.
 *
 * When a scheduler is present, three recurring tasks are scheduled, each with
 * an initial delay of InitialTaskDelayMs:
 *  - "kafka-log-retention" runs cleanupLogs every retentionCheckMs ms;
 *  - "kafka-log-flusher" runs flushDirtyLogs every flushCheckMs ms;
 *  - "kafka-recovery-point-checkpoint" runs checkpointRecoveryPointOffsets
 *    every flushCheckpointMs ms.
 */
def startup(): Unit = {
  // Nothing is scheduled when no scheduler was supplied (scheduler is null).
  Option(scheduler).foreach { taskScheduler =>
    info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
    taskScheduler.schedule("kafka-log-retention",
                           cleanupLogs,
                           delay = InitialTaskDelayMs,
                           period = retentionCheckMs,
                           TimeUnit.MILLISECONDS)

    info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
    taskScheduler.schedule("kafka-log-flusher",
                           flushDirtyLogs,
                           delay = InitialTaskDelayMs,
                           period = flushCheckMs,
                           TimeUnit.MILLISECONDS)

    taskScheduler.schedule("kafka-recovery-point-checkpoint",
                           checkpointRecoveryPointOffsets,
                           delay = InitialTaskDelayMs,
                           period = flushCheckpointMs,
                           TimeUnit.MILLISECONDS)
  }

  // The cleaner is started regardless of whether a scheduler is present.
  if (cleanerConfig.enableCleaner) {
    cleaner.startup()
  }
}