Flink基本的API(續)

上一篇介紹了編寫 Flink 程序的基本步驟,以及一些常見 API,如:map、filter、keyBy 等,重點介紹了 keyBy 方法。本篇將繼續介紹 Flink 中經常使用的 API,主要內容爲html

  • 指定 transform 函數
  • Flink 支持的數據類型
  • 累加器

指定 transform 函數

許多 transform 操做須要用戶自定義函數來實現,Flink 支持多種自定義 transform 函數,接下來一一介紹。apache

實現接口

/** * 實現 MapFunction 接口 * 其中泛型的第一 String 表明輸入類型,第二個 Integer 表明輸出類型 */
class MyMapFunction implements MapFunction<String, Integer> { @Override public Integer map(String value) { return Integer.parseInt(value); } } //使用 transform 函數
data.map(new MyMapFunction());

匿名類

data.map(new MapFunction<String, Integer> () { public Integer map(String value) { return Integer.parseInt(value); } });

匿名類是 Java 語言定義的語法,與 「實現接口」 的方式同樣,只不過不須要顯示定義子類。這種方式比 「實現接口」 更常見一些。編程

Java 8 Lambda 表達式

data.map(s -> Integer.parseInt(s)); //或者
data.map(Integer::parseInt);

Java 8 支持 Lambda 表達式,用法與 Scala 語法很像, 寫起來簡潔,而且容易維護,推薦使用這種方式。json

rich function

顧名思義,比普通的 transform 函數要更豐富,額外提供了 4 個方法:open、close、getRuntimeContext 和 setRuntimeContext。它們能夠用來建立/初始化本地狀態、訪問廣播變量、訪問累加器和計數器等。感受有點像 Hadoop 中的 Mapper 或者 Reducer 類。實現上,可使用自定義類繼承 RichMapFunction 類的方式數組

/** * 與實現 MapFunction 接口相似,這裏是繼承了 RichMapFunction 類 * 同時能夠實現父類更多的方法 */
class MyRichMapFunction extends RichMapFunction<String, Integer> { @Override public void open(Configuration parameters) throws Exception { super.open(parameters); } @Override public RuntimeContext getRuntimeContext() { return super.getRuntimeContext(); } @Override public void setRuntimeContext(RuntimeContext t) { super.setRuntimeContext(t); } @Override public Integer map(String value) throws Exception { return Integer.parseInt(value); } @Override public void close() throws Exception { super.close(); } } data.map(new MyRichMapFunction());

也可使用匿名類的方式app

data.map (new RichMapFunction<String, Integer>() { @Override public void open(Configuration parameters) throws Exception { super.open(parameters); } @Override public RuntimeContext getRuntimeContext() { return super.getRuntimeContext(); } @Override public void setRuntimeContext(RuntimeContext t) { super.setRuntimeContext(t); } @Override public Integer map(String value) { return Integer.parseInt(value); } @Override public void close() throws Exception { super.close(); } });

若是在 rich function 中須要寫較多的業務,那麼用匿名類的方式並不簡潔,而且可讀性差。框架

Flink支持的數據類型

目前 Flink 支持 6 種數據類型分佈式

  • Java Tuple 和 Scala Case Class
  • Java POJO
  • 原子類型
  • 普通類
  • Values
  • Hadoop Writable 類型
  • 特殊類

Java Tuple 和 Scala Case Class

Tuple (元組)是一個混合類型,包含固定數量的屬性,而且每一個屬性類型能夠不一樣。例如:二元組有 2 個屬性,類名爲 Tuple2;三元組有 3 個屬性,類名爲 Tuple3,以此類推。Java 支持的元組爲 Tuple1 - Tuple25。訪問屬性能夠經過屬性名直接訪問,如:tuple.f4 表明 tuple 的第 5 個屬性。或者使用 tuple.getField(int position) 方法,參數 position 從 0 開始。ide

/** * Tuple2 二元組做爲 DataStream 的輸入類型 */ DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements( new Tuple2<String, Integer>("hello", 1), new Tuple2<String, Integer>("world", 2)); wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() { @Override public Integer map(Tuple2<String, Integer> value) throws Exception { return value.f1; } });

Java POJO

POJO(Plain Ordinary Java Object) 叫作簡單的 Java 對象。知足如下條件的 Java 或 Scala 類會被 Flink 看作 POJO 類型函數

  • 類必須是 public
  • 必須有一個 public 修飾的無參構造方法(默認構造器)
  • 全部屬性必須是 public 修飾或者經過 getter 和 setter 方法能夠訪問到
  • 屬性類型必須也是 Flink 支持的,Flink 使用 avro 對其序列化

POJO 類型更易使用,且 Flink 更高效地處理 POJO 類型的數據。

public class WordWithCount { public String word; public int count; public WordWithCount() {} public WordWithCount(String word, int count) { this.word = word; this.count = count; } } DataStream<WordWithCount> wordCounts = env.fromElements( new WordWithCount("hello", 1), new WordWithCount("world", 2));

原子類型

Flink 支持 Java 和 Scala 中全部的原子類型,例如: Integer、String 和 Double 等。

普通類 

不是 POJO 類型的類都會被 Flink 看作是普通的類類型。Flink 將它們視爲黑盒且不會訪問它們的內容,普通類類型使用 Kryo 進行序列化與反序列化。這裏是第二次提到序列化與反序列化,簡單解釋下這個概念。由於在分佈式計算的系統中,不可避免要在不一樣機器之間傳輸數據,所以爲了高效傳輸數據且在不一樣語言之間互相轉化,須要經過某種協議(protobuf、kryo、avro、json)將對象轉化成另一種形式(序列化),其餘機器接到序列化的數據後再轉化成以前的對象(反序列化)就能夠正常使用了。

Values

不一樣於通常的序列化框架,Values 類型經過實現 org.apache.flinktypes.Value 接口裏的 write 和 read 方法,實現本身的序列化和反序列化邏輯。當通常的序列化框架不夠高效的時候,可使用 Values 類型。例如:對於一個用數組存儲的稀疏向量。因爲數組大多數元素爲 0 ,能夠僅對非 0 元素進行特殊編碼,而通常的序列化框架會對全部元素進行序列化操做。

Flink 已經預約義了幾種 Value 類型與基本數據類型相對應。如:ByteValue, ShortValue, IntValue, LongValue, FloatValue, DoubleValue, StringValue, CharValue, BooleanValue。這些 Value 類型能夠看作是基本數據類型的變體,他們的值是可變的,容許程序重複利用對象,減輕 GC 的壓力。例如:Java 基本數據類型 String 是不可變的,可是 Flink 的 StringValue 類型是可變的。

Flink 定義的 Value 類型與 Hadoop Writable 類型類似,本質都是經過改進基本數據類型的缺點,提供系統總體性能。

Hadoop Writable

Hadoop Writable 類型也是手動實現了比較高效的序列化與反序列化的邏輯。Value 類型實現了 org.apache.finktypes.Value 接口,而 Hadoop Writable 類型實現了 org.apache.hadoop.Writable 接口,該接口定義了 write 和 readFields 方法用來手動實現序列化與反序列化邏輯

特殊類型

特殊類型包括 Scala 中的 Either, Option, and Try 類型,以及 Java API 中的 Either 類型。

累加器 

累加器能夠經過 add 操做,對程序中的某些狀態或者操做進行計數,job 結束後會返回計數的結果。累加器能夠用來調試或者記錄信息。

能夠自定義累加器,須要實現 Accumulator 接口,固然 Flink 提供了兩種內置的累加器

  • IntCounter, LongCounter 和 DoubleCounter
  • Histogram:統計分佈

使用累加器的步驟以下:

在 transform 函數中定義累加器對象

private IntCounter numLines = new IntCounter();

註冊累加器對象,能夠在 rich function 的 open 方法進行

getRuntimeContext().addAccumulator("num-lines", this.numLines);

在任何須要統計的地方使用累加器

this.numLines.add(1);

獲取累加器結果

myJobExecutionResult.getAccumulatorResult("num-lines")

Job 結束後,累加器的最終值存儲在 JobExecutionResult 對象中,能夠經過 execute 方法返回值來獲取 JobExecutionResult 對象。可是對於批處理沒法使用調用這個方法(官網沒有提到),能夠經過 env.getLastJobExecutionResult 方法獲取。下面是使用累加器的完整示例

public static void main(String[] args) throws Exception { // set up the batch execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<String> data =  env.readTextFile("你的輸入路徑"); //使用 rich function transform 函數
        DataSet<Integer> dataSet = data.map(new MyRichMapFunction()); // 執行程序
 dataSet.collect(); // 得到 job 的結果
        JobExecutionResult jobExecutionResult = env.getLastJobExecutionResult(); int res = jobExecutionResult.getAccumulatorResult("num-lines"); // 輸出累加器的值
 System.out.println(res); } // 自定義 rich function /** * 與實現 MapFunction 接口相似,這裏是繼承了 RichMapFunction 類 * 同時能夠實現父類更多的方法 */
class MyRichMapFunction extends RichMapFunction<String, Integer> { /** * 定義累加器 */
    private IntCounter numLines = new IntCounter(); @Override public void open(Configuration parameters) throws Exception { // 註冊累加器
        getRuntimeContext().addAccumulator("num-lines", this.numLines); } @Override public Integer map(String value) throws Exception { // 累加器自增,記錄處理的行數
        this.numLines.add(1); return Integer.parseInt(value); } }

 

總結

Flink 基本 API 的使用介紹完了,本篇主要介紹了自定義的 transform 函數、Flink 支持的數據類型和累加器。後續會詳細介紹 Flink 的原理、機制以及編程模型。

歡迎關注公衆號「渡碼」

 

原文出處:https://www.cnblogs.com/duma/p/10992125.html

相關文章
相關標籤/搜索