【HAVENT原創】Spring Boot + Kafka 消息日誌開發

最近由於部門須要將服務程序的各類日誌發送給 Kafka 進行分析,因此寫一個 Kafka 消息日誌操做類,主要用來保存日誌到 Kafka 以便查詢。java

1、pom.xml 增長配置web

<!-- HH: 引入 kafka 模塊 -->
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
			<version>2.0.0.RELEASE</version>
		</dependency>

		<!-- HH: 引入 fastjson 模塊 -->
		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>1.2.43</version>
		</dependency>

 

2、appication.yml 配置算法

server:
  port: 8081

spring:
  application:
    name: HAVENT-SPRING-BOOT-DEMO

  kafka:
    producer:
      bootstrap-servers: IP地址1:9092,IP地址2:9092,IP地址3:9092
    template:
      topic: mobile-service

logging:
  level:
    root: info

 

3、com.havent.logger.config.KafkaConfiguration 配置文件spring

package com.havent.logger.config;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConfiguration {
    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String serverAddress;
    
    public Map<String, Object> producerConfigs() {
        System.out.println("HH > serverAddress: " + serverAddress);

        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverAddress);

        // 若是請求失敗,生產者會自動重試,咱們指定是0次,若是啓用重試,則會有重複消息的可能性
        props.put(ProducerConfig.RETRIES_CONFIG, 1);

        // Request發送請求,即Batch批處理,以減小請求次數,該值即爲每次批處理的大小
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);

        /**
         * 這將指示生產者發送請求以前等待一段時間,但願更多的消息填補到未滿的批中。這相似於TCP的算法,例如上面的代碼段,
         * 可能100條消息在一個請求發送,由於咱們設置了linger(逗留)時間爲1毫秒,而後,若是咱們沒有填滿緩衝區,
         * 這個設置將增長1毫秒的延遲請求以等待更多的消息。 須要注意的是,在高負載下,相近的時間通常也會組成批,即便是
         * linger.ms=0。在不處於高負載的狀況下,若是設置比0大,以少許的延遲代價換取更少的,更有效的請求。
         */
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1000);

        /**
         * 控制生產者可用的緩存總量,若是消息發送速度比其傳輸到服務器的快,將會耗盡這個緩存空間。
         * 當緩存空間耗盡,其餘發送調用將被阻塞,阻塞時間的閾值經過max.block.ms設定, 以後它將拋出一個TimeoutException。
         */
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
        return kafkaTemplate;
    }
    
}

 

4、com.havent.logger.request.LoggerMessageReq 文件sql

package com.havent.logger.request;

import java.sql.Timestamp;

public class LoggerMessageReq {

    private String appName;
    private Object message;
    private String loggerType;
    private String loggerLevel;
    private Timestamp timestamp;

    public LoggerMessageReq(String appName, Object message) {
        this.appName = appName;
        this.message = message;
        this.timestamp = new Timestamp(System.currentTimeMillis());
    }

    public String getAppName() {
        return appName;
    }

    public Object getMessage() {
        return message;
    }

    public void setMsg(Object message) {
        this.message = message;
    }

    public String getLoggerLevel() {
        return loggerLevel;
    }

    public void setLoggerLevel(String loggerLevel) {
        this.loggerLevel = loggerLevel;
    }

    public String getLoggerType() {
        return loggerType;
    }

    public void setLoggerType(String loggerType) {
        this.loggerType = loggerType;
    }

    public Timestamp getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Timestamp timestamp) {
        this.timestamp = timestamp;
    }
}

 

5、com.havent.logger.service.KafkaService 服務文件apache

package com.havent.logger.service;

import com.alibaba.fastjson.JSON;
import com.havent.demo.logger.request.LoggerMessageReq;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

/**
 * kafka 消息推送服務類
 * @author havent.liu
 */
@Component
public class KafkaService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${spring.kafka.template.topic}")
    private String topic;

    @Value("${spring.application.name}")
    private String appName;

    public void trace(Object msg, String loggerType){
        this.sendMessage("trace", loggerType, msg);
    }
    public void trace(Object msg){
        this.sendMessage("trace", "", msg);
    }

    public void debug(Object msg, String loggerType){
        this.sendMessage("debug", loggerType, msg);
    }
    public void debug(Object msg){
        this.sendMessage("debug", "", msg);
    }

    public void info(Object msg, String loggerType) {
        this.sendMessage("info", loggerType, msg);
    }
    public void info(Object msg) {
        this.sendMessage("info", "", msg);
    }

    public void warn(Object msg, String loggerType){
        this.sendMessage("warn", loggerType, msg);
    }
    public void warn(Object msg){
        this.sendMessage("warn", "", msg);
    }

    public void error(Object msg, String loggerType){
        this.sendMessage("error", loggerType, msg);
    }
    public void error(Object msg){
        this.sendMessage("error", "", msg);
    }

    private void sendMessage(String loggerLevel, String loggerType, Object msg) {
        LoggerMessageReq loggerMessage = new LoggerMessageReq(appName, msg);
        loggerMessage.setLoggerLevel(loggerLevel);
        loggerMessage.setLoggerType(loggerType);
        String message = JSON.toJSONString(loggerMessage);

        this.sendMessage(message);
    }

    /**
     * 發送消息到 kafka
     */
    private void sendMessage(String message) {
        ListenableFuture future = kafkaTemplate.send(topic, message);
        future.addCallback(o -> System.out.println("kafka > 消息發送成功:" + message), throwable -> System.out.println("kafka > 消息發送失敗:" + message));
    }
    
}

 

6、com.havent.controller.HelloController 調用示例json

package com.havent.controller;

import com.alibaba.fastjson.JSON;
import com.havent.demo.logger.request.LoggerMessageReq;
import com.havent.demo.logger.service.KafkaService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class HelloController {

    @Autowired
    private KafkaService logger;

    @RequestMapping("/")
    public String index() {
        logger.info("test info");
        logger.trace(this.getClass());
        logger.warn(new LoggerMessageReq("testApp", "test message"));
        logger.error(JSON.parse("{id:111,name:'test',content:'something wrong!'}"));

        return "Hello World";
    }

}

 

執行效果:bootstrap

相關文章
相關標籤/搜索