A look at Storm's CheckpointSpout

This article takes a look at Storm's CheckpointSpout.

TopologyBuilder

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java

    public StormTopology createTopology() {
        Map<String, Bolt> boltSpecs = new HashMap<>();
        Map<String, SpoutSpec> spoutSpecs = new HashMap<>();
        maybeAddCheckpointSpout();
        for (String boltId : _bolts.keySet()) {
            IRichBolt bolt = _bolts.get(boltId);
            bolt = maybeAddCheckpointTupleForwarder(bolt);
            ComponentCommon common = getComponentCommon(boltId, bolt);
            try {
                maybeAddCheckpointInputs(common);
                boltSpecs.put(boltId, new Bolt(ComponentObject.serialized_java(Utils.javaSerialize(bolt)), common));
            } catch (RuntimeException wrapperCause) {
                if (wrapperCause.getCause() != null && NotSerializableException.class.equals(wrapperCause.getCause().getClass())) {
                    throw new IllegalStateException(
                        "Bolt '" + boltId + "' contains a non-serializable field of type " + wrapperCause.getCause().getMessage() + ", " +
                        "which was instantiated prior to topology creation. " + wrapperCause.getCause().getMessage() + " " +
                        "should be instantiated within the prepare method of '" + boltId + " at the earliest.", wrapperCause);
                }
                throw wrapperCause;
            }
        }
        for (String spoutId : _spouts.keySet()) {
            IRichSpout spout = _spouts.get(spoutId);
            ComponentCommon common = getComponentCommon(spoutId, spout);
            try {
                spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.javaSerialize(spout)), common));
            } catch (RuntimeException wrapperCause) {
                if (wrapperCause.getCause() != null && NotSerializableException.class.equals(wrapperCause.getCause().getClass())) {
                    throw new IllegalStateException(
                        "Spout '" + spoutId + "' contains a non-serializable field of type " + wrapperCause.getCause().getMessage() + ", " +
                        "which was instantiated prior to topology creation. " + wrapperCause.getCause().getMessage() + " " +
                        "should be instantiated within the prepare method of '" + spoutId + " at the earliest.", wrapperCause);
                }
                throw wrapperCause;
            }
        }

        StormTopology stormTopology = new StormTopology(spoutSpecs,
                                                        boltSpecs,
                                                        new HashMap<>());

        stormTopology.set_worker_hooks(_workerHooks);

        if (!_componentToSharedMemory.isEmpty()) {
            stormTopology.set_component_to_shared_memory(_componentToSharedMemory);
            stormTopology.set_shared_memory(_sharedMemory);
        }

        return Utils.addVersions(stormTopology);
    }

    /**
     * If the topology has at least one stateful bolt add a {@link CheckpointSpout} component to the topology.
     */
    private void maybeAddCheckpointSpout() {
        if (hasStatefulBolt) {
            setSpout(CHECKPOINT_COMPONENT_ID, new CheckpointSpout(), 1);
        }
    }

    private void maybeAddCheckpointInputs(ComponentCommon common) {
        if (hasStatefulBolt) {
            addCheckPointInputs(common);
        }
    }

    /**
     * If the topology has at least one stateful bolt all the non-stateful bolts are wrapped in {@link CheckpointTupleForwarder} so that the
     * checkpoint tuples can flow through the topology.
     */
    private IRichBolt maybeAddCheckpointTupleForwarder(IRichBolt bolt) {
        if (hasStatefulBolt && !(bolt instanceof StatefulBoltExecutor)) {
            bolt = new CheckpointTupleForwarder(bolt);
        }
        return bolt;
    }

    /**
     * For bolts that has incoming streams from spouts (the root bolts), add checkpoint stream from checkpoint spout to its input. For other
     * bolts, add checkpoint stream from the previous bolt to its input.
     */
    private void addCheckPointInputs(ComponentCommon component) {
        Set<GlobalStreamId> checkPointInputs = new HashSet<>();
        for (GlobalStreamId inputStream : component.get_inputs().keySet()) {
            String sourceId = inputStream.get_componentId();
            if (_spouts.containsKey(sourceId)) {
                checkPointInputs.add(new GlobalStreamId(CHECKPOINT_COMPONENT_ID, CHECKPOINT_STREAM_ID));
            } else {
                checkPointInputs.add(new GlobalStreamId(sourceId, CHECKPOINT_STREAM_ID));
            }
        }
        for (GlobalStreamId streamId : checkPointInputs) {
            component.put_to_inputs(streamId, Grouping.all(new NullStruct()));
        }
    }
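  • When TopologyBuilder runs createTopology, it calls maybeAddCheckpointSpout; if hasStatefulBolt is true, a CheckpointSpout is created automatically and added under CHECKPOINT_COMPONENT_ID with a parallelism of 1
  • If hasStatefulBolt is true, every bolt that is not a StatefulBoltExecutor is wrapped in a CheckpointTupleForwarder
  • If hasStatefulBolt is true, addCheckPointInputs is called to configure each bolt's inputs: a bolt fed by a spout subscribes to the checkpoint stream of the CheckpointSpout, any other bolt subscribes to the checkpoint stream of its upstream bolt, in both cases with all grouping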
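To make the wiring rule in addCheckPointInputs concrete, here is a minimal pure-Java sketch with no Storm dependency. The class CheckpointInputsSketch and the "component/stream" string used in place of a GlobalStreamId are hypothetical simplifications; only the two constant values are copied from CheckpointSpout.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class CheckpointInputsSketch {
    // Constant values copied from CheckpointSpout.
    static final String CHECKPOINT_COMPONENT_ID = "$checkpointspout";
    static final String CHECKPOINT_STREAM_ID = "$checkpoint";

    /**
     * Mirrors the rule in addCheckPointInputs: for each input source, subscribe to the
     * CheckpointSpout's checkpoint stream if the source is a spout, otherwise to the
     * checkpoint stream of the source bolt itself. The Set removes duplicates.
     * (Hypothetical helper; a "component/stream" string stands in for GlobalStreamId.)
     */
    static Set<String> checkpointInputs(List<String> inputSources, Set<String> spoutIds) {
        Set<String> result = new LinkedHashSet<>();
        for (String sourceId : inputSources) {
            String component = spoutIds.contains(sourceId) ? CHECKPOINT_COMPONENT_ID : sourceId;
            result.add(component + "/" + CHECKPOINT_STREAM_ID);
        }
        return result;
    }

    public static void main(String[] args) {
        Set<String> spouts = Set.of("words");
        // A root bolt reading two streams of the same spout gets one checkpoint input.
        System.out.println(checkpointInputs(List.of("words", "words"), spouts));
        // prints [$checkpointspout/$checkpoint]
        // A downstream bolt fed by two bolts gets one checkpoint input per upstream bolt.
        System.out.println(checkpointInputs(List.of("split", "filter"), spouts));
        // prints [split/$checkpoint, filter/$checkpoint]
    }
}
```

Deduplicating through a set matters because the real code iterates over input streams, so a bolt reading several streams from spouts would otherwise subscribe to the CheckpointSpout more than once.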

CheckpointSpout

storm-2.0.0/storm-client/src/jvm/org/apache/storm/spout/CheckpointSpout.java

/**
 * Emits checkpoint tuples which is used to save the state of the {@link org.apache.storm.topology.IStatefulComponent} across the topology.
 * If a topology contains Stateful bolts, Checkpoint spouts are automatically added to the topology. There is only one Checkpoint task per
 * topology. Checkpoint spout stores its internal state in a {@link KeyValueState}.
 *
 * @see CheckPointState
 */
public class CheckpointSpout extends BaseRichSpout {
    public static final String CHECKPOINT_STREAM_ID = "$checkpoint";
    public static final String CHECKPOINT_COMPONENT_ID = "$checkpointspout";
    public static final String CHECKPOINT_FIELD_TXID = "txid";
    public static final String CHECKPOINT_FIELD_ACTION = "action";
    private static final Logger LOG = LoggerFactory.getLogger(CheckpointSpout.class);
    private static final String TX_STATE_KEY = "__state";
    private TopologyContext context;
    private SpoutOutputCollector collector;
    private long lastCheckpointTs;
    private int checkpointInterval;
    private int sleepInterval;
    private boolean recoveryStepInProgress;
    private boolean checkpointStepInProgress;
    private boolean recovering;
    private KeyValueState<String, CheckPointState> checkpointState;
    private CheckPointState curTxState;

    public static boolean isCheckpoint(Tuple input) {
        return CHECKPOINT_STREAM_ID.equals(input.getSourceStreamId());
    }

    @Override
    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        open(context, collector, loadCheckpointInterval(conf), loadCheckpointState(conf, context));
    }

    // package access for unit test
    void open(TopologyContext context, SpoutOutputCollector collector,
              int checkpointInterval, KeyValueState<String, CheckPointState> checkpointState) {
        this.context = context;
        this.collector = collector;
        this.checkpointInterval = checkpointInterval;
        this.sleepInterval = checkpointInterval / 10;
        this.checkpointState = checkpointState;
        this.curTxState = checkpointState.get(TX_STATE_KEY);
        lastCheckpointTs = 0;
        recoveryStepInProgress = false;
        checkpointStepInProgress = false;
        recovering = true;
    }

    @Override
    public void nextTuple() {
        if (shouldRecover()) {
            handleRecovery();
            startProgress();
        } else if (shouldCheckpoint()) {
            doCheckpoint();
            startProgress();
        } else {
            Utils.sleep(sleepInterval);
        }
    }

    @Override
    public void ack(Object msgId) {
        LOG.debug("Got ack with txid {}, current txState {}", msgId, curTxState);
        if (curTxState.getTxid() == ((Number) msgId).longValue()) {
            if (recovering) {
                handleRecoveryAck();
            } else {
                handleCheckpointAck();
            }
        } else {
            LOG.warn("Ack msgid {}, txState.txid {} mismatch", msgId, curTxState.getTxid());
        }
        resetProgress();
    }

    @Override
    public void fail(Object msgId) {
        LOG.debug("Got fail with msgid {}", msgId);
        if (!recovering) {
            LOG.debug("Checkpoint failed, will trigger recovery");
            recovering = true;
        }
        resetProgress();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream(CHECKPOINT_STREAM_ID, new Fields(CHECKPOINT_FIELD_TXID, CHECKPOINT_FIELD_ACTION));
    }

    private int loadCheckpointInterval(Map<String, Object> topoConf) {
        int interval = 0;
        if (topoConf.containsKey(Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL)) {
            interval = ((Number) topoConf.get(Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL)).intValue();
        }
        // ensure checkpoint interval is not less than a sane low value.
        interval = Math.max(100, interval);
        LOG.info("Checkpoint interval is {} millis", interval);
        return interval;
    }

    private boolean shouldCheckpoint() {
        return !recovering && !checkpointStepInProgress &&
               (curTxState.getState() != COMMITTED || checkpointIntervalElapsed());
    }

    private boolean checkpointIntervalElapsed() {
        return (System.currentTimeMillis() - lastCheckpointTs) > checkpointInterval;
    }

    private void doCheckpoint() {
        LOG.debug("In checkpoint");
        if (curTxState.getState() == COMMITTED) {
            saveTxState(curTxState.nextState(false));
            lastCheckpointTs = System.currentTimeMillis();
        }
        Action action = curTxState.nextAction(false);
        emit(curTxState.getTxid(), action);
    }

    private void emit(long txid, Action action) {
        LOG.debug("Current state {}, emitting txid {}, action {}", curTxState, txid, action);
        collector.emit(CHECKPOINT_STREAM_ID, new Values(txid, action), txid);
    }

    //......
}
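  • CheckpointSpout reads the checkpoint interval from Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL (topology.state.checkpoint.interval.ms). The default in defaults.yaml is 1000; if the key is absent, loadCheckpointInterval falls back to 100, and any value below 100 is raised to 100
  • nextTuple first calls shouldRecover; if recovery is needed, it calls handleRecovery and then startProgress. Otherwise, if a checkpoint is due, it performs the checkpoint (and then startProgress); if neither applies, it sleeps for sleepInterval before checking again
  • When no recovery is needed, shouldCheckpoint decides whether to checkpoint. Provided the spout is not recovering and no checkpoint step is already in progress, it checkpoints if the current state is not COMMITTED or if more than checkpointInterval has elapsed since the last checkpoint; doCheckpoint then emits the next action on CHECKPOINT_STREAM_ID
  • When CheckpointSpout receives an ack whose msgId matches the current txid, it advances its transaction state through saveTxState, which calls checkpointState.commit to commit the whole checkpoint, and then calls resetProgress to reset its progress flags
  • On fail, the spout sets recovering = true (unless it is already recovering), so recovery is triggered on a later nextTuple call, and then calls resetProgress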
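The interval handling and the checkpoint predicate can be isolated from Storm in a small sketch. CheckpointTimingSketch is hypothetical: it copies the logic of loadCheckpointInterval and shouldCheckpoint, but takes the clock, flags and state as parameters instead of reading spout fields, and uses the config key string rather than the Config constant.

```java
import java.util.HashMap;
import java.util.Map;

class CheckpointTimingSketch {
    // Key string of Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL.
    static final String INTERVAL_KEY = "topology.state.checkpoint.interval.ms";

    enum State { PREPARING, COMMITTING, COMMITTED }

    // Same rule as loadCheckpointInterval: missing key -> 0, then clamp to at least 100 ms.
    static int loadCheckpointInterval(Map<String, Object> conf) {
        int interval = 0;
        if (conf.containsKey(INTERVAL_KEY)) {
            interval = ((Number) conf.get(INTERVAL_KEY)).intValue();
        }
        return Math.max(100, interval);
    }

    // Same predicate as shouldCheckpoint, with the clock and flags passed in explicitly.
    static boolean shouldCheckpoint(boolean recovering, boolean stepInProgress, State state,
                                    long nowMs, long lastCheckpointTs, int interval) {
        return !recovering && !stepInProgress
                && (state != State.COMMITTED || (nowMs - lastCheckpointTs) > interval);
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        System.out.println(loadCheckpointInterval(conf));             // 100 (key absent)
        conf.put(INTERVAL_KEY, 1000);
        int interval = loadCheckpointInterval(conf);
        System.out.println(interval + " / sleep " + interval / 10);   // 1000 / sleep 100
        // COMMITTED and only 500 ms since the last checkpoint: keep sleeping.
        System.out.println(shouldCheckpoint(false, false, State.COMMITTED, 1500, 1000, interval));  // false
        // Mid-transaction (COMMITTING): continue at once, no interval wait.
        System.out.println(shouldCheckpoint(false, false, State.COMMITTING, 1500, 1000, interval)); // true
        // A step is still waiting for its ack: never start another one.
        System.out.println(shouldCheckpoint(false, true, State.COMMITTED, 9000, 1000, interval));   // false
    }
}
```

The second case shows why a transaction finishes quickly once started: only the transition out of COMMITTED waits for the interval, the COMMIT step follows as soon as the PREPARE step is acked.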

CheckPointState

storm-2.0.0/storm-client/src/jvm/org/apache/storm/spout/CheckPointState.java

    /**
     * Get the next state based on this checkpoint state.
     *
     * @param recovering if in recovering phase
     * @return the next checkpoint state based on this state.
     */
    public CheckPointState nextState(boolean recovering) {
        CheckPointState nextState;
        switch (state) {
            case PREPARING:
                nextState = recovering ? new CheckPointState(txid - 1, COMMITTED) : new CheckPointState(txid, COMMITTING);
                break;
            case COMMITTING:
                nextState = new CheckPointState(txid, COMMITTED);
                break;
            case COMMITTED:
                nextState = recovering ? this : new CheckPointState(txid + 1, PREPARING);
                break;
            default:
                throw new IllegalStateException("Unknown state " + state);
        }
        return nextState;
    }

    /**
     * Get the next action to perform based on this checkpoint state.
     *
     * @param recovering if in recovering phase
     * @return the next action to perform based on this state
     */
    public Action nextAction(boolean recovering) {
        Action action;
        switch (state) {
            case PREPARING:
                action = recovering ? Action.ROLLBACK : Action.PREPARE;
                break;
            case COMMITTING:
                action = Action.COMMIT;
                break;
            case COMMITTED:
                action = recovering ? Action.INITSTATE : Action.PREPARE;
                break;
            default:
                throw new IllegalStateException("Unknown state " + state);
        }
        return action;
    }
  • CheckPointState provides nextState to move to the next state, and nextAction to return the action that corresponds to the current state
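The two tables can be traced with a standalone copy. CheckPointStateSketch is a hypothetical class that reproduces nextState and nextAction as listed above; the trace assumes, as the CheckpointSpout bullets describe, that after each matching ack the spout saves curTxState.nextState(false) (the ack handlers themselves are elided in the excerpt).

```java
class CheckPointStateSketch {
    enum State { PREPARING, COMMITTING, COMMITTED }
    enum Action { PREPARE, COMMIT, ROLLBACK, INITSTATE }

    final long txid;
    final State state;

    CheckPointStateSketch(long txid, State state) {
        this.txid = txid;
        this.state = state;
    }

    // Same transition table as CheckPointState.nextState.
    CheckPointStateSketch nextState(boolean recovering) {
        switch (state) {
            case PREPARING:
                return recovering ? new CheckPointStateSketch(txid - 1, State.COMMITTED)
                                  : new CheckPointStateSketch(txid, State.COMMITTING);
            case COMMITTING:
                return new CheckPointStateSketch(txid, State.COMMITTED);
            case COMMITTED:
                return recovering ? this : new CheckPointStateSketch(txid + 1, State.PREPARING);
            default:
                throw new IllegalStateException("Unknown state " + state);
        }
    }

    // Same mapping as CheckPointState.nextAction.
    Action nextAction(boolean recovering) {
        switch (state) {
            case PREPARING:  return recovering ? Action.ROLLBACK : Action.PREPARE;
            case COMMITTING: return Action.COMMIT;
            case COMMITTED:  return recovering ? Action.INITSTATE : Action.PREPARE;
            default:         throw new IllegalStateException("Unknown state " + state);
        }
    }

    @Override
    public String toString() {
        return state + "(txid=" + txid + ")";
    }

    public static void main(String[] args) {
        // Normal path: doCheckpoint saves PREPARING(txid=1) before emitting.
        CheckPointStateSketch s = new CheckPointStateSketch(0, State.COMMITTED).nextState(false);
        System.out.println(s + " -> " + s.nextAction(false)); // PREPARING(txid=1) -> PREPARE
        s = s.nextState(false);                                // after the PREPARE ack
        System.out.println(s + " -> " + s.nextAction(false)); // COMMITTING(txid=1) -> COMMIT
        s = s.nextState(false);                                // after the COMMIT ack
        System.out.println(s);                                 // COMMITTED(txid=1)

        // Recovery while txid 2 was only prepared: roll back to the last committed txid.
        CheckPointStateSketch r = new CheckPointStateSketch(2, State.PREPARING);
        System.out.println(r.nextAction(true) + " then " + r.nextState(true)); // ROLLBACK then COMMITTED(txid=1)
    }
}
```

The recovery branch shows the design choice: a transaction that never reached COMMITTING is discarded (txid - 1), while one that was already COMMITTING is driven forward to COMMITTED.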

BaseStatefulBoltExecutor

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/BaseStatefulBoltExecutor.java

    public void execute(Tuple input) {
        if (CheckpointSpout.isCheckpoint(input)) {
            processCheckpoint(input);
        } else {
            handleTuple(input);
        }
    }

    /**
     * Invokes handleCheckpoint once checkpoint tuple is received on all input checkpoint streams to this component.
     */
    private void processCheckpoint(Tuple input) {
        CheckPointState.Action action = (CheckPointState.Action) input.getValueByField(CHECKPOINT_FIELD_ACTION);
        long txid = input.getLongByField(CHECKPOINT_FIELD_TXID);
        if (shouldProcessTransaction(action, txid)) {
            LOG.debug("Processing action {}, txid {}", action, txid);
            try {
                if (txid >= lastTxid) {
                    handleCheckpoint(input, action, txid);
                    if (action == ROLLBACK) {
                        lastTxid = txid - 1;
                    } else {
                        lastTxid = txid;
                    }
                } else {
                    LOG.debug("Ignoring old transaction. Action {}, txid {}", action, txid);
                    collector.ack(input);
                }
            } catch (Throwable th) {
                LOG.error("Got error while processing checkpoint tuple", th);
                collector.fail(input);
                collector.reportError(th);
            }
        } else {
            LOG.debug("Waiting for action {}, txid {} from all input tasks. checkPointInputTaskCount {}, " +
                      "transactionRequestCount {}", action, txid, checkPointInputTaskCount, transactionRequestCount);
            collector.ack(input);
        }
    }

    /**
     * Checks if check points have been received from all tasks across all input streams to this component
     */
    private boolean shouldProcessTransaction(CheckPointState.Action action, long txid) {
        TransactionRequest request = new TransactionRequest(action, txid);
        Integer count;
        if ((count = transactionRequestCount.get(request)) == null) {
            transactionRequestCount.put(request, 1);
            count = 1;
        } else {
            transactionRequestCount.put(request, ++count);
        }
        if (count == checkPointInputTaskCount) {
            transactionRequestCount.remove(request);
            return true;
        }
        return false;
    }

    protected void declareCheckpointStream(OutputFieldsDeclarer declarer) {
        declarer.declareStream(CHECKPOINT_STREAM_ID, new Fields(CHECKPOINT_FIELD_TXID, CHECKPOINT_FIELD_ACTION));
    }
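  • BaseStatefulBoltExecutor's execute method first calls CheckpointSpout.isCheckpoint(input), which checks whether the tuple arrived on the checkpoint stream; if so it calls processCheckpoint, otherwise handleTuple
  • processCheckpoint first calls shouldProcessTransaction, which counts arrivals of the same (action, txid) pair and only lets processing continue once every input task has sent that checkpoint tuple; earlier arrivals are simply acked
  • If txid >= lastTxid, it calls handleCheckpoint, which is implemented by subclasses, and then updates lastTxid (to txid - 1 after a ROLLBACK, otherwise to txid); an older txid is acked and ignored, and an exception fails the checkpoint tuple and reports the error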
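The "wait for every input task, then check txid" logic is essentially a counting barrier. Below is a minimal sketch of it; CheckpointBarrierSketch, its Request key class (standing in for TransactionRequest) and the starting value of lastTxid (Long.MIN_VALUE) are assumptions of this sketch, while the counting and lastTxid rules follow the excerpt.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class CheckpointBarrierSketch {
    enum Action { PREPARE, COMMIT, ROLLBACK, INITSTATE }

    // Hypothetical stand-in for TransactionRequest: a value key of (action, txid).
    static final class Request {
        final Action action;
        final long txid;
        Request(Action action, long txid) { this.action = action; this.txid = txid; }
        @Override public boolean equals(Object o) {
            if (!(o instanceof Request)) return false;
            Request other = (Request) o;
            return action == other.action && txid == other.txid;
        }
        @Override public int hashCode() { return Objects.hash(action, txid); }
    }

    private final Map<Request, Integer> requestCount = new HashMap<>();
    private final int inputTaskCount;      // like checkPointInputTaskCount
    long lastTxid = Long.MIN_VALUE;        // assumed initial value

    CheckpointBarrierSketch(int inputTaskCount) { this.inputTaskCount = inputTaskCount; }

    // Same counting rule as shouldProcessTransaction.
    boolean shouldProcess(Action action, long txid) {
        Request key = new Request(action, txid);
        int count = requestCount.merge(key, 1, Integer::sum);
        if (count == inputTaskCount) {
            requestCount.remove(key);
            return true;
        }
        return false;
    }

    /** Returns true when processCheckpoint would call handleCheckpoint for this arrival. */
    boolean onCheckpoint(Action action, long txid) {
        if (!shouldProcess(action, txid) || txid < lastTxid) {
            return false; // still waiting for other inputs, or an old transaction: just ack
        }
        lastTxid = (action == Action.ROLLBACK) ? txid - 1 : txid;
        return true;
    }

    public static void main(String[] args) {
        CheckpointBarrierSketch bolt = new CheckpointBarrierSketch(2); // two upstream tasks
        System.out.println(bolt.onCheckpoint(Action.PREPARE, 1));    // false: 1 of 2 arrived
        System.out.println(bolt.onCheckpoint(Action.PREPARE, 1));    // true: barrier complete
        System.out.println(bolt.lastTxid);                           // 1
        bolt.onCheckpoint(Action.ROLLBACK, 1);
        System.out.println(bolt.onCheckpoint(Action.ROLLBACK, 1) + " " + bolt.lastTxid); // true 0
    }
}
```

Setting lastTxid to txid - 1 after a rollback is what lets the same txid be prepared again once recovery completes.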

StatefulBoltExecutor.handleCheckpoint

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java

public class StatefulBoltExecutor<T extends State> extends BaseStatefulBoltExecutor {
    //......

    protected void handleCheckpoint(Tuple checkpointTuple, Action action, long txid) {
        LOG.debug("handleCheckPoint with tuple {}, action {}, txid {}", checkpointTuple, action, txid);
        if (action == PREPARE) {
            if (boltInitialized) {
                bolt.prePrepare(txid);
                state.prepareCommit(txid);
                preparedTuples.addAll(collector.ackedTuples());
            } else {
                /*
                 * May be the task restarted in the middle and the state needs be initialized.
                 * Fail fast and trigger recovery.
                 */
                LOG.debug("Failing checkpointTuple, PREPARE received when bolt state is not initialized.");
                collector.fail(checkpointTuple);
                return;
            }
        } else if (action == COMMIT) {
            bolt.preCommit(txid);
            state.commit(txid);
            ack(preparedTuples);
        } else if (action == ROLLBACK) {
            bolt.preRollback();
            state.rollback();
            fail(preparedTuples);
            fail(collector.ackedTuples());
        } else if (action == INITSTATE) {
            if (!boltInitialized) {
                bolt.initState((T) state);
                boltInitialized = true;
                LOG.debug("{} pending tuples to process", pendingTuples.size());
                for (Tuple tuple : pendingTuples) {
                    doExecute(tuple);
                }
                pendingTuples.clear();
            } else {
                /*
                 * If a worker crashes, the states of all workers are rolled back and an initState message is sent across
                 * the topology so that crashed workers can initialize their state.
                 * The bolts that have their state already initialized need not be re-initialized.
                 */
                LOG.debug("Bolt state is already initialized, ignoring tuple {}, action {}, txid {}",
                          checkpointTuple, action, txid);
            }
        }
        collector.emit(CheckpointSpout.CHECKPOINT_STREAM_ID, checkpointTuple, new Values(txid, action));
        collector.delegate.ack(checkpointTuple);
    }

    //......
}
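  • StatefulBoltExecutor extends BaseStatefulBoltExecutor and implements the handleCheckpoint method
  • The method dispatches on the action. For PREPARE it calls the bolt's prePrepare and the state's prepareCommit, and moves the tuples acked so far into preparedTuples (if the bolt's state is not yet initialized, it fails the checkpoint tuple to trigger recovery). For COMMIT it calls the bolt's preCommit and the state's commit, then acks preparedTuples. For ROLLBACK it calls the bolt's preRollback and the state's rollback, then fails preparedTuples and the tuples acked since. For INITSTATE, if the bolt is not initialized, it calls the bolt's initState and then executes the buffered pendingTuples
  • After the action has been handled, it forwards the checkpoint tuple downstream on the checkpoint stream and then acks it with collector.delegate.ack(checkpointTuple)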
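The effect of these actions on ordinary input tuples is easiest to see in a toy model. StatefulAckSketch is hypothetical: strings stand in for tuples, plain lists stand in for pendingTuples, collector.ackedTuples() and preparedTuples, and it assumes that moving tuples into the prepared set drains the locally acked buffer. It also leaves out the "PREPARE before INITSTATE fails the checkpoint" branch and the state calls.

```java
import java.util.ArrayList;
import java.util.List;

class StatefulAckSketch {
    enum Action { PREPARE, COMMIT, ROLLBACK, INITSTATE }

    boolean initialized;
    final List<String> pending = new ArrayList<>();        // like pendingTuples
    final List<String> ackedLocally = new ArrayList<>();   // like collector.ackedTuples()
    final List<String> prepared = new ArrayList<>();       // like preparedTuples
    final List<String> ackedUpstream = new ArrayList<>();  // acks that reach the spouts
    final List<String> failedUpstream = new ArrayList<>(); // fails that reach the spouts

    // Non-checkpoint tuple: buffer until INITSTATE, otherwise "process" it and ack locally.
    void handleTuple(String tuple) {
        if (initialized) {
            ackedLocally.add(tuple);
        } else {
            pending.add(tuple);
        }
    }

    void handleCheckpoint(Action action) {
        switch (action) {
            case PREPARE:
                prepared.addAll(ackedLocally);
                ackedLocally.clear();
                break;
            case COMMIT:
                ackedUpstream.addAll(prepared);   // input tuples are only released here
                prepared.clear();
                break;
            case ROLLBACK:
                failedUpstream.addAll(prepared);  // the spouts will replay these
                failedUpstream.addAll(ackedLocally);
                prepared.clear();
                ackedLocally.clear();
                break;
            case INITSTATE:
                if (!initialized) {
                    initialized = true;
                    for (String t : pending) {
                        handleTuple(t);
                    }
                    pending.clear();
                }
                break;
        }
    }

    public static void main(String[] args) {
        StatefulAckSketch bolt = new StatefulAckSketch();
        bolt.handleTuple("t1");                  // arrives before INITSTATE: buffered
        bolt.handleCheckpoint(Action.INITSTATE);
        bolt.handleTuple("t2");
        bolt.handleCheckpoint(Action.PREPARE);
        bolt.handleTuple("t3");                  // belongs to the next transaction
        System.out.println(bolt.ackedUpstream);  // []
        bolt.handleCheckpoint(Action.COMMIT);
        System.out.println(bolt.ackedUpstream);  // [t1, t2]
        System.out.println(bolt.ackedLocally);   // [t3]
    }
}
```

Because upstream acks are held back until COMMIT, an input tuple stays un-acked for up to one checkpoint cycle, which is why the checkpoint interval has to stay well below the message timeout.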

CheckpointTupleForwarder.handleCheckpoint

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java

/**
 * Wraps {@link IRichBolt} and forwards checkpoint tuples in a stateful topology.
 * <p>
 * When a storm topology contains one or more {@link IStatefulBolt} all non-stateful bolts are wrapped in {@link CheckpointTupleForwarder}
 * so that the checkpoint tuples can flow through the entire topology DAG.
 * </p>
 */
public class CheckpointTupleForwarder extends BaseStatefulBoltExecutor {
    //......
    /**
     * Forwards the checkpoint tuple downstream.
     *
     * @param checkpointTuple the checkpoint tuple
     * @param action          the action (prepare, commit, rollback or initstate)
     * @param txid            the transaction id.
     */
    protected void handleCheckpoint(Tuple checkpointTuple, Action action, long txid) {
        collector.emit(CHECKPOINT_STREAM_ID, checkpointTuple, new Values(txid, action));
        collector.ack(checkpointTuple);
    }

    //......
}
  • CheckpointTupleForwarder wraps non-stateful bolts so that checkpoint tuples can flow through the entire topology DAG
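The forwarder is a decorator: checkpoint tuples bypass the wrapped bolt and are re-emitted unchanged, everything else is delegated. The sketch below shows only that routing decision; ForwarderSketch, Msg and SimpleBolt are hypothetical minimal types (Storm's Tuple and IRichBolt are much richer), and it omits the all-inputs barrier that the real class inherits from BaseStatefulBoltExecutor.

```java
import java.util.ArrayList;
import java.util.List;

class ForwarderSketch {
    static final String CHECKPOINT_STREAM_ID = "$checkpoint";

    // Hypothetical minimal tuple and bolt types.
    static final class Msg {
        final String streamId;
        final Object payload;
        Msg(String streamId, Object payload) { this.streamId = streamId; this.payload = payload; }
    }

    interface SimpleBolt {
        void execute(Msg input, List<Msg> out);
    }

    // Wraps a non-stateful bolt: checkpoint messages skip it and are forwarded unchanged.
    static final class Forwarder implements SimpleBolt {
        private final SimpleBolt delegate;
        Forwarder(SimpleBolt delegate) { this.delegate = delegate; }

        @Override
        public void execute(Msg input, List<Msg> out) {
            if (CHECKPOINT_STREAM_ID.equals(input.streamId)) {
                out.add(new Msg(CHECKPOINT_STREAM_ID, input.payload));
            } else {
                delegate.execute(input, out);
            }
        }
    }

    public static void main(String[] args) {
        SimpleBolt upper = (in, sink) -> sink.add(new Msg("default", in.payload.toString().toUpperCase()));
        SimpleBolt wrapped = new Forwarder(upper);
        List<Msg> out = new ArrayList<>();
        wrapped.execute(new Msg("default", "hello"), out);
        wrapped.execute(new Msg(CHECKPOINT_STREAM_ID, "txid=1,PREPARE"), out);
        for (Msg m : out) {
            System.out.println(m.streamId + " " + m.payload);
        }
        // default HELLO
        // $checkpoint txid=1,PREPARE
    }
}
```

Wrapping instead of asking users to handle the checkpoint stream keeps ordinary bolts unchanged while still letting the checkpoint tuple tree reach every stateful bolt downstream.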

Summary

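  • If the topology contains an IStatefulBolt (IStatefulBolt gives a bolt an abstraction for storing and retrieving state; the state is persisted through the checkpoint mechanism, and the ack mechanism provides at-least-once semantics), TopologyBuilder automatically adds a CheckpointSpout and wraps every bolt that is not a StatefulBoltExecutor in a CheckpointTupleForwarder, so that checkpoint tuples traverse the whole topology DAG
  • In nextTuple, CheckpointSpout first checks whether recovery is needed, then whether a checkpoint is due; if neither, it sleeps for sleepInterval, which is checkpointInterval / 10. checkpointInterval is read from Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL, defaults to 1000 and is never below 100. Note that this does not mean a checkpoint is started every checkpointInterval regardless of progress: the interval is measured from when the previous transaction started (lastCheckpointTs is set when the state moves from COMMITTED to a new PREPARING txid), and no new checkpoint is triggered while a checkpoint step is still waiting for its ack. checkpointInterval is therefore a lower bound on the spacing between checkpoints; a slow checkpoint delays the next one instead of overlapping with it
  • Both recovery and checkpointing emit tuples on CHECKPOINT_STREAM_ID. BaseStatefulBoltExecutor handles checkpoint tuples in its execute method and passes all other tuples to the abstract handleTuple method implemented by subclasses. The concrete handleCheckpoint is also left to subclasses; BaseStatefulBoltExecutor only checks the preconditions: the checkpoint tuple must have been received from every task of every input stream, and txid >= lastTxid must hold, before handleCheckpoint is executed
  • StatefulBoltExecutor extends BaseStatefulBoltExecutor and implements handleCheckpoint, handling the PREPARE, COMMIT, ROLLBACK and INITSTATE actions (similar to a three-phase commit protocol), then forwards the checkpoint tuple and acks it
  • CheckpointSpout emits checkpoint tuples reliably, using the txid as the msgId. Only after the checkpoint tuple has been acked throughout the topology DAG does the spout receive the ack and call checkpointState.commit to commit the whole checkpoint; on fail it resets its progress and switches to recovery. Normally Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL (topology.state.checkpoint.interval.ms, default 1000, i.e. 1 second) should be smaller than Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS (topology.message.timeout.secs, default 30 seconds): StatefulBoltExecutor only acks input tuples on COMMIT, so with a longer interval they would time out and be replayed. Also, if checkpointInterval is set too large and a worker crashes in between, the state restored after recovery lags far behind, which defeats the purpose of checkpointing.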
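The relationship between the two settings can be checked before submitting a topology. CheckpointConfigCheck.intervalFitsTimeout is a hypothetical helper, not a Storm API: it reads the two key strings from a plain map, applies the defaults of 1000 ms and 30 s when a key is missing, and applies the same 100 ms floor as loadCheckpointInterval.

```java
import java.util.HashMap;
import java.util.Map;

class CheckpointConfigCheck {
    // Key strings of Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL and Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS.
    static final String CHECKPOINT_INTERVAL_MS = "topology.state.checkpoint.interval.ms";
    static final String MESSAGE_TIMEOUT_SECS = "topology.message.timeout.secs";

    /** Hypothetical helper: true if the effective checkpoint interval is shorter than the message timeout. */
    static boolean intervalFitsTimeout(Map<String, Object> conf) {
        long intervalMs = ((Number) conf.getOrDefault(CHECKPOINT_INTERVAL_MS, 1000)).longValue();
        long timeoutMs = ((Number) conf.getOrDefault(MESSAGE_TIMEOUT_SECS, 30)).longValue() * 1000L;
        return Math.max(100L, intervalMs) < timeoutMs;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        System.out.println(intervalFitsTimeout(conf)); // true: 1000 ms < 30000 ms
        conf.put(CHECKPOINT_INTERVAL_MS, 45000);
        System.out.println(intervalFitsTimeout(conf)); // false: acks would be held past the timeout
    }
}
```

In a real topology the same values would be set on org.apache.storm.Config, for example conf.put(Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL, 1000) together with conf.setMessageTimeoutSecs(30).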
