Apache Spark 2.2.0 中文文檔 - Spark RDD(Resilient Distributed Datasets)論文 | ApacheCN

Spark RDD(Resilient Distributed Datasets)論文

概要

爲了能解決程序員能在大規模的集羣中以一種容錯的方式進行內存計算這個問題, 咱們提出了 RDDs 的概念. 當前的不少框架對迭代式算法場景與交互性數據挖掘場景的處理性能很是差, 這個是 RDDs 的提出的動機. 若是能將數據保存在內存中, 將會使的上面兩種場景的性能提升一個數量級. 爲了能達到高效的容錯, RDDs 提供了一種受限制的共享內存的方式, 這種方式是基於粗粒度的轉換共享狀態而非細粒度的更新共享狀態. 然而, 咱們分析代表 RDDs 能夠表達出不少種類的計算, 包括目前專門從事迭代任務的編程計算模型, 好比 Pregel, 固然也能夠表達出目前模型表達不出的計算. 咱們經過 Spark 系統來實現了 RDDs, 而且經過各類各樣的用戶應用和測試來評估了這個系統.html

1: 介紹

像 MapReduce 和 Dryad 等分佈式計算框架已經普遍應用於大數據集的分析. 這些系統可讓用戶不用擔憂分佈式工做以及容錯, 而是使用一系列的高層次的操做 api 來達到並行計算的目的.java

雖然當前的框架提供了大量的對訪問利用計算資源的抽象, 可是它們缺乏了對利用分佈式內存的抽象.樣使的它們在處理須要在多個計算之間複用中間結果的應用的時候會很是的不高效. 數據的複用在迭代機器學習和圖計算領域(好比 PageRank, K-means 以及線性迴歸等算法)是很常見的. 在交互式數據挖中, 一個用戶會常常對一個相同的數據子集進行屢次不一樣的特定查詢, 因此數據複用在交互式數據挖掘也是很常見的. 然而, 目前的大部分的框架對計算之間的數據複用的處理方式就是將中間數據寫到一個靠穩定的系統中(好比分佈式文件系統), 這樣會因爲數據的複製備份, 磁盤的 I/O 以及數據的序列化而致應用任務執行很費時間.git

認識到這個問題後, 研究者們已經爲一些須要中間數據複用的應用開發出了一些特殊的框架.好比Pregel 在作迭代式圖計算的時候會將中間結果放在內存中. HaLoop 也提供了迭代式 MapReduce 接口.然而, 這些框架僅僅支持一些特殊的計算模式(好比循環一系列的 MapReduce 步驟), 而且它們是隱式的爲些計算模式提供數據共享. 它們沒有提供更加廣泛數據複用的抽象, 好比可讓用戶加載幾個數據集到存中而後對這些內存中的數據集進行專門的查詢.程序員

在這篇論文中, 咱們提出了一個全新的抽象, 叫作 RDDs, 它能夠高效的處理普遍的應用中涉及到的數據用的場景. RDDs 是一個能夠容錯且並行的數據結構, 它可讓用戶顯式的將中間結果數據集保存在內中、控制數據集的分區來達到數據存放處理最優以及可使用豐富的操做 api 來操做數據集在設計 RDDs 的時候, 最大的挑戰是定義一個能夠高效容錯的編程接口. 已經存在的分佈式內存抽象系統好比 distributed shared memory、key-value stores、databases 以及 Poccolo, 都是提供了基於粒度的更新可變狀態(好比 table 中的 cells)的接口, 基於這種接口下, 保證容錯的方式無非是將數據復備份到多臺機器或者在多臺機器上記錄更新的日誌, 這兩種方式在數據密集性的工做任務中都是很是的時的, 由於須要經過網絡傳輸在機器節點間複製大量的數據, 寬帶傳輸數據的速度遠遠比 RAM 內存慢, 而這兩種方式會佔用大量的存儲空間.github

與這些系統相反, RDDs 提供了基於粗粒度轉換(好比 map, filter 以及 join)的接口, 這些接口能夠對多的數據條目應用相同的操做.這樣就能夠經過記錄來生成某個數據集的一系列轉換 (就是這個數據集 lineage)而不是記錄真實的數據來達到提供高效的容錯機制. 這個 RDD 就有足夠的信息知道它是從哪 RDDs 轉換計算來的, 若是一個 RDD 的分區數據丟失掉了, 那麼從新計算這個 RDD 所依賴的那個 RDD 對應的區就好了. 所以能夠很快且不用經過複製備份方式來恢復丟失的數據.web

雖然基於粗粒度的轉換一開始看起來受限制, 可是 RDDs 很是適合不少並行計算的應用, 由於這些應用基都是在大量的數據元素上應用相同的操做方法. 事實上, 咱們分析代表 RDDs 不只能夠高效的表達出目前括 MapReduce, DryadLINQ, SQL, Pregel 以及 HaLoop 等系統提出的分佈式編程模型, 並且還能表達它們表達不了的新的應用的計算模型, 好比交互型數據挖掘. 咱們相信, RDDs 解決那些新的框架提出來計算需求的能力將會成爲是 RDD 抽象強大的最有力證據.算法

咱們在 Spark 系統中實現了 RDDs, 這個系統已經在 UC Berkeley 以及好些個公司中應用於研究和生產應中.Spark 和 DryadLINQ 相似使用scala語言提供了很方便語言集成編程接口.另外, Spark能夠利用 scala 的解釋器來對大數據集進行交互式的查詢.咱們相信 spark 是首個容許使用多種編程語言來進行分佈式內存中交互式數據挖掘的系統.shell

咱們經過爲基準測試以及用戶應用的測試兩個方面來評估了 RDDs 和 spark. 咱們分析顯示, Spark 在迭代應用中能夠比 hadoop 快上 20 倍以上、使的現實中的數據分析報表的速度提高了 40 倍以及使的交互式的掃1TB數據集的延遲在 5-7 秒. 更重要的是, 爲了彰顯 RDDs 的廣泛性, 咱們基於spark 用相對較小的程序(每一個包只有 200 行代碼)實現了 Pregel 和 HaLoop 的編程模型, 包括它們使用的數據分佈優化. 本篇論文以 RDDs(第二節)和 Spark(第三節)的概述開始. 而後在第四節中討論 了RDD s內部的表達、在第節中討論了咱們的實現以及在第六節中討論了實驗結果. 最後, 咱們討論了 RDDs 是怎麼樣來表達如今已存在的幾個系統的編程模型(第七節)、調查相關工做(第八節)以及總結.數據庫

2: Resilient Distributed Datasets(RDDs)

這節主要講述 RDDs 的概要, 首先定義 RDDs(2.1)以及介紹 RDDs 在 spark 中的編程接口(2.2), 而後對 RDDs 和細粒度共享內存抽象進行的對比(2.3).最後咱們討論了 RDD 模型的限制性.apache

2.1 RDD 抽象

一個 RDD 是一個只讀, 被分區的數據集.咱們能夠經過兩種對穩定的存儲系統和其餘的 RDDs 進行操做而建立一個新的 RDDs.爲了區別開 RDDs 的其餘操做, 咱們稱這些操做爲 transformations, 好比 map, filter 以及 join 等都是 transformations 操做.

RDDs 並不要始終被具體化, 一個 RDD 有足夠的信息知道本身是從哪一個數據集計算而來的(就是所謂的依賴血統), 這是一個很是強大的屬性:其實, 一個程序你能引用一個不能從失敗中從新構建的 RDD.

最後, 用戶能夠控制 RDDs 的兩個方面:數據存儲和分區.對於須要複用的 RDD, 用戶能夠明確的選擇一個數據存儲策略(好比內存緩存). 他們也能夠基於一個元素的 key 來爲 RDD 全部的元素在機器節點間進行數據分區, 這樣很是利於數據分佈優化, 好比給兩個數據集進行相同的 hash 分區, 而後進行 join, 能夠提升 join 的性能.

2.2 Spark 編程接口

Spark 和 DryadLINQ 和 FlumeJava 同樣經過集成編程語言 api 來暴露 RDDs, 這樣的話, 每個數據集就表明一個對象, 咱們能夠調用這個對象中的方法來操做這個對象.

編程者能夠經過對穩定存儲的數據進行轉換操做(即 transformations, 好比 map 和 filter 等)來獲得一個或者多個 RDDs. 而後能夠對這些 RDDs 進行 actions 操做, 這些操做能夠是獲得應用的結果值, 也能夠是將結果數據寫入到存儲系統中, actions 包括: count(表示返回這個數據集的元素的個數)、collect(表示返回數據集的全部元素)以及 save(表示將輸出結果寫入到存儲系統中). 和 DryadLINQ 同樣, spark 在定義 RDDs 的時候並不會真正的計算, 而是要等到對這個 RDDs 觸發了 actions 操做纔會真正的觸發計算, 這個稱之爲 RDDs 的 lazy 特性, 因此咱們能夠先對 transformations 進行組裝一系列的 pipelines, 而後再計算.

另外, 編程者能夠經過調用 RDDs 的 persist 方法來緩存後續須要複用的 RDDs. Spark 默認是將緩存數據放在內存中, 可是若是內存不足的話則會寫入到磁盤中. 用戶能夠經過 persist 的參數來調整緩存策略, 好比只將數據存儲在磁盤中或者複製備份數據到多臺機器. 最後, 用戶能夠爲每個 RDDs 的緩存設置優先級, 以達到哪一個在內存中的 RDDs 應該首先寫道磁盤中

2.2.1 例子 – 監控日誌數據挖掘

假設一個 web 服務正發生了大量的錯誤, 而後運維人員想從存儲在 hdfs 中的幾 TB 的日誌中找出錯誤的緣由. 運維人員能夠經過 spark 將日誌中的錯誤信息加載到分佈式的內存中, 而後對這些內存中的數據進行查詢. 她首先須要寫下面的 scala 代碼:

line = spark.textFile("hdfs://..")
errors = lines.filter(_.startsWith("ERROR"))
errors.persist()

第一行表示從一個 HDFS 文件(許多行的文件數據集)上定義了一個 RDD, 第二行表示基於前面定義的 RDD 進行過濾數據.第三行將過濾後的 RDD 結果存儲在內存中, 以達到多個對這個共享 RDD 的查詢. 須要注意的事, filter 的參數是 scala 語法中的閉包.

到目前爲止, 集羣上尚未真正的觸發計算.然而, 用戶能夠對RDD進行action操做, 好比對錯誤信息的計數:

errors.count()

用戶也能夠繼續對 RDD 進行 transformations 操做, 而後計算其結果, 好比:

//對錯誤中含有 」MySQL」 單詞的數據進行計數
errors.filters(_.contains("MySQL")).count()

//返回錯誤信息中含有 "HDFS" 字樣的信息中的時間字段的值(假設每行數據的字段是以 tab 來切分的, 時間字段是第 3 個字段)
errors.filter(_.contains("HDFS"))
        .map(_.split("\t")(3))
        .collect()

在對 errors 第一次作 action 操做的後, spark 會將 errors 的全部分區的數據存儲在內存中, 這樣後面對 errors 的計算速度會有很大的提高.須要注意的是, 像 lines 這種基礎數據的 RDD 是不會存儲在內存中的.由於包含錯誤信息的數據可能只是整個日誌數據的一小部分, 因此將包含錯誤數據的日誌放在內存中是比較合理的.

最後, 爲了說明咱們的模型是如何達到容錯的, 咱們在圖一種展現了第三個查詢的血緣關係圖(lineage graph).在這個查詢種, 咱們以對 lines 進行過濾後的 errors 開始, 而後在對 errors 進行了 filter 和 map 操做, 最後作了 action 操做即 collect. Spark 會最後兩個 transformations 組成一個 pipeline, 而後將這個 pipeline 分解成一系列的 task, 最後將這些 task 調度到含有 errors 緩存數據的機器上進行執行. 此外, 若是 errors 的一個分區的數據丟失了, spark 會對 lines 的相對應的分區應用 filter 函數來從新建立 errors 這個分區的數據


圖一: 咱們例子中第三個查詢的血緣關係圖, 其中方框表示 RDDs, 箭頭表示轉換

2.3 RDD 模型的優點

爲了理解做爲分佈式內存抽象的 RDDs 的好處, 咱們在表一種用 RDDs 和分佈式共享內存系統(Distributed shared memory 即 DSM)進行了對比. 在全部的 DSM 系統中, 應用從一個全局的地址空間中的任意位置中讀寫數據. 須要注意的是, 依據這個定義, 咱們所說的 DSM 系統不只包含了傳統的共享內存系統, 還包含了對共享狀態的細粒度寫操做的其餘系統(好比 Piccolo), 以及分佈式數據庫. DSM 是一個很廣泛的抽象, 可是這個廣泛性使得它在商用集羣中實現高效且容錯的系統比較困難.

Aspect(概念) RDDs Distribute shared memory(分佈式共享內存)
Reads 粗粒度或者細粒度 細粒度
Writes 粗粒度 細粒度
數據一致性 不重要的(由於RDD是不可變的) 取決於app 或者 runtime
容錯 利用lineage達到細粒度且低延遲的容錯 須要應用checkpoints(就是須要寫磁盤)
而且須要程序回滾
計算慢的任務 能夠利用備份的任務來解決 很難作到
計算數據的位置 自動的機遇數據本地性 取決於app (runtime是以透明爲目標的)
內存不足時的行爲 和已經存在的數據流處理系統同樣, 寫磁盤 很是糟糕的性能(須要內存的交換?)

表一: RDDs 和 Distributed shared memory 對比

RDDs 只能經過粗粒度的轉換被建立(或者被寫) , 然而 DSM 容許對每個內存位置進行讀寫, 這個是 RDDs 和 DSM 最主要的區別. 這樣使都 RDDs在 應用中大量寫數據受到了限制, 可是可使的容錯變的更加高效. 特別是, RDDs 不須要發生很是耗時的 checkpoint 操做, 由於它能夠根據 lineage 進行恢復數據 . 並且, 只有丟掉了數據的分區纔會須要從新計算, 並不須要回滾整個程序, 而且這些從新計算的任務是在多臺機器上並行運算的.

RDDs 的第二個好處是:它不變的特性使的它能夠和 MapReduce 同樣來運行執行很慢任務的備份任務來達到緩解計算很慢的節點的問題. 在 DSM 中, 備份任務是很難實現的, 由於原始任務和備份任務或同時更新訪問同一個內存地址和接口.

最後, RDDs 比 DSM 多提供了兩個好處. 第一, 在對 RDDs 進行大量寫操做的過程當中, 咱們能夠根據數據的本地性來調度 task 以提升性能. 第二, 若是在 scan-base 的操做中, 且這個時候內存不足以存儲這個 RDDs, 那麼 RDDs 能夠慢慢的從內存中清理掉. 在內存中存儲不下的分區數據會被寫到磁盤中, 且提供了和現有並行數據處理系統相同的性能保證.

2.4 不適合用 RDDs 的應用

通過上面的討論介紹, 咱們知道 RDDs 很是適合將相同操做應用在整個數據集的全部的元素上的批處理應用. 在這些場景下, RDDs 能夠利用血緣關係圖來高效的記住每個 transformations 的步驟, 而且不須要記錄大量的數據就能夠恢復丟失的分區數據. RDDs 不太適合用於須要異步且細粒度的更新共享狀態的應用, 好比一個 web 應用或者數據遞增的 web 爬蟲應用的存儲系統. 對於這些應用, 使用傳統的紀錄更新日誌以及對數據進行 checkpoint 會更加高效. 好比使用數據庫、RAMCloud、Percolator 以及 Piccolo. 咱們的目標是給批量分析提供一個高效的編程模型, 對於這些異步的應用須要其餘的特殊系統來實現.

3 Spark 編程接口

Spark 使用 scala 語言實現了抽象的 RDD, scala 是創建在 java VM 上的靜態類型函數式編程語言. 咱們選擇 scala 是由於它結合了簡潔(很方便進行交互式使用)與高效(因爲它的靜態類型). 然而, 並非說 RDD 的抽象須要函數式語言來實現.

開發員須要寫鏈接集羣中的 workers 的 driver 程序來使用 spark, 就好比圖 2 展現的. Driver 端程序定義了一系列的 RDDs 而且調用了 RDD 的 action 操做. Driver 的程序同時也會跟蹤 RDDs 之間的的血緣關係. workers 是能夠將 RDD 分區數據存儲在內存中的長期存活的進程.


圖二: 這個是 Spark 運行時的圖, 用戶寫的 driver 端程序啓動多個 workers, 這些 workers 能夠從分佈書的存儲系統中讀取數據塊而且能夠將計算出來的 RDD 分區數據存放在內存中.

在 2.2.1 小節中的日誌挖掘例子中, 咱們提到, 用戶提供給 RDD 操做好比 map 以參數做爲這個操做的閉包(說白了就是函數). Scala 將這些函數看做一個 java 對象, 這些對象是能夠序列化的, 而且能夠經過網絡傳輸傳輸到其餘的機器節點上的. Scala 將函數中的變量看做一個對象中的變量. 好比, 咱們能夠寫一段這樣的代碼: var x = 5; rdd.map(_ + 5)來達到給這個 RDD 每個元素加上 5 的目的.

RDDs 是被一元素類型參數化的靜態類型對象, 好比, RDD[Int] 表示一個類型爲整數的 RDD. 然而, 咱們不少例子中的 RDD 都會省去這個類型, 這個是由於 scala 支持類型推斷.

雖然咱們用 scala 實現 RDD 的方法很簡單, 可是咱們須要處理用反射實現的閉包對象相關的工做, 咱們還須要作不少的工做使的 spark 能夠用 scala 的解釋器, 這個咱們在 5.2 小節中會討論到. 儘管如此, 咱們是不須要修改 scala 的編譯器的.

3.1 Spark 中 RDD 的操做

表 2 中列舉了 Spark 中 RDD 經常使用的 transformations 和 actions 操做, 且描述了每個方法的簽名以及類型.咱們須要記住 transformations 是用來定義一個新的 RDD 的 lazy 操做, 而actions 是真正觸發一個能返回結果或者將結果寫到文件系統中的計算.


表二: Spark 中 RDD 經常使用的 transformations 和 actions 操做.Seq[T] 表示元素類型爲 T 的一個列表.

須要注意的是, 一些操做好比 join 只適合用於 key-value 類型的 RDDs. 咱們取的函數的名稱和 scala 或者其餘函數式編程語言的函數名是一致的. 好比, map 是一個 one-to-one 的映射操做, 而 flatMap 的每個輸入值會對應一個或者更多的輸出值(有點像 MapReduce 中的 map)

除了這些操做, 用戶能夠經過 persist 操做來請求緩存 RDD. 另外, 用戶能夠拿到被 Partitioner 分區後的分區數以及根據 Partitioner 對另外一個 dataset 進行分區. 像 groupByKey、reduceByKey 以及 sort 等操做都是通過了hash 或者 rang 分區後的 RDD.

3.2 舉例應用

咱們用兩個迭代式的應用:線性迴歸和 PageRank 來補充 2.2.1 提到的數據挖掘的例子. 稍後也會展現下如何控制 RDD 的分區以達到提高性能的目的.

3.2.1 線性迴歸

不少的機器學習算法通常都是迭代式的計算, 由於它們須要跑迭代的優化程序(好比梯度降低)來達到最大化功能. 他們將數據存放在內存中以達到很快的速度.

做爲一個例子, 下面的程序實現了線性迴歸, 一個能找到最佳區分兩種點集(垃圾郵件以及非垃圾郵件)的超平面 w 的經常使用的分類算法. 這個算法用了梯度降低的方法:一個隨機的值做爲 w 的初始值, 每次迭代都會將含有 w 的方法應用到每個數據點而後累加獲得梯度值, 而後將 w 往改善結果的方向移動.


一開始咱們定義一個叫 points 的 RDD, 這個 RDD 從一個文本文件中通過 map 將每一行轉換爲 Point 對象獲得. 而後咱們重複對 points 進行 map 和 reduce 操做計算出每一步的梯度值. 在迭代之間咱們將 points 存放在內存中可使的性能提升 20 倍, 咱們將會在 6.1 節中討論.

3.2.2 PageRank

在 PageRank 中數據共享更加複雜. 若是一個文檔引用另外一個文檔, 那被引用的文檔的排名值(rank)須要加上引用的文檔發送過來的貢獻值, 固然這個過程是個迭代的過程. 在每一次迭代中, 每個文檔都會發送 r/n 的貢獻值給它的鄰居, 其中 r 表示這個文檔的排名值, n 表示這個文檔的鄰居數量. 而後更新文檔的排名值爲, 這個表達式值表示這個文檔收到的貢獻值, N 表示全部的文檔的數量, 咱們能夠用以下的 spark 代碼來表達 PageRank:

其中 links 表示( URL , outlinks )鍵值對. 這個程序的 RDD 的血緣關係圖如圖三. 在每一次迭代中咱們都是根據上一次迭代的 contribs 和 ranks 以及原始不變的 links 數據集來建立一個新的 ranks 數據集. 隨着迭代次數的變多這張圖會變的越長, 這個是這個圖比較有意思的特色. 若是這個 job 的迭代次數不少的話, 那麼備份一些版本的 ranks 來達到減小從錯誤中恢復出來的時間是頗有必要的, 用戶能夠調用標記爲 RELIABLE 的 persist 函數來達到這個目的. 須要注意的是, links 是不須要備份的, 由於它的分區數據能夠快速的從從新計算輸入文件中對應的數據塊而獲得, 這個數據集通常會比 ranks 數據集大上不少倍, 由於每個文檔會有不少的鏈接但只會有一個排名值, 因此利用 RDD 的血緣關係來恢復數據確定比 checkpoint 內存中的數據快不少(由於數據量太大).

最後, 咱們能夠控制 RDDs 的分區方式來優化 PageRank 中的節點通信. 若是咱們事先爲 links 指定一個分區方式(好比, 根據 link 的 url 來 hash 分區, 就是將相同的 url 發送到同一個節點中), 而後咱們對 ranks 進行相同的分區方式, 這樣就能夠保證 links 和 ranks 之間的 join 不須要機器節點之間的通信(由於相同的 url 都在同一個機器節點了, 那麼相對應的 rank 和 link 確定也是在同一個機器節點了). 咱們也能夠自定義分區器來實現將一組頁面 url 放到一塊兒(好比按照 url 的 domain 進行分區). 以上兩種優化方式均可以經過在定義 links 的時候調用 partitionBy 來實現:

在調用了 partitionBy 後, links 和 ranks 之間的 join 操做會自動的在 link 所在的機器進行每個 URL 的貢獻值的聚合計算, 而後在相同的機器計算新的排名值, 而後計算出來的新的 ranks 在相同的機器和 links 進行 join. 這種在迭代之間進行數據一致分區是像 Pregel 這種框架中的主要的優化計算方式. RDDs 使的用戶能夠直接本身來實現這種優化機制.

4 表達 RDDs

在抽象 RDDs 的過程當中, 怎麼表達出 RDDs 能跟蹤不少的 transformations 操做之間血緣關係是一個比較大的挑戰. 理想的狀況下, 一個實現 RDDs 系統應該是儘量多的提供 transformations 操做(好比表二中的操做), 而且可讓用戶以任意的方式來組合這些 transformations 操做. 咱們提出了基於圖的 RDDs 展示方式來達到以上的目的. 咱們在 spark 中利用這種展示方式達到了在不須要給調度系統爲每個 transformation 操做增長任何的特殊邏輯就能夠支持大量的 transformations 操做, 這樣極大的簡化了咱們的系統設計.

歸納的說, 如下五個信息能夠表達 RDDs: 一個分區列表, 每個分區就是數據集的原子塊. 一個父親 RDDs 的依賴列表. 一個計算父親的數據集的函數. 分區模式的元數據信息以及數據存儲信息. 好比, 基於一個 HDFS 文件建立出來的的 RDD 中文件的每個數據塊就是一個分區, 而且這個 RDD 知道每個數據塊存儲在哪些機器上, 同時, 在這個 RDD 上進行 map 操做後的結果有相同的分區數, 當計算元素的時候, 將 map 函數應用到父親 RDD 數據中的. 咱們在表三總結了這些接口:

操做接口 含義
partitions() 返回一個分區對象的列表
preferredLocations(p) 分區p數據存儲在哪些機器節點中
dependencies() 返回一個依賴列表
iterator(p, parentIters) 根據父親分區的數據輸入計算分區p的全部數據
partitioner() 返回這個RDD是hash仍是range分區的元數據信息

表三: Spark 中表達 RDDs 的接口

在設計如何表達 RDDs 之間依賴的接口是一個很是有意思的問題. 咱們發現將依賴定義成兩種類型就足夠了: 窄依賴, 表示父親 RDDs 的一個分區最多被子 RDDs 一個分區所依賴. 寬依賴, 表示父親 RDDs 的一個分區能夠被子 RDDs 的多個子分區所依賴. 好比, map 操做是一個窄依賴, join 操做是一個寬依賴操做(除非父親 RDDs 已經被 hash 分區過), 圖四顯示了其餘的例子:


圖四:窄依賴和寬依賴的例子.每個方框表示一個 RDD , 帶有顏色的矩形表示分區

如下兩個緣由使的這種區別頗有用, 第一, 窄依賴可使得在集羣中一個機器節點的執行流計算全部父親的分區數據, 好比, 咱們能夠將每個元素應用了 map 操做後緊接着應用 filter 操做, 與此相反, 寬依賴須要父親 RDDs 的全部分區數據準備好而且利用相似於 MapReduce 的操做將數據在不一樣的節點之間進行從新洗牌和網絡傳輸. 第二, 窄依賴從一個失敗節點中恢復是很是高效的, 由於只須要從新計算相對應的父親的分區數據就能夠, 並且這個從新計算是在不一樣的節點進行並行重計算的, 與此相反, 在一個含有寬依賴的血緣關係 RDDs 圖中, 一個節點的失敗可能致使一些分區數據的丟失, 可是咱們須要從新計算父 RDD 的全部分區的數據.

Spark 中的這些 RDDs 的通用接口使的實現不少 transformations 操做的時候只花了少於 20 行的代碼. 實際上, 新的 spark 用戶能夠在不瞭解調度系統的細節之上來實現新的 transformations 操做(好比, 採樣和各類 join 操做). 下面簡要的歸納了一些 RDD 的實現:

  • HDFS files: 抽樣的輸入 RDDs 是 HDFS 中的文件.對於這些 RDDs, partitions 返回文件中每個數據塊對應的一個分區信息(數據塊的位置信息存儲在 Partition 對象中), preferredLocations 返回每個數據塊所在的機器節點信息, 最後 iterator 負責數據塊的讀取操做.
  • map: 對任意的 RDDs 調用 map 操做將會返回一個 MappedRDD 對象.這個對象含有和其父親 RDDs 相同的分區信息和數據存儲節點信息, 可是在 iterator 中對父親的全部輸出數據記錄應用傳給 map 的函數.
  • union: 對兩個 RDDs 調用 union 操做將會返回一個新的 RDD , 這個 RDD 的分區數是他全部父親 RDDs 的全部分區數的總數.每個子分區經過相對應的窄依賴的父親分區計算獲得.
  • sample: sampling 和 mapping 相似, 除了 sample RDD 中爲每個分區存儲了一個隨機數, 做爲從父親分區數據中抽樣的種子.
  • join: 對兩個 RDDs 進行 join 操做, 可能致使兩個窄依賴(若是兩個 RDDs 都是事先通過相同的 hash/range 分區器進行分區), 或者致使兩個寬依賴, 或者一個窄依賴一個寬依賴(一個父親 RDD 通過分區而另外一個沒有分區).在上面全部的惡場景中, join 以後的輸出 RDD 會有一個 partitioner (從父親 RDD 中繼承過來的或者是一個默認的 hash partitioner).

5 實現

咱們用了 14000 行 scala 代碼實現了 spark. Spark 系統跑在集羣管理者 mesos 上, 這樣可使的它和其餘的應用好比 hadoop 、 MPI 等共享資源, 每個 spark 程序都是由它的 driver 和 workers 組成, 這些 driver 和 workers 都是以一個 mesos 應用運行在 mesos 上的, mesos 能夠管理這些應用之間的資源共享問題.

Spark 能夠利用已經存在的 hadoop 的 api 組件讀取任何的 hadoop 的輸入數據源(好比: HDFS 和 Hbase 等), 這個程序 api 是運行在沒有更改的 scala 版本上.

咱們會簡要的歸納下幾個比較有意思的技術點:咱們的 job 調度器( 5.1 節), 能夠用於交互的 spark 解釋器( 5.2 節), 內存管理( 5.3 節)以及對 checkpointing 的支持( 5.4 節).

5.1 job 調度器

spark 的調度器依賴咱們在第 4 章中討論的 RDDs 的表達.

從整體上看, 咱們的調度系統有點和 Dryad 類似, 可是它還考慮了被存儲的 RDDs 的哪些分區還在內存中.當一個用戶對某個 RDD 調用了 action 操做(好比 count 或者 save )的時候調度器會檢查這個 RDD 的血緣關係圖, 而後根據這個血緣關係圖構建一個含有 stages 的有向無環圖( DAG ), 最後按照步驟執行這個 DAG 中的 stages , 如圖 5 的說明.每個 stage 包含了儘量多的帶有窄依賴的 transformations 操做. 這個 stage 的劃分是根據須要 shuffle 操做的寬依賴或者任何能夠切斷對父親 RDD 計算的某個操做(由於這些父親 RDD 的分區已經計算過了). 而後調度器能夠調度啓動 tasks 來執行沒有父親 stage 的 stage (或者父親 stage 已經計算好了的 stage ),一直到計算完咱們的最後的目標 RDD .

 圖五: 怎麼計算 spark job stage 的例子.實現的方框表示 RDDs ,帶有顏色的方形表示分區, 黑色的是表示這個分區的數據存儲在內存中, 對 RDD G 調用 action 操做, 咱們根據寬依賴生成不少 stages , 且將窄依賴的 transformations 操做放在 stage 中.在這個場景中, stage 1 的輸出結果已經在內存中, 因此咱們開始運行 stage 2 , 而後是 stage 3.

咱們調度器在分配 tasks 的時候是採用延遲調度來達到數據本地性的目的(說白了, 就是數據在哪裏, 計算就在哪裏). 若是某個分區的數據在某個節點上的內存中, 那麼將這個分區的計算髮送到這個機器節點中. 若是某個 RDD 爲它的某個分區提供了這個數據存儲的位置節點, 則將這個分區的計算髮送到這個節點上.

對於寬依賴(好比 shuffle 依賴), 咱們將中間數據寫入到節點的磁盤中以利於從錯誤中恢復, 這個和 MapReduce 將 map 後的結果寫入到磁盤中是很類似的.

只要一個任務所在的 stage 的父親 stage 仍是有效的話, 那麼當這個 task 失敗的時候, 咱們就能夠在其餘的機器節點中從新跑這個任務. 若是一些 stages 變的無效的話(好比由於一個 shuffle 過程當中 map 端的一個輸出結果丟失了), 咱們須要從新並行提交沒有父親 stage 的 stage (或者父親 stage 已經計算好了的 stage )的計算任務. 雖然備份 RDD 的血緣關係圖示比較容易的, 可是咱們還不能容忍調度器調度失敗的場景.

雖然目前 spark 中全部的計算都是響應 driver 程序中調用的 action 操做, 可是咱們也是須要嘗試在集羣中調用 lookup 操做, 這種操做是根據 key 來隨機訪問已經 hash 分區過的 RDD 全部元素以獲取相應的 value. 在這種場景中, 若是一個分區沒有計算的話, 那麼 task 須要將這個信息告訴調度器.

5.2 集成解釋器

scala 和 Ruby 以及 Python 同樣包含了一個交互型的 shell 腳本工具. 考慮到利用內存數據能夠得到低延遲的特性, 咱們想讓用戶經過解釋器來交互性的運行 spark , 從而達到查詢大數據集的目的.

Scala 解釋器一般是將用戶輸入的每一行代碼編譯成一個類, 而後將這個類加載到 JVM 中, 而後調用這個類的方法. 這個類中包含了一個單例對象, 這個單例對象包含了用戶輸入一行代碼中的變量或者函數, 還包含了一個運行用戶輸入那行代碼的初始化方法. 好比, 用戶輸入 var x = 5 , 而後再輸入 println(x), scala 解釋器定義個包含了 x 的叫作 Line 1 的類, 而後將第二行代碼編譯成 println(Line 1.getInstance(). x ).

咱們對 spark 中的解釋器作了以下兩個改變:

  1. Class shipping: 爲了讓 worker 節點能拿到用戶輸入的每一行代碼編譯成的 class 的二進制代碼, 咱們使的解釋器爲這些 classes 的二進制代碼提供 HTTP 服務.
  2. 修改了代碼生成:正常狀況下, 咱們經過訪問對應的類的靜態方法來達到訪問將用戶輸入每一行代碼編譯成的單例對象.這個覺得着, 當咱們將一個含有在前面行中定義的變量(好比上面例子中的 Line 1.x )的閉包序列化發送到 worker 節點的時候, java 是不會經過對象圖來跟蹤含有 x 的實力 Line 1 的, 這樣的話 worker 節點將收不到變量 x.咱們修改了代碼生成邏輯來達到能直接引用每一行代碼生成的實例.

圖六顯示了通過咱們的改變後, 解釋器是如何將用戶輸入的一系列的代碼轉換成 java 對象.

圖六:顯示 spark 解釋器是如何將用戶輸入的代碼轉換成 java 對象的例子

咱們發現 spark 解釋器在處理咱們研究中的大量已經獲取到的痕跡數據以及探索存儲在 hdfs 中的數據集時是很是有用的.咱們正在打算用這個來實現更高層面的交互查詢語言, 好比 SQL.

5.3 內存管理

Spark 在持久化 RDDs 的時候提供了 3 種存儲選:存在內存中的非序列化的 java 對象、存在內存中的序列化的數據以及存儲在磁盤中. 第一種選擇的性能是最好的, 由於 java VM 能夠很快的訪問 RDD 的每個元素. 第二種選擇是在內存有限的狀況下, 使的用戶能夠以很低的性能代價而選擇的比 java 對象圖更加高效的內存存儲的方式. 若是內存徹底不夠存儲的下很大的 RDDs , 並且計算這個 RDD 又很費時的, 那麼選擇第三種方式.

爲了管理有限的內存資源, 咱們在 RDDs 的層面上採用 LRU (最近最少使用)回收策略. 當一個新的 RDD 分區被計算可是沒有足夠的內存空間來存儲這個分區的數據的時候, 咱們回收掉最近不多使用的 RDD 的分區數據的佔用內存, 若是這個 RDD 和這個新的計算分區的 RDD 時同一個 RDD 的時候, 咱們則不對這個分區數據佔用的內存作回收. 在這種狀況下, 咱們將相同的 RDD 的老分區的數據保存在內存中是爲了避免讓總是從新計算這些分區的數據, 這點事很是重要的, 由於不少操做都是對整個 RDD 的全部的 tasks 進行計算的, 因此很是有必要將後續要用到的數據保存在內存中.到目前爲止, 咱們發現這種默認的機制在全部的應用中工做的很好, 可是咱們仍是將持久每個 RDD 數據的策略的控制權交給用戶.

最後, 在一個集羣中的每個 spark 實例的內存空間都是分開的, 咱們之後打算經過統一內存管理達到在 spark 實例之間共享 RDDs.

5.4 對 Checkpointing 的支持

雖然咱們老是可使用 RDDs 的血緣關係來恢復失敗的 RDDs 的計算, 可是若是這個血緣關係鏈很長的話, 則恢復是須要耗費很多時間的.所以, 將一些 RDDs 的數據持久化到穩定存儲系統中是有必要的

通常來講, checkpointing 對具備很長的血緣關係鏈且包含了寬依賴的 RDDs 是很是有用的, 好比咱們在 3.2.2 小節中提到的 PageRank 的例子. 在這些場景下, 集羣中的某個節點的失敗會致使每個父親 RDD 的一些數據的丟失, 進而須要從新全部的計算. 與此相反的, 對於存儲在穩定存儲系統中且是窄依賴的 RDDs (好比 3.2.1 小節中線性迴歸例子中的 points 和 PageRank 中的 link 列表數據), checkpointing 可能一點用都沒有. 若是一個節點失敗了, 咱們能夠在其餘的節點中並行的從新計算出丟失了數據的分區, 這個成本只是備份整個 RDD 的成本的一點點而已.

spark 目前提供了一個 checkpointing 的 api ( persist 中的標識爲 REPLICATE , 還有 checkpoint ()), 可是須要將哪些數據須要 checkpointing 的決定權留給了用戶. 然而, 咱們也在調查怎麼樣自動的 checkpoing , 由於咱們的調度系統知道數據集的大小以及第一次計算這個數據集花的時間, 因此有必要選擇一些最佳的 RDDs 來進行 checkpointing , 來達到最小化恢復時間

最後, 須要知道的事 RDDs 天生的只讀的特性使的他們比通常的共享內存系統作 checkpointing 更簡單了. 由於不用考慮數據的一致性, 咱們能夠不終止程序或者 take 快照, 而後在後臺將 RDDs 的數據寫入到存儲系統中.

6 評估

咱們經過在亞馬遜 EC 2 傷進行一系列的實驗以及用用戶的應用作基準測試來評估 spark , 總的來講, 下面是咱們的結論:

  • 在迭代式機器學習和圖計算中, spark 以 20 倍的速度超過了 hadoop .提速的點主要是在避免了 I / O 操做以及將數據以 java 對象的形式存在內存中從而下降了反序列化的成本.
  • 用戶寫的應用程序運行平穩以及很好擴展.特別的, 咱們利用 spark 爲一個分析報表提速了 40 倍, 相對於 hadoop 來講.
  • 當節點失敗的時候, spark 能夠經過從新計算失去的 rdd 分區數據達到快速的恢復.
  • spark 在查詢 1 TB 的數據的時候的延遲能夠控制在 5 到 7 秒.

咱們經過和 hadoop 對比, 展現迭代式機器學習( 6.1 節)和 PageRank ( 6.2 節)的基準測試.而後咱們評估了 spark 的錯誤恢復機制( 6.3 節)以及當內存不足以存儲一個數據集的行爲( 6.4 節), 最後咱們討論了用戶應用( 6.5 節)和交互式數據挖掘( 6.6 節)的結果 除非另外聲明, 咱們都是用類型爲 m 1.xlarge 的 EC 2 節點, 4 核以及 15 GB 內存. 咱們是有數據塊大小爲 256 M 的 HDFS 存儲系統. 在每一次測試以前, 咱們都會清理 OS 的緩存, 以達到準確的測量 IO 成本的目的

6.1 迭代式機器學習應用

咱們實現了兩種迭代式機器學習應用, 線性迴歸核 K - means , 來和下面的系統進行性能的對比:

  • Hadoop:版本號爲 0.20.0 的穩定版.
  • HadoopBinMem:這個系統在迭代的一開始會將輸入數據轉換成底開銷的二進制形式, 這樣能夠爲接下來的迭代消除解析文本數據的開銷, 而且將數據存儲在 hdfs 實例的內存中.
  • Spark:咱們的 RDDs 的實現.

咱們在 25-100 臺機器上存儲 100 G 數據, 兩種算法都是對這 100 G 數據跑 10 次迭代. 兩個應用之間的關鍵不一樣點是他們對相同數據的計算量不同. K-means 的迭代時間都是花在計算上, 然而線性迴歸是一個計算量不大, 時間都是花在反序列化和 I/O 上. 因爲典型的機器學習算法都是須要上千次的迭代來達到收斂, 因此咱們將第一次迭代花的時間和接下來的迭代時間分開顯示. 咱們發現經過 RDDs 的共享數據極大的提升了後續的迭代速度

圖七:在 100 臺機器的集羣上分別用 hadoop 、 hadoopBinMem 以及 spark 對 100 GB 的數據進行,線性迴歸和 k - means 的首次迭代和隨後迭代花的時間

首次迭代:三個系統在首次迭代中都是讀取 HDFS 中的數據, 從圖七的條形圖中咱們能夠看出, 在實驗中, spark 穩定的比 hadoop 要快. 這個是因爲 hadoop 主從節點之間的心跳信息的信號開銷致使的. HadoopBinMen 是最慢的, 這個是由於它啓動了一個額外的 MapReduce 任務來將數據轉換爲二進制, 它還須要經過網絡傳輸數據以達到備分內存中的數據的目的. 隨後的迭代:圖七也顯示了隨後迭代花的平均時間. 圖八則是顯示了集羣大小不斷擴展時候的花的時間. 對於線性迴歸, 在 100 臺機器上, spark 分別比 hadoop 和 hadoopBinMem 快上 25.3 倍和 20.7 倍. 對於計算型的 k - means 應用, spark 仍然分別提升了 1.9 倍和 3.2 倍.

圖八: hadoop 、 hadoopBinMem 以及 spark 在隨後的迭代花的時間, 都是處理 100 G 的數據

理解爲何提速了: 咱們驚奇的發現 spark 甚至比基於內存存儲二進制數據的 hadoopBinMem 還要快 20 倍. 在 hadoopBinMem 中, 咱們使用的是 hadoop 標準的二進制文件格式( sequenceFile )和 256 m 這麼大的數據塊大小, 以及咱們強制將 hadoop 的數據目錄放在一個內存的文件系統中. 然而, Hadoop 仍然由於下面幾點而比 spark 慢:

  1. Hadoop 軟件棧的最低開銷.
  2. HDFS 提供數據服務的開銷.
  3. 將二進制數據轉換成有效的內存中的 java 對象的反序列化的成本開銷.

咱們依次來調查上面的每個因素.爲了測量第一個因素, 咱們跑了一些空的 hadoop 任務, 咱們發現單單完成 job 的設置、任務的啓動以及任務的清理等工做就花掉了至少 25 秒鐘. 對於第二個元素, 咱們發現 HDFS 須要執行多分內存數據的拷貝以及爲每個數據塊作 checksum 計算.

最後, 爲了測試第 3 個因素, 咱們在單機上作了一個微型的基準測試, 就是針對不一樣文件類型的 256 M 數據來跑線性迴歸計算. 咱們特別的對比了分別從 HDFS 文件( HDFS 技術棧的耗時將會很明顯)和本地內存文件(內核能夠很高效的將數據傳輸給應用程序)中處理文本和二進制類型數據所話的時間、

圖九中是咱們咱們測試結果的展現. 從 In - memory HDFS (數據是在本地機器中的內存中)中讀數據比從本地內存文件中讀數據要多花費 2 秒中.解析文本文件要比解析二進制文件多花費 7 秒鐘. 最後, 即便從本地內存文件中讀數據, 可是將預先解析了二進制數據轉換成 java 對象也須要 3 秒鐘, 這個對於線性迴歸來講也是一個很是耗時的操做. Spark 將 RDDs 全部元素以 java 對象的形式存儲在內存中, 進而避免了上述說的全部的耗時

6.2 PageRank

咱們分別用 spark 和 hadoop 對 54 GB 的維基百科的轉儲數據進行了 PageRank 機器學習, 並比對了它們的性能. 咱們用 PageRank 的算法處理了大約 4 百萬相互鏈接的文章, 並進行了 10 次迭代. 圖十展現了在 30 個節點上, 只用內存存儲使的 spark 擁有了比 hadoop 2.4 倍的性能提高. 另外, 就和 3.2.2 小節討論的, 若是控制 RDD 的分區使的迭代之間數據的平衡更可使的性能速度提高到 7.2 倍. 將節點數量擴展到 60 個, spark 的性能速度提高也是上面的結果

 圖十:分別基於 Hadoop 和 spark 的 PageRank 的性能對比

咱們也評估了在第 7.1 節中提到的用咱們基於 spark 而實現的 Pregel 重寫的 PageRank .迭代次數和圖十是同樣的, 可是慢了 4 秒鐘, 這個是由於每一次迭代 Pregel 都要跑額外的操做來讓頂點進行投票決定是否須要結束任務.

6.3 容錯

咱們評估了當在 k - means 應用中一個節點失敗了而利用 RDD 的血緣關係鏈來重建 RDD 的分區須要的成本.圖十一對比的是在 75 個節點中運行 10 次迭代的 k - means 正常狀況和在第 6 次迭代一個節點失敗的狀況. 若是沒有任何失敗的話, 每一次迭代都是在 100 GB 的數據上跑 400 個 tasks.

 圖十一:出現了失敗的 k - means 每次迭代時間.在第 6 次迭代中一臺機器被殺掉了, 致使須要利用血緣關係鏈重建 RDD 的部分分區

第五次迭代的時間是 58 秒. 在第 6 次迭代, 一臺機器被殺死了, 致使丟失了運行在這臺機器上的 tasks 以及存儲在這臺機器上的 RDD 分區數據. Spark 在其餘機器節點上從新讀取相應的輸入數據以及經過血緣關係來重建 RDD , 而後並行的重跑丟失的 tasks , 使的此次迭代的時間增長到 80s. 一旦丟失的 RDD 分區數據重建好了, 迭代的時間又回到了 58s.

須要注意的是, 若是是基於 checkpoint 的容錯機制的話, 那麼須要經過重跑好幾個迭代才能恢復, 須要重跑幾個取決於 checkpoints 的頻率. 此外, 系統須要經過網絡傳輸來備份應用須要的 100GB 數據(被轉化爲二進制的文本數據), 以及爲了達到在內存中備份數據而消耗掉 2 倍的內存, 或者等待將 100GB 數據寫入到磁盤中. 與此相反的是, 在咱們的例子中每個 RDDs 的血緣關係圖的大小都是小於 10KB 的.

6.4 內存不足的行爲

在目前爲止, 咱們都是假設集羣中的每一臺機器都是有足夠的內存來存儲迭代之間的 RDDs 的數據的. 當沒有足夠的內存來存儲任務的數據的時候 spark 是怎麼運行的呢? 在這個實驗中, 咱們給 spark 每個節點配置不多的內存, 這些內存不足以存儲的下 RDDs. 咱們在圖十二中, 咱們展現了不一樣存儲空間下的運行線性迴歸應用須要的時間. 能夠看出, 隨着空間的減小, 性能速度慢慢的降低:

 圖十二: 每次都是使用不一樣的內存, 而後在 25 臺機器中對 100 GB 的數據運行線性迴歸的性能對比圖

6.5 用 spark 構建的用戶應用

內存中分析: Conviva Inc 是一個視頻提供商, 他們用 spark 來加速以前在 hadoop 上運行的幾個數據報表分析. 好比, 其中一個報表是運行一系列的 Hive 查詢來計算一個用戶的各類統計信息. 這些查詢都是基於相同的數據子集(基於自定義的過濾器過濾出來的數據)可是須要不少 MapReduce 任務來爲分組字段進行聚合運算(平均值、百分位數值以及 count distinct). 將這些數據子集建立成一個能夠共享的 spark 中的 RDD 來實現這些查詢使的這個報表的速度提高了 40 倍. 對 200 GB 已經壓縮的數據在 hadoop 集羣上跑這個報表花了 20 個小時, 可是利用 2 臺機器的 spark 只用了 30 分鐘而已. 此外, spark 程序只花了 96 G 的內存, 由於只須要將報表關心的列數據存儲在內存中進行共享就行, 而不是全部的解壓後的數據.

交通模型:伯克利分校的 Mobile Millennium 項目組的研究員在收集到的零星的汽車的 GPS 信息上並行運行一個機器學習算法試圖推斷出道路交通是否擁擠. 在都市區域道路網絡中的 10000 條道路和 600000 個裝有 GPS 設備的汽車點對點的旅行時間(每一條路線的旅行時間可能包含了多條道路)樣本是數據源. 利用交通模型能夠估算出經過每一條獨立的道路須要多長時間. 研究人員利用 EM 算法來訓練模型, 這個算法在迭代的過程當中重複執行 map 和 reduceByKey 步驟. 這個應用近似線性的將機器規模從 20 臺擴展到 80 臺, 每臺機器 4 個 cores , 如圖 13 ( a )所示.

圖十三:兩個用 spark 實現的用戶應用的每次迭代的時間, 偏差線表示標準偏差

推特垃圾郵件分類:伯克利分校的 Monarch 項目利用 spark 來標記推特消息中的垃圾連接. 它們實現的線性迴歸分類器和第 6.1 節中很類似, 可是他們利用了分佈式的 reduceByKey 來並行的累加梯度向量值. 圖 13(b) 中顯示了對 50 GB 的數據子集進行分類訓練須要的時間(隨着機器擴展), 這個數據子集中包含了 25000 URLs 以及每個 url 對應的頁面的網絡和內容屬性相關的 10000000 個特徵/緯度. 圖 13(b) 中的時間不是線性的降低是由於每一次迭代花費了很高的且固定的通信成本.

6.6 交互性的數據挖掘

爲了演示 spark 在交互查詢大數據集的能力, 咱們來分析 1 TB 的維基頁面訪問日誌數據( 2 年的數據). 在這個實驗中, 咱們使用 100 個 m 2.4 xlarge EC 2 實例, 每個實例 8 個 cores 以及 68 G 內存. 咱們查詢出( 1 )全部頁面的瀏覽量, ( 2 )頁面標題等於某個單詞的頁面的瀏覽量以及( 3 )頁面標題部分的等於某個單詞的頁面的瀏覽量. 每個查詢都是掃描整個輸入數據集.

圖十四展現的分別是查詢整個數據集、一半數據集一集十分之一的數據集的響應時間. 即便是 1 TB 的數據, 用 spark 來查詢僅僅花了 5-7 秒而已.這個比查詢磁盤數據的速度快了一個數量級, 好比, 查詢磁盤文件中的 1 TB 數據須要 170 秒.這個能夠說明 RDDs 使的 spark 是一個很是強大的交互型數據挖掘的工具.

7 討論

雖然因爲 RDDs 的自然不可變性以及粗粒度的轉換致使它們彷佛提供了有限制的編程接口, 可是咱們發現它們適合不少類型的應用. 特別的, RDDs 能夠表達出如今各類各樣的框架提出的編程模型, 並且還能夠將這些模型組合在同一個程序中(好比跑一個 MapReduce 任務來建立一個圖, 而後基於這個圖來運行 Pregel )以及能夠在這些模型中共享數據. 在這一章中, 咱們在第 7.1 節中討論 RDDs 能夠表達哪些模型以及爲何適合表達這些編程模型. 另外, 咱們在第 7.2 節中討論咱們推崇的 RDD 的血緣信息的好處, 利用這些信息能夠幫助咱們 debug 模型.

7.1 已經存在的編程模型的表達

對於到目前爲止不少獨立提出的編程模型, RDDs 均可以高效的表達出來. 這裏所說的 「高效」, 不只僅是指使用 RDDs 的輸出結果和獨立提出的編程模型狂簡的輸出結果是一致的, 並且 RDDs 在優化性能方面比這些框架還要強大, 好比將特定的數據保存在內存中、對數據分區以減小網絡傳輸以及高效的從錯誤中恢復. 能夠用 RDDs 表達的模型以下:

  • MapReduce: 能夠利用 spark 中的 flatMap 和 groupByKey 操做來表達這個模型, 或者若是須要聚合的話可使用 reduceByKey .
  • DryadLINQ: DryadLINQ 系統比 MapReduce 更多的操做, 可是這些操做都是直接和 RDD 的轉換操做( map , groupByKey , join 等)對應的批量操做.
  • SQL: 和 DryadLINQ 同樣, SQL 查詢都是對一個數據集進行並行的操做計算.
  • Pregel: Google 的 Pregel 是一個專門解決迭代圖計算應用的模型, 它一開始看起來和麪向數據集的編程模型的其餘系統徹底不一樣.在 Pregel 中, 一個程序運行一些列的相互協調的「 supersteps 」.在每個 superstep 上, 對圖上的每個頂點運行用戶自定義的函數來更新這個頂點的相關的狀態、改變圖的拓撲結構以及向其餘頂點發送下一個 superstep 須要的消息.這種模型能夠表達很是多的圖計算算法, 包括最短路徑、二部圖匹配以及 PageRank.

Pregel 在每一次迭代中都是對全部頂點應用相同的用戶定義的函數, 這個是使的咱們用 RDDs 來實現這個模型的關鍵點. 所以, 每次迭代後, 咱們均可以將頂點的狀態保存在 RDD 中, 而後執行一個批量轉換操做( apply )來應用這個函數以及生成一個消息的 RDD . 而後咱們能夠用這個 RDD 通頂點的狀態進行 join 來完成消息的交換. 和 Pregel 同樣, RDDs 容許將點的狀態保存在內存中、控制它們的分區以減小網絡通信以及指出從失敗中恢復. 咱們在 spark 上用了 200 行代碼的包實現了 Pregel , 讀者能夠參考第 33 個文獻來了解更多的細節

  • 迭代 MapReduce: 最近提出的幾個系統, 包括 HaLoop 和 Twister , 它們提供了可讓用戶循環跑一系列的 MapReduce 任務的迭代式 MapReduce 模型.這些系統在迭代之間保持數據分區一致, Twister 也能夠將數據保存在內存中. RDDs 能夠很簡單的表達以上兩個優化, 並且咱們基於 spark 花了 200 行代碼實現了 HaLoop.
  • 批量流處理: 研究人員最近提出了一些增量處理系統, 這些系統是爲按期接受新數據而後根據數據更新結果的應用服務的.好比, 一個應用須要實時接收新數據, 而後每 15 分鐘就將接收到的數據和前面 15 分鐘的時間窗口的數據進行 join 聚合, 將聚合的結果更新到統計數據中.這些系統執行和 Dryad 相似的批處理, 可是它們將應用的狀態數據存儲在分佈式系統中.將中間結果放在 RDDs 中能夠提升處理速度.
  • 闡釋 RDDs 的表達力爲何這麼豐富:爲何 RDDs 能夠表達多種多樣編程模型?緣由就是 RDDs 的限制性對不少並行計算的應用的影響是很小的.特別指出的是, 雖然 RDDs 只能經過批量轉換而獲得, 可是不少的並行計算的程序都是將相同的操做應用到大量的數據條目中, 這樣使的 RDDs 的表達力變的豐富.相似的, RDDs 的不變性並非障礙, 由於咱們能夠建立多個 RDDs 來表達不一樣版本的相同數據集.事實上, 如今不少的 MapReduce 的應用都是運行在不能對文件修改數據的文件系統中, 好比 HDFS.

最後一個問題是爲何以前的框架沒有提供這中通用型的表達能力呢? 咱們相信這個是由於這些框架解決的是 MapReduce 和 Dryad 不能解決的特殊性的問題, 好比迭代, 它們沒有洞察到這些問題的共同緣由是由於缺乏了數據共享的抽象.

7.2 利用 RDDs 來 debug

當咱們一開始設計 RDDs 經過從新計算來達到容錯的時候, 這種特性同時也促使了 debugging 的產生. 特別的, 在一個任務中經過記錄 RDDs 的建立的血緣, 咱們能夠:

  1. 後面能夠從新構建這些 RDDs 以及可讓用戶交互性的查詢它們.
  2. 經過從新計算其依賴的 RDD 分區來達到在一個進程 debugger 中重跑任何的任務.

和傳統的通用分佈式系統的重跑 debugger 不同, 傳統的須要捕獲和引用多個節點之間的事件發生的順序, RDDs 這種 debugger 方式不須要依賴任何的數據, 而只是須要記錄 RDD 的血緣關係圖.咱們目前正在基於這些想法來開發一個 spark debbger.

8 相關工做

集羣編程模型:集羣編程模型的相關工做分爲如下幾類:

  • 第一, 像 MapReduce , Dryad 以及 Ciel 同樣支持一系列處理數據的操做, 而且須要經過穩定的存儲系統來共享數據, RDDs 表達了一種比穩定存儲系統更高效的數據共享抽象, 由於它避免了數據備份、 I / O 以及序列化的成本.

  • 第二, 幾個數據流系統的高層面上的編程接口, 包括 DryadLINQ 和 FlumeJava ,它們提供了語言集成 api , 使的用戶能夠經過像 map 和 join 等操做來操做並行的集合.然而, 在這些系統中, 並行的集合是指在磁盤上的文件或者一個查詢計劃表達的臨時數據集.雖然這些系統在相同的查詢中的操做之間組裝數據的 pipeline (好比, 一個 map 操做後跟另一個 map ),可是它們不能在查詢之間進行高效的數據共享.咱們在並行集合模式上創建 spark api , 是因爲它的便利性以及在集成語言接口上不要求新穎性, 可是咱們基於在這些接口背後以 RDDs 做爲存儲抽象, 就可使的 spark 支持大量類型的應用了.
  • 第三種系統爲許多專門的須要數據共享的應用提供了高層的接口.好比, pregel 支持迭代式的圖計算應用、 Twister 和 HaLoop 支持迭代式的 MapReduce .然而, 這些框架只是爲他們支持的計算模型隱式的共享數據, 並無提供可讓用戶根據本身的需求顯式的共享數據的通用抽象.好比, 一個用戶不能用 Pregel 或者 Twister 將數據加載到內存中而後決定在數據集上面跑什麼樣的查詢. RDDs 提供了一個顯式的分佈式存儲抽象, 所以能夠支持那些特殊系統不能支持的應用, 好比交互式數據挖掘.

最後, 一些系統暴露共享可變狀態以使的用戶能夠執行內存計算. 好比, Piccolo 使的用戶經過分佈式的函數來讀取和更新分佈式 hash 表中的單元格數據. DSM 和像 RAMCloud 這樣的 key - value 存儲系統提供了相似的模型. RDDs 和這些系統有兩個方面的不一樣, 第一, RDDs 基於像 map , sot 以及 join 等操做提供了高層的編程接口, 然而, 在 Piccolo 和 DSM 中的接口只是讀取和更新表的單元格數據. 第二, Piccolo 和 DSM 經過 checkpoint 和回滾機制實現容錯, 在許多應用中這種機制比機遇血緣機制的 RDDs 的容錯的成本更大. 最後, 如 2.3 小節討論的, 相對於 DSM , RDDs 還提供了其餘的優點功能, 好比執行慢的 task 的處理機制

緩存系統: Nectar 能夠經過標識通用的程序分析的子表達式來達到在 DryadLINQ 任務之間對中間數據結果的複用. 這種能力確定會加入到基於 RDD 的系統中. 然而, Nectar 即沒有提供基於內存的緩存(他是將數據放到分佈式文件系統中)也不能讓用戶能夠顯式的對數據集進行緩存控制和分區控制. Ciel 和 FlumeJava 一樣能夠緩存任務結果, 可是也不提供基於內存的緩存或者顯式的控制哪些數據能夠緩存

Ananthanarayanan et al 已經提出在分佈式文件系統上加一層基於內存的緩存來利用數據訪問的暫時性和本地性. 這種解決方案倒是加快了已經存在於文件系統中的數據訪問速度, 可是它在共享同一個應用中的中間結果方面並無 RDD 高效, 由於它在每個 stage 之間仍然須要將數據寫入到文件系統中

血緣:在科學計算以及數據庫中, 捕獲數據的血緣或者來源信息是一個很長時間被研究的話題了, 對於像解釋結果的應用, 須要讓他們能夠從其餘的應用從新產生, 且當在工做流中存在一個 bug 或者數據丟失的時候能夠從新對數據進行計算. 對於這邊面的容錯的工做, 咱們推薦讀者看第 [5] 和 [9]的資料. RDDs 提供了一種並行編程模型, 記錄跟蹤細粒度的血緣的成本很低, 咱們能夠根據血緣來達到容錯的目的.

咱們基於血緣的容錯機制和 MapReduce 以及 Dryad 一個任務中的容錯機制是相似的, 在 DAG 中跟蹤任務的依賴. 然而, 在這些系統中, 一個任務結束後血緣信息也就丟失了, 用戶須要依靠數據備份式的存儲系統來共享任務之間的數據. 與此相反, RDDs 能夠在任務之間經過將數據存儲在內存中達到高效的數據共享, 並不須要數據的備份和磁盤的 I/O 關係型數據庫: RDDs 在概念上和數據庫中的視圖比較相似, 存儲的 RDDs 則像具體的視圖. 然而, 像 DSM 系統, 數據庫通常容許細粒度的對全部的數據進行讀寫訪問, 這種方式須要對操做和數據進行記錄日誌, 用於容錯, 以及須要額外的開銷來保持數據的一致性, 對於粗粒度轉換模型的 RDDs 來講, 這些額外的開銷式不須要的.

9 結尾

咱們已經展現了在集羣應用中一個高效的, 通用的以及容錯的對共享數據的抽象的 RDDs . RDDs 能夠表達大量的並行應用, 包括特殊的編程模型提出的迭代式計算以及這些模型表達不了的新的應用. 和已經存在的對集羣存儲抽象不一樣的是, RDDs 提供了基於粗粒度轉換的 api , 可使的用戶經過血緣達到高效的容錯. 咱們在 spark 系統中實現了 RDDs, 在迭代式的應用中, 性能是 hadoop 的 20 倍, 而且能夠用於交互式的查詢數百 GB 的數據.

咱們已經在 spark-project.org 中開源了 Spark, 做爲一個穩定的數據分析和系統研究的手段.

鳴謝

We thank the first Spark users, including Tim Hunter, Lester Mackey, Dilip Joseph, and Jibin Zhan, for trying out our system in their real applications, providing many good suggestions, and identifying a few research challenges along the way. We also thank our shepherd, Ed Nightingale, and our reviewers for their feedback. This research was supported in part by Berkeley AMP Lab sponsors Google, SAP, Amazon Web Services, Cloudera, Huawei, IBM, Intel, Microsoft, NEC, NetApp and VMWare, by DARPA (contract #FA8650-11-C-7136), by a Google PhD Fellowship, and by the Natural Sciences and Engineering Research Council of Canada.

引用資料

[1] ApacheHive.http://hadoop.apache.org/hive.
[2] Scala.http://www.scala-lang.org.
[3] G.Ananthanarayanan,A.Ghodsi,S.Shenker,andI.Stoica. Disk-locality in datacenter computing considered irrelevant. In HotOS ’11, 2011.
[4] P.Bhatotia,A.Wieder,R.Rodrigues,U.A.Acar,and R. Pasquin. Incoop: MapReduce for incremental computations. In ACM SOCC ’11, 2011.
[5] R.BoseandJ.Frew.Lineageretrievalforscientificdata processing: a survey. ACM Computing Surveys, 37:1–28, 2005.
[6] S.BrinandL.Page.Theanatomyofalarge-scalehypertextual web search engine. In WWW, 1998.
[7] Y.Bu,B.Howe,M.Balazinska,andM.D.Ernst.HaLoop: efficient iterative data processing on large clusters. Proc. VLDB Endow., 3:285–296, September 2010.
[8] C.Chambers,A.Raniwala,F.Perry,S.Adams,R.R.Henry, R. Bradshaw, and N. Weizenbaum. FlumeJava: easy, efficient data-parallel pipelines. In PLDI ’10. ACM, 2010.
[9] J.Cheney,L.Chiticariu,andW.-C.Tan.Provenancein databases: Why, how, and where. Foundations and Trends in Databases, 1(4):379–474, 2009.
[10] J.DeanandS.Ghemawat.MapReduce:Simplifieddata processing on large clusters. In OSDI, 2004.
[11] J. Ekanayake, H. Li, B. Zhang, T. Gunarathne, S.-H. Bae, J. Qiu, and G. Fox. Twister: a runtime for iterative mapreduce. In HPDC ’10, 2010.
[12] P.K.Gunda,L.Ravindranath,C.A.Thekkath,Y.Yu,and L. Zhuang. Nectar: automatic management of data and computation in datacenters. In OSDI ’10, 2010.
[13] Z.Guo,X.Wang,J.Tang,X.Liu,Z.Xu,M.Wu,M.F. Kaashoek, and Z. Zhang. R2: an application-level kernel for record and replay. OSDI’08, 2008.
[14] T.Hastie,R.Tibshirani,andJ.Friedman.TheElementsof Statistical Learning: Data Mining, Inference, and Prediction. Springer Publishing Company, New York, NY, 2009.
[15] B.He,M.Yang,Z.Guo,R.Chen,B.Su,W.Lin,andL.Zhou. Comet: batched stream processing for data intensive distributed computing. In SoCC ’10.
[16] A.Heydon,R.Levin,andY.Yu.Cachingfunctioncallsusing precise dependencies. In ACM SIGPLAN Notices, pages 311–320, 2000.
[17] B.Hindman,A.Konwinski,M.Zaharia,A.Ghodsi,A.D. Joseph, R. H. Katz, S. Shenker, and I. Stoica. Mesos: A platform for fine-grained resource sharing in the data center. In NSDI ’11.
[18] T.Hunter,T.Moldovan,M.Zaharia,S.Merzgui,J.Ma,M.J. Franklin, P. Abbeel, and A. M. Bayen. Scaling the Mobile Millennium system in the cloud. In SOCC ’11, 2011.
[19] M. Isard, M. Budiu, Y. Yu, A. Birrell, and D. Fetterly. Dryad: distributed data-parallel programs from sequential building blocks. In EuroSys ’07, 2007. [20] S.Y.Ko,I.Hoque,B.Cho,andI.Gupta.Onavailabilityof intermediate data in cloud computations. In HotOS ’09, 2009.
[21] D. Logothetis, C. Olston, B. Reed, K. C. Webb, and K. Yocum. Stateful bulk processing for incremental analytics. SoCC ’10.
[22] G.Malewicz,M.H.Austern,A.J.Bik,J.C.Dehnert,I.Horn, N. Leiser, and G. Czajkowski. Pregel: a system for large-scale graph processing. In SIGMOD, 2010.
[23] D.G.Murray,M.Schwarzkopf,C.Smowton,S.Smith, A. Madhavapeddy, and S. Hand. Ciel: a universal execution engine for distributed data-flow computing. In NSDI, 2011.
[24] B.NitzbergandV.Lo.Distributedsharedmemory:asurveyof issues and algorithms. Computer, 24(8):52 –60, Aug 1991.
[25] J.Ousterhout,P.Agrawal,D.Erickson,C.Kozyrakis, J. Leverich, D. Mazie`res, S. Mitra, A. Narayanan, G. Parulkar, M. Rosenblum, S. M. Rumble, E. Stratmann, and R. Stutsman. The case for RAMClouds: scalable high-performance storage entirely in DRAM. SIGOPS Op. Sys. Rev., 43:92–105, Jan 2010. [26] D.PengandF.Dabek.Large-scaleincrementalprocessingusing distributed transactions and notifications. In OSDI 2010.
[27] R.PowerandJ.Li.Piccolo:Buildingfast,distributedprograms with partitioned tables. In Proc. OSDI 2010, 2010.
[28] R.RamakrishnanandJ.Gehrke.DatabaseManagement Systems. McGraw-Hill, Inc., 3 edition, 2003.
[29] K.Thomas,C.Grier,J.Ma,V.Paxson,andD.Song.Designand evaluation of a real-time URL spam filtering service. In IEEE Symposium on Security and Privacy, 2011. [30] J.W.Young.Afirstorderapproximationtotheoptimum checkpoint interval. Commun. ACM, 17:530–531, Sept 1974.
[31] Y.Yu,M.Isard,D.Fetterly,M.Budiu,U ́.Erlingsson,P.K. Gunda, and J. Currey. DryadLINQ: A system for general-purpose distributed data-parallel computing using a high-level language. In OSDI ’08, 2008.
[32] M.Zaharia,D.Borthakur,J.SenSarma,K.Elmeleegy,S. Shenker, and I. Stoica. Delay scheduling: A simple technique for achieving locality and fairness in cluster scheduling. In EuroSys ’10, 2010.
[33] M.Zaharia,M.Chowdhury,T.Das,A.Dave,J.Ma,M. McCauley, M. Franklin, S. Shenker, and I. Stoica. Resilient distributed datasets: A fault-tolerant abstraction for in-memory cluster computing. Technical Report UCB/EECS-2011-82, EECS Department, UC Berkeley, 2011.

原文連接

http://people.csail.mit.edu/matei/papers/2012/nsdi_spark.pdf

貢獻者

 


相關文章
相關標籤/搜索