flink 如何實現對watermark 的checkpoint,防止數據複寫

fink slink 後的數據被複寫了???java

生產環境總會遇到各類各樣的莫名其名的數據,一但考慮不周即是車毀人亡啊。apache


 

線上sink 流是es , es 的文檔id 是自定義的 id+windowSatarTimeapi

設window size = 10min , watermark 最大延遲時間是 10s,. 數據中的event time 是亂序到達的,數據最大延遲時間是 30min併發

watermark 生成函數socket

assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Goods] {
        val maxOutOfOrderness = 2L // 最大無序數據到達的時間,用來生成水印2ms
        var currentMaxTimestamp: Long = _
        val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.sss")

        override def getCurrentWatermark: Watermark = {
          println(s"${dateFormat.format(new Date().getTime)} -------watermark: ${currentMaxTimestamp - maxOutOfOrderness}")
          new Watermark(currentMaxTimestamp - maxOutOfOrderness)
        }

        override def extractTimestamp(element: Goods, previousElementTimestamp: Long): Long = {
          currentMaxTimestamp = Math.max(element.time, currentMaxTimestamp)
          element.time
        }
      })

 

若是如今是10:15 分,當前win的窗口是 [10:10,10:20),意味着[09:40,09:50,10:00] 的統計值已經生成 。ide

此時,程序發生異常,並有checkpoint + resart 策略,那麼重啓後,watermark 會繼續從斷點處消費?window 是否仍是[10:10,10:20)?函數

答案是不會,watermark 會從0開始增加,window 也會重新開始。測試

重啓後,若是不幸第一條數據的eventtime 是 09:45:02 , 那麼此時 watermark 是 09:45:00 , window 是 [09:40:09:50), 一段時間後數據再次會聚合生條es 記錄文檔 [id+09:40], sink 時以前的es 數據會被覆蓋this

測試:spa

2020-10-21 23:57:01.001 -------watermark: -2
input:Goods(id=1,count=10,time=10)               // 輸入: 1,10,10
()
2020-10-21 23:57:01.001 -------watermark: 8
.... 2020-10-21 23:57:04.004 -------watermark: 8 // 輸入: 0,0,0 觸發異常,重啓 2020-10-21 23:57:09.009 -------watermark: -2 // watermark 從新開始
.... 2020-10-21 23:57:17.017 -------watermark: -2 input:Goods(id=1,count=10,time=10) () 2020-10-21 23:57:17.017 -------watermark: 8
...

解決:

這裏的  currentMaxTimestamp 本質能夠看作是 Operator State , 那麼能夠經過實現  CheckpointedFunction、ListCheckpointed 接口來保存這個state

修改後的water mark 函數

.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Goods] with ListCheckpointed[JavaLong] {
        val maxOutOfOrderness = 2L // 最大無序數據到達的時間,用來生成水印2ms
        var currentMaxTimestamp: Long = _

        override def getCurrentWatermark: Watermark = {
          println("watermark", currentMaxTimestamp - maxOutOfOrderness)
          new Watermark(currentMaxTimestamp - maxOutOfOrderness)
        }

        override def extractTimestamp(element: Goods, previousElementTimestamp: Long): Long = {
          currentMaxTimestamp = Math.max(element.time, currentMaxTimestamp)
          element.time
        }

        override def snapshotState(checkpointId: Long, timestamp: Long): util.List[JavaLong] = {
          Collections.singletonList(currentMaxTimestamp)
        }

        override def restoreState(state: util.List[JavaLong]): Unit = {
          val stateMin = state.asScala.min
          if (stateMin > 0) currentMaxTimestamp = stateMin
        }
      })

測試:

2020-10-22 00:39:00.000 -------watermark: -2
input:Goods(id=1,count=10,time=10)      // 輸入: 1,10,10
()
2020-10-22 00:39:00.000 -------watermark: 8
...
2020-10-22 00:39:03.003 -------watermark: 8
input:Goods(id=0,count=0,time=0)        // 輸入: 0,0,0 觸發異常,重啓
2020-10-22 00:39:08.008 -------watermark: 8  // 從 checkpoints 中獲取state
...
2020-10-22 00:39:23.023 -------watermark: 8
input:Goods(id=1,count=20,time=20)   // 輸入: 1,20,20
()
2020-10-22 00:39:23.023 -------watermark: 18
....

完整測試程序

import java.util.{Collections, Date}
import java.util

import scala.collection.JavaConverters._
import java.lang.{Long => JavaLong}
import java.text.SimpleDateFormat
import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

/**
 * CheckpointCount
 */
object WatermarkCheckpoint {

  case class Goods(var id: Int = 0, var count: Int = 0, var time: Long = 0L) {
    override def toString: String = s"Goods(id=$id,count=$count,time=$time)"
  }

  def main(args: Array[String]): Unit = {
    val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.sss")
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.enableCheckpointing(1000 * 10)
    env.getCheckpointConfig.setCheckpointTimeout(1000 * 60) // checkpoint 超時時間
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000 * 5) // 兩次 checkpoint 的最小間隔
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) // checkpoint 模式
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(2) // checkpoint 併發數
    env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) // cancel job 時持久化checkopint
    env.getCheckpointConfig.setFailOnCheckpointingErrors(false) // 當checkpoint 失敗時不會致使任務失敗終止
    // restart strategy
    env.setRestartStrategy(
      RestartStrategies.fixedDelayRestart(2, Time.of(5, TimeUnit.SECONDS))
    )
    // state backend
    val file_rocksdb = "file:///tmp/state/rocksdb"  // 須要提早創建路徑
    env.setStateBackend(new RocksDBStateBackend(file_rocksdb, true))
    env.setParallelism(1)

    env.socketTextStream("localhost", 9999)
      .filter(_.nonEmpty)
      .map(x => {
        val arr = x.split(",")
        val g = Goods(arr(0).toInt, arr(1).toInt, arr(2).toLong) // id,count,time
        println(s"input:$g")
        g
      })

      // watermark 沒有 checkpoint
      /*.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Goods] {
        val maxOutOfOrderness = 2L // 最大無序數據到達的時間,用來生成水印2ms
        var currentMaxTimestamp: Long = _

        override def getCurrentWatermark: Watermark = {
          println(s"${dateFormat.format(new Date().getTime)} -------watermark: ${currentMaxTimestamp - maxOutOfOrderness}")
          new Watermark(currentMaxTimestamp - maxOutOfOrderness)
        }

        override def extractTimestamp(element: Goods, previousElementTimestamp: Long): Long = {
          currentMaxTimestamp = Math.max(element.time, currentMaxTimestamp)
          element.time
        }
      })*/

      // watermark  checkpoint
      .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Goods] with ListCheckpointed[JavaLong] {
        val maxOutOfOrderness = 2L // 最大無序數據到達的時間,用來生成水印2ms
        var currentMaxTimestamp: Long = _

        override def getCurrentWatermark: Watermark = {
          println(s"${dateFormat.format(new Date().getTime)} -------watermark: ${currentMaxTimestamp - maxOutOfOrderness}")
          new Watermark(currentMaxTimestamp - maxOutOfOrderness)
        }

        override def extractTimestamp(element: Goods, previousElementTimestamp: Long): Long = {
          currentMaxTimestamp = Math.max(element.time, currentMaxTimestamp)
          element.time
        }

        override def snapshotState(checkpointId: Long, timestamp: Long): util.List[JavaLong] = {
          Collections.singletonList(currentMaxTimestamp)
        }

        override def restoreState(state: util.List[JavaLong]): Unit = {
          val stateMin = state.asScala.min
          if (stateMin > 0) currentMaxTimestamp = stateMin
        }
      })

      .map(x => {
        if (x.id == 0) throw new RuntimeException("id is 0")
      })
      .print()

    env.execute(this.getClass.getSimpleName)
  }
}
完整測試代碼
相關文章
相關標籤/搜索