Logstash + Kafka 基於 AOP 實時同步日誌到 ES

Logstash是一個開源數據收集引擎,具備實時管道功能。Logstash能夠動態地將來自不同數據源的數據統一起來,並將數據標準化到你所選擇的目的地。Logstash擁有豐富的插件(logstash-input-jdbc,logstash-input-kafka,logstash-input-rabbitmq,logstash-input-file,logstash-input-syslog等,github地址: https://github.com/logstash-plugins)。

1.logstash-input-kafka將微服務日誌同步到 elasticsearch

1.1 基於AOP+Kafka同步數據原理

原理其實很簡單,就是基於JAVA的AOP技術攔截方法,收集請求日誌和異常日誌併發送到Kafka,然後通過logstash訂閱相應的topic來消費消息(即發佈訂閱模式),再output到es來實現日誌收集。

1.2 日誌收集配置文件

szhuangl_goods_log.conf

# Logstash pipeline: read log messages from a Kafka topic and write them to
# Elasticsearch (and to stdout).
input {
  kafka {
    # Kafka broker address
    bootstrap_servers => "server.natappfree.cc:33402"
    # Topic to subscribe to; same name as szhuangl.log.topic in application.yml
    topics => ["szhuangl_goods_log"]
  }
}
# Two outputs are configured: stdout and elasticsearch.
output {
    # Print events to stdout using the rubydebug codec
    stdout { codec => rubydebug }
    elasticsearch {
       # Elasticsearch node address
       hosts => ["127.0.0.1:9200"]
       # Name of the index the events are written to
       index => "szhuangl_goods_log"
    }
}

Kafka消息提供者代碼 

package com.szhuangl.basic.elk.kafka;

 

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

 

/**
 * @program: szhunagl-shop-parent
 * @author: Brian Huang
 * @create: 2019-10-19 16
 **/
@Component
@Slf4j
public class KafkaSender<T> {

    @Value("${szhuangl.log.topic: szhuangl_log}")
    private String log_topic;

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
 

    /**
     * kafka 發送消息
     *
     * @param obj
     *            消息對象
     */

    public void send(T obj) {
        String jsonObj = JSON.toJSONString(obj);


        // 發送消息
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(log_topic, jsonObj);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {

            @Override
            public void onFailure(Throwable throwable) {
                log.info("Produce: The message failed to be sent:" + throwable.getMessage());
            }

 
            @Override
            public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                // TODO 業務處理
                log.info("Produce: The message was sent successfully:");
                log.info("Produce: >>>>>>>>>>>>>>>>>>> result: " + stringObjectSendResult.toString());
            }
        });
    }
}

AOP切面收集日誌代碼

package com.szhuangl.basic.elk.aop;
 

import com.alibaba.fastjson.JSONObject;
import com.szhuangl.basic.elk.kafka.KafkaSender;
import com.szhuangl.common.web.util.IpUtils;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.servlet.http.HttpServletRequest;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;

 

/**
 * Aspect that collects request and response logs for service-layer methods and
 * forwards them to Kafka through {@link KafkaSender}.
 *
 * <p>Publishing is best-effort: a failure while sending a log entry is logged and
 * never propagates into the intercepted service method.
 *
 * @author Brian Huang
 */
@Aspect
@Component
@Slf4j
public class AopLogAspect {

    /** Timestamp layout of log entries; the formatter is immutable, so one shared instance is enough. */
    private static final DateTimeFormatter TIMESTAMP_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    @Autowired
    private KafkaSender<JSONObject> kafkaSender;

    /** Pointcut matching every method matched by the execution expression below. */
    @Pointcut("execution(* com.szhuangl.impl.*.service.*.*(..))")
    private void serviceAspect() {
    }

    /**
     * Runs before a matched method: gathers details of the current HTTP request
     * and the invoked method and publishes them under the key
     * {@code szhuangl_request}.
     *
     * <p>If no HTTP request is bound to the current thread, nothing is published.
     *
     * @param joinPoint the intercepted method invocation
     */
    @Before(value = "serviceAspect()")
    public void methodBefore(JoinPoint joinPoint) {
        ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder
                .getRequestAttributes();
        if (requestAttributes == null) {
            // getRequestAttributes() returns null when the method is invoked
            // outside of a web request; there is no request data to record.
            log.debug("No HTTP request bound to the current thread; skipping request log for {}",
                    joinPoint.getSignature());
            return;
        }
        HttpServletRequest request = requestAttributes.getRequest();

        String ip = IpUtils.getIpAddress(request);
        log.info("---localPort---:{}", request.getLocalPort());
        log.info("---serverPort---:{}", request.getServerPort());
        log.info("---remotePort---:{}", request.getRemotePort());

        JSONObject requestDetails = new JSONObject();
        requestDetails.put("request_time", LocalDateTime.now().format(TIMESTAMP_FORMAT));
        // Despite the key name, only the address returned by IpUtils is stored here.
        requestDetails.put("request_ip_port", ip);
        requestDetails.put("request_url", request.getRequestURL().toString());
        requestDetails.put("request_method", request.getMethod());
        // Store the textual signature; serializing the Signature object itself
        // would make fastjson reflect over AspectJ internals.
        requestDetails.put("signature", joinPoint.getSignature().toString());
        requestDetails.put("request_args", Arrays.toString(joinPoint.getArgs()));

        JSONObject payload = new JSONObject();
        payload.put("szhuangl_request", requestDetails);
        sendQuietly(payload);
    }

    /**
     * Runs after a matched method returns normally: publishes the JSON-serialized
     * return value under the key {@code szhuangl_response}.
     *
     * @param o the value returned by the intercepted method
     */
    @AfterReturning(returning = "o", pointcut = "serviceAspect()")
    public void methodAfterReturing(Object o) {
        JSONObject responseDetails = new JSONObject();
        responseDetails.put("response_time", LocalDateTime.now().format(TIMESTAMP_FORMAT));
        responseDetails.put("response_content", JSONObject.toJSONString(o));

        JSONObject payload = new JSONObject();
        payload.put("szhuangl_response", responseDetails);
        sendQuietly(payload);
    }

    /** Sends the payload to Kafka, logging (not rethrowing) any runtime failure. */
    private void sendQuietly(JSONObject payload) {
        try {
            kafkaSender.send(payload);
        } catch (RuntimeException ex) {
            log.warn("Failed to publish log entry to Kafka", ex);
        }
    }
}

異常日誌收集代碼 

package com.szhuangl.basic.elk.aop.error;

 

import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import com.szhuangl.basic.elk.kafka.KafkaSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseBody;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.servlet.http.HttpServletRequest;

 

/**
 *
 * @description: 全局捕獲異常
 */
@ControllerAdvice
@Slf4j
public class GlobalExceptionHandler {

    @Autowired
    private KafkaSender<JSONObject> kafkaSender;
 

    @ExceptionHandler(RuntimeException.class)
    @ResponseBody
    public JSONObject exceptionHandler(Exception e) {
        log.info("<<<<<<<<<<<<<<<全局捕獲異常>>>>>>>>>>>>>>>>>,error:{}", e);
        ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder
                .getRequestAttributes();
        HttpServletRequest request = requestAttributes.getRequest();
 

        // 1.封裝異常日誌信息
        JSONObject errorJson = new JSONObject();
        JSONObject logJson = new JSONObject();
        DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        logJson.put("request_time", LocalDateTime.now().format(dateTimeFormatter));
        logJson.put("error_info", e);
        errorJson.put("szhuangl_request_error", logJson);
        kafkaSender.send(errorJson);
        // 2. 返回錯誤信息
        JSONObject result = new JSONObject();
        result.put("code", 500);
        result.put("msg", "系統錯誤");
        return result;
    }
}

application.yml kafka配置信息

### Service port
server:
  port: 8700
### Eureka client
eureka:
  client:
    service-url:
      defaultZone: http://localhost:8100/eureka
spring:
  application:
    name: szhuangl-server-goods
  data:
    elasticsearch:
      cluster-name: szhuangl_es
      cluster-nodes: j1ekxg71oe.52http.tech:51267
      repositories:
        # The Spring Boot property is "enabled"; "enable" is not bound to anything.
        enabled: true
  kafka:
    # Kafka connection settings
    bootstrap-servers: server.natappfree.cc:33402


# Kafka topic that application logs are published to
szhuangl:
  log:
    topic: szhuangl_goods_log
相關文章
相關標籤/搜索