咱們從原生SDK代碼中入手,能夠發現最核心的兩行代碼:算法
ConfigService configService=NacosFactory.createConfigService(properties);
String content=configService.getConfig(dataId,groupId,3000);
首先咱們先來看 NacosFactory.createConfigService :spring
public static ConfigService createConfigService(Properties properties) throws NacosException { try { Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService"); Constructor constructor = driverImplClass.getConstructor(Properties.class);
//調用反射建立一個NacosConfigService實例 ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties); return vendorImpl; } catch (Throwable e) { throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e); } }
這一步的代碼很簡單,及經過類的全類名經過反射建立一個 NacosConfigService 實例,咱們跟進該類的構造方法:sql
public NacosConfigService(Properties properties) throws NacosException { String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE); if (StringUtils.isBlank(encodeTmp)) { encode = Constants.ENCODE; } else { encode = encodeTmp.trim(); }//初始化命名空間 initNamespace(properties); agent = new MetricsHttpAgent(new ServerHttpAgent(properties)); agent.start(); worker = new ClientWorker(agent, configFilterChainManager, properties); }
這一步主要初始化了 agent 與 worker 兩個實例。這裏又看到熟悉的包裝器模式,將ServerHttpAgent 包裝成MetricsHttpAgent,這裏咱們須要知道,其中MetricsHttpAgent是對ServerHttpAgent功能的拓展,核心功能仍是由ServerHttpAgent去實現,接下去咱們來看一下 worker 的初始化,從名字上看能知道 最後真的工做的是他:apache
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) { this.agent = agent; this.configFilterChainManager = configFilterChainManager; // Initialize the timeout parameter // 初始化一些參數
init(properties); //建立了一個定時任務的線程池
executor = Executors.newScheduledThreadPool(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker." + agent.getName()); t.setDaemon(true); return t; } }); //建立了一個保持長鏈接的線程池
executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName()); t.setDaemon(true); return t; } }); //建立了一個延遲任務線程池來每隔10ms來檢查配置信息的線程池
executor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { checkConfigInfo(); } catch (Throwable e) { LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e); } } }, 1L, 10L, TimeUnit.MILLISECONDS); }
這一步建立了兩個線程池,第一個線程池負責與配置中心進行數據的交互,而且啓動後延遲1ms,以後每隔10ms對配置信息進行定時檢查,第二個線程池則是負責保持一個長鏈接。咱們再服務啓動以後便會執行 checkConfigInfo(),跟進去看看:api
public void checkConfigInfo() { // 分任務(解決大數據量的傳輸問題)
int listenerSize = cacheMap.get().size(); // 向上取整爲批數,分批次進行檢查 // ParamUtil.getPerTaskConfigSize() =3000
int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize()); // currentLongingTaskCount =0
if (longingTaskCount > currentLongingTaskCount) { for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) { // 要判斷任務是否在執行 這塊須要好好想一想。 任務列表如今是無序的。變化過程可能有問題
executorService.execute(new LongPollingRunnable(i)); } currentLongingTaskCount = longingTaskCount; } }
這裏主要是先去除緩存中 Map<String, CacheData> 的數量,爲避免處理過量的數據,這裏對緩存數據進行了分組,最後建立 LongPollingRunnable 去執行,能夠知道 這裏會進入 LongPollingRunnable 的 Run 方法:緩存
public void run() { List<CacheData> cacheDatas = new ArrayList<CacheData>(); List<String> inInitializingCacheList = new ArrayList<String>(); try { // check failover config
for (CacheData cacheData : cacheMap.get().values()) { if (cacheData.getTaskId() == taskId) { cacheDatas.add(cacheData); try { //檢查本地配置
checkLocalConfig(cacheData); if (cacheData.isUseLocalConfigInfo()) {
//檢查緩存的MD5 cacheData.checkListenerMd5(); } } catch (Exception e) { LOGGER.error("get local config info error", e); } } } //檢查服務端配置
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList); for (String groupKey : changedGroupKeys) { String[] key = GroupKey.parseKey(groupKey); String dataId = key[0]; String group = key[1]; String tenant = null; if (key.length == 3) { tenant = key[2]; } try { String content = getServerConfig(dataId, group, tenant, 3000L); //將配置設置進緩存
CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant)); cache.setContent(content); LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}", agent.getName(), dataId, group, tenant, cache.getMd5(), ContentUtils.truncateContent(content)); } catch (NacosException ioe) { String message = String.format( "[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s", agent.getName(), dataId, group, tenant); LOGGER.error(message, ioe); } } for (CacheData cacheData : cacheDatas) { if (!cacheData.isInitializing() || inInitializingCacheList .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) { cacheData.checkListenerMd5(); cacheData.setInitializing(false); } } inInitializingCacheList.clear(); executorService.execute(this); } catch (Throwable e) { // If the rotation training task is abnormal, the next execution time of the task will be punished
LOGGER.error("longPolling error : ", e); executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS); } }
總的來講,該方法主要流程是先檢查本地緩存,再檢查服務端的配置,由改變最後再回寫到本地及加載到緩存。服務器
private void checkLocalConfig(CacheData cacheData) { final String dataId = cacheData.dataId; final String group = cacheData.group; final String tenant = cacheData.tenant;
//本地文件緩存 File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant); // 沒有 -> 有 //不使用本地配置,可是持久化文件存在,須要讀取文件加載至內存
if (!cacheData.isUseLocalConfigInfo() && path.exists()) { String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant); String md5 = MD5.getInstance().getMD5String(content); cacheData.setUseLocalConfigInfo(true); cacheData.setLocalConfigInfoVersion(path.lastModified()); cacheData.setContent(content); LOGGER.warn("[{}] [failover-change] failover file created. dataId={}, group={}, tenant={}, md5={}, content={}", agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content)); return; } // 有 -> 沒有。不通知業務監聽器,從server拿到配置後通知。 //使用本地配置,可是持久化文件不存在
if (cacheData.isUseLocalConfigInfo() && !path.exists()) { cacheData.setUseLocalConfigInfo(false); LOGGER.warn("[{}] [failover-change] failover file deleted. dataId={}, group={}, tenant={}", agent.getName(), dataId, group, tenant); return; } // 有變動 //使用本地配置,持久化文件存在,緩存跟文件最後修改時間不一致
if (cacheData.isUseLocalConfigInfo() && path.exists() && cacheData.getLocalConfigInfoVersion() != path.lastModified()) { String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant); String md5 = MD5.getInstance().getMD5String(content); cacheData.setUseLocalConfigInfo(true); cacheData.setLocalConfigInfoVersion(path.lastModified()); cacheData.setContent(content); LOGGER.warn("[{}] [failover-change] failover file changed. dataId={}, group={}, tenant={}, md5={}, content={}", agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content)); } }
本地檢查主要是經過是否使用本地配置,繼而尋找持久化緩存文件,再經過判斷文件的最後修改事件與本地緩存的版本是否一致來判斷是否由變動。本地檢查完畢,若是使用本地配置會進入下列代碼:mvc
if (cacheData.isUseLocalConfigInfo()) { //檢查緩存的MD5
cacheData.checkListenerMd5(); }
void checkListenerMd5() {
for (ManagerListenerWrap wrap : listeners) {
//MD5由變動,說明數據變動
if (!md5.equals(wrap.lastCallMd5)) {
//執行回調
safeNotifyListener(dataId, group, content, md5, wrap);
}
}
}
本地檢查完畢會進行遠程服務器檢查:app
//檢查服務端配置
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
這裏會去獲取一個發生變化的GroupKeys 集合:dom
/** * 從Server獲取值變化了的DataID列表。返回的對象裏只有dataId和group是有效的。 保證不返回NULL。 */ List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws IOException { StringBuilder sb = new StringBuilder(); for (CacheData cacheData : cacheDatas) { if (!cacheData.isUseLocalConfigInfo()) { sb.append(cacheData.dataId).append(WORD_SEPARATOR); sb.append(cacheData.group).append(WORD_SEPARATOR); if (StringUtils.isBlank(cacheData.tenant)) { sb.append(cacheData.getMd5()).append(LINE_SEPARATOR); } else { sb.append(cacheData.getMd5()).append(WORD_SEPARATOR); sb.append(cacheData.getTenant()).append(LINE_SEPARATOR); } if (cacheData.isInitializing()) { // cacheData 首次出如今cacheMap中&首次check更新
inInitializingCacheList .add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant)); } } } boolean isInitializingCacheList = !inInitializingCacheList.isEmpty(); return checkUpdateConfigStr(sb.toString(), isInitializingCacheList); }
這裏將可能發生變化的配置信息封裝成一個 StringBuilder ,繼而調用 checkUpdateConfigStr:
/** * 從Server獲取值變化了的DataID列表。返回的對象裏只有dataId和group是有效的。 保證不返回NULL。 */ List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws IOException { List<String> params = Arrays.asList(Constants.PROBE_MODIFY_REQUEST, probeUpdateString); List<String> headers = new ArrayList<String>(2); headers.add("Long-Pulling-Timeout"); headers.add("" + timeout); // told server do not hang me up if new initializing cacheData added in
if (isInitializingCacheList) { headers.add("Long-Pulling-Timeout-No-Hangup"); headers.add("true"); } if (StringUtils.isBlank(probeUpdateString)) { return Collections.emptyList(); } try {//發起一個Post請求 HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(), timeout); if (HttpURLConnection.HTTP_OK == result.code) { setHealthServer(true); return parseUpdateDataIdResponse(result.content); } else { setHealthServer(false); LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(), result.code); } } catch (IOException e) { setHealthServer(false); LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e); throw e; } return Collections.emptyList(); }
就這樣從Server獲取值變化了的DataID列表。返回的對象裏只有dataId和group是有效的。 保證不返回NULL。獲取到這個列表之後就便利這個列表,去服務器端獲取對應變動後的配置:
for (String groupKey : changedGroupKeys) { String[] key = GroupKey.parseKey(groupKey); String dataId = key[0]; String group = key[1]; String tenant = null; if (key.length == 3) { tenant = key[2]; } try { String content = getServerConfig(dataId, group, tenant, 3000L); //將配置設置進緩存
CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant)); cache.setContent(content); LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}", agent.getName(), dataId, group, tenant, cache.getMd5(), ContentUtils.truncateContent(content)); } catch (NacosException ioe) { String message = String.format( "[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s", agent.getName(), dataId, group, tenant); LOGGER.error(message, ioe); } }
這裏會發起請求從服務器端獲取配置:getServerConfig:
public String getServerConfig(String dataId, String group, String tenant, long readTimeout) throws NacosException { if (StringUtils.isBlank(group)) { group = Constants.DEFAULT_GROUP; } HttpResult result = null; try { List<String> params = null; if (StringUtils.isBlank(tenant)) { params = Arrays.asList("dataId", dataId, "group", group); } else { params = Arrays.asList("dataId", dataId, "group", group, "tenant", tenant); } result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout); } catch (IOException e) { String message = String.format( "[%s] [sub-server] get server config exception, dataId=%s, group=%s, tenant=%s", agent.getName(), dataId, group, tenant); LOGGER.error(message, e); throw new NacosException(NacosException.SERVER_ERROR, e); } switch (result.code) { case HttpURLConnection.HTTP_OK: LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.content); return result.content; case HttpURLConnection.HTTP_NOT_FOUND: LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null); return null; case HttpURLConnection.HTTP_CONFLICT: { LOGGER.error( "[{}] [sub-server-error] get server config being modified concurrently, dataId={}, group={}, "
+ "tenant={}", agent.getName(), dataId, group, tenant); throw new NacosException(NacosException.CONFLICT, "data being modified, dataId=" + dataId + ",group=" + group + ",tenant=" + tenant); } case HttpURLConnection.HTTP_FORBIDDEN: { LOGGER.error("[{}] [sub-server-error] no right, dataId={}, group={}, tenant={}", agent.getName(), dataId, group, tenant); throw new NacosException(result.code, result.content); } default: { LOGGER.error("[{}] [sub-server-error] dataId={}, group={}, tenant={}, code={}", agent.getName(), dataId, group, tenant, result.code); throw new NacosException(result.code, "http error, code=" + result.code + ",dataId=" + dataId + ",group=" + group + ",tenant=" + tenant); } } }
經過初始化時候的 agent.httpGet 去發起一個Get請求,就這樣變動本例的配置,當從遠程服務器獲取玩配置之後還有一個循環:
for (CacheData cacheData : cacheDatas) { if (!cacheData.isInitializing() || inInitializingCacheList .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) { cacheData.checkListenerMd5(); cacheData.setInitializing(false); } }
這個循環主要是對有變化的配置進行監聽回調。整個流程就差不都完成了,最後來一張流程圖:
咱們知道客戶端會有一個長輪訓的任務去檢查服務器端的配置是否發生了變化,若是發生了變動,那麼客戶端會拿到變動的 groupKey 再根據 groupKey 去獲取配置項的最新值更新到本地的緩存以及文件中,那麼這種每次都靠客戶端去請求,那請求的時間間隔設置多少合適呢?
若是間隔時間設置的太長的話有可能沒法及時獲取服務端的變動,若是間隔時間設置的過短的話,那麼頻繁的請求對於服務端來講無疑也是一種負擔,因此最好的方式是客戶端每隔一段長度適中的時間去服務端請求,而在這期間若是配置發生變動,服務端可以主動將變動後的結果推送給客戶端,這樣既能保證客戶端可以實時感知到配置的變化,也下降了服務端的壓力。 咱們來看看nacos設置的間隔時間是多久。
客戶端發起一個請求到服務端,服務端收到客戶端的請求後,並不會馬上響應給客戶端,而是先把這個請求hold住,而後服務端會在hold住的這段時間檢查數據是否有更新,若是有,則響應給客戶端,若是一直沒有數據變動,則達到必定的時間(長輪訓時間間隔)才返回。
長輪訓典型的場景有: 掃碼登陸、掃碼支付。
在ClientWorker這個類裏面,找到 checkUpdateConfigStr 這個方法,這裏面就是去服務器端查詢發生變化的groupKey。
/** * 從Server獲取值變化了的DataID列表。返回的對象裏只有dataId和group是有效的。 保證不返回NULL。 */ List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws IOException { List<String> params = Arrays.asList(Constants.PROBE_MODIFY_REQUEST, probeUpdateString); List<String> headers = new ArrayList<String>(2); headers.add("Long-Pulling-Timeout"); headers.add("" + timeout); // told server do not hang me up if new initializing cacheData added in if (isInitializingCacheList) { headers.add("Long-Pulling-Timeout-No-Hangup"); headers.add("true"); } if (StringUtils.isBlank(probeUpdateString)) { return Collections.emptyList(); } try {//客戶端發送的請求地址是: /v1/cs/configs/listener HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(), timeout); if (HttpURLConnection.HTTP_OK == result.code) { setHealthServer(true); return parseUpdateDataIdResponse(result.content); } else { setHealthServer(false); LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(), result.code); } } catch (IOException e) { setHealthServer(false); LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e); throw e; } return Collections.emptyList(); }
這個方法最終會發起http請求,注意這裏面有一個 timeout 的屬性,
HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(), timeout);
timeout是在init這個方法中賦值的,默認狀況下是30秒,能夠經過configLongPollTimeout進行修改
private void init(Properties properties) { // 默認長輪詢的事件就是30S timeout = Math.max(NumberUtils.toInt(properties.getProperty(PropertyKeyConst.CONFIG_LONG_POLL_TIMEOUT), //public static final int CONFIG_LONG_POLL_TIMEOUT = 30000; //public static final int MIN_CONFIG_LONG_POLL_TIMEOUT = 10000; Constants.CONFIG_LONG_POLL_TIMEOUT), Constants.MIN_CONFIG_LONG_POLL_TIMEOUT); taskPenaltyTime = NumberUtils.toInt(properties.getProperty(PropertyKeyConst.CONFIG_RETRY_TIME), Constants.CONFIG_RETRY_TIME); enableRemoteSyncConfig = Boolean.parseBoolean(properties.getProperty(PropertyKeyConst.ENABLE_REMOTE_SYNC_CONFIG)); }
因此從這裏得出的一個基本結論是:客戶端發起一個輪詢請求,超時時間是30s。 那麼客戶端爲何要等待30s才超時呢?不是越快越好嗎? 咱們能夠在nacos的日誌目錄下 $NACOS_HOME/nacos/logs/config-client-request.log 文件.
能夠看到一個現象,在配置沒有發生變化的狀況下,客戶端會等29.5s以上,才請求到服務器端的結果。而後客戶端拿到服務器端的結果以後,在作後續的操做。當服務器端頻繁的修改,那麼服務器端頻繁客戶端進行推送.
服務端是如何處理客戶端的請求的?那麼一樣,咱們須要思考幾個問題:
nacos是使用spring mvc提供的rest api,其中有個類是 ConfigController ,咱們在其中找到了Post 請求的 listener 路徑的接口方法:
/** * 比較MD5 */ @RequestMapping(value = "/listener", method = RequestMethod.POST) public void listener(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true); String probeModify = request.getParameter("Listening-Configs"); if (StringUtils.isBlank(probeModify)) { throw new IllegalArgumentException("invalid probeModify"); } probeModify = URLDecoder.decode(probeModify, Constants.ENCODE); Map<String, String> clientMd5Map; try { clientMd5Map = MD5Util.getClientMd5Map(probeModify); } catch (Throwable e) { throw new IllegalArgumentException("invalid probeModify"); } // do long-polling inner.doPollingConfig(request, response, clientMd5Map, probeModify.length()); }
先是獲取了客戶端的MD5集合,這裏面會調用inner.doPollingConfig進行處理,這個方法中,兼容了長輪訓和短輪詢的邏輯,咱們只須要關注長輪訓的部分:
/** * 輪詢接口 */ public String doPollingConfig(HttpServletRequest request, HttpServletResponse response, Map<String, String> clientMd5Map, int probeRequestSize) throws IOException, ServletException { // 長輪詢 if (LongPollingService.isSupportLongPolling(request)) { longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize); return HttpServletResponse.SC_OK + ""; }
......//省略代碼
}
這裏咱們進入長輪詢的代碼塊:
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map, int probeRequestSize) { //超時時間 String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER); String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER); String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER); String tag = req.getHeader("Vipserver-Tag"); int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500); /** * 提早500ms返回響應,爲避免客戶端超時 @qiaoyi.dingqy 2013.10.22改動 add delay time for LoadBalance */ long timeout = Math.max(10000, Long.parseLong(str) - delayTime); if (isFixedPolling()) { timeout = Math.max(10000, getFixedPollingInterval()); // do nothing but set fix polling timeout } else { long start = System.currentTimeMillis(); List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map); if (changedGroups.size() > 0) { generateResponse(req, rsp, changedGroups); LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant", RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize, changedGroups.size()); return; } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) { LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup", RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize, changedGroups.size()); return; } } String ip = RequestUtil.getRemoteIp(req); // 必定要由HTTP線程調用,不然離開後容器會當即發送響應 final AsyncContext asyncContext = req.startAsync(); // AsyncContext.setTimeout()的超時時間不許,因此只能本身控制 asyncContext.setTimeout(0L); scheduler.execute( new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag)); }
這個方法是把客戶端的長輪訓請求添加到任務中去。得到客戶端傳遞過來的超時時間,而且進行本地計算,提早500ms返回響應,這就能解釋爲何客戶端響應超時時間是29.5+了。固然若是 isFixedPolling=true 的狀況下,不會提早返回響應根據客戶端請求過來的md5和服務器端對應的group下對應內容的md5進行比較,若是不一致,則經過 generateResponse 將結果返回若是配置文件沒有發生變化,則經過 scheduler.execute 啓動了一個定時任務,將客戶端的長輪詢請求封裝成一個叫 ClientLongPolling 的任務,交給 scheduler 去執行,那麼接下去必定會進入ClientLongPolling 的Run 方法:
public void run() { asyncTimeoutFuture = scheduler.schedule(new Runnable() { @Override public void run() { try { getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis()); /** * 刪除訂閱關係 */ allSubs.remove(ClientLongPolling.this); if (isFixedPolling()) { LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "fix", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()), "polling", clientMd5Map.size(), probeRequestSize); List<String> changedGroups = MD5Util.compareMd5( (HttpServletRequest)asyncContext.getRequest(), (HttpServletResponse)asyncContext.getResponse(), clientMd5Map);
//有變化當即執行返回 if (changedGroups.size() > 0) { sendResponse(changedGroups); } else { sendResponse(null); } } else { LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "timeout", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()), "polling", clientMd5Map.size(), probeRequestSize); sendResponse(null); } } catch (Throwable t) { LogUtil.defaultLog.error("long polling error:" + t.getMessage(), t.getCause()); } } //延遲29.5秒後執行 }, timeoutTime, TimeUnit.MILLISECONDS); allSubs.add(this); }
在run方法中,經過scheduler.schedule實現了一個定時任務,它的delay時間正好是前面計算的29.5s。在這個任務中,會經過MD5Util.compareMd5來進行計算那另一個,當數據發生變化之後,確定不能等到29.5s以後才通知呀,那怎麼辦呢?咱們發現有一個allSubs 的東西,它彷佛和發佈訂閱有關係。那是否是有可能當前的clientLongPolling訂閱了數據變化的事件呢?allSubs是一個隊列,隊列裏面放了ClientLongPolling這個對象。這個隊列彷佛和配置變動有某種關聯關係:
/** * 長輪詢訂閱關係 */ final Queue<ClientLongPolling> allSubs;
註釋裏寫明瞭他是和長輪詢訂閱相關的,接着咱們先來看一下他所歸屬的類的類圖:
發現LongPollingService集成了AbstractEventListener,事件監聽.
AbstractEventListener:
static public abstract class AbstractEventListener { public AbstractEventListener() { /** * automatic register */ EventDispatcher.addEventListener(this); } /** * 感興趣的事件列表 * * @return event list */ abstract public List<Class<? extends Event>> interest(); /** * 處理事件 * * @param event event */ abstract public void onEvent(Event event); }
這裏面有一個抽象的onEvent方法,明顯是用來處理事件的方法,而抽象方法必須由子類實現,因此意味着LongPollingService裏面確定實現了onEvent方法:
public void onEvent(Event event) { if (isFixedPolling()) { // ignore } else { if (event instanceof LocalDataChangeEvent) { LocalDataChangeEvent evt = (LocalDataChangeEvent)event; scheduler.execute(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps)); } } }
因此到了這裏,確定是修改了配置以後會有一個觸發點去出發該事件,當匹配上事件類型,那麼就會去執行這個回調,這個事件的實現方法中判斷事件類型是否爲LocalDataChangeEvent,經過scheduler.execute執行DataChangeTask這個任務:
public void run() { try { ConfigService.getContentBetaMd5(groupKey); for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) { ClientLongPolling clientSub = iter.next(); if (clientSub.clientMd5Map.containsKey(groupKey)) { // 若是beta發佈且不在beta列表直接跳過 if (isBeta && !betaIps.contains(clientSub.ip)) { continue; } // 若是tag發佈且不在tag列表直接跳過 if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) { continue; } getRetainIps().put(clientSub.ip, System.currentTimeMillis()); iter.remove(); // 刪除訂閱關係 LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - changeTime), "in-advance", RequestUtil.getRemoteIp((HttpServletRequest)clientSub.asyncContext.getRequest()), "polling", clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey); clientSub.sendResponse(Arrays.asList(groupKey)); } } } catch (Throwable t) { LogUtil.defaultLog.error("data change error:" + t.getMessage(), t.getCause()); } }
這個是數據變化的任務,最讓人興奮的應該是,它裏面有一個循環迭代器,從allSubs裏面得到ClientLongPolling最後經過clientSub.sendResponse把數據返回到客戶端。因此,這也就可以理解爲什麼數據變化可以實時觸發更新了。
那麼接下來還有一個疑問是,數據變化以後是如何觸發事件的呢? 因此咱們定位到數據變化的請求類中,在ConfigController這個類中,找到POST請求的方法找到配置變動的位置:
/** * 增長或更新非聚合數據。 * * @throws NacosException */ @RequestMapping(method = RequestMethod.POST) @ResponseBody public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response, @RequestParam("dataId") String dataId, @RequestParam("group") String group, @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant, @RequestParam("content") String content, @RequestParam(value = "tag", required = false) String tag, @RequestParam(value = "appName", required = false) String appName, @RequestParam(value = "src_user", required = false) String srcUser, @RequestParam(value = "config_tags", required = false) String configTags, @RequestParam(value = "desc", required = false) String desc, @RequestParam(value = "use", required = false) String use, @RequestParam(value = "effect", required = false) String effect, @RequestParam(value = "type", required = false) String type, @RequestParam(value = "schema", required = false) String schema) throws NacosException { final String srcIp = RequestUtil.getRemoteIp(request); String requestIpApp = RequestUtil.getAppName(request); ParamUtils.checkParam(dataId, group, "datumId", content); ParamUtils.checkParam(tag); Map<String, Object> configAdvanceInfo = new HashMap<String, Object>(10); ......//省略代碼 final Timestamp time = TimeUtils.getCurrentTime(); String betaIps = request.getHeader("betaIps"); ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content); if (StringUtils.isBlank(betaIps)) { if (StringUtils.isBlank(tag)) { persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false); EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime())); } else { persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false); EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime())); } } else { // beta publish persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false); EventDispatcher.fireEvent(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime())); } ConfigTraceService.logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(), LOCAL_IP, ConfigTraceService.PERSISTENCE_EVENT_PUB, content); return true; }
發現數據持久化以後,會經過EventDispatcher進行事件發佈EventDispatcher.fireEvent 可是這個事件彷佛不是咱們所關心的時間,緣由是這裏發佈的事件是ConfigDataChangeEvent , 而LongPollingService感興趣的事件是 LocalDataChangeEvent。
在Nacos中有一個DumpService,它會定時把變動後的數據dump到磁盤上,DumpService在spring啓動以後,會調用init方法啓動幾個dump任務。而後在任務執行結束以後,會觸發一個LocalDataChangeEvent 的事件:
@PostConstruct public void init() { LogUtil.defaultLog.warn("DumpService start"); DumpProcessor processor = new DumpProcessor(this); DumpAllProcessor dumpAllProcessor = new DumpAllProcessor(this); DumpAllBetaProcessor dumpAllBetaProcessor = new DumpAllBetaProcessor(this); DumpAllTagProcessor dumpAllTagProcessor = new DumpAllTagProcessor(this);
......//省略代碼
}
其中在 DumpProcessor的 process方法中會調用 ConfigService 的相關API對數據進行操做,其中調用 remove 後會傳播這麼一個事件:
/** * 刪除配置文件,刪除緩存。 */ static public boolean remove(String dataId, String group, String tenant) { final String groupKey = GroupKey2.getKey(dataId, group, tenant); final int lockResult = tryWriteLock(groupKey); /** * 數據不存在 */ if (0 == lockResult) { dumpLog.info("[remove-ok] {} not exist.", groupKey); return true; } /** * 加鎖失敗 */ if (lockResult < 0) { dumpLog.warn("[remove-error] write lock failed. {}", groupKey); return false; } try { if (!STANDALONE_MODE || PropertyUtil.isStandaloneUseMysql()) { DiskUtil.removeConfigInfo(dataId, group, tenant); } CACHE.remove(groupKey); EventDispatcher.fireEvent(new LocalDataChangeEvent(groupKey)); return true; } finally { releaseWriteLock(groupKey); } }
簡單總結一下剛剛分析的整個過程。
因此總的來講,Nacos採用推+拉的形式,來解決最開始關於長輪訓時間間隔的問題。固然,30s這個時間是能夠設置的,而之因此定30s,應該是一個經驗值。
Nacos支持集羣模式,很顯然。而一旦涉及到集羣,就涉及到主從,那麼nacos是一種什麼樣的機制來實現的集羣呢?
nacos的集羣相似於zookeeper, 它分爲leader角色和follower角色, 那麼從這個角色的名字能夠看出來,這個集羣存在選舉的機制。 由於若是本身不具有選舉功能,角色的命名可能就是master/slave了,
Nacos集羣採用 raft 算法來實現,它是相對zookeeper的選舉算法較爲簡單的一種。選舉算法的核心在 RaftCore 中,包括數據的處理和數據同步.
raft算法動畫演示地址:http://thesecretlivesofdata.com/raft/ 。能夠很直觀的看到整個算法選舉的過程。
在Raft中,節點有三種角色:
選舉分爲兩個節點:
全部節點啓動的時候,都是follower狀態。 若是在一段時間內若是沒有收到leader的心跳(多是沒有leader,也多是leader掛了),那麼follower會變成Candidate。而後發起選舉,選舉以前,會增長term,這個term和zookeeper中的epoch的道理是同樣的。
follower會投本身一票,而且給其餘節點發送票據vote,等到其餘節點回覆在這個過程當中,可能出現幾種狀況
約束條件在任一term中,單個節點最多隻能投一票
選舉的幾種狀況:
在動畫演示中能夠看到選舉超時後,即每一個小球外圍都變化先消失的座位候選人,接着發出請求讓其餘人投票選舉本身,同時修改Term:
與Zookeeper同樣,對於事務操做,請求會轉發給leader,非事務操做上,能夠任意一個節點來處理.