Apache Beam實戰指南之基礎入門

本文由 【AI前線】原創,ID:ai-front,原文連接:t.cn/ROIp4VBjava

本文是 Apache Beam 實戰指南系列文章 的第一篇內容,將簡要介紹 Apache Beam 的發展歷史、應用場景、模型和運行流程、SDKs ,並結合 Beam 的應用示例和代碼剖析帶你進一步瞭解 Beam 的運用原理。

更多精彩文章請添加微信「AI 前線」(ID:ai-front)複製代碼

前言:大數據 2.0 時代不期而至

隨着大數據 2.0 時代悄然到來,大數據從簡單的批處理擴展到了實時處理、流處理、交互式查詢和機器學習應用。早期的處理模型 (Map/Reduce) 早已經力不從心,並且也很難應用處處理流程長且複雜的數據流水線上。另外,近年來涌現出諸多大數據應用組件,如 HBase、Hive、Kafka、Spark、Flink 等。開發者常常要用到不一樣的技術、框架、API、開發語言和 SDK 來應對複雜應用的開發。這大大增長了選擇合適工具和框架的難度,開發者想要將全部的大數據組件熟練運用幾乎是一項不可能完成的任務。shell

面對這種狀況,Google 在 2016 年 2 月宣佈將大數據流水線產品(Google DataFlow)貢獻給 Apache 基金會孵化,2017 年 1 月 Apache 對外宣佈開源 Apache Beam,2017 年 5 月迎來了它的第一個穩定版本 2.0.0。在國內,大部分開發者對於 Beam 還缺少了解,社區中文資料也比較少。InfoQ 指望經過 Apache Beam 實戰指南系列文章 推進 Apache Beam 在國內的普及。數據庫

本文將簡要介紹 Apache Beam 的發展歷史、應用場景、模型和運行流程、SDKs 和 Beam 的應用示例。歡迎加入 Beam 中文社區深刻討論和交流。編程

概述

大數據處理領域的一大問題是:開發者常常要用到不少不一樣的技術、框架、API、開發語言和 SDK。取決於須要完成的是什麼任務,以及在什麼狀況下進行,開發者極可能會用 MapReduce 進行批處理,用 Apache Spark SQL 進行交互請求(interactive queries),用 Apache Flink 進行實時流處理,還有可能用到基於雲端的機器學習框架。後端

近兩年涌現的開源大潮,爲大數據開發者提供了十分富餘的工具。但這同時也增長了開發者選擇合適工具的難度,尤爲對於新入行的開發者來講。這極可能拖慢、甚至阻礙開源工具的發展:把各類開源框架、工具、庫、平臺人工整合到一塊兒所需工做之複雜,是大數據開發者常有的抱怨之一,也是他們支持專有大數據平臺的首要緣由。bash

Apache Beam 發展歷史

Beam 在 2016 年 2 月成爲 Apache 孵化器項目,並在 2016 年 12 月升級成爲 Apache 基金會的頂級項目。經過十五個月的努力,一個稍顯混亂的代碼庫,從多個組織合併,已發展成爲數據處理的通用引擎,集成多個處理數據框架,能夠作到跨環境。服務器

Beam 通過三個孵化器版本和三個後孵化器版本的演化和改進,最終在 2017 年 5 月 17 日迎來了它的第一個穩定版 2.0.0。發佈穩定版本 3 個月以來,Apache Beam 已經出現明顯的增加,不管是經過官方仍是社區的貢獻數量。Apache Beam 在谷歌雲方面也已經展現出了「才幹」。微信

Beam 2.0.0 改進了用戶體驗,重點在於框架跨環境的無縫移植能力,這些執行環境包括執行引擎、操做系統、本地集羣、雲端以及數據存儲系統。Beam 的其餘特性還包括以下幾點:網絡

  • API 穩定性和對將來版本的兼容性。架構

  • 有狀態的數據處理模式,高效的支持依賴於數據的計算。

  • 支持用戶擴展的文件系統,支持 Hadoop 分佈式發文件系統及其餘。

  • 提供了一個度量指標系統,可用於跟蹤管道的執行情況。

網上已經有不少人寫過 Beam 2.0.0 版本以前的資料,可是 2.0.0 版本後 API 不少寫法變更較大,本文將帶着你們從零基礎到 Apache Beam 入門。

Apache Beam 應用場景

Google Cloud、PayPal、Talend 等公司都在使用 Beam,國內包括阿里巴巴、百度、金山、蘇寧、九次方大數據、360、慧聚數通訊息技術有限公司等也在使用 Beam,同時還有一些大數據公司的架構師或研發人員正在一塊兒進行研究。Apache Beam 中文社區正在集成一些工做中的 runners 和 sdk IO,包括人工智能、機器學習和時序數據庫等一些功能。

如下爲應用場景的幾個例子:

  1. Beam 能夠用於 ETL Job 任務

    Beam 的數據能夠經過 SDKs 的 IO 接入,經過管道能夠用後面的 Runners 作清洗。

  2. Beam 數據倉庫快速切換、跨倉庫

    因爲 Beam 的數據源是多樣 IO,因此用 Beam 能夠快速切換任何數據倉庫。

  3. Beam 計算處理平臺切換、跨平臺

    Runners 目前提供了 3-4 種能夠切換的平臺,隨着 Beam 的強大應該會有更多的平臺提供給你們使用。

Apache Beam 運行流程

4-1 數據處理流程

如圖 4-1 所示,Apache Beam 大致運行流程分紅三大部分:

  1. Modes

    Modes 是 Beam 的模型或叫數據來源的 IO,它是由多種數據源或倉庫的 IO 組成,數據源支持批處理和流處理。

  2. Pipeline

    Pipeline 是 Beam 的管道,全部的批處理或流處理都要經過這個管道把數據傳輸到後端的計算平臺。這個管道如今是惟一的。數據源能夠切換多種,計算平臺或處理平臺也支持多種。須要注意的是,管道只有一條,它的做用是鏈接數據和 Runtimes 平臺。

  3. Runtimes

    Runtimes 是大數據計算或處理平臺,目前支持 Apache Flink、Apache Spark、Direct Pipeline 和 Google Clound Dataflow 四種。其中 Apache Flink 和 Apache Spark 同時支持本地和雲端。Direct Pipeline 僅支持本地,Google Clound Dataflow 僅支持雲端。除此以外,後期 Beam 國外研發團隊還會集成其餘大數據計算平臺。因爲谷歌未進入中國,目前國內開發人員在工做中對谷歌雲的使用應該不是不少,主要之前兩種爲主。爲了使讀者讀完文章後能快速學習且更貼近實際工做環境,後續文章中我會之前兩種做爲大數據計算或處理平臺進行演示。

Beam Model 及其工做流程

Beam Model 指的是 Beam 的編程範式,即 Beam SDK 背後的設計思想。在介紹 Beam Model 以前,先簡要介紹一下 Beam Model 要處理的問題域與一些基本概念。

  1. 數據源類型。分佈式數據來源類型通常能夠分爲兩類,有界的數據集和無界的數據流。有界的數據集,好比一個 Ceph 中的文件,一個 Mongodb 表等,特色是數據已經存在,數據集有已知的、固定的大小,通常存在磁盤上,不會忽然消失。而無界的數據流,好比 Kafka 中流過來的數據流,這種數據的特色是數據動態流入、沒有邊界、沒法所有持久化到磁盤上。Beam 框架設計時須要針對這兩種數據的處理進行考慮,即批處理和流處理。

  2. 時間。分佈式框架的時間處理有兩種,一種是全量計算,另外一種是部分增量計算。我給你們舉個例子:例如咱們玩「王者農藥」遊戲,遊戲的數據須要實時地流向服務器,掉血狀況會隨着時間實時變化,可是排行榜的數據則是所有玩家在必定時間內的排名,例如一週或一個月。Beam 針對這兩種狀況都設計了對應的處理方式。

  3. 亂序。對於流處理框架處理的數據流來講,數據到達大致分兩種,一種是按照 Process Time 定義時間窗口,這種不用考慮亂序問題,由於都是關閉當前窗口後才進行下一個窗口操做,須要等待,因此執行都是有序的。而另外一種,Event Time 定義的時間窗口則不須要等待,可能當前操做尚未處理完,就直接執行下一個操做,形成消息順序處理但結果不是按順序排序了。例如咱們的訂單消息,採用了分佈式處理,若是下單操做所屬服務器處理速度比較慢,而用戶支付的服務器速度很是快,這時最後的訂單操做時間軸就會出現一種狀況,下單在支付的後面。對於這種狀況,如何肯定遲到數據,以及對於遲到數據如何處理一般是很麻煩的事情。

Beam Model 處理的目標數據是無界的時間亂序數據流,不考慮時間順序或有界的數據集可看作是無界亂序數據流的一個特例。Beam Model 從下面四個維度概括了用戶在進行數據處理的時候須要考慮的問題:

  1. What。如何對數據進行計算?例如,機器學習中訓練學習模型能夠用 Sum 或者 Join 等。在 Beam SDK 中由 Pipeline 中的操做符指定。

  2. Where。數據在什麼範圍中計算?例如,基於 Process-Time 的時間窗口、基於 Event-Time 的時間窗口、滑動窗口等等。在 Beam SDK 中由 Pipeline 的窗口指定。

  3. When。什麼時候輸出計算結果?例如,在 1 小時的 Event-Time 時間窗口中,每隔 1 分鐘將當前窗口計算結果輸出。在 Beam SDK 中由 Pipeline 的 Watermark 和觸發器指定。

  4. How。遲到數據如何處理?例如,將遲到數據計算增量結果輸出,或是將遲到數據計算結果和窗口內數據計算結果合併成全量結果輸出。在 Beam SDK 中由 Accumulation 指定。

Beam Model 將「WWWH」四個維度抽象出來組成了 Beam SDK,用戶在基於 Beam SDK 構建數據處理業務邏輯時,每一步只須要根據業務需求按照這四個維度調用具體的 API,便可生成分佈式數據處理 Pipeline,並提交到具體執行引擎上執行。「WWWH」四個維度只是從業務的角度看待問題,並非所有適用於本身的業務。作技術架構必定要結合本身的業務使用相應的技術特性或框架。Beam 作爲「一統」的框架,爲開發者帶來了方便。

Beam SDKs

Beam SDK 給上層應用的開發者提供了一個統一的編程接口,開發者不須要了解底層的具體的大數據平臺的開發接口是什麼,直接經過 Beam SDK 的接口就能夠開發數據處理的加工流程,無論輸入是用於批處理的有界數據集,仍是流式的無界數據集。對於這兩類輸入數據,Beam SDK 都使用相同的類來表現,而且使用相同的轉換操做進行處理。Beam SDK 擁有不一樣編程語言的實現,目前已經完整地提供了 Java 的 SDK,Python 的 SDK 還在開發中,相信將來會發布更多不一樣編程語言的 SDK。

Beam 2.0 的 SDKs 目前有:

Amqp:高級消息隊列協議。

Cassandra:Cassandra 是一個 NoSQL 列族(column family)實現,使用由 Amazon Dynamo 引入的架構方面的特性來支持 Big Table 數據模型。Cassandra 的一些優點以下所示:

  • 高度可擴展性和高度可用性,沒有單點故障

  • NoSQL 列族實現

  • 很是高的寫入吞吐量和良好的讀取吞吐量

  • 相似 SQL 的查詢語言(從 0.8 版本起),並經過二級索引支持搜索

  • 可調節的一致性和對複製的支持靈活的模式

Elasticesarch:一個實時的分佈式搜索引擎。

Google-cloud-platform:谷歌雲 IO。

Hadoop-file-system:操做 Hadoop 文件系統的 IO。

Hadoop-hbase:操做 Hadoop 上的 Hbase 的接口 IO。

Hcatalog:Hcatalog 是 Apache 開源的對於表和底層數據管理統一服務平臺。

Jdbc:鏈接各類數據庫的數據庫鏈接器。

Jms:Java 消息服務(Java Message Service,簡稱 JMS)是用於訪問企業消息系統的開發商中立的 API。企業消息系統能夠協助應用軟件經過網絡進行消息交互。JMS 在其中扮演的角色與 JDBC 很類似,正如 JDBC 提供了一套用於訪問各類不一樣關係數據庫的公共 API,JMS 也提供了獨立於特定廠商的企業消息系統訪問方式。

Kafka:處理流數據的輕量級大數據消息系統,或叫消息總線。

Kinesis:對接亞馬遜的服務,能夠構建用於處理或分析流數據的自定義應用程序,以知足特定需求。

Mongodb:MongoDB 是一個基於分佈式文件存儲的數據庫。

Mqtt:IBM 開發的一個即時通信協議。

Solr:亞實時的分佈式搜索引擎技術。

xml:一種數據格式。

Beam Pipeline Runners

Beam Pipeline Runner 將用戶用 Beam 模型定義開發的處理流程翻譯成底層的分佈式數據處理平臺支持的運行時環境。在運行 Beam 程序時,須要指明底層的正確 Runner 類型,針對不一樣的大數據平臺,會有不一樣的 Runner。目前 Flink、Spark、Apex 以及谷歌的 Cloud DataFlow 都有支持 Beam 的 Runner。

須要注意的是,雖然 Apache Beam 社區很是但願全部的 Beam 執行引擎都可以支持 Beam SDK 定義的功能全集,可是在實際實現中可能沒法達到這一指望。例如,基於 MapReduce 的 Runner 顯然很難實現和流處理相關的功能特性。就目前狀態而言,對 Beam 模型支持最好的就是運行於谷歌雲平臺之上的 Cloud Dataflow,以及能夠用於自建或部署在非谷歌雲之上的 Apache Flink。固然,其它的 Runner 也正在迎頭遇上,整個行業也在朝着支持 Beam 模型的方向發展。

Beam 2.0 的 Runners 框架以下:

Apex

誕生於 2015 年 6 月的 Apache Apex,其一樣源自 DataTorrent 及其使人印象深入的 RTS 平臺,其中包含一套核心處理引擎、儀表板、診斷與監控工具套件外加專門面向數據科學家用戶的圖形流編程系統 dtAssemble。主要用於流處理,經常使用於物聯網等場景。

Direct-java

本地處理和運行 runner。

Flink_2.10

Flink 是一個針對流數據和批數據的分佈式處理引擎。

Gearpump

Gearpump 是一個基於 Akka Actor 的輕量級的實時流計算引擎。現在流平臺須要處理來自各類移動端和物聯網設備的海量數據,系統要能不間斷地提供服務,對數據的處理要能作到不丟失不重複,對各類軟硬件錯誤能平滑處理,對用戶的輸入要能實時響應。除了這些系統層面的需求外,用戶層面的接口還要能作到豐富而靈活,一方面,平臺要提供足夠豐富的基礎設施,能最簡化應用程序的編寫;另外一方面,這個平臺應提供具備表現力的編程 API,讓用戶能靈活表達各類計算,而且整個系統能夠定製,容許用戶選擇調度策略和部署環境,容許用戶在不一樣的指標間作折中取捨,以知足特定的需求。Akka Actor 提供了通訊、併發、隔離、容錯的基礎設施,Gearpump 經過把抽象層次提高到 Actor 這一層,屏蔽了底層的細節,專一於流處理需求自己,能更簡單而又高效地解決上述問題。

Dataflow

2016 年 2 月份,谷歌及其合做夥伴向 Apache 捐贈了一大批代碼,創立了孵化中的 Beam 項目(最初叫 Apache Dataflow)。這些代碼中的大部分來自於谷歌 Cloud Dataflow SDK——開發者用來寫流處理和批處理管道(pipelines)的庫,可在任何支持的執行引擎上運行。當時,支持的主要引擎是谷歌 Cloud Dataflow。

Spark

Apache Spark 是一個正在快速成長的開源集羣計算系統。Apache Spark 生態系統中的包和框架日益豐富,使得 Spark 可以執行高級數據分析。Apache Spark 的快速成功得益於它的強大功能和易用性。相比於傳統的 MapReduce 大數據分析,Spark 效率更高、運行時速度更快。Apache Spark 提供了內存中的分佈式計算能力,具備 Java、Scala、Python、R 四種編程語言的 API 編程接口。

實戰:開發第一個 Beam 程序

8.1 開發環境
  1. 下載安裝 JDK 7 或更新的版本,檢測 JAVA_HOME 環境變量。本文示例使用的是 JDK 1.8。

  2. 下載 maven 並配置,本文示例使用的是 maven-3.3.3。

  3. 開發環境 myeclipse、Spring Tool Suite 、IntelliJ IDEA,這個能夠按照我的喜愛,本文示例用的是 STS。

8.2 開發第一個 wordCount 程序而且運行

1 新建一個 maven 項目

2 在 pom.xml 文件中添加兩個 jar 包

3 新建一個 txtIOTest.java

寫入如下代碼:

4 由於 Windows 上的 Beam2.0.0 不支持本地路徑,須要部署到 Linux 上,須要打包如圖,此處注意要把依賴 jar 都打包進去。


5 部署 beam.jar 到 Linux 環境中

使用 Xshell 5 登陸虛擬機或者 Linux 系統。用 rz 命令把剛纔打包的文件上傳上去。其中虛擬機要安裝上 jdk 並配置好環境變量。

咱們能夠用輸入 javac 命令測試一下。

咱們把 beam.jar 上傳到 /usr/local/ 目錄下面,而後新建一個文件,也就是源文件。命令:touch text.txt 命令:chmod o+rwx text.txt

修改 text.txt 並添加數據。 命令:vi text.txt

 

運行命令:java -jar beam.jar,生成文件。

用 cat 命令查看文件內容,裏面就是統計的結果。

8.3 實戰剖析

咱們能夠經過以上實戰代碼進一步瞭解 Beam 的運用原理。

第一件事情是搭建一個管道(Pipeline),例如咱們小時候家裏澆地用的「水管」。它就是鏈接水源和處理的橋樑。

PipelineOptions pipelineOptions = PipelineOptionsFactory.create();// 建立管道複製代碼

第二件事情是讓咱們的管道有一個處理框架,也就是咱們的 Runtimes 。例如咱們接到水要怎麼處理,是輸送給咱們城市的污水處理廠,仍是其餘。這個污水處理廠就至關於咱們的處理框架,例如如今流行的 Apache Spark 或 Apache Flink。這個要根據本身的業務指定,以下代碼中我指定了本地的處理框架。

pipelineOptions.setRunner(DirectRunner.class);複製代碼

第三件事情也是 Beam 最後一個重要的地方,就是模型 (Model),通俗點講就是咱們的數據來源。若是結合以上第一件和第二件的事情說就是水從哪裏來,水的來源多是河裏、多是污水通道等等。本實例用的是有界固定大小的文本文件。固然 Model 還包含無界數據,例如 kafka 等等,能夠根據的需求靈活運用。

pipeline.apply(TextIO.read().from("/usr/local/text.txt")).apply
("ExtractWords", ParDo.of(new DoFn<String, String>() //後省略複製代碼

最後一步是處理結果,這個比較簡單,能夠根據本身的需求處理。但願經過代碼的實戰結合原理剖析能夠幫助你們更快地熟悉 Beam 並可以簡單地運用 Beam。

總結

Apache Beam 是集成了不少數據模型的一個統一化平臺,它爲大數據開發工程師頻繁換數據源或多數據源、多計算框架提供了集成統一框架平臺。Apache Beam 社區如今已經集成了數據庫的切換 IO,將來 Beam 中文社區還將爲 Beam 集成更多的 Model 和計算框架,爲你們提供方便。

做者介紹

張海濤,目前就任於海康威視雲基礎平臺,負責雲計算大數據的基礎架構設計和中間件的開發,專一雲計算大數據方向。Apache Beam 中文社區發起人之一,若是想進一步瞭解最新 Apache Beam 動態和技術研究成果,請加微信 cyrjkj 入羣共同研究和運用。

策劃 & 審校|Natalie

關於【AI 前線】:面向 AI 愛好者、開發者和科學家,提供最新最全 AI 領域技術資訊、一線業界實踐案例、蒐羅整理業界技術分享乾貨、最新 AI 論文解讀。每週一節技術分享公開課,助力你全面擁抱人工智能技術。ID:ai-front

相關文章
相關標籤/搜索