dubbo源碼解析(二十二)遠程調用——Protocol

遠程調用——Protocol

目標:介紹遠程調用中協議的設計和實現,介紹dubbo-rpc-api中的各類protocol包的源碼,是重點內容。

前言

在遠程調用中協議是很是重要的一層,看下面這張圖:java

dubbo-framework

該層是在信息交換層之上,分爲了而且夾雜在服務暴露和服務引用中間,爲了有一個約定的方式進行調用。git

dubbo支持不一樣協議的擴展,好比http、thrift等等,具體的能夠參照官方文檔。本文講解的源碼大部分是對於公共方法的實現,而具體的服務暴露和服務引用會在各個協議實現中講到。github

下面是該包下面的類圖:api

protocol包類圖

源碼分析

(一)AbstractProtocol

該類是協議的抽象類,實現了Protocol接口,其中實現了一些公共的方法,抽象方法在它的子類AbstractProxyProtocol中定義。app

1.屬性

/**
 * 服務暴露者集合
 */
protected final Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<String, Exporter<?>>();

/**
 * 服務引用者集合
 */
//TODO SOFEREFENCE
protected final Set<Invoker<?>> invokers = new ConcurrentHashSet<Invoker<?>>();

2.serviceKey

protected static String serviceKey(URL url) {
    // 得到綁定的端口號
    int port = url.getParameter(Constants.BIND_PORT_KEY, url.getPort());
    return serviceKey(port, url.getPath(), url.getParameter(Constants.VERSION_KEY),
            url.getParameter(Constants.GROUP_KEY));
}

protected static String serviceKey(int port, String serviceName, String serviceVersion, String serviceGroup) {
    return ProtocolUtils.serviceKey(port, serviceName, serviceVersion, serviceGroup);
}

該方法是爲了獲得服務key group+"/"+serviceName+":"+serviceVersion+":"+port異步

3.destroy

@Override
public void destroy() {
    // 遍歷服務引用實體
    for (Invoker<?> invoker : invokers) {
        if (invoker != null) {
            // 從集合中移除
            invokers.remove(invoker);
            try {
                if (logger.isInfoEnabled()) {
                    logger.info("Destroy reference: " + invoker.getUrl());
                }
                // 銷燬
                invoker.destroy();
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
        }
    }
    // 遍歷服務暴露者
    for (String key : new ArrayList<String>(exporterMap.keySet())) {
        // 從集合中移除
        Exporter<?> exporter = exporterMap.remove(key);
        if (exporter != null) {
            try {
                if (logger.isInfoEnabled()) {
                    logger.info("Unexport service: " + exporter.getInvoker().getUrl());
                }
                // 取消暴露
                exporter.unexport();
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
        }
    }
}

該方法是對invoker和exporter的銷燬。ide

(二)AbstractProxyProtocol

該類繼承了AbstractProtocol類,其中利用了代理工廠對AbstractProtocol中的兩個集合進行了填充,而且對異常作了處理。源碼分析

1.屬性

/**
 * rpc的異常類集合
 */
private final List<Class<?>> rpcExceptions = new CopyOnWriteArrayList<Class<?>>();

/**
 * 代理工廠
 */
private ProxyFactory proxyFactory;

2.export

@Override
@SuppressWarnings("unchecked")
public <T> Exporter<T> export(final Invoker<T> invoker) throws RpcException {
    // 得到uri
    final String uri = serviceKey(invoker.getUrl());
    // 得到服務暴露者
    Exporter<T> exporter = (Exporter<T>) exporterMap.get(uri);
    if (exporter != null) {
        return exporter;
    }
    // 新建一個線程
    final Runnable runnable = doExport(proxyFactory.getProxy(invoker, true), invoker.getInterface(), invoker.getUrl());
    exporter = new AbstractExporter<T>(invoker) {
        /**
         * 取消暴露
         */
        @Override
        public void unexport() {
            super.unexport();
            // 移除該key對應的服務暴露者
            exporterMap.remove(uri);
            if (runnable != null) {
                try {
                    // 啓動線程
                    runnable.run();
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        }
    };
    // 加入集合
    exporterMap.put(uri, exporter);
    return exporter;
}

其中分爲兩個步驟,建立一個exporter,放入到集合匯中。在建立exporter時對unexport方法進行了重寫。ui

3.refer

@Override
public <T> Invoker<T> refer(final Class<T> type, final URL url) throws RpcException {
    // 經過代理得到實體域
    final Invoker<T> target = proxyFactory.getInvoker(doRefer(type, url), type, url);
    Invoker<T> invoker = new AbstractInvoker<T>(type, url) {
        @Override
        protected Result doInvoke(Invocation invocation) throws Throwable {
            try {
                // 得到調用結果
                Result result = target.invoke(invocation);
                Throwable e = result.getException();
                // 若是拋出異常,則拋出相應異常
                if (e != null) {
                    for (Class<?> rpcException : rpcExceptions) {
                        if (rpcException.isAssignableFrom(e.getClass())) {
                            throw getRpcException(type, url, invocation, e);
                        }
                    }
                }
                return result;
            } catch (RpcException e) {
                // 拋出未知異常
                if (e.getCode() == RpcException.UNKNOWN_EXCEPTION) {
                    e.setCode(getErrorCode(e.getCause()));
                }
                throw e;
            } catch (Throwable e) {
                throw getRpcException(type, url, invocation, e);
            }
        }
    };
    // 加入集合
    invokers.add(invoker);
    return invoker;
}

該方法是服務引用,先從代理工廠中得到Invoker對象target,而後建立了真實的invoker在重寫方法中調用代理的方法,最後加入到集合。this

protected abstract <T> Runnable doExport(T impl, Class<T> type, URL url) throws RpcException;

protected abstract <T> T doRefer(Class<T> type, URL url) throws RpcException;

能夠看到其中抽象了服務引用和暴露的方法,讓各種協議各自實現。

(三)AbstractInvoker

該類是invoker的抽象方法,由於協議被夾在服務引用和服務暴露中間,不管什麼協議都有一些通用的Invoker和exporter的方法實現,而該類就是實現了Invoker的公共方法,而把doInvoke抽象出來,讓子類只關注這個方法。

1.屬性

/**
 * 服務類型
 */
private final Class<T> type;

/**
 * url對象
 */
private final URL url;

/**
 * 附加值
 */
private final Map<String, String> attachment;

/**
 * 是否可用
 */
private volatile boolean available = true;

/**
 *  是否銷燬
 */
private AtomicBoolean destroyed = new AtomicBoolean(false);

2.convertAttachment

private static Map<String, String> convertAttachment(URL url, String[] keys) {
    if (keys == null || keys.length == 0) {
        return null;
    }
    Map<String, String> attachment = new HashMap<String, String>();
    // 遍歷key,把值放入附加值集合中
    for (String key : keys) {
        String value = url.getParameter(key);
        if (value != null && value.length() > 0) {
            attachment.put(key, value);
        }
    }
    return attachment;
}

該方法是轉化爲附加值,把url中的值轉化爲服務調用invoker的附加值。

3.invoke

@Override
public Result invoke(Invocation inv) throws RpcException {
    // if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed
    // 若是服務引用銷燬,則打印告警日誌,可是經過
    if (destroyed.get()) {
        logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "
                + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");
    }

    RpcInvocation invocation = (RpcInvocation) inv;
    // 會話域中加入該調用鏈
    invocation.setInvoker(this);
    // 把附加值放入會話域
    if (attachment != null && attachment.size() > 0) {
        invocation.addAttachmentsIfAbsent(attachment);
    }
    // 把上下文的附加值放入會話域
    Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        /**
         * invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,
         * because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered
         * by the built-in retry mechanism of the Dubbo. The attachment to update RpcContext will no longer work, which is
         * a mistake in most cases (for example, through Filter to RpcContext output traceId and spanId and other information).
         */
        invocation.addAttachments(contextAttachments);
    }
    // 若是開啓的是異步調用,則把該設置也放入附加值
    if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
        invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
    }
    // 加入編號
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);


    try {
        // 執行調用鏈
        return doInvoke(invocation);
    } catch (InvocationTargetException e) { // biz exception
        Throwable te = e.getTargetException();
        if (te == null) {
            return new RpcResult(e);
        } else {
            if (te instanceof RpcException) {
                ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
            }
            return new RpcResult(te);
        }
    } catch (RpcException e) {
        if (e.isBiz()) {
            return new RpcResult(e);
        } else {
            throw e;
        }
    } catch (Throwable e) {
        return new RpcResult(e);
    }
}

該方法作了一些公共的操做,好比服務引用銷燬的檢測,加入附加值,加入調用鏈實體域到會話域中等。而後執行了doInvoke抽象方法。各協議本身去實現。

(四)AbstractExporter

該類和AbstractInvoker相似,也是在服務暴露中實現了一些公共方法。

1.屬性

/**
 * 實體域
 */
private final Invoker<T> invoker;

/**
 * 是否取消暴露服務
 */
private volatile boolean unexported = false;

2.unexport

@Override
public void unexport() {
    // 若是已經消取消暴露,則之間返回
    if (unexported) {
        return;
    }
    // 設置爲true
    unexported = true;
    // 銷燬該實體域
    getInvoker().destroy();
}

(五)InvokerWrapper

該類是Invoker的包裝類,其中用到類裝飾模式,不過並無實現實際的功能加強。

public class InvokerWrapper<T> implements Invoker<T> {

    /**
     * invoker對象
     */
    private final Invoker<T> invoker;

    private final URL url;

    public InvokerWrapper(Invoker<T> invoker, URL url) {
        this.invoker = invoker;
        this.url = url;
    }

    @Override
    public Class<T> getInterface() {
        return invoker.getInterface();
    }

    @Override
    public URL getUrl() {
        return url;
    }

    @Override
    public boolean isAvailable() {
        return invoker.isAvailable();
    }

    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        return invoker.invoke(invocation);
    }

    @Override
    public void destroy() {
        invoker.destroy();
    }

}

(六)ProtocolFilterWrapper

該類實現了Protocol接口,其中也用到了裝飾模式,是對Protocol的裝飾,是在服務引用和暴露的方法上加上了過濾器功能。

1.buildInvokerChain

private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
    Invoker<T> last = invoker;
    // 得到過濾器的全部擴展實現類實例集合
    List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
    if (!filters.isEmpty()) {
        // 從最後一個過濾器開始循環,建立一個帶有過濾器鏈的invoker對象
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            // 記錄last的invoker
            final Invoker<T> next = last;
            // 新建last
            last = new Invoker<T>() {

                @Override
                public Class<T> getInterface() {
                    return invoker.getInterface();
                }

                @Override
                public URL getUrl() {
                    return invoker.getUrl();
                }

                @Override
                public boolean isAvailable() {
                    return invoker.isAvailable();
                }

                /**
                 * 關鍵在這裏,調用下一個filter表明的invoker,把每個過濾器串起來
                 * @param invocation
                 * @return
                 * @throws RpcException
                 */
                @Override
                public Result invoke(Invocation invocation) throws RpcException {
                    return filter.invoke(next, invocation);
                }

                @Override
                public void destroy() {
                    invoker.destroy();
                }

                @Override
                public String toString() {
                    return invoker.toString();
                }
            };
        }
    }
    return last;
}

該方法就是建立帶 Filter 鏈的 Invoker 對象。倒序的把每個過濾器串連起來,造成一個invoker。

2.export

@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    // 若是是註冊中心,則直接暴露服務
    if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
        return protocol.export(invoker);
    }
    // 服務提供側暴露服務
    return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}

該方法是在服務暴露上作了過濾器鏈的加強,也就是加上了過濾器。

3.refer

@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    // 若是是註冊中心,則直接引用
    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
        return protocol.refer(type, url);
    }
    // 消費者側引用服務
    return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
}

該方法是在服務引用上作了過濾器鏈的加強,也就是加上了過濾器。

(七)ProtocolListenerWrapper

該類也實現了Protocol,也是裝飾了Protocol接口,可是它是在服務引用和暴露過程當中加上了監聽器的功能。

1.export

@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    // 若是是註冊中心,則暴露該invoker
    if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
        return protocol.export(invoker);
    }
    // 建立一個暴露者監聽器包裝類對象
    return new ListenerExporterWrapper<T>(protocol.export(invoker),
            Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
                    .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
}

該方法是在服務暴露上作了監聽器功能的加強,也就是加上了監聽器。

2.refer

@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    // 若是是註冊中心。則直接引用服務
    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
        return protocol.refer(type, url);
    }
    // 建立引用服務監聽器包裝類對象
    return new ListenerInvokerWrapper<T>(protocol.refer(type, url),
            Collections.unmodifiableList(
                    ExtensionLoader.getExtensionLoader(InvokerListener.class)
                            .getActivateExtension(url, Constants.INVOKER_LISTENER_KEY)));
}

該方法是在服務引用上作了監聽器功能的加強,也就是加上了監聽器。

後記

該部分相關的源碼解析地址: https://github.com/CrazyHZM/i...

該文章講解了遠程調用中關於協議的部分,其實就是講了一些公共的方法,而且把關鍵方法抽象出來讓子類實現,具體的方法實現都在各個協議中本身實現。接下來我將開始對rpc模塊的代理進行講解。

相關文章
相關標籤/搜索