【Dubbo源碼閱讀系列】服務暴露之遠程暴露

引言

什麼叫 遠程暴露 ?試着想象着這麼一種場景:假設咱們新增了一臺服務器 A,專門用於發送短信提示給指定用戶。那麼問題來了,咱們的 Message 服務上線以後,應該如何告知調用方服務器,服務器 A 提供了 Message 功能?那麼咱們是否是能夠把目前已提供的服務暴露在一個地方,讓調用方知道某臺機器提供了某個特定功能?帶着這樣的假設,咱們今天就來聊聊 Dubbo 服務暴露之遠程暴露!!java

服務遠程暴露

先回顧一下上篇文章,上篇文章咱們聊到了 ServiceConfig 的 export() 方法,而且對服務的本地暴露內容進行了分析,今天咱們接着這塊內容講講服務暴露之遠程暴露。apache

// export to remote if the config is not local (export to local only when config is local)
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    ...
    if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) {
        if (logger.isInfoEnabled()) {
            logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
        }
        if (registryURLs != null && !registryURLs.isEmpty()) {
            for (URL registryURL : registryURLs) {
                url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
                // 爲了幫助你們閱讀,省略部分代碼...
                Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                Exporter<?> exporter = protocol.export(wrapperInvoker);
                exporters.add(exporter);
            }
        } else {
            ...
        }
    }
    ...
}
複製代碼

這裏咱們只關注核心代碼:bootstrap

Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
複製代碼

invoker 對象的構建

先來看看 invoker 對象是怎麼建立的!這裏涉及到了 Dubbo SPI 機制,調用流程大體爲
StubProxyFactoryWrapper.getInvoker() ==> JavassistProxyFactory.getInvoker()
詳細看下 JavassistProxyFactory 類的 getInvoker 方法緩存

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
	// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
	final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
	return new AbstractProxyInvoker<T>(proxy, type, url) {
		@Override
		protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable {
			return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
		}
	};
}
複製代碼

值得咱們重點注意的是 Wrapper 類的 getWrapper() 方法!!bash

public static Wrapper getWrapper(Class<?> c) {
	while (ClassGenerator.isDynamicClass(c)) // can not wrapper on dynamic class.
	{
		c = c.getSuperclass();
	}

	if (c == Object.class) {
		return OBJECT_WRAPPER;
	}

	Wrapper ret = WRAPPER_MAP.get(c);
	if (ret == null) {
		ret = makeWrapper(c);
		WRAPPER_MAP.put(c, ret);
	}

	return ret;
}
複製代碼

這裏會使用參數 c 做爲 key 值從 WRAPPER_MAP 緩存中取值,若是沒有對應的 value 值,會調用 makeWrapper() 方法藉助 javassist 技術構建一個 Wrapper 包裝類。假設當前參數 c 的值爲 demoService,那麼最後生成的動態類爲:服務器

public class Wrapper0 extends Wrapper implements DC {
    public static String[] pns;
    public static Map pts;
    public static String[] mns;
    public static String[] dmns;
    public static Class[] mts0;

    public Wrapper0() {
    }

    public Class getPropertyType(String var1) {
        return (Class)pts.get(var1);
    }

    public Object invokeMethod(Object var1, String var2, Class[] var3, Object[] var4) throws InvocationTargetException {
        DemoService var5;
        try {
            var5 = (DemoService)var1;
        } catch (Throwable var8) {
            throw new IllegalArgumentException(var8);
        }

        try {
            if("sayHello".equals(var2) && var3.length == 1) {
                return var5.sayHello((String)var4[0]);
            }
        } catch (Throwable var9) {
            throw new InvocationTargetException(var9);
        }

        throw new NoSuchMethodException("Not found method \"" + var2 + "\" in class org.apache.dubbo.demo.DemoService.");
    }

    public String[] getPropertyNames() {
        return pns;
    }

    public Object getPropertyValue(Object var1, String var2) {
        try {
            DemoService var3 = (DemoService)var1;
        } catch (Throwable var5) {
            throw new IllegalArgumentException(var5);
        }

        throw new NoSuchPropertyException("Not found property \"" + var2 + "\" field or setter method in class org.apache.dubbo.demo.DemoService.");
    }

    public void setPropertyValue(Object var1, String var2, Object var3) {
        try {
            DemoService var4 = (DemoService)var1;
        } catch (Throwable var6) {
            throw new IllegalArgumentException(var6);
        }

        throw new NoSuchPropertyException("Not found property \"" + var2 + "\" field or setter method in class org.apache.dubbo.demo.DemoService.");
    }

    public String[] getDeclaredMethodNames() {
        return dmns;
    }

    public boolean hasProperty(String var1) {
        return pts.containsKey(var1);
    }

    public String[] getMethodNames() {
        return mns;
    }
}
複製代碼

最後再回到 JavassistProxyFactory 類的 getInvoker 方法,能夠看到它實際返回的是 AbstractProxyInvoker 對象,當調用 AbstractProxyInvoker 類的 doInvoke() 方法時,實際調用的是 wrapper 類的 invokeMethod() 方法!這個知識點十分重要!在咱們講 Dubbo 遠程調用的時候會再次回顧這塊內容!網絡

exporter 對象的構建

Exporter<?> exporter = protocol.export(wrapperInvoker);
複製代碼

再來看看後半句代碼。這裏最後會調用 RegistryProtocol 類的 export() 方法,若對此有疑問請看系列文章第一篇:【Dubbo源碼閱讀系列】之 Dubbo SPI 機制,後文再也不贅述。 直接看看 RegistryProtocol 的 export() 方法:app

RegistryProtocol.export()

public class RegistryProtocol implements Protocol {
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    	//export invoker
    	final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
    
    	URL registryUrl = getRegistryUrl(originInvoker);
    
    	//registry provider
    	final Registry registry = getRegistry(originInvoker);
    	final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);
    
    	//to judge to delay publish whether or not
    	boolean register = registeredProviderUrl.getParameter("register", true);
    
    	ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
    
    	if (register) {
    		register(registryUrl, registeredProviderUrl);
    		ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
    	}
    
    	// Subscribe the override data
    	// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
    	final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
    	final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    	overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    	registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    	//Ensure that a new exporter instance is returned every time export
    	return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
    }
}
複製代碼

RegistryProtocol.export() 方法很是重要!!能夠說是服務遠程暴露的核心了。廢話很少說,讓咱們逐行來看看吧!tcp

doLocalExport()

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
	// 獲取 providerUrl ,取 originInvoker url.parameters 鍵值對中 key 爲 export 的值
	String key = getCacheKey(originInvoker);
	ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
	if (exporter == null) {
		synchronized (bounds) {
			exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
			if (exporter == null) {
				final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
				exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
				bounds.put(key, exporter);
			}
		}
	}
	return exporter;
}
複製代碼

先來看看 doLocalExport() 方法作了什麼:ide

  1. 從 getCacheKey() 方法中獲取到的,鍵 export 對應的 value 在以下代碼中被添加到 url 的 parameters 集合中。而後咱們在這裏取出對應的值。
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
複製代碼
  1. 嘗試從 bounds 緩存中取對應當前鍵 key 的 exporter。
  2. 若是緩存爲 null,新建 exporter 並返回。這裏的 protocl 對象爲 Protocol$Adaptive。不難分析最後執行的實際是 DubboProtocol 的 export() 方法。

總結一下:doLocalExport() 用 ExporterChangeableWrapper 代理類包裝了 protocol.export() 方法返回的 exporter 對象,最後放到了 bounds 集合中緩存。

DubboPrtocol.export()

DubboProtocol.java
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
	URL url = invoker.getUrl();

	// export service.
	String key = serviceKey(url);
	DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
	exporterMap.put(key, exporter);

	//export an stub service for dispatching event
	Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
	Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
	if (isStubSupportEvent && !isCallbackservice) {
		String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
		if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
			if (logger.isWarnEnabled()) {
				logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
						"], has set stubproxy support event ,but no stub methods founded."));
			}
		} else {
			stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
		}
	}

	openServer(url);
	optimizeSerialization(url);
	return exporter;
}
複製代碼

接着上文繼續看 DubboProtocol.export() 方法是如何建立 exporter 對象的:

  1. 調用 serviceKey() 方法構建服務的 key 值,最後的得到的 key 值形式相似 group/path:version:port
  2. 新建 DubboExporter
  3. openServer(url),此時的 url 爲 RegistryProtocol 傳遞過來的 providerUrl,openServer() 用途咱們在後文分析;
  4. optimizeSerialization(url) 序列化操做,本文不作具體分析 DubboProtocol.export() 返回的對象爲 DubboExporter。值得咱們注意是後面的 openServer() 方法!

openServer()

private void openServer(URL url) {
	// find server.
	String key = url.getAddress();
	//client can export a service which's only for server to invoke
	boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
	if (isServer) {
		ExchangeServer server = serverMap.get(key);
		if (server == null) {
			synchronized (this) {
				server = serverMap.get(key);
				if (server == null) {
					serverMap.put(key, createServer(url));
				}
			}
		} else {
			// server supports reset, use together with override
			server.reset(url);
		}
	}
}
複製代碼

openServer() 光從方法名看起來像是開啓服務鏈接的。方法比較簡單,取 url 的 address 做爲 key,嘗試從 serverMap 獲取對應的 value 值。若是 value 值爲 null 則調用 createServer(url) 方法建立 server 後添加到 serverMap 中。 createServer() 方法的流程比較冗長,咱們這裏經過一張時序圖來給出該方法內部調用流程:

上圖省略了從 ServiceConfigRegistryProtocol 以及從 RegistryProtocolDubboProtocol 的轉換過程。這部份內容涉及到 Dubbo SPI 機制,若有疑問能夠詳見:【Dubbo源碼閱讀系列】之 Dubbo SPI 機制。這裏給出簡單的轉換流程

  • ServiceConfig 到 RegistryProtocol
    Protocol$Adaptive ==》ProtocolFilterWrapper ==》ProtocolListenerWrapper ==》RegistryProtocol
  • RegistryProtocol 到 DubboProtocol
    Protocol$Adaptive ==》ProtocolFilterWrapper ==》ProtocolListenerWrapper ==》DubboProtocol

最後重點關注下 NettyServer 的 doOpen() 方法:

protected void doOpen() throws Throwable {
	NettyHelper.setNettyLoggerFactory();
	ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
	ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
	ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
	bootstrap = new ServerBootstrap(channelFactory);

	final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
	channels = nettyHandler.getChannels();
	// https://issues.jboss.org/browse/NETTY-365
	// https://issues.jboss.org/browse/NETTY-379
	// final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
	bootstrap.setOption("child.tcpNoDelay", true);
	bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
		@Override
		public ChannelPipeline getPipeline() {
			NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
			ChannelPipeline pipeline = Channels.pipeline();
			/*int idleTimeout = getIdleTimeout(); if (idleTimeout > 10000) { pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0)); }*/
			pipeline.addLast("decoder", adapter.getDecoder());
			pipeline.addLast("encoder", adapter.getEncoder());
			pipeline.addLast("handler", nettyHandler);
			return pipeline;
		}
	});
	// bind
	channel = bootstrap.bind(getBindAddress());
}
複製代碼

能夠看到這段代碼是比較經典的 Netty 服務端啓動代碼...也就是說 openServer() 方法用於 Netty 服務端啓動。 咱們知道 Netty 經常使用於客戶端和服務端之間的通信。在這裏咱們開啓了服務端,那麼在何處會開啓對應的客戶端呢?他們之間到底會進行什麼交互呢?這個疑問咱們先留着待後續文章講解。

服務的暴露

上面講了這麼多,感受仍是和服務遠程暴露沒有沾多大的邊?到底咱們的服務是如何被其它機器感知的?別人是怎麼知道咱們某某臺機器提供了短信服務的?其實揭祕的序幕已經拉開了!讓咱們繼續娓娓道來! 回顧一下 RegistryProtocol.export() 方法:

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
	//export invoker
	final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);

	URL registryUrl = getRegistryUrl(originInvoker);

	//registry provider
	final Registry registry = getRegistry(originInvoker);
	final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);

	//to judge to delay publish whether or not
	boolean register = registeredProviderUrl.getParameter("register", true);

	ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);

	if (register) {
		register(registryUrl, registeredProviderUrl);
		ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
	}

	// Subscribe the override data
	// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
	final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
	final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
	overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
	registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
	//Ensure that a new exporter instance is returned every time export
	return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
}
複製代碼

上面咱們已經聊完了 doLocalExport() 方法,繼續看 export() 方法的後半部分:

RegistryProtocol.java
final Registry registry = getRegistry(originInvoker);

private Registry getRegistry(final Invoker<?> originInvoker) {
	URL registryUrl = getRegistryUrl(originInvoker);
	return registryFactory.getRegistry(registryUrl);
}
複製代碼

這裏的 registryFactory 爲 RegistryFactory$Adaptive(Dubbo 源碼中充斥了大量 SPI 擴展機制的使用,這裏再也不贅述)。總之咱們獲取到的擴展類爲 ZookeeperRegistryFactory ,ZookeeperRegistryFactory 繼承自 AbstractRegistryFactory 類。所以最後調用的是 AbstractRegistryFactory 類的 getRegistry() 方法。

@Override
public Registry getRegistry(URL url) {
	url = url.setPath(RegistryService.class.getName())
			.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
			.removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
	String key = url.toServiceStringWithoutResolving();
	// Lock the registry access process to ensure a single instance of the registry
	LOCK.lock();
	try {
		Registry registry = REGISTRIES.get(key);
		if (registry != null) {
			return registry;
		}
		registry = createRegistry(url);
		if (registry == null) {
			throw new IllegalStateException("Can not create registry " + url);
		}
		REGISTRIES.put(key, registry);
		return registry;
	} finally {
		// Release the lock
		LOCK.unlock();
	}
}
複製代碼

方法比較簡單,直接看重點方法 createRegistry(url)。createRegistry() 是一個抽象方法,會根據 url 來調用具體的實現方法,這裏咱們用 ZookeeperRegistryFactory 類進行分析。

public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
	...
	public Registry createRegistry(URL url) {
		return new ZookeeperRegistry(url, zookeeperTransporter);
	}
	...
}

public class ZookeeperRegistry extends FailbackRegistry {
	...
	public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
		super(url);
		if (url.isAnyHost()) {
			throw new IllegalStateException("registry address == null");
		}
		String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
		if (!group.startsWith(Constants.PATH_SEPARATOR)) {
			group = Constants.PATH_SEPARATOR + group;
		}
		this.root = group;
		zkClient = zookeeperTransporter.connect(url);
		zkClient.addStateListener(new StateListener() {
			@Override
			public void stateChanged(int state) {
				if (state == RECONNECTED) {
					try {
						recover();
					} catch (Exception e) {
						logger.error(e.getMessage(), e);
					}
				}
			}
		});
	}
	...
}
複製代碼

ZookeeperRegistryFactory 類的 createRegistry() 方法會調用 ZookeeperRegistry 類的構造方法新建 ZookeeperRegistry 實例並返回。而 ZookeeperRegistry 類的構造方法會先調用父類 FailbackRegistry 的構造方法再執行後續操做。先看 FailbackRegistry 構造方法:

public abstract class FailbackRegistry extends AbstractRegistry {
	...
	public FailbackRegistry(URL url) {
        super(url);
        this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
        this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                // Check and connect to the registry
                try {
					// 延遲重試
                    retry();
                } catch (Throwable t) { // Defensive fault tolerance
                    logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
                }
            }
        }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
    }
	...
}
複製代碼

在 FailbackRegistry 構造方法中有一個延遲重試方法 retry(),若是發現失敗集合 failedRegistered、failedUnregistered、failedSubscribed、failedUnsubscribed、failedNotified 不爲空,會進行重試操做。 繼續看 ZookeeperRegistry 類的構造方法:

public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
	...
	zkClient = zookeeperTransporter.connect(url);
	zkClient.addStateListener(new StateListener() {
		@Override
		public void stateChanged(int state) {
			if (state == RECONNECTED) {
				try {
					recover();
				} catch (Exception e) {
					logger.error(e.getMessage(), e);
				}
			}
		}
	});
}
複製代碼

這裏的 ZookeeperTransporter.connect() 通過 SPI 轉換實際調用爲 CuratorZookeeperTransporter.connect()。

public class CuratorZookeeperTransporter implements ZookeeperTransporter {
    @Override
    public ZookeeperClient connect(URL url) {
        return new CuratorZookeeperClient(url);
    }
}

public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatcher> {
    private final CuratorFramework client;

    public CuratorZookeeperClient(URL url) {
        super(url);
        try {
            int timeout = url.getParameter(Constants.TIMEOUT_KEY, 5000);
            CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
                    .connectString(url.getBackupAddress())
                    .retryPolicy(new RetryNTimes(1, 1000))
                    .connectionTimeoutMs(timeout);
            String authority = url.getAuthority();
            if (authority != null && authority.length() > 0) {
                builder = builder.authorization("digest", authority.getBytes());
            }
            client = builder.build();
            client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
                @Override
                public void stateChanged(CuratorFramework client, ConnectionState state) {
                    if (state == ConnectionState.LOST) {
                        CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);
                    } else if (state == ConnectionState.CONNECTED) {
                        CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
                    } else if (state == ConnectionState.RECONNECTED) {
                        CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
                    }
                }
            });
            client.start();
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }
}
複製代碼

上面這段代碼使用 CuratorFrameworkFactory 工廠類建立了一個 CuratorFramework 實例,並啓動該實例建立了一個與 zookeeper 的鏈接。

再回到 RegistryProtocol 中的 getRegistry() 方法。咱們發現它經過層層調用最終建立了一個到 ZookeeperRegistry 實例。這個實例中的 ziClient 對象創建了到 zookeeper 的鏈接。 咱們知道 ZooKeeper 常常被用做註冊中心Ok。那咱們如今已經鏈接上了 ZooKeeper 了,是否是該往 Zookeeper 上寫點啥了?繼續往下看,好戲要來啦!!~

register() 註冊方法

RegistryProtocol.java
	public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
		...
		if (register) {
			register(registryUrl, registeredProviderUrl);
			ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
		}
		...
	}
    public void register(URL registryUrl, URL registedProviderUrl) {
        Registry registry = registryFactory.getRegistry(registryUrl);
        registry.register(registedProviderUrl);
    }
複製代碼

在這裏 register() 方法最終會調用 FailbackRegistry 類的 register() 方法(不想再贅述爲何!!!!)。

public abstract class FailbackRegistry extends AbstractRegistry {
	...
	public void register(URL url) {
        super.register(url);
        failedRegistered.remove(url);
        failedUnregistered.remove(url);
        try {
            // Sending a registration request to the server side
            doRegister(url);
        } catch (Exception e) {
            // ...
        }
    }
	...
}

public class ZookeeperRegistry extends FailbackRegistry {
	protected void doRegister(URL url) {
        try {
            String str = toUrlPath(url);
            zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
}
複製代碼

劃重點啊筒子們!!! doRegister() 方法!!這裏調用鏈也比較長。畫個簡圖總結下:

小結:總之最後的目的是在 ZooKeeper 上建立經過 url 解析生成的 path 節點。大概長這個樣子:dubbo%3A%2F%2F10.137.32.54%3A20880%2Forg.apache.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26dubbo%3D2.0.2%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D4264%26side%3Dprovider%26timestamp%3D1546848704035
最後還有一個地方須要注意下:這裏調用 zkClient.create() 方法時,若是 dynamic 爲空,默認會建立 zookeeper 臨時節點。臨時節點的好處在於若是客戶端和 zookeeper 集羣斷開鏈接,對應的臨時節點則會自動被刪除。這樣一來,是否是對咱們的調用方好處多多呢?

End

礙於篇幅限制,今天就先介紹這麼多。回顧一下,咱們在 RegistryProtocol.export() 方法裏面建立了一個 DubboExporter 對象、開啓了 Netty 服務端,同時還往註冊中心 zookeeper 上建立了一個和服務有關的臨時節點!關於 RegistryProtocol.export() 方法剩餘的內容,咱們之後有機會再說吧!

本BLOG上原創文章未經本人許可,不得用於商業用途及傳統媒體。網絡媒體轉載請註明出處,不然屬於侵權行爲。

相關文章
相關標籤/搜索