RDD:基於內存的集羣計算容錯抽象

轉載自:http://shiyanjun.cn/archives/744.htmlhtml


摘要

本文提出了分佈式內存抽象的概念——彈性分佈式數據集(RDD,Resilient Distributed Datasets),它具有像MapReduce等數據流模型的容錯特性,而且容許開發人員在大型集羣上執行基於內存的計算。現有的數據流系統對兩種應用的處理並不高效:一是迭代式算法,這在圖應用和機器學習領域很常見;二是交互式數據挖掘工具。這兩種狀況下,將數據保存在內存中可以極大地提升性能。爲了有效地實現容錯,RDD提供了一種高度受限的共享內存,即RDD是隻讀的,而且只能經過其餘RDD上的批量操做來建立。儘管如此,RDD仍然足以表示不少類型的計算,包括MapReduce和專用的迭代編程模型(如Pregel)等。咱們實現的RDD在迭代計算方面比Hadoop快20多倍,同時還能夠在5-7秒內交互式地查詢1TB數據集。程序員

1.引言

不管是工業界仍是學術界,都已經普遍使用高級集羣編程模型來處理日益增加的數據,如MapReduce和Dryad。這些系統將分佈式編程簡化爲自動提供位置感知性調度、容錯以及負載均衡,使得大量用戶可以在商用集羣上分析超大數據集。web

大多數現有的集羣計算系統都是基於非循環的數據流模型。從穩定的物理存儲(如分佈式文件系統)中加載記錄,記錄被傳入由一組肯定性操做構成的DAG,而後寫回穩定存儲。DAG數據流圖可以在運行時自動實現任務調度和故障恢復。算法

儘管非循環數據流是一種很強大的抽象方法,但仍然有些應用沒法使用這種方式描述。咱們就是針對這些不太適合非循環模型的應用,它們的特色是在多個並行操做之間重用工做數據集。這類應用包括:(1)機器學習和圖應用中經常使用的迭代算法(每一步對數據執行類似的函數);(2)交互式數據挖掘工具(用戶反覆查詢一個數據子集)。基於數據流的框架並不明確支持工做集,因此須要將數據輸出到磁盤,而後在每次查詢時從新加載,這帶來較大的開銷。shell

咱們提出了一種分佈式的內存抽象,稱爲彈性分佈式數據集(RDD,Resilient Distributed Datasets)。它支持基於工做集的應用,同時具備數據流模型的特色:自動容錯、位置感知調度和可伸縮性。RDD容許用戶在執行多個查詢時顯式地將工做集緩存在內存中,後續的查詢可以重用工做集,這極大地提高了查詢速度。數據庫

RDD提供了一種高度受限的共享內存模型,即RDD是隻讀的記錄分區的集合,只能經過在其餘RDD執行肯定的轉換操做(如map、join和group by)而建立,然而這些限制使得實現容錯的開銷很低。與分佈式共享內存系統須要付出高昂代價的檢查點和回滾機制不一樣,RDD經過Lineage來重建丟失的分區:一個RDD中包含了如何從其餘RDD衍生所必需的相關信息,從而不須要檢查點操做就能夠重構丟失的數據分區。儘管RDD不是一個通用的共享內存抽象,但卻具有了良好的描述能力、可伸縮性和可靠性,但卻可以普遍適用於數據並行類應用。編程

第一個指出非循環數據流存在不足的並不是是咱們,例如,Google的Pregel[21],是一種專門用於迭代式圖算法的編程模型;Twister[13]和HaLoop[8],是兩種典型的迭代式MapReduce模型。可是,對於一些特定類型的應用,這些系統提供了一個受限的通訊模型。相比之下,RDD則爲基於工做集的應用提供了更爲通用的抽象,用戶能夠對中間結果進行顯式的命名和物化,控制其分區,還能執行用戶選擇的特定操做(而不是在運行時去循環執行一系列MapReduce步驟)。RDD能夠用來描述Pregel、迭代式MapReduce,以及這兩種模型沒法描述的其餘應用,如交互式數據挖掘工具(用戶將數據集裝入內存,而後執行ad-hoc查詢)。數組

Spark是咱們實現的RDD系統,在咱們內部可以被用於開發多種並行應用。Spark採用Scala語言[5]實現,提供相似於DryadLINQ的集成語言編程接口[34],使用戶能夠很是容易地編寫並行任務。此外,隨着Scala新版本解釋器的完善,Spark還可以用於交互式查詢大數據集。咱們相信Spark會是第一個可以使用有效、通用編程語言,並在集羣上對大數據集進行交互式分析的系統。緩存

咱們經過微基準和用戶應用程序來評估RDD。實驗代表,在處理迭代式應用上Spark比Hadoop快高達20多倍,計算數據分析類報表的性能提升了40多倍,同時可以在5-7秒的延時內交互式掃描1TB數據集。此外,咱們還在Spark之上實現了Pregel和HaLoop編程模型(包括其位置優化策略),以庫的形式實現(分別使用了100和200行Scala代碼)。最後,利用RDD內在的肯定性特性,咱們還建立了一種Spark調試工具rddbg,容許用戶在任務期間利用Lineage重建RDD,而後像傳統調試器那樣從新執行任務。網絡

本文首先在第2部分介紹了RDD的概念,而後第3部分描述Spark API,第4部分解釋如何使用RDD表示幾種並行應用(包括Pregel和HaLoop),第5部分討論Spark中RDD的表示方法以及任務調度器,第6部分描述具體實現和rddbg,第7部分對RDD進行評估,第8部分給出了相關研究工做,最後第9部分總結。

2.彈性分佈式數據集(RDD)

本部分描述RDD和編程模型。首先討論設計目標(2.1),而後定義RDD(2.2),討論Spark的編程模型(2.3),並給出一個示例(2.4),最後對比RDD與分佈式共享內存(2.5)。

2.1 目標和概述

咱們的目標是爲基於工做集的應用(即多個並行操做重用中間結果的這類應用)提供抽象,同時保持MapReduce及其相關模型的優點特性:即自動容錯、位置感知性調度和可伸縮性。RDD比數據流模型更易於編程,同時基於工做集的計算也具備良好的描述能力。

在這些特性中,最難實現的是容錯性。通常來講,分佈式數據集的容錯性有兩種方式:即數據檢查點和記錄數據的更新。咱們面向的是大規模數據分析,數據檢查點操做成本很高:須要經過數據中心的網絡鏈接在機器之間複製龐大的數據集,而網絡帶寬每每比內存帶寬低得多,同時還須要消耗更多的存儲資源(在內存中複製數據能夠減小須要緩存的數據量,而存儲到磁盤則會拖慢應用程序)。因此,咱們選擇記錄更新的方式。可是,若是更新太多,那麼記錄更新成本也不低。所以,RDD只支持粗粒度轉換,即在大量記錄上執行的單個操做。將建立RDD的一系列轉換記錄下來(即Lineage),以便恢復丟失的分區。

雖然只支持粗粒度轉換限制了編程模型,但咱們發現RDD仍然能夠很好地適用於不少應用,特別是支持數據並行的批量分析應用,包括數據挖掘、機器學習、圖算法等,由於這些程序一般都會在不少記錄上執行相同的操做。RDD不太適合那些異步更新共享狀態的應用,例如並行web爬行器。所以,咱們的目標是爲大多數分析型應用提供有效的編程模型,而其餘類型的應用交給專門的系統。

2.2 RDD抽象

RDD是隻讀的、分區記錄的集合。RDD只能基於在穩定物理存儲中的數據集和其餘已有的RDD上執行肯定性操做來建立。這些肯定性操做稱之爲轉換,如map、filter、groupBy、join(轉換不是程開發人員在RDD上執行的操做)。

RDD不須要物化。RDD含有如何從其餘RDD衍生(即計算)出本RDD的相關信息(即Lineage),據此能夠從物理存儲的數據計算出相應的RDD分區。

2.3 編程模型

在Spark中,RDD被表示爲對象,經過這些對象上的方法(或函數)調用轉換。

定義RDD以後,程序員就能夠在動做中使用RDD了。動做是嚮應用程序返回值,或向存儲系統導出數據的那些操做,例如,count(返回RDD中的元素個數),collect(返回元素自己),save(將RDD輸出到存儲系統)。在Spark中,只有在動做第一次使用RDD時,纔會計算RDD(即延遲計算)。這樣在構建RDD的時候,運行時經過管道的方式傳輸多個轉換。

程序員還能夠從兩個方面控制RDD,即緩存和分區。用戶能夠請求將RDD緩存,這樣運行時將已經計算好的RDD分區存儲起來,以加速後期的重用。緩存的RDD通常存儲在內存中,但若是內存不夠,能夠寫到磁盤上。

另外一方面,RDD還容許用戶根據關鍵字(key)指定分區順序,這是一個可選的功能。目前支持哈希分區和範圍分區。例如,應用程序請求將兩個RDD按照一樣的哈希分區方式進行分區(將同一機器上具備相同關鍵字的記錄放在一個分區),以加速它們之間的join操做。在Pregel和HaLoop中,屢次迭代之間採用一致性的分區置換策略進行優化,咱們一樣也容許用戶指定這種優化。

2.4 示例:控制檯日誌挖掘

本部分咱們經過一個具體示例來闡述RDD。假定有一個大型網站出錯,操做員想要檢查Hadoop文件系統(HDFS)中的日誌文件(TB級大小)來找出緣由。經過使用Spark,操做員只需將日誌中的錯誤信息裝載到一組節點的內存中,而後執行交互式查詢。首先,須要在Spark解釋器中輸入以下Scala命令:

1 lines = spark.textFile("hdfs://...")
2 errors = lines.filter(_.startsWith("ERROR"))
3 errors.cache()

第1行從HDFS文件定義了一個RDD(即一個文本行集合),第2行得到一個過濾後的RDD,第3行請求將errors緩存起來。注意在Scala語法中filter的參數是一個閉包。

這時集羣尚未開始執行任何任務。可是,用戶已經能夠在這個RDD上執行對應的動做,例如統計錯誤消息的數目:

1 errors.count()

用戶還能夠在RDD上執行更多的轉換操做,並使用轉換結果,如:

1 // Count errors mentioning MySQL:
2 errors.filter(_.contains("MySQL")).count()
3 // Return the time fields of errors mentioning
4 // HDFS as an array (assuming time is field
5 // number 3 in a tab-separated format):
6 errors.filter(_.contains("HDFS"))
7     .map(_.split('\t')(3))
8     .collect()

使用errors的第一個action運行之後,Spark會把errors的分區緩存在內存中,極大地加快了後續計算速度。注意,最初的RDD lines不會被緩存。由於錯誤信息可能只佔原數據集的很小一部分(小到足以放入內存)。
最後,爲了說明模型的容錯性,圖1給出了第3個查詢的Lineage圖。在lines RDD上執行filter操做,獲得errors,而後再filter、map後獲得新的RDD,在這個RDD上執行collect操做。Spark調度器以流水線的方式執行後兩個轉換,向擁有errors分區緩存的節點發送一組任務。此外,若是某個errors分區丟失,Spark只在相應的lines分區上執行filter操做來重建該errors分區。
f1-lineage
圖1 示例中第三個查詢的Lineage圖。(方框表示RDD,箭頭表示轉換)

2.5 RDD與分佈式共享內存

爲了進一步理解RDD是一種分佈式的內存抽象,表1列出了RDD與分佈式共享內存(DSM,Distributed Shared Memory)[24]的對比。在DSM系統中,應用能夠向全局地址空間的任意位置進行讀寫操做。(注意這裏的DSM,不只指傳統的共享內存系統,還包括那些經過分佈式哈希表或分佈式文件系統進行數據共享的系統,好比Piccolo[28])DSM是一種通用的抽象,但這種通用性同時也使得在商用集羣上實現有效的容錯性更加困難。

RDD與DSM主要區別在於,不只能夠經過批量轉換建立(即「寫」)RDD,還能夠對任意內存位置讀寫。也就是說,RDD限制應用執行批量寫操做,這樣有利於實現有效的容錯。特別地,RDD沒有檢查點開銷,由於可使用Lineage來恢復RDD。並且,失效時只須要從新計算丟失的那些RDD分區,能夠在不一樣節點上並行執行,而不須要回滾整個程序。

表1 RDD與DSM對比
對比項目 RDD 分佈式共享內存(DSM)
批量或細粒度操做 細粒度操做
批量轉換操做 細粒度操做
一致性 不重要(RDD是不可更改的) 取決於應用程序或運行時
容錯性 細粒度,低開銷(使用Lineage) 須要檢查點操做和程序回滾
落後任務的處理 任務備份 很難處理
任務安排 基於數據存放的位置自動實現 取決於應用程序(經過運行時實現透明性)
若是內存不夠 與已有的數據流系統相似 性能較差(交換?)

注意,經過備份任務的拷貝,RDD還能夠處理落後任務(即運行很慢的節點),這點與MapReduce[12]相似。而DSM則難以實現備份任務,由於任務及其副本都須要讀寫同一個內存位置。

與DSM相比,RDD模型有兩個好處。第一,對於RDD中的批量操做,運行時將根據數據存放的位置來調度任務,從而提升性能。第二,對於基於掃描的操做,若是內存不足以緩存整個RDD,就進行部分緩存。把內存放不下的分區存儲到磁盤上,此時性能與現有的數據流系統差很少。

最後看一下讀操做的粒度。RDD上的不少動做(如count和collect)都是批量讀操做,即掃描整個數據集,能夠將任務分配到距離數據最近的節點上。同時,RDD也支持細粒度操做,即在哈希或範圍分區的RDD上執行關鍵字查找。

3. Spark編程接口

Spark用Scala[5]語言實現了RDD的API。Scala是一種基於JVM的靜態類型、函數式、面向對象的語言。咱們選擇Scala是由於它簡潔(特別適合交互式使用)、有效(由於是靜態類型)。可是,RDD抽象並不侷限於函數式語言,也可使用其餘語言來實現RDD,好比像Hadoop[2]那樣用類表示用戶函數。

要使用Spark,開發者須要編寫一個driver程序,鏈接到集羣以運行Worker,如圖2所示。Driver定義了一個或多個RDD,並調用RDD上的動做。Worker是長時間運行的進程,將RDD分區以Java對象的形式緩存在內存中。
f2-spark-runtime
圖2 Spark的運行時。用戶的driver程序啓動多個worker,worker從分佈式文件系統中讀取數據塊,並將計算後的RDD分區緩存在內存中。

再看看2.4中的例子,用戶執行RDD操做時會提供參數,好比map傳遞一個閉包(closure,函數式編程中的概念)。Scala將閉包表示爲Java對象,若是傳遞的參數是閉包,則這些對象被序列化,經過網絡傳輸到其餘節點上進行裝載。Scala將閉包內的變量保存爲Java對象的字段。例如,var x = 5; rdd.map(_ + x) 這段代碼將RDD中的每一個元素加5。總的來講,Spark的語言集成相似於DryadLINQ。

RDD自己是靜態類型對象,由參數指定其元素類型。例如,RDD[int]是一個整型RDD。不過,咱們舉的例子幾乎都省略了這個類型參數,由於Scala支持類型推斷。

雖然在概念上使用Scala實現RDD很簡單,但仍是要處理一些Scala閉包對象的反射問題。如何經過Scala解釋器來使用Spark還須要更多工做,這點咱們將在第6部分討論。無論怎樣,咱們都不須要修改Scala編譯器。

3.1 Spark中的RDD操做

表2列出了Spark中的RDD轉換和動做。每一個操做都給出了標識,其中方括號表示類型參數。前面說過轉換是延遲操做,用於定義新的RDD;而動做啓動計算操做,並向用戶程序返回值或向外部存儲寫數據。

表3 Spark中支持的RDD轉換和動做
轉換 map(f : T ) U) : RDD[T] ) RDD[U]
filter(f : T ) Bool) : RDD[T] ) RDD[T]
flatMap(f : T ) Seq[U]) : RDD[T] ) RDD[U]
sample(fraction : Float) : RDD[T] ) RDD[T] (Deterministic sampling)
groupByKey() : RDD[(K, V)] ) RDD[(K, Seq[V])]
reduceByKey(f : (V; V) ) V) : RDD[(K, V)] ) RDD[(K, V)]
union() : (RDD[T]; RDD[T]) ) RDD[T]
join() : (RDD[(K, V)]; RDD[(K, W)]) ) RDD[(K, (V, W))]
cogroup() : (RDD[(K, V)]; RDD[(K, W)]) ) RDD[(K, (Seq[V], Seq[W]))]
crossProduct() : (RDD[T]; RDD[U]) ) RDD[(T, U)]
mapValues(f : V ) W) : RDD[(K, V)] ) RDD[(K, W)] (Preserves partitioning)
sort(c : Comparator[K]) : RDD[(K, V)] ) RDD[(K, V)]
partitionBy(p : Partitioner[K]) : RDD[(K, V)] ) RDD[(K, V)]
動做 count() : RDD[T] ) Long
collect() : RDD[T] ) Seq[T]
reduce(f : (T; T) ) T) : RDD[T] ) T
lookup(k : K) : RDD[(K, V)] ) Seq[V] (On hash/range partitioned RDDs)
save(path : String) : Outputs RDD to a storage system, e.g., HDFS

注意,有些操做只對鍵值對可用,好比join。另外,函數名與Scala及其餘函數式語言中的API匹配,例如map是一對一的映射,而flatMap是將每一個輸入映射爲一個或多個輸出(與MapReduce中的map相似)。

除了這些操做之外,用戶還能夠請求將RDD緩存起來。並且,用戶還能夠經過Partitioner類獲取RDD的分區順序,而後將另外一個RDD按照一樣的方式分區。有些操做會自動產生一個哈希或範圍分區的RDD,像groupByKey,reduceByKey和sort等。

4. 應用程序示例

如今咱們講述如何使用RDD表示幾種基於數據並行的應用。首先討論一些迭代式機器學習應用(4.1),而後看看如何使用RDD描述幾種已有的集羣編程模型,即MapReduce(4.2),Pregel(4.3),和Hadoop(4.4)。最後討論一下RDD不適合哪些應用(4.5)。

4.1 迭代式機器學習

不少機器學習算法都具備迭代特性,運行迭代優化方法來優化某個目標函數,例如梯度降低方法。若是這些算法的工做集可以放入內存,將極大地加速程序運行。並且,這些算法一般採用批量操做,例如映射和求和,這樣更容易使用RDD來表示。

例以下面的程序是邏輯迴歸[15]的實現。邏輯迴歸是一種常見的分類算法,即尋找一個最佳分割兩組點(即垃圾郵件和非垃圾郵件)的超平面w。算法採用梯度降低的方法:開始時w爲隨機值,在每一次迭代的過程當中,對w的函數求和,而後朝着優化的方向移動w。

1 val points = spark.textFile(...)
2      .map(parsePoint).persist()
3 var = // random initial vector
4 for (i <- 1 to ITERATIONS) {
5      val gradient = points.map{ p =>
6           p.x * (1/(1+exp(-p.y*(w dot p.x)))-1)*p.y
7      }.reduce((a,b) => a+b)
8      w -= gradient
9 }

首先定義一個名爲points的緩存RDD,這是在文本文件上執行map轉換以後獲得的,即將每一個文本行解析爲一個Point對象。而後在points上反覆執行map和reduce操做,每次迭代時經過對當前w的函數進行求和來計算梯度。7.1小節咱們將看到這種在內存中緩存points的方式,比每次迭代都從磁盤文件裝載數據並進行解析要快得多。

已經在Spark中實現的迭代式機器學習算法還有:kmeans(像邏輯迴歸同樣每次迭代時執行一對map和reduce操做),指望最大化算法(EM,兩個不一樣的map/reduce步驟交替執行),交替最小二乘矩陣分解和協同過濾算法。Chu等人提出迭代式MapReduce也能夠用來實現經常使用的學習算法[11]。

4.2 使用RDD實現MapReduce

MapReduce模型[12]很容易使用RDD進行描述。假設有一個輸入數據集(其元素類型爲T),和兩個函數myMap: T => List[(Ki, Vi)] 和 myReduce: (Ki; List[Vi]) ) List[R],代碼以下:

1 data.flatMap(myMap)
2     .groupByKey()
3     .map((k, vs) => myReduce(k, vs))

若是任務包含combiner,則相應的代碼爲:

1 data.flatMap(myMap)
2     .reduceByKey(myCombiner)
3     .map((k, v) => myReduce(k, v))

ReduceByKey操做在mapper節點上執行部分彙集,與MapReduce的combiner相似。

4.3 使用RDD實現Pregel

Pregel[21]是面向圖算法的基於BSP範式[32]的編程模型。程序由一系列超步(Superstep)協調迭代運行。在每一個超步中,各個頂點執行用戶函數,並更新相應的頂點狀態,變異圖拓撲,而後向下一個超步的頂點集發送消息。這種模型可以描述不少圖算法,包括最短路徑,雙邊匹配和PageRank等。

以PageRank爲例介紹一下Pregel的實現。當前PageRank[7]記爲r,頂點表示狀態。在每一個超步中,各個頂點向其全部鄰居發送貢獻值r/n,這裏n是鄰居的數目。下一個超步開始時,每一個頂點將其分值(rank)更新爲 α/N + (1 - α) * Σci,這裏的求和是各個頂點收到的全部貢獻值的和,N是頂點的總數。

Pregel將輸入的圖劃分到各個worker上,並存儲在其內存中。在每一個超步中,各個worker經過一種相似MapReduce的Shuffle操做交換消息。

Pregel的通訊模式能夠用RDD來描述,如圖3。主要思想是:將每一個超步中的頂點狀態和要發送的消息存儲爲RDD,而後根據頂點ID分組,進行Shuffle通訊(即cogroup操做)。而後對每一個頂點ID上的狀態和消息應用用戶函數(即mapValues操做),產生一個新的RDD,即(VertexID, (NewState, OutgoingMessages))。而後執行map操做分離出下一次迭代的頂點狀態和消息(即mapValues和flatMap操做)。代碼以下:

1 val vertices = // RDD of (ID, State) pairs
2 val messages = // RDD of (ID, Message) pairs
3 val grouped = vertices.cogroup(messages)
4 val newData = grouped.mapValues {
5     (vert, msgs) => userFunc(vert, msgs)
6     // returns (newState, outgoingMsgs)
7 }.cache()
8 val newVerts = newData.mapValues((v,ms) => v)
9 val newMsgs = newData.flatMap((id,(v,ms)) => ms)

f3-iteration-pregel-using_rdd
圖3 使用RDD實現Pregel時,一步迭代的數據流。(方框表示RDD,箭頭表示轉換)
須要注意的是,這種實現方法中,RDD grouped,newData和newVerts的分區方法與輸入RDD vertices同樣。因此,頂點狀態一直存在於它們開始執行的機器上,這跟原Pregel同樣,這樣就減小了通訊成本。由於cogroup和mapValues保持了與輸入RDD相同的分區方法,因此分區是自動進行的。

完整的Pregel編程模型還包括其餘工具,好比combiner,附錄A討論了它們的實現。下面將討論Pregel的容錯性,以及如何在實現相同容錯性的同時減小須要執行檢查點操做的數據量。

咱們差很少用了100行Scala代碼在Spark上實現了一個類Pregel的API。7.2小節將使用PageRank算法評估它的性能。

4.3.1 Pregel容錯

當前,Pregel基於檢查點機制來爲頂點狀態及其消息實現容錯[21]。然而做者是這樣描述的:經過在其它的節點上記錄已發消息日誌,而後單獨重建丟失的分區,只須要恢復局部數據便可。上面提到這兩種方式,RDD都可以很好地支持。

經過4.3小節的實現,Spark老是可以基於Lineage實現頂點和消息RDD的重建,可是因爲過長的Lineage鏈,恢復可能會付出高昂的代價。由於迭代RDD依賴於上一個RDD,對於部分分區來講,節點故障可能會致使這些分區狀態的全部迭代版本丟失,這就要求使用一種「級聯-從新執行」[20]的方式去依次重建每個丟失的分區。爲了不這個問題,用戶能夠週期性地在頂點和消息RDD上執行save操做,將狀態信息保存到持久存儲中。而後,Spark可以在失敗的時候自動地從新計算這些丟失的分區(而不是回滾整個程序)。

最後,咱們意識到,RDD也可以實現檢查點數據的reduce操做,這要求經過一種高效的檢查點方案來表達檢查點數據。在不少Pregel做業中,頂點狀態都包括可變與不可變的組件,例如,在PageRank中,與一個頂點相鄰的頂點列表是不可變的,可是它們的排名是可變的,在這種狀況下,咱們可使用一個來自可變數據的單獨RDD來替換不可變RDD,基於這樣一個較短的Lineage鏈,檢查點僅僅是可變狀態,圖4解釋了這種方式。
f4-data-flow-of-pregel-using-rdd
圖4 通過優化的Pregel使用RDD的數據流。可變狀態RDD必須設置檢查點,不可變狀態纔可被快速重建。
在PageRank中,不可變狀態(相鄰頂點列表)遠大於可變狀態(浮點值),因此這種方式可以極大地下降開銷。

4.4 使用RDD實現HaLoop

HaLoop[8]是Hadoop的一個擴展版本,它可以改善具備迭代特性的MapReduce程序的性能。基於HaLoop編程模型的應用,使用reduce階段的輸出做爲map階段下一輪迭代的輸入。它的循環感知任務調度器可以保證,在每一輪迭代中處理同一個分區數據的連續map和reduce任務,必定可以在同一臺物理機上執行。確保迭代間locality特性,reduce數據在物理節點之間傳輸,而且容許數據緩存在本地磁盤而可以被後續迭代重用。

使用RDD來優化HaLoop,咱們在Spark上實現了一個相似HaLoop的API,這個庫只使用了200行Scala代碼。經過partitionBy可以保證跨迭代的分區的一致性,每個階段的輸入和輸出被緩存以用於後續迭代。

4.5 不適合使用RDD的應用

在2.1節咱們討論過,RDD適用於具備批量轉換需求的應用,而且相同的操做做用於數據集的每個元素上。在這種狀況下,RDD可以記住每一個轉換操做,對應於Lineage圖中的一個步驟,恢復丟失分區數據時不須要寫日誌記錄大量數據。RDD不適合那些經過異步細粒度地更新來共享狀態的應用,例如Web應用中的存儲系統,或者增量抓取和索引Web數據的系統,這樣的應用更適合使用一些傳統的方法,例如數據庫、RAMCloud[26]、Percolator[27]和Piccolo[28]。咱們的目標是,面向批量分析應用的這類特定系統,提供一種高效的編程模型,而不是一些異步應用程序。

5. RDD的描述及做業調度

咱們但願在不修改調度器的前提下,支持RDD上的各類轉換操做,同時可以從這些轉換獲取Lineage信息。爲此,咱們爲RDD設計了一組小型通用的內部接口。

簡單地說,每一個RDD都包含:(1)一組RDD分區(partition,即數據集的原子組成部分);(2)對父RDD的一組依賴,這些依賴描述了RDD的Lineage;(3)一個函數,即在父RDD上執行何種計算;(4)元數據,描述分區模式和數據存放的位置。例如,一個表示HDFS文件的RDD包含:各個數據塊的一個分區,並知道各個數據塊放在哪些節點上。並且這個RDD上的map操做結果也具備一樣的分區,map函數是在父數據上執行的。表3總結了RDD的內部接口。

表3 Spark中RDD的內部接口
操做 含義
partitions() 返回一組Partition對象
preferredLocations(p) 根據數據存放的位置,返回分區p在哪些節點訪問更快
dependencies() 返回一組依賴
iterator(p, parentIters) 按照父分區的迭代器,逐個計算分區p的元素
partitioner() 返回RDD是否hash/range分區的元數據信息

設計接口的一個關鍵問題就是,如何表示RDD之間的依賴。咱們發現RDD之間的依賴關係能夠分爲兩類,即:(1)窄依賴(narrow dependencies):子RDD的每一個分區依賴於常數個父分區(即與數據規模無關);(2)寬依賴(wide dependencies):子RDD的每一個分區依賴於全部父RDD分區。例如,map產生窄依賴,而join則是寬依賴(除非父RDD被哈希分區)。另外一個例子見圖5。
f5-rdd-narrow-and-wide-dependencies
圖5 窄依賴和寬依賴的例子。(方框表示RDD,實心矩形表示分區)
區分這兩種依賴頗有用。首先,窄依賴容許在一個集羣節點上以流水線的方式(pipeline)計算全部父分區。例如,逐個元素地執行map、而後filter操做;而寬依賴則須要首先計算好全部父分區數據,而後在節點之間進行Shuffle,這與MapReduce相似。第二,窄依賴可以更有效地進行失效節點的恢復,即只需從新計算丟失RDD分區的父分區,並且不一樣節點之間能夠並行計算;而對於一個寬依賴關係的Lineage圖,單個節點失效可能致使這個RDD的全部祖先丟失部分分區,於是須要總體從新計算。

經過RDD接口,Spark只須要不超過20行代碼實現即可以實現大多數轉換。5.1小節給出了例子,而後咱們討論了怎樣使用RDD接口進行調度(5.2),最後討論一下基於RDD的程序什麼時候須要數據檢查點操做(5.3)。

5.1 RDD實現舉例

HDFS文件:目前爲止咱們給的例子中輸入RDD都是HDFS文件,對這些RDD能夠執行:partitions操做返回各個數據塊的一個分區(每一個Partition對象中保存數據塊的偏移),preferredLocations操做返回數據塊所在的節點列表,iterator操做對數據塊進行讀取。

map:任何RDD上均可以執行map操做,返回一個MappedRDD對象。該操做傳遞一個函數參數給map,對父RDD上的記錄按照iterator的方式執行這個函數,並返回一組符合條件的父RDD分區及其位置。

union:在兩個RDD上執行union操做,返回兩個父RDD分區的並集。經過相應父RDD上的窄依賴關係計算每一個子RDD分區(注意union操做不會過濾重複值,至關於SQL中的UNION ALL)。

sample:抽樣與映射相似,可是sample操做中,RDD須要存儲一個隨機數產生器的種子,這樣每一個分區可以肯定哪些父RDD記錄被抽樣。

join:對兩個RDD執行join操做可能產生窄依賴(若是這兩個RDD擁有相同的哈希分區或範圍分區),多是寬依賴,也可能兩種依賴都有(好比一個父RDD有分區,而另外一父RDD沒有)。

5.2 Spark任務調度器

調度器根據RDD的結構信息爲每一個動做肯定有效的執行計劃。調度器的接口是runJob函數,參數爲RDD及其分區集,和一個RDD分區上的函數。該接口足以表示Spark中的全部動做(即count、collect、save等)。

總的來講,咱們的調度器跟Dryad相似,但咱們還考慮了哪些RDD分區是緩存在內存中的。調度器根據目標RDD的Lineage圖建立一個由stage構成的無迴路有向圖(DAG)。每一個stage內部儘量多地包含一組具備窄依賴關係的轉換,並將它們流水線並行化(pipeline)。stage的邊界有兩種狀況:一是寬依賴上的Shuffle操做;二是已緩存分區,它能夠縮短父RDD的計算過程。例如圖6。父RDD完成計算後,能夠在stage內啓動一組任務計算丟失的分區。
f6-spark-compute-stage
圖6 Spark怎樣劃分任務階段(stage)的例子。實線方框表示RDD,實心矩形表示分區(黑色表示該分區被緩存)。要在RDD G上執行一個動做,調度器根據寬依賴建立一組stage,並在每一個stage內部將具備窄依賴的轉換流水線化(pipeline)。 本例不用再執行stage 1,由於B已經存在於緩存中了,因此只須要運行2和3。

調度器根據數據存放的位置分配任務,以最小化通訊開銷。若是某個任務須要處理一個已緩存分區,則直接將任務分配給擁有這個分區的節點。不然,若是須要處理的分區位於多個可能的位置(例如,由HDFS的數據存放位置決定),則將任務分配給這一組節點。

對於寬依賴(例如須要Shuffle的依賴),目前的實現方式是,在擁有父分區的節點上將中間結果物化,簡化容錯處理,這跟MapReduce中物化map輸出很像。

若是某個任務失效,只要stage中的父RDD分區可用,則只需在另外一個節點上從新運行這個任務便可。若是某些stage不可用(例如,Shuffle時某個map輸出丟失),則須要從新提交這個stage中的全部任務來計算丟失的分區。

最後,lookup動做容許用戶從一個哈希或範圍分區的RDD上,根據關鍵字讀取一個數據元素。這裏有一個設計問題。Driver程序調用lookup時,只須要使用當前調度器接口計算關鍵字所在的那個分區。固然任務也能夠在集羣上調用lookup,這時能夠將RDD視爲一個大的分佈式哈希表。這種狀況下,任務和被查詢的RDD之間的並無明確的依賴關係(由於worker執行的是lookup),若是全部節點上都沒有相應的緩存分區,那麼任務須要告訴調度器計算哪些RDD來完成查找操做。

5.3 檢查點

儘管RDD中的Lineage信息能夠用來故障恢復,但對於那些Lineage鏈較長的RDD來講,這種恢復可能很耗時。例如4.3小節中的Pregel任務,每次迭代的頂點狀態和消息都跟前一次迭代有關,因此Lineage鏈很長。若是將Lineage鏈存到物理存儲中,再按期對RDD執行檢查點操做就頗有效。

通常來講,Lineage鏈較長、寬依賴的RDD須要採用檢查點機制。這種狀況下,集羣的節點故障可能致使每一個父RDD的數據塊丟失,所以須要所有從新計算[20]。將窄依賴的RDD數據存到物理存儲中能夠實現優化,例如前面4.1小節邏輯迴歸的例子,將數據點和不變的頂點狀態存儲起來,就再也不須要檢查點操做。

當前Spark版本提供檢查點API,但由用戶決定是否須要執行檢查點操做。從此咱們將實現自動檢查點,根據成本效益分析肯定RDD Lineage圖中的最佳檢查點位置。

值得注意的是,由於RDD是隻讀的,因此不須要任何一致性維護(例如寫複製策略,分佈式快照或者程序暫停等)帶來的開銷,後臺執行檢查點操做。

咱們使用10000行Scala代碼實現了Spark。系統可使用任何Hadoop數據源(如HDFS,Hbase)做爲輸入,這樣很容易與Hadoop環境集成。Spark以庫的形式實現,不須要修改Scala編譯器。

這裏討論關於實現的三方面問題:(1)修改Scala解釋器,容許交互模式使用Spark(6.1);(2)緩存管理(6.2);(3)調試工具rddbg(6.3)。

6. 實現

6.1 解釋器的集成

像Ruby和Python同樣,Scala也有一個交互式shell。基於內存的數據能夠實現低延時,咱們但願容許用戶從解釋器交互式地運行Spark,從而在大數據集上實現大規模並行數據挖掘。

Scala解釋器一般根據將用戶輸入的代碼行,來對類進行編譯,接着裝載到JVM中,而後調用類的函數。這個類是一個包含輸入行變量或函數的單例對象,並在一個初始化函數中運行這行代碼。例如,若是用戶輸入代碼var x = 5,接着又輸入println(x),則解釋器會定義一個包含x的Line1類,並將第2行編譯爲println(Line1.getInstance().x)。

在Spark中咱們對解釋器作了兩點改動:

  1. 類傳輸:解釋器可以支持基於HTTP傳輸類字節碼,這樣worker節點就能獲取輸入每行代碼對應的類的字節碼。
  2. 改進的代碼生成邏輯:一般每行上建立的單態對象經過對應類上的靜態方法進行訪問。也就是說,若是要序列化一個閉包,它引用了前面代碼行中變量,好比上面的例子Line1.x,Java不會根據對象關係傳輸包含x的Line1實例。因此worker節點不會收到x。咱們將這種代碼生成邏輯改成直接引用各個行對象的實例。圖7說明了解釋器如何將用戶輸入的一組代碼行解釋爲Java對象。

f7-spark-interpreter-translation
圖7 Spark解釋器如何將用戶輸入的兩行代碼解釋爲Java對象
Spark解釋器便於跟蹤處理大量對象關係引用,而且便利了HDFS數據集的研究。咱們計劃以Spark解釋器爲基礎,開發提供高級數據分析語言支持的交互式工具,好比相似SQL和Matlab。

6.2 緩存管理

Worker節點將RDD分區以Java對象的形式緩存在內存中。因爲大部分操做是基於掃描的,採起RDD級的LRU(最近最少使用)替換策略(即不會爲了裝載一個RDD分區而將同一RDD的其餘分區替換出去)。目前這種簡單的策略適合大多數用戶應用。另外,使用帶參數的cache操做能夠設定RDD的緩存優先級。

6.3 rddbg:RDD程序的調試工具

RDD的初衷是爲了實現容錯以可以再計算(re-computation),這個特性使得調試更容易。咱們建立了一個名爲rddbg的調試工具,它是經過基於程序記錄的Lineage信息來實現的,容許用戶:(1)重建任何由程序建立的RDD,並執行交互式查詢;(2)使用一個單進程Java調試器(如jdb)傳入計算好的RDD分區,可以從新運行做業中的任何任務。

咱們強調一下,rddbg不是一個徹底重放的調試器:特別是不對非肯定性的代碼或動做進行重放。但若是某個任務一直運行很慢(好比因爲數據分佈不均勻或者異常輸入等緣由),仍然能夠用它來幫助找到其中的邏輯錯誤和性能問題。

舉個例子,咱們使用rddbg去解決用戶Spam分類做業中的一個bug,這個做業中的每次迭代都產生0值。在調試器中從新執行reduce任務,很快就能發現,輸入的權重向量(存儲在一個用戶自定義的向量類中)居然是空值。因爲從一個未初始化的稀疏向量中讀取老是返回0,運行時也不會拋出異常。在這個向量類中設置一個斷點,而後運行這個任務,引導程序很快就運行到設置的斷點處,咱們發現向量類的一個數組字段的值爲空,咱們診斷出了這個bug:稀疏向量類中的數據字段被錯誤地使用transient來修飾,致使序列化時忽略了該字段的數據。

rddbg給程序執行帶來的開銷很小。程序原本就須要將各個RDD中的全部閉包序列化並經過網絡傳送,只不過使用rddbg同時還要將這些閉集記錄到磁盤。

7. 評估

咱們在Amazon EC2[1]上進行了一系列實驗來評估Spark及RDD的性能,並與Hadoop及其餘應用程序的基準進行了對比。總的說來,結果以下:
(1)對於迭代式機器學習應用,Spark比Hadoop快20多倍。這種加速比是由於:數據存儲在內存中,同時Java對象緩存避免了反序列化操做。
(2)用戶編寫的應用程序執行結果很好。例如,Spark分析報表比Hadoop快40多倍。
(3)若是節點發生失效,經過重建那些丟失的RDD分區,Spark可以實現快速恢復。
(4)Spark可以在5-7s延時範圍內,交互式地查詢1TB大小的數據集。
咱們基準測試首先從一個運行在Hadoop上的具備迭代特徵的機器學習應用(7.1)和PageRank(7.2)開始,而後評估在Spark中當工做集不能適應緩存(7.4)時系統容錯恢復能力(7.3),最後討論用戶應用程序(7.5)和交互式數據挖掘(7.6)的結果。
除非特殊說明,咱們的實驗使用m1.xlarge EC2 節點,4核15GB內存,使用HDFS做爲持久存儲,塊大小爲256M。在每一個做業運行執行時,爲了保證磁盤讀時間更加精確,咱們清理了集羣中每一個節點的操做系統緩存。

7.1 可迭代的機器學習應用

咱們實現了2個迭代式機器學習(ML)應用,Logistic迴歸和K-means算法,與以下系統進行性能對比:

  • Hadoop:Hadoop 0.20.0穩定版。
  • HadoopBinMem:在首輪迭代中執行預處理,經過將輸入數據轉換成爲開銷較低的二進制格式來減小後續迭代過程當中文本解析的開銷,在HDFS中加載到內存。
  • Spark:基於RDD的系統,在首輪迭代中緩存Java對象以減小後續迭代過程當中解析、反序列化的開銷。

咱們使用同一數據集在相同條件下運行Logistic迴歸和K-means算法:使用400個任務(每一個任務處理的輸入數據塊大小爲256M),在25-100臺機器,執行10次迭代處理100G輸入數據集(表4)。兩個做業的關鍵區別在於每輪迭代單個字節的計算量不一樣。K-means的迭代時間取決於更新聚類座標耗時,Logistic迴歸是非計算密集型的,可是在序列化和解析過程當中很是耗時。
因爲典型的機器學習算法須要數10輪迭代,而後再合併,咱們分別統計了首輪迭代和後續迭代計算的耗時,並從中發現,在內存中緩存RDD極大地加快了後續迭代的速度。

表4 用於Spark基準程序的數據
應用 數據描述 大小
Logistic迴歸 10億9維點數據 100G
K-means 10億10維點數據(k=10) 100G
PageRank 400萬Wikipedia文章超連接圖 49G
交互式數據挖掘 Wikipedia瀏覽日誌(2008-10~2009-4) 1TB

首輪迭代。在首輪迭代過程當中,三個系統都是從HDFS中讀取文本數據做爲輸入。圖9中「First Iteration」顯示了首輪迭代的柱狀圖,實驗中Spark快於Hadoop,主要是由於Hadoop中的各個分佈式組件基於心跳協議來發送信號帶來了開銷。HadoopBinMem是最慢的,由於它經過一個額外的MapReduce做業將數據轉換成二進制格式。
f8-first-iteration-bars
圖8 首輪迭代後Hadoop、HadoopBinMen、Spark運行時間對比

後續迭代。圖9顯示了後續迭代的平均耗時,圖8對比了不一樣聚類大小條件下耗時狀況,咱們發如今100個節點上運行Logistic迴歸程序,Spark比Hadoop、HadoopBinMem分別快25.三、20.7倍。從圖8(b)能夠看到,Spark僅僅比Hadoop、HadoopBinMem分別快1.九、3.2倍,這是由於K-means程序的開銷取決於計算(用更多的節點有助於提升計算速度的倍數)。

後續迭代中,Hadoop仍然從HDFS讀取文本數據做爲輸入,因此從首輪迭代開始Hadoop的迭代時間並無明顯的改善。使用預先轉換的SequenceFile文件(Hadoop內建的二進制文件格式),HadoopBinMem在後續迭代中節省了解析的代價,可是仍然帶來的其餘的開銷,如從HDFS讀SequenceFile文件並轉換成Java對象。由於Spark直接讀取緩存於RDD中的Java對象,隨着聚類尺寸的線性增加,迭代時間大幅降低。
f9-length-of-first-and-later-iterations
圖9:首輪及其後續迭代平均時間對比
理解速度提高。咱們很是驚奇地發現,Spark甚至賽過了基於內存存儲二進制數據的Hadoop(HadoopBinMem),幅度高達20倍之多,Hadoop運行慢是因爲以下幾個緣由:

  1. Hadoop軟件棧的最小開銷
  2. 讀數據時HDFS棧的開銷
  3. 將二進制記錄轉換成內存Java對象的代價

爲了估測1,咱們運行空的Hadoop做業,僅僅執行做業的初始化、啓動任務、清理工做就至少耗時25秒。對於2,咱們發現爲了服務每個HDFS數據塊,HDFS進行了屢次複製以及計算校驗和操做。

爲了估測3,咱們在單個節點上運行了微基準程序,在輸入的256M數據上計算Logistic迴歸,結果如表5所示。首先,在內存中的HDFS文件和本地文件的不一樣致使經過HDFS接口讀取耗時2秒,甚至數據就在本地內存中。其次,文本和二進制格式輸入的不一樣形成了解析耗時7秒的開銷。最後,預解析的二進制文件轉換爲內存中的Java對象,耗時3秒。每一個節點處理多個塊時這些開銷都會累積起來,然而經過緩存RDD做爲內存中的Java對象,Spark只須要耗時3秒。

表5 Logistic迴歸迭代時間
內存中的HDFS文件 內存中的本地文件 緩存的RDD
文本輸入

二進制輸入
15.38 (0.26)

8.38 (0.10)
13.13 (0.26)

6.86 (0.02)
2.93 (0.31)

2.93 (0.31)

7.2 PageRank

經過使用存儲在HDFS上的49G Wikipedia導出數據,咱們比較了使用RDD實現的Pregel與使用Hadoop計算PageRank的性能。PageRank算法經過10輪迭代處理了大約400萬文章的連接圖數據,圖10顯示了在30個節點上,Spark處理速度是Hadoop的2倍多,改進後對輸入進行Hash分區速度提高到2.6倍,使用Combiner後提高到3.6倍,這些結果數據也隨着節點擴展到60個時同步放大。
f10-compare-spark-and-hadoop
圖10 迭代時間對比

7.3 容錯恢復

基於K-means算法應用程序,咱們評估了在單點故障(SPOF)時使用Lneage信息建立RDD分區的開銷。圖11顯示了,K-means應用程序運行在75個節點的集羣中進行了10輪迭代,咱們在正常操做和進行第6輪迭代開始時一個節點發生故障的狀況下對耗時進行了對比。沒有任何失敗,每輪迭代啓動了400個任務處理100G數據。
f11-iteration-k-means-spof
圖11 SPOF時K-means應用程序迭代時間
第5輪迭代結束時大約耗時58秒,第6輪迭代時Kill掉一個節點,該節點上的任務都被終止(包括緩存的分區數據)。Spark調度器調度這些任務在其餘節點上從新並行運行,而且從新讀取基於Lineage信息重建的RDD輸入數據並進行緩存,這使得迭代計算耗時增長到80秒。一旦丟失的RDD分區被重建,平均迭代時間又回落到58秒。

7.4 內存不足時表現

到如今爲止,咱們能保證集羣中的每一個節點都有足夠的內存去緩存迭代過程當中使用的RDD,若是沒有足夠的內存來緩存一個做業的工做集,Spark又是如何運行的呢?在實驗中,咱們經過在每一個節點上限制緩存RDD所須要的內存資源來配置Spark,在不一樣的緩存配置條件下執行Logistic迴歸,結果如圖12。咱們能夠看出,隨着緩存的減少,性能平緩地降低。
f12-spark-performance-limit-cache-size-of-rdd
圖12 Spark上運行Logistic迴歸的性能表現

7.5 基於Spark構建的用戶應用程序

In-Memory分析。視頻分發公司Conviva使用Spark極大地提高了爲客戶處理分析報告的速度,之前基於Hadoop使用大約20個Hive[3]查詢來完成,這些查詢做用在相同的數據子集上(知足用戶提供的條件),可是在不一樣分組的字段上執行聚合操做(SUM、AVG、COUNT DISTINCT等)須要使用單獨的MapReduce做業。該公司使用Spark只須要將相關數據加載到內存中一次,而後運行上述聚合操做,在Hadoop集羣上處理200G壓縮數據並生成報耗時20小時,而使用Spark基於96G內存的2個節點耗時30分鐘便可完成,速度提高40倍,主要是由於不須要再對每一個做業重複地執行解壓縮和過濾操做。

城市交通建模。在Berkeley的Mobile Millennium項目[17]中,基於一系列分散的汽車GPS監測數據,研究人員使用並行化機器學習算法來推算公路交通擁堵情況。數據來自市區10000個互聯的公路線路網,還有600000個由汽車GPS裝置採集到的樣本數據,這些數據記錄了汽車在兩個地點之間行駛的時間(每一條路線的行駛時間可能跨多個公路線路網)。使用一個交通模型,經過推算跨多個公路網行駛耗時預期,系統可以估算擁堵情況。研究人員使用Spark實現了一個可迭代的EM算法,其中包括向Worker節點廣播路線網絡信息,在E和M階段之間執行reduceByKey操做,應用從20個節點擴展到80個節點(每一個節點4核),如圖13(a)所示:
f13-run-time-of-per-iteration
圖13 每輪迭代運行時間(a)交通建模應用程序(b)基於Spark的社交網絡的Spam分類
社交網絡Spam分類。Berkeley的Monarch項目[31]使用Spark識別Twitter消息上的Spam連接。他們在Spark上實現了一個相似7.1小節中示例的Logistic迴歸分類器,不一樣的是使用分佈式的reduceByKey操做並行對梯度向量求和。圖13(b)顯示了基於50G數據子集訓練訓練分類器的結果,整個數據集是250000的URL、至少10^7個與網絡相關的特徵/維度,內容、詞性與訪問一個URL的頁面相關。隨着節點的增長,這並不像交通應用程序那樣近似線性,主要是由於每輪迭代的固定通訊代價較高。

7.6 交互式數據挖掘

爲了展現Spark交互式處理大數據集的能力,咱們在100個m2.4xlarge EC2實例(8核68G內存)上使用Spark分析1TB從2008-10到2009-4這段時間的Wikipedia頁面瀏覽日誌數據,在整個輸入數據集上簡單地查詢以下內容以獲取頁面瀏覽總數:(1)所有頁面;(2)頁面的標題能精確匹配給定的關鍵詞;(3)頁面的標題能部分匹配給定的關鍵詞。
f14-response-time-of-interactive-queries
圖14 顯示了分別在整個、1/二、1/10的數據上查詢的響應時間,甚至1TB數據在Spark上查詢僅耗時5-7秒,這比直接操做磁盤數據快幾個數量級。例如,從磁盤上查詢1TB數據耗時170秒,這代表了RDD緩存使得Spark成爲一個交互式數據挖掘的強大工具。

8. 相關工做

分佈式共享內存(DSM)。RDD能夠當作是一個基於DSM研究[24]獲得的抽象。在2.5節咱們討論過,RDD提供了一個比DSM限制更嚴格的編程模型,並能在節點失效時高效地重建數據集。DSM經過檢查點[19]實現容錯,而Spark使用Lineage重建RDD分區,這些分區能夠在不一樣的節點上從新並行處理,而不須要將整個程序回退到檢查點再從新運行。RDD可以像MapReduce同樣將計算推向數據[12],並經過推測執行來解決某些任務計算進度落後的問題,推測執行在通常的DSM系統上是很難實現的。

In-Memory集羣計算。Piccolo[28]是一個基於可變的、In-Memory的分佈式表的集羣編程模型。由於Piccolo容許讀寫表中的記錄,它具備與DSM相似的恢復機制,須要檢查點和回滾,可是不能推測執行,也沒有提供相似groupBy、sort等更高級別的數據流算子,用戶只能直接讀取表單元數據來實現。可見,Piccolo是比Spark更低級別的編程模型,可是比DSM要高級。

RAMClouds[26]適合做爲Web應用的存儲系統,它一樣提供了細粒度讀寫操做,因此須要經過記錄日誌來實現容錯。

數據流系統。RDD借鑑了DryadLINQ[34]、Pig[25]和FlumeJava[9]的「並行收集」編程模型,經過容許用戶顯式地將未序列化的對象保存在內存中,以此來控制分區和基於key隨機查找,從而有效地支持基於工做集的應用。RDD保留了那些數據流系統更高級別的編程特性,這對那些開發人員來講也比較熟悉,並且,RDD也可以支持更多類型的應用。RDD新增的擴展,從概念上看很簡單,其中Spark是第一個使用了這些特性的系統,相似DryadLINQ編程模型,可以有效地支持基於工做集的應用。

面向基於工做集的應用,已經開發了一些專用系統,像Twister[13]、HaLoop[8]實現了一個支持迭代的MapReduce模型;Pregel[21],支持圖應用的BSP計算模型。RDD是一個更通用的抽象,它可以描述支持迭代的MapReduce、Pregel,還有現有一些系統未能處理的應用,如交互式數據挖掘。特別地,它可以讓開發人員動態地選擇操做來運行在RDD上(如查看查詢的結果以決定下一步運行哪一個查詢),而不是提供一系列固定的步驟去執行迭代,RDD還支持更多類型的轉換。

最後,Dremel[22]是一個低延遲查詢引擎,它面向基於磁盤存儲的大數據集,這類數據集是把嵌套記錄數據生成基於列的格式。這種格式的數據也可以保存爲RDD並在Spark系統中使用,但Spark也具有將數據加載到內存來實現快速查詢的能力。

Lineage。咱們經過參考[6]到[10]作過調研,在科學計算和數據庫領域,對於一些應用,如須要解釋結果以及容許被從新生成、工做流中發現了bug或者數據集丟失須要從新處理數據,表示數據的Lineage和原始信息一直以來都是一個研究課題。RDD提供了一個受限的編程模型,在這個模型中使用細粒度的Lineage來表示是很是容易的,所以它能夠被用於容錯。

緩存系統。Nectar[14]可以經過識別帶有程序分析的子表達式,跨DryadLINQ做業重用中間結果,若是將這種能力加入到基於RDD的系統會很是有趣。可是Nectar並無提供In-Memory緩存,也不可以讓用戶顯式地控制應該緩存那個數據集,以及如何對其進行分區。Ciel[23]一樣可以記住任務結果,但不能提供In-Memory緩存並顯式控制它。

語言迭代。DryadLINQ[34]可以使用LINQ獲取到表達式樹而後在集羣上運行,Spark系統的語言集成與它很相似。不像DryadLINQ,Spark容許用戶顯式地跨查詢將RDD存儲到內存中,並經過控制分區來優化通訊。Spark支持交互式處理,但DryadLINQ卻不支持。

關係數據庫。從概念上看,RDD相似於數據庫中的視圖,緩存RDD相似於物化視圖[29]。然而,數據庫像DSM系統同樣,容許典型地讀寫全部記錄,經過記錄操做和數據的日誌來實現容錯,還須要花費額外的開銷來維護一致性。RDD編程模型經過增長更多限制來避免這些開銷。

9. 總結

咱們提出的RDD是一個面向,運行在普通商用機集羣之上並行數據處理應用的分佈式內存抽象。RDD普遍支持基於工做集的應用,包括迭代式機器學習和圖算法,還有交互式數據挖掘,然而它保留了數據流模型中引人注目的特色,如自動容錯恢復,處理執行進度落後的任務,以及感知調度。它是經過限制編程模型,進而容許高效地重建RDD分區來實現的。RDD實現處理迭代式做業的速度超過Hadoop大約20倍,並且還可以交互式查詢數百G數據。

相關文章
相關標籤/搜索