nacos統一配置中心源碼解析

配置文件想必你們都很熟悉,不管什麼架構 都離不開配置,雖然spring boot已經大大簡化了配置,但若是服務不少 環境也好幾個,管理配置起來仍是很麻煩,而且每次改完配置都須要重啓服務,nacos config出現就解決了這些問題,它把配置統一放到服務進行管理,客戶端這邊進行有須要的獲取,能夠實時對配置進行修改和發佈html

如何使用nacos config

首先須要引入nacos config jar包spring

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
    <version>2.2.1.RELEASE</version>
</dependency>

 在nacos控制檯提早配置須要的配置文件數據庫

 

 配置文件格式支持text、json、xml、yaml、html、properties,注意spring boot啓動支持的配置文件格式只能爲yaml或properties格式,其它格式的配置文件須要後續咱們本身寫代碼去獲取apache

咱們來看db.properties也是就數據庫配置json

 

 data id就是對應配置文件id,group爲分組,配置內容就是properties格式的bootstrap

再來看bootstrap.properties如何引用這個配置文件api

spring.application.name=nacos-config
server.port=20200

#命名空間
spring.cloud.nacos.config.namespace=${nacos_register_namingspace:0ca74337-8f42-49c3-aec9-32f268a937c4}
#組名
spring.cloud.nacos.config.group=${spring.application.name}
#文件格式
spring.cloud.nacos.config.file-extension=properties
#nacos server地址
spring.cloud.nacos.config.server-addr=localhost:8848

#加載配置文件
spring.cloud.nacos.config.ext-config[0].data-id=nacos.properties
spring.cloud.nacos.config.ext-config[1].data-id=db.properties
spring.cloud.nacos.config.ext-config[2].data-id=mybatis-plus.properties

 

注意 加載配置文件的分組名默認爲DEFAULT_GROUP,如需指定分組 須要再指定緩存

spring.cloud.nacos.config.ext-config[0].data-id=nacos.properties
spring.cloud.nacos.config.ext-config[0].group=${spring.cloud.nacos.config.group}
#或者
spring.cloud.nacos.config.ext-config[1].data-id=undertow.properties
spring.cloud.nacos.config.ext-config[1].group=MY_DEFAULT

 

在這裏解釋下namespace和group的概念,namespace能夠用來解決不一樣環境的問題,group是來管理配置分組的,它們的關係以下圖服務器

 spring boot啓動容器如何加載nacos config配置文件

 

 

這個配置做用是spring在啓動之間準備上下文時會啓用這個配置 來導入nacos相關配置文件,爲後續容器啓動作準備mybatis

來看NacosConfigBootstrapConfiguration這個配置類

 

 

NacosConfigProperties:對應咱們上面在bootstrap.properties中對應的配置信息

NacosConfigManager: 持有NacosConfigProperties和ConfigService,ConfigService用來查詢 發佈配置的相關接口

NacosPropertySourceLocator:它實現了PropertySourceLocator ,spring boot啓動時調用PropertySourceLocator.locate(env)用來加載配置信息,下面來看相關源碼

/******************************************NacosPropertySourceLocator******************************************/
public PropertySource<?> locate(Environment env) {
    ConfigService configService = this.nacosConfigProperties.configServiceInstance();
    if (null == configService) {
        log.warn("no instance of config service found, can't load config from nacos");
        return null;
    } else {
        long timeout = (long)this.nacosConfigProperties.getTimeout();
        this.nacosPropertySourceBuilder = new NacosPropertySourceBuilder(configService, timeout);
        String name = this.nacosConfigProperties.getName();
        String dataIdPrefix = this.nacosConfigProperties.getPrefix();
        if (StringUtils.isEmpty(dataIdPrefix)) {
            dataIdPrefix = name;
        }

        if (StringUtils.isEmpty(dataIdPrefix)) {
            dataIdPrefix = env.getProperty("spring.application.name");
        }

        CompositePropertySource composite = new CompositePropertySource("NACOS");
        // 加載共享的配置文件 不一樣指定分組 默認DEFAULT_GROUP,對應配置spring.cloud.nacos.config.sharedDataids=shared_1.properties,shared_2.properties
        this.loadSharedConfiguration(composite);
        // 對應spring.cloud.nacos.config.ext-config[0].data-id=nacos.properties的配置
        this.loadExtConfiguration(composite);
        // 加載當前應用配置
        this.loadApplicationConfiguration(composite, dataIdPrefix, this.nacosConfigProperties, env);
        return composite;
    }
}

// 看一個加載實現便可 流程都差很少 具體實如今NacosPropertySourceBuilder.loadNacosData()方法完成
/******************************************具體實如今NacosPropertySourceBuilder******************************************/
private Properties loadNacosData(String dataId, String group, String fileExtension) {
        String data = null;

        try {
            // 向nacos server拉取配置文件
            data = this.configService.getConfig(dataId, group, this.timeout);
            if (!StringUtils.isEmpty(data)) {
                log.info(String.format("Loading nacos data, dataId: '%s', group: '%s'", dataId, group));
                // spring boot配置固然只支持properties和yaml文件格式
                if (fileExtension.equalsIgnoreCase("properties")) {
                    Properties properties = new Properties();
                    properties.load(new StringReader(data));
                    return properties;
                }

                if (fileExtension.equalsIgnoreCase("yaml") || fileExtension.equalsIgnoreCase("yml")) {
                    YamlPropertiesFactoryBean yamlFactory = new YamlPropertiesFactoryBean();
                    yamlFactory.setResources(new Resource[]{new ByteArrayResource(data.getBytes())});
                    return yamlFactory.getObject();
                }
            }
        } catch (NacosException var6) {
            log.error("get data from Nacos error,dataId:{}, ", dataId, var6);
        } catch (Exception var7) {
            log.error("parse data from Nacos error,dataId:{},data:{},", new Object[]{dataId, data, var7});
        }

        return EMPTY_PROPERTIES;
    }

至此咱們在nacos上配置的properties和yaml文件都載入到spring配置文件中來了,後面可經過context.Environment.getProperty(propertyName)來獲取相關配置信息

配置如何隨spring boot加載進來咱們說完了,接來下來看修改完配置後如何實時刷新

nacos config動態刷新

 當nacos config更新後,根據配置中的refresh屬性來判斷是否刷新配置,配置以下

spring.cloud.nacos.config.ext-config[0].refresh=true

首先sprin.factories 配置了EnableAutoConfiguration=NacosConfigAutoConfiguration,NacosConfigAutoConfiguration配置類會注入一個NacosContextRefresher,它首先監聽了ApplicationReadyEvent,而後註冊一個nacos listener用來監聽nacos config配置修改後發佈一個spring refreshEvent用來刷新配置和應用

public class NacosContextRefresher implements ApplicationListener<ApplicationReadyEvent>, ApplicationContextAware

public void onApplicationEvent(ApplicationReadyEvent event) {
    // 只註冊一次
    if (this.ready.compareAndSet(false, true)) {
        this.registerNacosListenersForApplications();
    }
}
    
private void registerNacosListenersForApplications() {
    if (this.refreshProperties.isEnabled()) {
        Iterator var1 = NacosPropertySourceRepository.getAll().iterator();
        while(var1.hasNext()) {
            NacosPropertySource nacosPropertySource = (NacosPropertySource)var1.next();
            // 對應剛纔所說的配置 須要配置文件是否須要刷新
            if (nacosPropertySource.isRefreshable()) {
                String dataId = nacosPropertySource.getDataId();
                // 註冊nacos監聽器
                this.registerNacosListener(nacosPropertySource.getGroup(), dataId);
            }
        }
    }

}
    
private void registerNacosListener(final String group, final String dataId) {
    Listener listener = (Listener)this.listenerMap.computeIfAbsent(dataId, (i) -> {
        return new Listener() {
            public void receiveConfigInfo(String configInfo) {
                NacosContextRefresher.refreshCountIncrement();
                String md5 = "";
                if (!StringUtils.isEmpty(configInfo)) {
                    try {
                        MessageDigest md = MessageDigest.getInstance("MD5");
                        md5 = (new BigInteger(1, md.digest(configInfo.getBytes("UTF-8")))).toString(16);
                    } catch (UnsupportedEncodingException | NoSuchAlgorithmException var4) {
                        NacosContextRefresher.log.warn("[Nacos] unable to get md5 for dataId: " + dataId, var4);
                    }
                }
                // 添加刷新記錄
                NacosContextRefresher.this.refreshHistory.add(dataId, md5);
                // 發佈一個spring refreshEvent事件 對應監聽器爲RefreshEventListener 該監聽器會完成配置的更新應用
                NacosContextRefresher.this.applicationContext.publishEvent(new RefreshEvent(this, (Object)null, "Refresh Nacos config"));
                if (NacosContextRefresher.log.isDebugEnabled()) {
                    NacosContextRefresher.log.debug("Refresh Nacos config group " + group + ",dataId" + dataId);
                }

            }
            public Executor getExecutor() {
                return null;
            }
        };
    });

    try {
        this.configService.addListener(dataId, group, listener);
    } catch (NacosException var5) {
        var5.printStackTrace();
    }

}

咱們說完了nacos config動態刷新,那麼確定有對應的動態監聽,nacos config會監聽nacos server上配置的更新狀態

nacos config動態監聽

通常來講客戶端和服務端數據交互無非就兩種方式

pull:客戶端主動從服務器拉取數據

push: 由服務端主動向客戶端推送數據

這兩種模式優缺點各不同,pull模式須要考慮的是何時向服務端拉取數據 可能會存在數據延遲問題,而push模式須要客戶端和服務端維護一個長鏈接 若是客戶端較多會給服務端形成壓力 但它的實時性會更好

nacos採用的是pull模式,但它做了優化 能夠看作是pull+push,客戶端會輪詢向服務端發出一個長鏈接請求,這個長鏈接最多30s就會超時,服務端收到客戶端的請求會先判斷當前是否有配置更新,有則當即返回

若是沒有服務端會將這個請求拿住「hold」29.5s加入隊列,最後0.5s再檢測配置文件不管有沒有更新都進行正常返回,但等待的29.5s期間有配置更新能夠提早結束並返回,下面會在源碼中講解具體怎麼處理的

nacos client處理

動態監聽的發起是在ConfigService的實現類NacosConfigService的構造方法中,它是對外nacos config api接口,在以前加載配置文件和NacosContextRefresher構造方法中都會獲取或建立

 

 

 

 這裏都會先判斷是否已經建立了ConfigServer,沒有則實例化一個NacosConfigService,來看它的構造函數

/***************************************** NacosConfigService *****************************************/
public NacosConfigService(Properties properties) throws NacosException {
    String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
    if (StringUtils.isBlank(encodeTmp)) {
        encode = Constants.ENCODE;
    } else {
        encode = encodeTmp.trim();
    }
    initNamespace(properties);
    // 用來向nacos server發起請求的代理,這裏用到了裝飾模式
    agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
    agent.start();
    // 客戶端的一個工做類,agent做爲它的構造傳參 可猜測到裏面確定會作一些遠程調用
    worker = new ClientWorker(agent, configFilterChainManager, properties);
}

/***************************************** ClientWorker *****************************************/
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {
    this.agent = agent;
    this.configFilterChainManager = configFilterChainManager;

    // Initialize the timeout parameter

    init(properties);
    // 這個線程池只有一個核心線程 用來執行checkConfigInfo()方法
    executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
            t.setDaemon(true);
            return t;
        }
    });
    // 其它須要執行線程的地方都交給這個線程池來處理
    executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
            t.setDaemon(true);
            return t;
        }
    });
    
    // 執行一個調用checkConfigInfo()方法的週期性任務,每10ms執行一次,首次執行延遲1ms後執行
    executor.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                checkConfigInfo();
            } catch (Throwable e) {
                LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
            }
        }
    }, 1L, 10L, TimeUnit.MILLISECONDS);
}

NacosConfigService構造方法主要建立一個agent 它是用來向nacos server發出請求的,而後又建立了一個clientwoker,它的構造方法建立了兩個線程池,第一個線程池只有一個核心線程,它會執行一個週期性任務只用來調用checkconfiginfo()方法,第二個線程是後續由須要執行線程的地方都交給它來執行,下面重點來看checkconfiginfo()方法

public void checkConfigInfo() {
    // 分任務
    int listenerSize = cacheMap.get().size();
    // 向上取整爲批數
    int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
    if (longingTaskCount > currentLongingTaskCount) {
        for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
            executorService.execute(new LongPollingRunnable(i));
        }
        currentLongingTaskCount = longingTaskCount;
    }
}
AtomicReference<Map<String, CacheData>> cacheMap = new AtomicReference<Map<String, CacheData>>(
new HashMap<String, CacheData>());

 cacheMap:緩存着須要刷新的配置,它是在調用ConfigService 添加監聽器方式時會放入,能夠自定義監聽配置刷新

// 添加一個config監聽器,用來監聽dataId爲ErrorCode,group爲DEFAULT_GROUP的config
configService.addListener("ErrorCode","DEFAULT_GROUP",new Listener() {
    @Override
    public Executor getExecutor() {
        return null;
    }

    @Override
    public void receiveConfigInfo(String s) { //當配置更新時會調用監聽器該方法
        Map<String, Map<String, String>> map = JSON.parseObject(s, Map.class);
        // 根據本身的業務須要來處理
    }
});

這裏採用了一個策略:將cacheMap中的數量以3000分一個組,分別建立一個LongPollingRunnable用來監聽配置更新,這個LongPollingRunnable就是咱們以前所說的長鏈接任務,來看這個長鏈接任務

class LongPollingRunnable implements Runnable {
    private int taskId;

    public LongPollingRunnable(int taskId) {
        this.taskId = taskId;
    }

    @Override
    public void run() {

        List<CacheData> cacheDatas = new ArrayList<CacheData>();
        List<String> inInitializingCacheList = new ArrayList<String>();
        try {
            // check failover config
            for (CacheData cacheData : cacheMap.get().values()) {
                if (cacheData.getTaskId() == taskId) {
                    cacheDatas.add(cacheData);
                    try {
                        // 一、檢查本地配置
                        checkLocalConfig(cacheData);
                        if (cacheData.isUseLocalConfigInfo()) {
                            cacheData.checkListenerMd5();
                        }
                    } catch (Exception e) {
                        LOGGER.error("get local config info error", e);
                    }
                }
            }

            // 二、向nacos server發出一個長鏈接 30s超時,返回nacos server有更新過的dataIds
            List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
            LOGGER.info("get changedGroupKeys:" + changedGroupKeys);

            for (String groupKey : changedGroupKeys) {
                String[] key = GroupKey.parseKey(groupKey);
                String dataId = key[0];
                String group = key[1];
                String tenant = null;
                if (key.length == 3) {
                    tenant = key[2];
                }
                try {
                    // 三、向nacos server請求獲取config最新內容
                    String[] ct = getServerConfig(dataId, group, tenant, 3000L);
                    CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
                    cache.setContent(ct[0]);
                    if (null != ct[1]) {
                        cache.setType(ct[1]);
                    }
                } 
            }
            // 四、對有變化的config調用對應監聽器去處理
            for (CacheData cacheData : cacheDatas) {
                if (!cacheData.isInitializing() || inInitializingCacheList
                    .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
                    cacheData.checkListenerMd5();
                    cacheData.setInitializing(false);
                }
            }
            inInitializingCacheList.clear();
            // 繼續輪詢
            executorService.execute(this);
        } catch (Throwable e) {
            // 發生異常延遲執行
            executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
        }
    }
}

 這個長輪詢主要作了4個步驟

  1. 檢查本地配置,若是存在本地配置,而且與緩存中的本地配置版本不同,把本地配置內容更新到緩存,並觸發事件,這塊源碼比較簡單,讀者跟到源碼一讀編制
  2. 向nacos server發出一個長鏈接,30s超時,nacos server會返回有變化的dataIds
  3. 根據變化的dataId,從服務端拉取最新的配置內容而後更新到緩存中
  4. 對有變化的配置 觸發事件監聽器來處理

講完了nacos client處理流程,再來看服務端這邊怎麼處理這個長鏈接的

nacos server處理

服務端長鏈接接口是/config/listener,對應源碼包爲config

/****************************************** ConfigController ******************************************/
@PostMapping("/listener")
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public void listener(HttpServletRequest request, HttpServletResponse response)
        throws ServletException, IOException {
    request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
    String probeModify = request.getParameter("Listening-Configs");
    if (StringUtils.isBlank(probeModify)) {
        throw new IllegalArgumentException("invalid probeModify");
    }
    
    probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);
    // 須要檢查更新的config信息
    Map<String, String> clientMd5Map;
    try {
        clientMd5Map = MD5Util.getClientMd5Map(probeModify);
    } catch (Throwable e) {
        throw new IllegalArgumentException("invalid probeModify");
    }
    
    // 長鏈接處理
    inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}

/****************************************** ConfigServletInner ******************************************/
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
            Map<String, String> clientMd5Map, int probeRequestSize) throws IOException {
    
    // 判斷是否支持長輪詢
    if (LongPollingService.isSupportLongPolling(request)) {
        // 長輪詢處理
        longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
        return HttpServletResponse.SC_OK + "";
    }
    
    // 不支持長輪詢,直接與當前配置做比較,返回有變動的配置
    List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);
    
    // Compatible with short polling result.
    String oldResult = MD5Util.compareMd5OldResult(changedGroups);
    String newResult = MD5Util.compareMd5ResultString(changedGroups);
    
    /*
    * 省略
    * 會響應變動的配置信息
    */
    return HttpServletResponse.SC_OK + "";
}

/****************************************** LongPollingService ******************************************/
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
            int probeRequestSize) {
        
    String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
    String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
    String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
    String tag = req.getHeader("Vipserver-Tag");
    
    // 服務端這邊最多處理時長29.5s,須要留0.5s來返回,以避免客戶端那邊超時
    int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
    // Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout.
    long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
    if (isFixedPolling()) {
        timeout = Math.max(10000, getFixedPollingInterval());
        // Do nothing but set fix polling timeout.
    } else {
        // 不支持長輪詢 本地對比返回
        long start = System.currentTimeMillis();
        List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
        if (changedGroups.size() > 0) {
            generateResponse(req, rsp, changedGroups);
            // log....
            return;
        } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
            // log....
            return;
        }
    }
    String ip = RequestUtil.getRemoteIp(req);
    
    // 將http響應交給異步線程,返回一個異步響應上下文, 當配置更新後能夠主動調用及時返回,不用非等待29.5s
    final AsyncContext asyncContext = req.startAsync();
    
    // AsyncContext.setTimeout() is incorrect, Control by oneself
    asyncContext.setTimeout(0L);
    // 執行客戶端長鏈接任務,
    ConfigExecutor.executeLongPolling(
            new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
}

/****************************************** ClientLongPolling ******************************************/
class ClientLongPolling implements Runnable {
        
    @Override
    public void run() {
        // 提交一個任務,延遲29.5s執行
        asyncTimeoutFuture = ConfigExecutor.scheduleLongPolling(new Runnable() {
            @Override
            public void run() {
                try {
                    getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
                    
                    // Delete subsciber's relations.
                    allSubs.remove(ClientLongPolling.this);
                    
                    if (isFixedPolling()) {
                        // 檢查變動配置 並相應
                        List<String> changedGroups = MD5Util
                                .compareMd5((HttpServletRequest) asyncContext.getRequest(),
                                        (HttpServletResponse) asyncContext.getResponse(), clientMd5Map);
                        if (changedGroups.size() > 0) {
                            sendResponse(changedGroups);
                        } else {
                            sendResponse(null);
                        }
                    } else {
                        sendResponse(null);
                    }
                } catch (Throwable t) {
                    LogUtil.DEFAULT_LOG.error("long polling error:" + t.getMessage(), t.getCause());
                }
                
            }
            
        }, timeoutTime, TimeUnit.MILLISECONDS);
        
        allSubs.add(this);
    }
}

final Queue<ClientLongPolling> allSubs

 

上面大部分地方都比較好懂,主要解釋下ClientLongPolling做用,它首先會提交一個任務,不管配置有沒有更新 最終都會進行響應,延遲29.5s執行,而後會把本身添加到一個隊列中,以前說過,服務端這邊配置有更新後 會找出正在等待配置更新的長鏈接任務,提早結束這個任務並返回,

來看這一步是怎麼處理的

public LongPollingService() {
    allSubs = new ConcurrentLinkedQueue<ClientLongPolling>();
    
    ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 10L, TimeUnit.SECONDS);
    
    // Register LocalDataChangeEvent to NotifyCenter.
    NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize);
    
    // Register A Subscriber to subscribe LocalDataChangeEvent.
    NotifyCenter.registerSubscriber(new Subscriber() {
        
        @Override
        public void onEvent(Event event) {
            if (isFixedPolling()) {
                // Ignore.
            } else {
                if (event instanceof LocalDataChangeEvent) {
                    LocalDataChangeEvent evt = (LocalDataChangeEvent) event;
                    ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
                }
            }
        }
        
        @Override
        public Class<? extends Event> subscribeType() {
            return LocalDataChangeEvent.class;
        }
    });
    
}

class DataChangeTask implements Runnable {
        
    @Override
    public void run() {
        try {
            ConfigCacheService.getContentBetaMd5(groupKey);
            // 找出等在該配置的長鏈接,而後進行提早返回
            for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
                ClientLongPolling clientSub = iter.next();
                if (clientSub.clientMd5Map.containsKey(groupKey)) {
                    // If published tag is not in the beta list, then it skipped.
                    if (isBeta && !CollectionUtils.contains(betaIps, clientSub.ip)) {
                        continue;
                    }
                    
                    // If published tag is not in the tag list, then it skipped.
                    if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {
                        continue;
                    }
                    
                    getRetainIps().put(clientSub.ip, System.currentTimeMillis());
                    iter.remove(); // Delete subscribers' relationships.
                    clientSub.sendResponse(Arrays.asList(groupKey));
                }
            }
        } catch (Throwable t) {
            LogUtil.DEFAULT_LOG.error("data change error: {}", ExceptionUtil.getStackTrace(t));
        }
    }
}

 

LongPollingService構造函數中,會註冊一個訂閱,用來監聽LocalDataChangeEvent,當發生該事件時,會執行一個數據變動任務,這個任務就是找出等在配置的長鏈接,提早返回

咱們在nacos控制檯修改一個配置文件進行發佈,會調用ConfigController.publishConfig接口,但這個接口發佈的是ConfigDataChangeEvent事件,大意了。。。LocalDataChangeEvent事件發佈在ConfigCacheService,這裏怎麼調用的我就不深追,留給有興趣的讀者

至此nacos config動態監聽、刷新就串聯起來了,nacos的相關源碼都比較好理解,跟着源碼追進去就一目瞭然了

相關文章
相關標籤/搜索