在分佈式系統中,組成系統的各個計算機是獨立的。這些計算機有可能fail。node
一個sender發送一條message到receiver。根據receiver出現fail時sender如何處理fail,能夠將message delivery分爲三種語義:算法
At Most once: 對於一條message,receiver最多收到一次(0次或1次).異步
能夠達成At Most Once的策略:分佈式
sender把message發送給receiver.不管receiver是否收到message,sender都再也不重發message.3d
At Least once: 對於一條message,receiver最少收到一次(1次及以上).blog
能夠達成At Least Once的策略:內存
sender把message發送給receiver.當receiver在規定時間內沒有回覆ACK或回覆了error信息,那麼sender重發這條message給receiver,直到sender收到receiver的ACK.input
Exactly once: 對於一條message,receiver確保只收到一次it
Flink實現Exactly once的策略: Flink會持續地對整個系統作snapshot,而後把global state(根據config文件設定)儲存到master node或HDFS.當系統出現failure,Flink會中止數據處理,而後把系統恢復到最近的一次checkpoint.io
分佈式系統由空間上分立的process和鏈接這些process的channel組成.
空間上分立的含義是,這些process不共享memory,而是經過在communication channel上進行的message pass來異步交流.
分佈式系統的global state就是全部process,channel的local state的集合.
process的local state取決於the state of local memory and the history of its activity.
channel的local state是上游process發送進channel的message集減去下游process從channel接收的message的差集.
假設有兩個銀行帳戶A,B.A中初始有600美圓,B中初始有200美圓. SA, SB, CAB, CBA由A和B分別記錄,組成了global state.
在t0時刻,A向B轉帳50美圓;在t1時刻,B向A轉帳80美圓.
若是SA, SB記錄於(t0, t1), CAB, CBA記錄於(t1, t2),那麼global state = 550+200+50+80 = 880,比真實值多了$80. 這就是不一致性global state.
若是 SA, SB, CAB, CBA同屬於一個時間區間,那麼獲得的global state就是一致性的.
分佈式系統沒有共享內存(globally shared memory)和全局時鐘(global clock).
若是分佈式系統有共享內存,那麼能夠從共享內存中直接獲取整個分佈式系統的snapshot,無需分別得到各個process,channel的local state再組合成global state.
若是分佈式系統有global clock,那麼全部的process能在同一時刻各自記錄local state,這樣就保證了state的一致性.
精髓:該算法在普通message中插入了control message – marker
前提:
1) message的傳輸可能有delay,但必定會到達
2) 每兩個process之間都有一條communication path(可能由多條channel組成)
3) Channel是單向的FIFO
描述:
Marker sending rule for process Pi
(1) Process Pi 記錄自身state
(2) Pi在記錄自身state後,發送下一條message前,Pi向本身全部的outgoing channel發送marker
Marker receiving rule for process Pj on receiving a marker along channel C
若是Pj第一次接收到marker,那麼
把channel C的state記爲空集
執行marker sending rule
不然(並不是第一次接收到marker)
把記錄自身state(或最近一次記錄另外一個channel的state)後,收到這個marker前的message集記爲C的state
每一個process會記錄自身的state和它的incoming channel的state
圖解:
A,B,C,D表明4個process.有向線段表明FIFO的channel.綠色圓形表明普通message,橙色矩形表明marker.藍色的節點和線段表明已經記錄state的process和channel
Process A啓動snapshot算法,A執行marker sending rule(記錄自身state,而後發送marker):
Process B接收到marker,執行marker receiving rule:將channel AB的state記爲空集,而後記錄自身state並向下發送marker:
Process C接收到marker, 執行marker receiving rule:將channel AC的state記爲空集,而後記錄自身state並向下發送marker:
Process D接收到來自於process B的marker, 執行marker receiving rule:將channel BD的state記爲空集,而後記錄自身state並向下發送marker:
Process D接收到來自於process C的marker, 執行marker receiving rule:這是process D第二次接收到marker,將channel CD的state記爲{5},不會向下發送marker:
自此process A,B,C,D的local state和全部Channel的state都記錄完畢. 將這些local state組合,所獲得的就是global state
爲了消去記錄channel state這一步驟,process在接收到第一個barrier後不會立刻作snapshot,
而是等待接受其餘上游channel的barrier.
在等待期間,process會把barrier已到的channel的record放入input buffer.
當全部上游channel的barrier到齊後,process才記錄自身state,以後向全部下游channel發送barrier.
由於先到的barrier會等待後到的barrier,全部全部barrier至關於同時到達process,
所以,該process的上游channel的state都是空集.這就避免了去記錄channel的state
圖解:
A是JobManager, B C是source,D是普通task.
JobManager發起一次snapshot:向全部source發送barrier.
每一個Barrier前後到達各自的source.Source在收到barrier後記錄自身state,而後向下遊節點發送barrier
Barrier (from)B 到達process D,但不會進行snapshot
Barrier (from)B已經到達process D,
因此當來自於channel BD的record 6 7到達後,process D不會處理它們,而是將它們放入input buffer.
而Barrier (from)C還沒有到達process D,因此當來自於channel CD的record 4到達後,process D會處理它.
Barrier C也到達process D.
這樣,process D已經接收到了全部上游process的barrier.process D記錄自身state,而後向下遊節點發送barrier
當process接收到barrier後,會馬上作snapshot. Process會繼續處理全部channel的record.後來的snapshot會覆蓋以前的snapshot.
Record 6本不屬於此次checkpoint,卻包含在process D的local state中.
在recovery時,source認爲record 6尚未被處理過,因此重發record 6. 這就致使stream中出現了兩個record 6,形成了at least once.
這裏的問題在於,當第二個barrier到達時,節點D再次對自身作了snapshot.
而在Chandy-Lamport的算法中,第二個barrier到達時,節點D應該對barrier來源的channel作snapshot.
對單一input channel的算子來講,沒有Alignment這個概念.這些算子在at least once模式下也是呈現exactly once的行爲.