Alibaba Nacos配置中心服務端處理源碼

服務端收到客戶端的配置變動請求查詢的長輪訓請求以後,服務端怎麼來處理這個長輪訓呢?算法

長輪訓的時間間隔

上節課講到了配置更新的整個原理及源碼,咱們知道客戶端會有一個長輪訓的任務去檢查服務器端的配置是否發生了變化,若是發生了變動,那麼客戶端會拿到變動的 groupKey 再根據 groupKey 去獲取配置項的最新值更新到本地的緩存以及文件中,那麼這種每次都靠客戶端去請求,那請求的時間間隔設置多少合適呢?spring

若是間隔時間設置的太長的話有可能沒法及時獲取服務端的變動,若是間隔時間設置的過短的話,那麼頻繁的請求對於服務端來講無疑也是一種負擔,因此最好的方式是客戶端每隔一段長度適中的時間去服務端請求,而在這期間若是配置發生變動,服務端可以主動將變動後的結果推送給客戶端,這樣既能保證客戶端可以實時感知到配置的變化,也下降了服務端的壓力。 咱們來看看nacos設置的間隔時間是多久apache

長輪訓的概念

那麼在講解原理以前,先給你們解釋一下什麼叫長輪訓json

客戶端發起一個請求到服務端,服務端收到客戶端的請求後,並不會馬上響應給客戶端,而是先把這個請求hold住,而後服務端會在hold住的這段時間檢查數據是否有更新,若是有,則響應給客戶端,若是一直沒有數據變動,則達到必定的時間(長輪訓時間間隔)才返回。api

長輪訓典型的場景有: 掃碼登陸、掃碼支付。緩存

1564895144314

客戶端長輪訓

回到咱們昨天上課講到的代碼,在ClientWorker這個類裏面,找到checkUpdateConfigStr這個方法,這裏面就是去服務器端查詢發生變化的groupKey。bash

123456789101112131415161718192021222324252627282930313233343536複製代碼
List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws IOException {        List<String> params = Arrays.asList(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);        List<String> headers = new ArrayList<String>(2);        headers.add("Long-Pulling-Timeout");        headers.add("" + timeout);        // told server do not hang me up if new initializing cacheData added in        if (isInitializingCacheList) {            headers.add("Long-Pulling-Timeout-No-Hangup");            headers.add("true");        }        if (StringUtils.isBlank(probeUpdateString)) {            return Collections.emptyList();        }        try {            HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params,                agent.getEncode(), timeout);            if (HttpURLConnection.HTTP_OK == result.code) {                setHealthServer(true);                return parseUpdateDataIdResponse(result.content);            } else {                setHealthServer(false);                LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(), result.code);            }        } catch (IOException e) {            setHealthServer(false);            LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e);            throw e;        }        return Collections.emptyList();    }複製代碼

這個方法最終會發起http請求,注意這裏面有一個timeout的屬性,服務器

12複製代碼
HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params,                agent.getEncode(), timeout);複製代碼

timeout是在init這個方法中賦值的,默認狀況下是30秒,能夠經過configLongPollTimeout進行修改mvc

12345複製代碼
private void init(Properties properties) {        this.timeout = (long)Math.max(NumberUtils.toInt(properties.getProperty("configLongPollTimeout"), 30000), 10000);        this.taskPenaltyTime = NumberUtils.toInt(properties.getProperty("configRetryTime"), 2000);        this.enableRemoteSyncConfig = Boolean.parseBoolean(properties.getProperty("enableRemoteSyncConfig"));    }複製代碼

因此從這裏得出的一個基本結論是app

客戶端發起一個輪詢請求,超時時間是30s。 那麼客戶端爲何要等待30s才超時呢?不是越快越好嗎?

客戶端長輪訓的時間間隔

咱們能夠在nacos的日誌目錄下$NACOS_HOME/nacos/logs/config-client-request.log文件

123456複製代碼
2019-08-04 13:22:19,736|0|nohangup|127.0.0.1|polling|1|55|02019-08-04 13:22:49,443|29504|timeout|127.0.0.1|polling|1|552019-08-04 13:23:18,983|29535|timeout|127.0.0.1|polling|1|552019-08-04 13:23:48,493|29501|timeout|127.0.0.1|polling|1|552019-08-04 13:24:18,003|29500|timeout|127.0.0.1|polling|1|552019-08-04 13:24:47,509|29501|timeout|127.0.0.1|polling|1|55複製代碼

能夠看到一個現象,在配置沒有發生變化的狀況下,客戶端會等29.5s以上,才請求到服務器端的結果。而後客戶端拿到服務器端的結果以後,在作後續的操做。

若是在配置變動的狀況下,因爲客戶端基於長輪訓的鏈接保持,因此返回的時間會很是的短,咱們能夠作個小實驗,在nacos console中頻繁修改數據而後再觀察一下

config-client-request.log的變化

12345複製代碼
2019-08-04 13:30:17,016|0|in-advance|127.0.0.1|polling|1|55|example+DEFAULT_GROUP2019-08-04 13:30:17,022|3|null|127.0.0.1|get|example|DEFAULT_GROUP||e10e4d5973c497e490a8d7a9e4e9be64|unknown2019-08-04 13:30:20,807|10|true|0:0:0:0:0:0:0:1|publish|example|DEFAULT_GROUP||81360b7e732a5dbb37d62d81cebb85d2|null2019-08-04 13:30:20,843|0|in-advance|127.0.0.1|polling|1|55|example+DEFAULT_GROUP2019-08-04 13:30:20,848|1|null|127.0.0.1|get|example|DEFAULT_GROUP||81360b7e732a5dbb37d62d81cebb85d2|unknown複製代碼

1564896925676

服務端的處理

分析完客戶端以後,隨着好奇心的驅使,服務端是如何處理客戶端的請求的?那麼一樣,咱們須要思考幾個問題

  • 客戶端的長輪訓響應時間受到哪些因素的影響
  • 客戶端的超時時間爲何要設置30s

客戶端發送的請求地址是:/v1/cs/configs/listener 找到服務端對應的方法

ConfigController

nacos是使用spring mvc提供的rest api。這裏面會調用inner.doPollingConfig進行處理

123456789101112131415161718192021複製代碼
@RequestMapping(value = "/listener", method = RequestMethod.POST)    public void listener(HttpServletRequest request, HttpServletResponse response)        throws ServletException, IOException {        request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);        String probeModify = request.getParameter("Listening-Configs");        if (StringUtils.isBlank(probeModify)) {            throw new IllegalArgumentException("invalid probeModify");        }        probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);        Map<String, String> clientMd5Map;        try {            clientMd5Map = MD5Util.getClientMd5Map(probeModify);        } catch (Throwable e) {            throw new IllegalArgumentException("invalid probeModify");        }        // do long-polling        inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());    }複製代碼

doPollingConfig

這個方法中,兼容了長輪訓和短輪詢的邏輯,咱們只須要關注長輪訓的部分。再次進入到longPollingService.addLongPollingClient

12345678910111213141516171819202122232425262728293031323334353637383940複製代碼
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,                                  Map<String, String> clientMd5Map, int probeRequestSize)        throws IOException, ServletException {        // 長輪詢        if (LongPollingService.isSupportLongPolling(request)) {            longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);            return HttpServletResponse.SC_OK + "";        }        // else 兼容短輪詢邏輯        List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);        // 兼容短輪詢result        String oldResult = MD5Util.compareMd5OldResult(changedGroups);        String newResult = MD5Util.compareMd5ResultString(changedGroups);        String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);        if (version == null) {            version = "2.0.0";        }        int versionNum = Protocol.getVersionNumber(version);        /**         * 2.0.4版本之前, 返回值放入header中         */        if (versionNum < START_LONGPOLLING_VERSION_NUM) {            response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);            response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);        } else {            request.setAttribute("content", newResult);        }        // 禁用緩存        response.setHeader("Pragma", "no-cache");        response.setDateHeader("Expires", 0);        response.setHeader("Cache-Control", "no-cache,no-store");        response.setStatus(HttpServletResponse.SC_OK);        return HttpServletResponse.SC_OK + "";    }複製代碼

longPollingService.addLongPollingClient

從方法名字上能夠推測出,這個方法應該是把客戶端的長輪訓請求添加到某個任務中去。

  • 得到客戶端傳遞過來的超時時間,而且進行本地計算,提早500ms返回響應,這就能解釋爲何客戶端響應超時時間是29.5+了。固然若是isFixedPolling=true的狀況下,不會提早返回響應
  • 根據客戶端請求過來的md5和服務器端對應的group下對應內容的md5進行比較,若是不一致,則經過generateResponse將結果返回
  • 若是配置文件沒有發生變化,則經過scheduler.execute 啓動了一個定時任務,將客戶端的長輪詢請求封裝成一個叫 ClientLongPolling 的任務,交給 scheduler 去執行
12345678910111213141516171819202122232425262728293031323334353637383940複製代碼
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,                                     int probeRequestSize) {        //str表示超時時間,也就是timeout        String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);        String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);        String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);        String tag = req.getHeader("Vipserver-Tag");        int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);        /**         * 提早500ms返回響應,爲避免客戶端超時 @qiaoyi.dingqy 2013.10.22改動  add delay time for LoadBalance         */        long timeout = Math.max(10000, Long.parseLong(str) - delayTime);        if (isFixedPolling()) {            timeout = Math.max(10000, getFixedPollingInterval());            // do nothing but set fix polling timeout        } else {            long start = System.currentTimeMillis();            List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);            if (changedGroups.size() > 0) {                generateResponse(req, rsp, changedGroups);                LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",                    System.currentTimeMillis() - start, "instant", RequestUtil.getRemoteIp(req), "polling",                    clientMd5Map.size(), probeRequestSize, changedGroups.size());                return;            } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {                LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",                    RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,                    changedGroups.size());                return;            }        }        String ip = RequestUtil.getRemoteIp(req);        // 必定要由HTTP線程調用,不然離開後容器會當即發送響應        final AsyncContext asyncContext = req.startAsync();        // AsyncContext.setTimeout()的超時時間不許,因此只能本身控制        asyncContext.setTimeout(0L);        scheduler.execute(            new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));    }複製代碼

ClientLongPolling

接下來咱們來分析一下,clientLongPolling到底作了什麼操做。或者說咱們能夠先猜想一下應該會作什麼事情

  • 這個任務要阻塞29.5s才能執行,由於立馬執行沒有任何意義,畢竟前面已經執行過一次了
  • 若是在29.5s+以內,數據發生變化,須要提早通知。須要有一種監控機制

基於這些猜測,咱們能夠看看它的實現過程

從代碼粗粒度來看,它的實現彷佛和咱們的猜測一致,在run方法中,經過scheduler.schedule實現了一個定時任務,它的delay時間正好是前面計算的29.5s。在這個任務中,會經過MD5Util.compareMd5來進行計算

那另一個,當數據發生變化之後,確定不能等到29.5s以後才通知呀,那怎麼辦呢?咱們發現有一個allSubs的東西,它彷佛和發佈訂閱有關係。那是否是有可能當前的clientLongPolling訂閱了數據變化的事件呢?

12345678910111213141516171819202122232425262728293031323334353637383940414243複製代碼
public void run() {    asyncTimeoutFuture = scheduler.schedule(new Runnable() {        @Override        public void run() {            try {                getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMil·1·s());                /**                 * 刪除訂閱關係                 */                allSubs.remove(ClientLongPolling.this);                if (isFixedPolling()) {                    LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",                                           (System.currentTimeMillis() - createTime),                                           "fix", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),                                           "polling",                                           clientMd5Map.size(), probeRequestSize);                    List<String> changedGroups = MD5Util.compareMd5(                        (HttpServletRequest)asyncContext.getRequest(),                        (HttpServletResponse)asyncContext.getResponse(), clientMd5Map);                    if (changedGroups.size() > 0) {                        sendResponse(changedGroups);                    } else {                        sendResponse(null);                    }                } else {                    LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",                                           (System.currentTimeMillis() - createTime),                                           "timeout", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),                                           "polling",                                           clientMd5Map.size(), probeRequestSize);                    sendResponse(null);                }            } catch (Throwable t) {                LogUtil.defaultLog.error("long polling error:" + t.getMessage(), t.getCause());            }        }    }, timeoutTime, TimeUnit.MILLISECONDS);    allSubs.add(this);}複製代碼

allSubs

allSubs是一個隊列,隊列裏面放了ClientLongPolling這個對象。這個隊列彷佛和配置變動有某種關聯關係

123456複製代碼
/** * 長輪詢訂閱關係 */final Queue<ClientLongPolling> allSubs;allSubs.add(this);複製代碼

那這個時候,個人第一想法是,先去看一下當前這個類的類圖,發現LongPollingService集成了AbstractEventListener,事件監聽?果真沒猜錯。

1564902541390

AbstractEventListener

這裏面有一個抽象的onEvent方法,明顯是用來處理事件的方法,而抽象方法必須由子類實現,因此意味着LongPollingService裏面確定實現了onEvent方法

1234567891011121314151617181920212223複製代碼
static public abstract class AbstractEventListener {        public AbstractEventListener() {            /**             * automatic register             */            EventDispatcher.addEventListener(this);        }        /**         * 感興趣的事件列表         *         * @return event list         */        abstract public List<Class<? extends Event>> interest();        /**         * 處理事件         *         * @param event event         */        abstract public void onEvent(Event event);    }複製代碼

LongPollingService.onEvent

這個事件的實現方法中

  • 判斷事件類型是否爲LocalDataChangeEvent
  • 經過scheduler.execute執行DataChangeTask這個任務
1234567891011複製代碼
@Override    public void onEvent(Event event) {        if (isFixedPolling()) {            // ignore        } else {            if (event instanceof LocalDataChangeEvent) {                LocalDataChangeEvent evt = (LocalDataChangeEvent)event;                scheduler.execute(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));            }        }    }複製代碼

DataChangeTask.run

從名字能夠看出來,這個是數據變化的任務,最讓人興奮的應該是,它裏面有一個循環迭代器,從allSubs裏面得到ClientLongPolling

最後經過clientSub.sendResponse把數據返回到客戶端。因此,這也就可以理解爲什麼數據變化可以實時觸發更新了。

12345678910111213141516171819202122232425262728293031複製代碼
public void run() {    try {        ConfigService.getContentBetaMd5(groupKey);        for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {            ClientLongPolling clientSub = iter.next();            if (clientSub.clientMd5Map.containsKey(groupKey)) {                // 若是beta發佈且不在beta列表直接跳過                if (isBeta && !betaIps.contains(clientSub.ip)) {                    continue;                }                // 若是tag發佈且不在tag列表直接跳過                if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {                    continue;                }                getRetainIps().put(clientSub.ip, System.currentTimeMillis());                iter.remove(); // 刪除訂閱關係                LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",                                       (System.currentTimeMillis() - changeTime),                                       "in-advance",                                       RequestUtil.getRemoteIp((HttpServletRequest)clientSub.asyncContext.getRequest()),                                       "polling",                                       clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);                clientSub.sendResponse(Arrays.asList(groupKey));            }        }    } catch (Throwable t) {        LogUtil.defaultLog.error("data change error:" + t.getMessage(), t.getCause());    }}複製代碼

那麼接下來還有一個疑問是,數據變化以後是如何觸發事件的呢? 因此咱們定位到數據變化的請求類中,在ConfigController這個類中,找到POST請求的方法

找到配置變動的位置, 發現數據持久化以後,會經過EventDispatcher進行事件發佈EventDispatcher.fireEvent 可是這個事件彷佛不是咱們所關心的時間,緣由是這裏發佈的事件是ConfigDataChangeEvent, 而LongPollingService感興趣的事件是LocalDataChangeEvent

1234567891011121314151617181920212223複製代碼
@RequestMapping(method = RequestMethod.POST)    @ResponseBody    public Boolean publishConfig(...)        throws NacosException {        //省略部分代碼        ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);        if (StringUtils.isBlank(betaIps)) {            if (StringUtils.isBlank(tag)) {                persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false);                EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));            } else {                persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false);                EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));            }        } else { // beta publish            persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false);            EventDispatcher.fireEvent(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));        }        ConfigTraceService.logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(),            LOCAL_IP, ConfigTraceService.PERSISTENCE_EVENT_PUB, content);        return true;    }複製代碼

後來我發現,在Nacos中有一個DumpService,它會定時把變動後的數據dump到磁盤上,DumpService在spring啓動以後,會調用init方法啓動幾個dump任務。而後在任務執行結束以後,會觸發一個LocalDataChangeEvent 的事件

1234567複製代碼
@PostConstruct    public void init() {        LogUtil.defaultLog.warn("DumpService start");        DumpProcessor processor = new DumpProcessor(this);        DumpAllProcessor dumpAllProcessor = new DumpAllProcessor(this);        DumpAllBetaProcessor dumpAllBetaProcessor = new DumpAllBetaProcessor(this);        DumpAllTagProcessor dumpAllTagProcessor = new DumpAllTagProcessor(this);複製代碼

簡單總結

簡單總結一下剛剛分析的整個過程。

  • 客戶端發起長輪訓請求
  • 服務端收到請求之後,先比較服務端緩存中的數據是否相同,若是不通,則直接返回
  • 若是相同,則經過schedule延遲29.5s以後再執行比較
  • 爲了保證當服務端在29.5s以內發生數據變化可以及時通知給客戶端,服務端採用事件訂閱的方式來監聽服務端本地數據變化的事件,一旦收到事件,則觸發DataChangeTask的通知,而且遍歷allStubs隊列中的ClientLongPolling,把結果寫回到客戶端,就完成了一次數據的推送
  • 若是 DataChangeTask 任務完成了數據的 「推送」 以後,ClientLongPolling 中的調度任務又開始執行了怎麼辦呢?
    很簡單,只要在進行 「推送」 操做以前,先將原來等待執行的調度任務取消掉就能夠了,這樣就防止了推送操做寫完響應數據以後,調度任務又去寫響應數據,這時確定會報錯的。因此,在ClientLongPolling方法中,最開始的一個步驟就是刪除訂閱事件

因此總的來講,Nacos採用推+拉的形式,來解決最開始關於長輪訓時間間隔的問題。固然,30s這個時間是能夠設置的,而之因此定30s,應該是一個經驗值。

集羣選舉問題

Nacos支持集羣模式,很顯然。

而一旦涉及到集羣,就涉及到主從,那麼nacos是一種什麼樣的機制來實現的集羣呢?

nacos的集羣相似於zookeeper, 它分爲leader角色和follower角色, 那麼從這個角色的名字能夠看出來,這個集羣存在選舉的機制。 由於若是本身不具有選舉功能,角色的命名可能就是master/slave了,固然這只是我基於這麼多組件的命名的一個猜想

選舉算法

Nacos集羣採用raft算法來實現,它是相對zookeeper的選舉算法較爲簡單的一種。

選舉算法的核心在RaftCore 中,包括數據的處理和數據同步

raft算法演示地址

在Raft中,節點有三種角色:

  • Leader:負責接收客戶端的請求
  • Candidate:用於選舉Leader的一種角色
  • Follower:負責響應來自Leader或者Candidate的請求

選舉分爲兩個節點

  • 服務啓動的時候
  • leader掛了的時候

全部節點啓動的時候,都是follower狀態。 若是在一段時間內若是沒有收到leader的心跳(多是沒有leader,也多是leader掛了),那麼follower會變成Candidate。而後發起選舉,選舉以前,會增長term,這個term和zookeeper中的epoch的道理是同樣的。

  • follower會投本身一票,而且給其餘節點發送票據vote,等到其餘節點回復
  • 在這個過程當中,可能出現幾種狀況
    • 收到過半的票數經過,則成爲leader
    • 被告知其餘節點已經成爲leader,則本身切換爲follower
    • 一段時間內沒有收到過半的投票,則從新發起選舉
  • 約束條件在任一term中,單個節點最多隻能投一票

選舉的幾種狀況

  • 第一種狀況,贏得選舉以後,leader會給全部節點發送消息,避免其餘節點觸發新的選舉
  • 第二種狀況,好比有三個節點A B C。A B同時發起選舉,而A的選舉消息先到達C,C給A投了一票,當B的消息到達C時,已經不能知足上面提到的第一個約束,即C不會給B投票,而A和B顯然都不會給對方投票。A勝出以後,會給B,C發心跳消息,節點B發現節點A的term不低於本身的term,知道有已經有Leader了,因而轉換成follower
  • 第三種狀況, 沒有任何節點得到majority投票,多是平票的狀況。加入總共有四個節點(A/B/C/D),Node C、Node D同時成爲了candidate,但Node A投了NodeD一票,NodeB投了Node C一票,這就出現了平票 split vote的狀況。這個時候你們都在等啊等,直到超時後從新發起選舉。若是出現平票的狀況,那麼就延長了系統不可用的時間,所以raft引入了randomized election timeouts來儘可能避免平票狀況

數據的處理

對於事務操做,請求會轉發給leader

非事務操做上,能夠任意一個節點來處理

下面這段代碼摘自 RaftCore , 在發佈內容的時候,作了兩個事情

  • 若是當前的節點不是leader,則轉發給leader節點處理
  • 若是是,則向全部節點發送onPublish
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970複製代碼
public void signalPublish(String key, Record value) throws Exception {        if (!isLeader()) {            JSONObject params = new JSONObject();            params.put("key", key);            params.put("value", value);            Map<String, String> parameters = new HashMap<>(1);            parameters.put("key", key);            raftProxy.proxyPostLarge(getLeader().ip, API_PUB, params.toJSONString(), parameters);            return;        }        try {            OPERATE_LOCK.lock();            long start = System.currentTimeMillis();            final Datum datum = new Datum();            datum.key = key;            datum.value = value;            if (getDatum(key) == null) {                datum.timestamp.set(1L);            } else {                datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());            }            JSONObject json = new JSONObject();            json.put("datum", datum);            json.put("source", peers.local());            onPublish(datum, peers.local());            final String content = JSON.toJSONString(json);            final CountDownLatch latch = new CountDownLatch(peers.majorityCount());            for (final String server : peers.allServersIncludeMyself()) {                if (isLeader(server)) {                    latch.countDown();                    continue;                }                final String url = buildURL(server, API_ON_PUB);                HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content, new AsyncCompletionHandler<Integer>() {                    @Override                    public Integer onCompleted(Response response) throws Exception {                        if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {                            Loggers.RAFT.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",                                datum.key, server, response.getStatusCode());                            return 1;                        }                        latch.countDown();                        return 0;                    }                    @Override                    public STATE onContentWriteCompleted() {                        return STATE.CONTINUE;                    }                });            }            if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {                // only majority servers return success can we consider this update success                Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);                throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);            }            long end = System.currentTimeMillis();            Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);        } finally {            OPERATE_LOCK.unlock();        }複製代碼

#

(^U^)ノ~YO同窗,你已經看到告終尾嗎?
看到這裏,愛學習的你是否是在上班、學習和讀文章的時候有什麼問題想要大牛和大神解答呢?
因此,快來點擊這裏�(☄⊙ω⊙)☄
gper.club/answers/7e7…提出問題讓大牛大神們給你解惑吧~

相關文章
相關標籤/搜索