Apache Beam WordCount編程實戰及源碼解讀

概述:Apache Beam WordCount編程實戰及源碼解讀,並經過intellij IDEA和terminal兩種方式調試運行WordCount程序,Apache Beam對大數據的批處理和流處理,提供一套先進的統一的編程模型,並能夠運行大數據處理引擎上。完整項目Github源碼java

Apache Beam WordCount編程實戰及源碼解讀

負責公司大數據處理相關架構,可是具備多樣性,極大的增長了開發成本,急需統一編程處理,Apache Beam,一處編程,到處運行,故將折騰成果分享出來。git

1.Apache Beam編程實戰–前言,Apache Beam的特色與關鍵概念。

Apache Beam 於2017年1月10日成爲Apache新的頂級項目。github

1.1.Apache Beam 特色:

  • 統一:對於批處理和流媒體用例使用單個編程模型。
  • 方便:支持多個pipelines環境運行,包括:Apache Apex, Apache Flink, Apache Spark, 和 Google Cloud Dataflow。
  • 可擴展:編寫和分享新的SDKs,IO鏈接器和transformation庫
    部分翻譯摘自官網:Apacher Beam 官網

1.2.Apache Beam關鍵概念:

1.2.1.Apache Beam SDKs

主要是開發API,爲批處理和流處理提供統一的編程模型。目前(2017)支持JAVA語言,而Python正在緊張開發中。apache

1.2.2. Apache Beam Pipeline Runners(Beam的執行器/執行者們),支持Apache Apex,Apache Flink,Apache Spark,Google Cloud Dataflow多個大數據計算框架。可謂是一處Apache Beam編程,多計算框架運行。

1.2.3. 他們的對以下的支持狀況詳見

Apache Beam WordCount編程實戰及源碼解讀

2.Apache Beam編程實戰–Apache Beam源碼解讀

基於maven,intellij IDEA,pom.xm查看 完整項目Github源碼 。直接經過IDEA的項目導入功能便可導入完整項目,等待MAVEN下載依賴包,而後按照以下解讀步驟便可順利運行。編程

2.1.源碼解析-Apache Beam 數據流處理原理解析:

關鍵步驟:markdown

  • 建立Pipeline
  • 將轉換應用於Pipeline
  • 讀取輸入文件
  • 應用ParDo轉換
  • 應用SDK提供的轉換(例如:Count)
  • 寫出輸出
  • 運行Pipeline

Apache Beam WordCount編程實戰及源碼解讀

2.2.源碼解析,完整項目Github源碼,附WordCount,pom.xml等

/** * MIT. * Author: wangxiaolei(王小雷). * Date:17-2-20. * Project:ApacheBeamWordCount. */


import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;


public class WordCount {

    /** *1.a.經過Dofn編程Pipeline使得代碼很簡潔。b.對輸入的文本作單詞劃分,輸出。 */
    static class ExtractWordsFn extends DoFn<String, String> {
        private final Aggregator<Long, Long> emptyLines =
                createAggregator("emptyLines", Sum.ofLongs());

        @ProcessElement
        public void processElement(ProcessContext c) {
            if (c.element().trim().isEmpty()) {
                emptyLines.addValue(1L);
            }

            // 將文本行劃分爲單詞
            String[] words = c.element().split("[^a-zA-Z']+");
            // 輸出PCollection中的單詞
            for (String word : words) {
                if (!word.isEmpty()) {
                    c.output(word);
                }
            }
        }
    }

    /** *2.格式化輸入的文本數據,將轉換單詞爲並計數的打印字符串。 */
    public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
        @Override
        public String apply(KV<String, Long> input) {
            return input.getKey() + ": " + input.getValue();
        }
    }
    /** *3.單詞計數,PTransform(PCollection Transform)將PCollection的文本行轉換成格式化的可計數單詞。 */
    public static class CountWords extends PTransform<PCollection<String>,
            PCollection<KV<String, Long>>> {
        @Override
        public PCollection<KV<String, Long>> expand(PCollection<String> lines) {

            // 將文本行轉換成單個單詞
            PCollection<String> words = lines.apply(
                    ParDo.of(new ExtractWordsFn()));

            // 計算每一個單詞次數
            PCollection<KV<String, Long>> wordCounts =
                    words.apply(Count.<String>perElement());

            return wordCounts;
        }
    }

    /** *4.能夠自定義一些選項(Options),好比文件輸入輸出路徑 */
    public interface WordCountOptions extends PipelineOptions {

        /** * 文件輸入選項,能夠經過命令行傳入路徑參數,路徑默認爲gs://apache-beam-samples/shakespeare/kinglear.txt */
        @Description("Path of the file to read from")
        @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
        String getInputFile();
        void setInputFile(String value);

        /** * 設置結果文件輸出路徑,在intellij IDEA的運行設置選項中或者在命令行中指定輸出文件路徑,如./pom.xml */
        @Description("Path of the file to write to")
        @Required
        String getOutput();
        void setOutput(String value);
    }
    /** * 5.運行程序 */
    public static void main(String[] args) {
        WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                .as(WordCountOptions.class);
        Pipeline p = Pipeline.create(options);

        p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
                .apply(new CountWords())
                .apply(MapElements.via(new FormatAsTextFn()))
                .apply("WriteCounts", TextIO.Write.to(options.getOutput()));

        p.run().waitUntilFinish();
    }
}

3.支持Spark,Flink,Apex等大數據數據框架來運行該WordCount程序。完整項目Github源碼(推薦,注意pom.xml模塊加載是否成功,在工具中開發大數據程序,利於調試,開發體驗較好)

3.1.intellij IDEA(社區版)中Spark大數據框架運行Pipeline計算程序

  • Spark運行架構

    • 設置VM optionsintellij-idea

      -DPapex-runner
    • 設置Programe argumentsapp

      --inputFile=pom.xml --output=counts

Apache Beam WordCount編程實戰及源碼解讀

3.2.intellij IDEA(社區版)中Apex,Flink等支持的大數據框架都可運行WordCount的Pipeline計算程序,完整項目Github源碼

  • Apex運行框架

    • 設置VM options

      -DPapex-runner
    • 設置Programe arguments

      --inputFile=pom.xml --output=counts
  • Flink運行等等

    • 設置VM options

      -DPflink-runner
    • 設置Programe arguments

      --inputFile=pom.xml --output=counts

4.終端運行(Terminal)(不推薦,第一次下載過程很慢,開發體驗較差)

4.1.如下命令是下載官方示例源碼,第一次運行下載較慢,若是失敗了就多運行幾回,(推薦下載,完整項目Github源碼)直接用上述解讀在intellij IDEA中運行。

mvn archetype:generate       -DarchetypeRepository=https://repository.apache.org/content/groups/snapshots       -DarchetypeGroupId=org.apache.beam       -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples       -DarchetypeVersion=LATEST       -DgroupId=org.example       -DartifactId=word-count-beam       -Dversion="0.1"       -Dpackage=org.apache.beam.examples       -DinteractiveMode=false

Apache Beam WordCount編程實戰及源碼解讀

4.2.打包並運行

mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount      -Dexec.args="--runner=SparkRunner --inputFile=pom.xml --output=counts" -Pspark-runner

Apache Beam WordCount編程實戰及源碼解讀

4.3.成功運行結果

4.3.1.顯示運行成功

Apache Beam WordCount編程實戰及源碼解讀

4.3.2.WordCount輸出計算結果

這裏寫圖片描述

相關文章
相關標籤/搜索