RDD(Resilient Distributed Dataset)叫作分佈式數據集,是Spark中最基本的數據抽象,它表明一個不可變、可分區、裏面的元素可並行計算的集合。RDD具備數據流模型的特色:自動容錯,位置感知性調度和可伸縮性。RDD容許用戶在執行多個查詢時顯式地將工做集緩存在內存中,後續的查詢可以重用工做集,這極大地提高了查詢速度。html
1) A list of partitions 一組分片(Partition),即數據集的基本組成單位。對於RDD來講,每一個分片都會被一個計算任務處理,並決定並行計算的粒度。用戶能夠在建立RDD時指定RDD的分片個數,若是沒有指定,那麼就會採用默認值。默認值就是程序所分配到的CPU Core的數目。node
2) A function for computing each split 一個計算每一個分區的函數。Spark中RDD的計算是以分片爲單位的,每一個RDD都會實現compute函數以達到這個目的。compute函數會對迭代器進行復合,不須要保存每次計算的結果。算法
3) A list of dependencies on other RDDs RDD之間的依賴關係。RDD的每次轉換都會生成一個新的RDD,因此RDD之間就會造成相似於流水線同樣的先後依賴關係。在部分分區數據丟失時,Spark能夠經過這個依賴關係從新計算丟失的分區數據,而不是對RDD的全部分區進行從新計算。shell
4) Optionally,a Partitoner for key-value RDDs 一個Partitioner,即RDD的分片函數。當前Spark中實現了兩種類型的分片函數,一個是基於哈希的HashPartitioner,另一個是基於範圍的RangePartitioner。只有對於key-value的RDD,纔會有Partitioner,非key-value的RDD的Partitioner的值是None。Partitioner函數不但決定了RDD自己的分片數量,也決定了parent RDD Shuffle輸出時的分片數量。編程
5) Optionally,a list of preferred locations to compute each split on 一個列表,存儲每一個Partition的優先位置(preferred location)。對於一個HDFS文件來講,這個列表保存的就是每一個Patition所在的塊的位置。按照「移動數據不如移動計算」的理念,Spark在進行任務調度的時候,會盡量地將計算任務分配到其所要處理數據塊的存儲位置。數組
1) 由一個已經存在的Scala集合建立;緩存
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))分佈式
2) 由外部存儲系統的數據集建立,包括本地的文件系統,還有全部Hadoop支持的數據集,好比HDFS、Cassandra、HBase等;ide
val rdd2 = sc.textFile("hdfs://hadoop1:9000/words.txt")函數
RDD中的全部轉換都是延遲加載的,也就是說,它們並不會直接計算結果。相反的,它們只是記住這些應用到基礎數據集(例如一個文件)上的轉換動做。只有當發生一個要求返回結果給Driver的動做時,這些轉換纔會真正運行。這種設計讓Spark更加有效率地運行。
經常使用的Transformation:
轉換 | 含義 |
map(func) | 返回一個新的RDD,該RDD由每個輸入元素通過func函數轉換後組成。 |
filter(func) | 返回一個新的RDD,該RDD由通過func函數計算後返回值爲true的輸入元素組成。 |
flatMap(func) | 相似於map,可是每個輸入元素能夠被映射爲0或多個輸出元素(因此func應該返回一個序列,而不是單一元素) |
mapPartitions(func) | 相似於map,但獨立地在RDD的每個分片上運行,所以在類型爲T的RDD上運行時,func的函數類型必須是Iterator[T] => Iterator[U] |
mapPartitionsWithIndex(func) | 相似於mapPartitions,但func帶有一個整數參數表示分片的索引值,所以在類型爲T的RDD上運行時,func的函數類型必須是(Int, Interator[T]) => Iterator[U] |
sample(withReplacement, function, seed) | 根據fraction指定比例對數據進行採樣,能夠選擇是否使用隨機數進行替換,seed用於指定隨機數生成器種子 |
union(otherDataset) | 對源RDD和參數RDD求並集後返回一個新的RDD |
intersection(otherDataset) | 對源RDD和參數RDD求交集後返回一個新的RDD |
distinct([numTasks]) | 對源RDD進行去重後返回一個新的RDD |
groupByKey([numTasks]) | 在一個(K,V)對組成的RDD上調用,返回一個(K, Iterator[V])對組成的RDD。默認狀況下,輸出結果的並行度依賴於父RDD的分區數目。若是想要對Key進行聚合,使用reduceByKey或者combineByKey會有更好的性能。 |
reduceByKey(func, [numTasks]) | 在一個(K,V)對的RDD上調用,返回一個(K,V)的RDD,使用指定的reduce函數,將相同key的值聚合到一塊兒,與groupByKey相似。reduce任務的個數能夠經過第二個可選的參數來設置 |
aggregateByKey(zeroValue) | |
sortByKey([ascending], [numTasks]) | 在一個(K,V)的RDD上調用,K必須實現Ordered接口,返回一個按照key進行排序的(K,V)的RDD |
sortBy(func, [ascending], [numTasks]) | 與sortByKey相似,可是更靈活 |
join(otherDataset, [numTasks]) | 在類型爲(K,V)和(K,W)的RDD上調用,返回一個相同key對應的全部元素對在一塊兒的(K, (V,W))對,每一個key中的全部元素都在一塊兒的RDD |
cogroup(otherDataset, [numTasks]) | 在類型爲(K,V)和(K,W)的RDD上調用,返回一個(K,(Iterable<V>,Iterable<W>))類型的RDD |
cartesion(otherDataset) | 笛卡爾積,但在數據集T和U上調用,返回一個(T, U)對的數據集,全部元素交互進行笛卡爾積 |
pipe(command, [envVars]) | 經過管道的方式對RDD的每一個分區使用Shell命令進行操做,返回對應的結果 |
coalesce(numPartitions) | 對RDD中的分區減小指定的數目,一般再過濾完一個大的數據集以後進行操做 |
rePartition(numPartitions) | 對RDD中全部records平均劃分到numPartitions個partition中 |
rePartitionAndSordWithPartitions(partitioner) | |
動做 | h含義 |
reduce(func) | 經過func函數彙集RDD中的全部元素,這個功能必須是可交換且並聯的 |
collect() | 在驅動程序中,以數組的形式返回數據集的全部元素 |
count() | 返回RDD的元素個數 |
first() | 返回RDD的第一個元素(相似於take(1)) |
take(n) | 返回一個由數據集的前n個元素組成的數組 |
takeSample(withReplacement, num, [seed]) | 返回一個數組,該數組由從數據集中隨機採樣的num個元素組成,能夠選擇是否用隨機數替換不足的部分,seed用於指定隨機數生成器種子 |
takeOrdered(n, [ordering]) | 排序後的limit(n) |
saveAsTextFile(path) | 將數據集的元素以textfile的形式保存到HDFS文件系統或者其餘支持的文件系統,對於每一個元素,Spark將會調用toString方法,將它轉換爲文件中的文本 |
saveAsSequenceFile(path) | 將數據集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可使HDFS或者其餘Hadoop支持的文件系統 |
saveAsObjectFile(path) | 使用Java的序列化方法保存到本地文件,能夠被SparkContext.objectFile加載 |
countByKey() | 針對(K,V)類型的RDD,返回一個(K,int)的map,表示每個key對應的元素個數 |
foreach(func) | 在數據集的每個元素上,運行函數func進行更新 |
注意:Transformation與Action的區別,Transformation通常執行後都會返回一個RDD,而Action不是。
啓動spark-shell
spark-shell --master spark://node1.itcast.cn:7077
練習1:
//經過並行化生成rdd val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10)) //對rdd1裏的每個元素乘2而後排序 val rdd2 = rdd1.map(_ * 2).sortBy(x => x, true) //過濾出大於等於十的元素 val rdd3 = rdd2.filter(_ >= 10) //將元素以數組的方式在客戶端顯示 rdd3.collect
練習2:
val rdd1 = sc.parallelize(Array("a b c", "d e f", "h i j")) //將rdd1裏面的每個元素先切分在壓平 val rdd2 = rdd1.flatMap(_.split(' ')) rdd2.collect
練習3:
val rdd1 = sc.parallelize(List(5, 6, 4, 3)) val rdd2 = sc.parallelize(List(1, 2, 3, 4)) //求並集 val rdd3 = rdd1.union(rdd2) //求交集 val rdd4 = rdd1.intersection(rdd2) //去重 rdd3.distinct.collect rdd4.collect
練習4:
val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2))) val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2))) //求jion val rdd3 = rdd1.join(rdd2) rdd3.collect //求並集 val rdd4 = rdd1 union rdd2 //按key進行分組 rdd4.groupByKey rdd4.collect
練習5:
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2))) val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2))) //cogroup val rdd3 = rdd1.cogroup(rdd2) //注意cogroup與groupByKey的區別 rdd3.collect
練習6:
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5)) //reduce聚合 val rdd2 = rdd1.reduce(_ + _) rdd2.collect
練習7:
val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1))) val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5))) val rdd3 = rdd1.union(rdd2) //按key進行聚合 val rdd4 = rdd3.reduceByKey(_ + _) rdd4.collect //按value的降序排序 val rdd5 = rdd4.map(t => (t._2, t._1)).sortByKey(false).map(t => (t._2, t._1)) rdd5.collect
//想要了解更多,訪問下面的地址
http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html
Spark速度很是快的緣由之一,就是在不一樣操做中能夠在內存中持久化或緩存整個數據集。當持久化某個RDD後,每個節點都將把計算的分片結果保存在內存中,並在對此RDD或衍生出的RDD進行的其餘動做中重用。這使得後續的動做變得更加迅速。RDD相關的持久化和緩存,是Spark最重要的特徵之一。能夠說,緩存是Spark構建迭代式算法和快速交互式查詢的關鍵。
RDD經過persist方法或cache方法將前面的計算結果緩存,可是並非這兩個方法被調用時當即緩存,而是觸發後面的action時,該RDD將會被緩存在計算節點的內存中,並供後面重用。
經過查看源碼發現cache最終也是調用了persist方法,默認的存儲級別都是僅在內存存儲一份,Spark的存儲級別還有好多種,存儲級別在object StorageLevel中定義的。緩存有可能丟失,或者存儲於內存的數據因爲內存不足而被刪除,RDD的緩存容錯機制保證了即便緩存丟失也能保證計算的正確執行。經過基於RDD的一系列轉換,丟失的數據會被重算,因爲RDD的各個Partition是相對獨立的,所以只須要計算丟失的部分便可,並不須要重算所有的Partition。
RDD和它依賴的父RDD(s)的關係有兩種不一樣的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)。
窄依賴指的是每個父RDD的Partition最多被子RDD的一個Parition使用
總結:窄依賴,咱們形象的比喻爲獨生子女
寬依賴指的是多個子RDD的Parition會依賴同一個父RDD的Partition
總結:寬依賴,咱們形象的比喻爲超生
RDD只支持粗粒度轉換,即在大量記錄上執行的單個操做。將建立RDD的一系列Lineage(即血統)記錄下來,以便恢復丟失的分區。RDD的Lineage會記錄RDD的元數據信息和轉換行,當該RDD的部分分區數據丟失時,它能夠根據這些信息來從新運算和恢復丟失的數據分區。
DAG(Directed Acyclic Graph)叫作有向無環圖,原始的RDD經過一系列的轉換就就造成了DAG,根據RDD之間的依賴關係的不一樣將DAG劃分紅不一樣的Stage,對於窄依賴,partition的轉換處理在Stage中完成計算。對於寬依賴,因爲有Shuffle的存在,只能在parent RDD處理完成後,才能開始接下來的計算,所以寬依賴是劃分Stage的依據。
Spark向worker提交任務的流程,至關於Spark任務提交的總體流程中的driver內部操做和第④步操做
DAGScheduler和TaskScheduler是在Driver中造成的,TaskScheduler向Worker提交執行任務則至關於第④步。
Spark任務提交的總體流程: