大數據課程之Flinkjava
Apache Flink是一個框架和分佈式處理引擎,用於對無界和有界數據流進行有狀態計算。Flink被設計在全部常見的集羣環境中運行,之內存執行速度和任意規模來執行計算。mysql
Flink起源於Stratosphere項目,Stratosphere是在2010~2014年由3所地處柏林的大學和歐洲的一些其餘的大學共同進行的研究項目,2014年4月Stratosphere的代碼被複制並捐贈給了Apache軟件基金會,參加這個孵化項目的初始成員是Stratosphere系統的核心開發人員,2014年12月,Flink一躍成爲Apache軟件基金會的頂級項目。linux
在德語中,Flink一詞表示快速和靈巧,項目採用一隻松鼠的彩色圖案做爲logo,這不只是由於松鼠具備快速和靈巧的特色,還由於柏林的松鼠有一種迷人的紅棕色,而Flink的松鼠logo擁有可愛的尾巴,尾巴的顏色與Apache軟件基金會的logo顏色相呼應,也就是說,這是一隻Apache風格的松鼠。web
圖 Flink Logoredis
Flink雖然誕生的早(2010年),可是實際上是起大早趕晚集,直到2015年纔開始忽然爆發熱度。sql
在Flink被apache提高爲頂級項目以後,阿里實時計算團隊決定在阿里內部創建一個 Flink 分支 Blink,並對 Flink 進行大量的修改和完善,讓其適應阿里巴巴這種超大規模的業務場景。數據庫
Blink由2016年上線,服務於阿里集團內部搜索、推薦、廣告和螞蟻等大量核心實時業務。與2019年1月Blink正式開源,目前阿里70%的技術部門都有使用該版本。apache
Blink比起Flink的優點就是對SQL語法的更完善的支持以及執行SQL的性能提高。編程
事件驅動型應用是一類具備狀態的應用,它從一個或多個事件流提取數據,並根據到來的事件觸發計算、狀態更新或其餘外部動做。比較典型的就是以kafka爲表明的消息隊列幾乎都是事件驅動型應用。json
與之不一樣的就是SparkStreaming微批次,如圖:
事件驅動型:
批處理的特色是有界、持久、大量,很是適合須要訪問全套記錄才能完成的計算工做,通常用於離線統計。
流處理的特色是無界、實時, 無需針對整個數據集執行操做,而是對經過系統傳輸的每一個數據項執行操做,通常用於實時統計。
在spark的世界觀中,一切都是由批次組成的,離線數據是一個大批次,而實時數據是由一個一個無限的小批次組成的。
而在flink的世界觀中,一切都是由流組成的,離線數據是有界限的流,實時數據是一個沒有界限的流,這就是所謂的有界流和無界流。
無界數據流:無界數據流有一個開始可是沒有結束,它們不會在生成時終止並提供數據,必須連續處理無界流,也就是說必須在獲取後當即處理event。對於無界數據流咱們沒法等待全部數據都到達,由於輸入是無界的,而且在任什麼時候間點都不會完成。處理無界數據一般要求以特定順序(例如事件發生的順序)獲取event,以便可以推斷結果完整性。
有界數據流:有界數據流有明肯定義的開始和結束,能夠在執行任何計算以前經過獲取全部數據來處理有界流,處理有界流不須要有序獲取,由於能夠始終對有界數據集進行排序,有界流的處理也稱爲批處理。
這種以流爲世界觀的架構,得到的最大好處就是具備極低的延遲。
最底層級的抽象僅僅提供了有狀態流,它將經過過程函數(Process Function)被嵌入到DataStream API中。底層過程函數(Process Function) 與 DataStream API 相集成,使其能夠對某些特定的操做進行底層的抽象,它容許用戶能夠自由地處理來自一個或多個數據流的事件,並使用一致的容錯的狀態。除此以外,用戶能夠註冊事件時間並處理時間回調,從而使程序能夠處理複雜的計算。
實際上,大多數應用並不須要上述的底層抽象,而是針對核心API(Core APIs) 進行編程,好比DataStream API(有界或無界流數據)以及DataSet API(有界數據集)。這些API爲數據處理提供了通用的構建模塊,好比由用戶定義的多種形式的轉換(transformations),鏈接(joins),聚合(aggregations),窗口操做(windows)等等。DataSet API 爲有界數據集提供了額外的支持,例如循環與迭代。這些API處理的數據類型以類(classes)的形式由各自的編程語言所表示。
Table API 是以表爲中心的聲明式編程,其中表可能會動態變化(在表達流數據時)。Table API遵循(擴展的)關係模型:表有二維數據結構(schema)(相似於關係數據庫中的表),同時API提供可比較的操做,例如select、project、join、group-by、aggregate等。Table API程序聲明式地定義了什麼邏輯操做應該執行,而不是準確地肯定這些操做代碼的看上去如何 。 儘管Table API能夠經過多種類型的用戶自定義函數(UDF)進行擴展,其仍不如核心API更具表達能力,可是使用起來卻更加簡潔(代碼量更少)。除此以外,Table API程序在執行以前會通過內置優化器進行優化。
你能夠在表與 DataStream/DataSet 之間無縫切換,以容許程序將 Table API 與 DataStream 以及 DataSet 混合使用。
Flink提供的最高層級的抽象是 SQL 。這一層抽象在語法與表達能力上與 Table API 相似,可是是以SQL查詢表達式的形式表現程序。SQL抽象與Table API交互密切,同時SQL查詢能夠直接在Table API定義的表上執行。
Flink在1.4版本中實現了狀態管理,所謂狀態管理就是在流失計算過程當中將算子的中間結果保存在內存或者文件系統中,等下一個事件進入算子後可讓當前事件的值與歷史值進行彙總累計。
在分佈式系統中,組成系統的各個計算機是獨立的。這些計算機有可能fail。
一個sender發送一條message到receiver。根據receiver出現fail時sender如何處理fail,能夠將message delivery分爲三種語義:
At Most once: 對於一條message,receiver最多收到一次(0次或1次).
能夠達成At Most Once的策略:
sender把message發送給receiver.不管receiver是否收到message,sender都再也不重發message.
At Least once: 對於一條message,receiver最少收到一次(1次及以上).
能夠達成At Least Once的策略:
sender把message發送給receiver.當receiver在規定時間內沒有回覆ACK或回覆了error信息,那麼sender重發這條message給receiver,直到sender收到receiver的ACK.
Exactly once: 對於一條message,receiver確保只收到一次
目前大多數框架時間窗口計算,都是採用當前系統時間,以時間爲單位進行的聚合計算只能反應數據到達計算引擎的時間,而並非實際業務時間
<?xml version="1.0" encoding="UTF-8"?>
<build>
|
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment} object WordCountBeach { def main(args: Array[String]): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val input = "F:\\input\\words.txt" val ds: DataSet[String] = env.readTextFile(input) import org.apache.flink.api.scala._ val aggDs = ds.flatMap(_.split(" ")).map((_, 1)).groupBy(0).sum(1) aggDs.print() } }
def main(args: Array[String]): Unit = {
|
注意:Flink程序支持java 和 scala兩種語言,本課程中以scala語言爲主。
在引入包中,有java和scala兩種包時注意要使用scala的包
import org.apache.flink.api.java.utils.ParameterTool
|
在linux系統中用
nc -lk 7777 |
進行發送測試
解壓縮 flink-1.7.0-bin-hadoop27-scala_2.11.tgz
修改 flink/conf/flink-conf.yaml 文件
注意:yaml文件必須是冒號+空格
修改 /conf/slave文件
.分發給 另外兩臺機子
啓動
1) 準備數據文件
2) 把含數據文件的文件夾,分發到taskmanage 機器中
因爲讀取數據是從本地磁盤讀取,實際任務會被分發到taskmanage的機器中,因此要把目標文件分發。
3) 執行程序
./flink run -c com.atguigu.flink.app.BatchWcApp /ext/flink0503-1.0-SNAPSHOT.jar --input /applog/flink/input.txt --output /applog/flink/output.csv |
4) 到目標文件夾中查看計算結果
注意:計算結果根據會保存到taskmanage的機器下,不會再jobmanage下。
5) 在webui控制檯查看計算過程
1) 啓動hadoop集羣
2) 啓動yarn-session
./yarn-session.sh -n 2 -s 2 -jm 1024 -tm 1024 -nm test -d |
其中:
-n(--container):TaskManager的數量。
-s(--slots): 每一個TaskManager的slot數量,默認一個slot一個core,默認每一個taskmanager的slot的個數爲1,有時能夠多一些taskmanager,作冗餘。
-jm:JobManager的內存(單位MB)。
-tm:每一個taskmanager的內存(單位MB)。
-nm:yarn 的appName(如今yarn的ui上的名字)。
-d:後臺執行。
3) 執行任務
./flink run -m yarn-cluster -c com.atguigu.flink.app.BatchWcApp /ext/flink0503-1.0-SNAPSHOT.jar --input /applog/flink/input.txt --output /applog/flink/output5.csv |
4) 去yarn控制檯查看任務狀態
圖 Yarn模式任務提交流程
Flink任務提交後,Client向HDFS上傳Flink的Jar包和配置,以後向Yarn ResourceManager提交任務,ResourceManager分配Container資源並通知對應的NodeManager啓動ApplicationMaster,ApplicationMaster啓動後加載Flink的Jar包和配置構建環境,而後啓動JobManager,以後ApplicationMaster向ResourceManager申請資源啓動TaskManager,ResourceManager分配Container資源後,由ApplicationMaster通知資源所在節點的NodeManager啓動TaskManager,NodeManager加載Flink的Jar包和配置構建環境並啓動TaskManager,TaskManager啓動後向JobManager發送心跳包,並等待JobManager向其分配任務。
圖 任務調度原理
客戶端不是運行時和程序執行的一部分,但它用於準備併發送dataflow(JobGraph)給Master(JobManager),而後,客戶端斷開鏈接或者維持鏈接以等待接收計算結果。
當 Flink 集羣啓動後,首先會啓動一個 JobManger 和一個或多個的 TaskManager。由 Client 提交任務給 JobManager,JobManager 再調度任務到各個 TaskManager 去執行,而後 TaskManager 將心跳和統計信息彙報給 JobManager。TaskManager 之間以流的形式進行數據的傳輸。上述三者均爲獨立的 JVM 進程。
Client 爲提交 Job 的客戶端,能夠是運行在任何機器上(與 JobManager 環境連通便可)。提交 Job 後,Client 能夠結束進程(Streaming的任務),也能夠不結束並等待結果返回。
JobManager 主要負責調度 Job 並協調 Task 作 checkpoint,職責上很像 Storm 的 Nimbus。從 Client 處接收到 Job 和 JAR 包等資源後,會生成優化後的執行計劃,並以 Task 的單元調度到各個 TaskManager 去執行。
TaskManager 在啓動的時候就設置好了槽位數(Slot),每一個 slot 能啓動一個 Task,Task 爲線程。從 JobManager 處接收須要部署的 Task,部署啓動後,與本身的上游創建 Netty 鏈接,接收數據並處理。
關於執行圖
Flink 中的執行圖能夠分紅四層:StreamGraph -> JobGraph -> ExecutionGraph -> 物理執行圖。
StreamGraph:是根據用戶經過 Stream API 編寫的代碼生成的最初的圖。用來表示程序的拓撲結構。
JobGraph:StreamGraph通過優化後生成了 JobGraph,提交給 JobManager 的數據結構。主要的優化爲,將多個符合條件的節點 chain 在一塊兒做爲一個節點,這樣能夠減小數據在節點之間流動所須要的序列化/反序列化/傳輸消耗。
ExecutionGraph:JobManager 根據 JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的並行化版本,是調度層最核心的數據結構。
物理執行圖:JobManager 根據 ExecutionGraph 對 Job 進行調度後,在各個TaskManager 上部署 Task 後造成的「圖」,並非一個具體的數據結構。
每個worker(TaskManager)是一個JVM進程,它可能會在獨立的線程上執行一個或多個subtask。爲了控制一個worker能接收多少個task,worker經過task slot來進行控制(一個worker至少有一個task slot)。·
每一個task slot表示TaskManager擁有資源的一個固定大小的子集。假如一個TaskManager有三個slot,那麼它會將其管理的內存分紅三份給各個slot。資源slot化意味着一個subtask將不須要跟來自其餘job的subtask競爭被管理的內存,取而代之的是它將擁有必定數量的內存儲備。須要注意的是,這裏不會涉及到CPU的隔離,slot目前僅僅用來隔離task的受管理的內存。
經過調整task slot的數量,容許用戶定義subtask之間如何互相隔離。若是一個TaskManager一個slot,那將意味着每一個task group運行在獨立的JVM中(該JVM多是經過一個特定的容器啓動的),而一個TaskManager多個slot意味着更多的subtask能夠共享同一個JVM。而在同一個JVM進程中的task將共享TCP鏈接(基於多路複用)和心跳消息。它們也可能共享數據集和數據結構,所以這減小了每一個task的負載。
圖 TaskManager與Slot
Task Slot是靜態的概念,是指TaskManager具備的併發執行能力,能夠經過參數taskmanager.numberOfTaskSlots進行配置,而並行度parallelism是動態概念,即TaskManager運行程序時實際使用的併發能力,能夠經過參數parallelism.default進行配置。
也就是說,假設一共有3個TaskManager,每個TaskManager中的分配3個TaskSlot,也就是每一個TaskManager能夠接收3個task,一共9個TaskSlot,若是咱們設置parallelism.default=1,即運行程序默認的並行度爲1,9個TaskSlot只用了1個,有8個空閒,所以,設置合適的並行度才能提升效率。
Flink程序的執行具備並行、分佈式的特性。在執行過程當中,一個 stream 包含一個或多個 stream partition ,而每個 operator 包含一個或多個 operator subtask,這些operator subtasks在不一樣的線程、不一樣的物理機或不一樣的容器中彼此互不依賴得執行。
一個特定operator的subtask的個數被稱之爲其parallelism(並行度)。一個stream的並行度老是等同於其producing operator的並行度。一個程序中,不一樣的operator可能具備不一樣的並行度。
圖 並行數據流
Stream在operator之間傳輸數據的形式能夠是one-to-one(forwarding)的模式也能夠是redistributing的模式,具體是哪種形式,取決於operator的種類。
One-to-one:stream(好比在source和map operator之間)維護着分區以及元素的順序。那意味着map operator的subtask看到的元素的個數以及順序跟source operator的subtask生產的元素的個數、順序相同,map、fliter、flatMap等算子都是one-to-one的對應關係。
Ø 相似於spark中的窄依賴
Redistributing:stream(map()跟keyBy/window之間或者keyBy/window跟sink之間)的分區會發生改變。每個operator subtask依據所選擇的transformation發送數據到不一樣的目標subtask。例如,keyBy() 基於hashCode重分區、broadcast和rebalance會隨機從新分區,這些算子都會引發redistribute過程,而redistribute過程就相似於Spark中的shuffle過程。
Ø 相似於spark中的寬依賴
相同並行度的one to one操做,Flink這樣相連的operator 連接在一塊兒造成一個task,原來的operator成爲裏面的subtask。將operators連接成task是很是有效的優化:它能減小線程之間的切換和基於緩存區的數據交換,在減小時延的同時提高吞吐量。連接的行爲能夠在編程API中進行指定。
圖 task與operator chains
OperatorChain的優勢
Ø 減小線程切換
Ø 減小序列化與反序列化
Ø 減小延遲而且提升吞吐能力
• OperatorChain 組成條件(重要)
Ø 上下游算子並行度一致
Ø 上下游算子之間沒有數據shuffle
建立一個執行環境,表示當前執行程序的上下文。 若是程序是獨立調用的,則此方法返回本地執行環境;若是從命令行客戶端調用程序以提交到集羣,則此方法返回此集羣的執行環境,也就是說,getExecutionEnvironment會根據查詢運行的方式決定返回什麼樣的運行環境,是最經常使用的一種建立執行環境的方式。
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
|
若是沒有設置並行度,會以flink-conf.yaml中的配置爲準,默認是1
返回本地執行環境,須要在調用時指定默認的並行度。
val env = StreamExecutionEnvironment.createLocalEnvironment(1) |
返回集羣執行環境,將Jar提交到遠程服務器。須要在調用時指定JobManager的IP和端口號,並指定要在集羣中運行的Jar包。
val env = ExecutionEnvironment.createRemoteEnvironment("jobmanager-hostname", 6123,"C://jar//flink//wordcount.jar") |
建立kafka工具類
object MyKafkaUtil {
|
增長業務主類 StartupApp
object StartupApp { def main(args: Array[String]): Unit = { } |
Flink+kafka是如何實現exactly-once語義的
Flink經過checkpoint來保存數據是否處理完成的狀態
由JobManager協調各個TaskManager進行checkpoint存儲,checkpoint保存在 StateBackend中,默認StateBackend是內存級的,也能夠改成文件級的進行持久化保存。
執行過程其實是一個兩段式提交,每一個算子執行完成,會進行「預提交」,直到執行完sink操做,會發起「確認提交」,若是執行失敗,預提交會放棄掉。
若是宕機須要經過StateBackend進行恢復,只能恢復全部確認提交的操做。
轉換算子
val streamMap = stream.map { x => x * 2 } |
val streamFlatMap = stream.flatMap{ x => x.split(" ") } |
val streamFilter = stream.filter{ x => x == 1 } |
DataStream → KeyedStream:輸入必須是Tuple類型,邏輯地將一個流拆分紅不相交的分區,每一個分區包含具備相同key的元素,在內部以hash的形式實現的。
KeyedStream → DataStream:一個分組數據流的聚合操做,合併當前的元素和上次聚合的結果,產生一個新的值,返回的流中包含每一次聚合的結果,而不是隻返回最後一次聚合的最終結果。
//求各個渠道的累計個數
|
flink是如何保存累計值的,
flink是一種有狀態的流計算框架,其中說的狀態包括兩個層面:
1) operator state 主要是保存數據在流程中的處理狀態,用於確保語義的exactly-once。
2) keyed state 主要是保存數據在計算過程當中的累計值。
這兩種狀態都是經過checkpoint機制保存在StateBackend中,StateBackend能夠選擇保存在內存中(默認使用)或者保存在磁盤文件中。
Split
圖 Split
DataStream → SplitStream:根據某些特徵把一個DataStream拆分紅兩個或者多個DataStream。
Select
圖 Select
SplitStream→DataStream:從一個SplitStream中獲取一個或者多個DataStream。
需求:把appstore和其餘的渠道的數據單獨拆分出來,作成兩個流
// 將appstore與其餘渠道拆分拆分出來 成爲兩個獨立的流
|
圖 Connect算子
DataStream,DataStream → ConnectedStreams:鏈接兩個保持他們類型的數據流,兩個數據流被Connect以後,只是被放在了一個同一個流中,內部依然保持各自的數據和形式不發生任何變化,兩個流相互獨立。
CoMap,CoFlatMap
圖 CoMap/CoFlatMap
ConnectedStreams → DataStream:做用於ConnectedStreams上,功能與map和flatMap同樣,對ConnectedStreams中的每個Stream分別進行map和flatMap處理。
//合併之後打印
|
圖 Union
DataStream → DataStream:對兩個或者兩個以上的DataStream進行union操做,產生一個包含全部DataStream元素的新DataStream。注意:若是你將一個DataStream跟它本身作union操做,在新的DataStream中,你將看到每個元素都出現兩次。
//合併之後打印
|
Connect與 Union 區別:
1 、 Union以前兩個流的類型必須是同樣,Connect能夠不同,在以後的coMap中再去調整成爲同樣的。
2 Connect只能操做兩個流,Union能夠操做多個
Flink沒有相似於spark中foreach方法,讓用戶進行迭代的操做。雖有對外的輸出操做都要利用Sink完成。最後經過相似以下方式完成整個任務最終輸出操做。
myDstream.addSink(new MySink(xxxx)) |
官方提供了一部分的框架的sink。除此之外,須要用戶自定義實現sink。
pom.xml
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 -->
|
mykafkaUtil中增長方法
def getProducer(topic:String): FlinkKafkaProducer011[String] ={
|
主函數中添加sink
val myKafkaProducer: FlinkKafkaProducer011[String] = MyKafkaUtil.getProducer("channel_sum")
sumDstream.map( chCount=>chCount._1+":"+chCount._2 ).addSink(myKafkaProducer)
|
pom.xml
<!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis -->
|
object MyRedisUtil {
|
在主函數中調用
sumDstream.map( chCount=>(chCount._1,chCount._2+"" )).addSink(MyRedisUtil.getRedisSink())
|
pom.xml
<dependency>
|
添加MyEsUtil
import java.util
|
在main方法中調用
// 明細發送到es 中
|
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
|
添加MyJdbcSink
class MyJdbcSink(sql:String ) extends RichSinkFunction[Array[Any]] { //反覆調用
|
在main方法中增長
把明細保存到mysql中
val startUplogDstream: DataStream[StartUpLog] = dstream.map{ JSON.parseObject(_,classOf[StartUpLog])}
|
在Flink的流式處理中,會涉及到時間的不一樣概念,以下圖所示:
圖 Flink時間概念
Event Time:是事件建立的時間。它一般由事件中的時間戳描述,例如採集的日誌數據中,每一條日誌都會記錄本身的生成時間,Flink經過時間戳分配器訪問事件時間戳。
Ingestion Time:是數據進入Flink的時間。
Processing Time:是每個執行基於時間操做的算子的本地系統時間,與機器相關,默認的時間屬性就是Processing Time。
例如,一條日誌進入Flink的時間爲2017-11-12 10:00:00.123,到達Window的系統時間爲2017-11-12 10:00:01.234,日誌的內容以下:
2017-11-02 18:37:15.624 INFO Fail over to rm2
對於業務來講,要統計1min內的故障日誌個數,哪一個時間是最有意義的?—— eventTime,由於咱們要根據日誌的生成時間進行統計。
streaming流式計算是一種被設計用於處理無限數據集的數據處理引擎,而無限數據集是指一種不斷增加的本質上無限的數據集,而window是一種切割無限數據爲有限塊進行處理的手段。
Window是無限數據流處理的核心,Window將一個無限的stream拆分紅有限大小的「buckets」桶,咱們能夠在這些桶上作計算操做。
Window能夠分紅兩類:
l CountWindow:按照指定的數據條數生成一個Window,與時間無關。
l TimeWindow:按照時間生成Window。
對於TimeWindow,能夠根據窗口實現原理的不一樣分紅三類:滾動窗口(Tumbling Window)、滑動窗口(Sliding Window)和會話窗口(Session Window)。
1. 滾動窗口(Tumbling Windows)
將數據依據固定的窗口長度對數據進行切片。
特色:時間對齊,窗口長度固定,沒有重疊。
滾動窗口分配器將每一個元素分配到一個指定窗口大小的窗口中,滾動窗口有一個固定的大小,而且不會出現重疊。例如:若是你指定了一個5分鐘大小的滾動窗口,窗口的建立以下圖所示:
圖 滾動窗口
適用場景:適合作BI統計等(作每一個時間段的聚合計算)。
2. 滑動窗口(Sliding Windows)
滑動窗口是固定窗口的更廣義的一種形式,滑動窗口由固定的窗口長度和滑動間隔組成。
特色:時間對齊,窗口長度固定,有重疊。
滑動窗口分配器將元素分配到固定長度的窗口中,與滾動窗口相似,窗口的大小由窗口大小參數來配置,另外一個窗口滑動參數控制滑動窗口開始的頻率。所以,滑動窗口若是滑動參數小於窗口大小的話,窗口是能夠重疊的,在這種狀況下元素會被分配到多個窗口中。
例如,你有10分鐘的窗口和5分鐘的滑動,那麼每一個窗口中5分鐘的窗口裏包含着上個10分鐘產生的數據,以下圖所示:
圖 滑動窗口
適用場景:對最近一個時間段內的統計(求某接口最近5min的失敗率來決定是否要報警)。
3. 會話窗口(Session Windows)
由一系列事件組合一個指定時間長度的timeout間隙組成,相似於web應用的session,也就是一段時間沒有接收到新數據就會生成新的窗口。
特色:時間無對齊。
session窗口分配器經過session活動來對元素進行分組,session窗口跟滾動窗口和滑動窗口相比,不會有重疊和固定的開始時間和結束時間的狀況,相反,當它在一個固定的時間週期內再也不收到元素,即非活動間隔產生,那個這個窗口就會關閉。一個session窗口經過一個session間隔來配置,這個session間隔定義了非活躍週期的長度,當這個非活躍週期產生,那麼當前的session將關閉而且後續的元素將被分配到新的session窗口中去。
圖 會話窗口
TimeWindow是將指定時間範圍內的全部數據組成一個window,一次對一個window裏面的全部數據進行計算。
1. 滾動窗口
Flink默認的時間窗口根據Processing Time 進行窗口的劃分,將Flink獲取到的數據根據進入Flink的時間劃分到不一樣的窗口中。
val keyedStream: KeyedStream[(String, Int), Tuple] = startUplogDstream.map(startuplog=>(startuplog.ch,1)).keyBy(0)
|
時間間隔能夠經過Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一個來指定。
2. 滑動窗口(SlidingEventTimeWindows)
滑動窗口和滾動窗口的函數名是徹底一致的,只是在傳參數時須要傳入兩個參數,一個是window_size,一個是sliding_size。
下面代碼中的sliding_size設置爲了2s,也就是說,窗口每2s就計算一次,每一次計算的window範圍是5s內的全部元素。
val keyedStream: KeyedStream[(String, Int), Tuple] = startUplogDstream.map(startuplog=>(startuplog.ch,1)).keyBy(0)
|
時間間隔能夠經過Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一個來指定。
CountWindow根據窗口中相同key元素的數量來觸發執行,執行時只計算元素數量達到窗口大小的key對應的結果。
注意:CountWindow的window_size指的是相同Key的元素的個數,不是輸入的全部元素的總數。
1 滾動窗口
默認的CountWindow是一個滾動窗口,只須要指定窗口大小便可,當元素數量達到窗口大小時,就會觸發窗口的執行。
val keyedStream: KeyedStream[(String, Int), Tuple] = startUplogDstream.map(startuplog=>(startuplog.ch,1)).keyBy(0)
|
2 滑動窗口
滑動窗口和滾動窗口的函數名是徹底一致的,只是在傳參數時須要傳入兩個參數,一個是window_size,一個是sliding_size。
下面代碼中的sliding_size設置爲了2,也就是說,每收到兩個相同key的數據就計算一次,每一次計算的window範圍是5個元素。
val keyedStream: KeyedStream[(String, Int), Tuple] = startUplogDstream.map(startuplog=>(startuplog.ch,1)).keyBy(0)
|
在Flink的流式處理中,絕大部分的業務都會使用eventTime,通常只在eventTime沒法使用時,纔會被迫使用ProcessingTime或者IngestionTime。
若是要使用EventTime,那麼須要引入EventTime的時間屬性,引入方式以下所示:
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 從調用時刻開始給env建立的每個stream追加時間特徵
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
咱們知道,流處理從事件產生,到流經source,再到operator,中間是有一個過程和時間的,雖然大部分狀況下,流到operator的數據都是按照事件產生的時間順序來的,可是也不排除因爲網絡、分佈式等緣由,致使亂序的產生,所謂亂序,就是指Flink接收到的事件的前後順序不是嚴格按照事件的Event Time順序排列的。
圖 數據的亂序
那麼此時出現一個問題,一旦出現亂序,若是隻根據eventTime決定window的運行,咱們不能明確數據是否所有到位,但又不能無限期的等下去,此時必需要有個機制來保證一個特定的時間後,必須觸發window去進行計算了,這個特別的機制,就是Watermark。
Watermark是一種衡量Event Time進展的機制,它是數據自己的一個隱藏屬性,數據自己攜帶着對應的Watermark。
Watermark是用於處理亂序事件的,而正確的處理亂序事件,一般用Watermark機制結合window來實現。
數據流中的Watermark用於表示timestamp小於Watermark的數據,都已經到達了,所以,window的執行也是由Watermark觸發的。
Watermark能夠理解成一個延遲觸發機制,咱們能夠設置Watermark的延時時長t,每次系統會校驗已經到達的數據中最大的maxEventTime,而後認定eventTime小於maxEventTime - t的全部數據都已經到達,若是有窗口的中止時間等於maxEventTime – t,那麼這個窗口被觸發執行。
有序流的Watermarker以下圖所示:(Watermark設置爲0)
圖 有序數據的Watermark
亂序流的Watermarker以下圖所示:(Watermark設置爲2)
圖 無序數據的Watermark
當Flink接收到每一條數據時,都會產生一條Watermark,這條Watermark就等於當前全部到達數據中的maxEventTime - 延遲時長,也就是說,Watermark是由數據攜帶的,一旦數據攜帶的Watermark比當前未觸發的窗口的中止時間要晚,那麼就會觸發相應窗口的執行。因爲Watermark是由數據攜帶的,所以,若是運行過程當中沒法獲取新的數據,那麼沒有被觸發的窗口將永遠都不被觸發。
上圖中,咱們設置的容許最大延遲到達時間爲2s,因此時間戳爲7s的事件對應的Watermark是5s,時間戳爲12s的事件的Watermark是10s,若是咱們的窗口1是1s~5s,窗口2是6s~10s,那麼時間戳爲7s的事件到達時的Watermarker剛好觸發窗口1,時間戳爲12s的事件到達時的Watermark剛好觸發窗口2。
Watermark 就是觸發前一窗口的「關窗時間」,一旦觸發關門那麼以當前時刻爲準在窗口範圍內的全部全部數據都會收入窗中。
只要沒有達到水位那麼無論現實中的時間推動了多久都不會觸發關窗。
val textWithEventTimeDstream: DataStream[(String, Long, Int)] = textWithTsDstream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.milliseconds(1000)) {
|
def main(args: Array[String]): Unit = {
|
結果是按照Event Time的時間窗口計算得出的,而無關係統的時間(包括輸入的快慢)。
def main(args: Array[String]): Unit = {
|
相鄰兩次數據的EventTime的時間差超過指定的時間間隔就會觸發執行。若是加入Watermark, 會在符合窗口觸發的狀況下進行延遲。到達延遲水位再進行窗口觸發。
def main(args: Array[String]): Unit = {
|
Table API是流處理和批處理通用的關係型API,Table API能夠基於流輸入或者批輸入來運行而不須要進行任何修改。Table API是SQL語言的超集並專門爲Apache Flink設計的,Table API是Scala 和Java語言集成式的API。與常規SQL語言中將查詢指定爲字符串不一樣,Table API查詢是以Java或Scala中的語言嵌入樣式來定義的,具備IDE支持如:自動完成和語法檢測。
<dependency>
|
def main(args: Array[String]): Unit = {
|
若是流中的數據類型是case class能夠直接根據case class的結構生成table
tableEnv.fromDataStream(startupLogDstream) |
或者根據字段順序單獨命名
tableEnv.fromDataStream(startupLogDstream,’mid,’uid .......) |
最後的動態表能夠轉換爲流進行輸出
table.toAppendStream[(String,String)] |
用一個單引放到字段前面 來標識字段名, 如 ‘name , ‘mid ,’amount 等
//每10秒中渠道爲appstore的個數
|
一、 若是使用 groupby table轉換爲流的時候只能用toRetractDstream
val rDstream: DataStream[(Boolean, (String, Long))] = table.toRetractStream[(String,Long)] |
二、 toRetractDstream 獲得的第一個boolean型字段標識 true就是最新的數據,false表示過時老數據
val rDstream: DataStream[(Boolean, (String, Long))] = table.toRetractStream[(String,Long)] rDstream.filter(_._1).print() |
三、 若是使用的api包括時間窗口,那麼時間的字段必須,包含在group by中。
val table: Table = startupLogTable.filter("ch ='appstore'").window(Tumble over 10000.millis on 'ts as 'tt).groupBy('ch ,'tt).select("ch,ch.count ") |
1 用到時間窗口,必須提早聲明時間字段,若是是processTime直接在建立動態表時進行追加就能夠
val startupLogTable: Table = tableEnv.fromDataStream(startupLogWithEtDstream,'mid,'uid,'appid,'area,'os,'ch,'logType,'vs,'logDate,'logHour,'logHourMinute,'ts.rowtime) |
2 若是是EventTime要在建立動態表時聲明
val startupLogTable: Table = tableEnv.fromDataStream(startupLogWithEtDstream,'mid,'uid,'appid,'area,'os,'ch,'logType,'vs,'logDate,'logHour,'logHourMinute,'ps.processtime)
|
3 滾動窗口可使用Tumble over 10000.millis on
val table: Table = startupLogTable.filter("ch ='appstore'").window(Tumble over 10000.millis on 'ts as 'tt).groupBy('ch ,'tt).select("ch,ch.count ") |
def main(args: Array[String]): Unit = { |