本文記錄ElasticSearch建立索引執行源碼流程。從執行流程角度看一下建立索引會涉及到哪些服務(好比AllocationService、MasterService),因爲本人對分佈式系統理解不是很深,因此不少一些細節原理也是不懂。html
建立索引請求。這裏僅僅是建立索引,沒有寫入文檔。java
curl -X PUT "localhost:9200/twitter"node
ElasticSearch服務器端收到Client的建立索引請求後,是從org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction
開始執行索引建立流程的。git
建立索引是須要ElasticSearch Master節點參與的,所以TransportCreateIndexAction繼承了TransportMasterNodeAction,而建立索引的具體操做由實例屬性MetaDataCreateIndexService完成。github
/** * Create index action. */ public class TransportCreateIndexAction extends TransportMasterNodeAction<CreateIndexRequest, CreateIndexResponse> { // private final MetaDataCreateIndexService createIndexService;
在MetaDataCreateIndexService.createIndex(...)
調用onlyCreateIndex方法執行建立索引操做。算法
public void createIndex(...) { onlyCreateIndex(request, ActionListener.wrap(response -> { if (response.isAcknowledged()) { activeShardsObserver.waitForActiveShards }
Creates an index in the cluster state and waits for the specified number of shard copies to become active as specified in CreateIndexClusterStateUpdateRequest#waitForActiveShards()before sending the response on the listener.api
建立索引須要檢查 Active shards,默認狀況下:只要Primary Shard是Active的,就能夠建立索引。若是Active shards未達到指定的數目,則會建立索引請求會阻塞,直到集羣中Active shards恢復到指定數目或者超時返回。可參考:ActiveShardsObserver#waitForActiveShards(...)
方法。服務器
索引的建立封裝在org.elasticsearch.cluster.metadata.MetaDataCreateIndexService.IndexCreationTask#IndexCreationTask
對象中,最終由具備優先級任務隊列的線程池PrioritizedEsThreadPoolExecutor執行。併發
建立索引這樣的操做須要通知到集羣中各個節點,修改集羣的狀態,所以IndexCreationTask繼承了AckedClusterStateUpdateTask。app
在MetaDataCreateIndexService#onlyCreateIndex(...)
提交IndexCreationTask。
clusterService.submitStateUpdateTask("create-index [" + request.index() + "], cause [" + request.cause() + "]", new IndexCreationTask(logger, allocationService, request, listener, indicesService, aliasValidator, xContentRegistry, settings, this::validate));
跟蹤submitStateUpdateTasks(...)調用棧,在org.elasticsearch.cluster.service.MasterService#submitStateUpdateTasks(...)
方法中lambda map函數 將IndexCreationTask對象轉換成可供線程池執行的Runnable任務Batcher.UpdateTask。
public <T> void submitStateUpdateTasks(...,Map<T, ClusterStateTaskListener> tasks,...) { try { List<Batcher.UpdateTask> safeTasks = tasks.entrySet().stream() .map(e -> taskBatcher.new UpdateTask(config.priority(), source, e.getKey(), safe(e.getValue()), executor)) .collect(Collectors.toList()); //taskBatcher org.elasticsearch.cluster.service.TaskBatcher taskBatcher.submitTasks(safeTasks, config.timeout()); } } //PrioritizedEsThreadPoolExecutor execute(...)提交建立索引任務 public abstract class TaskBatcher { private final PrioritizedEsThreadPoolExecutor threadExecutor; public void submitTasks(...){ if (timeout != null) { threadExecutor.execute(firstTask, timeout, () -> onTimeoutInternal(tasks, timeout)); } else { threadExecutor.execute(firstTask); } } }
org.elasticsearch.cluster.service.MasterService.Batcher.UpdateTask
的繼承關係以下:能夠看出它是一個Runnable任務,建立索引操做最終由PrioritizedEsThreadPoolExecutor線程池提交任務執行。
PrioritizedEsThreadPoolExecutor擴充自ThreadPoolExecutor,參考這個類的源代碼,能夠了解ElasticSearch是如何自定義一個帶有任務優先級隊列的線程池的,也能夠學習一些如何擴展線程池的功能。
跟蹤threadExecutor.execute(...)
代碼,
public void execute(Runnable command, final TimeValue timeout, final Runnable timeoutCallback) { //給Runnable任務再添加一些額外的功能,好比優先級 command = wrapRunnable(command); // doExecute(command); } //EsThreadPoolExecutor protected void doExecute(final Runnable command) { try { super.execute(command);//提交任務 }catch (EsRejectedExecutionException ex) { if (command instanceof AbstractRunnable) { // If we are an abstract runnable we can handle the rejection // directly and don't need to rethrow it. try { ((AbstractRunnable) command).onRejection(ex); } finally { ((AbstractRunnable) command).onAfter(); } }
固然了,因爲PrioritizedEsThreadPoolExecutor擴展自ThreadPoolExecutor,最終的執行是在:ThreadPoolExecutor的內部類Worker#runWorker(Worker w)
中執行。可參考探究ElasticSearch中的線程池實現中的第3點分析。
上面分析的是線程執行流程,而具體的業務邏輯代碼(建立索引更新集羣的狀態信息)在Runnable#run()
中,也就是org.elasticsearch.cluster.service.TaskBatcher.BatchedTask#run()
方法中。
//BatchedTask public void run() {runIfNotProcessed(this);} void runIfNotProcessed(BatchedTask updateTask) { //任務的判斷、檢查是否重複、是否已經執行過了…… //忽略其餘無關代碼.... run(updateTask.batchingKey, toExecute, tasksSummary); } /** * Action to be implemented by the specific batching implementation * All tasks have the given batching key. */ protected abstract void run(Object batchingKey, List<? extends BatchedTask> tasks, String tasksSummary);
抽象run(...)具體實如今:org.elasticsearch.cluster.service.MasterService.Batcher#run
@Override protected void run(Object batchingKey, List<? extends BatchedTask> tasks, String tasksSummary) { ClusterStateTaskExecutor<Object> taskExecutor = (ClusterStateTaskExecutor<Object>) batchingKey; List<UpdateTask> updateTasks = (List<UpdateTask>) tasks; //TaskInputs Represents a set of tasks to be processed together with their executor runTasks(new TaskInputs(taskExecutor, updateTasks, tasksSummary)); } //最終節點狀態更新信息實現邏輯 protected void runTasks(TaskInputs taskInputs) { final ClusterState previousClusterState = state(); //改變集羣的狀態(各個分片的處理邏輯) TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState, startTimeNS); //將變化了的狀態同步給其餘節點 if (taskOutputs.clusterStateUnchanged()) { //未檢測到集羣狀態信息變化 }else{ ClusterState newClusterState = taskOutputs.newClusterState; try { ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(summary, newClusterState, previousClusterState); //Returns the DiscoveryNodes.Delta between the previous cluster state and the new cluster state. final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta(); } if (nodesDelta.hasChanges() && logger.isInfoEnabled()) { String nodeSummary = nodesDelta.shortSummary(); if (nodeSummary.length() > 0) { logger.info("{}, reason: {}", summary, nodeSummary); } } //Called when the result of the ClusterStateTaskExecutor#execute(ClusterState, List) have //been processed properly by all listeners. taskOutputs.processedDifferentClusterState(previousClusterState, newClusterState); //Callback invoked after new cluster state is published taskOutputs.clusterStatePublished(clusterChangedEvent); }
在這行代碼:TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState, startTimeNS);
輸入建立索引任務,輸出集羣狀態變化結果。
public TaskOutputs calculateTaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState) { ClusterTasksResult<Object> clusterTasksResult = executeTasks(taskInputs, startTimeNS, previousClusterState); //... } protected ClusterTasksResult<Object> executeTasks(TaskInputs taskInputs,...){ List<Object> inputs = taskInputs.updateTasks.stream().map(tUpdateTask -> tUpdateTask.task).collect(Collectors.toList()); //ShardStartedClusterStateTaskExecutor#execute clusterTasksResult = taskInputs.executor.execute(previousClusterState, inputs); } public ClusterTasksResult<StartedShardEntry> execute(ClusterState currentState, List<StartedShardEntry> tasks) { List<ShardRouting> shardRoutingsToBeApplied = new ArrayList<>(tasks.size()); for (StartedShardEntry task : tasks) { ShardRouting matched = currentState.getRoutingTable().getByAllocationId(task.shardId, task.allocationId); //....省略其餘代碼 shardRoutingsToBeApplied.add(matched); } maybeUpdatedState = allocationService.applyStartedShards(currentState, shardRoutingsToBeApplied); builder.successes(tasksToBeApplied); }
最終是在org.elasticsearch.cluster.action.shard.ShardStateAction.ShardStartedClusterStateTaskExecutor#execute
方法裏面更新各個分片的狀態,具體實現邏輯我也不是很懂。裏面涉及到:ShardRouting路由表、AllocationService。
AllocationService manages the node allocation of a cluster. For this reason the AllocationService keeps AllocationDeciders to choose nodes for shard allocation. This class also manages new nodes joining the cluster and rerouting of shards.
集羣狀態信息包括:集羣uuid、版本號、索引的配置信息及修改/刪除記錄、分片的在各個節點上的分配信息……保證各個節點上擁有一致的集羣狀態信息是很重要的,TLA+是驗證集羣狀態一致性的一種方法。
The cluster state contains important metadata about the cluster, including what the mappings look like, what settings the indices have, which shards are allocated to which nodes, etc. Inconsistencies in the cluster state can have the most horrid consequences including inconsistent search results and data loss, and the job of the cluster state coordination subsystem is to prevent any such inconsistencies.
集羣狀態信息示例:
cluster uuid: 3LZs2L1TRiCw6P2Xm6jfSQ version: 7 state uuid: bPCusHNGRrCxGcEpkA6XkQ from_diff: false meta data version: 34 [twitter/5gMqkF9oQaCdCCVXs7VrtA]: v[9] 0: p_term [2], isa_ids [QSYDJpzBRtOQjUDJIIPm7g] 1: p_term [2], isa_ids [LF3sOw51R1eS7XS_iXkkvQ] 2: p_term [2], isa_ids [gEfexQgbQRmd1qRplOjmag] 3: p_term [2], isa_ids [yZkB1nFHT22wtBnDBqGsKQ] 4: p_term [2], isa_ids [9oFMzwuwSOK1Ir-1SLxqHg] metadata customs: index-graveyard: IndexGraveyard[[[index=[twitter/KisMFiobQDSN23mjdugD0g], deleteDate=2018/12/25 02:05:54], [index=[twitter/trTv2ambSFOvKlGr_y0IKw], deleteDate=2019/01/03 03:19:44], [index=[twitter/sfWVXeklQ321QFxwLxSUPw], deleteDate=2019/01/03 09:51:45]]] ingest: org.elasticsearch.ingest.IngestMetadata@6d83dbd7 licenses: LicensesMetaData{license={"uid":"2a6f6ac2-2b3a-4e7b-a6c6-aed3e6e8edce","type":"basic","issue_date_in_millis":1545294198272,"max_nodes":1000,"issued_to":"elasticsearch","issuer":"elasticsearch","signature":"/////QAAANDadY9WjYQDyz2N6XstmWiReALKju/xLVk8VGXRfRbPPJxRbjxUMfiX9PHLz5AdfV2aFaGS6aGTtzoyKC5sOZQQbXCxzq8YTt+zbs+ld5OxOfDJ3yMVaJS5vAZuIlQQfkmMdIAnq7VolQbiADUHjKJnIZc0/Sb51YEUTtPykjPRrHF0NEKCOfhbQ4Jn5xOaweKvsTjOqxp1JJkOUOA+vvGqgxuZVxbDATEnW+6+kGP8WdkcvRpFlhwKMAKso9LzPaJ3NCO4zrZ+N9WUfA+TlRz4","start_date_in_millis":-1}, trialVersion=null} nodes: {debug_node}{JLqmOfYoTcS8IENG4pmnOA}{yhUOUQfXS7-Xzzm8_wzjoA}{127.0.0.1}{127.0.0.1:9300}{ml.machine_memory=8277266432, xpack.installed=true, ml.max_open_jobs=20, ml.enabled=true}, local, master routing_table (version 5): -- index [[twitter/5gMqkF9oQaCdCCVXs7VrtA]] ----shard_id [twitter][0] --------[twitter][0], node[JLqmOfYoTcS8IENG4pmnOA], [P], s[STARTED], a[id=QSYDJpzBRtOQjUDJIIPm7g] --------[twitter][0], node[null], [R], recovery_source[peer recovery], s[UNASSIGNED], unassigned_info[[reason=CLUSTER_RECOVERED], at[2019-01-22T00:50:44.164Z], delayed=false, allocation_status[no_attempt]] ----shard_id [twitter][1] --------[twitter][1], node[JLqmOfYoTcS8IENG4pmnOA], [P], s[STARTED], a[id=LF3sOw51R1eS7XS_iXkkvQ] --------[twitter][1], node[null], [R], recovery_source[peer recovery], s[UNASSIGNED], unassigned_info[[reason=CLUSTER_RECOVERED], at[2019-01-22T00:50:44.164Z], delayed=false, allocation_status[no_attempt]] ----shard_id [twitter][2] --------[twitter][2], node[JLqmOfYoTcS8IENG4pmnOA], [P], s[STARTED], a[id=gEfexQgbQRmd1qRplOjmag] --------[twitter][2], node[null], [R], recovery_source[peer recovery], s[UNASSIGNED], unassigned_info[[reason=CLUSTER_RECOVERED], at[2019-01-22T00:50:44.164Z], delayed=false, allocation_status[no_attempt]] ----shard_id [twitter][3] --------[twitter][3], node[JLqmOfYoTcS8IENG4pmnOA], [P], s[STARTED], a[id=yZkB1nFHT22wtBnDBqGsKQ] --------[twitter][3], node[null], [R], recovery_source[peer recovery], s[UNASSIGNED], unassigned_info[[reason=CLUSTER_RECOVERED], at[2019-01-22T00:50:44.164Z], delayed=false, allocation_status[no_attempt]] ----shard_id [twitter][4] --------[twitter][4], node[JLqmOfYoTcS8IENG4pmnOA], [P], s[STARTED], a[id=9oFMzwuwSOK1Ir-1SLxqHg] --------[twitter][4], node[null], [R], recovery_source[peer recovery], s[UNASSIGNED], unassigned_info[[reason=CLUSTER_RECOVERED], at[2019-01-22T00:50:44.164Z], delayed=false, allocation_status[no_attempt]] routing_nodes: -----node_id[JLqmOfYoTcS8IENG4pmnOA][V] --------[twitter][1], node[JLqmOfYoTcS8IENG4pmnOA], [P], s[STARTED], a[id=LF3sOw51R1eS7XS_iXkkvQ] --------[twitter][4], node[JLqmOfYoTcS8IENG4pmnOA], [P], s[STARTED], a[id=9oFMzwuwSOK1Ir-1SLxqHg] --------[twitter][3], node[JLqmOfYoTcS8IENG4pmnOA], [P], s[STARTED], a[id=yZkB1nFHT22wtBnDBqGsKQ] --------[twitter][2], node[JLqmOfYoTcS8IENG4pmnOA], [P], s[STARTED], a[id=gEfexQgbQRmd1qRplOjmag] --------[twitter][0], node[JLqmOfYoTcS8IENG4pmnOA], [P], s[STARTED], a[id=QSYDJpzBRtOQjUDJIIPm7g] ---- unassigned --------[twitter][1], node[null], [R], recovery_source[peer recovery], s[UNASSIGNED], unassigned_info[[reason=CLUSTER_RECOVERED], at[2019-01-22T00:50:44.164Z], delayed=false, allocation_status[no_attempt]] --------[twitter][4], node[null], [R], recovery_source[peer recovery], s[UNASSIGNED], unassigned_info[[reason=CLUSTER_RECOVERED], at[2019-01-22T00:50:44.164Z], delayed=false, allocation_status[no_attempt]] --------[twitter][3], node[null], [R], recovery_source[peer recovery], s[UNASSIGNED], unassigned_info[[reason=CLUSTER_RECOVERED], at[2019-01-22T00:50:44.164Z], delayed=false, allocation_status[no_attempt]] --------[twitter][2], node[null], [R], recovery_source[peer recovery], s[UNASSIGNED], unassigned_info[[reason=CLUSTER_RECOVERED], at[2019-01-22T00:50:44.164Z], delayed=false, allocation_status[no_attempt]] --------[twitter][0], node[null], [R], recovery_source[peer recovery], s[UNASSIGNED], unassigned_info[[reason=CLUSTER_RECOVERED], at[2019-01-22T00:50:44.164Z], delayed=false, allocation_status[no_attempt]] customs: snapshots: SnapshotsInProgress[] restore: RestoreInProgress[] snapshot_deletions: SnapshotDeletionsInProgress[] security_tokens: TokenMetaData{ everything is secret }
PUT twitter/_doc/1 { "user" : "kimchy", "post_date" : "2009-11-15T14:12:12", "message" : "trying out Elasticsearch" }
在建立索引時,寫入文檔到索引中。
整個具體流程從:TransportBulkAction#doExecute(...)
方法開始,分別兩部分:建立索引、寫入文檔。
其中,建立索引由createIndex(String index, TimeValue timeout, ActionListener<CreateIndexResponse> listener)
實現,整個具體流程以下:
//1 TransportBulkAction#doExecute(BulkRequest, ActionListener<BulkResponse>) for (String index : autoCreateIndices) { createIndex(index, bulkRequest.timeout(), new ActionListener<CreateIndexResponse>() //2 TransportBulkAction#createIndex void createIndex(String index, TimeValue timeout, ActionListener<CreateIndexResponse> listener) { createIndexAction.execute(createIndexRequest, listener); } //3 TransportAction#execute(Request, ActionListener<Response>) execute(task, request, new ActionListener<Response>() {...} //4 TransportAction#execute(Task, Request, ActionListener<Response>) requestFilterChain.proceed(task, actionName, request, listener); //5 TransportAction.RequestFilterChain#proceed(Task,String,Request,ActionListener<Response>) this.action.doExecute(task, request, listener); //6 TransportMasterNodeAction#doExecute protected void doExecute(Task task, final Request request, ActionListener<Response> listener) { new AsyncSingleAction(task, request, listener).start(); } //7 TransportMasterNodeAction.AsyncSingleAction#start public void start() {doStart(state);} //8 TransportMasterNodeAction.AsyncSingleAction#doStart threadPool.executor(executor).execute(new ActionRunnable(delegate) { @Override protected void doRun() throws Exception { masterOperation(task, request, clusterState, delegate); } }); //9 TransportMasterNodeAction#masterOperation(Task, Request,ClusterState,ActionListener<Response>) masterOperation(request, state, listener); //10 TransportCreateIndexAction#masterOperation createIndexService.createIndex(updateRequest, ActionListener.wrap(response -> listener.onResponse(new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName)), listener::onFailure)); //11 MetaDataCreateIndexService.createIndex(...) //到這裏就是本文中提到 MetaDataCreateIndexService 建立索引的流程了
寫入文檔由executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated)
實現,整個代碼流程以下,寫入文檔,先寫primary shard,主要在TransportShardBulkAction
類中實現。
//1 TransportBulkAction#executeBulk executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated) //2 org.elasticsearch.common.util.concurrent.AbstractRunnable#run new BulkOperation(task, bulkRequest, listener, responses, startTimeNanos, indicesThatCannotBeCreated).run(); //3 TransportBulkAction.BulkOperation#doRun shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() //4 TransportAction#execute(Request, org.elasticsearch.action.ActionListener<Response>) execute(task, request, new ActionListener<Response>() { //5 TransportAction.RequestFilterChain#proceed // 到這裏步驟,就已經和上面建立索引的第4步是同樣的了,都是由TransportAction#doExecute提交任務 requestFilterChain.proceed(task, actionName, request, listener) //6 --->各類TransportXXXAction都實現了TransportAction#doExecute //建立索引:TransportBulkAction#doExecute() //轉發給 primary shard 進行寫操做:TransportReplicationAction#doExecute() this.action.doExecute(task, request, listener); //7 TransportReplicationAction#doExecute(Task, Request, ActionListener<Response>) protected void doExecute(Task task, Request request, ActionListener<Response> listener) { new ReroutePhase((ReplicationTask) task, request, listener).run(); } //8 TransportReplicationAction.ReroutePhase#doRun //獲取primary shard相關信息,示例信息以下:文檔被reroute到編號爲3的primary shard上 //[twitter][3], node[JLqmOfYoTcS8IENG4pmnOA], [P], s[STARTED], a[id=N8n0QgxBQVeHljx1RpkYMg] final ShardRouting primary = primary(state) //獲取 "primary shard在哪一個節點上?" final DiscoveryNode node = state.nodes().get(primary.currentNodeId()); //根據節點id判斷primary shard是否在當前節點上 if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) { //primary shard在本節點上 performLocalAction(state, primary, node, indexMetaData); } else { //primary shard不在本節點,須要將索引操做轉發到正確的節點上 performRemoteAction(state, primary, node); } //9 TransportReplicationAction.ReroutePhase#performLocalAction(假設primary shard 在本機節點上) //這裏有2個重要的概念(屬性): allocationId 和 primary term performAction(node, transportPrimaryAction, true, new ConcreteShardRequest<>(request, primary.allocationId().getId(), indexMetaData.primaryTerm(primary.id()))); //10 TransportReplicationAction.ReroutePhase#performAction transportService.sendRequest(node, action, requestToPerform, transportOptions, new TransportResponseHandler<Response>() { //11 TransportService#sendRequest(Transport.Connection, String, TransportRequest, TransportRequestOptions, TransportResponseHandler<T>) asyncSender.sendRequest(connection, action, request, options, handler); sendLocalRequest(requestId, action, request, options) //TransportService#sendLocalRequest final String executor = reg.getExecutor(); //到這裏最終是EsThreadPoolExecutor#execute提交任務 threadPool.executor(executor).execute(new AbstractRunnable() {
allocationId是org.elasticsearch.cluster.routing.ShardRouting類的屬性,引用書中一段話:「Allocation ID 存儲在shard級元信息中,每一個shard都有一個惟一的Allocation ID」,同時master節點在集羣級元信息中維護一個被認爲是最新shard的Allocation ID集合,這個集合稱爲in-sync allocation IDs
再看org.elasticsearch.cluster.routing.AllocationId類的註釋:
Uniquely identifies an allocation. An allocation is a shard moving from unassigned to initializing,or relocation. Relocation is a special case, where the origin shard is relocating with a relocationId and same id, and the target shard (only materialized in RoutingNodes) is initializing with the id set to the origin shard relocationId. Once relocation is done, the new allocation id is set to the relocationId. This is similar behavior to how ShardRouting#currentNodeId is used.
再看ES副本模型官方文檔:in-sync 集合裏面的shard維護着當前最新的索引文檔操做寫入的document。
Elasticsearch maintains a list of shard copies that should receive the operation. This list is called the in-sync copies and is maintained by the master node. As the name implies, these are the set of "good" shard copies that are guaranteed to have processed all of the index and delete operations that have been acknowledged to the user. The primary is responsible for maintaining this invariant and thus has to replicate all operations to each copy in this set.
在正常狀況下,primary shard確定是in-sync集合裏面的shard,它是一個"good" shard copy。當primay shard所在的機器掛了時,master節點會當即從in-sync集合中選出一個replica shard做爲 primary shard,這個replica shard升級爲primary shard的操做是很快的,畢竟in-sync集合中的shard有着最新的數據,所以,也避免了因「將某個不太新的shard升級爲primary shard而致使數據丟失的狀況」。關於這個解釋,耐心的話,可參考:ElasticSearch index 剖析
關於primary term參考:elasticsearch-sequence-ids和下面的源碼註釋:
The term of the current selected primary. This is a non-negative number incremented when a primary shard is assigned after a full cluster restart or a replica shard is promoted to a primary.Note: since we increment the term every time a shard is assigned, the term for any operational shard (i.e., a shard that can be indexed into) is larger than 0.
它爲Index操做引入了一個全局順序號。
The primary shard accepts indexing operations (indexing operations are things like "add documents" or "delete a document") first and replicates the index operations to replicas. It is therefore fairly straightforward to keep incrementing a counter and assign each operation a sequence number before it's being forwarded to the replicas.
看完了這篇介紹,知道設計一個分佈式系統下全局的惟一ID有多難。有時候,難的地方不在於如何生成一個這樣的ID,由於生成全局惟一ID只是手段。最終的目的,是須要這些的一個全局ID來幹什麼?當涉及到各類各樣的應用場景時,這樣的一個全局ID還能不能保證正確性?由於,也許引入了ID解決了當前的這個問題,可是由它引發的其餘問題,或者還沒有考慮到的其餘問題,則極有可能致使數據不正確。而具體到ElasticSearch索引操做,一篇文檔寫入primary shard後,爲了數據可靠性,還得寫入replica。ES的數據副本模型借鑑了pacifica-replication-in-log-based-distributed-storage-systems算法,也引入了TLA+規範。而引入primary term,就能區分index操做是否由舊的primary shard執行,仍是在當前新的primary shard上執行。各個shard的primary term信息由master維護而且持久化到集羣狀態中,每當shard的身份改變時(好比被提高爲primary shard 或者 被降級爲普通的shard/replica),primary term就會加1,解決在併發寫情形下可能出現的數據不一致的問題。
The first step we took was to be able to distinguish between old and new primaries. We have to have a way to identify operations that came from an older primary vs operations that come from a newer primary. On top of that, the entire cluster needs to be in agreement as to that so there's no contention in the event of a problem. This drove us to implement primary terms.
global checkpoint做用:
The global checkpoint is a sequence number for which we know that all active shards histories are aligned at least up to it.
這裏的active shards應該是 上面討論的in-sync集合列表裏面的shards。
all operations with a lower sequence number than the global checkpoint are guaranteed to have been processed by all active shards and are equal in their respective histories. This means that after a primary fails, we only need to compare operations above the last known global checkpoint between the new primary and any remaining replicas.
引入global checkpoint以後,當前的primary shard由於故障宕機後,變成了舊的primary shard,master從in-sync集合列表中選出一個replica做爲新的primary shard,Client發起的index操做可繼續請求給新的primary shard(這也是爲何index操做默認有1分鐘的超時的緣由,只要在這1分鐘裏面順利地選出新primary shard,就不會影響Client的index操做)。當舊的primary shard恢復過來後,對比舊primary shard的global checkpoint和新的primary shard的global checkpoint,進行數據同步。
Advancing the global checkpoint is the responsibility of the primary shard.It does so by keeping track of completed operations on the replicas.Once it detects that all replicas have advanced beyond a given sequence number, it will update the global checkpoint accordingly.
global checkpoint的更新由 primary shard推動完成。至於primary shard如何更新global checkpoint的值,可參考上面提到的pacifica-replication-in-log-based-distributed-storage-systems這篇論文。另外elasticsearch-sequence-ids-6-0動畫圖演示了 checkpoint 是如何更新的。
在這個動畫演示中,有一個步驟:當Client發起第四、五、6篇文檔的index請求操做時,當前primary shard將第五、6篇文檔複製給了中間那個shard,將第四、6篇文檔複製給了最右邊那個shard,而後primary shard掛了。此時,master選擇了中間那個shard做爲 new primary shard,那new primary shard上的第4篇文檔是如何來的呢?
個人理解是:仍是前面提到的,Client 發起的Index操做默認有1分鐘的超時,若是Client未收到索引成功Ack,ElasticSearch High Level Restful JAVA api 應該要從新發起請求。而後新的請求,就會被路由到中間那個shard,也即new primary shard,從而new primary shard有了第4篇文檔。
這個issue中描述了Index操做的基本步驟(index flow)
public void onRejection(Exception e)
,當線程池提交任務被拒絕時(線程池的任務拒絕策略)就會調用該方法;裏面還定義了抽象方法public abstract void onFailure(Exception e);
,當線程執行過程當中出現了異常時,就會調用該方法。(This method is invoked for all exception thrown by doRun())@Override public final void run() { try { doRun(); } catch (Exception t) { onFailure(t); } finally { onAfter(); } }
關於ElasticSearch Index操做的流程,參考ElasticSearch 索引 剖析
ElasticSearch源碼閱讀相關文章:
原文:https://www.cnblogs.com/hapjin/p/10219219.html