聊聊skywalking的TopNDatabaseStatement

本文主要研究一下skywalking的TopNDatabaseStatementjava

TopNDatabaseStatement

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.javagit

@Stream(name = TopNDatabaseStatement.INDEX_NAME, scopeId = DefaultScopeDefine.DATABASE_SLOW_STATEMENT, builder = TopNDatabaseStatement.Builder.class, processor = TopNStreamProcessor.class)
public class TopNDatabaseStatement extends TopN {

    public static final String INDEX_NAME = "top_n_database_statement";

    @Setter private String id;

    @Override public String id() {
        return id;
    }

    @Override public boolean equals(Object o) {
        if (this == o)
            return true;
        if (o == null || getClass() != o.getClass())
            return false;
        TopNDatabaseStatement statement = (TopNDatabaseStatement)o;
        return getServiceId() == statement.getServiceId();
    }

    @Override public int hashCode() {
        return Objects.hash(getServiceId());
    }

    public static class Builder implements StorageBuilder<TopNDatabaseStatement> {

        @Override public TopNDatabaseStatement map2Data(Map<String, Object> dbMap) {
            TopNDatabaseStatement statement = new TopNDatabaseStatement();
            statement.setStatement((String)dbMap.get(STATEMENT));
            statement.setTraceId((String)dbMap.get(TRACE_ID));
            statement.setLatency(((Number)dbMap.get(LATENCY)).longValue());
            statement.setServiceId(((Number)dbMap.get(SERVICE_ID)).intValue());
            statement.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue());
            return statement;
        }

        @Override public Map<String, Object> data2Map(TopNDatabaseStatement storageData) {
            Map<String, Object> map = new HashMap<>();
            map.put(STATEMENT, storageData.getStatement());
            map.put(TRACE_ID, storageData.getTraceId());
            map.put(LATENCY, storageData.getLatency());
            map.put(SERVICE_ID, storageData.getServiceId());
            map.put(TIME_BUCKET, storageData.getTimeBucket());
            return map;
        }
    }
}
  • TopNDatabaseStatement繼承了TopN,定義了Builder

TopN

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/TopN.javagithub

public abstract class TopN extends Record implements ComparableStorageData {
    public static final String STATEMENT = "statement";
    public static final String LATENCY = "latency";
    public static final String TRACE_ID = "trace_id";
    public static final String SERVICE_ID = "service_id";

    @Getter @Setter @Column(columnName = STATEMENT, content = true) private String statement;
    @Getter @Setter @Column(columnName = LATENCY) private long latency;
    @Getter @Setter @Column(columnName = TRACE_ID) private String traceId;
    @Getter @Setter @Column(columnName = SERVICE_ID) private int serviceId;

    @Override public int compareTo(Object o) {
        TopN target = (TopN)o;
        return (int)(latency - target.latency);
    }
}
  • TopN定義了statement、latency、trace_id、service_id屬性

DatabaseStatementDispatcher

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/DatabaseStatementDispatcher.javaapache

public class DatabaseStatementDispatcher implements SourceDispatcher<DatabaseSlowStatement> {

    @Override public void dispatch(DatabaseSlowStatement source) {
        TopNDatabaseStatement statement = new TopNDatabaseStatement();
        statement.setId(source.getId());
        statement.setServiceId(source.getDatabaseServiceId());
        statement.setLatency(source.getLatency());
        statement.setStatement(source.getStatement());
        statement.setTimeBucket(source.getTimeBucket());
        statement.setTraceId(source.getTraceId());

        TopNStreamProcessor.getInstance().in(statement);
    }
}
  • DatabaseStatementDispatcher實現了SourceDispatcher接口,其dispatch方法將DatabaseSlowStatement轉換爲TopNDatabaseStatement,而後執行TopNStreamProcessor.getInstance().in(statement)

TopNStreamProcessor

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.javaide

public class TopNStreamProcessor implements StreamProcessor<TopN> {

    private static final TopNStreamProcessor PROCESSOR = new TopNStreamProcessor();

    @Getter private List<TopNWorker> persistentWorkers = new ArrayList<>();
    private Map<Class<? extends Record>, TopNWorker> workers = new HashMap<>();
    @Setter @Getter private int topNWorkerReportCycle = 10;
    @Setter @Getter private int topSize = 50;

    public static TopNStreamProcessor getInstance() {
        return PROCESSOR;
    }

    @SuppressWarnings("unchecked")
    public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends TopN> topNClass) {
        if (DisableRegister.INSTANCE.include(stream.name())) {
            return;
        }

        StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
        IRecordDAO recordDAO;
        try {
            recordDAO = storageDAO.newRecordDao(stream.builder().newInstance());
        } catch (InstantiationException | IllegalAccessException e) {
            throw new UnexpectedException("Create " + stream.builder().getSimpleName() + " top n record DAO failure.", e);
        }

        IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
        Model model = modelSetter.putIfAbsent(topNClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Second), true);

        TopNWorker persistentWorker = new TopNWorker(moduleDefineHolder, model, topSize, topNWorkerReportCycle * 60 * 1000L, recordDAO);
        persistentWorkers.add(persistentWorker);
        workers.put(topNClass, persistentWorker);
    }

    public void in(TopN topN) {
        TopNWorker worker = workers.get(topN.getClass());
        if (worker != null) {
            worker.in(topN);
        }
    }
}
  • TopNStreamProcessor實現了StreamProcessor接口,其in方法從workers中獲取TopNWorker,執行worker.in(topN)

小結

TopNDatabaseStatement繼承了TopN,定義了Builder;DatabaseStatementDispatcher實現了SourceDispatcher接口,其dispatch方法將DatabaseSlowStatement轉換爲TopNDatabaseStatement,而後執行TopNStreamProcessor.getInstance().in(statement)ui

doc

相關文章
相關標籤/搜索