Kafka can be started by running the main method in Kafka.scala directly (you must pass a startup argument: the path to server.properties). Since Kafka depends on ZooKeeper, we need to start ZooKeeper first, point server.properties at the ZK address, and then start the broker.
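For reference, a minimal server.properties sketch might look like the following (the paths and addresses are placeholders; everything not listed falls back to its default):

# broker id; a negative value triggers automatic generation, as we will see later
broker.id=0
# where the log segments are stored
log.dirs=/tmp/kafka-logs
# the ZooKeeper address the broker connects to
zookeeper.connect=localhost:2181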
Let's look at the main() method:
def main(args: Array[String]): Unit = {
  try {
    // Load the server.properties file passed on the command line and build a Properties instance.
    val serverProps = getPropsFromArgs(args)
    // Create a KafkaServerStartable instance; when it is constructed, it creates a KafkaServer
    // instance internally. Before the server can be created, serverProps must first be turned
    // into a KafkaConfig instance.
    val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
    // attach shutdown handler to catch control-c
    Runtime.getRuntime().addShutdownHook(new Thread() {
      override def run() = {
        kafkaServerStartable.shutdown
      }
    })
    // start the server and block until shutdown is requested
    kafkaServerStartable.startup
    kafkaServerStartable.awaitShutdown
  }
  catch {
    case e: Throwable =>
      fatal(e)
      System.exit(1)
  }
  System.exit(0)
}
getPropsFromArgs(args) is clear enough: it reads our settings from the configuration file and assigns them to serverProps.
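Loading the properties boils down to something like the following sketch (the real getPropsFromArgs also does argument validation and error reporting, which are omitted here):

import java.io.FileInputStream
import java.util.Properties

// Read a .properties file from disk into a java.util.Properties instance.
def loadProps(path: String): Properties = {
  val props = new Properties()
  val in = new FileInputStream(path)
  try props.load(in) finally in.close()
  props
}

With the properties in hand, KafkaServerStartable.fromProps(serverProps) is called: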
object KafkaServerStartable {
  def fromProps(serverProps: Properties) = {
    KafkaMetricsReporter.startReporters(new VerifiableProperties(serverProps))
    new KafkaServerStartable(KafkaConfig.fromProps(serverProps))
  }
}
This part mainly starts an internal monitoring service (internal state monitoring).
The addShutdownHook block above is a shutdown hook, a common pattern in Java: when the process is shut down, it runs cleanup code to make sure the program exits safely. Then comes kafkaServerStartable.startup. Stepping into it, you can clearly see that it calls the startup method of KafkaServer:
// The KafkaScheduler instance used below reads the background.threads config
// (default 10) when it is created, to size its pool of background threads.
def startup() {
  try {
    info("starting")

    if(isShuttingDown.get)
      throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")

    if(startupComplete.get)
      return

    val canStartup = isStartingUp.compareAndSet(false, true)
    if (canStartup) {
      metrics = new Metrics(metricConfig, reporters, kafkaMetricsTime, true)

      brokerState.newState(Starting)

      /* start scheduler */
      kafkaScheduler.startup()

      // initialize the ZooKeeper connection and register the broker info in ZK
      /* setup zookeeper */
      zkUtils = initZk()

      // create and start the LogManager instance
      /* start log manager */
      logManager = createLogManager(zkUtils.zkClient, brokerState)
      logManager.startup()

      // If broker.id is not configured (i.e. a negative value) and broker.id.generation.enable
      // is true (the default), a new id is generated from the version of the ZK path
      // /brokers/seqid (starting at 0 and incremented on each generation), plus the value of
      // reserved.broker.max.id (default 1000). The generated broker.id is then written to the
      // meta.properties file under logDir, so that later startups read the id from that file
      // instead of generating a new one.
      /* generate brokerId */
      config.brokerId = getBrokerId
      this.logIdent = "[Kafka Server " + config.brokerId + "], "

      // start Kafka's SocketServer
      socketServer = new SocketServer(config, metrics, kafkaMetricsTime)
      socketServer.startup()

      // create and start the ReplicaManager; it depends on the kafkaScheduler and logManager instances
      /* start replica manager */
      replicaManager = new ReplicaManager(config, metrics, time, kafkaMetricsTime, zkUtils, kafkaScheduler, logManager,
        isShuttingDown)
      replicaManager.startup()

      // create and start the KafkaController instance, which handles operations for all
      // leader partitions on the current broker
      /* start kafka controller */
      kafkaController = new KafkaController(config, zkUtils, brokerState, kafkaMetricsTime, metrics, threadNamePrefix)
      kafkaController.startup()

      // create and start the GroupCoordinator instance, newly added in 0.9; it tracks consumers
      // joining a group and rebalances partitions across the consumers
      /* start group coordinator */
      groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, kafkaMetricsTime)
      groupCoordinator.startup()

      // instantiate the Authorizer implementation named by the authorizer.class.name config,
      // used to authorize user operations; by default no authorizer is configured
      /* Get the authorizer and initialize it if one is specified.*/
      authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>
        val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
        authZ.configure(config.originals())
        authZ
      }

      // create the KafkaApis instance that serves external requests, then set the broker state to running
      /* start processing requests */
      apis = new KafkaApis(socketServer.requestChannel, replicaManager, groupCoordinator,
        kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer)
      requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
      brokerState.newState(RunningAsBroker)

      Mx4jLoader.maybeLoad()

      // create the handlers for dynamic config changes, mainly topic-level changes and
      // client-level config changes, and apply the configs already stored for existing client ids
      /* start dynamic config manager */
      dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config),
        ConfigType.Client -> new ClientIdConfigHandler(apis.quotaManagers))

      // Apply all existing client configs to the ClientIdConfigHandler to bootstrap the overrides
      // TODO: Move this logic to DynamicConfigManager
      AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.Client).foreach {
        case (clientId, properties) => dynamicConfigHandlers(ConfigType.Client).processConfigChanges(clientId, properties)
      }

      // Create the config manager. start listening to notifications
      dynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers)
      dynamicConfigManager.startup()

      /* tell everyone we are alive */
      val listeners = config.advertisedListeners.map {case(protocol, endpoint) =>
        if (endpoint.port == 0)
          (protocol, EndPoint(endpoint.host, socketServer.boundPort(protocol), endpoint.protocolType))
        else
          (protocol, endpoint)
      }
      kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils, config.rack,
        config.interBrokerProtocolVersion)
      kafkaHealthcheck.startup()

      // Now that the broker id is successfully registered via KafkaHealthcheck, checkpoint it
      checkpointBrokerId(config.brokerId)

      /* register broker metrics */
      registerStats()

      shutdownLatch = new CountDownLatch(1)
      startupComplete.set(true)
      isStartingUp.set(false)
      AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString)
      info("started")
    }
  }
  catch {
    case e: Throwable =>
      fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
      isStartingUp.set(false)
      shutdown()
      throw e
  }
}
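The getBrokerId step deserves a closer look. Here is a rough sketch of the generation logic described in the comment above, with a hypothetical nextZkSequence stub standing in for the real ZK call that bumps the version of /brokers/seqid:

// hypothetical stub for the real ZK sequence increment on /brokers/seqid
def nextZkSequence(): Int = 0

// reservedMaxId mirrors reserved.broker.max.id (default 1000)
def generateBrokerId(configuredId: Int,
                     generationEnabled: Boolean,
                     reservedMaxId: Int = 1000): Int = {
  if (configuredId >= 0)
    configuredId // explicitly configured, use it as-is
  else if (generationEnabled)
    reservedMaxId + nextZkSequence() // generated ids start above the reserved range
  else
    throw new RuntimeException("broker.id is not set and broker.id.generation.enable is false")
}

Adding reserved.broker.max.id as an offset keeps auto-generated ids from colliding with manually configured ones, which must stay at or below that value.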
Back to the overall flow of startup(): it first checks whether the server is currently shutting down or has already started; in either case it bails out immediately (with an exception in the former case). Then comes a CAS operation on isStartingUp, which guards against concurrent startup attempts by deciding whether this thread may proceed. If it may, the actual startup sequence begins.
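A minimal sketch of this guard pattern, using the same three AtomicBoolean flags the broker keeps, with the rest of startup reduced to a placeholder body:

import java.util.concurrent.atomic.AtomicBoolean

class StartupGuard {
  private val isShuttingDown  = new AtomicBoolean(false)
  private val startupComplete = new AtomicBoolean(false)
  private val isStartingUp    = new AtomicBoolean(false)

  def startup(body: => Unit): Unit = {
    if (isShuttingDown.get)
      throw new IllegalStateException("still shutting down, cannot re-start!")
    if (startupComplete.get)
      return // already started, nothing to do
    // compareAndSet succeeds for exactly one thread, so concurrent callers
    // can never run the startup body twice
    if (isStartingUp.compareAndSet(false, true)) {
      body
      startupComplete.set(true)
      isStartingUp.set(false)
    }
  }
}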
The startup sequence then proceeds roughly as follows:
- Construct the Metrics class and set the broker state to Starting.
- Start the scheduler: kafkaScheduler.startup().
- Construct zkUtils: start a ZK client using the ZK connection info from the config.
- Start the log manager: it reads configuration from ZK, including internal topics such as __consumer_offsets and __system.topic__. The key part is that it schedules background tasks to delete records that meet the retention criteria (cleanupLogs), flush dirty logs to disk (flushDirtyLogs), and checkpoint all recovery points to a text file so that startup does not have to recover every log file from scratch (checkpointRecoveryPointOffsets).
/**
 * Start the background threads to flush logs and do log cleanup
 */
def startup() {
  /* Schedule the cleanup task to delete old logs */
  if(scheduler != null) {
    info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
    scheduler.schedule("kafka-log-retention",
                       cleanupLogs,
                       delay = InitialTaskDelayMs,
                       period = retentionCheckMs,
                       TimeUnit.MILLISECONDS)
    info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
    scheduler.schedule("kafka-log-flusher",
                       flushDirtyLogs,
                       delay = InitialTaskDelayMs,
                       period = flushCheckMs,
                       TimeUnit.MILLISECONDS)
    scheduler.schedule("kafka-recovery-point-checkpoint",
                       checkpointRecoveryPointOffsets,
                       delay = InitialTaskDelayMs,
                       period = flushCheckpointMs,
                       TimeUnit.MILLISECONDS)
  }
  if(cleanerConfig.enableCleaner)
    cleaner.startup()
}
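The scheduler.schedule calls above map onto plain JDK scheduling. A minimal sketch, assuming a ScheduledThreadPoolExecutor stands in for KafkaScheduler (whose pool size comes from background.threads, default 10):

import java.util.concurrent.{Executors, TimeUnit}

object SchedulerSketch {
  def main(args: Array[String]): Unit = {
    val pool = Executors.newScheduledThreadPool(10)
    // stand-in for the kafka-log-retention task; the delay and period here are
    // illustrative, the real values come from InitialTaskDelayMs and the
    // log.retention.check.interval.ms config
    pool.scheduleAtFixedRate(
      new Runnable { def run(): Unit = println("delete old logs") },
      30L, 300L, TimeUnit.SECONDS)
  }
}

Each of the three tasks (kafka-log-retention, kafka-log-flusher, kafka-recovery-point-checkpoint) is scheduled this way onto the shared background pool.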