Kafka中數據的流向

1: 多個消費者消費同一個Topic數據相同的數據算法

2: 多個消費者消費同一個Topic數據不一樣數據服務器

3: 各個消費者按組協調消費this

1: 多個消費者消費同一個Topic數據相同的數據

(1)使用一個全新的"group.id"(就是以前沒有被任何消費者使用過);

(2)使用assign來訂閱;
# 例如 groupId 
@KafkaListener(topics = "test-syn",groupId = "test-2")
public void send(ConsumerRecord<?, ?> record) {
    Optional<?> kafkaMessage = Optional.ofNullable(record.value());
    if (kafkaMessage.isPresent()) {
        Object messge = kafkaMessage.get();
        log.info("【KafkaListener監聽到消息】" + messge);
    }
}

注意:若是把 "enable.auto.commit" 設爲 "false",使用 consumer.commitAsync(currentOffsets, null) 手動提交 offset ,是不能從頭開始消費的spa

auto.offset.reset值含義解釋: code

      • earliest
          • 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費
      • latest
          • 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據
      • none
          • topic各分區都存在已提交的offset時,從offset後開始消費;只要有一個分區不存在已提交的offset,則拋出異常

clipboard

也就是說不管哪一種設置,只要 kafka 中相同 group、partition 中已經有提交的 offset,則都沒法從開始消費。blog

參考論壇:服務器重啓了,那麼該group是否會從新消費服務器裏面全部的消息ip

KafkaConsumer.subscribe() : 爲consumer自動分配partition,get

有內部算法保證topic-partition以最優的方式均勻分配給同group下的不一樣consumer。若是有多個partition且只有一個消費者,則按順序消費全部分區。不會重複消費。kafka

KafkaConsumer.assign() : 爲consumer手動、顯示的指定須要消費的topic-partitions,it

不受group.id限制,不提交offset,至關與指定的group無效(this method does not use the consumer's group management)。能夠重複消費。

或者,這樣作:

clipboard

目前就 high level API 而言,offset 是存於 Zookeeper 中的,沒法存於 HDFS,而 low level API 的 offset 是由本身去維護的,能夠將之存於 HDFS 中。

2: 多個消費者消費同一個Topic數據不一樣數據

# groupId 將多個消費者分配到同一個組下面
@KafkaListener(topics = "test-syn",groupId = "test-1")
public void send(ConsumerRecord<?, ?> record) {
    Optional<?> kafkaMessage = Optional.ofNullable(record.value());
    if (kafkaMessage.isPresent()) {
        Object messge = kafkaMessage.get();
        log.info("【KafkaListener監聽到消息】" + messge);
    }
}
@KafkaListener(topics = "test-syn",groupId = "test-1")
public void send(ConsumerRecord<?, ?> record) {
    Optional<?> kafkaMessage = Optional.ofNullable(record.value());
    if (kafkaMessage.isPresent()) {
        Object messge = kafkaMessage.get();
        log.info("【KafkaListener監聽到消息】" + messge);
    }
}

3: 各個消費者按組協調消費

@KafkaListener(topics = "test-syn",groupId = "test-1")
public void send(ConsumerRecord<?, ?> record) {
    Optional<?> kafkaMessage = Optional.ofNullable(record.value());
    if (kafkaMessage.isPresent()) {
        Object messge = kafkaMessage.get();
        log.info("【KafkaListener監聽到消息】1" + messge);
    }
}

@KafkaListener(topics = "test-syn",groupId = "test-2")
public void send2(ConsumerRecord<?, ?> record) {
    Optional<?> kafkaMessage = Optional.ofNullable(record.value());
    if (kafkaMessage.isPresent()) {
        Object messge = kafkaMessage.get();
        log.info("【KafkaListener監聽到消息】2" + messge);
    }
}
@KafkaListener(topics = "test-syn",groupId = "test-3")
public void send(ConsumerRecord<?, ?> record) {
    Optional<?> kafkaMessage = Optional.ofNullable(record.value());
    if (kafkaMessage.isPresent()) {
        Object messge = kafkaMessage.get();
        log.info("【KafkaListener監聽到消息】1" + messge);
    }
}

@KafkaListener(topics = "test-syn",groupId = "test-2")
public void send2(ConsumerRecord<?, ?> record) {
    Optional<?> kafkaMessage = Optional.ofNullable(record.value());
    if (kafkaMessage.isPresent()) {
        Object messge = kafkaMessage.get();
        log.info("【KafkaListener監聽到消息】2" + messge);
    }
}
# 上面
1 2 3 收到相同的消費message
2 2 收到不一樣的message
相關文章
相關標籤/搜索