Apache Beam: 下一代的大數據處理標準

Apache Beam(原名Google DataFlow)是Google在2016年2月份貢獻給Apache基金會的Apache孵化項目,被認爲是繼MapReduce,GFS和BigQuery等以後,Google在大數據處理領域對開源社區的又一個很是大的貢獻。Apache Beam的主要目標是統一批處理和流處理的編程範式,爲無限,亂序,web-scale的數據集處理提供簡單靈活,功能豐富以及表達能力十分強大的SDK。Apache Beam項目重點在於數據處理的編程範式和接口定義,並不涉及具體執行引擎的實現,Apache Beam但願基於Beam開發的數據處理程序能夠執行在任意的分佈式計算引擎上。本文主要介紹Apache Beam的編程範式-Beam Model,以及經過Beam SDK如何方便靈活的編寫分佈式數據處理業務邏輯,但願讀者可以經過本文對Apache Beam有初步的瞭解,同時對於分佈式數據處理系統如何處理亂序無限數據流的能力有初步的認識。java

Apache Beam基本架構

隨着分佈式數據處理不斷髮展,新的分佈式數據處理技術也不斷被提出,業界涌現出了愈來愈多的分佈式數據處理框架,從最先的Hadoop MapReduce,到Apache Spark,Apache Storm,以及更近的Apache Flink,Apache Apex等。新的分佈式處理框架可能帶來的更高的性能,更強大的功能,更低的延遲等,但用戶切換到新的分佈式處理框架的代價也很是大:須要學習一個新的數據處理框架,並重寫全部的業務邏輯。解決這個問題的思路包括兩個部分,首先,須要一個編程範式,可以統一,規範分佈式數據處理的需求,例如,統一批處理和流處理的需求。其次,生成的分佈式數據處理任務應該可以在各個分佈式執行引擎上執行,用戶能夠自由切換分佈式數據處理任務的執行引擎與執行環境。Apache Beam正是爲了解決以上問題而提出的。git

Apache Beam主要由Beam SDK和Beam Runner組成,Beam SDK定義了開發分佈式數據處理任務業務邏輯的API接口,生成的的分佈式數據處理任務Pipeline交給具體的Beam Runner執行引擎。Apache Beam目前支持的API接口是由Java語言實現的,Python版本的API正在開發之中。Apache Beam支持的底層執行引擎包括Apache Flink,Apache Spark以及Google Cloud Platform,此外Apache Storm,Apache Hadoop,Apache Gearpump等執行引擎的支持也在討論或開發當中。其基本架構以下圖所示:github

圖1 Apache Beam架構圖web

須要注意的是,雖然Apache Beam社區很是但願全部的Beam執行引擎都可以支持Beam SDK定義的功能全集,可是在實際實現中可能並不必定。例如,基於MapReduce的Runner顯然很難實現和流處理相關的功能特性。目前Google DataFlow Cloud是對Beam SDK功能集支持最全面的執行引擎,在開源執行引擎中,支持最全面的則是Apache Flink。apache

Beam Model

Beam Model指的是Beam的編程範式,即Beam SDK背後的設計思想。在介紹Beam Model以前,先簡要介紹一下Beam Model要處理的問題域與一些基本概念。編程

  1. 數據。分佈式數據處理要處理的數據類型通常能夠分爲兩類,有限的數據集和無限的數據流。有限的數據集,好比一個HDFS中的文件,一個HBase表等,特色是數據提早已經存在,通常也已經持久化,不會忽然消失。而無限的數據流,好比kafka中流過來的系統日誌流,或是從twitter API拿到的twitter流等等,這類數據的特色是,數據動態流入,無窮無盡,沒法所有持久化。通常來講,批處理框架的設計目標是用來處理有限的數據集,流處理框架的設計目標是用來處理無限的數據流。有限的數據集能夠看作是無限的數據流的一種特例,可是從數據處理邏輯的角度,這二者並沒有不一樣之處,例如,假設微博數據包含時間戳和轉發量,用戶但願按照統計每小時的轉發量總和,此業務邏輯應該能夠同時在有限數據集和無限數據流上執行,並不該該由於數據源的不一樣而對業務邏輯的實現產生任何影響。
  2. 時間。Process Time是指數據進入分佈式處理框架的時間,而Event-Time則是指數據產生的時間。這兩個時間一般是不一樣的,例如,對於一個處理微博數據的流計算任務,一條2016-06-01-12:00:00發表的微博通過網絡傳輸等延遲可能在2016-06-01-12:01:30才進入到流處理系統中。批處理任務一般進行全量的數據計算,較少關注數據的時間屬性,可是對於流處理任務來講,因爲數據流是無情無盡的,沒法進行全量的計算,一般是對某個窗口中得數據進行計算,對於大部分的流處理任務來講,按照時間進行窗口劃分,多是最多見的需求。
  3. 亂序。對於流處理框架處理的數據流來講,其數據的到達順序可能並不嚴格按照Event-Time的時間順序。若是基於Process Time定義時間窗口,數據到達的順序就是數據的順序,所以不存在亂序問題。可是對於基於Event Time定義的時間窗口來講,可能存在時間靠前的消息在時間靠後的消息後到達的狀況,這在分佈式的數據源中可能很是常見。對於這種狀況,如何肯定遲到數據,以及對於遲到數據如何處理一般是很棘手的問題。

 

Beam Model處理的目標數據是無限的時間亂序數據流,不考慮時間順序或是有限的數據集可看作是無限亂序數據流的一個特例。Beam Model從下面四個維度概括了用戶在進行數據處理的時候須要考慮的問題:網絡

  1. What。如何對數據進行計算?例如,Sum,Join或是機器學習中訓練學習模型等。在Beam SDK中由Pipeline中的操做符指定。
  2. Where。數據在什麼範圍中計算?例如,基於Process-Time的時間窗口,基於Event-Time的時間窗口,滑動窗口等等。在BeamSDK中由Pipeline中的窗口指定。
  3. When。什麼時候將計算結果輸出?例如,在1小時的Event-Time時間窗口中,每隔1分鐘,將當前窗口計算結果輸出。在Beam SDK中由Pipeline中的Watermark和觸發器指定。
  4. How。遲到數據如何處理?例如,將遲到數據計算增量結果輸出,或是將遲到數據計算結果和窗口內數據計算結果合併成全量結果輸出。在Beam SDK中由Accumulation指定。

Beam Model將」WWWH「四個維度抽象出來組成了Beam SDK,用戶在基於Beam SDK構建數據處理業務邏輯時,在每一步只須要根據業務需求按照這四個維度調用具體的API便可生成分佈式數據處理Pipeline,並提交到具體執行引擎上執行。「WWWH」四個維度的抽象僅僅關注業務邏輯自己,和分佈式任務如何執行沒有任何關係。架構

Beam SDK

不一樣於Apache Flink或是Apache Spark,Beam SDK使用同一套API表示數據源,輸出目標以及操做符等。下面介紹4個基於Beam SDK的數據處理任務,經過這四個數據處理任務,讀者能夠了解經過Beam Mode是如何統一靈活的描述批處理和流處理任務的,這4個任務用來處理手機遊戲領域的統計需求,包括:app

  1. 用戶分數。批處理任務,基於有限數據集統計用戶分數。
  2. 每小時團隊分數。批處理任務,基於有限數據集統計每小時,每一個團隊的分數。
  3. 排行榜。流處理任務,2個統計項,每小時每一個團隊的分數以及用戶實時的歷史總得分數。
  4. 遊戲狀態。流處理任務,統計每小時每一個團隊的分數,以及更復雜的每小時統計信息,好比每小時每一個用戶在線時間等。

注:示例代碼來自Beam的源碼,具體地址參見:apache/incubator-beam。部分分析內容參考了Beam的官方文檔,詳情請參見引用連接。框架

下面基於Beam Model的「WWWH」四個維度,分析業務邏輯,並經過代碼展現如何經過Beam SDK實現「WWWH」四個維度的業務邏輯。

用戶分數

統計每一個用戶的歷史總得分數是一個很是簡單的任務,在這裏咱們簡單的經過一個批處理任務實現,每次須要新的用戶分數數據的時候,從新執行一次這個批處理任務便可。對於用戶分數任務,「WWWH」四維度分析結果以下:

經過「WWWH」的分析,對於用戶分數這個批處理任務,經過Beam Java SDK實現的代碼以下所示:

gameEvents

  [... input ...]

    [... parse ...]

      .apply("ExtractUserScore", new ExtractAndSumScore("user")) 

  [... output ...];

ExtractAndSumScore實現了「What」中描述的邏輯,即按用戶分組,而後累加分數,其相關代碼以下:

gameInfo

 .apply(MapElements

     .via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), gInfo.getScore()))

     .withOutputType(

         TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers())))

 .apply(Sum.<String>integersPerKey());

經過MapElements肯定Key與Value分別是用戶與分數,而後Sum定義按key分組,並累加分數。Beam支持將多個對數據的操做合併成一個操做,這樣不只能夠支持更清晰的業務邏輯實現,同時也能夠在多處重用合併後的操做邏輯。

每小時團隊分數

按照小時統計每一個團隊的分數,得到最高分數的團隊可能得到獎勵,這個分析任務增長了對窗口的要求,不過咱們依然能夠經過一個批處理任務實現,對於這個任務的「WWWH」四個維度的分析以下:

相對於第一個用戶分數任務,只是在Where部分回答了「數據在什麼範圍中計算?」的問題,同時在What部分「如何計算數據?」中,分組的條件由用戶改成了團隊,這在代碼中也會相應的體現:

gameEvents

  [... input ...]

  [... parse ...]

  .apply("AddEventTimestamps", WithTimestamps.of((GameActionInfo i)

    -> new Instant(i.getTimestamp())))

  .apply("FixedWindowsTeam", Window.<GameActionInfo>into(

    FixedWindows.of(Duration.standardMinutes(windowDuration))))

  .apply("ExtractTeamScore", new ExtractAndSumScore("team"))

  [... output ...];

「AddEventTimestamps」定義瞭如何從原始數據中抽取EventTime數據,「FixedWindowsTeam」則定義了1小時固定窗口,而後重用了ExtractAndSumScore類,只是將分組的列從用戶改爲了團隊。對於每小時團隊分數任務,引入了關於「Where」部分窗口定義的新業務邏輯,可是從代碼中能夠看到,關於「Where」部分的實現和關於「What」部分的實現是徹底獨立的,用戶只須要新加兩行關於「Where」的代碼,很是簡單和清晰。

排行榜

前面兩個任務均是基於有限數據集的批處理任務,對於排行榜來講,咱們一樣須要統計用戶分數以及每小時團隊分數,可是從業務角度但願獲得的是實時數據。對於Apache Beam來講,一個相同處理邏輯的批處理任務和流處理任務的惟一不一樣就是任務的輸入和輸出,中間的業務邏輯Pipeline無需任何改變。對於當前示例的排行榜數據分析任務,咱們不只但願他們知足和前兩個示例相同的業務邏輯,同時也能夠知足更定製化的業務需求,例如:

  1. 流處理任務相對於批處理任務,一個很是重要的特性是,流處理任務能夠更加實時的返回計算結果,例如計算每小時團隊分數時,對於一小時的時間窗口,默認是在一小時的數據所有到達後,把最終的結算結果輸出,可是流處理系統應該同時支持在一小時窗口只有部分數據到達時,就將部分計算結果輸出,從而使得用戶能夠獲得實時的分析結果。
  2. 保證和批處理任務一致的計算結果正確性。因爲亂序數據的存在,對於某一個計算窗口,如何肯定全部數據是否到達(Watermark)?遲到數據如何處理?處理結果如何輸出,總量,增量,並列?流處理系統應該提供機制保證用戶能夠在知足低延遲性能的同時達到最終的計算結果正確性。

上述兩個問題正是經過回答「When」和「How」兩個問題來定義用戶的數據分析需求。「When」取決於用戶但願多常獲得計算結果,在回答「When」的時候,基本上能夠分爲四個階段:

  1. Early。在窗口結束前,肯定什麼時候輸出中間狀態數據。
  2. On-Time。在窗口結束時,輸出窗口數據計算結果。因爲亂序數據的存在,如何判斷窗口結束多是用戶根據額外的知識預估的,且容許在用戶設定的窗口結束後出現遲到的屬於該窗口的數據。
  3. Late。在窗口結束後,有遲到的數據到達,在這個階段,什麼時候輸出計算結果。
  4. Final。可以容忍遲到的最大限度,例如1小時。到達最後的等待時間後,輸出最終的計算結果,同時再也不接受以後的遲到數據,清理該窗口的狀態數據。

對於每小時團隊得分的流處理任務,本示例但願的業務邏輯爲,基於Event Time的1小時時間窗口,按團隊計算分數,在一小時窗口內,每5分鐘輸出一次當前的團隊分數,對於遲到的數據,每10分鐘輸出一次當前的團隊分數,在窗口結束2小時後遲到的數據通常不可能會出現,假如出現的話,直接拋棄。「WWWH」表達以下:

在基於Beam SDK的實現中,用戶基於「WWWH」 Beam Model表示的業務邏輯能夠分別獨立直接的實現出來:

gameEvents
 [... input ...]
  .apply("LeaderboardTeamFixedWindows", Window
    .<GameActionInfo>into(FixedWindows.of(
      Duration.standardMinutes(Durations.minutes(60))))
    .triggering(AfterWatermark.pastEndOfWindow()
      .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
        .plusDelayOf(Durations.minutes(5)))
      .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
        .plusDelayOf(Durations.minutes(10))))
    .withAllowedLateness(Duration.standardMinutes(120)
    .accumulatingFiredPanes())
  .apply("ExtractTeamScore", new ExtractAndSumScore("team"))
  [... output ...]

LeaderboardTeamFixedWindows對應「Where」定義窗口,Trigger對應「Where」定義結果輸出條件,Accumulation對應「How」定義輸出結果內容,ExtractTeamScore對應「What」定義計算邏輯。

總結

Apache Beam的Beam Model對無限亂序數據流的數據處理進行了很是優雅的抽象,「WWWH」四個維度對數據處理的描述,很是清晰與合理,Beam Model在統一了對無限數據流和有限數據集的處理模式的同時,也明確了對無限數據流的數據處理方式的編程範式,擴大了流處理系統可應用的業務範圍,例如,Event-Time/Session窗口的支持,亂序數據的處理支持等。Apache Flink,Apache Spark Streaming等項目的API設計均愈來愈多的借鑑或參考了Apache Beam Model,且做爲Beam Runner的實現,與Beam SDK的兼容度也愈來愈高。本文主要介紹了Beam Model,以及如何基於Beam Model設計現實中的數據處理任務,但願可以讓讀者對Apache Beam項目可以有一個初步的瞭解。因爲Apache Beam已經進入Apache Incubator孵化,因此讀者也能夠經過官網或是郵件組瞭解更多Apache Beam的進展和狀態。

相關文章
相關標籤/搜索