Elasticsearch分佈式一致性-節點選主

1.概述

        全部的分佈式系統都須要考慮一致性問題,ES採用的是主從模式的一致性解決方案。本文將主要基於Elasticsearch 6.5.4版本的discovery模塊從:ES節點類型、啓動流程、Master選舉、節點監測等方面介紹ES的主從模式的一致性解決方案。Elasticsearch6.5.4 debug環境搭建能夠參考:java

Elasticsearch源碼編譯運行node

2.ES節點類型

        一個Elasticsearch集羣是由許多Node組成的,Node能夠在Elasticsearch的啓動腳本elasticsearch.yml進行設置。算法

node.master: true/false
node.data: true/false複製代碼

        根據配置,ES Node類型一共有四種:MasterNode+DataNode、MasterNode、DataNode和最後一種Coordinating節點。bash

MasterNode:是可以成爲master的候選節點,能夠參與選選舉,主要是存儲集羣的元數據。網絡

DataNode:主要存儲shard的數據,而且負責這些shard數據的讀寫。app

Coordinating Node:節點既不做爲master也不存儲數據,接受請求轉發,聚合等操做。elasticsearch

3.啓動流程

        ZenDiscovery.java是Elasticsearch discovery的默認實現,選舉相關的邏輯基本在該類中。啓動的時候,ZenDiscovery類由Node.java,初始化,調用ZenDiscovery#startInitialJoin()方法啓動。分佈式

4.Master選舉

4.1 腦裂問題        

        爲了集羣不出現腦裂問題,通常會在啓動的時候配置quorum策略。ide

discovery.zen.minimum_master_nodes: (master_num/2)+1複製代碼

        master_num爲集羣中master候選節點的個數。該配置表示:當投票的人超過半數,節點才能被選舉爲master。post

        以下圖所示的集羣,若是出現了網絡分區。Node1和Node2忽然掉除了集羣,此時Node1和Node2會發起選舉,假設選舉除了Node1成爲master,右邊也發起了一輪選舉,選出了Node3做爲master。這樣集羣被分爲兩部分,每一部分都會維護本身的集羣狀態。若是配置了quorum策略,則左邊小於3,沒法進行選主,那左邊的集羣暫時沒法工做,右邊集羣能夠正常提供服務。

4.2 什麼時候發起選舉

  • 集羣啓動的時候

        集羣啓動的時候,經過 joinThreadControl#startNewThreadIfNotRunning() 來選出Master節點。

  • 檢測到和原來master的鏈接斷開

        經過ZenDiscovery#handleMasterGone來處理,最終調用 joinThreadControl#startNewThreadIfNotRunning()從新發起選舉

        咱們看到startNewThreadIfNotRunning會啓動一個線程去執行innerJoinCluster(),innerJoinCluster()中有一個while()循環,會一直等待findMaster()方法找到master節點。

  • 監測到node節點斷開
          每次斷開節點後會檢查當前集羣節點數是否知足quorum配置,若是不知足,從新發起選舉。

4.3 候選節點

        咱們看到findMaster的第一步,調用pingAndWait()獲取集羣中的節點。集羣的節點能夠在

elasticsearch.yml中配置:

discovery.zen.ping.unicast.hosts: [1.1.1.1, 1.1.1.2, 1.1.1.3]
複製代碼
private DiscoveryNode findMaster() {    
    logger.trace("starting to ping");    
     //查找當前活躍的master 
     List<ZenPing.PingResponse> fullPingResponses = pingAndWait(pingTimeout).toList();
        if (fullPingResponses == null) {
            logger.trace("No full ping responses");
            return null;
        }
        if (logger.isTraceEnabled()) {
            StringBuilder sb = new StringBuilder();
            if (fullPingResponses.size() == 0) {
                sb.append(" {none}");
            } else {
                for (ZenPing.PingResponse pingResponse : fullPingResponses) {
                    sb.append("\n\t--> ").append(pingResponse);
                }
            }
            logger.trace("full ping responses:{}", sb);
        }

        final DiscoveryNode localNode = transportService.getLocalNode();

        // add our selves
        assert fullPingResponses.stream().map(ZenPing.PingResponse::node)
            .filter(n -> n.equals(localNode)).findAny().isPresent() == false;

        //加入當前節點
        fullPingResponses.add(new ZenPing.PingResponse(localNode, null, this.clusterState()));

        // filter responses
        final List<ZenPing.PingResponse> pingResponses = filterPingResponses(fullPingResponses, masterElectionIgnoreNonMasters, logger);

        List<DiscoveryNode> activeMasters = new ArrayList<>();
        //收集ping到的節點的master信息,這裏先不考慮本身,Discovery的策略是非直到最後一刻都不會選本身爲master,可能預防腦裂在一開始就發生吧。
        for (ZenPing.PingResponse pingResponse : pingResponses) {
            // We can't include the local node in pingMasters list, otherwise we may up electing ourselves without
            // any check / verifications from other nodes in ZenDiscover#innerJoinCluster()
            if (pingResponse.master() != null && !localNode.equals(pingResponse.master())) {
                activeMasters.add(pingResponse.master());
            }
        }

        // nodes discovered during pinging master 候選者=> 可以ping到的全部master設置爲true的節點
        List<ElectMasterService.MasterCandidate> masterCandidates = new ArrayList<>();
        for (ZenPing.PingResponse pingResponse : pingResponses) {
            if (pingResponse.node().isMasterNode()) {
                masterCandidates.add(new ElectMasterService.MasterCandidate(pingResponse.node(), pingResponse.getClusterStateVersion()));
            }
        }

        //若是收集到的節點沒有master信息,怎開始選舉
        if (activeMasters.isEmpty()) {
            //master 爲空,代表節點剛啓動,進行選主
            if (electMaster.hasEnoughCandidates(masterCandidates)) {
                //選主
                final ElectMasterService.MasterCandidate winner = electMaster.electMaster(masterCandidates);
                logger.trace("candidate {} won election", winner);
                return winner.getNode();
            } else {
                //若是沒有足夠多候選節點,選主失敗
                // if we don't have enough master nodes, we bail, because there are not enough master to elect from
                logger.warn("not enough master nodes discovered during pinging (found [{}], but needed [{}]), pinging again",
                            masterCandidates, electMaster.minimumMasterNodes());
                return null;
            }
        } else {
            //master列表不爲空,選擇一個nodeid最小的做爲master
            assert !activeMasters.contains(localNode) : "local node should never be elected as master when other nodes indicate an active master";
            // lets tie break between discovered nodes
            return electMaster.tieBreakActiveMasters(activeMasters);
        }
}複製代碼

        獲得的fullPingResponses表示,如今集羣中的全部節點信息,包括這些節點當前的master信息(有可能爲空)。

       接下來以下圖所示對fullPingResponse進行過濾,若是配置了ignore_non_master_pings爲true,則要把那些node.master配置爲false的過濾掉,而後判斷過濾後的結果是否爲當前節點,若是是當前節點,也過濾掉(ZenDiscovery通常最後才考慮當前節點,多是爲了防止腦裂)。最後拿到一個activeMasters的名單,該名單表示目前集羣中存活的master節點,通常個數爲0或者1。

    

4.4 選主

        有了activeMasters,還要作一件事情,就是拿到集羣中全部配置了node.master爲true的節點,從剛纔的fullPingResponses能夠很容易的找到配置爲node.master爲true的節點,最後生成一個masterCandidates。有了這兩個列表以後,就能夠正在的選主工做了。


  •         如上圖所示:先判斷activeMasters是否爲空,若是activeMasters不爲空,則從activeMasters中選出nodeId最小的節點做爲master(正常健康的集羣只會有一個,若是出現了腦裂,則會存在多個)。若是activeMasters爲空,則判斷當前集羣是否達到選舉個數要求,若是達到,則從masterCandidates中選舉一個版本號最新的節點,若是版本號一致選擇nodeId小的那個。若是集羣沒有達到規定個數,選舉失敗。

4.5 肯定選主

        當前節點選出master後,並不能肯定這個master就能成爲整個集羣的master,這只是當前節點認爲的master。這時還須要判斷master的狀況:

//若是master選出來是本身
        if (transportService.getLocalNode().equals(masterNode)) {
            //須要等待discovery.zen.minimum_master_nodes-1個節點加入纔算成功
            final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1); // we count as one
            logger.debug("elected as master, waiting for incoming joins ([{}] needed)", requiredJoins);
            nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout,
                    new NodeJoinController.ElectionCallback() {
                        //選舉本身成功
                        @Override
                        public void onElectedAsMaster(ClusterState state) {
                            synchronized (stateMutex) {
                                joinThreadControl.markThreadAsDone(currentThread);
                            }
                        }

                        //選舉本身失敗,從新發起一輪ping
                        @Override
                        public void onFailure(Throwable t) {
                            logger.trace("failed while waiting for nodes to join, rejoining", t);
                            synchronized (stateMutex) {
                                joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                            }
                        }
                    }

            );
        } else {
            // process any incoming joins (they will fail because we are not the master)
            //阻止其餘節點加入,localNode
            nodeJoinController.stopElectionContext(masterNode + " elected");

            // send join request
            final boolean success = joinElectedMaster(masterNode);

            synchronized (stateMutex) {
                if (success) {
                    DiscoveryNode currentMasterNode = this.clusterState().getNodes().getMasterNode();
                    if (currentMasterNode == null) {
                        // Post 1.3.0, the master should publish a new cluster state before acking our join request. we now should have
                        // a valid master.
                        logger.debug("no master node is set, despite of join request completing. retrying pings.");
                        joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                    } else if (currentMasterNode.equals(masterNode) == false) {
                        // update cluster state
                        joinThreadControl.stopRunningThreadAndRejoin("master_switched_while_finalizing_join");
                    }

                    joinThreadControl.markThreadAsDone(currentThread);
                } else {
                    // failed to join. Try again...
                    joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                }
            }
        }複製代碼

1)若是選出來的master爲當前節點,則當前節點須要等待其餘節點的加入,等待的數目爲discovery.zen.minimum_master_nodes-1,加上當前節點maste,恰好投票大於半數。若是在必定時間內沒有收到足夠多的投票(即其餘節點的加入),則選舉失敗,從新開始選舉。若是加入的節點達到數目,則選舉成功。

2)若是當前選舉的master是其餘節點,則當前節點關閉其餘節點的加入請求,假設當前節爲node1,目標master節點爲node2。則此時有三種可能:

  • node2已是集羣master,怎把node1做爲一個新節點加入,同步集羣信息,node1選舉成功
  • node2正在參與選舉,即1)所描述的狀態,則node2會把此次鏈接當成一個投票,node1繼續等待,若是node1在規定時間內沒有等到node2成爲master,則node1選舉失敗,node1從新選舉
  • node2認爲node3纔是master,此時node3會拒接鏈接請求,node1選舉失敗,從新選舉。

5.節點監測

        選舉流程結束後,爲了保證集羣服務過程當中節點的意外退出,須要啓動兩個重要的task。分別是masterFaultDetection和NodeFaultDetection。相似於心跳機制,按期監測node和master的狀態。若是node監測不到master心跳,調用,會notifyMasterFailure進行選舉。若是master檢測不到NodeFaultDetection心跳,調用notifyNodeFailure,將node移除,發佈新的cluster_state,執行相應的primary和replica操做。移除node的時候會監測當前節點數據是否足夠,若是不足,則從新發起選舉。

if (electMasterService.hasEnoughMasterNodes(remainingNodesClusterState.nodes()) == false) {
         final int masterNodes = electMasterService.countMasterNodes(remainingNodesClusterState.nodes());
         rejoin.accept(LoggerMessageFormat.format("not enough master nodes (has [{}], but needed [{}])",
                                                         masterNodes, electMasterService.minimumMasterNodes()));
         return resultBuilder.build(currentState);
} else {
         return resultBuilder.build(allocationService.deassociateDeadNodes(remainingNodesClusterState, true, describeTasks(tasks)));
}複製代碼

6.總結

        相較於zookeeper的選舉,es的選舉有點像Bully算法,比較簡單。zookeeper基於Paxos的算法則比較複雜。Es的discovery模塊代碼量不大,核心的ZenDiscovery.java一共才1000多行代碼,認真看幾遍就能明白Elasticsearch選舉的主要思想。

相關文章
相關標籤/搜索