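Talking About Dubbo (7): Custom Filters in Practice

0 Preface

With microservices now the prevailing trend, a single call can pass through several service nodes, and the logs it produces are scattered across different servers. ELK can collect those scattered logs into Elasticsearch, but tying together the log lines that belong to one call remains a key problem.

To view the full-chain logs of one call, the usual approach is to generate a traceId at the system boundary and pass it to the downstream services in the call chain. Each downstream service logs with that traceId and passes it on to the services it calls in turn. This process is called traceId pass-through for short.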
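The boundary step described above can be sketched in plain Java. This is only a minimal illustration: the class name TraceContext, its methods, and the 32-character UUID format are assumptions made for this example and do not come from Dubbo or from the original code.

```java
import java.util.UUID;

/** Hypothetical holder for the current thread's traceId (all names are assumptions). */
final class TraceContext {

    private static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    private TraceContext() {
    }

    /** Called at the system boundary: reuse an incoming traceId, or create one if none was sent. */
    static String begin(String incomingTraceId) {
        String traceId = (incomingTraceId == null || incomingTraceId.isEmpty())
                ? UUID.randomUUID().toString().replace("-", "")
                : incomingTraceId;
        TRACE_ID.set(traceId);
        return traceId;
    }

    static String current() {
        return TRACE_ID.get();
    }

    /** Called when the request finishes, so a reused thread does not keep a stale id. */
    static void end() {
        TRACE_ID.remove();
    }

    /** Prefixes a log line with the traceId so lines from different services can be joined later. */
    static String format(String message) {
        return "[traceId=" + current() + "] " + message;
    }

    public static void main(String[] args) {
        begin(null);
        try {
            System.out.println(format("order received"));
        } finally {
            end();
        }
    }
}
```

Every service in the chain would call begin with the traceId it received, so all of them log the same value.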

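In systems that use HTTP as the service protocol, traceId pass-through can be handled uniformly by one shared, pre-wrapped HTTP client. With Dubbo it is slightly more involved. As described in the previous article, "Talking About Dubbo (6): Core Source Code, How the Filter Chain Works", the usual approach is a custom Filter, but there are also two more unusual approaches: (1) re-implementing the relevant Dubbo internal classes; (2) relying on RpcContext.

1 Implementation by Overriding Dubbo Classes

1.1 Source Code Analysis

Call flow between the Dubbo Consumer and the Provider

Proxy is the dynamic proxy instance that Dubbo generates with javassist for the service on the consumer side.

Implement is the service implementation instance on the provider side.

traceId pass-through means that Proxy and Implement must see the same traceId. Dubbo is cleanly layered, and the object that the transport layer carries is RpcInvocation.

So the core of the override is this: Proxy puts the traceId into the RpcInvocation, the Client serializes it and sends it over TCP, the Server deserializes it back into an RpcInvocation, takes the traceId out, and hands it to Implement.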
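The round trip just described can be imitated without Dubbo. In the sketch below, MiniInvocation is a hypothetical stand-in for RpcInvocation, and JDK serialization into a byte array stands in for Dubbo's own serialization and the TCP transport. It only illustrates that an attachment travels with the invocation object.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

final class PassThroughSketch {

    /** Simplified stand-in for RpcInvocation: a method name plus string attachments. */
    static final class MiniInvocation implements Serializable {
        private static final long serialVersionUID = 1L;
        final String methodName;
        final Map<String, String> attachments = new HashMap<>();

        MiniInvocation(String methodName) {
            this.methodName = methodName;
        }
    }

    /** Consumer side: wrap the call, attach the traceId, and serialize to bytes ("the wire"). */
    static byte[] consumerSend(String methodName, String traceId) {
        MiniInvocation inv = new MiniInvocation(methodName);
        inv.attachments.put("traceId", traceId);
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(inv);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Provider side: deserialize the invocation and read the traceId back out. */
    static String providerReceive(byte[] wire) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(wire))) {
            MiniInvocation inv = (MiniInvocation) in.readObject();
            return inv.attachments.get("traceId");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] wire = consumerSend("echo", "100001");
        System.out.println("provider sees traceId = " + providerReceive(wire));
    }
}
```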

The following is an analysis of JavassistProxyFactory on the consumer side:

public class JavassistProxyFactory extends AbstractProxyFactory {

    /**
     * When the Spring container starts, this factory method generates the Service proxy class for the Consumer.
     * Both invoker and interfaces are read from the Spring configuration file.
     */
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
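        // Every method in the generated Service proxy's bytecode calls InvokerInvocationHandler.invoke(...),
        // which does the actual RpcInvocation wrapping, serialization, TCP transport, and deserialization of the result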
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO the Wrapper class cannot correctly handle class names that contain $
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }
}

The following is an analysis of InvokerInvocationHandler on the consumer side:

public class InvokerInvocationHandler implements InvocationHandler {

    private final Invoker<?> invoker;
    
    public InvokerInvocationHandler(Invoker<?> handler){
        this.invoker = handler;
    }

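    /**
     * When an RPC is actually made, the bytecode of every Service proxy calls this generic invoke.
     * proxy is the proxy object generated earlier, the second parameter is the method, and the third is the argument list.
     * Knowing (1) which interface, (2) which method, and (3) which arguments is enough to map the call
     * to the Provider-side implementation and obtain the return value.
     */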
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
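        // At this point we are still on the consumer's business thread, so the traceId can be read from the ThreadLocal here
        // and put into the RpcInvocation attachments; the Provider can then read the passed-through traceId from the RpcInvocation it receives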
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }
}

The following is an analysis of DubboProtocol on the provider side:

private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
        
        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                Invoker<?> invoker = getInvoker(channel, inv);
                // For a callback, handle the case of a newer version calling an older version
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){
                    String methodsStr = invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsStr == null || methodsStr.indexOf(",") == -1){
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    } else {
                        String[] methods = methodsStr.split(",");
                        for (String method : methods){
                            if (inv.getMethodName().equals(method)){
                                hasMethod = true;
                                break;
                            }
                        }
                    }
                    if (!hasMethod){
                        logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv );
                        return null;
                    }
                }

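                // After the Provider receives the message, it takes a thread from the thread pool, deserializes the RpcInvocation, and calls the matching method of the implementation class
                // So this is the thread that runs the Provider-side implementation: take the traceId out here and put it into a ThreadLocal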
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                return invoker.invoke(inv);
            }
            throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
        }

1.2 Implementation

package com.alibaba.dubbo.rpc.proxy;

/** traceId utility class. This class is newly added. */
public class TraceIdUtil {

    private static final ThreadLocal<String> TRACE_ID = new ThreadLocal<String>();

    public static String getTraceId() {
        return TRACE_ID.get();
    }

    public static void setTraceId(String traceId) {
        TRACE_ID.set(traceId);
    }

}

/** InvokerInvocationHandler. This class is modified. */
public class InvokerInvocationHandler implements InvocationHandler {

    private final Invoker<?> invoker;

    public InvokerInvocationHandler(Invoker<?> handler){
        this.invoker = handler;
    }

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        // Put the consumer-side traceId into the RpcInvocation
        RpcInvocation rpcInvocation = new RpcInvocation(method, args);
        rpcInvocation.setAttachment("traceId", TraceIdUtil.getTraceId());
        return invoker.invoke(rpcInvocation).recreate();
    }

}


package com.alibaba.dubbo.rpc.protocol.dubbo;

/** Dubbo protocol support: a re-implemented DubboProtocol. */
public class DubboProtocol extends AbstractProtocol {


    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                Invoker<?> invoker = getInvoker(channel, inv);
                // For a callback, handle the case of a newer version calling an older version
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){
                    String methodsStr = invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsStr == null || methodsStr.indexOf(",") == -1){
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    } else {
                        String[] methods = methodsStr.split(",");
                        for (String method : methods){
                            if (inv.getMethodName().equals(method)){
                                hasMethod = true;
                                break;
                            }
                        }
                    }
                    if (!hasMethod){
                        logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv );
                        return null;
                    }
                }
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                // Put the traceId received from the consumer into the provider-side thread local
                TraceIdUtil.setTraceId(inv.getAttachment("traceId"));
                return invoker.invoke(inv);
            }
            throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
        }
    };
}

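2 Implementation Based on RpcContext

Before going into the custom Filter approach to traceId pass-through, let us look at the RpcContext object. RpcContext is essentially a ThreadLocal-backed object that maintains the context information of one RPC interaction.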

/* * Copyright 1999-2011 Alibaba Group. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */
package com.alibaba.dubbo.rpc;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.utils.NetUtils;

/**
 * Thread local context. (API, ThreadLocal, ThreadSafe)
 *
 * Note: RpcContext is a temporary state recorder. Its state changes whenever an RPC request is received or sent.
 * For example: A calls B, and B then calls C. On machine B, before B calls C, RpcContext records the A-to-B call;
 * after B calls C, RpcContext records the B-to-C call.
 *
 * @see com.alibaba.dubbo.rpc.filter.ContextFilter
 * @author qian.lei
 * @author william.liangf
 * @export
 */
public class RpcContext {
	
	private static final ThreadLocal<RpcContext> LOCAL = new ThreadLocal<RpcContext>() {
		@Override
		protected RpcContext initialValue() {
			return new RpcContext();
		}
	};

	/** * get context. * * @return context */
	public static RpcContext getContext() {
	    return LOCAL.get();
	}
	
	/** * remove context. * * @see com.alibaba.dubbo.rpc.filter.ContextFilter */
	public static void removeContext() {
	    LOCAL.remove();
	}

    private Future<?> future;

    private List<URL> urls;

    private URL url;

    private String methodName;

    private Class<?>[] parameterTypes;

    private Object[] arguments;

	private InetSocketAddress localAddress;

	private InetSocketAddress remoteAddress;

    private final Map<String, String> attachments = new HashMap<String, String>();

    private final Map<String, Object> values = new HashMap<String, Object>();

    // now we don't use the 'values' map to hold these objects
    // we want these objects to be as generic as possible
    private Object request;
    private Object response;

	@Deprecated
    private List<Invoker<?>> invokers;
    
	@Deprecated
    private Invoker<?> invoker;

	@Deprecated
    private Invocation invocation;
    
	protected RpcContext() {
	}

    /** * Get the request object of the underlying RPC protocol, e.g. HttpServletRequest * * @return null if the underlying protocol doesn't provide support for getting request */
    public Object getRequest() {
        return request;
    }

    /** * Get the request object of the underlying RPC protocol, e.g. HttpServletRequest * * @return null if the underlying protocol doesn't provide support for getting request or the request is not of the specified type */
    @SuppressWarnings("unchecked")
    public <T> T getRequest(Class<T> clazz) {
        return (request != null && clazz.isAssignableFrom(request.getClass())) ? (T) request : null;
    }


    public void setRequest(Object request) {
        this.request = request;
    }

    /** * Get the response object of the underlying RPC protocol, e.g. HttpServletResponse * * @return null if the underlying protocol doesn't provide support for getting response */
    public Object getResponse() {
        return response;
    }

    /** * Get the response object of the underlying RPC protocol, e.g. HttpServletResponse * * @return null if the underlying protocol doesn't provide support for getting response or the response is not of the specified type */
    @SuppressWarnings("unchecked")
    public <T> T getResponse(Class<T> clazz) {
        return (response != null && clazz.isAssignableFrom(response.getClass())) ? (T) response : null;
    }

    public void setResponse(Object response) {
        this.response = response;
    }

    /** * is provider side. * * @return provider side. */
    public boolean isProviderSide() {
        URL url = getUrl();
        if (url == null) {
            return false;
        }
        InetSocketAddress address = getRemoteAddress();
        if (address == null) {
            return false;
        }
        String host;
        if (address.getAddress() == null) {
            host = address.getHostName();
        } else {
            host = address.getAddress().getHostAddress();
        }
        return url.getPort() != address.getPort() || 
                ! NetUtils.filterLocalHost(url.getIp()).equals(NetUtils.filterLocalHost(host));
    }

    /** * is consumer side. * * @return consumer side. */
    public boolean isConsumerSide() {
        URL url = getUrl();
        if (url == null) {
            return false;
        }
        InetSocketAddress address = getRemoteAddress();
        if (address == null) {
            return false;
        }
        String host;
        if (address.getAddress() == null) {
            host = address.getHostName();
        } else {
            host = address.getAddress().getHostAddress();
        }
        return url.getPort() == address.getPort() && 
                NetUtils.filterLocalHost(url.getIp()).equals(NetUtils.filterLocalHost(host));
    }

    /** * get future. * * @param <T> * @return future */
    @SuppressWarnings("unchecked")
    public <T> Future<T> getFuture() {
        return (Future<T>) future;
    }

    /** * set future. * * @param future */
    public void setFuture(Future<?> future) {
        this.future = future;
    }

    public List<URL> getUrls() {
        return urls == null && url != null ? (List<URL>) Arrays.asList(url) : urls;
    }

    public void setUrls(List<URL> urls) {
        this.urls = urls;
    }

    public URL getUrl() {
        return url;
    }

    public void setUrl(URL url) {
        this.url = url;
    }

    /** * get method name. * * @return method name. */
    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    /** * get parameter types. * * @serial */
    public Class<?>[] getParameterTypes() {
        return parameterTypes;
    }

    public void setParameterTypes(Class<?>[] parameterTypes) {
        this.parameterTypes = parameterTypes;
    }

    /** * get arguments. * * @return arguments. */
    public Object[] getArguments() {
        return arguments;
    }

    public void setArguments(Object[] arguments) {
        this.arguments = arguments;
    }

    /** * set local address. * * @param address * @return context */
	public RpcContext setLocalAddress(InetSocketAddress address) {
	    this.localAddress = address;
	    return this;
	}

	/** * set local address. * * @param host * @param port * @return context */
    public RpcContext setLocalAddress(String host, int port) {
        if (port < 0) {
            port = 0;
        }
        this.localAddress = InetSocketAddress.createUnresolved(host, port);
        return this;
    }

	/** * get local address. * * @return local address */
	public InetSocketAddress getLocalAddress() {
		return localAddress;
	}

	public String getLocalAddressString() {
        return getLocalHost() + ":" + getLocalPort();
    }
    
	/** * get local host name. * * @return local host name */
	public String getLocalHostName() {
		String host = localAddress == null ? null : localAddress.getHostName();
		if (host == null || host.length() == 0) {
		    return getLocalHost();
		}
		return host;
	}

    /** * set remote address. * * @param address * @return context */
    public RpcContext setRemoteAddress(InetSocketAddress address) {
        this.remoteAddress = address;
        return this;
    }
    
    /** * set remote address. * * @param host * @param port * @return context */
    public RpcContext setRemoteAddress(String host, int port) {
        if (port < 0) {
            port = 0;
        }
        this.remoteAddress = InetSocketAddress.createUnresolved(host, port);
        return this;
    }

	/** * get remote address. * * @return remote address */
	public InetSocketAddress getRemoteAddress() {
		return remoteAddress;
	}
	
	/** * get remote address string. * * @return remote address string. */
	public String getRemoteAddressString() {
	    return getRemoteHost() + ":" + getRemotePort();
	}
	
	/** * get remote host name. * * @return remote host name */
	public String getRemoteHostName() {
		return remoteAddress == null ? null : remoteAddress.getHostName();
	}

    /** * get local host. * * @return local host */
    public String getLocalHost() {
        String host = localAddress == null ? null : 
            localAddress.getAddress() == null ? localAddress.getHostName() 
                    : NetUtils.filterLocalHost(localAddress.getAddress().getHostAddress());
        if (host == null || host.length() == 0) {
            return NetUtils.getLocalHost();
        }
        return host;
    }

    /** * get local port. * * @return port */
    public int getLocalPort() {
        return localAddress == null ? 0 : localAddress.getPort();
    }

    /** * get remote host. * * @return remote host */
    public String getRemoteHost() {
        return remoteAddress == null ? null : 
            remoteAddress.getAddress() == null ? remoteAddress.getHostName() 
                    : NetUtils.filterLocalHost(remoteAddress.getAddress().getHostAddress());
    }

    /** * get remote port. * * @return remote port */
    public int getRemotePort() {
        return remoteAddress == null ? 0 : remoteAddress.getPort();
    }

    /** * get attachment. * * @param key * @return attachment */
    public String getAttachment(String key) {
        return attachments.get(key);
    }

    /** * set attachment. * * @param key * @param value * @return context */
    public RpcContext setAttachment(String key, String value) {
        if (value == null) {
            attachments.remove(key);
        } else {
            attachments.put(key, value);
        }
        return this;
    }

    /** * remove attachment. * * @param key * @return context */
    public RpcContext removeAttachment(String key) {
        attachments.remove(key);
        return this;
    }

    /** * get attachments. * * @return attachments */
    public Map<String, String> getAttachments() {
        return attachments;
    }

    /** * set attachments * * @param attachment * @return context */
    public RpcContext setAttachments(Map<String, String> attachment) {
        this.attachments.clear();
        if (attachment != null && attachment.size() > 0) {
            this.attachments.putAll(attachment);
        }
        return this;
    }
    
    public void clearAttachments() {
        this.attachments.clear();
    }

    /** * get values. * * @return values */
    public Map<String, Object> get() {
        return values;
    }

    /** * set value. * * @param key * @param value * @return context */
    public RpcContext set(String key, Object value) {
        if (value == null) {
            values.remove(key);
        } else {
            values.put(key, value);
        }
        return this;
    }

    /** * remove value. * * @param key * @return value */
    public RpcContext remove(String key) {
        values.remove(key);
        return this;
    }

    /** * get value. * * @param key * @return value */
    public Object get(String key) {
        return values.get(key);
    }

    public RpcContext setInvokers(List<Invoker<?>> invokers) {
        this.invokers = invokers;
        if (invokers != null && invokers.size() > 0) {
            List<URL> urls = new ArrayList<URL>(invokers.size());
            for (Invoker<?> invoker : invokers) {
                urls.add(invoker.getUrl());
            }
            setUrls(urls);
        }
        return this;
    }

    public RpcContext setInvoker(Invoker<?> invoker) {
        this.invoker = invoker;
        if (invoker != null) {
            setUrl(invoker.getUrl());
        }
        return this;
    }

    public RpcContext setInvocation(Invocation invocation) {
        this.invocation = invocation;
        if (invocation != null) {
            setMethodName(invocation.getMethodName());
            setParameterTypes(invocation.getParameterTypes());
            setArguments(invocation.getArguments());
        }
        return this;
    }

    /** * @deprecated Replace to isProviderSide() */
    @Deprecated
    public boolean isServerSide() {
        return isProviderSide();
    }
    
    /** * @deprecated Replace to isConsumerSide() */
    @Deprecated
    public boolean isClientSide() {
        return isConsumerSide();
    }
    
    /** * @deprecated Replace to getUrls() */
    @Deprecated
    @SuppressWarnings({ "unchecked", "rawtypes" })
    public List<Invoker<?>> getInvokers() {
        return invokers == null && invoker != null ? (List)Arrays.asList(invoker) : invokers;
    }

    /** * @deprecated Replace to getUrl() */
    @Deprecated
    public Invoker<?> getInvoker() {
        return invoker;
    }

    /** * @deprecated Replace to getMethodName(), getParameterTypes(), getArguments() */
    @Deprecated
    public Invocation getInvocation() {
        return invocation;
    }
    
    /**
     * Asynchronous call that needs a return value. The call timeout is handled even if Future.get is never called.
     * @param callable
     * @return the result is obtained through future.get().
     */
    @SuppressWarnings("unchecked")
	public <T> Future<T> asyncCall(Callable<T> callable) {
    	try {
	    	try {
	    		setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
				final T o = callable.call();
				// A local call returns the result directly.
				if (o != null) {
					FutureTask<T> f = new FutureTask<T>(new Callable<T>() {
						public T call() throws Exception {
							return o;
						}
					});
					f.run();
					return f;
				} else {
					
				}
			} catch (Exception e) {
				throw new RpcException(e);
			} finally {
				removeAttachment(Constants.ASYNC_KEY);
			}
    	} catch (final RpcException e) {
			return new Future<T>() {
				public boolean cancel(boolean mayInterruptIfRunning) {
					return false;
				}
				public boolean isCancelled() {
					return false;
				}
				public boolean isDone() {
					return true;
				}
				public T get() throws InterruptedException, ExecutionException {
					throw new ExecutionException(e.getCause());
				}
				public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
					return get();
				}
			};
		}
    	return ((Future<T>)getContext().getFuture());
    }
    
	/**
	 * One-way call: only sends the request and does not receive a result.
	 * @param callable
	 */
	public void asyncCall(Runnable runable) {
    	try {
    		setAttachment(Constants.RETURN_KEY, Boolean.FALSE.toString());
    		runable.run();
		} catch (Throwable e) {
			// FIXME should the exception be put into the future?
			throw new RpcException("oneway call error ." + e.getMessage(), e);
		} finally {
			removeAttachment(Constants.RETURN_KEY);
		}
    }
}

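Note: the attachments held in RpcContext are copied into the RpcInvocation object and sent along with it.

For that reason, some people suggest simply putting the traceId into RpcContext to get traceId pass-through with little effort. Let us try it and see whether that really works.

Define the Dubbo interface: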

public interface IEchoService {
    String echo(String name);
}

Write the server-side code (Provider):

@Service("echoService")
public class EchoServiceImpl implements IEchoService {
 
    @Override
    public String echo(String name) {
        String traceId = RpcContext.getContext().getAttachment("traceId");
        System.out.println("name = " + name + ", traceId = " + traceId);
        return name;
    }
 
    public static void main(String[] args) {
        ClassPathXmlApplicationContext applicationContext =
                new ClassPathXmlApplicationContext("spring-dubbo-test-producer.xml");
 
        System.out.println("server start");
        while (true) {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
            }
        }
    } 
}

Write the client-side code (Consumer):

public class EchoServiceConsumer {
 
    public static void main(String[] args) {
        ClassPathXmlApplicationContext applicationContext =
                new ClassPathXmlApplicationContext("spring-dubbo-test-consumer.xml");
 
        IEchoService service = (IEchoService) applicationContext
                .getBean("echoService");
 
        // *) Set the traceId
        RpcContext.getContext().setAttachment("traceId", "100001");
        System.out.println(RpcContext.getContext().getAttachments());
        // *) First call
        service.echo("lilei");
 
        // *) Second call
        System.out.println(RpcContext.getContext().getAttachments());
        service.echo("hanmeimei");
    }
}

The results are as follows:

Server output:
name = lilei, traceId = 100001
name = hanmeimei, traceId = null
 
Client output:
{traceId=100001}
{}

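The server output shows that the traceId did get through, but only on the first call and not on the second. The client's dump of the RpcContext contents confirms this. The root cause is that the attachments of the RpcContext object are cleared after each RPC interaction.

Set a breakpoint on RpcContext's clearAttachments method and reproduce the problem. This yields the following call stack: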

java.lang.Thread.State: RUNNABLE
    at com.alibaba.dubbo.rpc.RpcContext.clearAttachments(RpcContext.java:438)
    at com.alibaba.dubbo.rpc.filter.ConsumerContextFilter.invoke(ConsumerContextFilter.java:50)
    at com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:91)
    at com.alibaba.dubbo.rpc.protocol.InvokerWrapper.invoke(InvokerWrapper.java:53)
    at com.alibaba.dubbo.rpc.cluster.support.FailoverClusterInvoker.doInvoke(FailoverClusterInvoker.java:77)
    at com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker.invoke(AbstractClusterInvoker.java:227)
    at com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker.invoke(MockClusterInvoker.java:72)
    at com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler.invoke(InvokerInvocationHandler.java:52)
    at com.alibaba.dubbo.common.bytecode.proxy0.echo(proxy0.java:-1)
    at com.test.dubbo.EchoServiceConsumer.main(EchoServiceConsumer.java:20)

The most direct caller is Dubbo's built-in ConsumerContextFilter. Let us look at its code:

@Activate(
    group = {"consumer"},
    order = -10000
)
public class ConsumerContextFilter implements Filter {
    public ConsumerContextFilter() {
    }
 
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        RpcContext.getContext().setInvoker(invoker).setInvocation(invocation)
                .setLocalAddress(NetUtils.getLocalHost(), 0)
                .setRemoteAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort());
        if(invocation instanceof RpcInvocation) {
            ((RpcInvocation)invocation).setInvoker(invoker);
        }
 
        Result var3;
        try {
            var3 = invoker.invoke(invocation);
        } finally {
            RpcContext.getContext().clearAttachments();
        }
 
        return var3;
    }
}

Indeed, the finally block shows that RpcContext clears its attachments after every RPC call.
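The effect of that finally block can be reproduced in a few lines without Dubbo. In this sketch, CONTEXT is a hypothetical stand-in for the RpcContext attachments, send stands in for the remote call, and invokeThroughFilter mimics only the try/finally shape of ConsumerContextFilter. None of these names are Dubbo API.

```java
import java.util.HashMap;
import java.util.Map;

final class ClearAttachmentsSketch {

    /** Stand-in for the RpcContext attachments: one map per thread. */
    static final ThreadLocal<Map<String, String>> CONTEXT =
            ThreadLocal.withInitial(HashMap::new);

    /** Stand-in for the remote call: copies the context attachments into the outgoing invocation. */
    static String send(String name) {
        Map<String, String> invocationAttachments = new HashMap<>(CONTEXT.get());
        return "name = " + name + ", traceId = " + invocationAttachments.get("traceId");
    }

    /** Stand-in for the consumer-side context filter: clears the attachments in finally. */
    static String invokeThroughFilter(String name) {
        try {
            return send(name);
        } finally {
            CONTEXT.get().clear();
        }
    }

    public static void main(String[] args) {
        CONTEXT.get().put("traceId", "100001");
        System.out.println(invokeThroughFilter("lilei"));     // name = lilei, traceId = 100001
        System.out.println(invokeThroughFilter("hanmeimei")); // name = hanmeimei, traceId = null
    }
}
```

The second call finds an empty map, which matches the server output shown earlier.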

Now that the root cause is known, one fix is to set the traceId again before every call, like this (admittedly it looks a little ugly):

// *) First call
RpcContext.getContext().setAttachment("traceId", "100001");
service.echo("lilei");
 
// *) Second call
RpcContext.getContext().setAttachment("traceId", "100001");
service.echo("hanmeimei");
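If the repetition is the main complaint, one option is to hide it behind a JDK dynamic proxy that runs a hook before every interface call. The sketch below is an assumption-laden illustration: ReinjectingProxy, wrap, and the Echo interface are hypothetical names, and the hook here only counts calls. In a real consumer the hook might be something like () -> RpcContext.getContext().setAttachment("traceId", "100001").

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;

final class ReinjectingProxy {

    private ReinjectingProxy() {
    }

    /** Hypothetical service interface used only for the demo. */
    interface Echo {
        String echo(String name);
    }

    /** Wraps target so that beforeEachCall runs before every method call made through the proxy. */
    @SuppressWarnings("unchecked")
    static <T> T wrap(Class<T> type, T target, Runnable beforeEachCall) {
        InvocationHandler handler = (proxy, method, args) -> {
            beforeEachCall.run();
            try {
                return method.invoke(target, args);
            } catch (InvocationTargetException e) {
                throw e.getCause(); // rethrow the target's own exception unchanged
            }
        };
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type}, handler);
    }

    public static void main(String[] args) {
        int[] hookRuns = {0};
        Echo raw = name -> name;
        Echo service = wrap(Echo.class, raw, () -> hookRuns[0]++);
        service.echo("lilei");
        service.echo("hanmeimei");
        System.out.println("hook ran " + hookRuns[0] + " times");
    }
}
```

This keeps the call sites clean, but every reference still has to be wrapped by hand, which is part of why a Filter is the more natural place for this logic.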

3 Implementation Based on a Filter

First, introduce a utility class:

public class TraceIdUtils {
 
    private static final ThreadLocal<String> traceIdCache
            = new ThreadLocal<String>();
 
    public static String getTraceId() {
        return traceIdCache.get();
    }
 
    public static void setTraceId(String traceId) {
        traceIdCache.set(traceId);
    }
 
    public static void clear() {
        traceIdCache.remove();
    }
 
}

Then define a Filter class:

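package com.test.dubbo;

// Assumption: StringUtils is Dubbo's own helper; any isEmpty(String) utility would do
import com.alibaba.dubbo.common.utils.StringUtils;
import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.RpcException;

public class TraceIdFilter implements Filter {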
 
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        String traceId = RpcContext.getContext().getAttachment("traceId");
        if ( !StringUtils.isEmpty(traceId) ) {
            // *) Read the traceId from RpcContext and save it
            TraceIdUtils.setTraceId(traceId);
        } else {
            // *) Set the traceId again before the interaction so it is not lost
            RpcContext.getContext().setAttachment("traceId", TraceIdUtils.getTraceId());
        }
        // *) The actual RPC call
        return invoker.invoke(invocation);
    }
}
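One thing to watch with this Filter is that nothing ever calls TraceIdUtils.clear(). If requests run on pooled threads, as is typical on the provider side, a traceId stored for one request can still be visible to a later request on the same thread that arrives without one. The Dubbo-free sketch below shows the effect with a single-thread executor; PooledThreadLeakSketch and handle are hypothetical names, and the finally block marks where a clear() call would go.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class PooledThreadLeakSketch {

    /** Stand-in for the ThreadLocal inside TraceIdUtils. */
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    /** Runs one "request" on the pool and returns the traceId the business code would see. */
    static String handle(ExecutorService pool, String incomingTraceId, boolean clearAfterwards) {
        try {
            return pool.submit(() -> {
                if (incomingTraceId != null) {
                    TRACE_ID.set(incomingTraceId);
                }
                try {
                    return TRACE_ID.get();
                } finally {
                    if (clearAfterwards) {
                        TRACE_ID.remove(); // the cleanup step
                    }
                }
            }).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            handle(pool, "100001", false);
            System.out.println("without clear: " + handle(pool, null, false)); // stale 100001
            handle(pool, "100002", true);
            System.out.println("with clear:    " + handle(pool, null, true));  // null
        } finally {
            pool.shutdown();
        }
    }
}
```

Whether to clear on the consumer side as well is a design choice: the test later in this section relies on the consumer thread remembering the traceId between calls, so clearing there would have to happen at the end of the whole request rather than after each RPC.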

Under the resources directory, add a META-INF/dubbo directory, and in it add a file named com.alibaba.dubbo.rpc.Filter:

META-INF/dubbo/com.alibaba.dubbo.rpc.Filter

Edit the com.alibaba.dubbo.rpc.Filter file so that it contains:

traceIdFilter=com.test.dubbo.TraceIdFilter

Then configure the filter on both the Dubbo producer and the consumer:

Server:
<dubbo:service interface="com.test.dubbo.IEchoService" ref="echoService" version="1.0.0"
        filter="traceIdFilter"/>

Client:
<dubbo:reference interface="com.test.dubbo.IEchoService" id="echoService" version="1.0.0"
        filter="traceIdFilter"/>

Change the server-side test code slightly, as follows:

@Service("echoService")
public class EchoServiceImpl implements IEchoService {
 
    @Override
    public String echo(String name) {
        String traceId = TraceIdUtils.getTraceId();
        System.out.println("name = " + name + ", traceId = " + traceId);
        return name;
    }
 
}

The client-side test code snippet is:

// *) First call
RpcContext.getContext().setAttachment("traceId", "100001");
service.echo("lilei");
 
// *) Second call
service.echo("hanmeimei");

Running the same test as before gives the following results:

Server output:
name = lilei, traceId = 100001
name = hanmeimei, traceId = 100001
 
Client output:
{traceId=100001}
{}

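This matches expectations, and the approach feels quite elegant. The RpcContext attachments are still cleared (the cleanup in ConsumerContextFilter runs after the custom Filter), but the traceId is re-injected before every RPC interaction, so the trace is passed through successfully.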
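The interplay between the two filters on the consumer side can be modelled in plain Java. This is a simplified sketch, not Dubbo code: ATTACHMENTS stands in for the RpcContext attachments, TRACE_ID for TraceIdUtils, traceIdFilter copies the branching of the custom Filter, and contextFilter copies only the try/finally shape of ConsumerContextFilter. It assumes the context filter wraps the custom one, as the call stack and the results above suggest.

```java
import java.util.HashMap;
import java.util.Map;

final class FilterChainSketch {

    /** Stand-in for the RpcContext attachments (one map per thread). */
    static final ThreadLocal<Map<String, String>> ATTACHMENTS =
            ThreadLocal.withInitial(HashMap::new);

    /** Stand-in for TraceIdUtils. */
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    /** Innermost step: what would be sent to the provider. */
    static String send(String name) {
        return "name = " + name + ", traceId = " + ATTACHMENTS.get().get("traceId");
    }

    /** Same branching as the custom Filter: remember a traceId if present, otherwise re-inject it. */
    static String traceIdFilter(String name) {
        String traceId = ATTACHMENTS.get().get("traceId");
        if (traceId != null && !traceId.isEmpty()) {
            TRACE_ID.set(traceId);
        } else if (TRACE_ID.get() != null) {
            ATTACHMENTS.get().put("traceId", TRACE_ID.get());
        }
        return send(name);
    }

    /** Outer filter: clears the attachments only after the inner chain has returned. */
    static String contextFilter(String name) {
        try {
            return traceIdFilter(name);
        } finally {
            ATTACHMENTS.get().clear();
        }
    }

    public static void main(String[] args) {
        ATTACHMENTS.get().put("traceId", "100001");
        System.out.println(contextFilter("lilei"));     // name = lilei, traceId = 100001
        System.out.println(contextFilter("hanmeimei")); // name = hanmeimei, traceId = 100001
    }
}
```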
