最近由於部門須要將服務程序的各類日誌發送給 Kafka 進行分析,因此寫一個 Kafka 消息日誌操做類,主要用來保存日誌到 Kafka 以便查詢。
1、pom.xml 增長配置
<!-- HH: Kafka client support (Spring for Apache Kafka) -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.0.0.RELEASE</version>
</dependency>
<!-- HH: JSON serialization of log payloads.
     NOTE(review): fastjson 1.2.43 has known deserialization CVEs;
     consider upgrading before production use. -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.43</version>
</dependency>
2、application.yml 配置
server:
  port: 8081

spring:
  application:
    name: HAVENT-SPRING-BOOT-DEMO
  kafka:
    # Read by KafkaConfiguration via ${spring.kafka.producer.bootstrap-servers}
    producer:
      bootstrap-servers: IP地址1:9092,IP地址2:9092,IP地址3:9092
    # Read by KafkaService via ${spring.kafka.template.topic}
    template:
      topic: mobile-service

logging:
  level:
    root: info
3、com.havent.logger.config.KafkaConfiguration 配置文件
package com.havent.logger.config;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * Kafka producer configuration: builds the {@link KafkaTemplate} used to
 * publish log messages to the brokers listed in application.yml.
 */
@Configuration
@EnableKafka
public class KafkaConfiguration {
    /** Comma-separated broker list from spring.kafka.producer.bootstrap-servers. */
    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String serverAddress;

    /**
     * Assembles the producer properties for the factory below.
     *
     * @return the producer configuration map
     */
    public Map<String, Object> producerConfigs() {
        System.out.println("HH > serverAddress: " + serverAddress);
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverAddress);
        // Retry a failed send once. NOTE: any retries > 0 can cause duplicate
        // messages; set this to 0 if duplicates are unacceptable.
        props.put(ProducerConfig.RETRIES_CONFIG, 1);
        // Batch size in bytes: records are grouped into batches to reduce the
        // number of requests sent to the broker.
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
        /*
         * How long the producer waits, hoping more records arrive to fill an
         * unfull batch, similar to TCP's Nagle algorithm. With linger.ms=1000
         * an unfilled batch adds up to 1 second of latency in exchange for
         * fewer, larger requests. Under high load batches tend to fill up and
         * be sent anyway, even with linger.ms=0.
         */
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1000);
        /*
         * Total memory available for buffering records not yet transmitted.
         * If records are produced faster than they can be sent, this buffer
         * fills up; further send() calls then block up to max.block.ms before
         * throwing a TimeoutException.
         */
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    /** Producer factory backed by the configuration above. */
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /** The template injected into KafkaService for sending messages. */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
4、com.havent.logger.request.LoggerMessageReq 文件
package com.havent.logger.request;

import java.sql.Timestamp;

/**
 * Log message payload that gets serialized to JSON (via fastjson) and sent
 * to Kafka by KafkaService.
 */
public class LoggerMessageReq {
    private String appName;       // source application name (spring.application.name)
    private Object message;       // arbitrary log payload (String, POJO, JSON, ...)
    private String loggerType;    // caller-supplied category; may be empty
    private String loggerLevel;   // trace / debug / info / warn / error
    private Timestamp timestamp;  // set to creation time in the constructor

    /**
     * Creates a message stamped with the current time.
     *
     * @param appName source application name
     * @param message log payload
     */
    public LoggerMessageReq(String appName, Object message) {
        this.appName = appName;
        this.message = message;
        this.timestamp = new Timestamp(System.currentTimeMillis());
    }

    public String getAppName() {
        return appName;
    }

    public Object getMessage() {
        return message;
    }

    /** Conventionally named setter for the {@code message} property. */
    public void setMessage(Object message) {
        this.message = message;
    }

    /**
     * @deprecated misnamed setter kept for backward compatibility;
     *             use {@link #setMessage(Object)} instead.
     */
    @Deprecated
    public void setMsg(Object message) {
        this.message = message;
    }

    public String getLoggerLevel() {
        return loggerLevel;
    }

    public void setLoggerLevel(String loggerLevel) {
        this.loggerLevel = loggerLevel;
    }

    public String getLoggerType() {
        return loggerType;
    }

    public void setLoggerType(String loggerType) {
        this.loggerType = loggerType;
    }

    public Timestamp getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Timestamp timestamp) {
        this.timestamp = timestamp;
    }
}
5、com.havent.logger.service.KafkaService 服務文件
package com.havent.logger.service; import com.alibaba.fastjson.JSON; import com.havent.demo.logger.request.LoggerMessageReq; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFuture; /** * kafka 消息推送服務類 * @author havent.liu */ @Component public class KafkaService { @Autowired private KafkaTemplate<String, String> kafkaTemplate; @Value("${spring.kafka.template.topic}") private String topic; @Value("${spring.application.name}") private String appName; public void trace(Object msg, String loggerType){ this.sendMessage("trace", loggerType, msg); } public void trace(Object msg){ this.sendMessage("trace", "", msg); } public void debug(Object msg, String loggerType){ this.sendMessage("debug", loggerType, msg); } public void debug(Object msg){ this.sendMessage("debug", "", msg); } public void info(Object msg, String loggerType) { this.sendMessage("info", loggerType, msg); } public void info(Object msg) { this.sendMessage("info", "", msg); } public void warn(Object msg, String loggerType){ this.sendMessage("warn", loggerType, msg); } public void warn(Object msg){ this.sendMessage("warn", "", msg); } public void error(Object msg, String loggerType){ this.sendMessage("error", loggerType, msg); } public void error(Object msg){ this.sendMessage("error", "", msg); } private void sendMessage(String loggerLevel, String loggerType, Object msg) { LoggerMessageReq loggerMessage = new LoggerMessageReq(appName, msg); loggerMessage.setLoggerLevel(loggerLevel); loggerMessage.setLoggerType(loggerType); String message = JSON.toJSONString(loggerMessage); this.sendMessage(message); } /** * 發送消息到 kafka */ private void sendMessage(String message) { ListenableFuture future = kafkaTemplate.send(topic, message); future.addCallback(o -> System.out.println("kafka > 消息發送成功:" + 
message), throwable -> System.out.println("kafka > 消息發送失敗:" + message)); } }
6、com.havent.controller.HelloController 調用示例
package com.havent.controller; import com.alibaba.fastjson.JSON; import com.havent.demo.logger.request.LoggerMessageReq; import com.havent.demo.logger.service.KafkaService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class HelloController { @Autowired private KafkaService logger; @RequestMapping("/") public String index() { logger.info("test info"); logger.trace(this.getClass()); logger.warn(new LoggerMessageReq("testApp", "test message")); logger.error(JSON.parse("{id:111,name:'test',content:'something wrong!'}")); return "Hello World"; } }
執行效果: