【dubbo】dubbo服務註冊(dubbo:registry)

發佈方式

<dubbo:service  interface="com.tofuwang.myrpc.api.HelloService" ref="helloServiceImpl" />java

Bean名稱生成規則

    發佈完成之後,註冊到spring容器中的BeanName的生成規則以下:git

    此方法位於DubboBeanDefinitionParser類的parse方法。github

    若是設置了id屬性,則把id屬性的值做爲此Bean的name,若是沒有設置id屬性,則獲取interface節點的值,而後把interface的值做爲bean的name以及id。redis

    在上例中,沒有設置,則經過applicationContext.getBean(「com.tofuwang.myrpc.api.HelloService」) 能夠獲取這個service的bean實例。spring

    service 節點的會被映射到ServiceBean對象,對象實現了InitializingBean類的afterPropertiesSet方法,用於在類的建立以前進行的一些初始化工做,實際上在spring中還有一個相似的方法,init-method,例如:api

<bean id="testInitializingBean" class="com.TestInitializingBean" init-method="testInit"></bean>緩存

    他們只是執行順序上的不一樣,若是進行了同時配置,則會先調用afterPropertiesSet方法,然後纔會調用Init-method中配置的方法。app

    對於service的初始化,dubbo大量的工做都放在了afterPropertiesSet這個方法執行。這個方法比較長,這個方法是爲了設置一些初始化的數據,分爲如下幾個步驟:異步

  • 設置providerconfig
  • 設置spring的applicationContext上下文
  • 設置註冊中心配置(RegistryConfig)
  • 設置監控和協議信息
  • 設置classpath

相關配置信息以下圖jvm

服務註冊(doExport)

     查看ServiceBean這個類能夠看到,它實現了ApplicationListener這個接口,這樣會在spring啓動最後,會調用finishRefresh()方法,經過調用子類的onApplicationEvent方法進行通知,因此在spring啓動完成以後,會調用ServiceBean的onApplicationEvent方法,而dubbo把服務註冊到註冊中心,就是在這個方法中完成的。
    程序進行判斷是否要延遲註冊,若是不是延遲註冊,則直接調用ServiceConfig中的doExport(),須要延遲註冊的話,就新啓動一個線程,而後Thread.sleep相應的延遲時間之後再調用doExport()。
doExport()主要用來各類前置校驗,各類校驗完成之後才調用doExportUrls()進行發佈服務。

    首先獲取全部註冊中心,而且封裝到List<URL>對象中,獲取配置的protocols。從這種寫法上能夠看出來,dubbo是支持多註冊中心,多協議的。

private void doExportUrls() {
    List<URL> registryURLs = loadRegistries(true);
    for (ProtocolConfig protocolConfig : protocols) {
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}

    而後調用doExportUrlsFor1Protocol進行逐個的註冊到註冊中心的中。裏面有一小段Socket的代碼,用來測試一下注冊中心的地址的連通性。若是鏈接不上,程序不不阻斷,只是打出來一行warn日誌進行提醒。
而後封裝method,把服務者的全部method方法封裝到config中。

  1. 設置Token
  2. 根據設置是不是inJVM,若是是injvm的話,則不進行異步調用。
  3. 根據service 的scope配置,若是配置爲了爲none,則也不註冊。
  4. 根據配置,範圍有兩種,一種是local,另一種是remote,配置不是remote的狀況下作本地暴露 (配置爲remote,則表示只暴露遠程服務),若是配置不是local則暴露爲遠程服務.(配置爲local,則表示只暴露本地服務),默認不配置,則兩種都有,本地和遠程的URL中設置了兩種protocol類型,injvm和在配置文件中配置的<dubbo:protocol name="dubbo" port="20880" />節點的protocol,一般的話咱們配置的是dubbo協議。而後根據協議分別被InjvmProtocol和DubboProtocol這兩個類進行了處理。分別調用各個協議的export方法對服務進行導出。

服務導出(export)  

服務導出的代碼以下(RegistryProtocol)

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    //export invoker
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
    //registry provider
    final Registry registry = getRegistry(originInvoker);
    final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
    registry.register(registedProviderUrl);
    // 訂閱override數據
    // FIXME 提供者訂閱時,會影響同一JVM即暴露服務,又引用同一服務的的場景,由於subscribed以服務名爲緩存的key,致使訂閱信息覆蓋。
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    //保證每次export都返回一個新的exporter實例
    return new Exporter<T>() {
        public Invoker<T> getInvoker() {
            return exporter.getInvoker();
        }
        public void unexport() {
           try {
              exporter.unexport();
           } catch (Throwable t) {
               logger.warn(t.getMessage(), t);
            }
            try {
               registry.unregister(registedProviderUrl);
            } catch (Throwable t) {
               logger.warn(t.getMessage(), t);
            }
            try {
               overrideListeners.remove(overrideSubscribeUrl);
               registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
            } catch (Throwable t) {
               logger.warn(t.getMessage(), t);
            }
        }
    };
}

該方法主要講述的東西以下:

  1. 首先調用doLocalExport方法實現真正的服務發佈,Protocol是經過SPI註冊進來的,默認是dubbo,而且進行了緩存,避免了屢次發佈。
  2.  根據URL,經過RegistryFactory獲取Registry對象
  3. 而後調用相關的registry方法進行服務註冊。
  4. 監聽服務變化,監聽到變化了之後,對服務進行更新。
  5. 緩存exporter,服務下線的時候進行銷燬等後續操做。

以dubboProtocol的export方法爲例,服務導出有兩步操做,一是開啓netty服務端口進行監聽和接收數據(若是使用的是netty的話),另外去註冊中心進行註冊。

開啓服務、等待鏈接

    開啓服務端端口,接收並處理RegistryProtocol.doLocalExport
對於localExport,根據相關參數,實際上真正執行的是DubboProtol中的export方法,在這個方法中返回了一個exporter,而比較關鍵的方法是openServer,在這個server中對外暴露服務。

private void openServer(URL url) {
    // find server.
    String key = url.getAddress();
    //client 也能夠暴露一個只有server能夠調用的服務。
    boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);
    if (isServer) {
       ExchangeServer server = serverMap.get(key);
       if (server == null) {
          serverMap.put(key, createServer(url));
       } else {
          //server支持reset,配合override功能使用
          server.reset(url);
       }
    }
}

    經過以上能夠看出,從URL中取得要暴露服務的地址,這個地址的格式其實是ip:port,在serverMap中查詢服務是否存在,若是存在的就再也不重複建立,不存在則調用createServer方法建立服務。
關鍵是服務如何建立的?

private ExchangeServer createServer(URL url) {
    //默認開啓server關閉時發送readonly事件
    url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
    //默認開啓heartbeat
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
    String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

    if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
        throw new RpcException("Unsupported server type: " + str + ", url: " + url);

    url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
    ExchangeServer server;
    try {
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }
    str = url.getParameter(Constants.CLIENT_KEY);
    if (str != null && str.length() > 0) {
        Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
        if (!supportedTypes.contains(str)) {
            throw new RpcException("Unsupported client type: " + str);
        }
    }
    return server;
}

以上就是建立服務的代碼,經過以上代碼能夠看出相關邏輯

  1. 默認開啓關閉事件接收回調readonly和開啓心跳
  2. 能夠看出dubbo封裝了一箇中間層,transporter,用來封裝服務,dubbo支持三種服務類型,netty,mina,grizzly,默認使用的是netty,若是想更改的話,能夠在<dubbo:protocol name="dubbo"  port="20880" server="netty"/> 的server節點更改。
  3. 封裝一個ExchangeHandlerAdapter,這個也是個中間層,用來統一處理請求服務,做爲接收服務請求的一個句柄。
  4. 調用Exchangers的bind方法,把url和requestHandler(也就是ExchangeHandler)做爲參數傳遞進去,進行真正的綁定服務。

具體是如何bind的,在分析dubbo的remoteing-api這一層的時候再進行具體分析。

服務註冊到註冊中心

    調用Registry的registry方法進行服務註冊,註冊到註冊中心,registry.register(registedProviderUrl);

    若是是zookeeper的話,則真正的實現是ZookeeperRegistry。如何自動找到具體的實現類的,請查看【dubbo】dubbo SPI機制(ExtensionLoader)
服務註冊這一層,被dubbo封裝在了dubbo-registry這個模塊

    能夠看出,dubbo支持默認的直連方式,廣播方式和用redis以及zookeeper做爲註冊中心的方式,用zookeeper做爲註冊中心是dubbo推薦的方式。
繼續以zookeeper爲例,看如何註冊到zookeeper上的。首先zookeeperRegister繼承FailbackRegistry,這個類對於快速失敗修復作了一個抽象層,記錄已經註冊的服務,而且開啓心跳檢測重試機制。把註冊失敗的服務放到一個CurrentHashSet中,進行無限次的重試。真正註冊的時候調用doRegistry方法,這是一個鉤子方法,有具體的子類實現,在這裏就是有ZookeeperRegistry類來實現。
Dubbo使用的sgroschupf的zkclient(https://github.com/sgroschupf/zkclient)的鏈接zookeeper,建立的時候分爲永久節點和臨時節點,duboo建立的服務都是臨時節點

zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));

最後一個參數表明的是節點類型,true是臨時節點,false是永久節點

public void create(String path, boolean ephemeral) {
   int i = path.lastIndexOf('/');
   if (i > 0) {
      create(path.substring(0, i), false);
   }
   if (ephemeral) {
      createEphemeral(path);
   } else {
      createPersistent(path);
   }
}

    經過這種方式,dubbo就把本身的服務註冊到了註冊中心上。經過zkclient客戶端鏈接上zk,查看zk上的dubbo節點數據以下:

 

附:服務註冊流程圖

相關文章
相關標籤/搜索