Logstash is an open-source data collection engine with real-time pipelining capabilities. It can dynamically unify data coming from different sources and normalize it into the destination of your choice. Logstash also ships with a rich set of plugins (logstash-input-jdbc, logstash-input-kafka, logstash-input-rabbitmq, logstash-input-file, logstash-input-syslog, etc.; GitHub: https://github.com/logstash-plugins).
The principle is actually quite simple: Java AOP is used to intercept method calls and collect request logs and exception logs, which are sent to Kafka; Logstash then subscribes to the corresponding topic to consume the messages (i.e., the publish/subscribe pattern) and outputs them to Elasticsearch, which completes the log collection pipeline.
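Judging from the imports in the classes below, the project is assumed to have spring-boot-starter-web, spring-boot-starter-aop, spring-kafka, fastjson and lombok on the classpath; the exact dependency versions are not shown here.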
input {
  kafka {
    # Kafka server address
    bootstrap_servers => "server.natappfree.cc:33402"
    topics => ["szhuangl_goods_log"]
  }
}
output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    # Elasticsearch server address
    hosts => ["127.0.0.1:9200"]
    index => "szhuangl_goods_log"
  }
}
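Assuming the configuration above is saved as, say, szhuangl_goods_log.conf, Logstash can be started with bin/logstash -f szhuangl_goods_log.conf; the stdout output with the rubydebug codec will print every consumed Kafka message to the console in addition to writing it to Elasticsearch, which is handy while debugging the pipeline.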
package com.szhuangl.basic.elk.kafka;

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
 * @program: szhunagl-shop-parent
 * @author: Brian Huang
 * @create: 2019-10-19 16
 **/
@Component
@Slf4j
public class KafkaSender<T> {

    @Value("${szhuangl.log.topic: szhuangl_log}")
    private String log_topic;

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Send a message to Kafka.
     *
     * @param obj the message object
     */
    public void send(T obj) {
        String jsonObj = JSON.toJSONString(obj);
        // Send the message asynchronously
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(log_topic, jsonObj);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                log.info("Produce: The message failed to be sent:" + throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                // TODO business handling on success
                log.info("Produce: The message was sent successfully:");
                log.info("Produce: >>>>>>>>>>>>>>>>>>> result: " + stringObjectSendResult.toString());
            }
        });
    }
}
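For reference, a minimal usage sketch of the sender follows; the service class and method names are hypothetical and only illustrate how any Spring bean could push an ad-hoc JSON log entry onto the configured topic, independently of the AOP aspect shown later.

package com.szhuangl.basic.elk.kafka;

import com.alibaba.fastjson.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

// Hypothetical demo service, not part of the original project
@Service
public class LogDemoService {

    @Autowired
    private KafkaSender<JSONObject> kafkaSender;

    public void recordEvent(String event) {
        JSONObject logJson = new JSONObject();
        logJson.put("event", event);
        // Serialized to JSON and published to ${szhuangl.log.topic}
        kafkaSender.send(logJson);
    }
}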
package com.szhuangl.basic.elk.aop;

import com.alibaba.fastjson.JSONObject;
import com.szhuangl.basic.elk.kafka.KafkaSender;
import com.szhuangl.common.web.util.IpUtils;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import javax.servlet.http.HttpServletRequest;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;

/**
 * @program: Kafka log collection aspect
 * @author: Brian Huang
 * @create: 2019-10-19 16
 **/
@Aspect
@Component
@Slf4j
public class AopLogAspect {

    @Autowired
    private KafkaSender<JSONObject> kafkaSender;

    // Declare a pointcut with an execution expression
    @Pointcut("execution(* com.szhuangl.impl.*.service.*.*(..))")
    private void serviceAspect() {
    }

    // Log the request content before the method is invoked
    @Before(value = "serviceAspect()")
    public void methodBefore(JoinPoint joinPoint) {
        ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder
                .getRequestAttributes();
        HttpServletRequest request = requestAttributes.getRequest();

        String ip = IpUtils.getIpAddress(request);
        int localPort = request.getLocalPort();
        log.info("---localPort---:" + localPort);
        int serverPort = request.getServerPort();
        log.info("---serverPort---:" + serverPort);
        int remotePort = request.getRemotePort();
        log.info("---remotePort---:" + remotePort);

        JSONObject jsonObject = new JSONObject();
        DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        jsonObject.put("request_time", LocalDateTime.now().format(dateTimeFormatter));
        jsonObject.put("request_ip_port", ip);
        jsonObject.put("request_url", request.getRequestURL().toString());
        jsonObject.put("request_method", request.getMethod());
        jsonObject.put("signature", joinPoint.getSignature());
        jsonObject.put("request_args", Arrays.toString(joinPoint.getArgs()));

        JSONObject requestJsonObject = new JSONObject();
        requestJsonObject.put("szhuangl_request", jsonObject);
        kafkaSender.send(requestJsonObject);
    }

    // Log the returned content after the method completes
    @AfterReturning(returning = "o", pointcut = "serviceAspect()")
    public void methodAfterReturing(Object o) {
        JSONObject respJSONObject = new JSONObject();
        JSONObject jsonObject = new JSONObject();
        DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        jsonObject.put("response_time", LocalDateTime.now().format(dateTimeFormatter));
        jsonObject.put("response_content", JSONObject.toJSONString(o));
        respJSONObject.put("szhuangl_response", jsonObject);
        kafkaSender.send(respJSONObject);
    }
}
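To make the pointcut concrete, here is a hypothetical service that the expression execution(* com.szhuangl.impl.*.service.*.*(..)) would match; the package and class names are illustrative only. Every call to its methods from within a web request triggers the @Before and @AfterReturning advice above.

package com.szhuangl.impl.goods.service;

import org.springframework.stereotype.Service;

// Hypothetical service used only to illustrate the pointcut; the real
// project's service classes live under com.szhuangl.impl.*.service.*
@Service
public class GoodsQueryService {

    // Invoking this method during a web request produces a "szhuangl_request"
    // message before execution and a "szhuangl_response" message afterwards.
    public String findGoodsName(Long goodsId) {
        return "goods-" + goodsId;
    }
}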
package com.szhuangl.basic.elk.aop.error;

import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;

import com.szhuangl.basic.elk.kafka.KafkaSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseBody;

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import javax.servlet.http.HttpServletRequest;

/**
 * @description: Global exception handler
 */
@ControllerAdvice
@Slf4j
public class GlobalExceptionHandler {

    @Autowired
    private KafkaSender<JSONObject> kafkaSender;

    @ExceptionHandler(RuntimeException.class)
    @ResponseBody
    public JSONObject exceptionHandler(Exception e) {
        log.info("<<<<<<<<<<<<<<< global exception caught >>>>>>>>>>>>>>>>>, error:{}", e);

        ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder
                .getRequestAttributes();
        HttpServletRequest request = requestAttributes.getRequest();

        // 1. Build the exception log message
        JSONObject errorJson = new JSONObject();
        JSONObject logJson = new JSONObject();
        DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        logJson.put("request_time", LocalDateTime.now().format(dateTimeFormatter));
        logJson.put("error_info", e);
        errorJson.put("szhuangl_request_error", logJson);
        kafkaSender.send(errorJson);

        // 2. Return the error response
        JSONObject result = new JSONObject();
        result.put("code", 500);
        result.put("msg", "system error");
        return result;
    }
}
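As a quick way to exercise the handler, a hypothetical endpoint like the one below simply throws a RuntimeException; the advice then pushes an szhuangl_request_error message to Kafka and returns the code/msg JSON body to the caller. The controller name and path are assumptions for illustration.

package com.szhuangl.impl.goods.controller;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

// Hypothetical controller used only to trigger GlobalExceptionHandler
@RestController
public class ErrorDemoController {

    @GetMapping("/error/test")
    public String errorTest() {
        // The uncaught RuntimeException is handled by GlobalExceptionHandler,
        // which logs it to Kafka and returns {"code":500,"msg":"system error"}
        throw new RuntimeException("demo exception for ELK log collection");
    }
}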
### Server port
server:
  port: 8700

### Eureka
eureka:
  client:
    service-url:
      defaultZone: http://localhost:8100/eureka

spring:
  application:
    name: szhuangl-server-goods
  data:
    elasticsearch:
      cluster-name: szhuangl_es
      cluster-nodes: j1ekxg71oe.52http.tech:51267
      repositories:
        enabled: true
  # Kafka configuration
  kafka:
    bootstrap-servers: server.natappfree.cc:33402

# Kafka topic used for log messages
szhuangl:
  log:
    topic: szhuangl_goods_log
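Once the application, Kafka and Logstash are all running, one simple sanity check (assuming the Logstash output shown earlier) is to query Elasticsearch directly, e.g. curl 'http://127.0.0.1:9200/szhuangl_goods_log/_search?pretty', and confirm that request, response and error documents are being indexed under szhuangl_goods_log.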