如何正確使用 Flink Connector?

本文主要分享 Flink connector 相關內容,分爲如下三個部分的內容:第一部分會首先介紹一下 Flink Connector 有哪些。第二部分會重點介紹在生產環境中常用的 kafka connector 的基本的原理以及使用方法。第三部分答疑,對社區反饋的問題進行答疑。html

Flink Streaming Connector

Flink 是新一代流批統一的計算引擎,它須要從不一樣的第三方存儲引擎中把數據讀過來,進行處理,而後再寫出到另外的存儲引擎中。Connector 的做用就至關於一個鏈接器,鏈接 Flink 計算引擎跟外界存儲系統。Flink 裏有如下幾種方式,固然也不限於這幾種方式能夠跟外界進行數據交換:java

  • 第一種 Flink 裏面預約義了一些 source 和 sink。
  • 第二種 Flink 內部也提供了一些 Boundled connectors。
  • 第三種可使用第三方 Apache Bahir 項目中提供的鏈接器。
  • 第四種是經過異步 IO 方式。

下面分別簡單介紹一下這四種數據讀寫的方式。正則表達式

1.jpg

1.預約義的 source 和 sink

Flink 裏預約義了一部分 source 和 sink。在這裏分了幾類。redis

2.jpg

  • 基於文件的 source 和 sink。

若是要從文本文件中讀取數據,能夠直接使用:apache

env.readTextFile(path)

就能夠以文本的形式讀取該文件中的內容。固然也可使用:json

env.readFile(fileInputFormat, path)

根據指定的 fileInputFormat 格式讀取文件中的內容。api

若是數據在 Flink 內進行了一系列的計算,想把結果寫出到文件裏,也能夠直接使用內部預約義的一些 sink,好比將結果已文本或 csv 格式寫出到文件中,可使用 DataStream 的 writeAsText(path) 和 writeAsCsv(path)。緩存

  • 基於 Socket 的 Source 和 Sink

提供 Socket 的 host name 及 port,能夠直接用 StreamExecutionEnvironment 預約的接口 socketTextStream 建立基於 Socket 的 source,從該 socket 中以文本的形式讀取數據。固然若是想把結果寫出到另一個 Socket,也能夠直接調用 DataStream writeToSocket。性能優化

  • 基於內存 Collections、Iterators 的 Source

能夠直接基於內存中的集合或者迭代器,調用 StreamExecutionEnvironment fromCollection、fromElements 構建相應的 source。結果數據也能夠直接 print、printToError 的方式寫出到標準輸出或標準錯誤。網絡

詳細也能夠參考 Flink 源碼中提供的一些相對應的 Examples 來查看異常預約義 source 和 sink 的使用方法,例如 WordCount、SocketWindowWordCount。

2.Bundled Connectors

Flink 裏已經提供了一些綁定的 Connector,例如 kafka source 和 sink,Es sink等。讀寫 kafka、es、rabbitMQ 時能夠直接使用相應 connector 的 api 便可。第二部分會詳細介紹生產環境中最經常使用的 kafka connector。

雖然該部分是 Flink 項目源代碼裏的一部分,可是真正意義上不算做 Flink 引擎相關邏輯,而且該部分沒有打包在二進制的發佈包裏面。因此在提交 Job 時候須要注意, job 代碼 jar 包中必定要將相應的 connetor 相關類打包進去,不然在提交做業時就會失敗,提示找不到相應的類,或初始化某些類異常。

3.jpg

3.Apache Bahir 中的鏈接器

Apache Bahir 最初是從 Apache Spark 中獨立出來項目提供,以提供不限於 Spark 相關的擴展/插件、鏈接器和其餘可插入組件的實現。經過提供多樣化的流鏈接器(streaming connectors)和 SQL 數據源擴展分析平臺的覆蓋面。若有須要寫到 flume、redis 的需求的話,可使用該項目提供的 connector。

4.jpg

4.Async I/O

流計算中常常須要與外部存儲系統交互,好比須要關聯 MySQL 中的某個表。通常來講,若是用同步 I/O 的方式,會形成系統中出現大的等待時間,影響吞吐和延遲。爲了解決這個問題,異步 I/O 能夠併發處理多個請求,提升吞吐,減小延遲。

Tips:Async 的原理可參考官方文檔

Flink Kafka Connector

本章重點介紹生產環境中最經常使用到的 Flink kafka connector。使用 Flink 的同窗,必定會很熟悉 kafka,它是一個分佈式的、分區的、多副本的、 支持高吞吐的、發佈訂閱消息系統。生產環境環境中也常常會跟 kafka 進行一些數據的交換,好比利用 kafka consumer 讀取數據,而後進行一系列的處理以後,再將結果寫出到 kafka 中。這裏會主要分兩個部分進行介紹,一是 Flink kafka Consumer,一個是 Flink kafka Producer。

5.jpg

首先看一個例子來串聯下 Flink kafka connector。代碼邏輯裏主要是從 kafka 裏讀數據,而後作簡單的處理,再寫回到 kafka 中。

分別用紅框框出如何構造一個 Source sink Function。Flink 提供了現成的構造FlinkKafkaConsumer、Producer 的接口,能夠直接使用。這裏須要注意,由於 kafka 有多個版本,多個版本之間的接口協議會不一樣。Flink 針對不一樣版本的 kafka 有相應的版本的 Consumer 和 Producer。例如:針對 0八、0九、十、11 版本,Flink 對應的 consumer 分別是 FlinkKafkaConsumer 0八、0九、0十、011,producer 也是。

6.jpg

1.Flink kafka Consumer

  • 反序列化數據

由於 kafka 中數據都是以二進制 byte 形式存儲的。讀到 Flink 系統中以後,須要將二進制數據轉化爲具體的 java、scala 對象。具體須要實現一個 schema 類,定義如何序列化和反序列數據。反序列化時須要實現 DeserializationSchema 接口,並重寫 deserialize(byte[] message) 函數,若是是反序列化 kafka 中 kv 的數據時,須要實現 KeyedDeserializationSchema 接口,並重寫 deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) 函數。

另外 Flink 中也提供了一些經常使用的序列化反序列化的 schema 類。例如,SimpleStringSchema,按字符串方式進行序列化、反序列化。TypeInformationSerializationSchema,它可根據 Flink 的 TypeInformation 信息來推斷出須要選擇的 schema。JsonDeserializationSchema 使用 jackson 反序列化 json 格式消息,並返回 ObjectNode,可使用 .get(「property」) 方法來訪問相應字段。

7.jpg

  • 消費起始位置設置

如何設置做業從 kafka 消費數據最開始的起始位置,這一部分 Flink 也提供了很是好的封裝。在構造好的 FlinkKafkaConsumer 類後面調用以下相應函數,設置合適的起始位置。

  • setStartFromGroupOffsets,也是默認的策略,從 group offset 位置讀取數據,group offset 指的是 kafka broker 端記錄的某個 group 的最後一次的消費位置。可是 kafka broker 端沒有該 group 信息,會根據 kafka 的參數"auto.offset.reset"的設置來決定從哪一個位置開始消費。
  • setStartFromEarliest,從 kafka 最先的位置開始讀取。
  • setStartFromLatest,從 kafka 最新的位置開始讀取。
  • setStartFromTimestamp(long),從時間戳大於或等於指定時間戳的位置開始讀取。Kafka 時戳,是指 kafka 爲每條消息增長另外一個時戳。該時戳能夠表示消息在 proudcer 端生成時的時間、或進入到 kafka broker 時的時間。
  • setStartFromSpecificOffsets,從指定分區的 offset 位置開始讀取,如指定的 offsets 中不存某個分區,該分區從 group offset 位置開始讀取。此時須要用戶給定一個具體的分區、offset 的集合。

一些具體的使用方法能夠參考下圖。須要注意的是,由於 Flink 框架有容錯機制,若是做業故障,若是做業開啓 checkpoint,會從上一次 checkpoint 狀態開始恢復。或者在中止做業的時候主動作 savepoint,啓動做業時從 savepoint 開始恢復。這兩種狀況下恢復做業時,做業消費起始位置是從以前保存的狀態中恢復,與上面提到跟 kafka 這些單獨的配置無關。

8.jpg

  • topic 和 partition 動態發現

實際的生產環境中可能有這樣一些需求,好比場景一,有一個 Flink 做業須要將五份數據聚合到一塊兒,五份數據對應五個 kafka topic,隨着業務增加,新增一類數據,同時新增了一個 kafka topic,如何在不重啓做業的狀況下做業自動感知新的 topic。場景二,做業從一個固定的 kafka topic 讀數據,開始該 topic 有 10 個 partition,但隨着業務的增加數據量變大,須要對 kafka partition 個數進行擴容,由 10 個擴容到 20。該狀況下如何在不重啓做業狀況下動態感知新擴容的 partition?

針對上面的兩種場景,首先須要在構建 FlinkKafkaConsumer 時的 properties 中設置 flink.partition-discovery.interval-millis 參數爲非負值,表示開啓動態發現的開關,以及設置的時間間隔。此時 FlinkKafkaConsumer 內部會啓動一個單獨的線程按期去 kafka 獲取最新的 meta 信息。針對場景一,還需在構建 FlinkKafkaConsumer 時,topic 的描述能夠傳一個正則表達式描述的 pattern。每次獲取最新 kafka meta 時獲取正則匹配的最新 topic 列表。針對場景二,設置前面的動態發現參數,在按期獲取 kafka 最新 meta 信息時會匹配新的 partition。爲了保證數據的正確性,新發現的 partition 從最先的位置開始讀取。

9.jpg

  • commit offset 方式

Flink kafka consumer commit offset 方式須要區分是否開啓了 checkpoint。

若是 checkpoint 關閉,commit offset 要依賴於 kafka 客戶端的 auto commit。需設置 enable.auto.commit,auto.commit.interval.ms 參數到 consumer properties,就會按固定的時間間隔按期 auto commit offset 到 kafka。

若是開啓 checkpoint,這個時候做業消費的 offset 是 Flink 在 state 中本身管理和容錯。此時提交 offset 到 kafka,通常都是做爲外部進度的監控,想實時知道做業消費的位置和 lag 狀況。此時須要 setCommitOffsetsOnCheckpoints 爲 true 來設置當 checkpoint 成功時提交 offset 到 kafka。此時 commit offset 的間隔就取決於 checkpoint 的間隔,因此此時從 kafka 一側看到的 lag 可能並不是徹底實時,若是 checkpoint 間隔比較長 lag 曲線可能會是一個鋸齒狀。

10.jpg

  • Timestamp Extraction/Watermark 生成

咱們知道當 Flink 做業內使用 EventTime 屬性時,須要指定從消息中提取時戳和生成水位的函數。FlinkKakfaConsumer 構造的 source 後直接調用 assignTimestampsAndWatermarks 函數設置水位生成器的好處是此時是每一個 partition 一個 watermark assigner,以下圖。source 生成的時戳爲多個 partition 時戳對齊後的最小時戳。此時在一個 source 讀取多個 partition,而且 partition 之間數據時戳有必定差距的狀況下,由於在 source 端 watermark 在 partition 級別有對齊,不會致使數據讀取較慢 partition 數據丟失。

11.jpg

2.Flink kafka Producer

  • Producer 分區

使用 FlinkKafkaProducer 往 kafka 中寫數據時,若是不單獨設置 partition 策略,會默認使用 FlinkFixedPartitioner,該 partitioner 分區的方式是 task 所在的併發 id 對 topic 總 partition 數取餘:parallelInstanceId % partitions.length。

  • 此時若是 sink 爲 4,paritition 爲 1,則 4 個 task 往同一個 partition 中寫數據。但當 sink task < partition 個數時會有部分 partition 沒有數據寫入,例如 sink task 爲2,partition 總數爲 4,則後面兩個 partition 將沒有數據寫入。
  • 若是構建 FlinkKafkaProducer 時,partition 設置爲 null,此時會使用 kafka producer 默認分區方式,非 key 寫入的狀況下,使用 round-robin 的方式進行分區,每一個 task 都會輪循的寫下游的全部 partition。該方式下游的 partition 數據會比較均衡,可是缺點是 partition 個數過多的狀況下須要維持過多的網絡鏈接,即每一個 task 都會維持跟全部 partition 所在 broker 的鏈接。

12.jpg

  • 容錯

Flink kafka 0九、010 版本下,經過 setLogFailuresOnly 爲 false,setFlushOnCheckpoint 爲 true,能達到 at-least-once 語義。setLogFailuresOnly,默認爲 false,是控制寫 kafka 失敗時,是否只打印失敗的 log 不拋異常讓做業中止。setFlushOnCheckpoint,默認爲 true,是控制是否在 checkpoint 時 fluse 數據到 kafka,保證數據已經寫到 kafka。不然數據有可能還緩存在 kafka 客戶端的 buffer 中,並無真正寫出到 kafka,此時做業掛掉數據即丟失,不能作到至少一次的語義。

Flink kafka 011 版本下,經過兩階段提交的 sink 結合 kafka 事務的功能,能夠保證端到端精準一次。詳細原理能夠參考:

https://www.ververica.com/blo...

13.jpg

一些疑問與解答

Q:在 Flink consumer 的並行度的設置:是對應 topic 的 partitions 個數嗎?要是有多個主題數據源,並行度是設置成整體的 partitions 數嗎?

A:這個並非絕對的,跟 topic 的數據量也有關,若是數據量不大,也能夠設置小於 partitions 個數的併發數。但不要設置併發數大於 partitions 總數,由於這種狀況下某些併發由於分配不到 partition 致使沒有數據處理。

Q:若是 partitioner 傳 null 的時候是 round-robin 發到每個 partition?若是有 key 的時候行爲是 kafka 那種按照 key 分佈到具體分區的行爲嗎?

A:若是在構造 FlinkKafkaProducer 時,若是沒有設置單獨的 partitioner,則默認使用 FlinkFixedPartitioner,此時不管是帶 key 的數據,仍是不帶 key。若是主動設置 partitioner 爲 null 時,不帶 key 的數據會 round-robin 的方式寫出,帶 key 的數據會根據 key,相同 key 數據分區的相同的 partition,若是 key 爲 null,再輪詢寫。不帶 key 的數據會輪詢寫各 partition。

Q:若是 checkpoint 時間過長,offset 未提交到 kafka,此時節點宕機了,重啓以後的重複消費如何保證呢?

A:首先開啓 checkpoint 時 offset 是 Flink 經過狀態 state 管理和恢復的,並非從 kafka 的 offset 位置恢復。在 checkpoint 機制下,做業從最近一次 checkpoint 恢復,自己是會回放部分歷史數據,致使部分數據重複消費,Flink 引擎僅保證計算狀態的精準一次,要想作到端到端精準一次須要依賴一些冪等的存儲系統或者事務操做。


▼ Apache Flink 社區推薦 ▼

Apache Flink 及大數據領域頂級盛會 Flink Forward Asia 2019 重磅開啓,目前正在徵集議題,限量早鳥票優惠ing。瞭解 Flink Forward Asia 2019 的更多信息,請查看:

https://developer.aliyun.com/...

首屆 Apache Flink 極客挑戰賽重磅開啓,聚焦機器學習與性能優化兩大熱門領域,40萬獎金等你拿,加入挑戰請點擊:

https://tianchi.aliyun.com/ma...

相關文章
相關標籤/搜索