Flink實戰(八) - Streaming Connectors 編程

1 概覽

1.1 預約義的源和接收器

Flink內置了一些基本數據源和接收器,而且始終可用。該預約義的數據源包括文件,目錄和插socket,並從集合和迭代器攝取數據。該預約義的數據接收器支持寫入文件和標準輸入輸出及socket。html

1.2 綁定鏈接器

鏈接器提供用於與各類第三方系統鏈接的代碼。目前支持這些系統:數據庫

  • Apache Kafka (source/sink)
  • Apache Cassandra (sink)
  • Amazon Kinesis Streams (source/sink)
  • Elasticsearch (sink)
  • Hadoop FileSystem (sink)
  • RabbitMQ (source/sink)
  • Apache NiFi (source/sink)
  • Twitter Streaming API (source)
  • Google PubSub (source/sink)

要在應用程序中使用其中一個鏈接器,一般須要其餘第三方組件,例如數據存儲或消息隊列的服務器。apache

雖然本節中列出的流鏈接器是Flink項目的一部分,而且包含在源版本中,但它們不包含在二進制分發版中。

1.3 Apache Bahir中的鏈接器

Flink的其餘流處理鏈接器正在經過Apache Bahir發佈,包括:bootstrap

  • Apache ActiveMQ (source/sink)
  • Apache Flume (sink)
  • Redis (sink)
  • Akka (sink)
  • Netty (source)

1.4 其餘鏈接到Flink的方法

1.4.1 經過異步I / O進行數據渲染

使用鏈接器不是將數據輸入和輸出Flink的惟一方法。一種常見的模式是在一個Map或多個FlatMap 中查詢外部數據庫或Web服務以渲染主數據流。segmentfault

Flink提供了一個用於異步I / O的API, 以便更有效,更穩健地進行這種渲染。windows

1.4.2 可查詢狀態

當Flink應用程序將大量數據推送到外部數據存儲時,這可能會成爲I / O瓶頸。若是所涉及的數據具備比寫入更少的讀取,則更好的方法能夠是外部應用程序從Flink獲取所需的數據。在可查詢的狀態界面,容許經過Flink被管理的狀態,按須要查詢支持這個。後端

2 HDFS鏈接器

此鏈接器提供一個Sink,可將分區文件寫入任一Hadoop文件系統支持的文件系統 。安全

  • 要使用此鏈接器,請將如下依賴項添加到項目中:


請注意,流鏈接器當前不是二進制發佈的一部分

2.1 Bucketing File Sink

能夠配置分段行爲以及寫入,但咱們稍後會介紹。這是能夠建立一個默認狀況下彙總到按時間拆分的滾動文件的存儲槽的方法服務器

  • Java

  • Scala

惟一必需的參數是存儲桶的基本路徑。能夠經過指定自定義bucketer,寫入器和批量大小來進一步配置接收器。架構

默認狀況下,當數據元到達時,分段接收器將按當前系統時間拆分,並使用日期時間模式"yyyy-MM-dd--HH"命名存儲區。這種模式傳遞給 DateTimeFormatter使用當前系統時間和JVM的默認時區來造成存儲桶路徑。用戶還能夠爲bucketer指定時區以格式化存儲桶路徑。每當遇到新日期時,都會建立一個新存儲桶。

例如,若是有一個包含分鐘做爲最精細粒度的模式,將每分鐘得到一個新桶。每一個存儲桶自己都是一個包含多個部分文件的目錄:接收器的每一個並行實例將建立本身的部件文件,當部件文件變得太大時,接收器也會在其餘文件旁邊建立新的部件文件。當存儲桶變爲非活動狀態時,將刷新並關閉打開的部件文件。若是存儲桶最近未寫入,則視爲非活動狀態。默認狀況下,接收器每分鐘檢查一次非活動存儲桶,並關閉任何超過一分鐘未寫入的存儲桶。setInactiveBucketCheckInterval()並 setInactiveBucketThreshold()在一個BucketingSink。

也能夠經過指定自定義bucketer setBucketer()上BucketingSink。若是須要,bucketer可使用數據元或元組的屬性來肯定bucket目錄。

默認編寫器是StringWriter。這將調用toString()傳入的數據元並將它們寫入部分文件,由換行符分隔。在a setWriter() 上指定自定義編寫器使用BucketingSink。若是要編寫Hadoop SequenceFiles,可使用提供的 SequenceFileWriter,也能夠配置爲使用壓縮。

有兩個配置選項指定什麼時候應關閉零件文件並啓動新零件文件:

  • 經過設置批量大小(默認部件文件大小爲384 MB)
  • 經過設置批次滾動時間間隔(默認滾動間隔爲Long.MAX_VALUE

當知足這兩個條件中的任何一個時,將啓動新的部分文件。看以下例子:

  • Java

  • Scala

  • 這將建立一個接收器,該接收器將寫入遵循此模式的存儲桶文件:

  • Java

  • 生成結果

  • date-time是咱們從日期/時間格式獲取的字符串
  • parallel-task是並行接收器實例的索引
  • count是因爲批處理大小或批處理翻轉間隔而建立的部分文件的運行數

然而這種方式建立了太多小文件,不適合HDFS!僅供娛樂!

3 Apache Kafka鏈接器

3.1 簡介

此鏈接器提供對Apache Kafka服務的事件流的訪問。

Flink提供特殊的Kafka鏈接器,用於從/向Kafka主題讀取和寫入數據。Flink Kafka Consumer集成了Flink的檢查點機制,可提供一次性處理語義。爲實現這一目標,Flink並不徹底依賴Kafka的消費者羣體偏移跟蹤,而是在內部跟蹤和檢查這些偏移。

爲用例和環境選擇一個包(maven artifact id)和類名。對於大多數用戶來講,FlinkKafkaConsumer08(部分flink-connector-kafka)是合適的。

而後,導入maven項目中的鏈接器:

環境配置參考

3.2 ZooKeeper安裝及配置

  • 下載zk

http://archive.cloudera.com/cdh5/cdh/5/zookeeper-3.4.5-cdh5.15.1.tar.gz

  • 配置系統環境

  • 修改配置數據存儲路徑

  • 啓動3.3 Kafka部署及測試假設你剛剛開始而且沒有現有的Kafka或ZooKeeper數據
因爲Kafka控制檯腳本對於基於Unix和Windows的平臺不一樣,所以在Windows平臺上使用bin windows 而不是bin /,並將腳本擴展名更改成.bat。

Step 1:下載代碼

  • 解壓

  • 配置環境變量



  • 配置服務器屬性
  • 修改日誌存儲路徑
  • 修改主機名

Step 2: 啓動服務器

Kafka使用ZooKeeper,所以若是尚未ZooKeeper服務器,則須要先啓動它。

  • 後臺模式啓動

Step 3: 建立一個主題

  • 建立topic

Step 4: 發送一些消息

Kafka附帶一個命令行客戶端,它將從文件或標準輸入中獲取輸入,並將其做爲消息發送到Kafka集羣。 默認狀況下,每行將做爲單獨的消息發送。

運行生產者,而後在控制檯中鍵入一些消息以發送到服務器。

  • 啓動生產者

Step 5: 啓動一個消費者

Kafka還有一個命令行使用者,它會將消息轉儲到標準輸出。

  • 分屏,新建消費端

  • 在不一樣的終端中運行上述每一個命令,那麼如今應該可以在生產者終端中鍵入消息並看到它們出如今消費者終端中

全部命令行工具都有其餘選項; 運行不帶參數的命令將顯示更詳細地記錄它們的使用信息。

3.4 Kafka 1.0.0+ Connector

從Flink 1.7開始,有一個新的通用Kafka鏈接器,它不跟蹤特定的Kafka主要版本。 相反,它在Flink發佈時跟蹤最新版本的Kafka。

若是您的Kafka代理版本是1.0.0或更高版本,則應使用此Kafka鏈接器。 若是使用舊版本的Kafka(0.11,0.10,0.9或0.8),則應使用與代理版本對應的鏈接器。

兼容性

經過Kafka客戶端API和代理的兼容性保證,通用Kafka鏈接器與較舊和較新的Kafka代理兼容。 它與版本0.11.0或更高版本兼容,具體取決於所使用的功能。

將Kafka Connector從0.11遷移到通用(V1.10新增)

要執行遷移,請參閱升級做業和Flink版本指南

  • 在整個過程當中使用Flink 1.9或更新版本。
  • 不要同時升級Flink和操做符。
  • 確保您做業中使用的Kafka Consumer和/或Kafka Producer分配了惟一標識符(uid):
  • 使用stop with savepoint功能獲取保存點(例如,使用stop --withSavepoint)CLI命令。

用法

  • 要使用通用Kafka鏈接器,請爲其添加依賴關係:


而後實例化新源(FlinkKafkaConsumer)

Flink Kafka Consumer是一個流數據源,能夠從Apache Kafka中提取並行數據流。 使用者能夠在多個並行實例中運行,每一個實例都將從一個或多個Kafka分區中提取數據。

Flink Kafka Consumer參與了檢查點,並保證在故障期間沒有數據丟失,而且計算處理元素「剛好一次」。(注意:這些保證天然會假設Kafka自己不會丟失任何數據。)

請注意,Flink在內部將偏移量做爲其分佈式檢查點的一部分進行快照。 承諾給Kafka的抵消只是爲了使外部的進展觀與Flink對進展的見解同步。 這樣,監控和其餘工做能夠了解Flink Kafka消費者在多大程度上消耗了一個主題。

和接收器(FlinkKafkaProducer)。

除了從模塊和類名中刪除特定的Kafka版本以外,API向後兼容Kafka 0.11鏈接器。

3.5 Kafka消費者

Flink的Kafka消費者被稱爲FlinkKafkaConsumer08(或09Kafka 0.9.0.x等)。它提供對一個或多個Kafka主題的訪問。

構造函數接受如下參數:

  • 主題名稱/主題名稱列表
  • DeserializationSchema / KeyedDeserializationSchema用於反序列化來自Kafka的數據
  • Kafka消費者的屬性。須要如下屬性:

    • 「bootstrap.servers」(以逗號分隔的Kafka經紀人名單)
    • 「zookeeper.connect」(逗號分隔的Zookeeper服務器列表)(僅Kafka 0.8須要)
    • 「group.id」消費者羣組的ID


  • 上述程序注意配置ip主機映射
  • 虛擬機hosts

  • 本地機器 hosts
  • 發送消息

  • 運行程序消費消息


Example:

  • Java

  • Scala

The DeserializationSchema

Flink Kafka Consumer須要知道如何將Kafka中的二進制數據轉換爲Java / Scala對象。

在 DeserializationSchema容許用戶指定這樣的一個架構。T deserialize(byte[] message) 爲每一個Kafka消息調用該方法,從Kafka傳遞值。

從它開始一般頗有幫助AbstractDeserializationSchema,它負責將生成的Java / Scala類型描述爲Flink的類型系統。實現vanilla的用戶DeserializationSchema須要本身實現該getProducedType(...)方法。

爲了訪問Kafka消息的鍵和值,KeyedDeserializationSchema具備如下deserialize方法T deserialize(byte [] messageKey,byte [] message,String topic,int partition,long offset)

爲方便起見,Flink提供如下模式:

  • TypeInformationSerializationSchema(和TypeInformationKeyValueSerializationSchema)建立基於Flink的模式TypeInformation。若是Flink編寫和讀取數據,這將很是有用。此模式是其餘通用序列化方法的高性能Flink替代方案。
  • JsonDeserializationSchema(和JSONKeyValueDeserializationSchema)將序列化的JSON轉換爲ObjectNode對象,可使用objectNode.get(「field」)做爲(Int / String / ...)()從中訪問字段。KeyValue objectNode包含一個「key」和「value」字段,其中包含全部字段,以及一個可選的「元數據」字段,用於公開此消息的偏移量/分區/主題。
  • AvroDeserializationSchema它使用靜態提供的模式讀取使用Avro格式序列化的數據。它能夠從Avro生成的類(AvroDeserializationSchema.forSpecific(...))中推斷出模式,也能夠GenericRecords 使用手動提供的模式(with AvroDeserializationSchema.forGeneric(...))。此反序列化架構要求序列化記錄不包含嵌入式架構。

    • 還有一個可用的模式版本,能夠在Confluent Schema Registry中查找編寫器的模式(用於編寫記錄的 模式)。使用這些反序列化模式記錄將使用從模式註冊表中檢索的模式進行讀取,並轉換爲靜態提供的模式(經過 ConfluentRegistryAvroDeserializationSchema.forGeneric(...)或ConfluentRegistryAvroDeserializationSchema.forSpecific(...))。

要使用此反序列化模式,必須添加如下附加依賴項:

當遇到因任何緣由沒法反序列化的損壞消息時,有兩個選項 - 從deserialize(...)方法中拋出異常將致使做業失敗並從新啓動,或者返回null以容許Flink Kafka使用者以靜默方式跳過損壞的消息。請注意,因爲使用者的容錯能力(請參閱下面的部分以獲取更多詳細信息),所以對損壞的消息執行失敗將使消費者嘗試再次反序列化消息。所以,若是反序列化仍然失敗,則消費者將在該損壞的消息上進入不間斷重啓和失敗循環。

3.6 Kafka生產者

Flink的Kafka Producer被稱爲FlinkKafkaProducer011(或010 對於Kafka 0.10.0.x版本。或者直接就是FlinkKafkaProducer,對於Kafka>=1.0.0的版原本說)。

它容許將記錄流寫入一個或多個Kafka主題。

自應用

  • Pro
  • 確保啓動端口

  • Pro端生產消息

  • 消費端接收

Example

  • Java

  • Scala

上面的示例演示了建立Flink Kafka Producer以將流寫入單個Kafka目標主題的基本用法。對於更高級的用法,還有其餘構造函數變體容許提供如下內容:

  • 提供自定義屬性

生產者容許爲內部的KafkaProducer提供自定義屬性配置。

  • 自定義分區程序

將記錄分配給特定分區,能夠爲FlinkKafkaPartitioner構造函數提供實現。將爲流中的每一個記錄調用此分區程序,以肯定應將記錄發送到的目標主題的確切分區。

  • 高級序列化模式

與消費者相似,生產者還容許使用調用的高級序列化模式KeyedSerializationSchema,該模式容許單獨序列化鍵和值。它還容許覆蓋目標主題,以便一個生產者實例能夠將數據發送到多個主題。

3.8 Kafka消費者開始位置配置

Flink Kafka Consumer容許配置如何肯定Kafka分區的起始位置。

  • Java

  • Scala

Flink Kafka Consumer的全部版本都具備上述明確的起始位置配置方法。

  • setStartFromGroupOffsets(默認行爲)

從group.idKafka代理(或Zookeeper for Kafka 0.8)中的消費者組(在消費者屬性中設置)提交的偏移量開始讀取分區。若是找不到分區的偏移量,auto.offset.reset將使用屬性中的設置。

  • setStartFromEarliest()/ setStartFromLatest()

從最先/最新記錄開始。在這些模式下,Kafka中的承諾偏移將被忽略,不會用做起始位置。

  • setStartFromTimestamp(long)

從指定的時間戳開始。對於每一個分區,時間戳大於或等於指定時間戳的記錄將用做起始位置。若是分區的最新記錄早於時間戳,則只會從最新記錄中讀取分區。在此模式下,Kafka中的已提交偏移將被忽略,不會用做起始位置。

還能夠指定消費者應從每一個分區開始的確切偏移量:

  • Java

  • Scala

上面的示例將使用者配置爲從主題的分區0,1和2的指定偏移量開始myTopic。偏移值應該是消費者應爲每一個分區讀取的下一條記錄。請注意,若是使用者須要讀取在提供的偏移量映射中沒有指定偏移量的分區,則它將回退到setStartFromGroupOffsets()該特定分區的默認組偏移行爲(即)。

請注意,看成業從故障中自動恢復或使用保存點手動恢復時,這些起始位置配置方法不會影響起始位置。在恢復時,每一個Kafka分區的起始位置由存儲在保存點或檢查點中的偏移量肯定。

3.9 Kafka生產者和容錯

Kafka 0.8

在0.9以前,Kafka沒有提供任何機制來保證至少一次或剛好一次的語義。

Kafka 0.9和0.10

啓用Flink的檢查點時,FlinkKafkaProducer09和FlinkKafkaProducer010 能提供至少一次傳輸保證。

除了開啓Flink的檢查點,還應該配置setter方法:

  • setLogFailuresOnly(boolean)

默認爲false。啓用此選項將使生產者僅記錄失敗日誌而不是捕獲和從新拋出它們。這大致上就是計數已成功的記錄,即便它從未寫入目標Kafka主題。這必須設爲false對於確保 至少一次

  • setFlushOnCheckpoint(boolean)

默認爲true。啓用此函數後,Flink的檢查點將在檢查點成功以前等待檢查點時的任何動態記錄被Kafka確認。這可確保檢查點以前的全部記錄都已寫入Kafka。必須開啓,對於確保 至少一次

總之,默認狀況下,Kafka生成器對版本0.9和0.10具備至少一次保證,即

setLogFailureOnly設置爲false和setFlushOnCheckpoint設置爲true。

默認狀況下,重試次數設置爲「0」。這意味着當setLogFailuresOnly設置爲時false,生產者會當即失敗,包括Leader更改。
默認狀況下,該值設置爲「0」,以免重試致使目標主題中出現重複消息。對於常常更改代理的大多數生產環境,建議將重試次數設置爲更高的值。
Kafka目前沒有生產者事務,所以Flink在Kafka主題裏沒法保證剛好一次交付

Kafka >= 0.11

啓用Flink的檢查點後,FlinkKafkaProducer011

對於Kafka >= 1.0.0版本是FlinkKafkaProduce

能夠提供準確的一次交付保證。

除了啓用Flink的檢查點,還能夠經過將適當的語義參數傳遞給FlinkKafkaProducer011,選擇三種不一樣的算子操做模式

  • Semantic.NONE

Flink啥都不保證。生成的記錄可能會丟失,也可能會重複。

  • Semantic.AT_LEAST_ONCE(默認設置)


相似於setFlushOnCheckpoint(true)在 FlinkKafkaProducer010。這能夠保證不會丟失任何記錄(儘管它們能夠重複)。

  • Semantic.EXACTLY_ONCE

使用Kafka事務提供剛好一次的語義。每當您使用事務寫入Kafka時,不要忘記爲任何從Kafka消費記錄的應用程序設置所需的isolation.level(read_committed 或read_uncommitted- 後者爲默認值)。

注意事項

Semantic.EXACTLY_ONCE 模式依賴於在從所述檢查點恢復以後提交在獲取檢查點以前啓動的事務的能力。若是Flink應用程序崩潰和完成重啓之間的時間較長,那麼Kafka的事務超時將致使數據丟失(Kafka將自動停止超過超時時間的事務)。考慮到這一點,請根據預期的停機時間適當配置事務超時。

Kafka broker默認 transaction.max.timeout.ms 設置爲15分鐘。此屬性不容許爲生產者設置大於其值的事務超時。

FlinkKafkaProducer011默認狀況下,將_transaction.timeout.msproducer config_中的屬性設置爲1小時,所以_transaction.max.timeout.ms_在使用 Semantic.EXACTLY_ONCE 模式以前應該增長 該屬性。

在_read_committed_模式中KafkaConsumer,任何未完成的事務(既不停止也不完成)將阻止來自給定Kafka主題的全部讀取超過任何未完成的事務。換言之,遵循如下事件順序:

  1. 用戶事務1開啓並寫記錄
  2. 用戶事務2開啓並寫了一些其餘記錄
  3. 用戶提交事務2

即便事務2已經提交了記錄,在事務1提交或停止以前,消費者也不會看到它們。這有兩個含義:

  • 首先,在Flink應用程序的正常工做期間,用戶能夠預期Kafka主題中生成的記錄的可見性會延遲,等於已完成檢查點之間的平均時間。
  • 其次,在Flink應用程序失敗的狀況下,讀者將阻止此應用程序編寫的主題,直到應用程序從新啓動或配置的事務超時時間過去爲止。此註釋僅適用於有多個代理/應用程序寫入同一Kafka主題的狀況。
Semantic.EXACTLY_ONCE 模式爲每一個FlinkKafkaProducer011實例使用固定大小的KafkaProducers池。每一個檢查點使用其中一個生產者。若是併發檢查點的數量超過池大小,FlinkKafkaProducer011 將引起異常並將使整個應用程序失敗。請相應地配置最大池大小和最大併發檢查點數。
Semantic.EXACTLY_ONCE 採起全部可能的措施,不要留下任何阻礙消費者閱讀Kafka主題的延遲事務,這是必要的。可是,若是Flink應用程序在第一個檢查點以前失敗,則在從新啓動此類應用程序後,系統中沒有關於先前池大小的信息。所以,在第一個檢查點完成以前按比例縮小Flink應用程序是不安全的 _ FlinkKafkaProducer011.SAFE_SCALE_DOWN_FACTOR_。

3.10 Kafka消費者及其容錯

啓用Flink的檢查點後,Flink Kafka Consumer將使用主題中的記錄,並以一致的方式按期檢查其全部Kafka偏移以及其餘 算子操做的狀態。若是做業失敗,Flink會將流式程序恢復到最新檢查點的狀態,並從存儲在檢查點中的偏移量開始從新使用來自Kafka的記錄。

所以,繪製檢查點的間隔定義了程序在發生故障時最多能夠返回多少。

檢查點經常使用參數

enableCheckpointing

啓用流式傳輸做業的檢查點。 將按期快照流式數據流的分佈式狀態。 若是發生故障,流數據流將從最新完成的檢查點從新啓動。

該做業在給定的時間間隔內按期繪製檢查點。 狀態將存儲在配置的狀態後端。

此刻未正確支持檢查點迭代流數據流。 若是「force」參數設置爲true,則系統仍將執行做業。

setCheckpointingMode

setCheckpointTimeout

setMaxConcurrentCheckpoints

要使用容錯的Kafka使用者,須要在運行環境中啓用拓撲的檢查點:

  • Scala

  • Java

另請注意,若是有足夠的處理插槽可用於從新啓動拓撲,則Flink只能從新啓動拓撲。所以,若是拓撲因爲丟失了TaskManager而失敗,那麼以後仍然必須有足夠的可用插槽。YARN上的Flink支持自動重啓丟失的YARN容器。

若是未啓用檢查點,Kafka使用者將按期向Zookeeper提交偏移量。

參考

Streaming Connectors

Kafka官方文檔

相關文章
相關標籤/搜索