在消息處理過程當中,除了Flink程序自己的邏輯(operator),咱們還須要和外部系統進行交互,例如本地磁盤文件,HDFS,Kafka,Mysql等。雖然Flink自己支持Exactly-Once語義,可是對於完整的數據處理系統來講,最終呈現出來的語義和外部系統是相關的。html
咱們先總覽一下Flink不一樣connector的消息傳遞語義 。java
在Guarantees這一列,咱們能夠發現如下3種語義:git
咱們結合Kafka connector來介紹這3中不一樣的語義,以及分析它是如何產生的。github
Producer的at-most-once和at-least-once語義主要由「retries」控制(在callback中實現異常重發也至關於retry)。sql
若是配置值是一個大於0的整數,Producer在收到error的callback後,Producer將從新發送消息。考慮這一種狀況,收到消息後,Broker正確的保存了消息,只是在返回ack時出現broker故障或者網絡異常。這時候,producer收到error的callback,它不能確認異常緣由,只能從新發送消息,這樣就致使了消息重複。apache
若是配置值等於0,Producer在收到error的callback後,不從新發送消息。若是異常時因爲broker沒有正確保存消息致使,那麼將致使消息丟失。緩存
Producer的Exactly-Once語義,主要由「enable.idempotence」控制,若是該參數爲true,將會確保消息最終只會被broker保存一次。一樣的Producer在接收到error的callback後,它須要重發數據,只是在0.11以及更新的版本中,Producer會爲每一批消息生成一個序列號,經過這個序列號Broker能夠過濾重複消息。而且因爲序列號是保存在topic上的,即便主分片失敗了,新的broker也能知道消息是否須要過濾。這裏還有一個細節須要注意,「acks」不能被設置爲0或者1,由於萬一主分片(leader replication)異常下線,將致使數據丟失,這樣語義被破壞了。服務器
NOTE : Kafka有兩個概念很容易被混淆。一個是Durable,另外一個是Message Delivery Semantics。這兩個地方都存在消息丟失的可能性,可是機制徹底不一樣。網絡
Durable主要描述軟件或者服務器故障後,數據是否仍能保留。Durable丟失消息主要是沒有持久化:主分片收到數據後沒有及時刷新到磁盤,副本沒有及時複製以及持久化到磁盤。app
Durable主要經過「acks」控制,最強的級別是「all」,在broker返回ack以前,它會確認每個副本都已經保存了該消息。這樣它能在n-1個副本宕機後,仍保留完整數據。最弱的級別是「0」,broker收到消息不確認持久化就返回,若是後續持久化失敗,消息會丟失。當「acks」設置爲「1」的時候,broker會確認主分片(leader replication)已經保存了消息,同時副本會主動向主分片同步,消息丟失風險較小。可是存在這種狀況,消息到達主分片而且返回了success的ack,這時主分片fail而且副本將來得及同步這條消息,消息會丟失。
Message Delivery Semantics 主要是描述在消息系統中,消息實際被處理的次數。 要區別這兩點,能夠簡單的認爲,Durable關注消息的持久化,Message Delivery Semantics關注消息的發送。
Consumer的at-most-once和at-least-once語義主要經過「offset」控制。offset的可配置爲自動提交和手動提交。若配置「enable.auto.commit」爲true,在Consumer fetch數據後,後臺會自動提交offset。若配置「enable.auto.commit」爲false,須要主動調用commitSync()或者commitAsync()來提交offset。
在自動提交的情形下,Consumer表現爲at-most-once語義。在主動提交的情形下,根據用戶對異常處理的不一樣,可表現爲at-most-once或者at-least-once。
假設Consumer在fetch完數據後,後續的處理步驟出現了異常。
若是offset是自動提交的,那麼Consumer將不能再次消費這些數據(除非重啓Consumer,並經過seek(TopicPartition, long)重置offset)。它表現出at-most-once語義。
在捕獲異常後,若是手動提交offset,表現出at-most-once語義。若是不提交offset,Consumer可重複消費該消息,表現出at-least-once語義。
在Consumer中,沒有配置能夠保證Exactly-Once語義。若要達到這個目標,須要在at-least-once的基礎上實現冪等。這點和Producer是相似的,區別是Consumer的冪等性須要用戶本身來完成。
前面的篇幅主要介紹了Kafka的3種語義(Message Delivery Semantics),經過上述內容,咱們能夠得出,想要Flink和Kafka達成端到端 Exactly-Once語義,首先咱們須要0.11版本或者更新的Kafka 、Producer和Consumer,其次使用冪等的Producer發送數據以及實現冪等的Consumer消費。
數據以及部分代碼來自http://training.data-artisans.com/ 。
Flink提供了經過mvn生成的精簡Flink工程的方式,使用起來很是方便。在pom文件中,也包含了shade打包的方式,由於提交到集羣上運行,須要jar-with-dependencies。
mvn archetype:generate
-DarchetypeGroupId=org.apache.flink
-DarchetypeArtifactId=flink-quickstart-java
-DarchetypeVersion=1.4.2
-DgroupId=org.apache.flink.quickstart
-DartifactId=flink-java-project
-Dversion=0.1
-Dpackage=org.apache.flink.quickstart
-DinteractiveMode=false
按實際須要修改DgroupId,DartifactId,Dversion。
git clone https://github.com/dataArtisans/flink-training-exercises.git
cd flink-training-exercises
mvn clean install
<dependency> <groupId>com.data-artisans</groupId> <artifactId>flink-training-exercises</artifactId> <version>0.15.2</version> </dependency>
wget http://training.data-artisans.com/trainingData/nycTaxiRides.gz
wget http://training.data-artisans.com/trainingData/nycTaxiFares.gz
public static void main(String[] args) throws Exception { final int maxEventDelay = 60; // events are out of order by max 60 seconds final int servingSpeedFactor = 600; // events of 10 minutes are served in 1 second // set up streaming execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); String rideInput = "./taxi-data/nycTaxiRides.gz"; String taxiRideTopicId = "taxi-ride"; // start the data generator DataStream<TaxiRide> rides = env.addSource( new CheckpointedTaxiRideSource(rideInput, servingSpeedFactor)); Properties properties = new Properties(); properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092"); SerializationSchema<TaxiRide> taxiRideSerializationSchema = new TaxiRideSchema(); rides.addSink(new FlinkKafkaProducer011<TaxiRide>(taxiRideTopicId, new KeyedSerializationSchemaWrapper(taxiRideSerializationSchema), properties, FlinkKafkaProducer011.Semantic.EXACTLY_ONCE // 開啓Kafka EOS )); env.execute("send taxi ride to kafka "); }
上述代碼的主體邏輯是讀取nycTaxiRides.gz,並將數據發往Kafka。 主要使用了CheckpointedTaxiRideSource以及FlinkKafkaProducer011。 接下來,說明爲何它們能達成End-To-End的Exactly-Once語義。
CheckpointedTaxiRideSource:這是一個擁有狀態的文件流,它在Checkpoint的時候記錄數據讀取位置(至關於Kafka的offset),Flink錯誤恢復後會從新定位到checkpoint記錄的位置,它在整個系統上表現出來的是at-least-once。考慮這樣一個場景,checkpoint成功,可是某一個commit失敗,原則上本次全部的提交都要回滾。若是後續的Sink處理不當或者不支持回滾,這些數據會被提交到Sink中。在Flink 恢復後,這部分數據被從新計算,致使Sink中出現了重複的數據。
FlinkKafkaProducer011 : 提供了冪等以及事務提交。Producer的冪等性參照文章開頭的語義說明,這裏再也不介紹。 Sink中的冪等性主要是經過兩階段提交協議來支持的(注意區分Kafka Producer自己的冪等性和依靠事務實現的冪等性)。Kafka 0.11及更新的版本提供了事務支持,能夠結合Flink的兩階段提交協議使用。爲了保證Sink中的數據的惟一性,將兩次checkpoint之間的數據放在一個事務中,一塊兒預提交,若是commit成功,則進入下一個checkpoint;若失敗,終止事務並回滾數據。
FlinkKafkaProducer011 兩階段提交代碼
protected void preCommit(FlinkKafkaProducer011.KafkaTransactionState transaction) throws FlinkKafka011Exception { switch(null.$SwitchMap$org$apache$flink$streaming$connectors$kafka$FlinkKafkaProducer011$Semantic[this.semantic.ordinal()]) { case 1: case 2: this.flush(transaction); case 3: this.checkErroneous(); return; default: throw new UnsupportedOperationException("Not implemented semantic"); } }
protected void commit(FlinkKafkaProducer011.KafkaTransactionState transaction) { switch(null.$SwitchMap$org$apache$flink$streaming$connectors$kafka$FlinkKafkaProducer011$Semantic[this.semantic.ordinal()]) { case 1: transaction.producer.commitTransaction(); this.recycleTransactionalProducer(transaction.producer); case 2: case 3: return; default: throw new UnsupportedOperationException("Not implemented semantic"); } }
protected void abort(FlinkKafkaProducer011.KafkaTransactionState transaction) { switch(null.$SwitchMap$org$apache$flink$streaming$connectors$kafka$FlinkKafkaProducer011$Semantic[this.semantic.ordinal()]) { case 1: transaction.producer.abortTransaction(); this.recycleTransactionalProducer(transaction.producer); case 2: case 3: return; default: throw new UnsupportedOperationException("Not implemented semantic"); } }
上面是兩階段提交的主要代碼。
preCommit:將本次checkpoint中未發往Broker的數據flush到Kafka Broker。這時數據已經在Kafka Broker中,可是因爲事務的隔離性,Consumer暫時不會讀取到這些數據(除非配置了「read_uncommitted」)。
TIPS :爲何須要調用flush?
在Flink processElement的時候,調用KafkaProducer的send來發送數據,可是Kafka爲了更高的性能,send並不當即發送數據,而是緩存在buffer中,到必定的消息量才發往Kafka Broker。這裏經過flush能夠強制將數據發往Kafka Broker。
commit:提交事務,這時Consumer能夠讀到這些數據。
abort: 若是事務失敗,終止事務。
FlinkKafkaConsumerEOSDemo的分析流程能夠參照FlinkKafkaProducerEOSDemo,這裏不作細緻分析。
`public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.enableCheckpointing(100);//
String taxiRideTopicId = "taxi-ride"; Properties properties = new Properties(); properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092"); properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); DataStreamSource<TaxiRide> taxiRideDataStreamSource = env.addSource(new FlinkKafkaConsumer011<TaxiRide>(taxiRideTopicId, new TaxiRideSchema(), properties), "kafka-source-ride"); String filePath = "./taxi-data/taxi-ride.txt"; WriteFormat format = new WriteFormatAsText(); long period = 200; taxiRideDataStreamSource.filter(new RideCleansing.NYCFilter()).addSink( new WriteSinkFunctionByMillis<TaxiRide>(filePath,format,period) ); env.execute("print taxride "); }`
須要注意的是FlinkKafkaConsumer011的Exactly-Once語義經過用戶配置自動設置,若是不肯定Flink的語義,能夠在FlinkKafkaConsumer09中打斷點,斷點位置:
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) { properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); }
自動配置相關代碼:
public static OffsetCommitMode fromConfiguration( boolean enableAutoCommit, boolean enableCommitOnCheckpoint, boolean enableCheckpointing) { if (enableCheckpointing) { // if checkpointing is enabled, the mode depends only on whether committing on checkpoints is enabled return (enableCommitOnCheckpoint) ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED; } else { // else, the mode depends only on whether auto committing is enabled in the provided Kafka properties return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED; }
Flink經過checkpoint和兩階段提交協議,爲端到端的Exactly-Once的實現提供了可能,若是在項目中確實須要這種語義,不妨一試。