This article takes a look at Flink's ListCheckpointed interface.
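import java.util.Collections;
import java.util.List;

import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

// Nested inside an enclosing job class, hence "static".
public static class CounterSource
        extends RichParallelSourceFunction<Long>
        implements ListCheckpointed<Long> {

    /** current offset for exactly once semantics */
    // Starts at 0 so the first increment does not hit a null offset when no state is restored.
    private Long offset = 0L;

    /** flag for job cancellation */
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> ctx) {
        final Object lock = ctx.getCheckpointLock();

        while (isRunning) {
            // output and state update are atomic
            synchronized (lock) {
                ctx.collect(offset);
                offset += 1;
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) {
        // The offset is indivisible, so the list holds a single element.
        return Collections.singletonList(offset);
    }

    @Override
    public void restoreState(List<Long> state) {
        for (Long s : state) {
            offset = s;
        }
    }
}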
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java
@PublicEvolving
public interface ListCheckpointed<T extends Serializable> {

    /**
     * Gets the current state of the function. The state must reflect the result of all prior
     * invocations to this function.
     *
     * <p>The returned list should contain one entry for redistributable unit of state. See
     * the {@link ListCheckpointed class docs} for an illustration how list-style state
     * redistribution works.
     *
     * <p>As special case, the returned list may be null or empty (if the operator has no state)
     * or it may contain a single element (if the operator state is indivisible).
     *
     * @param checkpointId The ID of the checkpoint - a unique and monotonously increasing value.
     * @param timestamp The wall clock timestamp when the checkpoint was triggered by the master.
     *
     * @return The operator state in a list of redistributable, atomic sub-states.
     *         Should not return null, but empty list instead.
     *
     * @throws Exception Thrown if the creation of the state object failed. This causes the
     *                   checkpoint to fail. The system may decide to fail the operation (and trigger
     *                   recovery), or to discard this checkpoint attempt and to continue running
     *                   and to try again with the next checkpoint attempt.
     */
    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;

    /**
     * Restores the state of the function or operator to that of a previous checkpoint.
     * This method is invoked when the function is executed after a failure recovery.
     * The state list may be empty if no state is to be recovered by the particular parallel instance
     * of the function.
     *
     * <p>The given state list will contain all the <i>sub states</i> that this parallel
     * instance of the function needs to handle. Refer to the {@link ListCheckpointed class docs}
     * for an illustration how list-style state redistribution works.
     *
     * <p><b>Important:</b> When implementing this interface together with {@link RichFunction},
     * then the {@code restoreState()} method is called before {@link RichFunction#open(Configuration)}.
     *
     * @param state The state to be restored as a list of atomic sub-states.
     *
     * @throws Exception Throwing an exception in this method causes the recovery to fail.
     *                   The exact consequence depends on the configured failure handling strategy,
     *                   but typically the system will re-attempt the recovery, or try recovering
     *                   from a different checkpoint.
     */
    void restoreState(List<T> state) throws Exception;
}
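The Javadoc above refers to "list-style state redistribution": the lists returned by snapshotState() on all parallel instances are treated as one pool of atomic sub-states, and on restore each instance receives some (possibly none) of those entries through restoreState(). The sketch below is a simplified, Flink-free model of that idea. The class name ListRedistributionSketch, the method redistribute, and the round-robin assignment are assumptions made for illustration; Flink's real repartitioner may assign entries differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Simplified model of list-style state redistribution.
 * Hypothetical names; this is not Flink's actual repartitioning code.
 */
final class ListRedistributionSketch {

    /**
     * Merges the lists snapshotted by the old parallel instances and deals
     * the entries out round-robin to {@code newParallelism} new instances.
     * Round-robin is an assumption chosen for simplicity.
     */
    static <T> List<List<T>> redistribute(List<List<T>> oldStates, int newParallelism) {
        if (newParallelism <= 0) {
            throw new IllegalArgumentException("newParallelism must be positive");
        }

        // Step 1: pool all atomic sub-states, skipping instances without state.
        List<T> merged = new ArrayList<>();
        for (List<T> state : oldStates) {
            if (state != null) {
                merged.addAll(state);
            }
        }

        // Step 2: create one (possibly empty) list per new instance.
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>());
        }

        // Step 3: assign each sub-state to exactly one new instance.
        for (int i = 0; i < merged.size(); i++) {
            result.get(i % newParallelism).add(merged.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        // Three old instances snapshotted 2, 1 and 2 sub-states respectively.
        List<List<Long>> snapshots = Arrays.asList(
                Arrays.asList(10L, 11L),
                Arrays.asList(20L),
                Arrays.asList(30L, 31L));

        // Scale down to 2 instances: each receives several sub-states.
        System.out.println(redistribute(snapshots, 2)); // [[10, 20, 31], [11, 30]]

        // Scale up to 6 instances: the last one receives an empty list.
        System.out.println(redistribute(snapshots, 6)); // [[10], [11], [20], [30], [31], []]
    }
}
```

In this model, a restoreState() implementation has to cope both with an empty list and with several entries. The CounterSource example returns a single-element list because its offset is indivisible, and its restoreState() loop keeps only the last entry it is given.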