本文是Naiad論文的閱讀筆記,Naiad是一個執行循環並行數據流程序的分佈式系統,提供了高吞吐批量處理、低延遲流式處理、迭代計算和增量計算等特性。算法
不少數據處理任務要求對結果進行低延遲可交互訪問、迭代子計算、一致的中間輸出。下圖展現了這些需求:這個應用在實時數據流上進行迭代計算,在結果上交互式查詢最新並一致的結果。編程
爲了知足以上要求,做者設計了時序數據流計算模型:安全
在時序數據流是一個有向無環圖,每一個節點是有狀態的,從有向的邊上發送和接收帶有時間戳的消息。bash
時序數據流圖包含輸入節點和輸出節點,用來接收輸入和給出輸出。外部的消息產生者會給每一個消息標上epoch,而且在數據流結束後發送「關閉」消息。網絡
時序數據流支持循環上下文,一個循環上下文包含進入節點(I)、退出節點(E)和反饋節點(F)。爲了支持循環上下文,消息的時間戳設計以下:分佈式
其中e爲epoch,爲第k層循環對應的計數器,一次循環中的時間戳變化以下函數
節點 | 輸入時間戳 | 輸出時間戳 |
---|---|---|
進入節點 | ||
退出節點 | ||
反饋節點 |
時間戳的順序定義爲,對於和,當且僅當而且。優化
每一個節點須要實現兩個回調函數:this
v.OnRecv(e: Edge, m: Message, t: Timestamp)
v.OnNotify(t: Timestamp).
複製代碼
在回調函數中能夠調用兩個函數:spa
this.SendBy(e: Edge, m: Message, t: Timestamp)
this.NotifyAt(t: Timestamp)
複製代碼
全部的調用都會排隊執行,v.OnNotify(t)
會在全部的v.OnRecv(e,m,t')
執行完以後才執行。另外,調用v.OnNotify(t')
和v.OnRecv(e,m,t')
的參數須要。
下面是一個示例程序,獲取輸入後將惟一的元素輸出到output1
,將計數輸出到output2
。
class DistinctCount<S,T> : Vertex<T>
{
Dictionary<T, Dictionary<S,int>> counts;
void OnRecv(Edge e, S msg, T time) {
if (!counts.ContainsKey(time)) {
counts[time] = new Dictionary<S,int>();
this.NotifyAt(time);
}
if (!counts[time].ContainsKey(msg)) {
counts[time][msg] = 0;
this.SendBy(output1, msg, time);
}
counts[time][msg]++;
}
void OnNotify(T time) {
foreach (var pair in counts[time])
this.SendBy(output2, pair, time);
counts.Remove(time);
}
}
複製代碼
發送通知須要判斷將來再也不會出現帶有給定時間戳的消息。將來消息會綁定的時間戳和未處理的事件(消息和通知)以及圖結構決定,根據消息不能逆時間傳遞的特性,能夠計算出每一個消息時間的下界。
每一個事件都對應着一個時間戳和一個位置,能夠將其組成點戳
咱們說會致使當且僅當存在一條路徑,最終獲得的點戳知足。Naiad會找出到最短的路徑,檢查是否知足來得知是否會致使。
調度會維護一個活躍點戳集合,每一個對應着至少一個未完成的事件。每一個點戳包含一個出現計數(包含這個點戳的時間數目)和先導計數(致使這個點戳的時間數目)。當節點產生和消耗事件時,點戳更新方式以下:
操做 | 更新規則 |
---|---|
v.SendBy(e,m,t) | OC[(t, e)] ← OC[(t, e)] + 1 |
v.OnRecv(e,m,t) | OC[(t, e)] ← OC[(t, e)] − 1 |
v.NotifyAt(t) | OC[(t, v)] ← OC[(t, v)] + 1 |
v.OnNotify(t) | OC[(t, v)] ← OC[(t, v)] − 1 |
當點戳的出現計數變爲零以後,便可減少能夠致使的後續點戳的先導計數,而當點戳的先導計數爲零時,在這個點戳以前的通知均可以安全發送。
數據流圖會分佈到不一樣的工做節點上,邊可使用分區函數將消息傳送到不一樣節點上,若是沒有分區函數那麼消息會傳遞給本機上的下一個節點。
其中的工做節點負責本身部分的消息的通知的接受和發送。 爲了在分佈式環境下正確觸發通知,經過廣播點戳的出現計數維護全局的出現計數。
Naiad使用了一個簡單的容錯方法:每一個節點實現CHECKPOINT
和RESTORE
接口,系統調用它們保存全局一致的檢查點用於故障恢復。
由於丟包、垃圾回收等緣由會致使一些工做變慢,做者使用了多種方法儘可能避免這種狀況的發生:
全部的Naiad程序有如下模式:首先,定義一個數據流圖,包含輸入階段、計算階段和輸出階段;而後,將數據送給輸入階段。例如,一個典型的MapReduce程序以下:
// 1a. Define input stages for the dataflow.
var input = controller.NewInput<string>();
// 1b. Define the timely dataflow graph.
// Here, we use LINQ to implement MapReduce.
var result = input.SelectMany(y => map(y))
.GroupBy(y => key(y),
(k, vs) => reduce(k, vs));
// 1c. Define output callbacks for each epoch
result.Subscribe(result => { ... });
// 2. Supply input data to the query.
input.OnNext(/* 1st epoch data */);
input.OnNext(/* 2nd epoch data */);
input.OnNext(/* 3rd epoch data */);
input.OnCompleted();
複製代碼
做者將一些高層次的編程模型打包成了庫供開發人員使用,但願大部分的應用場景可使用庫來實現,這些庫是基於Naiad提供的圖構建接口。