Flink主頁在其頂部展現了該項目的理念:「Apache Flink是爲分佈式、高性能、隨時可用以及準確的流處理應用程序打造的開源流處理框架」。java
Apache Flink是一個框架和分佈式處理引擎,用於對無界和有界數據流進行有狀態計算。Flink被設計在全部常見的集羣環境中運行,以內存執行速度和任意規模來執行計算。mysql
① 事件驅動型(Event-driven)linux
事件驅動型應用是一類具有狀態的應用,它從一個或多個事件流提取數據,並根據到來的事件觸發計算、狀態更新或其餘外部動做。比較典型的就是以kafka爲表明的消息隊列幾乎都是事件驅動型應用。 事件驅動型以下圖:web
與之不一樣的就是SparkStreaming微批次如圖:redis
② 分層APIsql
最底層級的抽象僅僅提供了有狀態流,它將經過過程函數(Process Function)被嵌入到DataStream API中。底層過程函數(Process Function) 與 DataStream API 相集成,使其能夠對某些特定的操做進行底層的抽象,它容許用戶能夠自由地處理來自一個或多個數據流的事件,並使用一致的容錯的狀態。除此以外,用戶能夠註冊事件時間並處理時間回調,從而使程序能夠處理複雜的計算。數據庫
大多數應用並不須要上述的底層抽象,而是針對核心API(Core APIs) 進行編程,好比DataStream API(有界或無界流數據)以及DataSet API(有界數據集)。這些API爲數據處理提供了通用的構建模塊,好比由用戶定義的多種形式的轉換(transformations),鏈接(joins),聚合(aggregations),窗口操做(windows)等等。apache
DataSet API 爲有界數據集提供了額外的支持,例如循環與迭代。這些API處理的數據類型以類(classes)的形式由各自的編程語言所表示。編程
Table API 是以表爲中心的聲明式編程,其中表可能會動態變化(在表達流數據時)。Table API遵循(擴展的)關係模型:表有二維數據結構(schema)(相似於關係數據庫中的表),同時API提供可比較的操做,例如select、project、join、group-by、aggregate等。Table API程序聲明式地定義了什麼邏輯操做應該執行,而不是準確地肯定這些操做代碼的看上去如何 。 儘管Table API能夠經過多種類型的用戶自定義函數(UDF)進行擴展,其仍不如核心API更具表達能力,可是使用起來卻更加簡潔(代碼量更少)。除此以外,Table API程序在執行以前會通過內置優化器進行優化。json
你能夠在表與 DataStream/DataSet 之間無縫切換,以容許程序將 Table API 與 DataStream 以及 DataSet 混合使用。
Flink提供的最高層級的抽象是 SQL 。這一層抽象在語法與表達能力上與 Table API 相似,可是是以SQL查詢表達式的形式表現程序。SQL抽象與Table API交互密切,同時SQL查詢能夠直接在Table API定義的表上執行。
③ 流與批的世界觀
批處理的特色是有界、持久、大量,很是適合須要訪問全套記錄才能完成的計算工做,通常用於離線統計。
流處理的特色是無界、實時, 無需針對整個數據集執行操做,而是對經過系統傳輸的每一個數據項執行操做,通常用於實時統計。
在spark的世界觀中,一切都是由批次組成的,離線數據是一個大批次,而實時數據是由一個一個無限的小批次組成的。
而在flink的世界觀中,一切都是由流組成的,離線數據是有界限的流,實時數據是一個沒有界限的流,這就是所謂的有界流和無界流。
無界數據流:無界數據流有一個開始可是沒有結束,它們不會在生成時終止並提供數據,必須連續處理無界流,也就是說必須在獲取後當即處理event。對於無界數據流咱們沒法等待全部數據都到達,由於輸入是無界的,而且在任什麼時候間點都不會完成。處理無界數據一般要求以特定順序(例如事件發生的順序)獲取event,以便可以推斷結果完整性。
有界數據流:有界數據流有明肯定義的開始和結束,能夠在執行任何計算以前經過獲取全部數據來處理有界流,處理有界流不須要有序獲取,由於能夠始終對有界數據集進行排序,有界流的處理也稱爲批處理。
這種以流爲世界觀的架構,得到的最大好處就是具備極低的延遲。
Flink+kafka是如何實現exactly-once語義的
Flink經過checkpoint來保存數據是否處理完成的狀態;
有JobManager協調各個TaskManager進行checkpoint存儲,checkpoint保存在 StateBackend中,默認StateBackend是內存級的,也能夠改成文件級的進行持久化保存。
執行過程其實是一個兩段式提交,每一個算子執行完成,會進行「預提交」,直到執行完sink操做,會發起「確認提交」,若是執行失敗,預提交會放棄掉。
若是宕機須要經過StateBackend進行恢復,只能恢復全部確認提交的操做。
Spark中要想實現有狀態的,須要使用updateBykey或者藉助redis;
而Fink是把它記錄在State Bachend,只要是通過keyBy等處理以後結果會記錄在State Bachend(已處理未提交; 若是是處理完了就是已提交狀態;),
它還會記錄另一種狀態值:keyState,好比keyBy累積的結果;
StateBachend若是不想存儲在內存中,也能夠存儲在fs文件中或者HDFS中; IDEA的工具只支持memory內存式存儲,一旦重啓就沒了;部署到linux中就支持存儲在文件中了;
Kakfa的自動提交:「enable.auto.commit」,好比從kafka出來後到sparkStreaming以後,一進來consumer會幫你自動提交,若是在處理過程當中,到最後有一個沒有寫出去(好比寫到redis、ES),雖然處理失敗了但kafka的偏移量已經發生改變;因此移偏移量的時機很重要;
object StartupApp { def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val myKafkaConsumer: FlinkKafkaConsumer011[String] = MyKafkaUtil.getConsumer("GMALL_STARTUP") val dstream: DataStream[String] = env.addSource(myKafkaConsumer) //dstream.print().setParallelism(1) 測試從kafka中得到數據是否打通到了flink中 //將json轉換成json對象 val startupLogDStream: DataStream[StartupLog] = dstream.map { jsonString => JSON.parseObject(jsonString, classOf[StartupLog]) } //需求一 相同渠道的值進行累加 val sumDStream: DataStream[(String, Int)] = startupLogDStream.map { startuplog => (startuplog.ch, 1) }.keyBy(0)
.reduce { (startuplogCount1, startuplogCount2) => val newCount: Int = startuplogCount1._2 + startuplogCount2._2 (startuplogCount1._1, newCount) } //val sumDStream: DataStream[(String, Int)] = startupLogDStream.map{startuplog => (startuplog.ch,1)}.keyBy(0).sum(1) //sumDStream.print() env.execute() } }
DataStream → KeyedStream:輸入必須是Tuple類型,邏輯地將一個流拆分紅不相交的分區,每一個分區包含具備相同key的元素,在內部以hash的形式實現的。
KeyedStream → DataStream:一個分組數據流的聚合操做,合併當前的元素和上次聚合的結果,產生一個新的值,返回的流中包含每一次聚合的結果,而不是隻返回最後一次聚合的最終結果。
Split
DataStream → SplitStream:根據某些特徵把一個DataStream拆分紅兩個或者多個DataStream。
select
SplitStream→DataStream:從一個SplitStream中獲取一個或者多個DataStream。
//需求二 把 appstore 和其餘的渠道的數據 分紅兩個流 val splitableStream: SplitStream[StartupLog] = startupLogDStream.split { startuplog => var flagList: List[String] = List() if (startuplog.ch.equals("appstore")) { flagList = List("apple") } else { flagList = List("other") } flagList } val appleStream: DataStream[StartupLog] = splitableStream.select("apple") //appleStream.print("this is apple").setParallelism(1) val otherdStream: DataStream[StartupLog] = splitableStream.select("other") //otherdStream.print("this is other").setParallelism(1)
DataStream,DataStream → ConnectedStreams:鏈接兩個保持他們類型的數據流,兩個數據流被Connect以後,只是被放在了一個同一個流中,內部依然保持各自的數據和形式不發生任何變化,兩個流相互獨立。
ConnectedStreams → DataStream:做用於ConnectedStreams上,功能與map和flatMap同樣,對ConnectedStreams中的每個Stream分別進行map和flatMap處理。
//需求三 把上面兩個流合併爲一個 val connStream: ConnectedStreams[StartupLog, StartupLog] = appleStream.connect(otherdStream) val allDataStream: DataStream[String] = connStream.map((startuplog1: StartupLog) => startuplog1.ch, (startuplog2: StartupLog) => startuplog2.ch) allDataStream.print("all").setParallelism(1)
DataStream → DataStream:對兩個或者兩個以上的DataStream進行union操做,產生一個包含全部DataStream元素的新DataStream。注意:若是你將一個DataStream跟它本身作union操做,在新的DataStream中,你將看到每個元素都出現兩次。
//合併流union val unionDStream: DataStream[StartupLog] = appleStream.union(otherdStream) unionDStream.print("union").setParallelism(1)
Connect與 Union 區別:
1 、 Union以前兩個流的類型必須是同樣,Connect能夠不同,在以後的coMap中再去調整成爲同樣的。
2 Connect只能操做兩個流,Union能夠操做多個
Flink沒有相似於spark中foreach方法,讓用戶進行迭代的操做。雖有對外的輸出操做都要利用Sink完成。最後經過相似以下方式完成整個任務最終輸出操做。
myDstream.addSink(new MySink(xxxx))
官方提供了一部分的框架的sink。除此之外,須要用戶自定義實現sink。
Kafka
object MyKafkaUtil { val prop = new Properties() prop.setProperty("bootstrap.servers","hadoop101:9092") prop.setProperty("group.id","gmall") def getConsumer(topic:String ):FlinkKafkaConsumer011[String]= { val myKafkaConsumer:FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String](topic, new SimpleStringSchema(), prop) myKafkaConsumer } def getProducer(topic:String):FlinkKafkaProducer011[String]={ new FlinkKafkaProducer011[String]("hadoop101:9092",topic,new SimpleStringSchema()) } } //sink到kafka unionDStream.map(_.toString).addSink(MyKafkaUtil.getProducer("gmall_union")) ///opt/module/kafka/bin/kafka-console-consumer.sh --zookeeper hadoop101:2181 --topic gmall_union
Redis
import org.apache.flink.streaming.connectors.redis.RedisSink import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper} object MyRedisUtil { private val config: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder().setHost("hadoop101").setPort(6379).build() def getRedisSink(): RedisSink[(String, String)] = { new RedisSink[(String, String)](config, new MyRedisMapper) } } class MyRedisMapper extends RedisMapper[(String, String)]{ //用何種命令進行保存 override def getCommandDescription: RedisCommandDescription = { new RedisCommandDescription(RedisCommand.HSET, "channel_sum") //hset類型, apple, 111 } //流中的元素哪部分是value override def getKeyFromData(channel_sum: (String, String)): String = channel_sum._2 //流中的元素哪部分是key override def getValueFromData(channel_sum: (String, String)): String = channel_sum._1 } object StartupApp { def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val myKafkaConsumer: FlinkKafkaConsumer011[String] = MyKafkaUtil.getConsumer("GMALL_STARTUP") val dstream: DataStream[String] = env.addSource(myKafkaConsumer) //dstream.print().setParallelism(1) 測試從kafka中得到數據是否打通到了flink中 //將json轉換成json對象 val startupLogDStream: DataStream[StartupLog] = dstream.map { jsonString => JSON.parseObject(jsonString, classOf[StartupLog]) } //sink到redis //把按渠道的統計值保存到redis中 hash key: channel_sum field ch value: count //按照不一樣渠道進行累加 val chCountDStream: DataStream[(String, Int)] = startupLogDStream.map(startuplog => (startuplog.ch, 1)).keyBy(0).sum(1) //把上述結果String, Int轉換成String, String類型 val channelDStream: DataStream[(String, String)] = chCountDStream.map(chCount => (chCount._1, chCount._2.toString)) channelDStream.addSink(MyRedisUtil.getRedisSink())
ES
object MyEsUtil { val hostList: util.List[HttpHost] = new util.ArrayList[HttpHost]() hostList.add(new HttpHost("hadoop101", 9200, "http")) hostList.add(new HttpHost("hadoop102", 9200, "http")) hostList.add(new HttpHost("hadoop103", 9200, "http")) def getEsSink(indexName: String): ElasticsearchSink[String] = { //new接口---> 要實現一個方法 val esSinkFunc: ElasticsearchSinkFunction[String] = new ElasticsearchSinkFunction[String] { override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = { val jSONObject: JSONObject = JSON.parseObject(element) val indexRequest: IndexRequest = Requests.indexRequest().index(indexName).`type`("_doc").source(jSONObject) indexer.add(indexRequest) } } val esSinkBuilder = new ElasticsearchSink.Builder[String](hostList, esSinkFunc) esSinkBuilder.setBulkFlushMaxActions(10) val esSink: ElasticsearchSink[String] = esSinkBuilder.build() esSink } } object StartupApp { def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val myKafkaConsumer: FlinkKafkaConsumer011[String] = MyKafkaUtil.getConsumer("GMALL_STARTUP") val dstream: DataStream[String] = env.addSource(myKafkaConsumer) //sink之三 保存到ES val esSink: ElasticsearchSink[String] = MyEsUtil.getEsSink("gmall_startup") dstream.addSink(esSink) //dstream來自kafka的數據源 GET gmall_startup/_search
Mysql
class MyjdbcSink(sql: String) extends RichSinkFunction[Array[Any]] { val driver = "com.mysql.jdbc.Driver" val url = "jdbc:mysql://hadoop101:3306/gmall?useSSL=false" val username = "root" val password = "123456" val maxActive = "20" var connection: Connection = null // 建立鏈接 override def open(parameters: Configuration) { val properties = new Properties() properties.put("driverClassName",driver) properties.put("url",url) properties.put("username",username) properties.put("password",password) properties.put("maxActive",maxActive) val dataSource: DataSource = DruidDataSourceFactory.createDataSource(properties) connection = dataSource.getConnection() } // 把每一個Array[Any] 做爲數據庫表的一行記錄進行保存 override def invoke(values: Array[Any]): Unit = { val ps: PreparedStatement = connection.prepareStatement(sql) for (i <- 0 to values.length-1) { ps.setObject(i+1, values(i)) } ps.executeUpdate() } override def close(): Unit = { if (connection != null){ connection.close() } } } object StartupApp { def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val myKafkaConsumer: FlinkKafkaConsumer011[String] = MyKafkaUtil.getConsumer("GMALL_STARTUP") val dstream: DataStream[String] = env.addSource(myKafkaConsumer) //dstream.print().setParallelism(1) 測試從kafka中得到數據是否打通到了flink中 //將json轉換成json對象 val startupLogDStream: DataStream[StartupLog] = dstream.map { jsonString => JSON.parseObject(jsonString, classOf[StartupLog]) } //sink之四 保存到Mysql中 startupLogDStream.map(startuplog => Array(startuplog.mid, startuplog.uid, startuplog.ch, startuplog.area,startuplog.ts)) .addSink(new MyjdbcSink("insert into fink_startup values(?,?,?,?,?)")) env.execute() } }
在Flink的流式處理中,會涉及到時間的不一樣概念,以下圖所示
Event Time:是事件建立的時間。它一般由事件中的時間戳描述,例如採集的日誌數據中,每一條日誌都會記錄本身的生成時間,Flink經過時間戳分配器訪問事件時間戳。
Ingestion Time:是數據進入Flink的時間。
Processing Time:是每個執行基於時間操做的算子的本地系統時間,與機器相關,默認的時間屬性就是Processing Time。
例如,一條日誌進入Flink的時間爲2017-11-12 10:00:00.123,到達Window的系統時間爲2017-11-12 10:00:01.234,日誌的內容以下:
2017-11-02 18:37:15.624 INFO Fail over to rm2
對於業務來講,要統計1min內的故障日誌個數,哪一個時間是最有意義的?—— eventTime,由於咱們要根據日誌的生成時間進行統計。
咱們知道,流處理從事件產生,到流經source,再到operator,中間是有一個過程和時間的,雖然大部分狀況下,流到operator的數據都是按照事件產生的時間順序來的,可是也不排除因爲網絡、分佈式等緣由,致使亂序的產生,所謂亂序,就是指Flink接收到的事件的前後順序不是嚴格按照事件的Event Time順序排列的。
那麼此時出現一個問題,一旦出現亂序,若是隻根據eventTime決定window的運行,咱們不能明確數據是否所有到位,但又不能無限期的等下去,此時必需要有個機制來保證一個特定的時間後,必須觸發window去進行計算了,這個特別的機制,就是Watermark。
Watermark是一種衡量Event Time進展的機制,它是數據自己的一個隱藏屬性,數據自己攜帶着對應的Watermark。
Watermark是用於處理亂序事件的,而正確的處理亂序事件,一般用Watermark機制結合window來實現。
數據流中的Watermark用於表示timestamp小於Watermark的數據,都已經到達了,所以,window的執行也是由Watermark觸發的。
Watermark能夠理解成一個延遲觸發機制,咱們能夠設置Watermark的延時時長t,每次系統會校驗已經到達的數據中最大的maxEventTime,而後認定eventTime小於maxEventTime - t的全部數據都已經到達,若是有窗口的中止時間等於maxEventTime – t,那麼這個窗口被觸發執行。
streaming流式計算是一種被設計用於處理無限數據集的數據處理引擎,而無限數據集是指一種不斷增加的本質上無限的數據集,而window是一種切割無限數據爲有限塊進行處理的手段。
Window是無限數據流處理的核心,Window將一個無限的stream拆分紅有限大小的」buckets」桶,咱們能夠在這些桶上作計算操做。
Window能夠分紅兩類:
CountWindow:按照指定的數據條數生成一個Window,與時間無關。
TimeWindow:按照時間生成Window。
,能夠根據窗口實現原理的不一樣分紅三類:滾動窗口(Tumbling Window)、滑動窗口(Sliding Window)和會話窗口(Session Window)。
將數據依據固定的窗口長度對數據進行切片。
特色:時間對齊,窗口長度固定,沒有重疊。
滾動窗口分配器將每一個元素分配到一個指定窗口大小的窗口中,滾動窗口有一個固定的大小,而且不會出現重疊。例如:若是你指定了一個5分鐘大小的滾動窗口,窗口的建立以下圖所示:
適用場景:適合作BI統計等(作每一個時間段的聚合計算)。
import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream} import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.api.scala._ object StreamEventTimeApp { def main(args: Array[String]): Unit = { //環境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //聲明使用eventTime;引入EventTime 從調用時刻開始給env建立的每個stream追加時間特徵 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val dstream: DataStream[String] = env.socketTextStream("hadoop101", 7777) val textWithTsDStream: DataStream[(String, Long, Int)] = dstream.map { text => val arr: Array[String] = text.split(" ") (arr(0), arr(1).toLong, 1) } // 1 告知 flink如何獲取數據中的event時間戳 2 告知延遲的watermark val textWithEventTimeDStream: DataStream[(String, Long, Int)] = textWithTsDStream.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.milliseconds(3000)) { //time別導錯包了 override def extractTimestamp(element: (String, Long, Int)): Long = { return element._2 } }).setParallelism(1) //每5秒開一個窗口 統計key的個數 5秒是一個數據的時間戳爲準 val textKeyStream: KeyedStream[(String, Long, Int), Tuple] = textWithEventTimeDStream.keyBy(0) textKeyStream.print("textKey: ") //滾動窗口 val windowStream: WindowedStream[(String, Long, Int), Tuple, TimeWindow] = textKeyStream.window(TumblingEventTimeWindows.of(Time.milliseconds(5000))) windowStream.sum(2).print("windows: ").setParallelism(1) env.execute() } [kris@hadoop101 gmall]$ nc -lk 7777 abc 1000 abc 3000 abc 4000 abc 5000 abc 6000 abc 7999 abc 8000 abc 9999 abc 10000 abc 12000 abc 13000 textKey: :8> (abc,1000,1) textKey: :8> (abc,3000,1) textKey: :8> (abc,4000,1) textKey: :8> (abc,5000,1) textKey: :8> (abc,6000,1) textKey: :8> (abc,7999,1) windows: > (abc,1000,3) textKey: :8> (abc,8000,1) textKey: :8> (abc,9999,1) textKey: :8> (abc,10000,1) textKey: :8> (abc,12000,1) textKey: :8> (abc,13000,1) windows: > (abc,5000,5)
滾動窗口:
X秒開一個窗口
watermark 3s
發車時間看:X+3,車上攜帶的[X, nX)秒內的
如5s發一次車:[0, 5),發車時間8s、[5, 10)發車時間13s、[10, 15)發車時間18s
滑動窗口是固定窗口的更廣義的一種形式,滑動窗口由固定的窗口長度和滑動間隔組成。
特色:時間對齊,窗口長度固定,有重疊。
滑動窗口分配器將元素分配到固定長度的窗口中,與滾動窗口相似,窗口的大小由窗口大小參數來配置,另外一個窗口滑動參數控制滑動窗口開始的頻率。所以,滑動窗口若是滑動參數小於窗口大小的話,窗口是能夠重疊的,在這種狀況下元素會被分配到多個窗口中。
例如,你有10分鐘的窗口和5分鐘的滑動,那麼每一個窗口中5分鐘的窗口裏包含着上個10分鐘產生的數據,以下圖所示:
適用場景:對最近一個時間段內的統計(求某接口最近5min的失敗率來決定是否要報警)
object StreamEventTimeApp { def main(args: Array[String]): Unit = { //環境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //聲明使用eventTime env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val dstream: DataStream[String] = env.socketTextStream("hadoop101", 7777) val textWithTsDStream: DataStream[(String, Long, Int)] = dstream.map { text => val arr: Array[String] = text.split(" ") (arr(0), arr(1).toLong, 1) } // 1 告知 flink如何獲取數據中的event時間戳 2 告知延遲的watermark val textWithEventTimeDStream: DataStream[(String, Long, Int)] = textWithTsDStream.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.milliseconds(3000)) { //time別導錯包了 override def extractTimestamp(element: (String, Long, Int)): Long = { return element._2 } }).setParallelism(1) //每5秒開一個窗口 統計key的個數 5秒是一個數據的時間戳爲準 val textKeyStream: KeyedStream[(String, Long, Int), Tuple] = textWithEventTimeDStream.keyBy(0) textKeyStream.print("textKey: ") //滾動窗口 //val windowDStream: WindowedStream[(String, Long, Int), Tuple, TimeWindow] = textKeyStream.window(TumblingEventTimeWindows.of(Time.milliseconds(5000))) //滑動窗口 val windowStream: WindowedStream[(String, Long, Int), Tuple, TimeWindow] = textKeyStream.window(SlidingEventTimeWindows.of(Time.milliseconds(5000L), Time.milliseconds(1000L))) windowStream.sum(2).print("windows: ").setParallelism(1) env.execute() } } [kris@hadoop101 gmall]$ nc -lk 7777 aaa 100 aaa 500 aaa 1000 aaa 3000 aaa 3999 abc 100 abc 1000 abc 3998 abc 3999 abc 5000 abc 8000 abc 10000 textKey: :8> (abc,100,1) textKey: :8> (abc,1000,1) textKey: :8> (abc,3998,1) textKey: :8> (abc,3999,1) //窗口大小0-4999;前面這些都是在4999窗口如下的範圍內,可是開車的時機是在步長+watermark=4000,但開車的時候只有100這一個在裏邊;步長爲1 windows: > (abc,100,1) textKey: :8> (abc,5000,1) //開車取決於時間間隔步長1s, 每隔1s發一次;第二次發車是在2s的時候,延遲3s,即5s的時候發車,但這個時候車裏就只有100和1000兩個; windows: > (abc,100,2) textKey: :8> (abc,8000,1) //一車接5s的人;8000--5000--4000--3000--(這個時候它倆已經開車走了,不要了)-2000-1000 windows: > (abc,100,2) //3000那輛車 windows: > (abc,100,4) //走的是4000那輛車--100、1000、399八、3999 windows: > (abc,100,4)//5000,走的仍是100、1000、399八、3999這四個,5000應該是在下一個窗口大小的範圍; textKey: :8> (abc,10000,1) //10000-3000 ==> 7000s,到5000是走完第一個窗口大小, 6000走一輛(5999--1000);7000(發車的6999-2000) windows: > (abc,1000,4) //6000: 1000/3998/3999/5000/ windows: > (abc,3998,3) //7000: 3998/3999/5000
由一系列事件組合一個指定時間長度的timeout間隙組成,相似於web應用的session,也就是一段時間沒有接收到新數據就會生成新的窗口。
特色:時間無對齊。
session窗口分配器經過session活動來對元素進行分組,session窗口跟滾動窗口和滑動窗口相比,不會有重疊和固定的開始時間和結束時間的狀況,相反,當它在一個固定的時間週期內再也不收到元素,即非活動間隔產生,那個這個窗口就會關閉。一個session窗口經過一個session間隔來配置,這個session間隔定義了非活躍週期的長度,當這個非活躍週期產生,那麼當前的session將關閉而且後續的元素將被分配到新的session窗口中去。
object StreamEventTimeApp { def main(args: Array[String]): Unit = { //環境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //聲明使用eventTime env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val dstream: DataStream[String] = env.socketTextStream("hadoop101", 7777) val textWithTsDStream: DataStream[(String, Long, Int)] = dstream.map { text => val arr: Array[String] = text.split(" ") (arr(0), arr(1).toLong, 1) } // 1 告知 flink如何獲取數據中的event時間戳 2 告知延遲的watermark val textWithEventTimeDStream: DataStream[(String, Long, Int)] = textWithTsDStream.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.milliseconds(3000)) { //time別導錯包了 override def extractTimestamp(element: (String, Long, Int)): Long = { return element._2 } }).setParallelism(1) //每5秒開一個窗口 統計key的個數 5秒是一個數據的時間戳爲準 val textKeyStream: KeyedStream[(String, Long, Int), Tuple] = textWithEventTimeDStream.keyBy(0) textKeyStream.print("textKey: ") //滾動窗口 //val windowDStream: WindowedStream[(String, Long, Int), Tuple, TimeWindow] = textKeyStream.window(TumblingEventTimeWindows.of(Time.milliseconds(5000))) //滑動窗口 //val windowStream: WindowedStream[(String, Long, Int), Tuple, TimeWindow] = textKeyStream.window(SlidingEventTimeWindows.of(Time.milliseconds(5000L), Time.milliseconds(1000L))) //會話窗口 val windowStream: WindowedStream[(String, Long, Int), Tuple, TimeWindow] = textKeyStream.window(EventTimeSessionWindows.withGap(Time.milliseconds(5000L))) windowStream.sum(2).print("windows: ").setParallelism(1) env.execute() } } 只能兩次時間的間隔是否知足條件 在觸發水位5s的基礎上再加延遲3s, [kris@hadoop101 gmall]$ nc -lk 7777 abc 1000 abc 7000 abc 10000 =======>>> textKey: :8> (abc,1000,1) textKey: :8> (abc,7000,1) textKey: :8> (abc,10000,1) //在上一個基礎上+延遲時間3s纔會開車 windows: > (abc,1000,1) [kris@hadoop101 gmall]$ nc -lk 7777 aaa 1000 aaa 2000 aaa 7001 aaa 9000 aaa 10000 =====>> textKey: :5> (aaa,1000,1) textKey: :5> (aaa,2000,1) textKey: :5> (aaa,7001,1) //兩個時間點之間相差達到鴻溝5s了,在這個基礎之上再加3s才能開車; textKey: :5> (aaa,9000,1) textKey: :5> (aaa,10000,1) windows: > (aaa,1000,2)
CountWindow根據窗口中相同key元素的數量來觸發執行,執行時只計算元素數量達到窗口大小的key對應的結果。
注意:CountWindow的window_size指的是相同Key的元素的個數,不是輸入的全部元素的總數。
滾動窗口
默認的CountWindow是一個滾動窗口,只須要指定窗口大小便可,當元素數量達到窗口大小時,就會觸發窗口的執行。
滑動窗口
滑動窗口和滾動窗口的函數名是徹底一致的,只是在傳參數時須要傳入兩個參數,一個是window_size,一個是sliding_size。
下面代碼中的sliding_size設置爲了2,也就是說,每收到兩個相同key的數據就計算一次,每一次計算的window範圍是5個元素。