dubbo源碼分析系列——dubbo-rpc-api模塊源碼分析

簡化的類圖

該圖是通過簡化後的rpc-api模塊的類圖,去除了一些非關鍵的屬性和方法定義,也去除了一些非核心的類和接口,只是一個簡化了的的示意圖,這樣你們可以去除干擾看清楚該模塊的核心接口極其關係,請點擊看大圖更清晰一些java

核心類說明

Protocol

服務協議。這是rpc模塊中最核心的一個類,它定義了rpc的最主要的兩個行爲即:一、provider暴露遠程服務,即將調用信息發佈到服務器上的某個URL上去,能夠供消費者鏈接調用,通常是將某個service類的所有方法總體發佈到服務器上。二、consumer引用遠程服務,即根據service的服務類和provider發佈服務的URL轉化爲一個Invoker對象,消費者能夠經過該對象調用provider發佈的遠程服務。這其實歸納了rpc的最爲核心的職責,提供了多級抽象的實現、包裝器實現等。express

AbstractProtocol

Protocol的頂層抽象實現類,它定義了這些屬性:一、exporterMap表示發佈過的serviceKey和Exporter(遠程服務發佈的引用)的映射表;二、invokers是一個Invoker對象的集合,表示層級暴露過遠程服務的服務執行體對象集合。還提供了一個通用的服務發佈銷燬方法destroy,該方法是一個通用方法,它清空了兩個集合屬性,調用了全部invoker的destroy方法,也調用全部exporter對象的unexport方法。apache

AbstractProxyProtocol

繼承自AbstractProtoco的一個抽象代理協議類。它聚合了代理工廠ProxyFactory對象來實現服務的暴露和引用。它的源碼以下。設計模式

/*
 * Copyright 1999-2012 Alibaba Group.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.alibaba.dubbo.rpc.protocol;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.rpc.Exporter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.ProxyFactory;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcException;

/**
 * AbstractProxyProtocol
 * 
 * @author william.liangf
 */
public abstract class AbstractProxyProtocol extends AbstractProtocol {

    private final List<Class<?>> rpcExceptions = new CopyOnWriteArrayList<Class<?>>();;

    private ProxyFactory proxyFactory;

    public AbstractProxyProtocol() {
    }

    public AbstractProxyProtocol(Class<?>... exceptions) {
        for (Class<?> exception : exceptions) {
            addRpcException(exception);
        }
    }

    public void addRpcException(Class<?> exception) {
        this.rpcExceptions.add(exception);
    }

    public void setProxyFactory(ProxyFactory proxyFactory) {
        this.proxyFactory = proxyFactory;
    }

    public ProxyFactory getProxyFactory() {
        return proxyFactory;
    }

    @SuppressWarnings("unchecked")
	public <T> Exporter<T> export(final Invoker<T> invoker) throws RpcException {
        final String uri = serviceKey(invoker.getUrl());//得到Url對應的serviceKey值。
        Exporter<T> exporter = (Exporter<T>) exporterMap.get(uri);//根據url獲取對應的exporter。
        if (exporter != null) {//若是已經存在,則直接返回,實現接口支持冪等調用。該處難道無須考慮線程安全問題嗎?
        	return exporter;
        }
        //執行抽放方法暴露服務。runnable方法的行爲有什麼約束沒有?該處不明確。
        final Runnable runnable = doExport(proxyFactory.getProxy(invoker), invoker.getInterface(), invoker.getUrl());
        //調用proxyFactory.getProxy(invoker)來得到invoker的代理對象。
        exporter = new AbstractExporter<T>(invoker) {
            public void unexport() {
                super.unexport();
                exporterMap.remove(uri);
                if (runnable != null) {
                    try {
                        runnable.run();
                    } catch (Throwable t) {
                        logger.warn(t.getMessage(), t);
                    }
                }
            }
        };
        exporterMap.put(uri, exporter);
        return exporter;
    }

    public <T> Invoker<T> refer(final Class<T> type, final URL url) throws RpcException {
        //先調用doRefer得到服務服務對象,再調用proxyFactory.getInvoker得到invoker對象。
        final Invoker<T> tagert = proxyFactory.getInvoker(doRefer(type, url), type, url);
        Invoker<T> invoker = new AbstractInvoker<T>(type, url) {
            @Override
            protected Result doInvoke(Invocation invocation) throws Throwable {
                try {
                    Result result = tagert.invoke(invocation);
                    Throwable e = result.getException();
                    if (e != null) {
                        for (Class<?> rpcException : rpcExceptions) {
                            if (rpcException.isAssignableFrom(e.getClass())) {
                                throw getRpcException(type, url, invocation, e);
                            }
                        }
                    }
                    return result;
                } catch (RpcException e) {
                    if (e.getCode() == RpcException.UNKNOWN_EXCEPTION) {
                        e.setCode(getErrorCode(e.getCause()));
                    }
                    throw e;
                } catch (Throwable e) {
                    throw getRpcException(type, url, invocation, e);
                }
            }
        };
        invokers.add(invoker);
        return invoker;
    }

    protected RpcException getRpcException(Class<?> type, URL url, Invocation invocation, Throwable e) {
        RpcException re = new RpcException("Failed to invoke remote service: " + type + ", method: "
                + invocation.getMethodName() + ", cause: " + e.getMessage(), e);
        re.setCode(getErrorCode(e));
        return re;
    }

    protected int getErrorCode(Throwable e) {
        return RpcException.UNKNOWN_EXCEPTION;
    }

    /**
     **留給子類實現的真正將類發佈到URL上的抽象方法定義,由具體的協議來實現。 
    **/
    protected abstract <T> Runnable doExport(T impl, Class<T> type, URL url) throws RpcException;

    /**
     **留給子類實現的引用遠程服務的抽象方法定義,該方法是將URL和type接口類應用到一個能夠遠程調用代理對象。
     **/
    protected abstract <T> T doRefer(Class<T> type, URL url) throws RpcException;

}

ProtocolFilterWrapper

是一個Protocol的支持過濾器的裝飾器。經過該裝飾器的對原始對象的包裝使得Protocol支持可擴展的過濾器鏈,已經支持的包括ExceptionFilter、ExecuteLimitFilter和TimeoutFilter等多種支持不一樣特性的過濾器。api

private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        //經過該句得到擴展配置的過濾器列表,具體機制須要研究該類的實現。
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (filters.size() > 0) {
            //循環將過濾器列表組裝成爲過濾器鏈,目標invoker是最後一個執行的。
            for (int i = filters.size() - 1; i >= 0; i --) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {

                    public Class<T> getInterface() {
                        return invoker.getInterface();
                    }

                    public URL getUrl() {
                        return invoker.getUrl();
                    }

                    public boolean isAvailable() {
                        return invoker.isAvailable();
                    }

                    public Result invoke(Invocation invocation) throws RpcException {
                        return filter.invoke(next, invocation);
                    }

                    public void destroy() {
                        invoker.destroy();
                    }

                    @Override
                    public String toString() {
                        return invoker.toString();
                    }
                };
            }
        }
        return last;
    }

ProtocolListenerWrapper

一個支持監聽器特性的Protocal的包裝器。支持兩種監聽器的功能擴展,分別是:ExporterListener是遠程服務發佈監聽器,能夠監聽服務發佈和取消發佈兩個事件點;InvokerListener是服務消費者引用調用器的監聽器,能夠監聽引用和銷燬兩個事件方法。支持可擴展的事件監聽模型,目前只提供了一些適配器InvokerListenerAdapter、ExporterListenerAdapter以及簡單的過時服務調用監聽器DeprecatedInvokerListener。開發者可自行擴展本身的監聽器。該類源碼以下。安全

/*
 * Copyright 1999-2011 Alibaba Group.
 *  
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *  
 *      http://www.apache.org/licenses/LICENSE-2.0
 *  
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.dubbo.rpc.protocol;

import java.util.Collections;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.rpc.Exporter;
import com.alibaba.dubbo.rpc.ExporterListener;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.InvokerListener;
import com.alibaba.dubbo.rpc.Protocol;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.listener.ListenerExporterWrapper;
import com.alibaba.dubbo.rpc.listener.ListenerInvokerWrapper;

/**
 * ListenerProtocol
 * 
 * @author william.liangf
 */
public class ProtocolListenerWrapper implements Protocol {

    private final Protocol protocol;

    public ProtocolListenerWrapper(Protocol protocol){
        if (protocol == null) {
            throw new IllegalArgumentException("protocol == null");
        }
        this.protocol = protocol;
    }

    public int getDefaultPort() {
        return protocol.getDefaultPort();
    }

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        //特殊協議,跳過監聽器觸發。
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
        //調用原始協議的發佈方法,觸發監聽器鏈事件。
        return new ListenerExporterWrapper<T>(protocol.export(invoker), 
                Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
                        .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
    }

    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
            return protocol.refer(type, url);
        }
        return new ListenerInvokerWrapper<T>(protocol.refer(type, url), 
                Collections.unmodifiableList(
                        ExtensionLoader.getExtensionLoader(InvokerListener.class)
                        .getActivateExtension(url, Constants.INVOKER_LISTENER_KEY)));
    }

    public void destroy() {
        protocol.destroy();
    }

}

ProxyFactory

dubbo的代理工廠。定義了兩個接口分別是:getProxy根據invoker目標接口的代理對象,通常是消費者得到代理對象觸發遠程調用;getInvoker方法將代理對象proxy、接口類type和遠程服務的URL獲取執行對象Invoker,每每是提供者得到目標執行對象執行目標實現調用。AbstractProxyFactory是其抽象實現,提供了getProxy的模版方法實現,使得能夠支持多接口的映射。dubbo最終內置了兩種動態代理的實現,分別是jdkproxy和javassist。默認的實現使用javassist。爲何選擇javassist,梁飛選型的時候作過性能測試對比分析,參考:http://javatar.iteye.com/blog/814426/服務器

Invoker

該接口是服務的執行體。它有獲取服務發佈的URL,服務的接口類等關鍵屬性的行爲;還有核心的服務執行方法invoke,執行該方法後返回執行結果Result,而傳遞的參數是調用信息Invocation。該接口有大量的抽象和具體實現類。AbstractProxyInvoker是基於代理的執行器抽象實現,AbstractInvoker是通用的抽象實現。app

服務發佈流程


首先ServiceConfig類拿到對外提供服務的實際類ref(如:HelloWorldImpl),而後經過ProxyFactory類的getInvoker方法使用ref生成一個AbstractProxyInvoker實例,到這一步就完成具體服務到Invoker的轉化。接下來就是Invoker轉換到Exporter的過程。
Dubbo處理服務暴露的關鍵就在Invoker轉換到Exporter的過程(如上圖中的紅色部分),下面咱們以Dubbo和RMI這兩種典型協議的實現來進行說明:less

服務引用流程

上圖是服務消費的主過程:
首先ReferenceConfig類的init方法調用Protocol的refer方法生成Invoker實例(如上圖中的紅色部分),這是服務消費的關鍵。接下來把Invoker轉換爲客戶端須要的接口(如:HelloWorld)。
關於每種協議如RMI/Dubbo/Web service等它們在調用refer方法生成Invoker實例的細節和上一章節所描述的相似。ide

總結

該模塊下設計較爲複雜,在設計中能夠看出來應用了大量的設計模式,包括模版方法、職責鏈、裝飾器和動態代理等設計模式。掌握該模塊下的核心概念對於後續閱讀其它部分代碼相當重要,後面的其它模塊要麼是它的實現,要麼是由它衍生出來的,要麼與它的關係很是緊密。

next

接下來咱們看看rpc的默認實現模塊——dubbo-rpc-default。該模塊提供了默認的dubbo協議的實現,也是默認使用的協議。

相關文章
相關標籤/搜索