做者介紹:董亭亭,快手大數據架構實時計算引擎團隊負責人。目前負責 Flink 引擎在快手內的研發、應用以及周邊子系統建設。2013 年畢業於大連理工大學,曾就任於奇虎 360、58 集團。主要研究領域包括:分佈式計算、調度系統、分佈式存儲等系統。html
本文主要分享Flink connector相關內容,分爲如下三個部分的內容:第一部分會首先介紹一下Flink Connector有哪些。第二部分會重點介紹在生產環境中常用的kafka connector的基本的原理以及使用方法。第三部分答疑環節,看你們有沒有一些問題。java
Flink是新一代流批統一的計算引擎,它須要從不一樣的第三方存儲引擎中把數據讀過來,進行處理,而後再寫出到另外的存儲引擎中。Connector的做用就至關於一個鏈接器,鏈接 Flink 計算引擎跟外界存儲系統。Flink裏有如下幾種方式,固然也不限於這幾種方式能夠跟外界進行數據交換:第一種 Flink裏面預約義了一些source和sink。第二種 FLink內部也提供了一些Boundled connectors。第三種 可使用第三方apache Bahir項目中提供的鏈接器。第四種是經過異步IO方式。下面分別簡單介紹一下這四種數據讀寫的方式。mysql
Flink裏預約義了一部分source和sink。在這裏分了幾類。git
若是要從文本文件中讀取數據,能夠直接使用github
就能夠以文本的形式讀取該文件中的內容。固然也可使用正則表達式
根據指定的fileInputFormat格式讀取文件中的內容。redis
若是數據在FLink內進行了一系列的計算,想把結果寫出到文件裏,也能夠直接使用內部預約義的一些sink,好比將結果已文本或csv格式寫出到文件中,可使用DataStream的writeAsText(path)和 writeAsCsv(path)。sql
提供Socket的host name及port,能夠直接用StreamExecutionEnvironment預約的接口socketTextStream建立基於Socket的source,從該socket中以文本的形式讀取數據。固然若是想把結果寫出到另一個Socket,也能夠直接調用DataStream writeToSocket。apache
詳細也能夠參考Flink源碼中提供的一些相對應的Examples來查看異常預約義source和sink的使用方法,例如WordCount、SocketWindowWordCount。json
Flink裏已經提供了一些綁定的Connector,例如kafka source和sink,Es sink等。讀寫kafka、es、rabbitMQ時能夠直接使用相應connector的api便可。第二部分會詳細介紹生產環境中最經常使用的kafka connector。
雖然該部分是Flink 項目源代碼裏的一部分,可是真正意義上不算做flink引擎相關邏輯,而且該部分沒有打包在二進制的發佈包裏面。因此在提交Job時候須要注意,job代碼jar包中必定要將相應的connetor相關類打包進去,不然在提交做業時就會失敗,提示找不到相應的類,或初始化某些類異常。
Apache Bahir 最初是從 Apache Spark 中獨立出來項目提供,以提供不限於 Spark 相關的擴展/插件、鏈接器和其餘可插入組件的實現。經過提供多樣化的流鏈接器(streaming connectors)和 SQL 數據源擴展分析平臺的覆蓋面。若有須要寫到flume、redis的需求的話,可使用該項目提供的connector。
流計算中常常須要與外部存儲系統交互,好比須要關聯mysql中的某個表。通常來講,若是用同步I/O的方式,會形成系統中出現大的等待時間,影響吞吐和延遲。爲了解決這個問題,異步I/O能夠併發處理多個請求,提升吞吐,減小延遲。
Async的原理可參考官方文檔:
本章重點介紹生產環境中最經常使用到的Flink kafka connector。使用flink的同窗,必定會很熟悉kafka,它是一個分佈式的、分區的、多副本的、 支持高吞吐的、發佈訂閱消息系統。生產環境環境中也常常會跟kafka進行一些數據的交換,好比利用kafka consumer讀取數據,而後進行一系列的處理以後,再將結果寫出到kafka中。這裏會主要分兩個部分進行介紹,一是Flink kafka Consumer,一個是Flink kafka Producer。
首先看一個例子來串聯下Flink kafka connector。代碼邏輯裏主要是從kafka裏讀數據,而後作簡單的處理,再寫回到kafka中。
分別用紅色框 框出 如何構造一個Source sink Function. Flink提供了現成的構造FLinkKafkaConsumer、Producer的接口,能夠直接使用。這裏須要注意,由於kafka有多個版本,多個版本之間的接口協議會不一樣。Flink針對不一樣版本的kafka有相應的版本的Consumer和Producer。例如:針對0八、0九、十、11版本,Flink對應的consumer分別是FlinkKafkaConsumer0八、0九、0十、011,producer也是。
由於kafka中數據都是以二進制byte形式存儲的。讀到flink系統中以後,須要將二進制數據轉化爲具體的java、scala對象。具體須要實現一個schema類,定義如何序列化和反序列數據。反序列化時須要實現DeserializationSchema接口,並重寫deserialize(byte[] message)函數,若是是反序列化kafka中kv的數據時,須要實現KeyedDeserializationSchema接口,並重寫deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)函數。
另外Flink中也提供了一些經常使用的序列化反序列化的schema類。例如,SimpleStringSchema,按字符串方式進行序列化、反序列化。TypeInformationSerializationSchema,它可根據Flink的TypeInformation信息來推斷出須要選擇的schema。JsonDeserializationSchema 使用jackson反序列化json格式消息,並返回ObjectNode,可使用.get(「property」)方法來訪問相應字段。
如何設置做業從kafka消費數據最開始的起始位置,這一部分flink也提供了很是好的封裝。在構造好的FlinkKafkaConsumer類後面調用以下相應函數,設置合適的其實位置。
setStartFromGroupOffsets,也是默認的策略,從group offset位置讀取數據,group offset指的是kafka broker端記錄的某個group的最後一次的消費位置。可是kafka broker端沒有該group信息,會根據kafka的參數"auto.offset.reset"的設置來決定從哪一個位置開始消費。
setStartFromEarliest,從kafka最先的位置開始讀取。
setStartFromLatest,從kafka最新的位置開始讀取。
setStartFromTimestamp(long),從時間戳大於或等於指定時間戳的位置開始讀取。Kafka時戳,是指kafka爲每條消息增長另外一個時戳。該時戳能夠表示消息在proudcer端生成時的時間、或進入到kafka broker時的時間。
setStartFromSpecificOffsets,從指定分區的offset位置開始讀取,如指定的offsets中不存某個分區,該分區從group offset位置開始讀取。此時須要用戶給定一個具體的分區、offset的集合。
一些具體的使用方法能夠參考下圖。須要注意的是,由於flink框架有容錯機制,若是做業故障,若是做業開啓checkpoint,會從上一次checkpoint狀態開始恢復。或者在中止做業的時候主動作savepoint,啓動做業時從savepoint開始恢復。這兩種狀況下恢復做業時,做業消費起始位置是從以前保存的狀態中恢復,與上面提到跟kafka這些單獨的配置無關。
實際的生產環境中可能有這樣一些需求,好比場景一,有一個flink做業須要將五份數據聚合到一塊兒,五份數據對應五個kafka topic,隨着業務增加,新增一類數據,同時新增了一個kafka topic,如何在不重啓做業的狀況下做業自動感知新的topic。場景二,做業從一個固定的kafka topic讀數據,開始該topic有10個partition,但隨着業務的增加數據量變大,須要對kafka partition個數進行擴容,由10個擴容到20。該狀況下如何在不重啓做業狀況下動態感知新擴容的partition?
針對上面的兩種場景,首先須要在構建FlinkKafkaConsumer時的properties中設置flink.partition-discovery.interval-millis參數爲非負值,表示開啓動態發現的開關,以及設置的時間間隔。此時FLinkKafkaConsumer內部會啓動一個單獨的線程按期去kafka獲取最新的meta信息。針對場景一,還需在構建FlinkKafkaConsumer時,topic的描述能夠傳一個正則表達式描述的pattern。每次獲取最新kafka meta時獲取正則匹配的最新topic列表。針對場景二,設置前面的動態發現參數,在按期獲取kafka最新meta信息時會匹配新的partition。爲了保證數據的正確性,新發現的partition從最先的位置開始讀取。
Flink kafka consumer commit offset方式須要區分是否開啓了checkpoint。
若是checkpoint關閉,commit offset要依賴於kafka客戶端的auto commit。需設置enable.auto.commit, auto.commit.interval.ms 參數到consumer properties,就會按固定的時間間隔按期auto commit offset到kafka。
若是開啓checkpoint,這個時候做業消費的offset是Flink在state中本身管理和容錯。此時提交offset到kafka,通常都是做爲外部進度的監控,想實時知道做業消費的位置和lag狀況。此時須要setCommitOffsetsOnCheckpoints爲true來設置當checkpoint成功時提交offset到kafka。此時commit offset的間隔就取決於checkpoint的間隔,因此此時從kafka一側看到的lag可能並不是徹底實時,若是checkpoint間隔比較長lag曲線可能會是一個鋸齒狀。
咱們知道當flink做業內使用EventTime屬性時,須要指定從消息中提取時戳和生成水位的函數。FlinkKakfaConsumer構造的source後直接調用assignTimestampsAndWatermarks函數設置水位生成器的好處是此時是每一個partition一個watermark assigner,以下圖。source生成的睡戳爲多個partition時戳對齊後的最小時戳。此時在一個source讀取多個partition,而且partition之間數據時戳有必定差距的狀況下,由於在source端watermark在partition級別有對齊,不會致使數據讀取較慢partition數據丟失。
使用FlinkKafkaProducer往kafka中寫數據時,若是不單獨設置partition策略,會默認使用FlinkFixedPartitioner,該partitioner分區的方式是task所在的併發id對topic 總partition數取餘:parallelInstanceId % partitions.length。此時若是sink爲4,paritition爲1,則4個task往同一個partition中寫數據。但當sink task< partition 個數時會有部分partition沒有數據寫入,例如sink task爲2,partition總數爲4,則後面兩個partition將沒有數據寫入。若是構建FlinkKafkaProducer時,partition設置爲null,此時會使用kafka producer默認分區方式,非key寫入的狀況下,使用round-robin的方式進行分區,每一個task都會輪訓的寫下游的全部partition。該方式下游的partition數據會比較均衡,可是缺點是partition個數過多的狀況下維持過多的網絡連接,即每一個task都會維持跟全部partition所在broker的連接。
Flink kafka 0九、010版本下,經過setLogFailuresOnly爲false,setFlushOnCheckpoint爲true,能達到at-least-once語義。setLogFailuresOnly,默認爲false,是控制寫kafka失敗時,是否只打印失敗的log不拋異常讓做業中止。setFlushOnCheckpoint,默認爲true,是控制是否在checkpoint時fluse數據到kafka,保證數據已經寫到kafka。不然數據有可能還緩存在kafka 客戶端的buffer中,並無真正寫出到kafka,此時做業掛掉數據即丟失,不能作到至少一次的語義。
Flink kafka 011版本下,經過兩階段提交的sink結合kafka事務的功能,能夠保證端到端精準一次。詳細原理能夠參考:https://www.ververica.com/blog/end-to-end-exactly-once-processing-apache-flink-apache-kafka。
(1)在flink consumer的並行度的設置:是對應topic的partitions個數嗎?要是有多個主題數據源,並行度是設置成整體的partitions數嗎?答:這個並非絕對的,跟topic的數據量也有關,若是數據量不大,也能夠設置小於partitions個數的併發數。但不要設置併發數大於partitions總數,由於這種狀況下某些併發由於分配不到partition致使沒有數據處理。
(2)若是 partitioner 傳 null 的時候是 round-robin 發到每個partition?若是有 key 的時候行爲是 kafka 那種按照 key 分佈到具體分區的行爲嗎?答:若是在構造FlinkKafkaProducer時,若是沒有設置單獨的partitioner,則默認使用FlinkFixedPartitioner,此時不管是帶key的數據,仍是不帶key。若是主動設置partitioner爲null時,不帶key的數據會round-robin的方式寫出,帶key的數據會根據key,相同key數據分區的相同的partition,若是key爲null,再輪詢寫。不帶key的數據會輪詢寫各partition。
(3)若是checkpoint時間過長,offset未提交到kafka,此時節點宕機了,重啓以後的重複消費如何保證呢?首先開啓checkpoint時offset是flink經過狀態state管理和恢復的,並非從kafka的offset位置恢復。在checkpoint機制下,做業從最近一次checkpoint恢復,自己是會回放部分歷史數據,致使部分數據重複消費,Flink引擎僅保證計算狀態的精準一次,要想作到端到端精準一次須要依賴一些冪等的存儲系統或者事務操做。
微信公衆號:zhisheng
另外我本身整理了些 Flink 的學習資料,目前已經所有放到微信公衆號(zhisheng)了,你能夠回覆關鍵字:Flink 便可無條件獲取到。另外也能夠加我微信 你能夠加個人微信:yuanblog_tzs,探討技術!
更多私密資料請加入知識星球!
之後這個項目的全部代碼都將放在這個倉庫裏,包含了本身學習 flink 的一些 demo 和博客
一、Flink 從0到1學習 —— Apache Flink 介紹
二、Flink 從0到1學習 —— Mac 上搭建 Flink 1.6.0 環境並構建運行簡單程序入門
三、Flink 從0到1學習 —— Flink 配置文件詳解
四、Flink 從0到1學習 —— Data Source 介紹
五、Flink 從0到1學習 —— 如何自定義 Data Source ?
六、Flink 從0到1學習 —— Data Sink 介紹
七、Flink 從0到1學習 —— 如何自定義 Data Sink ?
八、Flink 從0到1學習 —— Flink Data transformation(轉換)
九、Flink 從0到1學習 —— 介紹 Flink 中的 Stream Windows
十、Flink 從0到1學習 —— Flink 中的幾種 Time 詳解
十一、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 ElasticSearch
十二、Flink 從0到1學習 —— Flink 項目如何運行?
1三、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 Kafka
1四、Flink 從0到1學習 —— Flink JobManager 高可用性配置
1五、Flink 從0到1學習 —— Flink parallelism 和 Slot 介紹
1六、Flink 從0到1學習 —— Flink 讀取 Kafka 數據批量寫入到 MySQL
1七、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 RabbitMQ
1八、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 HBase
1九、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 HDFS
20、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 Redis
2一、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 Cassandra
2二、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 Flume
2三、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 InfluxDB
2四、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 RocketMQ
2五、Flink 從0到1學習 —— 你上傳的 jar 包藏到哪裏去了
2六、Flink 從0到1學習 —— 你的 Flink job 日誌跑到哪裏去了
2八、Flink 從0到1學習 —— Flink 中如何管理配置?
2九、Flink 從0到1學習—— Flink 不能夠連續 Split(分流)?
30、Flink 從0到1學習—— 分享四本 Flink 國外的書和二十多篇 Paper 論文
3二、爲何說流處理即將來?
3三、OPPO 數據中臺之基石:基於 Flink SQL 構建實時數據倉庫
3六、Apache Flink 結合 Kafka 構建端到端的 Exactly-Once 處理
3八、如何基於Flink+TensorFlow打造實時智能異常檢測平臺?只看這一篇就夠了
40、Flink 全網最全資源(視頻、博客、PPT、入門、實戰、源碼解析、問答等持續更新)
4二、Flink 從0到1學習 —— 如何使用 Side Output 來分流?
四、Flink 源碼解析 —— standalone session 模式啓動流程
五、Flink 源碼解析 —— Standalone Session Cluster 啓動流程深度分析之 Job Manager 啓動
六、Flink 源碼解析 —— Standalone Session Cluster 啓動流程深度分析之 Task Manager 啓動
七、Flink 源碼解析 —— 分析 Batch WordCount 程序的執行過程
八、Flink 源碼解析 —— 分析 Streaming WordCount 程序的執行過程
九、Flink 源碼解析 —— 如何獲取 JobGraph?
十、Flink 源碼解析 —— 如何獲取 StreamGraph?
十一、Flink 源碼解析 —— Flink JobManager 有什麼做用?
十二、Flink 源碼解析 —— Flink TaskManager 有什麼做用?
1三、Flink 源碼解析 —— JobManager 處理 SubmitJob 的過程
1四、Flink 源碼解析 —— TaskManager 處理 SubmitJob 的過程
1五、Flink 源碼解析 —— 深度解析 Flink Checkpoint 機制
1六、Flink 源碼解析 —— 深度解析 Flink 序列化機制
1七、Flink 源碼解析 —— 深度解析 Flink 是如何管理好內存的?
1八、Flink Metrics 源碼解析 —— Flink-metrics-core
1九、Flink Metrics 源碼解析 —— Flink-metrics-datadog
20、Flink Metrics 源碼解析 —— Flink-metrics-dropwizard
2一、Flink Metrics 源碼解析 —— Flink-metrics-graphite
2二、Flink Metrics 源碼解析 —— Flink-metrics-influxdb
2三、Flink Metrics 源碼解析 —— Flink-metrics-jmx
2四、Flink Metrics 源碼解析 —— Flink-metrics-slf4j
2五、Flink Metrics 源碼解析 —— Flink-metrics-statsd
2六、Flink Metrics 源碼解析 —— Flink-metrics-prometheus
2七、Flink 源碼解析 —— 如何獲取 ExecutionGraph ?
30、Flink Clients 源碼解析原文出處:zhisheng的博客,歡迎關注個人公衆號:zhisheng