Flink 的流數據處理程序是常規的程序 ,經過再流數據上,實現了各類轉換 (好比 過濾, 更新中間狀態, 定義窗口, 聚合)。流數據能夠來之多種數據源 (好比, 消息隊列, socket 流, 文件). 經過sink組件落地流計算的最終結果,好比能夠把數據落地文件系統,標準輸出流好比命令行界面, Flink 的程序能夠運行在多種上下文環境 ,能夠單獨只是Flink api,也能夠嵌入其餘程序. execution能夠運行在本地的 JVM裏, 也能夠 運行在多臺機器的集羣中。爲了建立你的流數據處理程序,,咱們建議您從程序骨架開始,而後逐步添加您本身的transformations。後面的章節,是一些附加的操做,和一些高級功能。html
下面是一個可運行的完整的例子 ,帶窗口的流數據wordcount程序, 數據源來自一個每5秒一次的socket. 你能夠複製黏貼並本地運行.java
1 public class WindowWordCount { 2 public static void main(String[] args) throws Exception { 3 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 4 DataStream<Tuple2<String, Integer>> dataStream = env.socketTextStream("localhost", 9999) 5 .flatMap(new Splitter()) 6 .keyBy(0) 7 .timeWindow(Time.of(5, TimeUnit.SECONDS)) 8 .sum(1); 9 dataStream.print(); 10 env.execute("Window WordCount"); 11 } 12 public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> { 13 @Override 14 public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception { 15 for (String word: sentence.split(" ")) { 16 out.collect(new Tuple2<String, Integer>(word, 1)); 17 } 18 } 19 } 20 }
爲了運行這個程序, 啓動netcat在terminal中,敲下面這段:node
nc -lk 9999
隨便敲幾個字. 這幾個字將做爲例子程序的輸入. 若是你在5秒內,按了重複的字符,他們的count將會超過1. (若是你敲得不夠快,能夠提升5秒這個設置☺).web
Back to topredis
爲了能夠寫Flink的代碼, 你須要導入對應的語言的DataStream 的依賴包到你的工程裏。算法
最簡單的作法是使用quickstart 腳本: either for Java or for Scala. 你能夠從模板中建立一個空工程 (a Maven Archetype), 這個工程已經準備好了一切編程所需的一切了.經過敲下面的代碼,你可以使用archetype 來手工的建立一個工程:docker
mvn archetype:generate / -DarchetypeGroupId=org.apache.flink/ -DarchetypeArtifactId=flink-quickstart-java / -DarchetypeVersion=1.0-SNAPSHOT
mvn archetype:generate / -DarchetypeGroupId=org.apache.flink/ -DarchetypeArtifactId=flink-quickstart-scala / -DarchetypeVersion=1.0-SNAPSHOT
這個archetypes依賴,穩定版本或者當前的版本(-SNAPSHOT
).apache
若是你想爲一個存在的maven工程添加Flink,在你的pom文件裏添加下面這段依賴。編程
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>1.0-SNAPSHOT</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>1.0-SNAPSHOT</version> </dependency>
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala</artifactId> <version>1.0-SNAPSHOT</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>1.0-SNAPSHOT</version> </dependency>
就像例子程序裏看到的, Flink 的流數據編程就像大多數java程序同樣的,是一個帶main函數同樣的java程序 . 每一個程序由相同的基本部分組成:json
ExecutionEnvironment
,咱們如今會對每個步驟一個概述,,請參閱有各個部分的關詳細信息的。
StreamExecutionEnvironment
是全部 Flink 流數據程序的基礎. 你能夠經過treamExecutionEnvironment這個類的任意一個
靜態方法獲取 :
getExecutionEnvironment() createLocalEnvironment() createLocalEnvironment(int parallelism) createLocalEnvironment(int parallelism, Configuration customConfiguration) createRemoteEnvironment(String host, int port, String... jarFiles) createRemoteEnvironment(String host, int port, int parallelism, String... jarFiles)
通常來講,你只須要使用 getExecutionEnvironment()
, 由於這個方法會根據環境的上下文獲取正確的對象: 若是你通常java程序同樣,在IDE裏執行你的程序,它會建立一個本地environment ,用來在本地機器上執行你的程序。若是你的程序打包成jar, 而後經過命令行或者web界面調用這個jar, Flink 的cluster manager 會執行你的主函數,此時getExecutionEnvironment()
會返回一個集羣環境的execution來執行你的程序。
environment 有多個方法,能夠用來定義不一樣的數據源 。包括文件系統, sockets, and 和外部系統. 你能夠調用下面的代碼來獲取socket裏的數據源,用來調試用:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> lines = env.socketTextStream("localhost", 9999)
這行代碼會返回一個DataStream ,你能夠在這個對象使用transformations. 更多的數據源相關的,能夠閱讀數據源那一章。
一旦獲取了DataStream 對象,你能夠調用transformations ,來建立一個新的DataStream。 而後你還能夠寫回socket, 或者繼續調用transform , 或者和其餘的DataStreams結合, 或者 把數據落地到其餘外部系統(好比, 消息隊列或者文件系統). 你能夠經過調用DataStream的方法來調用各類不一樣的transformation。 並嵌入你自定義的函數到transformation中。舉個例子, map transformation大概是像是下面這樣的:
DataStream<String> input = ...; DataStream<Integer> intValues = input.map(new MapFunction<String, Integer>() { @Override public Integer map(String value) { return Integer.parseInt(value); } });
這段代碼會產生一個新的DataStream ,並將原始流上的String類型的數據轉換成Integer類型。更多的詳細內容能夠看transformation這一章。
一旦你有的一個含有你最終計算結果的DataStream, 你能夠把結果落地到外部系統 (好比HDFS, Kafka, Elasticsearch), 或者寫回socket, 或者寫入到文件系統, 或者打印出來.
writeAsText(String path, ...) writeAsCsv(String path, ...) writeToSocket(String hostname, int port, ...) print() addSink(...)
一旦你編寫好轉換和落地等操做,你須要經過調用execute()
來觸發程序開始執行,具體的執行方式依賴具體的StreamExecutionEnvironment
. 這個方法會再本地機器上執行,也可能在集羣上提交這個程序。
env.execute();
下面除了例子是scala編寫的其餘和上面同樣
As presented in the example, Flink DataStream programs look like regular Scala programs with a main()
method. Each program consists of the same basic parts:
StreamExecutionEnvironment
,We will now give an overview of each of those steps, please refer to the respective sections for more details.
The StreamExecutionEnvironment
is the basis for all Flink DataStream programs. You can obtain one using these static methods on classStreamExecutionEnvironment
:
def getExecutionEnvironment def createLocalEnvironment(parallelism: Int = Runtime.getRuntime.availableProcessors()) def createRemoteEnvironment(host: String, port: Int, jarFiles: String*) def createRemoteEnvironment(host: String, port: Int, parallelism: Int, jarFiles: String*)
Typically, you only need to use getExecutionEnvironment
, since this will do the right thing depending on the context: if you are executing your program inside an IDE or as a regular Java program it will create a local environment that will execute your program on your local machine. If you created a JAR file from your program, and invoke it through the command line or the web interface, the Flink cluster manager will execute your main method and getExecutionEnvironment()
will return an execution environment for executing your program on a cluster.
For specifying data sources the execution environment has several methods to read from files, sockets, and external systems using various methods. To just read data from a socket (useful also for debugging), you can use:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment DataStream<String> lines = env.socketTextStream("localhost", 9999)
This will give you a DataStream on which you can then apply transformations. For more information on data sources and input formats, please refer to Data Sources.
Once you have a DataStream you can apply transformations to create a new DataStream which you can then write to a file, transform again, combine with other DataStreams, or push to an external system. You apply transformations by calling methods on DataStream with your own custom transformation function. For example, a map transformation looks like this:
val input: DataStream[String] = ... val mapped = input.map { x => x.toInt }
This will create a new DataStream by converting every String in the original set to an Integer. For more information and a list of all the transformations, please refer to Transformations.
Once you have a DataStream containing your final results, you can push the result to an external system (HDFS, Kafka, Elasticsearch), write it to a socket, write to a file, or print it.
writeAsText(path: String, ...) writeAsCsv(path: String, ...) writeToSocket(hostname: String, port: Int, ...) print() addSink(...)
Once you specified the complete program you need to trigger the program execution by calling execute
on StreamExecutionEnvironment
. This will either execute on the local machine or submit the program for execution on a cluster, depending on the chosen execution environment.
env.execute()
流數據是一種相同類型的無限的數據集合。
Transformations會返回不一樣子類類型的of DataStream
,而且轉換後的還能夠繼續transformations,好比 keyBy(…)
方法會返回KeyedDataStream
,這個也是流數據, 經過一個 key在本地作分區的流數據。還能夠進行窗口操做。
Flink的全部流數據程序是延遲執的。當main函數執行後, 數據加載和轉換不是馬上執行的,相反的,每一步操做會加入一個執行計劃.。直到evn執行execute方法來啓動程序,這個執行計劃纔會執行。不管是本地執行仍是在集羣上執行.
延遲執行讓你能夠構建複雜的程序,而且讓flink執行起來,像是個完整的計劃單元。
數據transformation讓流數據產生新的流數據,. 程序能夠結合多個流數據來構建複雜的應用拓撲結構。
這章給出了全部可用的transformations的詳細說明。
Transformation | 詳細說明 |
---|---|
Map DataStream → DataStream |
取一個元素併產生一個元素(一進對一出的意思)。下面的例子是一個map函數,該函數將輸入流的值加倍: DataStream<Integer> dataStream = //... dataStream.map(new MapFunction<Integer, Integer>() { @Override public Integer map(Integer value) throws Exception { return 2 * value; } }); |
FlatMap DataStream → DataStream |
須要一個元素,併產生零個,一個或多個元素(void返回值,對返回無要求,依賴out如何發送)。下面的例子是一個flatmap功能拆分句子的話: dataStream.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String value, Collector<String> out) throws Exception { for(String word: value.split(" ")){ out.collect(word); } } }); |
Filter DataStream → DataStream |
對每一個元素執行boolean函數,過濾掉false的值。下面的例子是濾出零值的過濾器: dataStream.filter(new FilterFunction<Integer>() { @Override public boolean filter(Integer value) throws Exception { return value != 0; } }); |
KeyBy DataStream → KeyedStream |
從邏輯上將一個流數據劃分紅不相交的分區,每一個分區包含相同的鍵的元素。在內部是用經過哈希分區來實現的。查看如何指定鍵的鍵。這一轉變返回keyeddatastream。下面例子展現如何定義分區。 dataStream.keyBy("someKey") // Key by field "someKey" dataStream.keyBy(0) // Key by the first element of a Tuple |
Reduce KeyedStream → DataStream |
這是一個keyeddatastream特有的滾動的reduce功能, 多對一:對全部同key的元素進行傳入的運算,將總的結果發送出去。 keyedStream.reduce(new ReduceFunction<Integer>() { @Override public Integer reduce(Integer value1, Integer value2) throws Exception { return value1 + value2; } }); |
Fold DataStream→ DataStream |
有一個初始值(0),其餘和上面同樣。這是一個keyeddatastream特有的滾動的reduce功能, 多對一:對全部同key的元素進行傳入的運算,將總的結果發送出去。 keyedStream.fold(0, new ReduceFunction<Integer>() { @Override public Integer fold(Integer accumulator, Integer value) throws Exception { return accumulator + value; } }); |
Aggregations KeyedStream → DataStream |
這是一個keyeddatastream特有的滾動的聚合功能. min 和minBy 的區別是min 返回最小值, minBy 返回有指定key的最小值,對應的元素。 keyedStream.sum(0); keyedStream.sum("key"); keyedStream.min(0); keyedStream.min("key"); keyedStream.max(0); keyedStream.max("key"); keyedStream.minBy(0); keyedStream.minBy("key"); keyedStream.maxBy(0); keyedStream.maxBy("key"); |
Window KeyedStream → WindowedStream |
能夠對分區完KeyedStreams進行分區. Windows根據每一個key對應的數據的某些特徵進行分組 (好比:每五秒到達的數據根據key劃分爲一個組). 後面有一章專門詳細介紹windows dataStream.keyBy(0).window(TumblingTimeWindows.of(5, TimeUnit.SECONDS)); // Last 5 seconds of data
|
WindowAll DataStream → AllWindowedStream |
Windows也能在通常的DataStream上使用而不只僅是對KeyedStream 。Windows能對全部的stream event 進行分組(好比:對最近的5秒的數據進行分組). 警告: 這是在許多狀況下,一個非平行變換。全部的記錄都會彙集在一個任務的windowall算子。 dataStream.windowAll(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))); // Last 5 seconds of data |
(Window) Apply WindowedStream → DataStream AllWindowedStream → DataStream |
對windowStream的每個小窗口應用一個函數.。下面的例子是對每一個window的數據作sum的操做. 注意: 若是你使用的是上面的 那個windowAll 的transformation, 你須要傳遞AllWindowFunction ,而不是windowFunction。 windowedStream.apply (new WindowFunction<Tuple2<String,Integer>,Integer>, Tuple, Window>() { public void apply (Tuple tuple, Window window, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception { int sum = 0; for (value t: values) { sum += t.f1; } out.collect (new Integer(sum)); } }; |
(Window) Reduce WindowedStream → DataStream |
對一個window裏的數據作reduce,並返回reduce的結果。 windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>() { public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception { return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1); } }; |
(Window) Fold WindowedStream → DataStream |
對一個window裏的數據作fold,並返回fold的結果。 windowedStream.fold (new Tuple2<String,Integer>("Sum of all", 0), new FoldFunction<Tuple2<String,Integer>() { public Tuple2<String, Integer> fold(Tuple2<String, Integer> acc, Tuple2<String, Integer> value) throws Exception { return new Tuple2<String,Integer>(acc.f0, acc.f1 + value.f1); } }; |
windows上的聚合 WindowedStream → DataStream |
聚合window內的內容.。 min 和minBy 的區別是min 返回最小值, minBy 返回有指定key的最小值,對應的元素。 windowedStream.sum(0); windowedStream.sum("key"); windowedStream.min(0); windowedStream.min("key"); windowedStream.max(0); windowedStream.max("key"); windowedStream.minBy(0); windowedStream.minBy("key"); windowedStream.maxBy(0); windowedStream.maxBy("key"); |
Union DataStream* → DataStream |
鏈接兩個或者多個datastream,並建立一個包含這幾個dataStream裏的全部元素的新的dataStream。 注意: 若是你union同一個datastream,仍是隻能獲取其中一個。 dataStream.union(otherStream1, otherStream2, ...); |
Window Join DataStream,DataStream → DataStream |
在一個window內,根據給定的key的條件是否知足,來join兩個流。 dataStream.join(otherStream) .where(0).equalTo(1) .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS))) .apply (new JoinFunction () {...}); |
Window CoGroup DataStream,DataStream → DataStream |
在一個window內,根據給定的key的條件是否知足,對兩個流合併後,並分組。 dataStream.coGroup(otherStream) .where(0).equalTo(1) .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS))) .apply (new CoGroupFunction () {...}); |
Connect DataStream,DataStream → ConnectedStreams |
鏈接兩個流,並保留一樣的流類型. 鏈接後的兩個流之間能夠共享 state。 DataStream<Integer> someStream = //... DataStream<String> otherStream = //... ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream); |
CoMap, CoFlatMap ConnectedStreams → DataStream |
map 和 flatMap在鏈接後的流中的效果是相似的。 connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() { @Override public Boolean map1(Integer value) { return true; } @Override public Boolean map2(String value) { return false; } }); connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() { @Override public void flatMap1(Integer value, Collector<String> out) { out.collect(value.toString()); } @Override public void flatMap2(String value, Collector<String> out) { for (String word: value.split(" ")) { out.collect(word); } } }); |
Split DataStream → SplitStream |
根據某些標準,將一個流分離成兩個或者多個流。 SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() { @Override public Iterable<String> select(Integer value) { List<String> output = new ArrayList<String>(); if (value % 2 == 0) { output.add("even"); } else { output.add("odd"); } return output; } });
|
Select SplitStream → DataStream |
從分離後的流中,選出一個或者多個流。 SplitStream<Integer> split; DataStream<Integer> even = split.select("even"); DataStream<Integer> odd = split.select("odd"); DataStream<Integer> all = split.select("even","odd");
|
Iterate DataStream → IterativeStream → DataStream |
在流中建立一個「回調」循環:將輸出重定向到以前的一個operater。這對於須要持續更新一個模型的算法來講是特別有用的。接下來的一段代碼就從在一個流中使用了迭代來不斷更新。大於0的元素被重定向到回調channel,剩下的元素繼續往下流。參見iterations來獲取更多詳情 IterativeStream<Long> iteration = initialStream.iterate(); DataStream<Long> iterationBody = iteration.map (/*do something*/); DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){ @Override public boolean filter(Integer value) throws Exception { return value > 0; } }); iteration.closeWith(feedback); DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){ @Override public boolean filter(Integer value) throws Exception { return value <= 0; } });
|
Extract Timestamps DataStream → DataStream |
能夠抽出時間語義窗口裏面的記錄的時間戳,詳情請見working with time stream.assignTimestamps (new TimeStampExtractor() {...});
|
Transformation | Description |
---|---|
Map DataStream → DataStream |
Takes one element and produces one element. A map function that doubles the values of the input stream: dataStream.map { x => x * 2 } |
FlatMap DataStream → DataStream |
Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words: dataStream.flatMap { str => str.split(" ") } |
Filter DataStream → DataStream |
Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values: dataStream.filter { _ != 0 } |
KeyBy DataStream → KeyedStream |
Logically partitions a stream into disjoint partitions, each partition containing elements of the same key. Internally, this is implemented with hash partitioning. See keys on how to specify keys. This transformation returns a KeyedDataStream. dataStream.keyBy("someKey") // Key by field "someKey" dataStream.keyBy(0) // Key by the first element of a Tuple |
Reduce KeyedStream → DataStream |
A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value. keyedStream.reduce { _ + _ } |
Fold DataStream → DataStream |
A "rolling" fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value. keyedStream.fold { 0, _ + _ } |
Aggregations KeyedStream → DataStream |
Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimun value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy). keyedStream.sum(0) keyedStream.sum("key") keyedStream.min(0) keyedStream.min("key") keyedStream.max(0) keyedStream.max("key") keyedStream.minBy(0) keyedStream.minBy("key") keyedStream.maxBy(0) keyedStream.maxBy("key") |
Window KeyedStream → WindowedStream |
Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a description of windows. dataStream.keyBy(0).window(TumblingTimeWindows.of(5, TimeUnit.SECONDS)) // Last 5 seconds of data // Last 5 seconds of data
|
WindowAll DataStream → AllWindowedStream |
Windows can be defined on regular DataStreams. Windows group all the stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows. WARNING: This is in many cases a non-parallel transformation. All records will be gathered in one task for the windowAll operator. dataStream.windowAll(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))) // Last 5 seconds of data |
Window Apply WindowedStream → DataStream AllWindowedStream → DataStream |
Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window. Note: If you are using a windowAll transformation, you need to use an AllWindowFunction instead. windowedStream.apply { applyFunction } |
Window Reduce WindowedStream → DataStream |
Applies a functional reduce function to the window and returns the reduced value. windowedStream.reduce { _ + _ } |
Window Fold WindowedStream → DataStream |
Applies a functional fold function to the window and returns the folded value. windowedStream.fold { 0, _ + _ } |
Aggregations on windows WindowedStream → DataStream |
Aggregates the contents of a window. The difference between min and minBy is that min returns the minimun value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy). windowedStream.sum(0) windowedStream.sum("key") windowedStream.min(0) windowedStream.min("key") windowedStream.max(0) windowedStream.max("key") windowedStream.minBy(0) windowedStream.minBy("key") windowedStream.maxBy(0) windowedStream.maxBy("key") |
Union DataStream* → DataStream |
Union of two or more data streams creating a new stream containing all the elements from all the streams. Node: If you union a data stream with itself you will still only get each element once. dataStream.union(otherStream1, otherStream2, ...) |
Window Join DataStream,DataStream → DataStream |
Join two data streams on a given key and a common window. dataStream.join(otherStream) .where(0).equalTo(1) .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS))) .apply { ... } |
Window CoGroup DataStream,DataStream → DataStream |
Cogroups two data streams on a given key and a common window. dataStream.coGroup(otherStream) .where(0).equalTo(1) .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS))) .apply {} |
Connect DataStream,DataStream → ConnectedStreams |
"Connects" two data streams retaining their types, allowing for shared state between the two streams. someStream : DataStream[Int] = ... otherStream : DataStream[String] = ... val connectedStreams = someStream.connect(otherStream) |
CoMap, CoFlatMap ConnectedStreams → DataStream |
Similar to map and flatMap on a connected data stream connectedStreams.map( (_ : Int) => true, (_ : String) => false ) connectedStreams.flatMap( (_ : Int) => true, (_ : String) => false ) |
Split DataStream → SplitStream |
Split the stream into two or more streams according to some criterion. val split = someDataStream.split( (num: Int) => (num % 2) match { case 0 => List("even") case 1 => List("odd") } )
|
Select SplitStream → DataStream |
Select one or more streams from a split stream. val even = split select "even" val odd = split select "odd" val all = split.select("even","odd")
|
Iterate DataStream → IterativeStream → DataStream |
Creates a "feedback" loop in the flow, by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream. See iterations for a complete description. initialStream. iterate { iteration => { val iterationBody = iteration.map {/*do something*/} (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0)) } } IterativeStream<Long> iteration = initialStream.iterate(); DataStream<Long> iterationBody = iteration.map (/*do something*/); DataStream<Long> feedback = iterationBody.filter ( _ > 0); iteration.closeWith(feedback);
|
Extract Timestamps DataStream → DataStream |
Extracts timestamps from records in order to work with windows that use event time semantics. See working with time. stream.assignTimestamps { timestampExtractor }
|
The following transformations are available on data streams of Tuples:
Transformation | Description |
---|---|
Project DataStream → DataStream |
Selects a subset of fields from the tuples DataStream<Tuple3<Integer, Double, String>> in = // [...] DataStream<Tuple2<String, Integer>> out = in.project(2,0);
|
Transformation | Description |
---|---|
Project DataStream → DataStream |
Selects a subset of fields from the tuples val in : DataStream[(Int,Double,String)] = // [...] val out = in.project(2,0)
|
Flink 也經過如下Functions對transformation後的流進行底層的控制(若是須要的話)
Transformation | Description |
---|---|
Hash partitioning DataStream → DataStream |
對keyBy後相同key的流返回DataStream,而不是KeyedStream dataStream.partitionByHash("someKey"); dataStream.partitionByHash(0);
|
Custom partitioning DataStream → DataStream |
經過使用用戶自定義的分區規則來給每一個元素選擇目標task dataStream.partitionCustom(new Partitioner(){...}, "someKey"); dataStream.partitionCustom(new Partitioner(){...}, 0);
|
Random partitioning DataStream → DataStream |
隨機均勻將將元素分區
dataStream.partitionRandom();
|
Rebalancing (Round-robin partitioning) DataStream → DataStream |
循環對元素分區,對每一個分區進行負載均衡,用於優化處理數據傾斜的狀況 dataStream.rebalance();
|
Broadcasting DataStream → DataStream |
將每一個元素以廣播形式發送到每個分區 dataStream.broadcast(); |
Transformation | Description |
---|---|
Hash partitioning DataStream → DataStream |
Identical to keyBy but returns a DataStream instead of a KeyedStream. dataStream.partitionByHash("someKey") dataStream.partitionByHash(0)
|
Custom partitioning DataStream → DataStream |
Uses a user-defined Partitioner to select the target task for each element. dataStream.partitionCustom(partitioner, "someKey") dataStream.partitionCustom(partitioner, 0)
|
Random partitioning DataStream → DataStream |
Partitions elements randomly according to a uniform distribution. dataStream.partitionRandom()
|
Rebalancing (Round-robin partitioning) DataStream → DataStream |
Partitions elements round-robin, creating equal load per partition. Useful for performance optimization in the presence of data skew. dataStream.rebalance()
|
Broadcasting DataStream → DataStream |
Broadcasts elements to every partition. dataStream.broadcast()
|
Task chain 和資源組
chain兩個接下來的transformations 的意思是將他們放在同一個線程裏面執行,這會工做得更好。默認狀況下,flink儘量地將operator進行chain(例如:接下來的2個map transformation)。API提供了對chain的細粒度控制:
若是你想在整個job中禁用chain,那你可使用StreamExecutionEnvironment.disableOperatorChaining()
。接下來的functions會提供更細粒度的控制。注意,這些functions只能用在transformation以後的DataStream中。好比,你能夠這樣:someStream.map(...).startNewChain(),可是不能這樣:someStream.startNewChain()。
一個資源組就是Flink裏的一個slot,參考slots,根據須要,你能夠在對每個slot進行手動隔離操做
Transformation | Description |
---|---|
Start new chain | 使用以下操做開始新的chain,如下兩個map將會被chain,但filter不會和第一個map進行chain操做 someStream.filter(...).map(...).startNewChain().map(...);
|
Disable chaining | 禁用對這個map操做的chain操做 someStream.map(...).disableChaining();
|
Start a new resource group | 啓動一個新的包含這個map和後續一些操做的資源組 someStream.filter(...).startNewResourceGroup();
|
Isolate resources | 對slot進行分離操做 someStream.map(...).isolateResources();
|
Transformation | Description |
---|---|
Start new chain | Begin a new chain, starting with this operator. The two mappers will be chained, and filter will not be chained to the first mapper. someStream.filter(...).map(...).startNewChain().map(...)
|
Disable chaining | Do not chain the map operator someStream.map(...).disableChaining()
|
Start a new resource group | Start a new resource group containing the map and the subsequent operators. someStream.filter(...).startNewResourceGroup()
|
Isolate resources | Isolate the operator in its own slot. someStream.map(...).isolateResources()
|
keyBy transformation 須要指定的鍵已經在DataStream上定義。
使用方式以下
DataStream<...> input = // [...] DataStream<...> windowed = input .keyBy(/*define key here*/) .window(/*define window here*/);
Flink的數據模型不是基於鍵值對的。所以,你不須要將dataStream轉換爲鍵值類型。「鍵」是虛擬的:他們就像function同樣定義在數據流之上用來指導分組操做
參見 the relevant section of the DataSet API documentation來了解怎樣去指定鍵。僅僅只須要將DataSet替換爲DataStream和groupBy
withkeyBy
.
一些transformations以用戶自定義的functions爲參數
See the relevant section of the DataSet API documentation.
Flink places some restrictions on the type of elements that are used in DataStreams and in results of transformations. The reason for this is that the system analyzes the types to determine efficient execution strategies.
See the relevant section of the DataSet API documentation.
數據源能夠經過StreamExecutionEnvironment.addSource(sourceFunction)
. 來建立,你能夠直接使用Flink自帶的functions來建立,同時也可以編寫實現了 SourceFuntion 、theParallelSourceFunction
接口的類來定義本身的non-parallel的數據源,或編寫繼承RichParallelSourceFunction的類來定義parallel的本身的數據源。
在StreamExecutionEnvironment中已經有幾個定義好的數據源可用了:
基於文件的:
readTextFile(path)
/ TextInputFormat
- Reads files line wise and returns them as Strings.
readTextFileWithValue(path)
/ TextValueInputFormat
- Reads files line wise and returns them as StringValues. StringValues are mutable strings.
readFile(path)
/ Any input format - Reads files as dictated by the input format.
readFileOfPrimitives(path, Class)
/ PrimitiveInputFormat
- Parses files of new-line (or another char sequence) delimited primitive data types such as String
or Integer
.
readFileStream
- create a stream by appending elements when there are changes to a file
基於Socket的:
socketTextStream
- Reads from a socket. Elements can be separated by a delimiter.基於集合的:
fromCollection(Collection)
- Creates a data stream from the Java Java.util.Collection. All elements in the collection must be of the same type.
fromCollection(Iterator, Class)
- Creates a data stream from an iterator. The class specifies the data type of the elements returned by the iterator.
fromElements(T ...)
- Creates a data stream from the given sequence of objects. All objects must be of the same type.
fromParallelCollection(SplittableIterator, Class)
- Creates a data stream from an iterator, in parallel. The class specifies the data type of the elements returned by the iterator.
generateSequence(from, to)
- Generates the sequence of numbers in the given interval, in parallel.
自定義:
addSource
- Attache a new source function. For example, to read from Apache Kafka you can use addSource(new FlinkKafkaConsumer082<>(...))
. See connectors for more details.
Sources can by created by using StreamExecutionEnvironment.addSource(sourceFunction)
. You can either use one of the source functions that come with Flink or write a custom source by implementing the SourceFunction
for non-parallel sources, or by implementing theParallelSourceFunction
interface or extending RichParallelSourceFunction
for parallel sources.
There are several predefined stream sources accessible from the StreamExecutionEnvironment
:
File-based:
readTextFile(path)
/ TextInputFormat
- Reads files line wise and returns them as Strings.
readTextFileWithValue(path)
/ TextValueInputFormat
- Reads files line wise and returns them as StringValues. StringValues are mutable strings.
readFile(path)
/ Any input format - Reads files as dictated by the input format.
readFileOfPrimitives(path, Class)
/ PrimitiveInputFormat
- Parses files of new-line (or another char sequence) delimited primitive data types such as String
or Integer
.
readFileStream
- create a stream by appending elements when there are changes to a file
Socket-based:
socketTextStream
- Reads from a socket. Elements can be separated by a delimiter.Collection-based:
fromCollection(Seq)
- Creates a data stream from the Java Java.util.Collection. All elements in the collection must be of the same type.
fromCollection(Iterator)
- Creates a data stream from an iterator. The class specifies the data type of the elements returned by the iterator.
fromElements(elements: _*)
- Creates a data stream from the given sequence of objects. All objects must be of the same type.
fromParallelCollection(SplittableIterator)
- Creates a data stream from an iterator, in parallel. The class specifies the data type of the elements returned by the iterator.
generateSequence(from, to)
- Generates the sequence of numbers in the given interval, in parallel.
Custom:
addSource
- Attache a new source function. For example, to read from Apache Kafka you can use addSource(new FlinkKafkaConsumer082<>(...))
. See connectors for more details.StreamExecutionEnvironment提供ExecutionConfig來設置job的運行時配置
See the relevant section of the DataSet API documentation.
如下是對應API:
enableTimestamps()
/ disableTimestamps()
: 給從源頭髮出來的event加上一個時間戳. areTimestampsEnabled()
.返回如今的時間
setAutoWatermarkInterval(long milliseconds)
: Set the interval for automatic watermark emission.設置Watermarks 發送的時間間隔。能夠經過getAutoWatermarkInterval()來獲的目前的間隔。
Flink能夠將數據輸出到文件、socket,外部系統,或者打印出來.Flink自帶多種內置的輸出形式:
writeAsText()
/ TextOuputFormat
- Writes elements line-wise as Strings. The Strings are obtained by calling the toString() method of each element.
writeAsCsv(...)
/ CsvOutputFormat
- Writes tuples as comma-separated value files. Row and field delimiters are configurable. The value for each field comes from the toString() method of the objects.
print()
/ printToErr()
- Prints the toString() value of each element on the standard out / strandard error stream. Optionally, a prefix (msg) can be provided which is prepended to the output. This can help to distinguish between different calls to print. If the parallelism is greater than 1, the output will also be prepended with the identifier of the task which produced the output.
write()
/ FileOutputFormat
- Method and base class for custom file outputs. Supports custom object-to-bytes conversion.
writeToSocket
- Writes elements to a socket according to a SerializationSchema
addSink
- Invokes a custom sink function. Flink comes bundled with connectors to other systems (such as Apache Kafka) that are implemented as sink functions.
Data sinks consume DataStreams and forward them to files, sockets, external systems, or print them. Flink comes with a variety of built-in output formats that are encapsulated behind operations on the DataStreams:
writeAsText()
/ TextOuputFormat
- Writes elements line-wise as Strings. The Strings are obtained by calling the toString() method of each element.
writeAsCsv(...)
/ CsvOutputFormat
- Writes tuples as comma-separated value files. Row and field delimiters are configurable. The value for each field comes from the toString() method of the objects.
print()
/ printToErr()
- Prints the toString() value of each element on the standard out / strandard error stream. Optionally, a prefix (msg) can be provided which is prepended to the output. This can help to distinguish between different calls to print. If the parallelism is greater than 1, the output will also be prepended with the identifier of the task which produced the output.
write()
/ FileOutputFormat
- Method and base class for custom file outputs. Supports custom object-to-bytes conversion.
writeToSocket
- Writes elements to a socket according to a SerializationSchema
addSink
- Invokes a custom sink function. Flink comes bundled with connectors to other systems (such as Apache Kafka) that are implemented as sink functions.
在將一個流式程序提交到集羣中運行以前,確保你實現的算法可以按照你的預想來運行會更好。所以,執行數據分析程序一般是檢查結果、調試和改進的增量過程。
Flink提供在本地IDE調試、注入測試數據、結果數據收集等功能,明顯改善了數據分析程序的部署。這些部分都在怎樣改善程序的部署提供了提示。
LocalStreamEnvironment在與JVM同一個進程中啓動Flink系統。若是你是從IDE中啓動LocalEnvironement,你能夠在程序中設置斷點來提升調試效率
如下爲例子:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); DataStream<String> lines = env.addSource(/* some source */); // build your program env.execute();
val env = StreamExecutionEnvironment.createLocalEnvironment() val lines = env.addSource(/* some source */) // build your program env.execute()
Flink提供一些獲得java集合的特殊數據源來進行測試。一旦一個程序唄測試,這些source和sink很容易就被從外部系統讀入/往外部系統輸出的 source或sink替換了。
數據源集合使用範例:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); // Create a DataStream from a list of elements DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5); // Create a DataStream from any Java collection List<Tuple2<String, Integer>> data = ... DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data); // Create a DataStream from an Iterator Iterator<Long> longIt = ... DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);
val env = StreamExecutionEnvironment.createLocalEnvironment() // Create a DataStream from a list of elements val myInts = env.fromElements(1, 2, 3, 4, 5) // Create a DataStream from any Collection val data: Seq[(String, Int)] = ... val myTuples = env.fromCollection(data) // Create a DataStream from an Iterator val longIt: Iterator[Long] = ... val myLongs = env.fromCollection(longIt)
注意:目前,數據源集合要求數據類型和迭代器都要實現Serializable接口。此外,數據源集合也不能並行地執行。
時間窗口,一般是指必定時間內的一組event的組合。 時間窗口函數定義了時間的類型,目前支持三種不一樣的時間窗口:
Processing time: Processing time是指當transformation發生時候的機器的時間, Processing time 是最簡單的時間類型,也是性能最高的。. 可是,在分佈式和異步環境下,機器時間,每每不一致和有不少不肯定性。
Event time: Event time是指每一個event發生的時間 。 這份時間通常是當消息進入flink前,消息自己自帶的。或者從消息的某個字段中抽取出來. 當使用event time的狀況下,亂序的消息能夠被適當的處理。. 舉個例子, 在12分的時間窗口裏,當一個10分鐘的event在12分鐘的時候到達了,transformation也會正確的處理這些亂了序的event。. Event time 的處理方式提供了可預測的結果。 , 但會帶來更多的延遲, 由於亂序的消息須要被緩存起來到內存裏。
Ingestion time: Ingestion(食入,攝取) time 是當event進入到flink的時間。.當event消息進入到flink被分配到的task所在的機器上的時間,做爲分配給event的時間,. Ingestion time比 processing time更有肯定性和可預測性, 比event time有更低的延遲。由於不依賴外部系統。所以, Ingestion time 提供了一種處於二者之間的解決方案。 Ingestion time 其實能夠說是 event time的一種特殊狀況,實際上,ingestion time 和eventtime在flink的底層中的處理方式是同樣的。
當使用 event time時, transformations須要避免無限的等待event到達,Watermarks 提供了一種控制event time的偏移時間的機制。Watermarks是由 sources發射出來的. 一個watermark 帶有一個肯定的時間戳(long),好比轉換後是2015-12-03 14:17:30 ,則表示,不會再有比這個時間更早的時間的消息會到達。
你能夠經過下面的方式,選擇你須要的時間語義。
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
默認狀況是TimeCharacteristic.ProcessingTime
, 寫一個processing time的語義的程序是不須要,再作其餘事情。
若是要寫一個event time語義的程序 , 須要作下面4個步驟:
1:設置event time的語義 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
2:使用DataStream.assignTimestamps(...)
告訴flink,時間戳和event的關聯。好比說,第幾個字段是時間戳。
設置 讓時間戳有效,enableTimestamps()
, 還有watermark的發射間隔。(setAutoWatermarkInterval(long milliseconds)
) inExecutionConfig
.
舉個例子, 假設 咱們有一個tuple的數據流, 而且裏面的第一個字段是時間戳 (產生這些消息的系統賦予的,非flink), 而且咱們知道處理時間和這個時間的落差不會超過1秒。
DataStream<Tuple4<Long,Integer,Double,String>> stream = //... stream.assignTimestamps(new TimestampExtractor<Tuple4<Long,Integer,Double,String>>{ @Override public long extractTimestamp(Tuple4<Long,Integer,Double,String> element, long currentTimestamp) { return element.f0; } @Override public long extractWatermark(Tuple4<Long,Integer,Double,String> element, long currentTimestamp) { return element.f0 - 1000; } @Override public long getCurrentWatermark() { return Long.MIN_VALUE; } });
val stream: DataStream[(Long,Int,Double,String)] = null; stream.assignTimestampts(new TimestampExtractor[(Long, Int, Double, String)] { override def extractTimestamp(element: (Long, Int, Double, String), currentTimestamp: Long): Long = element._1 override def extractWatermark(element: (Long, Int, Double, String), currentTimestamp: Long): Long = element._1 - 1000 override def getCurrentWatermark: Long = Long.MinValue })
若是你肯定,你的時間戳必定是升序的,按順序到達,你可使用 AscendingTimestampExtractor
, 系統會自動的發射watermark:
DataStream<Tuple4<Long,Integer,Double,String>> stream = //... stream.assignTimestamps(new AscendingTimestampExtractor<Tuple4<Long,Integer,Double,String>>{ @Override public long extractAscendingTimestamp(Tuple4<Long,Integer,Double,String> element, long currentTimestamp) { return element.f0; } });
stream.extractAscendingTimestamp(record => record._1)
使用 ingestion time 語義,你須要:
1:env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
.
你能夠想象一下,這個設置,實際上是是event time的簡寫。由於source根據當前的機器時間,flink注入和發射都是在flink作的,因此flink能夠推斷後面的那些參數,因此自動作的。
Flink提供了好多方法,爲KeyedStream定義windowon . 每一個window 包含了一樣的key的元素 。
Flink提供具備靈活性的通用窗口和普通狀況下能通用的一些數據。在你須要自定義時間窗口以前,先看看這些預約義的能不能使用吧。
Transformation | Description |
---|---|
跳動時間 window KeyedStream → WindowedStream |
定義一個5秒跳動的窗口. 表示根據元素的時間戳,5秒爲一個單位組織起來的窗口, 而且每一個元素只會在一個窗口中出現一次。時間戳根據上面的env設置的語義而定。 keyedStream.timeWindow(Time.of(5, TimeUnit.SECONDS));
|
滑動時間window KeyedStream → WindowedStream |
定義一個5秒的窗口, 1秒滑動一次. 表示根據元素的時間戳,5秒爲一個單位組織起來的窗口 ,可是每一個元素可能在多個窗口中出現屢次。 keyedStream.timeWindow(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS));
|
跳動個數 window KeyedStream → WindowedStream |
1000個一個單位的窗口,一個元素只會出現一次 keyedStream.countWindow(1000);
|
滑動個數 window KeyedStream → WindowedStream |
1000個一個單位的窗口,一個元素可能出現屢次 keyedStream.countWindow(1000, 100)
|
Transformation | Description |
---|---|
Tumbling time window KeyedStream → WindowedStream |
Defines a window of 5 seconds, that "tumbles". This means that elements are grouped according to their timestamp in groups of 5 second duration, and every element belongs to exactly one window. The notion of time is specified by the selected TimeCharacteristic (see time). keyedStream.timeWindow(Time.of(5, TimeUnit.SECONDS))
|
Sliding time window KeyedStream → WindowedStream |
Defines a window of 5 seconds, that "slides" by 1 seconds. This means that elements are grouped according to their timestamp in groups of 5 second duration, and elements can belong to more than one window (since windows overlap by at most 4 seconds) The notion of time is specified by the selected TimeCharacteristic (see time). keyedStream.timeWindow(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS))
|
Tumbling count window KeyedStream → WindowedStream |
Defines a window of 1000 elements, that "tumbles". This means that elements are grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, and every element belongs to exactly one window. keyedStream.countWindow(1000)
|
Sliding count window KeyedStream → WindowedStream |
Defines a window of 1000 elements, that "slides" every 100 elements. This means that elements are grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, and every element can belong to more than one window (as windows overlap by at most 900 elements). keyedStream.countWindow(1000, 100)
|
這個機制能夠定義出功能更豐富的窗口,相反的須要寫更多的代碼。 舉個例子,下面是一個自定義的窗口,每一個window持有最新的5秒而且每1秒滑動一次。 可是,當100個元素被添加到window後,window的execution函數,會被跟蹤(觸發)。以後每一次execution執行都會被跟蹤(觸發)。 window會保留10個元素:
keyedStream .window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)) .trigger(Count.of(100)) .evictor(Count.of(10));
keyedStream .window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)) .trigger(Count.of(100)) .evictor(Count.of(10))
構造一個自定義窗口的通常方式是,
(1)指定一個WindowAssigner
,
(2)指定 一個觸發器Trigger
(optionally),
(3)指定一個逐出器Evictor
(optionally).
WindowAssigner定義瞭如何組織一個窗口
(時間或者個數) 一個window 元素的邏輯組合,有一個begin value,和一個end value。相應的,有一個begin time和end time. 帶有時間戳的元素 。
舉個例子,滑動時間窗口分配器,定義了5秒爲一個單位,每1秒滑動一次,假設,以毫秒爲單位,時間從0毫秒開始,而後咱們有6個窗口: [0,5000], [1000,6000], [2000,7000], [3000, 8000], [4000, 9000], and [5000, 10000]. 每一個進來的元素,根據他們的時間戳,被分配到這6個窗口中,可能出如今多個窗口裏,好比帶有2000 時間戳的元素,會被分配到前三個窗口。Flink 運行,會綁定在對應的窗口分配器,能夠覆蓋更多的場景. 你能夠自定義你的window類型,經過繼承WindowAssigner
類。
Transformation | Description |
---|---|
Global window KeyedStream → WindowedStream |
全部進來的元素,按key分組,每一個組放在相同的window裏。這些window沒有默認的trigger,所以若是沒有自定義trigger的話,這些數據是不會被trigger觸發的 stream.window(GlobalWindows.create()); |
Tumbling time windows KeyedStream → WindowedStream |
全部進來的元素,根據元素各自的時間戳被分配到一個window裏,windows之間不交叉, 每一個元素最多隻會出如今一個window裏一次。 The window 有一個默認的 trigger. 針對event/ingestion time這兩鍾語義, 當收到一個高於本身的end value的watermark, window就會 觸發。, 而對於 processing time 當前的processing time 超過他的current end value. stream.window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))); |
Sliding time windows KeyedStream → WindowedStream |
全部進來的元素,根據元素各自的時間戳被分配到一個window裏,windows之間可能會產生交叉, 。The window 有一個默認的 trigger. 針對event/ingestion time這兩鍾語義, 當收到一個高於本身的end value的watermark, window就會 觸發,而對於 processing time 觸發條件則是當前的processing time 超過他的current end value. stream.window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS))); |
Transformation | Description |
---|---|
Global window KeyedStream → WindowedStream |
All incoming elements of a given key are assigned to the same window. The window does not contain a default trigger, hence it will never be triggered if a trigger is not explicitly specified. stream.window(GlobalWindows.create) |
Tumbling time windows KeyedStream → WindowedStream |
Incoming elements are assigned to a window of a certain size (1 second below) based on their timestamp. Windows do not overlap, i.e., each element is assigned to exactly one window. The notion of time is specified by the selected TimeCharacteristic (see time). The window comes with a default trigger. For event/ingestion time, a window is triggered when a watermark with value higher than its end-value is received, whereas for processing time when the current processing time exceeds its current end value. stream.window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) |
Sliding time windows KeyedStream → WindowedStream |
Incoming elements are assigned to a window of a certain size (5 seconds below) based on their timestamp. Windows "slide" by the provided value (1 second in the example), and hence overlap. The window comes with a default trigger. For event/ingestion time, a window is triggered when a watermark with value higher than its end-value is received, whereas for processing time when the current processing time exceeds its current end value. stream.window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS))) |
Trigger定義了,跟在每個的window後面的函數(sum,count),何時evaluated (「fires」)。若是沒有指定trigger,就使用默認的trigger
. Flink 自帶了一組trigger,若是默認的trigger都無法知足你的應用,能夠經過實現Trigger接口實現本身的trigger
. 注意,若是使用自定義trigger後,會覆蓋默認的trigger.
Transformation | Description |
---|---|
Processing time trigger | 當前的處理時間超過他的end-value時,則發射一個window,今後以後,被跟蹤的window上的元素就會被丟棄。 windowedStream.trigger(ProcessingTimeTrigger.create()); |
Watermark trigger | 當接收到一個超過end value的watermark時,則發射一個window。被跟蹤的window上的元素就會被丟棄。 windowedStream.trigger(EventTimeTrigger.create()); |
Continuous processing time trigger | 每一個being fire 的 window會按期的考慮if()。噹噹前時間超過他的end-value的時候,纔會真正發射,被觸發的窗口裏的函數將會保留。 windowedStream.trigger(ContinuousProcessingTimeTrigger.of(Time.of(5, TimeUnit.SECONDS))); |
Continuous watermark time trigger | 每一個being fire 的 window會按期的考慮if()。當watermark時間超過他的end-value的時候,纔會真正發射,被觸發的窗口裏的函數將會保留。 windowedStream.trigger(ContinuousEventTimeTrigger.of(Time.of(5, TimeUnit.SECONDS))); |
Count trigger | 超過1000個元素後,這個窗口就會被髮射,處於準備發射狀態的窗口裏的元素,將會被保留。 windowedStream.trigger(CountTrigger.of(1000)); |
Purging trigger | Takes any trigger as an argument and forces the triggered window elements to be "purged" (discarded) after triggering. windowedStream.trigger(PurgingTrigger.of(CountTrigger.of(1000))); |
Delta trigger | 每一個being fire 的 window會按期的考慮if()。當最後一個元素和第一個插入的元素運算後知足true的時候,纔會真正發射。 windowedStream.trigger(new DeltaTrigger.of(5000.0, new DeltaFunction<Double>() { @Override public double getDelta (Double old, Double new) { return (new - old > 0.01); } })); |
Transformation | Description |
---|---|
Processing time trigger | A window is fired when the current processing time exceeds its end-value. The elements on the triggered window are henceforth discarded. windowedStream.trigger(ProcessingTimeTrigger.create); |
Watermark trigger | A window is fired when a watermark with value that exceeds the window's end-value has been received. The elements on the triggered window are henceforth discarded. windowedStream.trigger(EventTimeTrigger.create); |
Continuous processing time trigger | A window is periodically considered for being fired (every 5 seconds in the example). The window is actually fired only when the current processing time exceeds its end-value. The elements on the triggered window are retained. windowedStream.trigger(ContinuousProcessingTimeTrigger.of(Time.of(5, TimeUnit.SECONDS))); |
Continuous watermark time trigger | A window is periodically considered for being fired (every 5 seconds in the example). A window is actually fired when a watermark with value that exceeds the window's end-value has been received. The elements on the triggered window are retained. windowedStream.trigger(ContinuousEventTimeTrigger.of(Time.of(5, TimeUnit.SECONDS))); |
Count trigger | A window is fired when it has more than a certain number of elements (1000 below). The elements of the triggered window are retained. windowedStream.trigger(CountTrigger.of(1000)); |
Purging trigger | Takes any trigger as an argument and forces the triggered window elements to be "purged" (discarded) after triggering. windowedStream.trigger(PurgingTrigger.of(CountTrigger.of(1000))); |
Delta trigger | A window is periodically considered for being fired (every 5000 milliseconds in the example). A window is actually fired when the value of the last added element exceeds the value of the first element inserted in the window according to a `DeltaFunction`. windowedStream.trigger(DeltaTrigger.of(5000.0, { (old,new) => new - old > 0.01 })) |
當trigger進行了fire以後, 而且執行sum和count以前, 有一個可選的逐出器能夠移除保留元素。. Flink 自帶了一組evictors ,你還能夠經過實現Evictor接口,實現自定義的逐出器。
.
Transformation | Description |
---|---|
Time evictor | 從window的begin處開始移除元素,知道最後剩下 end value -1秒到 end value的元素。 triggeredStream.evict(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS))); |
Count evictor | 保留倒數的最後1000 元素,其餘的丟掉。 triggeredStream.evict(CountEvictor.of(1000)); |
Delta evictor | 從window的begin開始,一直丟元素,知道某個元素,8比最後一個元素15小5。(經過一個閾值5 ,和一個函數). triggeredStream.evict(DeltaEvictor.of(5000, new DeltaFunction<Double>() { public double (Double oldValue, Double newValue) { return newValue - oldValue; } })); |
Transformation | Description |
---|---|
Time evictor | Evict all elements from the beginning of the window, so that elements from end-value - 1 second until end-value are retained (the resulting window size is 1 second). triggeredStream.evict(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS))); |
Count evictor | Retain 1000 elements from the end of the window backwards, evicting all others. triggeredStream.evict(CountEvictor.of(1000)); |
Delta evictor | Starting from the beginning of the window, evict elements until an element with value lower than the value of the last element is found (by a threshold and a DeltaFunction). windowedStream.evict(DeltaEvictor.of(5000.0, { (old,new) => new - old > 0.01 })) |
window 分配器,trigger,evictor的機制都很強大。這些機制讓你能夠定義各類不一樣類型的window。Flink’s的基本window實際上是在這三個機制之上包了一層的,.下面是一些通用的端口是如何經過這三種機制來構造的 。
Window type | Definition |
---|---|
Tumbling count windowstream.countWindow(1000) |
stream.window(GlobalWindows.create()) .trigger(CountTrigger.of(1000) .evict(CountEvictor.of(1000))) |
Sliding count windowstream.countWindow(1000, 100) |
stream.window(GlobalWindows.create()) .trigger(CountTrigger.of(1000) .evict(CountEvictor.of(100))) |
Tumbling event time windowstream.timeWindow(Time.of(5, TimeUnit.SECONDS)) |
stream.window(TumblingTimeWindows.of((Time.of(5, TimeUnit.SECONDS))) .trigger(EventTimeTrigger.create()) |
Sliding event time windowstream.timeWindow(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)) |
stream.window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS))) .trigger(EventTimeTrigger.create()) |
Tumbling processing time windowstream.timeWindow(Time.of(5, TimeUnit.SECONDS)) |
stream.window(TumblingTimeWindows.of((Time.of(5, TimeUnit.SECONDS))) .trigger(ProcessingTimeTrigger.create()) |
Sliding processing time windowstream.timeWindow(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)) |
stream.window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS))) .trigger(ProcessingTimeTrigger.create()) |
You也能夠對普通流(stream,以前都是keyedStream)定義窗口。經過調用 the windowAll
這個transformation.這個stream 包含了全部keyed的stream, but 在一個單獨的task裏evaluated (在一個單獨的計算節點上). 定義trigger和evictor的語法是同樣的:
nonKeyedStream .windowAll(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)) .trigger(Count.of(100)) .evictor(Count.of(10));
nonKeyedStream .windowAll(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)) .trigger(Count.of(100)) .evictor(Count.of(10))
基本的window 定義也適用於普通的nokey的windows:
Transformation | Description |
---|---|
Tumbling time window all DataStream → WindowedStream |
Defines a window of 5 seconds, that "tumbles". This means that elements are grouped according to their timestamp in groups of 5 second duration, and every element belongs to exactly one window. The notion of time used is controlled by the StreamExecutionEnvironment. nonKeyedStream.timeWindowAll(Time.of(5, TimeUnit.SECONDS));
|
Sliding time window all DataStream → WindowedStream |
Defines a window of 5 seconds, that "slides" by 1 seconds. This means that elements are grouped according to their timestamp in groups of 5 second duration, and elements can belong to more than one window (since windows overlap by at least 4 seconds) The notion of time used is controlled by the StreamExecutionEnvironment. nonKeyedStream.timeWindowAll(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS));
|
Tumbling count window all DataStream → WindowedStream |
Defines a window of 1000 elements, that "tumbles". This means that elements are grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, and every element belongs to exactly one window. nonKeyedStream.countWindowAll(1000)
|
Sliding count window all DataStream → WindowedStream |
Defines a window of 1000 elements, that "slides" every 100 elements. This means that elements are grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, and every element can belong to more than one window (as windows overlap by at least 900 elements). nonKeyedStream.countWindowAll(1000, 100)
|
Transformation | Description |
---|---|
Tumbling time window all DataStream → WindowedStream |
Defines a window of 5 seconds, that "tumbles". This means that elements are grouped according to their timestamp in groups of 5 second duration, and every element belongs to exactly one window. The notion of time used is controlled by the StreamExecutionEnvironment. nonKeyedStream.timeWindowAll(Time.of(5, TimeUnit.SECONDS));
|
Sliding time window all DataStream → WindowedStream |
Defines a window of 5 seconds, that "slides" by 1 seconds. This means that elements are grouped according to their timestamp in groups of 5 second duration, and elements can belong to more than one window (since windows overlap by at least 4 seconds) The notion of time used is controlled by the StreamExecutionEnvironment. nonKeyedStream.timeWindowAll(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS));
|
Tumbling count window all DataStream → WindowedStream |
Defines a window of 1000 elements, that "tumbles". This means that elements are grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, and every element belongs to exactly one window. nonKeyedStream.countWindowAll(1000)
|
Sliding count window all DataStream → WindowedStream |
Defines a window of 1000 elements, that "slides" every 100 elements. This means that elements are grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, and every element can belong to more than one window (as windows overlap by at least 900 elements). nonKeyedStream.countWindowAll(1000, 100)
|
Flink有檢查點機制來恢復出現問題的job,此機制須要對信息進行持久化來支持以後須要的時候進行再次訪問。
檢查點機制會保存數據源的進度和用戶自定義的狀態來提供一次性處理語義
能夠在StreamExecutionEnvironment上調用enableCheckpointing(n)來啓用檢查點機制,n表明檢查點時間片的秒數
檢查點中的的其它參數:
setNumberOfExecutionRerties()
這個方法返回這個job在失敗後已經重啓的次數。檢查點啓用打可是這個值沒有顯示設置的狀況下,job會無限次地從新啓動。docs on streaming fault tolerance 詳細介紹了Flink的容錯機制
當源添加了快照機制後Flink能夠確保將一次處理語義更新爲用戶自定義的狀態。如今只支持源頭是kafka(和一些內置的數據生成器)。The following table lists the state update guarantees of Flink coupled with the bundled sources:
Source | Guarantees | Notes |
---|---|---|
Apache Kafka | exactly once | Use the appropriate Kafka connector for your version |
RabbitMQ | at most once | |
Twitter Streaming API | at most once | |
Collections | at most once | |
Files | at least once | At failure the file will be read from the beginning |
Sockets | at most once |
To guarantee end-to-end exactly-once record delivery (in addition to exactly-once updates), the data sink needs to take part in the snapshotting mechanism. The following table lists the delivery guarantees (assuming exactly-once state updates) of Flink coupled with bundled sinks:
Sink | Guarantees | Notes |
---|---|---|
HDFS rolling sink | exactly once | Implementation depends on Hadoop version |
Elasticsearch | at least once | |
Kafka producer | at least once | |
File sinks | at least once | |
Socket sinks | at lest once | |
Standard output | at least once |
能夠在建立operator的時候調用setParallelism(int)設置並行的實例個數
默認狀況下,元素在網絡傳輸中並非一個接一個的(這樣容易致使網絡擁塞),而是採用緩衝的方式。緩衝區的大小(在機器間實際傳輸的大小)能夠在配置文件中設置。雖然這種方式對於提升吞吐量頗有幫助,可是對於incoming速度很慢的流來講容易致使延時。可在執行環境中(或我的的operators)使用env.setBufferTimeOut(timeoutMillis)來設置緩衝區的最長填充時間。在這個時間到以後就會發送緩衝區類的數據,即便緩衝區尚未被填滿。默認的時間爲100ms
用法:
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); env.setBufferTimeout(timeoutMillis); env.genereateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment env.setBufferTimeout(timeoutMillis) env.genereateSequence(1,10).map(myMap).setBufferTimeout(timeoutMillis)
爲最大化吞吐量,調用setBufferTimeOut(-1)來禁用時間限制,這種狀況下緩衝區只有被填充滿了纔會發送。最小化延時,就能夠將時間限制設置接近0(好比5或10ms)。可是要避免將時間設置設爲0,它會引發嚴重的性能降低
Flink中的全部transformation和functions看起來很像(在function 處理的術語中),可是實際上它是帶狀態的處理操做。你能夠定義本地變量或者使用Flink的狀態接口來將每個transformation(map,filter等)狀態化。你能夠註冊一個實現了狀態接口的本地變量來當作managed state。在這個以及其它使用Flink本地狀態接口的例子中,Flink會在出錯的時候自動地取得transformations的狀態並存儲起來。
最終須要達到的效果是在出現故障或未出現故障的狀況下都能對各類狀態作正確的更新。
首先,咱們來看在出現故障點額狀況下如何保持本地變量的一致,而後再來看Flink的state接口。
默認狀況下,state的檢查點會保存在內存中的JobManager中,對於大量state的持久化,Flink支持將檢查點存儲到文件系統(如HDFS、S3等等),這些都能在flink-conf.yaml或者viaStreamExecutionEnvironment.setStateBackend(…)中配置。
本地變量能經過Checkpointed接口來被checkpointed
用戶自定義的function若是實現了Checkpointed接口,那麼snapshotState(...) 和restoreState(...)這些方法就會被執行,將function state存儲下來.
除此以外,自定義的functions 也能實現CheckpointNotifier接口,經過thenotifyCheckpointComplete(long checkpointId)來得到完成checkpoint的通知。注意,若是在checkPoint完成後和通知發出前出現了故障,那麼Flink不能保證你自定義的function會接收到checkPoint完成通知。這些丟失的通知都應該被包含在下一個checkPoint的通知中。
好比,如下是使用Checkpointed接口的範例:
// persistent counterprivatelong counter =0; @Override public Long reduce(Long value1, Long value2){ counter++;return value1 + value2;}// regularly persists state during normal operation @Override public Serializable snapshotState(long checkpointId,long checkpointTimestamp){return counter;}// restores state on recovery from failure @Override publicvoidrestoreState(Long state){ counter = state;}}public class CounterSum implements ReduceFunction<Long>, Checkpointed<Long> {
state接口提供了到key/value states的權限。由於state是根據key來劃分,因此它只能使用在經過stream.keyBy(...)以後的KeyedStream上。
The handle to the state can be obtained from the function’s RuntimeContext
. The state handle will then give access to the value mapped under the key of the current record or window - each key consequently has its own value.
The following code sample shows how to use the key/value state inside a reduce function. When creating the state handle, one needs to supply a name for that state (a function can have multiple states of different types), the type of the state (used to create efficient serializers), and the default value (returned as a value for keys that do not yet have a value associated).
public class CounterSum implements RichReduceFunction<Long> { /** The state handle */ private OperatorState<Long> counter; @Override public Long reduce(Long value1, Long value2) { counter.update(counter.value() + 1); return value1 + value2; } @Override public void open(Configuration config) { counter = getRuntimeContext().getKeyValueState("myCounter", Long.class, 0L); } }
State updated by this is usually kept locally inside the flink process (unless one configures explicitly an external state backend). This means that lookups and updates are process local and this very fast.
The important implication of having the keys set implicitly is that it forces programs to group the stream by key (via the keyBy()
function), making the key partitioning transparent to Flink. That allows the system to efficiently restore and redistribute keys and state.
The Scala API has shortcuts that for stateful map()
or flatMap()
functions on KeyedStream
, which give the state of the current key as an option directly into the function, and return the result with a state update:
val stream: DataStream[(String, Int)] = ... val counts: DataStream[(String, Int)] = stream .keyBy(_._1) .mapWithState((in: (String, Int), count: Option[Int]) => count match { case Some(c) => ( (in._1, c), Some(c + in._2) ) case None => ( (in._1, 0), Some(in._2) ) })
Stateful sources require a bit more care as opposed to other operators. In order to make the updates to the state and output collection atomic (required for exactly-once semantics on failure/recovery), the user is required to get a lock from the source’s context.
Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the flink.streaming.api.checkpoint.CheckpointNotifier
interface.
Flink currently only provides processing guarantees for jobs without iterations. Enabling checkpointing on an iterative job causes an exception. In order to force checkpointing on an iterative program the user needs to set a special flag when enabling checkpointing:env.enableCheckpointing(interval, force = true)
.
Please note that records in flight in the loop edges (and the state changes associated with them) will be lost during failure.
Iterative streaming programs implement a step function and embed it into an IterativeStream
. As a DataStream program may never finish, there is no maximum number of iterations. Instead, you need to specify which part of the stream is fed back to the iteration and which part is forwarded downstream using a split
transformation or a filter
. Here, we show an example using filters. First, we define an IterativeStream
IterativeStream<Integer> iteration = input.iterate();
Then, we specify the logic that will be executed inside the loop using a series of trasformations (here a simple map
transformation)
DataStream<Integer> iterationBody = iteration.map(/* this is executed many times */);
To close an iteration and define the iteration tail, call the closeWith(feedbackStream)
method of the IterativeStream
. The DataStream given to the closeWith
function will be fed back to the iteration head. A common pattern is to use a filter to separate the part of the strem that is fed back, and the part of the stream which is propagated forward. These filters can, e.g., define the 「termination」 logic, where an element is allowed to propagate downstream rather than being fed back.
iteration.closeWith(iterationBody.filter(/* one part of the stream */)); DataStream<Integer> output = iterationBody.filter(/* some other part of the stream */);
By default the partitioning of the feedback stream will be automatically set to be the same as the input of the iteration head. To override this the user can set an optional boolean flag in the closeWith
method.
For example, here is program that continuously subtracts 1 from a series of integers until they reach zero:
DataStream<Long> someIntegers = env.generateSequence(0, 1000); IterativeStream<Long> iteration = someIntegers.iterate(); DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() { @Override public Long map(Long value) throws Exception { return value - 1 ; } }); DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() { @Override public boolean filter(Long value) throws Exception { return (value > 0); } }); iteration.closeWith(stillGreaterThanZero); DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() { @Override public boolean filter(Long value) throws Exception { return (value <= 0); } });
Iterative streaming programs implement a step function and embed it into an IterativeStream
. As a DataStream program may never finish, there is no maximum number of iterations. Instead, you need to specify which part of the stream is fed back to the iteration and which part is forwarded downstream using a split
transformation or a filter
. Here, we show an example iteration where the body (the part of the computation that is repeated) is a simple map transformation, and the elements that are fed back are distinguished by the elements that are forwarded downstream using filters.
val iteratedStream = someDataStream.iterate( iteration => { val iterationBody = iteration.map(/* this is executed many times */) (tail.filter(/* one part of the stream */), tail.filter(/* some other part of the stream */)) })
By default the partitioning of the feedback stream will be automatically set to be the same as the input of the iteration head. To override this the user can set an optional boolean flag in the closeWith
method.
For example, here is program that continuously subtracts 1 from a series of integers until they reach zero:
val someIntegers: DataStream[Long] = env.generateSequence(0, 1000) val iteratedStream = someIntegers.iterate( iteration => { val minusOne = iteration.map( v => v - 1) val stillGreaterThanZero = minusOne.filter (_ > 0) val lessThanZero = minusOne.filter(_ <= 0) (stillGreaterThanZero, lessThanZero) } )
Connectors provide code for interfacing with various third-party systems.
Currently these systems are supported:
To run an application using one of these connectors, additional third party components are usually required to be installed and launched, e.g. the servers for the message queues. Further instructions for these can be found in the corresponding subsections. Docker containers are also provided encapsulating these services to aid users getting started with connectors.
This connector provides access to event streams served by Apache Kafka.
Flink provides special Kafka Connectors for reading and writing data to Kafka topics. The Flink Kafka Consumer integrates with Flink’s checkpointing mechanisms to provide different processing guarantees (most importantly exactly-once guarantees).
For exactly-once processing Flink can not rely on the auto-commit capabilities of the Kafka consumers. The Kafka consumer might commit offsets to Kafka which have not been processed successfully.
Please pick a package (maven artifact id) and class name for your use-case and environment. For most users, the flink-connector-kafka-083
package and the FlinkKafkaConsumer082
class are appropriate.
Package | Supported since | Class name | Kafka version | Checkpointing behavior | Notes |
---|---|---|---|---|---|
flink-connector-kafka | 0.9, 0.10 | KafkaSource | 0.8.1, 0.8.2 | Does not participate in checkpointing (no consistency guarantees) | Uses the old, high level KafkaConsumer API, autocommits to ZK via Kafka |
flink-connector-kafka | 0.9, 0.10 | PersistentKafkaSource | 0.8.1, 0.8.2 | Does not guarantee exactly-once processing, element order, or strict partition assignment | Uses the old, high level KafkaConsumer API, offsets are committed into ZK manually |
flink-connector-kafka-083 | 0.9.1, 0.10 | FlinkKafkaConsumer081 | 0.8.1 | Guarantees exactly-once processing | Uses the SimpleConsumer API of Kafka internally. Offsets are committed to ZK manually |
flink-connector-kafka-083 | 0.9.1, 0.10 | FlinkKafkaConsumer082 | 0.8.2 | Guarantee exactly-once processing | Uses the SimpleConsumer API of Kafka internally. Offsets are committed to ZK manually |
Then, import the connector in your maven project:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka</artifactId> <version>1.0-SNAPSHOT</version> </dependency>
Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution here.
advertised.host.name
setting in theconfig/server.properties
file must be set to the machine’s IP address.The standard FlinkKafkaConsumer082
is a Kafka consumer providing access to one topic.
The following parameters have to be provided for the FlinkKafkaConsumer082(...)
constructor:
Example:
Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); properties.setProperty("zookeeper.connect", "localhost:2181"); properties.setProperty("group.id", "test"); DataStream<String> stream = env .addSource(new FlinkKafkaConsumer082<>("topic", new SimpleStringSchema(), properties)) .print();
val properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); properties.setProperty("zookeeper.connect", "localhost:2181"); properties.setProperty("group.id", "test"); stream = env .addSource(new KafkaSource[String]("topic", new SimpleStringSchema(), properties)) .print
As Kafka persists all the data, a fault tolerant Kafka consumer can be provided.
The FlinkKafkaConsumer082 can read a topic, and if the job fails for some reason, the source will continue on reading from where it left off after a restart. For example if there are 3 partitions in the topic with offsets 31, 122, 110 read at the time of job failure, then at the time of restart it will continue on reading from those offsets, no matter whether these partitions have new messages.
To use fault tolerant Kafka Consumers, checkpointing of the topology needs to be enabled at the execution environment:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(5000);
Also note that Flink can only restart the topology if enough processing slots are available to restart the topology. So if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards. Flink on YARN supports automatic restart of lost YARN containers.
A class providing an interface for sending data to Kafka.
The following arguments have to be provided for the KafkaSink(…)
constructor in order:
Example:
stream.addSink(new KafkaSink<String>("localhost:9092", "test", new SimpleStringSchema()));
stream.addSink(new KafkaSink[String]("localhost:9092", "test", new SimpleStringSchema))
The user can also define custom Kafka producer configuration for the KafkaSink with the constructor:
public KafkaSink(String zookeeperAddress, String topicId, Properties producerConfig, SerializationSchema<IN, byte[]> serializationSchema)
public KafkaSink(String zookeeperAddress, String topicId, Properties producerConfig, SerializationSchema serializationSchema)
If this constructor is used, the user needs to make sure to set the broker(s) with the 「metadata.broker.list」 property. Also the serializer configuration should be left default, and the serialization should be set via SerializationSchema.
The Apache Kafka official documentation can be found here.
This connector provides a Sink that can write to an Elasticsearch Index. To use this connector, add the following dependency to your project:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch</artifactId> <version>1.0-SNAPSHOT</version> </dependency>
Note that the streaming connectors are currently not part of the binary distribution. See here for information about how to package the program with the libraries for cluster execution.
Instructions for setting up an Elasticsearch cluster can be found here. Make sure to set and remember a cluster name. This must be set when creating a Sink for writing to your cluster
The connector provides a Sink that can send data to an Elasticsearch Index.
The sink can use two different methods for communicating with Elasticsearch:
See here for information about the differences between the two modes.
This code shows how to create a sink that uses an embedded Node for communication:
DataStream<String> input = ...; Map<String, String> config = Maps.newHashMap(); // This instructs the sink to emit after every element, otherwise they would be buffered config.put("bulk.flush.max.actions", "1"); config.put("cluster.name", "my-cluster-name"); input.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder<String>() { @Override public IndexRequest createIndexRequest(String element, RuntimeContext ctx) { Map<String, Object> json = new HashMap<>(); json.put("data", element); return Requests.indexRequest() .index("my-index") .type("my-type") .source(json); } }));
val input: DataStream[String] = ... val config = new util.HashMap[String, String] config.put("bulk.flush.max.actions", "1") config.put("cluster.name", "my-cluster-name") text.addSink(new ElasticsearchSink(config, new IndexRequestBuilder[String] { override def createIndexRequest(element: String, ctx: RuntimeContext): IndexRequest = { val json = new util.HashMap[String, AnyRef] json.put("data", element) println("SENDING: " + element) Requests.indexRequest.index("my-index").`type`("my-type").source(json) } }))
Note how a Map of Strings is used to configure the Sink. The configuration keys are documented in the Elasticsearch documentation here. Especially important is the cluster.name
parameter that must correspond to the name of your cluster.
Internally, the sink uses a BulkProcessor
to send index requests to the cluster. This will buffer elements before sending a request to the cluster. The behaviour of the BulkProcessor
can be configured using these config keys: * bulk.flush.max.actions: Maximum amount of elements to buffer * bulk.flush.max.size.mb: Maximum amount of data (in megabytes) to buffer * bulk.flush.interval.ms: Interval at which to flush data regardless of the other two settings in milliseconds
This example code does the same, but with a TransportClient
:
DataStream<String> input = ...; Map<String, String> config = Maps.newHashMap(); // This instructs the sink to emit after every element, otherwise they would be buffered config.put("bulk.flush.max.actions", "1"); config.put("cluster.name", "my-cluster-name"); List<TransportAddress> transports = new ArrayList<String>(); transports.add(new InetSocketTransportAddress("node-1", 9300)); transports.add(new InetSocketTransportAddress("node-2", 9300)); input.addSink(new ElasticsearchSink<>(config, transports, new IndexRequestBuilder<String>() { @Override public IndexRequest createIndexRequest(String element, RuntimeContext ctx) { Map<String, Object> json = new HashMap<>(); json.put("data", element); return Requests.indexRequest() .index("my-index") .type("my-type") .source(json); } }));
val input: DataStream[String] = ... val config = new util.HashMap[String, String] config.put("bulk.flush.max.actions", "1") config.put("cluster.name", "my-cluster-name") val transports = new ArrayList[String] transports.add(new InetSocketTransportAddress("node-1", 9300)) transports.add(new InetSocketTransportAddress("node-2", 9300)) text.addSink(new ElasticsearchSink(config, transports, new IndexRequestBuilder[String] { override def createIndexRequest(element: String, ctx: RuntimeContext): IndexRequest = { val json = new util.HashMap[String, AnyRef] json.put("data", element) println("SENDING: " + element) Requests.indexRequest.index("my-index").`type`("my-type").source(json) } }))
The difference is that we now need to provide a list of Elasticsearch Nodes to which the sink should connect using a TransportClient
.
More about information about Elasticsearch can be found here.
This connector provides a Sink that writes rolling files to any filesystem supported by Hadoop FileSystem. To use this connector, add the following dependency to your project:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-filesystem</artifactId> <version>1.0-SNAPSHOT</version> </dependency>
Note that the streaming connectors are currently not part of the binary distribution. See here for information about how to package the program with the libraries for cluster execution.
The rolling behaviour as well as the writing can be configured but we will get to that later. This is how you can create a default rolling sink:
DataStream<String> input = ...; input.addSink(new RollingSink<String>("/base/path"));
val input: DataStream[String] = ... input.addSink(new RollingSink("/base/path"))
The only required parameter is the base path where the rolling files (buckets) will be stored. The sink can be configured by specifying a custom bucketer, writer and batch size.
By default the rolling sink will use the pattern "yyyy-MM-dd--HH"
to name the rolling buckets. This pattern is passed to SimpleDateFormat
with the current system time to form a bucket path. A new bucket will be created whenever the bucket path changes. For example, if you have a pattern that contains minutes as the finest granularity you will get a new bucket every minute. Each bucket is itself a directory that contains several part files: Each parallel instance of the sink will create its own part file and when part files get too big the sink will also create a new part file next to the others. To specify a custom bucketer use setBucketer()
on a RollingSink
.
The default writer is StringWriter
. This will call toString()
on the incoming elements and write them to part files, separated by newline. To specify a custom writer use setWriter()
on a RollingSink
. If you want to write Hadoop SequenceFiles you can use the providedSequenceFileWriter
which can also be configured to use compression.
The last configuration option is the batch size. This specifies when a part file should be closed and a new one started. (The default part file size is 384 MB).
Example:
DataStream<Tuple2<IntWritable,Text>> input = ...; RollingSink sink = new RollingSink<String>("/base/path"); sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm")); sink.setWriter(new SequenceFileWriter<IntWritable, Text>()); sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB, input.addSink(sink);
val input: DataStream[Tuple2[IntWritable, Text]] = ... val sink = new RollingSink[String]("/base/path") sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm")) sink.setWriter(new SequenceFileWriter[IntWritable, Text]()) sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB, input.addSink(sink)
This will create a sink that writes to bucket files that follow this schema:
/base/path/{date-time}/part-{parallel-task}-{count}
Where date-time
is the string that we get from the date/time format, parallel-task
is the index of the parallel sink instance and count
is the running number of part files that where created because of the batch size.
For in-depth information, please refer to the JavaDoc for RollingSink.
This connector provides access to data streams from RabbitMQ. To use this connector, add the following dependency to your project:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-rabbitmq</artifactId> <version>1.0-SNAPSHOT</version> </dependency>
Note that the streaming connectors are currently not part of the binary distribution. See linking with them for cluster execution here.
Follow the instructions from the RabbitMQ download page. After the installation the server automatically starts, and the application connecting to RabbitMQ can be launched.
A class providing an interface for receiving data from RabbitMQ.
The followings have to be provided for the RMQSource(…)
constructor in order:
Example:
DataStream<String> stream = env .addSource(new RMQSource<String>("localhost", "hello", new SimpleStringSchema())) .print
stream = env .addSource(new RMQSource[String]("localhost", "hello", new SimpleStringSchema)) .print
A class providing an interface for sending data to RabbitMQ.
The followings have to be provided for the RMQSink(…)
constructor in order:
Example:
stream.addSink(new RMQSink<String>("localhost", "hello", new StringToByteSerializer()));
stream.addSink(new RMQSink[String]("localhost", "hello", new StringToByteSerializer))
More about RabbitMQ can be found here.
Twitter Streaming API provides opportunity to connect to the stream of tweets made available by Twitter. Flink Streaming comes with a built-inTwitterSource
class for establishing a connection to this stream. To use this connector, add the following dependency to your project:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-twitter</artifactId> <version>1.0-SNAPSHOT</version> </dependency>
Note that the streaming connectors are currently not part of the binary distribution. See linking with them for cluster execution here.
In order to connect to Twitter stream the user has to register their program and acquire the necessary information for the authentication. The process is described below.
First of all, a Twitter account is needed. Sign up for free at twitter.com/signup or sign in at Twitter’s Application Management and register the application by clicking on the 「Create New App」 button. Fill out a form about your program and accept the Terms and Conditions. After selecting the application, the API key and API secret (called consumerKey
and sonsumerSecret
in TwitterSource
respectively) is located on the 「API Keys」 tab. The necessary access token data (token
and secret
) can be acquired here. Remember to keep these pieces of information secret and do not push them to public repositories.
Create a properties file, and pass its path in the constructor of TwitterSource
. The content of the file should be similar to this:
#properties file for my app secret=*** consumerSecret=*** token=***-*** consumerKey=***
The TwitterSource
class has two constructors.
public TwitterSource(String authPath, int numberOfTweets);
to emit finite number of tweetspublic TwitterSource(String authPath);
for streamingBoth constructors expect a String authPath
argument determining the location of the properties file containing the authentication information. In the first case, numberOfTweets
determines how many tweet the source emits.
In contrast to other connectors, the TwitterSource
depends on no additional services. For example the following code should run gracefully:
DataStream<String> streamSource = env.addSource(new TwitterSource("/PATH/TO/myFile.properties"));
streamSource = env.addSource(new TwitterSource("/PATH/TO/myFile.properties"))
The TwitterSource
emits strings containing a JSON code. To retrieve information from the JSON code you can add a FlatMap or a Map function handling JSON code. For example, there is an implementation JSONParseFlatMap
abstract class among the examples. JSONParseFlatMap
is an extension of the FlatMapFunction
and has a
String getField(String jsonText, String field);
getField(jsonText : String, field : String) : String
function which can be use to acquire the value of a given field.
There are two basic types of tweets. The usual tweets contain information such as date and time of creation, id, user, language and many more details. The other type is the delete information.
TwitterLocal
is an example how to use TwitterSource
. It implements a language frequency counter program.
A Docker container is provided with all the required configurations for test running the connectors of Apache Flink. The servers for the message queues will be running on the docker container while the example topology can be run on the user’s computer.
The official Docker installation guide can be found here. After installing Docker an image can be pulled for each connector. Containers can be started from these images where all the required configurations are set.
For the easiest setup, create a jar with all the dependencies of the flink-streaming-connectors project.
cd /PATH/TO/GIT/flink/flink-staging/flink-streaming-connectors mvn assembly:assembly ~~~bash This creates an assembly jar under *flink-streaming-connectors/target*. #### RabbitMQ Pull the docker image: ~~~bash sudo docker pull flinkstreaming/flink-connectors-rabbitmq
To run the container, type:
sudo docker run -p 127.0.0.1:5672:5672 -t -i flinkstreaming/flink-connectors-rabbitmq
Now a terminal has started running from the image with all the necessary configurations to test run the RabbitMQ connector. The -p flag binds the localhost’s and the Docker container’s ports so RabbitMQ can communicate with the application through these.
To start the RabbitMQ server:
sudo /etc/init.d/rabbitmq-server start
To launch the example on the host computer, execute:
java -cp /PATH/TO/JAR-WITH-DEPENDENCIES org.apache.flink.streaming.connectors.rabbitmq.RMQTopology \ > log.txt 2> errorlog.txt
There are two connectors in the example. One that sends messages to RabbitMQ, and one that receives messages from the same queue. In the logger messages, the arriving messages can be observed in the following format:
<DATE> INFO rabbitmq.RMQTopology: String: <one> arrived from RMQ <DATE> INFO rabbitmq.RMQTopology: String: <two> arrived from RMQ <DATE> INFO rabbitmq.RMQTopology: String: <three> arrived from RMQ <DATE> INFO rabbitmq.RMQTopology: String: <four> arrived from RMQ <DATE> INFO rabbitmq.RMQTopology: String: <five> arrived from RMQ
Pull the image:
sudo docker pull flinkstreaming/flink-connectors-kafka
To run the container type:
sudo docker run -p 127.0.0.1:2181:2181 -p 127.0.0.1:9092:9092 -t -i \ flinkstreaming/flink-connectors-kafka
Now a terminal has started running from the image with all the necessary configurations to test run the Kafka connector. The -p flag binds the localhost’s and the Docker container’s ports so Kafka can communicate with the application through these. First start a zookeeper in the background:
/kafka_2.9.2-0.8.1.1/bin/zookeeper-server-start.sh /kafka_2.9.2-0.8.1.1/config/zookeeper.properties \ > zookeeperlog.txt &
Then start the kafka server in the background:
/kafka_2.9.2-0.8.1.1/bin/kafka-server-start.sh /kafka_2.9.2-0.8.1.1/config/server.properties \ > serverlog.txt 2> servererr.txt &
To launch the example on the host computer execute:
java -cp /PATH/TO/JAR-WITH-DEPENDENCIES org.apache.flink.streaming.connectors.kafka.KafkaTopology \ > log.txt 2> errorlog.txt
In the example there are two connectors. One that sends messages to Kafka, and one that receives messages from the same queue. In the logger messages, the arriving messages can be observed in the following format:
<DATE> INFO kafka.KafkaTopology: String: (0) arrived from Kafka <DATE> INFO kafka.KafkaTopology: String: (1) arrived from Kafka <DATE> INFO kafka.KafkaTopology: String: (2) arrived from Kafka <DATE> INFO kafka.KafkaTopology: String: (3) arrived from Kafka <DATE> INFO kafka.KafkaTopology: String: (4) arrived from Kafka <DATE> INFO kafka.KafkaTopology: String: (5) arrived from Kafka <DATE> INFO kafka.KafkaTopology: String: (6) arrived from Kafka <DATE> INFO kafka.KafkaTopology: String: (7) arrived from Kafka <DATE> INFO kafka.KafkaTopology: String: (8) arrived from Kafka <DATE> INFO kafka.KafkaTopology: String: (9) arrived from Kafka
See the relevant section of the DataSet API documentation.
See the relevant section of the DataSet API documentation.