Nacos - How the Server Handles Registration Requests

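Client-side registration was covered in "Nacos - Client Registration"; this article looks at how the server handles the request.
Client requests are handled in InstanceController, so let's start with its register method.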

InstanceController#register

This method mainly builds the Instance and calls serviceManager's registerInstance method to register the service.

public String register(HttpServletRequest request) throws Exception {
    // Read namespaceId (falls back to the default namespace)
    final String namespaceId = WebUtils
            .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    // Read serviceName (required)
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    // Validate the serviceName format
    NamingUtils.checkServiceNameFormat(serviceName);
    // Build the Instance from the request and validate it
    final Instance instance = parseInstance(request);
    // Register the service
    serviceManager.registerInstance(namespaceId, serviceName, instance);
    return "ok";
}
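
The difference between an optional and a required request parameter can be sketched as follows. This is a minimal illustration, not the Nacos WebUtils class: ParamSketch is a hypothetical name, a plain Map stands in for HttpServletRequest, and treating an empty value as missing is an assumption.

```java
import java.util.Map;

// Hypothetical stand-in for the request-parameter helpers; not the Nacos WebUtils class.
final class ParamSketch {

    // Returns the parameter value, or the default when it is missing or empty (assumed behavior).
    static String optional(Map<String, String> params, String key, String defaultValue) {
        String value = params.get(key);
        return (value == null || value.isEmpty()) ? defaultValue : value;
    }

    // Returns the parameter value, or fails when it is missing or empty.
    static String required(Map<String, String> params, String key) {
        String value = params.get(key);
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("Param '" + key + "' is required.");
        }
        return value;
    }
}
```

With this shape, a request without a namespace still registers into the default namespace, while a request without a service name is rejected before any Service is created.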

ServiceManager#registerInstance

Checks whether the service has already been registered; if not, it creates and registers a Service, and then adds the instance.

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    // Create and register an empty Service if it has not been registered yet
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    // Fetch the Service from the registered services
    Service service = getService(namespaceId, serviceName);
    
    if (service == null) {
        throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }
    // Add the instance
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

ServiceManager#createEmptyService

Simply delegates to createServiceIfAbsent.

public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
    createServiceIfAbsent(namespaceId, serviceName, local, null);
}

ServiceManager#createServiceIfAbsent

public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
        throws NacosException {
    // Look up the Service
    Service service = getService(namespaceId, serviceName);
    if (service == null) {
        // Not found, so create it
        Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
        service = new Service();
        service.setName(serviceName);
        service.setNamespaceId(namespaceId);
        service.setGroupName(NamingUtils.getGroupName(serviceName));
        // now validate the service. if failed, exception will be thrown
        service.setLastModifiedMillis(System.currentTimeMillis());
        // The checksum is used for verification
        service.recalculateChecksum();
        if (cluster != null) {
            // Attach the cluster to the service
            cluster.setService(service);
            service.getClusterMap().put(cluster.getName(), cluster);
        }
        // Validate the service and cluster names
        service.validate();
        // Put into the cache and start the heartbeat check
        putServiceAndInit(service);
        if (!local) {
            // Persist through the consistency protocol
            addOrReplaceService(service);
        }
    }
}

ServiceManager#putServiceAndInit

Stores the Service in the serviceMap cache and starts a health check that runs every 5 seconds.

private void putServiceAndInit(Service service) throws NacosException {
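    // Store in the serviceMap cache
    putService(service);
    // Health check every 5 seconds, covering both the service and its clusters
    service.init();
    // Register listeners for both the ephemeral and the persistent instance lists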
    consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}
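
The two listen calls register the same Service under two different keys, one for ephemeral instances and one for persistent ones. The sketch below illustrates that idea under stated assumptions: ListenerRegistrySketch and its Listener interface are hypothetical, and the key layout in buildKey is a placeholder rather than the format KeyBuilder actually produces.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical listener registry; the key layout is a placeholder, not Nacos's real format.
final class ListenerRegistrySketch {

    interface Listener {
        void onChange(String key, Object value);
    }

    private final Map<String, CopyOnWriteArrayList<Listener>> listeners = new ConcurrentHashMap<>();

    // Builds a distinct key per (namespace, service, ephemeral flag).
    static String buildKey(String namespaceId, String serviceName, boolean ephemeral) {
        return (ephemeral ? "ephemeral." : "persistent.") + namespaceId + "##" + serviceName;
    }

    // Registers a listener for a key; the same listener is added at most once per key.
    void listen(String key, Listener listener) {
        listeners.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).addIfAbsent(listener);
    }

    // Number of listeners currently registered for the key.
    int listenerCount(String key) {
        CopyOnWriteArrayList<Listener> list = listeners.get(key);
        return list == null ? 0 : list.size();
    }
}
```

Because the keys differ only in the ephemeral flag, one listener object can react to changes in either instance list of the same service.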

ServiceManager#getService

Looks the service up in the serviceMap cache.

public Service getService(String namespaceId, String serviceName) {
    if (serviceMap.get(namespaceId) == null) {
        return null;
    }
    return chooseServiceMap(namespaceId).get(serviceName);
}

public Map<String, Service> chooseServiceMap(String namespaceId) {
    return serviceMap.get(namespaceId);
}
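
The lookup above implies a two-level map: namespaceId first, then serviceName. A minimal sketch of that structure follows; ServiceMapSketch is a hypothetical class, and the way putService creates the inner map is an assumption, since the original putService is not shown here.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical two-level registry: namespaceId -> (serviceName -> service).
final class ServiceMapSketch<S> {

    private final Map<String, Map<String, S>> serviceMap = new ConcurrentHashMap<>();

    // Returns the service, or null when the namespace or the service is unknown.
    S getService(String namespaceId, String serviceName) {
        Map<String, S> services = serviceMap.get(namespaceId);
        return services == null ? null : services.get(serviceName);
    }

    // Creates the per-namespace map on first use, then stores the service.
    void putService(String namespaceId, String serviceName, S service) {
        serviceMap.computeIfAbsent(namespaceId, k -> new ConcurrentHashMap<>())
                .put(serviceName, service);
    }
}
```

The sketch reads the namespace map once and reuses the reference, which avoids a second lookup between the null check and the get.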

ServiceManager#addInstance

Saves the instance.

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
        throws NacosException {
    // Build the instance list key
    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
    // Fetch the Service from the serviceMap cache
    Service service = getService(namespaceId, serviceName);
    
    synchronized (service) {
        // Build the full instance list of the service (existing plus new)
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
        
        Instances instances = new Instances();
        instances.setInstanceList(instanceList);
        // Consistent save; this actually calls DistroConsistencyServiceImpl#put
        consistencyService.put(key, instances);
    }
}

ServiceManager#addIpAddresses

Simply delegates to updateIpAddresses.

private List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
    return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
}

ServiceManager#updateIpAddresses

Builds the full instance list of the service; existing data is updated along the way.

public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips)
        throws NacosException {
    // Fetch the old data
    Datum datum = consistencyService
            .get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
    // Fetch all instances across the service's clusters
    List<Instance> currentIPs = service.allIPs(ephemeral);
    Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
    Set<String> currentInstanceIds = Sets.newHashSet();
    // Index all instances; the key is ip + port
    for (Instance instance : currentIPs) {
        currentInstances.put(instance.toIpAddr(), instance);
        currentInstanceIds.add(instance.getInstanceId());
    }
    // Holds the new data
    Map<String, Instance> instanceMap;
    if (datum != null && null != datum.value) {
        // If old data exists, use currentInstances to update health status and heartbeat time
        instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
    } else {
        // Otherwise start with an empty map
        instanceMap = new HashMap<>(ips.length);
    }
    
    for (Instance instance : ips) {
        // If the cluster does not exist, create it and start its health check
        if (!service.getClusterMap().containsKey(instance.getClusterName())) {
            // Create a new cluster
            Cluster cluster = new Cluster(instance.getClusterName(), service);
            // Start the health check
            cluster.init();
            service.getClusterMap().put(instance.getClusterName(), cluster);
            Loggers.SRV_LOG
                    .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                            instance.getClusterName(), instance.toJson());
        }
        // For a remove action, delete the instance
        if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
            instanceMap.remove(instance.getDatumKey());
        } else {
            // For an add, assign a unique id to the instance
            instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
            instanceMap.put(instance.getDatumKey(), instance);
        }
        
    }
    
    if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
        throw new IllegalArgumentException(
                "ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils
                        .toJson(instanceMap.values()));
    }
    // Return all instances
    return new ArrayList<>(instanceMap.values());
}
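
Stripped of health status, clusters and instance ids, the merge in updateIpAddresses reduces to "start from the stored list, then apply adds or removes by key". The following is a simplified sketch of that core, assuming instances can be represented as plain "ip:port" strings; InstanceMergeSketch and the "add"/"remove" action strings are hypothetical stand-ins for the real types and constants.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified add/remove merge; instances are plain "ip:port" strings.
final class InstanceMergeSketch {

    static List<String> update(List<String> existing, String action, String... changed) {
        // Start from the previously stored instances, keyed by address.
        Map<String, String> merged = new LinkedHashMap<>();
        for (String addr : existing) {
            merged.put(addr, addr);
        }
        // Apply the requested action to every changed instance.
        for (String addr : changed) {
            if ("remove".equals(action)) {
                merged.remove(addr);
            } else {
                merged.put(addr, addr);
            }
        }
        // Mirrors the guard in the original: an add must not produce an empty list.
        if (merged.isEmpty() && "add".equals(action)) {
            throw new IllegalArgumentException("ip list can not be empty");
        }
        return new ArrayList<>(merged.values());
    }
}
```

Keying by address means that re-registering an existing instance overwrites its entry instead of duplicating it, which is also why the original uses a map before converting back to a list.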

DistroConsistencyServiceImpl#put

If the key is for ephemeral instances, the data is added to the cache; a task is then put on the blocking queue.

public void put(String key, Record value) throws NacosException {
    onPut(key, value);
    // Sync through the consistency (Distro) protocol
    distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
            globalConfig.getTaskDispatchPeriod() / 2);
}


public void onPut(String key, Record value) {
    // Ephemeral data goes into the dataStore cache
    if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
        Datum<Instances> datum = new Datum<>();
        datum.value = (Instances) value;
        datum.key = key;
        datum.timestamp.incrementAndGet();
        dataStore.put(key, datum);
    }
    
    if (!listeners.containsKey(key)) {
        return;
    }
    // Add a task to the notifier's blocking queue (an ArrayBlockingQueue)
    notifier.addTask(key, DataOperation.CHANGE);
}

DistroConsistencyServiceImpl.Notifier#run

Takes tasks off the queue in a loop and calls the handle method for each one.

@Override
public void run() {
    Loggers.DISTRO.info("distro notifier started");
    
    for (; ; ) {
        try {
            Pair<String, DataOperation> pair = tasks.take();
            handle(pair);
        } catch (Throwable e) {
            Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
        }
    }
}

DistroConsistencyServiceImpl.Notifier#handle

Notifies the listeners of the event.

private void handle(Pair<String, DataOperation> pair) {
    try {
        String datumKey = pair.getValue0();
        DataOperation action = pair.getValue1();
        
        services.remove(datumKey);
        
        int count = 0;
        // No listeners for this key, so return
        if (!listeners.containsKey(datumKey)) {
            return;
        }
        
        for (RecordListener listener : listeners.get(datumKey)) {
            
            count++;
            
            try {
                if (action == DataOperation.CHANGE) {
                    // Handle the change event
                    listener.onChange(datumKey, dataStore.get(datumKey).value);
                    continue;
                }
                
                if (action == DataOperation.DELETE) {
                    // Handle the delete event
                    listener.onDelete(datumKey);
                    continue;
                }
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
            }
        }
        
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO
                    .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                            datumKey, count, action.name());
        }
    } catch (Throwable e) {
        Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
    }
}
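
The put, queue and notify flow is a producer-consumer pattern built on a blocking queue. Here is a reduced sketch of it, with several assumptions: NotifierSketch is a hypothetical class, values are plain strings, only change events are modeled, the queue capacity of 1024 is arbitrary, and handleNext processes a single task so that it can be called directly instead of running an endless loop on its own thread.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical, simplified notifier: producers enqueue keys, one consumer dispatches them.
final class NotifierSketch {

    interface Listener {
        void onChange(String key, String value);
    }

    private final BlockingQueue<String> tasks = new ArrayBlockingQueue<>(1024);
    private final Map<String, String> dataStore = new ConcurrentHashMap<>();
    private final Map<String, List<Listener>> listeners = new ConcurrentHashMap<>();

    void listen(String key, Listener listener) {
        listeners.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(listener);
    }

    // Stores the value, then queues a change task only if someone listens on the key.
    void onPut(String key, String value) {
        dataStore.put(key, value);
        if (listeners.containsKey(key)) {
            tasks.offer(key);
        }
    }

    // Takes one task (blocking until one is available) and notifies every listener of its key.
    // Returns false if the thread was interrupted while waiting.
    boolean handleNext() {
        String key;
        try {
            key = tasks.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        for (Listener listener : listeners.getOrDefault(key, Collections.<Listener>emptyList())) {
            try {
                listener.onChange(key, dataStore.get(key));
            } catch (RuntimeException e) {
                // A failing listener must not stop the remaining listeners.
                System.err.println("listener failed for key " + key + ": " + e);
            }
        }
        return true;
    }
}
```

Decoupling the write from the notification this way lets put return quickly, while catching exceptions per listener keeps one faulty listener from blocking the rest, as the original handle method also does.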

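Summary

  1. If the service or cluster does not exist, create it and register listeners.
  2. Start the health checks for the service and its clusters.
  3. If old service data exists, update its health status and heartbeat time.
  4. Keep the data consistent across nodes.
  5. Invoke the listeners.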

