Fink

Fink 

         

 

Flink主頁在其頂部展現了該項目的理念:「Apache Flink是爲分佈式、高性能、隨時可用以及準確的流處理應用程序打造的開源流處理框架」。java

Apache Flink是一個框架和分佈式處理引擎,用於對無界和有界數據流進行有狀態計算。Flink被設計在全部常見的集羣環境中運行,以內存執行速度和任意規模來執行計算。mysql

                               

 

1.  Flink的重要特色 

① 事件驅動型(Event-driven)linux

事件驅動型應用是一類具有狀態的應用,它從一個或多個事件流提取數據,並根據到來的事件觸發計算、狀態更新或其餘外部動做。比較典型的就是以kafka爲表明的消息隊列幾乎都是事件驅動型應用。   事件驅動型以下圖:web

                                                     

與之不一樣的就是SparkStreaming微批次如圖:redis

                                                   

 

② 分層APIsql

                                                               

  最底層級的抽象僅僅提供了有狀態流,它將經過過程函數(Process Function)被嵌入到DataStream API中。底層過程函數(Process Function) 與 DataStream API 相集成,使其能夠對某些特定的操做進行底層的抽象,它容許用戶能夠自由地處理來自一個或多個數據流的事件,並使用一致的容錯的狀態。除此以外,用戶能夠註冊事件時間並處理時間回調,從而使程序能夠處理複雜的計算。數據庫

大多數應用並不須要上述的底層抽象,而是針對核心API(Core APIs) 進行編程,好比DataStream API(有界或無界流數據)以及DataSet API(有界數據集)。這些API爲數據處理提供了通用的構建模塊,好比由用戶定義的多種形式的轉換(transformations),鏈接(joins),聚合(aggregations),窗口操做(windows)等等。apache

DataSet API 爲有界數據集提供了額外的支持,例如循環與迭代。這些API處理的數據類型以類(classes)的形式由各自的編程語言所表示。編程

Table API 是以表爲中心的聲明式編程,其中表可能會動態變化(在表達流數據時)。Table API遵循(擴展的)關係模型:表有二維數據結構(schema)(相似於關係數據庫中的表),同時API提供可比較的操做,例如select、project、join、group-by、aggregate等。Table API程序聲明式地定義了什麼邏輯操做應該執行,而不是準確地肯定這些操做代碼的看上去如何 。 儘管Table API能夠經過多種類型的用戶自定義函數(UDF)進行擴展,其仍不如核心API更具表達能力,可是使用起來卻更加簡潔(代碼量更少)。除此以外,Table API程序在執行以前會通過內置優化器進行優化。json

你能夠在表與 DataStream/DataSet 之間無縫切換,以容許程序將 Table API 與 DataStream 以及 DataSet 混合使用

Flink提供的最高層級的抽象是 SQL 。這一層抽象在語法與表達能力上與 Table API 相似,可是是以SQL查詢表達式的形式表現程序。SQL抽象與Table API交互密切,同時SQL查詢能夠直接在Table API定義的表上執行。

③ 流與批的世界觀

       批處理的特色有界、持久、大量,很是適合須要訪問全套記錄才能完成的計算工做,通常用於離線統計。

  流處理的特色無界、實時,  無需針對整個數據集執行操做,而是對經過系統傳輸的每一個數據項執行操做,通常用於實時統計。

   在spark的世界觀中,一切都是由次組成的,離線數據是一個大批次,而實時數據是由一個一個無限的小批次組成的。

   而在flink的世界觀中,一切都是由組成的,離線數據是有界限的流,實時數據是一個沒有界限的流,這就是所謂的有界流和無界流。

無界數據流無界數據流有一個開始可是沒有結束,它們不會在生成時終止並提供數據,必須連續處理無界流,也就是說必須在獲取後當即處理event。對於無界數據流咱們沒法等待全部數據都到達,由於輸入是無界的,而且在任什麼時候間點都不會完成。處理無界數據一般要求以特定順序(例如事件發生的順序)獲取event,以便可以推斷結果完整性。

有界數據流有界數據流有明肯定義的開始和結束,能夠在執行任何計算以前經過獲取全部數據來處理有界流,處理有界流不須要有序獲取,由於能夠始終對有界數據集進行排序,有界流的處理也稱爲批處理。 

    這種以流爲世界觀的架構,得到的最大好處就是具備極低的延遲。

2. Flink 批處理Api

  2.1 Source

Flink+kafka是如何實現exactly-once語義的

  Flink經過checkpoint來保存數據是否處理完成的狀態;

  有JobManager協調各個TaskManager進行checkpoint存儲,checkpoint保存在 StateBackend中,默認StateBackend是內存級的,也能夠改成文件級的進行持久化保存。

執行過程其實是一個兩段式提交,每一個算子執行完成,會進行「預提交」,直到執行完sink操做,會發起「確認提交」,若是執行失敗,預提交會放棄掉。

若是宕機須要經過StateBackend進行恢復,只能恢復全部確認提交的操做。

                                                            

 Spark中要想實現有狀態的,須要使用updateBykey或者藉助redis;

而Fink是把它記錄在State Bachend,只要是通過keyBy等處理以後結果會記錄在State Bachend(已處理未提交; 若是是處理完了就是已提交狀態;),

它還會記錄另一種狀態值:keyState,好比keyBy累積的結果;

StateBachend若是不想存儲在內存中,也能夠存儲在fs文件中或者HDFS中; IDEA的工具只支持memory內存式存儲,一旦重啓就沒了;部署到linux中就支持存儲在文件中了;

Kakfa的自動提交:「enable.auto.commit」,好比從kafka出來後到sparkStreaming以後,一進來consumer會幫你自動提交,若是在處理過程當中,到最後有一個沒有寫出去(好比寫到redis、ES),雖然處理失敗了但kafka的偏移量已經發生改變;因此移偏移量的時機很重要;

Transform  轉換算子

map

object StartupApp {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val myKafkaConsumer: FlinkKafkaConsumer011[String] = MyKafkaUtil.getConsumer("GMALL_STARTUP")
    val dstream: DataStream[String] = env.addSource(myKafkaConsumer)

    //dstream.print().setParallelism(1) 測試從kafka中得到數據是否打通到了flink中

    //將json轉換成json對象
    val startupLogDStream: DataStream[StartupLog] = dstream.map { jsonString =>
      JSON.parseObject(jsonString, classOf[StartupLog])
    }

    //需求一 相同渠道的值進行累加
    val sumDStream: DataStream[(String, Int)] = startupLogDStream.map { startuplog => (startuplog.ch, 1) }.keyBy(0)
    .reduce { (startuplogCount1, startuplogCount2) => val newCount: Int = startuplogCount1._2 + startuplogCount2._2 (startuplogCount1._1, newCount) } //val sumDStream: DataStream[(String, Int)] = startupLogDStream.map{startuplog => (startuplog.ch,1)}.keyBy(0).sum(1) //sumDStream.print() env.execute() } }

  flatMap  

  Filter

 KeyBy

DataStream → KeyedStream:輸入必須是Tuple類型,邏輯地將一個流拆分紅不相交的分區,每一個分區包含具備相同key的元素,在內部以hash的形式實現的。

 Reduce

KeyedStream → DataStream:一個分組數據流的聚合操做,合併當前的元素和上次聚合的結果,產生一個新的值,返回的流中包含每一次聚合的結果,而不是隻返回最後一次聚合的最終結果。

Split 和 Select

Split

DataStream → SplitStream:根據某些特徵把一個DataStream拆分紅兩個或者多個DataStream。

select

SplitStream→DataStream從一個SplitStream中獲取一個或者多個DataStream。

    //需求二   把 appstore   和其餘的渠道的數據 分紅兩個流
    val splitableStream: SplitStream[StartupLog] = startupLogDStream.split { startuplog =>
      var flagList: List[String] = List()
      if (startuplog.ch.equals("appstore")) {
        flagList = List("apple")
      } else {
        flagList = List("other")
      }
      flagList
    }
    val appleStream: DataStream[StartupLog] = splitableStream.select("apple")
    //appleStream.print("this is apple").setParallelism(1)
    val otherdStream: DataStream[StartupLog] = splitableStream.select("other")
    //otherdStream.print("this is other").setParallelism(1)

 Connect和 CoMap

DataStream,DataStream → ConnectedStreams:鏈接兩個保持他們類型的數據流,兩個數據流被Connect以後,只是被放在了一個同一個流中,內部依然保持各自的數據和形式不發生任何變化,兩個流相互獨立。

ConnectedStreams → DataStream:做用於ConnectedStreams上,功能與map和flatMap同樣,對ConnectedStreams中的每個Stream分別進行map和flatMap處理。

 //需求三 把上面兩個流合併爲一個
    val connStream: ConnectedStreams[StartupLog, StartupLog] = appleStream.connect(otherdStream)
    val allDataStream: DataStream[String] = connStream.map((startuplog1: StartupLog) => startuplog1.ch, (startuplog2: StartupLog) => startuplog2.ch)
    allDataStream.print("all").setParallelism(1)

Union

                                             

DataStream → DataStream:對兩個或者兩個以上的DataStream進行union操做,產生一個包含全部DataStream元素的新DataStream。注意:若是你將一個DataStream跟它本身作union操做,在新的DataStream中,你將看到每個元素都出現兩次。

    //合併流union
    val unionDStream: DataStream[StartupLog] = appleStream.union(otherdStream)
    unionDStream.print("union").setParallelism(1)

Connect與 Union 區別:

1 、 Union以前兩個流的類型必須是同樣,Connect能夠不同,在以後的coMap中再去調整成爲同樣的。

2    Connect只能操做兩個流,Union能夠操做多個

 

 

 2.2 Sink

Flink沒有相似於spark中foreach方法,讓用戶進行迭代的操做。雖有對外的輸出操做都要利用Sink完成。最後經過相似以下方式完成整個任務最終輸出操做。

   myDstream.addSink(new MySink(xxxx)) 

官方提供了一部分的框架的sink。除此之外,須要用戶自定義實現sink。  

                       

 

 Kafka

object MyKafkaUtil {
  val prop = new Properties()
  prop.setProperty("bootstrap.servers","hadoop101:9092")
  prop.setProperty("group.id","gmall")

  def getConsumer(topic:String ):FlinkKafkaConsumer011[String]= {
    val myKafkaConsumer:FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String](topic, new SimpleStringSchema(), prop)
    myKafkaConsumer
  }

  def getProducer(topic:String):FlinkKafkaProducer011[String]={
    new FlinkKafkaProducer011[String]("hadoop101:9092",topic,new SimpleStringSchema())
  }

}


//sink到kafka
    unionDStream.map(_.toString).addSink(MyKafkaUtil.getProducer("gmall_union"))
    ///opt/module/kafka/bin/kafka-console-consumer.sh --zookeeper hadoop101:2181 --topic gmall_union

Redis

import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

object MyRedisUtil {
  private val config: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder().setHost("hadoop101").setPort(6379).build()

  def getRedisSink(): RedisSink[(String, String)] = {
    new RedisSink[(String, String)](config, new MyRedisMapper)
  }

}
class MyRedisMapper extends RedisMapper[(String, String)]{
  //用何種命令進行保存
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.HSET, "channel_sum") //hset類型, apple, 111
  }
  //流中的元素哪部分是value
  override def getKeyFromData(channel_sum: (String, String)): String = channel_sum._2
  //流中的元素哪部分是key
  override def getValueFromData(channel_sum: (String, String)): String = channel_sum._1

}


object StartupApp {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val myKafkaConsumer: FlinkKafkaConsumer011[String] = MyKafkaUtil.getConsumer("GMALL_STARTUP")
    val dstream: DataStream[String] = env.addSource(myKafkaConsumer)

    //dstream.print().setParallelism(1) 測試從kafka中得到數據是否打通到了flink中

    //將json轉換成json對象
    val startupLogDStream: DataStream[StartupLog] = dstream.map { jsonString =>
      JSON.parseObject(jsonString, classOf[StartupLog])
    }

//sink到redis
    //把按渠道的統計值保存到redis中  hash   key: channel_sum  field ch  value: count
    //按照不一樣渠道進行累加
    val chCountDStream: DataStream[(String, Int)] = startupLogDStream.map(startuplog => (startuplog.ch, 1)).keyBy(0).sum(1)
    //把上述結果String, Int轉換成String, String類型
    val channelDStream: DataStream[(String, String)] = chCountDStream.map(chCount => (chCount._1, chCount._2.toString))
    channelDStream.addSink(MyRedisUtil.getRedisSink())

ES

object MyEsUtil {
  val hostList: util.List[HttpHost] = new util.ArrayList[HttpHost]()
  hostList.add(new HttpHost("hadoop101", 9200, "http"))
  hostList.add(new HttpHost("hadoop102", 9200, "http"))
  hostList.add(new HttpHost("hadoop103", 9200, "http"))

  def getEsSink(indexName: String): ElasticsearchSink[String] = {
    //new接口---> 要實現一個方法
    val esSinkFunc: ElasticsearchSinkFunction[String] = new ElasticsearchSinkFunction[String] {
      override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
        val jSONObject: JSONObject = JSON.parseObject(element)
        val indexRequest: IndexRequest = Requests.indexRequest().index(indexName).`type`("_doc").source(jSONObject)
        indexer.add(indexRequest)
      }
    }
    val esSinkBuilder = new ElasticsearchSink.Builder[String](hostList, esSinkFunc)
    esSinkBuilder.setBulkFlushMaxActions(10)
    val esSink: ElasticsearchSink[String] = esSinkBuilder.build()
    esSink
  }
}



object StartupApp {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val myKafkaConsumer: FlinkKafkaConsumer011[String] = MyKafkaUtil.getConsumer("GMALL_STARTUP")
    val dstream: DataStream[String] = env.addSource(myKafkaConsumer)
//sink之三  保存到ES
    val esSink: ElasticsearchSink[String] = MyEsUtil.getEsSink("gmall_startup")
    dstream.addSink(esSink) //dstream來自kafka的數據源
GET gmall_startup/_search

Mysql

class MyjdbcSink(sql: String) extends RichSinkFunction[Array[Any]] {
  val driver = "com.mysql.jdbc.Driver"
  val url = "jdbc:mysql://hadoop101:3306/gmall?useSSL=false"
  val username = "root"
  val password = "123456"
  val maxActive = "20"
  var connection: Connection = null

  // 建立鏈接
  override def open(parameters: Configuration) {
    val properties = new Properties()
    properties.put("driverClassName",driver)
    properties.put("url",url)
    properties.put("username",username)
    properties.put("password",password)
    properties.put("maxActive",maxActive)

    val dataSource: DataSource = DruidDataSourceFactory.createDataSource(properties)
    connection = dataSource.getConnection()
  }

  // 把每一個Array[Any] 做爲數據庫表的一行記錄進行保存

  override def invoke(values: Array[Any]): Unit = {
    val ps: PreparedStatement = connection.prepareStatement(sql)
    for (i <- 0 to values.length-1) {
      ps.setObject(i+1, values(i))
    }
    ps.executeUpdate()
  }


  override def close(): Unit = {
    if (connection != null){
      connection.close()
    }
  }
}


object StartupApp {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val myKafkaConsumer: FlinkKafkaConsumer011[String] = MyKafkaUtil.getConsumer("GMALL_STARTUP")
    val dstream: DataStream[String] = env.addSource(myKafkaConsumer)

    //dstream.print().setParallelism(1) 測試從kafka中得到數據是否打通到了flink中

    //將json轉換成json對象
    val startupLogDStream: DataStream[StartupLog] = dstream.map { jsonString =>
      JSON.parseObject(jsonString, classOf[StartupLog])
    }

   //sink之四 保存到Mysql中
    startupLogDStream.map(startuplog => Array(startuplog.mid, startuplog.uid, startuplog.ch, startuplog.area,startuplog.ts))
      .addSink(new MyjdbcSink("insert into fink_startup values(?,?,?,?,?)"))

    env.execute()

  }
}

 Time與Window

在Flink的流式處理中,會涉及到時間的不一樣概念,以下圖所示

                             

Event Time:是事件建立的時間。它一般由事件中的時間戳描述,例如採集的日誌數據中,每一條日誌都會記錄本身的生成時間,Flink經過時間戳分配器訪問事件時間戳。

Ingestion Time:是數據進入Flink的時間。

Processing Time:是每個執行基於時間操做的算子的本地系統時間,與機器相關,默認的時間屬性就是Processing Time。

例如,一條日誌進入Flink的時間爲2017-11-12 10:00:00.123,到達Window的系統時間爲2017-11-12 10:00:01.234,日誌的內容以下:

2017-11-02 18:37:15.624 INFO Fail over to rm2

對於業務來講,要統計1min內的故障日誌個數,哪一個時間是最有意義的?—— eventTime,由於咱們要根據日誌的生成時間進行統計。

 

 Watermark

  咱們知道,流處理從事件產生,到流經source,再到operator,中間是有一個過程和時間的,雖然大部分狀況下,流到operator的數據都是按照事件產生的時間順序來的,可是也不排除因爲網絡、分佈式等緣由,致使亂序的產生,所謂亂序,就是指Flink接收到的事件的前後順序不是嚴格按照事件的Event Time順序排列的

  那麼此時出現一個問題,一旦出現亂序,若是隻根據eventTime決定window的運行,咱們不能明確數據是否所有到位,但又不能無限期的等下去,此時必需要有個機制來保證一個特定的時間後,必須觸發window去進行計算了,這個特別的機制,就是Watermark。

  Watermark是一種衡量Event Time進展的機制,它是數據自己的一個隱藏屬性,數據自己攜帶着對應的Watermark。

  Watermark是用於處理亂序事件的,而正確的處理亂序事件,一般用Watermark機制結合window來實現

數據流中的Watermark用於表示timestamp小於Watermark的數據,都已經到達了,所以,window的執行也是由Watermark觸發的

Watermark能夠理解成一個延遲觸發機制,咱們能夠設置Watermark的延時時長t,每次系統會校驗已經到達的數據中最大的maxEventTime,而後認定eventTime小於maxEventTime - t的全部數據都已經到達,若是有窗口的中止時間等於maxEventTime – t,那麼這個窗口被觸發執行

                         

 

Window概述

streaming流式計算是一種被設計用於處理無限數據集的數據處理引擎,而無限數據集是指一種不斷增加的本質上無限的數據集,而window是一種切割無限數據爲有限塊進行處理的手段。

Window是無限數據流處理的核心,Window將一個無限的stream拆分紅有限大小的」buckets」桶,咱們能夠在這些桶上作計算操做。

Window能夠分紅兩類:

   CountWindow:按照指定的數據條數生成一個Window,與時間無關。

   TimeWindow按照時間生成Window。

TimeWindow

,能夠根據窗口實現原理的不一樣分紅三類:滾動窗口(Tumbling Window)、滑動窗口(Sliding Window)和會話窗口(Session Window)。

  • 滾動窗口(Tumbling Windows)

    將數據依據固定的窗口長度對數據進行切片

    特色時間對齊,窗口長度固定,沒有重疊

    滾動窗口分配器將每一個元素分配到一個指定窗口大小的窗口中,滾動窗口有一個固定的大小,而且不會出現重疊。例如:若是你指定了一個5分鐘大小的滾動窗口,窗口的建立以下圖所示:

                    

適用場景:適合作BI統計等(作每一個時間段的聚合計算)。

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.api.scala._
object StreamEventTimeApp {
  def main(args: Array[String]): Unit = {
    //環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //聲明使用eventTime;引入EventTime 從調用時刻開始給env建立的每個stream追加時間特徵
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val dstream: DataStream[String] = env.socketTextStream("hadoop101", 7777)

    val textWithTsDStream: DataStream[(String, Long, Int)] = dstream.map { text =>
      val arr: Array[String] = text.split(" ")
      (arr(0), arr(1).toLong, 1)
    }
    // 1 告知 flink如何獲取數據中的event時間戳  2 告知延遲的watermark
    val textWithEventTimeDStream: DataStream[(String, Long, Int)] = textWithTsDStream.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.milliseconds(3000)) { //time別導錯包了
        override def extractTimestamp(element: (String, Long, Int)): Long = {
          return element._2
        }
      }).setParallelism(1)
    //每5秒開一個窗口 統計key的個數  5秒是一個數據的時間戳爲準
    val textKeyStream: KeyedStream[(String, Long, Int), Tuple] = textWithEventTimeDStream.keyBy(0)
    textKeyStream.print("textKey: ")
    //滾動窗口
    val windowStream: WindowedStream[(String, Long, Int), Tuple, TimeWindow] = textKeyStream.window(TumblingEventTimeWindows.of(Time.milliseconds(5000)))

    windowStream.sum(2).print("windows: ").setParallelism(1)

    env.execute()
  }
[kris@hadoop101 gmall]$ nc -lk 7777
abc 1000
abc 3000
abc 4000
abc 5000
abc 6000
abc 7999
abc 8000
abc 9999
abc 10000
abc 12000
abc 13000

textKey: :8> (abc,1000,1)
textKey: :8> (abc,3000,1)
textKey: :8> (abc,4000,1)
textKey: :8> (abc,5000,1)
textKey: :8> (abc,6000,1)
textKey: :8> (abc,7999,1)
windows: > (abc,1000,3)
textKey: :8> (abc,8000,1)
textKey: :8> (abc,9999,1)
textKey: :8> (abc,10000,1)
textKey: :8> (abc,12000,1)
textKey: :8> (abc,13000,1)
windows: > (abc,5000,5)

  滾動窗口:
  X秒開一個窗口
  watermark 3s

  發車時間看:X+3,車上攜帶的[X, nX)秒內的
    如5s發一次車:[0, 5),發車時間8s、[5, 10)發車時間13s、[10, 15)發車時間18s

  • 滑動窗口(Sliding Windows)

    滑動窗口是固定窗口的更廣義的一種形式,滑動窗口由固定的窗口長度和滑動間隔組成

    特色時間對齊,窗口長度固定,有重疊

    滑動窗口分配器將元素分配到固定長度的窗口中,與滾動窗口相似,窗口的大小由窗口大小參數來配置,另外一個窗口滑動參數控制滑動窗口開始的頻率。所以,滑動窗口若是滑動參數小於窗口大小的話,窗口是能夠重疊的,在這種狀況下元素會被分配到多個窗口中。

例如,你有10分鐘的窗口和5分鐘的滑動,那麼每一個窗口中5分鐘的窗口裏包含着上個10分鐘產生的數據,以下圖所示:

                 

 適用場景:對最近一個時間段內的統計(求某接口最近5min的失敗率來決定是否要報警)

 

 

object StreamEventTimeApp {
  def main(args: Array[String]): Unit = {
    //環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //聲明使用eventTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val dstream: DataStream[String] = env.socketTextStream("hadoop101", 7777)

    val textWithTsDStream: DataStream[(String, Long, Int)] = dstream.map { text =>
      val arr: Array[String] = text.split(" ")
      (arr(0), arr(1).toLong, 1)
    }
    // 1 告知 flink如何獲取數據中的event時間戳  2 告知延遲的watermark
    val textWithEventTimeDStream: DataStream[(String, Long, Int)] = textWithTsDStream.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.milliseconds(3000)) { //time別導錯包了
        override def extractTimestamp(element: (String, Long, Int)): Long = {
          return element._2
        }
      }).setParallelism(1)
    //每5秒開一個窗口 統計key的個數  5秒是一個數據的時間戳爲準
    val textKeyStream: KeyedStream[(String, Long, Int), Tuple] = textWithEventTimeDStream.keyBy(0)
    textKeyStream.print("textKey: ")
    //滾動窗口
    //val windowDStream: WindowedStream[(String, Long, Int), Tuple, TimeWindow] = textKeyStream.window(TumblingEventTimeWindows.of(Time.milliseconds(5000)))

    //滑動窗口
    val windowStream: WindowedStream[(String, Long, Int), Tuple, TimeWindow] = textKeyStream.window(SlidingEventTimeWindows.of(Time.milliseconds(5000L), Time.milliseconds(1000L)))
    windowStream.sum(2).print("windows: ").setParallelism(1)

    env.execute()
  }
}
[kris@hadoop101 gmall]$ nc -lk 7777
aaa 100
aaa 500
aaa 1000
aaa 3000
aaa 3999
abc 100
abc 1000
abc 3998
abc 3999 abc 5000
abc 8000 abc 10000

textKey: :8> (abc,100,1)
textKey: :8> (abc,1000,1)
textKey: :8> (abc,3998,1)
textKey: :8> (abc,3999,1) //窗口大小0-4999;前面這些都是在4999窗口如下的範圍內,可是開車的時機是在步長+watermark=4000,但開車的時候只有100這一個在裏邊;步長爲1
windows: > (abc,100,1)
textKey: :8> (abc,5000,1) //開車取決於時間間隔步長1s, 每隔1s發一次;第二次發車是在2s的時候,延遲3s,即5s的時候發車,但這個時候車裏就只有100和1000兩個;
windows: > (abc,100,2)
textKey: :8> (abc,8000,1) //一車接5s的人;8000--5000--4000--3000--(這個時候它倆已經開車走了,不要了)-2000-1000
windows: > (abc,100,2) //3000那輛車
windows: > (abc,100,4) //走的是4000那輛車--100、1000、399八、3999
windows: > (abc,100,4)//5000,走的仍是100、1000、399八、3999這四個,5000應該是在下一個窗口大小的範圍;
textKey: :8> (abc,10000,1) //10000-3000 ==> 7000s,到5000是走完第一個窗口大小, 6000走一輛(5999--1000);7000(發車的6999-2000)
windows: > (abc,1000,4) //6000: 1000/3998/3999/5000/
windows: > (abc,3998,3) //7000: 3998/3999/5000

 

  • 會話窗口(Session Windows)

    由一系列事件組合一個指定時間長度的timeout間隙組成,相似於web應用的session,也就是一段時間沒有接收到新數據就會生成新的窗口

    特色時間無對齊

    session窗口分配器經過session活動來對元素進行分組,session窗口跟滾動窗口和滑動窗口相比,不會有重疊和固定的開始時間和結束時間的狀況,相反,當它在一個固定的時間週期內再也不收到元素,即非活動間隔產生,那個這個窗口就會關閉。一個session窗口經過一個session間隔來配置,這個session間隔定義了非活躍週期的長度,當這個非活躍週期產生,那麼當前的session將關閉而且後續的元素將被分配到新的session窗口中去。

            

 

object StreamEventTimeApp {
  def main(args: Array[String]): Unit = {
    //環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //聲明使用eventTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val dstream: DataStream[String] = env.socketTextStream("hadoop101", 7777)

    val textWithTsDStream: DataStream[(String, Long, Int)] = dstream.map { text =>
      val arr: Array[String] = text.split(" ")
      (arr(0), arr(1).toLong, 1)
    }
    // 1 告知 flink如何獲取數據中的event時間戳  2 告知延遲的watermark
    val textWithEventTimeDStream: DataStream[(String, Long, Int)] = textWithTsDStream.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.milliseconds(3000)) { //time別導錯包了
        override def extractTimestamp(element: (String, Long, Int)): Long = {
          return element._2
        }
      }).setParallelism(1)
    //每5秒開一個窗口 統計key的個數  5秒是一個數據的時間戳爲準
    val textKeyStream: KeyedStream[(String, Long, Int), Tuple] = textWithEventTimeDStream.keyBy(0)
    textKeyStream.print("textKey: ")
    //滾動窗口
    //val windowDStream: WindowedStream[(String, Long, Int), Tuple, TimeWindow] = textKeyStream.window(TumblingEventTimeWindows.of(Time.milliseconds(5000)))
    //滑動窗口
    //val windowStream: WindowedStream[(String, Long, Int), Tuple, TimeWindow] = textKeyStream.window(SlidingEventTimeWindows.of(Time.milliseconds(5000L), Time.milliseconds(1000L)))

   //會話窗口
    val windowStream: WindowedStream[(String, Long, Int), Tuple, TimeWindow] = textKeyStream.window(EventTimeSessionWindows.withGap(Time.milliseconds(5000L)))
    windowStream.sum(2).print("windows: ").setParallelism(1)
    env.execute()
  }
}

只能兩次時間的間隔是否知足條件
在觸發水位5s的基礎上再加延遲3s,
[kris@hadoop101 gmall]$ nc -lk 7777
abc 1000
abc 7000
abc 10000
=======>>>
textKey: :8> (abc,1000,1)
textKey: :8> (abc,7000,1)
textKey: :8> (abc,10000,1) //在上一個基礎上+延遲時間3s纔會開車
windows: > (abc,1000,1)


[kris@hadoop101 gmall]$ nc -lk 7777
aaa 1000
aaa 2000
aaa 7001
aaa 9000
aaa 10000
=====>>
textKey: :5> (aaa,1000,1)
textKey: :5> (aaa,2000,1)
textKey: :5> (aaa,7001,1) //兩個時間點之間相差達到鴻溝5s了,在這個基礎之上再加3s才能開車;
textKey: :5> (aaa,9000,1)
textKey: :5> (aaa,10000,1)
windows: > (aaa,1000,2)

 

CountWindow

CountWindow根據窗口中相同key元素的數量來觸發執行,執行時只計算元素數量達到窗口大小的key對應的結果

注意:CountWindow的window_size指的是相同Key的元素的個數,不是輸入的全部元素的總數

  滾動窗口

默認的CountWindow是一個滾動窗口,只須要指定窗口大小便可,當元素數量達到窗口大小時,就會觸發窗口的執行。

   滑動窗口

滑動窗口和滾動窗口的函數名是徹底一致的,只是在傳參數時須要傳入兩個參數,一個是window_size,一個是sliding_size。

下面代碼中的sliding_size設置爲了2,也就是說,每收到兩個相同key的數據就計算一次,每一次計算的window範圍是5個元素。

相關文章
相關標籤/搜索