Kafka control plane: controller startup and election

KafkaServer startup

In its startup method, KafkaServer instantiates a KafkaController and starts it.

class KafkaServer {
  def startup() {
    // ...
    kafkaController = new KafkaController(config, zkUtils, time, metrics, threadNamePrefix)
    kafkaController.startup()
    // ...
  }
}

Key KafkaController fields

  • activeControllerId: the broker id of the active controller
  • eventManager: a ControllerEventManager instance that processes events

ControllerEventManager

ControllerEventManager creates one dedicated thread that processes the events submitted by KafkaController.

class ControllerEventManager(rateAndTimeMetrics: Map[ControllerState, KafkaTimer],
                             eventProcessedListener: ControllerEvent => Unit) {
  // eventProcessedListener is a callback invoked after each event has been processed

  @volatile private var _state: ControllerState = ControllerState.Idle

  // Event queue
  private val queue = new LinkedBlockingQueue[ControllerEvent]
  private val thread = new ControllerEventThread("controller-event-thread")

  def state: ControllerState = _state

  def start(): Unit = thread.start()

  def close(): Unit = thread.shutdown()
  // KafkaController calls put to enqueue an event
  def put(event: ControllerEvent): Unit = queue.put(event)

  class ControllerEventThread(name: String) extends ShutdownableThread(name = name) {
    override def doWork(): Unit = {
      // Take the next event from the queue (blocks while the queue is empty)
      val controllerEvent = queue.take()
      // Set the manager state to the state declared by the event
      _state = controllerEvent.state

      try {
        // Each event carries its own process method, which defines how it is handled
        rateAndTimeMetrics(state).time {
          controllerEvent.process()
        }
      } catch {
        case e: Throwable => error(s"Error processing event $controllerEvent", e)
      }
      // Invoke the callback
      try eventProcessedListener(controllerEvent)
      catch {
        case e: Throwable => error(s"Error while invoking listener for processed event $controllerEvent", e)
      }
      // Reset the state to Idle
      _state = ControllerState.Idle
    }
  }

}

Event base type

state is the controller state while the event is being processed.

process contains the logic that handles the event.

sealed trait ControllerEvent {
  def state: ControllerState
  def process(): Unit
}
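The queue-plus-single-thread pattern described above can be tried in isolation. The following is a minimal sketch, not Kafka's implementation: Event, EventLoop and demo are hypothetical names, and a timed poll replaces ShutdownableThread so the example can stop without Kafka's utilities.

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
import scala.collection.mutable.ArrayBuffer

object EventLoopSketch {
  // Simplified stand-in for ControllerEvent: a name plus the handling logic.
  trait Event {
    def name: String
    def process(): Unit
  }

  // Simplified stand-in for ControllerEventManager: one queue, one worker thread.
  final class EventLoop {
    private val queue = new LinkedBlockingQueue[Event]
    @volatile private var running = true

    private val worker = new Thread(new Runnable {
      def run(): Unit = {
        while (running) {
          // Timed poll (an assumption of this sketch) so the loop can notice shutdown.
          val event = queue.poll(50, TimeUnit.MILLISECONDS)
          if (event != null) {
            // A failing event is logged and does not kill the thread.
            try event.process()
            catch { case e: Exception => println(s"Error processing ${event.name}: $e") }
          }
        }
      }
    }, "event-loop-sketch")

    def put(event: Event): Unit = queue.put(event)
    def start(): Unit = worker.start()
    def close(): Unit = { running = false; worker.join() }
  }

  // Enqueues one event before the thread starts and two after,
  // then returns the order in which they were processed.
  def demo(): List[String] = {
    val processed = ArrayBuffer.empty[String]
    val done = new CountDownLatch(3)
    def event(n: String): Event = new Event {
      val name: String = n
      def process(): Unit = { processed += n; done.countDown() }
    }
    val loop = new EventLoop
    loop.put(event("Startup"))
    loop.start()
    loop.put(event("Reelect"))
    loop.put(event("BrokerChange"))
    done.await(5, TimeUnit.SECONDS)
    loop.close()
    processed.toList
  }
}
```

Because every event goes through one queue and one thread, handlers never run concurrently with each other, which is why the event code in this article can update controller state without locks.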

ControllerState

ControllerState currently has 10 values.

object ControllerState {

  case object Idle extends ControllerState {
    def value = 0
    override protected def hasRateAndTimeMetric: Boolean = false
  }

  case object ControllerChange extends ControllerState {
    def value = 1
  }

  case object BrokerChange extends ControllerState {
    def value = 2
    override def rateAndTimeMetricName = Some("LeaderElectionRateAndTimeMs")
  }

  case object TopicChange extends ControllerState {
    def value = 3
  }

  case object TopicDeletion extends ControllerState {
    def value = 4
  }

  case object PartitionReassignment extends ControllerState {
    def value = 5
  }

  case object AutoLeaderBalance extends ControllerState {
    def value = 6
  }

  case object ManualLeaderBalance extends ControllerState {
    def value = 7
  }

  case object ControlledShutdown extends ControllerState {
    def value = 8
  }

  case object IsrChange extends ControllerState {
    def value = 9
  }

}
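The excerpt above leaves out the base class that these case objects extend. As a rough sketch of how the overrides of hasRateAndTimeMetric and rateAndTimeMetricName could fit together, assume a base class that derives a default metric name from the state name. The State class, the Int type of value and the default naming rule below are assumptions of this sketch, not a copy of Kafka's source.

```scala
object ControllerStateSketch {
  // Assumed shape of the base class omitted from the excerpt.
  sealed abstract class State {
    def value: Int
    // Assumed default: states that have a metric are named "<StateName>RateAndTimeMs".
    def rateAndTimeMetricName: Option[String] =
      if (hasRateAndTimeMetric) Some(s"${toString}RateAndTimeMs") else None
    protected def hasRateAndTimeMetric: Boolean = true
  }

  // Idle opts out of metrics entirely.
  case object Idle extends State {
    def value: Int = 0
    override protected def hasRateAndTimeMetric: Boolean = false
  }

  // Uses the default metric name.
  case object ControllerChange extends State {
    def value: Int = 1
  }

  // Overrides the metric name explicitly, as in the excerpt.
  case object BrokerChange extends State {
    def value: Int = 2
    override def rateAndTimeMetricName: Option[String] = Some("LeaderElectionRateAndTimeMs")
  }

  val values: Seq[State] = Seq(Idle, ControllerChange, BrokerChange)
}
```

Under these assumptions, Idle reports no metric name, ControllerChange gets the derived name, and BrokerChange keeps its explicitly overridden one.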

Startup event

When KafkaController starts, it creates a Startup event and adds it to the eventManager.

class KafkaController {
  def startup() = {
    eventManager.put(Startup)
    eventManager.start()
  }
}

Definition of Startup

case object Startup extends ControllerEvent {

  def state = ControllerState.ControllerChange

  override def process(): Unit = {
    // Register a listener for ZooKeeper session state changes
    registerSessionExpirationListener()
    // Register a listener for data changes on the "/controller" node (it stores the id of the
    // active controller, so a data change means the active controller has changed)
    registerControllerChangeListener()
    // Run the election and try to become the active controller
    elect()
  }
}

  private def registerSessionExpirationListener() = {
    zkUtils.zkClient.subscribeStateChanges(new SessionExpirationListener(this, eventManager))
  }

  private def registerControllerChangeListener() = {
    zkUtils.zkClient.subscribeDataChanges(ZkUtils.ControllerPath, new ControllerChangeListener(this, eventManager))
  }

class SessionExpirationListener(controller: KafkaController, eventManager: ControllerEventManager) extends IZkStateListener with Logging {

  @throws[Exception]
  override def handleNewSession(): Unit = {
    // When a new ZooKeeper session is established, enqueue a Reelect event
    eventManager.put(controller.Reelect)
  }
}

Reelect event

Reelect triggers a new election for the active controller.

case object Reelect extends ControllerEvent {

    def state = ControllerState.ControllerChange

    override def process(): Unit = {
      val wasActiveBeforeChange = isActive
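      // Read the id of the current active controller
      activeControllerId = getControllerID()
      // This broker was the active controller before but is now a standby
      if (wasActiveBeforeChange && !isActive) {
        // Resign: stop listening for the ZooKeeper events registered as active controller
        onControllerResignation()
      }
      // Try to become the active controller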
      elect()
    }
  }

  def elect(): Unit = {
    val timestamp = time.milliseconds
    // Node data: brokerId plus timestamp
    val electString = ZkUtils.controllerZkData(config.brokerId, timestamp)

    activeControllerId = getControllerID()
    // -1 means no active controller has been elected yet; any other id means one already exists
    if (activeControllerId != -1) {
      debug("Broker %d has been elected as the controller, so stopping the election process.".format(activeControllerId))
      return
    }

    try {
      // Race to create the ephemeral node; if another broker created it first, ZkNodeExistsException is thrown
      val zkCheckedEphemeral = new ZKCheckedEphemeral(ZkUtils.ControllerPath,
                                                      electString,
                                                      controllerContext.zkUtils.zkConnection.getZookeeper,
                                                      controllerContext.zkUtils.isSecure)
      zkCheckedEphemeral.create()
      info(config.brokerId + " successfully elected as the controller")
      activeControllerId = config.brokerId
      // Register the listeners that only the active controller is responsible for
      onControllerFailover()
    } catch {
      case _: ZkNodeExistsException =>
        activeControllerId = getControllerID
        if (activeControllerId != -1)
          debug("Broker %d was elected as controller instead of broker %d".format(activeControllerId, config.brokerId))
        else
          warn("A controller has been elected but just resigned, this will result in another round of election")

      case e2: Throwable =>
        error("Error while electing or becoming controller on broker %d".format(config.brokerId), e2)
        triggerControllerMove()
    }
  }
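The election above relies on one property of ZooKeeper: creating the "/controller" ephemeral node is atomic, so exactly one broker succeeds. The sketch below imitates that with an in-memory map. FakeZk, Broker, resignations and demo are hypothetical names introduced for illustration, and the map's putIfAbsent stands in for the ephemeral-node creation; no real ZooKeeper API is used.

```scala
import java.util.concurrent.ConcurrentHashMap

object ElectionSketch {
  val ControllerPath = "/controller"

  // Hypothetical in-memory stand-in for ZooKeeper: path -> id of the owning broker.
  final class FakeZk {
    private val nodes = new ConcurrentHashMap[String, Integer]()
    // Atomic create-if-absent; returns true only for the caller that created the node.
    def createEphemeral(path: String, brokerId: Int): Boolean =
      nodes.putIfAbsent(path, Integer.valueOf(brokerId)) == null
    // Returns -1 when the node does not exist, mirroring getControllerID.
    def read(path: String): Int = Option(nodes.get(path)).map(_.intValue).getOrElse(-1)
    // Simulates the owner's session expiring, which removes its ephemeral node.
    def expire(path: String): Unit = nodes.remove(path)
  }

  final class Broker(val id: Int, zk: FakeZk) {
    var activeControllerId: Int = -1
    var resignations: Int = 0 // counts calls that would be onControllerResignation()
    def isActive: Boolean = activeControllerId == id

    def elect(): Unit = {
      activeControllerId = zk.read(ControllerPath)
      if (activeControllerId == -1) {
        // No controller yet: race to create the node.
        if (zk.createEphemeral(ControllerPath, id)) activeControllerId = id
        else activeControllerId = zk.read(ControllerPath) // another broker won
      }
    }

    def reelect(): Unit = {
      val wasActiveBeforeChange = isActive
      activeControllerId = zk.read(ControllerPath)
      if (wasActiveBeforeChange && !isActive) resignations += 1
      elect()
    }
  }

  // Returns (first winner, winner after expiry, resignations of broker 0).
  def demo(): (Int, Int, Int) = {
    val zk = new FakeZk
    val brokers = (0 until 3).map(id => new Broker(id, zk))
    brokers.foreach(_.elect())          // broker 0 runs first and wins
    val first = zk.read(ControllerPath)
    zk.expire(ControllerPath)           // the controller's session expires
    brokers.reverse.foreach(_.reelect()) // broker 2 now runs first and wins
    (first, zk.read(ControllerPath), brokers(0).resignations)
  }
}
```

In this run broker 0 wins the first round, broker 2 wins after the node disappears, and broker 0 records one resignation because it was active before the change and is not active afterwards.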

  def onControllerFailover() {
    info("Broker %d starting become controller state transition".format(config.brokerId))
    readControllerEpochFromZookeeper()
    incrementControllerEpoch()

    // Partition reassignment listener
    registerPartitionReassignmentListener()
    // ISR change notification listener
    registerIsrChangeNotificationListener()
    // Preferred replica election listener
    registerPreferredReplicaElectionListener()
    // Topic change listener
    registerTopicChangeListener()
    // Topic deletion listener
    registerTopicDeletionListener()
    // Broker change listener
    registerBrokerChangeListener()

    // Initialize the controller context
    initializeControllerContext()
    
    val (topicsToBeDeleted, topicsIneligibleForDeletion) = fetchTopicDeletionsInProgress()
    topicDeletionManager.init(topicsToBeDeleted, topicsIneligibleForDeletion)

    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
    // Start the replica state machine
    replicaStateMachine.startup()
    // Start the partition state machine
    partitionStateMachine.startup()

    // Register a partition modification listener for every topic
    controllerContext.allTopics.foreach(topic => registerPartitionModificationsListener(topic))
    info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
    maybeTriggerPartitionReassignment()
    topicDeletionManager.tryTopicDeletion()
    val pendingPreferredReplicaElections = fetchPendingPreferredReplicaElections()
    onPreferredReplicaElection(pendingPreferredReplicaElections)
    info("starting the controller scheduler")
    kafkaScheduler.startup()
    if (config.autoLeaderRebalanceEnable) {
      scheduleAutoLeaderRebalanceTask(delay = 5, unit = TimeUnit.SECONDS)
    }
  }

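Summary

KafkaController handles cluster-level events, such as the creation and deletion of topics.

KafkaController plays an important role in a Kafka cluster. To keep the controller highly available, every broker in the cluster runs a KafkaController, and an election through ZooKeeper ensures that only one of them is the active controller at any moment.

When KafkaController starts, it tries to become the active controller. If it wins the election, it registers its listeners in ZooKeeper.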
