經典分佈式論文閱讀:Naiad

本文是Naiad論文的閱讀筆記,Naiad是一個執行循環並行數據流程序的分佈式系統,提供了高吞吐批量處理、低延遲流式處理、迭代計算和增量計算等特性。算法

總覽

不少數據處理任務要求對結果進行低延遲可交互訪問、迭代子計算、一致的中間輸出。下圖展現了這些需求:這個應用在實時數據流上進行迭代計算,在結果上交互式查詢最新並一致的結果。編程

爲了知足以上要求,做者設計了時序數據流計算模型:安全

  • 結構化的循環容許數據流中存在反饋
  • 帶狀態的節點在不須要全局協調之下產生和消費記錄
  • 在收到指定輪次的輸入或者循環迭代的記錄時候通知節點

時序數據流

在時序數據流是一個有向無環圖,每一個節點是有狀態的,從有向的邊上發送和接收帶有時間戳的消息。bash

圖結構

時序數據流圖包含輸入節點輸出節點,用來接收輸入和給出輸出。外部的消息產生者會給每一個消息標上epoch,而且在數據流結束後發送「關閉」消息。網絡

時序數據流支持循環上下文,一個循環上下文包含進入節點(I)退出節點(E)反饋節點(F)。爲了支持循環上下文,消息的時間戳設計以下:分佈式

\text{Timestamp:}(e\in \mathbb N,(c_1,\dots,c_k)\in \mathbb{N}^k)

其中e爲epoch,c_k爲第k層循環對應的計數器,一次循環中的時間戳變化以下函數

節點 輸入時間戳 輸出時間戳
進入節點 (e,<c_1,\dots,c_k>) (e,<c_1,\dots,c_k,0>)
退出節點 (e,<c_1,\dots,c_k,c_{k+1}>) (e,<c_1,\dots,c_k>)
反饋節點 (e,<c_1,\dots,c_k>) (e,<c_1,\dots,c_k+1>)

時間戳的順序定義爲,對於t_1=(x_1,\overrightarrow{c}_1)t_2=(x_2,\overrightarrow{c}_2)t_1\le t_2當且僅當x_1 \le x_2而且\overrightarrow{c}_1 \le \overrightarrow{c}_2優化

節點計算

每一個節點須要實現兩個回調函數:this

v.OnRecv(e: Edge, m: Message, t: Timestamp)
v.OnNotify(t: Timestamp).
複製代碼

在回調函數中能夠調用兩個函數:spa

this.SendBy(e: Edge, m: Message, t: Timestamp)
this.NotifyAt(t: Timestamp)
複製代碼

全部的調用都會排隊執行,v.OnNotify(t)會在全部t'\le tv.OnRecv(e,m,t')執行完以後才執行。另外,調用v.OnNotify(t')v.OnRecv(e,m,t')的參數須要t'\ge t

下面是一個示例程序,獲取輸入後將惟一的元素輸出到output1,將計數輸出到output2

class DistinctCount<S,T> : Vertex<T>
{
    Dictionary<T, Dictionary<S,int>> counts;
    
    void OnRecv(Edge e, S msg, T time) {
        if (!counts.ContainsKey(time)) {
            counts[time] = new Dictionary<S,int>();
            this.NotifyAt(time);
        }
        if (!counts[time].ContainsKey(msg)) {
            counts[time][msg] = 0;
            this.SendBy(output1, msg, time);
        }
        counts[time][msg]++;
    }
    
    void OnNotify(T time) {
        foreach (var pair in counts[time])
            this.SendBy(output2, pair, time);
        counts.Remove(time);
    }
}
複製代碼

實現時序數據流

發送通知須要判斷將來再也不會出現帶有給定時間戳的消息。將來消息會綁定的時間戳和未處理的事件(消息和通知)以及圖結構決定,根據消息不能逆時間傳遞的特性,能夠計算出每一個消息時間的下界。

每一個事件都對應着一個時間戳和一個位置,能夠將其組成點戳

\text{Pointstamp:}(t\in\text{Timestamp},l\in\text{Edge}\cup\text{Vertex})

咱們說(t_1,l_1)會致使(t_2,l_2)當且僅當存在一條路徑\Psi=<l_1,\dots,l_2>,最終獲得的點戳\Psi(t_1)知足\Psi(t_1)\le t_2。Naiad會找出l_1l_2最短的路徑,檢查是否知足\Psi(t_1)\le t_2來得知(t_1,l_1)是否會致使(t_2,l_2)

調度會維護一個活躍點戳集合,每一個對應着至少一個未完成的事件。每一個點戳包含一個出現計數(包含這個點戳的時間數目)和先導計數(致使這個點戳的時間數目)。當節點產生和消耗事件時,點戳更新方式以下:

操做 更新規則
v.SendBy(e,m,t) OC[(t, e)] ← OC[(t, e)] + 1
v.OnRecv(e,m,t) OC[(t, e)] ← OC[(t, e)] − 1
v.NotifyAt(t) OC[(t, v)] ← OC[(t, v)] + 1
v.OnNotify(t) OC[(t, v)] ← OC[(t, v)] − 1

當點戳的出現計數變爲零以後,便可減少能夠致使的後續點戳的先導計數,而當點戳的先導計數爲零時,在這個點戳以前的通知均可以安全發送。

分佈式實現

數據流圖會分佈到不一樣的工做節點上,邊可使用分區函數將消息傳送到不一樣節點上,若是沒有分區函數那麼消息會傳遞給本機上的下一個節點。

其中的工做節點負責本身部分的消息的通知的接受和發送。 爲了在分佈式環境下正確觸發通知,經過廣播點戳的出現計數維護全局的出現計數。

Naiad使用了一個簡單的容錯方法:每一個節點實現CHECKPOINTRESTORE接口,系統調用它們保存全局一致的檢查點用於故障恢復。

由於丟包、垃圾回收等緣由會致使一些工做變慢,做者使用了多種方法儘可能避免這種狀況的發生:

  • 網絡:Naiad工做節點之間的通訊數據量會有短期的爆發,做者對於TCP協議棧進行了一些優化:例如禁用Nagle算法、下降確認超時時間、下降重傳時間,另外能夠考慮使用RDMA來加速通訊。
  • 數據競爭:Naiad減少檢測到競爭後的等待時間粒度來下降延遲。
  • 垃圾回收:Naiad使用.Net實現,使用了多種方法儘可能避免垃圾回收,例如使用緩衝池來複用內存。

使用Naiad編寫程序

全部的Naiad程序有如下模式:首先,定義一個數據流圖,包含輸入階段、計算階段和輸出階段;而後,將數據送給輸入階段。例如,一個典型的MapReduce程序以下:

// 1a. Define input stages for the dataflow.
var input = controller.NewInput<string>();

// 1b. Define the timely dataflow graph.
// Here, we use LINQ to implement MapReduce.
var result = input.SelectMany(y => map(y))
                  .GroupBy(y => key(y), 
                        (k, vs) => reduce(k, vs));

// 1c. Define output callbacks for each epoch
result.Subscribe(result => { ... });

// 2. Supply input data to the query.
input.OnNext(/* 1st epoch data */);
input.OnNext(/* 2nd epoch data */);
input.OnNext(/* 3rd epoch data */);
input.OnCompleted();
複製代碼

做者將一些高層次的編程模型打包成了庫供開發人員使用,但願大部分的應用場景可使用庫來實現,這些庫是基於Naiad提供的圖構建接口。

參考文獻

  1. Murray, Derek G., et al. "Naiad: a timely dataflow system." Proceedings of the Twenty-Fourth ACM Symposium on Operating Systems Principles. ACM, 2013.
相關文章
相關標籤/搜索