參考,html
http://www.confluent.io/blog/real-time-stream-processing-the-next-step-for-apache-flink/數據庫
對於batch分析,fault-tolerant很容易作,失敗只須要replay,就能夠完美作到容錯。apache
對於streaming分析, 數據流自己是動態,沒有所謂的開始或結束,雖然能夠replay buffer的部分數據,但fault-tolerant作起來會複雜的多編程
當前主流的一些streaming分析平臺,都有一些各自特有的fault-tolerant的機制,在此分析和總結一下,windows
無狀態流數據處理,網絡
這是種比較簡單的流式數據的場景,典型的應用是數據ETL,數據存儲,數據流過是沒有狀態的分佈式
保證at least once語義,
分鐘級別,Storm的acker機制,就能夠很好的保證, http://storm.apache.org/documentation/Guaranteeing-message-processing.html
message沒有被正確處理,收到ack時,能夠選擇重發,這樣每條message對能夠保證被處理到,但可能會被重複處理性能
小時,天級別,利用kafka的replay,通常達到天級別的cache優化
保證exactly once語義,
對於無狀態數據流,其實只要依賴最終存儲的去重性(deduplication), 就能夠達到exactly once
好比對於數據庫,經過unique key和insert ignore就能夠解決這個問題,不管你以前重複處理多少次,最終我只存儲一次。
若是最終存儲不支持去重,或者場景比較複雜不只僅是存儲,好比作疊加計數 或 update
作疊加計數,當前的機制,你沒法知道這個message是否加過
作update的時候,更新的時序性很重要,這個是ack機制沒法保證的
Storm 0.7就提供transactional topology特性,http://storm.apache.org/documentation/Transactional-topologies.html
首先給message加上transaction id,這樣有兩個好處,能夠保證時序性,在寫入存儲的時候,能夠按transaction id順序寫入
而且在能夠外部存儲上記錄當前最新的transaction id,保證相同的transaction,不會被重複寫入
這個是transactional topology的核心思路,這樣確實是能夠保證強一致性,exactly once語義
但這個方案只適用於無狀態,或是依賴外部存儲的,狀態必需要存儲在外部存儲上
至於使用batch,或將topology分爲processing和commit階段,都是對性能的優化,並不會提高一致性的保障
但因爲使用micro-batch是必須的,因此也稱這類方案是micro-batch方案,除了transactional topology,還有Apache Spark Streaming
micro-batch的壞處,
1. 改變編程模型,僞流式
2. windows based聚合的限制,只能是micro-batch的倍數,好比micro-batch是3分鐘,你想作個5分鐘聚合,無法作
2. 延遲變大,若是自己秒級別,但若是micro-batch是1分鐘,那延遲就至少1分鐘
有狀態流數據處理,
典型的場景,就是windows-based的聚合或計算,好比計算1分鐘內的計數或平均值,這樣會有部分數據須要cache在內存中
這樣當fail-over時,如何能夠恢復cache,並保證exactly once語義
最直接的想法,
局部的snapshot
每一個component對cache按期作snapshot,而後在fail-over後,各自恢復本身的cache,
這樣作的問題,
1. snapshot很難增量作,若是cache比較大,成本會比較高
2. snapshot只能按期作,會有部分丟失
3. 最關鍵的,對於分佈式系統,各個compoent獨立的進行snapshot,很難達到同一個狀態,每一個component的處理速度都是不同的,有的處理到n作了snapshot,而有的可能作到n+1才作,
缺少一個統一的參照系。
change-log
每一個 component,當接收到一個 message 的時候,產生一條 change log 記錄該 message 和更新的狀態,存入 transactional log 和數據庫
當作 fail-over 的時候,只須要每一個 component 將數據庫中的 log,拿出來 replay 便可
這種方式使用的平臺如 Google Cloud Dataflow,Apache Samza
對於 Apache Samza,會將 change log 放入kafka中,
當fail-over後,每一個task從相應的kafka topic裏面讀出change-log,完成local state的replay
這樣作的好處,是不用直接去snapshot local cache,若是cache比較大的話,這樣是比較划算的
可是若是數據流很big的話,這樣作也不合適了,由於change-log會很是大
Distributed Snapshots (Apache Flink),全局的 snapshot
針對前面提到的局部 snapshot 最關鍵的問題,提出全局 snapshot 的方法,
其實最大的問題仍然是分佈式系統的根本問題,統一參照系的問題,如何讓每一個 component 在同一的狀態下,進行 snapshot
這個原理來自 Chandy and Lamport, 1985,的paper 「Distributed Snapshots: Determining Global States of Distributed Systems」
局部的snapshot會有的問題,
狀態丟失,以下圖,但狀態中傳輸的時候,對P和Q進行snapshot,會致使隊列中的綠藍橙狀態丟失
狀態重複,brown狀態中P和Q的snapshot裏面同時出現
怎麼解這樣的問題?分佈式系統中缺少統一參照系的狀況下,只有經過通訊才能肯定偏序的問題
因此這裏使用marker來作組件間的同步,並防止丟失狀態,會同時對組件,以及隊列同時作snapshot, 以下圖
P作snapshot,而後發送marker到Q
Q收到marker的時候,知道P作了snapshot,那麼我也要作snapshot
同時還要對PQ channel作snapshot,此時channel中有個green,可是因爲green是在marker後面的,說明它在P的snapshot裏面已經作過,不須要再作,因此此時PQ的snapshot爲空
Q在作完snapshot後,還須要把marker返回給P,由於在過程當中orange從Q被髮送到P
當P收到Q返回的marker時,因爲P的snapshot已經作過,沒法改變
因此把orange放在QP channel的snapshot中
最終作出的全局的snapshot爲,
P(red, green, blue) channel PQ () Q(brown, pink) channel QP (orange)
這樣就解決了狀態丟或重複的問題
Flink’s distributed snapshotting實現基於stream barriers
可見,barrier能夠將流拆分紅一段段的數據,每一個barrier都是一個snapshot點,可是這種拆分不一樣於micro-batch,並不會影響到正常的流式處理
在DAG,即有向無環圖的case下,是不須要對channel作snapshot的,場景會比較簡單
只是每一個組件收到barrier的時候去作snapshot就好,該算法的幾個前提:
1. 網絡可靠,消息FIFO;
2. channel能夠block,unblock,支持對全部output channel進行廣播
3. 可自動識別注入的barrier
完成過程如圖,這是個有兩條入邊的case,相對複雜些
當收到一條channel的barrier時,須要先block該channel,而後等待另外一個channel中的barrier
當兩條channel的barrier都到達時,說明達到統一狀態,進行checkpoint
而後unblock以前block的channel,並對全部的output channel廣播該barrier
當DAG上的全部組件都完成snapshot時,那麼一個全局的snapshot就完成了,以barrier爲惟一標識
比較抽象,下圖以kafka爲例子解釋一下,https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html
對於kafka而言,不一樣的partition須要不一樣的線程讀,
圖中,4個source thread分別從4個partition讀取數據
其中由惟一的master來發起checkpoint流程,
過程是,
1. Master給全部的source thread發checkpoint請求
2. source thread接收到cp請求後,會記錄當前的offset,好比5791,並作該offset的message前發出streaming barrier
並將offset返回給master
3. 這樣master收到全部source的ack offset,就至關於對source作了snapshot,恢復時只須要將相應的source置到該offset便可
4. 中間每一個組件,當收到全部input channel的barrier時,將cp存入數據庫,並通知Master
5. 層層下去,直到全部Sink節點,最終節點,完成snapshot
6. master接收到全部節點的作完cp的ack,知道此次checkpoint所有完成
這個方案的最大的問題是,當多個input channel時,須要等全部的barrier到齊,這個明顯會增長latency
Flink的優化是,不等,看到barrier就打snapshot,這樣的問題就是沒法保證exactly once,會重複,
由於後來的barrier打checkpoint時會覆蓋先前的cp,
此時barrier先到的channel已經處理了一些barrier以後的數據,這部分結果也會存在cp中
但當fail-over的時候,由於replay是根據你發送barrier的offset來重發的,因此這部分會重複