Spark--大數據的「電光石火」

Spark已正式申請加入Apache孵化器,從靈機一閃的實驗室「電火花」成長爲大數據技術平臺中異軍突起的新銳。本文主要講述Spark的設計思想。Spark如其名,展示了大數據不常見的「電光石火」。具體特色歸納爲「輕、快、靈和巧」。

apache

  • :Spark 0.6核心代碼有2萬行,Hadoop 1.0爲9萬行,2.0爲22萬行。一方面,感謝Scala語言的簡潔和豐富表達力;另外一方面,Spark很好地利用了Hadoop和Mesos(伯克利 另外一個進入孵化器的項目,主攻集羣的動態資源管理)的基礎設施。雖然很輕,但在容錯設計上不打折扣。主創人Matei聲稱:「不把錯誤當特例處理。」言下 之意,容錯是基礎設施的一部分。編程

  • :Spark對小數據集能達到亞秒級的延遲,這對於Hadoop MapReduce(如下簡稱MapReduce)是沒法想象的(因爲「心跳」間隔機制,僅任務啓動就有數秒的延遲)。就大數據集而言,對典型的迭代機器 學習、即席查詢(ad-hoc query)、圖計算等應用,Spark版本比基於MapReduce、Hive和Pregel的實現快上十倍到百倍。其中內存計算、數據本地性 (locality)和傳輸優化、調度優化等該居首功,也與設計伊始即秉持的輕量理念不無關係。數組

  • :Spark提供了不一樣層面的靈活性。在實現層,它完美演繹了Scala trait動態混入(mixin)策略(如可更換的集羣調度器、序列化庫);在原語(Primitive)層,它容許擴展新的數據算子 (operator)、新的數據源(如HDFS以外支持DynamoDB)、新的language bindings(Java和Python);在範式(Paradigm)層,Spark支持內存計算、多迭代批量處理、即席查詢、流處理和圖計算等多種 範式。緩存

  • :巧在借勢和借力。Spark借Hadoop之勢,與Hadoop無縫結合;接着Shark(Spark上的數據倉庫實現)借了Hive的勢;圖計算借 用Pregel和PowerGraph的API以及PowerGraph的點分割思想。一切的一切,都藉助了Scala(被普遍譽爲Java的將來取代 者)之勢:Spark編程的Look'n'Feel就是原汁原味的Scala,不管是語法仍是API。在實現上,又能靈巧借力。爲支持交互式編 程,Spark只需對Scala的Shell小作修改(相比之下,微軟爲支持JavaScript Console對MapReduce交互式編程,不只要跨越Java和JavaScript的思惟屏障,在實現上還要大動干戈)。安全

說了一大堆好處,仍是要指出Spark未臻完美。它有先天的限制,不能很好地支持細粒度、異步的數據處理;也有後天的緣由,即便有很棒的基因,畢竟還剛剛起步,在性能、穩定性和範式的可擴展性上還有很大的空間。網絡

計算範式和抽象 數據結構

Spark首先是一種粗粒度數據並行(data parallel)的計算範式。多線程

數據並行跟任務並行(task parallel)的區別體如今如下兩方面。閉包

  • 計算的主體是數據集合,而非個別數據。集合的長度視實現而定,如SIMD(單指令多數據)向量指令通常是4到64,GPU的SIMT(單指令多線程)通常 是32,SPMD(單程序多數據)能夠更寬。Spark處理的是大數據,所以採用了粒度很粗的集合,叫作Resilient Distributed Datasets(RDD)。app

  • 集合內的全部數據都通過一樣的算子序列。數據並行可編程性好,易於得到高並行性(與數據規模相關,而非與程序邏輯的並行性相關),也易於高效地映射到底層 的並行或分佈式硬件上。傳統的array/vector編程語言、SSE/AVX intrinsics、CUDA/OpenCL、Ct(C++ for throughput),都屬於此類。不一樣點在於,Spark的視野是整個集羣,而非單個節點或並行處理器。

數據並行的範式決定了 Spark沒法完美支持細粒度、異步更新的操做。圖計算就有此類操做,因此此時Spark不如GraphLab(一個大規模圖計算框架);還有一些應用, 須要細粒度的日誌更新和數據檢查點,它也不如RAMCloud(斯坦福的內存存儲和計算研究項目)和Percolator(Google增量計算技術)。 反過來,這也使Spark可以精心耕耘它擅長的應用領域,試圖粗細通吃的Dryad(微軟早期的大數據平臺)反而不甚成功。

Spark的RDD,採用了Scala集合類型的編程風格。它一樣採用了函數式語義(functional semantics):一是閉包,二是RDD的不可修改性。邏輯上,每個RDD算子都生成新的RDD,沒有反作用,因此算子又被稱爲是肯定性的;因爲所 有算子都是冪等的,出現錯誤時只需把算子序列從新執行便可。

Spark的計算抽象是數據流,並且是帶有工做集(working set)的數據流。流處理是一種數據流模型,MapReduce也是,區別在於MapReduce須要在屢次迭代中維護工做集。工做集的抽象很廣泛,如多 迭代機器學習、交互式數據挖掘和圖計算。爲保證容錯,MapReduce採用了穩定存儲(如HDFS)來承載工做集,代價是速度慢。HaLoop採用循環 敏感的調度器,保證前次迭代的Reduce輸出和本次迭代的Map輸入數據集在同一臺物理機上,這樣能夠減小網絡開銷,但沒法避免磁盤I/O的瓶頸。

Spark的突破在於,在保證容錯的前提下,用內存來承載工做集。內存的存取速度快於磁盤多個數量級,從而能夠極大提高性能。關鍵是實現容錯,傳統上有兩種方法:日 志和檢查點。考慮到檢查點有數據冗餘和網絡通訊的開銷,Spark採用日誌數據更新。細粒度的日誌更新並不便宜,並且前面講過,Spark也不擅長。 Spark記錄的是粗粒度的RDD更新,這樣開銷能夠忽略不計。鑑於Spark的函數式語義和冪等特性,經過重放日誌更新來容錯,也不會有反作用。

編程模型

來看一段代碼:textFile算子從HDFS讀取日誌文件,返回「file」(RDD);filter算子篩出帶「ERROR」的行,賦給 「errors」(新RDD);cache算子把它緩存下來以備將來使用;count算子返回「errors」的行數。RDD看起來與Scala集合類型 沒有太大差異,但它們的數據和運行模型大相迥異。

圖1給出了RDD數據模型,並將上例中用到的四個算子映射到四種算子類型。Spark程序工做在兩個空間中:Spark RDD空間和Scala原生數據空間。在原生數據空間裏,數據表現爲標量(scalar,即Scala基本類型,用橘色小方塊表示)、集合類型(藍色虛線 框)和持久存儲(紅色圓柱)。


圖1 兩個空間的切換,四類不一樣的RDD算子

輸入算子(橘色箭頭)將Scala集合類型或存儲中的數據吸入RDD空間,轉爲RDD(藍色實線框)。輸入算子的輸入大體有兩類:一類針對Scala集合類型,如parallelize;另外一類針對存儲數據,如上例中的textFile。輸入算子的輸出就是Spark空間的RDD。

由於函數語義,RDD通過變換(transformation)算子(藍色箭頭)生成新的RDD。變換算子的輸入和輸出都是RDD。RDD會被劃分紅不少的分區 (partition)分佈到集羣的多個節點中,圖1用藍色小方塊表明分區。注意,分區是個邏輯概念,變換先後的新舊分區在物理上多是同一塊內存或存 儲。這是很重要的優化,以防止函數式不變性致使的內存需求無限擴張。有些RDD是計算的中間結果,其分區並不必定有相應的內存或存儲與之對應,若是須要 (如以備將來使用),能夠調用緩存算子(例子中的cache算子,灰色箭頭表示)將分區物化(materialize)存下來(灰色方塊)。

一部分變換算子視RDD的元素爲簡單元素,分爲以下幾類:

  • 輸入輸出一對一(element-wise)的算子,且結果RDD的分區結構不變,主要是map、flatMap(map後展平爲一維RDD);

  • 輸入輸出一對一,但結果RDD的分區結構發生了變化,如union(兩個RDD合爲一個)、coalesce(分區減小);

  • 從輸入中選擇部分元素的算子,如filter、distinct(去除冗餘元素)、subtract(本RDD有、它RDD無的元素留下來)和sample(採樣)。

另外一部分變換算子針對Key-Value集合,又分爲:

  • 對單個RDD作element-wise運算,如mapValues(保持源RDD的分區方式,這與map不一樣);

  • 對單個RDD重排,如sort、partitionBy(實現一致性的分區劃分,這個對數據本地性優化很重要,後面會講);

  • 對單個RDD基於key進行重組和reduce,如groupByKey、reduceByKey;

  • 對兩個RDD基於key進行join和重組,如join、cogroup。

後三類操做都涉及重排,稱爲shuffle類操做。

從RDD到RDD的變換算子序列,一直在RDD空間發生。這裏很重要的設計是lazy evaluation:計算並不實際發生,只是不斷地記錄到元數據。元數據的結構是DAG(有向無環圖),其中每個「頂點」是RDD(包括生產該RDD 的算子),從父RDD到子RDD有「邊」,表示RDD間的依賴性。Spark給元數據DAG取了個很酷的名字,Lineage(世系)。這個 Lineage也是前面容錯設計中所說的日誌更新。

Lineage一直增加,直到趕上行動(action)算子(圖1中的綠色箭頭),這時 就要evaluate了,把剛纔累積的全部算子一次性執行。行動算子的輸入是RDD(以及該RDD在Lineage上依賴的全部RDD),輸出是執行後生 成的原生數據,多是Scala標量、集合類型的數據或存儲。當一個算子的輸出是上述類型時,該算子必然是行動算子,其效果則是從RDD空間返回原生數據 空間。

行動算子有以下幾類:生成標量,如count(返回RDD中元素的個數)、reduce、fold/aggregate(見 Scala同名算子文檔);返回幾個標量,如take(返回前幾個元素);生成Scala集合類型,如collect(把RDD中的全部元素倒入 Scala集合類型)、lookup(查找對應key的全部值);寫入存儲,如與前文textFile對應的saveAsText-File。還有一個檢 查點算子checkpoint。當Lineage特別長時(這在圖計算中時常發生),出錯時從新執行整個序列要很長時間,能夠主動調用 checkpoint把當前數據寫入穩定存儲,做爲檢查點。

這裏有兩個設計要點。首先是lazy evaluation。熟悉編譯的都知道,編譯器能看到的scope越大,優化的機會就越多。Spark雖然沒有編譯,但調度器實際上對DAG作了線性復 雜度的優化。尤爲是當Spark上面有多種計算範式混合時,調度器能夠打破不一樣範式代碼的邊界進行全局調度和優化。下面的例子中把Shark的SQL代碼 和Spark的機器學習代碼混在了一塊兒。各部分代碼翻譯到底層RDD後,融合成一個大的DAG,這樣能夠得到更多的全局優化機會。


另外一個要點是一旦行動算子產生原生數據,就必須退出RDD空間。由於目前Spark只可以跟蹤RDD的計算,原生數據的計算對它來講是不可見的(除非之後 Spark會提供原生數據類型操做的重載、wrapper或implicit conversion)。這部分不可見的代碼可能引入先後RDD之間的依賴,以下面的代碼:


第三行filter對errors.count()的依賴是由(cnt-1)這個原生數據運算產生的,但調度器看不到這個運算,那就會出問題了。

因爲Spark並不提供控制流,在計算邏輯須要條件分支時,也必須回退到Scala的空間。因爲Scala語言對自定義控制流的支持很強,不排除將來Spark也會支持。

Spark 還有兩個很實用的功能。一個是廣播(broadcast)變量。有些數據,如lookup表,可能會在多個做業間反覆用到;這些數據比RDD要小得多,不 宜像RDD那樣在節點之間劃分。解決之道是提供一個新的語言結構——廣播變量,來修飾此類數據。Spark運行時把廣播變量修飾的內容發到各個節點,並保 存下來,將來再用時無需再送。相比Hadoop的distributed cache,廣播內容能夠跨做業共享。Spark提交者Mosharaf師從P2P的老法師Ion Stoica,採用了BitTorrent(沒錯,就是下載電影的那個BT)的簡化實現。有興趣的讀者能夠參考SIGCOMM'11的論文 Orchestra。另外一個功能是Accumulator(源於MapReduce的counter):容許Spark代碼中加入一些全局變量作 bookkeeping,如記錄當前的運行指標。

運行和調度

圖2顯示了Spark程序的運行場景。它由客戶端啓動,分兩個階段:第一階段記錄變換算子序列、增量構建DAG圖;第二階段由行動算子觸 發,DAGScheduler把DAG圖轉化爲做業及其任務集。Spark支持本地單節點運行(開發調試有用)或集羣運行。對於後者,客戶端運行於 master節點上,經過Cluster manager把劃分好分區的任務集發送到集羣的worker/slave節點上執行。


圖2 Spark程序運行過程

Spark 傳統上與Mesos「焦不離孟」,也可支持Amazon EC2和YARN。底層任務調度器的基類是個trait,它的不一樣實現能夠混入實際的執行。例如,在Mesos上有兩種調度器實現,一種把每一個節點的全部 資源分給Spark,另外一種容許Spark做業與其餘做業一塊兒調度、共享集羣資源。worker節點上有任務線程(task thread)真正運行DAGScheduler生成的任務;還有塊管理器(block manager)負責與master上的block manager master通訊(完美使用了Scala的Actor模式),爲任務線程提供數據塊。

最有趣的部分是DAGScheduler。下面詳解它的工做過程。RDD的數據結構裏很重要的一個域是對父RDD的依賴。如圖3所示,有兩類依賴:窄(Narrow)依賴和寬(Wide)依賴。


圖3 窄依賴和寬依賴

窄依賴指父RDD的每個分區最多被一個子RDD的分區所用,表現爲一個父RDD的分區對應於一個子RDD的分區,和兩個父RDD的分區對應於一個子RDD 的分區。圖3中,map/filter和union屬於第一類,對輸入進行協同劃分(co-partitioned)的join屬於第二類。

寬依賴指子RDD的分區依賴於父RDD的全部分區,這是由於shuffle類操做,如圖3中的groupByKey和未經協同劃分的join。

窄依賴對優化頗有利。邏輯上,每一個RDD的算子都是一個fork/join(此join非上文的join算子,而是指同步多個並行任務的barrier): 把計算fork到每一個分區,算完後join,而後fork/join下一個RDD的算子。若是直接翻譯到物理實現,是很不經濟的:一是每個RDD(即便 是中間結果)都須要物化到內存或存儲中,費時費空間;二是join做爲全局的barrier,是很昂貴的,會被最慢的那個節點拖死。若是子RDD的分區到 父RDD的分區是窄依賴,就能夠實施經典的fusion優化,把兩個fork/join合爲一個;若是連續的變換算子序列都是窄依賴,就能夠把不少個 fork/join併爲一個,不但減小了大量的全局barrier,並且無需物化不少中間結果RDD,這將極大地提高性能。Spark把這個叫作流水線 (pipeline)優化。

變換算子序列一碰上shuffle類操做,寬依賴就發生了,流水線優化終止。在具體實現 中,DAGScheduler從當前算子往前回溯依賴圖,一碰到寬依賴,就生成一個stage來容納已遍歷的算子序列。在這個stage裏,能夠安全地實 施流水線優化。而後,又從那個寬依賴開始繼續回溯,生成下一個stage。

要深究兩個問題:一,分區如何劃分;二,分區該放到集羣內哪一個節點。這正好對應於RDD結構中另外兩個域:分區劃分器(partitioner)和首選位置(preferred locations)。

分區劃分對於shuffle類操做很關鍵,它決定了該操做的父RDD和子RDD之間的依賴類型。上文提到,同一個join算子,若是協同劃分的話,兩個父 RDD之間、父RDD與子RDD之間能造成一致的分區安排,即同一個key保證被映射到同一個分區,這樣就能造成窄依賴。反之,若是沒有協同劃分,致使寬 依賴。

所謂協同劃分,就是指定分區劃分器以產生先後一致的分區安排。Pregel和HaLoop把這個做爲系統內置的一部分;而Spark 默認提供兩種劃分器:HashPartitioner和RangePartitioner,容許程序經過partitionBy算子指定。注意,HashPartitioner可以發揮做用,要求key的hashCode是有效的,即一樣內容的key產生一樣的hashCode。這對 String是成立的,但對數組就不成立(由於數組的hashCode是由它的標識,而非內容,生成)。這種狀況下,Spark容許用戶自定義 ArrayHashPartitioner。

第二個問題是分區放置的節點,這關乎數據本地性:本地性好,網絡通訊就少。有些RDD產生時就 有首選位置,如HadoopRDD分區的首選位置就是HDFS塊所在的節點。有些RDD或分區被緩存了,那計算就應該送到緩存分區所在的節點進行。再不 然,就回溯RDD的lineage一直找到具備首選位置屬性的父RDD,並據此決定子RDD的放置。

寬/窄依賴的概念不止用在調度中,對容錯也頗有用。若是一個節點宕機了,並且運算是窄依賴,那隻要把丟失的父RDD分區重算便可,跟其餘節點沒有依賴。而寬依賴須要父RDD的全部分區都存在, 重算就很昂貴了。因此若是使用checkpoint算子來作檢查點,不只要考慮lineage是否足夠長,也要考慮是否有寬依賴,對寬依賴加檢查點是最物 有所值的。

結語

由於篇幅所限,本文只能介紹Spark的基本概念和設計思想,內容來自Spark的多篇論文(以NSDI'12 「Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing」爲主),也有我和同事研究Spark的心得,以及多年來從事並行/分佈式系統研究的感悟。Spark核心成員/Shark主創者辛湜 對本文做了審閱和修改,特此致謝!

Spark站在一個很高的起點上,有着高尚的目標,但它的征程還剛剛開始。Spark致力於構建開放的生態系統( http://spark-project.org/ https://wiki.apache.org/incubator/SparkProposal),願與你們一塊兒爲之努力!

相關文章
相關標籤/搜索