Flink從入門到放棄(入門篇3)-DataSetAPI

戳更多文章:

1-Flink入門javascript

2-本地環境搭建&構建第一個Flink應用css

3-DataSet APIhtml

4-DataSteam APIjava

5-集羣部署node

6-分佈式緩存c++

7-重啓策略面試

8-Flink中的窗口算法

9-Flink中的Timesql

Flink時間戳和水印數據庫

Broadcast廣播變量

FlinkTable&SQL

Flink實戰項目實時熱銷排行

Flink寫入RedisSink

17-Flink消費Kafka寫入Mysql

首先咱們來看一下編程結構:

編程結構

public class SocketTextStreamWordCount { public static void main(String[] args) throws Exception { if (args.length != 2){ System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>"); return; } String hostName = args[0]; Integer port = Integer.parseInt(args[1]); final StreamExecutionEnvironment env = StreamExecutionEnvironment .getExecutionEnvironment(); DataStream<String> text = env.socketTextStream(hostName, port); DataStream<Tuple2<String, Integer>> counts text.flatMap(new LineSplitter()) .keyBy(0) .sum(1); counts.print(); env.execute("Java WordCount from SocketTextStream Example"); } 

上面的SocketTextStreamWordCount是一個典型的Flink程序,他由一下及格部分構成:

  • 得到一個execution environment,
  • 加載/建立初始數據,
  • 指定此數據的轉換,
  • 指定放置計算結果的位置,
  • 觸發程序執行

公衆號

  • 全網惟一一個從0開始幫助Java開發者轉作大數據領域的公衆號~

  • 大數據技術與架構或者搜索import_bigdata關注~

  • 海量【java和大數據的面試題+視頻資料】整理在公衆號,關注後能夠下載~

 
image

DataSet API

分類:

  • Source: 數據源建立初始數據集,例如來自文件或Java集合
  • Transformation: 數據轉換將一個或多個DataSet轉換爲新的DataSet
  • Sink: 將計算結果存儲或返回

DataSet Sources

基於文件的

  • readTextFile(path)/ TextInputFormat- 按行讀取文件並將其做爲字符串返回。

  • readTextFileWithValue(path)/ TextValueInputFormat- 按行讀取文件並將它們做爲StringValues返回。StringValues是可變字符串。

  • readCsvFile(path)/ CsvInputFormat- 解析逗號(或其餘字符)分隔字段的文件。返回元組或POJO的DataSet。支持基本java類型及其Value對應做爲字段類型。

  • readFileOfPrimitives(path, Class)/ PrimitiveInputFormat- 解析新行(或其餘字符序列)分隔的原始數據類型(如String或)的文件Integer。

  • readFileOfPrimitives(path, delimiter, Class)/ PrimitiveInputFormat- 解析新行(或其餘字符序列)分隔的原始數據類型的文件,例如String或Integer使用給定的分隔符。

  • readSequenceFile(Key, Value, path)/ SequenceFileInputFormat- 建立一個JobConf並從類型爲SequenceFileInputFormat,Key class和Value類的指定路徑中讀取文件,並將它們做爲Tuple2 <Key,Value>返回。

基於集合

  • fromCollection(Collection) - 從Java Java.util.Collection建立數據集。集合中的全部數據元必須屬於同一類型。

  • fromCollection(Iterator, Class) - 從迭代器建立數據集。該類指定迭代器返回的數據元的數據類型。

  • fromElements(T ...) - 根據給定的對象序列建立數據集。全部對象必須屬於同一類型。

  • fromParallelCollection(SplittableIterator, Class)- 並行地從迭代器建立數據集。該類指定迭代器返回的數據元的數據類型。

  • generateSequence(from, to) - 並行生成給定間隔中的數字序列。

通用方法

  • readFile(inputFormat, path)/ FileInputFormat- 接受文件輸入格式。

  • createInput(inputFormat)/ InputFormat- 接受通用輸入格式。

代碼示例

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// 從本地文件系統讀 DataSet<String> localLines = env.readTextFile("file:///path/to/my/textfile"); // 讀取HDFS文件 DataSet<String> hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile"); // 讀取CSV文件 DataSet<Tuple3<Integer, String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file").types(Integer.class, String.class, Double.class); // 讀取CSV文件中的部分 DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file").includeFields("10010").types(String.class, Double.class); // 讀取CSV映射爲一個java類 DataSet<Person>> csvInput = env.readCsvFile("hdfs:///the/CSV/file").pojoType(Person.class, "name", "age", "zipcode"); // 讀取一個指定位置序列化好的文件 DataSet<Tuple2<IntWritable, Text>> tuples = env.readSequenceFile(IntWritable.class, Text.class, "hdfs://nnHost:nnPort/path/to/file"); // 從輸入字符建立 DataSet<String> value = env.fromElements("Foo", "bar", "foobar", "fubar"); // 建立一個數字序列 DataSet<Long> numbers = env.generateSequence(1, 10000000); // 從關係型數據庫讀取 DataSet<Tuple2<String, Integer> dbData = env.createInput(JDBCInputFormat.buildJDBCInputFormat() .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") .setDBUrl("jdbc:derby:memory:persons") .setQuery("select name, age from persons") .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO)) .finish()); 

DataSet Transformation

詳細能夠參考官網:https://flink.sojb.cn/dev/batch/dataset_transformations.html#filter

  • Map

採用一個數據元並生成一個數據元。

data.map(new MapFunction<String, Integer>() { public Integer map(String value) { return Integer.parseInt(value); } }); 
  • FlatMap

採用一個數據元並生成零個,一個或多個數據元。

data.flatMap(new FlatMapFunction<String, String>() { public void flatMap(String value, Collector<String> out) { for (String s : value.split(" ")) { out.collect(s); } } }); 
  • MapPartition

在單個函數調用中轉換並行分區。該函數將分區做爲Iterable流來獲取,而且能夠生成任意數量的結果值。每一個分區中的數據元數量取決於並行度和先前的 算子操做。

data.mapPartition(new MapPartitionFunction<String, Long>() { public void mapPartition(Iterable<String> values, Collector<Long> out) { long c = 0; for (String s : values) { c++; } out.collect(c); } }); 
  • Filter

計算每一個數據元的布爾函數,並保存函數返回true的數據元。
重要信息:系統假定該函數不會修改應用謂詞的數據元。違反此假設可能會致使錯誤的結果。

data.filter(new FilterFunction<Integer>() { public boolean filter(Integer value) { return value > 1000; } }); 
  • Reduce

經過將兩個數據元重複組合成一個數據元,將一組數據元組合成一個數據元。Reduce能夠應用於完整數據集或分組數據集。

data.reduce(new ReduceFunction<Integer> { public Integer reduce(Integer a, Integer b) { return a + b; } }); 

若是將reduce應用於分組數據集,則能夠經過提供CombineHintto 來指定運行時執行reduce的組合階段的方式 setCombineHint。在大多數狀況下,基於散列的策略應該更快,特別是若是不一樣鍵的數量與輸入數據元的數量相比較小(例如1/10)。

  • ReduceGroup

將一組數據元組合成一個或多個數據元。ReduceGroup能夠應用於完整數據集或分組數據集。

data.reduceGroup(new GroupReduceFunction<Integer, Integer> { public void reduce(Iterable<Integer> values, Collector<Integer> out) { int prefixSum = 0; for (Integer i : values) { prefixSum += i; out.collect(prefixSum); } } }); 
  • Aggregate

將一組值聚合爲單個值。聚合函數能夠被認爲是內置的reduce函數。聚合能夠應用於完整數據集或分組數據集。

Dataset<Tuple3<Integer, String, Double>> input = // [...] DataSet<Tuple3<Integer, String, Double>> output = input.aggregate(SUM, 0).and(MIN, 2); 

您還可使用簡寫語法進行最小,最大和總和聚合。

Dataset<Tuple3<Integer, String, Double>> input = // [...] DataSet<Tuple3<Integer, String, Double>> output = input.sum(0).andMin(2); 
  • Distinct

返回數據集的不一樣數據元。它相對於數據元的全部字段或字段子集從輸入DataSet中刪除重複條目。

data.distinct(); 

使用reduce函數實現Distinct。您能夠經過提供CombineHintto 來指定運行時執行reduce的組合階段的方式 setCombineHint。在大多數狀況下,基於散列的策略應該更快,特別是若是不一樣鍵的數量與輸入數據元的數量相比較小(例如1/10)。

  • Join

經過建立在其鍵上相等的全部數據元對來鏈接兩個數據集。可選地使用JoinFunction將數據元對轉換爲單個數據元,或使用FlatJoinFunction將數據元對轉換爲任意多個(包括無)數據元。請參閱鍵部分以瞭解如何定義鏈接鍵。

result = input1.join(input2)
               .where(0) // key of the first input (tuple field 0) .equalTo(1); // key of the second input (tuple field 1) 

您能夠經過Join Hints指定運行時執行鏈接的方式。提示描述了經過分區或廣播進行鏈接,以及它是使用基於排序仍是基於散列的算法。
若是未指定提示,系統將嘗試估算輸入大小,並根據這些估計選擇最佳策略。

// This executes a join by broadcasting the first data set // using a hash table for the broadcast data result = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST) .where(0).equalTo(1); 

請注意,鏈接轉換僅適用於等鏈接。其餘鏈接類型須要使用OuterJoin或CoGroup表示。

  • OuterJoin

在兩個數據集上執行左,右或全外鏈接。外鏈接相似於常規(內部)鏈接,並建立在其鍵上相等的全部數據元對。此外,若是在另外一側沒有找到匹配的Keys,則保存「外部」側(左側,右側或二者都滿)的記錄。匹配數據元對(或一個數據元和null另外一個輸入的值)被賦予JoinFunction以將數據元對轉換爲單個數據元,或者轉換爲FlatJoinFunction以將數據元對轉換爲任意多個(包括無)數據元。請參閱鍵部分以瞭解如何定義鏈接鍵。

input1.leftOuterJoin(input2) // rightOuterJoin or fullOuterJoin for right or full outer joins .where(0) // key of the first input (tuple field 0) .equalTo(1) // key of the second input (tuple field 1) .with(new JoinFunction<String, String, String>() { public String join(String v1, String v2) { // NOTE: // - v2 might be null for leftOuterJoin // - v1 might be null for rightOuterJoin // - v1 OR v2 might be null for fullOuterJoin } }); 
  • CoGroup

reduce 算子操做的二維變體。將一個或多個字段上的每一個輸入分組,而後關聯組。每對組調用轉換函數。

data1.coGroup(data2)
     .where(0) .equalTo(1) .with(new CoGroupFunction<String, String, String>() { public void coGroup(Iterable<String> in1, Iterable<String> in2, Collector<String> out) { out.collect(...); } }); 
  • Cross

構建兩個輸入的笛卡爾積(交叉乘積),建立全部數據元對。可選擇使用CrossFunction將數據元對轉換爲單個數據元

DataSet<Integer> data1 = // [...] DataSet<String> data2 = // [...] DataSet<Tuple2<Integer, String>> result = data1.cross(data2); 

注:交叉是一個潛在的很是計算密集型 算子操做它甚至能夠挑戰大的計算集羣!建議使用crossWithTiny()和crossWithHuge()來提示系統的DataSet大小。

  • Union

生成兩個數據集的並集。

DataSet<String> data1 = // [...] DataSet<String> data2 = // [...] DataSet<String> result = data1.union(data2); 
  • Rebalance

均勻地Rebalance 數據集的並行分區以消除數據誤差。只有相似Map的轉換可能會遵循Rebalance 轉換。

DataSet<String> in = // [...] DataSet<String> result = in.rebalance() .map(new Mapper()); 
  • Hash-Partition

散列分區給定鍵上的數據集。鍵能夠指定爲位置鍵,表達鍵和鍵選擇器函數。

DataSet<Tuple2<String,Integer>> in = // [...] DataSet<Integer> result = in.partitionByHash(0) .mapPartition(new PartitionMapper()); 
  • Range-Partition

Range-Partition給定鍵上的數據集。鍵能夠指定爲位置鍵,表達鍵和鍵選擇器函數。

DataSet<Tuple2<String,Integer>> in = // [...] DataSet<Integer> result = in.partitionByRange(0) .mapPartition(new PartitionMapper()); 
  • Custom Partitioning

手動指定數據分區。
注意:此方法僅適用於單個字段鍵。

DataSet<Tuple2<String,Integer>> in = // [...] DataSet<Integer> result = in.partitionCustom(Partitioner<K> partitioner, key) 
  • Sort Partition

本地按指定順序對指定字段上的數據集的全部分區進行排序。能夠將字段指定爲元組位置或字段表達式。經過連接sortPartition()調用來完成對多個字段的排序。

DataSet<Tuple2<String,Integer>> in = // [...] DataSet<Integer> result = in.sortPartition(1, Order.ASCENDING) .mapPartition(new PartitionMapper()); 
  • First-n

返回數據集的前n個(任意)數據元。First-n能夠應用於常規數據集,分組數據集或分組排序數據集。分組鍵能夠指定爲鍵選擇器函數或字段位置鍵。

DataSet<Tuple2<String,Integer>> in = // [...] // regular data set DataSet<Tuple2<String,Integer>> result1 = in.first(3); // grouped data set DataSet<Tuple2<String,Integer>> result2 = in.groupBy(0) .first(3); // grouped-sorted data set DataSet<Tuple2<String,Integer>> result3 = in.groupBy(0) .sortGroup(1, Order.ASCENDING) .first(3); 

DataSet Sink

數據接收器使用DataSet用於存儲或返回。使用OutputFormat描述數據接收器算子操做 。Flink帶有各類內置輸出格式,這些格式封裝在DataSet上的算子操做中:

  • writeAsText()/ TextOutputFormat- 按字符串順序寫入數據元。經過調用每一個數據元的toString()方法得到字符串。
  • writeAsFormattedText()/ TextOutputFormat- 按字符串順序寫數據元。經過爲每一個數據元調用用戶定義的format()方法來獲取字符串。
  • writeAsCsv(...)/ CsvOutputFormat- 將元組寫爲逗號分隔值文件。行和字段分隔符是可配置的。每一個字段的值來自對象的toString()方法。
  • print()/ printToErr()/ print(String msg)/ printToErr(String msg)- 在標準輸出/標準錯誤流上打印每一個數據元的toString()值。可選地,能夠提供前綴(msg),其前綴爲輸出。這有助於區分不一樣的打印調用。若是並行度大於1,則輸出也將與生成輸出的任務的標識符一塊兒添加。
  • write()/ FileOutputFormat- 自定義文件輸出的方法和基類。支持自定義對象到字節的轉換。
  • output()/ OutputFormat- 大多數通用輸出方法,用於非基於文件的數據接收器(例如將結果存儲在數據庫中)。

能夠將DataSet輸入到多個 算子操做。程序能夠編寫或打印數據集,同時對它們執行其餘轉換。

示例:

// text data DataSet<String> textData = // [...] // write DataSet to a file on the local file system textData.writeAsText("file:///my/result/on/localFS"); // write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS"); // write DataSet to a file and overwrite the file if it exists textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE); // tuples as lines with pipe as the separator "a|b|c" DataSet<Tuple3<String, Integer, Double>> values = // [...] values.writeAsCsv("file:///path/to/the/result/file", "\n", "|"); // this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines values.writeAsText("file:///path/to/the/result/file"); // this writes values as strings using a user-defined TextFormatter object values.writeAsFormattedText("file:///path/to/the/result/file", new TextFormatter<Tuple2<Integer, Integer>>() { public String format (Tuple2<Integer, Integer> value) { return value.f1 + " - " + value.f0; } }); 

使用自定義輸出格式:

DataSet<Tuple3<String, Integer, Double>> myResult = [...] // write Tuple DataSet to a relational database myResult.output( // build and configure OutputFormat JDBCOutputFormat.buildJDBCOutputFormat() .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") .setDBUrl("jdbc:derby:memory:persons") .setQuery("insert into persons (name, age, height) values (?,?,?)") .finish() ); 

序列化器

  • Flink自帶了針對諸如int,long,String等標準類型的序列化器
  • 針對Flink沒法實現序列化的數據類型,咱們能夠交給Avro和Kryo
  • 使用方法:ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
使用avro序列化:env.getConfig().enableForceAvro(); 使用kryo序列化:env.getConfig().enableForceKryo(); 使用自定義序列化:env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass) 

數據類型

  • Java Tuple 和 Scala case class

  • Java POJOs:java實體類

  • Primitive Types
    默認支持java和scala基本數據類型

  • General Class Types
    默認支持大多數java和scala class

  • Hadoop Writables
    支持hadoop中實現了org.apache.hadoop.Writable的數據類型

  • Special Types
    例如scala中的Either Option 和Try

  • 全網惟一一個從0開始幫助Java開發者轉作大數據領域的公衆號~

  • 大數據技術與架構或者搜索import_bigdata關注~

  • 海量【java和大數據的面試題+視頻資料】整理在公衆號,關注後能夠下載~

 
image
相關文章
相關標籤/搜索