Beam從零開始(一)

轉  http://blog.csdn.net/qq_23660243/article/details/54614167

網上看了別人都在談Beam,你說咱們作爲技術人員技術也得緊跟着時代不是,所以也開始利用業餘時間研究Beam。咱不是大神,不能啥都一看就會,所以一天一天來,這個也就作爲筆記吧。廢話不多說,進入主題,按照老規矩,從官網入手。


其實Beam官網目前做的不是很豐滿,不過好在按照步驟進行,可以接受。


Beam是什麼呢?英文中Beam是光束的意思,官方對Beam的解釋是:Apache Beam是一個開源的統一的編程模型(記住,他是個模型而已),我們可以使用它來創建數據處理管道(核心是管道)。我們首先要定義一個程序,使用開源的BeamSDK來定義管道。然後管道由Beam支持的分佈式處理後端之一執行:Apache Apex,Apache Flink,Apache Spark,Google Cloud Dataflow。


Beam對於尷尬並行數據處理任務特別有用,其中問題可以被分解爲可以獨立和並行處理的許多較小的數據束。我們同樣可以使用Beam來提取,變換和加載(ETL)任務和純數據集成。這些任務對於不同存儲介質和數據源之間移動數據,將數據轉化成理想格式,或將數據加載到新的系統上有很大的好處。


Beam管道運行器將我們定義的處理管道和程序轉化爲與我們選擇的分佈式處理後端兼容的API。當我們運行Beam程序的時候,我們需要爲執行管道的後端指定適當的運行器(Runner)。


好了,上面就是簡短的理論基礎,下面開始我們經典的wordcount環節。不過我打算繞過官方的QuickStart環節,因爲這個真的沒啥意思,我們直接自己手動創建項目然後開始學習。


我們從Minimal WordCount開始說起,下面我簡稱:MW。MW演示了一個可以從文本中讀取的管道。應用轉換來對單詞進行標記和計數,並將結果寫入到輸出文件中。下面是詳細步驟:


首先我們創建一個maven項目,如圖:



然後在pom文件中加入我們的依賴:


<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>0.4.0</version>
</dependency>

接着創建我們的第一個類:Day01,然後在其中創建main方法,那麼到此我們的準備工作完畢。下面開始編寫代碼:


Creating the Pipeline


創建Beam管道的第一步是創建一個PipelineOptions對象,這個對象讓我們對我們的管道設置各種選項,例如將要執行我們管道的管道線程以及所選擇的線程所需的任何指定配置。我們可以爲我們的Pipeline指定一個Runner。比如DataflowRunner或者SparkRunner。當我們不指定的時候,將會默認調用本地的DirectRunner。這裏我跟官網不同,我使用最爲簡單的本地讀取。


所以我可以直接創建Pipeline對象:


PipelineOptions pipe = PipelineOptionsFactory.create();
// 當我們不指定的時候,會默認使用DirectRunner這種類型
//  pipe.setRunner(DirectRunner.class);


Pipeline p = Pipeline.create(pipe);

【注意】如果這裏直接運行會報錯,說本地找不到DirectRunner類(能導入的那個不是我們需要的),因爲缺少依賴,在pom中增加:


<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>0.4.0</version>
</dependency>

就可以成功解決問題。創建了管道,我們就可以對管道進行轉化了。


每個轉換採取某種輸入,然後產生一些輸出數據。輸入和輸出數據被SDK類:PCollection所表示。PCollection是一個特殊的類,他由Beam的SDK提供,我們可以用來代表幾乎任何大小的數據集。流程圖如下:


文本文件讀取操作被用於Pipeline本身,他生成PCollection作爲輸出,輸出PCollection中的每個元素表示輸入文件中的一行文本。那麼我們首先創建一個文件:



demo.txt爲我們新創建的文件,裏面內容:


tom
cat
hello

然後我們開始進行讀取:


p.apply(TextIO.Read.from("D:\\JavaProject\\Beam_Demo\\src\\main\\resources\\demo.txt"))


讀取完畢我們需要對內容進行處理,在每個元素上調用DoFn方法的ParDo轉換,將文本行標記爲單個單詞,該文本的輸入是由前一個TextIO.Read轉換生成的文本行的PCollection。ParDo同樣轉換輸出爲新的PCollection,其中每個元素表示文本中的單個詞:

.apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
        for (String word : c.element().split(" ")) {
            if (!word.isEmpty()) {
                c.output(word);
            }
        }
    }
}))



接下來我們需要對每個單詞進行統計,SDK提供的count變換是一種通用的轉換,它採用任何類型的PCollection,並返回key/value類型的PCollection。每個key表示來自於集合的唯一元素,每個value表示key出現的總次數。


.apply(Count.<String>perElement())


下面的轉換將唯一word和出現次數的每個key/value對格式化爲適用於寫入輸出文件的可打印字符串。MapElements是一個更高級別的複合變換,它封裝了一個簡單的ParDo。對於PCollection中的每個元素,MapElements應用只產生一個元素的函數。在本例中,MapElements調用執行格式化的simpleFunction(匿名內部類),作爲輸入,


MapElements獲得由count生成的key/value對的PCollection。併產生可打印字符串的新PCollection。

.apply("FormatResult", MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
    @Override
    public String apply(KV<String, Long> input) {
        return input.getKey() + ": " + input.getValue();
    }
})).apply(TextIO.Write.to("D:\\JavaProject\\Beam_Demo\\src\\main\\resources\\wordcounts"));


然後我們開始運行:

p.run().waitUntilFinish();


運行後我們得到結果:



每個裏面是word的統計結果,應該是hash分區,所以出現三個文件:


tom: 1
hello: 1

cat: 1


那麼計算結束,這裏僅僅是一個簡單的入門,後面還會繼續深入。


感謝開源。