kafka 系列 -- 4.一、消費者基本介紹

一、消費者食用DEMO

Properties prop = new Properties();
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
prop.put(ConsumerConfig.GROUP_ID_CONFIG, "testConsumer");
prop.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumerDemo");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
consumer.subscribe(Collections.singleton("test"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        String key = record.key();
        String value = record.value();
        System.err.println(record.toString());
    }
}

二、消費者基本概念

kafka 消費者是以 組爲基本單位 進行消費的。消費的模型以下
image.pngjava

1 個 topic 容許被多個 消費組 消費。再次強調,kafka 消費是以組爲單位。正則表達式

prop.put(ConsumerConfig.GROUP_ID_CONFIG, "testConsumer");

以上這行代碼設置了消費組。算法

2.一、partition 分配

topic 爲邏輯上的概念,partition 纔是物理上的概念。那麼看完這個以上的消費模型圖。你可能會很疑惑。當一個組下有多個消費者時,每一個消費者是如何消費的?安全

先說明:partition 的分配爲平均分配多線程

假設一:topic1 下面有 3 個分區。分別以下:p1 - p3。那麼 groupA 下的三個消費者消費的對應 partition 爲以下ide

instance1: p1
instance2: p2
instance3: p3

假設二:topic1 下面有 8 個分區。分別爲 p1 - p8。那麼 groupA 中每一個消費者分配到的 partition 就以下函數

instance1: p1,p2,p3
instance2: p4,p5,p6
instance3: p7,p8

2.二、partition 重分配

假設三:topic1 下面有 8 個分區:P1 - P8。groupA 有三個消費者:c1,c2,c3。此時分配的 partition 以下fetch

c1: p1,p2,p3
c2: p4,p5,p6
c3: p7,p8

若是此時,又有一個新的消費者加入到 groupA 會發生什麼呢? partition 會被從新分配大數據

c1: p1,p2
c2: p3,p4
c3: p5,p6
c4: p7,p8

三、消費者端 API 介紹

3.一、訂閱主題

void subscribe(Collection<String> topics);
void subscribe(Collection<String> topics, ConsumerRebalanceListener callback);

從方法上看 kafka 容許一個消費者訂閱多個 topicspa

void subscribe(Pattern pattern);
void subscribe(Pattern pattern, ConsumerRebalanceListener callback);

入參 Pattern 則表示,可使用正則表達式匹配多個 topic. 實例代碼以下

Pattern pattern = Pattern.compile("test?");
consumer.subscribe(pattern);

能夠訂閱主題,那麼天然也能夠取消訂閱主題

consumer.unsubscribe();

固然,也能夠直接獲取到消費組訂閱的主題

Set<String> topics = consumer.subscription();

一個主題下面有多個 partition, 那麼是否能夠指定要消費的隊列呢?答案是能夠的

TopicPartition p1 = new TopicPartition("test1", 0);
TopicPartition p2 = new TopicPartition("test1", 1);
consumer.assign(Arrays.asList(p1, p2));

不過須要注意的是,若是指定了消費的分區,那麼是消費者是沒法自動 rebanlance 的。

3.二、消息消費

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

從消費者端的這行代碼,咱們能夠看出,kafka 消息消費採用的是拉取模式。當未拉取到消息時,會阻塞線程。

poll 方法返回的 ConsumerRecords 實現 Iterable 接口,是 ConsumerRecord 的迭代器。ConsumerRecord 屬性相對簡單

public class ConsumerRecord<K, V> { 
    private final String topic; // 主題
    private final int partition; // 分區
    private final long offset; // 消息所屬分區偏移量
    private final long timestamp; // 時間戳
    private final TimestampType timestampType; // 二者類型,消息建立時間戳及消息追加到日誌的時間戳 
    private final int serializedKeySize;
    private final int serializedValueSize; 
    private final Headers headers; // 發送的header
    private final K key;   // 發送的 key
    private final V value; // 發送的內容
    private volatile Long checksum; // CRC32 校驗值
}

3.三、位移提交

對於分區而言,消息會有一個惟一 offset, 表示消息在分區中的位置,稱之爲 偏移量。對於消息消費而言,也有消費進度的 offset,稱之爲 位移
kafka 將消息的消費進度存儲在 kafka 內部主題 __onsumer_offset 中。
kafka 中默認每隔 5s 保存消息的消費進度。可經過 auto.commit.interval.ms 進行配置。

kafka 提供手動提交的 API,下面演示一下。

Properties prop = new Properties();
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
prop.put(ConsumerConfig.GROUP_ID_CONFIG, "testConsumer");
prop.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumerDemo");
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
consumer.subscribe(Collections.singleton("test"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("消費:" + record.toString());
    }
    consumer.commitSync();
}

須要注意的是,須要將 enable.auto.commit 設置爲 true.

3.四、設置新消費組從哪一個位置開始消費

kafka 設置 新消費組 從哪一個位置開始消費的配置爲:auto.offset.reset
該配置有如下 3 個配置項

  • latest(默認配置)

默認從最新的位置,開始消費。

  • earliest

從最先的位置開始消費。當配置爲該參數時,kafka 會打印以下日誌:Resetting offset for partition

  • none

當消費組,沒有對應消費進度時,會直接拋 NoOffsetForPartitionException 異常

kafka 還提供了 seek(TopicPartition partition, long offset) 方法,容許新的消費者,設置從哪一個位置開始消費。

// 由於分配 分區的動做,發生在 pool 中,所以在設置消費偏移量時,須要先拉取消息
Set<TopicPartition> assignment = new HashSet<>();

while (assignment.size() == 0) {
    consumer.poll(Duration.ofMillis(100));
    assignment = consumer.assignment();
}

for (TopicPartition tp : assignment) {
    consumer.seek(tp, 50);
}
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.err.println("消費:" + record.toString());
    }
}

更多狀況下,咱們可能會指定消費組從指定的時間點開始消費

Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
for (TopicPartition tp : assignment) {
    // 指定從一天前開始消費
    timestampToSearch.put(tp, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
}

Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestampToSearch);

for (TopicPartition tp : assignment) {
    OffsetAndTimestamp timestamp = offsets.get(tp);
    if (null != timestamp) {
        consumer.seek(tp, timestamp.offset());
    }
}

3.五、分區再均衡

在分區再均衡期間,消費組內的消費者是沒法讀取消息的。而且若是以前的消費者沒有及時提交消費進度,那麼會形成重複消費。

kafkasubscribe 的時候,提供了回調函數,容許咱們在觸發再均衡時,作控制

void subscribe(Collection<String> topics, ConsumerRebalanceListener listener)

看一下 ConsumerRebalanceListener 定義的接口

// 再均衡開始以前和消費者中止讀取消息以前被調用,可利用該會掉,提交消費位移
void onPartitionsRevoked(Collection<TopicPartition> partitions);

// 從新分區後,消費者開始讀取消息以前被調用
void onPartitionsAssigned(Collection<TopicPartition> partitions);

下面演示,如何在再均衡以前,提交消費偏移

consumer.subscribe(Collections.singleton("test"), new ConsumerRebalanceListener() {

    // 在再均衡開始以前和消費者中止讀取消息以前被調用,可利用該會掉,提交消費位移
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // 提交消費偏移
        consumer.commitSync();
    }
    
    // 從新分區後,消費者開始讀取消息以前被調用
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    }
});

3.六、消費者攔截器

消費者,容許在 消費以前消費偏移提交以後關閉以前,進行控制,多個攔截器則組成攔截器鏈, 且多個攔截器以前須要用 ',' 號隔開。
先看攔截器定義的接口

public interface ConsumerInterceptor<K, V> extends Configurable {
    // 消息消費以前
    ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
    
    // 提交以後調用
    void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
    
    // 關閉以前調用
    void close();
}
Properties prop = new Properties();
prop.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName() + "," + MyConsumerInterceptor2.class.getName());

3.七、重要的消費者參數

  • fetch.min.bytes

默認 1Bpoll 時,拉取的最小數據量。

  • fetch.max.bytes

默認 5242880B,50MB,poll 時,拉取的最大數據量。

  • fetch.max.wait.ms

默認 500ms,若是 kafka 一直沒有觸發 poll 動做,那麼最多等待 fetch.max.wait.ms

  • max.partition.fetch.bytes

默認 1048576B,1MB,分區拉取時的最大數據量

  • max.poll.records

默認 500條,拉取的最大消息條數

  • connections.max.idle.ms

默認 540000ms, 9分鐘,多久關閉閒置的鏈接

  • receive.buffer.bytes

默認 65536B64KBSOCKET 接受消息的緩衝區(SO_RECBUF

  • request.timeout.ms

默認 30000ms,配置 consumer 等待請求響應的最長時間

  • metadata.max.age.ms

默認 300000ms,5 分鐘,配置元數據過時時間。元數據在限定的時間內,沒有更新,會被強制更新

  • reconnect.backoff.ms

默認 50ms,配置嘗試鏈接指定主機以前的等待時間,避免頻繁鏈接主機

  • retry.backoff.ms

默認 100ms,發送失敗時,2次的間隔時間

四、總結

  1. kafka 消費以組爲單位,且容許一個消費組訂閱多個 topic
  2. partition 重分配算法,爲平均算法
  3. KafkaConsumer 爲線程不安全。所以 poll() 只有當前線程在拉取消息。kafka 要實現多線程拉取相對麻煩
  4. kafka 消費者端,提供的 API 很是靈活,容許從指定的位置消費,容許手動提交某個分區的消費偏移
  5. kafka 提供消費者攔截器鏈,容許在 消費以前,提交消費偏移以後 控制。

五、與 RocketMQ 異同

  1. RocketMQ 建議 1 個消費組只消費一個 topic, 且在實際開發中,若是消費者訂閱多個 topic 會沒法正常工做。kafka 中 1 個消費者能夠訂閱多個 topic
  2. RocketMQ 能夠確保消費時,消息不丟失,kafka 沒法保證。
  3. RocketMQ 在消費者端,實現了多線程消費,kafka 則沒有
  4. kafka 默認每 5s 持久化消費進度,RocketMQ 也是。不過 RocketMQ 會提交偏移量最小的消息。好比,線程 A 消費了 20 的消息。線程 B 消費了 10 的消息。當線程 A 提交消費進度的時候,會提交 10,而不會提交20。這也是 RocketMQ 能夠確保消息消費時不丟的緣由。
  5. RocketMQ 發生 rebalance,即 kafka 的再分配。默認和 kafka 一致,採用的是 平均分配算法。不過 RocketMQ 容許自定義再分配算法,且提供了豐富算法支持。
  6. RocketMQkafka 一致,都存在重複消費問題。
  7. 從暴露出來的 API 來看,kafka 客戶端會比 RocketMQ 更加靈活。
  8. kafka 設置 新的消費組 從哪一個位置開始消費,沒有額外的條件限制;RocketMQ 只有當舊消息堆積很是多時,纔會有效。
相關文章
相關標籤/搜索