本文主要研究一下skywalking的spring-webflux-pluginjava
skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/define/DispatcherHandlerInstrumentation.javareact
public class DispatcherHandlerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { @Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { return new ConstructorInterceptPoint[0]; } @Override public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { return new InstanceMethodsInterceptPoint[]{ new InstanceMethodsInterceptPoint() { @Override public ElementMatcher<MethodDescription> getMethodsMatcher() { return named("handle"); } @Override public String getMethodsInterceptor() { return "org.apache.skywalking.apm.plugin.spring.webflux.v5.DispatcherHandlerHandleMethodInterceptor"; } @Override public boolean isOverrideArgs() { return false; } } }; } @Override protected ClassMatch enhanceClass() { return byName("org.springframework.web.reactive.DispatcherHandler"); } }
skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/DispatcherHandlerHandleMethodInterceptor.javagit
public class DispatcherHandlerHandleMethodInterceptor implements InstanceMethodsAroundInterceptor { private static final String DEFAULT_OPERATION_NAME = "WEBFLUX.handle"; @Override public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable { } @Override public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable { EnhancedInstance instance = getInstance(allArguments[0]); ServerWebExchange exchange = (ServerWebExchange) allArguments[0]; ContextCarrier carrier = new ContextCarrier(); CarrierItem next = carrier.items(); HttpHeaders headers = exchange.getRequest().getHeaders(); while (next.hasNext()) { next = next.next(); List<String> header = headers.get(next.getHeadKey()); if (header != null && header.size() > 0) { next.setHeadValue(header.get(0)); } } AbstractSpan span = ContextManager.createEntrySpan(DEFAULT_OPERATION_NAME, carrier); span.setComponent(ComponentsDefine.SPRING_WEBFLUX); SpanLayer.asHttp(span); Tags.URL.set(span, exchange.getRequest().getURI().toString()); HTTP.METHOD.set(span, exchange.getRequest().getMethodValue()); instance.setSkyWalkingDynamicField(ContextManager.capture()); span.prepareForAsync(); ContextManager.stopSpan(span); return ((Mono) ret).doFinally(s -> { try { Object pathPattern = exchange.getAttribute(HandlerMapping.BEST_MATCHING_PATTERN_ATTRIBUTE); if (pathPattern != null) { span.setOperationName(((PathPattern) pathPattern).getPatternString()); } HttpStatus httpStatus = exchange.getResponse().getStatusCode(); // fix webflux-2.0.0-2.1.0 version have bug. httpStatus is null. not support if (httpStatus != null) { Tags.STATUS_CODE.set(span, Integer.toString(httpStatus.value())); if (httpStatus.isError()) { span.errorOccurred(); } } } finally { span.asyncFinish(); } }); } @Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) { } public static EnhancedInstance getInstance(Object o) { EnhancedInstance instance = null; if (o instanceof DefaultServerWebExchange) { instance = (EnhancedInstance) o; } else if (o instanceof ServerWebExchangeDecorator) { ServerWebExchange delegate = ((ServerWebExchangeDecorator) o).getDelegate(); return getInstance(delegate); } return instance; } }
skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/define/ServerWebExchangeInstrumentation.javagithub
public class ServerWebExchangeInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { @Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { return new ConstructorInterceptPoint[]{ new ConstructorInterceptPoint() { @Override public ElementMatcher<MethodDescription> getConstructorMatcher() { return any(); } @Override public String getConstructorInterceptor() { return "org.apache.skywalking.apm.plugin.spring.webflux.v5.ServerWebExchangeConstructorInterceptor"; } } }; } @Override public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { return new InstanceMethodsInterceptPoint[0]; } @Override protected ClassMatch enhanceClass() { return byName("org.springframework.web.server.adapter.DefaultServerWebExchange"); } }
skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/spring-webflux-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/webflux/v5/ServerWebExchangeConstructorInterceptor.javaweb
public class ServerWebExchangeConstructorInterceptor implements InstanceConstructorInterceptor { @Override public void onConstruct(EnhancedInstance objInst, Object[] allArguments) { } }
DispatcherHandlerInstrumentation繼承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.DispatcherHandlerHandleMethodInterceptor加強org.springframework.web.reactive.DispatcherHandler的handle方法;ServerWebExchangeInstrumentation繼承了ClassInstanceMethodsEnhancePluginDefine,其使用org.apache.skywalking.apm.plugin.spring.webflux.v5.ServerWebExchangeConstructorInterceptor加強org.springframework.web.server.adapter.DefaultServerWebExchange的全部方法spring