Dubbo分析之Protocol層

系列文章

Dubbo分析Serialize層
Dubbo分析之Transport層
Dubbo分析之Exchange 層
Dubbo分析之Protocol層
Dubbo分析之Cluster層
Dubbo分析之Registry層git

前言

緊接着上文Dubbo分析之Exchange層,繼續分析protocol遠程調用層,官方介紹:封裝RPC調用,以Invocation, Result爲中心,擴展接口爲Protocol, Invoker, Exporter;github

Protocol接口類分析

Protocol能夠說是Dubbo的核心層了,在此基礎上能夠擴展不少主流的服務,好比:redis,Memcached,rmi,WebService,http(tomcat,jetty)等等;下面看一下接口類源碼:web

public interface Protocol {
    /**
     * 暴露遠程服務:<br>
     * 1. 協議在接收請求時,應記錄請求來源方地址信息:RpcContext.getContext().setRemoteAddress();<br>
     * 2. export()必須是冪等的,也就是暴露同一個URL的Invoker兩次,和暴露一次沒有區別。<br>
     * 3. export()傳入的Invoker由框架實現並傳入,協議不須要關心。<br>
     * 
     * @param <T> 服務的類型
     * @param invoker 服務的執行體
     * @return exporter 暴露服務的引用,用於取消暴露
     * @throws RpcException 當暴露服務出錯時拋出,好比端口已佔用
     */
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
  
    /**
     * 引用遠程服務:<br>
     * 1. 當用戶調用refer()所返回的Invoker對象的invoke()方法時,協議需相應執行同URL遠端export()傳入的Invoker對象的invoke()方法。<br>
     * 2. refer()返回的Invoker由協議實現,協議一般須要在此Invoker中發送遠程請求。<br>
     * 3. 當url中有設置check=false時,鏈接失敗不能拋出異常,需內部自動恢復。<br>
     * 
     * @param <T> 服務的類型
     * @param type 服務的類型
     * @param url 遠程服務的URL地址
     * @return invoker 服務的本地代理
     * @throws RpcException 當鏈接服務提供方失敗時拋出
     */
    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
  
}

主要定義了2個接口,一個是暴露遠程服務,另外一個是引用遠程服務,其實就是服務端和客戶端;dubbo提供了對多種服務的擴展,能夠查看META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Protocol:redis

filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper
mock=com.alibaba.dubbo.rpc.support.MockProtocol
dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol
injvm=com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol
rmi=com.alibaba.dubbo.rpc.protocol.rmi.RmiProtocol
hessian=com.alibaba.dubbo.rpc.protocol.hessian.HessianProtocol
com.alibaba.dubbo.rpc.protocol.http.HttpProtocol
com.alibaba.dubbo.rpc.protocol.webservice.WebServiceProtocol
thrift=com.alibaba.dubbo.rpc.protocol.thrift.ThriftProtocol
memcached=com.alibaba.dubbo.rpc.protocol.memcached.MemcachedProtocol
redis=com.alibaba.dubbo.rpc.protocol.redis.RedisProtocol
rest=com.alibaba.dubbo.rpc.protocol.rest.RestProtocol
registry=com.alibaba.dubbo.registry.integration.RegistryProtocol
qos=com.alibaba.dubbo.qos.protocol.QosProtocolWrapper

dubbo協議是默認提供的協議,其餘擴展的協議包括:hessian,http(tomcat,jetty),injvm,memcached,redis,rest,rmi,thrift,webservice;以上擴展的協議有些僅僅是做爲引用遠程服務存在(客戶端),好比redis,memcached,經過特定的命令對緩存進行操做;固然也能夠擴展本身的協議,分別實現接口類Protocol, Invoker, Exporter;以前分別介紹的serialize層,transport層以及exchange層主要是在使用默認的DubboProtocol才依賴這幾個底層,其餘擴展協議直接依賴第三方擴展包;
下面重點分析一下DubboProtocol類,首先看一下refer實現方法:api

public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
    optimizeSerialization(url);
    // create rpc invoker.
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    invokers.add(invoker);
    return invoker;
}

在客戶端定一個的每一個dubbo:reference,都會在此處實例化一個對應的DubboInvoker;在方法內部首先對序列化優化進行處理,主要是對Kryo,FST等序列化方式進行優化,此方法不只在客戶端,同時服務器端也存在;接下來就是建立了一個DubboInvoker,同時建立與服務器端的鏈接:緩存

private ExchangeClient[] getClients(URL url) {
        // whether to share connection
        boolean service_share_connect = false;
        int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
        // if not configured, connection is shared, otherwise, one connection for one service
        if (connections == 0) {
            service_share_connect = true;
            connections = 1;
        }
 
        ExchangeClient[] clients = new ExchangeClient[connections];
        for (int i = 0; i < clients.length; i++) {
            if (service_share_connect) {
                clients[i] = getSharedClient(url);
            } else {
                clients[i] = initClient(url);
            }
        }
        return clients;
    }

默認向指定的服務器建立一個鏈接,能夠經過指定connections設置創建多個鏈接,在併發比較大的狀況下能夠設置多個;tomcat

private ExchangeClient initClient(URL url) {
 
        // client type setting.
        String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
 
        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
        // enable heartbeat by default
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
 
        // BIO is not allowed since it has severe performance issue.
        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported client type: " + str + "," +
                    " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
        }
 
        ExchangeClient client;
        try {
            // connection should be lazy
            if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
                client = new LazyConnectExchangeClient(url, requestHandler);
            } else {
                client = Exchangers.connect(url, requestHandler);
            }
        } catch (RemotingException e) {
            throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
        }
        return client;
    }

此方法主要經過Exchange層接口來和服務端創建鏈接,同時提供了懶鏈接的方式,要等到真正發送請求的時候才創建鏈接,返回ExchangeClient;DubboInvoker內部經過ExchangeClient來發送請求給服務端;再來看一下export方法:服務器

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
       URL url = invoker.getUrl();
 
       // export service.
       String key = serviceKey(url);
       DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
       exporterMap.put(key, exporter);
 
       //export an stub service for dispatching event
       Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
       Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
       if (isStubSupportEvent && !isCallbackservice) {
           String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
           if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
               if (logger.isWarnEnabled()) {
                   logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                           "], has set stubproxy support event ,but no stub methods founded."));
               }
           } else {
               stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
           }
       }
 
       openServer(url);
       optimizeSerialization(url);
       return exporter;
   }

每一個dubbo:service都會綁定一個Exporter,首先經過url獲取一個key(包括:port,serviceName,serviceVersion,serviceGroup),而後將實例化的DubboExporter經過key值保存在一個Map中,後續在接收到消息的時候重新定位到具體的Exporter;接下來就是建立服務器:多線程

private void openServer(URL url) {
        // find server.
        String key = url.getAddress();
        //client can export a service which's only for server to invoke
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                serverMap.put(key, createServer(url));
            } else {
                // server supports reset, use together with override
                server.reset(url);
            }
        }
    }
 
    private ExchangeServer createServer(URL url) {
        // send readonly event when server closes, it's enabled by default
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
        // enable heartbeat by default
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
 
        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
 
        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
        ExchangeServer server;
        try {
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        str = url.getParameter(Constants.CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;
    }

以上主要就是經過Exchangers的bind方法來啓動服務器,並返回對應的ExchangeServer,一樣也保存在本地的Map中;最後一樣作了序列化優化處理;併發

Invoker類分析

refer()返回的Invoker由協議實現,協議一般須要在此Invoker中發送遠程請求,export()傳入的Invoker由框架實現並傳入,協議不須要關心;接口類以下:

public interface Invoker<T> extends Node {
 
    Class<T> getInterface();
 
    Result invoke(Invocation invocation) throws RpcException;
}

本節介紹的是refer方法返回的Invoker,默認的dubbo協議下,實現了DubboInvoker,實現了其中的invoke方法,此方法在客戶端調用遠程方法的時候會被調用;

public Result invoke(Invocation inv) throws RpcException {
    if (destroyed.get()) {
        throw new RpcException("Rpc invoker for service " + this + " on consumer " + NetUtils.getLocalHost()
                + " use dubbo version " + Version.getVersion()
                + " is DESTROYED, can not be invoked any more!");
    }
    RpcInvocation invocation = (RpcInvocation) inv;
    invocation.setInvoker(this);
    if (attachment != null && attachment.size() > 0) {
        invocation.addAttachmentsIfAbsent(attachment);
    }
    Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
    if (contextAttachments != null) {
        /**
         * invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,
         * because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered
         * by the built-in retry mechanism of the Dubbo. The attachment to update RpcContext will no longer work, which is
         * a mistake in most cases (for example, through Filter to RpcContext output traceId and spanId and other information).
         */
        invocation.addAttachments(contextAttachments);
    }
    if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
        invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
    }
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
 
 
    try {
        return doInvoke(invocation);
    } catch (InvocationTargetException e) { // biz exception
        Throwable te = e.getTargetException();
        if (te == null) {
            return new RpcResult(e);
        } else {
            if (te instanceof RpcException) {
                ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
            }
            return new RpcResult(te);
        }
    } catch (RpcException e) {
        if (e.isBiz()) {
            return new RpcResult(e);
        } else {
            throw e;
        }
    } catch (Throwable e) {
        return new RpcResult(e);
    }
}
 
protected abstract Result doInvoke(Invocation invocation) throws Throwable;

在DubboInvoker的抽象類中提供了invoke方法,作統一的附件(Attachment)處理,方法傳入的參數是一個RpcInvocation對象,包含了方法調用的相關參數:

public class RpcInvocation implements Invocation, Serializable {
 
    private static final long serialVersionUID = -4355285085441097045L;
 
    private String methodName;
 
    private Class<?>[] parameterTypes;
 
    private Object[] arguments;
 
    private Map<String, String> attachments;
 
    private transient Invoker<?> invoker;
     
    ....省略...
}

包含了方法名稱,方法參數,參數值,附件信息;可能你會發現沒有接口,版本等信息,這些信息其實包含在附件中;在invoke方法中首先處理的就是把attachment信息保存到RpcInvocation中;接下來就是調用DubboInvoker中的doInvoke方法:

protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);
 
        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {
                ResponseFuture future = currentClient.request(inv, timeout);
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                return new RpcResult();
            } else {
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

此方法首先獲取ExchangeClient,若是實例化了多個ExchangeClient,會經過順序的方式遍歷使用ExchangeClient;經過ExchangeClient將RpcInvocation發送給服務器端,提供了三種發送方式:單邊通訊方式,雙邊通訊(同步),雙邊通訊(異步);在上文Dubbo分析之Exchange層中,發送完請求以後直接返回DefaultFuture參數,若是調用get方法將阻塞直到返回結果或者超時,同步方式就是直接調用get方法,阻塞等待結果,下面重點看一下異步方式;異步方式將返回的DefaultFuture放入了RpcContext中,而後返回了一個空對象,這裏其實使用了ThreadLocal功能,因此每次在客戶端業務代碼中,調用完異步請求,都須要經過RpcContext獲取ResponseFuture,好比:

// 此調用會當即返回null
fooService.findFoo(fooId);
// 拿到調用的Future引用,當結果返回後,會被通知和設置到此Future
Future<Foo> fooFuture = RpcContext.getContext().getFuture(); 
  
// 此調用會當即返回null
barService.findBar(barId);
// 拿到調用的Future引用,當結果返回後,會被通知和設置到此Future
Future<Bar> barFuture = RpcContext.getContext().getFuture(); 
  
// 此時findFoo和findBar的請求同時在執行,客戶端不須要啓動多線程來支持並行,而是藉助NIO的非阻塞完成
  
// 若是foo已返回,直接拿到返回值,不然線程wait住,等待foo返回後,線程會被notify喚醒
Foo foo = fooFuture.get(); 
// 同理等待bar返回
Bar bar = barFuture.get(); 
  
// 若是foo須要5秒返回,bar須要6秒返回,實際只需等6秒,便可獲取到foo和bar,進行接下來的處理。

官網的一個列子,很好的說明了異步的使用方式以及其優點;

Exporter類分析

在上文Dubbo分析之Exchange層中,服務端接收到消息以後,調用handler的reply方法處理消息,而此handler定義在DubboProtocol中,以下:

private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
 
        @Override
        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                Invoker<?> invoker = getInvoker(channel, inv);
                // need to consider backward-compatibility if it's a callback
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                    String methodsStr = invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsStr == null || methodsStr.indexOf(",") == -1) {
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    } else {
                        String[] methods = methodsStr.split(",");
                        for (String method : methods) {
                            if (inv.getMethodName().equals(method)) {
                                hasMethod = true;
                                break;
                            }
                        }
                    }
                    if (!hasMethod) {
                        logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                                + " not found in callback service interface ,invoke will be ignored."
                                + " please update the api interface. url is:"
                                + invoker.getUrl()) + " ,invocation is :" + inv);
                        return null;
                    }
                }
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                return invoker.invoke(inv);
            }
            throw new RemotingException(channel, "Unsupported request: "
                    + (message == null ? null : (message.getClass().getName() + ": " + message))
                    + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
        }
         
        ...省略...
}

服務端接收到message就是上面的RpcInvocation,裏面包含了接口,方法,參數等信息,服務器端經過反射的方式來處理;首先獲取了對應的DubboExporter,若是獲取,經過key(包括:port,serviceName,serviceVersion,serviceGroup)獲取對應的DubboExporter,而後調用DubboExporter中的invoker,此時的invoker是系統傳過來的,不像客戶端Invoker是協議端本身建立的,系統建立的invoker,以鏈表的方式存在,內部調用對應的filter,具體有哪些filter,在啓動服務時已經初始化好了在ProtocolFilterWrapper的buildInvokerChain中,具體有哪些filter能夠查看META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Filter:

cache=com.alibaba.dubbo.cache.filter.CacheFilter
validation=com.alibaba.dubbo.validation.filter.ValidationFilter
echo=com.alibaba.dubbo.rpc.filter.EchoFilter
generic=com.alibaba.dubbo.rpc.filter.GenericFilter
genericimpl=com.alibaba.dubbo.rpc.filter.GenericImplFilter
token=com.alibaba.dubbo.rpc.filter.TokenFilter
accesslog=com.alibaba.dubbo.rpc.filter.AccessLogFilter
activelimit=com.alibaba.dubbo.rpc.filter.ActiveLimitFilter
classloader=com.alibaba.dubbo.rpc.filter.ClassLoaderFilter
context=com.alibaba.dubbo.rpc.filter.ContextFilter
consumercontext=com.alibaba.dubbo.rpc.filter.ConsumerContextFilter
exception=com.alibaba.dubbo.rpc.filter.ExceptionFilter
executelimit=com.alibaba.dubbo.rpc.filter.ExecuteLimitFilter
deprecated=com.alibaba.dubbo.rpc.filter.DeprecatedFilter
compatible=com.alibaba.dubbo.rpc.filter.CompatibleFilter
timeout=com.alibaba.dubbo.rpc.filter.TimeoutFilter
trace=com.alibaba.dubbo.rpc.protocol.dubbo.filter.TraceFilter
future=com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter
monitor=com.alibaba.dubbo.monitor.support.MonitorFilter

這裏列出了全部的filter,包含消費端和服務端,具體使用哪些,經過filter的註解@Activate來進行過濾,每一個filter就好了分組;具體執行的順序是怎麼樣的,一樣在註解裏面指定了,格式以下:

@Activate(group = Constants.PROVIDER, order = -110000)
@Activate(group = Constants.PROVIDER, order = -10000)
@Activate(group = Constants.CONSUMER, value = Constants.GENERIC_KEY, order = 20000)

每一個固定的filter有各自的功能,一樣也能夠進行擴展,處理完了交給下一個,最後經過反射調用返回RpcResult;

總結

本文大致介紹了一下Protocol層使用的默認dubbo協議介紹,Protocol層還對其餘第三方協議進行了擴展,後面會繼續介紹;另外關於filter還能夠在詳細介紹一下;

示例代碼地址

https://github.com/ksfzhaohui...
https://gitee.com/OutOfMemory...

相關文章
相關標籤/搜索