DataStream API編程指導html
文檔翻譯自Flink DataStream API Programming Guidejava
-----------------------------------------------------------------------web
Flink中的DataStream程序是實如今數據流上的transformation(如filtering,updating state, defining windows,aggregating)的普通程序。建立數據流的來源多種多樣(如消息隊列,socket流,文件等)。程序經過data sink返回結果,如將數據寫入文件,或發送到標準輸出(如命令行終端)。Flink程序能夠在多種上下文中運行,如獨立運行或是嵌入在其餘程序中執行。程序的執行能夠發生在本地JVM,或者在一個擁有許多設備的集羣上。算法
有關介紹Flink API基礎概念的文檔,請見basic conceptsapache
爲了建立你本身的Flink DataStream程序,咱們鼓勵你從文檔anatomy of a Flink Program開始,且歡迎你添加本身的transformations。該文檔接下來的部分是額外的operation和進階特性的參考文檔。編程
下面的程序是一個完整的流式窗口word count應用,它計算出在web socket的大小爲5秒的窗口中的出現各個單詞的數量。你能夠複製 & 粘貼代碼並在本地運行。windows
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;api
public class WindowWordCount {緩存
public static void main(String[] args) throws Exception {性能優化
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("localhost", 9999)
.flatMap(new Splitter())
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);
dataStream.print();
env.execute("Window WordCount");
}
public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word: sentence.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
}
要運行該示例程序,首先從終端運行netcat來開始輸入流
nc -lk 9999
僅須要輸入一些單詞,這些將是word count程序的輸入數據。若是你想看到count大於1的結果,在5秒內重複輸入同一個單詞。
Data transformation會將一或多個DataStream轉換成一個新的DataStream。程序能夠將多個transformation結合造成複雜的拓撲結構(topology)。
本小節給出了全部可用的transformation的描述。
Transformation |
描述 |
Map DataStream -> DataStream |
獲取一個element併產出一個element。下例是一個將輸入*2的map方法: DataStream<Integer> dataStream = //... |
FlapMap DataStream -> DataStream |
獲取一個element,併產生出0、1或多個element。下例是一個爲句子分詞的flatmap方法
dataStream.flatMap(new FlatMapFunction<String, String>() { |
Filter DataStream -> DataStream |
在每一個獲取的element上運行一個boolean方法,留下那些方法返回true的element。下例是一個過濾掉0值的filter dataStream.filter(new FilterFunction<Integer>() { |
KeyBy |
將流邏輯分爲不相交的分區,每一個分區包含的都是具備相同key的element,該分區方法使用hash分區實現。定義key的方法見於Keys。下例是一個返回KeyedDataStream的transformation。 dataStream.keyBy("someKey") // Key by field "someKey" |
Reduce KeyedStream -> DataStream |
一個在keyed data stream上「滾動」進行的reduce方法。將上一個reduce過的值和當前element結合,產生新的值併發送出。下例是一個建立部分和的reduce方法。 keyedStream.reduce(new ReduceFunction<Integer>() { |
Fold KeyedStream -> DataStream |
一個在帶有初始值的數據流上「滾動」進行的fold方法。將上一個fold的值和當前element結合,產生新的值併發送出。下例是一個fold方法,當應用於序列{1, 2, 3, 4, 5}時,它發出序列{"start-1", "start-1-2", "start-1-2-3" …}。 DataStream<String> result = keyedStream.fold("start", new FoldFunction<Integer, String>() { |
Aggregations KeyedStream -> DataStream |
在一個keyed DataStream上「滾動」進行聚合的方法。其中,min和minBy的區別在於min返回最小值,而minBy返回的是帶有在此域中最小值的element(max和maxBy同樣如此)。 keyedStream.sum(0); |
Window KeyedStream - > WindowedStream |
Window能夠定義在已經分區的KeyedStream上。窗口將根據一些特徵(如最近5秒到達的數據)將數據按其各自的key集合在一塊兒。有關窗口的完整描述見於windows // Last 5 seconds of data dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); |
WindowAll DataStream -> AllWindowedStream |
Window能夠定義在普通的DataStream上。窗口將根據一些特徵(如最近5秒到達的數據)將全部Stream事件集合在一塊兒。有關窗口的完整描述見於windows
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data |
Window Apply WindowedStream -> DataStream AllWindowedStream -> DataStream |
將一個通常函數應用到window總體上去,下面是一我的工計算window中全部element的總和的應用。 windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() { // applying an AllWindowFunction on non-keyed window stream |
Window Reduce WindowedStream -> DataStream |
對窗口應用一個功能性reduce方法並返回reduce的結果 windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>() { |
Window Fold Windowed Stream -> DataStream |
對窗口應用一個功能性fold方法。下例代碼在應用到序列(1, 2, 3, 4, 5)時,它將該序列fold成爲字符串"start-1-2-3-4-5" windowedStream.fold("start-", new FoldFunction<Integer, String>() { |
Aggregations on windows WindowedStream -> DataStream |
對窗口中的內容聚合。其中,min和minBy的區別在於min返回最小值,而minBy返回的是帶有在此域中最小值的element(max和maxBy同樣如此)。 windowedStream.sum(0); |
Union DataStream* -> DataStream |
將2個或多個data stream合併建立出一個新的包含全部stream的element的stream。注意:若是你對一個data stream本身進行union操做,則在返回的結果中,每一個element都會出現2個。 dataStream.union(otherStream1, otherStream2, ...); |
Window Join DataStream, DataStream -> DataStream |
在給定key和普通window中,將2個DataStream進行Join操做
dataStream.join(otherStream) |
Window CoGroup DataStream, DataStream -> DataStream |
在給定key和普通window中,對2個DataStream進行CoGroup操做。 dataStream.coGroup(otherStream) |
Connect DataStream, DataStream -> ConnectedStreams |
在保留兩個DataStream的類型的狀況下,將兩者"鏈接"起來。Connect使咱們能夠共享兩個Stream的狀態 DataStream<Integer> someStream = //... ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream); |
CoMap, CoFlatMap ConnectedStreams -> DataStream |
該操做相似於map和flatMap針對鏈接的Data Stream版本。Sd connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() { @Override
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() { @Override @Override |
Split DataStream -> SplitStream |
根據某些標準將Stream分割成2個或更多的stream SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() { |
Select SplitStream -> DataStream |
從SplitStream中選擇1個或多個stream
SplitStream<Integer> split; |
Iterate DataStream -> IterativeStream -> DataStream |
經過將一個Operator的輸出重定向到前面的某個Operator的方法,在數據流圖中建立一個「反饋」循環。這在定義持續更新模型的算法時十分有用。下面的例子從一個Stream開始,並持續應用迭代體(Iteration body)。大於0的element被送回到反饋通道,而其餘的element則被轉發到下游。相關完整描述請見Iterations IterativeStream<Long> iteration = initialStream.iterate(); |
Extract Timestamps DataStream -> DataStream |
經過從數據中抽取時間戳來使得經過使用事件時間語義的窗口能夠工做。詳情見於Event Time。 stream.assignTimestamps (new TimeStampExtractor() {...}); |
接下來的Transformation是對Tuple類型的data stream可用的Transformation:
Transformation |
描述 |
Project DataStream -> DataStream |
從tuple中選擇出域的子集而產生新的DataStream DataStream<Tuple3<Integer, Double, String>> in = // [...] |
若是須要,Flink一樣提供了在進行一次transformation後針對精確stream分割的低層次的控制(low-level control),它們經過如下幾個方法實現。
Transformations |
描述 |
Custom partitioning DataStream -> DataStream |
使用一個用戶自定義的Partitioner來對每一個element選擇目標任務sd dataStream.partitionCustom(partitioner, "someKey"); |
Random partitioning DataStream -> DataStream |
根據均勻分佈來隨機分割element dataStream.shuffle(); |
Rebalancing(輪詢分割) DataStream -> DataStream |
輪詢分割element,建立相同負荷的分割。對數據變形(data skew)時的性能優化十分有用s dataStream.rebalance(); |
Rescaling DataStream -> DataStream |
將element輪詢分割到下游Operator子集中去。這在你想流水線並行時十分有用,例如,須要從每一個並行的source實例中將數據fan out到一個有着一些mapper來分發負載,可是又不想要函數rebalance()那樣引發的徹底rebalance的效果時。這就須要僅在本地傳輸數據,而不是須要從網絡傳輸,這須要依賴其餘諸如TaskManager的任務槽數量等等configuration值。
dataStream.rescale(); |
Broadcasting DataStream -> DataStream |
將element廣播到每個分割中去 dataStream.broadcast(); |
將兩個transformation連接起來意味着將它們部署在一塊兒(co-locating),共享同一個線程來得到更好的性能。Flink默認地儘量地連接Operator(如兩個連續的map transformation)。若有須要,API還給出了細粒度的連接控制:
使用StreamExecutionEnvironment.disableOperatorChaining()來關閉整個Job的連接操做。下面表格中的方法則是更加細粒度的控制函數,注意,因爲這些函數引用的是前一個transformation,因此它們僅僅在一個DataStream的transformation後使用纔是正確的,例如someStream.map( … ).startNewChain()是正確的,而someStream.startNewChain()是錯誤的。
一個資源組就是Flink中的一個任務槽,若有須要,你能夠人工孤立某個Operator到一個獨立的任務槽中。
Transformation |
描述 |
startNewChain() |
以當前Operator起點,開始一個新的連接。在下例中,兩個mapper將會被連接而filter則不會與第一個mapper連接 someStream.filter(...).map(...).startNewChain().map(...); |
disableChaining() |
下例中,將不會連接mapOperator。 someStream.map(...).disableChaining(); |
slotSharingGroup() |
設置一個Operation的共享任務槽的分組。Flink將會把同一個任務槽共享組的Operation放到同一個任務槽中,而不在同一個任務槽共享組的Operation放到其餘任務槽中。這能夠用來孤立任務槽。若是全部的輸入Operation都在同一個任務槽共享組中,則該任務槽共享組會繼承下來。任務槽共享組的默認名爲"default",Operation能夠經過調用slotSharingGroup("default")來定義其名稱。 someStream.filter(...).slotSharingGroup("name"); |
數據源能夠經過StreamExecutionEnvironment.addSource(sourceFunction)來建立數據源。你可使用Flink提供的source方法,也能夠經過實現SourceFunction來編寫自定義的非並行數據源,也能夠經過實現ParallelSourceFunction接口或繼承RichParallelSourceFunction來編寫自定義並行數據源。
如下是幾個預約義的數據流源,能夠經過StreamExecutionEnvironment來訪問:
1. 基於文件的:
· readTextFile(path) / TextInputFormat - 以行讀取方式讀文件並返回字符串
· readFile(path) / 任意輸入格式 - 按用輸入格式的描述讀取文件
· readFileStream - 建立一個stream,在文件有改動時追加element
2. 基於Socket的:
· socketTextStream - 從socket讀取,element能夠經過分割符來分開
3. 基於Collection的:
· fromCollection(Collection) - 從Java.util.Collection建立一個數據流。collection中全部的element都必須是同一類型的。
· fromCollection(Iterator, Class) - 從一個迭代器中建立一個數據流。class參數明確了迭代器返回的element的類型。
· fromElement(T …) - 從一個給定的對象序列建立一個數據流。全部對象都必須是同一類型的。
· fromParallelCollection(SplittableIterator, Class) - 從一個迭代器中建立一個並行數據流。class參數明確了迭代器返回的element的類型。
· generateSequence(from, to) - 從一個給定區間中生成一個並行數字序列。
4. 自定義:
· addSource - 附上一個新的source方法。例如,經過調用addSource(new FlinkKafkaConsumer08<>(…))來從Apache Kafka讀取數據,更多信息見於connector
Data Sink消耗DataStream並將它們轉發到文件、socket、外部系統或打印它們。Flink自帶了許多內置的輸出格式,封裝爲DataStream的operation中:
· writeAsText() / TextOutputFormat - 以行字符串的方式寫文件,字符串經過調用每一個element的toString()方法得到。
· writeAsCsv(…) / CsvOutputFormat - 以逗號分隔的值來說Tuple寫入文件,行和域的分隔符是能夠配置的。每一個域的值是經過調用object的toString()方法得到的。
· print() / printToErr() - 將每一個element的toString()值打印在標準輸出 / 標準錯誤流中。能夠提供一個前綴(msg)做爲輸出的前綴,使得在不一樣print的調用能夠互相區分。若是並行度大於1,輸出也會以task的標識符(identifier)爲產生的輸出的前綴。
· writeUsingOutputFormat() / FileOutputFormat - 自定義文件輸出所用的方法和基類,支持自定義object到byte的轉換。
· writeToSocket - 依據SerializationSchema將element寫到socket中。
· addSink - 調用自定義sink方法,Flink自帶鏈接到其餘系統的connector(如Apache Kafka),這些connector都以sink方法的形式實現。
注意DataStream的write*()函數主要用於debug,它們不參與Flink的檢查點,這意味着這些方法一般處於「至少一次(at-least-once)「的執行語義下。flush到目標系統的數據依賴於OutputFormat的實現,這意味着不是全部發送到OutputFormat的element都會當即出如今目標系統中,此外,在失效的狀況下,這些數據極可能會丟失。
故爲了可靠性以及將stream「剛好一次(exact once)」地傳入文件系統,咱們應當使用flink-connector-filesystem。此外,經過實現「.addSink(…)」的自定義內容會參加Flink的檢查點機制,故會保證「剛好一次」的執行語義。
迭代流程序實現了一個階段方法並將之嵌入到一個IterativeStream中。做爲一個可能永遠不會結束的程序,它沒有最大迭代數,反之,你須要使用split或filter的transformation來明確流的哪一部分會被反饋到迭代中,哪一部分則繼續轉發到下游。這裏,咱們使用filter做爲例子,咱們定義IterativeStream:
IterativeStream<Integer> iteration = input.iterate();
而後,咱們定義在循環中將要進行的邏輯處理,咱們經過一系列transformation來實現(這裏用了一個簡單的map transformation):
DataStream<Integer> iterationBody = iteration.map(/* this is executed many times */);
咱們能夠調用IterativeStream的closeWith(feedbackStream)函數來關閉一個迭代並定義迭代尾。傳遞給closeWith方法的DataStream將會反饋回迭代頭。分割出用來反饋的stream的部分和向前傳播的stream部分一般的方法即是使用filter來進行分割。這些filter能夠定義諸如"termination"邏輯,即element將會傳播到下游,而不是被反饋回去。
iteration.closeWith(iterationBody.filter(/* one part of the stream */));
DataStream<Integer> output = iterationBody.filter(/* some other part of the stream */);
默認地,反饋的那部分流將會自動設置爲迭代頭的輸入,要想重載該行爲,用戶須要設置closeWith函數中的一個boolean參數。例如,下面是一個持續將整數序列中的數字減1知道它們變爲0的程序:
DataStream<Long> someIntegers = env.generateSequence(0, 1000);
IterativeStream<Long> iteration = someIntegers.iterate();
DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
return value - 1 ;
}
});
DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long value) throws Exception {
return (value > 0);
}
});
iteration.closeWith(stillGreaterThanZero);
DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long value) throws Exception {
return (value <= 0);
}
});
StreamExecutionEnvironment包含ExecutionConfig,它可使用戶設置job的確切運行時配置值。
請參考execution configuration來查看參數的解釋。特別的,如下這些參數僅適用於DataStream API:
enableTimestamps() / disableTimestamps():在每個source發出的事件上附加上一個時間戳。函數areTimestampsEnabled()能夠返回該狀態的當前值。
setAutoWatermarkInterval(long milliseconds):設置自動水印發布(watermark emission)區間。你能夠經過調用函數getAutoWatermarkInterval()來獲取當前值。
文檔Fault Tolerance Documentation描述了打開並配置Flink的檢查點機制的選項和參數
默認的,element在網絡傳輸時不是一個個單獨傳輸的(這會致使沒必要要的網絡流量),而是緩存後傳輸。緩存(是在設備間傳輸的實際單位)的大小能夠在Flink的配置文件中設置。儘管該方法有益於優化吞吐量,他會在stream到達不夠快時致使執行時間方面的問題。爲了控制吞吐量和執行時間,你能夠在執行環境(或獨立的Operator)中調用env.setBufferTimeout(timeoutMillis)來設置等待裝滿buffer的最大等待時間,在這個時間事後,無論buffer是否已滿,它都會自動發出。該默認超時時間是100ms。下例是設置API的用法:
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setBufferTimeout(timeoutMillis);
env.genereateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
要最大化吞吐量,設置setBufferTimeout(-1)來去除超時時間,則buffer僅在它滿後纔會被flush。要最小化執行時間,設置timeout爲一個接近0的數字(如5ms或10ms)。應當避免設置Timeout爲0,由於它會形成嚴重的性能降低。
在分佈式集羣上運行流程序以前,確保算法正確執行很重要。所以,實現數據分析程序一般須要遞增的檢查結果、debug、優化的過程。
Flink提供了能夠顯著簡化數據分析程序的開發過程的特性,便可以在IDE中本地進行debug、注入測試數據、以及結果數據的收集等。本節對如何簡化Flink程序開發提出幾點建議。
LocalStreamEnvironment在建立它的同一個JVM進程下建立Flink系統。若是你從IDE中啓動一個LocalEnvironment,你能夠在代碼中設置斷點來簡單地debug你的程序。下例爲LocalEnvironment是如何建立並使用的:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
DataStream<String> lines = env.addSource(/* some source */);
// build your program
env.execute();
Flink提供基於Java collection的特殊數據源來方便測試。一旦程序測試以後,source和sink能夠簡單地替代爲對外部系統的讀取/寫出的source和sink。Collection數據源使用方法以下:
// Create a DataStream from a list of elements
DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);
// Create a DataStream from any Java collection
List<Tuple2<String, Integer>> data = ...
DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);
// Create a DataStream from an Iterator
Iterator<Long> longIt = ...
DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);
注意:當前Collection數據源須要實現Serializable接口的數據類型和迭代器。此外,Collection數據源沒法並行執行(並行度=1)
Flink一樣提供了一個收集測試和debug的DataStream結果的sink,它的使用方式以下:
import org.apache.flink.contrib.streaming.DataStreamUtils
DataStream<Tuple2<String, Integer>> myResult = ...
Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult)