聊聊Dubbo(六):核心源碼-Filter鏈原理

0 前言

對於Java WEB應用來講,Spring的Filter能夠攔截WEB接口調用,但對於Dubbo接口,Spring的Filter就不起做用了。java

Dubbo中的Filter實現是 專門爲服務提供方和服務消費方調用過程進行攔截,Dubbo自己的大多功能均基於此擴展點實現,每次遠程方法執行,該攔截都會被執行,但請注意其對性能的影響。數組

因此,在實際業務開發中,使用最多的可能就是對Filter接口進行擴展,在服務調用鏈路中嵌入咱們自身的處理邏輯,如日誌打印、調用耗時統計等。安全

Dubbo官方針對Filter作了不少的原生支持,目前大體有20來個吧,包括咱們熟知的RpcContext,accesslog功能都是經過filter來實現了,下面一塊兒詳細看一下Filter的實現。bash

1 構造Filter鏈

Dubbo的Filter實現入口是 在ProtocolFilterWrapper,由於ProtocolFilterWrapper是Protocol的包裝類,因此會在加載的Extension的時候被自動包裝進來(理解這裏的前提是 理解Dubbo的SPI機制 ),該封裝器實現了Protocol接口,並提供了一個參數類型爲Protocol的構造方法。Dubbo依據這個構造方法識別出封裝器,並將該封裝器做爲其餘Protocol接口實現的代理網絡

接下來,咱們看一下ProtocolFilterWrapper中是如何構造Filter鏈:併發

public class ProtocolFilterWrapper implements Protocol {
    private final Protocol protocol;
    // 帶參數構造器,ExtensionLoad經過該構造器識別封裝器
    public ProtocolFilterWrapper(Protocol protocol){
        if (protocol == null) {
            throw new IllegalArgumentException("protocol == null");
        }
        this.protocol = protocol;
    }
    public int getDefaultPort() {
        return protocol.getDefaultPort();
    }
    // 對提供方服務暴露進行封裝,組裝filter調用鏈
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        // 向註冊中心發佈服務的時候並不會進行filter調用鏈
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
        return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
    }
    // 對消費方服務引用進行封裝,組裝filter調用鏈
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        // 向註冊中心引用服務的時候並不會進行filter調用鏈
        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
            return protocol.refer(type, url);
        }
        return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
    }
    public void destroy() {
        protocol.destroy();
    }
    // 構造filter調用鏈
    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        // 得到全部激活的Filter(已經排好序的)
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (filters.size() > 0) {
            for (int i = filters.size() - 1; i >= 0; i --) {
                final Filter filter = filters.get(i);
                // 複製引用,構建filter調用鏈
                final Invoker<T> next = last;
                // 這裏只是構造一個最簡化的Invoker做爲調用鏈的載體Invoker
                last = new Invoker<T>() {
                    public Class<T> getInterface() {
                        return invoker.getInterface();
                    }
                    public URL getUrl() {
                        return invoker.getUrl();
                    }
                    public boolean isAvailable() {
                        return invoker.isAvailable();
                    }
                    // 關鍵代碼,單向鏈表指針傳遞
                    public Result invoke(Invocation invocation) throws RpcException {
                        return filter.invoke(next, invocation);
                    }
                    public void destroy() {
                        invoker.destroy();
                    }
                    @Override
                    public String toString() {
                        return invoker.toString();
                    }
                };
            }
        }
        return last;
    }
}
複製代碼

這裏的關鍵代碼在buildInvokerChain方法,參數invoker爲實際的服務( 對於消費方而言,就是服務的動態代理 )。從ExtensionLoader獲取到已通過排序的Filter列表(排序規則可參見ActivateComparator),而後開始倒序組裝。app

這裏是個典型的裝飾器模式,不過裝飾器鏈條上的每一個節點都是一個匿名內部類Invoker實例異步

  1. 每一個節點invoker持有一個Filter引用,一個下級invoker節點引用以及實際調用的invoker實例(雖然持有但並不實際調用,僅僅是提供獲取實際invoker相關參數的功能,如getInterface,getUrl等方法);
  2. 經過invoke方法,invoker節點將下級節點傳遞給當前的filter進行調用
  3. filter在執行invoke方法時,就會觸發下級節點invoker調用其invoke方法,實現調用的向下傳遞;
  4. 當到達最後一級invoker節點,即實際服務invoker,便可執行真實業務邏輯

這條調用鏈的每一個節點都爲真實的invoker增長了自定義的功能,在整個鏈條上不斷豐富功能,是典型的裝飾器模式。async

看到上面的內容,咱們大體能明白實現是這樣子的,經過獲取全部能夠被激活的Filter鏈,而後根據必定順序構造出一個Filter的調用鏈,最後的調用鏈大體是這樣子:Filter1->Filter2->Filter3->......->Invoker,這個構造Filter鏈的邏輯很是簡單,重點就在於如何獲取被激活的Filter鏈ide

// 將key在url中對應的配置值切換成字符串信息數組
public List<T> getActivateExtension(URL url, String key, String group) {
    String value = url.getParameter(key);
    return getActivateExtension(url, value == null || value.length() == 0 ? null : Constants.COMMA_SPLIT_PATTERN.split(value), group);
}
    
public List<T> getActivateExtension(URL url, String[] values, String group) {
    List<T> exts = new ArrayList<T>();
    // 全部用戶本身配置的filter信息(有些Filter是默認激活的,有些是配置激活的,這裏這裏的names就指的配置激活的filter信息)
    List<String> names = values == null ? new ArrayList<String>(0) : Arrays.asList(values);

    // 若是這些名稱裏不包括去除default的標誌(-default),換言之就是加載Dubbo提供的默認Filter
    if (! names.contains(Constants.REMOVE_VALUE_PREFIX + Constants.DEFAULT_KEY)) {
        // 加載extension信息
        getExtensionClasses();
        for (Map.Entry<String, Activate> entry : cachedActivates.entrySet()) {
            // name指的是SPI讀取的配置文件的key
            String name = entry.getKey();
            Activate activate = entry.getValue();
            // group主要是區分是在provider端生效仍是consumer端生效
            if (isMatchGroup(group, activate.group())) {
                T ext = getExtension(name);
                // 這裏以Filter爲例:三個判斷條件的含義依次是:
                // 1. 用戶配置的filter列表中不包含當前ext
                // 2. 用戶配置的filter列表中不包含當前ext的加-的key
                // 3. 若是用戶的配置信息(url中體現)中有能夠激活的配置key而且數據不爲0,false,null,N/A,也就是說有正常的使用
                if (! names.contains(name)
                        && ! names.contains(Constants.REMOVE_VALUE_PREFIX + name) 
                        && isActive(activate, url)) {
                    exts.add(ext);
                }
            }
        }
        // 根據Activate註解上的order排序
        Collections.sort(exts, ActivateComparator.COMPARATOR);
    }
    // 進行到此步驟的時候Dubbo提供的原生的Filter已經被添加完畢了,下面處理用戶本身擴展的Filter
    List<T> usrs = new ArrayList<T>();
    for (int i = 0; i < names.size(); i ++) {
        String name = names.get(i);
        // 若是單個name不是以-開頭而且全部的key裏面並不包含-'name'(也就是說若是配置成了"dubbo,-dubbo"這種的能夠,這個if是進不去的)
        if (! name.startsWith(Constants.REMOVE_VALUE_PREFIX)
                && ! names.contains(Constants.REMOVE_VALUE_PREFIX + name)) {
            // 能夠經過default關鍵字替換Dubbo原生的Filter鏈,主要用來控制調用鏈順序
            if (Constants.DEFAULT_KEY.equals(name)) {
                if (usrs.size() > 0) {
                    exts.addAll(0, usrs);
                    usrs.clear();
                }
            } else {
                // 加入用戶本身定義的擴展Filter
                T ext = getExtension(name);
                usrs.add(ext);
            }
        }
    }
    if (usrs.size() > 0) {
        exts.addAll(usrs);
    }
    return exts;
}
複製代碼

基本上到這裏就能看到Filter鏈是如何被加載進來的,這裏設計的很是靈活,忍不住要感嘆一下:經過簡單的配置‘-’能夠手動剔除Dubbo原生的必定加載Filter,經過default來代替Dubbo原生的必定會加載的Filter從而來控制順序。這些設計雖然都是很小的功能點,可是整體的感受是十分靈活,考慮的比較周到。

因此,從上面源碼分析得知:

默認filter鏈,先執行原生filter,再依次執行自定義filter,繼而回溯到原點

知道了Filter構造的過程以後,咱們就詳細看幾個比較重要的Filter信息。首先,看一下com.alibaba.dubbo.rpc.Filter接口的源碼,以下:

@SPI
public interface Filter {
    Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException;
}
複製代碼

Dubbo原生的filter定義在META-INF/dubbo/internal/com.alibaba.dubbo.rpc.filter文件中,具體以下:

echo=com.alibaba.dubbo.rpc.filter.EchoFilter
generic=com.alibaba.dubbo.rpc.filter.GenericFilter
genericimpl=com.alibaba.dubbo.rpc.filter.GenericImplFilter
token=com.alibaba.dubbo.rpc.filter.TokenFilter
accesslog=com.alibaba.dubbo.rpc.filter.AccessLogFilter
activelimit=com.alibaba.dubbo.rpc.filter.ActiveLimitFilter
classloader=com.alibaba.dubbo.rpc.filter.ClassLoaderFilter
context=com.alibaba.dubbo.rpc.filter.ContextFilter
consumercontext=com.alibaba.dubbo.rpc.filter.ConsumerContextFilter
exception=com.alibaba.dubbo.rpc.filter.ExceptionFilter
executelimit=com.alibaba.dubbo.rpc.filter.ExecuteLimitFilter
deprecated=com.alibaba.dubbo.rpc.filter.DeprecatedFilter
compatible=com.alibaba.dubbo.rpc.filter.CompatibleFilter
timeout=com.alibaba.dubbo.rpc.filter.TimeoutFilter
monitor=com.alibaba.dubbo.monitor.support.MonitorFilter
validation=com.alibaba.dubbo.validation.filter.ValidationFilter
cache=com.alibaba.dubbo.cache.filter.CacheFilter
trace=com.alibaba.dubbo.rpc.protocol.dubbo.filter.TraceFilter
future=com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter
複製代碼

Dubbo自帶超時過濾器TimeoutFilter實現以下:

@Activate(group = Constants.PROVIDER)
public class TimeoutFilter implements Filter {
    private static final Logger logger = LoggerFactory.getLogger(TimeoutFilter.class);
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        long start = System.currentTimeMillis();
        Result result = invoker.invoke(invocation);
        long elapsed = System.currentTimeMillis() - start;
        if (invoker.getUrl() != null
                && elapsed > invoker.getUrl().getMethodParameter(invocation.getMethodName(),
                        "timeout", Integer.MAX_VALUE)) {
            if (logger.isWarnEnabled()) {
                logger.warn("invoke time out. method: " + invocation.getMethodName()
                        + "arguments: " + Arrays.toString(invocation.getArguments()) + " , url is "
                        + invoker.getUrl() + ", invoke elapsed " + elapsed + " ms.");
            }
        }
        return result;
    }
}
複製代碼
  1. 註解@Activate是不是Dubbo Filter必須的,其上的group和order分別扮演什麼樣的角色?

    對於Dubbo原生自帶的filter,註解@Activate是必須,其group用於provider/consumer的站隊,而order值是filter順序的依據。可是對於自定義filter而言,註解@Activate沒被用到,其分組和順序,徹底由用戶手工配置指定。若是自定義filter添加了@Activate註解,並指定了group了,則這些自定義filter將升級爲原生filter組

  2. Filter的順序是否能夠調整, 如何實現?

    能夠調整,經過'-'符號能夠去除某些filter,而default表明默認激活的原生filter子鏈,經過重排default和自定義filter的順序,達到實現順序控制的目的

讓咱們來構建幾個case,來看看如何配置能知足。假定自定義filter的對象爲filter1,filter2:

case 1: 其執行順序爲, 原生filter子鏈->filter1->filter2

<dubbo:reference filter="filter1,filter2"/>
複製代碼

case 2: 其執行順序爲, filter1->filter2->原生filter子鏈

<dubbo:reference filter="filter1,filter2,default"/>
複製代碼

case 3: 其執行順序爲, filter1->原生filter子鏈->filter2, 同時去掉原生的TokenFilter(token)

<dubbo:service filter="filter1,default,filter2,-token"/>
複製代碼

Filter在做用端區分的話主要是區分爲consumer和provider,下面咱們就以這個爲區分來分別介紹一些重點的Filter。

2 Consumer

2.1 ConsumerContextFilter

package com.alibaba.dubbo.rpc.filter;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.extension.Activate;
import com.alibaba.dubbo.common.utils.NetUtils;
import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcInvocation;

/** * ConsumerContextInvokerFilter(默認觸發) */
@Activate(group = Constants.CONSUMER, order = -10000)
public class ConsumerContextFilter implements Filter {

    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        // 在當前的RpcContext中記錄本地調用的一次狀態信息
        RpcContext.getContext()
                .setInvoker(invoker)
                .setInvocation(invocation)
                .setLocalAddress(NetUtils.getLocalHost(), 0)
                .setRemoteAddress(invoker.getUrl().getHost(),
                        invoker.getUrl().getPort());
        if (invocation instanceof RpcInvocation) {
            ((RpcInvocation) invocation).setInvoker(invoker);
        }
        try {
            return invoker.invoke(invocation);
        } finally {
            RpcContext.getContext().clearAttachments();
        }
    }

}
複製代碼

其實簡單來看這個Filter的話是十分簡單,它又是怎麼將客戶端設置的隱式參數傳遞給服務端呢

載體就是Invocation對象,在客戶端調用Invoker.invoke方法時候,會去取當前狀態記錄器RpcContext中的attachments屬性,而後設置到RpcInvocation對象中,在RpcInvocation傳遞到provider的時候會經過另一個過濾器ContextFilter將RpcInvocation對象從新設置回RpcContext中供服務端邏輯從新獲取隱式參數

這就是爲何RpcContext只能記錄一次請求的狀態信息,由於在第二次調用的時候參數已經被新的RpcInvocation覆蓋掉,第一次的請求信息對於第二次執行是不可見的。

2.2 ActiveLimitFilter

package com.alibaba.dubbo.rpc.filter;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.Activate;
import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcStatus;

/** * LimitInvokerFilter(當配置了actives而且值不爲0的時候觸發) */
@Activate(group = Constants.CONSUMER, value = Constants.ACTIVES_KEY)
public class ActiveLimitFilter implements Filter {

    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        URL url = invoker.getUrl();
        String methodName = invocation.getMethodName();
        int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
        // 主要記錄每臺機器針對某個方法的併發數量
        RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
        if (max > 0) {
            long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
            long start = System.currentTimeMillis();
            long remain = timeout;
            int active = count.getActive();
            if (active >= max) {
                synchronized (count) {
                    // 這個while循環是必要的,由於在一次wait結束後,可能線程調用已經結束了,騰出來consumer的空間
                    while ((active = count.getActive()) >= max) {
                        try {
                            count.wait(remain);
                        } catch (InterruptedException e) {
                        }
                        // 若是wait方法被中斷的話,remain這時候有可能大於0
                        // 若是其中一個線程運行結束後自動調用notify方法的話,也有可能remain大於0
                        long elapsed = System.currentTimeMillis() - start;
                        remain = timeout - elapsed;
                        if (remain <= 0) {
                            throw new RpcException("Waiting concurrent invoke timeout in client-side for service: "
                                    + invoker.getInterface().getName() + ", method: "
                                    + invocation.getMethodName() + ", elapsed: " + elapsed
                                    + ", timeout: " + timeout + ". concurrent invokes: " + active
                                    + ". max concurrent invoke limit: " + max);
                        }
                    }
                }
            }
        }
        try {
            // 調用開始和結束後增減併發數量
            long begin = System.currentTimeMillis();
            RpcStatus.beginCount(url, methodName);
            try {
                Result result = invoker.invoke(invocation);
                RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);
                return result;
            } catch (RuntimeException t) {
                RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);
                throw t;
            }
        } finally {
            if (max > 0) {
                // 這裏很關鍵,由於一個調用完成後要通知正在等待執行的隊列
                synchronized (count) {
                    count.notify();
                }
            }
        }
    }

}
複製代碼

ActiveLimitFilter主要用於 限制同一個客戶端對於一個服務端方法的併發調用量(客戶端限流)。

2.3 FutureFilter

Future主要是處理事件信息,主要有如下幾個事件:

  1. oninvoke 在方法調用前觸發(若是調用出現異常則會直接觸發onthrow方法)
  2. onreturn 在方法返回會觸發(若是調用出現異常則會直接觸發onthrow方法)
  3. onthrow 調用出現異常時候觸發
package com.alibaba.dubbo.rpc.protocol.dubbo.filter;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.extension.Activate;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.remoting.exchange.ResponseCallback;
import com.alibaba.dubbo.remoting.exchange.ResponseFuture;
import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.StaticContext;
import com.alibaba.dubbo.rpc.protocol.dubbo.FutureAdapter;
import com.alibaba.dubbo.rpc.support.RpcUtils;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.Future;

/** * EventFilter */
@Activate(group = Constants.CONSUMER)
public class FutureFilter implements Filter {

    protected static final Logger logger = LoggerFactory.getLogger(FutureFilter.class);

    public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
        final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);
        
        // 這裏主要處理回調邏輯,主要區分三個時間:oninvoke:調用前觸發,onreturn:調用後觸發 onthrow:出現異常狀況時候觸發
        fireInvokeCallback(invoker, invocation);
        
        // 須要在調用前配置好是否有返回值,已供invoker判斷是否須要返回future.
        Result result = invoker.invoke(invocation);
        if (isAsync) {
            asyncCallback(invoker, invocation);
        } else {
            syncCallback(invoker, invocation, result);
        }
        return result;
    }
    
    private void syncCallback(final Invoker<?> invoker, final Invocation invocation, final Result result) {
        if (result.hasException()) {
            fireThrowCallback(invoker, invocation, result.getException());
        } else {
            fireReturnCallback(invoker, invocation, result.getValue());
        }
    }
    /** * 同步異步的主要處理區別: * 1. 同步調用的話,事件觸發是直接調用的,沒有任何邏輯; * 2. 異步的話就是首先獲取到調用產生的Future對象,而後複寫Future的done()方法, * 將fireThrowCallback和fireReturnCallback邏輯引入便可。 */
    private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) {
        Future<?> f = RpcContext.getContext().getFuture();
        if (f instanceof FutureAdapter) {
            ResponseFuture future = ((FutureAdapter<?>) f).getFuture();
            future.setCallback(new ResponseCallback() {
                public void done(Object rpcResult) {
                    if (rpcResult == null) {
                        logger.error(new IllegalStateException("invalid result value : null, expected " + Result.class.getName()));
                        return;
                    }
                    ///must be rpcResult
                    if (!(rpcResult instanceof Result)) {
                        logger.error(new IllegalStateException("invalid result type :" + rpcResult.getClass() + ", expected " + Result.class.getName()));
                        return;
                    }
                    Result result = (Result) rpcResult;
                    if (result.hasException()) {
                        fireThrowCallback(invoker, invocation, result.getException());
                    } else {
                        fireReturnCallback(invoker, invocation, result.getValue());
                    }
                }

                public void caught(Throwable exception) {
                    fireThrowCallback(invoker, invocation, exception);
                }
            });
        }
    }

    private void fireInvokeCallback(final Invoker<?> invoker, final Invocation invocation) {
        final Method onInvokeMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_METHOD_KEY));
        final Object onInvokeInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_INSTANCE_KEY));

        if (onInvokeMethod == null && onInvokeInst == null) {
            return;
        }
        if (onInvokeMethod == null || onInvokeInst == null) {
            throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onInvokeMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
        }
        // 因爲JDK的安全檢查耗時較多.因此經過setAccessible(true)的方式關閉安全檢查就能夠達到提高反射速度的目的
        if (!onInvokeMethod.isAccessible()) {
            onInvokeMethod.setAccessible(true);
        }
        // 從下面代碼能夠看出oninvoke的方法參數要與調用的方法參數一致
        Object[] params = invocation.getArguments();
        try {
            onInvokeMethod.invoke(onInvokeInst, params);
        } catch (InvocationTargetException e) {
            fireThrowCallback(invoker, invocation, e.getTargetException());
        } catch (Throwable e) {
            fireThrowCallback(invoker, invocation, e);
        }
    }
    
    // fireReturnCallback的邏輯與fireThrowCallback基本同樣,因此不用看了
    private void fireReturnCallback(final Invoker<?> invoker, final Invocation invocation, final Object result) {
        final Method onReturnMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_METHOD_KEY));
        final Object onReturnInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_INSTANCE_KEY));

        //not set onreturn callback
        if (onReturnMethod == null && onReturnInst == null) {
            return;
        }

        if (onReturnMethod == null || onReturnInst == null) {
            throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onReturnMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
        }
        if (!onReturnMethod.isAccessible()) {
            onReturnMethod.setAccessible(true);
        }

        Object[] args = invocation.getArguments();
        Object[] params;
        Class<?>[] rParaTypes = onReturnMethod.getParameterTypes();
        if (rParaTypes.length > 1) {
            if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) {
                params = new Object[2];
                params[0] = result;
                params[1] = args;
            } else {
                params = new Object[args.length + 1];
                params[0] = result;
                System.arraycopy(args, 0, params, 1, args.length);
            }
        } else {
            params = new Object[]{result};
        }
        try {
            onReturnMethod.invoke(onReturnInst, params);
        } catch (InvocationTargetException e) {
            fireThrowCallback(invoker, invocation, e.getTargetException());
        } catch (Throwable e) {
            fireThrowCallback(invoker, invocation, e);
        }
    }

    // fireReturnCallback的邏輯與fireThrowCallback基本同樣,因此不用看了
    private void fireThrowCallback(final Invoker<?> invoker, final Invocation invocation, final Throwable exception) {
        final Method onthrowMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_METHOD_KEY));
        final Object onthrowInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_INSTANCE_KEY));

        //onthrow callback not configured
        if (onthrowMethod == null && onthrowInst == null) {
            return;
        }
        if (onthrowMethod == null || onthrowInst == null) {
            throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onthrow callback config , but no such " + (onthrowMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
        }
        if (!onthrowMethod.isAccessible()) {
            onthrowMethod.setAccessible(true);
        }
        Class<?>[] rParaTypes = onthrowMethod.getParameterTypes();
        if (rParaTypes[0].isAssignableFrom(exception.getClass())) {
            try {
                // 由於onthrow方法的參數第一個值必須爲異常信息,因此這裏須要構造參數列表
                Object[] args = invocation.getArguments();
                Object[] params;

                if (rParaTypes.length > 1) {
                    // 回調方法只有一個參數並且這個參數是數組(單獨拎出來計算的好處是這樣能夠少複製一個數組)
                    if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) {
                        params = new Object[2];
                        params[0] = exception;
                        params[1] = args;
                    } else {
                        // 回調方法有多於一個參數
                        params = new Object[args.length + 1];
                        params[0] = exception;
                        System.arraycopy(args, 0, params, 1, args.length);
                    }
                } else {
                    // 回調方法沒有參數
                    params = new Object[]{exception};
                }
                onthrowMethod.invoke(onthrowInst, params);
            } catch (Throwable e) {
                logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), e);
            }
        } else {
            logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), exception);
        }
    }
}
複製代碼

3 Provider

3.1 ContextFilter

ContextFilter和ConsumerContextFilter是結合使用的,以前的介紹中已經看了ConsumerContextFilter,下面再簡單看一下ContextFilter,來驗證上面講到的邏輯。

package com.alibaba.dubbo.rpc.filter;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.extension.Activate;
import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcInvocation;

import java.util.HashMap;
import java.util.Map;

/** * ContextInvokerFilter */
@Activate(group = Constants.PROVIDER, order = -10000)
public class ContextFilter implements Filter {

    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        Map<String, String> attachments = invocation.getAttachments();
        if (attachments != null) {
            // 隱式參數重剔除一些核心消息
            attachments = new HashMap<String, String>(attachments);
            attachments.remove(Constants.PATH_KEY);
            attachments.remove(Constants.GROUP_KEY);
            attachments.remove(Constants.VERSION_KEY);
            attachments.remove(Constants.DUBBO_VERSION_KEY);
            attachments.remove(Constants.TOKEN_KEY);
            attachments.remove(Constants.TIMEOUT_KEY);
            attachments.remove(Constants.ASYNC_KEY);// Remove async property to avoid being passed to the following invoke chain.
        }
        RpcContext.getContext()
                .setInvoker(invoker)
                .setInvocation(invocation)
// .setAttachments(attachments) // merged from dubbox
                .setLocalAddress(invoker.getUrl().getHost(),
                        invoker.getUrl().getPort());

        // mreged from dubbox
        // we may already added some attachments into RpcContext before this filter (e.g. in rest protocol)
        if (attachments != null) {
            // 這裏又從新將invocation和attachments信息設置到RpcContext,
            // 這裏設置之後provider的代碼就能夠獲取到consumer端傳遞的一些隱式參數了
            if (RpcContext.getContext().getAttachments() != null) {
                RpcContext.getContext().getAttachments().putAll(attachments);
            } else {
                RpcContext.getContext().setAttachments(attachments);
            }
        }

        if (invocation instanceof RpcInvocation) {
            ((RpcInvocation) invocation).setInvoker(invoker);
        }
        try {
            return invoker.invoke(invocation);
        } finally {
            RpcContext.removeContext();
        }
    }
}
複製代碼

3.2 EchoFilter

迴響測試主要用來檢測服務是否正常(網絡狀態),單純的檢測網絡狀況的話其實不須要執行真正的業務邏輯的,因此經過Filter驗證一下便可。

package com.alibaba.dubbo.rpc.filter;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.extension.Activate;
import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcResult;

/** * EchoInvokerFilter */
@Activate(group = Constants.PROVIDER, order = -110000)
public class EchoFilter implements Filter {

    public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
        if (inv.getMethodName().equals(Constants.$ECHO) && inv.getArguments() != null && inv.getArguments().length == 1)
            return new RpcResult(inv.getArguments()[0]);
        return invoker.invoke(inv);
    }

}
複製代碼

3.3 ExecuteLimitFilter

服務端接口限制限流的具體執行邏輯就是在ExecuteLimitFilter中,由於服務端不須要考慮重試等待邏輯,一旦當前執行的線程數量大於指定數量,就直接返回失敗了,因此實現邏輯相對於ActiveLimitFilter卻是簡便了很多。

package com.alibaba.dubbo.rpc.filter;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.Activate;
import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcStatus;

import java.util.concurrent.Semaphore;

/** * ThreadLimitInvokerFilter */
@Activate(group = Constants.PROVIDER, value = Constants.EXECUTES_KEY)
public class ExecuteLimitFilter implements Filter {

    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        URL url = invoker.getUrl();
        String methodName = invocation.getMethodName();
        Semaphore executesLimit = null;
        boolean acquireResult = false;
        int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);
        if (max > 0) {
            RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());
// if (count.getActive() >= max) {
            /** * http://manzhizhen.iteye.com/blog/2386408 * use semaphore for concurrency control (to limit thread number) */
            executesLimit = count.getSemaphore(max);
            if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) {
                throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited.");
            }
        }
        long begin = System.currentTimeMillis();
        boolean isSuccess = true;
        RpcStatus.beginCount(url, methodName);
        try {
            Result result = invoker.invoke(invocation);
            return result;
        } catch (Throwable t) {
            isSuccess = false;
            if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
            } else {
                throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
            }
        } finally {
            RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
            if(acquireResult) {
                executesLimit.release();
            }
        }
    }
}
複製代碼

3.4 ExceptionFilter

Dubbo 對於異常的處理有本身的一套規則:

  1. 若是是 checked異常 則直接拋出;
  2. 若是是unchecked異常 可是在接口上有聲明,也會直接拋出;
  3. 若是異常類和接口類在同一jar包裏,直接拋出;
  4. 若是是 JDK自帶的異常 ,直接拋出;
  5. 若是是 Dubbo的異常 ,直接拋出;
  6. 其他的都包裝成RuntimeException而後拋出(避免異常在Client不能反序列化問題)
package com.alibaba.dubbo.rpc.filter;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.extension.Activate;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.common.utils.ReflectUtils;
import com.alibaba.dubbo.common.utils.StringUtils;
import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcResult;
import com.alibaba.dubbo.rpc.service.GenericService;

import java.lang.reflect.Method;

/** * ExceptionInvokerFilter * <p> * Functions: * <ol> * <li>unexpected exception will be logged in ERROR level on provider side. Unexpected exception are unchecked * exception not declared on the interface</li> * <li>Wrap the exception not introduced in API package into RuntimeException. Framework will serialize the outer exception but stringnize its cause in order to avoid of possible serialization problem on client side</li> * </ol> */
@Activate(group = Constants.PROVIDER)
public class ExceptionFilter implements Filter {

    private final Logger logger;

    public ExceptionFilter() {
        this(LoggerFactory.getLogger(ExceptionFilter.class));
    }

    public ExceptionFilter(Logger logger) {
        this.logger = logger;
    }

    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        try {
            Result result = invoker.invoke(invocation);
            if (result.hasException() && GenericService.class != invoker.getInterface()) {
                try {
                    Throwable exception = result.getException();

                    // directly throw if it's checked exception
                    if (!(exception instanceof RuntimeException) && (exception instanceof Exception)) {
                        return result;
                    }
                    // directly throw if the exception appears in the signature
                    try {
                        Method method = invoker.getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes());
                        Class<?>[] exceptionClassses = method.getExceptionTypes();
                        for (Class<?> exceptionClass : exceptionClassses) {
                            if (exception.getClass().equals(exceptionClass)) {
                                return result;
                            }
                        }
                    } catch (NoSuchMethodException e) {
                        return result;
                    }

                    // for the exception not found in method's signature, print ERROR message in server's log.
                    logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost()
                            + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
                            + ", exception: " + exception.getClass().getName() + ": " + exception.getMessage(), exception);

                    // directly throw if exception class and interface class are in the same jar file.
                    String serviceFile = ReflectUtils.getCodeBase(invoker.getInterface());
                    String exceptionFile = ReflectUtils.getCodeBase(exception.getClass());
                    if (serviceFile == null || exceptionFile == null || serviceFile.equals(exceptionFile)) {
                        return result;
                    }
                    // directly throw if it's JDK exception
                    String className = exception.getClass().getName();
                    if (className.startsWith("java.") || className.startsWith("javax.")) {
                        return result;
                    }
                    // directly throw if it's dubbo exception
                    if (exception instanceof RpcException) {
                        return result;
                    }

                    // otherwise, wrap with RuntimeException and throw back to the client
                    return new RpcResult(new RuntimeException(StringUtils.toString(exception)));
                } catch (Throwable e) {
                    logger.warn("Fail to ExceptionFilter when called by " + RpcContext.getContext().getRemoteHost()
                            + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
                            + ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e);
                    return result;
                }
            }
            return result;
        } catch (RuntimeException e) {
            logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost()
                    + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
                    + ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e);
            throw e;
        }
    }
}
複製代碼

到這,Dubbo中的幾個核心Filter已經講完,Filter其實沒有那麼複雜,在開發過程當中,也能夠參考此思路實現本身的Filter鏈。

相關文章
相關標籤/搜索