ElasticSearch如何更新集羣的狀態

ElasticSearch如何更新集羣的狀態

最近發生了不少事情,甚至對本身的技術能力和學習方式產生了懷疑,因此有一段時間沒更新文章了,估計之後更新的頻率會愈來愈少,但願有更多的沉澱而不是簡單地分享。讓我有感悟的是,最近看到一篇關於ES集羣狀態更新的文章Elasticsearch Distributed Consistency Principles Analysis (2) - Meta,和 「提交給線程池的Runnable任務是以怎樣的順序執行的?」這個問題,所以,結合ES6.3.2源碼,分析一下ES的Master節點是如何更新集羣狀態的。html

分佈式系統的集羣狀態通常是指各類元數據信息,通俗地講,在ES中建立了一個Index,這個Index的Mapping結構信息、Index由幾個分片組成,這些分片分佈在哪些節點上,這樣的信息就組成了集羣的狀態。當Client建立一個新索引、或者刪除一個索、或者進行快照備份、或者集羣又進行了一次Master選舉,這些都會致使集羣狀態的變化。歸納一下就是:發生了某個事件,致使集羣狀態發生了變化,產生了新集羣狀態後,如何將新的狀態應用到各個節點上去,而且保證一致性。java

在ES中,各個模塊發生一些事件,會致使集羣狀態變化,並由org.elasticsearch.cluster.service.ClusterService#submitStateUpdateTask(java.lang.String, T)提交集羣狀態變化更新任務。當任務執行完成時,就產生了新的集羣狀態,而後經過"二階段提交協議"將新的集羣狀態應用到各個節點上。這裏可大概瞭解一下有哪些模塊的操做會提交一個更新任務,好比:數組

  • MetaDataDeleteIndexService#deleteIndices 刪除索引
  • org.elasticsearch.snapshots.SnapshotsService#createSnapshot 建立快照
  • org.elasticsearch.cluster.metadata.MetaDataIndexTemplateService#putTemplate 建立索引模板

所以各個Service(好比:MetaDataIndexTemplateService)都持有org.elasticsearch.cluster.service.ClusterService實例引用,經過ClusterService#submitStateUpdateTask方法提交更新集羣狀態的任務。安全

既然建立新索引、刪除索引、修改索引模板、建立快照等都會觸發集羣狀態更新,那麼如何保證這些更新操做是"安全"的?好比操做A是刪除索引,操做B是對索引作快照備份,操做A、B的順序不當,就會引起錯誤!好比,索引都已經刪除了,那還怎麼作快照?所以,爲了防止這種併發操做對集羣狀態更新的影響,org.elasticsearch.cluster.service.MasterService中採用單線程執行方式提交更新集羣狀態的任務的。狀態更新任務由org.elasticsearch.cluster.service.MasterService.Batcher.UpdateTask表示,它本質上是一個具備優先級特徵的Runnable任務:數據結構

//PrioritizedRunnable 實現了Comparable接口,compareTo方法比較任務的優先級
public abstract class PrioritizedRunnable implements Runnable, Comparable<PrioritizedRunnable> {
    
    private final Priority priority;//Runnable任務優先級
    private final long creationDate;
    private final LongSupplier relativeTimeProvider;
    
     @Override
    public int compareTo(PrioritizedRunnable pr) {
        return priority.compareTo(pr.priority);
    }
}

而單線程的執行方式,則是經過org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor線程池實現的。看org.elasticsearch.common.util.concurrent.EsExecutors#newSinglePrioritizing線程池的建立:併發

public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(String name, ThreadFactory threadFactory, ThreadContext contextHolder, ScheduledExecutorService timer) {
    //core pool size == max pool size ==1,說明該線程池裏面只有一個工做線程
        return new PrioritizedEsThreadPoolExecutor(name, 1, 1, 0L, TimeUnit.MILLISECONDS, threadFactory, contextHolder, timer);
    }

而線程池的任務隊列則是採用:PriorityBlockingQueue(底層是個數組,數據結構是:堆 Heap),經過compareTo方法比較Priority,從而決定任務的排隊順序。app

//PrioritizedEsThreadPoolExecutor#PrioritizedEsThreadPoolExecutor
PrioritizedEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,ThreadFactory threadFactory, ThreadContext contextHolder, ScheduledExecutorService timer) {
        super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<>(), threadFactory, contextHolder);
        this.timer = timer;
    }

這裏想提一下這種只採用一個線程執行任務狀態更新的思路,它與Redis採用單線程執行Client的操做命令是一致的。各個Redis Client向Redis Server發起操做請求,Redis Server最終是以一個線程來"順序地"執行各個命令。單線程執行方式,避免了數據併發操做致使的不一致性,而且不須要線程同步。畢竟同步須要加鎖,而加鎖會影響程序性能。elasticsearch

在這裏,我想插一個問題:JDK線程池執行任務的順序是怎樣的?經過java.util.concurrent.ThreadPoolExecutor#execute方法先提交到線程池中的任務,必定會優先執行嗎?這個問題常常被人問到,哈哈。可是,真正地理解,卻不容易。由於它涉及到線程池參數,core pool size、max pool size 、任務隊列的長度以及任務到來的時機。其實JDK源碼中的註釋已經講得很清楚了:分佈式

/*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
  • 任務提交到線程池,若是線程池的活躍線程數量小於 core pool size,那麼直接建立新線程執行任務,這種狀況下任務是不會入隊列的。
  • 當線程池中的活躍線程數量已經達到core pool size時,繼續提交任務,這時的任務就會入隊列排隊。
  • 當任務隊列已經滿了時,同時又有新任務提交過來,若是線程池的活躍線程數小於 max pool size,那麼會建立新的線程,執行這些剛提交過來的任務,此時的任務也不會入隊列排隊。(注意:這裏新建立的線程並非從任務隊列中取任務,而是直接執行剛剛提交過來的任務,而那些前面已經提交了的在任務隊列中排隊的任務反而不能優先執行,換句話說:任務的執行順序並非嚴格按提交順序來執行的)

代碼驗證一下以下,會發現:後提交的任務,反而可能先執行完成。由於,先提交的任務在隊列中排隊,然後提交的任務直接被新建立的線程執行了,省去了排隊過程。ide

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.*;

/**
 * @author psj
 * @date 2019/11/14
 */
public class ThreadPoolTest {
    public static void main(String[] args) throws InterruptedException{

        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("test-%d").build();
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(4);
        ThreadPoolExecutor executorSevice = new ThreadPoolExecutor(1, 4, 0, TimeUnit.HOURS,
                workQueue, threadFactory, new ThreadPoolExecutor.DiscardPolicy());

        for (int i = 1; i <=8; i++) {
            MyRunnable task = new MyRunnable(i, workQueue);
            executorSevice.execute(task);
            sleepMills(200);
            System.out.println("submit: " + i  + ", queue size:" + workQueue.size() + ", active count:" + executorSevice.getActiveCount());
        }
        Thread.currentThread().join();
    }


    public static class MyRunnable implements Runnable {
        private int sequence;
        private BlockingQueue taskQueue;
        public MyRunnable(int sequence, BlockingQueue taskQueue) {
            this.sequence = sequence;
            this.taskQueue = taskQueue;
        }
        @Override
        public void run() {
            //模擬任務須要1秒鐘才能執行完成
            sleepMills(1000);
            System.out.println("task :" + sequence + " finished, current queue size:" + taskQueue.size());
        }
    }

    public static void sleepMills(int mills) {
        try {
            TimeUnit.MILLISECONDS.sleep(mills);
        } catch (InterruptedException e) {

        }
    }
}

OK,分析完了線程池執行任務的順序,再看看ES的PrioritizedEsThreadPoolExecutor線程池的參數:將 core pool size 和 max pool size 都設置成1,避免了這種"插隊"的現象。各個模塊觸發的集羣狀態更新最終在org.elasticsearch.cluster.service.MasterService#submitStateUpdateTasks方法中構造UpdateTask對象實例,並經過submitTasks方法提交任務執行。額外須要注意的是:集羣狀態更新任務能夠以批量執行方式提交,具體看org.elasticsearch.cluster.service.TaskBatcher的實現吧。

try {
            List<Batcher.UpdateTask> safeTasks = tasks.entrySet().stream()
                .map(e -> taskBatcher.new UpdateTask(config.priority(), source, e.getKey(), safe(e.getValue()), executor))
                .collect(Collectors.toList());
            taskBatcher.submitTasks(safeTasks, config.timeout());
        } catch (EsRejectedExecutionException e) {
            // ignore cases where we are shutting down..., there is really nothing interesting
            // to be done here...
            if (!lifecycle.stoppedOrClosed()) {
                throw e;
            }
        }

最後來分析一下 org.elasticsearch.cluster.service.ClusterService類,在ES節點啓動的時候,在Node#start()方法中會啓動ClusterService,當其它各個模塊執行一些操做觸發集羣狀態改變時,就是經過ClusterService來提交集羣狀態更新任務。而ClusterService其實就是封裝了 MasterService和ClusterApplierService,MasterService提供任務提交接口,內部維護一個線程池處理更新任務,而ClusterApplierService則負責通知各個模塊應用新生成的集羣狀態。

相關文章
相關標籤/搜索