This article takes a look at how checkpointing is configured in Flink.
```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // start a checkpoint every 1000 ms
        env.enableCheckpointing(1000);

        // advanced options:

        // set mode to exactly-once (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // checkpoints have to complete within one minute, or are discarded
        env.getCheckpointConfig().setCheckpointTimeout(60000);

        // make sure 500 ms of progress happen between checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

        // allow only one checkpoint to be in progress at the same time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // enable externalized checkpoints which are retained after job cancellation
        env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // This determines if a task will be failed if an error occurs in the execution of the task's checkpoint procedure.
        env.getCheckpointConfig().setFailOnCheckpointingErrors(true);
    }
}
```
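Checkpointing is enabled with StreamExecutionEnvironment.enableCheckpointing, either enableCheckpointing(long interval) or enableCheckpointing(long interval, CheckpointingMode mode). The interval is the time between checkpoint triggers, in milliseconds. CheckpointingMode defaults to CheckpointingMode.EXACTLY_ONCE and can also be set to CheckpointingMode.AT_LEAST_ONCE.

The mode can also be changed later through getCheckpointConfig().setCheckpointingMode. Applications with ultra-low latency requirements (on the order of a few milliseconds) can use CheckpointingMode.AT_LEAST_ONCE; for most other applications CheckpointingMode.EXACTLY_ONCE is the right choice.

checkpointTimeout sets how long a checkpoint may run, in milliseconds; a checkpoint that has not completed within this time is aborted.

maxConcurrentCheckpoints limits how many checkpoints can be in progress at once; when minPauseBetweenCheckpoints is set, values greater than 1 have no effect.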
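To make the interplay between the trigger interval and checkpointTimeout concrete, here is a deliberately simplified, hypothetical model (not Flink's CheckpointCoordinator). It triggers a checkpoint every intervalMs, keeps at most one in flight, skips ticks while one is still running, and aborts any attempt that would exceed timeoutMs. The class name, the per-attempt durations, and the tick-skipping behavior are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model of periodic checkpointing with a timeout.
 * Assumptions: at most one checkpoint in flight, ticks that fire while one is
 * still running are skipped, and no minimum pause is configured.
 */
public class CheckpointTimeoutSketch {

    enum Outcome { COMPLETED, ABORTED }

    static final class Attempt {
        final long triggerMs;
        final Outcome outcome;

        Attempt(long triggerMs, Outcome outcome) {
            this.triggerMs = triggerMs;
            this.outcome = outcome;
        }

        @Override
        public String toString() {
            return triggerMs + "ms:" + outcome;
        }
    }

    /**
     * durationsMs[i] is how long the i-th attempt would need to finish.
     * Returns one Attempt per simulated checkpoint.
     */
    static List<Attempt> simulate(long intervalMs, long timeoutMs, long[] durationsMs) {
        if (intervalMs <= 0 || timeoutMs <= 0) {
            throw new IllegalArgumentException("interval and timeout must be positive");
        }
        List<Attempt> attempts = new ArrayList<>();
        long nextTick = intervalMs; // first trigger one interval after start
        long busyUntil = 0;         // when the in-flight checkpoint completes or is aborted
        int i = 0;
        while (i < durationsMs.length) {
            if (nextTick < busyUntil) {
                nextTick += intervalMs; // a checkpoint is still running: skip this tick
                continue;
            }
            long needed = durationsMs[i++];
            Outcome outcome = needed <= timeoutMs ? Outcome.COMPLETED : Outcome.ABORTED;
            attempts.add(new Attempt(nextTick, outcome));
            // an aborted attempt stops occupying the slot once the timeout expires
            busyUntil = nextTick + Math.min(needed, timeoutMs);
            nextTick += intervalMs;
        }
        return attempts;
    }

    public static void main(String[] args) {
        // interval and timeout taken from the example; the durations are made up
        System.out.println(simulate(1000, 60000, new long[] {500, 70000, 800}));
    }
}
```

With the example's 1000 ms interval and 60000 ms timeout, the second attempt is aborted after a minute and the ticks in between are skipped, so main prints `[1000ms:COMPLETED, 2000ms:ABORTED, 62000ms:COMPLETED]`.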
Cluster-wide defaults for the state backend and checkpoint storage live in the "Fault tolerance and checkpointing" section of flink-conf.yaml:

```yaml
#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================

# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# <class-name-of-factory>.
#
# state.backend: filesystem

# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints

# Default target directory for savepoints, optional.
#
# state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints

# Flag to enable/disable incremental checkpoints for backends that
# support incremental checkpoints (like the RocksDB state backend).
#
# state.backend.incremental: false
```
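In flink-conf.yaml, only uncommented lines take effect; any key that is not set falls back to Flink's default. As a rough illustration, the hypothetical helper below reads flink-conf.yaml-style text as flat `key: value` lines (dropping anything after `#`) and looks up a boolean key with a fallback default. It is a minimal sketch, not Flink's own configuration loader; it assumes values never contain `#` and never need real YAML parsing, and the sample lines in main are made up.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical minimal reader for flink-conf.yaml-style text. Not Flink's
 * configuration loader: it only handles flat "key: value" lines.
 */
public class CheckpointConfSketch {

    /** Parses "key: value" lines; text after '#' is treated as a comment. */
    static Map<String, String> parse(String text) {
        Map<String, String> conf = new HashMap<>();
        for (String line : text.split("\n")) {
            String content = line.split("#", 2)[0].trim();
            if (content.isEmpty()) {
                continue; // blank or comment-only line, e.g. "# state.backend: filesystem"
            }
            String[] kv = content.split(": ", 2); // split on the first "colon + space"
            if (kv.length == 2) {
                conf.put(kv[0].trim(), kv[1].trim());
            }
        }
        return conf;
    }

    /** Returns the boolean value of key, or defaultValue if the key is not set. */
    static boolean getBoolean(Map<String, String> conf, String key, boolean defaultValue) {
        String value = conf.get(key);
        return value == null ? defaultValue : Boolean.parseBoolean(value);
    }

    public static void main(String[] args) {
        String text = String.join("\n",
                "# state.backend: filesystem",
                "state.backend: rocksdb",
                "state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints",
                "state.backend.incremental: true");
        Map<String, String> conf = parse(text);
        System.out.println(conf.get("state.backend"));                            // rocksdb
        System.out.println(conf.get("state.checkpoints.dir"));                    // hdfs://namenode-host:port/flink-checkpoints
        System.out.println(getBoolean(conf, "state.backend.incremental", false)); // true
        System.out.println(getBoolean(conf, "state.backend.async", true));        // true (unset, default used)
    }
}
```

Splitting on `": "` rather than on `":"` keeps values such as `hdfs://namenode-host:port/...` intact.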
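state.backend.async controls whether the state backend takes snapshots asynchronously (default: true); state backends that do not support asynchronous snapshots, or that only support them, may ignore this setting.

To recap the CheckpointConfig options used in the example above:

checkpointTimeout specifies the timeout for a checkpoint, in milliseconds.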
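minPauseBetweenCheckpoints specifies the minimum time the checkpoint coordinator waits after the previous checkpoint completes before it may trigger another one.

maxConcurrentCheckpoints specifies the maximum number of checkpoints that may be in progress at the same time; if minPauseBetweenCheckpoints is set, any value greater than 1 has no effect.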
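The two rules for minPauseBetweenCheckpoints and maxConcurrentCheckpoints can be written down as a pair of small functions. This is a hypothetical sketch of the behavior as described here, assuming the next checkpoint starts at the later of the regular interval tick and the previous completion time plus minPauseBetweenCheckpoints; Flink's actual coordinator logic is more involved, and the function names are made up.

```java
/**
 * Hypothetical sketch of how minPauseBetweenCheckpoints interacts with the
 * trigger interval and maxConcurrentCheckpoints. Not Flink's implementation.
 */
public class MinPauseSketch {

    /** With a minimum pause configured, values above 1 are ignored. */
    static int effectiveMaxConcurrent(int maxConcurrentCheckpoints, long minPauseMs) {
        if (maxConcurrentCheckpoints < 1) {
            throw new IllegalArgumentException("maxConcurrentCheckpoints must be >= 1");
        }
        return minPauseMs > 0 ? 1 : maxConcurrentCheckpoints;
    }

    /** Earliest start of the next checkpoint: the regular tick, pushed back by the pause if needed. */
    static long nextTriggerMs(long regularTickMs, long lastCompletionMs, long minPauseMs) {
        return Math.max(regularTickMs, lastCompletionMs + minPauseMs);
    }

    public static void main(String[] args) {
        // interval 1000 ms and min pause 500 ms as in the example;
        // the completion times below are made up
        System.out.println(nextTriggerMs(2000, 1800, 500)); // 2300: pause wins over the 2000 ms tick
        System.out.println(nextTriggerMs(2000, 1200, 500)); // 2000: pause already satisfied
        System.out.println(effectiveMaxConcurrent(4, 500)); // 1
        System.out.println(effectiveMaxConcurrent(4, 0));   // 4
    }
}
```

In other words, a checkpoint that finishes late pushes the next one back, so the job always gets at least minPauseBetweenCheckpoints of uninterrupted processing between checkpoints.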
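enableExternalizedCheckpoints turns on external persistence of checkpoints. Externalized checkpoint state is not cleaned up automatically when the job fails; when the job is canceled, ExternalizedCheckpointCleanup determines whether the state is deleted (DELETE_ON_CANCELLATION) or retained (RETAIN_ON_CANCELLATION).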
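The cleanup behavior of externalized checkpoints boils down to a small decision table. The enums below are hypothetical stand-ins that only reuse the names of Flink's ExternalizedCheckpointCleanup values; the sketch encodes the rule described above and is not Flink code.

```java
/**
 * Hypothetical decision table for externalized checkpoint cleanup.
 * The enum constants mirror the names of Flink's
 * CheckpointConfig.ExternalizedCheckpointCleanup values but are local stand-ins.
 */
public class ExternalizedCleanupSketch {

    enum Cleanup { DELETE_ON_CANCELLATION, RETAIN_ON_CANCELLATION }

    /** Simplified terminal states of a job (assumed subset). */
    enum Termination { CANCELED, FAILED }

    /** True if the externalized checkpoint state is still on storage afterwards. */
    static boolean isRetained(Termination termination, Cleanup cleanup) {
        if (termination == Termination.FAILED) {
            return true; // never cleaned up automatically when the job fails
        }
        return cleanup == Cleanup.RETAIN_ON_CANCELLATION; // on cancel, the cleanup mode decides
    }

    public static void main(String[] args) {
        for (Termination t : Termination.values()) {
            for (Cleanup c : Cleanup.values()) {
                System.out.println(t + " + " + c + " -> retained=" + isRetained(t, c));
            }
        }
    }
}
```

With RETAIN_ON_CANCELLATION, as in the example, the state survives both cancellation and failure, so it has to be removed by hand once it is no longer needed.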