import org.springframework.kafka.annotation.KafkaListener; import java.lang.annotation.*; @Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE }) @Retention(RetentionPolicy.RUNTIME) @Documented @KafkaListener public @interface Listener { String[] topics() default {}; } import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.aspectj.lang.JoinPoint; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Before; import org.aspectj.lang.reflect.MethodSignature; import org.springframework.stereotype.Component; import java.lang.reflect.Method; @Aspect @Component @Slf4j public class ListenAspect { @Before("@annotation(Listener)") public void beforeListener(JoinPoint point) { Object[] args = point.getArgs(); if (args[0] != null && args[0] instanceof ConsumerRecord) { ConsumerRecord consumerRecord = (ConsumerRecord) args[0]; log.debug("consumerRecord:{}", consumerRecord.toString()); String value = consumerRecord.value().toString(); log.trace("value:{}", value); } Class<?> className = point.getTarget().getClass(); String methodName = point.getSignature().getName(); Class[] argClass = ((MethodSignature) point.getSignature()).getParameterTypes(); log.trace("className:" + className); log.trace("methodName:" + methodName); log.trace("argClass:" + argClass); if (argClass[0] != null) { log.trace("argClass[0]:" + argClass[0].toString()); // try { // ConsumerRecord consumerRecord = (ConsumerRecord)argClass[0].newInstance(); // log.debug("consumerRecord:{}", consumerRecord.toString()); // } catch (InstantiationException|IllegalAccessException e) { // e.printStackTrace(); // } } try { Method method = className.getMethod(methodName, argClass); log.trace("method:" + method); if (method.isAnnotationPresent(Listener.class)) { Listener annotation = method.getAnnotation(Listener.class); log.trace("annotation:" + annotation); log.trace("topics:" + annotation.topics()); } } catch (Exception e) { log.warn(e.getMessage(), e); } } }