本文主要研究一下skywalking的TopNDatabaseStatement。
skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java
@Stream(name = TopNDatabaseStatement.INDEX_NAME, scopeId = DefaultScopeDefine.DATABASE_SLOW_STATEMENT, builder = TopNDatabaseStatement.Builder.class, processor = TopNStreamProcessor.class) public class TopNDatabaseStatement extends TopN { public static final String INDEX_NAME = "top_n_database_statement"; @Setter private String id; @Override public String id() { return id; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; TopNDatabaseStatement statement = (TopNDatabaseStatement)o; return getServiceId() == statement.getServiceId(); } @Override public int hashCode() { return Objects.hash(getServiceId()); } public static class Builder implements StorageBuilder<TopNDatabaseStatement> { @Override public TopNDatabaseStatement map2Data(Map<String, Object> dbMap) { TopNDatabaseStatement statement = new TopNDatabaseStatement(); statement.setStatement((String)dbMap.get(STATEMENT)); statement.setTraceId((String)dbMap.get(TRACE_ID)); statement.setLatency(((Number)dbMap.get(LATENCY)).longValue()); statement.setServiceId(((Number)dbMap.get(SERVICE_ID)).intValue()); statement.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue()); return statement; } @Override public Map<String, Object> data2Map(TopNDatabaseStatement storageData) { Map<String, Object> map = new HashMap<>(); map.put(STATEMENT, storageData.getStatement()); map.put(TRACE_ID, storageData.getTraceId()); map.put(LATENCY, storageData.getLatency()); map.put(SERVICE_ID, storageData.getServiceId()); map.put(TIME_BUCKET, storageData.getTimeBucket()); return map; } } }
skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/TopN.java
public abstract class TopN extends Record implements ComparableStorageData { public static final String STATEMENT = "statement"; public static final String LATENCY = "latency"; public static final String TRACE_ID = "trace_id"; public static final String SERVICE_ID = "service_id"; @Getter @Setter @Column(columnName = STATEMENT, content = true) private String statement; @Getter @Setter @Column(columnName = LATENCY) private long latency; @Getter @Setter @Column(columnName = TRACE_ID) private String traceId; @Getter @Setter @Column(columnName = SERVICE_ID) private int serviceId; @Override public int compareTo(Object o) { TopN target = (TopN)o; return (int)(latency - target.latency); } }
skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/DatabaseStatementDispatcher.java
/**
 * Dispatcher for {@code DatabaseSlowStatement} sources: each source is copied
 * into a {@code TopNDatabaseStatement} and handed to the shared
 * {@code TopNStreamProcessor}.
 */
public class DatabaseStatementDispatcher implements SourceDispatcher<DatabaseSlowStatement> {

    /**
     * Copies id, service id, latency, statement, time bucket and trace id from
     * the source into a new top-N record and feeds it to the processor.
     *
     * @param source the slow-statement source to convert
     */
    @Override
    public void dispatch(DatabaseSlowStatement source) {
        final TopNDatabaseStatement topN = new TopNDatabaseStatement();

        // Identity of the record.
        topN.setId(source.getId());
        topN.setServiceId(source.getDatabaseServiceId());

        // Payload describing the observed statement.
        topN.setLatency(source.getLatency());
        topN.setStatement(source.getStatement());
        topN.setTimeBucket(source.getTimeBucket());
        topN.setTraceId(source.getTraceId());

        final TopNStreamProcessor processor = TopNStreamProcessor.getInstance();
        processor.in(topN);
    }
}
skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java
public class TopNStreamProcessor implements StreamProcessor<TopN> { private static final TopNStreamProcessor PROCESSOR = new TopNStreamProcessor(); @Getter private List<TopNWorker> persistentWorkers = new ArrayList<>(); private Map<Class<? extends Record>, TopNWorker> workers = new HashMap<>(); @Setter @Getter private int topNWorkerReportCycle = 10; @Setter @Getter private int topSize = 50; public static TopNStreamProcessor getInstance() { return PROCESSOR; } @SuppressWarnings("unchecked") public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends TopN> topNClass) { if (DisableRegister.INSTANCE.include(stream.name())) { return; } StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class); IRecordDAO recordDAO; try { recordDAO = storageDAO.newRecordDao(stream.builder().newInstance()); } catch (InstantiationException | IllegalAccessException e) { throw new UnexpectedException("Create " + stream.builder().getSimpleName() + " top n record DAO failure.", e); } IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class); Model model = modelSetter.putIfAbsent(topNClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Second), true); TopNWorker persistentWorker = new TopNWorker(moduleDefineHolder, model, topSize, topNWorkerReportCycle * 60 * 1000L, recordDAO); persistentWorkers.add(persistentWorker); workers.put(topNClass, persistentWorker); } public void in(TopN topN) { TopNWorker worker = workers.get(topN.getClass()); if (worker != null) { worker.in(topN); } } }
TopNDatabaseStatement繼承了TopN,定義了Builder;DatabaseStatementDispatcher實現了SourceDispatcher接口,其dispatch方法將DatabaseSlowStatement轉換爲TopNDatabaseStatement,而後執行TopNStreamProcessor.getInstance().in(statement)。