在閱讀本文前,可先看一下官方的WordCount代碼, 對Apache Beam有大概的瞭解。apache
要說在Apache Beam中常見的函數是哪個,固然是apply()。常見的寫法以下:app
[Final Output PCollection] = [Initial Input PCollection].apply([First Transform]) .apply([Second Transform]) .apply([Third Transform])
而在最簡單的wordcount代碼中,就出現了許多種不一樣的傳入參數類型,除了輸入輸出的部分,還包括
1)使用ParDo.of():ide
.apply("ExtractWords-joe", ParDo.of(new DoFn<String, String>() { @ProcessElement public void processElement(ProcessContext context) { System.out.println(context.element()+"~"); for (String word : context.element().split(" ")) { if (!word.isEmpty()) { //輸出到Output PCollection context.output(word); } } } }) )
2)使用MapElements.via():函數
.apply("FomatResults", MapElements.via(new SimpleFunction<KV<String, Long>,String>() { @Override public String apply(KV<String, Long> input) { return input.getKey()+":"+input.getValue(); } }))
3)以及使用PTransform子類:學習
.apply(new CountWords()) public static class CountWords extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> { @Override public PCollection<KV<String, Long>> expand(PCollection<String> lines) { // Convert lines of text into individual words. PCollection<String> words = lines.apply( ParDo.of(new ExtractWordsFn())); // Count the number of times each word occurs. PCollection<KV<String, Long>> wordCounts = words.apply(Count.<String>perElement()); return wordCounts; } }
這麼多種傳入方式到底有什麼聯繫?經過查看源碼能夠看出apply函數的定義以下:code
public <OutputT extends POutput> OutputT apply( String name, PTransform<? super PBegin, OutputT> root) { return begin().apply(name, root); }
傳入的參數爲PTransform類對象,也就是這幾種傳入參數其實都是PTransform類的變形。
PTransform是一個實現了Serializable接口的抽象類,其中public abstract OutputT expand(InputT input); 是數據處理方法,強制子類必須實現。
所以第(3)種方式很容易理解,就是經過繼承PTransform並實現了expand方法定義了CountWords類,給apply方法傳遞了一個CountWords對象。orm
在第(2)種方式中,MapElements是PTransform的子類,實現了expand方法,其實現方式是調用@Nullable private final SimpleFunction<InputT, OutputT> fn;成員中定義的數據處理方法,MapElements.via()則是一個爲初始化fn的靜態方法,定義以下:對象
public static <InputT, OutputT> MapElements<InputT, OutputT> via( final SimpleFunction<InputT, OutputT> fn) { return new MapElements<>(fn, null, fn.getClass()); }
傳入了一個SimpleFunction對象,SimpleFunction是一個必須實現public OutputT apply(InputT input) 方法的抽象類,用戶在該apply方法中實現數據處理。
因此這種方式的實現方式以下:
定義SimpleFunction的子類並實現其中的apply方法,將該子類的對象傳遞給MapElements.via()。繼承
第(1)種方式中,ParDo.of()方法傳入一個DoFn對象, 返回一個SingleOutput對象:接口
public static <InputT, OutputT> SingleOutput<InputT, OutputT> of(DoFn<InputT, OutputT> fn) { validate(fn); return new SingleOutput<InputT, OutputT>( fn, Collections.<PCollectionView<?>>emptyList(), displayDataForFn(fn)); }
SingleOutput與MapElements相似,也是PTransform的子類,實現了expand方法,使用private final DoFn<InputT, OutputT> fn;成員中的方法進行數據處理。
而DoFn是一個抽象類,用戶必須實現其註解方法(存疑) public void processElement(ProcessContext c)。
因此這種方式的實現方式以下:
定義DoFn的子類並實現其中的processElement方法,將該子類的對象傳遞給ParDo.of()。
須要注意的是processElement方法與前2種方式不一樣,輸入和輸出數據都是在傳入參數ProcessContext c中,而不是經過return進行傳遞。
以上爲學習Apache Beam一天的總結,有錯誤歡迎指正。
**
**
1)MapElement.via(SimpleFunction)和PTransform
MapElements是PTransform的一個子類:
public class MapElements<InputT, OutputT>
extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>>
從泛型參數來看,PTransform處理的是PCollection,而MapElement處理的是PCollection中的一個元素,對比SimpleFunction的apply方法和PTransform的expand方法的實現方式獲得驗證。
2)MapElement.via(SimpleFunction)和ParDo.of(DoFn)
區別以前已經說過,DoFn的processElement方法的輸入和輸出都是從參數傳入,而SimpleFunction的apply方法從參數傳入輸入,從return傳出輸出。
相同的是這2個方法處理的都是PCollection中的一個元素。
查看MapElement的expand方法源碼:
@Override public PCollection<OutputT> expand(PCollection<? extends InputT> input) { checkNotNull(fn, "Must specify a function on MapElements using .via()"); return input.apply( "Map", ParDo.of( new DoFn<InputT, OutputT>() { @ProcessElement public void processElement(ProcessContext c) { c.output(fn.apply(c.element())); } //部分代碼忽略 })); }
能夠看出其實也是實現了DoFn的子類,在DoFn的processElement方法中調用SimpleFunction對象的apply方法進行處理。