依賴數據源的類型,你能夠將用於批處理和流的DataSet接口作爲數據源寫一個batch或者streaming程序。本節教程旨在介紹在這兩個方向上通用接口的基本概念。java
注:當我以StreamingExecutionEnvironment和DataStream API來做爲講述這些概念的實例時。其餘和DataSet API是同樣的,僅僅使用了ExecutionEnvironment和DataSet來指代而已。express
DataSet和DataStreamapache
Flink使用特殊的類——DataSet和DataStream來表達程序中的數據。你能夠把他們想象成包含能夠複製的數據的不可變集合。在DataSet中的數據是有限的可是在一個DataStream中元素的數量是無上限的。api
這些集合在一些關鍵的點上仍是和經常使用的java集合有一些區別的。首先,他們是可變的,意味着一旦他們被建立就不能增長或者移除任何元素。你要作的不只僅只是檢查裏面的元素這麼簡單。ide
一個集合是經過在Flink程序中增長一個source來初始化的。新的集合經過調用像map、filter等函數來作轉換而生成的。函數
Flink程序的結構oop
Flink程序就像是轉換數據集合的程序同樣。每一個程序都包含下面的部分:學習
1 獲取執行環境this
2 加載/建立初始化數據命令行
3 說明在數據上要作的轉換
4 說明將你計算的結果保存在哪
5 執行程序
我如今只是給出這些步驟的一個總覽,若是須要深刻學習還須要瞭解更多。關於java DataSet API的核心類均可以在包org.apache.flink.api.java中找到,java DataStream API的核心類能夠在包org.apache.flink.streaming.api中找到。
StreamExecutionEnvironment是全部的Flink程序的基礎。你能夠經過下面列舉的靜態方法來獲取:
getExecutionEnvironment() createLocalEnvironment() createRemoteEnvironment(String host,int port,String... jarFiles)
通常狀況下,你只須要使用getExecutionEnvironment(),他依賴以下的上下文:若是你是在IDE中做爲通常java程序來執行的話,他會在你本地的機器上建立一個本地環境來運行你的代碼。若是你將本身的程序打包成jar文件,經過命令行的方式調用,Flink集羣管理器就會執行你的main方法,getExecutionEnvironment()將會爲你在集羣上的程序打造一個執行環境。
對於特定的數據源執行環境使用不少方法從文件中讀取數據:你只須要逐行讀取,就像CSV文件那樣,或者使用完整的自定義數據輸入格式。爲了將文本文件做爲一系列行的集合來讀,你能夠這麼用:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> text = env.readTextFile("file:///path/to/file");
這樣你就能夠經過這個DataStream來建立轉換爲新的DataStream輸入源。你能夠經過調用一個轉換函數來應用這個轉換。舉個例子,一個map轉換像是這樣:
DataStream<String> input = ...; DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() { @Override public Integer map(String value) { return Integer.parseInt(value); } });
這樣你就能夠將原始集合中的每一個字符串轉換成整數。
一旦你有了一個包含你最終結果的DataStream,你就能夠經過建立一個sink來寫入外部系統。下面是建立sink的一些示例方法:
writeAsText(String path) print()
一旦你完成了整個程序,你須要經過在StreamExecutionEnvironment上調用execute()來執行你的程序。依賴於ExecutionEnvironment的類型執行環境能夠決定是在本地機器運行仍是將你的程序提交到集羣上執行。
execute()方法返回一個JobExecutionResult,他包含執行時間和累加結果。
延遲評估
全部的Flink程序都是延遲執行的:當主函數執行的時候,數據每每不會馬上就加載並轉換。然而,每一個操做都被建立並加入了程序計劃中。當執行程序經過execute()觸發的時候操做才真正被執行。無論執行環境的類型是本地仍是集羣的方式。
延遲評估使得Flink的執行程序做爲歷史鏡像計劃單元來安排程序的運行。
特定的鍵
一些轉換(join,coGroup,keyBy,groupBy)須要一個在集合元素中定義的鍵。另一些(Reduce,GroupReduce,Aggregate,Windows)則容許數據能夠在他們被使用以前在鍵上組織。
DataSet被下面這樣的方式組織:
DataSet<...> input = //[...] DataSet<...> reduced = input .groupBy(/*define key here*/) .reduceGroup(/*do something*/);
而在DataStream中是這樣作的:
DataStream<...> input = //[...] DataStream<...> windowed = input .keyBy(/*define key here*/) .window(/*window specification*/);
Flink的數據格式不是基於鍵值對的。所以,你也不必非得講數據集合轉成鍵和值的類型。鍵是「虛擬」的:他們做爲真實數據上的函數來指導分組算子。
注:下面我將使用DataStream API和keyBy函數。請自行腦補DataSet API的方式(你只須要將他們替換成DataSet和groupBy)。
爲Tuples定義鍵
最簡單的方式是在一個或多個Tuple的域上進行分組的Tuples:
DataStream<Tuple3<Integer,String,Long>> input = //[...] KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)
元組在第一個域上進行分組:
DataStream<Tuple3<Integer,String,Long>> input = //[...] KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0,1)
如今,我使用包含第一個和第二個域的組合鍵來分組元組:
若是你有一個嵌套tuple的DataStream數據,好比:
DataStream<Tuple3<Tuple2<Integer,Float>,String,Long>> ds;
使用keyBy(0)可讓系統來調用完整的Tuple2做爲一個鍵。若是你想要看下嵌套的Tuple2是怎麼用的,那你就不得不使用以下介紹的——field expression(域表達式)
使用域表達式定義鍵
你可使用基於字符串的域表達式來引出嵌套域和grouping,sorting,joining和coGrouping的定義。
域表達式使得你能夠像Tuple和Java POJO同樣簡單地在嵌套組合類型中切換域。
在下面的例子中,你有一個WC的POJO,他有兩個域——word和count。爲了按照word分組,你只須要按照他的name來調用keyBy()函數。
//some ordinary POJO public class WC { public String word; public int count; } DataStream<WC> words = //[...] DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);
域表達式示例:
public static class WC { public ComplexNestedClass complex; //nested POJO private int count; //getter / setter for private field(count) public int getCount() { return count; } public void setCount(int c) { this.count = c; } } public static class ComplexNestedClass { public Integer someNumber; public float someFloat; public Tuple3<Long, Long, String> word; public IntWritable hadoopCitizen; }
使用Selector函數來定義鍵:
另一種定義鍵的方式就是「key selector」函數。一個key selector函數將一個獨立的元素來做爲參數,返回這個元素的鍵。這個鍵能夠是任意類型而且能夠從計算結果中取出。
下面這個例子展現了key selector是怎樣返回一個對象的域的:
// some ordinary POJO public class WC {public String word; public int count;} DataStream<WC> words = //[...] KeyedStream<WC> keyed = words .keyBy(new KeySelector<WC, String>() { pubic String getKey(WC wc) {return wc.word;} });
定義轉換函數
大部分的轉換都須要用戶自定義的函數。這裏列舉了不一樣的方式:
實現一個接口
最經常使用的方式就是實現一個提供的接口:
class MyMapFunction implements MapFunction<String,Integer> { public Integer map(String value) {return Integer.parseInt(value);} }); data.map(new MyMapFunction());
匿名類
你還能夠經過一個叫作匿名類的東東:
data.map(new MapFunction<String,Integer>(){ public Integer map(String value) {return Integer.parseInt(value);} });
Java 8 Lambdas函數
Flink也支持Java 8中定義的Lambda接口。
data.filter(s->s.startsWith("http://")); data.reduce((i1,i2) -> i1 + i2);