Nacos - NacosNamingService初始化中提到NacosNamingService初始化會初始化EventDispatcher、NamingProxy、BeatReactor、HostReactor。其中EventDispatcher已經說了,NamingProxy的定時任務主要是默認每30毫秒更新服務器地址、默認每5毫秒登錄獲取token等信息,這裏過了。BeatReactor初始化的時候並無開啓定時任務,後面來講,那隻剩下HostReactor了。
咱們看看他的構造函數,會建立一個FailoverReactor和PushReceiver對象。json
public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, BeatReactor beatReactor, String cacheDir, boolean loadCacheAtStart, int pollingThreadCount) { // 其餘略 this.failoverReactor = new FailoverReactor(this, cacheDir); this.pushReceiver = new PushReceiver(this); }
FailoverReactor的構造函數,會調用他的init方法:segmentfault
public FailoverReactor(HostReactor hostReactor, String cacheDir) { //其餘略 this.init(); }
在init裏會有三個任務:緩存
public void init() { executorService.scheduleWithFixedDelay(new SwitchRefresher(), 0L, 5000L, TimeUnit.MILLISECONDS); executorService.scheduleWithFixedDelay(new DiskFileWriter(), 30, DAY_PERIOD_MINUTES, TimeUnit.MINUTES); // backup file on startup if failover directory is empty. executorService.schedule(new Runnable() { //其餘略 }, 10000L, TimeUnit.MILLISECONDS); }
FailoverReactor.SwitchRefresher,默認每5秒檢測是否開啓故障轉移,若是開啓,則把文件數據讀入serviceMap。服務器
class SwitchRefresher implements Runnable { long lastModifiedMillis = 0L; @Override public void run() { try { // 其餘略 switchParams.put("failover-mode", "true"); NAMING_LOGGER.info("failover-mode is on"); // 故障轉移的時候調用FailoverFileReader#run new FailoverFileReader().run(); } catch (Throwable e) { NAMING_LOGGER.error("[NA] failed to read failover switch.", e); } } } class FailoverFileReader implements Runnable { @Override public void run() { Map<String, ServiceInfo> domMap = new HashMap<String, ServiceInfo>(16); BufferedReader reader = null; try { // 其餘略 // 讀取文件信息,賦值給dom,存入domMap for (File file : files) { // 其餘略 ServiceInfo dom = new ServiceInfo(file.getName()); // 其餘略 dom = JacksonUtils.toObj(json, ServiceInfo.class); if (!CollectionUtils.isEmpty(dom.getHosts())) { domMap.put(dom.getKey(), dom); } } } catch (Exception e) { NAMING_LOGGER.error("[NA] failed to read cache file", e); } // domMap的值賦值給serviceMap if (domMap.size() > 0) { serviceMap = domMap; } } }
PushReceiver實現了Runnable接口,在構造函數中把本身放入了線程池。dom
public PushReceiver(HostReactor hostReactor) { try { //其餘略 this.executorService.execute(this); } catch (Exception e) { NAMING_LOGGER.error("[NA] init udp socket failed", e); } }
在run中,經過while一直監聽UDP數據,並根據不一樣的type進行處理數據,處理後響應請求。socket
@Override public void run() { while (!closed) { try { // byte[] is initialized with 0 full filled by default byte[] buffer = new byte[UDP_MSS]; DatagramPacket packet = new DatagramPacket(buffer, buffer.length); // 獲取UDP數據 udpSocket.receive(packet); String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim(); NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString()); PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class); String ack; // 根據不一樣的type進行處理數據 if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) { hostReactor.processServiceJson(pushPacket.data); // send ack to server ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":" + "\"\"}"; } else if ("dump".equals(pushPacket.type)) { // dump data to server ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":" + "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(hostReactor.getServiceInfoMap())) + "\"}"; } else { // do nothing send ack only ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":" + "\"\"}"; } // 響應請求 udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length, packet.getSocketAddress())); } catch (Exception e) { NAMING_LOGGER.error("[NA] error while receiving push data", e); } } }
HostReactor的建立任務包括每5秒檢測是否開啓故障轉移,若是開啓,則把文件數據讀入serviceMap、天天把服務信息寫入本地、檢測本地緩存文件,若是沒有則建立緩存文件、監聽UDP請求。ide