隨着大數據 2.0 時代悄然到來,大數據從簡單的批處理擴展到了實時處理、流處理、交互式查詢和機器學習應用。近年來涌現出諸多大數據應用組件,如 HBase、Hive、Kafka、Spark、Flink 等。開發者常常要用到不一樣的技術、框架、API、開發語言和 SDK 來應對複雜應用的開發,這大大增長了選擇合適工具和框架的難度,開發者想要將全部的大數據組件熟練運用幾乎是一項不可能完成的任務。前端
面對這種狀況,Google 在 2016 年 2 月宣佈將大數據流水線產品(Google DataFlow)貢獻給 Apache 基金會孵化,2017 年 1 月 Apache 對外宣佈開源 Apache Beam,2017 年 5 月迎來了它的第一個穩定版本 2.0.0。在國內,大部分開發者對於 Beam 還缺少了解,社區中文資料也比較少。InfoQ 指望經過 **Apache Beam 實戰指南系列文章** 推進 Apache Beam 在國內的普及。java
一.概述面試
大數據發展趨勢從普通的大數據,發展成AI大數據,再到下一代號稱萬億市場的lOT大數據。技術也隨着時代的變化而變化,從Hadoop的批處理,到Spark Streaming,以及流批處理的Flink的出現,整個大數據架構也在逐漸演化。數據庫
Apache Beam做爲新生技術,在這個時代會扮演什麼樣的角色,跟Flink之間的關係是怎樣的?Apache Beam和Flink的結合會給大數據開發者或架構師們帶來哪些意想不到的驚喜呢?apache
二.大數據架構發展演進歷程後端
2.1 大數據架構Hadoopcentos
圖2-1 MapReduce 流程圖api
最初作大數據是把一些日誌或者其餘信息收集後寫入Hadoop 的HDFS系統中,若是運營人員須要報表,則利用Hadoop的MapReduce進行計算並輸出,對於一些非計算機專業的統計人員,後期能夠用Hive進行統計輸出。安全
2.2 流式處理Storm服務器
圖2-2Storm流程圖
業務進一步發展,運營人員須要看到實時數據的展現或統計。例如電商網站促銷的時候,用於統計用戶實時交易數據。數據收集也使用MQ,用流式Storm解決這一業務需求問題。
2.3 Spark批處理和微批處理
圖2-3 Spark流程圖
業務進一步發展,服務前端加上了網關進行負載均衡,消息中心也換成了高吞吐量的輕量級MQ Kafka,數據處理漸漸從批處理髮展到微批處理。
2.4 Flink:真正的流批處理統一
圖2-4 Flink 流程圖
隨着AI和loT的發展,對於傳感設備的信息、報警器的警情以及視頻流的數據量微批計算引擎已經知足不了業務的需求,Flink實現真正的流處理讓警情更實時。
2.5 下一代大數據處理統一標準Apache Beam
圖2-5 Apache Beam 流程圖
BeamSDKs封裝了不少的組件IO,也就是圖左邊這些重寫的高級API,使不一樣的數據源的數據流向後面的計算平臺。經過將近一年的發展,Apache Beam 不光組件IO更加豐富了,而且計算平臺在當初最基本的 Apache Apex、Direct Runner、Apache Flink、Apache Spark、Google Cloud Dataflow之上,又增長了Gearpump、Samza 以及第三方的JStorm等計算平臺。
爲何說Apache Beam 會是大數據處理統一標準呢?
由於不少如今大型公司都在創建本身的「大中臺」,創建統一的數據資源池,打通各個部門以及子公司的數據,以解決信息孤島問題,把這些數據進行集中式管理而且進行後期的數據分析、BI、AI以及機器學習等工做。這種狀況下會出現不少數據源,例如以前用的MySQL、MongodDB、HDFS、HBase、Solr 等,若是想創建中臺就會是一件使人很是苦惱的事情,而且多計算環境更是讓技術領導頭疼。Apache Beam的出現正好迎合了這個時代的新需求,它集成了不少數據庫經常使用的數據源並把它們封裝成SDK的IO,開發人員不必深刻學習不少技術,只要會寫Beam 程序就能夠了,大大節省了人力、時間以及成本。
三.Apache Beam和Flink的關係
隨着阿里巴巴Blink的開源,Flink中國社區開始活躍起來。不少人會開始對各類計算平臺進行對比,好比Storm、Spark、JStorm、Flink等,而且有人提到以前阿里巴巴開源的JStorm比Flink性能高出10-15倍,爲何阿里巴巴卻轉戰基於Flink的Blink呢? 在最近Flink的線下技術會議上,阿里巴巴的人已經回答了這一問題。其實不少技術都是從業務實戰出來的,隨着業務的發展可能還會有更多的計算平臺出現,沒有必要對此過多糾結。
不過,既然你們最近討論得這麼火熱,這裏也列出一些最近問的比較多的、有表明性的關於Beam的問題,逐一進行回答。
1. Flink支持SQL,請問Beam支持嗎?
如今Beam是支持SQL處理的,底層技術跟Flink底層處理是同樣的。
Beam SQL如今只支持Java,底層是Apache Calcite 的一個動態數據管理框架,用於大數據處理和一些流加強功能,它容許你自定義數據庫功能。例如Hive 使用了Calcite的查詢優化,固然還有Flink解析和流SQL處理。Beam在這之上添加了額外的擴展,以便輕鬆利用Beam的統一批處理/流模型以及對複雜數據類型的支持。 如下是Beam SQL具體處理流程圖:
Beam SQL一共有兩個比較重要的概念:
SqlTransform:用於PTransforms從SQL查詢建立的接口。
Row:Beam SQL操做的元素類型。例如:PCollection<Row>。
在將SQL查詢應用於PCollection 以前,集合中Row的數據格式必需要提早指定。 一旦Beam SQL 指定了 管道中的類型是不能再改變的。PCollection行中字段/列的名稱和類型由Schema進行關聯定義。您可使用Schema.builder()來建立 Schemas。
示例:
// Define the schema for the records.
Schema appSchema =
Schema
.builder()
.addInt32Field("appId")
.addStringField("description")
.addDateTimeField("rowtime")
.build();
// Create a concrete row with that type.
Row row =
Row
.withSchema(appSchema)
.addValues(1, "Some cool app", new Date())
.build();
// Create a source PCollection containing only that row
PCollection<Row> testApps =
PBegin
.in(p)
.apply(Create
.of(row)
.withCoder(appSchema.getRowCoder()));
也能夠是其餘類型,不是直接是Row,利用PCollection<T>經過應用ParDo能夠將輸入記錄轉換爲Row格式。如:
// An example POJO class.
class AppPojo {
Integer appId;
String description;
Date timestamp;
}
// Acquire a collection of POJOs somehow.
PCollection<AppPojo> pojos = ...
// Convert them to Rows with the same schema as defined above via a DoFn.
PCollection<Row> apps = pojos
.apply(
ParDo.of(new DoFn<AppPojo, Row>() {
@ProcessElement
public void processElement(ProcessContext c) {
// Get the current POJO instance
AppPojo pojo = c.element();
// Create a Row with the appSchema schema
// and values from the current POJO
Row appRow =
Row
.withSchema(appSchema)
.addValues(
pojo.appId,
pojo.description,
pojo.timestamp)
.build();
// Output the Row representing the current POJO
c.output(appRow);
}
}));
2. Flink 有並行處理,Beam 有嗎?
Beam 在抽象Flink的時候已經把這個參數抽象出來了,在Beam Flink 源碼解析中會提到。
3. 我這裏有個流批混合的場景,請問Beam是否是支持?
這個是支持的,由於批也是一種流,是一種有界的流。Beam 結合了Flink,Flink dataset 底層也是轉換成流進行處理的。
4. Flink流批寫程序的時候和Beam有什麼不一樣?底層是Flink仍是Beam?
打個比喻,若是Flink是Lucene,那麼Beam 就是Solr,把Flink 的API進行二次重寫,簡化了API,讓你們使用更簡單、更方便。此外,Beam提供了更多的數據源,這是Flink不能比的。固然,Flink 後期可能也會往這方面發展。
四.Apache Beam KafkaIO源碼剖析
Apache Beam KafkaIO 對kafka-clients支持依賴狀況
KafkaIO是Kafka的API封裝,主要負責Apache Kafka讀取和寫入消息。若是想使用KafkaIO,必須依賴beam-sdks-java-io-kafka ,KafkaIO 同時支持多個版本的Kafka客戶端,使用時建議用高版本的或最新的Kafka 版本,由於使用KafkaIO的時候須要包含kafka-clients 的依賴版本。
Apache Beam KafkaIO 對各個kafka-clients 版本的支持狀況以下表:
表4-1 KafkaIO 與kafka-clients 依賴關係表
Apache Beam V2.1.0版本以前源碼中的pom文件都顯式指定了特定的0.9.0.1版本支持,可是從V2.1.0版本和V2.1.1兩個版本開始已經替換成了kafka-clients 的0.10.1.0 版本,而且源碼中提示0.10.1.0 版本更安全。這是由於去年Kafka 0.10.1.0 以前的版本曝出了安全漏洞。在V2.2.0 之後的版本中,Beam對API作了調整和更新,對以前的兩種版本都支持,不過須要在pom中引用的時候本身指定Kafka的版本。可是在Beam V2.5.0 和V2.6.0 版本,源碼中添加了如下提示:
/* <h3>Supported Kafka Client Versions</h3>
* KafkaIO relies on <i>kafka-clients</i> for all its interactions with the Kafka cluster.
* <i>kafka-clients</i> versions 0.10.1 and newer are supported at runtime. The older versions
* 0.9.x - 0.10.0.0 are also supported, but are deprecated and likely be removed in near future.
* Please ensure that the version included with the application is compatible with the version of
* your Kafka cluster. Kafka client usually fails to initialize with a clear error message in
* case of incompatibility.
*/
也就說在這兩個版本已經移除了對Kafka 客戶端 0.10.1.0 之前版本的支持,舊版本還會支持,可是在之後不久就會刪除。因此你們在使用的時候要注意版本的依賴關係和客戶端的版本支持度。
若是想使用KafkaIO,pom 必需要引用,版本跟4-1表中的對應起來就能夠了。
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-kafka</artifactId>
<version>...</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>a_recent_version</version>
<scope>runtime</scope>
</dependency>
KafkaIO讀寫源碼解析
KafkaIO源碼連接以下:
連接
在KafkaIO裏面最主要的兩個方法是Kafka的讀寫方法。
KafkaIO讀操做
pipeline.apply(KafkaIO.<Long, String>read()
.withBootstrapServers("broker_1:9092,broker_2:9092")//
.withTopic("my_topic") // use withTopics(List<String>) to read from multiple topics.
.withKeyDeserializer (LongDeserializer.class)
.withValueDeserializer (StringDeserializer.class)
// Above four are required configuration. returns PCollection<KafkaRecord<Long, String>>
// Rest of the settings are optional :
// you can further customize KafkaConsumer used to read the records by adding more
// settings for ConsumerConfig. e.g :
.updateConsumerProperties (ImmutableMap.of("group.id", "my_beam_app_1"))
// set event times and watermark based on 'LogAppendTime'. To provide a custom
// policy see withTimestampPolicyFactory(). withProcessingTime() is the default.
// Use withCreateTime() with topics that have 'CreateTime' timestamps.
.withLogAppendTime()
// restrict reader to committed messages on Kafka (see method documentation).
.withReadCommitted()
// offset consumed by the pipeline can be committed back.
.commitOffsetsInFinalize()
// finally, if you don't need Kafka metadata, you can drop it.g
.withoutMetadata() // PCollection<KV<Long, String>>
)
.apply(Values.<String>create()) // PCollection<String>
1) 指定KafkaIO的模型,從源碼中不難看出這個地方的KafkaIO<K,V>類型是Long和String 類型,也能夠換成其餘類型。
pipeline.apply(KafkaIO.<Long, String>read() pipeline.apply(KafkaIO.<Long, String>read()
2) 設置Kafka集羣的集羣地址。
.withBootstrapServers ("broker_1:9092,broker_2:9092")
3) 設置Kafka的主題類型,源碼中使用了單個主題類型,若是是多個主題類型則用withTopics(List<String>)方法進行設置。設置狀況基本跟Kafka原生是同樣的。
.withTopic("my_topic") // use withTopics(List<String>) to read from multiple topics.
4) 設置序列化類型。Apache Beam KafkaIO 在序列化的時候作了很大的簡化,例如原生Kafka可能要經過Properties 類去設置 ,還要加上很長一段jar包的名字。
Beam KafkaIO的寫法:
.withKeyDeserializer (LongDeserializer.class)
.withValueDeserializer (StringDeserializer.class)
原生Kafka的設置:
Properties props = new Properties();
props.put("key.deserializer", " org.apache.kafka.common.serialization .ByteArrayDeserializer");
props.put("value.deserializer" ,"org.apache.kafka.common. serialization.ByteArray Deserializer");
5) 設置Kafka的消費者屬性,這個地方還能夠設置其餘的屬性。源碼中是針對消費分組進行設置。
.updateConsumerProperties (ImmutableMap.of ("group.id", my_beam_app_1"))
6) 設置Kafka吞吐量的時間戳,能夠是默認的,也能夠自定義。
.withLogAppendTime()
7) 至關於Kafka 中"isolation.level", "read_committed" ,指定KafkaConsumer只應讀取非事務性消息,或從其輸入主題中提交事務性消息。流處理應用程序一般在多個讀取處理寫入階段處理其數據,每一個階段使用前一階段的輸出做爲其輸入。經過指定read_committed模式,咱們能夠在全部階段完成一次處理。針對"Exactly-once" 語義,支持Kafka 0.11版本。
.withReadCommitted()
8) 設置Kafka是否自動提交屬性"AUTO_COMMIT",默認爲自動提交,使用Beam 的方法來設置。
set CommitOffsetsInFinalizeEnabled (boolean commitOffsetInFinalize)
.commitOffsetsInFinalize()
9) 設置是否返回Kafka的其餘數據,例如offset 信息和分區信息,不用能夠去掉。
.withoutMetadata() // PCollection<KV<Long, String>>
10) 設置只返回values值,不用返回key。例如 PCollection<String>,而不是PCollection<Long,String>。
.apply(Values.<String>create()) // PCollection<String>
KafkaIO寫操做
寫操做跟讀操做配置基本類似,咱們看一下具體代碼。
PCollection<KV<Long, String>> kvColl = ...;
kvColl.apply(KafkaIO.<Long, String>write()
.withBootstrapServers ("broker_1:9092, broker_2:9092")
.withTopic ("results")
.withKeySerializer(LongSerializer.class)
.withValueSerializer(StringSerializer.class)
// You can further customize KafkaProducer used to write the records by adding more
// settings for ProducerConfig. e.g, to enable compression :
.updateProducerProperties (ImmutableMap.of ("compression.type", "gzip"))
// You set publish timestamp for the Kafka records.
.withInputTimestamp() // element timestamp is used while publishing to Kafka
// or you can also set a custom timestamp with a function.
.withPublishTimestampFunction ((elem, elemTs) -> ...)
// Optionally enable exactly-once sink (on supported runners). See JavaDoc for withEOS().
.withEOS (20, "eos-sink-group-id");
);
下面這個是Kafka裏面比較重要的一個屬性設置,在Beam中是這樣使用的,很是簡單,可是要注意這個屬性.withEOS 其實就是Kafka中"Exactly-once"。
.withEOS(20, "eos-sink-group-id");
在寫入Kafka時徹底一次性地提供語義,這使得應用程序可以在Beam管道中的一次性語義之上提供端到端的一次性保證。它確保寫入接收器的記錄僅在Kafka上提交一次,即便在管道執行期間重試某些處理也是如此。重試一般在應用程序從新啓動時發生(如在故障恢復中)或者在從新分配任務時(如在自動縮放事件中)。Flink runner一般爲流水線的結果提供精確一次的語義,但不提供變換中用戶代碼的反作用。若是諸如Kafka接收器之類的轉換寫入外部系統,則這些寫入可能會屢次發生。
在此處啓用EOS時,接收器轉換將兼容的Beam Runners中的檢查點語義與Kafka中的事務聯繫起來,以確保只寫入一次記錄。因爲實現依賴於runners checkpoint語義,所以並不是全部runners都兼容。Beam中FlinkRunner針對Kafka 0.11+版本才支持,然而Dataflow runner和Spark runner若是操做kafkaIO是徹底支持的。
關於性能的注意事項
"Exactly-once" 在接收初始消息的時候,除了將原來的數據進行格式化轉換外,還經歷了2個序列化 - 反序列化循環。根據序列化的數量和成本,CPU可能會漲的很明顯。經過寫入二進制格式數據(即在寫入Kafka接收器以前將數據序列化爲二進制數據)能夠下降CPU成本。
關於參數
numShards——設置接收器並行度。存儲在Kafka上的狀態元數據,使用sinkGroupId存儲在許多虛擬分區中。一個好的經驗法則是將其設置爲Kafka主題中的分區數。
sinkGroupId——用於在Kafka上將少許狀態存儲爲元數據的組ID。它相似於與KafkaConsumer一塊兒使用的使用groupID。每一個做業都應使用惟一的groupID,以便從新啓動/更新做業保留狀態以確保一次性語義。狀態是經過Kafka上的接收器事務原子提交的。有關更多信息,請參閱KafkaProducer.sendOffsetsToTransaction(Map,String)。接收器在初始化期間執行多個健全性檢查以捕獲常見錯誤,以便它不會最終使用彷佛不是由同一做業寫入的狀態。
五.Apache Beam Flink源碼剖析
Apache Beam FlinkRunner對 Flink支持依賴狀況
Flink 是一個流和批處理的統一的計算框架,Apache Beam 跟Flink API作了無縫集成。在Apache Beam中對Flink 的操做主要是 FlinkRunner.java,Apache Beam支持不一樣版本的flink 客戶端。我根據不一樣版本列了一個Flink 對應客戶端支持表以下:
圖5-1 FlinkRunner與Flink依賴關係表
從圖5-1中能夠看出,Apache Beam 對Flink 的API支持的更新速度很是快,從源碼能夠看到2.0.0版本以前的FlinkRunner是很是low的,而且直接拿Flink的實例作爲Beam的實例,封裝的效果也比較差。可是從2.0.0 版本以後 ,Beam就像打了雞血同樣API更新速度特別快,拋棄了之前的冗餘,更好地跟Flink集成,讓人眼前一亮。
Apache Beam Flink 源碼解析
由於Beam在運行的時候都是顯式指定Runner,在FlinkRunner源碼中只是成了簡單的統一入口,代碼很是簡單,可是這個入口中有一個比較關鍵的接口類FlinkPipelineOptions。
請看代碼:
/** Provided options. */
private final FlinkPipelineOptions options;
經過這個類咱們看一下Apache Beam到底封裝了哪些Flink方法。
首先FlinkPipelineOptions是一個接口類,可是它繼承了 PipelineOptions、ApplicationNameOptions、StreamingOptions 三個接口類,第一個PipelineOptions你們應該很熟悉了,用於基本管道建立;第二個ApplicationNameOptions 用於設置應用程序名字;第三個用於判斷是流式數據仍是批數據。源代碼以下:
public interface FlinkPipelineOptions extends
PipelineOptions, ApplicationNameOptions, StreamingOptions {
//....
}
1) 設置 Flink Master 方法 ,這個方法用於設置Flink 集羣地址的Master地址。能夠填寫IP和端口,或者是hostname 和端口,默認local 。固然測試也能夠是單機的,在Flink 1.4 利用 start-local.sh 啓動,而到了1.5以上就去掉了這個腳本,本地直接換成了 start-cluster.sh。你們測試的時候須要注意一下。
/**
* The url of the Flink JobManager on which to execute pipelines. This can either be the the * address of a cluster JobManager, in the form "host:port" or one of the special Strings * "[collection]" will execute the pipeline on Java Collections while " [auto]" will let the system
*/
@Description ( "Address of the Flink Master where the Pipeline should be executed. Can"+ "[collection] or [auto].")
void setFlinkMaster (String value);
2) 設置 Flink 的並行數,屬於Flink 高級API裏面的屬性。設置合適的parallelism能提升運算效率,太多了和太少了都不行。設置parallelism有多種方式,優先級爲api>env>p>file。
@Description("The degree of parallelism to be used when distributing operations onto workers.")
@Default.InstanceFactory (DefaultParallelismFactory.class)
Integer getParallelism();
void setParallelism (Integer value);
3) 設置連續檢查點之間的間隔時間(即當前的快照)用於容錯的管道狀態。
@Description("The interval between consecutive checkpoints (i.e. snapshots of the current"
@Default.Long(-1L)
Long getCheckpointingInterval();
void setCheckpointingInterval (Long interval)
4) 定義一致性保證的檢查點模式,默認爲"AT_LEAST_ONCE",在Beam的源碼中定義了一個枚舉類CheckpointingMode,除了默認的"AT_LEAST_ONCE",還有"EXACTLY_ONCE"。
"AT_LEAST_ONCE":這個模式意思是系統將以一種更簡單地方式來對operator和udf的狀態進行快照:在失敗後進行恢復時,在operator的狀態中,一些記錄可能會被重放屢次。
"EXACTLY_ONCE":這種模式意思是系統將以以下語義對operator和udf(user defined function)進行快照:在恢復時,每條記錄將在operator狀態中只被重現/重放一次。
@Description ("The checkpointing mode that defines consistency guarantee.")
@Default.Enum ("AT_LEAST_ONCE")
CheckpointingMode getCheckpointingMode();
void setCheckpointingMode (CheckpointingMode mode);
5) 設置檢查點的最大超時時間,默認爲20*60*1000(毫秒)=20(分鐘)。
@Description("The maximum time that a checkpoint may take before being discarded.")
@Default.Long (20 * 60 * 1000)
Long getCheckpointTimeoutMillis();
void setCheckpointTimeoutMillis (Long checkpointTimeoutMillis);
6) 設置從新執行失敗任務的次數,值爲0有效地禁用容錯,值爲-1表示使用系統默認值(在配置中定義)。
@Description(
"Sets the number of times that failed tasks are re-executed. "
+ "A value of zero effectively disables fault tolerance. A value of -1 indicates "+ "that the system default value (as defined in the configuration) should be used.")
@Default.Integer(-1)
Integer getNumberOfExecutionRetries();
void setNumberOfExecutionRetries (Integer retries);
7) 設置執行之間的延遲,默認值爲-1L。
@Description(
"Sets the delay between executions. A value of {@code -1} "
+ "indicates that the default value should be used.")
@Default.Long(-1L)
Long getExecutionRetryDelay();
void setExecutionRetryDelay (Long delay);
8) 設置重用對象的行爲。
@Description ("Sets the behavior of reusing objects.")
@Default.Boolean(false)
Boolean getObjectReuse();
void setObjectReuse(Boolean reuse);
9) 設置狀態後端在計算期間存儲Beam的狀態,不設置從配置文件中讀取默認值。注意:僅在執行時適用流媒體模式。
@Description ("Sets the state backend to use in streaming mode. "
@JsonIgnore
AbstractStateBackend getStateBackend();
void setStateBackend (AbstractStateBackend stateBackend);
10) 在Flink Runner中啓用/禁用Beam指標。
@Description ("Enable/disable Beam metrics in Flink Runner")
@Default.Boolean(true)
BooleangetEnableMetrics();
voidsetEnableMetrics(BooleanenableMetrics);
11) 啓用或禁用外部檢查點,與CheckpointingInterval一塊兒使用。
@Description(
"Enables or disables externalized checkpoints."
+"Works in conjunction with CheckpointingInterval")
@Default.Boolean(false)
BooleanisExternalized CheckpointsEnabled();
voidsetExternalizedCheckpointsEnabled (BooleanexternalCheckpoints);
12) 設置當他們的Wartermark達到+ Inf時關閉源,Watermark在Flink 中其中一個做用是根據時間戳作單節點排序,Beam也是支持的。
@Description ("If set, s hutdown sources when their watermark reaches +Inf.")
@Default.Boolean (false)
BooleanisShutdownSources OnFinalWatermark();
voidsetShutdown SourcesOnFinalWatermark (BooleanshutdownOnFinalWatermark);
剩餘兩個部分這裏再也不進行翻譯,留給你們去看源碼。
六. KafkaIO和Flink實戰
本節經過解讀一個真正的KafkaIO和Flink實戰案例,幫助你們更深刻地瞭解Apache Beam KafkaIO和Flink的運用。
設計架構圖和設計思路解讀
Apache Beam 外部數據流程圖
設計思路:Kafka消息生產程序發送testmsg到Kafka集羣,Apache Beam 程序讀取Kafka的消息,通過簡單的業務邏輯,最後發送到Kafka集羣,而後Kafka消費端消費消息。
Apache Beam 內部數據處理流程圖
Apache Beam 程序經過kafkaIO讀取Kafka集羣的數據,進行數據格式轉換。數據統計後,經過KafkaIO寫操做把消息寫入Kafka集羣。最後把程序運行在Flink的計算平臺上。
軟件環境和版本說明
系統版本 centos 7
Kafka集羣版本: kafka_2.10-0.10.1.1.tgz
Flink 版本:flink-1.5.2-bin -hadoop27-scala_2.11.tgz
Kafka集羣和Flink單機或集羣配置,你們能夠去網上搜一下配置文章,操做比較簡單,這裏就不贅述了。
實踐步驟
1)新建一個Maven項目
2)在pom文件中添加jar引用
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-kafka</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-core-java</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-flink_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.5.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.5.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.5.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_2.11</artifactId>
<version>1.5.2</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.5.2</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-core</artifactId>
<version>1.5.2</version>
<!--<scope>provided</scope>-->
</dependency>
3)新建BeamFlinkKafka.java類
4)編寫如下代碼:
public static void main(String[] args) {
//建立管道工廠
PipelineOptions options = PipelineOptionsFactory.create();
// 顯式指定PipelineRunner: FlinkRunner必須指定若是不制定則爲本地
options.setRunner (FlinkRunner.class);
//設置相關管道
Pipeline pipeline = Pipeline.create(options);
//這裏kV後說明kafka中的key和value均爲String類型
PCollection<KafkaRecord<String, String>> lines =
pipeline.apply(KafkaIO.<String,
// 必需設置kafka的服務器地址和端口
String>read().withBootstrapServers ("192.168.1.110:11092, 192.168.1.119:11092, 192.168.1.120:11092")
.withTopic ("testmsg")// 必需設置要讀取的kafka的topic名稱
.withKeyDeserializer (StringDeserializer.class)// 必需序列化key
.withValueDeserializer (StringDeserializer.class)// 必需序列化value
.updateConsumerProperties (ImmutableMap.<String, Object>of ("auto.offset.reset", "earliest")));//這個屬性kafka最多見的.
// 爲輸出的消息類型。或者進行處理後返回的消息類型
PCollection<String> kafkadata = lines.apply("Remove Kafka Metadata", ParDo.of (new DoFn<KafkaRecord< String, String>, String>() {
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement (ProcessContext ctx) {
System.out.print ("輸出的分區爲----:" + ctx.element().getKV());
ctx.output (ctx.element().getKV().getValue());// 其實咱們這裏是把"張海 濤在發送消息***"進行返回操做
}
}));
PCollection<String> windowedEvents = kafkadata.apply (Window.<String>into (FixedWindows.of (Duration.standardSeconds(5))));
PCollection<KV<String, Long>> wordcount = windowedEvents.apply (Count.<String> perElement()); // 統計每個kafka消息的Count
PCollection<String> wordtj = wordcount.apply ("ConcatResultKVs", MapElements.via( // 拼接最後的格式化輸出(Key爲Word,Value爲Count)
new SimpleFunction<KV< String, Long>, String>() {
private static final long serialVersionUID = 1L;
@Override
public String apply (KV<String, Long> input) {
System.out.print ("進行統計:" + input.getKey() + ": " + input.getValue());
return input.getKey() + ": " + input.getValue();
}
}));
wordtj.apply (KafkaIO.<Void, String>write() .withBootstrapServers ("192.168.1.110:11092, 192.168.1.119:11092, 192.168.1.120:11092")//設置寫會kafka的集羣配置地址
.withTopic ("senkafkamsg")//設置返回kafka的消息主題
// .withKeySerializer (StringSerializer.class)//這裏不用設置了,由於上面 Void
.withValueSerializer (StringSerializer.class)
// Dataflow runner and Spark 兼容, Flink 對kafka0.11才支持。個人版本是0.10不兼容
//.withEOS (20, "eos-sink-group-id")
.values() // 只須要在此寫入默認的key就好了,默認爲null值
); // 輸出結果
pipeline.run().waitUntilFinish();
}
5)打包jar,本示例是簡單的實戰,並無用Docker,Apache Beam新版本是支持Docker的。
6)經過Apache Flink Dashboard 提交job
7)查看結果
程序接收的日誌以下:
七.實戰解析
本次實戰在源碼分析中已經作過詳細解析,在這裏不作過多的描述,只選擇部分問題再重點解釋一下。此外,若是尚未入門,甚至連管道和Runner等概念都還不清楚,建議先閱讀本系列的第一篇文章《Apache Beam實戰指南之基礎入門》。
1.FlinkRunner在實戰中是顯式指定的,若是想設置參數怎麼使用呢?其實還有另一種寫法,例如如下代碼:
//FlinkPipelineOptions options = PipelineOptionsFactory.as (FlinkPipelineOptions.class);
//options.setStreaming (true);
//options.setAppName ("app_test");
//options.setJobName ("flinkjob");
//options.setFlinkMaster ("localhost:6123");
//options.setParallelism (10);//設置flink 的並行度
//顯式指定PipelineRunner :FlinkRunner,必須指定,若是不指定則爲本地
options.setRunner (FlinkRunner.class);
2.Kafka 有三種數據讀取類型,分別是 「earliest 」,「latest 」,「none 」,分別的意思表明是:
earliest
當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費 。
latest
當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據 。
none
topic各分區都存在已提交的offset時,從offset後開始消費;只要有一個分區不存在已提交的offset,則拋出異常。
.updateConsumerProperties (ImmutableMap.<String, Object>of ("auto.offset.reset", "earliest")));
3.實戰中我本身想把Kafka的數據寫入,key不想寫入,因此出現了Kafka的key項爲空,而values纔是真正發送的數據。因此開始和結尾要設置個.values(),若是不加上就會報錯。
KafkaIO.<Void, String>write()
.values() // 只須要在此寫入默認的key就好了,默認爲null值
八.小結
隨着AI和loT的時代的到來,各個公司不一樣結構、不一樣類型、不一樣來源的數據進行整合的成本愈來愈高。Apache Beam 技術的統一模型和大數據計算平臺特性優雅地解決了這一問題,相信在loT萬億市場中,Apache Beam將會發揮愈來愈重要的角色。
歡迎工做一到五年的Java工程師朋友們加入Java架構開發:jq.qq.com/?_wv=1027&k…
本羣提供免費的學習指導 架構資料 以及免費的解答
不懂得問題均可以在本羣提出來 以後還會有職業生涯規劃以及面試指導
同時你們能夠多多關注一下小編 你們一塊兒學習進步