概述:Apache Beam WordCount編程實戰及源碼解讀,並經過intellij IDEA和terminal兩種方式調試運行WordCount程序,Apache Beam對大數據的批處理和流處理,提供一套先進的統一的編程模型,並能夠運行大數據處理引擎上。完整項目Github源碼java
負責公司大數據處理相關架構,可是具備多樣性,極大的增長了開發成本,急需統一編程處理,Apache Beam,一處編程,到處運行,故將折騰成果分享出來。git
Apache Beam 於2017年1月10日成爲Apache新的頂級項目。github
主要是開發API,爲批處理和流處理提供統一的編程模型。目前(2017)支持JAVA語言,而Python正在緊張開發中。apache
基於maven,intellij IDEA,pom.xm查看 完整項目Github源碼 。直接經過IDEA的項目導入功能便可導入完整項目,等待MAVEN下載依賴包,而後按照以下解讀步驟便可順利運行。編程
關鍵步驟:markdown
/** * MIT. * Author: wangxiaolei(王小雷). * Date:17-2-20. * Project:ApacheBeamWordCount. */
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
public class WordCount {
/** *1.a.經過Dofn編程Pipeline使得代碼很簡潔。b.對輸入的文本作單詞劃分,輸出。 */
static class ExtractWordsFn extends DoFn<String, String> {
private final Aggregator<Long, Long> emptyLines =
createAggregator("emptyLines", Sum.ofLongs());
@ProcessElement
public void processElement(ProcessContext c) {
if (c.element().trim().isEmpty()) {
emptyLines.addValue(1L);
}
// 將文本行劃分爲單詞
String[] words = c.element().split("[^a-zA-Z']+");
// 輸出PCollection中的單詞
for (String word : words) {
if (!word.isEmpty()) {
c.output(word);
}
}
}
}
/** *2.格式化輸入的文本數據,將轉換單詞爲並計數的打印字符串。 */
public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
@Override
public String apply(KV<String, Long> input) {
return input.getKey() + ": " + input.getValue();
}
}
/** *3.單詞計數,PTransform(PCollection Transform)將PCollection的文本行轉換成格式化的可計數單詞。 */
public static class CountWords extends PTransform<PCollection<String>,
PCollection<KV<String, Long>>> {
@Override
public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
// 將文本行轉換成單個單詞
PCollection<String> words = lines.apply(
ParDo.of(new ExtractWordsFn()));
// 計算每一個單詞次數
PCollection<KV<String, Long>> wordCounts =
words.apply(Count.<String>perElement());
return wordCounts;
}
}
/** *4.能夠自定義一些選項(Options),好比文件輸入輸出路徑 */
public interface WordCountOptions extends PipelineOptions {
/** * 文件輸入選項,能夠經過命令行傳入路徑參數,路徑默認爲gs://apache-beam-samples/shakespeare/kinglear.txt */
@Description("Path of the file to read from")
@Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
String getInputFile();
void setInputFile(String value);
/** * 設置結果文件輸出路徑,在intellij IDEA的運行設置選項中或者在命令行中指定輸出文件路徑,如./pom.xml */
@Description("Path of the file to write to")
@Required
String getOutput();
void setOutput(String value);
}
/** * 5.運行程序 */
public static void main(String[] args) {
WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(WordCountOptions.class);
Pipeline p = Pipeline.create(options);
p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
.apply(new CountWords())
.apply(MapElements.via(new FormatAsTextFn()))
.apply("WriteCounts", TextIO.Write.to(options.getOutput()));
p.run().waitUntilFinish();
}
}
pom.xml
模塊加載是否成功,在工具中開發大數據程序,利於調試,開發體驗較好)Spark運行架構
設置VM optionsintellij-idea
-DPapex-runner
設置Programe argumentsapp
--inputFile=pom.xml --output=counts
Apex運行框架
設置VM options
-DPapex-runner
設置Programe arguments
--inputFile=pom.xml --output=counts
Flink運行等等
設置VM options
-DPflink-runner
設置Programe arguments
--inputFile=pom.xml --output=counts
mvn archetype:generate -DarchetypeRepository=https://repository.apache.org/content/groups/snapshots -DarchetypeGroupId=org.apache.beam -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples -DarchetypeVersion=LATEST -DgroupId=org.example -DartifactId=word-count-beam -Dversion="0.1" -Dpackage=org.apache.beam.examples -DinteractiveMode=false
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount -Dexec.args="--runner=SparkRunner --inputFile=pom.xml --output=counts" -Pspark-runner