First, let's look at how to build a Reporter:
    @Bean
    public Reporter<Span> reporter() {
        Reporter<Span> reporter = AsyncReporter.builder(
                URLConnectionSender.create("http://localhost:9411/api/v1/spans"))
            .build();
        return reporter;
    }
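The code above creates an HTTP-based reporter; the object that build() returns is a BoundedAsyncReporter.
Below is the URLConnectionSender.create method. It mainly sets the encoding, the connect and read timeouts, whether compression is enabled, and the maximum message size in bytes.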
    public abstract class URLConnectionSender implements Sender {
        /** Creates a sender that posts {@link Encoding#THRIFT} messages. */
        public static URLConnectionSender create(String endpoint) {
            return builder().endpoint(endpoint).build();
        }

        public static Builder builder() {
            return new AutoValue_URLConnectionSender.Builder()
                .encoding(Encoding.THRIFT)           // default encoding
                .connectTimeout(10 * 1000)           // 10 seconds, in milliseconds
                .readTimeout(60 * 1000)              // 60 seconds, in milliseconds
                .compressionEnabled(true)
                .messageMaxBytes(5 * 1024 * 1024);   // 5 MiB
        }
        // ... (rest of the class not shown)
    }
Next, let's analyze AsyncReporter.
First, look at the build method of AsyncReporter's Builder:
    public AsyncReporter<Span> build() {
        switch (sender.encoding()) {
            case JSON:
                return build(Encoder.JSON);
            case THRIFT:
                return build(Encoder.THRIFT);
            default:
                throw new UnsupportedOperationException(sender.encoding().name());
        }
    }

    // With the default sender, build(Encoder.THRIFT) is what runs next:
    public <S> AsyncReporter<S> build(Encoder<S> encoder) {
        checkNotNull(encoder, "encoder");
        // Check that the encoder and the sender use the same encoding; throw if they differ
        checkArgument(encoder.encoding() == sender.encoding(),
            "Encoder.encoding() %s != Sender.encoding() %s",
            encoder.encoding(), sender.encoding());

        // Create the bounded AsyncReporter. This sets the maximum message size in bytes
        // and the message timeout, and initializes the ByteBoundedQueue with its maximum
        // length and maximum bytes (by default 1% of the JVM's total memory).
        final BoundedAsyncReporter<S> result = new BoundedAsyncReporter<>(this, encoder);

        if (messageTimeoutNanos > 0) { // Start a thread that flushes the queue in a loop.
            final BufferNextMessage consumer =
                new BufferNextMessage(sender, messageMaxBytes, messageTimeoutNanos);
            // Start a thread that handles the queued messages waiting to be reported
            final Thread flushThread = new Thread(() -> {
                try {
                    while (!result.closed.get()) {
                        // Report the messages
                        result.flush(consumer);
                    }
                } finally {
                    for (byte[] next : consumer.drain()) result.pending.offer(next);
                    result.close.countDown();
                }
            }, "AsyncReporter(" + sender + ")");
            flushThread.setDaemon(true);
            flushThread.start();
        }
        return result;
    }
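The flush-thread pattern above (a daemon thread that loops until a `closed` flag is set, then counts down a latch so that a closing caller can wait for it) can be sketched with the JDK alone. This is only an illustration under simplifying assumptions: `FlushLoopSketch` and all of its members are hypothetical names, a `LinkedBlockingQueue` stands in for ByteBoundedQueue, and an in-memory list stands in for the sender.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical miniature of the flush-thread pattern; not the library's actual classes. */
class FlushLoopSketch {
    final BlockingQueue<String> pending = new LinkedBlockingQueue<>();
    final List<String> sent = new ArrayList<>(); // stands in for the remote collector
    final AtomicBoolean closed = new AtomicBoolean(false);
    final CountDownLatch close = new CountDownLatch(1);

    FlushLoopSketch() {
        Thread flushThread = new Thread(() -> {
            try {
                while (!closed.get()) flush();
            } finally {
                close.countDown(); // tells close() that the loop has exited
            }
        }, "FlushLoopSketch");
        flushThread.setDaemon(true); // do not keep the JVM alive for this thread
        flushThread.start();
    }

    /** Waits briefly for one element and "sends" it. */
    void flush() {
        try {
            String next = pending.poll(50, TimeUnit.MILLISECONDS);
            if (next != null) {
                synchronized (sent) { sent.add(next); }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            closed.set(true); // stop looping if the thread is interrupted
        }
    }

    /** Enqueues a span unless the reporter is closed. */
    void report(String span) {
        if (closed.get()) return;
        pending.offer(span);
    }

    /** Sets the closed flag and waits for the flush thread to exit. */
    boolean close(long timeoutMillis) {
        closed.set(true);
        try {
            return close.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    int sentCount() {
        synchronized (sent) { return sent.size(); }
    }

    public static void main(String[] args) {
        FlushLoopSketch reporter = new FlushLoopSketch();
        reporter.report("span-1");
        reporter.report("span-2");
        System.out.println("flush thread stopped: " + reporter.close(1000));
    }
}
```

Because the thread is a daemon, an application that exits without calling close may lose whatever is still queued, which is presumably why the original loop hands undrained elements back to `pending` in its `finally` block.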
Next, let's analyze the result.flush(consumer) method:
    void flush(BufferNextMessage bundler) {
        if (closed.get()) throw new IllegalStateException("closed");

        // Blocks until data has been moved into the consumer, i.e. the bundler.
        // bundler.remainingNanos() is how long to wait for data when count is 0,
        // that is, when the queue is empty.
        // The part worth reading is ByteBoundedQueue.doDrain(Consumer consumer),
        // listed after this method.
        pending.drainTo(bundler, bundler.remainingNanos());

        // record after flushing reduces the amount of gauge events vs on doing this on report
        metrics.updateQueuedSpans(pending.count);
        metrics.updateQueuedBytes(pending.sizeInBytes);

        if (!bundler.isReady()) return; // try to fill up the bundle

        // Signal that we are about to send a message of a known size in bytes
        metrics.incrementMessages();
        metrics.incrementMessageBytes(bundler.sizeInBytes());
        // Get the elements collected in the bundler
        List<byte[]> nextMessage = bundler.drain();

        // In failure case, we increment messages and spans dropped.
        Callback failureCallback = sendSpansCallback(nextMessage.size());
        try {
            // Encode the spans into one message and send it to the collector
            sender.sendSpans(nextMessage, failureCallback);
        } catch (RuntimeException e) {
            failureCallback.onError(e);
            // Raise in case the sender was closed out-of-band.
            if (e instanceof IllegalStateException) throw e;
        }
    }

    // ByteBoundedQueue.doDrain
    int doDrain(Consumer consumer) {
        int drainedCount = 0;
        int drainedSizeInBytes = 0;
        while (drainedCount < count) {
            // Read the next element to process
            byte[] next = elements[readPos];

            if (next == null) break;
            // Ask the consumer whether it can take the element;
            // if it can, the element is stored in the consumer's list
            if (consumer.accept(next)) {
                drainedCount++;
                drainedSizeInBytes += next.length;

                elements[readPos] = null;
                if (++readPos == elements.length) readPos = 0; // circle back to the front of the array
            } else {
                break;
            }
        }
        // Update count and sizeInBytes
        count -= drainedCount;
        sizeInBytes -= drainedSizeInBytes;
        return drainedCount;
    }
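The interaction between doDrain and the consumer comes down to one rule: keep moving elements until the queue is empty or the consumer refuses one. The sketch below illustrates that rule with a byte budget. `BundleSketch` is a hypothetical stand-in for the bundler, an `ArrayDeque` replaces the circular array, and any per-message encoding overhead that the real bundler may account for is ignored here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical stand-in for the bundler: accepts elements until a byte budget is used up. */
class BundleSketch {
    final int maxBytes;
    final List<byte[]> buffered = new ArrayList<>();
    int sizeInBytes;

    BundleSketch(int maxBytes) {
        this.maxBytes = maxBytes;
    }

    /** Plays the role of consumer.accept(next): false means "full, stop draining". */
    boolean accept(byte[] next) {
        if (sizeInBytes + next.length > maxBytes) return false;
        buffered.add(next);
        sizeInBytes += next.length;
        return true;
    }

    /** Hands over the buffered elements and resets the bundle, like bundler.drain(). */
    List<byte[]> drain() {
        List<byte[]> result = new ArrayList<>(buffered);
        buffered.clear();
        sizeInBytes = 0;
        return result;
    }

    /** Same stop conditions as doDrain: queue empty, or the consumer refuses an element. */
    static int drainInto(Deque<byte[]> queue, BundleSketch bundle) {
        int drained = 0;
        while (!queue.isEmpty() && bundle.accept(queue.peekFirst())) {
            queue.removeFirst(); // only remove what the consumer actually took
            drained++;
        }
        return drained;
    }

    public static void main(String[] args) {
        Deque<byte[]> queue = new ArrayDeque<>();
        for (int i = 0; i < 3; i++) queue.add(new byte[4]);

        BundleSketch bundle = new BundleSketch(10);     // room for two 4-byte elements
        System.out.println(drainInto(queue, bundle));   // prints 2
        System.out.println(bundle.drain().size());      // prints 2
        System.out.println(drainInto(queue, bundle));   // prints 1
    }
}
```

A refused element stays at the head of the queue, so it becomes the first element of the next message rather than being dropped.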
Next, let's analyze how BoundedAsyncReporter's report method puts span data into the queue:
    @Override
    public void report(S span) {
        checkNotNull(span, "span");
        metrics.incrementSpans(1);
        // Encode the span; Thrift is used here
        byte[] next = encoder.encode(span);
        // Compute the size in bytes of a message containing only this span
        int messageSizeOfNextSpan = sender.messageSizeInBytes(Collections.singletonList(next));
        metrics.incrementSpanBytes(next.length);
        // Drop the span if the reporter is closed or the span exceeds the maximum
        // message size; otherwise try pending.offer, which can also fail when the queue is full
        if (closed.get() ||
            // don't enqueue something larger than we can drain
            messageSizeOfNextSpan > messageMaxBytes ||
            !pending.offer(next)) {
            metrics.incrementSpansDropped(1);
        }
    }

    // ByteBoundedQueue.offer
    boolean offer(byte[] next) {
        lock.lock();
        try {
            // The queue already holds the maximum number of elements
            if (count == elements.length) return false;
            // The queued bytes plus the bytes being offered would exceed the maximum bytes
            if (sizeInBytes + next.length > maxBytes) return false;
            // Store the span data
            elements[writePos++] = next;

            if (writePos == elements.length) writePos = 0; // circle back to the front of the array

            count++;
            sizeInBytes += next.length;
            // Wake up the consumer thread
            available.signal(); // alert any drainers
            return true;
        } finally {
            lock.unlock();
        }
    }
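To see the two limits of offer (element count and total bytes) working together with the wake-up of a waiting drainer, here is a minimal self-contained sketch. `BoundedBytesQueueSketch` is a hypothetical simplification: its `drainTo` moves everything into a plain list instead of consulting a consumer, so it shows only the locking, the signalling and the ring-buffer bookkeeping.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical simplified queue bounded by both element count and total bytes. */
class BoundedBytesQueueSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition available = lock.newCondition();
    private final byte[][] elements; // fixed-size circular array
    private final int maxBytes;
    private int count, sizeInBytes, writePos, readPos;

    BoundedBytesQueueSketch(int maxSize, int maxBytes) {
        this.elements = new byte[maxSize][];
        this.maxBytes = maxBytes;
    }

    /** Returns false instead of blocking when either limit would be exceeded. */
    boolean offer(byte[] next) {
        lock.lock();
        try {
            if (count == elements.length) return false;             // too many elements
            if (sizeInBytes + next.length > maxBytes) return false; // too many bytes
            elements[writePos++] = next;
            if (writePos == elements.length) writePos = 0;          // wrap around
            count++;
            sizeInBytes += next.length;
            available.signal(); // wake a drainer waiting in drainTo
            return true;
        } finally {
            lock.unlock();
        }
    }

    /** Waits up to timeoutNanos for data, then moves everything queued into sink. */
    int drainTo(List<byte[]> sink, long timeoutNanos) {
        lock.lock();
        try {
            long nanosLeft = timeoutNanos;
            while (count == 0) {
                if (nanosLeft <= 0) return 0; // timed out with nothing queued
                nanosLeft = available.awaitNanos(nanosLeft);
            }
            int drained = 0;
            while (count > 0) {
                byte[] next = elements[readPos];
                sink.add(next);
                sizeInBytes -= next.length;
                elements[readPos] = null;
                if (++readPos == elements.length) readPos = 0;      // wrap around
                count--;
                drained++;
            }
            return drained;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return 0;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        BoundedBytesQueueSketch queue = new BoundedBytesQueueSketch(2, 10);
        System.out.println(queue.offer(new byte[4])); // prints true
        System.out.println(queue.offer(new byte[4])); // prints true
        System.out.println(queue.offer(new byte[1])); // prints false (element limit reached)
        System.out.println(queue.drainTo(new ArrayList<>(), 1_000_000L)); // prints 2
    }
}
```

Returning false rather than blocking keeps the reporting thread from ever waiting on the queue; the cost is that spans are dropped under pressure, which matches the `incrementSpansDropped` call in report.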