Flink基本的API

Flink使用 DataSet 和 DataStream 表明數據集。DateSet 用於批處理,表明數據是有限的;而 DataStream 用於流數據,表明數據是無界的。數據集中的數據是不能夠變的,也就是說不能對其中的元素增長或刪除。咱們經過數據源建立 DataSet 或者 DataStream ,經過 map,filter 等轉換(transform)操做對數據集進行操做產生新的數據集。ide

編寫 Flink 程序通常通過一下幾個步驟:函數

  • 得到 execution 環境
  • 建立輸入數據
  • 在數據集上進行轉換操做(下文統一稱爲:transform)
  • 輸出結果數據
  • 觸發程序執行

下面咱們將介紹編寫 Flink 程序所涉及的基本 API。oop

輸入和輸出

首先,須要得到 execution 環境,Flink 提供了一下如下三種方式:大數據

getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(String host, int port, String... jarFiles)

以第一個爲例建立 execution 環境的代碼以下this

批處理:spa

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> text = env.readTextFile("file:///D:\\words.txt");
text.print();

流處理code

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile("file:///D:\\words.txt");
text.print();
env.execute();

words.txt 文件內容:orm

a
b
c
d
e
a
b

上面代碼建立了 execution 環境,同時利用 env 建立了輸入源。在數據集上調用 print 方法能夠將數據輸出到控制檯,固然也能夠調用 writeAsText 等方法將數據輸出到其餘介質。上面流處理最後一行代碼調用了 execute 方法,在流處理中須要顯式調用該方法觸發程序的執行。對象

上述代碼有兩種方式運行,一種是直接在 IDE 中執行,就像運行一個普通的 Java 程序,Flink 將啓動一個本地的環境執行程序。另外一種方式是將程序打包,提交到 Flink 集羣運行。上面例子基本包含了一個 Flink 程序的基本骨架,可是並無對數據集進行更多的 transform 操做,下面咱們簡單介紹基本 transform 操做。blog

map操做

這裏的 map 操做相似 MapReduce 中的 map,對數據進行解析,處理。示例以下

批處理

DataSet<Tuple2<String, Integer>> words = text.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                return new Tuple2<>(s, 1);
            }
});
words.print();

流處理

DataStream<Tuple2<String, Integer>> words = text.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                return new Tuple2<>(s, 1);
            }
});
words.print()

這裏批處理和流處理除了數據集的類型不一樣,其他寫法都同樣。就是將每一個單詞映射成了一個 (單詞, 1) 二元組。與 map 相似的 transform 還有 filter,過濾不須要的記錄,讀者能夠自行嘗試。

指定 key

大數據處理常常須要按照某個維度進行處理,也就是須要指定 key。在 DataSet 中使用 groupBy 指定 key,而在 DataStream 中使用 keyBy 指定 key。這裏咱們以 keyBy 爲例進行介紹。

Flink 的數據模型並非基於 key-value 的,key 是虛擬的,能夠看作是定義在數據上的函數。

在 Tuple 中定義 key

KeyedStream<Tuple2<String, Integer>, Tuple> keyed = words.keyBy(0); //0 表明 Tuple2 (二元組)中第一個元素
KeyedStream<Tuple2<String, Integer>, Tuple> keyed = words.keyBy(0,1); //0,1 表明二元組中第一個和第二個元素做爲 key\

對於嵌套的 tuple

DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;

ds.keyBy(0) 將會把 Tuple2<Integer, Float> 總體做爲 key。

用字段表達式指定 key

public class WC {
  public String word;
  public int count;
}
DataStream<WC> words = // [...]
DataStream<WC> wordCounts = words.keyBy("word");

這裏指定 WC 對象的 word 字段做爲 key。字段表達式語法以下:

  • Java對象使用字段名做爲key,例子如上
  • 對於 Tuple 類型使用字段名(f0, f1,...)或者偏移(從0開始)指定 key,例如 f0 和 5 分別表明 Tuple 第一個字段和第六個字段
  • Java 對象和 Tuple 嵌套的字段做爲 key,例如:f1.user.zip 表示 Tuple 第二個字段中的 user 對象中的 zip 字段做爲 key
  • 通配符 * 表明選擇全部類型做爲 key

字段表達式的舉例

public static class WC {
  public ComplexNestedClass complex; //nested POJO
  private int count;
  // getter / setter for private field (count)
  public int getCount() {
    return count;
  }
  public void setCount(int c) {
    this.count = c;
  }
}
public static class ComplexNestedClass {
  public Integer someNumber;
  public float someFloat;
  public Tuple3<Long, Long, String> word;
  public IntWritable hadoopCitizen;
}
  • "count": WC類的 count 字段
  • "complex": complex 的全部字段(遞歸地)
  • "complex.word.f2": ComplexNestedClass 類中 word 三元組的第三個字段
  • "complex.hadoopCitizen": complex類中的 hadoopCitizen 字段

使用 Key Selector 指定 key

經過 key 選擇器函數來制定 key,key 選擇器的輸入爲每一個元素,輸出爲指定的 key,例子以下

words.keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {

            @Override
            public Object getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return stringIntegerTuple2.f0;
            }
});

能夠看到實現的效果與 keyBy(0) 是同樣的。

以上即是 Flink 指定 key 的方法。

總結 

這篇文章主要介紹了 Flink 程序的基本骨架。得到環境、建立輸入源、對數據集作 transform 以及輸出。因爲數據處理常常會按照不一樣維度(不一樣的 key)進行統計,所以,本篇內容重點介紹了 Flink 中如何指定 key。後續將會繼續介紹 Flink API 的使用。

歡迎關注公衆號「渡碼」

相關文章
相關標籤/搜索