dubbo服務暴露過程源碼分析

本例以一個簡單典型的服務發佈爲例,spring配置以下java

//dubbo協議
 <dubbo:protocol name="dubbo" port="20880" id="dubbo1"/>
   //zk註冊中心
 <dubbo:registry id="hangzhouRegistry" address="zookeeper://192.168.64.128:2181"/>
 <dubbo:service  interface="demo.dubbo.api.DemoService" ref="demoService" protocol="dubbo1" />
//服務接口
public interface DemoService {

 public String sayHello(String name);

}
//實現
public class DemoServiceImpl implements DemoService {

public String sayHello(String name) {

Random random=new Random();
try {
    Thread.sleep(800* random.nextInt(6));
} catch (InterruptedException e) {
    e.printStackTrace();
}
System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "] Hello " + name + ", request from consumer: " + RpcContext.getContext().getRemoteAddress());
return "Hello " + name + ", response form provider: " + RpcContext.getContext().getLocalAddress();
}
}

有博文 dubbo基於spring的構建分析,能夠看到dubbo服務發佈註冊的啓動方法有兩個入口,
第一在ServiceBean類的onApplicationEvent()方法,在容器初始化完成後,執行暴露邏輯
第二在ServiceBean類的afterPropertiesSet()方法,當前servicebean屬性構造完成後,執行暴露邏輯
具體暴露方法在其父類ServiceConfig的ServiceConfig方法裏spring

//這是個同步的方法
    public synchronized void export() {
        if (provider != null) {
            if (export == null) {
                export = provider.getExport();
            }
            if (delay == null) {
                delay = provider.getDelay();
            }
        }
        if (export != null && !export) {
            return;
        }

        //延遲暴露 經過delay參數配置的,延遲暴露,放入單獨線程中。
        if (delay != null && delay > 0) {
            delayExportExecutor.schedule(new Runnable() {
                public void run() {
                    doExport();
                }
            }, delay, TimeUnit.MILLISECONDS);
        } else {
            //暴露方法
            doExport();
        }
    }
/***
     * 也是個同步的方法,暴露過程具體過程
     */
    protected synchronized void doExport() {
        //....屬性檢查和賦值,代碼略

        //暴露過程
        doExportUrls();
    }

     private void doExportUrls() {
        //獲取註冊中心信息
        List<URL> registryURLs = loadRegistries(true);
        for (ProtocolConfig protocolConfig : protocols) {
            //多個協議,暴露屢次
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }

     //暴露過程
    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        //屬性的解析和賦值.....
	//..... 代碼略.....

        //獲取服務暴露範圍,本地或者遠程
        String scope = url.getParameter(Constants.SCOPE_KEY);
        //配置爲none不暴露
        //不配默認,服務暴露遠程同時在本地暴露
        if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {

            //配置不是remote的狀況下作本地暴露 (配置爲remote,則表示只暴露遠程服務)
            if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
                //本地暴露 (***看這裏***)關鍵1
                exportLocal(url);
            }
            //若是配置不是local則暴露爲遠程服務.(配置爲local,則表示只暴露本地服務)
            if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
                if (logger.isInfoEnabled()) {
                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                }
                if (registryURLs != null && registryURLs.size() > 0
                        && url.getParameter("register", true)) {
                    //有多個註冊中心,暴露到多個註冊中心
                    for (URL registryURL : registryURLs) {
		        //url 添加dynamic 屬性值
                        url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                        URL monitorUrl = loadMonitor(registryURL);
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                        }
                        if (logger.isInfoEnabled()) {
                            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                        }
			//(***看這裏***)關鍵2
                        //默認走到JavassistProxyFactory.getInvoker方法
                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                        //這裏的invoker的url的協議是register類型
                        Exporter<?> exporter = protocol.export(invoker);
                        exporters.add(exporter);
                    }
                } else {
                    //沒有註冊中心 ,只在本機ip打開服務端口,生成服務代理,並不註冊到註冊中心。
                    //(***看這裏***)關鍵3 此處的url協議爲dubbo
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                    Exporter<?> exporter = protocol.export(invoker);
                    exporters.add(exporter);
                }
            }
        }
        this.urls.add(url);
    }

上面方法中有三個地方涉及到具體的服務暴露先看,關鍵1bootstrap

本地暴露的邏輯

private void exportLocal(URL url) {
        if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
            URL local = URL.valueOf(url.toFullString())
                    .setProtocol(Constants.LOCAL_PROTOCOL)//設置爲injvm 協議
                    .setHost(NetUtils.LOCALHOST)
                    .setPort(0);
            //這裏的protocol是Protocol$Adpative的實例(spi機制)
        //proxyFactory是ProxyFactory$Adpative實例(spi機制)
            Exporter<?> exporter = protocol.export(
                    proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
            //放到暴露列表
            exporters.add(exporter);
            logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
        }
    }

ProxyFactory$Adpative的getInvoker方法api

public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg2 == null) 
            throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg2;
        String extName = url.getParameter("proxy", "javassist");
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
        com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        //這裏的extension默認是JavassistProxyFactory實例,
	return extension.getInvoker(arg0, arg1, arg2);
    }

JavassistProxyFactory的getInvoker方法緩存

//proxy 是服務實現類,type是服務接口
 public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
	//利用Wrapper類經過服務接口生成對應的代理類。
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        //實現抽象類AbstractProxyInvoker抽象方法doInvoke,並調用(proxy, type, url)構造函數實例化匿名類
	return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                //這裏調用代理類的invokeMethod方法
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

AbstractProxyInvoker的構造方法併發

public AbstractProxyInvoker(T proxy, Class<T> type, URL url) {
        if (proxy == null) {
            throw new IllegalArgumentException("proxy == null");
        }
        if (type == null) {
            throw new IllegalArgumentException("interface == null");
        }
        if (!type.isInstance(proxy)) {
            throw new IllegalArgumentException(proxy.getClass().getName() + " not implement interface " + type);
        }
        this.proxy = proxy;
        this.type = type;
        this.url = url;//賦值ur到自身(invoker),這個後面用到
    }

AbstractProxyInvoker的invoke方法app

public Result invoke(Invocation invocation) throws RpcException {
        try {
	   //回調用doInvoke方法,會傳入執行實例proxy,方法名和方法參數類型和值
            return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
        } catch (InvocationTargetException e) {
            return new RpcResult(e.getTargetException());
        } catch (Throwable e) {
            throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

這裏以文章開頭DemoService接口爲例經過Wrapper.getWrapper返回的類代碼,這裏須要代碼hack框架

package com.alibaba.dubbo.common.bytecode;

import com.alibaba.dubbo.common.DemoService;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;

public class Wrapper0 extends Wrapper
  implements ClassGenerator.DC
{
  public static String[] pns;
  public static Map pts;
  public static String[] mns;
  public static String[] dmns;
  public static Class[] mts0;

  public String[] getPropertyNames()
  {
    return pns;
  }

  public boolean hasProperty(String paramString)
  {
    return pts.containsKey(paramString);
  }

  public Class getPropertyType(String paramString)
  {
    return (Class)pts.get(paramString);
  }

  public String[] getMethodNames()
  {
    return mns;
  }

  public String[] getDeclaredMethodNames()
  {
    return dmns;
  }

  public void setPropertyValue(Object paramObject1, String paramString, Object paramObject2)
  {
    try
    {
      DemoService localDemoService = (DemoService)paramObject1;
    }
    catch (Throwable localThrowable)
    {
      throw new IllegalArgumentException(localThrowable);
    }
    throw new NoSuchPropertyException("Not found property \"" + paramString + "\" filed or setter method in class com.alibaba.dubbo.common.DemoService.");
  }

  public Object getPropertyValue(Object paramObject, String paramString)
  {
    try
    {
      DemoService localDemoService = (DemoService)paramObject;
    }
    catch (Throwable localThrowable)
    {
      throw new IllegalArgumentException(localThrowable);
    }
    throw new NoSuchPropertyException("Not found property \"" + paramString + "\" filed or setter method in class com.alibaba.dubbo.common.DemoService.");
  }
   
  //(**看這裏,關鍵方法實現****)
  public Object invokeMethod(Object paramObject, String paramString, Class[] paramArrayOfClass, Object[] paramArrayOfObject)
    throws InvocationTargetException
  {
    DemoService localDemoService;
    try
    {
    //賦值執行實例,這裏是接口實現類,DemoServiceImpl對象
      localDemoService = (DemoService)paramObject;
    }
    catch (Throwable localThrowable1)
    {
      throw new IllegalArgumentException(localThrowable1);
    }
    try
    {
    //根據傳入的要調用的方法名paramString,方法參數值,調用執行實例方法
      if (("sayHello".equals(paramString)) || (paramArrayOfClass.length == 1))
        return localDemoService.sayHello((String)paramArrayOfObject[0]);
    }
    catch (Throwable localThrowable2)
    {
      throw new InvocationTargetException(localThrowable2);
    }
    throw new NoSuchMethodException("Not found method \"" + paramString + "\" in class com.alibaba.dubbo.common.DemoService.");
  }
}

到這比較清楚瞭解,具體的代理過程了。dom

第一步上面invoker對象生成好後,接下來就要經過Protocol$Adpative的export方法暴露服務jvm

public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
	    //根據上文本地調用這裏的protocal協議別設置爲injvm
	    //因此這裏會走到InjvmProtocol的export方法
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.export(arg0);
    }

看下InjvmProtocol的export方法

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        //返回InjvmExporter對象
        return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
    }

InjvmExporter構造函數

InjvmExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
        super(invoker);//invoker賦給自身
        this.key = key;
        this.exporterMap = exporterMap;
	//存的形式,serviceKey:自身(exporter) put到map關聯起來,這樣能夠經過servciekey找到exporterMap而後找到invoker
        exporterMap.put(key, this);
    }

這裏的exporterMap是由InjvmProtocol實例擁有,而InjvmProtocol有時單例的,由於InjvmProtocol類有如下實例和方法:

//靜態自身成員變量
    private static InjvmProtocol INSTANCE;
    //構造方法,把自身賦值給INSTANCE對象
    public InjvmProtocol() {
        INSTANCE = this;
    }

因此exporterMap對象也是單例的。同時這裏順便看下InjvmProtocol的refer方法,本地服務的引用查找也是經過自身exporterMap對象。

public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
        //把exporterMap對象賦值給InjvmInvoker
        return new InjvmInvoker<T>(serviceType, url, url.getServiceKey(), exporterMap);
    }
    //具體查找過程
    public Result doInvoke(Invocation invocation) throws Throwable {
        //經過exporterMap獲取exporter
        Exporter<?> exporter = InjvmProtocol.getExporter(exporterMap, getUrl());
        if (exporter == null) {
            throw new RpcException("Service [" + key + "] not found.");
        }
        RpcContext.getContext().setRemoteAddress(NetUtils.LOCALHOST, 0);
        return exporter.getInvoker().invoke(invocation);
    }

以上是本地服務的發佈和引用過程

遠程服務發佈暴露過程

接下來看下本例中會用到的遠程服務發佈暴露過程,即暴露服務端口併發布服務信息到註冊中心。也便是關鍵2代碼處
經過上面本地服務暴露過程分析能夠知道,遠程服務的態代理生成過程和本地服務代理生成是同樣的,惟一區別點是構造invoker是傳入的url是registryURL
傳入url的不一樣,會形成下面這句的執行過程的不一樣

Exporter<?> exporter = protocol.export(invoker);

這裏protocol經過spi走的是Protocol$Adpative的export方法:

public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        //因爲傳入的url是registryURL因此會走RegistryProtocol的export方法
	com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.export(arg0);
    }

RegistryProtocol的export方法以下

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        //export invoker 暴露invoker (***看doLocalExport方法**)
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
        //registry provider 獲取對應註冊中心操做對象
        final Registry registry = getRegistry(originInvoker);
        //獲取要註冊到註冊中心的地址
        final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
        //註冊服務url到註冊中心(***把服務信息註冊到註冊中心**)
        registry.register(registedProviderUrl);
        // 訂閱override數據
        // FIXME 提供者訂閱時,會影響同一JVM即暴露服務,又引用同一服務的的場景,由於subscribed以服務名爲緩存的key,致使訂閱信息覆蓋。
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
        //保證每次export都返回一個新的exporter實例
        //實現一個匿名類實現接口Exporter
        return new Exporter<T>() {
            public Invoker<T> getInvoker() {
                return exporter.getInvoker();
            }
            //取消暴露的過程
            public void unexport() {
                try {

                    exporter.unexport();
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
                try {
                    //取消註冊
                    registry.unregister(registedProviderUrl);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
                try {
                    //取消訂閱
                    overrideListeners.remove(overrideSubscribeUrl);
                    registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        };
    }

doLocalExport方法,這裏方法裏涉及過程比較多

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
        //經過原始originInvoker構造緩存key
        String key = getCacheKey(originInvoker);
        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
        //緩存沒有,走具體暴露邏輯
        if (exporter == null) {
            synchronized (bounds) {
                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                if (exporter == null) {
                    //InvokerDelegete是RegistryProtocol類的靜態內部類,繼承自InvokerWrapper,
		    //經過構造器賦值持有代理originInvoker和服務暴露協議url對象,算是包裝一層
                    //而url 是經過getProviderUrl(originInvoker)返回的,此時url的協議已經是dubbo,即服務暴露的協議
                    final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));

                    //ExporterChangeableWrapper是RegistryProtocol的私有內部類實現了Exporter接口。
                    //經過調用它的構造方法(Exporter<T> exporter, Invoker<T> originInvoker)構造exporterWrapper實例
		    //而這裏傳入的exporter是經過(Exporter<T>) protocol.export(invokerDelegete)語句建立
		    //由上一步知道,這裏的invokerDelegete裏url屬性的protocol協議已是dubbo
                    //下面具體看下protocol.export(invokerDelegete)方法。
                    exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
                    bounds.put(key, exporter);
                }
            }
        }
        return exporter;
    }

還對Protocol$Adpative類的export邏輯分析

public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        //因爲這裏invokerDelegete的url屬性的protocol是dubbo,因此這裏走DubboProtocol的export協議
	com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.export(arg0);
    }

 接着看下DubboProtocol的export方法

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();
        // get serviceKey.
        String key = serviceKey(url);//key的組成group/service:version:port
        //構造服務的exporter
        //如同InjvmProtocol同樣,DubboProtocol也是單例的 因此這裏exporterMap也是單例的
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        //經過key放入exporterMap,把持有invoker的exporter 和serviceKey關聯
        //這個在後面服務調用時,能夠經過key找到對應的exporter進而找到invoker提供服務
        exporterMap.put(key, exporter);

        //export an stub service for dispaching event
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }
        //根據url開啓一個服務,好比綁定端口,開始接受請求信息(**繼續看這裏***)
        openServer(url);

        return exporter;
    }
private void openServer(URL url) {
        //key=host:port 用於定位server
        String key = url.getAddress();
        //client也能夠暴露一個只有server能夠調用的服務。
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer) {
            //服務實例放到serverMap,key是host:port
            //這裏的serverMap也單例的
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                //經過createServer(url)方法獲取server (***看這裏***)
                serverMap.put(key, createServer(url));
            } else {
                //server支持reset,配合override功能使用
                server.reset(url);
            }
        }
    }

     /***
     * 開啓服務
     * @param url
     * @return
     */
    private ExchangeServer createServer(URL url) {
        //默認開啓server關閉時發送readonly事件
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
        //默認開啓heartbeat
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
        /***
         * 經過server key 檢查是不是dubbo目前spi擴展支持的傳輸框架。默認是netty
         */
        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);

         //經過codec key 獲取編解碼方案,兼容dubbo1,默認是dubbo1compatible ,不然默認dubbo 編解碼方案
        url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
        ExchangeServer server;
        try {
            //構造具體服務實例,
	    //Exchangers是門面類,裏面封裝了具體交換層實現,並調用它的bind方法(***看這裏***)
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }

        //這裏會驗證一下,客戶端傳輸層實現
        //若是沒有對應的實現,會拋出異常
        str = url.getParameter(Constants.CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;
    }

跟蹤方法

//Exchangers.bind方法
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
	//經過spi會走HeaderExchanger的bind邏輯
        return getExchanger(url).bind(url, handler);
    }

    //HeaderExchanger的bind方法

    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        //Transporters也是門面類,封裝了具體的傳輸層實現查找和bind過程
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

    //HeaderExchangeServer的構造函數
      public HeaderExchangeServer(Server server) {
        if (server == null) {
            throw new IllegalArgumentException("server == null");
        }
        this.server = server;
        this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
        this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
        if (heartbeatTimeout < heartbeat * 2) {
            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
        }
        //開啓心跳
        startHeatbeatTimer();
    }

  繼續跟進方法

//Transporters.bind方法
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
	//根據spi 這裏具體走 NettyTransporter.bind
        return getTransporter().bind(url, handler);
    }

    //NettyTransporter的bind方法

    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
       //能夠看到這裏是NettyServer實例
        return new NettyServer(url, listener);
    }
   //NettyServer構造器

    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
       //調用父類AbstractServer構造器
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }

   父類AbstractServer的構造函數

public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
        localAddress = getUrl().toInetSocketAddress();
        String host = url.getParameter(Constants.ANYHOST_KEY, false)
                || NetUtils.isInvalidLocalHost(getUrl().getHost())
                ? NetUtils.ANYHOST : getUrl().getHost();
        bindAddress = new InetSocketAddress(host, getUrl().getPort());
        this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
        this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
        try {
            //打開端口,啓動服務,在其子類實現,這裏是NettyServer的doOpen()方法(***看這裏**)
            doOpen();
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
            }
        } catch (Throwable t) {
            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                    + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
        }
        //fixme replace this with better method
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
    }

NettyServer的doOpen()方法:

protected void doOpen() throws Throwable {
        //經過netty 開啓服務監聽端口
        NettyHelper.setNettyLoggerFactory();
        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
        bootstrap = new ServerBootstrap(channelFactory);

        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        channels = nettyHandler.getChannels();
        // https://issues.jboss.org/browse/NETTY-365
        // https://issues.jboss.org/browse/NETTY-379
        // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                ChannelPipeline pipeline = Channels.pipeline();
                /*int idleTimeout = getIdleTimeout();
                if (idleTimeout > 10000) {
                    pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
                }*/
                pipeline.addLast("decoder", adapter.getDecoder());//解碼器
                pipeline.addLast("encoder", adapter.getEncoder());//編碼器
                pipeline.addLast("handler", nettyHandler);//NettyHandler 擴展netty雙向handler基類r 能夠接受進站和出站數據流
                return pipeline;
            }
        });
        // bind 地址 開啓端口
        channel = bootstrap.bind(getBindAddress());
    }

    以上基本梳理了的dubbo從service配置解析到生成服務代理,並經過netty開啓服務端口的服務暴露過程。

相關文章
相關標籤/搜索