updateCache方法用來更新緩存的。api
CLASS_NAME | METHOD_NAME | LINE_NUM |
kafka/controller/KafkaController | sendUpdateMetadataRequest | 1043 |
kafka/controller/KafkaController | onControllerFailover | 288 |
kafka/controller/KafkaController | elect | 1658 |
kafka/controller/KafkaController$Startup$ | process | 1581 |
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 | apply$mcV$sp | 53 |
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 | apply | 53 |
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 | apply | 53 |
kafka/metrics/KafkaTimer | time | 32 |
kafka/controller/ControllerEventManager$ControllerEventThread | doWork | 64 |
kafka/utils/ShutdownableThread | run | 70 |
啓動的時候選舉,啓動這個動做也是個事件緩存
// KafkaController.scala case object Startup extends ControllerEvent { def state = ControllerState.ControllerChange override def process(): Unit = { registerSessionExpirationListener() registerControllerChangeListener() elect() } }
CLASS_NAME | METHOD_NAME | LINE_NUM |
kafka/controller/KafkaController | sendUpdateMetadataRequest | 1043 |
kafka/controller/KafkaController | onBrokerStartup | 387 |
kafka/controller/KafkaController$BrokerChange | process | 1208 |
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 | apply$mcV$sp | 53 |
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 | apply | 53 |
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 | apply | 53 |
kafka/metrics/KafkaTimer | time | 32 |
kafka/controller/ControllerEventManager$ControllerEventThread | doWork | 64 |
kafka/utils/ShutdownableThread | run | 70 |
CLASS_NAME | METHOD_NAME | LINE_NUM |
kafka/controller/KafkaController | sendUpdateMetadataRequest | 1043 |
kafka/controller/TopicDeletionManager | kafka$controller$TopicDeletionManager$$onTopicDeletion | 268 |
kafka/controller/TopicDeletionManager$$anonfun$resumeDeletions$2 | apply | 333 |
kafka/controller/TopicDeletionManager$$anonfun$resumeDeletions$2 | apply | 333 |
scala/collection/immutable/Set$Set1 | foreach | 94 |
kafka/controller/TopicDeletionManager | resumeDeletions | 333 |
kafka/controller/TopicDeletionManager | enqueueTopicsForDeletion | 110 |
kafka/controller/KafkaController$TopicDeletion | process | 1280 |
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 | apply$mcV$sp | 53 |
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 | apply | 53 |
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 | apply | 53 |
kafka/metrics/KafkaTimer | time | 32 |
kafka/controller/ControllerEventManager$ControllerEventThread | doWork | 64 |
kafka/utils/ShutdownableThread | run | 70 |
CLASS_NAME | METHOD_NAME | LINE_NUM |
kafka/controller/ControllerBrokerRequestBatch | updateMetadataRequestBrokerSet | 291 |
kafka/controller/ControllerBrokerRequestBatch | newBatch | 294 |
kafka/controller/PartitionStateMachine | handleStateChanges | 105 |
kafka/controller/KafkaController | onNewPartitionCreation | 499 |
kafka/controller/KafkaController | onNewTopicCreation | 485 |
kafka/controller/KafkaController$TopicChange | process | 1237 |
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 | apply$mcV$sp | 53 |
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 | apply | 53 |
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 | apply | 53 |
kafka/metrics/KafkaTimer | time | 32 |
kafka/controller/ControllerEventManager$ControllerEventThread | doWork | 64 |
kafka/utils/ShutdownableThread | run | 70 |
topic建立這個是從隊列中拿到事件再處理的方式
隊列是kafka.controller.ControllerEventManager.queue
放入過程以下,本質仍是監聽zk的path的child的變化:
安全
CLASS_NAME | METHOD_NAME | LINE_NUM |
kafka/controller/ControllerEventManager | put | 44 |
kafka/controller/TopicChangeListener | handleChildChange | 1712 |
org/I0Itec/zkclient/ZkClient$10 | run | 848 |
org/I0Itec/zkclient/ZkEventThread | run | 85 |
註冊監聽器的代碼以下:app
// class KafkaController private def registerTopicChangeListener() = { zkUtils.subscribeChildChanges(BrokerTopicsPath, topicChangeListener) }
順帶說一下有6個地方訂閱了zk的子節點的變化:ide
處理建立topic事件:ui
// ControllerChannelManager.scala class ControllerBrokerRequestBatch def sendRequestsToBrokers(controllerEpoch: Int) { // ....... val updateMetadataRequest = { val liveBrokers = if (updateMetadataRequestVersion == 0) { // ....... } else { controllerContext.liveOrShuttingDownBrokers.map { broker => val endPoints = broker.endPoints.map { endPoint => new UpdateMetadataRequest.EndPoint(endPoint.host, endPoint.port, endPoint.securityProtocol, endPoint.listenerName) } new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull) } } new UpdateMetadataRequest.Builder(updateMetadataRequestVersion, controllerId, controllerEpoch, partitionStates.asJava, liveBrokers.asJava) } updateMetadataRequestBrokerSet.foreach { broker => controller.sendRequest(broker, ApiKeys.UPDATE_METADATA, updateMetadataRequest, null) } // ....... }
topic建立時更新metadata再進一步的過程
構建發送請求事件放入發送隊列等待發送線程發送
構建發送請求事件代碼以下:this
// ControllerChannelManager def sendRequest(brokerId: Int, apiKey: ApiKeys, request: AbstractRequest.Builder[_ <: AbstractRequest], callback: AbstractResponse => Unit = null) { brokerLock synchronized { val stateInfoOpt = brokerStateInfo.get(brokerId) stateInfoOpt match { case Some(stateInfo) => stateInfo.messageQueue.put(QueueItem(apiKey, request, callback)) case None => warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId)) } } }
調用棧:
線程
CLASS_NAME | METHOD_NAME | LINE_NUM |
kafka/controller/ControllerChannelManager | sendRequest | 81 |
kafka/controller/KafkaController | sendRequest | 662 |
kafka/controller/ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2 | apply | 405 |
kafka/controller/ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2 | apply | 405 |
scala/collection/mutable/HashMap$$anonfun$foreach$1 | apply | 130 |
scala/collection/mutable/HashMap$$anonfun$foreach$1 | apply | 130 |
scala/collection/mutable/HashTable$class | foreachEntry | 241 |
scala/collection/mutable/HashMap | foreachEntry | 40 |
scala/collection/mutable/HashMap | foreach | 130 |
kafka/controller/ControllerBrokerRequestBatch | sendRequestsToBrokers | 502 |
kafka/controller/PartitionStateMachine | handleStateChanges | 105 |
kafka/controller/KafkaController | onNewPartitionCreation | 499 |
kafka/controller/KafkaController | onNewTopicCreation | 485 |
kafka/controller/KafkaController$TopicChange | process | 1237 |
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 | apply$mcV$sp | 53 |
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 | apply | 53 |
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 | apply | 53 |
kafka/metrics/KafkaTimer | time | 32 |
kafka/controller/ControllerEventManager$ControllerEventThread | doWork | 64 |
kafka/utils/ShutdownableThread | run | 70 |
發送線程發送請求:
代碼以下:scala
// ControllerChannelManager.scala class RequestSendThread override def doWork(): Unit = { def backoff(): Unit = CoreUtils.swallowTrace(Thread.sleep(100)) val QueueItem(apiKey, requestBuilder, callback) = queue.take() //... while (isRunning.get() && !isSendSuccessful) { // if a broker goes down for a long time, then at some point the controller's zookeeper listener will trigger a // removeBroker which will invoke shutdown() on this thread. At that point, we will stop retrying. try { if (!brokerReady()) { isSendSuccessful = false backoff() } else { val clientRequest = networkClient.newClientRequest(brokerNode.idString, requestBuilder, time.milliseconds(), true) clientResponse = NetworkClientUtils.sendAndReceive(networkClient, clientRequest, time) isSendSuccessful = true } } catch { case e: Throwable => // if the send was not successful, reconnect to broker and resend the message warn(("Controller %d epoch %d fails to send request %s to broker %s. " + "Reconnecting to broker.").format(controllerId, controllerContext.epoch, requestBuilder.toString, brokerNode.toString), e) networkClient.close(brokerNode.idString) isSendSuccessful = false backoff() } } // ...... }
CLASS_NAME | METHOD_NAME | LINE_NUM |
kafka/server/MetadataCache | kafka$server$MetadataCache$$addOrUpdatePartitionInfo | 150 |
kafka/utils/CoreUtils$ | inLock | 219 |
kafka/utils/CoreUtils$ | inWriteLock | 225 |
kafka/server/MetadataCache | updateCache | 184 |
kafka/server/ReplicaManager | maybeUpdateMetadataCache | 988 |
kafka/server/KafkaApis | handleUpdateMetadataRequest | 212 |
kafka/server/KafkaApis | handle | 142 |
kafka/server/KafkaRequestHandler | run | 72 |
線程信息: kafka-request-handler-5
靠 partitionMetadataLock
讀寫鎖控制cache數據的讀取與寫入的線程安全。元數據信息在發送請求中已經構造好了。此處還涉live broker的更新等。code
應該還要補充:leader切換和isr變化等