本文主要研究一下skywalking的http-async-client-plugin
skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/resources/skywalking-plugin.def
httpasyncclient-4.x=org.apache.skywalking.apm.plugin.httpasyncclient.v4.define.HttpAsyncClientInstrumentation
httpasyncclient-4.x=org.apache.skywalking.apm.plugin.httpasyncclient.v4.define.SessionRequestInstrumentation
httpasyncclient-4.x=org.apache.skywalking.apm.plugin.httpasyncclient.v4.define.HttpAsyncRequestExecutorInstrumentation
複製代碼
skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/define/HttpAsyncClientInstrumentation.java
/**
 * Plugin definition that enhances the {@code execute} method of the two Apache HttpAsyncClient
 * implementation classes named below and routes matching calls to
 * {@code HttpAsyncClientInterceptor}.
 */
public class HttpAsyncClientInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

    /** Fully qualified names of the two client classes to enhance. */
    private static final String ENHANCE_CLASS_MINIMAL = "org.apache.http.impl.nio.client.MinimalHttpAsyncClient";
    private static final String ENHANCE_CLASS_INTERNAL = "org.apache.http.impl.nio.client.InternalHttpAsyncClient";

    /** Name of the instance method that is intercepted. */
    private static final String METHOD = "execute";

    /** Interceptor invoked around every matched method. */
    private static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.httpasyncclient.v4.HttpAsyncClientInterceptor";

    /** Required type of the first parameter of the intercepted overload. */
    private static final String FIRST_ARG_TYPE = "org.apache.http.nio.protocol.HttpAsyncRequestProducer";

    /**
     * @return {@code null}; this definition declares no constructor intercept points
     */
    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        return null;
    }

    /**
     * @return a single intercept point covering the four-argument {@code execute} overload whose
     *         first parameter is an {@code HttpAsyncRequestProducer}
     */
    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        final InstanceMethodsInterceptPoint executePoint = new InstanceMethodsInterceptPoint() {
            @Override
            public ElementMatcher<MethodDescription> getMethodsMatcher() {
                // Name, arity and first-parameter type must all match.
                return named(METHOD).and(
                    takesArguments(4).and(
                        takesArgument(0, named(FIRST_ARG_TYPE))));
            }

            @Override
            public String getMethodsInterceptor() {
                return INTERCEPTOR_CLASS;
            }

            @Override
            public boolean isOverrideArgs() {
                // Argument overriding is switched on for this intercept point.
                return true;
            }
        };
        return new InstanceMethodsInterceptPoint[] {executePoint};
    }

    /**
     * @return a matcher that selects both client implementation classes by name
     */
    @Override
    protected ClassMatch enhanceClass() {
        return MultiClassNameMatch.byMultiClassMatch(ENHANCE_CLASS_MINIMAL, ENHANCE_CLASS_INTERNAL);
    }
}
複製代碼
skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/HttpAsyncClientInterceptor.java
/**
 * Around-interceptor for the enhanced {@code execute} method. Before the call it swaps the
 * response consumer and the callback for wrapper objects and records the call's
 * {@code HttpContext} in {@code CONTEXT_LOCAL}.
 */
public class HttpAsyncClientInterceptor implements InstanceMethodsAroundInterceptor {

    /**
     * Rewrites the argument array in place before the intercepted method runs.
     *
     * @param objInst        the enhanced instance the method is invoked on (unused)
     * @param method         the intercepted method (unused)
     * @param allArguments   call arguments; index 1 is read as the response consumer, index 2 as
     *                       the HTTP context and index 3 as the callback
     * @param argumentsTypes declared parameter types (unused)
     * @param result         intercept result holder (unused)
     */
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
        // Read and cast all three arguments first, before anything is modified.
        HttpAsyncResponseConsumer originalConsumer = (HttpAsyncResponseConsumer) allArguments[1];
        HttpContext httpContext = (HttpContext) allArguments[2];
        FutureCallback originalCallback = (FutureCallback) allArguments[3];

        // Replace consumer and callback with their wrapping counterparts.
        allArguments[1] = new HttpAsyncResponseConsumerWrapper(originalConsumer);
        allArguments[3] = new FutureCallbackWrapper(originalCallback);

        // Keep the context on the current thread for a later interceptor to pick up.
        CONTEXT_LOCAL.set(httpContext);
    }

    /**
     * Passes the intercepted method's return value through untouched.
     *
     * @return {@code ret}, unchanged
     */
    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
        return ret;
    }

    /** Intentionally does nothing when the intercepted method throws. */
    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
    }
}
複製代碼
skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/define/SessionRequestInstrumentation.java
/**
 * Plugin definition for {@code org.apache.http.impl.nio.reactor.SessionRequestImpl}. It intercepts
 * every constructor, the {@code completed} method, and the {@code timeout} / {@code failed} /
 * {@code cancel} methods, each group with its own interceptor class.
 */
public class SessionRequestInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

    /** Class whose constructors and methods are enhanced. */
    private static final String ENHANCE_CLASS = "org.apache.http.impl.nio.reactor.SessionRequestImpl";

    /** Names of the intercepted instance methods. */
    private static final String COMPLETED_METHOD = "completed";
    private static final String TIMEOUT_METHOD = "timeout";
    private static final String FAILED_METHOD = "failed";
    private static final String CANCEL_METHOD = "cancel";

    /** Interceptor classes for the constructor, the success path and the failure paths. */
    private static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.httpasyncclient.v4.SessionRequestConstructorInterceptor";
    private static final String SUCCESS_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.httpasyncclient.v4.SessionRequestCompleteInterceptor";
    private static final String FAIL_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.httpasyncclient.v4.SessionRequestFailInterceptor";

    /**
     * @return one intercept point that matches any constructor of the enhanced class
     */
    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        final ConstructorInterceptPoint anyConstructor = new ConstructorInterceptPoint() {
            @Override
            public ElementMatcher<MethodDescription> getConstructorMatcher() {
                return any();
            }

            @Override
            public String getConstructorInterceptor() {
                return CONSTRUCTOR_INTERCEPTOR_CLASS;
            }
        };
        return new ConstructorInterceptPoint[] {anyConstructor};
    }

    /**
     * @return two intercept points, in this order: {@code completed} handled by the success
     *         interceptor, then {@code timeout}/{@code failed}/{@code cancel} handled by the
     *         fail interceptor; neither overrides arguments
     */
    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        final InstanceMethodsInterceptPoint onCompleted = new InstanceMethodsInterceptPoint() {
            @Override
            public ElementMatcher<MethodDescription> getMethodsMatcher() {
                return named(COMPLETED_METHOD);
            }

            @Override
            public String getMethodsInterceptor() {
                return SUCCESS_INTERCEPTOR_CLASS;
            }

            @Override
            public boolean isOverrideArgs() {
                return false;
            }
        };

        final InstanceMethodsInterceptPoint onNotCompleted = new InstanceMethodsInterceptPoint() {
            @Override
            public ElementMatcher<MethodDescription> getMethodsMatcher() {
                // Any of the three non-success outcomes.
                return named(TIMEOUT_METHOD).or(
                    named(FAILED_METHOD).or(
                        named(CANCEL_METHOD)));
            }

            @Override
            public String getMethodsInterceptor() {
                return FAIL_INTERCEPTOR_CLASS;
            }

            @Override
            public boolean isOverrideArgs() {
                return false;
            }
        };

        return new InstanceMethodsInterceptPoint[] {onCompleted, onNotCompleted};
    }

    /**
     * @return a by-name matcher for the enhanced class
     */
    @Override
    protected ClassMatch enhanceClass() {
        return byName(ENHANCE_CLASS);
    }
}
複製代碼
skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/SessionRequestConstructorInterceptor.java
/**
 * Constructor interceptor for the enhanced session request. When a tracing context is active and
 * its active span is not an exit span, it captures a context snapshot and stores it, together
 * with the value currently held in {@code CONTEXT_LOCAL}, on the new instance's dynamic field.
 */
public class SessionRequestConstructorInterceptor implements InstanceConstructorInterceptor {

    /**
     * Attaches {@code {snapshot, CONTEXT_LOCAL value}} to the constructed instance when eligible.
     * {@code CONTEXT_LOCAL} is cleared on every path out of this method, including when one of
     * the tracing calls throws, so a stale context never stays bound to the calling thread.
     *
     * @param objInst      the freshly constructed, enhanced instance
     * @param allArguments constructor arguments (unused)
     */
    @Override
    public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
        try {
            if (!ContextManager.isActive()) {
                return;
            }
            if (ContextManager.activeSpan().isExit()) {
                // Active span is already an exit span: nothing is captured for this request.
                return;
            }
            ContextSnapshot snapshot = ContextManager.capture();
            objInst.setSkyWalkingDynamicField(new Object[]{snapshot, CONTEXT_LOCAL.get()});
        } finally {
            // Single cleanup point replacing the duplicated remove() calls.
            CONTEXT_LOCAL.remove();
        }
    }
}
複製代碼
skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/SessionRequestCompleteInterceptor.java
/**
 * Around-interceptor for the enhanced {@code completed} method. It also owns the
 * {@code CONTEXT_LOCAL} thread-local that holds an {@code HttpContext} for the current thread.
 */
public class SessionRequestCompleteInterceptor implements InstanceMethodsAroundInterceptor {

    /** Per-thread holder for an {@code HttpContext}. */
    public static ThreadLocal<HttpContext> CONTEXT_LOCAL = new ThreadLocal<HttpContext>();

    /**
     * Reads the array stored on the instance's dynamic field. If it is present and non-empty,
     * creates the local span {@code httpasyncclient/local}, continues the stored snapshot when it
     * is non-null, and puts the stored {@code HttpContext} (element 1) into
     * {@code CONTEXT_LOCAL}.
     *
     * @param objInst the enhanced instance whose dynamic field is read
     */
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
        final Object[] stored = (Object[]) objInst.getSkyWalkingDynamicField();
        final boolean nothingStored = stored == null || stored.length == 0;
        if (nothingStored) {
            return;
        }

        final ContextSnapshot capturedSnapshot = (ContextSnapshot) stored[0];
        ContextManager.createLocalSpan("httpasyncclient/local");
        if (capturedSnapshot != null) {
            ContextManager.continued(capturedSnapshot);
        }

        final HttpContext storedContext = (HttpContext) stored[1];
        CONTEXT_LOCAL.set(storedContext);
    }

    /**
     * Passes the intercepted method's return value through untouched.
     *
     * @return {@code ret}, unchanged
     */
    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
        return ret;
    }

    /** Intentionally does nothing when the intercepted method throws. */
    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
    }
}
複製代碼
skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/SessionRequestFailInterceptor.java
/**
 * Around-interceptor for the enhanced {@code timeout}, {@code failed} and {@code cancel}
 * methods. It discards the state kept for the request.
 */
public class SessionRequestFailInterceptor implements InstanceMethodsAroundInterceptor {

    /**
     * Clears {@code CONTEXT_LOCAL} and resets the instance's dynamic field to {@code null}.
     *
     * @param objInst the enhanced instance whose dynamic field is reset
     */
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
        // Reaching here means the actual request will not be started, so no span has been
        // created and there is no status to log; only the stored state is dropped.
        CONTEXT_LOCAL.remove();
        objInst.setSkyWalkingDynamicField(null);
    }

    /**
     * Passes the intercepted method's return value through untouched.
     *
     * @return {@code ret}, unchanged
     */
    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
        return ret;
    }

    /** Intentionally does nothing when the intercepted method throws. */
    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
    }
}
複製代碼
skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/define/HttpAsyncRequestExecutorInstrumentation.java
/**
 * Plugin definition that enhances the {@code requestReady} method of
 * {@code org.apache.http.nio.protocol.HttpAsyncRequestExecutor} and routes it to
 * {@code HttpAsyncRequestExecutorInterceptor}.
 */
public class HttpAsyncRequestExecutorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

    /** Class whose method is enhanced. */
    private static final String ENHANCE_CLASS = "org.apache.http.nio.protocol.HttpAsyncRequestExecutor";

    /** Name of the intercepted instance method. */
    private static final String METHOD = "requestReady";

    /** Interceptor invoked around the matched method. */
    private static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.httpasyncclient.v4.HttpAsyncRequestExecutorInterceptor";

    /**
     * @return {@code null}; this definition declares no constructor intercept points
     */
    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        return null;
    }

    /**
     * @return a single intercept point matching methods named {@code requestReady}, without
     *         argument overriding
     */
    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        final InstanceMethodsInterceptPoint requestReadyPoint = new InstanceMethodsInterceptPoint() {
            @Override
            public ElementMatcher<MethodDescription> getMethodsMatcher() {
                return named(METHOD);
            }

            @Override
            public String getMethodsInterceptor() {
                return INTERCEPTOR_CLASS;
            }

            @Override
            public boolean isOverrideArgs() {
                return false;
            }
        };
        return new InstanceMethodsInterceptPoint[] {requestReadyPoint};
    }

    /**
     * @return a by-name matcher for the enhanced class
     */
    @Override
    protected ClassMatch enhanceClass() {
        return byName(ENHANCE_CLASS);
    }
}
複製代碼
skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/httpasyncclient-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/httpasyncclient/v4/HttpAsyncRequestExecutorInterceptor.java
/**
 * Around-interceptor for the enhanced {@code requestReady} method. Using the {@code HttpContext}
 * found in {@code CONTEXT_LOCAL}, it creates an exit span for the outgoing request, tags it with
 * URL and HTTP method, and copies the context-carrier items into the request headers.
 */
public class HttpAsyncRequestExecutorInterceptor implements InstanceMethodsAroundInterceptor {

    /** Port assumed when the target host carries no explicit port and the scheme is not https. */
    private static final int DEFAULT_HTTP_PORT = 80;

    /** Port assumed when the target host carries no explicit port and the scheme is https. */
    private static final int DEFAULT_HTTPS_PORT = 443;

    /**
     * Creates the exit span and injects the propagation headers. Does nothing when no context is
     * bound to the current thread, or when the context lacks the request or target-host
     * attribute (previously the latter case raised a {@link NullPointerException}).
     *
     * @throws Throwable e.g. {@link java.net.MalformedURLException} if an absolute request URI
     *                   cannot be parsed
     */
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
        // Take the context and clear the slot right away so it is consumed exactly once.
        HttpContext context = CONTEXT_LOCAL.get();
        CONTEXT_LOCAL.remove();
        if (context == null) {
            return;
        }

        HttpRequestWrapper requestWrapper = (HttpRequestWrapper) context.getAttribute(HttpClientContext.HTTP_REQUEST);
        HttpHost httpHost = (HttpHost) context.getAttribute(HttpClientContext.HTTP_TARGET_HOST);
        if (requestWrapper == null || httpHost == null) {
            // Without request or target host there is nothing to describe; skip span creation.
            return;
        }

        final ContextCarrier contextCarrier = new ContextCarrier();
        RequestLine requestLine = requestWrapper.getRequestLine();
        String uri = requestLine.getUri();
        // Absolute URIs are reduced to their path; anything else is used as-is.
        String operationName = uri.startsWith("http") ? new URL(uri).getPath() : uri;
        String remotePeer = httpHost.getHostName() + ":" + resolvePort(httpHost);

        AbstractSpan span = ContextManager.createExitSpan(operationName, contextCarrier, remotePeer);
        span.setComponent(ComponentsDefine.HTTP_ASYNC_CLIENT);
        Tags.URL.set(span, requestWrapper.getOriginal().getRequestLine().getUri());
        Tags.HTTP.METHOD.set(span, requestLine.getMethod());
        SpanLayer.asHttp(span);

        // Propagate the trace context downstream through request headers.
        CarrierItem next = contextCarrier.items();
        while (next.hasNext()) {
            next = next.next();
            requestWrapper.setHeader(next.getHeadKey(), next.getHeadValue());
        }
    }

    /**
     * Passes the intercepted method's return value through untouched.
     *
     * @return {@code ret}, unchanged
     */
    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
        return ret;
    }

    /** Intentionally does nothing when the intercepted method throws. */
    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
    }

    /**
     * Returns the host's explicit port, or the scheme's default when the port is {@code -1}:
     * 443 for {@code https}, 80 otherwise.
     */
    private static int resolvePort(HttpHost httpHost) {
        int port = httpHost.getPort();
        if (port != -1) {
            return port;
        }
        return "https".equalsIgnoreCase(httpHost.getSchemeName()) ? DEFAULT_HTTPS_PORT : DEFAULT_HTTP_PORT;
    }
}
複製代碼
skywalking的http-async-client-plugin定義了三個加強,分別是HttpAsyncClientInstrumentation、SessionRequestInstrumentation、HttpAsyncRequestExecutorInstrumentation