以上爲Flink的運行模型,Flink的程序主要由三部分構成,分別爲Source、Transformation、Sink。DataSource主要負責數據的讀取,Transformation主要負責對屬於的轉換操做,Sink負責最終數據的輸出。bash
每一個Flink程序都包含如下的若干流程:服務器
執行環境StreamExecutionEnvironment是全部Flink程序的基礎。markdown
建立執行環境有三種方式,分別爲:架構
StreamExecutionEnvironment.getExecutionEnvironment
StreamExecutionEnvironment.createLocalEnvironment
StreamExecutionEnvironment.createRemoteEnvironment
複製代碼
建立一個執行環境,表示當前執行程序的上下文。 若是程序是獨立調用的,則此方法返回本地執行環境;若是從命令行客戶端調用程序以提交到集羣,則此方法返回此集羣的執行環境,也就是說,getExecutionEnvironment會根據查詢運行的方式決定返回什麼樣的運行環境,是最經常使用的一種建立執行環境的方式。socket
val env = StreamExecutionEnvironment.getExecutionEnvironment
複製代碼
返回本地執行環境,須要在調用時指定默認的並行度。oop
val env = StreamExecutionEnvironment.createLocalEnvironment(1)
複製代碼
返回集羣執行環境,將Jar提交到遠程服務器。須要在調用時指定JobManager的IP和端口號,並指定要在集羣中運行的Jar包。spa
val env = StreamExecutionEnvironment.createRemoteEnvironment(1)
複製代碼
一列一列的讀取遵循TextInputFormat規範的文本文件,並將結果做爲String返回。命令行
val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.readTextFile("/opt/modules/test.txt") stream.print() env.execute("FirstJob") 複製代碼
按照指定的文件格式讀取文件。線程
val env = StreamExecutionEnvironment.getExecutionEnvironment val path = new Path("/opt/modules/test.txt") val stream = env.readFile(new TextInputFormat(path), "/opt/modules/test.txt") stream.print() env.execute("FirstJob") 複製代碼
從Socket中讀取信息,元素能夠用分隔符分開。3d
val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.socketTextStream("localhost", 11111) stream.print() env.execute("FirstJob") 複製代碼
從集合中建立一個數據流,集合中全部元素的類型是一致的。
val env = StreamExecutionEnvironment.getExecutionEnvironment val list = List(1,2,3,4) val stream = env.fromCollection(list) stream.print() env.execute("FirstJob") 複製代碼
從迭代(Iterator)中建立一個數據流,指定元素數據類型的類由iterator返回。
val env = StreamExecutionEnvironment.getExecutionEnvironment val iterator = Iterator(1,2,3,4) val stream = env.fromCollection(iterator) stream.print() env.execute("FirstJob") 複製代碼
從一個給定的對象序列中建立一個數據流,全部的對象必須是相同類型的。
val env = StreamExecutionEnvironment.getExecutionEnvironment val list = List(1,2,3,4) val stream = env.fromElements(list) stream.print() env.execute("FirstJob") 複製代碼
從給定的間隔中並行地產生一個數字序列。
val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.generateSequence(1,10) stream.print() env.execute("FirstJob") 複製代碼
Data Sink 消費DataStream中的數據,並將它們轉發到文件、套接字、外部系統或者打印出。
Flink有許多封裝在DataStream操做裏的內置輸出格式。
將元素以字符串形式逐行寫入(TextOutputFormat),這些字符串經過調用每一個元素的toString()方法來獲取。
將元組以逗號分隔寫入文件中(CsvOutputFormat),行及字段之間的分隔是可配置的。每一個字段的值來自對象的toString()方法。
打印每一個元素的toString()方法的值到標準輸出或者標準錯誤輸出流中。或者也能夠在輸出流中添加一個前綴,這個能夠幫助區分不一樣的打印調用,若是並行度大於1,那麼輸出也會有一個標識由哪一個任務產生的標誌。
自定義文件輸出的方法和基類(FileOutputFormat),支持自定義對象到字節的轉換。
根據SerializationSchema 將元素寫入到socket中。
DataStream → DataStream:輸入一個參數產生一個參數。
val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.generateSequence(1,10) val streamMap = stream.map { x => x * 2 } streamMap.print() env.execute("FirstJob") 複製代碼
注意:stream.print():每一行前面的數字表明這一行是哪個並行線程輸出的。
DataStream → DataStream:輸入一個參數,產生0個、1個或者多個輸出。
val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.readTextFile("test.txt") val streamFlatMap = stream.flatMap{ x => x.split(" ") } streamFilter.print() env.execute("FirstJob") 複製代碼
DataStream → DataStream:結算每一個元素的布爾值,並返回布爾值爲true的元素。下面這個例子是過濾出非0的元素:
val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.generateSequence(1,10) val streamFilter = stream.filter{ x => x == 1 } streamFilter.print() env.execute("FirstJob") 複製代碼
DataStream,DataStream → ConnectedStreams:鏈接兩個保持他們類型的數據流,兩個數據流被Connect以後,只是被放在了一個同一個流中,內部依然保持各自的數據和形式不發生任何變化,兩個流相互獨立。
val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.readTextFile("test.txt") val streamMap = stream.flatMap(item => item.split(" ")).filter(item => item.equals("hadoop")) val streamCollect = env.fromCollection(List(1,2,3,4)) val streamConnect = streamMap.connect(streamCollect) streamConnect.map(item=>println(item), item=>println(item)) env.execute("FirstJob") 複製代碼
ConnectedStreams → DataStream:做用於ConnectedStreams上,功能與map和flatMap同樣,對ConnectedStreams中的每個Stream分別進行map和flatMap處理。
val env = StreamExecutionEnvironment.getExecutionEnvironment val stream1 = env.readTextFile("test.txt") val streamFlatMap = stream1.flatMap(x => x.split(" ")) val stream2 = env.fromCollection(List(1,2,3,4)) val streamConnect = streamFlatMap.connect(stream2) val streamCoMap = streamConnect.map( (str) => str + "connect", (in) => in + 100 ) env.execute("FirstJob") 複製代碼
val env = StreamExecutionEnvironment.getExecutionEnvironment val stream1 = env.readTextFile("test.txt") val stream2 = env.readTextFile("test1.txt") val streamConnect = stream1.connect(stream2) val streamCoMap = streamConnect.flatMap( (str1) => str1.split(" "), (str2) => str2.split(" ") ) streamConnect.map(item=>println(item), item=>println(item)) env.execute("FirstJob") 複製代碼
DataStream → SplitStream:根據某些特徵把一個DataStream拆分紅兩個或者多個DataStream。
val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.readTextFile("test.txt") val streamFlatMap = stream.flatMap(x => x.split(" ")) val streamSplit = streamFlatMap.split( num => # 字符串內容爲hadoop的組成一個DataStream,其他的組成一個DataStream (num.equals("hadoop")) match{ case true => List("hadoop") case false => List("other") } ) env.execute("FirstJob") 複製代碼
SplitStream→DataStream:從一個SplitStream中獲取一個或者多個DataStream。
val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.readTextFile("test.txt") val streamFlatMap = stream.flatMap(x => x.split(" ")) val streamSplit = streamFlatMap.split( num => (num.equals("hadoop")) match{ case true => List("hadoop") case false => List("other") } ) val hadoop = streamSplit.select("hadoop") val other = streamSplit.select("other") hadoop.print() env.execute("FirstJob") 複製代碼
DataStream → DataStream:對兩個或者兩個以上的DataStream進行union操做,產生一個包含全部DataStream元素的新DataStream。注意:若是你將一個DataStream跟它本身作union操做,在新的DataStream中,你將看到每個元素都出現兩次。
val env = StreamExecutionEnvironment.getExecutionEnvironment val stream1 = env.readTextFile("test.txt") val streamFlatMap1 = stream1.flatMap(x => x.split(" ")) val stream2 = env.readTextFile("test1.txt") val streamFlatMap2 = stream2.flatMap(x => x.split(" ")) val streamConnect = streamFlatMap1.union(streamFlatMap2) env.execute("FirstJob") 複製代碼
DataStream → KeyedStream:輸入必須是Tuple類型,邏輯地將一個流拆分紅不相交的分區,每一個分區包含具備相同key的元素,在內部以hash的形式實現的。
val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.readTextFile("test.txt") val streamFlatMap = stream.flatMap{ x => x.split(" ") } val streamMap = streamFlatMap.map{ x => (x,1) } val streamKeyBy = streamMap.keyBy(0) env.execute("FirstJob") 複製代碼
KeyedStream → DataStream:一個分組數據流的聚合操做,合併當前的元素和上次聚合的結果,產生一個新的值,返回的流中包含每一次聚合的結果,而不是隻返回最後一次聚合的最終結果。
val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.readTextFile("test.txt").flatMap(item => item.split(" ")).map(item => (item, 1)).keyBy(0) val streamReduce = stream.reduce( (item1, item2) => (item1._1, item1._2 + item2._2) ) streamReduce.print() env.execute("FirstJob") 複製代碼
KeyedStream → DataStream:一個有初始值的分組數據流的滾動摺疊操做,合併當前元素和前一次摺疊操做的結果,併產生一個新的值,返回的流中包含每一次摺疊的結果,而不是隻返回最後一次摺疊的最終結果。
val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.readTextFile("test.txt").flatMap(item => item.split(" ")).map(item => (item, 1)).keyBy(0) val streamReduce = stream.fold(100)( (begin, item) => (begin + item._2) ) streamReduce.print() env.execute("FirstJob") 複製代碼
KeyedStream → DataStream:分組數據流上的滾動聚合操做。min和minBy的區別是min返回的是一個最小值,而minBy返回的是其字段中包含最小值的元素(一樣原理適用於max和maxBy),返回的流中包含每一次聚合的結果,而不是隻返回最後一次聚合的最終結果。
keyedStream.sum(0) keyedStream.sum("key") keyedStream.min(0) keyedStream.min("key") keyedStream.max(0) keyedStream.max("key") keyedStream.minBy(0) keyedStream.minBy("key") keyedStream.maxBy(0) keyedStream.maxBy("key") val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.readTextFile("test02.txt").map(item => (item.split(" ")(0), item.split(" ")(1).toLong)).keyBy(0) val streamReduce = stream.sum(1) streamReduce.print() env.execute("FirstJob") 複製代碼
在5.10以前的算子都是能夠直接做用在Stream上的,由於他們不是聚合類型的操做,可是到5.10後你會發現,咱們雖然能夠對一個無邊界的流數據直接應用聚合算子,可是它會記錄下每一次的聚合結果,這每每不是咱們想要的,其實,reduce、fold、aggregation這些聚合算子都是和Window配合使用的,只有配合Window,才能獲得想要的結果。