1 def main(args: Array[String]): Unit = { 2 3 try { 4 //經過args讀取properties 5 val serverProps = getPropsFromArgs(args) 6 val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps) 7 8 // 增長shutdown方法 9 Runtime.getRuntime().addShutdownHook(new Thread() { 10 override def run() = { 11 12 kafkaServerStartable.shutdown 13 } 14 }) 15 16 kafkaServerStartable.startup 17 kafkaServerStartable.awaitShutdown 18 } 19 catch { 20 case e: Throwable => 21 fatal(e) 22 System.exit(1) 23 } 24 System.exit(0) 25 }
1 def startup() { 2 try { 3 server.startup() 4 } 5 catch { 6 case e: Throwable => 7 fatal("Fatal error during KafkaServerStartable startup. Prepare to shutdown", e) 8 System.exit(1) 9 } 10 }
1 /** 2 * 啓動接口 3 * 生成Kafka server實例 4 * 實例化LogManager、SocketServer和KafkaRequestHandlers 5 */ 6 def startup() { 7 try { 8 9 if (isShuttingDown.get) 10 throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!") 11 12 if (startupComplete.get) 13 return 14 15 val canStartup = isStartingUp.compareAndSet(false, true) 16 if (canStartup) { 17 brokerState.newState(Starting) 18 19 /* start scheduler */ 20 kafkaScheduler.startup() 21 22 /* setup zookeeper */ 23 zkUtils = initZk() 24 25 /* Get or create cluster_id */ 26 _clusterId = getOrGenerateClusterId(zkUtils) 27 info(s"Cluster ID = $clusterId") 28 29 /* generate brokerId */ 30 config.brokerId = getBrokerId 31 this.logIdent = "[Kafka Server " + config.brokerId + "], " 32 33 /* create and configure metrics */ 34 val reporters = config.getConfiguredInstances(KafkaConfig.MetricReporterClassesProp, classOf[MetricsReporter], 35 Map[String, AnyRef](KafkaConfig.BrokerIdProp -> (config.brokerId.toString)).asJava) 36 reporters.add(new JmxReporter(jmxPrefix)) 37 val metricConfig = KafkaServer.metricConfig(config) 38 metrics = new Metrics(metricConfig, reporters, time, true) 39 40 quotaManagers = QuotaFactory.instantiate(config, metrics, time) 41 notifyClusterListeners(kafkaMetricsReporters ++ reporters.asScala) 42 43 /* start log manager */ 44 logManager = createLogManager(zkUtils.zkClient, brokerState) 45 logManager.startup() 46 47 metadataCache = new MetadataCache(config.brokerId) 48 credentialProvider = new CredentialProvider(config.saslEnabledMechanisms) 49 50 socketServer = new SocketServer(config, metrics, time, credentialProvider) 51 socketServer.startup() 52 53 /* start replica manager */ 54 replicaManager = new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, 55 isShuttingDown, quotaManagers.follower) 56 replicaManager.startup() 57 58 /* start kafka controller */ 59 kafkaController = new KafkaController(config, zkUtils, brokerState, time, metrics, threadNamePrefix) 60 kafkaController.startup() 61 62 adminManager = new AdminManager(config, metrics, metadataCache, zkUtils) 63 64 /* start group coordinator */ 65 // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue 66 groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, Time.SYSTEM) 67 groupCoordinator.startup() 68 69 /* Get the authorizer and initialize it if one is specified.*/ 70 authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName => 71 val authZ = CoreUtils.createObject[Authorizer](authorizerClassName) 72 authZ.configure(config.originals()) 73 authZ 74 } 75 76 /* start processing requests */ 77 apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, 78 kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers, 79 clusterId, time) 80 81 requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time, 82 config.numIoThreads) 83 84 Mx4jLoader.maybeLoad() 85 86 /* start dynamic config manager */ 87 dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers), 88 ConfigType.Client -> new ClientIdConfigHandler(quotaManagers), 89 ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider), 90 ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers)) 91 92 // Create the config manager. start listening to notifications 93 dynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers) 94 dynamicConfigManager.startup() 95 96 /* tell everyone we are alive */ 97 val listeners = config.advertisedListeners.map { endpoint => 98 if (endpoint.port == 0) 99 endpoint.copy(port = socketServer.boundPort(endpoint.listenerName)) 100 else 101 endpoint 102 } 103 kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils, config.rack, 104 config.interBrokerProtocolVersion) 105 kafkaHealthcheck.startup() 106 107 // Now that the broker id is successfully registered via KafkaHealthcheck, checkpoint it 108 checkpointBrokerId(config.brokerId) 109 110 /* register broker metrics */ 111 registerStats() 112 113 brokerState.newState(RunningAsBroker) 114 shutdownLatch = new CountDownLatch(1) 115 startupComplete.set(true) 116 isStartingUp.set(false) 117 AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString) 118 info("started") 119 } 120 } 121 catch { 122 case e: Throwable => 123 fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e) 124 isStartingUp.set(false) 125 shutdown() 126 throw e 127 } 128 }
1 /** 2 * KafkaScheduler是一個基於java.util.concurrent.ScheduledThreadPoolExecutor的scheduler 3 * 它內部是之前綴kafka-scheduler-xx的線程池處理真正的工做 4 * 5 * @param threads 線程池裏線程的數量 6 * @param threadNamePrefix 使用時的線程名稱,這個前綴將有一個附加的數字 7 * @param daemon 若是爲true,線程將是守護線程,而且不會阻塞jvm關閉 8 */ 9 @threadsafe 10 class KafkaScheduler(val threads: Int, 11 val threadNamePrefix: String = "kafka-scheduler-", 12 daemon: Boolean = true) extends Scheduler with Logging { 13 private var executor: ScheduledThreadPoolExecutor = null 14 private val schedulerThreadId = new AtomicInteger(0) 15 16 override def startup() { 17 debug("Initializing task scheduler.") 18 this synchronized { 19 if (isStarted) 20 throw new IllegalStateException("This scheduler has already been started!") 21 executor = new ScheduledThreadPoolExecutor(threads) 22 executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false) 23 executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false) 24 executor.setThreadFactory(new ThreadFactory() { 25 def newThread(runnable: Runnable): Thread = 26 Utils.newThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon) 27 }) 28 } 29 }
1 // 鏈接到zk服務器;建立通用節點 2 val zkUtils = ZkUtils(config.zkConnect, 3 sessionTimeout = config.zkSessionTimeoutMs, 4 connectionTimeout = config.zkConnectionTimeoutMs, 5 secureAclsEnabled) 6 zkUtils.setupCommonPaths()
1 // 這些是在kafka代理啓動時應該存在的路徑 2 val persistentZkPaths = Seq(ConsumersPath, 3 BrokerIdsPath, 4 BrokerTopicsPath, 5 ConfigChangesPath, 6 getEntityConfigRootPath(ConfigType.Topic), 7 getEntityConfigRootPath(ConfigType.Client), 8 DeleteTopicsPath, 9 BrokerSequenceIdPath, 10 IsrChangeNotificationPath)
1 /** 2 * 啓動後臺線程,負責log的建立,檢索及清理 3 */ 4 def startup() { 5 /* Schedule the cleanup task to delete old logs */ 6 if (scheduler != null) { 7 info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs)) 8 scheduler.schedule("kafka-log-retention", 9 cleanupLogs, 10 delay = InitialTaskDelayMs, 11 period = retentionCheckMs, 12 TimeUnit.MILLISECONDS) 13 info("Starting log flusher with a default period of %d ms.".format(flushCheckMs)) 14 scheduler.schedule("kafka-log-flusher", 15 flushDirtyLogs, 16 delay = InitialTaskDelayMs, 17 period = flushCheckMs, 18 TimeUnit.MILLISECONDS) 19 scheduler.schedule("kafka-recovery-point-checkpoint", 20 checkpointRecoveryPointOffsets, 21 delay = InitialTaskDelayMs, 22 period = flushCheckpointMs, 23 TimeUnit.MILLISECONDS) 24 scheduler.schedule("kafka-delete-logs", 25 deleteLogs, 26 delay = InitialTaskDelayMs, 27 period = defaultConfig.fileDeleteDelayMs, 28 TimeUnit.MILLISECONDS) 29 } 30 if (cleanerConfig.enableCleaner) 31 cleaner.startup() 32 }
1 /** 2 * SocketServer是socket服務器, 3 * 線程模型是:1個Acceptor線程處理新鏈接,Acceptor還有多個處理器線程,每一個處理器線程擁有本身的選擇器和多個讀socket請求Handler線程。 4 * handler線程處理請求併產生響應寫給處理器線程 5 */ 6 class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time, val credentialProvider: CredentialProvider) extends Logging with KafkaMetricsGroup {
1 def startup() { 2 // 啓動ISR過時線程 3 // 一個follower能夠在配置上落後於leader。在它被從ISR中移除以前,複製 4 scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs / 2, unit = TimeUnit.MILLISECONDS) 5 scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges, period = 2500L, unit = TimeUnit.MILLISECONDS) 6 }
當kafka 服務器的控制器模塊啓動時激活socket
1 def startup() = { 2 inLock(controllerContext.controllerLock) { 3 info("Controller starting up") 4 registerSessionExpirationListener() 5 isRunning = true 6 controllerElector.startup 7 info("Controller startup complete") 8 } 9 }
1 private def registerSessionExpirationListener() = { 2 zkUtils.zkClient.subscribeStateChanges(new SessionExpirationListener()) 3 } 4 public void subscribeStateChanges(final IZkStateListener listener) { 5 synchronized (_stateListener) { 6 _stateListener.add(listener); 7 } 8 } 9 10 11 class SessionExpirationListener() extends IZkStateListener with Logging { 12 this.logIdent = "[SessionExpirationListener on " + config.brokerId + "], " 13 @throws(classOf[Exception]) 14 def handleStateChanged(state: KeeperState) { 15 // do nothing, since zkclient will do reconnect for us. 16 }
def startup { inLock(controllerContext.controllerLock) { controllerContext.zkUtils.zkClient.subscribeDataChanges(electionPath, leaderChangeListener) elect } } def elect: Boolean = { val timestamp = SystemTime.milliseconds.toString val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp)) leaderId = getControllerID /* * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition, * it's possible that the controller has already been elected when we get here. This check will prevent the following * createEphemeralPath method from getting into an infinite loop if this broker is already the controller. */ if(leaderId != -1) { debug("Broker %d has been elected as leader, so stopping the election process.".format(leaderId)) return amILeader } try { val zkCheckedEphemeral = new ZKCheckedEphemeral(electionPath, electString, controllerContext.zkUtils.zkConnection.getZookeeper, JaasUtils.isZkSecurityEnabled()) zkCheckedEphemeral.create() info(brokerId + " successfully elected as leader") leaderId = brokerId onBecomingLeader() } catch { case e: ZkNodeExistsException => // If someone else has written the path, then leaderId = getControllerID if (leaderId != -1) debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId)) else warn("A leader has been elected but just resigned, this will result in another round of election") case e2: Throwable => error("Error while electing or becoming leader on broker %d".format(brokerId), e2) resign() } amILeader } def amILeader : Boolean = leaderId == brokerId
1 def startup() { 2 info("Starting up.") 3 heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", brokerId) 4 joinPurgatory = new DelayedOperationPurgatory[DelayedJoin]("Rebalance", brokerId) 5 isActive.set(true) 6 info("Startup complete.") 7 }
1 /** 2 * Top-level method that handles all requests and multiplexes to the right api 3 */ 4 def handle(request: RequestChannel.Request) { 5 try{ 6 trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s". 7 format(request.requestObj, request.connectionId, request.securityProtocol, request.session.principal)) 8 request.requestId match { 9 case RequestKeys.ProduceKey => handleProducerRequest(request) 10 case RequestKeys.FetchKey => handleFetchRequest(request) 11 case RequestKeys.OffsetsKey => handleOffsetRequest(request) 12 case RequestKeys.MetadataKey => handleTopicMetadataRequest(request) 13 case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request) 14 case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request) 15 case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request) 16 case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request) 17 case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request) 18 case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request) 19 case RequestKeys.GroupCoordinatorKey => handleGroupCoordinatorRequest(request) 20 case RequestKeys.JoinGroupKey => handleJoinGroupRequest(request) 21 case RequestKeys.HeartbeatKey => handleHeartbeatRequest(request) 22 case RequestKeys.LeaveGroupKey => handleLeaveGroupRequest(request) 23 case RequestKeys.SyncGroupKey => handleSyncGroupRequest(request) 24 case RequestKeys.DescribeGroupsKey => handleDescribeGroupRequest(request) 25 case RequestKeys.ListGroupsKey => handleListGroupsRequest(request) 26 case requestId => throw new KafkaException("Unknown api code " + requestId) 27 } 28 } catch { 29 case e: Throwable => 30 if ( request.requestObj != null) 31 request.requestObj.handleError(e, requestChannel, request) 32 else { 33 val response = request.body.getErrorResponse(request.header.apiVersion, e) 34 val respHeader = new ResponseHeader(request.header.correlationId) 35 36 /* If request doesn't have a default error response, we just close the connection. 37 For example, when produce request has acks set to 0 */ 38 if (response == null) 39 requestChannel.closeConnection(request.processor, request) 40 else 41 requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, response))) 42 } 43 error("error when handling request %s".format(request.requestObj), e) 44 } finally 45 request.apiLocalCompleteTimeMs = SystemTime.milliseconds 46 }
1 /** 2 * Handle a produce request 3 */ 4 def handleProducerRequest(request: RequestChannel.Request) { 5 val produceRequest = request.requestObj.asInstanceOf[ProducerRequest] 6 val numBytesAppended = produceRequest.sizeInBytes 7 8 val (authorizedRequestInfo, unauthorizedRequestInfo) = produceRequest.data.partition { 9 case (topicAndPartition, _) => authorize(request.session, Write, new Resource(Topic, topicAndPartition.topic)) 10 } 11 12 // the callback for sending a produce response 13 def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) { 14 15 val mergedResponseStatus = responseStatus ++ unauthorizedRequestInfo.mapValues(_ => ProducerResponseStatus(ErrorMapping.TopicAuthorizationCode, -1)) 16 17 var errorInResponse = false 18 19 mergedResponseStatus.foreach { case (topicAndPartition, status) => 20 if (status.error != ErrorMapping.NoError) { 21 errorInResponse = true 22 debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format( 23 produceRequest.correlationId, 24 produceRequest.clientId, 25 topicAndPartition, 26 ErrorMapping.exceptionNameFor(status.error))) 27 } 28 } 29 30 def produceResponseCallback(delayTimeMs: Int) { 31 32 if (produceRequest.requiredAcks == 0) { 33 // no operation needed if producer request.required.acks = 0; however, if there is any error in handling 34 // the request, since no response is expected by the producer, the server will close socket server so that 35 // the producer client will know that some error has happened and will refresh its metadata 36 if (errorInResponse) { 37 val exceptionsSummary = mergedResponseStatus.map { case (topicAndPartition, status) => 38 topicAndPartition -> ErrorMapping.exceptionNameFor(status.error) 39 }.mkString(", ") 40 info( 41 s"Closing connection due to error during produce request with correlation id ${produceRequest.correlationId} " + 42 s"from client id ${produceRequest.clientId} with ack=0\n" + 43 s"Topic and partition to exceptions: $exceptionsSummary" 44 ) 45 requestChannel.closeConnection(request.processor, request) 46 } else { 47 requestChannel.noOperation(request.processor, request) 48 } 49 } else { 50 val response = ProducerResponse(produceRequest.correlationId, 51 mergedResponseStatus, 52 produceRequest.versionId, 53 delayTimeMs) 54 requestChannel.sendResponse(new RequestChannel.Response(request, 55 new RequestOrResponseSend(request.connectionId, 56 response))) 57 } 58 } 59 60 // When this callback is triggered, the remote API call has completed 61 request.apiRemoteCompleteTimeMs = SystemTime.milliseconds 62 63 quotaManagers(RequestKeys.ProduceKey).recordAndMaybeThrottle(produceRequest.clientId, 64 numBytesAppended, 65 produceResponseCallback) 66 } 67 68 if (authorizedRequestInfo.isEmpty) 69 sendResponseCallback(Map.empty) 70 else { 71 val internalTopicsAllowed = produceRequest.clientId == AdminUtils.AdminClientId 72 73 // call the replica manager to append messages to the replicas 74 replicaManager.appendMessages( 75 produceRequest.ackTimeoutMs.toLong, 76 produceRequest.requiredAcks, 77 internalTopicsAllowed, 78 authorizedRequestInfo, 79 sendResponseCallback) 80 81 // if the request is put into the purgatory, it will have a held reference 82 // and hence cannot be garbage collected; hence we clear its data here in 83 // order to let GC re-claim its memory since it is already appended to log 84 produceRequest.emptyData() 85 } 86 }
1 /** 2 * Begin watching for config changes 3 */ 4 def startup() { 5 zkUtils.makeSurePersistentPathExists(ZkUtils.EntityConfigChangesPath) 6 zkUtils.zkClient.subscribeChildChanges(ZkUtils.EntityConfigChangesPath, ConfigChangeListener) 7 processAllConfigChanges() 8 } 9 10 /** 11 * Process all config changes 12 */ 13 private def processAllConfigChanges() { 14 val configChanges = zkUtils.zkClient.getChildren(ZkUtils.EntityConfigChangesPath) 15 import JavaConversions._ 16 processConfigChanges((configChanges: mutable.Buffer[String]).sorted) 17 } 18 19 /** 20 * Process the given list of config changes 21 */ 22 private def processConfigChanges(notifications: Seq[String]) { 23 if (notifications.size > 0) { 24 info("Processing config change notification(s)...") 25 val now = time.milliseconds 26 for (notification <- notifications) { 27 val changeId = changeNumber(notification) 28 29 if (changeId > lastExecutedChange) { 30 val changeZnode = ZkUtils.EntityConfigChangesPath + "/" + notification 31 32 val (jsonOpt, stat) = zkUtils.readDataMaybeNull(changeZnode) 33 processNotification(jsonOpt) 34 } 35 lastExecutedChange = changeId 36 } 37 purgeObsoleteNotifications(now, notifications) 38 } 39 }
1 def startup() { 2 zkUtils.zkClient.subscribeStateChanges(sessionExpireListener) 3 register() 4 } 5 6 /** 7 * Register this broker as "alive" in zookeeper 8 */ 9 def register() { 10 val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt 11 val updatedEndpoints = advertisedEndpoints.mapValues(endpoint => 12 if (endpoint.host == null || endpoint.host.trim.isEmpty) 13 EndPoint(InetAddress.getLocalHost.getCanonicalHostName, endpoint.port, endpoint.protocolType) 14 else 15 endpoint 16 ) 17 18 // the default host and port are here for compatibility with older client 19 // only PLAINTEXT is supported as default 20 // if the broker doesn't listen on PLAINTEXT protocol, an empty endpoint will be registered and older clients will be unable to connect 21 val plaintextEndpoint = updatedEndpoints.getOrElse(SecurityProtocol.PLAINTEXT, new EndPoint(null,-1,null)) 22 zkUtils.registerBrokerInZk(brokerId, plaintextEndpoint.host, plaintextEndpoint.port, updatedEndpoints, jmxPort) 23 }
ReplicaManager 副本管理
KafkaApis 處理全部request的Proxy類,根據requestKey決定調用具體的handler
KafkaRequestHandlerPool 處理request的線程池,請求處理池
LogManager kafka文件存儲系統管理,負責處理和存儲全部Kafka的topic的partiton數據
TopicConfigManager 監聽此zk節點的⼦子節點/config/changes/,經過LogManager更新topic的配置信息,topic粒度配置管理
KafkaHealthcheck 監聽zk session expire,在zk上建立broker信息,便於其餘broker和consumer獲取其信息
KafkaController kafka集羣中央控制器選舉,leader選舉,副本分配。
KafkaScheduler 負責副本管理和日誌管理調度等等
ZkClient 負責註冊zk相關信息.
BrokerTopicStats topic信息統計和監控
ControllerStats 中央控制器統計和監控