ReferenceBean.getObject() -->ReferenceConfig.get() -->init() -->createProxy(map) -->refprotocol.refer(interfaceClass, urls.get(0)) -->ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("registry"); -->extension.refer(arg0, arg1); -->ProtocolFilterWrapper.refer (三個AOP類) -->RegistryProtocol.refer -->registryFactory.getRegistry(url)//創建zk的鏈接,和服務端發佈同樣(省略代碼) -->doRefer(cluster, registry, type, url) -->FailbackRegistry.register //建立zk的節點,和服務端發佈同樣(省略代碼)。 //節點名爲:dubbo/per.qiao.service.TestService/consumers -->registry.subscribe//訂閱zk的節點,和服務端發佈同樣(省略代碼) //dubbo/per.qiao.service.TestService/providers, //dubbo/per.qiao.service.TestService/configurators //dubbo/per.qiao.service.TestService/routers -->notify(url, listener, urls); -->FailbackRegistry.notify -->doNotify(url, listener, urls); -->AbstractRegistry.notify -->saveProperties(url); //把註冊信息保存到cache文件(路徑規則與暴露時同樣) -->registryCacheExecutor.execute(new SaveProperties(...)); //採用線程池來處理 -->listener.notify(categoryList); -->RegistryDirectory.notify -->refreshInvoker(invokerUrls); //將URL轉換成Invoker key爲URL的字符串形式 -->toInvokers(invokerUrls) -->new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl); -->Protocol$Adaptive.refer -->ExtensionLoader.getExtensionLoader(class) .getExtension("dubbo"); -->extension.refer(type, url); -->QosProtocolWrapper.refer //這裏建立了一個過濾鏈 //buildInvokerChain(invoker, //"reference.filter","consumer") -->ProtocolFilterWrapper.refer //return new ListenerInvokerWrapper -->ProtocolListenerWrapper.refer //return new DubboInvoker -->DubboProtocol.refer -->destroyUnusedInvokers( oldUrlInvokerMap,newUrlInvokerMap); // 關閉未使用的Invoker //最終目的:刷新Map<String, Invoker<T>> urlInvokerMap 對象 ,刷新Map<String, List<Invoker<T>>> methodInvokerMap對象 -->cluster.join(directory)//加入集羣路由 -->ExtensionLoader.getExtensionLoader(Cluster.class) .getExtension("failover"); -->MockClusterWrapper.join -->this.cluster.join(directory) -->FailoverCluster.join -->return new FailoverClusterInvoker<T>(directory) -->new MockClusterInvoker //
返回的invoker對象 -------------------------------------------------------------------------------------------- -->proxyFactory.getProxy(invoker) //建立服務代理 -->ProxyFactory$Adaptive.getProxy -->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class) .getExtension("javassist"); -->StubProxyFactoryWrapper.getProxy(invoker) //進行了後置增強 -->AbstractProxyFactory.getProxy -->getProxy(invoker, interfaces) -->Proxy.getProxy(interfaces) -->JavassistProxyFactory.getProxy -->Proxy.getProxy(interfaces) //目前代理對象per.qiao.service.TestService //, interface com.alibaba.dubbo.rpc.service.EchoService -->newInstance(InvokerInvocationHandler(MockClusterInvoker)) //這個MockClusterInvoker是上面refprotocol.refer返回的invoker對象 //採用jdk自帶的InvocationHandler,建立InvokerInvocationHandler對象。
入口:
ReferenceConfig#init, ref = createProxy(map);
JavassistProxyFactory#getProxy會生成一個代理類
與其說生成一個代理類,倒不如說是兩個(具體在com.alibaba.dubbo.common.bytecode.Proxy#getProxy中)
一個是代理實現類clazz(由ccp生成,即下面的proxy0),一個是工廠類pc(由ccm生成,即下面的Proxy0)
// Body of JavassistProxyFactory#getProxy: obtain the generated Proxy for the given
// interfaces, then instantiate it with an InvokerInvocationHandler wrapping the invoker.
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
Proxy.getProxy方法會生成兩個類,並返回Proxy0;調用newInstance時,
會調用Proxy0#newInstance(handler),最終返回proxy0對象。
也就是說,調用方通過ReferenceBean.getObject拿到的就是這個proxy0對象。
/**
 * Factory half of the generated proxy pair: a {@code Proxy} subclass whose job is to
 * instantiate {@code proxy0}, the class that actually implements the service interfaces.
 */
public class Proxy0 extends Proxy {

    /** Delegates to the no-argument factory method inherited from {@code Proxy}. */
    @Override
    public Object newInstance() {
        return super.newInstance();
    }

    /**
     * Creates the interface-implementing proxy bound to the given handler.
     *
     * @param handler invocation handler that every proxied call is forwarded to
     * @return a new {@code proxy0} instance
     */
    @Override
    public Object newInstance(java.lang.reflect.InvocationHandler handler) {
        final Object instance = new proxy0(handler);
        return instance;
    }
}
具體操作類(注意:Proxy0 和 proxy0 這兩個類名只有首字母的大小寫不同)
import java.lang.reflect.InvocationHandler; /** * Create by IntelliJ Idea 2018.2 * * @author: qyp * Date: 2019-05-27 10:46 */ public class proxy0 implements com.alibaba.dubbo.rpc.service.EchoService, per.qiao.service.TestService { /** * 包含這兩個接口的實現方法,這裏爲($echo,getData,getList) */ public static java.lang.reflect.Method[] methods; /** * 這個hanlder就是上面執行過程refprotocol.refer返回的結果(MockClusterInvoker) */ private java.lang.reflect.InvocationHandler handler; public proxy0(InvocationHandler h) { this.handler = h; } // ---------這個方法是EchoService中的------------- @Override public Object $echo(java.lang.Object arg0) { Object[] args = new Object[1]; args[0] = arg0; Object ret = null; try { ret = handler.invoke(this, methods[2], args); } catch (Throwable throwable) { throwable.printStackTrace(); } return (java.lang.Object) ret; } // ----------下面兩個方法是服務引用的接口中的方法----------- @Override public java.lang.String getData(java.lang.String arg0) { Object[] args = new Object[1]; args[0] = arg0; Object ret = null; try { ret = handler.invoke(this, methods[0], args); } catch (Throwable throwable) { throwable.printStackTrace(); } return (java.lang.String) ret; } @Override public java.util.List getList() { Object[] args = new Object[0]; Object ret = null; try { ret = handler.invoke(this, methods[1], args); } catch (Throwable throwable) { throwable.printStackTrace(); } return (java.util.List) ret; } }
詳細說一下服務引用時,是怎麼和zookeeper產生聯繫的;
問題:如果服務端(生產端)已經啓動,客戶端(消費端)後啓動,此時zookeeper上的節點已經存在,節點沒有變化就不會通知到客戶端,那麼客戶端是怎麼刷新本地服務列表的?
RegistryProtocol
/**
 * Resolves the registry for the given url and delegates to doRefer to build the invoker.
 * Part of the method body is elided ("...") in this excerpt.
 */
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    // Switch the URL's protocol to the value of its REGISTRY_KEY parameter
    // (DEFAULT_REGISTRY when absent) and drop that parameter.
    url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
    // Turn the url into a Registry object.
    Registry registry = registryFactory.getRegistry(url);
    ...
    // type is the service interface.
    return doRefer(cluster, registry, type, url);
}

/**
 * Builds a RegistryDirectory for the service, registers the consumer URL when applicable,
 * subscribes to the providers/configurators/routers categories and joins the directory
 * into a cluster invoker.
 */
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    // Create the registry directory for this service type and url.
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    // all attributes of REFER_KEY
    Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
    // Consumer-protocol URL: host is the REGISTER_IP_KEY parameter (removed from the map),
    // port 0, path is the interface name.
    URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
    if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) {
        URL registeredConsumerUrl = getRegisteredConsumerUrl(subscribeUrl, url);
        // Register the consumer node.
        registry.register(registeredConsumerUrl);
        directory.setRegisteredConsumerUrl(registeredConsumerUrl);
    }
    // Subscribe to the providers, configurators and routers categories.
    directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY + "," + Constants.ROUTERS_CATEGORY));
    Invoker invoker = cluster.join(directory);
    // Record the subscription in the local provider/consumer registration table.
    ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
    return invoker;
}
cluster不是咱們要分析的重點,這裏不展開。
FailbackRegistry
/**
 * Reads the retry period from the url and schedules the periodic retry task.
 */
public FailbackRegistry(URL url) {
    super(url);
    this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
    // Retry failed URLs periodically: first run after retryPeriod ms, then retryPeriod ms
    // between runs (original note: 5 seconds by default).
    this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                retry();
            } catch (Throwable t) {
                ...
            }
        }
    }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
}

/**
 * Registers the url and records it for retry when registration fails.
 * Part of the failure handling is elided ("...") in this excerpt.
 */
public void register(URL url) {
    super.register(url);
    // Remove the url from the failed-registration lists.
    failedRegistered.remove(url);
    failedUnregistered.remove(url);
    try {
        // Send the registration request to the registry server.
        doRegister(url);
    } catch (Exception e) {
        // Original note: if the startup check (the "check" attribute) is enabled,
        // the exception is thrown directly.
        // NOTE(review): the elided code evidently opens an inner block that the next
        // brace closes, so the add(url) below runs only on failure - confirm against the
        // full source.
        ...
        }
        // Record the failed registration request in the failed list for periodic retry.
        failedRegistered.add(url);
    }
}
ZookeeperRegistry
用來建立消費者節點
/**
 * Creates the zookeeper node for the given url (used here to create the consumer node).
 * The failure handling is elided ("...") in this excerpt.
 */
protected void doRegister(URL url) {
    try {
        // Node path comes from toUrlPath(url); the second argument is the url's DYNAMIC_KEY
        // parameter (true when absent). NOTE(review): presumably selects ephemeral vs.
        // persistent nodes - confirm against the zkClient.create contract.
        zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
    } catch (Throwable e) {
        ...
    }
}
再來分析消費者訂閱
RegistryProtocol.doRefer
//訂閱 providers,configurators,routers這三個節點 directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY + "," + Constants.ROUTERS_CATEGORY));
RegistryDirectory.subscribe
/**
 * Records the subscription URL and subscribes this directory to the registry.
 *
 * @param url the URL to subscribe with
 */
public void subscribe(URL url) {
    // Remember the URL this consumer is currently subscribing with.
    setConsumerUrl(url);
    // Pass this object as the listener that receives the registry's notifications.
    registry.subscribe(url, this);
}
FailbackRegistry
/**
 * Subscribes the listener to the url; when the subscription fails, falls back to the
 * cached URLs and records the failure for retry.
 * Part of the failure handling is elided ("...") in this excerpt.
 */
public void subscribe(URL url, NotifyListener listener) {
    // Let the superclass record the callback listener for this subscription.
    super.subscribe(url, listener);
    // Remove any earlier failed-subscription record for this url/listener.
    removeFailedSubscribed(url, listener);
    try {
        // Original note: registers the client info in zookeeper, sets up listeners on the
        // three nodes and, along the way, refreshes the local registry table.
        doSubscribe(url, listener);
    } catch (Exception e) {
        Throwable t = e;
        // Subscription failed: refresh from the URLs returned by getCacheUrls instead.
        // Original note: the cache is only populated when notify runs (on consumer
        // registration or on a zookeeper notification), and this fallback is taken only
        // when the subscription fails.
        List<URL> urls = getCacheUrls(url);
        if (urls != null && !urls.isEmpty()) {
            notify(url, listener, urls);
        } else {
            ...
        }
        // Record a failed registration request to a failed list, retry regularly
        addFailedSubscribed(url, listener);
    }
}
ZookeeperRegistry
/**
 * Derives the root path from the url's group, connects to zookeeper and installs a state
 * listener that calls recover() on reconnect.
 */
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    super(url);
    String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
    // Make sure the group starts with the path separator before using it as the root.
    if (!group.startsWith(Constants.PATH_SEPARATOR)) {
        group = Constants.PATH_SEPARATOR + group;
    }
    this.root = group;
    // Connect to zookeeper.
    zkClient = zookeeperTransporter.connect(url);
    // Install the state listener.
    zkClient.addStateListener(new StateListener() {
        @Override
        public void stateChanged(int state) {
            if (state == RECONNECTED) {
                try {
                    recover();
                } catch (Exception e) {
                    // A failed recovery is logged, not rethrown.
                    logger.error(e.getMessage(), e);
                }
            }
        }
    });
}

/**
 * Installs a child listener on every category path of the url and then notifies the
 * listener with the children found at subscription time.
 * The first branch and the failure handling are elided ("...") in this excerpt.
 */
@Override
protected void doSubscribe(final URL url, final NotifyListener listener) {
    try {
        // NOTE(review): the elided code evidently opens the `if` whose `else` follows.
        ...
        } else {
            List<URL> urls = new ArrayList<URL>();
            // Iterate over the category paths to watch (three, per the original note).
            for (String path : toCategoriesPath(url)) {
                // Look up the listeners already cached for this url.
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                if (listeners == null) {
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                    listeners = zkListeners.get(url);
                }
                ChildListener zkListener = listeners.get(listener);
                // Not cached yet: create the child listener.
                if (zkListener == null) {
                    listeners.putIfAbsent(listener, new ChildListener() {
                        @Override
                        public void childChanged(String parentPath, List<String> currentChilds) {
                            // The callback forwards the current children to this registry's notify.
                            ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                        }
                    });
                    zkListener = listeners.get(listener);
                }
                // Create the watched category node.
                zkClient.create(path, false);
                List<String> children = zkClient.addChildListener(path, zkListener);
                if (children != null) {
                    urls.addAll(toUrlsWithEmpty(url, path, children));
                }
            }
            // After installing the child listeners, notify immediately with the collected
            // urls so the local list is refreshed without waiting for a zookeeper event.
            notify(url, listener, urls);
        }
    } catch (Throwable e) {
        ...
    }
}

/**
 * Delegates to doNotify; the failure handling is elided ("...") in this excerpt.
 * NOTE(review): the call flow at the top of this article lists this method as
 * FailbackRegistry.notify - confirm which class declares it.
 */
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    try {
        doNotify(url, listener, urls);
    } catch (Exception t) {
        ...
    }
}

/** Hands the notification to the superclass notify. */
protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
    super.notify(url, listener, urls);
}
AbstractRegistry
protected void notify(URL url, NotifyListener listener, List<URL> urls) { Map<String, List<URL>> result = new HashMap<String, List<URL>>(); // 遍歷監聽的URL 3個 添加到result for (URL u : urls) { if (UrlUtils.isMatch(url, u)) { String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); List<URL> categoryList = result.get(category); if (categoryList == null) { categoryList = new ArrayList<URL>(); result.put(category, categoryList); } categoryList.add(u); } } if (result.size() == 0) { return; } Map<String, List<URL>> categoryNotified = notified.get(url); if (categoryNotified == null) { notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>()); categoryNotified = notified.get(url); } for (Map.Entry<String, List<URL>> entry : result.entrySet()) { String category = entry.getKey(); List<URL> categoryList = entry.getValue(); categoryNotified.put(category, categoryList); //將監聽過的URL保存到本地文件 saveProperties(url); //刷新本地註冊表 listener.notify(categoryList); } }
RegistryDirectory
/**
 * Splits the notified URLs into provider, router and configurator URLs, applies the
 * configurators and routers, recomputes the overridden directory url and refreshes the
 * invokers from the provider URLs.
 */
public synchronized void notify(List<URL> urls) {
    // Buckets for the provider, router and configurator URLs respectively.
    List<URL> invokerUrls = new ArrayList<URL>();
    List<URL> routerUrls = new ArrayList<URL>();
    List<URL> configuratorUrls = new ArrayList<URL>();
    for (URL url : urls) {
        String protocol = url.getProtocol();
        String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
        // Classify by category; router and configurator URLs are also recognised by protocol.
        if (Constants.ROUTERS_CATEGORY.equals(category) || Constants.ROUTE_PROTOCOL.equals(protocol)) {
            routerUrls.add(url);
        } else if (Constants.CONFIGURATORS_CATEGORY.equals(category) || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
            configuratorUrls.add(url);
        } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
            invokerUrls.add(url);
        } else {
            logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
        }
    }
    // configurators
    if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
        this.configurators = toConfigurators(configuratorUrls);
    }
    // routers
    if (routerUrls != null && !routerUrls.isEmpty()) {
        List<Router> routers = toRouters(routerUrls);
        if (routers != null) { // null - do nothing
            setRouters(routers);
        }
    }
    List<Configurator> localConfigurators = this.configurators; // local reference
    // merge override parameters: start from directoryUrl and apply each configurator in turn
    this.overrideDirectoryUrl = directoryUrl;
    if (localConfigurators != null && !localConfigurators.isEmpty()) {
        for (Configurator configurator : localConfigurators) {
            this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
        }
    }
    // providers
    refreshInvoker(invokerUrls);
}

/**
 * Refreshes the local registry table (urlInvokerMap and methodInvokerMap) from the given
 * provider URLs.
 */
private void refreshInvoker(List<URL> invokerUrls) {
    if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
        // A single URL with the EMPTY_PROTOCOL: forbid access and tear everything down.
        this.forbidden = true; // Forbid to access
        this.methodInvokerMap = null; // Set the method invoker map to null
        destroyAllInvokers(); // Close all invokers
    } else {
        this.forbidden = false; // Allow to access
        Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
        // No provider URLs in this notification: reuse the cached ones when available;
        // otherwise replace the cache with the URLs just received.
        if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
            invokerUrls.addAll(this.cachedInvokerUrls);
        } else {
            this.cachedInvokerUrls = new HashSet<URL>();
            this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
        }
        if (invokerUrls.isEmpty()) {
            return;
        }
        // Convert the URLs into Invoker objects. Original note: only provider URLs can
        // produce invokers, and each result is a filter chain ending in a DubboInvoker.
        Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);
        // Convert into a map from method name to invokers.
        Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap);
        // state change
        // If the calculation is wrong, it is not processed.
        if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
            logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
            return;
        }
        this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
        this.urlInvokerMap = newUrlInvokerMap;
        try {
            // Destroy the invokers that are no longer used; a failure is only logged.
            destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
        } catch (Exception e) {
            logger.warn("destroyUnusedInvokers error. ", e);
        }
    }
}
客戶端在引用服務的時候會將消費端信息註冊到zookeeper(也能夠是別的註冊中心)節點上,順便監聽了providers,configurators,routers這三個節點,而後調用了RegistryDirectory.notify刷新本地註冊表,返回的結果(引用對象)爲MockClusterInvoker,其中包含了RegistryDirectory對象
小結:
1. 註冊到zookeeper,並訂閱providers,configurators和routers節點 2. 經過refprotocol.refer獲取的invoker對象是MockClusterInvoker(默認包裝了FailoverClusterInvoker) 3. ReferenceBean#getObject獲取的對象是上面的proxy0對象, 依賴了(2)中的MockClusterInvoker