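A lot has happened recently, to the point that I began to doubt my own technical ability and the way I learn, so I have not updated this blog for a while. I expect to post less and less often from now on, and I hope what I do post reflects more careful thought rather than simple sharing. What prompted this post was an article I read recently on ES cluster state updates, Elasticsearch Distributed Consistency Principles Analysis (2) - Meta, together with the question "In what order are Runnable tasks submitted to a thread pool executed?" So, based on the ES 6.3.2 source code, this post analyzes how the ES master node updates the cluster state.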
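The cluster state of a distributed system usually refers to its various metadata. Put simply, when an index is created in ES, the index's mapping, the number of shards it consists of, and the nodes those shards are allocated to together make up the cluster state. When a client creates a new index, deletes an index, takes a snapshot, or the cluster holds another master election, the cluster state changes. In summary: some event occurs and changes the cluster state, and once a new cluster state has been produced, the question is how to apply it to every node while guaranteeing consistency.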
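In ES, events in the various modules cause cluster state changes, and each change is submitted as a cluster state update task via org.elasticsearch.cluster.service.ClusterService#submitStateUpdateTask(java.lang.String, T). When the task finishes executing, a new cluster state has been produced, which is then applied to every node through a "two-phase commit protocol". It is useful to have a rough idea of which modules' operations submit update tasks.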
For this reason, each Service (for example MetaDataIndexTemplateService) holds a reference to an org.elasticsearch.cluster.service.ClusterService instance and submits its cluster state update tasks through ClusterService#submitStateUpdateTask.
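Since creating an index, deleting an index, modifying an index template, creating a snapshot and so on all trigger cluster state updates, how can these updates be made "safe"? Suppose operation A deletes an index and operation B takes a snapshot of that index. If A and B run in the wrong order, an error follows: once the index has been deleted, there is nothing left to snapshot. To keep such concurrent operations from interfering with cluster state updates, org.elasticsearch.cluster.service.MasterService executes the submitted update tasks on a single thread. A state update task is represented by org.elasticsearch.cluster.service.MasterService.Batcher.UpdateTask, which is essentially a Runnable that carries a priority: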
    // PrioritizedRunnable implements Comparable; compareTo compares task priorities.
    // (Constructors and other members are omitted from this excerpt.)
    public abstract class PrioritizedRunnable implements Runnable, Comparable<PrioritizedRunnable> {
        private final Priority priority; // priority of this Runnable task
        private final long creationDate;
        private final LongSupplier relativeTimeProvider;

        @Override
        public int compareTo(PrioritizedRunnable pr) {
            return priority.compareTo(pr.priority);
        }
    }
The single-threaded execution is implemented by the org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor thread pool. Here is how the pool is created in org.elasticsearch.common.util.concurrent.EsExecutors#newSinglePrioritizing:
    public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(String name, ThreadFactory threadFactory,
            ThreadContext contextHolder, ScheduledExecutorService timer) {
        // core pool size == max pool size == 1, so the pool has exactly one worker thread
        return new PrioritizedEsThreadPoolExecutor(name, 1, 1, 0L, TimeUnit.MILLISECONDS,
                threadFactory, contextHolder, timer);
    }
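The pool's task queue is a PriorityBlockingQueue (backed by an array that is organized as a binary heap). It compares Priority values through compareTo, which determines the order in which queued tasks are taken.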
    // PrioritizedEsThreadPoolExecutor#PrioritizedEsThreadPoolExecutor
    PrioritizedEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit, ThreadFactory threadFactory, ThreadContext contextHolder,
            ScheduledExecutorService timer) {
        super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit,
                new PriorityBlockingQueue<>(), threadFactory, contextHolder);
        this.timer = timer;
    }
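To see how one worker thread plus a PriorityBlockingQueue orders tasks, here is a minimal JDK-only sketch. It does not use any ES classes: `Level` and `Job` are hypothetical stand-ins for ES's Priority and PrioritizedRunnable, and a latch keeps the single worker busy so that the three later jobs are guaranteed to wait in the queue.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PriorityOrderDemo {

    // Hypothetical stand-in for ES's Priority enum: a lower ordinal means more urgent.
    enum Level { URGENT, NORMAL, LOW }

    // Hypothetical analogue of PrioritizedRunnable: a Runnable ordered by its level.
    static final class Job implements Runnable, Comparable<Job> {
        private final Level level;
        private final Runnable body;

        Job(Level level, Runnable body) {
            this.level = level;
            this.body = body;
        }

        @Override
        public void run() {
            body.run();
        }

        @Override
        public int compareTo(Job other) {
            return level.compareTo(other.level);
        }
    }

    static List<String> runDemo() {
        List<String> executed = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch workerBusy = new CountDownLatch(1);
        CountDownLatch gate = new CountDownLatch(1);
        // One worker thread and a priority queue, mirroring newSinglePrioritizing.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<>());
        try {
            // Occupy the only worker so that the next three jobs must wait in the queue.
            pool.execute(new Job(Level.URGENT, () -> {
                workerBusy.countDown();
                awaitQuietly(gate);
            }));
            workerBusy.await();
            pool.execute(new Job(Level.LOW, () -> executed.add("low")));
            pool.execute(new Job(Level.NORMAL, () -> executed.add("normal")));
            pool.execute(new Job(Level.URGENT, () -> executed.add("urgent")));
            gate.countDown(); // free the worker; it now takes jobs from the heap by priority
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(executed);
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints [urgent, normal, low]
    }
}
```

The jobs are handed over with execute rather than submit, because submit would wrap each one in a FutureTask, which is not Comparable and would fail inside the priority queue.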
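It is worth pointing out that using a single thread to execute state update tasks follows the same idea as Redis, which uses a single thread to execute client commands. Each Redis client sends its requests to the Redis server, and the server ultimately executes the commands "sequentially" on one thread. Single-threaded execution avoids the inconsistencies caused by concurrent access to data and needs no thread synchronization. Synchronization requires locking, and locking costs performance.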
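Let me insert a question here: in what order does a JDK thread pool execute tasks? Is a task that was submitted earlier through java.util.concurrent.ThreadPoolExecutor#execute guaranteed to run first? This question comes up often, yet really understanding the answer is not easy, because it depends on the pool's parameters (core pool size, max pool size, the capacity of the task queue) and on when each task arrives. The comment in the JDK source actually explains it clearly: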
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
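The code below verifies this. It shows that a task submitted later may finish before one submitted earlier: the earlier task is still waiting in the queue, while the later one is run directly by a newly created thread and skips the queue altogether.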
    import com.google.common.util.concurrent.ThreadFactoryBuilder;

    import java.util.concurrent.*;

    /**
     * @author psj
     * @date 2019/11/14
     */
    public class ThreadPoolTest {
        public static void main(String[] args) throws InterruptedException {
            ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("test-%d").build();
            // Bounded queue of 4: once it is full, execute() starts extra threads up to max pool size
            BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(4);
            ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 4, 0, TimeUnit.HOURS,
                    workQueue, threadFactory, new ThreadPoolExecutor.DiscardPolicy());
            for (int i = 1; i <= 8; i++) {
                MyRunnable task = new MyRunnable(i, workQueue);
                executorService.execute(task);
                sleepMills(200);
                System.out.println("submit: " + i + ", queue size:" + workQueue.size()
                        + ", active count:" + executorService.getActiveCount());
            }
            // Wait for the queued tasks to finish, then let the JVM exit
            executorService.shutdown();
            executorService.awaitTermination(1, TimeUnit.MINUTES);
        }

        public static class MyRunnable implements Runnable {
            private final int sequence;
            private final BlockingQueue<Runnable> taskQueue;

            public MyRunnable(int sequence, BlockingQueue<Runnable> taskQueue) {
                this.sequence = sequence;
                this.taskQueue = taskQueue;
            }

            @Override
            public void run() {
                // Simulate a task that needs 1 second to complete
                sleepMills(1000);
                System.out.println("task :" + sequence + " finished, current queue size:" + taskQueue.size());
            }
        }

        public static void sleepMills(int mills) {
            try {
                TimeUnit.MILLISECONDS.sleep(mills);
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of silently swallowing it
                Thread.currentThread().interrupt();
            }
        }
    }
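The demo above depends on sleep timing, so its exact output can vary from run to run. As a complement, here is a small deterministic sketch of the same three steps that uses latches instead of sleeps. The pool sizes (core 1, max 2, queue capacity 1) and the task names A to D are my own choices for illustration: A takes the core thread, B is queued, C finds the queue full and gets a new thread, and D is rejected.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteOrderDemo {

    // A task that records when it starts and then blocks until the shared gate opens.
    static final class Task implements Runnable {
        final String name;
        final List<String> events;
        final CountDownLatch gate;
        final CountDownLatch started = new CountDownLatch(1);

        Task(String name, List<String> events, CountDownLatch gate) {
            this.name = name;
            this.events = events;
            this.gate = gate;
        }

        @Override
        public void run() {
            events.add("start " + name);
            started.countDown();
            await(gate);
        }

        @Override
        public String toString() {
            return name;
        }
    }

    static List<String> runDemo() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch gate = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1),
                (task, executor) -> events.add("reject " + task));

        Task a = new Task("A", events, gate);
        Task b = new Task("B", events, gate);
        Task c = new Task("C", events, gate);
        Task d = new Task("D", events, gate);

        pool.execute(a);   // step 1: fewer than corePoolSize threads, so A gets the core thread
        await(a.started);
        pool.execute(b);   // step 2: the core thread is busy, so B waits in the queue
        pool.execute(c);   // step 3: the queue is full, so C gets a new non-core thread
        await(c.started);
        pool.execute(d);   // queue full and max pool size reached, so D is rejected
        gate.countDown();  // unblock A and C; one of the two threads then picks up B

        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(events);
    }

    private static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints [start A, start C, reject D, start B]
    }
}
```

C was submitted after B but starts before it, which is exactly the "queue jumping" described above.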
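OK, having analyzed the order in which a thread pool executes tasks, look again at the parameters of ES's PrioritizedEsThreadPoolExecutor: core pool size and max pool size are both set to 1, which rules out this kind of "queue jumping". The cluster state updates triggered by the various modules all end up in org.elasticsearch.cluster.service.MasterService#submitStateUpdateTasks, which constructs the UpdateTask instances and submits them for execution through submitTasks. One more thing to note: cluster state update tasks can be submitted for batched execution. See the implementation of org.elasticsearch.cluster.service.TaskBatcher for the details.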
    try {
        List<Batcher.UpdateTask> safeTasks = tasks.entrySet().stream()
            .map(e -> taskBatcher.new UpdateTask(config.priority(), source, e.getKey(), safe(e.getValue()), executor))
            .collect(Collectors.toList());
        taskBatcher.submitTasks(safeTasks, config.timeout());
    } catch (EsRejectedExecutionException e) {
        // ignore cases where we are shutting down..., there is really nothing interesting
        // to be done here...
        if (!lifecycle.stoppedOrClosed()) {
            throw e;
        }
    }
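To give a feel for what batched execution means, here is a heavily simplified sketch. It is not the TaskBatcher API: `MiniBatcher`, its string "batching key" and its string tasks are all hypothetical, and the sketch assumes only that tasks sharing a key which pile up while the single worker is busy are executed together in one pass.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class BatchingSketch {

    // Hypothetical mini batcher: groups pending tasks by a batching key.
    static final class MiniBatcher {
        private final Map<String, List<String>> pendingByKey = new LinkedHashMap<>();
        final List<List<String>> executedBatches = Collections.synchronizedList(new ArrayList<>());
        final ExecutorService worker = Executors.newSingleThreadExecutor();

        void submit(String batchingKey, String task) {
            boolean firstForKey;
            synchronized (pendingByKey) {
                List<String> pending = pendingByKey.get(batchingKey);
                firstForKey = pending == null;
                if (firstForKey) {
                    pending = new ArrayList<>();
                    pendingByKey.put(batchingKey, pending);
                }
                pending.add(task);
            }
            if (firstForKey) {
                // Only the first task of a key schedules a run; later ones join its batch.
                worker.execute(() -> runBatch(batchingKey));
            }
        }

        private void runBatch(String batchingKey) {
            List<String> batch;
            synchronized (pendingByKey) {
                batch = pendingByKey.remove(batchingKey); // take everything queued so far
            }
            executedBatches.add(batch);
        }
    }

    static List<List<String>> runDemo() {
        MiniBatcher batcher = new MiniBatcher();
        CountDownLatch workerBusy = new CountDownLatch(1);
        CountDownLatch gate = new CountDownLatch(1);
        try {
            // Keep the single worker busy so that the submissions below accumulate.
            batcher.worker.execute(() -> {
                workerBusy.countDown();
                try {
                    gate.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workerBusy.await();
            batcher.submit("index-ops", "create a");
            batcher.submit("index-ops", "create b");
            batcher.submit("snapshot-ops", "snapshot s1");
            batcher.submit("index-ops", "delete a");
            gate.countDown();
            batcher.worker.shutdown();
            batcher.worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(batcher.executedBatches);
    }

    public static void main(String[] args) {
        // prints [[create a, create b, delete a], [snapshot s1]]
        System.out.println(runDemo());
    }
}
```

The real TaskBatcher is more involved (priorities, timeouts, duplicate detection), so treat this only as an illustration of the grouping idea.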
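Finally, a look at the org.elasticsearch.cluster.service.ClusterService class. When an ES node starts, Node#start() starts the ClusterService, and whenever an operation in another module triggers a cluster state change, the update task is submitted through ClusterService. ClusterService itself is just a wrapper around MasterService and ClusterApplierService: MasterService provides the task submission interface and internally maintains a thread pool that processes the update tasks, while ClusterApplierService is responsible for notifying the various modules so that they apply the newly generated cluster state.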
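The division of labour described above can be sketched roughly as follows. All classes here (`State`, `MiniMasterService`, `MiniApplierService`, `MiniClusterService`) are hypothetical simplifications written for this post, not the ES implementation: the master part computes new states on one thread, and the applier part notifies registered listeners. Publishing the state to other nodes is left out entirely.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.UnaryOperator;

class ClusterServiceSketch {

    // Hypothetical immutable cluster state: a version plus the set of index names.
    static final class State {
        final long version;
        final TreeSet<String> indices;

        State(long version, TreeSet<String> indices) {
            this.version = version;
            this.indices = indices;
        }

        State withIndex(String name) {
            TreeSet<String> copy = new TreeSet<>(indices);
            copy.add(name);
            return new State(version + 1, copy);
        }

        State withoutIndex(String name) {
            TreeSet<String> copy = new TreeSet<>(indices);
            copy.remove(name);
            return new State(version + 1, copy);
        }

        @Override
        public String toString() {
            return "v" + version + " " + indices;
        }
    }

    // Notifies local listeners about each new state.
    static final class MiniApplierService {
        private final List<BiConsumer<String, State>> listeners = new CopyOnWriteArrayList<>();

        void addListener(BiConsumer<String, State> listener) {
            listeners.add(listener);
        }

        void onNewState(String source, State state) {
            for (BiConsumer<String, State> listener : listeners) {
                listener.accept(source, state);
            }
        }
    }

    // Computes new states one at a time on a single thread.
    static final class MiniMasterService {
        private final ExecutorService updateThread = Executors.newSingleThreadExecutor();
        private final MiniApplierService applier;
        private State current = new State(0, new TreeSet<>()); // only touched by updateThread

        MiniMasterService(MiniApplierService applier) {
            this.applier = applier;
        }

        void submitStateUpdateTask(String source, UnaryOperator<State> task) {
            updateThread.execute(() -> {
                State next = task.apply(current);
                if (next != current) { // an unchanged state is not re-applied
                    current = next;
                    applier.onNewState(source, next);
                }
            });
        }
    }

    // Facade that other modules talk to.
    static final class MiniClusterService {
        final MiniApplierService applier = new MiniApplierService();
        final MiniMasterService master = new MiniMasterService(applier);

        void submitStateUpdateTask(String source, UnaryOperator<State> task) {
            master.submitStateUpdateTask(source, task);
        }
    }

    static List<String> runDemo() {
        List<String> applied = Collections.synchronizedList(new ArrayList<>());
        MiniClusterService clusterService = new MiniClusterService();
        clusterService.applier.addListener((source, state) -> applied.add(source + " -> " + state));
        clusterService.submitStateUpdateTask("create logs", s -> s.withIndex("logs"));
        clusterService.submitStateUpdateTask("create metrics", s -> s.withIndex("metrics"));
        clusterService.submitStateUpdateTask("delete logs", s -> s.withoutIndex("logs"));
        clusterService.master.updateThread.shutdown();
        try {
            clusterService.master.updateThread.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(applied);
    }

    public static void main(String[] args) {
        runDemo().forEach(System.out::println);
    }
}
```

Because every update runs on the same thread, each task sees the state produced by the previous one, so the version numbers increase strictly in submission order.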