本文主要研究一下skywalking的GRPCStreamServiceStatusjava
skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCStreamServiceStatus.javagit
public class GRPCStreamServiceStatus { private static final ILog logger = LogManager.getLogger(GRPCStreamServiceStatus.class); private volatile boolean status; public GRPCStreamServiceStatus(boolean status) { this.status = status; } public boolean isStatus() { return status; } public void finished() { this.status = true; } /** * @param maxTimeout max wait time, milliseconds. */ public boolean wait4Finish(long maxTimeout) { long time = 0; while (!status) { if (time > maxTimeout) { break; } try2Sleep(5); time += 5; } return status; } /** * Wait until success status reported. */ public void wait4Finish() { long recheckCycle = 5; long hasWaited = 0L; long maxCycle = 30 * 1000L;// 30 seconds max. while (!status) { try2Sleep(recheckCycle); hasWaited += recheckCycle; if (recheckCycle >= maxCycle) { logger.warn("Collector traceSegment service doesn't response in {} seconds.", hasWaited / 1000); } else { recheckCycle = recheckCycle * 2 > maxCycle ? maxCycle : recheckCycle * 2; } } } /** * Try to sleep, and ignore the {@link InterruptedException} * * @param millis the length of time to sleep in milliseconds */ private void try2Sleep(long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { } } }
Collector traceSegment service doesn't response in {} seconds.
,參數值爲hasWaited / 1000
skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.javagithub
@DefaultImplementor public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSegment>, TracingContextListener, GRPCChannelListener { //...... @Override public void consume(List<TraceSegment> data) { if (CONNECTED.equals(status)) { final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false); StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(Config.Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).collect(new StreamObserver<Commands>() { @Override public void onNext(Commands commands) { ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands); } @Override public void onError(Throwable throwable) { status.finished(); if (logger.isErrorEnable()) { logger.error(throwable, "Send UpstreamSegment to collector fail with a grpc internal exception."); } ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(throwable); } @Override public void onCompleted() { status.finished(); } }); try { for (TraceSegment segment : data) { UpstreamSegment upstreamSegment = segment.transform(); upstreamSegmentStreamObserver.onNext(upstreamSegment); } } catch (Throwable t) { logger.error(t, "Transform and send UpstreamSegment to collector fail."); } upstreamSegmentStreamObserver.onCompleted(); status.wait4Finish(); segmentUplinkedCounter += data.size(); } else { segmentAbandonedCounter += data.size(); } printUplinkStatus(); } private void printUplinkStatus() { long currentTimeMillis = System.currentTimeMillis(); if (currentTimeMillis - lastLogTime > 30 * 1000) { lastLogTime = currentTimeMillis; if (segmentUplinkedCounter > 0) { logger.debug("{} trace segments have been sent to collector.", segmentUplinkedCounter); segmentUplinkedCounter = 0; } if (segmentAbandonedCounter > 0) { logger.debug("{} trace segments have been abandoned, cause by no available channel.", segmentAbandonedCounter); segmentAbandonedCounter = 0; } } } //...... }
GRPCStreamServiceStatus提供了finished方法用於將status設置爲true;它還提供了wait4Finish方法,該方法會一直等待status變爲true,同時在recheckCycle大於等於maxCycle時打印warn日誌apache