RDD是存儲數據的最小單位,spark在並行計算的時候會將任務細化到rdd的維度,分到不一樣的cluster上計算。html
生成RDD
// @param numSlices number of partitions to divide the collection into // parallelize() 的第二個參數是slices的數目,它指定了將數據集切分的份數。 sc.parallelize(Array("one", "two", "three", "four"), 2) sc.parallelize(List("one", "two", "three", "four"), 2) sc.textFile("hdfs:///user/test.txt")
map
map 會操做 RDD 中的每個元素,map 中的每一次循環都是RDD中的某一個元素,map 對數據的操做互不影響,因此 map 操做是天生並行化的。apache
// 測試數據 => 姓名#語文成績#數學成績#英語成績 // 小紅#70#80#80 // 小明#10#20#30 // 小張#30#40#80 // 小李#80#90#90 // 小亮#30#80#80 // 小慧#80#20#10 // 小黑#10#20#10 // 小紅#30#20#20 // 小黑#90#80#90 // map操做數據集的每一行數據,先按照 "#" 來把數據分割成 List,而後再操做每一個 List 將 List 中的數據按照須要取出來進行整合 val data = sc.textFile(student_grade).map(_.split("#", -1)).map { line => val name = line(0) val chinese = line(1) val math = line(2) val english = line(3) (name, chinese, math, english) }
RDD全稱叫作彈性分佈式數據集(Resilient Distributed Datasets),它是一種分佈式的內存抽象,表示一個只讀的記錄分區的集合,它只能經過其餘RDD轉換而建立,爲此,RDD支持豐富的轉換操做(如map, join, filter, groupBy等),經過這種轉換操做,新的RDD則包含了如何從其餘RDDs衍生所必需的信息,因此說RDDs之間是有依賴關係的。基於RDDs之間的依賴,RDDs會造成一個有向無環圖DAG,該DAG描述了整個流式計算的流程,實際執行的時候,RDD是經過血緣關係(Lineage)一鼓作氣的,即便出現數據分區丟失,也能夠經過血緣關係重建分區,總結起來,基於RDD的流式計算任務可描述爲:從穩定的物理存儲(如分佈式文件系統)中加載記錄,記錄被傳入由一組肯定性操做構成的DAG,而後寫回穩定存儲。另外RDD還能夠將數據集緩存到內存中,使得在多個操做之間能夠重用數據集,基於這個特色能夠很方便地構建迭代型應用(圖計算、機器學習等)或者交互式數據分析應用。能夠說Spark最初也就是實現RDD的一個分佈式系統,後面經過不斷髮展壯大成爲如今較爲完善的大數據生態系統,簡單來說,Spark-RDD的關係相似於Hadoop-MapReduce關係。編程
RDD特色
RDD表示只讀的分區的數據集,對RDD進行改動,只能經過RDD的轉換操做,由一個RDD獲得一個新的RDD,新的RDD包含了從其餘RDD衍生所必需的信息。RDDs之間存在依賴,RDD的執行是按照血緣關係延時計算的。若是血緣關係較長,能夠經過持久化RDD來切斷血緣關係。緩存
分區
以下圖所示,RDD邏輯上是分區的,每一個分區的數據是抽象存在的,計算的時候會經過一個compute函數獲得每一個分區的數據。若是RDD是經過已有的文件系統構建,則compute函數是讀取指定文件系統中的數據,若是RDD是經過其餘RDD轉換而來,則compute函數是執行轉換邏輯將其餘RDD的數據進行轉換。 機器學習
只讀
以下圖所示,RDD是隻讀的,要想改變RDD中的數據,只能在現有的RDD基礎上建立新的RDD。分佈式
由一個RDD轉換到另外一個RDD,能夠經過豐富的操做算子實現,再也不像MapReduce那樣只能寫map和reduce了,以下圖所示。ide
RDD的操做算子包括兩類,一類叫作transformations,它是用來將RDD進行轉化,構建RDD的血緣關係;另外一類叫作actions,它是用來觸發RDD的計算,獲得RDD的相關計算結果或者將RDD保存的文件系統中。下圖是RDD所支持的操做算子列表。函數
依賴
RDDs經過操做算子進行轉換,轉換獲得的新RDD包含了從其餘RDDs衍生所必需的信息,RDDs之間維護着這種血緣關係,也稱之爲依賴。以下圖所示,依賴包括兩種,一種是窄依賴,RDDs之間分區是一一對應的,另外一種是寬依賴,下游RDD的每一個分區與上游RDD(也稱之爲父RDD)的每一個分區都有關,是多對多的關係。oop
經過RDDs之間的這種依賴關係,一個任務流能夠描述爲DAG(有向無環圖),以下圖所示,在實際執行過程當中寬依賴對應於Shuffle(圖中的reduceByKey和join),窄依賴中的全部轉換操做能夠經過相似於管道的方式一鼓作氣執行(圖中map和union能夠一塊兒執行)。post
緩存
若是在應用程序中屢次使用同一個RDD,能夠將該RDD緩存起來,該RDD只有在第一次計算的時候會根據血緣關係獲得分區的數據,在後續其餘地方用到該RDD的時候,會直接從緩存處取而不用再根據血緣關係計算,這樣就加速後期的重用。以下圖所示,RDD-1通過一系列的轉換後獲得RDD-n並保存到hdfs,RDD-1在這一過程當中會有個中間結果,若是將其緩存到內存,那麼在隨後的RDD-1轉換到RDD-m這一過程當中,就不會計算其以前的RDD-0了。
checkpoint
雖然RDD的血緣關係自然地能夠實現容錯,當RDD的某個分區數據失敗或丟失,能夠經過血緣關係重建。可是對於長時間迭代型應用來講,隨着迭代的進行,RDDs之間的血緣關係會愈來愈長,一旦在後續迭代過程當中出錯,則須要經過很是長的血緣關係去重建,勢必影響性能。爲此,RDD支持checkpoint將數據保存到持久化的存儲中,這樣就能夠切斷以前的血緣關係,由於checkpoint後的RDD不須要知道它的父RDDs了,它能夠從checkpoint處拿到數據。
小結
總結起來,給定一個RDD咱們至少能夠知道以下幾點信息:一、分區數以及分區方式;二、由父RDDs衍生而來的相關依賴信息;三、計算每一個分區的數據,計算步驟爲:1)若是被緩存,則從緩存中取的分區的數據;2)若是被checkpoint,則從checkpoint處恢復數據;3)根據血緣關係計算分區的數據。
編程模型
在Spark中,RDD被表示爲對象,經過對象上的方法調用來對RDD進行轉換。通過一系列的transformations定義RDD以後,就能夠調用actions觸發RDD的計算,action能夠是嚮應用程序返回結果(count, collect等),或者是向存儲系統保存數據(saveAsTextFile等)。在Spark中,只有遇到action,纔會執行RDD的計算(即延遲計算),這樣在運行時能夠經過管道的方式傳輸多個轉換。
要使用Spark,開發者須要編寫一個Driver程序,它被提交到集羣以調度運行Worker,以下圖所示。Driver中定義了一個或多個RDD,並調用RDD上的action,Worker則執行RDD分區計算任務。
應用舉例
下面介紹一個簡單的spark應用程序實例WordCount,統計一個數據集中每一個單詞出現的次數,首先將從hdfs中加載數據獲得原始RDD-0,其中每條記錄爲數據中的一行句子,通過一個flatMap操做,將一行句子切分爲多個獨立的詞,獲得RDD-1,再經過map操做將每一個詞映射爲key-value形式,其中key爲詞自己,value爲初始計數值1,獲得RDD-2,將RDD-2中的全部記錄歸併,統計每一個詞的計數,獲得RDD-3,最後將其保存到hdfs。
import org.apache.spark._ import SparkContext._ object WordCount { def main(args: Array[String]) { if (args.length < 2) { System.err.println("Usage: WordCount <inputfile> <outputfile>"); System.exit(1); } val conf = new SparkConf().setAppName("WordCount") val sc = new SparkContext(conf) val result = sc.textFile(args(0)) .flatMap(line => line.split(" ")) .map(word => (word, 1)) .reduceByKey(_ + _) result.saveAsTextFile(args(1)) } }
小結
基於RDD實現的Spark相比於傳統的Hadoop MapReduce有什麼優點呢?總結起來應該至少有三點:1)RDD提供了豐富的操做算子,再也不是隻有map和reduce兩個操做了,對於描述應用程序來講更加方便;2)經過RDDs之間的轉換構建DAG,中間結果不用落地;3)RDD支持緩存,能夠在內存中快速完成計算。