ElasticSearch6.3.2源碼分析之節點鏈接實現

ElasticSearch6.3.2源碼分析之節點鏈接實現

這篇文章主要分析ES節點之間如何維持鏈接的。在開始以前,先扯一下ES源碼閱讀的一些心得:在使用ES過程當中碰到某個問題,想要深刻了解一下,可源碼又太多了,不知道從哪裏入手,怎麼辦?ES啓動的入口類是Elasticsearch.java,順着這個啓動流程,到Node類的構造方法:org.elasticsearch.node.Node#Node(org.elasticsearch.env.Environment, java.util.Collection<java.lang.Class<? extends org.elasticsearch.plugins.Plugin>>)在構造方法中開始初始化各類模塊(插件)。若是想了解某個分詞插件是如何被載的?又或者是線程池模塊、搜索模塊是如何初始化的,那麼就在Node的構造方法裏面看相關的代碼。html

初始化完成以後,就開始啓動各類服務,這是在org.elasticsearch.node.Node#start方法中實現。好比本文要介紹的節點鏈接服務NodeConnectionsService、用於節點之間數據傳輸的TransportService、用於master選主,節點發現的ZenDiscovery服務。所以,要想了解ES某個功能的運行機制,那就從start方法開始跟蹤調試吧。
對於不一樣的服務而言,有着各自的啓動邏輯,這都在它們本身的doStart()方法中實現:好比ZenDiscovery#doStart()、NodeConnectionsService#doStart()、org.elasticsearch.cluster.service.ClusterService#doStart 等等……
java

如今進入正題,ES各個節點是如何鏈接,並保持鏈接狀態的。node

This component is responsible for connecting to nodes once they are added to the cluster state, and disconnect when they are removed. Also, it periodically checks that all connections are still open and if needed restores them.

當有新節點加入集羣時,集羣狀態會更新,NodeConnectionsService 就會嘗試鏈接這個節點,這在:org.elasticsearch.cluster.NodeConnectionsService#connectToNodes方法中實現。此外,NodeConnectionsService 還會週期性檢測節點之間的鏈接是不是open的,若是節點鏈接出現了故障,則嘗試恢復鏈接,這是經過後臺的週期性任務org.elasticsearch.cluster.NodeConnectionsService.ConnectionChecker實現的。多線程

Note that this component is *not* responsible for removing nodes from the cluster if they disconnect / do not respond to pings. This is done by NodesFaultDetection  Master fault detection is done by MasterFaultDetection

若是節點之間的鏈接發生了故障,NodeConnectionsService 並不負責移除節點,移除節點的任務交給:NodesFaultDetection 和 MasterFaultDetection(處理master節點的故障)實現。併發

如今看具體源碼:在Node#start方法中建立NodeConnectionsService並啓動鏈接服務:異步

final NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class);
nodeConnectionsService.start();

各個Service都是AbstractLifecycleComponent的子類,重寫了 AbstractLifecycleComponent#doStart,具體的啓動邏輯都在doStart方法中:elasticsearch

NodeConnectionsService節點鏈接啓動很是簡單,就是一個定時任務嘗試鏈接目標節點:org.elasticsearch.cluster.NodeConnectionsService#doStartide

@Override
protected void doStart() {
    //threadPool.schedule 啓動一個定時任務,任務的執行邏輯封裝在Runnable任務:ConnectionChecker中
    backgroundFuture = threadPool.schedule(reconnectInterval, ThreadPool.Names.GENERIC, new ConnectionChecker());
}

那具體看org.elasticsearch.cluster.NodeConnectionsService.ConnectionChecker的實現:oop

class ConnectionChecker extends AbstractRunnable {

        @Override
        public void onFailure(Exception e) {
            logger.warn("unexpected error while checking for node reconnects", e);
        }

        protected void doRun() {
            for (DiscoveryNode node : nodes.keySet()) {
                try (Releasable ignored = nodeLocks.acquire(node)) {
                    //遍歷每一個節點,驗證鏈接(當新節點加入時,集羣狀態會更新)
                    validateAndConnectIfNeeded(node);
                }
            }
        }

        @Override
        public void onAfter() {
            if (lifecycle.started()) {
                //每一次驗證完鏈接後,在這裏從新執行定時任務開啓下一輪的鏈接驗證
                //從而實現了節點之間週期性驗證鏈接(週期性地檢測節點是存在的)
                backgroundFuture = threadPool.schedule(reconnectInterval, ThreadPool.Names.GENERIC, this);
            }
        }
    }

ConnectionChecker 就是一個Runnable任務,若是熟悉ES線程池實現的話,ES使用 AbstractRunnable的doRun方法中 封裝各類異步操做執行邏輯,即任務的處理在doRun方法中執行,執行成功後由onAfter方法統一處理,若是Runnable任務執行過程當中出現了異常則由onFailure統一處理。所以,當一個定時任務執行完成,檢測鏈接成功後,在onAfter方法裏面,又開始下一次的鏈接檢測。源碼分析

再來看具體是如何創建鏈接的:

void validateAndConnectIfNeeded(DiscoveryNode node) {
        assert nodeLocks.isHeldByCurrentThread(node) : "validateAndConnectIfNeeded must be called under lock";
        //先判斷要不要發起一次鏈接/鏈接檢測
        if (lifecycle.stoppedOrClosed() || nodes.containsKey(node) == false) { // we double check existence of node since connectToNode might take time...
            // nothing to do
        } else {
            //發起一次鏈接檢測
            try {
                // connecting to an already connected node is a noop
                transportService.connectToNode(node);
                nodes.put(node, 0);//若是鏈接成功了,將失敗次數設置成0
            } catch (Exception e) {
                Integer nodeFailureCount = nodes.get(node);
                assert nodeFailureCount != null : node + " didn't have a counter in nodes map";
                nodeFailureCount = nodeFailureCount + 1;//鏈接未成功,失敗次數加1
                // log every 6th failure
                if ((nodeFailureCount % 6) == 1) {
                    final int finalNodeFailureCount = nodeFailureCount;
                    //每6次 鏈接失敗 就嘗試打印告警日誌(默認是10s鍾發起一次鏈接檢測任務)
                    logger.warn(() -> new ParameterizedMessage(
                                "failed to connect to node {} (tried [{}] times)", node, finalNodeFailureCount), e);
                }
                nodes.put(node, nodeFailureCount);//保存node節點已經鏈接失敗的次數
            }
        }
    }

重點是這行代碼:transportService.connectToNode(node);,實際的鏈接請求在TcpTransport類中:org.elasticsearch.transport.TcpTransport#connectToNode,看這裏面的代碼,就知道ES節點是如何鏈接到另外一個節點上去的了。

另外,爲了防止多線程併發鏈接某個節點,在發起鏈接時,須要先獲取這個節點所對應的鎖,才能發起鏈接。ES在ReentrantLock功能基礎上,實現了本身的鎖,裏面的實現細節也很是值得借鑑。

private static final class KeyLock extends ReentrantLock {
        KeyLock(boolean fair) {
            super(fair);
        }

        private final AtomicInteger count = new AtomicInteger(1);
    }

KeyLock是可重入鎖,count 保存當前線程獲取鎖的次數(重入次數)

當向某個節點發起鏈接時,若是這個節點所對應的鎖不存在,會爲該節點建立一把鎖,並保存到ConcurrentHashMap中。獲取鎖的實現邏輯以下:org.elasticsearch.common.util.concurrent.KeyedLock#acquire

public Releasable acquire(T key) {
        while (true) {//注意while true循環, ha ha ....
            KeyLock perNodeLock = map.get(key);
            if (perNodeLock == null) {
                //鎖不存在,爲這個節點建立一把鎖
                ReleasableLock newLock = tryCreateNewLock(key);
                if (newLock != null) {
                    return newLock;
                }
            } else {//鎖已經存在了
                assert perNodeLock != null;
                int i = perNodeLock.count.get();//目前鎖的重入次數
                //外面是個while true循環, CAS必定會成功
                if (i > 0 && perNodeLock.count.compareAndSet(i, i + 1)) {
                    perNodeLock.lock();
                    return new ReleasableLock(key, perNodeLock);
                }
            }
        }
    }

NodeConnectionsService 初始化了一個KeyedLock對象,用來保存向目標節點發起鏈接時所建立的鎖。

private final KeyedLock<DiscoveryNode> nodeLocks = new KeyedLock<>();

總結:

整個檢測鏈接流程:在Node#start()中啓動鏈接服務,接着向ThreadPool的GENERIC線程池提交一個 檢測鏈接的任務,在 ConnectionChecker#onAfter()方法中 執行下一輪的鏈接檢測(從而實現週期性檢測),發送實現的鏈接請求在:TcpTransport#connectToNode中實現

另外,ES各個節點之間的鏈接機制的具體實現幾乎都在TcpTransport這個類中,心跳發送、斷開鏈接、創建鏈接等。有時間的可繼續深究。

原文:https://www.cnblogs.com/hapjin/p/11395422.html

其餘一些文章:

Elasticsearch6.3.2啓動過程源碼閱讀記錄

Elasticsearch Transport 模塊建立及啓動分析

ElasticSearch 線程池類型分析之SizeBlockingQueue

ElasticSearch 線程池類型分析之 ExecutorScalingQueue

ElasticSearch 線程池類型分析之 ResizableBlockingQueue

相關文章
相關標籤/搜索