dubbo源碼分析系列(3)服務的引用

#1 系列目錄java

#2 服務引用案例介紹spring

先看一個簡單的客戶端引用服務的例子,dubbo配置以下:服務器

<dubbo:application name="consumer-of-helloService" />

<dubbo:registry  protocol="zookeeper"  address="127.0.0.1:2181" />

<dubbo:reference id="helloService" interface="com.demo.dubbo.service.HelloService" />
  • 使用zooKeeper做爲註冊中心
  • 引用遠程的HelloService接口服務

HelloService接口內容以下:網絡

public interface HelloService {
	public String hello(String msg);
}

dubbo源碼分析系列(2)服務的發佈這篇文章的前面部分就能夠看到dubbo與Spring的接入過程的實質:app

利用Spring的xml配置建立出一系列的配置對象,存至Spring容器中負載均衡

  • application對應ApplicationConfig
  • registry對應RegistryConfig
  • monitor對應MonitorConfig
  • provider對應ProviderConfig
  • consumer對應ConsumerConfig
  • protocol對應ProtocolConfig
  • service對應ServiceConfig
  • reference對應ReferenceConfig

上面的對象不依賴Spring,也就是說你能夠手動去建立上述對象。dom

爲了在Spring啓動的時候,也相應的啓動provider發佈服務註冊服務的過程:又加入了一個和Spring相關聯的ServiceBean,繼承了ServiceConfig異步

爲了在Spring啓動的時候,也相應的啓動consumer發現服務的過程:又加入了一個和Spring相關聯的ReferenceBean,繼承了ReferenceConfigjvm

利用Spring就作了上述過程,獲得相應的配置數據,而後啓動相應的服務。若是想剝離Spring,咱們就能夠手動來建立上述配置對象,經過ServiceConfig和ReferenceConfig的API來啓動相應的服務ide

具體針對上述案例,則是 根據dubbo:reference配置建立了一個ReferenceBean,該bean又實現了Spring的org.springframework.beans.factory.FactoryBean接口,因此咱們以下方式使用時:

@Autowired
private HelloService helloService;

使用的不是ReferenceBean對象,而是ReferenceBean的getObject()方法返回的對象。該對象經過代理實現了HelloService接口。因此要看服務引用的整個過程就須要從ReferenceBean的getObject()方法開始入手。

下面來具體說明這個過程。

#3 服務引用過程

第一步:收集配置的參數,參數以下:

methods=hello,
timestamp=1443695417847,
dubbo=2.5.3
application=consumer-of-helloService
side=consumer
pid=7748
interface=com.demo.dubbo.service.HelloService

第二步:從註冊中心引用服務,建立出Invoker對象

若是是單個註冊中心,代碼以下:

Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

invoker = refprotocol.refer(interfaceClass, url);

上述url內容以下:

registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?
application=consumer-of-helloService&
dubbo=2.5.3&
pid=8292&
registry=zookeeper&
timestamp=1443707173909&

refer=
	application=consumer-of-helloService&
	dubbo=2.5.3&
	interface=com.demo.dubbo.service.HelloService&
	methods=hello&
	pid=8292&
	side=consumer&
	timestamp=1443707173884&

前面的信息是註冊中心的配置信息,如使用zookeeper來做爲註冊中心

後面refer的內容是要引用的服務信息,如引用HelloService服務

使用協議Protocol根據上述的url和服務接口來引用服務,建立出一個Invoker對象

第三步:使用ProxyFactory建立出一個接口的代理對象,該代理對象的方法的執行都交給上述Invoker來執行,代碼以下:

ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

proxyFactory.getProxy(invoker);

下面就來詳細的說明下上述第二步和第三步的過程當中涉及到的幾個概念

Protocol、Invoker、ProxyFactory

##3.1 概念介紹

分別介紹下Invoker、Protocol、ProxyFactory的概念

###3.1.1 Invoker概念

Invoker一個可執行對象。

這個概念已經在上一篇文章dubbo源碼分析系列(2)服務的發佈中詳細介紹了。這裏再簡單重複下

這個可執行對象的執行過程分紅三種類型:

  • 類型1:本地執行類的Invoker

  • 類型2:遠程通訊執行類的Invoker

  • 類型3:多個類型2的Invoker聚合成的集羣版的Invoker

以HelloService接口方法爲例:

  • 本地執行類的Invoker: server端,含有對應的HelloServiceImpl實現,要執行該接口方法,僅僅只須要經過反射執行HelloServiceImpl對應的方法便可

  • 遠程通訊執行類的Invoker: client端,要想執行該接口方法,須要須要進行遠程通訊,發送要執行的參數信息給server端,server端利用上述本地執行的Invoker執行相應的方法,而後將返回的結果發送給client端。這整個過程算是該類Invoker的典型的執行過程

  • 集羣版的Invoker:client端,擁有某個服務的多個Invoker,此時client端須要作的就是將這個多個Invoker聚合成一個集羣版的Invoker,client端使用的時候,僅僅經過集羣版的Invoker來進行操做。集羣版的Invoker會從衆多的遠程通訊類型的Invoker中選擇一個來執行(從中加入路由和負載均衡策略),還能夠採用一些失敗轉移策略等

因此來看下Invoker的實現狀況:

Invoker的實現狀況

對於客戶端來講,Invoker則應該是遠程通訊執行類的Invoker、多個遠程通訊類型的Invoker聚合成的集羣版的Invoker這兩種類型。先來講說非集羣版的Invoker,即遠程通訊類型的Invoker。來看下DubboInvoker的具體實現

protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
    inv.setAttachment(Constants.VERSION_KEY, version);
    
    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
        if (isOneway) {
        	boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            RpcContext.getContext().setFuture(null);
            return new RpcResult();
        } else if (isAsync) {
        	ResponseFuture future = currentClient.request(inv, timeout) ;
            RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
            return new RpcResult();
        } else {
        	RpcContext.getContext().setFuture(null);
            return (Result) currentClient.request(inv, timeout).get();
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

大概內容就是:

將經過遠程通訊將Invocation信息傳遞給服務器端,服務器端接收到該Invocation信息後,找到對應的本地Invoker,而後經過反射執行相應的方法,將方法的返回值再經過遠程通訊將結果傳遞給客戶端。

這裏分紅3種狀況:

  • 執行的方法不須要返回值:直接使用ExchangeClient的send方法

  • 執行的方法的結果須要異步返回:使用ExchangeClient的request方法,返回一個ResponseFuture,經過ThreadLocal方式與當前線程綁定,未等服務器端響應結果就直接返回

  • 執行的方法的結果須要同步返回:使用ExchangeClient的request方法,返回一個ResponseFuture,一直阻塞到服務器端返回響應結果

###3.1.2 Protocol概念

從上面得知服務引用的第二個過程就是:

invoker = refprotocol.refer(interfaceClass, url);

使用協議Protocol根據上述的url和服務接口來引用服務,建立出一個Invoker對象

針對server端來講,會以下使用Protocol

Exporter<?> exporter = protocol.export(invoker);

Protocol要解決的問題就是:根據url中指定的協議(沒有指定的話使用默認的dubbo協議)對外公佈這個HelloService服務,當客戶端根據協議調用這個服務時,將客戶端傳遞過來的Invocation參數交給服務器端的Invoker來執行。因此Protocol加入了遠程通訊協議的這一塊,根據客戶端的請求來獲取參數Invocation invocation。

而針對客戶端,則須要根據服務器開放的協議(服務器端在註冊中心註冊的url地址中含有該信息)來建立相應的協議的Invoker對象,如

  • DubboInvoker
  • InjvmInvoker
  • ThriftInvoker

等等

如服務器端在註冊中心中註冊的url地址爲:

dubbo://192.168.1.104:20880/com.demo.dubbo.service.HelloService?
anyhost=true&
application=helloService-app&dubbo=2.5.3&
interface=com.demo.dubbo.service.HelloService&
methods=hello&
pid=3904&
side=provider&
timestamp=1444003718316

會看到上述服務是以dubbo協議註冊的,因此這裏產生的Invoker就是DubboInvoker。咱們來具體的看下這個過程

先來看下Protocol的接口定義:

@Extension("dubbo")
public interface Protocol {
    
    int getDefaultPort();

	//針對server端來講,將本地執行類的Invoker經過協議暴漏給外部。這樣外部就能夠經過協議發送執行參數Invocation,而後交給本地Invoker來執行
    @Adaptive
	<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

	//這個是針對客戶端的,客戶端從註冊中心獲取服務器端發佈的服務信息
	//經過服務信息得知服務器端使用的協議,而後客戶端仍然使用該協議構造一個Invoker。這個Invoker是遠程通訊類的Invoker。
	//執行時,須要將執行信息經過指定協議發送給服務器端,服務器端接收到參數Invocation,而後交給服務器端的本地Invoker來執行
    @Adaptive
    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

	void destroy();

}

咱們再來詳細看看服務引用的第二步:

invoker = refprotocol.refer(interfaceClass, url);

protocol的來歷是:

Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

咱們從第一篇文章dubbo源碼分析系列(1)擴展機制的實現,能夠知道上述獲取Protocol protocol的原理,這裏就再也不多說了,直接貼出最終的Protocol的實現代碼:

public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException{
    if (arg0 == null)  { 
        throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null"); 
    }
    if (arg0.getUrl() == null) { 
        throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null"); 
    }
    com.alibaba.dubbo.common.URL url = arg0.getUrl();
    String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
    if(extName == null) {
        throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])"); 
    }
    com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)com.alibaba.dubbo.common.ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
    return extension.export(arg0);
}

public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0,com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException{
    if (arg1 == null)  { 
        throw new IllegalArgumentException("url == null"); 
    }
    com.alibaba.dubbo.common.URL url = arg1;
    String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
    if(extName == null) {
        throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])"); 
    }
    com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)com.alibaba.dubbo.common.ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
    return extension.refer(arg0, arg1);
}

refer(interfaceClass, url)的過程即根據url的配置信息來最終選擇的Protocol實現,默認實現是"dubbo"的擴展實現即DubboProtocol,而後再對DubboProtocol進行依賴注入,進行wrap包裝。先來看看Protocol的實現狀況:

Protocol的實現狀況

能夠看到在返回DubboProtocol以前,通過了ProtocolFilterWrapper、ProtocolListenerWrapper、RegistryProtocol的包裝。

所謂的包裝就是以下相似的內容:

package com.alibaba.xxx;

import com.alibaba.dubbo.rpc.Protocol;
 
public class XxxProtocolWrapper implemenets Protocol {
    Protocol impl;
 
    public XxxProtocol(Protocol protocol) { impl = protocol; }
 
    // 接口方法作一個操做後,再調用extension的方法
    public Exporter<T> export(final Invoker<T> invoker) {
        //... 一些操做
        impl .export(invoker);
        // ... 一些操做
    }
 
    // ...
}

使用裝飾器模式,相似AOP的功能。

因此上述服務引用的過程

invoker = refprotocol.refer(interfaceClass, urls.get(0));

中的refprotocol會先通過RegistryProtocol(先暫時忽略ProtocolFilterWrapper、ProtocolListenerWrapper),它幹了哪些事呢?

  • 根據註冊中心的registryUrl獲取註冊服務Registry,將自身的consumer信息註冊到註冊中心上

    //先根據客戶端的註冊中心配置找到對應註冊服務
    Registry registry = registryFactory.getRegistry(url);
    
    //使用註冊服務將客戶端的信息註冊到註冊中心上
    registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                Constants.CHECK_KEY, String.valueOf(false)));

    上述subscribeUrl地址以下:

    consumer://192.168.1.104/com.demo.dubbo.service.HelloService?
    	application=consumer-of-helloService&
    	dubbo=2.5.3&
    	interface=com.demo.dubbo.service.HelloService&
    	methods=hello&
    	pid=6444&
    	side=consumer&
    	timestamp=1444606047076

    該url表述了本身是consumer,同時本身的ip地址是192.168.1.104,引用的服務是com.demo.dubbo.service.HelloService,以及註冊時間等等

  • 建立一個RegistryDirectory,從註冊中心中訂閱本身引用的服務,將訂閱到的url在RegistryDirectory內部轉換成Invoker

    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, 
            Constants.PROVIDERS_CATEGORY 
            + "," + Constants.CONFIGURATORS_CATEGORY 
            + "," + Constants.ROUTERS_CATEGORY));

    上述RegistryDirectory是Directory的實現,Directory表明多個Invoker,能夠把它當作List類型的Invoker,但與List不一樣的是,它的值多是動態變化的,好比註冊中心推送變動。

    RegistryDirectory內部含有二者重要屬性:

    • 註冊中心服務Registry registry
    • Protocol protocol。

    它會利用註冊中心服務Registry registry來獲取最新的服務器端註冊的url地址,而後再利用協議Protocol protocol將這些url地址轉換成一個具備遠程通訊功能的Invoker對象,如DubboInvoker

  • 而後使用Cluster cluster對象將上述多個Invoker對象(此時尚未真正建立出來,異步訂閱,訂閱成功以後,回調時纔會建立出Invoker)聚合成一個集羣版的Invoker對象。

    Cluster cluster = ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension();
    
    cluster.join(directory)

這裏再詳細看看Cluster接口:

@SPI(FailoverCluster.NAME)
public interface Cluster {

    /**
     * Merge the directory invokers to a virtual invoker.
     * 
     * @param <T>
     * @param directory
     * @return cluster invoker
     * @throws RpcException
     */
    @Adaptive
    <T> Invoker<T> join(Directory<T> directory) throws RpcException;

}

只有一個功能就是把上述Directory(至關於一個List類型的Invoker)聚合成一個Invoker,同時也能夠對List進行過濾處理(這些過濾操做也是配置在註冊中心的)等實現路由的功能,主要是對用戶進行透明。看看接口實現狀況:

Cluster接口實現狀況

默認採用的是FailoverCluster,看下FailoverCluster:

/**
 * 失敗轉移,當出現失敗,重試其它服務器,一般用於讀操做,但重試會帶來更長延遲。 
 * 
 * <a href="http://en.wikipedia.org/wiki/Failover">Failover</a>
 * 
 * @author william.liangf
 */
public class FailoverCluster implements Cluster {

    public final static String NAME = "failover";

    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new FailoverClusterInvoker<T>(directory);
    }

}

僅僅是建立了一個FailoverClusterInvoker,具體的邏輯留在調用的時候即調用該Invoker的invoke(final Invocation invocation)方法時來進行處理。其中又會涉及到另外一個接口LoadBalance(從衆多的Invoker中挑選出一個Invoker來執行這次調用任務),接口以下:

@SPI(RandomLoadBalance.NAME)
public interface LoadBalance {

	/**
	 * select one invoker in list.
	 * 
	 * @param invokers invokers.
	 * @param url refer url
	 * @param invocation invocation.
	 * @return selected invoker.
	 */
    @Adaptive("loadbalance")
	<T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;

}

實現狀況以下:

LoadBalance接口實現狀況

默認採用的是隨機策略,具體的內容就請各自詳細去研究。

###3.1.3 ProxyFactory概念

前一篇文章已經講過了,對於server端,ProxyFactory主要負責將服務如HelloServiceImpl統一進行包裝成一個Invoker,這些Invoker經過反射來執行具體的HelloServiceImpl對象的方法。而對於client端,則是將上述建立的集羣版Invoker建立出代理對象。

接口定義以下:

@Extension("javassist")
public interface ProxyFactory {

  	//針對client端,對Invoker對象建立出代理對象
    @Adaptive({Constants.PROXY_KEY})
    <T> T getProxy(Invoker<T> invoker) throws RpcException;

	//針對server端,將服務對象如HelloServiceImpl包裝成一個Invoker對象
    @Adaptive({Constants.PROXY_KEY})
    <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;

}

ProxyFactory的接口實現有JdkProxyFactory、JavassistProxyFactory,默認是JavassistProxyFactory, JdkProxyFactory內容以下:

public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
    return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker));
}

能夠看到是利用jdk自帶的Proxy來動態代理目標對象Invoker。因此咱們調用建立出來的代理對象如HelloService helloService的方法時,會執行InvokerInvocationHandler中的邏輯:

public class InvokerInvocationHandler implements InvocationHandler {

    private final Invoker<?> invoker;
    
    public InvokerInvocationHandler(Invoker<?> handler){
        this.invoker = handler;
    }

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }

}

能夠看到仍是交給目標對象Invoker來執行。

#4 結束語

本文簡略地介紹了客戶端引用服務過程以及涉及到的幾個概念,接下來的打算是:

  • 客戶端與服務器端網絡通訊模塊
相關文章
相關標籤/搜索