Springboot系列之kafka操做

Springboot系列之kafka操做

kafka簡介

ApacheKafka®是一個分佈式流媒體平臺。有三個關鍵功能:html

發佈和訂閱記錄流,相似於消息隊列或企業消息傳遞系統。
以容錯的持久方式存儲記錄流。
記錄發生時處理流。

Kafka一般用於兩大類應用:app

構建可在系統或應用程序之間可靠獲取數據的實時流數據管道
構建轉換或響應數據流的實時流應用程序

kafka概念

(1)什麼是流處理?異步

所謂流處理,個人理解是流水線處理。例如,電子廠每一個人負責一個功能,來了就處
理,不來就等着。

(2)partition和replication和broker有關嗎?分佈式

partition和replication是分區和備份的概念。即便是單機一個broker也同樣
支持。

(3)consumer如何設置和存儲partition的offset偏移量,有哪幾種消費模式,怎麼肯定消息是否被消費,將偏移量移到前面會當即消費到最後嗎?ide

使用KafkaConsumer設置partition和offset。有自動提交和手動ack模式提交
偏移量兩種消費方式。將偏移量移到前面須要設置成爲消費狀態會當即被消費(設置
新消費組)。

(4)AckMode模式有哪幾種?工具

RECORD:處理記錄後,偵聽器返回時提交偏移量
BATCH:在處理poll()返回的全部記錄時提交偏移量
TIME:只要已超過自上次提交以來的ackTime,就會在處理poll()返回的全部記錄時提交偏移量
COUNT:只要自上次提交以來已收到ackCount記錄,就會在處理poll()返回的全部記錄時提交偏移量
COUNT_TIME:與TIME和COUNT相似,但若是任一條件爲真,則執行提交
MANUAL:消息監聽器負責確認()確認。 以後,應用與BATCH相同的語義    
MANUAL_IMMEDIATE:當偵聽器調用Acknowledgment.acknowledge()方法時,當即提交偏移量

Springboot使用kafka

(1)注入NewTopic自動在broker中添加topicui

@Bean
public NewTopic topic() {
    return new NewTopic("topic1", 2, (short) 1);
}

(2)使用KafkaTemplate發送消息時,topic自動建立,自動建立的partition是0,長度爲1code

(3)使用KafkaTemplate發送消息server

@RequestMapping("sendMsgWithTopic")
public String sendMsgWithTopic(@RequestParam String topic, @RequestParam int partition, @RequestParam String key,
                               @RequestParam String value) {

    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, partition, key, value);
    return "success";
}

(4)異步發送消息htm

public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);
    ListenableFuture<SendResult<Integer, String>> future = template.send(record);
    future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
            @Override
            public void onSuccess(SendResult<Integer, String> result) {
                handleSuccess(data);
            }
            @Override
            public void onFailure(Throwable ex) {
                handleFailure(data, record, ex);
           }
    });
}

(5)同步發送消息

public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);
    try {
            template.send(record).get(10, TimeUnit.SECONDS);
            handleSuccess(data);
    }catch (ExecutionException e) {
            handleFailure(data, record, e.getCause());
    }catch (TimeoutException | InterruptedException e) {
            handleFailure(data, record, e);
    }
}

(6)事務

(1)Spring事務支持一塊兒使用(@Transactional,TransactionTemplate等)
(2)使用template執行事務
    boolean result = template.executeInTransaction(t -> {
    t.sendDefault("thing1", "thing2");
    t.sendDefault("cat", "hat");
    return true;
    });

(7)消費者

(1)簡單使用
 @KafkaListener(id = "myListener", topics = "myTopic",
    autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
 public void listen(String data) {
    ...
 }
(2)配置多個topic和partition,TopicPartition中partitions和PartitionOffset不能同時使用
 @KafkaListener(id = "thing2", topicPartitions =
    { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
      @TopicPartition(topic = "topic2", partitions = "0",
         partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
    })
 public void listen(ConsumerRecord<?, ?> record) {
    ...
 }
(3)使用ack手動確認模式
 @KafkaListener(id = "cat", topics = "myTopic",
      containerFactory = "kafkaManualAckListenerContainerFactory")
 public void listen(String data, Acknowledgment ack) {
    ...
    ack.acknowledge();
 }
 (4)獲取消息的header信息
 @KafkaListener(id = "qux", topicPattern = "myTopic1")
 public void listen(@Payload String foo,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
    ) {
    ...
 }
(5)批處理
 @KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
 public void listen(List<String> list,
    @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,
    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
    @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
    @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    ...
 }
(6)使用@Valid校驗數據
 @KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler",
   containerFactory = "kafkaJsonListenerContainerFactory")
 public void validatedListener(@Payload @Valid ValidatedClass val) {
    ...
 }
 @Bean
 public KafkaListenerErrorHandler validationErrorHandler() {
    return (m, e) -> {
        ...
    };
 }
(7)topic根據參數類型映射不一樣方法
 @KafkaListener(id = "multi", topics = "myTopic")
 static class MultiListenerBean {
    @KafkaHandler
    public void listen(String cat) {
        ...
    }
    @KafkaHandler
    public void listen(Integer hat) {
        ...
    }
    @KafkaHandler
    public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) {
        ...
    }
 }

Springboot使用kafka踩坑

(1)須要修改server.properties的listener主機地址否則Java獲取不到消息。

(2)不一樣服務配置相同groupId只有一個監聽者能夠收到消息

kafka圖形化工具 kafka tool

下載地址 http://www.kafkatool.com/down...

有問題請留言! 原文地址:

相關文章
相關標籤/搜索