本文主要研究一下skywalking的SamplingServicejava
skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/sampling/SamplingService.javagit
@DefaultImplementor public class SamplingService implements BootService { private static final ILog logger = LogManager.getLogger(SamplingService.class); private volatile boolean on = false; private volatile AtomicInteger samplingFactorHolder; private volatile ScheduledFuture<?> scheduledFuture; @Override public void prepare() throws Throwable { } @Override public void boot() throws Throwable { if (scheduledFuture != null) { /** * If {@link #boot()} invokes twice, mostly in test cases, * cancel the old one. */ scheduledFuture.cancel(true); } if (Config.Agent.SAMPLE_N_PER_3_SECS > 0) { on = true; this.resetSamplingFactor(); ScheduledExecutorService service = Executors .newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("SamplingService")); scheduledFuture = service.scheduleAtFixedRate(new RunnableWithExceptionProtection(new Runnable() { @Override public void run() { resetSamplingFactor(); } }, new RunnableWithExceptionProtection.CallbackWhenException() { @Override public void handle(Throwable t) { logger.error("unexpected exception.", t); } }), 0, 3, TimeUnit.SECONDS); logger.debug("Agent sampling mechanism started. Sample {} traces in 3 seconds.", Config.Agent.SAMPLE_N_PER_3_SECS); } } @Override public void onComplete() throws Throwable { } @Override public void shutdown() throws Throwable { if (scheduledFuture != null) { scheduledFuture.cancel(true); } } /** * @return true, if sampling mechanism is on, and getDefault the sampling factor successfully. */ public boolean trySampling() { if (on) { int factor = samplingFactorHolder.get(); if (factor < Config.Agent.SAMPLE_N_PER_3_SECS) { boolean success = samplingFactorHolder.compareAndSet(factor, factor + 1); return success; } else { return false; } } return true; } /** * Increase the sampling factor by force, * to avoid sampling too many traces. * If many distributed traces require sampled, * the trace beginning at local, has less chance to be sampled. */ public void forceSampled() { if (on) { samplingFactorHolder.incrementAndGet(); } } private void resetSamplingFactor() { samplingFactorHolder = new AtomicInteger(0); } }
skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextManagerExtendService.javagithub
@DefaultImplementor public class ContextManagerExtendService implements BootService { @Override public void prepare() { } @Override public void boot() { } @Override public void onComplete() { } @Override public void shutdown() { } public AbstractTracerContext createTraceContext(String operationName, boolean forceSampling) { AbstractTracerContext context; int suffixIdx = operationName.lastIndexOf("."); if (suffixIdx > -1 && Config.Agent.IGNORE_SUFFIX.contains(operationName.substring(suffixIdx))) { context = new IgnoredTracerContext(); } else { SamplingService samplingService = ServiceManager.INSTANCE.findService(SamplingService.class); if (forceSampling || samplingService.trySampling()) { context = new TracingContext(); } else { context = new IgnoredTracerContext(); } } return context; } }
SamplingService在Config.Agent.SAMPLE_N_PER_3_SECS大於0時會註冊一個定時任務定時重置samplingFactorHolder爲0;它還提供了它還提供了trySampling方法給外部調用,該方法在samplingFactorHolder小於Config.Agent.SAMPLE_N_PER_3_SECS時,執行samplingFactorHolder.compareAndSet(factor, factor + 1);forceSampled方法則在on的前提下執行samplingFactorHolder.incrementAndGet();ContextManagerExtendService的createTraceContext會經過ServiceManager.INSTANCE.findService而後在forceSampling或者samplingService.trySampling()爲true時返回TracingContext,其他的返回IgnoredTracerContextapache