Apache Beam(原名Google DataFlow)是Google在2016年2月份貢獻給Apache基金會的Apache孵化項目,被認爲是繼MapReduce,GFS和BigQuery等以後,Google在大數據處理領域對開源社區的又一個很是大的貢獻。Apache Beam的主要目標是統一批處理和流處理的編程範式,爲無限,亂序,web-scale的數據集處理提供簡單靈活,功能豐富以及表達能力十分強大的SDK。Apache Beam項目重點在於數據處理的編程範式和接口定義,並不涉及具體執行引擎的實現,Apache Beam但願基於Beam開發的數據處理程序能夠執行在任意的分佈式計算引擎上。本文主要介紹Apache Beam的編程範式-Beam Model,以及經過Beam SDK如何方便靈活的編寫分佈式數據處理業務邏輯,但願讀者可以經過本文對Apache Beam有初步的瞭解,同時對於分佈式數據處理系統如何處理亂序無限數據流的能力有初步的認識。java
隨着分佈式數據處理不斷髮展,新的分佈式數據處理技術也不斷被提出,業界涌現出了愈來愈多的分佈式數據處理框架,從最先的Hadoop MapReduce,到Apache Spark,Apache Storm,以及更近的Apache Flink,Apache Apex等。新的分佈式處理框架可能帶來的更高的性能,更強大的功能,更低的延遲等,但用戶切換到新的分佈式處理框架的代價也很是大:須要學習一個新的數據處理框架,並重寫全部的業務邏輯。解決這個問題的思路包括兩個部分,首先,須要一個編程範式,可以統一,規範分佈式數據處理的需求,例如,統一批處理和流處理的需求。其次,生成的分佈式數據處理任務應該可以在各個分佈式執行引擎上執行,用戶能夠自由切換分佈式數據處理任務的執行引擎與執行環境。Apache Beam正是爲了解決以上問題而提出的。git
Apache Beam主要由Beam SDK和Beam Runner組成,Beam SDK定義了開發分佈式數據處理任務業務邏輯的API接口,生成的的分佈式數據處理任務Pipeline交給具體的Beam Runner執行引擎。Apache Beam目前支持的API接口是由Java語言實現的,Python版本的API正在開發之中。Apache Beam支持的底層執行引擎包括Apache Flink,Apache Spark以及Google Cloud Platform,此外Apache Storm,Apache Hadoop,Apache Gearpump等執行引擎的支持也在討論或開發當中。其基本架構以下圖所示:github
圖1 Apache Beam架構圖web
須要注意的是,雖然Apache Beam社區很是但願全部的Beam執行引擎都可以支持Beam SDK定義的功能全集,可是在實際實現中可能並不必定。例如,基於MapReduce的Runner顯然很難實現和流處理相關的功能特性。目前Google DataFlow Cloud是對Beam SDK功能集支持最全面的執行引擎,在開源執行引擎中,支持最全面的則是Apache Flink。apache
Beam Model指的是Beam的編程範式,即Beam SDK背後的設計思想。在介紹Beam Model以前,先簡要介紹一下Beam Model要處理的問題域與一些基本概念。編程
Beam Model處理的目標數據是無限的時間亂序數據流,不考慮時間順序或是有限的數據集可看作是無限亂序數據流的一個特例。Beam Model從下面四個維度概括了用戶在進行數據處理的時候須要考慮的問題:網絡
Beam Model將」WWWH「四個維度抽象出來組成了Beam SDK,用戶在基於Beam SDK構建數據處理業務邏輯時,在每一步只須要根據業務需求按照這四個維度調用具體的API便可生成分佈式數據處理Pipeline,並提交到具體執行引擎上執行。「WWWH」四個維度的抽象僅僅關注業務邏輯自己,和分佈式任務如何執行沒有任何關係。架構
不一樣於Apache Flink或是Apache Spark,Beam SDK使用同一套API表示數據源,輸出目標以及操做符等。下面介紹4個基於Beam SDK的數據處理任務,經過這四個數據處理任務,讀者能夠了解經過Beam Mode是如何統一靈活的描述批處理和流處理任務的,這4個任務用來處理手機遊戲領域的統計需求,包括:app
注:示例代碼來自Beam的源碼,具體地址參見:apache/incubator-beam。部分分析內容參考了Beam的官方文檔,詳情請參見引用連接。框架
下面基於Beam Model的「WWWH」四個維度,分析業務邏輯,並經過代碼展現如何經過Beam SDK實現「WWWH」四個維度的業務邏輯。
統計每一個用戶的歷史總得分數是一個很是簡單的任務,在這裏咱們簡單的經過一個批處理任務實現,每次須要新的用戶分數數據的時候,從新執行一次這個批處理任務便可。對於用戶分數任務,「WWWH」四維度分析結果以下:
經過「WWWH」的分析,對於用戶分數這個批處理任務,經過Beam Java SDK實現的代碼以下所示:
gameEvents
[... input ...]
[... parse ...]
.apply("ExtractUserScore", new ExtractAndSumScore("user"))
[... output ...];
ExtractAndSumScore實現了「What」中描述的邏輯,即按用戶分組,而後累加分數,其相關代碼以下:
gameInfo
.apply(MapElements
.via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), gInfo.getScore()))
.withOutputType(
TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers())))
.apply(Sum.<String>integersPerKey());
經過MapElements肯定Key與Value分別是用戶與分數,而後Sum定義按key分組,並累加分數。Beam支持將多個對數據的操做合併成一個操做,這樣不只能夠支持更清晰的業務邏輯實現,同時也能夠在多處重用合併後的操做邏輯。
按照小時統計每一個團隊的分數,得到最高分數的團隊可能得到獎勵,這個分析任務增長了對窗口的要求,不過咱們依然能夠經過一個批處理任務實現,對於這個任務的「WWWH」四個維度的分析以下:
相對於第一個用戶分數任務,只是在Where部分回答了「數據在什麼範圍中計算?」的問題,同時在What部分「如何計算數據?」中,分組的條件由用戶改成了團隊,這在代碼中也會相應的體現:
gameEvents
[... input ...]
[... parse ...]
.apply("AddEventTimestamps", WithTimestamps.of((GameActionInfo i)
-> new Instant(i.getTimestamp())))
.apply("FixedWindowsTeam", Window.<GameActionInfo>into(
FixedWindows.of(Duration.standardMinutes(windowDuration))))
.apply("ExtractTeamScore", new ExtractAndSumScore("team"))
[... output ...];
「AddEventTimestamps」定義瞭如何從原始數據中抽取EventTime數據,「FixedWindowsTeam」則定義了1小時固定窗口,而後重用了ExtractAndSumScore類,只是將分組的列從用戶改爲了團隊。對於每小時團隊分數任務,引入了關於「Where」部分窗口定義的新業務邏輯,可是從代碼中能夠看到,關於「Where」部分的實現和關於「What」部分的實現是徹底獨立的,用戶只須要新加兩行關於「Where」的代碼,很是簡單和清晰。
前面兩個任務均是基於有限數據集的批處理任務,對於排行榜來講,咱們一樣須要統計用戶分數以及每小時團隊分數,可是從業務角度但願獲得的是實時數據。對於Apache Beam來講,一個相同處理邏輯的批處理任務和流處理任務的惟一不一樣就是任務的輸入和輸出,中間的業務邏輯Pipeline無需任何改變。對於當前示例的排行榜數據分析任務,咱們不只但願他們知足和前兩個示例相同的業務邏輯,同時也能夠知足更定製化的業務需求,例如:
上述兩個問題正是經過回答「When」和「How」兩個問題來定義用戶的數據分析需求。「When」取決於用戶但願多常獲得計算結果,在回答「When」的時候,基本上能夠分爲四個階段:
對於每小時團隊得分的流處理任務,本示例但願的業務邏輯爲,基於Event Time的1小時時間窗口,按團隊計算分數,在一小時窗口內,每5分鐘輸出一次當前的團隊分數,對於遲到的數據,每10分鐘輸出一次當前的團隊分數,在窗口結束2小時後遲到的數據通常不可能會出現,假如出現的話,直接拋棄。「WWWH」表達以下:
在基於Beam SDK的實現中,用戶基於「WWWH」 Beam Model表示的業務邏輯能夠分別獨立直接的實現出來:
gameEvents
[... input ...]
.apply("LeaderboardTeamFixedWindows", Window
.<GameActionInfo>into(FixedWindows.of(
Duration.standardMinutes(Durations.minutes(60))))
.triggering(AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Durations.minutes(5)))
.withLateFirings(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Durations.minutes(10))))
.withAllowedLateness(Duration.standardMinutes(120)
.accumulatingFiredPanes())
.apply("ExtractTeamScore", new ExtractAndSumScore("team"))
[... output ...]
LeaderboardTeamFixedWindows對應「Where」定義窗口,Trigger對應「Where」定義結果輸出條件,Accumulation對應「How」定義輸出結果內容,ExtractTeamScore對應「What」定義計算邏輯。
Apache Beam的Beam Model對無限亂序數據流的數據處理進行了很是優雅的抽象,「WWWH」四個維度對數據處理的描述,很是清晰與合理,Beam Model在統一了對無限數據流和有限數據集的處理模式的同時,也明確了對無限數據流的數據處理方式的編程範式,擴大了流處理系統可應用的業務範圍,例如,Event-Time/Session窗口的支持,亂序數據的處理支持等。Apache Flink,Apache Spark Streaming等項目的API設計均愈來愈多的借鑑或參考了Apache Beam Model,且做爲Beam Runner的實現,與Beam SDK的兼容度也愈來愈高。本文主要介紹了Beam Model,以及如何基於Beam Model設計現實中的數據處理任務,但願可以讓讀者對Apache Beam項目可以有一個初步的瞭解。因爲Apache Beam已經進入Apache Incubator孵化,因此讀者也能夠經過官網或是郵件組瞭解更多Apache Beam的進展和狀態。