Deleting a Kafka topic

Preface

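When we run the topic deletion command `kafka-topics --delete --topic xxxx --zookeeper xxx` in a shell, it reports that xxxx has been marked for deletion. But if you list the topics again much later, the topic is still marked for deletion: clearly the deletion never actually ran. Below we dig into how Kafka deletes a topic.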

Conclusion first

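The `delete.topic.enable` setting, which defaults to false (in the 0.11.x code analyzed here), controls whether the Kafka cluster is allowed to delete topics. Only when it is true will Kafka delete the topics that have been marked for deletion. Otherwise a topic is never deleted, only marked, and "marked" simply means the topic is recorded in ZooKeeper as pending deletion. Note that the cluster must be restarted after changing this setting.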
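To make the default concrete, here is a minimal Scala sketch of how a `delete.topic.enable` lookup behaves when the key is absent. It parses server.properties-style text with `java.util.Properties`; `DeleteTopicConfigSketch`, `parse` and `deleteTopicEnabled` are hypothetical helpers for illustration, not Kafka's own `KafkaConfig` parsing.

```scala
import java.io.StringReader
import java.util.Properties

// Hypothetical helper: shows how an absent delete.topic.enable key
// falls back to false. This is not Kafka's KafkaConfig parsing.
object DeleteTopicConfigSketch {
  // Load server.properties-style text into java.util.Properties.
  def parse(text: String): Properties = {
    val props = new Properties()
    props.load(new StringReader(text))
    props
  }

  // Missing key -> "false": topics are only marked, never deleted.
  def deleteTopicEnabled(props: Properties): Boolean =
    props.getProperty("delete.topic.enable", "false").trim.toBoolean
}
```

With the key missing the helper returns false, matching the behavior described above where the delete command only leaves a mark in ZooKeeper. Putting `delete.topic.enable=true` into each broker's config and restarting is what actually enables deletion.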

To delete a topic manually, you need to do two things:

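      1. Delete the topic's data in ZooKeeper:

          /brokers/topics/xxx

          /config/topics/xxx

      2. Delete the data of all of the topic's partitions and replicas:

          The data lives under the `log.dirs` directories on every broker, in one folder per partition named topic-partition; simply delete all of the topic's folders.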
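The two manual steps boil down to a list of ZooKeeper nodes plus one folder per partition per log directory. The Scala sketch below only computes those targets and deletes nothing. `ManualDeletionSketch` and its methods are hypothetical, the partition count and log directories are inputs you would have to look up yourself, and it assumes the topic znode is `/brokers/topics/<topic>` and the config znode is `/config/topics/<topic>`.

```scala
// Hypothetical helper: lists what the two manual steps would remove.
// It only builds paths; it does not touch ZooKeeper or the file system.
object ManualDeletionSketch {
  // Step 1: the topic's znodes (assumed 0.11 layout).
  def zkPaths(topic: String): Seq[String] =
    Seq(s"/brokers/topics/$topic", s"/config/topics/$topic")

  // Step 2: one <topic>-<partition> folder per partition under each log.dirs entry.
  // A given broker only has the folders for the partitions it hosts.
  def partitionDirs(topic: String, numPartitions: Int, logDirs: Seq[String]): Seq[String] =
    for {
      dir <- logDirs
      p <- 0 until numPartitions
    } yield s"$dir/$topic-$p"
}
```

Because each broker hosts only some of the partitions, not every generated folder exists on every broker; you would remove whichever of them are present.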

Marking a topic for deletion

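From the shell command we can find TopicCommand, the class that operates on topics. Its topic deletion logic does only three things: 1. checks whether the topic exists; 2. checks whether the topic is a Kafka internal topic (internal topics may not be deleted); 3. creates a node in ZooKeeper (/admin/delete_topics/xxx) to record the topic to be deleted. The code is as follows: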

def deleteTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
  val topics = getTopics(zkUtils, opts)
  val ifExists = opts.options.has(opts.ifExistsOpt)
  // The topic does not exist: throw an exception
  if (topics.isEmpty && !ifExists) {
    throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
       opts.options.valueOf(opts.zkConnectOpt)))
  }
  topics.foreach { topic =>
    try {
      // Kafka's own topics, such as __consumer_offsets, may not be deleted
      if (Topic.isInternal(topic)) {
        throw new AdminOperationException("Topic %s is a kafka internal topic and is not allowed to be marked for deletion.".format(topic))
      } else {
        // Create a node in ZooKeeper marking this topic for deletion
        zkUtils.createPersistentPath(getDeleteTopicPath(topic))
        println("Topic %s is marked for deletion.".format(topic))
        println("Note: This will have no impact if delete.topic.enable is not set to true.")
      }
    } catch {
      case _: ZkNodeExistsException =>
        println("Topic %s is already marked for deletion.".format(topic))
      case e: AdminOperationException =>
        throw e
      case _: Throwable =>
        throw new AdminOperationException("Error while deleting topic %s".format(topic))
    }
  }
}
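The three checks in `deleteTopic` can be modeled without ZooKeeper. In this simplified Scala sketch a mutable set of paths stands in for ZooKeeper, and the internal-topic list (`__consumer_offsets`, `__transaction_state`) is an assumption standing in for `Topic.isInternal`. `MarkForDeletionSketch` and its outcome types are hypothetical, and the `ifExists` option and `AdminOperationException` are left out.

```scala
import scala.collection.mutable

// Simplified model of TopicCommand.deleteTopic: a mutable set of paths
// stands in for ZooKeeper. All names here are hypothetical.
object MarkForDeletionSketch {
  // Assumed internal topics, standing in for Topic.isInternal.
  val InternalTopics: Set[String] = Set("__consumer_offsets", "__transaction_state")
  val DeleteTopicsPath = "/admin/delete_topics"

  sealed trait Outcome
  case object Marked extends Outcome
  case object AlreadyMarked extends Outcome

  def markForDeletion(zk: mutable.Set[String], existingTopics: Set[String], topic: String): Outcome = {
    // Check 1: the topic must exist.
    if (!existingTopics.contains(topic))
      throw new IllegalArgumentException(s"Topic $topic does not exist")
    // Check 2: internal topics may not be marked.
    if (InternalTopics.contains(topic))
      throw new IllegalArgumentException(s"Topic $topic is a kafka internal topic")
    // Step 3: record the marker; add returns false if it was already present,
    // which plays the role of ZkNodeExistsException.
    if (zk.add(s"$DeleteTopicsPath/$topic")) Marked else AlreadyMarked
  }
}
```

As in the real command, marking the same topic twice is not an error: the second call reports that the topic is already marked, just as `ZkNodeExistsException` is turned into a message above.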

That is all the delete command does. So when is a topic that has been marked for deletion actually deleted? Let's continue the analysis.

Deleting the topic

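We start again from the deletion marker in ZooKeeper. KafkaController watches the ZooKeeper deletion node for changes to learn whether new topics need to be deleted, and registers a TopicDeletionListener to handle the deletion events it detects. Below is the code of TopicDeletion, the class that processes deletion events. Its main logic has four steps: 1. check whether the topics to be deleted exist; 2. check whether `delete.topic.enable` is turned on; 3. check whether any of the topics is being reassigned. Reassignment moves a topic's partition data between brokers, so the controller cannot reliably locate the brokers that hold the topic; a topic under reassignment therefore cannot be deleted until the reassignment finishes; 4. if all conditions are met, run the deletion logic.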

  override def process(): Unit = {
    if (!isActive) return
    debug("Delete topics listener fired for topics %s to be deleted".format(topicsToBeDeleted.mkString(",")))
    val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics
    if (nonExistentTopics.nonEmpty) {
      warn("Ignoring request to delete non-existing topics " + nonExistentTopics.mkString(","))
      nonExistentTopics.foreach(topic => zkUtils.deletePathRecursive(getDeleteTopicPath(topic)))
    }
    topicsToBeDeleted --= nonExistentTopics
    if (config.deleteTopicEnable) {
      if (topicsToBeDeleted.nonEmpty) {
        info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(","))
        // mark topic ineligible for deletion if other state changes are in progress
        topicsToBeDeleted.foreach { topic =>
          val partitionReassignmentInProgress =
           controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
          if (partitionReassignmentInProgress)
            topicDeletionManager.markTopicIneligibleForDeletion(Set(topic))
        }
        // add topic to deletion list
        topicDeletionManager.enqueueTopicsForDeletion(topicsToBeDeleted)
      }
    } else {
      // If delete topic is disabled remove entries under zookeeper path : /admin/delete_topics
      for (topic <- topicsToBeDeleted) {
        info("Removing " + getDeleteTopicPath(topic) + " since delete topic is disabled")
        zkUtils.zkClient.delete(getDeleteTopicPath(topic))
      }
    }
  }
}
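The decision made by `TopicDeletion.process()` can be written as a pure function. This is a simplified Scala sketch, not controller code: `DeletionListenerSketch` and `Decision` are hypothetical, and the sets passed in stand in for the ZooKeeper markers, `controllerContext.allTopics` and the topics in `partitionsBeingReassigned`.

```scala
// Simplified, hypothetical model of TopicDeletion.process(); the inputs stand in
// for the ZooKeeper markers and the controller's in-memory state.
object DeletionListenerSketch {
  final case class Decision(
      markersRemoved: Set[String], // markers deleted from /admin/delete_topics
      enqueued: Set[String],       // handed to TopicDeletionManager
      ineligible: Set[String])     // enqueued but blocked by reassignment

  def process(
      marked: Set[String],
      allTopics: Set[String],
      beingReassigned: Set[String],
      deleteTopicEnable: Boolean): Decision = {
    // Markers for topics that no longer exist are always cleaned up.
    val nonExistent = marked -- allTopics
    val toDelete = marked -- nonExistent
    if (deleteTopicEnable)
      Decision(nonExistent, toDelete, toDelete intersect beingReassigned)
    else
      // Feature disabled: drop every marker and delete nothing.
      Decision(nonExistent ++ toDelete, Set.empty, Set.empty)
  }
}
```

A topic under reassignment is still enqueued; it is only flagged as ineligible, so TopicDeletionManager skips it until it becomes eligible again.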

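Next comes the main actor of topic deletion, TopicDeletionManager, the class that controls the deletion logic. Tying back to `delete.topic.enable`: almost every method in it states explicitly that a topic is deleted only when `delete.topic.enable` is true. The method names describe what they do, so let's look directly at onTopicDeletion(), which calls onPartitionDeletion(), which in turn calls startReplicaDeletion(). In other words, deleting a topic means deleting all of its partitions, and since each partition consists of several replicas, it means deleting those replicas. A replica may be under reassignment, as mentioned above, or temporarily unavailable; a topic with such replicas is treated as ineligible and its deletion does not proceed until a later retry finds the topic eligible again.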

  /**
   * Invoked with the list of topics to be deleted
   * It invokes onPartitionDeletion for all partitions of a topic.
   * The updateMetadataRequest is also going to set the leader for the topics being deleted to
   * {@link LeaderAndIsr#LeaderDuringDelete}. This lets each broker know that this topic is being deleted and can be
   * removed from their caches.
   */
  private def onTopicDeletion(topics: Set[String]) {
    info("Topic deletion callback for %s".format(topics.mkString(",")))
    // send update metadata so that brokers stop serving data for topics to be deleted
    val partitions = topics.flatMap(controllerContext.partitionsForTopic)
    controller.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, partitions)
    val partitionReplicaAssignmentByTopic = controllerContext.partitionReplicaAssignment.groupBy(p => p._1.topic)
    topics.foreach { topic =>
      onPartitionDeletion(partitionReplicaAssignmentByTopic(topic).keySet)
    }
  }

  /**
   * Invoked by onTopicDeletion with the list of partitions for topics to be deleted
   * It does the following -
   * 1. Send UpdateMetadataRequest to all live brokers (that are not shutting down) for partitions that are being
   *    deleted. The brokers start rejecting all client requests with UnknownTopicOrPartitionException
   * 2. Move all replicas for the partitions to OfflineReplica state. This will send StopReplicaRequest to the replicas
   *    and LeaderAndIsrRequest to the leader with the shrunk ISR. When the leader replica itself is moved to OfflineReplica state,
   *    it will skip sending the LeaderAndIsrRequest since the leader will be updated to -1
   * 3. Move all replicas to ReplicaDeletionStarted state. This will send StopReplicaRequest with deletePartition=true. And
   *    will delete all persistent data from all replicas of the respective partitions
   */
  private def onPartitionDeletion(partitionsToBeDeleted: Set[TopicAndPartition]) {
    info("Partition deletion callback for %s".format(partitionsToBeDeleted.mkString(",")))
    val replicasPerPartition = controllerContext.replicasForPartition(partitionsToBeDeleted)
    startReplicaDeletion(replicasPerPartition)
  }

  /**
   * Invoked by onPartitionDeletion. It is the 2nd step of topic deletion, the first being sending
   * UpdateMetadata requests to all brokers to start rejecting requests for deleted topics. As part of starting deletion,
   * the topics are added to the in progress list. As long as a topic is in the in progress list, deletion for that topic
   * is never retried. A topic is removed from the in progress list when
   * 1. Either the topic is successfully deleted OR
   * 2. No replica for the topic is in ReplicaDeletionStarted state and at least one replica is in ReplicaDeletionIneligible state
   * If the topic is queued for deletion but deletion is not currently under progress, then deletion is retried for that topic
   * As part of starting deletion, all replicas are moved to the ReplicaDeletionStarted state where the controller sends
   * the replicas a StopReplicaRequest (delete=true)
   * This method does the following things -
   * 1. Move all dead replicas directly to ReplicaDeletionIneligible state. Also mark the respective topics ineligible
   *    for deletion if some replicas are dead since it won't complete successfully anyway
   * 2. Move all alive replicas to ReplicaDeletionStarted state so they can be deleted successfully
   * @param replicasForTopicsToBeDeleted
   */
  private def startReplicaDeletion(replicasForTopicsToBeDeleted: Set[PartitionAndReplica]) {
    replicasForTopicsToBeDeleted.groupBy(_.topic).keys.foreach { topic =>
      val aliveReplicasForTopic = controllerContext.allLiveReplicas().filter(p => p.topic == topic)
      val deadReplicasForTopic = replicasForTopicsToBeDeleted -- aliveReplicasForTopic
      val successfullyDeletedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
      val replicasForDeletionRetry = aliveReplicasForTopic -- successfullyDeletedReplicas
      // move dead replicas directly to failed state
      replicaStateMachine.handleStateChanges(deadReplicasForTopic, ReplicaDeletionIneligible)
      // send stop replica to all followers that are not in the OfflineReplica state so they stop sending fetch requests to the leader
      replicaStateMachine.handleStateChanges(replicasForDeletionRetry, OfflineReplica)
      debug("Deletion started for replicas %s".format(replicasForDeletionRetry.mkString(",")))
      controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry, ReplicaDeletionStarted,
        new Callbacks.CallbackBuilder().stopReplicaCallback((stopReplicaResponseObj, replicaId) =>
          eventManager.put(controller.TopicDeletionStopReplicaResult(stopReplicaResponseObj, replicaId))).build)
      if (deadReplicasForTopic.nonEmpty) {
        debug("Dead Replicas (%s) found for topic %s".format(deadReplicasForTopic.mkString(","), topic))
        markTopicIneligibleForDeletion(Set(topic))
      }
    }
  }
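The replica split performed by `startReplicaDeletion()` is easier to follow on plain data. The Scala sketch below is a simplification under stated assumptions: `Replica` is a hypothetical stand-in for `PartitionAndReplica`, a replica counts as alive when its broker is in `liveBrokers`, and only the replicas of the given topic are considered.

```scala
// Simplified, hypothetical model of startReplicaDeletion() for one topic.
object StartReplicaDeletionSketch {
  // Stand-in for Kafka's PartitionAndReplica.
  final case class Replica(topic: String, partition: Int, brokerId: Int)

  final case class Plan(
      toIneligible: Set[Replica], // dead: straight to ReplicaDeletionIneligible
      toStart: Set[Replica],      // alive, not yet deleted: OfflineReplica, then ReplicaDeletionStarted
      topicIneligible: Boolean)   // any dead replica makes the whole topic ineligible

  def plan(
      topic: String,
      replicas: Set[Replica],
      liveBrokers: Set[Int],
      alreadyDeleted: Set[Replica]): Plan = {
    val forTopic = replicas.filter(_.topic == topic)
    val (alive, dead) = forTopic.partition(r => liveBrokers.contains(r.brokerId))
    // Replicas already in ReplicaDeletionSuccessful are not retried.
    val retry = alive -- alreadyDeleted
    Plan(dead, retry, dead.nonEmpty)
  }
}
```

Dead replicas make the whole topic ineligible, while alive replicas that are not yet deleted are moved through OfflineReplica to ReplicaDeletionStarted, which is when StopReplicaRequest (delete=true) is sent.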

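TopicDeletionManager sends the replicas to be deleted to each broker via RPC. In this sense TopicDeletionManager acts more like a metadata-deletion manager, because deleting the actual data is the brokers' job; after all, the data is distributed. Next, look at ReplicaManager: on receiving a StopReplica RPC, ReplicaManager deletes the local data (see its stopReplica() method), and LogManager finally deletes the relevant folders. This call chain is long, so the code is not reproduced here.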
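On the broker side, the folders to remove follow the topic-partition naming described earlier. Because topic names may themselves contain '-', a folder name has to be split at its last '-'. The Scala sketch below shows that parsing; `LogDirSketch` is hypothetical and is not LogManager's actual code.

```scala
import scala.util.Try

// Hypothetical helper, not LogManager's code: maps log folder names
// of the form <topic>-<partition> back to their topic.
object LogDirSketch {
  // Split at the LAST '-', because topic names may contain '-' themselves.
  def parse(dirName: String): Option[(String, Int)] = {
    val i = dirName.lastIndexOf('-')
    if (i <= 0) None
    else Try(dirName.substring(i + 1).toInt).toOption.map(p => (dirName.substring(0, i), p))
  }

  // Names in a log directory that hold partitions of `topic`.
  def dirsOfTopic(dirNames: Seq[String], topic: String): Seq[String] =
    dirNames.filter(name => parse(name).exists(_._1 == topic))
}
```

For example, a folder `a-b-0` belongs to topic `a-b`, not to topic `a`, which is why a simple prefix match on `a-` would delete the wrong data.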

Once the topic's data has been deleted, TopicDeletionManager removes the topic's metadata in completeDeleteTopic(): it drops the topic's in-memory state and deletes the topic's data in ZooKeeper, in three places: /brokers/topics/, /config/topics/ and /admin/delete_topics/.

  private def completeDeleteTopic(topic: String) {
    // deregister partition change listener on the deleted topic. This is to prevent the partition change listener
    // firing before the new topic listener when a deleted topic gets auto created
    controller.deregisterPartitionModificationsListener(topic)
    val replicasForDeletedTopic = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
    // controller will remove this replica from the state machine as well as its partition assignment cache
    replicaStateMachine.handleStateChanges(replicasForDeletedTopic, NonExistentReplica)
    val partitionsForDeletedTopic = controllerContext.partitionsForTopic(topic)
    // move respective partition to OfflinePartition and NonExistentPartition state
    partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, OfflinePartition)
    partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, NonExistentPartition)
    topicsToBeDeleted -= topic
    partitionsToBeDeleted.retain(_.topic != topic)
    val zkUtils = controllerContext.zkUtils
    zkUtils.zkClient.deleteRecursive(getTopicPath(topic))
    zkUtils.zkClient.deleteRecursive(getEntityConfigPath(ConfigType.Topic, topic))
    zkUtils.zkClient.delete(getDeleteTopicPath(topic))
    controllerContext.removeTopic(topic)
  }
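The Javadoc of `startReplicaDeletion()` above describes when a topic leaves the in-progress list; combined with completeDeleteTopic(), the rules can be expressed over a topic's replica states. In the Scala sketch below, deletion is treated as complete when every replica is in ReplicaDeletionSuccessful; `DeletionProgressSketch` and `TopicStatus` are hypothetical, and only the three deletion-related replica states are modeled.

```scala
// Simplified, hypothetical model of how a topic's replica states decide
// whether deletion is complete, must be retried later, or is still running.
object DeletionProgressSketch {
  sealed trait ReplicaState
  case object ReplicaDeletionStarted extends ReplicaState
  case object ReplicaDeletionSuccessful extends ReplicaState
  case object ReplicaDeletionIneligible extends ReplicaState

  sealed trait TopicStatus
  case object Complete extends TopicStatus   // completeDeleteTopic() may run
  case object RetryLater extends TopicStatus // leaves the in-progress list, retried later
  case object InProgress extends TopicStatus // otherwise: keep waiting

  def status(states: Iterable[ReplicaState]): TopicStatus =
    if (states.nonEmpty && states.forall(_ == ReplicaDeletionSuccessful)) Complete
    else if (!states.exists(_ == ReplicaDeletionStarted) &&
             states.exists(_ == ReplicaDeletionIneligible)) RetryLater
    else InProgress
}
```

`RetryLater` is the case where no replica is still in ReplicaDeletionStarted and at least one is ReplicaDeletionIneligible: the topic stays queued and its deletion is retried once it becomes eligible.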

Summary

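Kafka relies on ZooKeeper to store its metadata, and its own controller manages all of that metadata. Clients trigger Kafka events by modifying ZooKeeper node data, and Kafka then carries out the corresponding operations. For topic deletion, Kafka combines partition and replica state machines with an event mechanism so that topics can be deleted correctly in a complex, distributed environment.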

References

Kafka 0.11.0.2 source code
