ES6.3.2 副本失敗處理

ES6.3.2 副本失敗處理

副本的失敗處理對理解ES的數據副本模型頗有幫助。在ES6.3.2 index操做源碼流程的總結中提到:ES的寫操做會先寫主分片,而後主分片再將操做同步到副本分片。本文給出ES中的源碼片段,分析副本執行操做失敗時,ES是如何處理的。html

副本執行源碼:replicasProxy.performOn實現了副本操做,執行正常結束回調onResponse(),異常回調onFailure()java

replicasProxy.performOn(shard, replicaRequest, globalCheckpoint, new ActionListener<ReplicaResponse>() {
            @Override
            public void onResponse(ReplicaResponse response) {
                successfulShards.incrementAndGet();
                try {
                    primary.updateLocalCheckpointForShard(shard.allocationId().getId(), response.localCheckpoint());//執行成功回調更新檢查點
                    primary.updateGlobalCheckpointForShard(shard.allocationId().getId(), response.globalCheckpoint());
                } catch (final AlreadyClosedException e) {
                    // okay, the index was deleted or this shard was never activated after a relocation; fall through and finish normally
                } catch (final Exception e) {
                    // fail the primary but fall through and let the rest of operation processing complete
                    final String message = String.format(Locale.ROOT, "primary failed updating local checkpoint for replica %s", shard);
                    primary.failShard(message, e);
                }
                decPendingAndFinishIfNeeded();//無論是正常的onResponse仍是異常的onFailure,都會調用這個方法,表明已經完成了一個操做,pendingActions減1
            }

            @Override
            public void onFailure(Exception replicaException) {
                logger.trace(() -> new ParameterizedMessage(
                    "[{}] failure while performing [{}] on replica {}, request [{}]",
                    shard.shardId(), opType, shard, replicaRequest), replicaException);
                // Only report "critical" exceptions - TODO: Reach out to the master node to get the latest shard state then report.
                if (TransportActions.isShardNotAvailableException(replicaException) == false) {
                    RestStatus restStatus = ExceptionsHelper.status(replicaException);
                    shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure(
                        shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false));
                }
                String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard);
                //---> failShardIfNeeded 具體執行何種操做要看 replicasProxy的真正實現類:若是是WriteActionReplicasProxy則會報告shard錯誤
                replicasProxy.failShardIfNeeded(shard, message,
                    replicaException, ReplicationOperation.this::decPendingAndFinishIfNeeded,
                    ReplicationOperation.this::onPrimaryDemoted, throwable -> decPendingAndFinishIfNeeded());
            }
        });
    }

執行正常結束回調onResponse()

successfulShards.incrementAndGet();,在返回的結果裏面,_shards 字段裏面就能看到 successful 數值。node

更新 local checkpoint 和 global checkpoint:若是檢查點更新失敗,觸發:replica shard engine 關閉。git

/**
     * Fails the shard and marks the shard store as corrupted if
     * <code>e</code> is caused by index corruption
     *
     * org.elasticsearch.index.shard.IndexShard#failShard
     */
    public void failShard(String reason, @Nullable Exception e) {
        // fail the engine. This will cause this shard to also be removed from the node's index service.
        getEngine().failEngine(reason, e);
    }
fail engine due to some error. the engine will also be closed.
The underlying store is marked corrupted iff failure is caused by index corruption

關於檢查點,可參考這篇文章:elasticsearch-sequence-ids-6-0es6

異常結束回調 onFailure()

replicasProxy.failShardIfNeeded(shard, message,
                    replicaException, ReplicationOperation.this::decPendingAndFinishIfNeeded,
                    ReplicationOperation.this::onPrimaryDemoted, throwable -> decPendingAndFinishIfNeeded());

failShardIfNeeded 能夠作2件事情,具體是如何執行得看failShardIfNeeded的實現類。github

  1. onPrimaryDemotedelasticsearch

    通知master primary stale(過期)了。index操做首先在primary shard執行成功了,而後同步給replica,可是replica發現此primary shard 的 primary term 比它知道的該索引的primary term 還小,因而replica就認爲此primary shard是一個已通過時了的primary shard,所以就回調onFailure()拒絕執行,並執行onPrimaryDemoted通知master節點。ide

    private void onPrimaryDemoted(Exception demotionFailure) {
            String primaryFail = String.format(Locale.ROOT,
                "primary shard [%s] was demoted while failing replica shard",
                primary.routingEntry());
            // we are no longer the primary, fail ourselves and start over
            primary.failShard(primaryFail, demotionFailure);
            finishAsFailed(new RetryOnPrimaryException(primary.routingEntry().shardId(), primaryFail, demotionFailure));
        }

  2. decPendingAndFinishIfNeeded源碼分析

    計數。一個請求會由ReplicationGroup中的 多個分片執行,這些分片是否都已經執行完成了?就由pendingActions計數。無論是執行正常結束onResponse仍是異常結束onFailure都會調用這個方法。this

    private void decPendingAndFinishIfNeeded() {
            assert pendingActions.get() > 0 : "pending action count goes below 0 for request [" + request + "]";
            if (pendingActions.decrementAndGet() == 0) {//當全部的shard都處理完這個請求,client收到ACK(裏面容許一些replica執行失敗), 或者是收到一個請求超時的響應
                finish();
            }
        }

    對於發起index操做的Client而言,該 index 操做會由primary shard 執行,也會由若干個replica執行。所以,pendingActions統計到底有多少個分片(既包括主分片也包括副本分片)執行完成(在某些副本分片上執行失敗也算執行完成)了。正是因爲無論是 onResponse() 仍是 onFailure(),都會執行decPendingAndFinishIfNeeded()方法,每執行一次,意味着有一個分片返回了響應,這時if (pendingActions.decrementAndGet() == 0)就減1,直到減爲0時,調用finish()方法給Client返回ACK響應。

private void finish() {
        if (finished.compareAndSet(false, true)) {
            final ReplicationResponse.ShardInfo.Failure[] failuresArray;
            if (shardReplicaFailures.isEmpty()) {
                failuresArray = ReplicationResponse.EMPTY;
            } else {
                failuresArray = new ReplicationResponse.ShardInfo.Failure[shardReplicaFailures.size()];
                shardReplicaFailures.toArray(failuresArray);
            }
            primaryResult.setShardInfo(new ReplicationResponse.ShardInfo(
                    totalShards.get(),
                    successfulShards.get(),
                    failuresArray
                )
            );
            resultListener.onResponse(primaryResult);
        }
    }

Client要麼收到一個執行成功的ACK(默認狀況下,只要primary shard執行成功,若存在 replica執行失敗,Client也會收到一個執行成功的ACK,只不過 返回的ACK裏面 _shards參數下的 failed 不爲0而已),以下:

{
"_index": "user",
"_type": "profile",
"_id": "10",
"_version": 1,
"result": "created",
"_shards": {
​ "total": 3,
​ "successful": 1,
​ "failed": 0
},
"_seq_no": 0,
"_primary_term": 1
}

另外,ES6.3.2 index操做源碼流程 的總結部分,詳細解釋了Client收到執行成功的ACK的緣由。

要麼收到一個超時ACK,以下:(這篇文章提到了如何產生一個超時的ACK)

{
"statusCode": 504,
"error": "Gateway Time-out",
"message": "Client request timeout"
}

failShardIfNeeded方法一共有2個具體實現,看類圖:

TransportReplicationAction.ReplicasProxy#failShardIfNeeded (默認實現)

@Override
        public void failShardIfNeeded(ShardRouting replica, String message, Exception exception,Runnable onSuccess, Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
            // This does not need to fail the shard. The idea is that this
            // is a non-write operation (something like a refresh or a global
            // checkpoint sync) and therefore the replica should still be
            // "alive" if it were to fail.
            onSuccess.run();
        }

TransportResyncReplicationAction.ResyncActionReplicasProxy#failShardIfNeeded(副本resync操做的實現)

/**
     * A proxy for primary-replica resync operations which are performed on replicas when a new primary is promoted.
     * Replica shards fail to execute resync operations will be failed but won't be marked as stale.
     * This avoids marking shards as stale during cluster restart but enforces primary-replica resync mandatory.
     */
    class ResyncActionReplicasProxy extends ReplicasProxy {
        @Override
        public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, Runnable onSuccess,
                                      Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
            shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, false, message, exception,
                createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
        }
    }

TransportWriteAction.WriteActionReplicasProxy#failShardIfNeeded(index 寫操做的實現)

/**
 * A proxy for <b>write</b> operations that need to be performed on the
 * replicas, where a failure to execute the operation should fail
 * the replica shard and/or mark the replica as stale.
 *
 * This extends {@code TransportReplicationAction.ReplicasProxy} to do the
 * failing and stale-ing.
 */
class WriteActionReplicasProxy extends ReplicasProxy {

    @Override
    public void failShardIfNeeded(ShardRouting replica, String message, Exception exception,Runnable onSuccess, Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
        if (TransportActions.isShardNotAvailableException(exception) == false) {
            logger.warn(new ParameterizedMessage("[{}] {}", replica.shardId(), message), exception);}
        shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, true, message, exception,
            createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
    }

總結

從上面代碼中可看出:副本resync操做、副本上 index 寫操做失敗都會致使 調用 onPrimaryDemoted() 方法,通知master節點判斷當前primary shard 是否已通過時(stale)。這能夠說是:replica 檢驗 primary shard是否stale的方式。
另外,primary shard 和 各個replica之間也會經過 租約機制 進行故障檢測,以判斷對方是否stale,不過這不是本文要討論的內容了。

參考文章:

原文:https://www.cnblogs.com/hapjin/p/10585555.html

相關文章
相關標籤/搜索