KafkaConsumer Analysis

1 Important Fields

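String clientId: the identifier of this Consumer (its client.id)

ConsumerCoordinator coordinator: handles the communication logic between the Consumer and the server-side GroupCoordinator

Fetcher<K, V> fetcher: the component that fetches messages from the server and updates each partition's fetch offset

ConsumerNetworkClient client: handles network communication with the server

SubscriptionState subscriptions: maintains the consumer's consumption state, giving quick access to the subscribed topics, the assigned partitions and their offsets

Metadata metadata: cluster metadata

AtomicLong currentThread: the id of the thread currently using the KafkaConsumer

AtomicInteger refcount: the reentrancy count, i.e. how many nested acquire() calls the current thread holds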
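Together, currentThread and refcount form a lightweight, reentrant single-thread check: KafkaConsumer is not thread-safe, and the acquire()/release() pair used by the public methods below relies on these two fields to detect concurrent access rather than to block. The following is a minimal standalone sketch of that idea; the class name SingleThreadGuard and the sentinel value -1 for "no owner" are assumptions for illustration, not copied from the Kafka source.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of a reentrant "one thread at a time" guard,
// modelled on the currentThread/refcount fields described above.
class SingleThreadGuard {
    // Sentinel meaning "no thread currently owns the guard" (assumed value).
    private static final long NO_CURRENT_THREAD = -1L;

    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
    private final AtomicInteger refcount = new AtomicInteger(0);

    // Claim the guard for the calling thread. Another thread fails fast
    // instead of blocking; the owning thread may re-enter.
    void acquire() {
        long threadId = Thread.currentThread().getId();
        if (threadId != currentThread.get()
                && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)) {
            throw new ConcurrentModificationException("not safe for multi-threaded access");
        }
        refcount.incrementAndGet();
    }

    // Undo one acquire(); the outermost release frees the guard.
    void release() {
        if (refcount.decrementAndGet() == 0) {
            currentThread.set(NO_CURRENT_THREAD);
        }
    }

    int refCount() {
        return refcount.get();
    }

    boolean isOwned() {
        return currentThread.get() != NO_CURRENT_THREAD;
    }

    public static void main(String[] args) throws InterruptedException {
        SingleThreadGuard guard = new SingleThreadGuard();
        guard.acquire();
        guard.acquire(); // reentrant call from the same thread
        Thread other = new Thread(() -> {
            try {
                guard.acquire();
                System.out.println("other thread acquired (unexpected)");
            } catch (ConcurrentModificationException e) {
                System.out.println("other thread rejected: " + e.getMessage());
            }
        });
        other.start();
        other.join();
        guard.release();
        guard.release();
        System.out.println("owned after releases: " + guard.isOwned());
    }
}
```

Failing fast instead of waiting makes misuse from several threads visible immediately, while the reference count lets one public method call another on the same thread.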



2 Core Methods

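2.1 subscribe: subscribing to topics

Subscribes to the given list of topics to get dynamically assigned partitions.

Topic subscriptions are not incremental: this list replaces the current assignment, if there is one. Note that topic subscription with group management cannot be combined with manual partition assignment.

As part of group management, the consumer keeps track of the list of consumers that belong to a particular group, and a rebalance is triggered when any of the following happens:

1 The number of partitions of any subscribed topic changes

2 A subscribed topic is created or deleted

3 An existing member of the consumer group dies

4 A new member joins an existing consumer group through the join API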
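Conditions 1 and 2 both come down to a change in the partition layout of the subscribed topics. One simple way to model this is to keep a snapshot mapping each subscribed topic to its partition count and compare it with a fresh snapshot after every metadata update; any difference means the group must rebalance. The sketch below assumes that model and uses hypothetical names. Conditions 3 and 4 are membership changes detected on the broker side by the GroupCoordinator, so they are not modelled here.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: decide whether a metadata update should trigger a rebalance.
class MetadataSnapshotSketch {

    // Build topic -> partition count for the subscribed topics only.
    // A subscribed topic missing from the cluster view is left out of the snapshot.
    static Map<String, Integer> snapshot(Collection<String> subscribed,
                                         Map<String, Integer> clusterPartitionCounts) {
        Map<String, Integer> result = new HashMap<>();
        for (String topic : subscribed) {
            Integer count = clusterPartitionCounts.get(topic);
            if (count != null) {
                result.put(topic, count);
            }
        }
        return result;
    }

    // Rebalance if a subscribed topic appeared, disappeared,
    // or changed its partition count between the two snapshots.
    static boolean needsRebalance(Map<String, Integer> before, Map<String, Integer> after) {
        return !before.equals(after);
    }

    public static void main(String[] args) {
        List<String> subscribed = Arrays.asList("orders", "payments");
        Map<String, Integer> cluster = new HashMap<>();
        cluster.put("orders", 3);
        Map<String, Integer> before = snapshot(subscribed, cluster);

        cluster.put("orders", 6); // partitions added to a subscribed topic
        Map<String, Integer> after = snapshot(subscribed, cluster);
        System.out.println("rebalance needed: " + needsRebalance(before, after)); // prints "rebalance needed: true"
    }
}
```

Restricting the snapshot to subscribed topics means changes to unrelated topics in the cluster do not cause a rebalance.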

public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
    // Acquire the lightweight single-thread lock
    acquire();
    try {
        if (topics == null) { // a null topic list is rejected
            throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
        } else if (topics.isEmpty()) { // an empty topic list means unsubscribe
            this.unsubscribe();
        } else {
            for (String topic : topics) {
                if (topic == null || topic.trim().isEmpty())
                    throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
            }
            log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
            this.subscriptions.subscribe(new HashSet<>(topics), listener);
            // Replace the current topic set in the metadata with the new one; if topic expiry is enabled,
            // each topic's expiry time is reset on the next metadata update
            metadata.setTopics(subscriptions.groupSubscription());
        }
    } finally {
        // Release the lock
        release();
    }
}
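The behaviours of subscribe() visible above (input validation, an empty collection meaning unsubscribe, and a new list replacing rather than extending the old one) fit in a few lines. This is a simplified, hypothetical stand-in for the topic-subscription part of SubscriptionState; it ignores the rebalance listener and the metadata update.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified stand-in for the topic-subscription part of SubscriptionState.
class TopicSubscriptionSketch {
    private Set<String> subscription = Collections.emptySet();

    void subscribe(Collection<String> topics) {
        if (topics == null) {
            throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
        }
        if (topics.isEmpty()) {
            unsubscribe(); // an empty list behaves like unsubscribe()
            return;
        }
        // Validate everything before changing any state.
        for (String topic : topics) {
            if (topic == null || topic.trim().isEmpty()) {
                throw new IllegalArgumentException(
                        "Topic collection to subscribe to cannot contain null or empty topic");
            }
        }
        // Not incremental: the new set replaces the old one entirely.
        subscription = new HashSet<>(topics);
    }

    void unsubscribe() {
        subscription = Collections.emptySet();
    }

    Set<String> subscription() {
        return Collections.unmodifiableSet(subscription);
    }

    public static void main(String[] args) {
        TopicSubscriptionSketch s = new TopicSubscriptionSketch();
        s.subscribe(Arrays.asList("orders", "payments"));
        s.subscribe(Collections.singletonList("audit"));
        System.out.println(s.subscription()); // prints "[audit]"
    }
}
```

To add a topic to an existing subscription, a caller therefore has to pass the full, extended list.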



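2.2 assign: manually assigning partitions

When the user chooses which partitions to consume instead of subscribing to topics, this method assigns the given list of partitions to the consumer. Manual assignment does not use group management, so no rebalance is triggered when group membership or topic metadata changes: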

public void assign(Collection<TopicPartition> partitions) {
    acquire();
    try {
        if (partitions == null) {
            throw new IllegalArgumentException("Topic partition collection to assign to cannot be null");
        } else if (partitions.isEmpty()) { // an empty partition list means unsubscribe
            this.unsubscribe();
        } else {
            Set<String> topics = new HashSet<>();
            // Collect the topic of every TopicPartition into a set
            for (TopicPartition tp : partitions) {
                String topic = (tp != null) ? tp.topic() : null;
                if (topic == null || topic.trim().isEmpty())
                    throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
                topics.add(topic);
            }

            // Commit offsets now if auto-commit is enabled, before the assignment changes
            this.coordinator.maybeAutoCommitOffsetsNow();

            log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", "));
            // Replace the assignment with the partitions supplied by the user
            this.subscriptions.assignFromUser(new HashSet<>(partitions));
            metadata.setTopics(topics); // update the topics tracked by the metadata
        }
    } finally {
        release();
    }
}
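As noted in 2.1, topic subscription cannot be mixed with manual assignment. One way to enforce this, roughly what SubscriptionState does, is to record which mode is active and reject any call that would switch modes without an unsubscribe() in between. The enum and class below are a hypothetical sketch of that rule, not the Kafka source; partitions are plain "topic-partition" strings to keep it self-contained. Like subscribe(), assign() replaces the previous assignment instead of adding to it.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: topic subscription and manual assignment are mutually exclusive.
class SubscriptionModeSketch {
    enum Mode { NONE, AUTO_TOPICS, USER_ASSIGNED }

    private Mode mode = Mode.NONE;
    private Set<String> topics = Collections.emptySet();
    // Partitions are plain "topic-partition" strings to keep the sketch self-contained.
    private Set<String> assignment = Collections.emptySet();

    // A mode can be entered only from NONE or from the same mode.
    private void setMode(Mode requested) {
        if (mode != Mode.NONE && mode != requested) {
            throw new IllegalStateException(
                    "Subscription to topics and manual partition assignment are mutually exclusive");
        }
        mode = requested;
    }

    void subscribe(Collection<String> newTopics) {
        setMode(Mode.AUTO_TOPICS);
        topics = new HashSet<>(newTopics);
    }

    void assign(Collection<String> partitions) {
        setMode(Mode.USER_ASSIGNED);
        assignment = new HashSet<>(partitions); // replaces the previous assignment
    }

    // Clearing everything returns to NONE, so either mode can be chosen again.
    void unsubscribe() {
        mode = Mode.NONE;
        topics = Collections.emptySet();
        assignment = Collections.emptySet();
    }

    Mode mode() { return mode; }
    Set<String> topics() { return Collections.unmodifiableSet(topics); }
    Set<String> assignment() { return Collections.unmodifiableSet(assignment); }

    public static void main(String[] args) {
        SubscriptionModeSketch s = new SubscriptionModeSketch();
        s.assign(Arrays.asList("orders-0", "orders-1"));
        try {
            s.subscribe(Collections.singletonList("orders"));
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        s.unsubscribe();
        s.subscribe(Collections.singletonList("orders"));
        System.out.println("mode now: " + s.mode()); // prints "mode now: AUTO_TOPICS"
    }
}
```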


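2.3 commitSync & commitAsync: committing offsets

These methods commit the offsets of messages the consumer has finished processing. The no-argument variants commit the offsets returned by the last poll for all subscribed topics and partitions; the overloads below commit the offsets passed in the map, either synchronously (blocking until the commit succeeds or an unrecoverable error occurs) or asynchronously (reporting the result to the callback).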

public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
    acquire();
    try {
        coordinator.commitOffsetsSync(offsets);
    } finally {
        release();
    }
}
 
public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
    acquire();
    try {
        log.debug("Committing offsets: {} ", offsets);
        coordinator.commitOffsetsAsync(new HashMap<>(offsets), callback);
    } finally {
        release();
    }
}
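When offsets are passed explicitly, the committed value for each partition should be the offset of the next message the application will read, i.e. the last processed offset plus one; committing the last processed offset itself would cause that record to be consumed again after a restart. The sketch below builds such a map from processed records. ProcessedRecord, CommitOffsets and the "topic-partition" string keys are hypothetical simplifications standing in for ConsumerRecord, TopicPartition and OffsetAndMetadata.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a processed ConsumerRecord.
class ProcessedRecord {
    final String topic;
    final int partition;
    final long offset;

    ProcessedRecord(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

class CommitOffsets {
    // For each partition, commit (highest processed offset + 1):
    // the offset of the next record the application should read.
    static Map<String, Long> offsetsToCommit(List<ProcessedRecord> processed) {
        Map<String, Long> result = new HashMap<>();
        for (ProcessedRecord r : processed) {
            String key = r.topic + "-" + r.partition;
            result.merge(key, r.offset + 1, Long::max);
        }
        return result;
    }

    public static void main(String[] args) {
        List<ProcessedRecord> batch = Arrays.asList(
                new ProcessedRecord("orders", 0, 41),
                new ProcessedRecord("orders", 0, 42),
                new ProcessedRecord("orders", 1, 7));
        System.out.println(offsetsToCommit(batch));
    }
}
```

Taking the maximum per partition keeps the result correct even if records were processed out of order.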


2.4 seek: setting the offset from which the consumer starts consuming

public void seek(TopicPartition partition, long offset) {
    if (offset < 0) {
        throw new IllegalArgumentException("seek offset must not be a negative number");
    }
    acquire();
    try {
        log.debug("Seeking to offset {} for partition {}", offset, partition);
        this.subscriptions.seek(partition, offset);
    } finally {
        release();
    }
}
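// Seek to the first offset of the given partitions (all assigned partitions if the collection is empty); evaluated lazily on the next poll() or position()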
public void seekToBeginning(Collection<TopicPartition> partitions) {
    acquire();
    try {
        Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
        for (TopicPartition tp : parts) {
            log.debug("Seeking to beginning of partition {}", tp);
            subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST);
        }
    } finally {
        release();
    }
}
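// Seek to the last offset of the given partitions (all assigned partitions if the collection is empty); also evaluated lazily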
public void seekToEnd(Collection<TopicPartition> partitions) {
    acquire();
    try {
        Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
        for (TopicPartition tp : parts) {
            log.debug("Seeking to end of partition {}", tp);
            subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
        }
    } finally {
        release();
    }
}
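seek() sets a concrete position immediately, whereas seekToBeginning() and seekToEnd() only mark the partition with a reset strategy (EARLIEST or LATEST); the actual offset is looked up later, when poll() or position() needs it. The sketch below models that two-step behaviour for a single partition with hypothetical names, using fixed log-start and log-end offsets in place of the offset lookup the real consumer sends to the broker.

```java
// Hypothetical model of one partition's fetch position with lazy resets.
class PositionSketch {
    enum ResetStrategy { EARLIEST, LATEST }

    // Fixed stand-ins for the offsets the real consumer would look up from the broker.
    private final long logStartOffset;
    private final long logEndOffset;
    private Long position;              // null = position currently unknown
    private ResetStrategy pendingReset; // null = no reset requested

    PositionSketch(long logStartOffset, long logEndOffset) {
        this.logStartOffset = logStartOffset;
        this.logEndOffset = logEndOffset;
    }

    // Like seek(): the new position takes effect immediately.
    void seek(long offset) {
        if (offset < 0) {
            throw new IllegalArgumentException("seek offset must not be a negative number");
        }
        position = offset;
        pendingReset = null;
    }

    // Like seekToBeginning()/seekToEnd(): only record the request and drop the old position.
    void requestReset(ResetStrategy strategy) {
        pendingReset = strategy;
        position = null;
    }

    boolean hasPosition() {
        return position != null;
    }

    // Like position() (or the next poll()): resolve a pending reset on demand.
    long position() {
        if (pendingReset != null) {
            position = (pendingReset == ResetStrategy.EARLIEST) ? logStartOffset : logEndOffset;
            pendingReset = null;
        }
        if (position == null) {
            throw new IllegalStateException("no position and no pending reset");
        }
        return position;
    }

    public static void main(String[] args) {
        PositionSketch p = new PositionSketch(100, 250);
        p.seek(120);
        System.out.println(p.position());    // prints 120
        p.requestReset(ResetStrategy.LATEST);
        System.out.println(p.hasPosition()); // prints false: nothing looked up yet
        System.out.println(p.position());    // prints 250, resolved lazily
    }
}
```

Deferring the lookup lets several seekToBeginning()/seekToEnd() calls be batched into the next round trip instead of each one blocking on the network.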


2.5 poll: fetching messages

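Fetches data from the subscribed topics or assigned partitions. Calling poll before subscribing to any topic or being assigned any partition is an error. On each poll, the consumer uses the last consumed offset as the starting offset of the next fetch and reads sequentially; that offset can be set manually with seek(TopicPartition, long) or set automatically from the partition's last committed offset.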

public ConsumerRecords<K, V> poll(long timeout) {
    acquire();
    try {
        if (timeout < 0)
            throw new IllegalArgumentException("Timeout must not be negative");
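        // Throw if there is neither a topic subscription nor a manual partition assignment
        if (this.subscriptions.hasNoSubscriptionOrUserAssignment())
            throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");

        // Keep polling for new data until the timeout expires
        long start = time.milliseconds();
        // Time remaining before the timeout expires
        long remaining = timeout;
        do {
            // Fetch data; this also performs the automatic offset commit if enabled and any pending offset resets
            Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
            if (!records.isEmpty()) {
                // Before returning, send the next round of fetch requests so the responses can arrive
                // while the user is processing these records, instead of being waited for in the next poll
                fetcher.sendFetches();
                client.pollNoWakeup();
                // Pass the records through the interceptors if any are configured; otherwise return them directly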
                if (this.interceptors == null)
                    return new ConsumerRecords<>(records);
                else
                    return this.interceptors.onConsume(new ConsumerRecords<>(records));
            }

            long elapsed = time.milliseconds() - start;
            remaining = timeout - elapsed;
        } while (remaining > 0);

        return ConsumerRecords.empty();
    } finally {
        release();
    }
}
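Stripped of Kafka specifics, poll(timeout) is a deadline loop: make one attempt, return as soon as something is available, otherwise recompute the remaining time and try again until it runs out. The generic sketch below shows only that loop. DeadlinePoll is a hypothetical name, pollOnce is represented by a LongFunction that receives the remaining time, and fetch pipelining and interceptors are left out. In the real consumer each attempt blocks inside client.poll, so the loop does not spin; a pollOnce stand-in that returns immediately, as in the demo, would busy-wait.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.LongFunction;

class DeadlinePoll {
    // Call pollOnce with the time left until timeoutMs has elapsed,
    // returning the first non-empty result or an empty list on timeout.
    static <T> List<T> poll(long timeoutMs, LongFunction<List<T>> pollOnce) {
        if (timeoutMs < 0) {
            throw new IllegalArgumentException("Timeout must not be negative");
        }
        long start = System.currentTimeMillis();
        long remaining = timeoutMs;
        do {
            List<T> records = pollOnce.apply(remaining);
            if (!records.isEmpty()) {
                return records;
            }
            long elapsed = System.currentTimeMillis() - start;
            remaining = timeoutMs - elapsed;
        } while (remaining > 0);
        return Collections.emptyList();
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Stand-in for pollOnce: nothing on the first two attempts, one record on the third.
        List<String> result = poll(1_000, remaining -> {
            calls[0]++;
            return calls[0] < 3 ? Collections.<String>emptyList() : Collections.singletonList("record");
        });
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Because the loop is a do-while, a timeout of 0 still makes exactly one attempt, which matches the behaviour of poll(0) in the code above.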


private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
    // Poll for coordinator events; this also handles the periodic offset commit
    coordinator.poll(time.milliseconds());

    // fetch positions if we have partitions we're subscribed to that we
    // don't know the offset for
    if (!subscriptions.hasAllFetchPositions())
        // look up the fetch position of every partition that is missing one
        updateFetchPositions(this.subscriptions.missingFetchPositions());

    // If records are already buffered, return them immediately (none may be ready yet)
    Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
    if (!records.isEmpty())
        return records;

    // Otherwise, send new fetch requests
    fetcher.sendFetches();

    long now = time.milliseconds();
    long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);

    client.poll(pollTimeout, now, new PollCondition() {
        @Override
        public boolean shouldBlock() {
            // since a fetch might be completed by the background thread, we need this poll condition
            // to ensure that we do not block unnecessarily in poll()
            return !fetcher.hasCompletedFetches();
        }
    });
    // After a long poll, check whether the group needs to rebalance before returning data,
    // so that the group can stabilize faster
    if (coordinator.needRejoin())
        return Collections.emptyMap();
    // Return the fetched records
    return fetcher.fetchedRecords();
}
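Two details of pollOnce are worth isolating. The network poll never blocks longer than the smaller of the caller's remaining timeout and coordinator.timeToNextPoll(now), the time until the coordinator's next scheduled duty (work such as heartbeats or the periodic auto-commit), and the PollCondition lets it skip blocking entirely when a completed fetch is already buffered. The sketch below reproduces both ideas with a LinkedBlockingQueue standing in for the fetcher's completed-fetch buffer; the class and method names are hypothetical, and unlike the real hasCompletedFetches() check, the wait here also dequeues the response.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a bounded wait that returns early when data is ready.
class BoundedFetchWait {
    // Stand-in for the fetcher's buffer of completed fetch responses.
    private final LinkedBlockingQueue<String> completedFetches = new LinkedBlockingQueue<>();

    // Called when a fetch response has been received and buffered.
    void onFetchCompleted(String response) {
        completedFetches.add(response);
    }

    // Never block past the caller's deadline or the next coordinator duty.
    static long pollTimeout(long timeToNextCoordinatorDuty, long remainingTimeout) {
        return Math.min(timeToNextCoordinatorDuty, remainingTimeout);
    }

    // Returns a buffered response immediately if there is one; otherwise waits
    // up to timeoutMs for one. Returns null if nothing arrives in time.
    String awaitCompletedFetch(long timeoutMs) {
        try {
            return completedFetches.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        BoundedFetchWait w = new BoundedFetchWait();
        long timeout = pollTimeout(200, 5_000); // the coordinator duty comes first
        w.onFetchCompleted("fetch-1");
        long t0 = System.nanoTime();
        String got = w.awaitCompletedFetch(timeout); // data is ready: no blocking
        long waitedMs = (System.nanoTime() - t0) / 1_000_000;
        System.out.println(got + " after ~" + waitedMs + " ms, timeout was " + timeout + " ms");
        System.out.println(w.awaitCompletedFetch(timeout)); // waits about 200 ms, then prints null
    }
}
```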


2.6 pause: suspending fetching; after pausing, poll returns no records from the paused partitions

public void pause(Collection<TopicPartition> partitions) {
    acquire();
    try {
        for (TopicPartition partition: partitions) {
            log.debug("Pausing partition {}", partition);
            subscriptions.pause(partition);
        }
    } finally {
        release();
    }
}
// Return the partitions that are currently paused
public Set<TopicPartition> paused() {
    acquire();
    try {
        return Collections.unmodifiableSet(subscriptions.pausedPartitions());
    } finally {
        release();
    }
}


2.7 resume: resuming fetching from paused partitions

public void resume(Collection<TopicPartition> partitions) {
    acquire();
    try {
        for (TopicPartition partition: partitions) {
            log.debug("Resuming partition {}", partition);
            subscriptions.resume(partition);
        }
    } finally {
        release();
    }
}
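Pausing is per partition and only affects fetching: paused partitions stay assigned (no rebalance is triggered) and keep their positions, but they are left out of fetch requests until resume() is called. A common use is back-pressure. The sketch below models the fetchable-partition computation plus a hypothetical high/low watermark policy that pauses when a local buffer is too full and resumes once it has drained; the class name, the watermark parameters and the "topic-partition" strings are assumptions, not Kafka settings.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical model of pause()/resume() and the partitions that remain fetchable.
class PauseResumeSketch {
    private final Set<String> assigned = new LinkedHashSet<>();
    private final Set<String> paused = new LinkedHashSet<>();

    PauseResumeSketch(Collection<String> partitions) {
        assigned.addAll(partitions);
    }

    void pause(Collection<String> partitions) { paused.addAll(partitions); }
    void resume(Collection<String> partitions) { paused.removeAll(partitions); }
    Set<String> paused() { return Collections.unmodifiableSet(paused); }

    // Partitions that would be included in the next fetch request.
    Set<String> fetchable() {
        Set<String> result = new LinkedHashSet<>(assigned);
        result.removeAll(paused);
        return result;
    }

    // Hypothetical back-pressure policy based on how many records are buffered locally.
    void applyBackPressure(int buffered, int highWatermark, int lowWatermark) {
        if (buffered >= highWatermark) {
            pause(assigned);
        } else if (buffered <= lowWatermark) {
            resume(assigned);
        }
        // Between the watermarks the current state is kept (hysteresis).
    }

    public static void main(String[] args) {
        PauseResumeSketch s = new PauseResumeSketch(Arrays.asList("orders-0", "orders-1"));
        s.applyBackPressure(900, 800, 200); // buffer too full: pause everything
        System.out.println("fetchable: " + s.fetchable()); // prints "fetchable: []"
        s.applyBackPressure(150, 800, 200); // drained: resume
        System.out.println("fetchable: " + s.fetchable()); // prints "fetchable: [orders-0, orders-1]"
    }
}
```

Using two thresholds instead of one avoids flapping between pause and resume on every poll when the buffer hovers around a single limit.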


2.8 position: getting the offset of the next message

// Return the offset of the next record that will be fetched; if the position is not yet known, look it up first
public long position(TopicPartition partition) {
    acquire();
    try {
        if (!this.subscriptions.isAssigned(partition))
            throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer.");
        Long offset = this.subscriptions.position(partition);
        if (offset == null) {
            updateFetchPositions(Collections.singleton(partition));
            offset = this.subscriptions.position(partition);
        }
        return offset;
    } finally {
        release();
    }
}
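position() reports the offset of the next record that will be fetched, not the last one returned. When the position is unknown, the code above calls updateFetchPositions(), which, roughly speaking, starts from the group's committed offset and otherwise falls back to the auto.offset.reset policy. After poll() hands records to the application, the position moves to just past the last returned record, even when offsets have gaps (as on compacted topics). The model below makes both rules concrete; PositionLookupSketch and its fields are hypothetical.

```java
import java.util.OptionalLong;

// Hypothetical model of position(): look it up lazily, then advance it on poll().
class PositionLookupSketch {
    private final OptionalLong committed; // group's committed offset for the partition, if any
    private final long resetOffset;       // offset the auto.offset.reset policy would choose
    private Long position;                // null until known

    PositionLookupSketch(OptionalLong committed, long resetOffset) {
        this.committed = committed;
        this.resetOffset = resetOffset;
    }

    long position() {
        if (position == null) {
            // Prefer the committed offset, otherwise fall back to the reset policy.
            position = committed.isPresent() ? committed.getAsLong() : resetOffset;
        }
        return position;
    }

    // poll() handed these records (ascending offsets) to the application:
    // the next fetch starts right after the last one, even if offsets had gaps.
    void onRecordsReturned(long... offsets) {
        if (offsets.length > 0) {
            position = offsets[offsets.length - 1] + 1;
        }
    }

    public static void main(String[] args) {
        PositionLookupSketch p = new PositionLookupSketch(OptionalLong.of(42), 0);
        System.out.println(p.position()); // prints 42, taken from the committed offset
        p.onRecordsReturned(42, 43, 47);  // e.g. a compacted topic with gaps
        System.out.println(p.position()); // prints 48
    }
}
```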