Nacos - HostReactor的建立

Nacos - NacosNamingService初始化中提到NacosNamingService初始化會初始化EventDispatcher、NamingProxy、BeatReactor、HostReactor。其中EventDispatcher已經說了,NamingProxy的定時任務主要是默認每30毫秒更新服務器地址、默認每5毫秒登錄獲取token等信息,這裏過了。BeatReactor初始化的時候並無開啓定時任務,後面來講,那隻剩下HostReactor了。
咱們看看他的構造函數,會建立一個FailoverReactor和PushReceiver對象。json

public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, BeatReactor beatReactor,
        String cacheDir, boolean loadCacheAtStart, int pollingThreadCount) {
    // 其餘略
    this.failoverReactor = new FailoverReactor(this, cacheDir);
    this.pushReceiver = new PushReceiver(this);
}

FailoverReactor

FailoverReactor的構造函數,會調用他的init方法:segmentfault

public FailoverReactor(HostReactor hostReactor, String cacheDir) {
    //其餘略
    this.init();
}

在init裏會有三個任務:緩存

  1. FailoverReactor.SwitchRefresher,默認每5秒檢測是否開啓故障轉移,若是開啓,則把文件數據讀入serviceMap。
  2. FailoverReactor.DiskFileWriter,默認天天把服務信息寫入本地。
  3. 建立10秒後調用DiskFileWriter#run,檢測本地緩存文件,若是沒有則建立緩存文件。
public void init() {
        
    executorService.scheduleWithFixedDelay(new SwitchRefresher(), 0L, 5000L, TimeUnit.MILLISECONDS);
    
    executorService.scheduleWithFixedDelay(new DiskFileWriter(), 30, DAY_PERIOD_MINUTES, TimeUnit.MINUTES);
    
    // backup file on startup if failover directory is empty.
    executorService.schedule(new Runnable() {
        //其餘略
    }, 10000L, TimeUnit.MILLISECONDS);
}

FailoverReactor.SwitchRefresher,默認每5秒檢測是否開啓故障轉移,若是開啓,則把文件數據讀入serviceMap。服務器

class SwitchRefresher implements Runnable {       
    long lastModifiedMillis = 0L;    
    @Override
    public void run() {
        try {
            // 其餘略    
            switchParams.put("failover-mode", "true");
            NAMING_LOGGER.info("failover-mode is on");
            // 故障轉移的時候調用FailoverFileReader#run
            new FailoverFileReader().run();
        } catch (Throwable e) {
            NAMING_LOGGER.error("[NA] failed to read failover switch.", e);
        }
    }
}

class FailoverFileReader implements Runnable {
    
    @Override
    public void run() {
        Map<String, ServiceInfo> domMap = new HashMap<String, ServiceInfo>(16);
        
        BufferedReader reader = null;
        try {
            // 其餘略
            // 讀取文件信息,賦值給dom,存入domMap
            for (File file : files) {
                // 其餘略
                ServiceInfo dom = new ServiceInfo(file.getName());                
                // 其餘略
                dom = JacksonUtils.toObj(json, ServiceInfo.class);            
                if (!CollectionUtils.isEmpty(dom.getHosts())) {
                    domMap.put(dom.getKey(), dom);
                }
            }
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] failed to read cache file", e);
        }
        // domMap的值賦值給serviceMap
        if (domMap.size() > 0) {
            serviceMap = domMap;
        }
    }
}

PushReceiver

PushReceiver實現了Runnable接口,在構造函數中把本身放入了線程池。dom

public PushReceiver(HostReactor hostReactor) {
    try {
        //其餘略
        this.executorService.execute(this);
    } catch (Exception e) {
        NAMING_LOGGER.error("[NA] init udp socket failed", e);
    }
}

在run中,經過while一直監聽UDP數據,並根據不一樣的type進行處理數據,處理後響應請求。socket

@Override
public void run() {
    while (!closed) {
        try {
            
            // byte[] is initialized with 0 full filled by default
            byte[] buffer = new byte[UDP_MSS];
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
            // 獲取UDP數據
            udpSocket.receive(packet);
            
            String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
            NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
            
            PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
            String ack;
            // 根據不一樣的type進行處理數據
            if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
                hostReactor.processServiceJson(pushPacket.data);
                
                // send ack to server
                ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":"
                        + "\"\"}";
            } else if ("dump".equals(pushPacket.type)) {
                // dump data to server
                ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":"
                        + "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(hostReactor.getServiceInfoMap()))
                        + "\"}";
            } else {
                // do nothing send ack only
                ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                        + "\", \"data\":" + "\"\"}";
            }
            // 響應請求
            udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,
                    packet.getSocketAddress()));
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] error while receiving push data", e);
        }
    }
}

總結

HostReactor的建立任務包括每5秒檢測是否開啓故障轉移,若是開啓,則把文件數據讀入serviceMap、天天把服務信息寫入本地、檢測本地緩存文件,若是沒有則建立緩存文件、監聽UDP請求。ide

相關文章
相關標籤/搜索