0. Key Concepts
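| Concept | Description |
|---|---|
| Topic | A logical concept used to categorize Messages; one Topic can be spread across multiple Brokers. |
| Partition | The basis of horizontal scaling and of all parallelism in Kafka; every Topic is split into at least 1 Partition. |
| Offset | The sequence number of a message within a Partition. Ordering holds only within a Partition, not across Partitions. |
| Consumer | Fetches/consumes Messages from Brokers. |
| Producer | Sends/produces Messages to Brokers. |
| Replication | Kafka can keep redundant copies of Messages at Partition granularity. Every Partition can be configured with at least 1 replica (with exactly 1 replica, that replica is the Partition itself). |
| Leader | The replica set of each Partition elects exactly one Leader, and all read and write requests are handled by the Leader. The other replicas sync updates from the Leader to their local storage. |
| Broker | Kafka uses Brokers to accept requests from Producers and Consumers and to persist Messages to local disk. Each Cluster elects one Broker as the Controller, which handles Partition Leader election, coordinates Partition reassignment, and similar work. |
| ISR | In-Sync Replicas: the subset of replicas that are currently alive and able to catch up with the Leader. Because reads and writes land on the Leader first, a replica that pulls data from the Leader generally lags somewhat behind it. Lag can be measured in time and in number of messages, and a replica that exceeds a threshold is removed from the ISR (the code analyzed below checks only the time threshold, config.replicaLagTimeMaxMs, in the isr-expiration task of section 2.3.5). Each Leader Partition has its own independent ISR. |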
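To make the ISR row concrete, here is a minimal sketch of time-based ISR membership. It is not Kafka's implementation: `Replica`, `inSyncReplicas`, and their parameters are hypothetical names, and lag is assumed to be measured only as time since the replica last caught up.

```scala
object IsrSketch {
  // Hypothetical model: a replica and the last time it was fully caught up with the leader.
  final case class Replica(brokerId: Int, lastCaughtUpMs: Long)

  // Keep the leader plus every replica whose lag is within maxLagMs.
  def inSyncReplicas(leaderId: Int, replicas: Seq[Replica], nowMs: Long, maxLagMs: Long): Set[Int] =
    replicas.collect {
      case r if r.brokerId == leaderId || nowMs - r.lastCaughtUpMs <= maxLagMs => r.brokerId
    }.toSet
}
```

A replica that falls out of the set here corresponds to one that is removed from the ISR after exceeding the threshold.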
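1. Why Analyze the Kafka Source Code
Gain a deep understanding of Kafka's internals
Gain a deeper command of Scala in practice
2. Server Startup
The startup flow is shown below (I originally planned to use a sequence diagram, but a mind map conveys the structure better, so I used a mind map instead):
2.1 Entry Point: Kafka.scala
As the mind map above shows, Kafka's startup entry point is the main() function in Kafka.scala: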
    def main(args: Array[String]): Unit = {
      try {
        val serverProps = getPropsFromArgs(args)
        val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread() {
          override def run() = {
            kafkaServerStartable.shutdown
          }
        })

        kafkaServerStartable.startup
        kafkaServerStartable.awaitShutdown
      }
      catch {
        case e: Throwable =>
          fatal(e)
          System.exit(1)
      }
      System.exit(0)
    }
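The code above does the following:
Reads the Kafka server startup parameters from the configuration file via getPropsFromArgs();
Creates the KafkaServerStartable object;
Registers a JVM shutdown hook that calls shutdown on the KafkaServerStartable object;
Calls the startup() method of KafkaServerStartable;
Calls the awaitShutdown() method of KafkaServerStartable, which blocks until the server has shut down;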
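The startup / shutdown-hook / await pattern in the steps above can be sketched in isolation as follows. This is a simplified stand-in, not the real KafkaServerStartable: `Startable` and `runUntilShutdown` are hypothetical names, and the "server" here only flips a flag.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean

object StartableSketch {
  // Hypothetical stand-in for KafkaServerStartable.
  final class Startable {
    private val shutdownLatch = new CountDownLatch(1)
    private val running = new AtomicBoolean(false)

    def startup(): Unit = running.set(true)

    // Idempotent: only the first call after startup releases the latch.
    def shutdown(): Unit =
      if (running.compareAndSet(true, false)) shutdownLatch.countDown()

    // Blocks the calling thread until shutdown() has run.
    def awaitShutdown(): Unit = shutdownLatch.await()

    def isRunning: Boolean = running.get()
  }

  // Mirrors the shape of main(): register the hook, start, then block.
  def runUntilShutdown(server: Startable): Unit = {
    Runtime.getRuntime.addShutdownHook(new Thread {
      override def run(): Unit = server.shutdown()
    })
    server.startup()
    server.awaitShutdown()
  }
}
```

Because the hook runs on Ctrl-C or SIGTERM, the thread blocked in awaitShutdown() is released only after shutdown() has completed its work.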
2.2 KafkaServerStartable, the Wrapper Class for KafkaServer
    private val server = new KafkaServer(serverConfig)

    def startup() {
      try {
        server.startup()
      }
      catch {
        case e: Throwable =>
          fatal("Fatal error during KafkaServerStartable startup. Prepare to shutdown", e)
          // KafkaServer already calls shutdown() internally, so this is purely for logging & the exit code
          System.exit(1)
      }
    }
2.3 KafkaServer, the Class That Performs the Startup
The startup code in KafkaServer is clearly layered and, together with its comments, is easy to follow:
    /**
     * Start up API for bringing up a single instance of the Kafka server.
     * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
     */
    def startup() {
      try {
        info("starting")

        if(isShuttingDown.get)
          throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")

        if(startupComplete.get)
          return

        val canStartup = isStartingUp.compareAndSet(false, true)
        if (canStartup) {
          metrics = new Metrics(metricConfig, reporters, kafkaMetricsTime, true)

          brokerState.newState(Starting)

          /* start scheduler */
          kafkaScheduler.startup()

          /* setup zookeeper */
          zkUtils = initZk()

          /* start log manager */
          logManager = createLogManager(zkUtils.zkClient, brokerState)
          logManager.startup()

          /* generate brokerId */
          config.brokerId = getBrokerId
          this.logIdent = "[Kafka Server " + config.brokerId + "], "

          socketServer = new SocketServer(config, metrics, kafkaMetricsTime)
          socketServer.startup()

          /* start replica manager */
          replicaManager = new ReplicaManager(config, metrics, time, kafkaMetricsTime, zkUtils, kafkaScheduler,
            logManager, isShuttingDown)
          replicaManager.startup()

          /* start kafka controller */
          kafkaController = new KafkaController(config, zkUtils, brokerState, kafkaMetricsTime, metrics, threadNamePrefix)
          kafkaController.startup()

          /* start kafka coordinator */
          consumerCoordinator = GroupCoordinator.create(config, zkUtils, replicaManager)
          consumerCoordinator.startup()

          /* Get the authorizer and initialize it if one is specified.*/
          authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>
            val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
            authZ.configure(config.originals())
            authZ
          }

          /* start processing requests */
          apis = new KafkaApis(socketServer.requestChannel, replicaManager, consumerCoordinator,
            kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer)
          requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis,
            config.numIoThreads)
          brokerState.newState(RunningAsBroker)

          Mx4jLoader.maybeLoad()

          /* start dynamic config manager */
          dynamicConfigHandlers = Map[String, ConfigHandler](
            ConfigType.Topic -> new TopicConfigHandler(logManager),
            ConfigType.Client -> new ClientIdConfigHandler(apis.quotaManagers))

          // Apply all existing client configs to the ClientIdConfigHandler to bootstrap the overrides
          // TODO: Move this logic to DynamicConfigManager
          AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.Client).foreach {
            case (clientId, properties) => dynamicConfigHandlers(ConfigType.Client).processConfigChanges(clientId, properties)
          }

          // Create the config manager. start listening to notifications
          dynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers)
          dynamicConfigManager.startup()

          /* tell everyone we are alive */
          val listeners = config.advertisedListeners.map {case(protocol, endpoint) =>
            if (endpoint.port == 0)
              (protocol, EndPoint(endpoint.host, socketServer.boundPort(protocol), endpoint.protocolType))
            else
              (protocol, endpoint)
          }
          kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils)
          kafkaHealthcheck.startup()

          /* register broker metrics */
          registerStats()

          shutdownLatch = new CountDownLatch(1)
          startupComplete.set(true)
          isStartingUp.set(false)
          AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString)
          info("started")
        }
      }
      catch {
        case e: Throwable =>
          fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
          isStartingUp.set(false)
          shutdown()
          throw e
      }
    }
2.3.1 KafkaScheduler
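KafkaScheduler is a scheduler based on java.util.concurrent.ScheduledThreadPoolExecutor. Internally, a pool of threads named with the prefix kafka-scheduler- (that is, kafka-scheduler-xx) does the actual work.
Note that xx is the thread's sequence number.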
    /**
     * A scheduler based on java.util.concurrent.ScheduledThreadPoolExecutor
     *
     * It has a pool of kafka-scheduler- threads that do the actual work.
     *
     * @param threads The number of threads in the thread pool
     * @param threadNamePrefix The name to use for scheduler threads. This prefix will have a number appended to it.
     * @param daemon If true the scheduler threads will be "daemon" threads and will not block jvm shutdown.
     */
    @threadsafe
    class KafkaScheduler(val threads: Int,
                         val threadNamePrefix: String = "kafka-scheduler-",
                         daemon: Boolean = true) extends Scheduler with Logging {
      private var executor: ScheduledThreadPoolExecutor = null
      private val schedulerThreadId = new AtomicInteger(0)

      override def startup() {
        debug("Initializing task scheduler.")
        this synchronized {
          if(isStarted)
            throw new IllegalStateException("This scheduler has already been started!")
          executor = new ScheduledThreadPoolExecutor(threads)
          executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
          executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
          executor.setThreadFactory(new ThreadFactory() {
            def newThread(runnable: Runnable): Thread =
              Utils.newThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon)
          })
        }
      }
      // ... (remaining members omitted in this excerpt)
    }
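The thread-naming behaviour described above can be reproduced with the JDK alone. The following is a minimal sketch, not Kafka's class: `SchedulerSketch`, `newExecutor`, and `runPeriodic` are hypothetical names, and only the executor setup mirrors the excerpt.

```scala
import java.util.concurrent.{CountDownLatch, ScheduledThreadPoolExecutor, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}

object SchedulerSketch {
  // Build an executor whose daemon threads are named prefix + sequence number.
  def newExecutor(threads: Int, prefix: String): ScheduledThreadPoolExecutor = {
    val nextId = new AtomicInteger(0)
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, prefix + nextId.getAndIncrement())
        t.setDaemon(true)
        t
      }
    }
    val executor = new ScheduledThreadPoolExecutor(threads, factory)
    executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
    executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
    executor
  }

  // Schedule one periodic task, wait for `runs` executions, return the worker thread's name.
  def runPeriodic(runs: Int, periodMs: Long): String = {
    val executor = newExecutor(1, "kafka-scheduler-")
    val latch = new CountDownLatch(runs)
    val threadName = new AtomicReference[String]("")
    val task = new Runnable {
      override def run(): Unit = {
        threadName.set(Thread.currentThread().getName)
        latch.countDown()
      }
    }
    executor.scheduleAtFixedRate(task, 0L, periodMs, TimeUnit.MILLISECONDS)
    latch.await(5, TimeUnit.SECONDS)
    executor.shutdownNow()
    threadName.get()
  }
}
```

With a single-thread pool the first (and only) worker gets sequence number 0, which is the "xx" suffix mentioned above.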
2.3.2 ZooKeeper Initialization
ZooKeeper initialization does two main things:
    val zkUtils = ZkUtils(config.zkConnect,
                          config.zkSessionTimeoutMs,
                          config.zkConnectionTimeoutMs,
                          secureAclsEnabled)
    zkUtils.setupCommonPaths()
First, it connects to the ZooKeeper server; second, it creates the common persistent paths.
The common paths are:
    // These are persistent ZK paths that should exist on kafka broker startup.
    val persistentZkPaths = Seq(ConsumersPath,
                                BrokerIdsPath,
                                BrokerTopicsPath,
                                EntityConfigChangesPath,
                                getEntityConfigRootPath(ConfigType.Topic),
                                getEntityConfigRootPath(ConfigType.Client),
                                DeleteTopicsPath,
                                BrokerSequenceIdPath,
                                IsrChangeNotificationPath)
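2.3.3 Log Manager: LogManager
LogManager is a Kafka subsystem responsible for creating, retrieving, and cleaning up logs. All read and write operations are delegated to the individual log instances.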
    /**
     * Start the background threads to flush logs and do log cleanup
     */
    def startup() {
      /* Schedule the cleanup task to delete old logs */
      if(scheduler != null) {
        info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
        scheduler.schedule("kafka-log-retention",
                           cleanupLogs,
                           delay = InitialTaskDelayMs,
                           period = retentionCheckMs,
                           TimeUnit.MILLISECONDS)
        info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
        scheduler.schedule("kafka-log-flusher",
                           flushDirtyLogs,
                           delay = InitialTaskDelayMs,
                           period = flushCheckMs,
                           TimeUnit.MILLISECONDS)
        scheduler.schedule("kafka-recovery-point-checkpoint",
                           checkpointRecoveryPointOffsets,
                           delay = InitialTaskDelayMs,
                           period = flushCheckpointMs,
                           TimeUnit.MILLISECONDS)
      }
      if(cleanerConfig.enableCleaner)
        cleaner.startup()
    }
2.3.4 SocketServer
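SocketServer is an NIO socket server. Its threading model is: 1 Acceptor thread that accepts new connections; N Processor threads per Acceptor, each with its own selector, that read requests from the sockets; and M Handler threads that process the requests and hand the responses back to the Processor threads for writing.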
    /**
     * Start the socket server
     */
    def startup() {
      this.synchronized {

        connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)

        val sendBufferSize = config.socketSendBufferBytes
        val recvBufferSize = config.socketReceiveBufferBytes
        val maxRequestSize = config.socketRequestMaxBytes
        val connectionsMaxIdleMs = config.connectionsMaxIdleMs
        val brokerId = config.brokerId

        var processorBeginIndex = 0
        endpoints.values.foreach { endpoint =>
          val protocol = endpoint.protocolType
          val processorEndIndex = processorBeginIndex + numProcessorThreads

          for (i <- processorBeginIndex until processorEndIndex) {
            processors(i) = new Processor(i,
              time,
              maxRequestSize,
              requestChannel,
              connectionQuotas,
              connectionsMaxIdleMs,
              protocol,
              config.values,
              metrics
            )
          }

          val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
            processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
          acceptors.put(endpoint, acceptor)
          Utils.newThread("kafka-socket-acceptor-%s-%d".format(protocol.toString, endpoint.port), acceptor, false).start()
          acceptor.awaitStartup()

          processorBeginIndex = processorEndIndex
        }
      }

      newGauge("NetworkProcessorAvgIdlePercent",
        new Gauge[Double] {
          def value = allMetricNames.map( metricName =>
            metrics.metrics().get(metricName).value()).sum / totalProcessorThreads
        }
      )

      info("Started " + acceptors.size + " acceptor threads")
    }
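The hand-off between Processor threads and Handler threads can be illustrated without any real sockets. The sketch below is an assumption-laden model, not Kafka's RequestChannel: `Request`, `Response`, `Channel`, and `process` are hypothetical names, a shared queue stands in for the request channel, and "handling" a request just upper-cases its payload.

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}

object RequestChannelSketch {
  final case class Request(processorId: Int, payload: String)
  final case class Response(processorId: Int, payload: String)

  // One shared request queue plus one response queue per processor.
  final class Channel(numProcessors: Int) {
    val requests = new LinkedBlockingQueue[Request]()
    val responses: Vector[LinkedBlockingQueue[Response]] =
      Vector.fill(numProcessors)(new LinkedBlockingQueue[Response]())
  }

  // Start numHandlers handler threads, feed them the inputs, and collect the responses.
  def process(inputs: Seq[Request], numProcessors: Int, numHandlers: Int): Seq[Response] = {
    val channel = new Channel(numProcessors)
    val pool = Executors.newFixedThreadPool(numHandlers)
    (1 to numHandlers).foreach { _ =>
      pool.execute(new Runnable {
        override def run(): Unit =
          try {
            while (true) {
              val req = channel.requests.take()
              // Route the response back to the processor that owns the connection.
              channel.responses(req.processorId).put(Response(req.processorId, req.payload.toUpperCase))
            }
          } catch { case _: InterruptedException => () } // exit on shutdownNow()
      })
    }
    inputs.foreach(r => channel.requests.put(r))
    val out = inputs.map(r => channel.responses(r.processorId).poll(5, TimeUnit.SECONDS))
    pool.shutdownNow()
    out
  }
}
```

Since several handlers drain the same queue, responses for one processor may arrive in a different order than the requests were enqueued; the sketch makes no ordering guarantee.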
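2.3.5 Replica Manager: ReplicaManager
Startup schedules the ISR expiration task, along with a task that propagates ISR changes: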
    def startup() {
      // start ISR expiration thread
      scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs, unit = TimeUnit.MILLISECONDS)
      scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges, period = 2500L, unit = TimeUnit.MILLISECONDS)
    }
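2.3.6 Kafka Controller
This is invoked when the controller module of a Kafka server starts up. It does not assume that the current broker is the controller; it merely registers the session expiration listener and starts the controller leader election.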
    def startup() = {
      inLock(controllerContext.controllerLock) {
        info("Controller starting up")
        registerSessionExpirationListener()
        isRunning = true
        controllerElector.startup
        info("Controller startup complete")
      }
    }
Registering the session expiration listener:
    // Scala: KafkaController registers the listener
    private def registerSessionExpirationListener() = {
      zkUtils.zkClient.subscribeStateChanges(new SessionExpirationListener())
    }

    // Java: the ZkClient method called above
    public void subscribeStateChanges(final IZkStateListener listener) {
        synchronized (_stateListener) {
            _stateListener.add(listener);
        }
    }

    // Scala: the listener itself
    class SessionExpirationListener() extends IZkStateListener with Logging {
      this.logIdent = "[SessionExpirationListener on " + config.brokerId + "], "
      @throws(classOf[Exception])
      def handleStateChanged(state: KeeperState) {
        // do nothing, since zkclient will do reconnect for us.
      }
      // ... (rest of the listener omitted in this excerpt)
    }
The election process:
    def startup {
      inLock(controllerContext.controllerLock) {
        controllerContext.zkUtils.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
        elect
      }
    }

    def elect: Boolean = {
      val timestamp = SystemTime.milliseconds.toString
      val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp))

      leaderId = getControllerID
      /*
       * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition,
       * it's possible that the controller has already been elected when we get here. This check will prevent the following
       * createEphemeralPath method from getting into an infinite loop if this broker is already the controller.
       */
      if(leaderId != -1) {
        debug("Broker %d has been elected as leader, so stopping the election process.".format(leaderId))
        return amILeader
      }

      try {
        val zkCheckedEphemeral = new ZKCheckedEphemeral(electionPath,
                                                        electString,
                                                        controllerContext.zkUtils.zkConnection.getZookeeper,
                                                        JaasUtils.isZkSecurityEnabled())
        zkCheckedEphemeral.create()
        info(brokerId + " successfully elected as leader")
        leaderId = brokerId
        onBecomingLeader()
      } catch {
        case e: ZkNodeExistsException =>
          // If someone else has written the path, then
          leaderId = getControllerID

          if (leaderId != -1)
            debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
          else
            warn("A leader has been elected but just resigned, this will result in another round of election")

        case e2: Throwable =>
          error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
          resign()
      }
      amILeader
    }

    def amILeader : Boolean = leaderId == brokerId
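The core idea of the election above is "whoever creates the node first wins". A minimal in-memory sketch of that idea follows. It is an assumption-based model, not the real elector: `FakeZk` is a hypothetical stand-in for ZooKeeper backed by a ConcurrentHashMap, `Elector` is a hypothetical class, and there is no session handling or change listener.

```scala
import java.util.concurrent.ConcurrentHashMap

object ElectionSketch {
  // In-memory stand-in for ZooKeeper: path -> id of the broker that created the node.
  final class FakeZk {
    private val nodes = new ConcurrentHashMap[String, Integer]()

    // True only for the first creator, like creating a node that must not already exist.
    def createIfAbsent(path: String, brokerId: Int): Boolean =
      nodes.putIfAbsent(path, Integer.valueOf(brokerId)) == null

    // Returns -1 when no controller node exists.
    def read(path: String): Int = Option(nodes.get(path)).map(_.intValue).getOrElse(-1)

    def delete(path: String): Unit = { nodes.remove(path); () }
  }

  final class Elector(zk: FakeZk, path: String, brokerId: Int) {
    @volatile private var leaderId = -1

    def amILeader: Boolean = leaderId == brokerId

    def elect(): Boolean = {
      leaderId = zk.read(path)
      if (leaderId == -1) {
        // Nobody holds the node yet: try to create it, otherwise read who won the race.
        if (zk.createIfAbsent(path, brokerId)) leaderId = brokerId
        else leaderId = zk.read(path)
      }
      amILeader
    }
  }
}
```

In the real code the node is ephemeral and a data-change listener triggers re-election when it disappears; in this sketch the caller has to invoke elect() again after delete().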
2.3.7 GroupCoordinator
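GroupCoordinator handles group membership and offset management. Each Kafka server instantiates one coordinator, which is responsible for a set of groups. Groups are assigned to coordinators based on their group names.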
    def startup() {
      info("Starting up.")
      heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", brokerId)
      joinPurgatory = new DelayedOperationPurgatory[DelayedJoin]("Rebalance", brokerId)
      isActive.set(true)
      info("Startup complete.")
    }
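Note: if both a group lock and the metadata lock are needed, always acquire the group lock first and the metadata lock second to avoid deadlock.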
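The lock-ordering rule in the note can be sketched as a single helper that every caller goes through. This is an illustration only: `LockOrderSketch`, `withBothLocks`, and the two ReentrantLock fields are hypothetical and do not correspond to GroupCoordinator's actual fields.

```scala
import java.util.concurrent.locks.ReentrantLock

object LockOrderSketch {
  private val groupLock = new ReentrantLock()
  private val metadataLock = new ReentrantLock()

  // Every caller that needs both locks takes them in the same order:
  // group lock first, metadata lock second; release happens in reverse order.
  def withBothLocks[T](body: => T): T = {
    groupLock.lock()
    try {
      metadataLock.lock()
      try body finally metadataLock.unlock()
    } finally groupLock.unlock()
  }

  // Reports (groupLockHeld, metadataLockHeld) for the calling thread.
  def heldByCurrentThread: (Boolean, Boolean) =
    (groupLock.isHeldByCurrentThread, metadataLock.isHeldByCurrentThread)
}
```

Deadlock needs two threads that take the same pair of locks in opposite orders; funnelling all double acquisitions through one fixed order rules that out.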
2.3.8 KafkaApis, the Request-Handling Interface
    /**
     * Top-level method that handles all requests and multiplexes to the right api
     */
    def handle(request: RequestChannel.Request) {
      try{
        trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
          format(request.requestObj, request.connectionId, request.securityProtocol, request.session.principal))
        request.requestId match {
          case RequestKeys.ProduceKey => handleProducerRequest(request)
          case RequestKeys.FetchKey => handleFetchRequest(request)
          case RequestKeys.OffsetsKey => handleOffsetRequest(request)
          case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
          case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
          case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
          case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
          case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
          case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
          case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
          case RequestKeys.GroupCoordinatorKey => handleGroupCoordinatorRequest(request)
          case RequestKeys.JoinGroupKey => handleJoinGroupRequest(request)
          case RequestKeys.HeartbeatKey => handleHeartbeatRequest(request)
          case RequestKeys.LeaveGroupKey => handleLeaveGroupRequest(request)
          case RequestKeys.SyncGroupKey => handleSyncGroupRequest(request)
          case RequestKeys.DescribeGroupsKey => handleDescribeGroupRequest(request)
          case RequestKeys.ListGroupsKey => handleListGroupsRequest(request)
          case requestId => throw new KafkaException("Unknown api code " + requestId)
        }
      } catch {
        case e: Throwable =>
          if ( request.requestObj != null)
            request.requestObj.handleError(e, requestChannel, request)
          else {
            val response = request.body.getErrorResponse(request.header.apiVersion, e)
            val respHeader = new ResponseHeader(request.header.correlationId)

            /* If request doesn't have a default error response, we just close the connection.
               For example, when produce request has acks set to 0 */
            if (response == null)
              requestChannel.closeConnection(request.processor, request)
            else
              requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, response)))
          }
          error("error when handling request %s".format(request.requestObj), e)
      } finally
        request.apiLocalCompleteTimeMs = SystemTime.milliseconds
    }
Let's take the handling of a produce request (sent by a producer) as an example:
    /**
     * Handle a produce request
     */
    def handleProducerRequest(request: RequestChannel.Request) {
      val produceRequest = request.requestObj.asInstanceOf[ProducerRequest]
      val numBytesAppended = produceRequest.sizeInBytes

      val (authorizedRequestInfo, unauthorizedRequestInfo) = produceRequest.data.partition {
        case (topicAndPartition, _) => authorize(request.session, Write, new Resource(Topic, topicAndPartition.topic))
      }

      // the callback for sending a produce response
      def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {

        val mergedResponseStatus = responseStatus ++
          unauthorizedRequestInfo.mapValues(_ => ProducerResponseStatus(ErrorMapping.TopicAuthorizationCode, -1))

        var errorInResponse = false

        mergedResponseStatus.foreach { case (topicAndPartition, status) =>
          if (status.error != ErrorMapping.NoError) {
            errorInResponse = true
            debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
              produceRequest.correlationId,
              produceRequest.clientId,
              topicAndPartition,
              ErrorMapping.exceptionNameFor(status.error)))
          }
        }

        def produceResponseCallback(delayTimeMs: Int) {

          if (produceRequest.requiredAcks == 0) {
            // no operation needed if producer request.required.acks = 0; however, if there is any error in handling
            // the request, since no response is expected by the producer, the server will close socket server so that
            // the producer client will know that some error has happened and will refresh its metadata
            if (errorInResponse) {
              val exceptionsSummary = mergedResponseStatus.map { case (topicAndPartition, status) =>
                topicAndPartition -> ErrorMapping.exceptionNameFor(status.error)
              }.mkString(", ")
              info(
                s"Closing connection due to error during produce request with correlation id ${produceRequest.correlationId} " +
                  s"from client id ${produceRequest.clientId} with ack=0\n" +
                  s"Topic and partition to exceptions: $exceptionsSummary"
              )
              requestChannel.closeConnection(request.processor, request)
            } else {
              requestChannel.noOperation(request.processor, request)
            }
          } else {
            val response = ProducerResponse(produceRequest.correlationId,
                                            mergedResponseStatus,
                                            produceRequest.versionId,
                                            delayTimeMs)
            requestChannel.sendResponse(new RequestChannel.Response(request,
                                                                    new RequestOrResponseSend(request.connectionId,
                                                                                              response)))
          }
        }

        // When this callback is triggered, the remote API call has completed
        request.apiRemoteCompleteTimeMs = SystemTime.milliseconds

        quotaManagers(RequestKeys.ProduceKey).recordAndMaybeThrottle(produceRequest.clientId,
                                                                     numBytesAppended,
                                                                     produceResponseCallback)
      }

      if (authorizedRequestInfo.isEmpty)
        sendResponseCallback(Map.empty)
      else {
        val internalTopicsAllowed = produceRequest.clientId == AdminUtils.AdminClientId

        // call the replica manager to append messages to the replicas
        replicaManager.appendMessages(
          produceRequest.ackTimeoutMs.toLong,
          produceRequest.requiredAcks,
          internalTopicsAllowed,
          authorizedRequestInfo,
          sendResponseCallback)

        // if the request is put into the purgatory, it will have a held reference
        // and hence cannot be garbage collected; hence we clear its data here in
        // order to let GC re-claim its memory since it is already appended to log
        produceRequest.emptyData()
      }
    }
This corresponds to the acks setting of the Kafka producer:
The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are common:
acks=0: If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries configuration will not take effect (as the client won't generally know of any failures). The offset given back for each record will always be set to -1.
acks=1: This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost.
acks=all: This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee.
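On the client side, the acks level is just a producer property. The following is a minimal sketch using only java.util.Properties: `AcksConfigSketch` and `producerProps` are hypothetical helpers, the broker address in the usage note is a placeholder, and no producer is actually created.

```scala
import java.util.Properties

object AcksConfigSketch {
  // Values described in the text above; "-1" is accepted as an alias of "all".
  private val allowed = Set("0", "1", "all", "-1")

  // Build producer properties for the given acks level.
  def producerProps(bootstrapServers: String, acks: String): Properties = {
    require(allowed.contains(acks), s"unsupported acks value: $acks")
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    props.setProperty("acks", acks)
    props
  }
}
```

For example, `AcksConfigSketch.producerProps("localhost:9092", "all")` yields the properties one would pass to a producer that should wait for the full in-sync replica set.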
2.3.9 Dynamic Configuration Management: DynamicConfigManager
ZooKeeper serves as the dynamic configuration center:
    /**
     * Begin watching for config changes
     */
    def startup() {
      zkUtils.makeSurePersistentPathExists(ZkUtils.EntityConfigChangesPath)
      zkUtils.zkClient.subscribeChildChanges(ZkUtils.EntityConfigChangesPath, ConfigChangeListener)
      processAllConfigChanges()
    }

    /**
     * Process all config changes
     */
    private def processAllConfigChanges() {
      val configChanges = zkUtils.zkClient.getChildren(ZkUtils.EntityConfigChangesPath)
      import JavaConversions._
      processConfigChanges((configChanges: mutable.Buffer[String]).sorted)
    }

    /**
     * Process the given list of config changes
     */
    private def processConfigChanges(notifications: Seq[String]) {
      if (notifications.size > 0) {
        info("Processing config change notification(s)...")
        val now = time.milliseconds
        for (notification <- notifications) {
          val changeId = changeNumber(notification)

          if (changeId > lastExecutedChange) {
            val changeZnode = ZkUtils.EntityConfigChangesPath + "/" + notification

            val (jsonOpt, stat) = zkUtils.readDataMaybeNull(changeZnode)
            processNotification(jsonOpt)
          }
          lastExecutedChange = changeId
        }
        purgeObsoleteNotifications(now, notifications)
      }
    }
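The "skip what was already applied" logic in processConfigChanges can be sketched as a pure function. This is a simplified model under stated assumptions: the `config_change_` prefix and the `pending` helper are assumptions for illustration, and unlike the excerpt the sketch never lets the high-water mark move backwards.

```scala
object ConfigChangeSketch {
  // Assumed notification naming: prefix followed by a zero-padded sequence number.
  val Prefix = "config_change_"

  def changeNumber(name: String): Long = name.stripPrefix(Prefix).toLong

  // Returns the notifications that still need processing (in sequence order)
  // and the new value of the last executed change id.
  def pending(notifications: Seq[String], lastExecuted: Long): (Seq[String], Long) = {
    val sorted = notifications.sortBy(changeNumber)
    val toProcess = sorted.filter(n => changeNumber(n) > lastExecuted)
    val newLast = sorted.lastOption.map(changeNumber).fold(lastExecuted)(math.max(_, lastExecuted))
    (toProcess, newLast)
  }
}
```

Sorting by sequence number before filtering is what lets a single counter decide which notifications are new.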
2.3.10 Heartbeat Check: KafkaHealthcheck
Broker liveness is also maintained through ZooKeeper:
    def startup() {
      zkUtils.zkClient.subscribeStateChanges(sessionExpireListener)
      register()
    }

    /**
     * Register this broker as "alive" in zookeeper
     */
    def register() {
      val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt
      val updatedEndpoints = advertisedEndpoints.mapValues(endpoint =>
        if (endpoint.host == null || endpoint.host.trim.isEmpty)
          EndPoint(InetAddress.getLocalHost.getCanonicalHostName, endpoint.port, endpoint.protocolType)
        else
          endpoint
      )

      // the default host and port are here for compatibility with older client
      // only PLAINTEXT is supported as default
      // if the broker doesn't listen on PLAINTEXT protocol, an empty endpoint will be registered and older clients will be unable to connect
      val plaintextEndpoint = updatedEndpoints.getOrElse(SecurityProtocol.PLAINTEXT, new EndPoint(null,-1,null))
      zkUtils.registerBrokerInZk(brokerId, plaintextEndpoint.host, plaintextEndpoint.port, updatedEndpoints, jmxPort)
    }
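3. Summary
The KafkaServer class uses the facade pattern and is the entry point for network handling, I/O handling, and so on. Its main components are:
ReplicaManager: replica management.
KafkaApis: the proxy class that handles all requests and dispatches each one to its specific handler based on the request key.
KafkaRequestHandlerPool: the thread pool that processes requests; its size is set by num.io.threads (the number of I/O threads).
LogManager: manages Kafka's file storage system; responsible for handling and storing the partition data of all Kafka topics.
TopicConfigManager: watches the children of the ZooKeeper node /config/changes/ and updates topic configuration through LogManager (topic-level configuration management; see the topic-level configs for details). In the startup code shown above, this role is played by DynamicConfigManager together with TopicConfigHandler.
KafkaHealthcheck: listens for ZooKeeper session expiry and registers the broker's information in ZooKeeper so that other brokers and consumers can obtain it.
KafkaController: election of the cluster's central controller, partition leader election, and replica assignment.
KafkaScheduler: schedules background tasks for replica management, log management, and so on.
ZkClient: registers ZooKeeper-related information.
BrokerTopicStats: topic statistics and monitoring.
ControllerStats: central controller statistics and monitoring.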