Apache Beam實戰指南 | 手把手教你玩轉KafkaIO與Flink

https://mp.weixin.qq.com/s?__biz=MzU1NDA4NjU2MA==&mid=2247492538&idx=2&sn=9a2bd9fe2d7fd681c10ebd368ef81c9c&chksm=fbea5a75cc9dd3636c148ebe6e296621d0c07132938a62f0b3643f34af414b3fd85e616e754b&scene=0&key=f9325dcb38245ddcc4d3ff16d58d0602dc8b5a680b011fb377597865a73a9fe39fb1a37a2ac92b1374a6e89170f0e7d366ebdfd286651f10ac319eb028a94291f051cce5ac3c287c1ab69751b6dc19cd&ascene=1&uin=MjgwMTEwNDQxNg%3D%3D&devicetype=Windows-QQBrowser&version=6103000b&lang=zh_CN&pass_ticket=DXC1954%2BK1SGTbNf0BfROhv9qHwlnnEPi%2BhWkN5VYUoPmHizjz4O33VIful%2FVDWv


更多優質內容請關注微信公衆號「AI 前線」(ID:ai-front)

關於 Apache Beam 實戰指南系列文章前端

隨着大數據 2.0 時代悄然到來,大數據從簡單的批處理擴展到了實時處理、流處理、交互式查詢和機器學習應用。近年來涌現出諸多大數據應用組件,如 HBase、Hive、Kafka、Spark、Flink 等。開發者常常要用到不一樣的技術、框架、API、開發語言和 SDK 來應對複雜應用的開發,這大大增長了選擇合適工具和框架的難度,開發者想要將全部的大數據組件熟練運用幾乎是一項不可能完成的任務。java

面對這種狀況,Google 在 2016 年 2 月宣佈將大數據流水線產品(Google DataFlow)貢獻給 Apache 基金會孵化,2017 年 1 月 Apache 對外宣佈開源 Apache Beam,2017 年 5 月迎來了它的第一個穩定版本 2.0.0。在國內,大部分開發者對於 Beam 還缺少了解,社區中文資料也比較少。InfoQ 指望經過 Apache Beam 實戰指南系列文章 推進 Apache Beam 在國內的普及。git

一.概述

大數據發展趨勢從普通的大數據,發展成 AI 大數據,再到下一代號稱萬億市場的 lOT 大數據。技術也隨着時代的變化而變化,從 Hadoop 的批處理,到 Spark Streaming,以及流批處理的 Flink 的出現,整個大數據架構也在逐漸演化。github

Apache Beam 做爲新生技術,在這個時代會扮演什麼樣的角色,跟 Flink 之間的關係是怎樣的?Apache Beam 和 Flink 的結合會給大數據開發者或架構師們帶來哪些意想不到的驚喜呢?web

二.大數據架構發展演進歷程2.1 大數據架構 Hadoop

圖 2-1 MapReduce 流程圖數據庫

最初作大數據是把一些日誌或者其餘信息收集後寫入 Hadoop 的 HDFS 系統中,若是運營人員須要報表,則利用 Hadoop 的 MapReduce 進行計算並輸出,對於一些非計算機專業的統計人員,後期能夠用 Hive 進行統計輸出。apache

2.2 流式處理 Storm

圖 2-2Storm 流程圖後端

業務進一步發展,運營人員須要看到實時數據的展現或統計。例如電商網站促銷的時候,用於統計用戶實時交易數據。數據收集也使用 MQ,用流式 Storm 解決這一業務需求問題。centos

2.3 Spark 批處理和微批處理

圖 2-3 Spark 流程圖api

業務進一步發展,服務前端加上了網關進行負載均衡,消息中心也換成了高吞吐量的輕量級 MQ Kafka,數據處理漸漸從批處理髮展到微批處理。

2.4 Flink:真正的流批處理統一

圖 2-4 Flink 流程圖

隨着 AI 和 loT 的發展,對於傳感設備的信息、報警器的警情以及視頻流的數據量微批計算引擎已經知足不了業務的需求,Flink 實現真正的流處理讓警情更實時。

2.5 下一代大數據處理統一標準 Apache Beam

圖 2-5 Apache Beam 流程圖

BeamSDKs 封裝了不少的組件 IO,也就是圖左邊這些重寫的高級 API,使不一樣的數據源的數據流向後面的計算平臺。經過將近一年的發展,Apache Beam 不光組件 IO 更加豐富了,而且計算平臺在當初最基本的 Apache Apex、Direct Runner、Apache Flink、Apache Spark、Google Cloud Dataflow 之上,又增長了 Gearpump、Samza 以及第三方的 JStorm 等計算平臺。

爲何說 Apache Beam 會是大數據處理統一標準呢?

由於不少如今大型公司都在創建本身的「大中臺」,創建統一的數據資源池,打通各個部門以及子公司的數據,以解決信息孤島問題,把這些數據進行集中式管理而且進行後期的數據分析、BI、AI 以及機器學習等工做。這種狀況下會出現不少數據源,例如以前用的 MySQL、MongodDB、HDFS、HBase、Solr 等,若是想創建中臺就會是一件使人很是苦惱的事情,而且多計算環境更是讓技術領導頭疼。Apache Beam 的出現正好迎合了這個時代的新需求,它集成了不少數據庫經常使用的數據源並把它們封裝成 SDK 的 IO,開發人員不必深刻學習不少技術,只要會寫 Beam 程序就能夠了,大大節省了人力、時間以及成本。

三.Apache Beam 和 Flink 的關係

隨着阿里巴巴 Blink 的開源,Flink 中國社區開始活躍起來。不少人會開始對各類計算平臺進行對比,好比 Storm、Spark、JStorm、Flink 等,而且有人提到以前阿里巴巴開源的 JStorm 比 Flink 性能高出 10-15 倍,爲何阿里巴巴卻轉戰基於 Flink 的 Blink 呢? 在最近 Flink 的線下技術會議上,阿里巴巴的人已經回答了這一問題。其實不少技術都是從業務實戰出來的,隨着業務的發展可能還會有更多的計算平臺出現,沒有必要對此過多糾結。

不過,既然你們最近討論得這麼火熱,這裏也列出一些最近問的比較多的、有表明性的關於 Beam 的問題,逐一進行回答。

 1. Flink 支持 SQL,請問 Beam 支持嗎?

如今 Beam 是支持 SQL 處理的,底層技術跟 Flink 底層處理是同樣的。

Beam SQL 如今只支持 Java,底層是 Apache Calcite 的一個動態數據管理框架,用於大數據處理和一些流加強功能,它容許你自定義數據庫功能。例如 Hive 使用了 Calcite 的查詢優化,固然還有 Flink 解析和流 SQL 處理。Beam 在這之上添加了額外的擴展,以便輕鬆利用 Beam 的統一批處理 / 流模型以及對複雜數據類型的支持。 如下是 Beam SQL 具體處理流程圖:

Beam SQL 一共有兩個比較重要的概念:

SqlTransform:用於 PTransforms 從 SQL 查詢建立的接口。

Row:Beam SQL 操做的元素類型。例如:PCollection。

在將 SQL 查詢應用於 PCollection 以前,集合中 Row 的數據格式必需要提早指定。 一旦 Beam SQL 指定了 管道中的類型是不能再改變的。PCollection 行中字段 / 列的名稱和類型由 Schema 進行關聯定義。您可使用 Schema.builder() 來建立 Schemas。

示例:

// Define the schema for the records.
Schema appSchema = 
 Schema
   .builder()
   .addInt32Field("appId")
   .addStringField("description")
   .addDateTimeField("rowtime")
   .build();
// Create a concrete row with that type.
Row row = 
 Row
   .withSchema(appSchema)
   .addValues(1, "Some cool app", new Date())
   .build();
// Create a source PCollection containing only that row
PCollection<Row> testApps = 
 PBegin
   .in(p)
   .apply(Create
             .of(row)
             .withCoder(appSchema.getRowCoder()));

也能夠是其餘類型,不是直接是 Row,利用 PCollection經過應用 ParDo 能夠將輸入記錄轉換爲 Row 格式。如:

 

// An example POJO class.
class AppPojo {
Integer appId;
String description;
Date timestamp;
}
// Acquire a collection of POJOs somehow.
PCollection<AppPojo> pojos = ...
// Convert them to Rows with the same schema as defined above via a DoFn.
PCollection<Row> apps = pojos
.apply(
    ParDo.of(new DoFn<AppPojo, Row>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        // Get the current POJO instance
        AppPojo pojo = c.element();
        // Create a Row with the appSchema schema 
          // and values from the current POJO
          Row appRow = 
                Row
                  .withSchema(appSchema)
                 .addValues(
                    pojo.appId, 
                    pojo.description, 
                   pojo.timestamp)
                  .build();
        // Output the Row representing the current POJO
        c.output(appRow);
      }
    }));
 2. Flink 有並行處理,Beam 有嗎?

Beam 在抽象 Flink 的時候已經把這個參數抽象出來了,在 Beam Flink 源碼解析中會提到。

 3. 我這裏有個流批混合的場景,請問 Beam 是否是支持?

這個是支持的,由於批也是一種流,是一種有界的流。Beam 結合了 Flink,Flink dataset 底層也是轉換成流進行處理的。

 4. Flink 流批寫程序的時候和 Beam 有什麼不一樣?底層是 Flink 仍是 Beam?

打個比喻,若是 Flink 是 Lucene,那麼 Beam 就是 Solr,把 Flink 的 API 進行二次重寫,簡化了 API,讓你們使用更簡單、更方便。此外,Beam 提供了更多的數據源,這是 Flink 不能比的。固然,Flink 後期可能也會往這方面發展。

四.Apache Beam KafkaIO 源碼剖析KafkaIO 對 kafka-clients 支持依賴狀況

KafkaIO 是 Kafka 的 API 封裝,主要負責 Apache Kafka 讀取和寫入消息。若是想使用 KafkaIO,必須依賴 beam-sdks-java-io-kafka ,KafkaIO 同時支持多個版本的 Kafka 客戶端,使用時建議用高版本的或最新的 Kafka 版本,由於使用 KafkaIO 的時候須要包含 kafka-clients 的依賴版本。

Apache Beam  KafkaIO 對各個 kafka-clients 版本的支持狀況以下表:

表 4-1  KafkaIO 與 kafka-clients 依賴關係表

Apache Beam V2.1.0 版本以前源碼中的 pom 文件都顯式指定了特定的 0.9.0.1 版本支持,可是從 V2.1.0 版本和 V2.1.1 兩個版本開始已經替換成了 kafka-clients 的 0.10.1.0 版本,而且源碼中提示 0.10.1.0 版本更安全。這是由於去年 Kafka 0.10.1.0 以前的版本曝出了安全漏洞。在 V2.2.0 之後的版本中,Beam 對 API 作了調整和更新,對以前的兩種版本都支持,不過須要在 pom 中引用的時候本身指定 Kafka 的版本。可是在 Beam V2.5.0 和 V2.6.0 版本,源碼中添加了如下提示:

<h3>Supported Kafka Client Versions</h3>
* KafkaIO relies on <i>kafka-clients</i> for all its interactions with the Kafka cluster.
<i>kafka-clients</i> versions 0.10.and newer are supported at runtime. The older versions
0.9.x - 0.10.0.0 are also supported, but are deprecated and likely be removed in near future.
* Please ensure that the version included with the application is compatible with the version of
* your Kafka cluster. Kafka client usually fails to initialize with a clear error message in
* case of incompatibility.
*/

也就說在這兩個版本已經移除了對 Kafka 客戶端 0.10.1.0 之前版本的支持,舊版本還會支持,可是在之後不久就會刪除。因此你們在使用的時候要注意版本的依賴關係和客戶端的版本支持度。

若是想使用 KafkaIO,pom 必需要引用,版本跟 4-1 表中的對應起來就能夠了。

 

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-kafka</artifactId>
  <version>...</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>a_recent_version</version>
<scope>runtime</scope>
</dependency>
KafkaIO 讀寫源碼解析

KafkaIO 源碼連接以下:

https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java

在 KafkaIO 裏面最主要的兩個方法是 Kafka 的讀寫方法。

 KafkaIO 讀操做

 

pipeline.apply(KafkaIO.<Long, String>read() 
      .withBootstrapServers("broker_1:9092,broker_2:9092")// 
      .withTopic("my_topic")    // use withTopics(List<String>) to read from multiple topics. 
      .withKeyDeserializer(LongDeserializer.class) 
     .withValueDeserializer(StringDeserializer.class) 
     // Above four are required configuration. returns   PCollection<KafkaRecord<Long, String>> 
     // Rest of the settings are optional :  
      // you can further customize KafkaConsumer used to read the records by   adding more 
     // settings for ConsumerConfig. e.g : 
     .updateConsumerProperties(ImmutableMap.of("group.id",   "my_beam_app_1")) 

      // set event times and watermark based on 'LogAppendTime'. To provide   a custom 
     // policy see withTimestampPolicyFactory(). withProcessingTime() is   the default. 
     // Use withCreateTime() with topics that have 'CreateTime' timestamps. 
      .withLogAppendTime()  
     // restrict reader to committed messages on Kafka (see method   documentation). 
      .withReadCommitted()  
      // offset consumed by the pipeline can be committed back. 
      .commitOffsetsInFinalize()  
      // finally, if you don't need Kafka metadata, you can drop it.g 
     .withoutMetadata() // PCollection<KV<Long, String>> 
   ) 
   .apply(Values.<String>create()) // PCollection<String>

1) 指定 KafkaIO 的模型,從源碼中不難看出這個地方的 KafkaIO<k,v>類型是 Long 和 String 類型,也能夠換成其餘類型。

 

pipeline.apply(KafkaIO.<Long, String>read()

2) 設置 Kafka 集羣的集羣地址。

 

.withBootstrapServers("broker_1:9092,broker_2:9092")

3) 設置 Kafka 的主題類型,源碼中使用了單個主題類型,若是是多個主題類型則用 withTopics(List) 方法進行設置。設置狀況基本跟 Kafka 原生是同樣的。

 

.withTopic("my_topic") // use withTopics(List<String>) to read from multiple topics.

4) 設置序列化類型。Apache Beam KafkaIO 在序列化的時候作了很大的簡化,例如原生 Kafka 可能要經過 Properties 類去設置 ,還要加上很長一段 jar 包的名字。

Beam KafkaIO 的寫法:

 

.withKeyDeserializer(LongDeserializer.class) 
.withValueDeserializer(StringDeserializer.class)

原生 Kafka 的設置:

 

Properties props = new Properties();
props.put("key.deserializer","org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.ByteArrayDeserializer");

5) 設置 Kafka 的消費者屬性,這個地方還能夠設置其餘的屬性。源碼中是針對消費分組進行設置。

 

.updateConsumerProperties(ImmutableMap.of("group.id", my_beam_app_1"))

6) 設置 Kafka 吞吐量的時間戳,能夠是默認的,也能夠自定義。

 

.withLogAppendTime()

7) 至關於 Kafka 中"isolation.level", "read_committed" ,指定 KafkaConsumer 只應讀取非事務性消息,或從其輸入主題中提交事務性消息。流處理應用程序一般在多個讀取處理寫入階段處理其數據,每一個階段使用前一階段的輸出做爲其輸入。經過指定 read_committed 模式,咱們能夠在全部階段完成一次處理。針對"Exactly-once" 語義,支持 Kafka 0.11 版本。

.withReadCommitted()

8) 設置 Kafka 是否自動提交屬性"AUTO_COMMIT",默認爲自動提交,使用 Beam 的方法來設置。

 

set CommitOffsetsInFinalizeEnabled(boolean commitOffsetInFinalize)
.commitOffsetsInFinalize()

9) 設置是否返回 Kafka 的其餘數據,例如 offset 信息和分區信息,不用能夠去掉。

 

.withoutMetadata() // PCollection<KV<Long, String>>

10) 設置只返回 values 值,不用返回 key。例如  PCollection,而不是 PCollection<long,string>。

 

.apply(Values.<String>create()) // PCollection<String>
 KafkaIO 寫操做

寫操做跟讀操做配置基本類似,咱們看一下具體代碼。

 

PCollection<KV<Long, String>> kvColl = ...;
kvColl.apply(KafkaIO.<Long, String>write()
    .withBootstrapServers("broker_1:9092,broker_2:9092")
    .withTopic("results")
    .withKeySerializer(LongSerializer.class)
    .withValueSerializer(StringSerializer.class)
    // You can further customize KafkaProducer used to write the records by adding more
    // settings for ProducerConfig. e.g, to enable compression :
    .updateProducerProperties(ImmutableMap.of("compression.type", "gzip"))
   // You set publish timestamp for the Kafka records.
    .withInputTimestamp() // element timestamp is used while publishing to Kafka
    // or you can also set a custom timestamp with a function.
    .withPublishTimestampFunction((elem, elemTs) -> ...)
    // Optionally enable exactly-once sink (on supported runners). See JavaDoc for withEOS().
    .withEOS(20, "eos-sink-group-id");
 );

下面這個是 Kafka 裏面比較重要的一個屬性設置,在 Beam 中是這樣使用的,很是簡單,可是要注意這個屬性.withEOS 其實就是 Kafka 中"Exactly-once"。

 

.withEOS(20, "eos-sink-group-id");

在寫入 Kafka 時徹底一次性地提供語義,這使得應用程序可以在 Beam 管道中的一次性語義之上提供端到端的一次性保證。它確保寫入接收器的記錄僅在 Kafka 上提交一次,即便在管道執行期間重試某些處理也是如此。重試一般在應用程序從新啓動時發生(如在故障恢復中)或者在從新分配任務時(如在自動縮放事件中)。Flink runner 一般爲流水線的結果提供精確一次的語義,但不提供變換中用戶代碼的反作用。若是諸如 Kafka 接收器之類的轉換寫入外部系統,則這些寫入可能會屢次發生。

在此處啓用 EOS 時,接收器轉換將兼容的 Beam Runners 中的檢查點語義與 Kafka 中的事務聯繫起來,以確保只寫入一次記錄。因爲實現依賴於 runners checkpoint 語義,所以並不是全部 runners 都兼容。Beam 中 FlinkRunner 針對 Kafka 0.11+ 版本才支持,然而 Dataflow runner 和 Spark runner 若是操做 kafkaIO 是徹底支持的。

 關於性能的注意事項

"Exactly-once" 在接收初始消息的時候,除了將原來的數據進行格式化轉換外,還經歷了 2 個序列化 - 反序列化循環。根據序列化的數量和成本,CPU 可能會漲的很明顯。經過寫入二進制格式數據(即在寫入 Kafka 接收器以前將數據序列化爲二進制數據)能夠下降 CPU 成本。

 關於參數

numShards——設置接收器並行度。存儲在 Kafka 上的狀態元數據,使用 sinkGroupId 存儲在許多虛擬分區中。一個好的經驗法則是將其設置爲 Kafka 主題中的分區數。

sinkGroupId——用於在 Kafka 上將少許狀態存儲爲元數據的組 ID。它相似於與 KafkaConsumer 一塊兒使用的使用 groupID。每一個做業都應使用惟一的 groupID,以便從新啓動 / 更新做業保留狀態以確保一次性語義。狀態是經過 Kafka 上的接收器事務原子提交的。有關更多信息,請參閱 KafkaProducer.sendOffsetsToTransaction(Map,String)。接收器在初始化期間執行多個健全性檢查以捕獲常見錯誤,以便它不會最終使用彷佛不是由同一做業寫入的狀態。

五.Apache Beam Flink 源碼剖析FlinkRunner 對 Flink 支持依賴狀況

Flink 是一個流和批處理的統一的計算框架,Apache Beam 跟 Flink API 作了無縫集成。在 Apache Beam 中對 Flink 的操做主要是 FlinkRunner.java,Apache Beam 支持不一樣版本的 flink 客戶端。我根據不一樣版本列了一個 Flink 對應客戶端支持表以下:

圖 5-1  FlinkRunner 與 Flink 依賴關係表

從圖 5-1 中能夠看出,Apache Beam 對 Flink 的 API 支持的更新速度很是快,從源碼能夠看到 2.0.0 版本以前的 FlinkRunner 是很是 low 的,而且直接拿 Flink 的實例作爲 Beam 的實例,封裝的效果也比較差。可是從 2.0.0 版本以後 ,Beam 就像打了雞血同樣 API 更新速度特別快,拋棄了之前的冗餘,更好地跟 Flink 集成,讓人眼前一亮。

Apache Beam  Flink 源碼解析

由於 Beam 在運行的時候都是顯式指定 Runner,在 FlinkRunner 源碼中只是成了簡單的統一入口,代碼很是簡單,可是這個入口中有一個比較關鍵的接口類 FlinkPipelineOptions。

請看代碼:

 

/** Provided options. */
private final FlinkPipelineOptions options;

經過這個類咱們看一下 Apache Beam 到底封裝了哪些 Flink 方法。

首先 FlinkPipelineOptions 是一個接口類,可是它繼承了 PipelineOptions、ApplicationNameOptions、StreamingOptions 三個接口類,第一個 PipelineOptions 你們應該很熟悉了,用於基本管道建立;第二個 ApplicationNameOptions 用於設置應用程序名字;第三個用於判斷是流式數據仍是批數據。源代碼以下:

 

public interface FlinkPipelineOptions  extends 
PipelineOptions, ApplicationNameOptions, StreamingOptions {
//....
}

1) 設置 Flink Master 方法 ,這個方法用於設置 Flink 集羣地址的 Master 地址。能夠填寫 IP 和端口,或者是 hostname 和端口,默認 local 。固然測試也能夠是單機的,在 Flink 1.4 利用 start-local.sh 啓動,而到了 1.5 以上就去掉了這個腳本,本地直接換成了 start-cluster.sh。你們測試的時候須要注意一下。

 

/**
* The url of the Flink JobManager on which to execute pipelines.   This can either be the the   * address of a cluster JobManager, in the form "host:port" or one of the special Strings  * "[collection]" will execute the pipeline on Java Collections while "[auto]" will let the system
*/
@Description( "Address of the Flink Master where the Pipeline should  be executed. Can"+ "[collection] or [auto].")
void setFlinkMaster(String value);

2) 設置 Flink 的並行數,屬於 Flink 高級 API 裏面的屬性。設置合適的 parallelism 能提升運算效率,太多了和太少了都不行。設置 parallelism 有多種方式,優先級爲 api>env>p>file。

 

@Description("The degree of parallelism to be used when distributing operations onto workers.")
@Default.InstanceFactory(DefaultParallelismFactory.class)
Integer getParallelism();
void setParallelism(Integer value);

3) 設置連續檢查點之間的間隔時間(即當前的快照)用於容錯的管道狀態。

 

@Description("The interval between consecutive checkpoints (i.e.  snapshots of the current"
@Default.Long(-1L)
Long getCheckpointingInterval();
void setCheckpointingInterval(Long interval)

4) 定義一致性保證的檢查點模式,默認爲"AT_LEAST_ONCE",在 Beam 的源碼中定義了一個枚舉類 CheckpointingMode,除了默認的"AT_LEAST_ONCE",還有"EXACTLY_ONCE"。

"AT_LEAST_ONCE":這個模式意思是系統將以一種更簡單地方式來對 operator 和 udf 的狀態進行快照:在失敗後進行恢復時,在 operator 的狀態中,一些記錄可能會被重放屢次。

"EXACTLY_ONCE":這種模式意思是系統將以以下語義對 operator 和 udf(user defined function) 進行快照:在恢復時,每條記錄將在 operator 狀態中只被重現 / 重放一次。

 

@Description("The checkpointing mode that defines consistency guarantee.")
@Default.Enum("AT_LEAST_ONCE")
CheckpointingMode getCheckpointingMode();
void setCheckpointingMode(CheckpointingMode mode);

5) 設置檢查點的最大超時時間,默認爲 20*60*1000(毫秒)=20(分鐘)。

 

@Description("The maximum time that a checkpoint may take before being discarded.")
@Default.Long(20 * 60 * 1000)
Long getCheckpointTimeoutMillis();
void setCheckpointTimeoutMillis(Long checkpointTimeoutMillis);

6) 設置從新執行失敗任務的次數,值爲 0 有效地禁用容錯,值爲 -1 表示使用系統默認值(在配置中定義)。

 

@Description(
"Sets the number of times that failed tasks are re-executed. "
"A value of zero effectively disables fault tolerance. A value of -1 indicates "+ "that the system default value (as defined in the configuration) should be used.")
@Default.Integer(-1)
Integer getNumberOfExecutionRetries();
void setNumberOfExecutionRetries(Integer retries);

7) 設置執行之間的延遲,默認值爲 -1L。

 

@Description(
    "Sets the delay between executions. A value of {@code -1} "
        + "indicates that the default value should be used.")
@Default.Long(-1L)
Long getExecutionRetryDelay();
void setExecutionRetryDelay(Long delay);

8) 設置重用對象的行爲。

 

@Description("Sets the behavior of reusing objects.")
@Default.Boolean(false)
Boolean getObjectReuse();
void setObjectReuse(Boolean reuse);

9) 設置狀態後端在計算期間存儲 Beam 的狀態,不設置從配置文件中讀取默認值。注意:僅在執行時適用流媒體模式。

 

@Description("Sets the state backend to use in streaming mode. "
@JsonIgnore
AbstractStateBackend getStateBackend();
void setStateBackend(AbstractStateBackend stateBackend);

10) 在 Flink Runner 中啓用 / 禁用 Beam 指標。

 

@Description("Enable/disable Beam metrics in Flink Runner")
@Default.Boolean(true)
BooleangetEnableMetrics();
voidsetEnableMetrics(BooleanenableMetrics);

11) 啓用或禁用外部檢查點,與 CheckpointingInterval 一塊兒使用。

 

@Description(
"Enables or disables externalized checkpoints."
+"Works in conjunction with CheckpointingInterval")
@Default.Boolean(false)
BooleanisExternalizedCheckpointsEnabled();
voidsetExternalizedCheckpointsEnabled(BooleanexternalCheckpoints);

12) 設置當他們的 Wartermark 達到 + Inf 時關閉源,Watermark 在 Flink 中其中一個做用是根據時間戳作單節點排序,Beam 也是支持的。

 

@Description("If set, shutdown sources when their watermark reaches +Inf.")
@Default.Boolean(false)
BooleanisShutdownSourcesOnFinalWatermark();
voidsetShutdownSourcesOnFinalWatermark(BooleanshutdownOnFinalWatermark);

剩餘兩個部分這裏再也不進行翻譯,留給你們去看源碼。

六. KafkaIO 和 Flink 實戰

本節經過解讀一個真正的 KafkaIO 和 Flink 實戰案例,幫助你們更深刻地瞭解 Apache Beam KafkaIO 和 Flink 的運用。

設計架構圖和設計思路解讀

Apache Beam 外部數據流程圖

設計思路:Kafka 消息生產程序發送 testmsg 到 Kafka 集羣,Apache Beam 程序讀取 Kafka 的消息,通過簡單的業務邏輯,最後發送到 Kafka 集羣,而後 Kafka 消費端消費消息。

Apache Beam 內部數據處理流程圖

Apache Beam 程序經過 kafkaIO 讀取 Kafka 集羣的數據,進行數據格式轉換。數據統計後,經過 KafkaIO 寫操做把消息寫入 Kafka 集羣。最後把程序運行在 Flink 的計算平臺上。

軟件環境和版本說明
  • 系統版本 centos 7

  • Kafka 集羣版本: kafka_2.10-0.10.1.1.tgz

  • Flink 版本:flink-1.5.2-bin-hadoop27-scala_2.11.tgz

Kafka 集羣和 Flink 單機或集羣配置,你們能夠去網上搜一下配置文章,操做比較簡單,這裏就不贅述了。

實踐步驟

1)新建一個 Maven 項目

2)在 pom 文件中添加 jar 引用

 

<dependency>
 <groupId>org.apache.beam</groupId>
 <artifactId>beam-sdks-java-io-kafka</artifactId>
 <version>2.4.0</version>
</dependency>
<dependency>
 <groupId>org.apache.kafka</groupId>
 <artifactId>kafka-clients</artifactId>
 <version>0.10.1.1</version>
</dependency>
<dependency>
 <groupId>org.apache.beam</groupId>
 <artifactId>beam-runners-core-java</artifactId>
 <version>2.4.0</version>
</dependency>
<dependency>
 <groupId>org.apache.beam</groupId>
 <artifactId>beam-runners-flink_2.11</artifactId>
 <version>2.4.0</version>
</dependency>
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-java</artifactId>
 <version>1.5.2</version>
</dependency>
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-clients_2.11</artifactId>
 <version>1.5.2</version>
</dependency>
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-core</artifactId>
<version>1.5.2</version>
</dependency>
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-runtime_2.11</artifactId>
 <version>1.5.2</version>
 <!--<scope>provided</scope>-->
</dependency>
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-streaming-java_2.11</artifactId>
 <version>1.5.2</version>
 <!--<scope>provided</scope>-->
</dependency>
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-metrics-core</artifactId>
 <version>1.5.2</version>
 <!--<scope>provided</scope>-->
</dependency>

3)新建 BeamFlinkKafka.java 類

4)編寫如下代碼:

 

public static void main(String[] args) {
// 建立管道工廠
PipelineOptions options = PipelineOptionsFactory.create(); 
// 顯式指定 PipelineRunner:FlinkRunner 必須指定若是不制定則爲本地 
options.setRunner(FlinkRunner.class); 
// 設置相關管道
Pipeline pipeline = Pipeline.create(options);
// 這裏 kV 後說明 kafka 中的 key 和 value 均爲 String 類型
PCollection<KafkaRecord<String, String>> lines = 
pipeline.apply(KafkaIO.<String, 
// 必需設置 kafka 的服務器地址和端口
String>read().withBootstrapServers("192.168.1.110:11092,192.168.1.119:11092,192.168.1.120:11092")
    .withTopic("testmsg")// 必需設置要讀取的 kafka 的 topic 名稱
    .withKeyDeserializer(StringDeserializer.class)// 必需序列化 key
    .withValueDeserializer(StringDeserializer.class)// 必需序列化 value
    .updateConsumerProperties(ImmutableMap.<String, Object>of("auto.offset.reset", "earliest")));// 這個屬性 kafka 最多見的.
// 爲輸出的消息類型。或者進行處理後返回的消息類型
PCollection<String> kafkadata = lines.apply("Remove Kafka Metadata", ParDo.of(new DoFn<KafkaRecord<String, String>, String>() { 
private static final long serialVersionUID = 1L;
 @ProcessElement
 public void processElement(ProcessContext ctx) {
  System.out.print("輸出的分區爲 ----:" + ctx.element().getKV());
  ctx.output(ctx.element().getKV().getValue());// 其實咱們這裏是把"張海     濤在發送消息 ***"進行返回操做
 }
}));
PCollection<String> windowedEvents = kafkadata.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5))));
PCollection<KV<String, Long>> wordcount = windowedEvents.apply(Count.<String>perElement()); // 統計每個 kafka 消息的 Count
PCollection<String> wordtj = wordcount.apply("ConcatResultKVs", MapElements.via( // 拼接最後的格式化輸出(Key 爲 Word,Value 爲 Count)
new SimpleFunction<KV<String, Long>, String>() {
 private static final long serialVersionUID = 1L;
  @Override
  public String apply(KV<String, Long> input) {
  System.out.print("進行統計:" + input.getKey() + ": " + input.getValue());
    return input.getKey() + ": " + input.getValue();
   }
  }));
wordtj.apply(KafkaIO.<Void, String>write()  .withBootstrapServers("192.168.1.110:11092,192.168.1.119:11092,192.168.1.120:11092")// 設置寫會 kafka 的集羣配置地址
  .withTopic("senkafkamsg")// 設置返回 kafka 的消息主題
  // .withKeySerializer(StringSerializer.class)// 這裏不用設置了,由於上面 Void 
  .withValueSerializer(StringSerializer.class)
  // Dataflow runner and Spark 兼容, Flink 對 kafka0.11 才支持。個人版本是 0.10 不兼容
  //.withEOS(20, "eos-sink-group-id")
  .values() // 只須要在此寫入默認的 key 就好了,默認爲 null 值
); // 輸出結果
pipeline.run().waitUntilFinish();

5)打包 jar,本示例是簡單的實戰,並無用 Docker,Apache Beam 新版本是支持 Docker 的。

6)經過 Apache Flink Dashboard 提交 job

7)查看結果

程序接收的日誌以下:

七.實戰解析

本次實戰在源碼分析中已經作過詳細解析,在這裏不作過多的描述,只選擇部分問題再重點解釋一下。此外,若是尚未入門,甚至連管道和 Runner 等概念都還不清楚,建議先閱讀本系列的第一篇文章 《Apache Beam 實戰指南之基礎入門》

1.FlinkRunner 在實戰中是顯式指定的,若是想設置參數怎麼使用呢?其實還有另一種寫法,例如如下代碼:

 

//FlinkPipelineOptions options =PipelineOptionsFactory.as(FlinkPipelineOptions.class);
//options.setStreaming(true);
//options.setAppName("app_test");
//options.setJobName("flinkjob");
//options.setFlinkMaster("localhost:6123");
//options.setParallelism(10);// 設置 flink 的並行度
// 顯式指定 PipelineRunner:FlinkRunner,必須指定,若是不指定則爲本地 
options.setRunner(FlinkRunner.class);

2.Kafka 有三種數據讀取類型,分別是 「earliest 」,「latest 」,「none 」,分別的意思表明是:

  • earliest

當各分區下有已提交的 offset 時,從提交的 offset 開始消費;無提交的 offset 時,從頭開始消費 。

  • latest

當各分區下有已提交的 offset 時,從提交的 offset 開始消費;無提交的 offset 時,消費新產生的該分區下的數據 。

  • none

topic 各分區都存在已提交的 offset 時,從 offset 後開始消費;只要有一個分區不存在已提交的 offset,則拋出異常。

 

.updateConsumerProperties(ImmutableMap.<String,Object>of("auto.offset.reset", "earliest")));

3. 實戰中我本身想把 Kafka 的數據寫入,key 不想寫入,因此出現了 Kafka 的 key 項爲空,而 values 纔是真正發送的數據。因此開始和結尾要設置個.values(),若是不加上就會報錯。

KafkaIO.<Void, String>write()
.values() // 只須要在此寫入默認的 key 就好了,默認爲 null 值
八.小結

隨着 AI 和 loT 的時代的到來,各個公司不一樣結構、不一樣類型、不一樣來源的數據進行整合的成本愈來愈高。Apache Beam 技術的統一模型和大數據計算平臺特性優雅地解決了這一問題,相信在 loT 萬億市場中,Apache Beam 將會發揮愈來愈重要的角色。

做者介紹

張海濤,目前就任於海康威視雲基礎平臺,負責雲計算大數據的基礎架構設計和中間件的開發,專一雲計算大數據方向。Apache Beam 中文社區發起人之一,若是想進一步瞭解最新 Apache Beam 動態和技術研究成果,請加微信 cyrjkj 入羣共同研究和運用。

傳送門:系列文章第一篇《Apache Beam 實戰指南之基礎入門》

相關文章
相關標籤/搜索