當你第一次看斷背山的時候,印象最深入的是什麼? 有沒有多是他們數羊的片斷?? 哈哈,雖然確定不是,可是今天的問題和數羊有關係. 若是一羣羊都在跑, 你怎麼數的清? (前提是沒有標記)git
我先表揚本身一下, 由於在我看來,數羊是對於下面這個問題的一個絕妙的比喻.
可能你已經想到一個方法: 那就是給整個羊羣照一張照片,而後本身拿着照片回家慢慢數. 這個方法用術語來表示就是你獲得了一個全局快照(global snapshot).經過這個快照,你保留了一個能夠恢復能夠處理的穩定狀態.
然而問題又來了,羊太多,一張照片照不全該怎麼辦?你怎麼保證同時照幾張照片呢?github
若是你說把羊所有打暈再數, 那我如今就打暈你.算法
好久不更新了,最近在公司過了一段讀論文的日子,爲了下一version項目的實時計算部分閱讀了不少流計算資料. 包括Apache著名的四大流計算框架. Apache Storm, Spark streaming,Apache Flink,Samza. 以及google剛剛聯合apache基金會成功孵化出的大數據處理統一編程模型 Apache Beam( 原型爲Google的Dataflow.)apache
如下挑出一個有意思的點來分享. 這個是Apache Flink 消息傳遞機制保證和容錯機制中確保每一個消息被處理且僅被處理一次(因此不會出現消息丟失,同事又不會由於容錯中消息重發而致使消息重複)的依賴原理.編程
在計算機世界中,各個機器很難有一個統一的時鐘來有統一的時間表示. 可能你會想咱們人類世界中就能夠保證你們對時間的認識一致.好比你和小明約定」咱們明早8點的時候同時開始看某個電影」,你說的8點和小明說的8點咱們都認爲是一個時刻,即大家是能夠實現」同時」的.框架
這個實際上是由於咱們統一按照格林尼治的時間來作了一個統一. 可是讓分佈式系統中的每一個機器都作到這樣是很難的,或者說有一個總的master強制統一這個」同時」的開銷是很大的.異步
若是沒有這個」同時」,咱們就沒法得到一個全局的狀態(global state).async
而分佈式系統中的不少問題均可以化爲想要獲得全局狀態的問題.
好比,你想知道系統是否達到了穩定特徵(stable property),譬如死鎖問題,你須要這種穩定的狀態來探測到系統已經發生死鎖.分佈式
穩定性的定義是這種狀態會永久的持續下去,你能夠想象在無阻力環境中的勻速運動或者靜止狀態.ide
穩定性感應能夠用來作全局快照. 當穩定以後,好比羊羣都中止移動或者同速同向平移,你能夠在不一樣時刻照不少照片來合成一張」同時」的全局大照片.
你想要在同一時刻保存系統狀態,這個便是同步快照.
分佈式異步快照的主要思想是你經過協調,使得他們並非在同一時刻保存快照,可是快照可以反應出系統數據的處理狀態. 即一種」邏輯同步」, 這個是基於Chandy-Lamport算法,嚴謹地被證實在論文中.(reference中可查)
消息傳輸保障機制分爲三種:
1.at most once
一條消息通過系統,由這條消息產生的後續tuple在各個處理節點最多會被處理一次,含義就是,出現故障時,不保證這條消息本來應該涉及的全部處理節點計算都順利完成。
2.at least once
一條消息通過系統,由這條消息產生的後續tuple在各個處理節點至少會被處理一次,含義就是,出現故障時,系統可以識別並進行tuple重發,可是沒辦法判斷是否以前該元組被成功處理完成了,所以可能會有重複處理的狀況,對於某些改變外部狀態的場景,會形成髒數據。
3.exactly once
一條消息通過系統,不論是否發生故障,由其產生的後續tuple,在全部處理節點上有且僅會被處理一次,這是最理想的狀況,即便出現故障,也能符合正確的業務預期,但通常會帶來比較大的性能開銷。
由於作到exactly-once 有相對較大的性能開銷,而且不是冪等的計算所必須,因此並不是全部的流計算框架作到了這一點.
相對於micro-batch底層實現的spark streaming,Apache Flink 便使用了分佈式快照和檢查點(checkpointing)機制來實現了exactly-once 的容錯級別.
Flink 進行週期性的全局快照(periodic global state)保存,從而在出現系統failure的時候,只要從上一次保存成功的全局快照中恢復每一個節點的恢復狀態,而後再使源數據節點從相應快照標記源數據節點從新開始處理便可恢復無措運行狀態(Kafka能夠作到這一點).
同步快照有如下兩種潛在的問題:
輕量級的異步柵欄快照能夠用來爲數據流引擎提供容錯機制,同時減少的存儲空間需求.
由於ABS只須要保存一個無環拓撲中每一個操做節點的處理狀態(operator states).
Apache Flink 就使用了ABS的機制.
當運行能夠分爲幾個階段的時候,快照是能夠不包含每一個task節點間的通道狀態的.
Stages divide the injected data streams and all associated into a series of possible executions where all prior inputs and generated outputs have been fully processed.
The set of operator states at the end of a stage reflects the whole execution history, therefore, it can be solely used for a snapshot. The core idea behind our
algorithm is to create identical snapshots with staged
snapshotting while keeping a continuous data ingestion.
這段話太經典.
能夠經過切分源數據來劃分階段,每一個集合的源數據也表明了其所須要的計算. 當上一個集合的輸入數據以及輸出都被徹底處理,就表明上一個階段結束了.
因此當一個階段結束時,操做節點的狀態能夠表明這整個個運行歷史,從而快照能夠僅僅依賴於operator states.
這些階段能夠經過週期性的在源頭出插入一些柵欄(barrier)來劃分. 這些柵欄起到了階段的標記做用,而後跟着數據流經過整個數據處理pipeline,知道系統的sinks.
全局狀態在這個過程當中,被增量地構建, 即當每一個處理tast接收到對應id的柵欄的時候對本身的狀態進行快照,而每一個節點異步的快照共同組成了一個階段(stage)
詳細的過程能夠見下圖, Source1,2,3,4 在接到Master的checkpointing消息時,
保存本身的消息消費狀態,而後釋放一個barrier(包含一個id標記).
當以後的task節點在接收到barrier時,中止處理下一條數據,立刻對本身的狀態進行快照而且持久化存儲, 而且記錄此次狀態的id, 快照保存以後繼續輸出barrier並恢復數據梳理流程.
當數據sinks收到全部barriers而且進行自身狀態保存以後,也進行ack的checkpointing.
看到這裏,你會數羊了嗎?