在spring boot中三分鐘上手日誌堆積系統kafka

kafka消息堆積能力比較強,能夠堆積上億的消息,特別適合日誌處理這種實時性要求不過高的場景,同時支持集羣部署,相比redis堆積能力和可靠性更高php

完整項目代碼已上傳github:github.com/neatlife/my…html

能夠經過下面的步驟快速上手這個kafkajava

獲取一個可用的kafka實例

能夠使用docker一鍵啓動一個kafka集羣,參考:github.com/simplesteph…git

git clone https://github.com/simplesteph/kafka-stack-docker-compose.git
cd kafka-stack-docker-compose
docker-compose -f full-stack.yml up -d
複製代碼

操做效果以下 github

使用命令docker-compose -f full-stack.yml ps獲取能夠kafka監聽的端口web

記下kafka監聽的地址9092,這個後面會用到redis

8000端口是這個kafka的topic的ui界面,這個界面能夠查看當前的topic列表,效果以下 spring

這裏也看到topic裏保存的數據docker

準備案例項目

能夠在https://start.spring.io/建立測試項目shell

須要加上下面這三個包

  1. spring-boot-starter-web
  2. spring-kafka
  3. lombok

appliation.properties中配置kafka的地址和使用的group-id,這個group-id名稱能夠自行定義,好比:myconsumergroup

spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.group-id=myconsumergroup
複製代碼

用kafka客戶端發送消息

使用一個spring boot的service封裝kafka發送消息的代碼,核心代碼以下

package mykafka.service;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class Producer {

    private final KafkaTemplate<String, String> kafkaTemplate;


    private String topic = "自行定義的topic";

    Producer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(String message) {
        this.kafkaTemplate.send(topic, message);
        System.out.println("Sent sample message [" + message + "] to " + topic);
    }

}
複製代碼

而後編寫一個接口調用這個發送kafka消息的service,核心代碼以下:

@RestController
@RequestMapping("/")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class MyController {

    private final Producer producer;

    @RequestMapping("/test1")
    public String test1() {
        producer.send(String.format("my message currentTimeMillis: %d", System.currentTimeMillis()));
        return "test1";
    }
}
複製代碼

注意:上面代碼裏使用的kafka的topic能夠自行定義,好比mytopic

而後在瀏覽器中訪問這個接口 ip:8080/test1

能夠在這個kafka的topic的ui看到發送到kafka的消息

能夠看到這個消息已經發送到kafka了

消費消息

消費消息只須要在方法上加上KafkaListener,並指定topic和groupId便可

核心代碼以下

@KafkaListener(topics = "mytopic", groupId = "myconsumergroup")
public void processMessage(String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions, @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics, @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    log.info(
            "received message, topic: {}, partition: {}, offset: {}, message: {}",
            topics.get(0),
            partitions.get(0),
            offsets.get(0),
            message
    );
}
複製代碼

操做效果以下:

能夠看到已經成功收到了kafka裏的消息

其它客戶端

php發送和消費客戶端參考:github.com/arnaud-lb/p…

go客戶端參考:github.com/confluentin…

一些注意的點

發送消息和消費消息須要確保topic一致

日誌能夠先發送到kafka作緩衝,而後經過kafka的客戶端把消息取出來放到elk等日誌存儲系統中分析和可視化

由於kafka客戶端發送消息和服務端把消息保存到磁盤都是異步操做,因此存在服務器宕機後消息可能丟失,若是可靠性要求更高,能夠使用改進版的kafka:rocketmq

參考連接

  1. www.baeldung.com/spring-kafk…
  2. www.baeldung.com/spring-inje…
  3. docs.confluent.io/current/cli…
相關文章
相關標籤/搜索