dubbo原理

dubbo 2.6.2版本

1.框架設計

  1. config 配置層:對外配置接口,以 ServiceConfig, ReferenceConfig 爲中心,能夠直接初始化配置類,也能夠經過 spring 解析配置生成配置類
  2. proxy 服務代理層:服務接口透明代理,生成服務的客戶端 Stub 和服務器端 Skeleton, ServiceProxy 爲中心,擴展接口爲 ProxyFactory
  3. registry 註冊中心層:封裝服務地址的註冊與發現,以服務 URL 爲中心,擴展接口爲 RegistryFactory, Registry, RegistryService
  4. cluster 路由層:封裝多個提供者的路由及負載均衡,並橋接註冊中心,以 Invoker 爲中心,擴展接口爲 Cluster, Directory, Router, LoadBalance
  5. monitor 監控層:RPC 調用次數和調用時間監控,以 Statistics 爲中心,擴展接口爲 MonitorFactory, Monitor, MonitorService
  6. protocol 遠程調用層:封裝 RPC 調用,以 Invocation, Result 爲中心,擴展接口爲 Protocol, Invoker, Exporter
  7. exchange 信息交換層:封裝請求響應模式,同步轉異步,以 Request, Response 爲中心,擴展接口爲 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer
  8. transport 網絡傳輸層:抽象 mina netty 爲統一接口,以 Message 爲中心,擴展接口爲 Channel, Transporter, Client, Server, Codec
  9. serialize 數據序列化層:可複用的一些工具,擴展接口爲 Serialization, ObjectInput, ObjectOutput, ThreadPool

2.dubbo原理  -啓動解析、加載配置信息

DubboNamespaceHandler.class 命名空間解析

3.服務暴露

源碼解析:spring

主要看serviceBean.class緩存

public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware {

看下繼承的接口InitializingBean,ApplicationContextAware,ApplicationListener<ContextRefreshedEvent>服務器

public interface InitializingBean {
    //屬性設置完以後調用這個方法
    void afterPropertiesSet() throws Exception;
}
public interface ApplicationContextAware extends Aware {
     //容器啓動完成以後調用這個方法
    void setApplicationContext(ApplicationContext var1) throws BeansException;
}

ApplicationListener<ContextRefreshedEvent> 這個就是監聽器,事件源就是context,監聽事件ContextRefreshedEvent網絡

而後咱們看,當容器啓動完以後app

public void onApplicationEvent(ContextRefreshedEvent event) {
    if(this.isDelay() && !this.isExported() && !this.isUnexported()) {
        if(logger.isInfoEnabled()) {
            logger.info("The service ready on spring started. service: " + this.getInterface());
        }
        //執行服務暴露
        this.export();
    }
//延遲加載
private boolean isDelay() {
    //獲取配置的延遲時間數
    Integer delay = this.getDelay();
    ProviderConfig provider = this.getProvider();
    if(delay == null && provider != null) {
        delay = provider.getDelay();
    }
    //若是delay是空或者-1,就在Spring容器啓動後執行暴露
    return this.supportedApplicationListener && (delay == null || delay.intValue() == -1);
}
public synchronized void export() {
    //判斷,省略不貼
    ....
    if(this.export == null || this.export.booleanValue()) {
        //延遲加載,是新增一個線程調用暴露方法
        if(this.delay != null && this.delay.intValue() > 0) {
            delayExportExecutor.schedule(new Runnable() {
                public void run() {
                    ServiceConfig.this.doExport();
                }
            }, (long)this.delay.intValue(), TimeUnit.MILLISECONDS);
        } else {
            //調用暴露方法
            this.doExport();
        }

    }
}
protected synchronized void doExport() {
        ...
        this.doExportUrls();
        ProviderModel providerModel1 = new ProviderModel(this.getUniqueServiceName(), this, this.ref);
        ApplicationModel.initProviderModel(this.getUniqueServiceName(), providerModel1);
  
}

繼續往下看doExportUrls方法負載均衡

private void doExportUrls() {
    //這裏咱們能夠看到註冊中心是list,也就是能夠有多個註冊中心
    List registryURLs = this.loadRegistries(true);
    //也能夠有多個通訊協議
    Iterator i$ = this.protocols.iterator();

    while(i$.hasNext()) {
        ProtocolConfig protocolConfig = (ProtocolConfig)i$.next();
        this.doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }

}

doExportUrlsFor1Protocol方法框架

太長了,寫一下主要的邏輯步驟:異步

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    //把服務的元數據拼成一個url的字符串,like this:dubbo://192.168.56.1:20880/com.atguigu.gmall.serevice.UserService?anyhost=true&
application=hello-world-app&bind.ip=192.168.56.1&bind.port=20880&dubbo=2.6.2&generic=false&interface=com.atguigu.gmall.serevice.UserService&
methods=getAddressName&pid=8536&side=provider&timestamp=1589695242970
    //暴露本地服務,調用exportLocal(var22)方法
    ...
}

接下來跳到RegistryProtocol類ide

public <T> Exporter<T> export(Invoker<T> originInvoker) throws RpcException {
    //接着暴露本地服務
    RegistryProtocol.ExporterChangeableWrapper exporter = this.doLocalExport(originInvoker);
    URL registryUrl = this.getRegistryUrl(originInvoker);
    Registry registry = this.getRegistry(originInvoker);
    URL registedProviderUrl = this.getRegistedProviderUrl(originInvoker);
    boolean register = registedProviderUrl.getParameter("register", true);
    //將key爲暴露服務類的名字,註冊的包裝類做爲value存到concurrntMap()裏面
    ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);
    if(register) {
        //label-1 在zk上註冊服務提供者的信息
        this.register(registryUrl, registeedProviderUrl);
        ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
    }
    URL overrideSubscribeUrl = this.getSubscribedOverrideUrl(registedProviderUrl); 
    RegistryProtocol.OverrideListener overrideSubscribListener = new RegistryProtocol.OverrideListener(overrideSubscribeUrl, originInvoker); 
    this.overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); 
    //訂閱
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); 
    return new RegistryProtocol.DestroyableExporter(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl);
}

跳到DubboProtocol類工具

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
   ...
    this.openServer(url);
    this.optimizeSerialization(url);
    return exporter;
}

主要就是open(url)方法

private void openServer(URL url) {
    String key = url.getAddress();
    boolean isServer = url.getParameter("isserver", true);
    if(isServer) {
        ExchangeServer server = (ExchangeServer)this.serverMap.get(key);
        if(server == null) {
            //開啓服務器,底層就是建立netty服務器,監聽20880端口號
            this.serverMap.put(key, this.createServer(url));
        } else {
            server.reset(url);
        }
    }

}

到這裏,就至關於咱們開啓的對外的服務器,端口號就是dubbo端口號,因此本機每一個dubbo服務的端口都不能同樣,而後回到RegisteryProcotol類的lable-1

public void register(URL registryUrl, URL registedProviderUrl) {
    Registry registry = this.registryFactory.getRegistry(registryUrl);
    //進行註冊
    registry.register(registedProviderUrl);
}
protected void doRegister(URL url) {
    try {
        //建立臨時節點
        this.zkClient.create(this.toUrlPath(url), url.getParameter("dynamic", true));
    } catch (Throwable var3) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + this.getUrl() + ", cause: " + var3.getMessage(), var3);
    }
}

這裏咱們就能夠看到,每一個服務的節點信息都是臨時節點,只要關閉zk的鏈接,臨時節點將會失效。這樣咱們的服務修改配置信息重啓,就能保證最新的數據註冊在zk上。而後轉到RegistryProtocol的label-2

進入FailbackRegistry類

public void subscribe(URL url, NotifyListener listener) {
    super.subscribe(url, listener);
    this.removeFailedSubscribed(url, listener);

    try {
        //訂閱
        this.doSubscribe(url, listener);
    } catch (Exception var8) {
        Object t = var8;
        List urls = this.getCacheUrls(url);
        if(urls != null && !urls.isEmpty()) {
            this.notify(url, listener, urls);
            this.logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + this.getUrl().getParameter("file", System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + var8.getMessage(), var8);
        } else {
            boolean check = this.getUrl().getParameter("check", true) && url.getParameter("check", true);
            boolean skipFailback = var8 instanceof SkipFailbackWrapperException;
            if(check || skipFailback) {
                if(skipFailback) {
                    t = var8.getCause();
                }

                throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + ((Throwable)t).getMessage(), (Throwable)t);
            }

            this.logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + var8.getMessage(), var8);
        }

        this.addFailedSubscribed(url, listener);
    }

}

this.doSubscribe(url, listener); 其實就是幹了兩件事,一是建立永久節點-configurators,二是將節點信息變動notify給全部訂閱者,更新緩存信息

本地暴露就完成了

相關文章
相關標籤/搜索