Resetting a running consumer group's offsets from outside

For Kafka 0.10.1 and later, how can the consumption progress of a running consumer group be reset from outside? For example, imagine a console that can reset any consumer group's offsets to 12 hours ago, while the user's program keeps running and never has to be taken offline or restarted.

The following steps are required:

1. Join the group

2. Kick out all the other group members

3. Try to assign all TopicPartitions to this client

4. Commit the offsets

5. Leave the group

 

Step 2 exists to make this client the group leader. Of course, it may not be necessary to kick out every other member to become leader (which member becomes leader actually follows the iteration order of a HashMap).

After becoming the consumer group's leader, the client needs to assign every partition to itself, which requires a special PartitionAssignor. Because this assignor's protocol differs from the protocol used by the group's other consumers (although one could build an assignor that looks identical on the wire but has different logic), and the coordinator rejects members whose protocol differs from the current leader's, the other members still have to be kicked out.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An assignor that gives every partition to the member whose id equals leaderId and
 * nothing to anyone else. leaderId and callback must be set before assign() is called.
 */
public class ExclusiveAssignor extends AbstractPartitionAssignor {

    public interface Callback {
        void onSuccess();
    }

    private static final Logger LOGGER = LoggerFactory.getLogger(ExclusiveAssignor.class);

    public static final String NAME = "exclusive";

    private String leaderId = null;
    private Callback callback = null;

    public void setLeaderId(String leaderId) {
        this.leaderId = leaderId;
    }

    public void setCallBack(Callback callBack) {
        this.callback = callBack;
    }

    @Override
    public String name() {
        return NAME;
    }

    // Groups consumer ids by subscribed topic; not used by assign() below.
    private Map<String, List<String>> consumersPerTopic(Map<String, List<String>> consumerMetadata) {
        Map<String, List<String>> res = new HashMap<>();
        for (Map.Entry<String, List<String>> subscriptionEntry : consumerMetadata.entrySet()) {
            String consumerId = subscriptionEntry.getKey();
            for (String topic : subscriptionEntry.getValue())
                put(res, topic, consumerId);
        }
        return res;
    }

    @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, List<String>> subscriptions) {
        LOGGER.info("perform exclusive assign");
        if (leaderId == null)
            throw new IllegalArgumentException("leaderId should already be set before assign is called");
        if (callback == null)
            throw new IllegalArgumentException("callback should already be set before assign is called");

        // Expand every subscribed topic into its full list of partitions.
        List<TopicPartition> allPartitions = new ArrayList<TopicPartition>();
        partitionsPerTopic.forEach((topic, partitionNumber) -> {
            for (int i = 0; i < partitionNumber; i++)
                allPartitions.add(new TopicPartition(topic, i));
        });

        // The leader (this client) gets everything; every other member gets an empty assignment.
        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        for (String memberId : subscriptions.keySet()) {
            assignment.put(memberId, new ArrayList<TopicPartition>());
            if (memberId.equals(leaderId)) {
                assignment.get(memberId).addAll(allPartitions);
            }
        }
        callback.onSuccess();
        return assignment;
    }
}
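
As a side note (this is not part of the repository's code, and the property values below are placeholder assumptions): on an unmodified consumer, a custom assignor would normally be registered through partition.assignment.strategy and instantiated by the client itself. That standard path is not enough here, because the flow above needs a reference to the assignor instance in order to call setLeaderId and setCallBack, which is one reason the author patches the consumer instead.

import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerConfig

val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")   // placeholder brokers
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-to-reset")            // placeholder group id
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[ExclusiveAssignor].getName)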

This assignor needs to know which member is the leader. The leaderId can be obtained inside the KafkaConsumer code, from

 protected Map<String, ByteBuffer> performAssignment(String leaderId,
                                                        String assignmentStrategy,
                                                        Map<String, ByteBuffer> allSubscriptions)

so the KafkaConsumer code has to be modified anyway; the modification should also make sure that this KafkaConsumer's poll() does not actually fetch any messages and is only used to perform the commit.
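
The patch itself is not shown here. As a rough sketch of one way to keep poll() from fetching data without touching the client internals (an alternative I am assuming, not what the repository does): pause every partition as soon as it is assigned, so poll() only drives the group protocol and the later commitSync.

import java.util.{Collection => JCollection}
import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

// Pauses everything this member gets assigned; paused partitions are excluded from
// fetch requests, so poll() returns no records but still completes the join/assign round.
class PauseAllListener(consumer: KafkaConsumer[_, _]) extends ConsumerRebalanceListener {
  override def onPartitionsRevoked(partitions: JCollection[TopicPartition]): Unit = ()
  override def onPartitionsAssigned(partitions: JCollection[TopicPartition]): Unit =
    consumer.pause(partitions)
}

It would be registered with consumer.subscribe(topics, new PauseAllListener(consumer)).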

 

Evicting the other members can be done with the AdminClient:

  def forceLeave(coordinator: Node, memberId: String, groupId: String) = {
    logger.info(s"forcing group member: $memberId to leave group: $groupId ")
    send(coordinator, ApiKeys.LEAVE_GROUP, new LeaveGroupRequest(groupId, memberId))
  }

 

The final logic is:

  private def forceCommit(consumer: SimpleKafkaConsumer[_, _],
                          groupId: String,
                          topics: Seq[String],
                          maxRetries: Int,
                          toCommit: Map[TopicPartition, OffsetAndMetadata],
                          coordinatorOpt: Option[Node] = None) = {
    consumer.subscribe(JavaConversions.seqAsJavaList(topics))
    // Flipped to true by ExclusiveAssignor's callback once all partitions are assigned to us.
    val assignedAll = new AtomicBoolean(false)
    consumer.setExclusiveAssignorCallback(new Callback {
      override def onSuccess(): Unit = assignedAll.set(true)
    })
    var currentRetries = 0
    val coordinatorNode = coordinatorOpt.getOrElse(adminClient.findCoordinator(groupId))
    // Keep kicking the other members and re-joining until this client is leader and owns every partition.
    while (!assignedAll.get() && currentRetries < maxRetries) {
      logger.info(s"trying to reset offset for $groupId, retry count $currentRetries  ....")
      clearCurrentMembers(coordinatorNode, groupId, Some(ConsumerGroupManager.magicConsumerId))
      consumer.poll(5000)
      printCurrentAssignment(consumer)
      currentRetries = currentRetries + 1
    }
    if (currentRetries >= maxRetries)
      throw new RuntimeException(s"retry exhausted when getting leadership of $groupId")
    val javaOffsetToCommit = JavaConversions.mapAsJavaMap(toCommit)
    consumer.commitSync(javaOffsetToCommit)
    logger.info(s"successfully committed offset for $groupId: $toCommit")
    consumer.unsubscribe()
  }

 

  def forceReset(offsetLookupActor: ActorRef, groupId: String, ts: Long, maxRetries: Int)(implicit executionContext: ExecutionContext): Boolean = {
    logger.info(s"resetting offset for $groupId to $ts")
    val groupSummary = adminClient.describeConsumerGroup(groupId)
    val topics = groupSummary.subscribedTopics
    if (topics.isEmpty)
      throw new IllegalStateException(s"group $groupId currently subscribed no topic")
    val offsetToCommit = getOffsetsBehindTs(offsetLookupActor, topics, ts, 10000)
    val consumer = createConsumer(groupId)
    try {
      forceCommit(consumer, groupId, topics, maxRetries, offsetToCommit)
      true
    } finally {
      consumer.close()
    }
  }
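
getOffsetsBehindTs is not shown above (in the repository it goes through an actor). Below is a minimal sketch of the same lookup done directly with KafkaConsumer#offsetsForTimes, which is available from 0.10.1; the helper name and the plain-consumer approach are assumptions of mine, not the repository's code.

import java.util.{Map => JMap}
import scala.collection.JavaConversions._
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

// For every partition of the given topics, find the earliest offset whose timestamp is >= ts.
// Partitions with no such offset (null in the response) are simply skipped.
def offsetsBehindTs(consumer: KafkaConsumer[_, _], topics: Seq[String], ts: Long): Map[TopicPartition, OffsetAndMetadata] = {
  val partitions = topics.flatMap { topic =>
    consumer.partitionsFor(topic).map(p => new TopicPartition(p.topic(), p.partition()))
  }
  val query: JMap[TopicPartition, java.lang.Long] =
    mapAsJavaMap(partitions.map(tp => tp -> java.lang.Long.valueOf(ts)).toMap)
  consumer.offsetsForTimes(query).collect {
    case (tp, oat) if oat != null => tp -> new OffsetAndMetadata(oat.offset())
  }.toMap
}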

The complete code is at https://github.com/iBuddha/kafka-simple-ui/blob/master/app/kafka/authorization/manager/utils/ConsumerGroupManager.scala

Note that sending a LeaveGroupRequest can cause some members' connections to the broker to be dropped. This happens because, after a consumer has sent a JoinGroupRequest, if the external client then sends a LeaveGroupRequest that kicks that consumer out, the consumer never receives a JoinGroupResponse, so its NetworkClient concludes the connection is dead. The client will reconnect later, though. Moreover, during the window in which the external client evicts the other members and re-commits the offsets, those consumers do not necessarily get a chance to rejoin the group, so they may not be affected by this problem at all.
