Preparation:
First, start two providers:
Let's look at the inheritance and implementation hierarchy of ReferenceBean:
When DemoService demoService = (DemoService) context.getBean("demoService") is executed, the bean is obtained through FactoryBean.getObject, because ReferenceBean is a FactoryBean.
Here is the core code of ReferenceBean:
public Object getObject() throws Exception {
    return get();
}

public synchronized T get() {
    if (destroyed) {
        throw new IllegalStateException("Already destroyed!");
    }
    if (ref == null) {
        init();
    }
    return ref;
}

private void init() {
    ...
    ref = createProxy(map);
}

private T createProxy(Map<String, String> map) {
    ...
    if (urls.size() == 1) {
        invoker = refprotocol.refer(interfaceClass, urls.get(0));
    }
    ...
    // create the service proxy
    return (T) proxyFactory.getProxy(invoker);
}
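The two most important lines above are invoker = refprotocol.refer(interfaceClass, urls.get(0)) and return (T) proxyFactory.getProxy(invoker).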
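To make the get()/init() behavior concrete, here is a minimal sketch of the same lazy-initialization pattern. LazyReference and proxyCreator are hypothetical names invented for this illustration; they are not Dubbo classes, and the Supplier merely stands in for createProxy(map).

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for the get()/init() pair; not a Dubbo class. */
final class LazyReference<T> {
    private final Supplier<T> proxyCreator; // plays the role of createProxy(map)
    private T ref;
    private boolean destroyed;

    LazyReference(Supplier<T> proxyCreator) {
        this.proxyCreator = proxyCreator;
    }

    /** Creates the reference on the first call and returns the cached one afterwards. */
    synchronized T get() {
        if (destroyed) {
            throw new IllegalStateException("Already destroyed!");
        }
        if (ref == null) {
            ref = proxyCreator.get();
        }
        return ref;
    }

    /** Marks the reference as unusable; later get() calls fail. */
    synchronized void destroy() {
        destroyed = true;
        ref = null;
    }
}
```

Because get() is synchronized and checks ref first, the expensive creation step runs at most once no matter how many times the bean is requested.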
I. Using Protocol to convert interfaceClass into an Invoker
invoker = refprotocol.refer(interfaceClass, urls.get(0))
Here refprotocol is a Protocol$Adaptive instance.
public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
    ...
    public com.alibaba.dubbo.rpc.Invoker refer(Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg1 == null)
            throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg1;
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.refer(arg0, arg1);
    }
    ...
}
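In Protocol$Adaptive.refer, extName="registry". The call then passes through ProtocolListenerWrapper.refer -> ProtocolFilterWrapper.refer -> RegistryProtocol.refer; the first two steps do nothing for the registry protocol. Here is the core code of RegistryProtocol.refer: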
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
    Registry registry = registryFactory.getRegistry(url);
    ...
    return doRefer(cluster, registry, type, url);
}
Parameters:
After the first statement (the url.setProtocol call) has run, the protocol has been replaced, and the url is now:
zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.0&pid=25267&refer=application=demo-consumer&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25267&register.ip=10.10.10.10&side=consumer&timestamp=1510225913509&timestamp=1510225984358
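The protocol replacement can be sketched in a few lines. This is only an illustration under stated assumptions: RegistryUrlRewrite and its methods are hypothetical helpers working on plain strings and maps, not Dubbo's URL class, and the fallback value "dubbo" is my assumption for what Constants.DEFAULT_REGISTRY resolves to.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper; it models the URL as host-and-path plus a parameter map. */
final class RegistryUrlRewrite {
    /** Returns the protocol named by the "registry" parameter (assumed default: "dubbo"). */
    static String resolveProtocol(Map<String, String> params) {
        return params.getOrDefault("registry", "dubbo");
    }

    /** Rebuilds the URL with the new protocol and without the "registry" parameter. */
    static String rewrite(String hostAndPath, Map<String, String> params) {
        String protocol = resolveProtocol(params);
        Map<String, String> remaining = new LinkedHashMap<>(params);
        remaining.remove("registry");
        StringBuilder sb = new StringBuilder(protocol).append("://").append(hostAndPath);
        char separator = '?';
        for (Map.Entry<String, String> e : remaining.entrySet()) {
            sb.append(separator).append(e.getKey()).append('=').append(e.getValue());
            separator = '&';
        }
        return sb.toString();
    }
}
```

With registry=zookeeper among the parameters, a registry:// address comes out as a zookeeper:// address, which matches the url shown above.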
Next the Registry is obtained. Here registryFactory is a RegistryFactory$Adaptive instance.
public class RegistryFactory$Adaptive implements com.alibaba.dubbo.registry.RegistryFactory {
    public com.alibaba.dubbo.registry.Registry getRegistry(com.alibaba.dubbo.common.URL arg0) {
        if (arg0 == null)
            throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg0;
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol()); // zookeeper
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.registry.RegistryFactory) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.registry.RegistryFactory extension = (com.alibaba.dubbo.registry.RegistryFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.registry.RegistryFactory.class).getExtension(extName);
        return extension.getRegistry(arg0);
    }
}
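Both Protocol$Adaptive and RegistryFactory$Adaptive follow the same idea: read a name from the URL, look up the extension with that name, and delegate to it. The sketch below shows that dispatch with plain Java types. AdaptiveDispatch, protocolOf, and dispatch are hypothetical names, and a simple Map replaces ExtensionLoader, so this is an analogy rather than Dubbo's generated code.

```java
import java.util.Map;
import java.util.function.Function;

/** Hypothetical illustration of name-based dispatch; a Map stands in for ExtensionLoader. */
final class AdaptiveDispatch {
    /** Extracts the protocol of "protocol://rest", or null when there is none. */
    static String protocolOf(String url) {
        int i = url.indexOf("://");
        return i < 0 ? null : url.substring(0, i);
    }

    /** Picks the extension by protocol name, falling back to "dubbo", and delegates to it. */
    static <R> R dispatch(Map<String, Function<String, R>> extensions, String url) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        String protocol = protocolOf(url);
        String extName = protocol == null ? "dubbo" : protocol;
        Function<String, R> extension = extensions.get(extName);
        if (extension == null) {
            throw new IllegalStateException("No extension named " + extName);
        }
        return extension.apply(url);
    }
}
```

The point of the design is that the caller never names a concrete implementation; the URL alone decides which one handles the call.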
In RegistryFactory$Adaptive, extName is zookeeper. Next, AbstractRegistryFactory.getRegistry, inherited by ZookeeperRegistryFactory from its parent class, is executed:
public Registry getRegistry(URL url) {
    url = url.setPath(RegistryService.class.getName())
            .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
            .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
    String key = url.toServiceString();
    // lock the registry lookup so that only a single registry instance is created
    LOCK.lock();
    try {
        Registry registry = REGISTRIES.get(key);
        if (registry != null) {
            return registry;
        }
        registry = createRegistry(url);
        if (registry == null) {
            throw new IllegalStateException("Can not create registry " + url);
        }
        REGISTRIES.put(key, registry);
        return registry;
    } finally {
        // release the lock
        LOCK.unlock();
    }
}
The processed url is:
zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.0&interface=com.alibaba.dubbo.registry.RegistryService&pid=25267&timestamp=1510225984358
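The lock-then-check-cache shape of getRegistry is what guarantees one registry instance per key. A minimal sketch of that pattern follows; CachingFactory and creator are hypothetical names, and a String key replaces url.toServiceString().

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

/** Hypothetical cache that creates at most one value per key, guarded by a lock. */
final class CachingFactory<V> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Map<String, V> cache = new HashMap<>();
    private final Function<String, V> creator; // plays the role of createRegistry(url)

    CachingFactory(Function<String, V> creator) {
        this.creator = creator;
    }

    V get(String key) {
        lock.lock();
        try {
            V value = cache.get(key);
            if (value != null) {
                return value; // already created for this key
            }
            value = creator.apply(key);
            if (value == null) {
                throw new IllegalStateException("Can not create registry " + key);
            }
            cache.put(key, value);
            return value;
        } finally {
            lock.unlock(); // always release, even when creation throws
        }
    }
}
```

Two lookups with the same key return the same object, and the creator runs only once.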
Then ZookeeperRegistryFactory.createRegistry(URL url) is called:
public Registry createRegistry(URL url) {
    return new ZookeeperRegistry(url, zookeeperTransporter);
}
Here zookeeperTransporter is a ZookeeperTransporter$Adaptive instance.
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    super(url);
    if (url.isAnyHost()) {
        throw new IllegalStateException("registry address == null");
    }
    String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
    if (!group.startsWith(Constants.PATH_SEPARATOR)) {
        group = Constants.PATH_SEPARATOR + group;
    }
    this.root = group;
    zkClient = zookeeperTransporter.connect(url);
    zkClient.addStateListener(new StateListener() {
        public void stateChanged(int state) {
            if (state == RECONNECTED) {
                try {
                    recover();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    });
}
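The super(url) call runs the constructors of ZookeeperRegistry's parent classes: FailbackRegistry (which starts the failure-retry handler for failed registration, unregistration, subscription, unsubscription, and notification) and AbstractRegistry (which writes the information to a properties file and sends the corresponding notifications; since the url has no subscribers at this point, it does nothing here).
Next the ZkClient client is obtained, and finally the reconnect listener is added.
When zookeeperTransporter.connect(url) is executed, extName in that adaptive class is "zkclient" (in the provider part we used curator). Then ZkclientZookeeperTransporter.connect is executed: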
public ZookeeperClient connect(URL url) {
    return new ZkclientZookeeperClient(url);
}

public ZkclientZookeeperClient(URL url) {
    super(url);
    client = new ZkClientWrapper(url.getBackupAddress(), 30000);
    client.addListener(new IZkStateListener() {
        public void handleStateChanged(KeeperState state) throws Exception {
            ZkclientZookeeperClient.this.state = state;
            if (state == KeeperState.Disconnected) {
                stateChanged(StateListener.DISCONNECTED);
            } else if (state == KeeperState.SyncConnected) {
                stateChanged(StateListener.CONNECTED);
            }
        }

        public void handleNewSession() throws Exception {
            stateChanged(StateListener.RECONNECTED);
        }
    });
    client.start();
}
Here client is a ZkClientWrapper instance. Let's look at ZkClientWrapper.start():
private ListenableFutureTask<ZkClient> listenableFutureTask;

public ZkClientWrapper(final String serverAddr, long timeout) {
    this.timeout = timeout;
    listenableFutureTask = ListenableFutureTask.create(new Callable<ZkClient>() {
        @Override
        public ZkClient call() throws Exception {
            return new ZkClient(serverAddr, Integer.MAX_VALUE);
        }
    });
}

public void start() {
    if (!started) {
        Thread connectThread = new Thread(listenableFutureTask);
        connectThread.setName("DubboZkclientConnector");
        connectThread.setDaemon(true);
        connectThread.start();
        try {
            client = listenableFutureTask.get(timeout, TimeUnit.MILLISECONDS);
        } catch (Throwable t) {
            logger.error("Timeout! zookeeper server can not be connected in : " + timeout + "ms!", t);
        }
        started = true;
    } else {
        logger.warn("Zkclient has already been started!");
    }
}
This is where new ZkClient is called to connect to zookeeper.
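The connect-on-a-daemon-thread-with-timeout idea in start() can be reproduced with the JDK alone. The sketch below swaps the ListenableFutureTask used above for java.util.concurrent.FutureTask; TimedConnector is a hypothetical class, and returning null on failure is a simplification I chose for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical connector: runs a blocking connect call on a daemon thread with a timeout. */
final class TimedConnector<C> {
    private final FutureTask<C> task;
    private final long timeoutMillis;

    TimedConnector(Callable<C> connect, long timeoutMillis) {
        this.task = new FutureTask<>(connect);
        this.timeoutMillis = timeoutMillis;
    }

    /** Returns the connection, or null if connecting fails or takes too long. */
    C start() {
        Thread connectThread = new Thread(task, "connector");
        connectThread.setDaemon(true); // a stuck connect must not keep the JVM alive
        connectThread.start();
        try {
            return task.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        } catch (ExecutionException | TimeoutException e) {
            return null;
        }
    }
}
```

The caller waits at most timeoutMillis, while the connect attempt itself keeps running in the background.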
Then the reconnect listener is set. At this point the Registry has been created. Now back to the core code of RegistryProtocol.refer:
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
    Registry registry = registryFactory.getRegistry(url);
    ...
    return doRefer(cluster, registry, type, url);
}
Next the last statement, the doRefer call, is executed:
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    // all attributes of REFER_KEY
    Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
    URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
    if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
            && url.getParameter(Constants.REGISTER_KEY, true)) {
        registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                Constants.CHECK_KEY, String.valueOf(false)));
    }
    directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
            Constants.PROVIDERS_CATEGORY
                    + "," + Constants.CONFIGURATORS_CATEGORY
                    + "," + Constants.ROUTERS_CATEGORY));
    return cluster.join(directory);
}
Overall steps:
First a RegistryDirectory is created. The resulting instance:
-->List<Router> routers: [MockInvokersSelector instance]
-->Registry registry: the ZookeeperRegistry instance described above (zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.0&interface=com.alibaba.dubbo.registry.RegistryService&pid=25267&timestamp=1510225984358)
-->String serviceKey: com.alibaba.dubbo.registry.RegistryService
-->String[] serviceMethods: [sayHello]
-->Class<T> serviceType: interface com.alibaba.dubbo.demo.DemoService
-->URL url: zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.0&pid=25267&refer=application=demo-consumer&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25267&register.ip=10.10.10.10&side=consumer&timestamp=1510225913509&timestamp=1510225984358
-->URL consumerUrl: zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.0&pid=25267&refer=application=demo-consumer&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25267&register.ip=10.10.10.10&side=consumer&timestamp=1510225913509&timestamp=1510225984358
-->URL directoryUrl: zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25267&register.ip=10.10.10.10&side=consumer&timestamp=1510225913509
-->URL overrideDirectoryUrl: zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25267&register.ip=10.10.10.10&side=consumer&timestamp=1510225913509
-->Map<String, String> queryMap: {side=consumer, application=demo-consumer, register.ip=10.10.10.10, methods=sayHello, dubbo=2.0.0, pid=25267, check=false, interface=com.alibaba.dubbo.demo.DemoService, timestamp=1510225913509}
The Router is created in AbstractDirectory, the parent class of RegistryDirectory. The code is:
public AbstractDirectory(URL url, URL consumerUrl, List<Router> routers) {
    if (url == null)
        throw new IllegalArgumentException("url == null");
    this.url = url;
    this.consumerUrl = consumerUrl;
    setRouters(routers);
}

protected void setRouters(List<Router> routers) {
    // copy list
    routers = routers == null ? new ArrayList<Router>() : new ArrayList<Router>(routers);
    // append url router
    String routerkey = url.getParameter(Constants.ROUTER_KEY);
    if (routerkey != null && routerkey.length() > 0) {
        RouterFactory routerFactory = ExtensionLoader.getExtensionLoader(RouterFactory.class).getExtension(routerkey);
        routers.add(routerFactory.getRouter(url));
    }
    // append mock invoker selector
    routers.add(new MockInvokersSelector());
    Collections.sort(routers);
    this.routers = routers;
}
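Next the consumer is registered with the registry, in the same way as a service provider. FailbackRegistry.register is called first; it internally calls doRegister() of the subclass ZookeeperRegistry. If that fails, the url is added to the failed-registration list, and a background repair thread re-registers it later.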
public void register(URL url) {
    if (destroyed.get()) {
        return;
    }
    super.register(url);
    failedRegistered.remove(url);
    failedUnregistered.remove(url);
    try {
        // send the registration request to the server
        doRegister(url);
    } catch (Exception e) {
        Throwable t = e;

        // if the startup check is enabled, throw the exception directly
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true)
                && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        if (check || skipFailback) {
            if (skipFailback) {
                t = t.getCause();
            }
            throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
        } else {
            logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }

        // record the failed registration request in the failed list and retry periodically
        failedRegistered.add(url);
    }
}
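Stripped of the check logic, the failback idea is: try once, and if that fails, remember the url so it can be retried later. The sketch below shows only that core. FailbackRegistrar, retry, and pending are hypothetical names; in this sketch retry() is called by hand, whereas the text above describes a background thread doing it.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical sketch of register-with-retry; the startup check is deliberately left out. */
final class FailbackRegistrar {
    private final Set<String> failed = ConcurrentHashMap.newKeySet();
    private final Consumer<String> doRegister; // plays the role of doRegister(url)

    FailbackRegistrar(Consumer<String> doRegister) {
        this.doRegister = doRegister;
    }

    /** Tries to register; on failure the url is remembered for a later retry. */
    void register(String url) {
        failed.remove(url);
        try {
            doRegister.accept(url);
        } catch (RuntimeException e) {
            failed.add(url);
        }
    }

    /** Retries every remembered url once and forgets the ones that now succeed. */
    void retry() {
        for (String url : failed) {
            try {
                doRegister.accept(url);
                failed.remove(url);
            } catch (RuntimeException e) {
                // still failing: keep it for the next round
            }
        }
    }

    int pending() {
        return failed.size();
    }
}
```

A consumer can therefore start even while the registry is unreachable, and its registration catches up once the registry is back.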
Finally, here is the doRegister method of ZookeeperRegistry:
protected void doRegister(URL url) {
    try {
        zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
An ephemeral node is created in zk: /dubbo/com.alibaba.dubbo.demo.DemoService/consumers/consumer://10.10.10.10/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=consumers&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25267&side=consumer&timestamp=1510225913509
At this point consumer registration is complete. Next, directory.subscribe performs the subscription. RegistryDirectory.subscribe(URL url):
public void subscribe(URL url) {
    setConsumerUrl(url);
    registry.subscribe(url, this);
}
Core code of FailbackRegistry.subscribe(URL url, NotifyListener listener):
public void subscribe(URL url, NotifyListener listener) {
    ...
    super.subscribe(url, listener);
    removeFailedSubscribed(url, listener);
    try {
        // send the subscription request to the server
        doSubscribe(url, listener);
    } catch (Exception e) {
        ...
        // record the failed subscription request in the failed list and retry periodically
        addFailedSubscribed(url, listener);
    }
}
ZookeeperRegistry.doSubscribe(final URL url, final NotifyListener listener)
protected void doSubscribe(final URL url, final NotifyListener listener) {
    try {
        ...
        List<URL> urls = new ArrayList<URL>();
        for (String path : toCategoriesPath(url)) {
            ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
            if (listeners == null) {
                zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                listeners = zkListeners.get(url);
            }
            ChildListener zkListener = listeners.get(listener);
            if (zkListener == null) {
                listeners.putIfAbsent(listener, new ChildListener() {
                    public void childChanged(String parentPath, List<String> currentChilds) {
                        ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                    }
                });
                zkListener = listeners.get(listener);
            }
            zkClient.create(path, false);
            List<String> children = zkClient.addChildListener(path, zkListener);
            if (children != null) {
                urls.addAll(toUrlsWithEmpty(url, path, children));
            }
        }
        notify(url, listener, urls);
    } catch (Throwable e) {
        throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
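The for loop here runs three times, once per subscribed category (providers, configurators, and routers).
After the loop has finished, the state is as follows.
ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners: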
{
    consumer://10.10.10.10/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=providers,configurators,routers&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25267&side=consumer&timestamp=1510225913509
    =
    {RegistryDirectory instance=instance of the anonymous inner ChildListener class in ZookeeperRegistry}
}
List<URL> urls (4 elements):
[
    dubbo://10.211.55.5:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.5.7&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=318&revision=2.5.7&side=provider&timestamp=1510225244315,
    dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25215&side=provider&timestamp=1510225334486,
    empty://10.10.10.10/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=configurators&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25267&side=consumer&timestamp=1510225913509,
    empty://10.10.10.10/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=routers&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25267&side=consumer&timestamp=1510225913509
]
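Note: the first two elements come from executing List<String> children = zkClient.addChildListener(path, zkListener), which returns the nodes currently under path (this is effectively the first service discovery).
Execution then continues all the way to AbstractRegistry.notify(URL url, NotifyListener listener, List<URL> urls):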
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    ...
    Map<String, List<URL>> result = new HashMap<String, List<URL>>();
    for (URL u : urls) {
        if (UrlUtils.isMatch(url, u)) {
            String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
            List<URL> categoryList = result.get(category);
            if (categoryList == null) {
                categoryList = new ArrayList<URL>();
                result.put(category, categoryList);
            }
            categoryList.add(u);
        }
    }
    if (result.size() == 0) {
        return;
    }
    Map<String, List<URL>> categoryNotified = notified.get(url);
    if (categoryNotified == null) {
        notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
        categoryNotified = notified.get(url);
    }
    for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
        String category = entry.getKey();
        List<URL> categoryList = entry.getValue();
        categoryNotified.put(category, categoryList);
        saveProperties(url);
        listener.notify(categoryList);
    }
}
First, a for loop groups the incoming url list by category. The grouped result is:
Map<String, List<URL>> result:
{
    configurators=[
        empty://10.10.10.10/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=configurators&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25267&side=consumer&timestamp=1510225913509
    ],
    routers=[
        empty://10.10.10.10/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=routers&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25267&side=consumer&timestamp=1510225913509
    ],
    providers=[
        dubbo://10.211.55.5:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.5.7&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=318&revision=2.5.7&side=provider&timestamp=1510225244315,
        dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25215&side=provider&timestamp=1510225334486
    ]
}
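The grouping step can be reproduced on plain strings. In the sketch below, CategoryGrouping is a hypothetical helper, the urls are shortened, and the fallback category "providers" is my assumption for the value of Constants.DEFAULT_CATEGORY (it is consistent with the result above, where the provider urls carry no category parameter).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper that groups url strings by their "category" query parameter. */
final class CategoryGrouping {
    /** Reads the "category" parameter, defaulting to "providers" when it is absent. */
    static String categoryOf(String url) {
        int q = url.indexOf('?');
        if (q >= 0) {
            for (String pair : url.substring(q + 1).split("&")) {
                if (pair.startsWith("category=")) {
                    return pair.substring("category=".length());
                }
            }
        }
        return "providers";
    }

    /** Builds category -> urls, keeping the order in which categories first appear. */
    static Map<String, List<String>> group(List<String> urls) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String url : urls) {
            result.computeIfAbsent(categoryOf(url), k -> new ArrayList<>()).add(url);
        }
        return result;
    }
}
```

Feeding it the four urls from the subscription yields two entries under providers and one each under configurators and routers.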
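Then the second for loop iterates over result, saving the properties file and sending a notification for each entry. The first two entries do nothing essential, so let's go straight to the notification for the providers entry, in RegistryDirectory.notify(List<URL> urls). Here urls holds the two values of the providers entry above.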
public synchronized void notify(List<URL> urls) {
    List<URL> invokerUrls = new ArrayList<URL>();
    List<URL> routerUrls = new ArrayList<URL>();
    List<URL> configuratorUrls = new ArrayList<URL>();
    for (URL url : urls) {
        String protocol = url.getProtocol();
        String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
        if (Constants.ROUTERS_CATEGORY.equals(category)
                || Constants.ROUTE_PROTOCOL.equals(protocol)) {
            routerUrls.add(url);
        } else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
                || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
            configuratorUrls.add(url);
        } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
            invokerUrls.add(url);
        } else {
            logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
        }
    }
    // configurators
    if (configuratorUrls != null && configuratorUrls.size() > 0) {
        this.configurators = toConfigurators(configuratorUrls);
    }
    // routers
    if (routerUrls != null && routerUrls.size() > 0) {
        List<Router> routers = toRouters(routerUrls);
        if (routers != null) { // null - do nothing
            setRouters(routers);
        }
    }
    List<Configurator> localConfigurators = this.configurators; // local reference
    // merge override parameters
    this.overrideDirectoryUrl = directoryUrl;
    if (localConfigurators != null && localConfigurators.size() > 0) {
        for (Configurator configurator : localConfigurators) {
            this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
        }
    }
    // providers
    refreshInvoker(invokerUrls);
}
The two incoming provider urls are first stored in the invokerUrls list, and then refreshInvoker(invokerUrls) is called.
private void refreshInvoker(List<URL> invokerUrls) {
    ...
    Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
    ...
    this.cachedInvokerUrls = new HashSet<URL>();
    this.cachedInvokerUrls.addAll(invokerUrls); // cache the invokerUrls list for later comparison
    ...
    Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls); // convert the URL list into an Invoker map
    Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // map method names to Invoker lists
    this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
    this.urlInvokerMap = newUrlInvokerMap;
    ...
}

private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
    Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
    ...
    for (URL providerUrl : urls) {
        String key = url.toFullString(); // URL parameters are sorted
        ...
        Invoker<T> invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);

        newUrlInvokerMap.put(key, invoker);
    }
    ...
    return newUrlInvokerMap;
}
Two providerUrls are iterated here. protocol is a Protocol$Adaptive instance, and the call again goes listener -> filter -> DubboProtocol. Here is the filter part:
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
        return protocol.refer(type, url);
    }
    return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
}
The two constants are reference.filter and consumer. Finally, here is DubboProtocol.refer:
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
    // create rpc invoker.
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    invokers.add(invoker);
    return invoker;
}
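getClients is executed first to create the Netty client and establish a long-lived connection between client and server; the result is then wrapped in a DubboInvoker and returned. After it returns, the DubboInvoker instance is wrapped by the filter chain, and the DubboInvoker with its filter chain is in turn wrapped by InvokerDelegete. Finally, the InvokerDelegete instance is placed in the newUrlInvokerMap cache. That is the whole logic of toInvokers(List<URL> urls). newUrlInvokerMap is then converted into the Map<String, List<Invoker<T>>> newMethodInvokerMap cache, which completes the logic of refreshInvoker(List<URL> invokerUrls). Once that is done, the subscription notification has finished.
Let's look at getClients(url):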
private ExchangeClient[] getClients(URL url) {
    // whether to share the connection
    boolean service_share_connect = false;
    int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
    // if connections is not configured, share the connection; otherwise one connection per service
    if (connections == 0) {
        service_share_connect = true;
        connections = 1;
    }

    ExchangeClient[] clients = new ExchangeClient[connections];
    for (int i = 0; i < clients.length; i++) {
        if (service_share_connect) {
            clients[i] = getSharedClient(url);
        } else {
            clients[i] = initClient(url);
        }
    }
    return clients;
}

/**
 * Get the shared connection.
 */
private ExchangeClient getSharedClient(URL url) {
    String key = url.getAddress();
    ReferenceCountExchangeClient client = referenceClientMap.get(key);
    if (client != null) {
        if (!client.isClosed()) {
            client.incrementAndGetCount();
            return client;
        } else {
            referenceClientMap.remove(key);
        }
    }
    synchronized (key.intern()) {
        ExchangeClient exchangeClient = initClient(url);
        client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
        referenceClientMap.put(key, client);
        ghostClientMap.remove(key);
        return client;
    }
}

/**
 * Create a new connection.
 */
private ExchangeClient initClient(URL url) {

    // client type setting.
    String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));

    String version = url.getParameter(Constants.DUBBO_VERSION_KEY);
    boolean compatible = (version != null && version.startsWith("1.0."));
    url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
    // heartbeat is enabled by default
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

    // BIO has serious performance problems and is not allowed for now
    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
        throw new RpcException("Unsupported client type: " + str + "," +
                " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
    }

    ExchangeClient client;
    try {
        // whether the connection should be lazy
        if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
            client = new LazyConnectExchangeClient(url, requestHandler);
        } else {
            client = Exchangers.connect(url, requestHandler);
        }
    } catch (RemotingException e) {
        throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
    }
    return client;
}
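Note: because a shared connection is used here, only one long-lived NIO connection is established between a consumer machine and a provider machine. A connection count can also be specified, in which case multiple connections are established.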
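The sharing rule boils down to one reference-counted client per provider address. Here is a minimal sketch of that bookkeeping. SharedClients and CountedClient are hypothetical names, no real network connection is opened, and a single synchronized method replaces the finer-grained locking in getSharedClient.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: one reference-counted client object per provider address. */
final class SharedClients {
    static final class CountedClient {
        final String address;
        final AtomicInteger references = new AtomicInteger(1);

        CountedClient(String address) {
            this.address = address;
        }
    }

    private final Map<String, CountedClient> clients = new HashMap<>();

    /** Returns the single client for this address, creating it on first use. */
    synchronized CountedClient acquire(String address) {
        CountedClient client = clients.get(address);
        if (client != null) {
            client.references.incrementAndGet(); // another service reuses the connection
            return client;
        }
        client = new CountedClient(address);
        clients.put(address, client);
        return client;
    }
}
```

Two services referencing the same provider address end up holding the same client with a count of 2, while a different address gets its own client.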
Execution finally reaches HeaderExchanger.connect(URL url, ExchangeHandler handler):
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
Transporters.connect is executed:
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    ChannelHandler handler;
    if (handlers == null || handlers.length == 0) {
        handler = new ChannelHandlerAdapter();
    } else if (handlers.length == 1) {
        handler = handlers[0];
    } else {
        handler = new ChannelHandlerDispatcher(handlers);
    }
    return getTransporter().connect(url, handler);
}
NettyTransporter.connect is executed:
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
    return new NettyClient(url, listener);
}

public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
    super(url, wrapChannelHandler(url, handler));
}
Here the handler is wrapped further, in 6 layers, the same as on the provider side. After a series of assignments, the netty client is opened:
protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();
    bootstrap = new ClientBootstrap(channelFactory);
    // config
    // @see org.jboss.netty.channel.socket.SocketChannelConfig
    bootstrap.setOption("keepAlive", true);
    bootstrap.setOption("tcpNoDelay", true);
    bootstrap.setOption("connectTimeoutMillis", getTimeout());
    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        public ChannelPipeline getPipeline() {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
            ChannelPipeline pipeline = Channels.pipeline();
            pipeline.addLast("decoder", adapter.getDecoder());
            pipeline.addLast("encoder", adapter.getEncoder());
            pipeline.addLast("handler", nettyHandler);
            return pipeline;
        }
    });
}
Then the client connects to the netty server:
protected void doConnect() throws Throwable {
    long start = System.currentTimeMillis();
    ChannelFuture future = bootstrap.connect(getConnectAddress());
    try {
        boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);

        if (ret && future.isSuccess()) {
            Channel newChannel = future.getChannel();
            newChannel.setInterestOps(Channel.OP_READ_WRITE);
            try {
                // close the old connection
                Channel oldChannel = NettyClient.this.channel; // copy reference
                if (oldChannel != null) {
                    try {
                        if (logger.isInfoEnabled()) {
                            logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
                        }
                        oldChannel.close();
                    } finally {
                        NettyChannel.removeChannelIfDisconnected(oldChannel);
                    }
                }
            } finally {
                if (NettyClient.this.isClosed()) {
                    try {
                        if (logger.isInfoEnabled()) {
                            logger.info("Close new netty channel " + newChannel + ", because the client closed.");
                        }
                        newChannel.close();
                    } finally {
                        NettyClient.this.channel = null;
                        NettyChannel.removeChannelIfDisconnected(newChannel);
                    }
                } else {
                    NettyClient.this.channel = newChannel;
                }
            }
        } else if (future.getCause() != null) {
            throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                    + getRemoteAddress() + ", error message is:" + future.getCause().getMessage(), future.getCause());
        } else {
            throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                    + getRemoteAddress() + " client-side timeout "
                    + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
                    + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
        }
    } finally {
        if (!isConnected()) {
            future.cancel();
        }
    }
}
At this point the NettyClient has been created. The client is then wrapped in a HeaderExchangeClient.
public HeaderExchangeClient(Client client, boolean needHeartbeat) {
    if (client == null) {
        throw new IllegalArgumentException("client == null");
    }
    this.client = client;
    this.channel = new HeaderExchangeChannel(client);
    String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
    this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
    this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
    if (heartbeatTimeout < heartbeat * 2) {
        throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
    }
    if (needHeartbeat) {
        startHeatbeatTimer();
    }
}
This starts the heartbeat.
Finally the HeaderExchangeClient instance is wrapped in a ReferenceCountExchangeClient:
public ReferenceCountExchangeClient(ExchangeClient client, ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap) {
    this.client = client;
    refenceCount.incrementAndGet();
    this.url = client.getUrl();
    if (ghostClientMap == null) {
        throw new IllegalStateException("ghostClientMap can not be null, url: " + url);
    }
    this.ghostClientMap = ghostClientMap;
}
It is then put into the cache Map<String, ReferenceCountExchangeClient> referenceClientMap, and the ReferenceCountExchangeClient is wrapped in a DubboInvoker. Here is the DubboInvoker at this point:
-->Map<String, String> attachment: {interface=com.alibaba.dubbo.demo.DemoService}
-->ExchangeClient[] clients: [ReferenceCountExchangeClient instance] // if multiple connections are configured, there are multiple clients here
-->Class<T> type: interface com.alibaba.dubbo.demo.DemoService
-->Url url: dubbo://10.211.55.5:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-consumer&check=false&dubbo=2.5.7&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25267&register.ip=10.10.10.10&remote.timestamp=1510225244315&revision=2.5.7&side=consumer&timestamp=1510225913509
Then the DubboInvoker instance is wrapped with the filter chain:
ConsumerContextFilter->FutureFilter->MonitorFilter->DubboInvoker.
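How a list of filters becomes a chain that ends at the real invoker can be shown with plain functions. This is a sketch of the general wrapping technique, not Dubbo's buildInvokerChain: FilterChain is a hypothetical class, requests and responses are Strings, and each filter is modeled as a function of (next, request).

```java
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

/** Hypothetical sketch of chain building; a filter is a function of (next, request). */
final class FilterChain {
    /** Wraps `last` so that the filters run in list order before it. */
    static Function<String, String> build(
            List<BiFunction<Function<String, String>, String, String>> filters,
            Function<String, String> last) {
        Function<String, String> chain = last;
        // Walk backwards so that the first filter in the list ends up outermost.
        for (int i = filters.size() - 1; i >= 0; i--) {
            BiFunction<Function<String, String>, String, String> filter = filters.get(i);
            Function<String, String> next = chain;
            chain = request -> filter.apply(next, request);
        }
        return chain;
    }
}
```

Building the chain from the last filter to the first is what makes the call order match the order shown above: the first filter is entered first and the invoker is reached last.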
Finally the wrapped Invoker instance is wrapped in an InvokerDelegete instance. All of this serves the ultimate goal of initializing two attributes of RegistryDirectory:
Map<String, List<Invoker<T>>> methodInvokerMap={
sayHello=[RegistryDirectory$InvokerDelegete instance for provider1, RegistryDirectory$InvokerDelegete instance for provider2],
*=[RegistryDirectory$InvokerDelegete instance for provider1, RegistryDirectory$InvokerDelegete instance for provider2]}
Map<String, Invoker<T>> urlInvokerMap={
dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-consumer&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25267&register.ip=10.10.10.10&remote.timestamp=1510225334486&side=consumer&timestamp=1510225913509
=
RegistryDirectory$InvokerDelegete instance for provider1,
dubbo://10.211.55.5:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-consumer&check=false&dubbo=2.5.7&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25267&register.ip=10.10.10.10&remote.timestamp=1510225244315&revision=2.5.7&side=consumer&timestamp=1510225913509
=
RegistryDirectory$InvokerDelegete instance for provider2}
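The shape of methodInvokerMap, including the "*" entry that lists every invoker, can be rebuilt with a small index function. MethodIndex is a hypothetical helper that uses provider names in place of Invoker objects; it mirrors the structure shown above and is not the code of toMethodInvokers.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: index invokers by method name, plus a "*" entry holding all of them. */
final class MethodIndex {
    /** Input: invoker name -> method names it offers. Output: method name -> invoker names. */
    static Map<String, List<String>> index(Map<String, String[]> methodsByInvoker) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (Map.Entry<String, String[]> e : methodsByInvoker.entrySet()) {
            for (String method : e.getValue()) {
                result.computeIfAbsent(method, k -> new ArrayList<>()).add(e.getKey());
            }
            // the wildcard entry collects every invoker regardless of method
            result.computeIfAbsent("*", k -> new ArrayList<>()).add(e.getKey());
        }
        return result;
    }
}
```

With two providers that both offer sayHello, the sayHello entry and the "*" entry each list both providers.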
At this point the subscription is complete. Now let's look at the last line of RegistryProtocol.doRefer: return cluster.join(directory)
The cluster here is a Cluster$Adaptive instance:
public class Cluster$Adaptive implements com.alibaba.dubbo.rpc.cluster.Cluster {
    public com.alibaba.dubbo.rpc.Invoker join(com.alibaba.dubbo.rpc.cluster.Directory arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = url.getParameter("cluster", "failover");
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url(" + url.toString() + ") use keys([cluster])");
        com.alibaba.dubbo.rpc.cluster.Cluster extension = (com.alibaba.dubbo.rpc.cluster.Cluster) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension(extName);
        return extension.join(arg0);
    }
}
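Stripped of the generated boilerplate, the adaptive class above boils down to "read a name from the URL, fall back to a default, look up the extension with that name, delegate". A minimal sketch of that dispatch is below. AdaptiveClusterSketch and ClusterStrategy are hypothetical names, and a plain map stands in for both the URL parameters and Dubbo's ExtensionLoader.

```java
import java.util.HashMap;
import java.util.Map;

final class AdaptiveClusterSketch {
    // Hypothetical simplified extension point; Dubbo's Cluster.join
    // takes a Directory and returns an Invoker.
    interface ClusterStrategy {
        String join(String directory);
    }

    // Stand-in for the extension registry (assumption: name -> implementation).
    private final Map<String, ClusterStrategy> extensions = new HashMap<>();

    void register(String name, ClusterStrategy strategy) {
        extensions.put(name, strategy);
    }

    // Picks the extension named by the "cluster" parameter, defaulting to "failover".
    String join(Map<String, String> urlParameters, String directory) {
        String extName = urlParameters.getOrDefault("cluster", "failover");
        ClusterStrategy extension = extensions.get(extName);
        if (extension == null) {
            throw new IllegalStateException("No such extension: " + extName);
        }
        return extension.join(directory);
    }
}
```

Because the name is resolved on every call, the same adaptive object can route to different implementations depending on the URL it is given.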
Here extName="failover", and AOP-style wrapping is applied: MockClusterWrapper wraps FailoverCluster.
public class MockClusterWrapper implements Cluster {
    private Cluster cluster;

    public MockClusterWrapper(Cluster cluster) {
        this.cluster = cluster;
    }

    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new MockClusterInvoker<T>(directory,
                this.cluster.join(directory));
    }
}
The cluster here is a FailoverCluster instance.
/**
 * Failover: when a call fails, retry on other servers. Typically used for read operations, but retries add latency.
 */
public class FailoverCluster implements Cluster {
    public final static String NAME = "failover";

    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new FailoverClusterInvoker<T>(directory);
    }
}
public FailoverClusterInvoker(Directory<T> directory) {
    super(directory);
}
This simply creates a FailoverClusterInvoker instance, whose fields are stored by its parent class AbstractClusterInvoker.
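The failover behaviour described in the FailoverCluster comment (retry other servers when a call fails) can be illustrated with a small sketch. This is not Dubbo's FailoverClusterInvoker: it assumes candidates are tried in list order and leaves out load balancing and provider re-selection. FailoverSketch and its parameters are hypothetical.

```java
import java.util.List;
import java.util.function.Function;

final class FailoverSketch {
    // Tries candidates in order, up to retries + 1 attempts,
    // and returns the first successful result.
    static <R> R invoke(List<Function<String, R>> invokers, String request, int retries) {
        RuntimeException last = null;
        int attempts = Math.min(retries + 1, invokers.size());
        for (int i = 0; i < attempts; i++) {
            try {
                return invokers.get(i).apply(request);
            } catch (RuntimeException e) {
                // Remember the failure and move on to the next provider.
                last = e;
            }
        }
        throw new IllegalStateException("All " + attempts + " attempts failed", last);
    }
}
```

Each retry is a full extra call, which is why the comment warns that retries lengthen latency.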
Finally, a MockClusterInvoker instance is created:
private final Directory<T> directory;
private final Invoker<T> invoker;

public MockClusterInvoker(Directory<T> directory, Invoker<T> invoker) {
    this.directory = directory;
    this.invoker = invoker;
}
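At this point, the first key line in the code below, invoker = refprotocol.refer(interfaceClass, urls.get(0)), has finished! It yields a MockClusterInvoker instance: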
private T createProxy(Map<String, String> map) {
    ...
    if (urls.size() == 1) {
        invoker = refprotocol.refer(interfaceClass, urls.get(0));
    }
    ...
    // create the service proxy
    return (T) proxyFactory.getProxy(invoker);
}
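2. Creating the proxy with ProxyFactory
(T) proxyFactory.getProxy(invoker)
The proxyFactory above is a ProxyFactory$Adaptive instance. Inside its getProxy, the extension it ends up with is a JavassistProxyFactory wrapped by StubProxyFactoryWrapper. Let's go straight to the JavassistProxyFactory.getProxy method: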
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
    return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
Note that the Proxy here is Dubbo's own class, not the JDK's.
Proxy.getProxy(interfaces)
public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
    ...
    Proxy proxy = null;
    ...
    // create ProxyInstance class.
    String pcn = pkg + ".proxy" + id;
    ccp.setClassName(pcn);
    ccp.addField("public static java.lang.reflect.Method[] methods;");
    ccp.addField("private " + InvocationHandler.class.getName() + " handler;");
    ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{InvocationHandler.class}, new Class<?>[0], "handler=$1;");
    ccp.addDefaultConstructor();
    Class<?> clazz = ccp.toClass();
    clazz.getField("methods").set(null, methods.toArray(new Method[0]));

    // create Proxy class.
    String fcn = Proxy.class.getName() + id;
    ccm = ClassGenerator.newInstance(cl);
    ccm.setClassName(fcn);
    ccm.addDefaultConstructor();
    ccm.setSuperClass(Proxy.class);
    ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }");
    Class<?> pc = ccm.toClass();
    proxy = (Proxy) pc.newInstance();
    ...
    return proxy;
}
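As the code shows, two Class objects are generated: pc is the factory class that creates proxy instances, and clazz is the proxy class for the real object. The proxy that is finally returned is the Proxy0 object shown below. JavassistProxyFactory then calls Proxy0.newInstance(InvocationHandler paramInvocationHandler), which creates the proxy0 object and sets its InvocationHandler handler field to the InvokerInvocationHandler.
In the end two classes are generated (I obtained both by dumping the .class files and decompiling them with jd-gui).
Factory class: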
package com.alibaba.dubbo.common.bytecode;

import java.lang.reflect.InvocationHandler;

public class Proxy0 extends Proxy {
    public Object newInstance(InvocationHandler paramInvocationHandler) {
        return new proxy0(paramInvocationHandler);
    }
}
Proxy class for the real object:
package com.alibaba.dubbo.common.bytecode;

import com.alibaba.dubbo.demo.DemoService;
import com.alibaba.dubbo.rpc.service.EchoService;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

public class proxy0 implements EchoService, DemoService {
    public static Method[] methods;
    private InvocationHandler handler;

    public String sayHello(String paramString) {
        Object[] arrayOfObject = new Object[1];
        arrayOfObject[0] = paramString;
        Object localObject = this.handler.invoke(this, methods[0], arrayOfObject);
        return (String) localObject;
    }

    public Object $echo(Object paramObject) {
        Object[] arrayOfObject = new Object[1];
        arrayOfObject[0] = paramObject;
        Object localObject = this.handler.invoke(this, methods[1], arrayOfObject);
        return (Object) localObject;
    }

    public proxy0() {
    }

    public proxy0(InvocationHandler paramInvocationHandler) {
        this.handler = paramInvocationHandler;
    }
}
The methods array above already contains two elements:
[public abstract java.lang.String com.alibaba.dubbo.demo.DemoService.sayHello(java.lang.String),
public abstract java.lang.Object com.alibaba.dubbo.rpc.service.EchoService.$echo(java.lang.Object)]
As shown above, the proxy object we finally get back is in fact a proxy0 object. When we call its sayHello method, it calls the invoke method of its internal handler.
public class InvokerInvocationHandler implements InvocationHandler {
    private final Invoker<?> invoker;

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
    }

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }

}
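The same forwarding pattern can be tried out without Javassist. The sketch below uses the JDK's java.lang.reflect.Proxy as a stand-in for Dubbo's generated proxy0 class, so it shows the handler logic only, not Dubbo's bytecode generation. GreetingService and MethodInvoker are hypothetical types introduced for the example.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

final class HandlerForwardingSketch {
    // Hypothetical service interface mirroring the demo's DemoService.
    interface GreetingService {
        String sayHello(String name);
    }

    // Hypothetical simplified invoker: receives the method name and arguments.
    interface MethodInvoker {
        Object invoke(String methodName, Object[] args);
    }

    // Builds a JDK dynamic proxy whose handler forwards interface calls to the invoker.
    static GreetingService createProxy(MethodInvoker invoker) {
        InvocationHandler handler = (proxy, method, args) -> {
            // Methods declared on Object are answered by the invoker object itself.
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(invoker, args);
            }
            // Everything else is turned into an invocation and handed to the invoker.
            return invoker.invoke(method.getName(), args);
        };
        return (GreetingService) Proxy.newProxyInstance(
                GreetingService.class.getClassLoader(),
                new Class<?>[]{GreetingService.class},
                handler);
    }
}
```

Calling createProxy((name, args) -> name + ":" + args[0]).sayHello("world") routes through the handler and returns "sayHello:world", in the same way that proxy0.sayHello routes through InvokerInvocationHandler.invoke.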
The invoker here is the MockClusterInvoker instance described above.
At this point, the line DemoService demoService = (DemoService) context.getBean("demoService"); has finished. The demoService we end up with is a proxy0 instance (a proxy)!