策劃 & 審校 | Natalie做者 | 張海濤編輯 | Linda AI 前線導讀: 本文是 Apache Beam 實戰指南系列文章第五篇內容,將對 Beam 框架中的 pipeline 管道進行剖析,並結合應用示例介紹如何設計和應用 Beam 管道。系列文章第一篇回顧 Apache Beam 實戰指南 | 基礎入門、第二篇回顧 Apache Beam 實戰指南 | 玩轉 KafkaIO 與 Flink、第三篇回顧 Apache Beam 實戰指南 | 玩轉大數據存儲 HdfsIO、第四篇回顧 A pache Beam 實戰指南 | 如何結合 ClickHouse 打造「AI 微服務」?
一.概述關於 Apache Beam 實戰指南系列文章前端
隨着大數據 2.0 時代悄然到來,大數據從簡單的批處理擴展到了實時處理、流處理、交互式查詢和機器學習應用。近年來涌現出諸多大數據應用組件,如 HBase、Hive、Kafka、Spark、Flink 等。開發者常常要用到不一樣的技術、框架、API、開發語言和 SDK 來應對複雜應用的開發,這大大增長了選擇合適工具和框架的難度,開發者想要將全部的大數據組件熟練運用幾乎是一項不可能完成的任務。java
面對這種狀況,Google 在 2016 年 2 月宣佈將大數據流水線產品(Google DataFlow)貢獻給 Apache 基金會孵化,2017 年 1 月 Apache 對外宣佈開源 Apache Beam,2017 年 5 月迎來了它的第一個穩定版本 2.0.0。在國內,大部分開發者對於 Beam 還缺少了解,社區中文資料也比較少。InfoQ 指望經過 Apache Beam 實戰指南系列文章 推進 Apache Beam 在國內的普及。linux
其餘行業問我們 IT 具體幹什麼的,不少 IT 人員會自嘲本身就是「搬磚」(此處將複製代碼稱爲搬磚)的民工。過了兩天 GitHub 出現自動寫代碼的人工智能,IT 程序員深深嘆了一口氣說道「完了要失業了,代碼沒得搬了」。其實從入行 IT 那一刻起,無論咱們作前端、服務端、底層架構等任何崗位,其實咱們都是爲數據服務的服務人員(注:不是說從民工轉崗到服務員了):把數據從後端搬到前端,把前端數據再寫入數據庫。儘管編程語言從 C、C++、C#、JAVA、Python 不停變化,爲了適應時代背景框架也是變幻無窮,咱們拼命從「亞馬遜熱帶雨林」一直學到「地中海」。程序員
而後 Apache Beam 這個一統「地中海」的框架出現了。Apache Beam 不光統一了數據源,還統一了流批計算。在這個數據傳輸過程當中有一條核心的技術就是管道(Pipeline),不論是 Strom,Flink ,Beam 它都是核心。在這條管道中能夠對數據進行過濾、淨化、清洗、合併、分流以及各類實時計算操做。數據庫
本文會詳細介紹如何設計 Apache Beam 管道、管道設計工具介紹、源碼和案例分析,普及和提高你們對 Apache Beam 管道的認知。編程
二.怎樣設計好本身的管道?設計管道注意事項圖 2-1 簡單管道後端
1. 你輸入的數據存儲在那裏?首先要肯定你要構造幾條數據源,在 Beam 能夠構建多條,構建以前能夠選擇本身的 SDK 的 IO。微信
2. 你的數據類型是什麼樣的?Beam 提供的是鍵值對的數據類型,你的數據多是日誌文本、格式化設備事件、數據庫的行,因此在 PCollection 就應該肯定數據集的類型。網絡
3. 你想怎麼處理數據?對數據進行轉換、過濾處理、窗口計算、SQL 處理等。在管道中提供了通用的 ParDo 轉換類,算子計算以及 BeamSQL 等操做。架構
4. 你打算把數據最後輸出到哪裏去?在管道末尾進行 Write 寫入操做,把數據最後寫入你本身想存放或最後流向的地方。
管道的幾種玩法1. 分支管道:屢次轉換,處理相同的數據集圖 2-2-1 屢次轉換處理相同數據示意圖
描述:例如上圖 2-1-1 圖所示,從一個數據庫的表讀取或轉換數據集,而後從數據集中分別找找以字母「A」開頭的數據放入一個分支數據集中,若是以字母「B」開頭的數據放入另外一個分支數據集中,最終兩個數據集進行隔離處理。
數據集:
// 爲了演示顯示內存數據集
final List<String> LINES = Arrays.asList(
"Aggressive",
"Bold",
"Apprehensive",
"Brilliant");
示例代碼:
PCollection<String> dbRowCollection = ...;// 這個地方能夠讀取任何數據源。
PCollection<String> aCollection = dbRowCollection.apply("aTrans", ParDo.of(new DoFn<String, String>(){
@ProcessElement
public void processElement(ProcessContext c) {
if(c.element().startsWith("A")){// 查找以"A"開頭的數據
c.output(c.element());
System.out.append("A 開頭的單詞有:"+c.element()+"\r");
}
}
}));
PCollection<String> bCollection = dbRowCollection.apply("bTrans", ParDo.of(new DoFn<String, String>(){
@ProcessElement
public void processElement(ProcessContext c) {
if(c.element().startsWith("B")){// 查找以"A"開頭的數據
c.output(c.element());
System.out.append("B 開頭的單詞有:"+c.element()+"\r");
}
}
}));
最終結果展現:
A 開頭的單詞有:Aggressive
B 開頭的單詞有:Bold
A 開頭的單詞有:Apprehensive
B 開頭的單詞有:Brilliant
原示例代碼地址 :pipelineTest2_1
2. 分支管道:一次轉換,輸出多個數據集圖 2-2-2 一次轉換多個輸出示意圖
描述:根據圖 2-2-1 和圖 2-2-2 圖中能夠看出,他們以不一樣的方式執行着相同的操做,圖 2-2-1 中的管道包含兩個轉換,用於處理同一輸入中的元素 PCollection。一個轉換使用如下邏輯:
if(以'A'開頭){outputToPCollectionA}
另外一個轉換爲
if(以'B'開頭){outputToPCollectionB}
由於每一個轉換讀取整個輸入 PCollection,因此輸入中的每一個元素都會 PCollection 被處理兩次。圖 2-2-2 中的管道以不一樣的方式執行相同的操做 - 只有一個轉換使用如下邏輯:
if(以'A'開頭){outputToPCollectionA} else if(以'B'開頭){outputToPCollectionB}
其中輸入中的每一個元素都 PCollection 被處理一次。
數據集:同 2-1-1 數據集
示例代碼:
// 定義兩個 TupleTag,每一個輸出一個。
final TupleTag<String> startsWithATag = new TupleTag<String>(){};
final TupleTag<String> startsWithBTag = new TupleTag<String>(){};
PCollectionTuple mixedCollection =
dbRowCollection.apply(ParDo
.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
if (c.element().startsWith("A")) {
// 返回首字母帶有"A"的數據集。
c.output(c.element());
} else if(c.element().startsWith("B")) {
// // 返回首字母帶有"B"的數據集。
c.output(startsWithBTag, c.element());
}
}
})
// Specify main output. In this example, it is the output
// with tag startsWithATag.
.withOutputTags(startsWithATag,
// Specify the output with tag startsWithBTag, as a TupleTagList.
TupleTagList.of(startsWithBTag)));
// Get subset of the output with tag startsWithATag.
mixedCollection.get(startsWithATag).apply(...);
// Get subset of the output with tag startsWithBTag.
mixedCollection.get(startsWithBTag).apply(...);
若是每一個元素的轉換計算很是耗時,則使用其餘輸出會更有意義,由於一次性過濾所有數據,比所有數據過濾兩次從性能上和轉換上都存在必定程度上提高,數據量越大越明顯。
最終結果展現:
A 開頭的單詞有:Apprehensive
A 開頭的單詞有:Aggressive
B 開頭的單詞有:Brilliant
B 開頭的單詞有:Bold
原示例代碼地址 :pipelineTest2_2
3. 合併管道:多個數據集,合併成一個管道輸出圖 2-2-3 多數據集合並輸出圖
描述:
上圖 2-2-3 是接圖 2-2-1 的繼續,把帶「A」 的數據和帶「B」 字母開頭的數據進行合併到一個管道。這個地方注意點是 Flatten 用法必須兩個數據的數據類型相同。
數據集:
// 爲了演示顯示內存數據集
final List<String> LINESa = Arrays.asList(
"Aggressive",
"Apprehensive");
final List<String> LINESb = Arrays.asList(
"Bold",
"Brilliant");
示例代碼:
// 將兩個 PCollections 與 Flatten 合併
PCollectionList<String> collectionList = PCollectionList.of(aCollection).and(bCollection);
PCollection<String> mergedCollectionWithFlatten = collectionList
.apply(Flatten.<String>pCollections());
// 繼續合併新的 PCollection
mergedCollectionWithFlatten.apply(...);
結果展現:
合併單詞單詞有:
Aggressive
Brilliant
Apprehensive
Bold
原示例代碼地址 :pipelineTest2_3
4. 合併管道:多個數據源,連接合並一個管道輸出圖 2-2-4 多數據源合併輸出圖
描述:
你的管道能夠從一個或多個源讀取或輸入。若是你的管道從多個源讀取而且這些源中的數據相關聯,則將輸入鏈接在一塊兒會頗有用。在上面的圖 2-2-4 所示的示例中,管道從數據庫表中讀取名稱和地址,並從 Kafka 主題中讀取名稱和訂單號。而後管道 CoGroupByKey 用於鏈接此信息,其中鍵是名稱 ; 結果 PCollection 包含名稱,地址和訂單的全部組合。
示例代碼:
PCollection<KV<String, String>> userAddress = pipeline.apply(JdbcIO.<KV<String, String>>read()...);管道的設計工具
PCollection<KV<String, String>> userOrder = pipeline.apply(KafkaIO.<String, String>read()...);
final TupleTag<String> addressTag = new TupleTag<String>();
final TupleTag<String> orderTag = new TupleTag<String>();
// 將集合值合併到 CoGbkResult 集合中。
PCollection<KV<String, CoGbkResult>> joinedCollection =
KeyedPCollectionTuple.of(addressTag, userAddress)
.and(orderTag, userOrder)
.apply(CoGroupByKey.<String>create());
joinedCollection.apply(...);
對於管道的設計不光用代碼去實現,也能夠用視圖工具。如今存在的有兩種一種是拓藍公司出品叫 Talend Big Data Studio,另外一種就是免費開源的視圖設計工具 kettle-beam。
三.怎樣建立你的管道
Apache Beam 程序從頭至尾就是處理數據的管道。本小節使用 Apache Beam SDK 中的類構建管道,一個完整的 Apache Beam 管道構建流程以下:
首先建立一個 Pipeline 對象。
不論是數據作任何操做,如「 讀取」或「 建立」及轉換都要爲管道建立 PCollection 一個或多個的數 據集(PCollection<String>)。
在 Apache Beam 的管道中你能夠對數據集 PCollection 作任何操做,例如轉換數據格式,過濾,分組,分析或以其餘方式處理數據中的每個元素。每一個轉換都會建立一個新輸出數據集 PCollection,固然你能夠在處理完成以前進行作任何的轉換處理。
把你認爲最終處理完成的數據集寫或以其餘方式輸出最終的存儲地方。
最後運行管道。
每個 Apache Beam 程序都會從建立管道(Pipeline)對象開始。
在 Apache Beam SDK,每個管道都是一個獨立的實體,管道的數據集也都封裝着它的數據和對應的數據類型(在 Apache Beam 中有對應的數據轉換 類型包)。最後把數據進行用於各類轉換操做。
在建立的管道的時候須要設置管道選項 PipelineOptions,有兩種建立方式第一種是無參數和一種有參數的。具體兩種有什麼不一樣呢?無參數的能夠在程序中指定相應的管道選項參數,如顯示設置執行大數據引擎參數。有參數的就能夠在提交 Apache Beam jar 程序的時候進行用 Shell 腳本的方式後期設置管道對應的參數。
具體示例以下:
無參數
// 首先定義管道的選項
PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(DirectRunner.class); // 顯示設置執行大數據引擎
// 建立管道實體對象
Pipeline p = Pipeline.create(options);
有參數
PipelineOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().create();
提交設置參數的格式以下:
--<option>=<value>將數據讀入你的管道
建立 PCollection 的初始值,請讀取外部的數據源及指定的本地數據。例如讀取數據庫,文本文件,流數據等等,如今 Apache Beam java SDKS 支持 33 種數據源,正在接入集成的有 7 種,Python 13 種,正在集成的 1 種。基本覆蓋了 IT 行業的一切數據源。例如讀取文本數據咱們能夠用 TextIO.Read 的方法進行讀取數據。轉換應用於管道對象 p 中。而且返回對應格式的數據集 PCollection:
PCollection<String> lines = p.apply(
"ReadLines", TextIO.read().from("/home/inputData.txt"));
注意: 在 Apache Beam 程序執行中,Beam 程序 2.2.0 之前版本不支持 Windows 如:D\inputData.txt 路徑格式。只支持 linux 路徑格式,及其餘如 HDFS 等存儲路徑。
已經支持的數據源統計以下表:
將管道數據轉換爲處理的格式
不少時候直接從數據源讀取的數據不會直接流入目標存儲。大部分須要進行數據格式的轉換,數據的清洗,數據的過濾,數據的關聯處理,數據累加操做等。這裏須要對源數據進行處理,處理完成的數據處理流入目標存儲外還能夠進行看成參數同樣,傳遞並繼續應用到管道中。
如下示例代碼爲 把一串數據經過轉換操做賦值給 words , 而後再把 words 再次傳遞到下一個操做應用,再進一步進行操做的處理工做。
PCollection<String> words = ...;編寫或輸出管道最終輸出數據的地方
PCollection<String> reversedWords = words.apply(new ReverseWords());
通過一些列的清洗、過濾、關聯、轉換處理工做後的數據,最終都會經過 SDKIO 進行寫入管道外的存儲或着數據庫表。然而這種寫入操做大部分都是在管道的末尾端進行操做的。
以下面代碼示例,就是把管道的數據經過 Apache Beam 中的 TextIO.Write 寫入 Linux 的文本文件 test.txt 中。
PCollection<String> filteredWords = ...;運行你的管道
filteredWords.apply("WriteMyFile", TextIO.write().to("/home/test.txt"));
構建管道後,使用 run 方法最後執行管道。管道以異步方式執行的。寫完這一句代碼後你就能夠把本身的程序用 Jenkins 進行編譯並提交給運行管道平臺,最終有管道執行平臺來運行。
運行代碼示例:
p.run();
處理異步執行的方式,還有同步執行方式,是在 run 方法後面加個看守方法 waitUntilFinish。具體代碼以下:
p.run().waitUntilFinish();四.怎樣測試你的管道
Apache Beam 管道開發中最後的測試在整個開發中也是很是重要的一個環節。Apache Beam 的代碼程序沒必要要每次都進行遠程構建執行到 Flink 集羣上,由於管道代碼的錯誤及 Bug 的修改在本地能更好的調試,然而每次構建到遠程上面去執行是很是麻煩的事情。Apache Beam 提供 DirectRunner ,一個用於本地測試的執行引擎。
使用 DirectRunner 測試管道的時候,你能夠用小規模的數據進行測試。此外你若是開發機器上裝了本地的 flink ,也能夠指定本地的 Flink 執行。例如測試一個簡單的轉換函數 DoFn,符合變換,數據源輸入到管道尾端數據輸出等操做。
注意點:DirectRunner 是用於管道或 Apache Beam 程序 本地開發調試測試的 數據執行引擎,不能夠用於真正生產環境中運行。不然程序執行性能會大大下降,這裏有坑要避開。
測試單個 Pipeline 步驟咱們開發完成管道 Beam 程序後須要本地測試,Beam SDK for Java 提供了一種方便的方法來測試 TestPipeline 的封裝類。 在 Beam SDK testing 包中。
它的使用操做方法:
建立一個 TestPipeline。
建立一些已知的靜態測試數據,也稱爲內存數據,真正應用基本是流或批數據。
使用 Create 方法建立 PCollection 輸入數據。
使用 Apply 方法進行數據的轉換處理而且返回指定的 PCollection。
最後使用 PAssert 去驗證輸出的結果是否爲預期結果值。
Apache Beam 中簡單的管道單元測試實例。
public class CountTest {端到端的測試管道
// 建立靜態的內存數據
static final String[] WORDS_ARRAY = new String[] {
"hi", "there", "hi", "hi", "sue", "bob",
"hi", "sue", "", "", "ZOW", "bob", ""};
static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
public void testCount() {
// 建立一個測試管道.
Pipeline p = TestPipeline.create();
// 建立一個輸入數據集.
PCollection<String> input = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
// 添加轉換統計單詞個數.
PCollection<KV<String, Long>> output =
input.apply(Count.<String>perElement());
// 驗證結果.
PAssert.that(output)
.containsInAnyOrder(
KV.of("hi", 4L),
KV.of("there", 1L),
KV.of("sue", 2L),
KV.of("bob", 2L),
KV.of("", 3L),
KV.of("ZOW", 1L));
// 運行整個管道.
p.run();
}
端到端的測試,主要針對輸入端和輸出端兩端的測試。要測試整個管道,請執行如下操做:
建立一個 Beam 測試 SDK 中所提供的 TestPipeline 實例。
對於多步驟數據流水線中的每一個輸入數據源,建立相對應的靜態(Static)測試數據集。
使用 Create Transform,將全部的這些靜態測試數據集轉換成 PCollection 做爲輸入數據集。
按照真實數據流水線邏輯,調用全部的 Transforms 操做。
在數據流水線中全部應用到 Write Transform 的地方,都使用 PAssert 來替換這個 Write Transform,而且驗證輸出的結果是否咱們指望的結果相匹配
因爲端到端測試跟單個 Pipeline 步驟類似就不在舉示例代碼。其實開發過程當中本地調試打斷點,寫日誌測試也是更快解決問題的一個辦法。
五. Apache Beam 的管道源碼解析Apache Beam Pipeline 源碼解析管道源代碼主類是比較簡單的,本文針對 Pipeline.java 進行解析。
1. 定義管道參數及管道建立在管道建立首先能夠定義管道的選項,例如 Beam 做業程序的名稱、惟一標識、運行引擎平臺等,固然也能夠提交引擎平臺用命令指定也能夠。而後實例化一個管道對象。
源碼示例以下:
PipelineOptions options = PipelineOptionsFactory.create();2. 讀取數據源
Pipeline p = Pipeline.create(options);
讀取要處理的數據,有文本數據,結構化數據和非結構化數據以及流數據。做爲數據處理的源數據。
源碼示例以下:
PCollection<String> lines =3. 進行數據處理操做
p.apply(TextIO.read().from("gs://bucket/dir/file*.txt"));
在管道里面能夠進行窗口操做、函數操做、原子操做以及 SQL 操做。
數據統計的源碼示例:
PCollection<KV<String, Integer>> wordCounts =allLines4. 輸出結果及運行
.apply(ParDo.of(new ExtractWords()))
.apply(new Count<String>());
源代碼示例:
PCollection<String> formattedWordCounts =六.管道實戰案例案例場景描述
wordCounts.apply(ParDo.of(new FormatCounts()));
formattedWordCounts.apply(TextIO.write().to("gs://bucket/dir/counts.txt"));
p.run();
隨着人工智能 的不斷髮展,AI Cloud 在銀行加快落地,安防 AI 碎片化的應用場景遍地開花。本文結合銀行營業網點的業務,介紹管道案例實戰。
以銀行的員工脫離崗檢測中的行爲分析數據預處理爲例。咱們去銀行辦理業務過程當中,首先要取號,而後叫號。叫號提示會對接系統造成一條消息回傳後臺,可是有時候正常辦理業務期間有櫃檯營業員出去,而後好久纔回來。這個時候攝像頭會根據櫃檯離崗時間自動 AI 行爲分析生成報警處理。
案例業務架構流程
叫號報警和行爲分析報警產生的數據經過營業網點進行上報。
上傳網關集羣,網關集羣進行轉換消息格式壓縮消息。
消息流入消息中心等待消費,消息中心再次起着消峯做用。
用 Beam 管道的時間窗口特性、流合併處理特性進行消息消費處理
消息進入大數據實時分析處理平臺處理應用消息。
// 建立管道工廠2. 測試運行結果
PipelineOptions options = PipelineOptionsFactory.create();
// 顯式指定 PipelineRunner:FlinkRunner 必須指定若是不制定則爲本地
options.setRunner(DirectRunner.class); // 生產環境關閉
// options.setRunner(FlinkRunner.class); // 生成環境打開
Pipeline pipeline = Pipeline.create(options);// 設置相關管道
// 爲了演示顯示內存數據集
// 叫號數據
final List<KV<String, String>> txtnoticelist = Arrays.asList(KV.of("DS-2CD2326FWDA3-I", "101 號顧客請到 3 號櫃檯"), KV.of("DS-2CD2T26FDWDA3-IS", "102 號顧客請到 1 號櫃檯"),
KV.of("DS-2CD6984F-IHS", "103 號顧客請到 4 號櫃檯"),
KV.of("DS-2CD7627HWD-LZS", "104 號顧客請到 2 號櫃檯"));
//AI 行爲分析消息
final List<KV<String, String>> aimessagelist = Arrays.asList(KV.of("DS-2CD2326FWDA3-I",
"CMOS 智能半球網絡攝像機, 山東省濟南市解放路支行 3 號櫃,type=2,display_image=no"),
KV.of("DS-2CD2T26FDWDA3-IS", "CMOS 智能筒型網絡攝像機, 山東省濟南市甸柳莊支行 1 號櫃檯,type=2,display_image=no"),
KV.of("DS-2CD6984F-IHS", "星光級全景拼接網絡攝像機, 山東省濟南市市中區支行 4 號櫃檯,type=2,display_image=no"),
KV.of("DS-2CD7627HWD-LZS", "全結構化攝像機, 山東省濟南市市中區支行 2 號櫃檯,type=2,display_image=no"));
PCollection<KV<String, String>> notice = pipeline.apply("CreateEmails", Create.of(txtnoticelist));
PCollection<KV<String, String>> message = pipeline.apply("CreatePhones", Create.of(aimessagelist));
final TupleTag<String> noticeTag = new TupleTag<>();
final TupleTag<String> messageTag = new TupleTag<>();
PCollection<KV<String, CoGbkResult>> results = KeyedPCollectionTuple.of(noticeTag, notice).and(messageTag, message).apply(CoGroupByKey.create());
System.out.append("合併分組後的結果:\r");
PCollection<String> contactLines = results.apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, String>() {
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) {
KV<String, CoGbkResult> e = c.element();
String name = e.getKey();
Iterable<String> emailsIter = e.getValue().getAll(noticeTag);
Iterable<String> phonesIter = e.getValue().getAll(messageTag);
System.out.append("" + name + ";" + emailsIter + ";" + phonesIter + ";" + "\r");
}
}));
pipeline.run().waitUntilFinish();
源碼地址:pipelineTest2_5.java
七.小結近幾年隨着 AloT 發展得如火如荼,其落地場景也遍地開花。loT 做爲 AI 落地先鋒,已經步入線下各行各業。本文以 Beam 管道的設計切入,重點對 Beam 管道設計工具和源碼進行解析,最後結合銀行金融行業對 AI 碎片化的場景進行數據預處理的案例,幫助你們全面瞭解 Beam 管道。
做者介紹張海濤,目前就任於海康威視雲基礎平臺,負責海康威視在全國金融行業 AI 大數據落地的基礎架構設計和中間件的開發,專一 AI 大數據方向。Apache Beam 中文社區發起人之一,若是想進一步瞭解最新 Apache Beam 和 ClickHouse 動態和技術研究成果,請加微信 cyrjkj 入羣共同研究和運用。