本文主要研究一下skywalking的http-async-client-pluginjava
skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/resources/skywalking-plugin.defreact
httpasyncclient-4.x=org.apache.skywalking.apm.plugin.httpasyncclient.v4.define.HttpAsyncClientInstrumentation httpasyncclient-4.x=org.apache.skywalking.apm.plugin.httpasyncclient.v4.define.SessionRequestInstrumentation httpasyncclient-4.x=org.apache.skywalking.apm.plugin.httpasyncclient.v4.define.HttpAsyncRequestExecutorInstrumentation
skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/define/HttpAsyncClientInstrumentation.javagit
public class HttpAsyncClientInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { private static final String ENHANCE_CLASS_MINIMAL = "org.apache.http.impl.nio.client.MinimalHttpAsyncClient"; private static final String ENHANCE_CLASS_INTERNAL = "org.apache.http.impl.nio.client.InternalHttpAsyncClient"; private static final String METHOD = "execute"; private static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.httpasyncclient.v4.HttpAsyncClientInterceptor"; private static final String FIRST_ARG_TYPE = "org.apache.http.nio.protocol.HttpAsyncRequestProducer"; @Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { return null; } @Override public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { return new InstanceMethodsInterceptPoint[]{new InstanceMethodsInterceptPoint() { @Override public ElementMatcher<MethodDescription> getMethodsMatcher() { return named(METHOD).and(takesArguments(4) .and(takesArgument(0, named(FIRST_ARG_TYPE)))); } @Override public String getMethodsInterceptor() { return INTERCEPTOR_CLASS; } @Override public boolean isOverrideArgs() { return true; } } }; } @Override protected ClassMatch enhanceClass() { return MultiClassNameMatch.byMultiClassMatch(ENHANCE_CLASS_MINIMAL, ENHANCE_CLASS_INTERNAL); } }
skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/HttpAsyncClientInterceptor.javagithub
public class HttpAsyncClientInterceptor implements InstanceMethodsAroundInterceptor { @Override public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable { HttpAsyncResponseConsumer consumer = (HttpAsyncResponseConsumer) allArguments[1]; HttpContext context = (HttpContext) allArguments[2]; FutureCallback callback = (FutureCallback) allArguments[3]; allArguments[1] = new HttpAsyncResponseConsumerWrapper(consumer); allArguments[3] = new FutureCallbackWrapper(callback); CONTEXT_LOCAL.set(context); } @Override public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable { return ret; } @Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) { } }
skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/define/SessionRequestInstrumentation.javaapache
public class SessionRequestInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { private static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.httpasyncclient.v4.SessionRequestConstructorInterceptor"; private static final String COMPLETED_METHOD = "completed"; private static final String TIMEOUT_METHOD = "timeout"; private static final String FAILED_METHOD = "failed"; private static final String CANCEL_METHOD = "cancel"; private static final String SUCCESS_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.httpasyncclient.v4.SessionRequestCompleteInterceptor"; private static final String FAIL_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.httpasyncclient.v4.SessionRequestFailInterceptor"; private static final String ENHANCE_CLASS = "org.apache.http.impl.nio.reactor.SessionRequestImpl"; @Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { return new ConstructorInterceptPoint[]{new ConstructorInterceptPoint() { @Override public ElementMatcher<MethodDescription> getConstructorMatcher() { return any(); } @Override public String getConstructorInterceptor() { return CONSTRUCTOR_INTERCEPTOR_CLASS; } } }; } @Override public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { return new InstanceMethodsInterceptPoint[]{new InstanceMethodsInterceptPoint() { @Override public ElementMatcher<MethodDescription> getMethodsMatcher() { return named(COMPLETED_METHOD); } @Override public String getMethodsInterceptor() { return SUCCESS_INTERCEPTOR_CLASS; } @Override public boolean isOverrideArgs() { return false; } },new InstanceMethodsInterceptPoint() { @Override public ElementMatcher<MethodDescription> getMethodsMatcher() { return named(TIMEOUT_METHOD).or(named(FAILED_METHOD).or(named(CANCEL_METHOD))); } @Override public String getMethodsInterceptor() { return FAIL_INTERCEPTOR_CLASS; } @Override public boolean isOverrideArgs() { return false; } } }; } @Override protected ClassMatch enhanceClass() { return byName(ENHANCE_CLASS); } }
skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/SessionRequestConstructorInterceptor.javaapp
public class SessionRequestConstructorInterceptor implements InstanceConstructorInterceptor { @Override public void onConstruct(EnhancedInstance objInst, Object[] allArguments) { if (ContextManager.isActive()) { if (ContextManager.activeSpan().isExit()) { CONTEXT_LOCAL.remove(); return; } ContextSnapshot snapshot = ContextManager.capture(); objInst.setSkyWalkingDynamicField(new Object[]{snapshot, CONTEXT_LOCAL.get()}); } CONTEXT_LOCAL.remove(); } }
skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/SessionRequestCompleteInterceptor.javaasync
public class SessionRequestCompleteInterceptor implements InstanceMethodsAroundInterceptor { public static ThreadLocal<HttpContext> CONTEXT_LOCAL = new ThreadLocal<HttpContext>(); @Override public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable { Object[] array = (Object[]) objInst.getSkyWalkingDynamicField(); if (array == null || array.length == 0) { return; } ContextSnapshot snapshot = (ContextSnapshot) array[0]; ContextManager.createLocalSpan("httpasyncclient/local"); if (snapshot != null) { ContextManager.continued(snapshot); } CONTEXT_LOCAL.set((HttpContext) array[1]); } @Override public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable { return ret; } @Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) { } }
skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/SessionRequestFailInterceptor.javaide
public class SessionRequestFailInterceptor implements InstanceMethodsAroundInterceptor { @Override public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable { //this means actual request will not started. so the span has not been created,we cannot log the status. CONTEXT_LOCAL.remove(); objInst.setSkyWalkingDynamicField(null); } @Override public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable { return ret; } @Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) { } }
skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/define/HttpAsyncRequestExecutorInstrumentation.javathis
public class HttpAsyncRequestExecutorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { private static final String ENHANCE_CLASS = "org.apache.http.nio.protocol.HttpAsyncRequestExecutor"; private static final String METHOD = "requestReady"; private static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.httpasyncclient.v4.HttpAsyncRequestExecutorInterceptor"; @Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { return null; } @Override public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { return new InstanceMethodsInterceptPoint[]{new InstanceMethodsInterceptPoint() { @Override public ElementMatcher<MethodDescription> getMethodsMatcher() { return named(METHOD); } @Override public String getMethodsInterceptor() { return INTERCEPTOR_CLASS; } @Override public boolean isOverrideArgs() { return false; } } }; } @Override protected ClassMatch enhanceClass() { return byName(ENHANCE_CLASS); } }
skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/HttpAsyncRequestExecutorInterceptor.javaurl
public class HttpAsyncRequestExecutorInterceptor implements InstanceMethodsAroundInterceptor { @Override public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable { HttpContext context = CONTEXT_LOCAL.get(); CONTEXT_LOCAL.remove(); if (context == null) { return; } final ContextCarrier contextCarrier = new ContextCarrier(); HttpRequestWrapper requestWrapper = (HttpRequestWrapper) context.getAttribute(HttpClientContext.HTTP_REQUEST); HttpHost httpHost = (HttpHost) context.getAttribute(HttpClientContext.HTTP_TARGET_HOST); RequestLine requestLine = requestWrapper.getRequestLine(); String uri = requestLine.getUri(); String operationName = uri.startsWith("http") ? new URL(uri).getPath() : uri; int port = httpHost.getPort(); AbstractSpan span = ContextManager.createExitSpan(operationName, contextCarrier, httpHost.getHostName() + ":" + (port == -1 ? 80 : port)); span.setComponent(ComponentsDefine.HTTP_ASYNC_CLIENT); Tags.URL.set(span, requestWrapper.getOriginal().getRequestLine().getUri()); Tags.HTTP.METHOD.set(span, requestLine.getMethod()); SpanLayer.asHttp(span); CarrierItem next = contextCarrier.items(); while (next.hasNext()) { next = next.next(); requestWrapper.setHeader(next.getHeadKey(), next.getHeadValue()); } } @Override public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable { return ret; } @Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) { } }
skywalking的http-async-client-plugin定義了三個加強,分別是HttpAsyncClientInstrumentation、SessionRequestInstrumentation、HttpAsyncRequestExecutorInstrumentation