Flink實戰: 結合Kafka構建端到端的Exactly-Once處理程序

前言

在消息處理過程當中,除了Flink程序自己的邏輯(operator),咱們還須要和外部系統進行交互,例如本地磁盤文件,HDFS,Kafka,Mysql等。雖然Flink自己支持Exactly-Once語義,可是對於完整的數據處理系統來講,最終呈現出來的語義和外部系統是相關的。html

咱們先總覽一下Flink不一樣connector的消息傳遞語義java

在Guarantees這一列,咱們能夠發現如下3種語義:git

  1. at most once : 至多一次。可能致使消息丟失。
  2. at least once : 至少一次。可能致使消息重複。
  3. exactly once : 恰好一次。不丟失也不重複。

語義分析

咱們結合Kafka connector來介紹這3中不一樣的語義,以及分析它是如何產生的。github

Kafka Producer的語義

Producer的at-most-once和at-least-once語義主要由「retries」控制(在callback中實現異常重發也至關於retry)。sql

若是配置值是一個大於0的整數,Producer在收到error的callback後,Producer將從新發送消息。考慮這一種狀況,收到消息後,Broker正確的保存了消息,只是在返回ack時出現broker故障或者網絡異常。這時候,producer收到error的callback,它不能確認異常緣由,只能從新發送消息,這樣就致使了消息重複。apache

若是配置值等於0,Producer在收到error的callback後,不從新發送消息。若是異常時因爲broker沒有正確保存消息致使,那麼將致使消息丟失。緩存

Producer的Exactly-Once語義,主要由「enable.idempotence」控制,若是該參數爲true,將會確保消息最終只會被broker保存一次。一樣的Producer在接收到error的callback後,它須要重發數據,只是在0.11以及更新的版本中,Producer會爲每一批消息生成一個序列號,經過這個序列號Broker能夠過濾重複消息。而且因爲序列號是保存在topic上的,即便主分片失敗了,新的broker也能知道消息是否須要過濾。這裏還有一個細節須要注意,「acks」不能被設置爲0或者1,由於萬一主分片(leader replication)異常下線,將致使數據丟失,這樣語義被破壞了。服務器

NOTE : Kafka有兩個概念很容易被混淆。一個是Durable,另外一個是Message Delivery Semantics。這兩個地方都存在消息丟失的可能性,可是機制徹底不一樣。網絡

Durable主要描述軟件或者服務器故障後,數據是否仍能保留。Durable丟失消息主要是沒有持久化:主分片收到數據後沒有及時刷新到磁盤,副本沒有及時複製以及持久化到磁盤。app

Durable主要經過「acks」控制,最強的級別是「all」,在broker返回ack以前,它會確認每個副本都已經保存了該消息。這樣它能在n-1個副本宕機後,仍保留完整數據。最弱的級別是「0」,broker收到消息不確認持久化就返回,若是後續持久化失敗,消息會丟失。當「acks」設置爲「1」的時候,broker會確認主分片(leader replication)已經保存了消息,同時副本會主動向主分片同步,消息丟失風險較小。可是存在這種狀況,消息到達主分片而且返回了success的ack,這時主分片fail而且副本將來得及同步這條消息,消息會丟失。

Message Delivery Semantics 主要是描述在消息系統中,消息實際被處理的次數。 要區別這兩點,能夠簡單的認爲,Durable關注消息的持久化,Message Delivery Semantics關注消息的發送。

Kafka Consumer的語義

Consumer的at-most-once和at-least-once語義主要經過「offset」控制。offset的可配置爲自動提交和手動提交。若配置「enable.auto.commit」爲true,在Consumer fetch數據後,後臺會自動提交offset。若配置「enable.auto.commit」爲false,須要主動調用commitSync​()或者commitAsync()來提交offset。

在自動提交的情形下,Consumer表現爲at-most-once語義。在主動提交的情形下,根據用戶對異常處理的不一樣,可表現爲at-most-once或者at-least-once。

假設Consumer在fetch完數據後,後續的處理步驟出現了異常

若是offset是自動提交的,那麼Consumer將不能再次消費這些數據(除非重啓Consumer,並經過seek(TopicPartition, long)重置offset)。它表現出at-most-once語義。

在捕獲異常後,若是手動提交offset,表現出at-most-once語義。若是不提交offset,Consumer可重複消費該消息,表現出at-least-once語義。

在Consumer中,沒有配置能夠保證Exactly-Once語義。若要達到這個目標,須要在at-least-once的基礎上實現冪等。這點和Producer是相似的,區別是Consumer的冪等性須要用戶本身來完成。

編碼準備

前面的篇幅主要介紹了Kafka的3種語義(Message Delivery Semantics),經過上述內容,咱們能夠得出,想要Flink和Kafka達成端到端 Exactly-Once語義,首先咱們須要0.11版本或者更新的Kafka 、Producer和Consumer,其次使用冪等的Producer發送數據以及實現冪等的Consumer消費。

前置條件

  1. JDK8
  2. Maven 3
  3. Git
  4. IDE
  5. Kafka

建立基本工程

數據以及部分代碼來自http://training.data-artisans.com/

Flink提供了經過mvn生成的精簡Flink工程的方式,使用起來很是方便。在pom文件中,也包含了shade打包的方式,由於提交到集羣上運行,須要jar-with-dependencies。

mvn archetype:generate
-DarchetypeGroupId=org.apache.flink
-DarchetypeArtifactId=flink-quickstart-java
-DarchetypeVersion=1.4.2
-DgroupId=org.apache.flink.quickstart
-DartifactId=flink-java-project
-Dversion=0.1
-Dpackage=org.apache.flink.quickstart
-DinteractiveMode=false

按實際須要修改DgroupId,DartifactId,Dversion。

下載另外一個依賴工程(示例代碼須要),執行install

git clone https://github.com/dataArtisans/flink-training-exercises.git

cd flink-training-exercises

mvn clean install

在創建的工程中加入上一步打包的jar做爲依賴

<dependency>
   <groupId>com.data-artisans</groupId>
   <artifactId>flink-training-exercises</artifactId>
   <version>0.15.2</version>
 </dependency>

下載樣例數據

wget http://training.data-artisans.com/trainingData/nycTaxiRides.gz

wget http://training.data-artisans.com/trainingData/nycTaxiFares.gz

導入項目到IDE,編寫FlinkKafkaProducerEOSDemo

public static void main(String[] args) throws Exception {

		final int maxEventDelay = 60;       // events are out of order by max 60 seconds
		final int servingSpeedFactor = 600; // events of 10 minutes are served in 1 second

		// set up streaming execution environment
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

		String	rideInput = "./taxi-data/nycTaxiRides.gz";
		String taxiRideTopicId = "taxi-ride";

		// start the data generator
		DataStream<TaxiRide> rides = env.addSource(
				new CheckpointedTaxiRideSource(rideInput, servingSpeedFactor));

		Properties properties = new Properties();
		properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");

		SerializationSchema<TaxiRide> taxiRideSerializationSchema = new TaxiRideSchema();
		rides.addSink(new FlinkKafkaProducer011<TaxiRide>(taxiRideTopicId,
				new KeyedSerializationSchemaWrapper(taxiRideSerializationSchema),
				properties,
				FlinkKafkaProducer011.Semantic.EXACTLY_ONCE // 開啓Kafka EOS
		));

		env.execute("send taxi ride to kafka ");
	}

上述代碼的主體邏輯是讀取nycTaxiRides.gz,並將數據發往Kafka。 主要使用了CheckpointedTaxiRideSource以及FlinkKafkaProducer011。 接下來,說明爲何它們能達成End-To-End的Exactly-Once語義。

CheckpointedTaxiRideSource:這是一個擁有狀態的文件流,它在Checkpoint的時候記錄數據讀取位置(至關於Kafka的offset),Flink錯誤恢復後會從新定位到checkpoint記錄的位置,它在整個系統上表現出來的是at-least-once。考慮這樣一個場景,checkpoint成功,可是某一個commit失敗,原則上本次全部的提交都要回滾。若是後續的Sink處理不當或者不支持回滾,這些數據會被提交到Sink中。在Flink 恢復後,這部分數據被從新計算,致使Sink中出現了重複的數據。

FlinkKafkaProducer011 : 提供了冪等以及事務提交。Producer的冪等性參照文章開頭的語義說明,這裏再也不介紹。 Sink中的冪等性主要是經過兩階段提交協議來支持的(注意區分Kafka Producer自己的冪等性和依靠事務實現的冪等性)。Kafka 0.11及更新的版本提供了事務支持,能夠結合Flink的兩階段提交協議使用。爲了保證Sink中的數據的惟一性,將兩次checkpoint之間的數據放在一個事務中,一塊兒預提交,若是commit成功,則進入下一個checkpoint;若失敗,終止事務並回滾數據。

FlinkKafkaProducer011 兩階段提交代碼

protected void preCommit(FlinkKafkaProducer011.KafkaTransactionState transaction) throws FlinkKafka011Exception {
        switch(null.$SwitchMap$org$apache$flink$streaming$connectors$kafka$FlinkKafkaProducer011$Semantic[this.semantic.ordinal()]) {
        case 1:
        case 2:
            this.flush(transaction);
        case 3:
            this.checkErroneous();
            return;
        default:
            throw new UnsupportedOperationException("Not implemented semantic");
        }
    }
protected void commit(FlinkKafkaProducer011.KafkaTransactionState transaction) {
        switch(null.$SwitchMap$org$apache$flink$streaming$connectors$kafka$FlinkKafkaProducer011$Semantic[this.semantic.ordinal()]) {
        case 1:
            transaction.producer.commitTransaction();
            this.recycleTransactionalProducer(transaction.producer);
        case 2:
        case 3:
            return;
        default:
            throw new UnsupportedOperationException("Not implemented semantic");
        }
    }
protected void abort(FlinkKafkaProducer011.KafkaTransactionState transaction) {
        switch(null.$SwitchMap$org$apache$flink$streaming$connectors$kafka$FlinkKafkaProducer011$Semantic[this.semantic.ordinal()]) {
        case 1:
            transaction.producer.abortTransaction();
            this.recycleTransactionalProducer(transaction.producer);
        case 2:
        case 3:
            return;
        default:
            throw new UnsupportedOperationException("Not implemented semantic");
        }
    }

上面是兩階段提交的主要代碼。

preCommit:將本次checkpoint中未發往Broker的數據flush到Kafka Broker。這時數據已經在Kafka Broker中,可是因爲事務的隔離性,Consumer暫時不會讀取到這些數據(除非配置了「read_uncommitted」)。

TIPS :爲何須要調用flush?

在Flink processElement的時候,調用KafkaProducer的send來發送數據,可是Kafka爲了更高的性能,send並不當即發送數據,而是緩存在buffer中,到必定的消息量才發往Kafka Broker。這裏經過flush能夠強制將數據發往Kafka Broker。

commit:提交事務,這時Consumer能夠讀到這些數據。

abort: 若是事務失敗,終止事務。

FlinkKafkaConsumerEOSDemo

FlinkKafkaConsumerEOSDemo的分析流程能夠參照FlinkKafkaProducerEOSDemo,這裏不作細緻分析。

`public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.enableCheckpointing(100);//

String taxiRideTopicId = "taxi-ride";
    Properties properties = new Properties();
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

    DataStreamSource<TaxiRide> taxiRideDataStreamSource = env.addSource(new FlinkKafkaConsumer011<TaxiRide>(taxiRideTopicId, new TaxiRideSchema(), properties),
            "kafka-source-ride");

    String filePath = "./taxi-data/taxi-ride.txt";
    WriteFormat format = new WriteFormatAsText();
    long period = 200;
    taxiRideDataStreamSource.filter(new RideCleansing.NYCFilter()).addSink(
            new WriteSinkFunctionByMillis<TaxiRide>(filePath,format,period)
    );

    env.execute("print taxride ");


}`

須要注意的是FlinkKafkaConsumer011的Exactly-Once語義經過用戶配置自動設置,若是不肯定Flink的語義,能夠在FlinkKafkaConsumer09中打斷點,斷點位置:

if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) {
			properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
		}

自動配置相關代碼:

public static OffsetCommitMode fromConfiguration(
			boolean enableAutoCommit,
			boolean enableCommitOnCheckpoint,
			boolean enableCheckpointing) {

		if (enableCheckpointing) {
			// if checkpointing is enabled, the mode depends only on whether committing on checkpoints is enabled
			return (enableCommitOnCheckpoint) ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
		} else {
			// else, the mode depends only on whether auto committing is enabled in the provided Kafka properties
			return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
		}

總結

Flink經過checkpoint和兩階段提交協議,爲端到端的Exactly-Once的實現提供了可能,若是在項目中確實須要這種語義,不妨一試。

相關文章
相關標籤/搜索