A Look at SkyWalking's HTTPAccessLog

This article walks through SkyWalking's HTTPAccessLog and the classes that carry it to storage.

HTTPAccessLog

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/HTTPAccessLog.java

@ScopeDeclaration(id = HTTP_ACCESS_LOG, name = "HTTPAccessLog")
public class HTTPAccessLog extends AbstractLog {
    @Override public int scope() {
        return HTTP_ACCESS_LOG;
    }
}
  • HTTPAccessLog extends AbstractLog; its scope method returns HTTP_ACCESS_LOG.

AbstractLog

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/AbstractLog.java

@Setter
@Getter
public abstract class AbstractLog extends Source {
    private long timeBucket;
    private long timestamp;
    private int serviceId;
    private int serviceInstanceId;
    private int endpointId;
    private String traceId;
    private int isError;
    private String statusCode;
    private ContentType contentType = ContentType.NONE;
    private String content;

    @Override public String getEntityId() {
        throw new UnexpectedException("getEntityId is not supported in AbstractLog source");
    }
}
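  • AbstractLog extends Source and defines the properties timeBucket, timestamp, serviceId, serviceInstanceId, endpointId, traceId, isError, statusCode, contentType, and content. Its getEntityId method is not supported and throws UnexpectedException.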
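
The relationship between Source, AbstractLog, and HTTPAccessLog can be illustrated with a reduced, stand-alone sketch. It is not SkyWalking's code: the scope id value 100 is hypothetical, only two of the ten fields are kept, hand-written accessors stand in for Lombok's @Getter/@Setter, and the JDK's UnsupportedOperationException stands in for SkyWalking's UnexpectedException.

```java
final class LogSourceSketch {
    // Hypothetical scope id for illustration; not SkyWalking's actual constant value.
    static final int HTTP_ACCESS_LOG = 100;

    // Stand-in for SkyWalking's Source base class, reduced to two methods.
    abstract static class Source {
        abstract int scope();
        abstract String getEntityId();
    }

    // Shared log fields live here; only two of the original ten are kept.
    abstract static class AbstractLog extends Source {
        private long timestamp;
        private String statusCode;

        long getTimestamp() { return timestamp; }
        void setTimestamp(long timestamp) { this.timestamp = timestamp; }
        String getStatusCode() { return statusCode; }
        void setStatusCode(String statusCode) { this.statusCode = statusCode; }

        // Mirrors the original: the method exists but is not supported for logs.
        @Override String getEntityId() {
            throw new UnsupportedOperationException("getEntityId is not supported in AbstractLog source");
        }
    }

    // The concrete source only has to say which scope it belongs to.
    static final class HTTPAccessLog extends AbstractLog {
        @Override int scope() { return HTTP_ACCESS_LOG; }
    }

    public static void main(String[] args) {
        HTTPAccessLog log = new HTTPAccessLog();
        log.setTimestamp(System.currentTimeMillis());
        log.setStatusCode("200");
        System.out.println("scope=" + log.scope() + " status=" + log.getStatusCode());
        try {
            log.getEntityId();
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The point of the sketch is the division of labor: the abstract class owns the data, and each concrete log type contributes nothing but its scope.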

HTTPAccessLogDispatcher

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/log/HTTPAccessLogDispatcher.java

public class HTTPAccessLogDispatcher implements SourceDispatcher<HTTPAccessLog> {

    @Override public void dispatch(HTTPAccessLog source) {
        HTTPAccessLogRecord record = new HTTPAccessLogRecord();
        record.setTimestamp(source.getTimestamp());
        record.setTimeBucket(source.getTimeBucket());
        record.setServiceId(source.getServiceId());
        record.setServiceInstanceId(source.getServiceInstanceId());
        record.setEndpointId(source.getEndpointId());
        record.setTraceId(source.getTraceId());
        record.setIsError(source.getIsError());
        record.setStatusCode(source.getStatusCode());
        record.setContentType(source.getContentType().value());
        record.setContent(source.getContent());

        RecordStreamProcessor.getInstance().in(record);
    }
}
  • HTTPAccessLogDispatcher implements the SourceDispatcher interface; its dispatch method copies the fields of an HTTPAccessLog into a new HTTPAccessLogRecord and then calls RecordStreamProcessor.getInstance().in(record).
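
The copy-then-forward pattern used by the dispatcher can be sketched in isolation as follows. All class names here (AccessLog, AccessLogRecord, AccessLogDispatcher) are hypothetical stand-ins, and the sketch departs from the original in one respect: the downstream processor is injected as a Consumer instead of being reached through a singleton, purely so the example can be run and checked on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class DispatchSketch {
    // Same shape as SkyWalking's SourceDispatcher: one method, typed by source.
    interface SourceDispatcher<S> {
        void dispatch(S source);
    }

    // Hypothetical source object, reduced to four fields.
    static final class AccessLog {
        final long timestamp;
        final int serviceId;
        final String statusCode;
        final String content;

        AccessLog(long timestamp, int serviceId, String statusCode, String content) {
            this.timestamp = timestamp;
            this.serviceId = serviceId;
            this.statusCode = statusCode;
            this.content = content;
        }
    }

    // Hypothetical storage-facing record with the same four fields.
    static final class AccessLogRecord {
        long timestamp;
        int serviceId;
        String statusCode;
        String content;
    }

    static final class AccessLogDispatcher implements SourceDispatcher<AccessLog> {
        // Assumption: an injected sink replaces RecordStreamProcessor.getInstance().
        private final Consumer<AccessLogRecord> sink;

        AccessLogDispatcher(Consumer<AccessLogRecord> sink) {
            this.sink = sink;
        }

        @Override public void dispatch(AccessLog source) {
            // Copy field by field, then hand the record downstream.
            AccessLogRecord record = new AccessLogRecord();
            record.timestamp = source.timestamp;
            record.serviceId = source.serviceId;
            record.statusCode = source.statusCode;
            record.content = source.content;
            sink.accept(record);
        }
    }

    public static void main(String[] args) {
        List<AccessLogRecord> stored = new ArrayList<>();
        SourceDispatcher<AccessLog> dispatcher = new AccessLogDispatcher(stored::add);
        dispatcher.dispatch(new AccessLog(1L, 7, "200", "GET /health"));
        System.out.println(stored.size() + " record(s), status " + stored.get(0).statusCode);
    }
}
```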

RecordStreamProcessor

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java

public class RecordStreamProcessor implements StreamProcessor<Record> {

    private final static RecordStreamProcessor PROCESSOR = new RecordStreamProcessor();

    private Map<Class<? extends Record>, RecordPersistentWorker> workers = new HashMap<>();

    public static RecordStreamProcessor getInstance() {
        return PROCESSOR;
    }

    public void in(Record record) {
        RecordPersistentWorker worker = workers.get(record.getClass());
        if (worker != null) {
            worker.in(record);
        }
    }

    @SuppressWarnings("unchecked")
    public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends Record> recordClass) {
        if (DisableRegister.INSTANCE.include(stream.name())) {
            return;
        }

        StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
        IRecordDAO recordDAO;
        try {
            recordDAO = storageDAO.newRecordDao(stream.builder().newInstance());
        } catch (InstantiationException | IllegalAccessException e) {
            throw new UnexpectedException("Create " + stream.builder().getSimpleName() + " record DAO failure.", e);
        }

        IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
        Model model = modelSetter.putIfAbsent(recordClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Second), true);
        RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleDefineHolder, model, recordDAO);

        workers.put(recordClass, persistentWorker);
    }
}
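  • RecordStreamProcessor implements the StreamProcessor interface; its in method looks up the RecordPersistentWorker registered in workers under record.getClass() and, if one exists, calls that worker's in method. Its create method builds a RecordPersistentWorker for a record class and registers it in workers, unless the stream is listed in DisableRegister.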
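
The class-keyed lookup can be tried out with a small stand-alone sketch. The names Processor, Worker, and OtherRecord are hypothetical, create here takes a ready-made worker instead of building one from module services, and in returns a boolean (the original returns void) only so the silent-drop case is observable.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ProcessorSketch {
    static class Record {}
    static class AccessLogRecord extends Record {}
    static class OtherRecord extends Record {}

    // Hypothetical stand-in for RecordPersistentWorker.
    interface Worker {
        void in(Record record);
    }

    static final class Processor {
        private final Map<Class<? extends Record>, Worker> workers = new HashMap<>();

        // Register the worker responsible for one concrete record class.
        void create(Class<? extends Record> recordClass, Worker worker) {
            workers.put(recordClass, worker);
        }

        // Route by the record's exact runtime class; unregistered classes are dropped.
        // Returning a boolean is an addition of this sketch.
        boolean in(Record record) {
            Worker worker = workers.get(record.getClass());
            if (worker == null) {
                return false;
            }
            worker.in(record);
            return true;
        }
    }

    public static void main(String[] args) {
        List<Record> received = new ArrayList<>();
        Processor processor = new Processor();
        processor.create(AccessLogRecord.class, received::add);

        System.out.println("access log routed: " + processor.in(new AccessLogRecord()));
        System.out.println("other record routed: " + processor.in(new OtherRecord()));
        System.out.println("worker received: " + received.size());
    }
}
```

Because the map is keyed by record.getClass(), the match is on the exact runtime class: a record whose class was never passed to create is ignored without any error, which is also what the null check in the original in method implies.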

RecordPersistentWorker

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java

public class RecordPersistentWorker extends AbstractWorker<Record> {

    private static final Logger logger = LoggerFactory.getLogger(RecordPersistentWorker.class);

    private final Model model;
    private final IRecordDAO recordDAO;
    private final IBatchDAO batchDAO;

    RecordPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IRecordDAO recordDAO) {
        super(moduleDefineHolder);
        this.model = model;
        this.recordDAO = recordDAO;
        this.batchDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(IBatchDAO.class);
    }

    @Override public void in(Record record) {
        try {
            InsertRequest insertRequest = recordDAO.prepareBatchInsert(model, record);
            batchDAO.asynchronous(insertRequest);
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
        }
    }
}
  • RecordPersistentWorker extends AbstractWorker; its in method calls recordDAO.prepareBatchInsert(model, record) and passes the returned insertRequest to batchDAO.asynchronous(insertRequest). An IOException is logged rather than rethrown.
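
The prepare-then-enqueue step might look like this when reduced to plain Java. This is a sketch under stated assumptions: how SkyWalking's storage plugins buffer and flush requests is not shown in the listing above, so the in-memory queue and the flush method are inventions of this example, as are the string-based record, the RecordDao and BatchDao types, and the table name http_access_log.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

final class PersistSketch {
    // Hypothetical prepared write: where it goes and what it contains.
    static final class InsertRequest {
        final String table;
        final String payload;

        InsertRequest(String table, String payload) {
            this.table = table;
            this.payload = payload;
        }
    }

    // Stand-in for IRecordDAO: turns a record into a prepared request.
    interface RecordDao {
        InsertRequest prepareBatchInsert(String table, String record) throws IOException;
    }

    // Stand-in for IBatchDAO. Assumption: requests are queued and written later.
    static final class BatchDao {
        private final Queue<InsertRequest> pending = new ConcurrentLinkedQueue<>();

        void asynchronous(InsertRequest request) {
            pending.add(request);
        }

        // Drains everything queued so far; invented for this sketch.
        List<InsertRequest> flush() {
            List<InsertRequest> batch = new ArrayList<>();
            InsertRequest next;
            while ((next = pending.poll()) != null) {
                batch.add(next);
            }
            return batch;
        }
    }

    static final class PersistentWorker {
        private final String table;
        private final RecordDao recordDao;
        private final BatchDao batchDao;

        PersistentWorker(String table, RecordDao recordDao, BatchDao batchDao) {
            this.table = table;
            this.recordDao = recordDao;
            this.batchDao = batchDao;
        }

        // As in the original, a failed prepare is reported and the record is skipped.
        void in(String record) {
            try {
                batchDao.asynchronous(recordDao.prepareBatchInsert(table, record));
            } catch (IOException e) {
                System.err.println("prepare failed: " + e.getMessage());
            }
        }
    }

    public static void main(String[] args) {
        BatchDao batchDao = new BatchDao();
        RecordDao recordDao = (table, record) -> {
            if (record.isEmpty()) {
                throw new IOException("empty record");
            }
            return new InsertRequest(table, record);
        };
        PersistentWorker worker = new PersistentWorker("http_access_log", recordDao, batchDao);
        worker.in("GET /health 200");
        worker.in(""); // rejected by the DAO, logged, not queued
        System.out.println("queued: " + batchDao.flush().size());
    }
}
```

The caller of in never waits for storage and never sees a storage exception, which keeps the dispatch path cheap at the cost of dropping a record when preparation fails.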

Summary

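HTTPAccessLog extends AbstractLog, and its scope method returns HTTP_ACCESS_LOG. HTTPAccessLogDispatcher implements the SourceDispatcher interface; its dispatch method converts an HTTPAccessLog into an HTTPAccessLogRecord and then calls RecordStreamProcessor.getInstance().in(record), which hands the record to the RecordPersistentWorker registered for its class.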
