此文選自Google大神Tyler Akidau的另外一篇文章:Streaming 102: The world beyond batch算法
歡迎回來!若是您錯過了我之前的帖子,Streaming-大數據的將來,強烈建議您先花時間閱讀那篇文章。apache
簡要回顧一下,上一篇咱們介紹了Streaming,批量與流式計算,正確性與推理時間的工具,數據處理模式,事件事件與處理時間,窗口化。windows
在這篇文章中,我想進一步關注上次的數據處理模式,但更詳細。網絡
這裏會用到一些Google Cloud Dataflow的代碼片斷,這是谷歌的一個框架,相似於Spark Streaming或Stormsession
。app
這裏還有再說三個概念:框架
Watermarks:水印是關於事件時間的輸入完整性的概念。若是到某一個時間的水印,應該是已經獲取到了小於該時間的全部數據。在處理無界數據時,水印就做爲處理進度的標準。機器學習
Triggers: 觸發器是一種機制,用於聲明窗口什麼時候應該輸出,觸發器可靈活選擇什麼時候應發出輸出。咱們能夠隨着時間的推移不斷改進結果,也能夠處理那些比水印晚到達的數據,改進結果。分佈式
Accumulation: 累積模式指定在同一窗口中觀察到的多個結果之間的關係。這些結果多是徹底脫節的,即隨着時間的推移表示獨立的增量,或者它們之間可能存在重疊。工具
四個新的問題: what? where? when? How?
計算什麼? 但願經過數據計算的結果,和批處理相似,構建直方圖,計算總和,訓練機器學習等等。
在哪裏計算? 事件時間窗口能夠回答這個問題,好比以前提到的(固定,滑動,會話),固然這個時間也多是處理時間。
何時處理產生結果?經過水印和觸發器來回答。可能有無限的變化,常見的模式是使用水印描述給定窗口的輸入是否完整,觸發器指定早期和後期結果。
結果如何相關? 經過累計模式來回答,丟棄不一樣的,累積產生的結果。
詳細介紹Streaming 101的一些概念,並提供一些例子。
計算的結果是什麼?熟悉批處理的應該很熟悉這個。
舉一個例子,計算由10個值組成的簡單數據集的整數和。您能夠想象爲求一組人的分數和,或者是計費,監控等場景。
若是您瞭解Spark Streaming或Flink之類的東西,那麼您應該相對容易地瞭解Dataflow代碼正在作什麼。
Dataflow Java SDK 模型:
PCollections,表示能夠執行並行轉換的數據集(多是大量的數據集)。
PTransforms,將PCollections建立成新的PCollections。PTransforms能夠執行逐元素變換,它們能夠將多個元素聚合在一塊兒,或者它們能夠是多個PTransforms的組合。
圖二 轉換類型
咱們從IO源中獲取消息,以KV的形式轉換,最後求出分數和。示例代碼以下:
PCollection<String> raw = IO.read(...); PCollection<KV<String, Integer>> input = raw.apply(ParDo.of(new ParseFn()); PCollection<KV<String, Integer>> scores = input .apply(Sum.integersPerKey());
這個過程能夠是在多個機器分佈式執行的,分佈的將不一樣時間狀況的數據進行累加,輸出獲得最終的結果,咱們不用關心分佈式的問題,只要把全部的結果集轉換累加便可。
圖三 x爲事件時間 y爲處理時間
這裏咱們計算的是全部事件時間,沒有進行窗口轉換,所以輸出矩形覆蓋整個X軸,可是咱們處理無界數據時,這就不夠了,咱們不能等到結束了再處理,由於永遠不會結束。全部咱們須要考慮在哪裏計算呢?這就須要窗口。
還記得咱們以前提過的三種窗口,固定,滑動,會話。
圖四 三種窗口
咱們用剛纔的例子,將其固定爲兩分鐘的窗口。
PCollection<KV<String, Integer>> scores = input .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))) .apply(Sum.integersPerKey());
Dataflow提供了一個統一的模型,能夠在批處理和流式處理中同時工做,由於批處理實際上只是流的一個子集。
圖五 窗口處理
和之前同樣,輸入的數據在累積,直到它們被徹底處理,而後產生輸出。在這種狀況下,咱們獲得四個輸出而不是一個輸出:四個基於這個兩分鐘事件時間窗口中的單個輸出。
如今咱們能夠經過更具體的水印,觸發器和累計來解決更多的問題了。
剛纔的處理仍是通用的批處理方式,延遲很大,但咱們已經成功把每一個窗口的輸入都計算了,咱們目前缺少一種對無限數據處理方法,還要能保證其完整性。
水印是何時處理產生結果?其實也就是咱們以前研究事件時間和處理時間的那張圖。
上文圖 事件時間 處理時間 水印
這條紅色曲線就是水印,它隨着處理時間的推移不斷的去捕獲事件時間。從概念上講,咱們將其視爲從處理時間到事件時間的映射。水印能夠有兩種類型:
完美水印:這要求咱們對的輸入數據所有了解。也就沒有了後期數據,全部的數據準時到達。
啓發式水印:對於大部分分佈式輸入源,完整的瞭解輸入數據是不可能的,這就須要啓發式水印。啓發式水印經過分區,分區排序等提供儘量準確的估計。因此是有可能錯誤的,這就須要觸發器在後期解決,這個一會會講。
下面是兩個使用了不一樣水印的流處理引擎:
圖六 左完美 右啓發
在這兩種狀況下,當水印經過窗口的末端時,窗口被實現。兩次執行之間的主要區別在於右側水印計算中使用的啓發式算法未考慮9的值,這極大地改變了水印的形狀。這些例子突出了水印的兩個缺點:
太慢:若是由於網絡等緣由致使有數據未處理時,只能延遲輸出結果。左圖比較明顯,遲到的9影響了總體的進度,這對於第二個窗口[12:02,12:04]尤其明顯,從窗口中的第一個值開始到咱們看到窗口的任何結果爲止須要將近7分鐘。而啓發式水印要好一點只用了兩分鐘。
太快:當啓發式水印錯誤地提早超過應有的水平時,水印以前的事件時間數據可能會在一段時間後到達,從而產生延遲數據。這就是右邊示例中發生的狀況:在觀察到該窗口的全部輸入數據以前,水印超過了第一個窗口的末尾,致使輸出值不正確,正確的應該是14。這個缺點嚴格來講是啓發式水印的問題, 他們的啓發性意味着他們有時會出錯。所以,若是您關心正確性,單靠它們來肯定什麼時候實現輸出是不夠的。
這時候咱們就須要觸發器。
觸發器用於聲明窗口什麼時候應該輸出。
觸發的信號包括:水印進度,處理時間進度,計數,數據觸發,重複,邏輯與AND,邏輯或OR,序列。
仍是用上面的例子,咱們增長一個觸發器:
PCollection<KV<String, Integer>> scores = input .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2))) .triggering(AtWatermark())) .apply(Sum.integersPerKey());
這裏規定了觸發的狀況,咱們能夠考慮水印太快和太慢的狀況。
太慢時,咱們假設任何給定窗口都存在穩定的傳入,咱們能夠週期性的觸發。
太快時,能夠在後期數據到達後去修正結果。若是後期數據不頻繁,並不會影響性能。
最後咱們能夠綜合考慮,協調早期,準時,晚期的狀況:
PCollection<KV<String, Integer>> scores = input .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2))) .triggering( AtWatermark() .withEarlyFirings(AtPeriod(Duration.standardMinutes(1))) .withLateFirings(AtCount(1)))) .apply(Sum.integersPerKey());
生成結果以下,這個版本有了明顯的改進:
圖七 增長早期晚期
對於[12:02,12:04]窗口太慢的狀況,每分鐘定時更新。延遲時間從七分鐘減小到三分半。
對於[12:00,12:02]窗口太快的狀況,當值9顯示較晚時,咱們當即將其合併到一個值爲14的新的已更正窗格中。
可是這裏有一個問題,窗口要保持多長時間呢?這裏咱們須要垃圾收集機制。
在[啓發式水印示例中,每一個窗口的持久狀態在示例的整個生命週期,這是必要的,這樣咱們纔可以在他們到達時適當處理遲到的數據。可是,雖然可以保持全部持久狀態直到時間結束是很棒的,但實際上,在處理無限數據源時,保持給定窗口的狀態一般是不切實際的。無限, 咱們最終會耗盡磁盤空間。
所以,任何真實的無序處理系統都須要提供一些方法來限制它正在處理的窗口的生命週期。
咱們能夠定義一個範圍,當超出這個範圍後,咱們就丟棄無用的數據。
PCollection<KV<String, Integer>> scores = input .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2))) .triggering( AtWatermark() .withEarlyFirings(AtPeriod(Duration.standardMinutes(1))) .withLateFirings(AtCount(1))) .withAllowedLateness(Duration.standardMinutes(1))) .apply(Sum.integersPerKey());
一旦水印經過窗口的延遲範圍,該窗口就會關閉,這意味着窗口的全部狀態都將被丟棄。
圖八 垃圾收集
這裏的6在容許遲到的範圍內,能夠被收集,而9不在這個範圍,就被丟棄了。
有兩點要注意:
若是您正在使用可得到完美水印的數據源的數據,就不須要處理延遲數據。
即便在使用啓發式水印時,若是是將有限數量聚合,並且能保證一直可控,也不用考慮窗口的壽命問題。
如今時間的問題解決了,下面咱們討論如何累積數據。
有三種不一樣的累積模式:
丟棄:當下遊的消費者進行累積計算時,直接相加所要的,就能夠獲得最終結果。
累積:好比將來的能夠覆蓋以前的,一直要保持最新狀態,例如Hbase這種鍵值對的存儲。
累積和撤回:和累積相似,但更復雜。好比從新分組的狀況,可能不僅是覆蓋那麼簡單,須要先刪掉以前的,再加入最新的;還有動態窗口的狀況,新窗口會替換舊窗口,但數據要放在不一樣的位置。
好比上圖中事件時間範圍[12:02,12:04],下表顯示了三種累積模式:
丟棄 | 累積 | 累積和收回 | |
---|---|---|---|
窗格1:[7] | 7 | 7 | 7 |
第2頁:[3,4] | 7 | 14 | 14,-7 |
第3頁:[8] | 8 | 22 | 22,-14 |
觀察到最後的價值 | 8 | 22 | 22 |
總和 | 22 | 51 | 22 |
**丟棄:**每一個窗格僅包含在該特定窗格期間到達的值。所以,觀察到的最終值並未徹底捕獲總和。可是,若是您要本身對全部獨立窗格求和,那麼您將獲得22的正確答案。
**累積:**每一個窗格結合了特定窗格期間到達的值,加上從先前的窗格中的全部值。所以,正確觀察到的最終值能夠捕獲22的總和。
**累積和撤回:**每一個窗格都包含新的累積模式值以及前一個窗格值的縮進。所以,觀察到的最後一個(非回縮)值以及全部物化窗格的總和(包括撤回)都爲您提供了22的正確答案。這就是撤回如此強大的緣由。
圖九 三種累積模式
隨着丟棄,累積,累積和撤回的順序,存儲和計算成本在提升,所以累積模式的選擇要在正確性,延遲和成本中作出選擇。
咱們已經解決了全部四個問題,What,Where,When,How。但咱們都是再事件時間的固定窗口。
因此咱們還要討論一下處理時間中的固定窗口和事件時間中的會話窗口。
先討論處理時間中的固定窗口,處理時間窗口很重要,緣由有兩個:
有兩種方法可用於實現處理時窗口:
**觸發器:**忽略事件時間(即,使用跨越全部事件時間的全局窗口)並使用觸發器在處理時間軸上提供該窗口的快照。
入口時間:將入口時間指定爲數據到達時的事件時間,並使用正常的事件時間窗口。這基本上就像Spark Streaming目前所作的那樣。
處理時間窗口的一個重大缺點是,當輸入的觀察順序發生變化時,窗口的內容會發生變化。爲了以更具體的方式展現,咱們將看看這三個用例:
這裏咱們將兩種事件時間相同而處理時間不一樣的狀況比較。
事件時間窗口
圖10 事件時間窗口
四個窗口最終結果依然相同。
經過觸發器處理時間窗口
使用全局事件時間窗口,在處理時間域按期觸發,使用丟棄模式進行
圖11 觸發器處理時間窗口
經過入口時間處理時間窗口
當元素到達時,它們的事件時間須要在入口時被覆蓋。返回使用標準的固定事件時間窗口。因爲入口時間提供了計算完美水印的能力,咱們可使用默認觸發器,在這種狀況下,當水印經過窗口末端時,它會隱式觸發一次。因爲每一個窗口只有一個輸出,所以累積模式可有可無。
圖12 入口時間處理時間窗口
若是您關心事件實際發生的時間,您必須使用事件時間窗口,不然您的結果將毫無心義。
動態的,數據驅動的窗口,稱爲會話。
會話是一種特殊類型的窗口,它捕獲數據中的一段活動,它們在數據分析中特別有用。
圖13 會話
咱們來構建一個會話:
PCollection<KV<String, Integer>> scores = input .apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(1))) .triggering( AtWatermark() .withEarlyFirings(AtPeriod(Duration.standardMinutes(1))) .withLateFirings(AtCount(1))) .accumulatingAndRetractingFiredPanes()) .apply(Sum.integersPerKey());
咱們獲得結果以下:
圖14 會話窗口
當遇到值爲5的第一個記錄時,它被放置在一個原始會話窗口中。
到達的第二個記錄是7,它一樣被放入它本身的原始會話窗口,由於它不與5的窗口重疊。
同時,水印已通過了第一個窗口的末尾,因此5的值在12:06以前被實現爲準時結果。此後不久,第二個窗口也被實現爲具備值7的推測結果,正如處理時間達到12:06那樣。
咱們接下來觀察一系列記錄,3,4和3,原始會話都重疊。結果,它們所有合併在一塊兒,而且在12:07觸發的早期觸發時,發出值爲10的單個窗口。
當8在此後不久到達時,它與具備值7的原始會話和具備值10的會話重疊。所以全部三個被合併在一塊兒,造成具備值25的新組合會話。
當9到達時,將值爲5的原始會話和值爲25的會話加入到值爲39的單個較大會話中。
這個很是強大的功能,Spark Streaming已經作了實現。
簡單回顧一下,咱們討論了事件時間與處理時間,窗口化,水印,觸發器,累積。探索了What,When,Where,How四個問題。而最終,咱們將平衡正確性,延遲和成本問題,獲得最適合本身的實時流式處理方案。
更多實時計算,Kafka等相關技術博文,歡迎關注實時流式計算