Dubbo原理和源碼解析之服務暴露

1、框架設計

在官方《Dubbo 用戶指南》架構部分,給出了服務調用的總體架構和流程:html

另外,在官方《Dubbo 開發指南》框架設計部分,給出了總體設計:java

以及暴露服務時序圖:spring

 

本文將根據以上幾張圖,分析服務暴露的實現原理,並進行詳細的代碼跟蹤與解析。apache

2、原理和源碼解析

2.1 標籤解析

從文章《Dubbo原理和源碼解析之標籤解析》中咱們知道,<dubbo:service> 標籤會被解析成 ServiceBean。bootstrap

ServiceBean 實現了 InitializingBean,在類加載完成以後會調用 afterPropertiesSet() 方法。在 afterPropertiesSet() 方法中,依次解析如下標籤信息:緩存

  • <dubbo:provider>
  • <dubbo:application>
  • <dubbo:module>
  • <dubbo:registry>
  • <dubbo:monitor>
  • <dubbo:protocol>

ServiceBean 還實現了 ApplicationListener,在 Spring 容器初始化的時候會調用 onApplicationEvent 方法。ServiceBean 重寫了 onApplicationEvent 方法,實現了服務暴露的功能。架構

ServiceBean.javaapp

複製代碼

public void onApplicationEvent(ApplicationEvent event) {
    if (ContextRefreshedEvent.class.getName().equals(event.getClass().getName())) {
        if (isDelay() && ! isExported() && ! isUnexported()) {
            if (logger.isInfoEnabled()) {
                logger.info("The service ready on spring started. service: " + getInterface());
            }
            export();
        }
    }
}

複製代碼

2.2 延遲暴露

ServiceBean 擴展了 ServiceConfig,調用 export() 方法時由 ServiceConfig 完成服務暴露的功能實現。框架

ServiceConfig.java異步

複製代碼

public synchronized void export() {
    if (provider != null) {
        if (export == null) {
            export = provider.getExport();
        }
        if (delay == null) {
            delay = provider.getDelay();
        }
    }
    if (export != null && ! export.booleanValue()) {
        return;
    }
    if (delay != null && delay > 0) {
        Thread thread = new Thread(new Runnable() {
            public void run() {
                try {
                    Thread.sleep(delay);
                } catch (Throwable e) {
                }
                doExport();
            }
        });
        thread.setDaemon(true);
        thread.setName("DelayExportServiceThread");
        thread.start();
    } else {
        doExport();
    }
}

複製代碼

由上面代碼可知,若是設置了 delay 參數,Dubbo 的處理方式是啓動一個守護線程在 sleep 指定時間後再 doExport。

2.3 參數檢查

在 ServiceConfig 的 doExport() 方法中會進行參數檢查和設置,包括:

  • 泛化調用
  • 本地實現
  • 本地存根
  • 本地假裝
  • 配置(application、registry、protocol等)

ServiceConfig.java

複製代碼

protected synchronized void doExport() {
    if (unexported) {
        throw new IllegalStateException("Already unexported!");
    }
    if (exported) {
        return;
    }
    exported = true;
    if (interfaceName == null || interfaceName.length() == 0) {
        throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!");
    }
    checkDefault();
    //省略
    if (ref instanceof GenericService) {
        interfaceClass = GenericService.class;
        generic = true;
    } else {
        try {
            interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
                    .getContextClassLoader());
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        checkInterfaceAndMethods(interfaceClass, methods);
        checkRef();
        generic = false;
    }
    if(local !=null){
        if(local=="true"){
            local=interfaceName+"Local";
        }
        Class<?> localClass;
        try {
            localClass = ClassHelper.forNameWithThreadContextClassLoader(local);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        if(!interfaceClass.isAssignableFrom(localClass)){
            throw new IllegalStateException("The local implemention class " + localClass.getName() + " not implement interface " + interfaceName);
        }
    }
    if(stub !=null){
        if(stub=="true"){
            stub=interfaceName+"Stub";
        }
        Class<?> stubClass;
        try {
            stubClass = ClassHelper.forNameWithThreadContextClassLoader(stub);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        if(!interfaceClass.isAssignableFrom(stubClass)){
            throw new IllegalStateException("The stub implemention class " + stubClass.getName() + " not implement interface " + interfaceName);
        }
    }
    //此處省略:檢查並設置相關參數
    doExportUrls();
}

複製代碼

2.4 多協議、多註冊中心

在檢查完參數以後,開始暴露服務。Dubbo 支持多協議和多註冊中心:

ServiceConfig.java

複製代碼

private void doExportUrls() {
    List<URL> registryURLs = loadRegistries(true);
    for (ProtocolConfig protocolConfig : protocols) {
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}

複製代碼

2.5 組裝URL

針對每一個協議、每一個註冊中心,開始組裝 URL。

ServiceConfig.java

複製代碼

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    String name = protocolConfig.getName();
    if (name == null || name.length() == 0) {
        name = "dubbo";
    }

    //處理host

    //處理port

    Map<String, String> map = new HashMap<String, String>();
    //設置參數到map

    // 導出服務
    String contextPath = protocolConfig.getContextpath();
    if ((contextPath == null || contextPath.length() == 0) && provider != null) {
        contextPath = provider.getContextpath();
    }
    URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);

    if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
            .hasExtension(url.getProtocol())) {
        url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
    }

    //此處省略:服務暴露(詳見 2.6 和 2.7)
    
    this.urls.add(url);
}

複製代碼

2.6 本地暴露

若是配置 scope=none, 則不會進行服務暴露;若是沒有配置 scope 或者 scope=local,則會進行本地暴露。

ServiceConfig.java

複製代碼

//public static final String LOCAL_PROTOCOL = "injvm";
//public static final String LOCALHOST = "127.0.0.1";

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    //......
    String scope = url.getParameter(Constants.SCOPE_KEY);
    //配置爲none不暴露
    if (! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
        //配置不是remote的狀況下作本地暴露 (配置爲remote,則表示只暴露遠程服務)
        if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
            exportLocal(url);
        }
        //......
    }
    //......
}

private void exportLocal(URL url) {
    if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
        URL local = URL.valueOf(url.toFullString())
                .setProtocol(Constants.LOCAL_PROTOCOL)
                .setHost(NetUtils.LOCALHOST)
                .setPort(0);
        Exporter<?> exporter = protocol.export(
                proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
        exporters.add(exporter);
        logger.info("Export dubbo service " + interfaceClass.getName() +" to local registry");
    }
}

複製代碼

1. 暴露服務的時候,會經過代理建立 Invoker;

2. 本地暴露時使用 injvm 協議,injvm 協議是一個僞協議,它不開啓端口,不能被遠程調用,只在 JVM 內直接關聯,但執行 Dubbo 的 Filter 鏈。

2.7 遠程暴露

若是沒有配置 scope 或者 scope=remote,則會進行遠程暴露。

ServiceConfig.java

複製代碼

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    String scope = url.getParameter(Constants.SCOPE_KEY);
    //配置爲none不暴露
    if (! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
        //......
        //若是配置不是local則暴露爲遠程服務.(配置爲local,則表示只暴露遠程服務)
        if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){
            if (logger.isInfoEnabled()) {
                logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
            }
            if (registryURLs != null && registryURLs.size() > 0
                    && url.getParameter("register", true)) {
                for (URL registryURL : registryURLs) {
                    url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                    URL monitorUrl = loadMonitor(registryURL);
                    if (monitorUrl != null) {
                        url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                    }
                    if (logger.isInfoEnabled()) {
                        logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                    }
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
    
                    Exporter<?> exporter = protocol.export(invoker);
                    exporters.add(exporter);
                }
            } else {
                Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
    
                Exporter<?> exporter = protocol.export(invoker);
                exporters.add(exporter);
            }
        }
    }
}

複製代碼

在服務暴露時,有兩種狀況:

  • 不使用註冊中心:直接暴露對應協議的服務,引用服務時只能經過直連方式引用
  • 使用註冊中心:暴露對應協議的服務後,會將服務節點註冊到註冊中心,引用服務時能夠經過註冊中心動態獲取服務提供者列表,也能夠經過直連方式引用

2.8 暴露服務

不使用註冊中心時,直接調用對應協議(如 Dubbo 協議)的 export() 暴露服務。以 Dubbo 協議爲例:

DubboProtocol.java

複製代碼

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();
    
    // export service.
    String key = serviceKey(url);
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    exporterMap.put(key, exporter);
    
    //export an stub service for dispaching event
    Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,Constants.DEFAULT_STUB_EVENT);
    Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
    if (isStubSupportEvent && !isCallbackservice){
        String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
        if (stubServiceMethods == null || stubServiceMethods.length() == 0 ){
            if (logger.isWarnEnabled()){
                logger.warn(new IllegalStateException("consumer [" +url.getParameter(Constants.INTERFACE_KEY) +
                        "], has set stubproxy support event ,but no stub methods founded."));
            }
        } else {
            stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
        }
    }

    openServer(url);
    
    return exporter;
}

複製代碼

調用 openServer() 方法建立並啓動 Server:

DubboProtocol.java

複製代碼

private void openServer(URL url) {
    // find server.
    String key = url.getAddress();
    //client 也能夠暴露一個只有server能夠調用的服務。
    boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);
    if (isServer) {
        ExchangeServer server = serverMap.get(key);
        if (server == null) {
            serverMap.put(key, createServer(url));
        } else {
            //server支持reset,配合override功能使用
            server.reset(url);
        }
    }
}

private ExchangeServer createServer(URL url) {
    //默認開啓server關閉時發送readonly事件
    url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
    //默認開啓heartbeat
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
    String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

    if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
        throw new RpcException("Unsupported server type: " + str + ", url: " + url);

    url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
    ExchangeServer server;
    try {
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }
    str = url.getParameter(Constants.CLIENT_KEY);
    if (str != null && str.length() > 0) {
        Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
        if (!supportedTypes.contains(str)) {
            throw new RpcException("Unsupported client type: " + str);
        }
    }
    return server;
}

複製代碼

Exchanger (默認 HeaderExchanger)封裝請求響應模式,同步轉異步,以 Request、Response 爲中心:

HeaderExchager.java

public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}

Transporters.java

複製代碼

public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handlers == null || handlers.length == 0) {
        throw new IllegalArgumentException("handlers == null");
    }
    ChannelHandler handler;
    if (handlers.length == 1) {
        handler = handlers[0];
    } else {
        handler = new ChannelHandlerDispatcher(handlers);
    }
    return getTransporter().bind(url, handler);
}

複製代碼

底層傳輸默認使用 NettyTransporter,最終是建立 NettyServer:

NettyTransporter.java

public Server bind(URL url, ChannelHandler listener) throws RemotingException {
    return new NettyServer(url, listener);
}

NettyServer.java

複製代碼

public class NettyServer extends AbstractServer implements Server {
    public NettyServer(URL url, ChannelHandler handler) throws RemotingException{
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }
}

複製代碼

AbstractServer.java

複製代碼

public abstract class AbstractServer extends AbstractEndpoint implements Server {
    public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
        localAddress = getUrl().toInetSocketAddress();
        String host = url.getParameter(Constants.ANYHOST_KEY, false) 
                        || NetUtils.isInvalidLocalHost(getUrl().getHost()) 
                        ? NetUtils.ANYHOST : getUrl().getHost();
        bindAddress = new InetSocketAddress(host, getUrl().getPort());
        this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
        this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
        try {
            doOpen();
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
            }
        } catch (Throwable t) {
            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName() 
                                        + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
        }
        if (handler instanceof WrappedChannelHandler ){
            executor = ((WrappedChannelHandler)handler).getExecutor();
        }
    }
}

複製代碼

NettyServer.java

複製代碼

public class NettyServer extends AbstractServer implements Server {
    protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
        bootstrap = new ServerBootstrap(channelFactory);
        
        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        channels = nettyHandler.getChannels();
        // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
                ChannelPipeline pipeline = Channels.pipeline();
                /*int idleTimeout = getIdleTimeout();
                if (idleTimeout > 10000) {
                    pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
                }*/
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
        // bind
        channel = bootstrap.bind(getBindAddress());
    }
}

複製代碼

2.9 服務註冊

若是使用了註冊中心,則在經過具體協議(如 Dubbo 協議)暴露服務以後(即在 2.8 基礎之上)進入服務註冊流程,將服務節點註冊到註冊中心。

RegistryProtocol.java

複製代碼

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    //export invoker
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
    //registry provider
    final Registry registry = getRegistry(originInvoker);
    final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
    //向Zookeeper註冊節點
    registry.register(registedProviderUrl);
    // 訂閱override數據
    // FIXME 提供者訂閱時,會影響同一JVM即暴露服務,又引用同一服務的的場景,由於subscribed以服務名爲緩存的key,致使訂閱信息覆蓋。
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    //保證每次export都返回一個新的exporter實例
    return new Exporter<T>() {
        public Invoker<T> getInvoker() {
            return exporter.getInvoker();
        }
        public void unexport() {
            try {
                exporter.unexport();
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
            try {
                registry.unregister(registedProviderUrl);
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
            try {
                overrideListeners.remove(overrideSubscribeUrl);
                registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
        }
    };
}

private Registry getRegistry(final Invoker<?> originInvoker){
    URL registryUrl = originInvoker.getUrl();
    if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
        String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);
        registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY);
    }
    return registryFactory.getRegistry(registryUrl);
}

複製代碼

getRegistry() 方法根據註冊中心類型(默認 Zookeeper)獲取註冊中心客戶端,由註冊中心客戶端實例來進行真正的服務註冊。

註冊中心客戶端將節點註冊到註冊中心,同時訂閱對應的 override 數據,實時監聽服務的屬性變更實現動態配置功能。

最終返回的 Exporter 實現了 unexport() 方法,這樣在服務下線時清理相關資源。

 

至此,服務暴露流程結束。

 

服務發佈總結

看完源碼,咱們已經知道了dubbo的主要發佈過程,如今咱們回過頭來結合dubbo的整體架構和源碼的分析,總結一下dubbo服務發佈。服務發佈過程總共五個步驟:

  • 業務方將服務接口和實現編寫定義好,添加dubbo相關配置文件。
  • Config層加載配置文件造成上下文,Config層包括:ServiceConfig、ProviderConfig、RegistryConfig等。
  • ServiceConfig根據Protocol類型,根據ProtocolConfig、ProviderConfig加載registry,根據加載的registry建立dubbo的URL。
  • 準備工做作完後ProxyFactory上場,dubbo中有兩種代理方式,JDK代理和Javassist代理,默認使用Javassist代理,Proxy代理類根據dubbo配置信息獲取到接口信息、經過動態代理方式將接口的全部方法交給Proxy代理類進行代理,並封裝進Invoker裏面。
  • 將全部須要暴露的service封裝的Invoker組成一個list傳給信息交換層提供給消費方進行調用。
相關文章
相關標籤/搜索