服務端收到客戶端的配置變動請求查詢的長輪訓請求以後,服務端怎麼來處理這個長輪訓呢?算法
上節課講到了配置更新的整個原理及源碼,咱們知道客戶端會有一個長輪訓的任務去檢查服務器端的配置是否發生了變化,若是發生了變動,那麼客戶端會拿到變動的 groupKey 再根據 groupKey 去獲取配置項的最新值更新到本地的緩存以及文件中,那麼這種每次都靠客戶端去請求,那請求的時間間隔設置多少合適呢?spring
若是間隔時間設置的太長的話有可能沒法及時獲取服務端的變動,若是間隔時間設置的過短的話,那麼頻繁的請求對於服務端來講無疑也是一種負擔,因此最好的方式是客戶端每隔一段長度適中的時間去服務端請求,而在這期間若是配置發生變動,服務端可以主動將變動後的結果推送給客戶端,這樣既能保證客戶端可以實時感知到配置的變化,也下降了服務端的壓力。 咱們來看看nacos設置的間隔時間是多久apache
那麼在講解原理以前,先給你們解釋一下什麼叫長輪訓json
客戶端發起一個請求到服務端,服務端收到客戶端的請求後,並不會馬上響應給客戶端,而是先把這個請求hold住,而後服務端會在hold住的這段時間檢查數據是否有更新,若是有,則響應給客戶端,若是一直沒有數據變動,則達到必定的時間(長輪訓時間間隔)才返回。api
長輪訓典型的場景有: 掃碼登陸、掃碼支付。緩存
回到咱們昨天上課講到的代碼,在ClientWorker這個類裏面,找到checkUpdateConfigStr
這個方法,這裏面就是去服務器端查詢發生變化的groupKey。bash
123456789101112131415161718192021222324252627282930313233343536複製代碼 |
List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws IOException { List<String> params = Arrays.asList(Constants.PROBE_MODIFY_REQUEST, probeUpdateString); List<String> headers = new ArrayList<String>(2); headers.add("Long-Pulling-Timeout"); headers.add("" + timeout); // told server do not hang me up if new initializing cacheData added in if (isInitializingCacheList) { headers.add("Long-Pulling-Timeout-No-Hangup"); headers.add("true"); } if (StringUtils.isBlank(probeUpdateString)) { return Collections.emptyList(); } try { HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(), timeout); if (HttpURLConnection.HTTP_OK == result.code) { setHealthServer(true); return parseUpdateDataIdResponse(result.content); } else { setHealthServer(false); LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(), result.code); } } catch (IOException e) { setHealthServer(false); LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e); throw e; } return Collections.emptyList(); }複製代碼 |
這個方法最終會發起http請求,注意這裏面有一個timeout
的屬性,服務器
12複製代碼 |
HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(), timeout);複製代碼 |
timeout是在init這個方法中賦值的,默認狀況下是30秒,能夠經過configLongPollTimeout進行修改mvc
12345複製代碼 |
private void init(Properties properties) { this.timeout = (long)Math.max(NumberUtils.toInt(properties.getProperty("configLongPollTimeout"), 30000), 10000); this.taskPenaltyTime = NumberUtils.toInt(properties.getProperty("configRetryTime"), 2000); this.enableRemoteSyncConfig = Boolean.parseBoolean(properties.getProperty("enableRemoteSyncConfig")); }複製代碼 |
因此從這裏得出的一個基本結論是app
客戶端發起一個輪詢請求,超時時間是30s。 那麼客戶端爲何要等待30s才超時呢?不是越快越好嗎?
咱們能夠在nacos的日誌目錄下$NACOS_HOME/nacos/logs/config-client-request.log
文件
123456複製代碼 |
2019-08-04 13:22:19,736|0|nohangup|127.0.0.1|polling|1|55|02019-08-04 13:22:49,443|29504|timeout|127.0.0.1|polling|1|552019-08-04 13:23:18,983|29535|timeout|127.0.0.1|polling|1|552019-08-04 13:23:48,493|29501|timeout|127.0.0.1|polling|1|552019-08-04 13:24:18,003|29500|timeout|127.0.0.1|polling|1|552019-08-04 13:24:47,509|29501|timeout|127.0.0.1|polling|1|55複製代碼 |
能夠看到一個現象,在配置沒有發生變化的狀況下,客戶端會等29.5s以上,才請求到服務器端的結果。而後客戶端拿到服務器端的結果以後,在作後續的操做。
若是在配置變動的狀況下,因爲客戶端基於長輪訓的鏈接保持,因此返回的時間會很是的短,咱們能夠作個小實驗,在nacos console中頻繁修改數據而後再觀察一下
config-client-request.log
的變化
12345複製代碼 |
2019-08-04 13:30:17,016|0|in-advance|127.0.0.1|polling|1|55|example+DEFAULT_GROUP2019-08-04 13:30:17,022|3|null|127.0.0.1|get|example|DEFAULT_GROUP||e10e4d5973c497e490a8d7a9e4e9be64|unknown2019-08-04 13:30:20,807|10|true|0:0:0:0:0:0:0:1|publish|example|DEFAULT_GROUP||81360b7e732a5dbb37d62d81cebb85d2|null2019-08-04 13:30:20,843|0|in-advance|127.0.0.1|polling|1|55|example+DEFAULT_GROUP2019-08-04 13:30:20,848|1|null|127.0.0.1|get|example|DEFAULT_GROUP||81360b7e732a5dbb37d62d81cebb85d2|unknown複製代碼 |
分析完客戶端以後,隨着好奇心的驅使,服務端是如何處理客戶端的請求的?那麼一樣,咱們須要思考幾個問題
客戶端發送的請求地址是:/v1/cs/configs/listener
找到服務端對應的方法
nacos是使用spring mvc提供的rest api。這裏面會調用inner.doPollingConfig進行處理
123456789101112131415161718192021複製代碼 |
@RequestMapping(value = "/listener", method = RequestMethod.POST) public void listener(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true); String probeModify = request.getParameter("Listening-Configs"); if (StringUtils.isBlank(probeModify)) { throw new IllegalArgumentException("invalid probeModify"); } probeModify = URLDecoder.decode(probeModify, Constants.ENCODE); Map<String, String> clientMd5Map; try { clientMd5Map = MD5Util.getClientMd5Map(probeModify); } catch (Throwable e) { throw new IllegalArgumentException("invalid probeModify"); } // do long-polling inner.doPollingConfig(request, response, clientMd5Map, probeModify.length()); }複製代碼 |
這個方法中,兼容了長輪訓和短輪詢的邏輯,咱們只須要關注長輪訓的部分。再次進入到longPollingService.addLongPollingClient
12345678910111213141516171819202122232425262728293031323334353637383940複製代碼 |
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response, Map<String, String> clientMd5Map, int probeRequestSize) throws IOException, ServletException { // 長輪詢 if (LongPollingService.isSupportLongPolling(request)) { longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize); return HttpServletResponse.SC_OK + ""; } // else 兼容短輪詢邏輯 List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map); // 兼容短輪詢result String oldResult = MD5Util.compareMd5OldResult(changedGroups); String newResult = MD5Util.compareMd5ResultString(changedGroups); String version = request.getHeader(Constants.CLIENT_VERSION_HEADER); if (version == null) { version = "2.0.0"; } int versionNum = Protocol.getVersionNumber(version); /** * 2.0.4版本之前, 返回值放入header中 */ if (versionNum < START_LONGPOLLING_VERSION_NUM) { response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult); response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult); } else { request.setAttribute("content", newResult); } // 禁用緩存 response.setHeader("Pragma", "no-cache"); response.setDateHeader("Expires", 0); response.setHeader("Cache-Control", "no-cache,no-store"); response.setStatus(HttpServletResponse.SC_OK); return HttpServletResponse.SC_OK + ""; }複製代碼 |
從方法名字上能夠推測出,這個方法應該是把客戶端的長輪訓請求添加到某個任務中去。
isFixedPolling=true
的狀況下,不會提早返回響應generateResponse
將結果返回scheduler.execute
啓動了一個定時任務,將客戶端的長輪詢請求封裝成一個叫 ClientLongPolling 的任務,交給 scheduler 去執行12345678910111213141516171819202122232425262728293031323334353637383940複製代碼 |
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map, int probeRequestSize) { //str表示超時時間,也就是timeout String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER); String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER); String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER); String tag = req.getHeader("Vipserver-Tag"); int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500); /** * 提早500ms返回響應,爲避免客戶端超時 @qiaoyi.dingqy 2013.10.22改動 add delay time for LoadBalance */ long timeout = Math.max(10000, Long.parseLong(str) - delayTime); if (isFixedPolling()) { timeout = Math.max(10000, getFixedPollingInterval()); // do nothing but set fix polling timeout } else { long start = System.currentTimeMillis(); List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map); if (changedGroups.size() > 0) { generateResponse(req, rsp, changedGroups); LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant", RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize, changedGroups.size()); return; } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) { LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup", RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize, changedGroups.size()); return; } } String ip = RequestUtil.getRemoteIp(req); // 必定要由HTTP線程調用,不然離開後容器會當即發送響應 final AsyncContext asyncContext = req.startAsync(); // AsyncContext.setTimeout()的超時時間不許,因此只能本身控制 asyncContext.setTimeout(0L); scheduler.execute( new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag)); }複製代碼 |
接下來咱們來分析一下,clientLongPolling到底作了什麼操做。或者說咱們能夠先猜想一下應該會作什麼事情
基於這些猜測,咱們能夠看看它的實現過程
從代碼粗粒度來看,它的實現彷佛和咱們的猜測一致,在run方法中,經過scheduler.schedule實現了一個定時任務,它的delay時間正好是前面計算的29.5s。在這個任務中,會經過MD5Util.compareMd5來進行計算
那另一個,當數據發生變化之後,確定不能等到29.5s以後才通知呀,那怎麼辦呢?咱們發現有一個allSubs
的東西,它彷佛和發佈訂閱有關係。那是否是有可能當前的clientLongPolling訂閱了數據變化的事件呢?
12345678910111213141516171819202122232425262728293031323334353637383940414243複製代碼 |
public void run() { asyncTimeoutFuture = scheduler.schedule(new Runnable() { @Override public void run() { try { getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMil·1·s()); /** * 刪除訂閱關係 */ allSubs.remove(ClientLongPolling.this); if (isFixedPolling()) { LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "fix", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()), "polling", clientMd5Map.size(), probeRequestSize); List<String> changedGroups = MD5Util.compareMd5( (HttpServletRequest)asyncContext.getRequest(), (HttpServletResponse)asyncContext.getResponse(), clientMd5Map); if (changedGroups.size() > 0) { sendResponse(changedGroups); } else { sendResponse(null); } } else { LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "timeout", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()), "polling", clientMd5Map.size(), probeRequestSize); sendResponse(null); } } catch (Throwable t) { LogUtil.defaultLog.error("long polling error:" + t.getMessage(), t.getCause()); } } }, timeoutTime, TimeUnit.MILLISECONDS); allSubs.add(this);}複製代碼 |
allSubs是一個隊列,隊列裏面放了ClientLongPolling這個對象。這個隊列彷佛和配置變動有某種關聯關係
123456複製代碼 |
/** * 長輪詢訂閱關係 */final Queue<ClientLongPolling> allSubs;allSubs.add(this);複製代碼 |
那這個時候,個人第一想法是,先去看一下當前這個類的類圖,發現LongPollingService集成了AbstractEventListener,事件監聽?果真沒猜錯。
這裏面有一個抽象的onEvent方法,明顯是用來處理事件的方法,而抽象方法必須由子類實現,因此意味着LongPollingService裏面確定實現了onEvent方法
1234567891011121314151617181920212223複製代碼 |
static public abstract class AbstractEventListener { public AbstractEventListener() { /** * automatic register */ EventDispatcher.addEventListener(this); } /** * 感興趣的事件列表 * * @return event list */ abstract public List<Class<? extends Event>> interest(); /** * 處理事件 * * @param event event */ abstract public void onEvent(Event event); }複製代碼 |
這個事件的實現方法中
1234567891011複製代碼 |
@Override public void onEvent(Event event) { if (isFixedPolling()) { // ignore } else { if (event instanceof LocalDataChangeEvent) { LocalDataChangeEvent evt = (LocalDataChangeEvent)event; scheduler.execute(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps)); } } }複製代碼 |
從名字能夠看出來,這個是數據變化的任務,最讓人興奮的應該是,它裏面有一個循環迭代器,從allSubs裏面得到ClientLongPolling
最後經過clientSub.sendResponse把數據返回到客戶端。因此,這也就可以理解爲什麼數據變化可以實時觸發更新了。
12345678910111213141516171819202122232425262728293031複製代碼 |
public void run() { try { ConfigService.getContentBetaMd5(groupKey); for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) { ClientLongPolling clientSub = iter.next(); if (clientSub.clientMd5Map.containsKey(groupKey)) { // 若是beta發佈且不在beta列表直接跳過 if (isBeta && !betaIps.contains(clientSub.ip)) { continue; } // 若是tag發佈且不在tag列表直接跳過 if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) { continue; } getRetainIps().put(clientSub.ip, System.currentTimeMillis()); iter.remove(); // 刪除訂閱關係 LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - changeTime), "in-advance", RequestUtil.getRemoteIp((HttpServletRequest)clientSub.asyncContext.getRequest()), "polling", clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey); clientSub.sendResponse(Arrays.asList(groupKey)); } } } catch (Throwable t) { LogUtil.defaultLog.error("data change error:" + t.getMessage(), t.getCause()); }}複製代碼 |
那麼接下來還有一個疑問是,數據變化以後是如何觸發事件的呢? 因此咱們定位到數據變化的請求類中,在ConfigController這個類中,找到POST請求的方法
找到配置變動的位置, 發現數據持久化以後,會經過EventDispatcher進行事件發佈EventDispatcher.fireEvent
可是這個事件彷佛不是咱們所關心的時間,緣由是這裏發佈的事件是ConfigDataChangeEvent
, 而LongPollingService感興趣的事件是LocalDataChangeEvent
1234567891011121314151617181920212223複製代碼 |
@RequestMapping(method = RequestMethod.POST) @ResponseBody public Boolean publishConfig(...) throws NacosException { //省略部分代碼 ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content); if (StringUtils.isBlank(betaIps)) { if (StringUtils.isBlank(tag)) { persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false); EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime())); } else { persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false); EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime())); } } else { // beta publish persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false); EventDispatcher.fireEvent(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime())); } ConfigTraceService.logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(), LOCAL_IP, ConfigTraceService.PERSISTENCE_EVENT_PUB, content); return true; }複製代碼 |
後來我發現,在Nacos中有一個DumpService,它會定時把變動後的數據dump到磁盤上,DumpService在spring啓動以後,會調用init方法啓動幾個dump任務。而後在任務執行結束以後,會觸發一個LocalDataChangeEvent 的事件
1234567複製代碼 |
@PostConstruct public void init() { LogUtil.defaultLog.warn("DumpService start"); DumpProcessor processor = new DumpProcessor(this); DumpAllProcessor dumpAllProcessor = new DumpAllProcessor(this); DumpAllBetaProcessor dumpAllBetaProcessor = new DumpAllBetaProcessor(this); DumpAllTagProcessor dumpAllTagProcessor = new DumpAllTagProcessor(this);複製代碼 |
簡單總結一下剛剛分析的整個過程。
因此總的來講,Nacos採用推+拉的形式,來解決最開始關於長輪訓時間間隔的問題。固然,30s這個時間是能夠設置的,而之因此定30s,應該是一個經驗值。
Nacos支持集羣模式,很顯然。
而一旦涉及到集羣,就涉及到主從,那麼nacos是一種什麼樣的機制來實現的集羣呢?
nacos的集羣相似於zookeeper, 它分爲leader角色和follower角色, 那麼從這個角色的名字能夠看出來,這個集羣存在選舉的機制。 由於若是本身不具有選舉功能,角色的命名可能就是master/slave了,固然這只是我基於這麼多組件的命名的一個猜想
Nacos集羣採用raft算法來實現,它是相對zookeeper的選舉算法較爲簡單的一種。
選舉算法的核心在RaftCore
中,包括數據的處理和數據同步
在Raft中,節點有三種角色:
選舉分爲兩個節點
全部節點啓動的時候,都是follower狀態。 若是在一段時間內若是沒有收到leader的心跳(多是沒有leader,也多是leader掛了),那麼follower會變成Candidate。而後發起選舉,選舉以前,會增長term,這個term和zookeeper中的epoch的道理是同樣的。
選舉的幾種狀況
對於事務操做,請求會轉發給leader
非事務操做上,能夠任意一個節點來處理
下面這段代碼摘自 RaftCore , 在發佈內容的時候,作了兩個事情
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970複製代碼 |
public void signalPublish(String key, Record value) throws Exception { if (!isLeader()) { JSONObject params = new JSONObject(); params.put("key", key); params.put("value", value); Map<String, String> parameters = new HashMap<>(1); parameters.put("key", key); raftProxy.proxyPostLarge(getLeader().ip, API_PUB, params.toJSONString(), parameters); return; } try { OPERATE_LOCK.lock(); long start = System.currentTimeMillis(); final Datum datum = new Datum(); datum.key = key; datum.value = value; if (getDatum(key) == null) { datum.timestamp.set(1L); } else { datum.timestamp.set(getDatum(key).timestamp.incrementAndGet()); } JSONObject json = new JSONObject(); json.put("datum", datum); json.put("source", peers.local()); onPublish(datum, peers.local()); final String content = JSON.toJSONString(json); final CountDownLatch latch = new CountDownLatch(peers.majorityCount()); for (final String server : peers.allServersIncludeMyself()) { if (isLeader(server)) { latch.countDown(); continue; } final String url = buildURL(server, API_ON_PUB); HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content, new AsyncCompletionHandler<Integer>() { @Override public Integer onCompleted(Response response) throws Exception { if (response.getStatusCode() != HttpURLConnection.HTTP_OK) { Loggers.RAFT.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}", datum.key, server, response.getStatusCode()); return 1; } latch.countDown(); return 0; } @Override public STATE onContentWriteCompleted() { return STATE.CONTINUE; } }); } if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) { // only majority servers return success can we consider this update success Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key); throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key); } long end = System.currentTimeMillis(); Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key); } finally { OPERATE_LOCK.unlock(); }複製代碼 |
#
(^U^)ノ~YO同窗,你已經看到告終尾嗎?
看到這裏,愛學習的你是否是在上班、學習和讀文章的時候有什麼問題想要大牛和大神解答呢?
因此,快來點擊這裏�(☄⊙ω⊙)☄
gper.club/answers/7e7…提出問題讓大牛大神們給你解惑吧~