zipkin源碼 4.reporter

首先看怎麼構建一個Reporter:api

@Bean
public Reporter<Span> reporter(){
    Reporter<Span> reporter = AsyncReporter.builder(
            URLConnectionSender.create("http://localhost:9411/api/v1/spans"))
            .build();
    return reporter;
}

上面代碼建立了一個http類型reporter,返回一個BoundedAsyncReporter類型的reporter併發

下面是URLConnectionSender.create方法,主要設置編碼類型、鏈接讀取超時設置、是否壓縮以及最大字節數設置。jvm

public abstract class URLConnectionSender implements Sender {
  /** Creates a sender that posts {@link Encoding#THRIFT} messages. */
  public static URLConnectionSender create(String endpoint) {
    return builder().endpoint(endpoint).build();
  }

  public static Builder builder() {
    return new AutoValue_URLConnectionSender.Builder()
        .encoding(Encoding.THRIFT)
        .connectTimeout(10 * 1000)
        .readTimeout(60 * 1000)
        .compressionEnabled(true)
        .messageMaxBytes(5 * 1024 * 1024);
  }

接下來分析下AsyncReporter。ide

首先看下AsyncReporter的建立者build的build方法:oop

public AsyncReporter<Span> build() {
  switch (sender.encoding()) {
    case JSON:
      return build(Encoder.JSON);
    case THRIFT:
      return build(Encoder.THRIFT);
    default:
      throw new UnsupportedOperationException(sender.encoding().name());
  }
}

//接下來會執行build(Encoder.THRIFT)方法:
public <S> AsyncReporter<S> build(Encoder<S> encoder) {
  checkNotNull(encoder, "encoder");
  
  //檢查encode是否同樣,不同ao拋出異常
  checkArgument(encoder.encoding() == sender.encoding(),
      "Encoder.encoding() %s != Sender.encoding() %s",
      encoder.encoding(), sender.encoding());
  
  //建立有限制的AsyncReporter,設置了上傳消息的最大字節數、消息的超時時間
  //以及初始化ByteBoundedQueue並設置最大長度以及最大字節數(默認jvm總內存的1%)
  final BoundedAsyncReporter<S> result = new BoundedAsyncReporter<>(this, encoder);

  if (messageTimeoutNanos > 0) { // Start a thread that flushes the queue in a loop.
    final BufferNextMessage consumer =
        new BufferNextMessage(sender, messageMaxBytes, messageTimeoutNanos);
    
    //啓動一個線程處理隊列中須要上報的消息    
    final Thread flushThread = new Thread(() -> {
      try {
        while (!result.closed.get()) {
          //上報消息
          result.flush(consumer);
        }
      } finally {
        for (byte[] next : consumer.drain()) result.pending.offer(next);
        result.close.countDown();
      }
    }, "AsyncReporter(" + sender + ")");
    flushThread.setDaemon(true);
    flushThread.start();
  }
  return result;
}
}

下面分析result.flush(consumer)方法:post

void flush(BufferNextMessage bundler) {
  if (closed.get()) throw new IllegalStateException("closed");
  //該方法阻塞直到有數據轉移到consumer中,即bundler
  //bundler.remainingNanos()獲取若是cout爲0,即隊列沒有數據的時候須要阻塞多久直到有數據
  //下面主要看下ByteBoundedQueue的doDrain(Consumer consumer)方法 -》
  pending.drainTo(bundler, bundler.remainingNanos());

  // record after flushing reduces the amount of gauge events vs on doing this on report
  metrics.updateQueuedSpans(pending.count);
  metrics.updateQueuedBytes(pending.sizeInBytes);

  if (!bundler.isReady()) return; // try to fill up the bundle

  // Signal that we are about to send a message of a known size in bytes
  metrics.incrementMessages();
  metrics.incrementMessageBytes(bundler.sizeInBytes());
  //獲取bundler中的數據集合
  List<byte[]> nextMessage = bundler.drain();

  // In failure case, we increment messages and spans dropped.
  Callback failureCallback = sendSpansCallback(nextMessage.size());
  try {
    //對spans編碼併發送到collect
    sender.sendSpans(nextMessage, failureCallback);
  } catch (RuntimeException e) {
    failureCallback.onError(e);
    // Raise in case the sender was closed out-of-band.
    if (e instanceof IllegalStateException) throw e;
  }
}

//ByteBoundedQueue的doDrain
int doDrain(Consumer consumer) {
    int drainedCount = 0;
    int drainedSizeInBytes = 0;
    while (drainedCount < count) {
      //讀取須要處理的數據
      byte[] next = elements[readPos];

      if (next == null) break;
      //判斷consumer可否接受數據,能的話存入consumer的list中
      if (consumer.accept(next)) {
        drainedCount++;
        drainedSizeInBytes += next.length;

        elements[readPos] = null;
        if (++readPos == elements.length) readPos = 0; // circle back to the front of the array
      } else {
        break;
      }
    }
    //更新count跟sizeInBytes
    count -= drainedCount;
    sizeInBytes -= drainedSizeInBytes;
    return drainedCount;
  }

接下來分析BoundedAsyncReporter的report若是上報span相關數據到queue中:ui

@Override
public void report(S span) {
  checkNotNull(span, "span");
  metrics.incrementSpans(1);
  //對span進行編碼,這裏使用thrift
  byte[] next = encoder.encode(span);
  //計算span的字節數
  int messageSizeOfNextSpan = sender.messageSizeInBytes(Collections.singletonList(next));
  metrics.incrementSpanBytes(next.length);
  //判斷reporter是否關閉,以及span的字節數超過最大限制,沒有的話則進行pending.offer操做
  if (closed.get() ||
      // don't enqueue something larger than we can drain
      messageSizeOfNextSpan > messageMaxBytes ||
      !pending.offer(next)) {
    metrics.incrementSpansDropped(1);
  }
}
//ByteBoundedQueue的offer方法
boolean offer(byte[] next) {
    lock.lock();
    try {
      //queue的長度已經滿了
      if (count == elements.length) return false;
      //queue的字節長度加上將要offer的字節數超過了最大的字節數
      if (sizeInBytes + next.length > maxBytes) return false;
      //保存span數據
      elements[writePos++] = next;
      
      if (writePos == elements.length) writePos = 0; // circle back to the front of the array

      count++;
      sizeInBytes += next.length;
      //喚醒消費線程
      available.signal(); // alert any drainers
      return true;
    } finally {
      lock.unlock();
    }
  }
相關文章
相關標籤/搜索