Apache Beam學習筆記——幾種常見的處理類Transform

在閱讀本文前,可先看一下官方的WordCount代碼, 對Apache Beam有大概的瞭解。apache

要說在Apache Beam中常見的函數是哪個,固然是apply()。常見的寫法以下:app

[Final Output PCollection] = [Initial Input PCollection].apply([First Transform])
.apply([Second Transform])
.apply([Third Transform])

而在最簡單的wordcount代碼中,就出現了許多種不一樣的傳入參數類型,除了輸入輸出的部分,還包括
1)使用ParDo.of():ide

.apply("ExtractWords-joe",
        ParDo.of(new DoFn<String, String>() {
            @ProcessElement
            public void processElement(ProcessContext context) {
                System.out.println(context.element()+"~");
                for (String word : context.element().split(" ")) {
                    if (!word.isEmpty()) {
                        //輸出到Output PCollection
                        context.output(word);
                    }
                }
            }
        })
)

2)使用MapElements.via():函數

.apply("FomatResults",
        MapElements.via(new SimpleFunction<KV<String, Long>,String>() {
            @Override
            public String apply(KV<String, Long> input) {
                return input.getKey()+":"+input.getValue();
            }
        }))

3)以及使用PTransform子類:學習

.apply(new CountWords())
  public static class CountWords extends PTransform<PCollection<String>,
      PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(
          ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts =
          words.apply(Count.<String>perElement());

      return wordCounts;
    }
  }

這麼多種傳入方式到底有什麼聯繫?經過查看源碼能夠看出apply函數的定義以下:code

public <OutputT extends POutput> OutputT apply(
      String name, PTransform<? super PBegin, OutputT> root) {
    return begin().apply(name, root);
  }

傳入的參數爲PTransform類對象,也就是這幾種傳入參數其實都是PTransform類的變形
PTransform是一個實現了Serializable接口的抽象類,其中public abstract OutputT expand(InputT input); 是數據處理方法,強制子類必須實現。
所以第(3)種方式很容易理解,就是經過繼承PTransform並實現了expand方法定義了CountWords類,給apply方法傳遞了一個CountWords對象。orm

在第(2)種方式中,MapElements是PTransform的子類,實現了expand方法,其實現方式是調用@Nullable private final SimpleFunction<InputT, OutputT> fn;成員中定義的數據處理方法,MapElements.via()則是一個爲初始化fn的靜態方法,定義以下:對象

public static <InputT, OutputT> MapElements<InputT, OutputT> via(
      final SimpleFunction<InputT, OutputT> fn) {
    return new MapElements<>(fn, null, fn.getClass());
  }

傳入了一個SimpleFunction對象,SimpleFunction是一個必須實現public OutputT apply(InputT input) 方法的抽象類,用戶在該apply方法中實現數據處理。
因此這種方式的實現方式以下:
定義SimpleFunction的子類並實現其中的apply方法,將該子類的對象傳遞給MapElements.via()。繼承

第(1)種方式中,ParDo.of()方法傳入一個DoFn對象, 返回一個SingleOutput對象:接口

public static <InputT, OutputT> SingleOutput<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
    validate(fn);
    return new SingleOutput<InputT, OutputT>(
        fn, Collections.<PCollectionView<?>>emptyList(), displayDataForFn(fn));
  }

SingleOutput與MapElements相似,也是PTransform的子類,實現了expand方法,使用private final DoFn<InputT, OutputT> fn;成員中的方法進行數據處理。
而DoFn是一個抽象類,用戶必須實現其註解方法(存疑) public void processElement(ProcessContext c)。
因此這種方式的實現方式以下:
定義DoFn的子類並實現其中的processElement方法,將該子類的對象傳遞給ParDo.of()。
須要注意的是processElement方法與前2種方式不一樣,輸入和輸出數據都是在傳入參數ProcessContext c中,而不是經過return進行傳遞。

以上爲學習Apache Beam一天的總結,有錯誤歡迎指正。

**

Day2補充,3種方式的區別和聯繫:

**
1)MapElement.via(SimpleFunction)和PTransform
MapElements是PTransform的一個子類:
public class MapElements<InputT, OutputT>
extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>>
從泛型參數來看,PTransform處理的是PCollection,而MapElement處理的是PCollection中的一個元素,對比SimpleFunction的apply方法和PTransform的expand方法的實現方式獲得驗證。

2)MapElement.via(SimpleFunction)和ParDo.of(DoFn)
區別以前已經說過,DoFn的processElement方法的輸入和輸出都是從參數傳入,而SimpleFunction的apply方法從參數傳入輸入,從return傳出輸出。
相同的是這2個方法處理的都是PCollection中的一個元素。
查看MapElement的expand方法源碼:

@Override
public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
  checkNotNull(fn, "Must specify a function on MapElements using .via()");
  return input.apply(
      "Map",
      ParDo.of(
          new DoFn<InputT, OutputT>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
              c.output(fn.apply(c.element()));
            }
    //部分代碼忽略
          }));
}

能夠看出其實也是實現了DoFn的子類,在DoFn的processElement方法中調用SimpleFunction對象的apply方法進行處理。

相關文章
相關標籤/搜索