2.3 Apache Flink DataStream API

1. Flink 運行模型

圖 Flink查詢模型

以上爲Flink的運行模型,Flink的程序主要由三部分構成,分別爲Source、Transformation、Sink。DataSource主要負責數據的讀取,Transformation主要負責對屬於的轉換操做,Sink負責最終數據的輸出。bash

2. Flink 程序架構

每一個Flink程序都包含如下的若干流程:服務器

  • 得到一個執行環境;(Execution Environment)
  • 加載/建立初始數據;(Source)
  • 指定轉換這些數據;(Transformation)
  • 指定放置計算結果的位置;(Sink)
  • 觸發程序執行。

3. Environment

執行環境StreamExecutionEnvironment是全部Flink程序的基礎。markdown

建立執行環境有三種方式,分別爲:架構

StreamExecutionEnvironment.getExecutionEnvironment 
StreamExecutionEnvironment.createLocalEnvironment 
StreamExecutionEnvironment.createRemoteEnvironment
複製代碼

3.1 StreamExecutionEnvironment.getExecutionEnvironment

建立一個執行環境,表示當前執行程序的上下文。 若是程序是獨立調用的,則此方法返回本地執行環境;若是從命令行客戶端調用程序以提交到集羣,則此方法返回此集羣的執行環境,也就是說,getExecutionEnvironment會根據查詢運行的方式決定返回什麼樣的運行環境,是最經常使用的一種建立執行環境的方式。socket

val env = StreamExecutionEnvironment.getExecutionEnvironment
複製代碼

3.2 StreamExecutionEnvironment.createLocalEnvironment

返回本地執行環境,須要在調用時指定默認的並行度。oop

val env = StreamExecutionEnvironment.createLocalEnvironment(1)
複製代碼

3.3 StreamExecutionEnvironment.createRemoteEnvironment

返回集羣執行環境,將Jar提交到遠程服務器。須要在調用時指定JobManager的IP和端口號,並指定要在集羣中運行的Jar包。spa

val env = StreamExecutionEnvironment.createRemoteEnvironment(1)
複製代碼

4. Source

4.1 基於File的數據源

  • readTextFile(path)

一列一列的讀取遵循TextInputFormat規範的文本文件,並將結果做爲String返回。命令行

val env = StreamExecutionEnvironment.getExecutionEnvironment 
val stream = env.readTextFile("/opt/modules/test.txt") stream.print() 
env.execute("FirstJob")
複製代碼
  • readFile(fileInputFormat, path)

按照指定的文件格式讀取文件。線程

val env = StreamExecutionEnvironment.getExecutionEnvironment 
val path = new Path("/opt/modules/test.txt") 
val stream = env.readFile(new TextInputFormat(path), "/opt/modules/test.txt") 
stream.print() env.execute("FirstJob")
複製代碼

4.2 基於Socket的數據源

  • socketTextStream

從Socket中讀取信息,元素能夠用分隔符分開。3d

val env = StreamExecutionEnvironment.getExecutionEnvironment
 val stream = env.socketTextStream("localhost", 11111) 
stream.print() 
env.execute("FirstJob")
複製代碼

4.3 基於集合(Collection)的數據源

  • fromCollection(seq)

從集合中建立一個數據流,集合中全部元素的類型是一致的。

val env = StreamExecutionEnvironment.getExecutionEnvironment 
val list = List(1,2,3,4) 
val stream = env.fromCollection(list) 
stream.print() 
env.execute("FirstJob")
複製代碼
  • fromCollection(Iterator)

從迭代(Iterator)中建立一個數據流,指定元素數據類型的類由iterator返回。

val env = StreamExecutionEnvironment.getExecutionEnvironment 
val iterator = Iterator(1,2,3,4) 
val stream = env.fromCollection(iterator)
stream.print() 
env.execute("FirstJob")
複製代碼
  • fromElements(elements:_*)

從一個給定的對象序列中建立一個數據流,全部的對象必須是相同類型的。

val env = StreamExecutionEnvironment.getExecutionEnvironment 
val list = List(1,2,3,4) 
val stream = env.fromElements(list) 
stream.print() 
env.execute("FirstJob")
複製代碼
  • generateSequence(from, to)

從給定的間隔中並行地產生一個數字序列。

val env = StreamExecutionEnvironment.getExecutionEnvironment 
val stream = env.generateSequence(1,10) 
stream.print() 
env.execute("FirstJob")
複製代碼

4. Sink

Data Sink 消費DataStream中的數據,並將它們轉發到文件、套接字、外部系統或者打印出。

Flink有許多封裝在DataStream操做裏的內置輸出格式。

4.1 writeAsText

將元素以字符串形式逐行寫入(TextOutputFormat),這些字符串經過調用每一個元素的toString()方法來獲取。

4.2 WriteAsCsv

將元組以逗號分隔寫入文件中(CsvOutputFormat),行及字段之間的分隔是可配置的。每一個字段的值來自對象的toString()方法。

4.3 print/printToErr

打印每一個元素的toString()方法的值到標準輸出或者標準錯誤輸出流中。或者也能夠在輸出流中添加一個前綴,這個能夠幫助區分不一樣的打印調用,若是並行度大於1,那麼輸出也會有一個標識由哪一個任務產生的標誌。

4.4 writeUsingOutputFormat

自定義文件輸出的方法和基類(FileOutputFormat),支持自定義對象到字節的轉換。

4.5 writeToSocket

根據SerializationSchema 將元素寫入到socket中。

5. Transformaction

5.1 Map

DataStream → DataStream:輸入一個參數產生一個參數。

val env = StreamExecutionEnvironment.getExecutionEnvironment 
val stream = env.generateSequence(1,10) 
val streamMap = stream.map { x => x * 2 }
streamMap.print() env.execute("FirstJob")
複製代碼

注意:stream.print():每一行前面的數字表明這一行是哪個並行線程輸出的。

5.2 FlatMap

DataStream → DataStream:輸入一個參數,產生0個、1個或者多個輸出。

val env = StreamExecutionEnvironment.getExecutionEnvironment  
val stream = env.readTextFile("test.txt") 
val streamFlatMap = stream.flatMap{     x => x.split(" ") } 
streamFilter.print() 
env.execute("FirstJob")
複製代碼

5.3 Filter

DataStream → DataStream:結算每一個元素的布爾值,並返回布爾值爲true的元素。下面這個例子是過濾出非0的元素:

val env = StreamExecutionEnvironment.getExecutionEnvironment 
val stream = env.generateSequence(1,10) 
val streamFilter = stream.filter{     x => x == 1 } 
streamFilter.print() 
env.execute("FirstJob")
複製代碼

5.4 Connect

圖 Connect算子

DataStream,DataStream → ConnectedStreams:鏈接兩個保持他們類型的數據流,兩個數據流被Connect以後,只是被放在了一個同一個流中,內部依然保持各自的數據和形式不發生任何變化,兩個流相互獨立。

val env = StreamExecutionEnvironment.getExecutionEnvironment
 
val stream = env.readTextFile("test.txt")
 
val streamMap = stream.flatMap(item => item.split(" ")).filter(item => item.equals("hadoop"))
val streamCollect = env.fromCollection(List(1,2,3,4))
 
val streamConnect = streamMap.connect(streamCollect)
 
streamConnect.map(item=>println(item), item=>println(item))
 
env.execute("FirstJob")
複製代碼

5.5 CoMap,CoFlatMap

圖 CoMap/CoFlatMap

ConnectedStreams → DataStream:做用於ConnectedStreams上,功能與map和flatMap同樣,對ConnectedStreams中的每個Stream分別進行map和flatMap處理。

val env = StreamExecutionEnvironment.getExecutionEnvironment
 
val stream1 = env.readTextFile("test.txt")
val streamFlatMap = stream1.flatMap(x => x.split(" "))
val stream2 = env.fromCollection(List(1,2,3,4))
val streamConnect = streamFlatMap.connect(stream2)
val streamCoMap = streamConnect.map(
    (str) => str + "connect",
    (in) => in + 100
)
 
env.execute("FirstJob")
複製代碼
val env = StreamExecutionEnvironment.getExecutionEnvironment
 
val stream1 = env.readTextFile("test.txt")
val stream2 = env.readTextFile("test1.txt")
val streamConnect = stream1.connect(stream2)
val streamCoMap = streamConnect.flatMap(
    (str1) => str1.split(" "),
    (str2) => str2.split(" ")
)
streamConnect.map(item=>println(item), item=>println(item))
 
env.execute("FirstJob")
複製代碼

5.6 Spilt

圖 Split

DataStream → SplitStream:根據某些特徵把一個DataStream拆分紅兩個或者多個DataStream。

val env = StreamExecutionEnvironment.getExecutionEnvironment
 
val stream = env.readTextFile("test.txt")
val streamFlatMap = stream.flatMap(x => x.split(" "))
val streamSplit = streamFlatMap.split(
  num =>
# 字符串內容爲hadoop的組成一個DataStream,其他的組成一個DataStream
    (num.equals("hadoop")) match{
        case true => List("hadoop")
        case false => List("other")
    }
)
 
env.execute("FirstJob")
複製代碼

5.7 Select

圖 Select

SplitStream→DataStream:從一個SplitStream中獲取一個或者多個DataStream。

val env = StreamExecutionEnvironment.getExecutionEnvironment
 
val stream = env.readTextFile("test.txt")
val streamFlatMap = stream.flatMap(x => x.split(" "))
val streamSplit = streamFlatMap.split(
  num =>
    (num.equals("hadoop")) match{
        case true => List("hadoop")
        case false => List("other")
    }
)
 
val hadoop = streamSplit.select("hadoop")
val other = streamSplit.select("other")
hadoop.print()
 
env.execute("FirstJob")
複製代碼

5.8 Union

圖 Union

DataStream → DataStream:對兩個或者兩個以上的DataStream進行union操做,產生一個包含全部DataStream元素的新DataStream。注意:若是你將一個DataStream跟它本身作union操做,在新的DataStream中,你將看到每個元素都出現兩次。

val env = StreamExecutionEnvironment.getExecutionEnvironment
 
val stream1 = env.readTextFile("test.txt")
val streamFlatMap1 = stream1.flatMap(x => x.split(" "))
val stream2 = env.readTextFile("test1.txt")
val streamFlatMap2 = stream2.flatMap(x => x.split(" "))
val streamConnect = streamFlatMap1.union(streamFlatMap2)
 
env.execute("FirstJob")
複製代碼

5.9 KeyBy

DataStream → KeyedStream:輸入必須是Tuple類型,邏輯地將一個流拆分紅不相交的分區,每一個分區包含具備相同key的元素,在內部以hash的形式實現的。

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("test.txt")
val streamFlatMap = stream.flatMap{
    x => x.split(" ")
}
val streamMap = streamFlatMap.map{
    x => (x,1)
}
val streamKeyBy = streamMap.keyBy(0)
env.execute("FirstJob")
複製代碼

5.10 Reduce

KeyedStream → DataStream:一個分組數據流的聚合操做,合併當前的元素和上次聚合的結果,產生一個新的值,返回的流中包含每一次聚合的結果,而不是隻返回最後一次聚合的最終結果。

val env = StreamExecutionEnvironment.getExecutionEnvironment
 
val stream = env.readTextFile("test.txt").flatMap(item => item.split(" ")).map(item => (item, 1)).keyBy(0)
 
val streamReduce = stream.reduce(
  (item1, item2) => (item1._1, item1._2 + item2._2)
)
 
streamReduce.print()
 
env.execute("FirstJob")
複製代碼

5.11 Fold

KeyedStream → DataStream:一個有初始值的分組數據流的滾動摺疊操做,合併當前元素和前一次摺疊操做的結果,併產生一個新的值,返回的流中包含每一次摺疊的結果,而不是隻返回最後一次摺疊的最終結果。

val env = StreamExecutionEnvironment.getExecutionEnvironment
 
val stream = env.readTextFile("test.txt").flatMap(item => item.split(" ")).map(item => (item, 1)).keyBy(0)
 
val streamReduce = stream.fold(100)(
  (begin, item) => (begin + item._2)
)
 
streamReduce.print()
 
env.execute("FirstJob")
複製代碼

5.12 Aggregations

KeyedStream → DataStream:分組數據流上的滾動聚合操做。min和minBy的區別是min返回的是一個最小值,而minBy返回的是其字段中包含最小值的元素(一樣原理適用於max和maxBy),返回的流中包含每一次聚合的結果,而不是隻返回最後一次聚合的最終結果。

keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")
 
val env = StreamExecutionEnvironment.getExecutionEnvironment
 
val stream = env.readTextFile("test02.txt").map(item => (item.split(" ")(0), item.split(" ")(1).toLong)).keyBy(0)
 
val streamReduce = stream.sum(1)
 
streamReduce.print()
 
env.execute("FirstJob")
複製代碼

在5.10以前的算子都是能夠直接做用在Stream上的,由於他們不是聚合類型的操做,可是到5.10後你會發現,咱們雖然能夠對一個無邊界的流數據直接應用聚合算子,可是它會記錄下每一次的聚合結果,這每每不是咱們想要的,其實,reduce、fold、aggregation這些聚合算子都是和Window配合使用的,只有配合Window,才能獲得想要的結果。

相關文章
相關標籤/搜索