flink基於kafka做爲數據soure和sink實現exactly-once

環境

flink版本:1.12java

阿里雲kafka: 2.2.0正則表達式

image.png

概念

端到端(End to End)的精確一次,它指的是 Flink 應用從 Source 端開始到 Sink 端結束,數據必須通過的起始點和結束點。Flink 自身是沒法保證外部系統「精確一次」語義的,flink只能保證計算的精確一次性,存儲的一致性必須由外部保證,外部系統必須支持「精確一次」語義;而後藉助 Flink 提供的分佈式快照和兩階段提交才能實現。apache

flink計算的exactly-once

Flink 經過 CheckPoint 機制來按期保存計算任務的快照,這個快照中主要包含兩個重要的數據: 1.整個計算任務的狀態。這個狀態主要是計算任務中,每一個子任務在計算過程當中須要保存的臨時狀態數據。 2.數據源的位置信息。這個信息記錄了在數據源的這個流中已經計算了哪些數據。若是數據源是 Kafka 的主題,這個位置信息就是 Kafka 主題中的消費位置。bootstrap

有了 CheckPoint,當計算任務失敗重啓的時候,能夠從最近的一個 CheckPoint 恢復計算任務。具體的作法是,每一個子任務先從 CheckPoint 中讀取並恢復本身的狀態,而後整個計算任務從 CheckPoint 中記錄的數據源位置開始消費數據,只要這個恢復位置和 CheckPoint 中每一個子任務的狀態是徹底對應的。api

flink 與 kafka 的配合

每一個 Flink 的 CheckPoint 對應一個 Kafka 事務。Flink 在建立一個 CheckPoint 的時候,同時開啓一個 Kafka 的事務,完成 CheckPoint 同時提交 Kafka 的事務。當計算任務重啓的時候,在 Flink 中計算任務會恢復到上一個 CheckPoint,這個 CheckPoint 正好對應 Kafka 上一個成功提交的事務。未完成的 CheckPoint 和未提交的事務中的消息都會被丟棄,這樣就實現了端到端的 Exactly Once。markdown

程序

package org.example

import java.time.Duration
import java.util.Properties

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig


object KafkaDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment


    // Exactly-Once
    env.enableCheckpointing(10 * 1000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    val backend: FsStateBackend = new FsStateBackend("file:///tmp/checkpoint",true)
    env.setStateBackend(backend)

    // Kafka 配置
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "xxxx:9092")
    properties.setProperty("group.id", "dev-miaozhen-metric-group")
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.put("enable.auto.commit", "false")
    // 這個配置只是對於後序的consumer有做用,讓它只消費已commited的數據
    properties.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");


    val myCosumer = new FlinkKafkaConsumer[String](
      // 主題發現 按照正則表達式自動發現topic
      java.util.regex.Pattern.compile("miaozhen_raw_new[0-9]{0,}"), new SimpleStringSchema(), properties)
    // 這種狀態下提交到kafka的 offset能夠忽略,不起做用
    myCosumer.setCommitOffsetsOnCheckpoints(true)
    // 從數據源生成watermark,這樣的watermark更精準
    val watermark: WatermarkStrategy[String] = WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20))
    myCosumer.assignTimestampsAndWatermarks(watermark)

    val producerProperty = new Properties()
    producerProperty.setProperty("bootstrap.servers", "xxxx:9092")
    producerProperty.setProperty("group.id", "dev-miaozhen-metric-group")
    producerProperty.setProperty("transaction.timeout.ms", "60000")
    //producerProperty.put(ProducerConfig.ACKS_CONFIG, "all") // 設置producer的ack傳輸配置
    // 保證kafka的全局事務性,而不單單是分區事務性
    producerProperty.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
    producerProperty.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "prod-0")

    //producerProperty.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "10000")

    val myProducer = new FlinkKafkaProducer[String](
      "exactly_once",                  // target topic
      new KeyedSerializationSchemaWrapper(new SimpleStringSchema()),    // serialization schema
      producerProperty,
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE
    )


    val stream:DataStream[String] = env.addSource(myCosumer)
    stream.map { text =>
      val arr: Array[String] = text.split(",")
      SensorReading(arr(0).trim, arr(1).toLong, arr(2).toDouble).toString
    }.addSink(myProducer)

    env.execute()
  }
}
複製代碼

備註

kafka自身的exacly-once保證能夠參考developer.aliyun.com/article/768… 分爲冪等性producer和事務性producer.其中冪等性producer指的是分區的冪等性,經過sequence nums的遞增來保證,事務性producer指的是topic'的冪等性,經過PID的惟一性來保證。app

相關文章
相關標籤/搜索