dubbo 2.6.2版本
1.框架設計
- config 配置層:對外配置接口,以 ServiceConfig, ReferenceConfig 爲中心,能夠直接初始化配置類,也能夠經過 spring 解析配置生成配置類
- proxy 服務代理層:服務接口透明代理,生成服務的客戶端 Stub 和服務器端 Skeleton, 以 ServiceProxy 爲中心,擴展接口爲 ProxyFactory
- registry 註冊中心層:封裝服務地址的註冊與發現,以服務 URL 爲中心,擴展接口爲 RegistryFactory, Registry, RegistryService
- cluster 路由層:封裝多個提供者的路由及負載均衡,並橋接註冊中心,以 Invoker 爲中心,擴展接口爲 Cluster, Directory, Router, LoadBalance
- monitor 監控層:RPC 調用次數和調用時間監控,以 Statistics 爲中心,擴展接口爲 MonitorFactory, Monitor, MonitorService
- protocol 遠程調用層:封裝 RPC 調用,以 Invocation, Result 爲中心,擴展接口爲 Protocol, Invoker, Exporter
- exchange 信息交換層:封裝請求響應模式,同步轉異步,以 Request, Response 爲中心,擴展接口爲 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer
- transport 網絡傳輸層:抽象 mina 和 netty 爲統一接口,以 Message 爲中心,擴展接口爲 Channel, Transporter, Client, Server, Codec
- serialize 數據序列化層:可複用的一些工具,擴展接口爲 Serialization, ObjectInput, ObjectOutput, ThreadPool
2.dubbo原理 -啓動解析、加載配置信息
DubboNamespaceHandler.class 命名空間解析
3.服務暴露
源碼解析:spring
主要看serviceBean.class緩存
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware {
看下繼承的接口InitializingBean,ApplicationContextAware,ApplicationListener<ContextRefreshedEvent>服務器
public interface InitializingBean { //屬性設置完以後調用這個方法 void afterPropertiesSet() throws Exception; }
public interface ApplicationContextAware extends Aware { //容器啓動完成以後調用這個方法 void setApplicationContext(ApplicationContext var1) throws BeansException; }
ApplicationListener<ContextRefreshedEvent> 這個就是監聽器,事件源就是context,監聽事件ContextRefreshedEvent網絡
而後咱們看,當容器啓動完以後app
public void onApplicationEvent(ContextRefreshedEvent event) { if(this.isDelay() && !this.isExported() && !this.isUnexported()) { if(logger.isInfoEnabled()) { logger.info("The service ready on spring started. service: " + this.getInterface()); } //執行服務暴露 this.export(); }
//延遲加載 private boolean isDelay() { //獲取配置的延遲時間數 Integer delay = this.getDelay(); ProviderConfig provider = this.getProvider(); if(delay == null && provider != null) { delay = provider.getDelay(); } //若是delay是空或者-1,就在Spring容器啓動後執行暴露 return this.supportedApplicationListener && (delay == null || delay.intValue() == -1); }
public synchronized void export() { //判斷,省略不貼 .... if(this.export == null || this.export.booleanValue()) { //延遲加載,是新增一個線程調用暴露方法 if(this.delay != null && this.delay.intValue() > 0) { delayExportExecutor.schedule(new Runnable() { public void run() { ServiceConfig.this.doExport(); } }, (long)this.delay.intValue(), TimeUnit.MILLISECONDS); } else { //調用暴露方法 this.doExport(); } } }
protected synchronized void doExport() { ... this.doExportUrls(); ProviderModel providerModel1 = new ProviderModel(this.getUniqueServiceName(), this, this.ref); ApplicationModel.initProviderModel(this.getUniqueServiceName(), providerModel1); }
繼續往下看doExportUrls方法負載均衡
private void doExportUrls() { //這裏咱們能夠看到註冊中心是list,也就是能夠有多個註冊中心 List registryURLs = this.loadRegistries(true); //也能夠有多個通訊協議 Iterator i$ = this.protocols.iterator(); while(i$.hasNext()) { ProtocolConfig protocolConfig = (ProtocolConfig)i$.next(); this.doExportUrlsFor1Protocol(protocolConfig, registryURLs); } }
doExportUrlsFor1Protocol方法框架
太長了,寫一下主要的邏輯步驟:異步
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { //把服務的元數據拼成一個url的字符串,like this:dubbo://192.168.56.1:20880/com.atguigu.gmall.serevice.UserService?anyhost=true& application=hello-world-app&bind.ip=192.168.56.1&bind.port=20880&dubbo=2.6.2&generic=false&interface=com.atguigu.gmall.serevice.UserService& methods=getAddressName&pid=8536&side=provider×tamp=1589695242970 //暴露本地服務,調用exportLocal(var22)方法 ... }
接下來跳到RegistryProtocol類ide
public <T> Exporter<T> export(Invoker<T> originInvoker) throws RpcException { //接着暴露本地服務 RegistryProtocol.ExporterChangeableWrapper exporter = this.doLocalExport(originInvoker); URL registryUrl = this.getRegistryUrl(originInvoker); Registry registry = this.getRegistry(originInvoker); URL registedProviderUrl = this.getRegistedProviderUrl(originInvoker); boolean register = registedProviderUrl.getParameter("register", true); //將key爲暴露服務類的名字,註冊的包裝類做爲value存到concurrntMap()裏面 ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl); if(register) { //label-1 在zk上註冊服務提供者的信息 this.register(registryUrl, registeedProviderUrl); ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true); } URL overrideSubscribeUrl = this.getSubscribedOverrideUrl(registedProviderUrl); RegistryProtocol.OverrideListener overrideSubscribListener = new RegistryProtocol.OverrideListener(overrideSubscribeUrl, originInvoker); this.overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); //訂閱 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); return new RegistryProtocol.DestroyableExporter(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl); }
跳到DubboProtocol類工具
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { ... this.openServer(url); this.optimizeSerialization(url); return exporter; }
主要就是open(url)方法
private void openServer(URL url) { String key = url.getAddress(); boolean isServer = url.getParameter("isserver", true); if(isServer) { ExchangeServer server = (ExchangeServer)this.serverMap.get(key); if(server == null) { //開啓服務器,底層就是建立netty服務器,監聽20880端口號 this.serverMap.put(key, this.createServer(url)); } else { server.reset(url); } } }
到這裏,就至關於咱們開啓的對外的服務器,端口號就是dubbo端口號,因此本機每一個dubbo服務的端口都不能同樣,而後回到RegisteryProcotol類的lable-1
public void register(URL registryUrl, URL registedProviderUrl) { Registry registry = this.registryFactory.getRegistry(registryUrl); //進行註冊 registry.register(registedProviderUrl); }
protected void doRegister(URL url) { try { //建立臨時節點 this.zkClient.create(this.toUrlPath(url), url.getParameter("dynamic", true)); } catch (Throwable var3) { throw new RpcException("Failed to register " + url + " to zookeeper " + this.getUrl() + ", cause: " + var3.getMessage(), var3); } }
這裏咱們就能夠看到,每一個服務的節點信息都是臨時節點,只要關閉zk的鏈接,臨時節點將會失效。這樣咱們的服務修改配置信息重啓,就能保證最新的數據註冊在zk上。而後轉到RegistryProtocol的label-2
進入FailbackRegistry類
public void subscribe(URL url, NotifyListener listener) { super.subscribe(url, listener); this.removeFailedSubscribed(url, listener); try { //訂閱 this.doSubscribe(url, listener); } catch (Exception var8) { Object t = var8; List urls = this.getCacheUrls(url); if(urls != null && !urls.isEmpty()) { this.notify(url, listener, urls); this.logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + this.getUrl().getParameter("file", System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + var8.getMessage(), var8); } else { boolean check = this.getUrl().getParameter("check", true) && url.getParameter("check", true); boolean skipFailback = var8 instanceof SkipFailbackWrapperException; if(check || skipFailback) { if(skipFailback) { t = var8.getCause(); } throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + ((Throwable)t).getMessage(), (Throwable)t); } this.logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + var8.getMessage(), var8); } this.addFailedSubscribed(url, listener); } }
this.doSubscribe(url, listener); 其實就是幹了兩件事,一是建立永久節點-configurators,二是將節點信息變動notify給全部訂閱者,更新緩存信息
本地暴露就完成了