The client sends the request
TransportBulkAction#doExecute(Task,BulkRequest,listener)
TransportBulkAction#doRun()
Get the cluster state
/** sets the last observed state to the currently applied cluster state and returns it */
public ClusterState setAndGetObservedState() {
    if (observingContext.get() != null) {
        throw new ElasticsearchException("cannot set current cluster state while waiting for a cluster state change");
    }
    ClusterState clusterState = clusterApplierService.state();
    lastObservedState.set(new StoredState(clusterState));
    return clusterState;
}
cluster uuid: 5yBoKgbYQ1ibdZ5WG7bRAA
version: 7
state uuid: QVCOkCv_Q_mBGzjwTVDNJw
from_diff: true
meta data version: 5
   [test/t-tC0rHESDqNm5SQFO7kPQ]: v[4]
      0: p_term [1], isa_ids [UDR6UFa0Sa27ul74kRpyTQ]
      1: p_term [1], isa_ids [VeuqdSp8R3ub2_a1a9zHJg]
      2: p_term [1], isa_ids [0q3mCMLaSFWgOG5eQJ-EXQ]
      3: p_term [1], isa_ids [maBX8A3sRRK8FPG3VzmfKA]
metadata customs:
   index-graveyard: IndexGraveyard[[]]
nodes:
   {node_sm0}{Xs6SXo4kRj6ylKwLE1dgkA}{bLOl8jv2SGWXt1hk7b_V7g}{127.0.0.1}{127.0.0.1:42641}, master
   {node_sd3}{H4rct3ZxRvKJG2dnF0oFtg}{OwRuFVwkTLufu5LBzvFa0w}{127.0.0.1}{127.0.0.1:33747}
   {node_sm2}{dUEAma7HQJG4eRFx18dRnA}{WOf3n9RoSSCEOkXa9fgWPQ}{127.0.0.1}{127.0.0.1:36963}
   {node_sm1}{kSSol9RjSwyfueUowUdHnQ}{HAgo4XEHS5qWRAokNtzFow}{127.0.0.1}{127.0.0.1:34537}, local
routing_table (version 4):
-- index [[test/t-tC0rHESDqNm5SQFO7kPQ]]
----shard_id [test][0]
--------[test][0], node[H4rct3ZxRvKJG2dnF0oFtg], [P], s[STARTED], a[id=UDR6UFa0Sa27ul74kRpyTQ]
----shard_id [test][1]
--------[test][1], node[H4rct3ZxRvKJG2dnF0oFtg], [P], s[STARTED], a[id=VeuqdSp8R3ub2_a1a9zHJg]
----shard_id [test][2]
--------[test][2], node[H4rct3ZxRvKJG2dnF0oFtg], [P], s[STARTED], a[id=0q3mCMLaSFWgOG5eQJ-EXQ]
----shard_id [test][3]
--------[test][3], node[H4rct3ZxRvKJG2dnF0oFtg], [P], s[STARTED], a[id=maBX8A3sRRK8FPG3VzmfKA]
routing_nodes:
-----node_id[H4rct3ZxRvKJG2dnF0oFtg][V]
--------[test][3], node[H4rct3ZxRvKJG2dnF0oFtg], [P], s[STARTED], a[id=maBX8A3sRRK8FPG3VzmfKA]
--------[test][2], node[H4rct3ZxRvKJG2dnF0oFtg], [P], s[STARTED], a[id=0q3mCMLaSFWgOG5eQJ-EXQ]
--------[test][1], node[H4rct3ZxRvKJG2dnF0oFtg], [P], s[STARTED], a[id=VeuqdSp8R3ub2_a1a9zHJg]
--------[test][0], node[H4rct3ZxRvKJG2dnF0oFtg], [P], s[STARTED], a[id=UDR6UFa0Sa27ul74kRpyTQ]
---- unassigned
customs:
   snapshots: SnapshotsInProgress[]
   snapshot_deletions: SnapshotDeletionsInProgress[]
   restore: RestoreInProgress[]
Resolve the routing
/* resolve the routing if needed */
public void resolveRouting(MetaData metaData) {
    routing(metaData.resolveIndexRouting(parent, routing, index));
}
routing_table (version 4):
-- index [[test/t-tC0rHESDqNm5SQFO7kPQ]]
----shard_id [test][0]
--------[test][0], node[H4rct3ZxRvKJG2dnF0oFtg], [P], s[STARTED], a[id=UDR6UFa0Sa27ul74kRpyTQ]
----shard_id [test][1]
--------[test][1], node[H4rct3ZxRvKJG2dnF0oFtg], [P], s[STARTED], a[id=VeuqdSp8R3ub2_a1a9zHJg]
----shard_id [test][2]
--------[test][2], node[H4rct3ZxRvKJG2dnF0oFtg], [P], s[STARTED], a[id=0q3mCMLaSFWgOG5eQJ-EXQ]
----shard_id [test][3]
--------[test][3], node[H4rct3ZxRvKJG2dnF0oFtg], [P], s[STARTED], a[id=maBX8A3sRRK8FPG3VzmfKA]
Does the request carry a doc ID? If not, one is generated automatically.
// generate id if not already provided
if (id == null) {
    assert autoGeneratedTimestamp == -1 : "timestamp has already been generated!";
    autoGeneratedTimestamp = Math.max(0, System.currentTimeMillis()); // extra paranoia
    String uid;
    if (indexCreatedVersion.onOrAfter(Version.V_6_0_0_beta1)) {
        uid = UUIDs.base64UUID();
    } else {
        uid = UUIDs.legacyBase64UUID();
    }
    id(uid);
}
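The ID fallback above can be illustrated outside of ES. The following is only a sketch: the class `AutoIdSketch` and the method `resolveId` are hypothetical names, and `java.util.UUID.randomUUID()` stands in for ES's own `UUIDs.base64UUID()`, whose generator works differently.

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.UUID;

class AutoIdSketch {
    /** Returns the caller's id, or a generated URL-safe id when none was supplied. */
    static String resolveId(String id) {
        if (id != null) {
            return id; // the client chose the id, keep it
        }
        // Assumption: a random UUID stands in for ES's own UUID generator.
        UUID uuid = UUID.randomUUID();
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putLong(uuid.getMostSignificantBits());
        buf.putLong(uuid.getLeastSignificantBits());
        // 16 bytes encode to 22 URL-safe characters once padding is dropped.
        return Base64.getUrlEncoder().withoutPadding().encodeToString(buf.array());
    }

    public static void main(String[] args) {
        System.out.println(resolveId("doc-1"));
        System.out.println(resolveId(null));
    }
}
```

Generating the ID before routing matters because the shard is derived from the ID (or the routing value), so every item needs one before it can be grouped.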
Group the bulk request. Compute which shard each item will be sent to; items routed to the same shard form one group.
ShardId shardId = clusterService.operationRouting()
    .indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId();
List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
shardRequests.add(new BulkItemRequest(i, request));
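The grouping step can be sketched in isolation. This is an illustration under stated assumptions, not ES's routing code: `ShardGroupingSketch`, `shardFor` and `groupByShard` are hypothetical names, and `String.hashCode()` stands in for the hash function ES actually uses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ShardGroupingSketch {
    /** Picks a shard from the routing value if present, otherwise from the doc id. */
    static int shardFor(String id, String routing, int numShards) {
        String key = routing != null ? routing : id;
        // Assumption: String.hashCode() is a stand-in for the real routing hash.
        return Math.floorMod(key.hashCode(), numShards);
    }

    /** Groups doc ids by target shard, mirroring the computeIfAbsent pattern above. */
    static Map<Integer, List<String>> groupByShard(List<String> ids, int numShards) {
        Map<Integer, List<String>> requestsByShard = new TreeMap<>();
        for (String id : ids) {
            int shard = shardFor(id, null, numShards);
            requestsByShard.computeIfAbsent(shard, s -> new ArrayList<>()).add(id);
        }
        return requestsByShard;
    }
}
```

Each resulting group corresponds to one per-shard request, so a bulk of many items produces at most one request per shard.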
Submit the request to each shard. The callback checks whether every shard that was sent a request has responded.
shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() {
    @Override
    public void onResponse(BulkShardResponse bulkShardResponse) {
        for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
            // we may have no response if item failed
            if (bulkItemResponse.getResponse() != null) {
                bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
            }
            responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
        }
        if (counter.decrementAndGet() == 0) {
            finishHim();
        }
    }
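The countdown in the callback above is a common pattern for joining asynchronous responses. A minimal sketch, with hypothetical names (`BulkCountdownSketch`, `onShardResponse`) and plain strings in place of ES response objects:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;

class BulkCountdownSketch {
    private final AtomicReferenceArray<String> responses; // one slot per bulk item
    private final AtomicInteger counter;                  // shards still outstanding
    private volatile boolean finished = false;

    BulkCountdownSketch(int itemCount, int shardCount) {
        this.responses = new AtomicReferenceArray<>(itemCount);
        this.counter = new AtomicInteger(shardCount);
    }

    /** Called once per shard; the last shard to respond completes the bulk. */
    void onShardResponse(int[] itemIds, String result) {
        for (int itemId : itemIds) {
            responses.set(itemId, result); // keep the item's original position
        }
        if (counter.decrementAndGet() == 0) {
            finished = true; // stand-in for building and sending the final response
        }
    }

    boolean isFinished() {
        return finished;
    }

    String responseAt(int itemId) {
        return responses.get(itemId);
    }
}
```

Because each item writes to its own slot by item ID, the final response preserves the order of the original bulk request no matter which shard answers first.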
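TransportReplicationAction#messageReceived(ConcreteShardRequest)
The request is received. This is the entry point where the node holding the primary shard starts handling the index request. Before sending an index request, the coordinating node first uses the routing information and the doc ID to compute which shard the request goes to, then reads the allocationId from the cluster state (an allocationId uniquely identifies a shard copy).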
request: BulkShardRequest [[test][0]] containing [index {[test][type][bogus_doc_ݑݜݢݧݧݯݼa1], source[{}]}], target allocation id: UDR6UFa0Sa27ul74kRpyTQ, primary term: 1
Create the asynchronous primary task, AsyncPrimaryAction
@Override
public void messageReceived(ConcreteShardRequest<Request> request, TransportChannel channel, Task task) {
    new AsyncPrimaryAction(request.request, request.targetAllocationID, request.primaryTerm, channel, (ReplicationTask) task).run();
}
TransportReplicationAction.AsyncPrimaryAction#doRun
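The asynchronous primary task runs. Operations on an IndexShard must first acquire permits.
protected void doRun() throws Exception {
    acquirePrimaryShardReference(request.shardId(), targetAllocationID, primaryTerm, this, request);
}
The original post showed a debugger screenshot of the IndexShard object's state at this point, to give a feel for what it holds.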
TransportReplicationAction#acquirePrimaryShardReference()
Get the IndexShard object. IndexShard is an important class that encapsulates many shard operations.
Create the listener for permit acquisition; onResponse() is called back once the permit has been acquired.
ActionListener<Releasable> onAcquired = new ActionListener<Releasable>() {
    @Override
    public void onResponse(Releasable releasable) {
        //--->TransportReplicationAction.AsyncPrimaryAction.onResponse
        onReferenceAcquired.onResponse(new PrimaryShardReference(indexShard, releasable));
    }
Start acquiring the permit
indexShard.acquirePrimaryOperationPermit(onAcquired, executor, debugInfo);
IndexShardOperationPermits#acquire(listener,executor...)
Acquire the permit
synchronized (this) {
    if (delayed) {
        final Supplier<StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(false);
        final ActionListener<Releasable> wrappedListener;
        if (executorOnDelay != null) {
            wrappedListener =
                new PermitAwareThreadedActionListener(threadPool, executorOnDelay,
                    new ContextPreservingActionListener<>(contextSupplier, onAcquired), forceExecution);
        } else {
            wrappedListener = new ContextPreservingActionListener<>(contextSupplier, onAcquired);
        }
        delayedOperations.add(new DelayedOperation(wrappedListener, debugInfo, stackTrace));
        return;
    } else {
        releasable = acquire(debugInfo, stackTrace);
    }
}
Once the permit has been acquired, onAcquired.onResponse(releasable) is called back.
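The shape of this step (run immediately when permits are available, queue the callback while the shard is blocking operations) can be sketched without any ES types. `PermitSketch` and its methods are hypothetical names; the real class also handles thread contexts, executors and a bounded number of permits, none of which is modelled here.

```java
import java.util.ArrayList;
import java.util.List;

class PermitSketch {
    private boolean delayed = false;  // true while operations are blocked
    private int activePermits = 0;    // permits currently held
    private final List<Runnable> delayedOperations = new ArrayList<>();

    /** Runs the callback now, or queues it if operations are currently blocked. */
    void acquire(Runnable onAcquired) {
        synchronized (this) {
            if (delayed) {
                delayedOperations.add(onAcquired);
                return;
            }
            activePermits++;
        }
        onAcquired.run(); // invoked outside the lock
    }

    synchronized void release() {
        activePermits--;
    }

    synchronized void block() {
        delayed = true;
    }

    /** Lifts the block and replays every queued operation. */
    void unblock() {
        List<Runnable> queued;
        synchronized (this) {
            delayed = false;
            queued = new ArrayList<>(delayedOperations);
            delayedOperations.clear();
        }
        for (Runnable op : queued) {
            acquire(op);
        }
    }

    synchronized int activePermits() {
        return activePermits;
    }
}
```

Queuing rather than rejecting means a write that arrives while the shard is briefly blocked is delayed instead of failed.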
TransportReplicationAction.AsyncPrimaryAction#onResponse
Check whether the primary has been relocated. If it has, forward the request to the relocation target; otherwise create the replicated operation.
if (primaryShardReference.isRelocated()) {
    final ShardRouting primary = primaryShardReference.routingEntry();
    DiscoveryNode relocatingNode = clusterService.state().nodes().get(primary.relocatingNodeId());
    transportService.sendRequest(relocatingNode, transportPrimaryAction,....);
} else {
    setPhase(replicationTask, "primary");
    final ActionListener<Response> listener = createResponseListener(primaryShardReference);
    createReplicatedOperation(request, ActionListener.wrap(result -> result.respond(listener), listener::onFailure), primaryShardReference)
        .execute(); // actually start executing the replicated operation
}
The index request:
BulkShardRequest [[test][1]] containing [index {[test][type][w3KQpWkBhFoYx7tjRcg3], source[{"field":"value_0"}]}]
ReplicationOperation#execute
Check whether the number of active shards meets the requirement. If the number of active shards is smaller than wait_for_active_shards, the operation is refused. ReplicationGroup holds three sets of shard copies: inSyncAllocationIds (the in-sync copies, i.e. the currently active shards), trackedAllocationIds, and unavailableInSyncShards (stale replicas).
/**
 * Checks whether we can perform a write based on the required active shard count setting.
 * Returns null if OK to proceed, or a string describing the reason to stop
 */
protected String checkActiveShardCount() {
    final ShardId shardId = primary.routingEntry().shardId();
    final ActiveShardCount waitForActiveShards = request.waitForActiveShards();
    if (waitForActiveShards == ActiveShardCount.NONE) {
        return null;  // not waiting for any shards
    }
    final IndexShardRoutingTable shardRoutingTable = primary.getReplicationGroup().getRoutingTable();
    if (waitForActiveShards.enoughShardsActive(shardRoutingTable)) {
        return null;
    } else {
        final String resolvedShards = waitForActiveShards == ActiveShardCount.ALL
            ? Integer.toString(shardRoutingTable.shards().size())
            : waitForActiveShards.toString();
        logger.trace("[{}] not enough active copies to meet shard count of [{}] (have {}, needed {}), scheduling a retry. op [{}], " +
            "request [{}]", shardId, waitForActiveShards, shardRoutingTable.activeShards().size(), resolvedShards, opType, request);
        return "Not enough active copies to meet shard count of [" + waitForActiveShards + "] (have " +
            shardRoutingTable.activeShards().size() + ", needed " + resolvedShards + ").";
    }
}
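Stripped of ES types, the check reduces to a comparison between the number of active copies and the required count. A sketch with hypothetical names (`ActiveShardCheckSketch`, `check`, and the `ALL` sentinel are assumptions made for illustration):

```java
class ActiveShardCheckSketch {
    /** Hypothetical sentinel meaning "every copy of the shard must be active". */
    static final int ALL = -1;

    /**
     * Returns null when the write may proceed, or a reason string when it may not,
     * following the same null-means-OK convention as the method above.
     */
    static String check(int waitForActiveShards, int activeCopies, int totalCopies) {
        if (waitForActiveShards == 0) {
            return null; // not waiting for any copies
        }
        int needed = waitForActiveShards == ALL ? totalCopies : waitForActiveShards;
        if (activeCopies >= needed) {
            return null;
        }
        return "Not enough active copies (have " + activeCopies + ", needed " + needed + ").";
    }

    public static void main(String[] args) {
        // One primary, one replica, replica down, wait_for_active_shards = 2.
        System.out.println(check(2, 1, 2));
    }
}
```

With the default requirement of 1, an active primary alone is enough; raising it to 2 makes the write wait for at least one active replica.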
Get the primary shard's routing entry and execute the primary request on the primary shard
final ShardRouting primaryRouting = primary.routingEntry();
final ShardId primaryId = primaryRouting.shardId();
totalShards.incrementAndGet();
pendingActions.incrementAndGet(); // increase by 1 until we finish all primary coordination
primaryResult = primary.perform(request); //---> execution of the primary operation
The primary shard updates its local checkpoint
primary.updateLocalCheckpointForShard(primaryRouting.allocationId().getId(), primary.localCheckpoint());
TransportReplicationAction.PrimaryShardReference#perform
Only after the request has executed successfully on the primary shard is the replica request, result.replicaRequest(), created
@Override
public PrimaryResult perform(Request request) throws Exception {
    PrimaryResult result = shardOperationOnPrimary(request, indexShard);
    assert result.replicaRequest() == null || result.finalFailure == null : "a replica request [" + result.replicaRequest()
        + "] with a primary failure [" + result.finalFailure + "]";
    return result;
}
TransportShardBulkAction#shardOperationOnPrimary
Get the IndexMetaData, track the translog location, and run executeBulkItemRequest for each item in the batch
final IndexMetaData metaData = primary.indexSettings().getIndexMetaData();
Translog.Location location = null;
for (int requestIndex = 0; requestIndex < request.items().length; requestIndex++) {
    if (isAborted(request.items()[requestIndex].getPrimaryResponse()) == false) {
        location = executeBulkItemRequest(metaData, primary, request, location, requestIndex,
            updateHelper, nowInMillisSupplier, mappingUpdater);
    }
}
IndexShard#applyIndexOperationOnPrimary
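Sequence number generation. On the primary the operation starts out with UNASSIGNED_SEQ_NO; the actual sequence number is assigned further down, when the operation is executed.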
return applyIndexOperation(SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm, version, versionType, autoGeneratedTimestamp, isRetry, Engine.Operation.Origin.PRIMARY, sourceToParse);
IndexShard#applyIndexOperation
Verify that the primary shard's primary term is current (this guards against a stale primary shard continuing to execute operations and producing dirty data)
assert opPrimaryTerm <= this.primaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.primaryTerm + "]";
assert versionType.validateVersionForWrites(version);
Build the low-level index operation: primary term, source text, and so on
operation = prepareIndex(docMapper(sourceToParse.type()), indexSettings.getIndexVersionCreated(), sourceToParse, seqNo, opPrimaryTerm, version, versionType, origin, autoGeneratedTimeStamp, isRetry);
org.elasticsearch.index.shard.IndexShard#prepareIndex
Create the Engine.Index operation that the Lucene-backed engine will execute
return new Engine.Index(uid, doc, seqNo, primaryTerm, version, versionType, origin, startTime, autoGeneratedIdTimestamp, isRetry);
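Because threads stay suspended for a long time while stepping through breakpoints, the underlying transport connection gets closed. The part that follows, where the document is synced to the replicas after being written to the primary shard, was therefore captured while debugging a second request. In reality it is a single request, but this does not change the overall flow of the index operation.
Next, once execution on the primary shard has succeeded, control returns to the ReplicationOperation#execute step above: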
ReplicationOperation#performOnReplicas(ReplicaRequest,globalCheckpoint,ReplicationGroup)
for (final ShardRouting shard : replicationGroup.getReplicationTargets()) {
    if (shard.isSameAllocation(primaryRouting) == false) {
        performOnReplica(shard, replicaRequest, globalCheckpoint);
    }
}
The current ShardRouting:
[test][1], node[H4rct3ZxRvKJG2dnF0oFtg], [P], s[STARTED], a[id=VeuqdSp8R3ub2_a1a9zHJg]
The current replica request:
BulkShardRequest [[test][1]] containing [index {[test][type][w3KQpWkBhFoYx7tjRcg3], source[{"field":"value_0"}]}]
ReplicationOperation#performOnReplica(ShardRouting,ReplicaRequest,globalCheckpoint)
After the operation succeeds on a replica, the callback updates the local checkpoint and the global checkpoint
replicasProxy.performOn(shard, replicaRequest, globalCheckpoint, new ActionListener<ReplicaResponse>() {
    @Override
    public void onResponse(ReplicaResponse response) {
        successfulShards.incrementAndGet();
        try {
            // on success, the callback updates the checkpoints
            primary.updateLocalCheckpointForShard(shard.allocationId().getId(), response.localCheckpoint());
            primary.updateGlobalCheckpointForShard(shard.allocationId().getId(), response.globalCheckpoint());
        } catch (final AlreadyClosedException e) {
            // okay, the index was deleted or this shard was never activated after a relocation; fall through and finish normally
        } catch (final Exception e) {
            // fail the primary but fall through and let the rest of operation processing complete
            final String message = String.format(Locale.ROOT, "primary failed updating local checkpoint for replica %s", shard);
            primary.failShard(message, e);
        }
        decPendingAndFinishIfNeeded();
    }
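To make the checkpoint bookkeeping concrete, here is a sketch of how a primary might derive a global checkpoint from the local checkpoints its copies report. `CheckpointTrackerSketch` is a hypothetical name, every tracked copy is assumed to be in-sync, and -1 is used here to mean "nothing reported yet"; the real tracker in ES is considerably more involved.

```java
import java.util.HashMap;
import java.util.Map;

class CheckpointTrackerSketch {
    // allocation id -> highest local checkpoint reported by that copy
    private final Map<String, Long> localCheckpoints = new HashMap<>();

    /** Records a copy's local checkpoint; a checkpoint never moves backwards. */
    void updateLocalCheckpoint(String allocationId, long checkpoint) {
        localCheckpoints.merge(allocationId, checkpoint, Math::max);
    }

    /** The global checkpoint is the minimum local checkpoint across all tracked copies. */
    long globalCheckpoint() {
        long min = Long.MAX_VALUE;
        for (long checkpoint : localCheckpoints.values()) {
            min = Math.min(min, checkpoint);
        }
        return localCheckpoints.isEmpty() ? -1 : min;
    }
}
```

Taking the minimum means every operation up to the global checkpoint has been processed by all tracked copies, which is why each replica response can advance it.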
TransportReplicationAction.ReplicasProxy#performOn
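Replica operations are executed through a proxy class, TransportReplicationAction.ReplicasProxy.
Create a ConcreteReplicaRequest object. It carries the global checkpoint, so the replica can advance to the latest global checkpoint; the primary term, so the replica can tell whether the sending primary shard is current (a replica rejects requests from a primary that the master node has already marked stale, for example a primary that, because of a network failure, has not yet realized it is outdated); and the allocationId, so the target replica shard can be located.
sendReplicaRequest
Forward it to the nodes that hold the replicas for execution.
String nodeId = replica.currentNodeId();
final DiscoveryNode node = clusterService.state().nodes().get(nodeId);
final ConcreteReplicaRequest<ReplicaRequest> replicaRequest =
    new ConcreteReplicaRequest<>(request, replica.allocationId().getId(), primaryTerm, globalCheckpoint);
sendReplicaRequest(replicaRequest, node, listener);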
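How a replica might use the primary term and global checkpoint carried by such a request can be sketched as follows. `ReplicaTermCheckSketch` and `accept` are hypothetical names, and the logic is a simplification of the idea described above, not the actual replica-side code.

```java
class ReplicaTermCheckSketch {
    private long currentTerm;          // highest primary term this replica has seen
    private long globalCheckpoint = -1; // -1 used here to mean "none known yet"

    ReplicaTermCheckSketch(long currentTerm) {
        this.currentTerm = currentTerm;
    }

    /** Rejects operations from a stale primary; otherwise adopts its term and checkpoint. */
    boolean accept(long opPrimaryTerm, long primaryGlobalCheckpoint) {
        if (opPrimaryTerm < currentTerm) {
            return false; // sender is an outdated primary
        }
        currentTerm = opPrimaryTerm; // a higher term means a newer primary was promoted
        globalCheckpoint = Math.max(globalCheckpoint, primaryGlobalCheckpoint);
        return true;
    }

    long currentTerm() {
        return currentTerm;
    }

    long globalCheckpoint() {
        return globalCheckpoint;
    }
}
```

The term comparison is what keeps an isolated old primary from overwriting data after a new primary has taken over.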
TransportReplicationAction#sendReplicaRequest(ConcreteReplicaRequest,DiscoveryNode,ActionListener)
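End.
This article records the ES write path in detail. Debugging started from the test method org.elasticsearch.indexing.IndexActionIT#testAutoGenerateIdNoDuplicates. After cloning the ES source from GitHub and building it into an IDEA project with Gradle, you will find a number of test directories. Many of the test classes simulate an ES cluster well, and starting from these test methods makes reading the source much more efficient.
ES first executes the index operation on the primary shard and then "syncs" it to each replica. Each replica returns its result to the primary shard, and the primary shard then returns the ACK to the client. Clearly, if the primary shard fails, the index operation has failed and the ACK returned to the client is a failure. If the index operation succeeds on the primary shard but fails on some replicas while being synced, the ACK the primary shard returns to the client is still a success. By default, as long as the primary shard is active, that is, as long as the primary shard executes the index operation successfully, the client receives a success acknowledgement even if syncing to every replica fails. The response does, however, contain a _shards field: total is the number of shard copies the operation should run on, and successful is the number of copies on which it succeeded. From these the client can tell how many copies (the primary and its replicas) actually executed the index operation.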
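The _shards accounting described above can be worked through with a small example. This is a sketch with a hypothetical helper (`ShardsSummarySketch.summarize`); it assumes the primary succeeded and that every replica either succeeded or reported a failure.

```java
class ShardsSummarySketch {
    /** Builds a _shards-style summary for one primary plus its replicas. */
    static String summarize(int replicaCount, int replicaFailures) {
        int total = 1 + replicaCount;                          // primary + replicas
        int successful = 1 + (replicaCount - replicaFailures); // primary assumed successful
        return "{\"total\":" + total
            + ",\"successful\":" + successful
            + ",\"failed\":" + replicaFailures + "}";
    }

    public static void main(String[] args) {
        // Two replicas, both failed: the write is still acknowledged,
        // but only one copy holds the document.
        System.out.println(summarize(2, 2));
    }
}
```

A client that cares about durability can compare successful with total instead of relying on the acknowledgement alone.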
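To keep data highly reliable, ES has a parameter named wait_for_active_shards. It defaults to 1, which is the behaviour described above: as long as the primary shard is active, the index operation can run. The parameter takes effect in the ReplicationOperation#execute step of the flow above. When that method runs, it first checks whether the number of active shards in the current ReplicationGroup is greater than or equal to wait_for_active_shards, and only then continues with the index operation. If wait_for_active_shards is set to 2 and only the primary shard is available in the cluster, the index operation cannot run: it is retried until the request timeout expires, and the client eventually receives a timeout response, because one more active replica is needed to satisfy the requirement. This avoids the situation where the primary shard is the only copy receiving data (consider what would happen if the node holding the primary shard went down).
Original post: https://www.cnblogs.com/hapjin/p/10577427.html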