在大數據的浪潮之下,技術的更新迭代十分頻繁。受技術開源的影響,大數據開發者提供了十分豐富的工具。但也由於如此,增長了開發者選擇合適工具的難度。在大數據處理一些問題的時候,每每使用的技術是多樣化的。這徹底取決於業務需求,好比進行批處理的MapReduce,實時流處理的Flink,以及SQL交互的Spark SQL等等。而把這些開源框架,工具,類庫,平臺整合到一塊兒,所須要的工做量以及複雜度,可想而知。這也是大數據開發者比較頭疼的問題。而今天要分享的就是整合這些資源的一個解決方案,它就是 Apache Beam。java
Apache Beam 最初叫 Apache Dataflow,由谷歌和其合做夥伴向Apache捐贈了大量的核心代碼,並創立孵化了該項目。該項目的大部分大碼來自於 Cloud Dataflow SDK,其特色有如下幾點:git
那 Apache Beam到底能解決哪些問題,它的應用場景是什麼,下面咱們能夠經過一張圖來講明,以下圖所示:github
經過改圖,咱們能夠很清晰的看到整個技術的發展流向;一部分是谷歌派系,另外一部分則是Apache派系。在開發大數據應用時,咱們有時候使用谷歌的框架,API,類庫,平臺等,而有時候咱們則使用Apache的,好比:HBase,Flink,Spark等。而咱們要整合這些資源則是一個比較頭疼的問題,Apache Beam 的問世,整合這些資源提供了很方便的解決方案。apache
下面,咱們經過一張流程圖來看Beam的運行流程,以下圖所示:編程
經過上圖,咱們能夠清楚的知道,執行一個流程分如下步驟:api
Beam SDK 提供了一個統一的編程模型,來處理任意規模的數據集,其中包括有限的數據集,無限的流數據。Apache Beam SDK 使用相同的類來表達有限和無限的數據,一樣使用相同的轉換方法對數據進行操做。Beam 提供了多種 SDK,你能夠選擇一種你熟悉的來創建數據處理管道,如上述的 2.1 中的圖,咱們能夠知道,目前 Beam 支持 Java,Python 以及其餘待開發的語言。bash
在 Beam 管道上運行引擎會根據你選擇的分佈式處理引擎,其中兼容的 API 轉換你的 Beam 程序應用,讓你的 Beam 應用程序能夠有效的運行在指定的分佈式處理引擎上。於是,當運行 Beam 程序的時候,你能夠按照本身的需求選擇一種分佈式處理引擎。當前 Beam 支持的管道運行引擎有如下幾種:框架
本示例經過使用 Java SDK 來完成,你能夠嘗試運行在不一樣的執行引擎上。maven
關於上述的安裝步驟,並非本篇博客的重點,這裏筆者就很少贅述了,不明白的能夠到官網翻閱文檔進行安裝。編程語言
Apache Beam 的源代碼在 Github 有託管,能夠到 Github 下載對應的源碼,下載地址:https://github.com/apache/beam
而後,將其中的示例代碼進行打包,命令以下所示:
$ mvn archetype:generate \ -DarchetypeRepository=https://repository.apache.org/content/groups/snapshots \ -DarchetypeGroupId=org.apache.beam \ -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \ -DarchetypeVersion=LATEST \ -DgroupId=org.example \ -DartifactId=word-count-beam \ -Dversion="0.1" \ -Dpackage=org.apache.beam.examples \ -DinteractiveMode=false
此時,命令會建立一個文件夾 word-count-beam,裏面包含一個 pom.xml 和相關的代碼文件。命令以下所示:
$ cd word-count-beam/ $ ls pom.xml src $ ls src/main/java/org/apache/beam/examples/ DebuggingWordCount.java WindowedWordCount.java common MinimalWordCount.java WordCount.java
一個 Beam 程序能夠運行在多個 Beam 的可執行引擎上,包括 ApexRunner,FlinkRunner,SparkRunner 或者 DataflowRunner。另外還有 DirectRunner。不須要特殊的配置就能夠在本地執行,方便測試使用。
下面,你能夠按需選擇你想執行程序的引擎:
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \ -Dexec.args="--inputFile=pom.xml --output=counts" -Pdirect-runner
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \ -Dexec.args="--inputFile=pom.xml --output=counts --runner=ApexRunner" -Papex-runner
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \ -Dexec.args="--runner=FlinkRunner --inputFile=pom.xml --output=counts" -Pflink-runner
$ mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \ -Dexec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=target/word-count-beam-bundled-0.1.jar \ --inputFile=/path/to/quickstart/pom.xml --output=/tmp/counts" -Pflink-runner
而後,你能夠經過訪問 http://<flink master>:8081 來監測運行的應用程序。
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \ -Dexec.args="--runner=SparkRunner --inputFile=pom.xml --output=counts" -Pspark-runner
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \ -Dexec.args="--runner=DataflowRunner --gcpTempLocation=gs://<your-gcs-bucket>/tmp \ --inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://<your-gcs-bucket>/counts" \ -Pdataflow-runner
當程序運行完成後,你能夠看到有多個文件以 count 開頭,個數取決於執行引擎的類型。當你查看文件的內容的時候,每一個惟一的單詞後面會顯示其出現次數,可是先後順序是不固定的,也是分佈式引擎爲了提升效率的一種經常使用方式。
$ ls counts* $ more counts* api: 9 bundled: 1 old: 4 Apache: 2 The: 1 limitations: 1 Foundation: 1 ...
$ cat counts* BEAM: 1 have: 1 simple: 1 skip: 4 PAssert: 1 ...
$ ls counts* $ more counts* The: 1 api: 9 old: 4 Apache: 2 limitations: 1 bundled: 1 Foundation: 1 ...
$ ls /tmp/counts* $ more /tmp/counts* The: 1 api: 9 old: 4 Apache: 2 limitations: 1 bundled: 1 Foundation: 1 ...
$ ls counts* $ more counts* beam: 27 SF: 1 fat: 1 job: 1 limitations: 1 require: 1 of: 11 profile: 10 ...
$ gsutil ls gs://<your-gcs-bucket>/counts* $ gsutil cat gs://<your-gcs-bucket>/counts* feature: 15 smother'st: 1 revelry: 1 bashfulness: 1 Bashful: 1 Below: 2 deserves: 32 barrenly: 1 ...
Apache Beam 主要針對理想並行的數據處理任務,並經過把數據集拆分多個子數據集,讓每一個子數據集可以被單獨處理,從而實現總體數據集的並行化處理。固然,也能夠用 Beam 來處理抽取,轉換和加載任務和數據集成任務(一個ETL過程)。進一步將數據從不一樣的存儲介質中或者數據源中讀取,轉換數據格式,最後加載到新的系統中。