This article takes a look at SkyWalking's SamplingService.
skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/sampling/SamplingService.java
@DefaultImplementor
public class SamplingService implements BootService {
    private static final ILog logger = LogManager.getLogger(SamplingService.class);

    private volatile boolean on = false;
    private volatile AtomicInteger samplingFactorHolder;
    private volatile ScheduledFuture<?> scheduledFuture;

    @Override
    public void prepare() throws Throwable {
    }

    @Override
    public void boot() throws Throwable {
        if (scheduledFuture != null) {
            /**
             * If {@link #boot()} invokes twice, mostly in test cases,
             * cancel the old one.
             */
            scheduledFuture.cancel(true);
        }
        if (Config.Agent.SAMPLE_N_PER_3_SECS > 0) {
            on = true;
            this.resetSamplingFactor();
            ScheduledExecutorService service = Executors
                .newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("SamplingService"));
            scheduledFuture = service.scheduleAtFixedRate(new RunnableWithExceptionProtection(new Runnable() {
                @Override
                public void run() {
                    resetSamplingFactor();
                }
            }, new RunnableWithExceptionProtection.CallbackWhenException() {
                @Override public void handle(Throwable t) {
                    logger.error("unexpected exception.", t);
                }
            }), 0, 3, TimeUnit.SECONDS);
            logger.debug("Agent sampling mechanism started. Sample {} traces in 3 seconds.", Config.Agent.SAMPLE_N_PER_3_SECS);
        }
    }

    @Override
    public void onComplete() throws Throwable {
    }

    @Override
    public void shutdown() throws Throwable {
        if (scheduledFuture != null) {
            scheduledFuture.cancel(true);
        }
    }

    /**
     * @return true, if sampling mechanism is on, and getDefault the sampling factor successfully.
     */
    public boolean trySampling() {
        if (on) {
            int factor = samplingFactorHolder.get();
            if (factor < Config.Agent.SAMPLE_N_PER_3_SECS) {
                boolean success = samplingFactorHolder.compareAndSet(factor, factor + 1);
                return success;
            } else {
                return false;
            }
        }
        return true;
    }

    /**
     * Increase the sampling factor by force,
     * to avoid sampling too many traces.
     * If many distributed traces require sampled,
     * the trace beginning at local, has less chance to be sampled.
     */
    public void forceSampled() {
        if (on) {
            samplingFactorHolder.incrementAndGet();
        }
    }

    private void resetSamplingFactor() {
        samplingFactorHolder = new AtomicInteger(0);
    }
}
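To make the counting behaviour of SamplingService easier to see, here is a minimal sketch that keeps only the counter logic. The class name SamplingSketch, the limit field and the reset() method are made up for this illustration and are not part of SkyWalking. The sketch calls reset() by hand, where the real class resets the counter from a scheduled task every 3 seconds.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, simplified stand-in for SamplingService; not the SkyWalking class. */
public class SamplingSketch {
    // Plays the role of Config.Agent.SAMPLE_N_PER_3_SECS (assumed name for this sketch).
    private final int limit;
    // Mirrors the "on" flag: sampling limits apply only when the limit is positive.
    private final boolean on;
    // Replaced wholesale on reset, like samplingFactorHolder in the original.
    private volatile AtomicInteger counter = new AtomicInteger(0);

    public SamplingSketch(int limit) {
        this.limit = limit;
        this.on = limit > 0;
    }

    /** Returns true if the caller may create a sampled trace. */
    public boolean trySampling() {
        if (!on) {
            return true; // mechanism off: every trace is sampled
        }
        int current = counter.get();
        if (current < limit) {
            // A single CAS attempt with no retry, as in the original:
            // losing a race to another thread also yields false.
            return counter.compareAndSet(current, current + 1);
        }
        return false; // budget for this window is used up
    }

    /** Consumes one slot of the budget unconditionally. */
    public void forceSampled() {
        if (on) {
            counter.incrementAndGet();
        }
    }

    /** Stands in for the scheduled resetSamplingFactor() call. */
    public void reset() {
        counter = new AtomicInteger(0);
    }

    public static void main(String[] args) {
        SamplingSketch s = new SamplingSketch(2);
        System.out.println(s.trySampling()); // true
        System.out.println(s.trySampling()); // true
        System.out.println(s.trySampling()); // false
        s.reset();
        System.out.println(s.trySampling()); // true
    }
}
```

Because trySampling makes one compareAndSet attempt and does not retry, a thread that loses the race gets false even when the budget is not yet exhausted, so under contention the number of sampled traces in a window can fall below the configured limit.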
skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextManagerExtendService.java
@DefaultImplementor
public class ContextManagerExtendService implements BootService {
    @Override public void prepare() {
    }

    @Override public void boot() {
    }

    @Override public void onComplete() {
    }

    @Override public void shutdown() {
    }

    public AbstractTracerContext createTraceContext(String operationName, boolean forceSampling) {
        AbstractTracerContext context;
        int suffixIdx = operationName.lastIndexOf(".");
        if (suffixIdx > -1 && Config.Agent.IGNORE_SUFFIX.contains(operationName.substring(suffixIdx))) {
            context = new IgnoredTracerContext();
        } else {
            SamplingService samplingService = ServiceManager.INSTANCE.findService(SamplingService.class);
            if (forceSampling || samplingService.trySampling()) {
                context = new TracingContext();
            } else {
                context = new IgnoredTracerContext();
            }
        }
        return context;
    }
}
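The order of the checks in createTraceContext matters, and a small sketch makes it visible. Everything below is hypothetical and only loosely modelled on the code above: ContextDecisionSketch, Decision and decide are invented names, and the IGNORE_SUFFIX value is a sample string assumed for the example (treated as a comma-separated String checked with String.contains), not the agent's actual default.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical sketch of the decision order in createTraceContext; not SkyWalking code. */
public class ContextDecisionSketch {
    /** TRACE stands for TracingContext, IGNORE for IgnoredTracerContext. */
    public enum Decision { TRACE, IGNORE }

    // Sample value assumed for this sketch only.
    public static final String IGNORE_SUFFIX = ".jpg,.css,.js";

    public static Decision decide(String operationName, boolean forceSampling, BooleanSupplier trySampling) {
        // 1. The suffix check comes first and wins even when forceSampling is true.
        int suffixIdx = operationName.lastIndexOf('.');
        if (suffixIdx > -1 && IGNORE_SUFFIX.contains(operationName.substring(suffixIdx))) {
            return Decision.IGNORE;
        }
        // 2. forceSampling short-circuits, so trySampling is not invoked
        //    and no sampling budget is consumed on this path.
        if (forceSampling || trySampling.getAsBoolean()) {
            return Decision.TRACE;
        }
        return Decision.IGNORE;
    }

    public static void main(String[] args) {
        BooleanSupplier exhausted = () -> false; // pretend the budget is used up
        System.out.println(decide("/static/logo.jpg", true, exhausted)); // IGNORE
        System.out.println(decide("/api/orders", true, exhausted));      // TRACE
        System.out.println(decide("/api/orders", false, exhausted));     // IGNORE
    }
}
```

Since forceSampling bypasses trySampling() entirely, the forced path does not touch the counter by itself; that is what the separate forceSampled() method in SamplingService is for.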
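When Config.Agent.SAMPLE_N_PER_3_SECS is greater than 0, SamplingService registers a scheduled task that resets samplingFactorHolder every 3 seconds by replacing it with a new AtomicInteger(0). It also exposes a trySampling method for external callers: while sampling is on and samplingFactorHolder is below Config.Agent.SAMPLE_N_PER_3_SECS, it executes samplingFactorHolder.compareAndSet(factor, factor + 1) and returns the result of that CAS; once the limit is reached it returns false, and when sampling is off it always returns true. The forceSampled method executes samplingFactorHolder.incrementAndGet() when on is true. ContextManagerExtendService's createTraceContext first returns an IgnoredTracerContext for operation names whose suffix is in Config.Agent.IGNORE_SUFFIX; otherwise it obtains the SamplingService through ServiceManager.INSTANCE.findService and returns a TracingContext when forceSampling or samplingService.trySampling() is true, and an IgnoredTracerContext in all other cases.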