實時計算大數據處理的基石-Google Dataflow

​ 此文選自Google大神Tyler Akidau的另外一篇文章:Streaming 102: The world beyond batch算法

​ 歡迎回來!若是您錯過了我之前的帖子,Streaming-大數據的將來,強烈建議您先花時間閱讀那篇文章。apache

簡要回顧一下,上一篇咱們介紹了Streaming,批量與流式計算,正確性與推理時間的工具,數據處理模式,事件事件與處理時間,窗口化。windows

在這篇文章中,我想進一步關注上次的數據處理模式,但更詳細。網絡

​ 這裏會用到一些Google Cloud Dataflow的代碼片斷,這是谷歌的一個框架,相似於Spark Streaming或Stormsession

app

這裏還有再說三個概念:框架

Watermarks:水印是關於事件時間的輸入完整性的概念。若是到某一個時間的水印,應該是已經獲取到了小於該時間的全部數據。在處理無界數據時,水印就做爲處理進度的標準。機器學習

Triggers: 觸發器是一種機制,用於聲明窗口什麼時候應該輸出,觸發器可靈活選擇什麼時候應發出輸出。咱們能夠隨着時間的推移不斷改進結果,也能夠處理那些比水印晚到達的數據,改進結果。分佈式

Accumulation: 累積模式指定在同一窗口中觀察到的多個結果之間的關係。這些結果多是徹底脫節的,即隨着時間的推移表示獨立的增量,或者它們之間可能存在重疊。工具

四個新的問題: what? where? when? How?

計算什麼? 但願經過數據計算的結果,和批處理相似,構建直方圖,計算總和,訓練機器學習等等。

在哪裏計算? 事件時間窗口能夠回答這個問題,好比以前提到的(固定,滑動,會話),固然這個時間也多是處理時間。

何時處理產生結果?經過水印和觸發器來回答。可能有無限的變化,常見的模式是使用水印描述給定窗口的輸入是否完整,觸發器指定早期和後期結果。

結果如何相關? 經過累計模式來回答,丟棄不一樣的,累積產生的結果。

1、Streaming 101 Redux

詳細介紹Streaming 101的一些概念,並提供一些例子。

What:transformations

計算的結果是什麼?熟悉批處理的應該很熟悉這個。

舉一個例子,計算由10個值組成的簡單數據集的整數和。您能夠想象爲求一組人的分數和,或者是計費,監控等場景。

若是您瞭解Spark Streaming或Flink之類的東西,那麼您應該相對容易地瞭解Dataflow代碼正在作什麼。

Dataflow Java SDK 模型:

  • PCollections,表示能夠執行並行轉換的數據集(多是大量的數據集)。

  • PTransforms,將PCollections建立成新的PCollections。PTransforms能夠執行逐元素變換,它們能夠將多個元素聚合在一塊兒,或者它們能夠是多個PTransforms的組合。

file

圖二 轉換類型

咱們從IO源中獲取消息,以KV的形式轉換,最後求出分數和。示例代碼以下:

PCollection<String> raw = IO.read(...);
PCollection<KV<String, Integer>> input = raw.apply(ParDo.of(new ParseFn());
PCollection<KV<String, Integer>> scores = input
  .apply(Sum.integersPerKey());

這個過程能夠是在多個機器分佈式執行的,分佈的將不一樣時間狀況的數據進行累加,輸出獲得最終的結果,咱們不用關心分佈式的問題,只要把全部的結果集轉換累加便可。

file 圖三 x爲事件時間 y爲處理時間

這裏咱們計算的是全部事件時間,沒有進行窗口轉換,所以輸出矩形覆蓋整個X軸,可是咱們處理無界數據時,這就不夠了,咱們不能等到結束了再處理,由於永遠不會結束。全部咱們須要考慮在哪裏計算呢?這就須要窗口。

Where:windowing

還記得咱們以前提過的三種窗口,固定,滑動,會話。

file

圖四 三種窗口

咱們用剛纔的例子,將其固定爲兩分鐘的窗口。

PCollection<KV<String, Integer>> scores = input
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2))))
  .apply(Sum.integersPerKey());

Dataflow提供了一個統一的模型,能夠在批處理和流式處理中同時工做,由於批處理實際上只是流的一個子集。

file

圖五 窗口處理

和之前同樣,輸入的數據在累積,直到它們被徹底處理,而後產生輸出。在這種狀況下,咱們獲得四個輸出而不是一個輸出:四個基於這個兩分鐘事件時間窗口中的單個輸出。

如今咱們能夠經過更具體的水印,觸發器和累計來解決更多的問題了。

2、Streaming 102

剛纔的處理仍是通用的批處理方式,延遲很大,但咱們已經成功把每一個窗口的輸入都計算了,咱們目前缺少一種對無限數據處理方法,還要能保證其完整性。

When

Watermarks

水印是何時處理產生結果?其實也就是咱們以前研究事件時間和處理時間的那張圖。

file

上文圖 事件時間 處理時間 水印

這條紅色曲線就是水印,它隨着處理時間的推移不斷的去捕獲事件時間。從概念上講,咱們將其視爲從處理時間到事件時間的映射。水印能夠有兩種類型:

完美水印:這要求咱們對的輸入數據所有了解。也就沒有了後期數據,全部的數據準時到達。

啓發式水印:對於大部分分佈式輸入源,完整的瞭解輸入數據是不可能的,這就須要啓發式水印。啓發式水印經過分區,分區排序等提供儘量準確的估計。因此是有可能錯誤的,這就須要觸發器在後期解決,這個一會會講。

下面是兩個使用了不一樣水印的流處理引擎:

file

圖六 左完美 右啓發

在這兩種狀況下,當水印經過窗口的末端時,窗口被實現。兩次執行之間的主要區別在於右側水印計算中使用的啓發式算法未考慮9的值,這極大地改變了水印的形狀。這些例子突出了水印的兩個缺點:

太慢:若是由於網絡等緣由致使有數據未處理時,只能延遲輸出結果。左圖比較明顯,遲到的9影響了總體的進度,這對於第二個窗口[12:02,12:04]尤其明顯,從窗口中的第一個值開始到咱們看到窗口的任何結果爲止須要將近7分鐘。而啓發式水印要好一點只用了兩分鐘。

太快:當啓發式水印錯誤地提早超過應有的水平時,水印以前的事件時間數據可能會在一段時間後到達,從而產生延遲數據。這就是右邊示例中發生的狀況:在觀察到該窗口的全部輸入數據以前,水印超過了第一個窗口的末尾,致使輸出值不正確,正確的應該是14。這個缺點嚴格來講是啓發式水印的問題, 他們的啓發性意味着他們有時會出錯。所以,若是您關心正確性,單靠它們來肯定什麼時候實現輸出是不夠的。

這時候咱們就須要觸發器。

triggers

觸發器用於聲明窗口什麼時候應該輸出。

觸發的信號包括:水印進度,處理時間進度,計數,數據觸發,重複,邏輯與AND,邏輯或OR,序列。

仍是用上面的例子,咱們增長一個觸發器:

PCollection<KV<String, Integer>> scores = input
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
               .triggering(AtWatermark()))
  .apply(Sum.integersPerKey());

這裏規定了觸發的狀況,咱們能夠考慮水印太快和太慢的狀況。

太慢時,咱們假設任何給定窗口都存在穩定的傳入,咱們能夠週期性的觸發。

太快時,能夠在後期數據到達後去修正結果。若是後期數據不頻繁,並不會影響性能。

最後咱們能夠綜合考慮,協調早期,準時,晚期的狀況:

PCollection<KV<String, Integer>> scores = input
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
               .triggering(
                 AtWatermark()
                   .withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
                   .withLateFirings(AtCount(1))))
  .apply(Sum.integersPerKey());

生成結果以下,這個版本有了明顯的改進:

file

圖七 增長早期晚期

對於[12:02,12:04]窗口太慢的狀況,每分鐘定時更新。延遲時間從七分鐘減小到三分半。

對於[12:00,12:02]窗口太快的狀況,當值9顯示較晚時,咱們當即將其合併到一個值爲14的新的已更正窗格中。

可是這裏有一個問題,窗口要保持多長時間呢?這裏咱們須要垃圾收集機制。

Garbage collection

在[啓發式水印示例中,每一個窗口的持久狀態在示例的整個生命週期,這是必要的,這樣咱們纔可以在他們到達時適當處理遲到的數據。可是,雖然可以保持全部持久狀態直到時間結束是很棒的,但實際上,在處理無限數據源時,保持給定窗口的狀態一般是不切實際的。無限, 咱們最終會耗盡磁盤空間。

所以,任何真實的無序處理系統都須要提供一些方法來限制它正在處理的窗口的生命週期。

咱們能夠定義一個範圍,當超出這個範圍後,咱們就丟棄無用的數據。

PCollection<KV<String, Integer>> scores = input
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
               .triggering(
                 AtWatermark()
                   .withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
                   .withLateFirings(AtCount(1)))
               .withAllowedLateness(Duration.standardMinutes(1)))
  .apply(Sum.integersPerKey());

一旦水印經過窗口的延遲範圍,該窗口就會關閉,這意味着窗口的全部狀態都將被丟棄。

file

圖八 垃圾收集

這裏的6在容許遲到的範圍內,能夠被收集,而9不在這個範圍,就被丟棄了。

有兩點要注意:

若是您正在使用可得到完美水印的數據源的數據,就不須要處理延遲數據。

即便在使用啓發式水印時,若是是將有限數量聚合,並且能保證一直可控,也不用考慮窗口的壽命問題。

如今時間的問題解決了,下面咱們討論如何累積數據。

How:Accumulation

有三種不一樣的累積模式:

丟棄:當下遊的消費者進行累積計算時,直接相加所要的,就能夠獲得最終結果。

累積:好比將來的能夠覆蓋以前的,一直要保持最新狀態,例如Hbase這種鍵值對的存儲。

累積和撤回:和累積相似,但更復雜。好比從新分組的狀況,可能不僅是覆蓋那麼簡單,須要先刪掉以前的,再加入最新的;還有動態窗口的狀況,新窗口會替換舊窗口,但數據要放在不一樣的位置。

好比上圖中事件時間範圍[12:02,12:04],下表顯示了三種累積模式:

丟棄 累積 累積和收回
窗格1:[7] 7 7 7
第2頁:[3,4] 7 14 14,-7
第3頁:[8] 8 22 22,-14
觀察到最後的價值 8 22 22
總和 22 51 22

**丟棄:**每一個窗格僅包含在該特定窗格期間到達的值。所以,觀察到的最終值並未徹底捕獲總和。可是,若是您要本身對全部獨立窗格求和,那麼您將獲得22的正確答案。

**累積:**每一個窗格結合了特定窗格期間到達的值,加上從先前的窗格中的全部值。所以,正確觀察到的最終值能夠捕獲22的總和。

**累積和撤回:**每一個窗格都包含新的累積模式值以及前一個窗格值的縮進。所以,觀察到的最後一個(非回縮)值以及全部物化窗格的總和(包括撤回)都爲您提供了22的正確答案。這就是撤回如此強大的緣由。

file

圖九 三種累積模式

隨着丟棄,累積,累積和撤回的順序,存儲和計算成本在提升,所以累積模式的選擇要在正確性,延遲和成本中作出選擇。

When*/*Where: Processing-time windows

咱們已經解決了全部四個問題,What,Where,When,How。但咱們都是再事件時間的固定窗口。

因此咱們還要討論一下處理時間中的固定窗口和事件時間中的會話窗口。

先討論處理時間中的固定窗口,處理時間窗口很重要,緣由有兩個:

  • 對於某些用例,例如使用監控(例如,Web服務流量QPS),您但願在觀察到的狀況下分析傳入的數據流,處理時窗口絕對是適當的方法。
  • 對於事件發生的時間很重要的用例(例如,分析用戶行爲趨勢,計費,評分等),處理時間窗口絕對是錯誤的方法,而且可以識別這些狀況是相當重要的。

有兩種方法可用於實現處理時窗口:

**觸發器:**忽略事件時間(即,使用跨越全部事件時間的全局窗口)並使用觸發器在處理時間軸上提供該窗口的快照。

入口時間:將入口時間指定爲數據到達時的事件時間,並使用正常的事件時間窗口。這基本上就像Spark Streaming目前所作的那樣。

處理時間窗口的一個重大缺點是,當輸入的觀察順序發生變化時,窗口的內容會發生變化。爲了以更具體的方式展現,咱們將看看這三個用例:

這裏咱們將兩種事件時間相同而處理時間不一樣的狀況比較。

事件時間窗口

file

圖10 事件時間窗口

四個窗口最終結果依然相同。

經過觸發器處理時間窗口

使用全局事件時間窗口,在處理時間域按期觸發,使用丟棄模式進行

file

圖11 觸發器處理時間窗口

  • 因爲咱們經過事件時間窗格模擬處理時間窗口,所以在處理時間軸中描繪了「窗口」,這意味着它們的寬度是在Y軸而不是X軸上測量的。
  • 因爲處理時間窗口對遇到輸入數據的順序敏感,所以每一個「窗口」的結果對於兩個觀察訂單中的每個都不一樣,即便事件自己在技術上在每一個版本中同時發生。在左邊咱們獲得12,21,18,而在右邊咱們獲得7,36,4。

經過入口時間處理時間窗口

當元素到達時,它們的事件時間須要在入口時被覆蓋。返回使用標準的固定事件時間窗口。因爲入口時間提供了計算完美水印的能力,咱們可使用默認觸發器,在這種狀況下,當水印經過窗口末端時,它會隱式觸發一次。因爲每一個窗口只有一個輸出,所以累積模式可有可無。

file

圖12 入口時間處理時間窗口

  • 與其餘處理時間窗口示例同樣,即便輸入的值和事件時間保持不變,當輸入的順序發生變化時,咱們也會獲得不一樣的結果。
  • 與其餘示例不一樣,窗口在事件時域中再次描繪(所以沿X軸)。儘管如此,它們並非真正的事件時間窗口; 咱們只是簡單地將處理時間映射到事件時間域,刪除每一個輸入的原始記錄,並用新的輸入替換它,而不是表示管道首次觀察數據的時間。
  • 儘管如此,因爲水印,觸發器發射仍然與前一個處理時間示例徹底相同。此外,產生的輸出值與該示例相同,如預測的那樣:左側爲12,21,18,右側爲7,36,4。

若是您關心事件實際發生的時間,您必須使用事件時間窗口,不然您的結果將毫無心義。

Where: session windows

動態的,數據驅動的窗口,稱爲會話。

會話是一種特殊類型的窗口,它捕獲數據中的一段活動,它們在數據分析中特別有用。

  • 會話是數據驅動窗口的一個示例:窗口的位置和大小是輸入數據自己的直接結果,而不是基於某些預約義模式在時間內,如固定窗口和滑動窗口。
  • 會話也是未對齊窗口的示例,即,不是均勻地跨數據應用的窗口,而是僅對數據的特定子集(例如,每一個用戶)。這與固定窗口和滑動窗口等對齊窗口造成對比,後者一般均勻地應用於數據。

file

圖13 會話

咱們來構建一個會話:

PCollection<KV<String, Integer>> scores = input
  .apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(1)))
               .triggering(
                 AtWatermark()
                   .withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
                   .withLateFirings(AtCount(1)))
               .accumulatingAndRetractingFiredPanes())
  .apply(Sum.integersPerKey());

咱們獲得結果以下:

file

圖14 會話窗口

當遇到值爲5的第一個記錄時,它被放置在一個原始會話窗口中。

到達的第二個記錄是7,它一樣被放入它本身的原始會話窗口,由於它不與5的窗口重疊。

同時,水印已通過了第一個窗口的末尾,因此5的值在12:06以前被實現爲準時結果。此後不久,第二個窗口也被實現爲具備值7的推測結果,正如處理時間達到12:06那樣。

咱們接下來觀察一系列記錄,3,4和3,原始會話都重疊。結果,它們所有合併在一塊兒,而且在12:07觸發的早期觸發時,發出值爲10的單個窗口。

當8在此後不久到達時,它與具備值7的原始會話和具備值10的會話重疊。所以全部三個被合併在一塊兒,造成具備值25的新組合會話。

當9到達時,將值爲5的原始會話和值爲25的會話加入到值爲39的單個較大會話中。

這個很是強大的功能,Spark Streaming已經作了實現。

簡單回顧一下,咱們討論了事件時間與處理時間,窗口化,水印,觸發器,累積。探索了What,When,Where,How四個問題。而最終,咱們將平衡正確性,延遲和成本問題,獲得最適合本身的實時流式處理方案。

更多實時計算,Kafka等相關技術博文,歡迎關注實時流式計算

file

相關文章
相關標籤/搜索