A look at SkyWalking's http-async-client-plugin

This article walks through the source of SkyWalking's http-async-client-plugin.

skywalking-plugin.def

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/resources/skywalking-plugin.def

httpasyncclient-4.x=org.apache.skywalking.apm.plugin.httpasyncclient.v4.define.HttpAsyncClientInstrumentation
httpasyncclient-4.x=org.apache.skywalking.apm.plugin.httpasyncclient.v4.define.SessionRequestInstrumentation
httpasyncclient-4.x=org.apache.skywalking.apm.plugin.httpasyncclient.v4.define.HttpAsyncRequestExecutorInstrumentation
  • SkyWalking's http-async-client-plugin registers three instrumentation definitions under the same plugin name, httpasyncclient-4.x: HttpAsyncClientInstrumentation, SessionRequestInstrumentation, and HttpAsyncRequestExecutorInstrumentation
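
The def file repeats one key three times, so loading it with java.util.Properties would keep only the last entry. The sketch below shows one way to read such a file line by line and keep every entry. The class and method names (PluginDefSketch, PluginDefine, parse) are made up for illustration and are not taken from the SkyWalking agent.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: reads "pluginName=defineClass" lines, keeping duplicates of the same name.
final class PluginDefSketch {

    /** One def line: the plugin name and the instrumentation class it registers. */
    static final class PluginDefine {
        final String name;
        final String defineClass;

        PluginDefine(String name, String defineClass) {
            this.name = name;
            this.defineClass = defineClass;
        }
    }

    static List<PluginDefine> parse(String defText) {
        List<PluginDefine> defines = new ArrayList<>();
        for (String line : defText.split("\\R")) {
            String trimmed = line.trim();
            if (trimmed.isEmpty() || trimmed.startsWith("#")) {
                continue; // skip blank lines and comments
            }
            int idx = trimmed.indexOf('=');
            if (idx <= 0 || idx == trimmed.length() - 1) {
                continue; // this sketch silently ignores malformed lines
            }
            defines.add(new PluginDefine(
                    trimmed.substring(0, idx).trim(),
                    trimmed.substring(idx + 1).trim()));
        }
        return defines;
    }

    public static void main(String[] args) {
        String prefix = "org.apache.skywalking.apm.plugin.httpasyncclient.v4.define.";
        String def = String.join("\n",
                "httpasyncclient-4.x=" + prefix + "HttpAsyncClientInstrumentation",
                "httpasyncclient-4.x=" + prefix + "SessionRequestInstrumentation",
                "httpasyncclient-4.x=" + prefix + "HttpAsyncRequestExecutorInstrumentation");
        for (PluginDefine define : parse(def)) {
            System.out.println(define.name + " -> " + define.defineClass);
        }
    }
}
```

Keeping a list rather than a map is the point of the sketch: the plugin name acts as a group label, not a unique key.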

HttpAsyncClientInstrumentation

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/define/HttpAsyncClientInstrumentation.java

public class HttpAsyncClientInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

    private static final String ENHANCE_CLASS_MINIMAL = "org.apache.http.impl.nio.client.MinimalHttpAsyncClient";
    private static final String ENHANCE_CLASS_INTERNAL = "org.apache.http.impl.nio.client.InternalHttpAsyncClient";
    private static final String METHOD = "execute";
    private static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.httpasyncclient.v4.HttpAsyncClientInterceptor";
    private static final String FIRST_ARG_TYPE = "org.apache.http.nio.protocol.HttpAsyncRequestProducer";

    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        return null;
    }

    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        return new InstanceMethodsInterceptPoint[]{new InstanceMethodsInterceptPoint() {
            @Override
            public ElementMatcher<MethodDescription> getMethodsMatcher() {
                return named(METHOD).and(takesArguments(4)
                        .and(takesArgument(0, named(FIRST_ARG_TYPE))));
            }

            @Override
            public String getMethodsInterceptor() {
                return INTERCEPTOR_CLASS;
            }

            @Override
            public boolean isOverrideArgs() {
                return true;
            }
        }
        };
    }

    @Override
    protected ClassMatch enhanceClass() {
        return MultiClassNameMatch.byMultiClassMatch(ENHANCE_CLASS_MINIMAL, ENHANCE_CLASS_INTERNAL);
    }
}
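  • HttpAsyncClientInstrumentation extends ClassInstanceMethodsEnhancePluginDefine. It applies org.apache.skywalking.apm.plugin.httpasyncclient.v4.HttpAsyncClientInterceptor to the execute method of org.apache.http.impl.nio.client.MinimalHttpAsyncClient and org.apache.http.impl.nio.client.InternalHttpAsyncClient, matching only the overload that takes four arguments and whose first argument is of type org.apache.http.nio.protocol.HttpAsyncRequestProducer. Because isOverrideArgs returns true, the interceptor is allowed to replace the arguments passed to execute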
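
To make the matcher condition concrete, here is a small reflection-based sketch that applies the same three checks (method name, argument count, type name of the first argument) to a stand-in class. It does not use Byte Buddy; FakeClient and the nested interfaces are hypothetical placeholders for the HttpAsyncClient types.

```java
import java.lang.reflect.Method;

// Hypothetical sketch: mimics named("execute").and(takesArguments(4)).and(takesArgument(0, named(type)))
// with plain reflection instead of Byte Buddy matchers.
final class ExecuteMatcherSketch {

    // Placeholder types standing in for the real HttpAsyncClient API.
    interface RequestProducer { }
    interface ResponseConsumer { }
    interface Context { }
    interface Callback { }

    static class FakeClient {
        public void execute(RequestProducer producer, ResponseConsumer consumer, Context context, Callback callback) { }
        public void execute(String uri, Callback callback) { }
        public void execute(String uri, ResponseConsumer consumer, Context context, Callback callback) { }
        public void close() { }
    }

    static boolean matches(Method method, String name, int argCount, String firstArgTypeName) {
        return method.getName().equals(name)
                && method.getParameterCount() == argCount
                && method.getParameterCount() > 0
                && method.getParameterTypes()[0].getName().equals(firstArgTypeName);
    }

    /** Counts how many methods of FakeClient the matcher selects. */
    static int matchCount() {
        int count = 0;
        for (Method method : FakeClient.class.getDeclaredMethods()) {
            if (matches(method, "execute", 4, RequestProducer.class.getName())) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println("matched overloads: " + matchCount());
    }
}
```

Only the first overload passes all three checks; the four-argument overload whose first parameter is a String is rejected by the type-name test.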

HttpAsyncClientInterceptor

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/HttpAsyncClientInterceptor.java

public class HttpAsyncClientInterceptor implements InstanceMethodsAroundInterceptor {


    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
        HttpAsyncResponseConsumer consumer = (HttpAsyncResponseConsumer) allArguments[1];
        HttpContext context = (HttpContext) allArguments[2];
        FutureCallback callback = (FutureCallback) allArguments[3];
        allArguments[1] = new HttpAsyncResponseConsumerWrapper(consumer);
        allArguments[3] = new FutureCallbackWrapper(callback);
        CONTEXT_LOCAL.set(context);
    }

    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
        return ret;
    }

    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {

    }
}
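  • HttpAsyncClientInterceptor implements InstanceMethodsAroundInterceptor. Its beforeMethod wraps the second argument in an HttpAsyncResponseConsumerWrapper and the fourth argument in a FutureCallbackWrapper, then calls CONTEXT_LOCAL.set(context) to keep the HttpContext (the third argument) in a thread-local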
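
The argument replacement can be pictured with a minimal decorator sketch: an interceptor swaps an element of the argument array for a wrapper that does its own work and then delegates to the original. What the real HttpAsyncResponseConsumerWrapper and FutureCallbackWrapper do internally is not shown in this article, so Callback, TracingCallback and the recorded event strings below are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of "wrap an argument before the intercepted method runs".
final class ArgumentWrapSketch {

    interface Callback {
        void completed(String result);
    }

    static final List<String> EVENTS = new ArrayList<>();

    /** Decorator: records an event, then delegates to the caller's callback if there is one. */
    static final class TracingCallback implements Callback {
        private final Callback delegate;

        TracingCallback(Callback delegate) {
            this.delegate = delegate;
        }

        @Override
        public void completed(String result) {
            EVENTS.add("trace:completed");
            if (delegate != null) {
                delegate.completed(result);
            }
        }
    }

    /** Stand-in for beforeMethod: replaces the fourth argument in place. */
    static void beforeMethod(Object[] allArguments) {
        allArguments[3] = new TracingCallback((Callback) allArguments[3]);
    }

    /** Stand-in for the intercepted execute method. */
    static void execute(Object producer, Object consumer, Object context, Callback callback) {
        callback.completed("200 OK");
    }

    static List<String> run() {
        EVENTS.clear();
        Callback userCallback = result -> EVENTS.add("user:" + result);
        Object[] args = {"producer", "consumer", "context", userCallback};
        beforeMethod(args);
        execute(args[0], args[1], args[2], (Callback) args[3]);
        return new ArrayList<>(EVENTS);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The caller's callback still runs, but only after the wrapper has had its turn, which is why the instrumentation needs isOverrideArgs to return true.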

SessionRequestInstrumentation

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/define/SessionRequestInstrumentation.java

public class SessionRequestInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

    private static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.httpasyncclient.v4.SessionRequestConstructorInterceptor";
    private static final String COMPLETED_METHOD = "completed";
    private static final String TIMEOUT_METHOD = "timeout";
    private static final String FAILED_METHOD = "failed";
    private static final String CANCEL_METHOD = "cancel";
    private static final String SUCCESS_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.httpasyncclient.v4.SessionRequestCompleteInterceptor";
    private static final String FAIL_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.httpasyncclient.v4.SessionRequestFailInterceptor";
    private static final String ENHANCE_CLASS = "org.apache.http.impl.nio.reactor.SessionRequestImpl";

    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        return new ConstructorInterceptPoint[]{new ConstructorInterceptPoint() {
            @Override
            public ElementMatcher<MethodDescription> getConstructorMatcher() {
                return any();
            }

            @Override
            public String getConstructorInterceptor() {
                return CONSTRUCTOR_INTERCEPTOR_CLASS;
            }
        }
        };
    }

    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        return new InstanceMethodsInterceptPoint[]{new InstanceMethodsInterceptPoint() {
            @Override
            public ElementMatcher<MethodDescription> getMethodsMatcher() {
                return named(COMPLETED_METHOD);
            }

            @Override
            public String getMethodsInterceptor() {
                return SUCCESS_INTERCEPTOR_CLASS;
            }

            @Override
            public boolean isOverrideArgs() {
                return false;
            }
        },new InstanceMethodsInterceptPoint() {
            @Override
            public ElementMatcher<MethodDescription> getMethodsMatcher() {
                return named(TIMEOUT_METHOD).or(named(FAILED_METHOD).or(named(CANCEL_METHOD)));
            }

            @Override
            public String getMethodsInterceptor() {
                return FAIL_INTERCEPTOR_CLASS;
            }

            @Override
            public boolean isOverrideArgs() {
                return false;
            }
        }
        };
    }

    @Override
    protected ClassMatch enhanceClass() {
        return byName(ENHANCE_CLASS);
    }
}
  • SessionRequestInstrumentation extends ClassInstanceMethodsEnhancePluginDefine and enhances org.apache.http.impl.nio.reactor.SessionRequestImpl. It applies org.apache.skywalking.apm.plugin.httpasyncclient.v4.SessionRequestConstructorInterceptor to every constructor, org.apache.skywalking.apm.plugin.httpasyncclient.v4.SessionRequestCompleteInterceptor to the completed method, and org.apache.skywalking.apm.plugin.httpasyncclient.v4.SessionRequestFailInterceptor to the timeout, failed, and cancel methods
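
The two intercept points boil down to routing by method name. As a rough illustration, the sketch below rebuilds the same or-composition with java.util.function.Predicate; named and interceptorFor are hypothetical helpers, not Byte Buddy or SkyWalking API.

```java
import java.util.function.Predicate;

// Hypothetical sketch: routes a method name to an interceptor the way the two intercept points do.
final class MethodRoutingSketch {

    static Predicate<String> named(String name) {
        return name::equals;
    }

    static final Predicate<String> SUCCESS = named("completed");
    // Same shape as named(TIMEOUT_METHOD).or(named(FAILED_METHOD).or(named(CANCEL_METHOD)))
    static final Predicate<String> FAILURE = named("timeout").or(named("failed").or(named("cancel")));

    static String interceptorFor(String methodName) {
        if (SUCCESS.test(methodName)) {
            return "SessionRequestCompleteInterceptor";
        }
        if (FAILURE.test(methodName)) {
            return "SessionRequestFailInterceptor";
        }
        return "none"; // method is left untouched
    }

    public static void main(String[] args) {
        for (String name : new String[] {"completed", "timeout", "failed", "cancel", "getSession"}) {
            System.out.println(name + " -> " + interceptorFor(name));
        }
    }
}
```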

SessionRequestConstructorInterceptor

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/SessionRequestConstructorInterceptor.java

public class SessionRequestConstructorInterceptor implements InstanceConstructorInterceptor {
    @Override
    public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
        if (ContextManager.isActive()) {
            if (ContextManager.activeSpan().isExit()) {
                CONTEXT_LOCAL.remove();
                return;
            }
            ContextSnapshot snapshot = ContextManager.capture();
            objInst.setSkyWalkingDynamicField(new Object[]{snapshot, CONTEXT_LOCAL.get()});
        }
        CONTEXT_LOCAL.remove();
    }
}
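  • SessionRequestConstructorInterceptor implements InstanceConstructorInterceptor. When a tracing context is active and the active span is not an exit span, onConstruct calls ContextManager.capture() and stores the snapshot together with the current value of CONTEXT_LOCAL in the instance's SkyWalking dynamic field. In every case it clears CONTEXT_LOCAL before returning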

SessionRequestCompleteInterceptor

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/SessionRequestCompleteInterceptor.java

public class SessionRequestCompleteInterceptor implements InstanceMethodsAroundInterceptor {

    public static ThreadLocal<HttpContext> CONTEXT_LOCAL = new ThreadLocal<HttpContext>();

    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
        Object[] array = (Object[]) objInst.getSkyWalkingDynamicField();
        if (array == null || array.length == 0) {
            return;
        }
        ContextSnapshot snapshot = (ContextSnapshot) array[0];
        ContextManager.createLocalSpan("httpasyncclient/local");
        if (snapshot != null) {
            ContextManager.continued(snapshot);
        }
        CONTEXT_LOCAL.set((HttpContext) array[1]);


    }

    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
        return ret;
    }

    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {

    }
}
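  • SessionRequestCompleteInterceptor implements InstanceMethodsAroundInterceptor. Its beforeMethod reads the array stored in the dynamic field, creates a local span named httpasyncclient/local, calls ContextManager.continued(snapshot) when the snapshot is not null, and sets CONTEXT_LOCAL to the stored HttpContext. The class also declares the CONTEXT_LOCAL thread-local that the other interceptors shown here refer to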
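
Taken together, the constructor and completed interceptors hand context from the calling thread to the thread that later runs completed. The sketch below imitates that hand-off with plain thread-locals and a field on the carried object. TRACE stands in for the tracing context, strings stand in for ContextSnapshot and HttpContext, and the exit-span check is left out; all of these are simplifications assumed for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: capture context on one thread, carry it in an object, restore it on another.
final class SnapshotHandoffSketch {

    static final ThreadLocal<String> TRACE = new ThreadLocal<>();         // stand-in for the tracing context
    static final ThreadLocal<String> CONTEXT_LOCAL = new ThreadLocal<>(); // stand-in for the HttpContext holder

    static final class SessionRequest {
        private volatile Object[] dynamicField; // plays the role of the SkyWalking dynamic field

        SessionRequest() {
            // Mirrors onConstruct: capture only when a trace is active, always clear the thread-local.
            String snapshot = TRACE.get();
            if (snapshot != null) {
                dynamicField = new Object[] {snapshot, CONTEXT_LOCAL.get()};
            }
            CONTEXT_LOCAL.remove();
        }

        String completed() {
            // Mirrors beforeMethod of the complete interceptor: restore what was carried.
            Object[] carried = dynamicField;
            if (carried == null || carried.length == 0) {
                return "no-context";
            }
            TRACE.set((String) carried[0]);
            CONTEXT_LOCAL.set((String) carried[1]);
            return TRACE.get() + "|" + CONTEXT_LOCAL.get();
        }
    }

    static String run() throws Exception {
        TRACE.set("trace-1");
        CONTEXT_LOCAL.set("http-context-A");
        SessionRequest request = new SessionRequest();
        boolean clearedOnCaller = CONTEXT_LOCAL.get() == null;

        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = request::completed;
            Future<String> seenOnIoThread = ioThread.submit(task);
            return seenOnIoThread.get() + "|" + clearedOnCaller;
        } finally {
            ioThread.shutdown();
            TRACE.remove();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run()); // prints trace-1|http-context-A|true
    }
}
```

A thread-local alone cannot cross threads, so the object that travels from one thread to the other (here SessionRequest) has to carry the data.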

SessionRequestFailInterceptor

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/SessionRequestFailInterceptor.java

public class SessionRequestFailInterceptor implements InstanceMethodsAroundInterceptor {


    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
        // The actual request will not be started, so no span has been created and the status cannot be logged.
        CONTEXT_LOCAL.remove();
        objInst.setSkyWalkingDynamicField(null);
    }

    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
        return ret;
    }

    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {

    }
}
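  • SessionRequestFailInterceptor implements InstanceMethodsAroundInterceptor. Its beforeMethod only cleans up: it calls CONTEXT_LOCAL.remove() and objInst.setSkyWalkingDynamicField(null). As the code comment notes, the request never actually started in these cases, so there is no span on which to record a status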

HttpAsyncRequestExecutorInstrumentation

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/define/HttpAsyncRequestExecutorInstrumentation.java

public class HttpAsyncRequestExecutorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

    private static final String ENHANCE_CLASS = "org.apache.http.nio.protocol.HttpAsyncRequestExecutor";
    private static final String METHOD = "requestReady";
    private static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.httpasyncclient.v4.HttpAsyncRequestExecutorInterceptor";

    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        return null;
    }

    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        return new InstanceMethodsInterceptPoint[]{new InstanceMethodsInterceptPoint() {
            @Override
            public ElementMatcher<MethodDescription> getMethodsMatcher() {
                return named(METHOD);
            }

            @Override
            public String getMethodsInterceptor() {
                return INTERCEPTOR_CLASS;
            }

            @Override
            public boolean isOverrideArgs() {
                return false;
            }
        }
        };
    }

    @Override
    protected ClassMatch enhanceClass() {
        return byName(ENHANCE_CLASS);
    }
}
  • HttpAsyncRequestExecutorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine. It applies org.apache.skywalking.apm.plugin.httpasyncclient.v4.HttpAsyncRequestExecutorInterceptor to the requestReady method of org.apache.http.nio.protocol.HttpAsyncRequestExecutor

HttpAsyncRequestExecutorInterceptor

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/HttpAsyncRequestExecutorInterceptor.java

public class HttpAsyncRequestExecutorInterceptor implements InstanceMethodsAroundInterceptor {


    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
        HttpContext context = CONTEXT_LOCAL.get();
        CONTEXT_LOCAL.remove();
        if (context == null) {
            return;
        }
        final ContextCarrier contextCarrier = new ContextCarrier();
        HttpRequestWrapper requestWrapper = (HttpRequestWrapper) context.getAttribute(HttpClientContext.HTTP_REQUEST);
        HttpHost httpHost = (HttpHost) context.getAttribute(HttpClientContext.HTTP_TARGET_HOST);

        RequestLine requestLine = requestWrapper.getRequestLine();
        String uri = requestLine.getUri();
        String operationName = uri.startsWith("http") ? new URL(uri).getPath() : uri;
        int port = httpHost.getPort();
        AbstractSpan span = ContextManager.createExitSpan(operationName, contextCarrier, httpHost.getHostName() + ":" + (port == -1 ? 80 : port));
        span.setComponent(ComponentsDefine.HTTP_ASYNC_CLIENT);
        Tags.URL.set(span, requestWrapper.getOriginal().getRequestLine().getUri());
        Tags.HTTP.METHOD.set(span, requestLine.getMethod());
        SpanLayer.asHttp(span);
        CarrierItem next = contextCarrier.items();
        while (next.hasNext()) {
            next = next.next();
            requestWrapper.setHeader(next.getHeadKey(), next.getHeadValue());
        }
    }

    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
        return ret;
    }

    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {

    }
}
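  • HttpAsyncRequestExecutorInterceptor implements InstanceMethodsAroundInterceptor. Its beforeMethod takes the HttpContext out of CONTEXT_LOCAL and returns early if there is none. Otherwise it calls ContextManager.createExitSpan with the request path as the operation name and host:port as the peer (the port falls back to 80 when the host has no explicit port), sets the component, URL tag, and HTTP method tag, and finally propagates the entries of contextCarrier.items() by setting them as headers on requestWrapper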
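
The three small computations in beforeMethod (operation name, peer address, header injection) can be tried in isolation. The sketch below reproduces them without SkyWalking or HttpClient types. It uses java.net.URI in place of the new URL(uri).getPath() call in the original, a plain Map in place of the carrier and the request headers, and a made-up header name; treat all of those as assumptions.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the values the exit span is built from.
final class ExitSpanSketch {

    /** Absolute URIs are reduced to their path; anything else is used as-is. */
    static String operationName(String uri) {
        return uri.startsWith("http") ? URI.create(uri).getPath() : uri;
    }

    /** Peer address as host:port, defaulting to 80 when no port was given (-1). */
    static String peer(String hostName, int port) {
        return hostName + ":" + (port == -1 ? 80 : port);
    }

    /** Copies every carrier item into the outgoing headers, overwriting existing keys. */
    static Map<String, String> inject(Map<String, String> carrierItems, Map<String, String> headers) {
        for (Map.Entry<String, String> item : carrierItems.entrySet()) {
            headers.put(item.getKey(), item.getValue());
        }
        return headers;
    }

    public static void main(String[] args) {
        System.out.println(operationName("http://example.com/users/1?x=2")); // prints /users/1
        System.out.println(peer("example.com", -1));                         // prints example.com:80

        Map<String, String> carrier = new LinkedHashMap<>();
        carrier.put("x-demo-trace", "encoded-context"); // placeholder header name and value
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("Accept", "*/*");
        System.out.println(inject(carrier, headers).keySet());
    }
}
```

Note that a relative request URI is used unchanged as the operation name, query string included, because only the absolute form goes through path extraction.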

Summary

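SkyWalking's http-async-client-plugin defines three instrumentations: HttpAsyncClientInstrumentation, SessionRequestInstrumentation, and HttpAsyncRequestExecutorInstrumentation. The first wraps the arguments of execute and stashes the HttpContext in a thread-local, the second carries the context snapshot and HttpContext through SessionRequestImpl, and the third creates the exit span and injects the trace headers in requestReady.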

