聊聊skywalking的SamplingService

本文主要研究一下skywalking的SamplingServicejava

SamplingService

skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/sampling/SamplingService.javagit

@DefaultImplementor
public class SamplingService implements BootService {
    private static final ILog logger = LogManager.getLogger(SamplingService.class);

    private volatile boolean on = false;
    private volatile AtomicInteger samplingFactorHolder;
    private volatile ScheduledFuture<?> scheduledFuture;

    @Override
    public void prepare() throws Throwable {

    }

    @Override
    public void boot() throws Throwable {
        if (scheduledFuture != null) {
            /**
             * If {@link #boot()} invokes twice, mostly in test cases,
             * cancel the old one.
             */
            scheduledFuture.cancel(true);
        }
        if (Config.Agent.SAMPLE_N_PER_3_SECS > 0) {
            on = true;
            this.resetSamplingFactor();
            ScheduledExecutorService service = Executors
                .newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("SamplingService"));
            scheduledFuture = service.scheduleAtFixedRate(new RunnableWithExceptionProtection(new Runnable() {
                @Override
                public void run() {
                    resetSamplingFactor();
                }
            }, new RunnableWithExceptionProtection.CallbackWhenException() {
                @Override public void handle(Throwable t) {
                    logger.error("unexpected exception.", t);
                }
            }), 0, 3, TimeUnit.SECONDS);
            logger.debug("Agent sampling mechanism started. Sample {} traces in 3 seconds.", Config.Agent.SAMPLE_N_PER_3_SECS);
        }
    }

    @Override
    public void onComplete() throws Throwable {

    }

    @Override
    public void shutdown() throws Throwable {
        if (scheduledFuture != null) {
            scheduledFuture.cancel(true);
        }
    }

    /**
     * @return true, if sampling mechanism is on, and getDefault the sampling factor successfully.
     */
    public boolean trySampling() {
        if (on) {
            int factor = samplingFactorHolder.get();
            if (factor < Config.Agent.SAMPLE_N_PER_3_SECS) {
                boolean success = samplingFactorHolder.compareAndSet(factor, factor + 1);
                return success;
            } else {
                return false;
            }
        }
        return true;
    }

    /**
     * Increase the sampling factor by force,
     * to avoid sampling too many traces.
     * If many distributed traces require sampled,
     * the trace beginning at local, has less chance to be sampled.
     */
    public void forceSampled() {
        if (on) {
            samplingFactorHolder.incrementAndGet();
        }
    }

    private void resetSamplingFactor() {
        samplingFactorHolder = new AtomicInteger(0);
    }
}
  • SamplingService實現了BootService接口,其boot方法在Config.Agent.SAMPLE_N_PER_3_SECS大於0時標記on爲true,而後先執行一下resetSamplingFactor,以後註冊一個定時任務定時執行resetSamplingFactor;其shutdown方法會cancel掉這個定時任務;resetSamplingFactor方法會重置samplingFactorHolder爲0;它還提供了trySampling方法給外部調用,該方法在samplingFactorHolder小於Config.Agent.SAMPLE_N_PER_3_SECS時,執行samplingFactorHolder.compareAndSet(factor, factor + 1);forceSampled方法則在on的前提下執行samplingFactorHolder.incrementAndGet()

ContextManagerExtendService

skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextManagerExtendService.javagithub

@DefaultImplementor
public class ContextManagerExtendService implements BootService {
    @Override public void prepare() {

    }

    @Override public void boot() {

    }

    @Override public void onComplete() {

    }

    @Override public void shutdown() {

    }

    public AbstractTracerContext createTraceContext(String operationName, boolean forceSampling) {
        AbstractTracerContext context;
        int suffixIdx = operationName.lastIndexOf(".");
        if (suffixIdx > -1 && Config.Agent.IGNORE_SUFFIX.contains(operationName.substring(suffixIdx))) {
            context = new IgnoredTracerContext();
        } else {
            SamplingService samplingService = ServiceManager.INSTANCE.findService(SamplingService.class);
            if (forceSampling || samplingService.trySampling()) {
                context = new TracingContext();
            } else {
                context = new IgnoredTracerContext();
            }
        }

        return context;
    }
}
  • ContextManagerExtendService實現了BootService接口,不過都是空方法,它提供了createTraceContext方法,該方法根據配置決定返回IgnoredTracerContext仍是TracingContext;在非Config.Agent.IGNORE_SUFFIX的條件下,在forceSampling或者samplingService.trySampling()返回true時,會返回TracingContext

小結

SamplingService在Config.Agent.SAMPLE_N_PER_3_SECS大於0時會註冊一個定時任務定時重置samplingFactorHolder爲0;它還提供了它還提供了trySampling方法給外部調用,該方法在samplingFactorHolder小於Config.Agent.SAMPLE_N_PER_3_SECS時,執行samplingFactorHolder.compareAndSet(factor, factor + 1);forceSampled方法則在on的前提下執行samplingFactorHolder.incrementAndGet();ContextManagerExtendService的createTraceContext會經過ServiceManager.INSTANCE.findService而後在forceSampling或者samplingService.trySampling()爲true時返回TracingContext,其他的返回IgnoredTracerContextapache

doc

相關文章
相關標籤/搜索