聊聊skywalking的jdk-threading-plugin

本文主要研究一下skywalking的jdk-threading-plugin

skywalking-plugin.def

skywalking-6.6.0/apm-sniffer/bootstrap-plugins/jdk-threading-plugin/src/main/resources/skywalking-plugin.def

jdk-threading-plugin=org.apache.skywalking.apm.plugin.jdk.threading.define.RunnableInstrumentation
jdk-threading-plugin=org.apache.skywalking.apm.plugin.jdk.threading.define.CallableInstrumentation
  • skywalking的jdk-threading-plugin提供了RunnableInstrumentation、CallableInstrumentation兩個加強

RunnableInstrumentation

skywalking-6.6.0/apm-sniffer/bootstrap-plugins/jdk-threading-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdk/threading/define/RunnableInstrumentation.java

/**
 * Plugin definition targeting classes in the {@code java.lang.Runnable} hierarchy that also
 * satisfy the prefix matcher supplied by {@code ThreadingConfig}.
 *
 * <p>Every constructor is routed to {@code ThreadingConstructorInterceptor}, and the
 * zero-argument {@code run} method is routed to {@code ThreadingMethodInterceptor}.
 */
public class RunnableInstrumentation extends ClassEnhancePluginDefine {
    private static final String TARGET_TYPE = "java.lang.Runnable";
    private static final String TARGET_METHOD = "run";

    private static final String CONSTRUCTOR_INTERCEPTOR = "org.apache.skywalking.apm.plugin.jdk.threading.ThreadingConstructorInterceptor";
    private static final String METHOD_INTERCEPTOR = "org.apache.skywalking.apm.plugin.jdk.threading.ThreadingMethodInterceptor";

    /**
     * Builds the class matcher: the configured prefix matcher AND-ed with a hierarchy match on
     * {@code java.lang.Runnable}.
     *
     * @return the combined matcher, or {@code null} when {@code ThreadingConfig} yields no
     *         prefix matcher
     */
    @Override
    protected ClassMatch enhanceClass() {
        final IndirectMatch configuredPrefixes = ThreadingConfig.prefixesMatchesForJdkThreading();

        return configuredPrefixes == null
            ? null
            : LogicalMatchOperation.and(configuredPrefixes, byHierarchyMatch(TARGET_TYPE));
    }

    /**
     * @return a single intercept point that matches any constructor
     */
    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        final ConstructorInterceptPoint everyConstructor = new ConstructorInterceptPoint() {
            @Override
            public ElementMatcher<MethodDescription> getConstructorMatcher() {
                return any();
            }

            @Override
            public String getConstructorInterceptor() {
                return CONSTRUCTOR_INTERCEPTOR;
            }
        };

        return new ConstructorInterceptPoint[] {everyConstructor};
    }

    /**
     * @return a single intercept point for the no-argument {@code run} method; arguments are
     *         not overridden
     */
    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        final InstanceMethodsInterceptPoint runMethod = new InstanceMethodsInterceptPoint() {
            @Override
            public ElementMatcher<MethodDescription> getMethodsMatcher() {
                return named(TARGET_METHOD).and(takesArguments(0));
            }

            @Override
            public String getMethodsInterceptor() {
                return METHOD_INTERCEPTOR;
            }

            @Override
            public boolean isOverrideArgs() {
                return false;
            }
        };

        return new InstanceMethodsInterceptPoint[] {runMethod};
    }

    /**
     * @return an empty array; no static methods are intercepted
     */
    @Override
    public StaticMethodsInterceptPoint[] getStaticMethodsInterceptPoints() {
        return new StaticMethodsInterceptPoint[] {};
    }

    /**
     * @return always {@code true}
     */
    @Override
    public boolean isBootstrapInstrumentation() {
        return true;
    }
}
  • RunnableInstrumentation繼承了ClassEnhancePluginDefine,它加強的是實現了java.lang.Runnable接口的類;它使用org.apache.skywalking.apm.plugin.jdk.threading.ThreadingConstructorInterceptor加強了其構造器;它使用org.apache.skywalking.apm.plugin.jdk.threading.ThreadingMethodInterceptor加強了其run方法

CallableInstrumentation

skywalking-6.6.0/apm-sniffer/bootstrap-plugins/jdk-threading-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdk/threading/define/CallableInstrumentation.java

/**
 * Plugin definition targeting classes in the {@code java.util.concurrent.Callable} hierarchy
 * that also satisfy the prefix matcher supplied by {@code ThreadingConfig}.
 *
 * <p>Every constructor is routed to {@code ThreadingConstructorInterceptor}, and the
 * zero-argument {@code call} method is routed to {@code ThreadingMethodInterceptor}.
 */
public class CallableInstrumentation extends ClassEnhancePluginDefine {
    private static final String TARGET_TYPE = "java.util.concurrent.Callable";
    private static final String TARGET_METHOD = "call";

    private static final String CONSTRUCTOR_INTERCEPTOR = "org.apache.skywalking.apm.plugin.jdk.threading.ThreadingConstructorInterceptor";
    private static final String METHOD_INTERCEPTOR = "org.apache.skywalking.apm.plugin.jdk.threading.ThreadingMethodInterceptor";

    /**
     * Builds the class matcher: the configured prefix matcher AND-ed with a hierarchy match on
     * {@code java.util.concurrent.Callable}.
     *
     * @return the combined matcher, or {@code null} when {@code ThreadingConfig} yields no
     *         prefix matcher
     */
    @Override
    protected ClassMatch enhanceClass() {
        final IndirectMatch configuredPrefixes = ThreadingConfig.prefixesMatchesForJdkThreading();

        return configuredPrefixes == null
            ? null
            : LogicalMatchOperation.and(configuredPrefixes, byHierarchyMatch(TARGET_TYPE));
    }

    /**
     * @return a single intercept point that matches any constructor
     */
    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        final ConstructorInterceptPoint everyConstructor = new ConstructorInterceptPoint() {
            @Override
            public ElementMatcher<MethodDescription> getConstructorMatcher() {
                return any();
            }

            @Override
            public String getConstructorInterceptor() {
                return CONSTRUCTOR_INTERCEPTOR;
            }
        };

        return new ConstructorInterceptPoint[] {everyConstructor};
    }

    /**
     * @return a single intercept point for the no-argument {@code call} method; arguments are
     *         not overridden
     */
    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        final InstanceMethodsInterceptPoint callMethod = new InstanceMethodsInterceptPoint() {
            @Override
            public ElementMatcher<MethodDescription> getMethodsMatcher() {
                return named(TARGET_METHOD).and(takesArguments(0));
            }

            @Override
            public String getMethodsInterceptor() {
                return METHOD_INTERCEPTOR;
            }

            @Override
            public boolean isOverrideArgs() {
                return false;
            }
        };

        return new InstanceMethodsInterceptPoint[] {callMethod};
    }

    /**
     * @return an empty array; no static methods are intercepted
     */
    @Override
    public StaticMethodsInterceptPoint[] getStaticMethodsInterceptPoints() {
        return new StaticMethodsInterceptPoint[] {};
    }

    /**
     * @return always {@code true}
     */
    @Override
    public boolean isBootstrapInstrumentation() {
        return true;
    }
}
  • CallableInstrumentation繼承了ClassEnhancePluginDefine,它加強的是實現java.util.concurrent.Callable的類;它使用org.apache.skywalking.apm.plugin.jdk.threading.ThreadingConstructorInterceptor加強其構造器;它使用org.apache.skywalking.apm.plugin.jdk.threading.ThreadingMethodInterceptor加強其call方法

ThreadingConstructorInterceptor

skywalking-6.6.0/apm-sniffer/bootstrap-plugins/jdk-threading-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdk/threading/ThreadingConstructorInterceptor.java

/**
 * Constructor interceptor that stores the result of {@code ContextManager.capture()} in the
 * enhanced instance's dynamic field, but only while {@code ContextManager.isActive()} is true.
 */
public class ThreadingConstructorInterceptor implements InstanceConstructorInterceptor {

    /**
     * Captures the current context into {@code objInst} when a context is active; otherwise
     * leaves the instance untouched.
     *
     * @param objInst      the instance being constructed
     * @param allArguments the constructor arguments (unused here)
     */
    @Override
    public void onConstruct(final EnhancedInstance objInst, final Object[] allArguments) {
        if (!ContextManager.isActive()) {
            // Nothing to capture, so the dynamic field is left as it is.
            return;
        }

        objInst.setSkyWalkingDynamicField(ContextManager.capture());
    }

}
  • ThreadingConstructorInterceptor實現了InstanceConstructorInterceptor接口,其onConstruct方法在ContextManager.isActive()爲true時,將ContextManager.capture()設置給objInst的skyWalkingDynamicField

ThreadingMethodInterceptor

skywalking-6.6.0/apm-sniffer/bootstrap-plugins/jdk-threading-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdk/threading/ThreadingMethodInterceptor.java

/**
 * Around-interceptor that wraps the intercepted method in a local span and, when the enhanced
 * instance carries a {@code ContextSnapshot} in its dynamic field, continues the trace from
 * that snapshot.
 */
public class ThreadingMethodInterceptor implements InstanceMethodsAroundInterceptor {

    /**
     * Opens a local span named by {@link #generateOperationName} and tags it with the
     * {@code JDK_THREADING} component. If the instance's dynamic field is non-null it is
     * treated as a {@code ContextSnapshot} and passed to {@code ContextManager.continued}.
     *
     * @param objInst        the enhanced instance whose method is being invoked
     * @param method         the intercepted method
     * @param allArguments   the invocation arguments (unused here)
     * @param argumentsTypes the declared argument types (unused here)
     * @param result         the intercept result holder (unused here)
     */
    @Override
    public void beforeMethod(
        final EnhancedInstance objInst,
        final Method method,
        final Object[] allArguments,
        final Class<?>[] argumentsTypes,
        final MethodInterceptResult result) {

        final AbstractSpan span = ContextManager.createLocalSpan(generateOperationName(objInst, method));
        span.setComponent(ComponentsDefine.JDK_THREADING);

        final Object storedField = objInst.getSkyWalkingDynamicField();
        if (storedField != null) {
            ContextManager.continued((ContextSnapshot) storedField);
        }
    }

    /**
     * Closes the span opened by {@link #beforeMethod}.
     *
     * @param objInst        the enhanced instance whose method was invoked
     * @param method         the intercepted method
     * @param allArguments   the invocation arguments (unused here)
     * @param argumentsTypes the declared argument types (unused here)
     * @param ret            the value returned by the intercepted method
     * @return {@code ret}, unchanged
     */
    @Override
    public Object afterMethod(
        final EnhancedInstance objInst,
        final Method method,
        final Object[] allArguments,
        final Class<?>[] argumentsTypes,
        final Object ret) {

        // beforeMethod opens a span on every invocation, whether or not a snapshot was stored
        // on the instance, so the span must be closed on every invocation as well. Closing it
        // only when the dynamic field is non-null would leave the span open for instances
        // that carry no snapshot.
        if (ContextManager.isActive()) {
            ContextManager.stopSpan();
        }

        return ret;
    }

    /**
     * Marks the active span as failed and logs the throwable on it, when a context is active.
     *
     * @param objInst        the enhanced instance whose method threw
     * @param method         the intercepted method
     * @param allArguments   the invocation arguments (unused here)
     * @param argumentsTypes the declared argument types (unused here)
     * @param t              the throwable raised by the intercepted method
     */
    @Override
    public void handleMethodException(
        final EnhancedInstance objInst,
        final Method method,
        final Object[] allArguments,
        final Class<?>[] argumentsTypes,
        final Throwable t) {

        if (ContextManager.isActive()) {
            ContextManager.activeSpan().errorOccurred().log(t);
        }
    }

    /**
     * Builds the span operation name as {@code Threading/<runtime class name>/<method name>}.
     */
    private String generateOperationName(final EnhancedInstance objInst, final Method method) {
        return "Threading/" + objInst.getClass().getName() + "/" + method.getName();
    }

}
  • ThreadingMethodInterceptor實現了InstanceMethodsAroundInterceptor接口,其beforeMethod方法執行ContextManager.createLocalSpan,而後獲取objInst.getSkyWalkingDynamicField(),若不爲null則執行ContextManager.continued(contextSnapshot);其afterMethod方法獲取objInst.getSkyWalkingDynamicField(),若不爲null則執行ContextManager.stopSpan();其handleMethodException方法在ContextManager.isActive()爲true時執行ContextManager.activeSpan().errorOccurred().log(t)

小結

skywalking的jdk-threading-plugin提供了RunnableInstrumentation、CallableInstrumentation兩個加強;兩者都使用ThreadingConstructorInterceptor加強構造器,並使用ThreadingMethodInterceptor分別加強run方法和call方法

doc

相關文章
相關標籤/搜索