A Look at SkyWalking's GRPCStreamServiceStatus

This article takes a look at SkyWalking's GRPCStreamServiceStatus.

GRPCStreamServiceStatus

skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCStreamServiceStatus.java

public class GRPCStreamServiceStatus {
    private static final ILog logger = LogManager.getLogger(GRPCStreamServiceStatus.class);
    private volatile boolean status;

    public GRPCStreamServiceStatus(boolean status) {
        this.status = status;
    }

    public boolean isStatus() {
        return status;
    }

    public void finished() {
        this.status = true;
    }

    /**
     * @param maxTimeout max wait time, milliseconds.
     */
    public boolean wait4Finish(long maxTimeout) {
        long time = 0;
        while (!status) {
            if (time > maxTimeout) {
                break;
            }
            try2Sleep(5);
            time += 5;
        }
        return status;
    }

    /**
     * Wait until success status reported.
     */
    public void wait4Finish() {
        long recheckCycle = 5;
        long hasWaited = 0L;
        long maxCycle = 30 * 1000L;// 30 seconds max.
        while (!status) {
            try2Sleep(recheckCycle);
            hasWaited += recheckCycle;

            if (recheckCycle >= maxCycle) {
                logger.warn("Collector traceSegment service doesn't response in {} seconds.", hasWaited / 1000);
            } else {
                recheckCycle = recheckCycle * 2 > maxCycle ? maxCycle : recheckCycle * 2;
            }
        }
    }

    /**
     * Try to sleep, and ignore the {@link InterruptedException}
     *
     * @param millis the length of time to sleep in milliseconds
     */
    private void try2Sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {

        }
    }
}
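  • GRPCStreamServiceStatus provides a finished method that sets status to true. It also provides two wait4Finish overloads. The no-argument overload waits until status becomes true, sleeping between checks with an interval (recheckCycle) that starts at 5 ms and doubles until it is capped at maxCycle (30 seconds). Once recheckCycle is greater than or equal to maxCycle, every further check logs a warning of the form Collector traceSegment service doesn't response in {} seconds., with hasWaited / 1000 as the argument. The wait4Finish(long maxTimeout) overload instead polls every 5 ms, gives up once the accumulated wait exceeds maxTimeout milliseconds, and returns the current status.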
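To make the back-off in the no-argument wait4Finish concrete, here is a minimal sketch that replays the same arithmetic without actually sleeping. The class name BackoffSchedule and the method intervalsUntilFirstWarn are hypothetical and exist only for this illustration; the constants (5 ms start, doubling, 30 second cap) are taken from the listing above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of SkyWalking: replays the recheckCycle
// arithmetic of wait4Finish() without sleeping.
public class BackoffSchedule {

    /** Sleep intervals in milliseconds, up to and including the one that triggers the first warn log. */
    static List<Long> intervalsUntilFirstWarn() {
        List<Long> intervals = new ArrayList<>();
        long recheckCycle = 5;
        long maxCycle = 30 * 1000L; // same 30 second cap as in the listing
        while (true) {
            intervals.add(recheckCycle); // stands in for try2Sleep(recheckCycle)
            if (recheckCycle >= maxCycle) {
                break; // this is the point where the original logs the warning
            }
            recheckCycle = Math.min(recheckCycle * 2, maxCycle);
        }
        return intervals;
    }

    /** Total time waited in milliseconds when the first warning is logged. */
    static long waitedAtFirstWarn() {
        long hasWaited = 0L;
        for (long interval : intervalsUntilFirstWarn()) {
            hasWaited += interval;
        }
        return hasWaited;
    }

    public static void main(String[] args) {
        System.out.println(intervalsUntilFirstWarn());
        System.out.println("first warn after about " + waitedAtFirstWarn() / 1000 + " seconds");
    }
}
```

Under these assumptions the interval sequence is 5, 10, 20, ... 20480, 30000, so the first warning is only logged after about 70 seconds of waiting (70955 ms in total), and from then on once per 30 second cycle.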

TraceSegmentServiceClient

skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java

@DefaultImplementor
public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSegment>, TracingContextListener, GRPCChannelListener {

    //......

    @Override
    public void consume(List<TraceSegment> data) {
        if (CONNECTED.equals(status)) {
            final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
            StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(Config.Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).collect(new StreamObserver<Commands>() {
                @Override
                public void onNext(Commands commands) {
                    ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
                }

                @Override
                public void onError(Throwable throwable) {
                    status.finished();
                    if (logger.isErrorEnable()) {
                        logger.error(throwable, "Send UpstreamSegment to collector fail with a grpc internal exception.");
                    }
                    ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(throwable);
                }

                @Override
                public void onCompleted() {
                    status.finished();
                }
            });

            try {
                for (TraceSegment segment : data) {
                    UpstreamSegment upstreamSegment = segment.transform();
                    upstreamSegmentStreamObserver.onNext(upstreamSegment);
                }
            } catch (Throwable t) {
                logger.error(t, "Transform and send UpstreamSegment to collector fail.");
            }

            upstreamSegmentStreamObserver.onCompleted();

            status.wait4Finish();
            segmentUplinkedCounter += data.size();
        } else {
            segmentAbandonedCounter += data.size();
        }

        printUplinkStatus();
    }

    private void printUplinkStatus() {
        long currentTimeMillis = System.currentTimeMillis();
        if (currentTimeMillis - lastLogTime > 30 * 1000) {
            lastLogTime = currentTimeMillis;
            if (segmentUplinkedCounter > 0) {
                logger.debug("{} trace segments have been sent to collector.", segmentUplinkedCounter);
                segmentUplinkedCounter = 0;
            }
            if (segmentAbandonedCounter > 0) {
                logger.debug("{} trace segments have been abandoned, cause by no available channel.", segmentAbandonedCounter);
                segmentAbandonedCounter = 0;
            }
        }
    }

    //......

}
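  • The consume method of TraceSegmentServiceClient sends each TraceSegment through upstreamSegmentStreamObserver. The stub behind that observer is created with withDeadlineAfter set to GRPC_UPSTREAM_TIMEOUT, which defaults to 30 seconds. Both onCompleted and onError of the response StreamObserver call GRPCStreamServiceStatus.finished, which releases the status.wait4Finish() call that consume blocks on after sending. Finally, consume calls printUplinkStatus, which logs at most once every 30 seconds how many trace segments were sent or abandoned.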
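The hand-off between the sending thread and the gRPC callback thread can be sketched in isolation as follows. This is a simplified stand-in, not SkyWalking code: StreamStatusDemo, StreamStatus and sendAndWait are hypothetical names, and a scheduled executor plays the role of the gRPC callback thread that would invoke onCompleted or onError. Unlike the original try2Sleep, the sketch restores the interrupt flag and stops waiting when interrupted; that is a deliberate deviation.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical demo, not part of SkyWalking.
public class StreamStatusDemo {

    /** Simplified stand-in for GRPCStreamServiceStatus. */
    static final class StreamStatus {
        // volatile so the write from the callback thread is visible to the waiting thread
        private volatile boolean status;

        void finished() {
            status = true;
        }

        /** Polls every 5 ms until finished() was called or maxTimeoutMillis is exceeded. */
        boolean wait4Finish(long maxTimeoutMillis) {
            long waited = 0;
            while (!status && waited <= maxTimeoutMillis) {
                try {
                    Thread.sleep(5);
                } catch (InterruptedException e) {
                    // Deviation from the original: keep the interrupt flag and stop waiting.
                    Thread.currentThread().interrupt();
                    break;
                }
                waited += 5;
            }
            return status;
        }
    }

    /** Simulates a send whose completion callback arrives after callbackDelayMillis. */
    static boolean sendAndWait(long callbackDelayMillis, long maxTimeoutMillis) {
        StreamStatus status = new StreamStatus();
        ScheduledExecutorService callbackThread = Executors.newSingleThreadScheduledExecutor();
        try {
            // Plays the role of onCompleted()/onError() calling status.finished().
            callbackThread.schedule(status::finished, callbackDelayMillis, TimeUnit.MILLISECONDS);
            return status.wait4Finish(maxTimeoutMillis);
        } finally {
            callbackThread.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(sendAndWait(50, 1000)); // callback arrives within the wait window
        System.out.println(sendAndWait(1000, 50)); // waiter gives up before the callback
    }
}
```

The sketch uses the bounded overload so that it always terminates. The consume method above calls the unbounded wait4Finish(), so there the sending thread only continues once onCompleted or onError has fired.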

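Summary

GRPCStreamServiceStatus provides a finished method that sets status to true. It also provides wait4Finish, which waits until status becomes true and logs a warning on each check once recheckCycle is greater than or equal to maxCycle.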
