Flink 原理與實現:數據流上的類型和操做

Flink 爲流處理和批處理分別提供了 DataStream API 和 DataSet API。正是這種高層的抽象和 flunent API 極大地便利了用戶編寫大數據應用。不過不少初學者在看到官方 Streaming 文檔中那一大坨的轉換時,經常會蒙了圈,文檔中那些隻言片語也很難講清它們之間的關係。因此本文將介紹幾種關鍵的數據流類型,它們之間是如何經過轉換關聯起來的。下圖展現了 Flink 中目前支持的主要幾種流的類型,以及它們之間的轉換關係。數據庫

DataStream

DataStream 是 Flink 流處理 API 中最核心的數據結構。它表明了一個運行在多個分區上的並行流。一個 DataStream 能夠從 StreamExecutionEnvironment 經過env.addSource(SourceFunction) 得到。緩存

DataStream 上的轉換操做都是逐條的,好比 map()flatMap()filter()。DataStream 也能夠執行 rebalance(再平衡,用來減輕數據傾斜)和 broadcaseted(廣播)等分區轉換。數據結構

val stream: DataStream[MyType] = env.addSource(new FlinkKafkaConsumer08[String](...))
val str1: DataStream[(String, MyType)] = stream.flatMap { ... }
val str2: DataStream[(String, MyType)] = stream.rebalance()
val str3: DataStream[AnotherType] = stream.map { ... }

上述 DataStream 上的轉換在運行時會轉換成以下的執行圖:app

如上圖的執行圖所示,DataStream 各個算子會並行運行,算子之間是數據流分區。如 Source 的第一個並行實例(S1)和 flatMap() 的第一個並行實例(m1)之間就是一個數據流分區。而在 flatMap() 和 map() 之間因爲加了 rebalance(),它們之間的數據流分區就有3個子分區(m1的數據流向3個map()實例)。這與 Apache Kafka 是很相似的,把流想象成 Kafka Topic,而一個流分區就表示一個 Topic Partition,流的目標並行算子實例就是 Kafka Consumers。ide

KeyedStream

KeyedStream用來表示根據指定的key進行分組的數據流。一個KeyedStream能夠經過調用DataStream.keyBy()來得到。而在KeyedStream上進行任何transformation都將轉變回DataStream。在實現中,KeyedStream是把key的信息寫入到了transformation中。每條記錄只能訪問所屬key的狀態,其上的聚合函數能夠方便地操做和保存對應key的狀態。函數

WindowedStream & AllWindowedStream

WindowedStream表明了根據key分組,而且基於WindowAssigner切分窗口的數據流。因此WindowedStream都是從KeyedStream衍生而來的。而在WindowedStream上進行任何transformation也都將轉變回DataStream學習

val stream: DataStream[MyType] = ...
val windowed: WindowedDataStream[MyType] = stream
        .keyBy("userId")
        .window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
val result: DataStream[ResultType] = windowed.reduce(myReducer)

上述 WindowedStream 的樣例代碼在運行時會轉換成以下的執行圖:大數據

Flink 的窗口實現中會將到達的數據緩存在對應的窗口buffer中(一個數據可能會對應多個窗口)。當到達窗口發送的條件時(由Trigger控制),Flink 會對整個窗口中的數據進行處理。Flink 在聚合類窗口有必定的優化,即不會保存窗口中的全部值,而是每到一個元素執行一次聚合函數,最終只保存一份數據便可。優化

在key分組的流上進行窗口切分是比較經常使用的場景,也可以很好地並行化(不一樣的key上的窗口聚合能夠分配到不一樣的task去處理)。不過有時候咱們也須要在普通流上進行窗口的操做,這就是 AllWindowedStreamAllWindowedStream是直接在DataStream上進行windowAll(...)操做。AllWindowedStream 的實現是基於 WindowedStream 的(Flink 1.1.x 開始)。Flink 不推薦使用AllWindowedStream,由於在普通流上進行窗口操做,就勢必須要將全部分區的流都聚集到單個的Task中,而這個單個的Task很顯然就會成爲整個Job的瓶頸。spa

JoinedStreams & CoGroupedStreams

雙流 Join 也是一個很是常見的應用場景。深刻源碼你能夠發現,JoinedStreams 和 CoGroupedStreams 的代碼實現有80%是如出一轍的,JoinedStreams 在底層又調用了 CoGroupedStreams 來實現 Join 功能。除了名字不同,一開始很難將它們區分開來,並且爲何要提供兩個功能相似的接口呢??

實際上這二者仍是很點區別的。首先 co-group 側重的是group,是對同一個key上的兩組集合進行操做,而 join 側重的是pair,是對同一個key上的每對元素進行操做。co-group 比 join 更通用一些,由於 join 只是 co-group 的一個特例,因此 join 是能夠基於 co-group 來實現的(固然有優化的空間)。而在 co-group 以外又提供了 join 接口是由於用戶更熟悉 join(源於數據庫吧),並且可以跟 DataSet API 保持一致,下降用戶的學習成本。

JoinedStreams 和 CoGroupedStreams 是基於 Window 上實現的,因此 CoGroupedStreams 最終又調用了 WindowedStream 來實現。

val firstInput: DataStream[MyType] = ...
val secondInput: DataStream[AnotherType] = ...
 
val result: DataStream[(MyType, AnotherType)] = firstInput.join(secondInput)
    .where("userId").equalTo("id")
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new JoinFunction () {...})

上述 JoinedStreams 的樣例代碼在運行時會轉換成以下的執行圖:

雙流上的數據在同一個key的會被分別分配到同一個window窗口的左右兩個籃子裏,當window結束的時候,會對左右籃子進行笛卡爾積從而獲得每一對pair,對每一對pair應用 JoinFunction。不過目前(Flink 1.1.x)JoinedStreams 只是簡單地實現了流上的join操做而已,距離真正的生產使用仍是有些距離。由於目前 join 窗口的雙流數據都是被緩存在內存中的,也就是說若是某個key上的窗口數據太多就會致使 JVM OOM(然而數據傾斜是常態)。雙流join的難點也正是在這裏,這也是社區後面對 join 操做的優化方向,例如能夠借鑑Flink在批處理join中的優化方案,也能夠用ManagedMemory來管理窗口中的數據,並當數據超過閾值時能spill到硬盤。

ConnectedStreams

在 DataStream 上有一個 union 的轉換 dataStream.union(otherStream1, otherStream2, ...),用來合併多個流,新的流會包含全部流中的數據。union 有一個限制,就是全部合併的流的類型必須是一致的。ConnectedStreams 提供了和 union 相似的功能,用來鏈接兩個流,可是與 union 轉換有如下幾個區別:

  1. ConnectedStreams 只能鏈接兩個流,而 union 能夠鏈接多於兩個流。
  2. ConnectedStreams 鏈接的兩個流類型能夠不一致,而 union 鏈接的流的類型必須一致。
  3. ConnectedStreams 會對兩個流的數據應用不一樣的處理方法,而且雙流之間能夠共享狀態。這在第一個流的輸入會影響第二個流時, 會很是有用。

以下 ConnectedStreams 的樣例,鏈接 input 和 other 流,並在input流上應用map1方法,在other上應用map2方法,雙流能夠共享狀態(好比計數)。

val input: DataStream[MyType] = ...
val other: DataStream[AnotherType] = ...
 
val connected: ConnectedStreams[MyType, AnotherType] = input.connect(other)
 
val result: DataStream[ResultType] = 
        connected.map(new CoMapFunction[MyType, AnotherType, ResultType]() {
            override def map1(value: MyType): ResultType = { ... }
            override def map2(value: AnotherType): ResultType = { ... }
        })

當並行度爲2時,其執行圖以下所示:

總結

本文介紹經過不一樣數據流類型的轉換圖來解釋每一種數據流的含義、轉換關係。後面的文章會深刻講解 Window 機制的實現,雙流 Join 的實現等。

相關文章
相關標籤/搜索