Spark RDD基本概念與基本用法

1. 什麼是RDD

RDD(Resilient Distributed Dataset)叫作分佈式數據集,是Spark中最基本的數據抽象,它表明一個不可變、可分區、裏面的元素可並行計算的集合。RDD具備數據流模型的特色:自動容錯,位置感知性調度和可伸縮性。RDD容許用戶在執行多個查詢時顯式地將工做集緩存在內存中,後續的查詢可以重用工做集,這極大地提高了查詢速度。html

2. RDD的屬性

1)  A list of partitions 一組分片(Partition),即數據集的基本組成單位。對於RDD來講,每一個分片都會被一個計算任務處理,並決定並行計算的粒度。用戶能夠在建立RDD時指定RDD的分片個數,若是沒有指定,那麼就會採用默認值。默認值就是程序所分配到的CPU Core的數目。node

2) A function for computing each split 一個計算每一個分區的函數。Spark中RDD的計算是以分片爲單位的,每一個RDD都會實現compute函數以達到這個目的。compute函數會對迭代器進行復合,不須要保存每次計算的結果。算法

3) A list of dependencies on other RDDs  RDD之間的依賴關係。RDD的每次轉換都會生成一個新的RDD,因此RDD之間就會造成相似於流水線同樣的先後依賴關係。在部分分區數據丟失時,Spark能夠經過這個依賴關係從新計算丟失的分區數據,而不是對RDD的全部分區進行從新計算。shell

4) Optionally,a Partitoner for key-value RDDs 一個Partitioner,即RDD的分片函數。當前Spark中實現了兩種類型的分片函數,一個是基於哈希的HashPartitioner,另一個是基於範圍的RangePartitioner。只有對於key-value的RDD,纔會有Partitioner,非key-value的RDD的Partitioner的值是None。Partitioner函數不但決定了RDD自己的分片數量,也決定了parent RDD Shuffle輸出時的分片數量。編程

5) Optionally,a list of preferred locations to compute each split on 一個列表,存儲每一個Partition的優先位置(preferred location)。對於一個HDFS文件來講,這個列表保存的就是每一個Patition所在的塊的位置。按照「移動數據不如移動計算」的理念,Spark在進行任務調度的時候,會盡量地將計算任務分配到其所要處理數據塊的存儲位置。數組

3. 建立RDD

1) 由一個已經存在的Scala集合建立;緩存

val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))分佈式

2) 由外部存儲系統的數據集建立,包括本地的文件系統,還有全部Hadoop支持的數據集,好比HDFS、Cassandra、HBase等;ide

val rdd2 = sc.textFile("hdfs://hadoop1:9000/words.txt")函數

4. RDD編程API

4.1 Transformation(轉換)

RDD中的全部轉換都是延遲加載的,也就是說,它們並不會直接計算結果。相反的,它們只是記住這些應用到基礎數據集(例如一個文件)上的轉換動做。只有當發生一個要求返回結果給Driver的動做時,這些轉換纔會真正運行。這種設計讓Spark更加有效率地運行。

經常使用的Transformation:

轉換 含義
map(func) 返回一個新的RDD,該RDD由每個輸入元素通過func函數轉換後組成。
filter(func) 返回一個新的RDD,該RDD由通過func函數計算後返回值爲true的輸入元素組成。
flatMap(func) 相似於map,可是每個輸入元素能夠被映射爲0或多個輸出元素(因此func應該返回一個序列,而不是單一元素)
mapPartitions(func) 相似於map,但獨立地在RDD的每個分片上運行,所以在類型爲T的RDD上運行時,func的函數類型必須是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func) 相似於mapPartitions,但func帶有一個整數參數表示分片的索引值,所以在類型爲T的RDD上運行時,func的函數類型必須是(Int, Interator[T]) => Iterator[U]
sample(withReplacement, function, seed) 根據fraction指定比例對數據進行採樣,能夠選擇是否使用隨機數進行替換,seed用於指定隨機數生成器種子
union(otherDataset) 對源RDD和參數RDD求並集後返回一個新的RDD
intersection(otherDataset) 對源RDD和參數RDD求交集後返回一個新的RDD
distinct([numTasks]) 對源RDD進行去重後返回一個新的RDD
groupByKey([numTasks]) 在一個(K,V)對組成的RDD上調用,返回一個(K, Iterator[V])對組成的RDD。默認狀況下,輸出結果的並行度依賴於父RDD的分區數目。若是想要對Key進行聚合,使用reduceByKey或者combineByKey會有更好的性能。
reduceByKey(func, [numTasks]) 在一個(K,V)對的RDD上調用,返回一個(K,V)的RDD,使用指定的reduce函數,將相同key的值聚合到一塊兒,與groupByKey相似。reduce任務的個數能夠經過第二個可選的參數來設置
aggregateByKey(zeroValue)  
sortByKey([ascending], [numTasks]) 在一個(K,V)的RDD上調用,K必須實現Ordered接口,返回一個按照key進行排序的(K,V)的RDD
sortBy(func, [ascending], [numTasks]) 與sortByKey相似,可是更靈活
join(otherDataset, [numTasks]) 在類型爲(K,V)和(K,W)的RDD上調用,返回一個相同key對應的全部元素對在一塊兒的(K, (V,W))對,每一個key中的全部元素都在一塊兒的RDD
cogroup(otherDataset, [numTasks]) 在類型爲(K,V)和(K,W)的RDD上調用,返回一個(K,(Iterable<V>,Iterable<W>))類型的RDD
cartesion(otherDataset) 笛卡爾積,但在數據集T和U上調用,返回一個(T, U)對的數據集,全部元素交互進行笛卡爾積
pipe(command, [envVars])  經過管道的方式對RDD的每一個分區使用Shell命令進行操做,返回對應的結果
coalesce(numPartitions)  對RDD中的分區減小指定的數目,一般再過濾完一個大的數據集以後進行操做
rePartition(numPartitions)  對RDD中全部records平均劃分到numPartitions個partition中
rePartitionAndSordWithPartitions(partitioner)  
   

4.2 Action

動做 h含義
reduce(func) 經過func函數彙集RDD中的全部元素,這個功能必須是可交換且並聯的
collect() 在驅動程序中,以數組的形式返回數據集的全部元素
count() 返回RDD的元素個數
first() 返回RDD的第一個元素(相似於take(1))
take(n) 返回一個由數據集的前n個元素組成的數組
takeSample(withReplacement, num, [seed]) 返回一個數組,該數組由從數據集中隨機採樣的num個元素組成,能夠選擇是否用隨機數替換不足的部分,seed用於指定隨機數生成器種子
takeOrdered(n, [ordering])  排序後的limit(n)
saveAsTextFile(path) 將數據集的元素以textfile的形式保存到HDFS文件系統或者其餘支持的文件系統,對於每一個元素,Spark將會調用toString方法,將它轉換爲文件中的文本
saveAsSequenceFile(path) 將數據集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可使HDFS或者其餘Hadoop支持的文件系統
saveAsObjectFile(path)  使用Java的序列化方法保存到本地文件,能夠被SparkContext.objectFile加載
countByKey() 針對(K,V)類型的RDD,返回一個(K,int)的map,表示每個key對應的元素個數
foreach(func) 在數據集的每個元素上,運行函數func進行更新

注意:Transformation與Action的區別,Transformation通常執行後都會返回一個RDD,而Action不是。

4.3    練習

啓動spark-shell

spark-shell --master spark://node1.itcast.cn:7077

 

練習1

//經過並行化生成rdd

val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))

//對rdd1裏的每個元素乘2而後排序

val rdd2 = rdd1.map(_ * 2).sortBy(x => x, true)

//過濾出大於等於十的元素

val rdd3 = rdd2.filter(_ >= 10)

//將元素以數組的方式在客戶端顯示

rdd3.collect

 

練習2

val rdd1 = sc.parallelize(Array("a b c", "d e f", "h i j"))

//將rdd1裏面的每個元素先切分在壓平

val rdd2 = rdd1.flatMap(_.split(' '))

rdd2.collect

 

練習3

val rdd1 = sc.parallelize(List(5, 6, 4, 3))

val rdd2 = sc.parallelize(List(1, 2, 3, 4))

//求並集

val rdd3 = rdd1.union(rdd2)

//求交集

val rdd4 = rdd1.intersection(rdd2)

//去重

rdd3.distinct.collect

rdd4.collect

 

練習4

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))

val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))

//求jion

val rdd3 = rdd1.join(rdd2)

rdd3.collect

//求並集

val rdd4 = rdd1 union rdd2

//按key進行分組

rdd4.groupByKey

rdd4.collect

 

練習5

val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))

val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))

//cogroup

val rdd3 = rdd1.cogroup(rdd2)

//注意cogroup與groupByKey的區別

rdd3.collect

 

練習6

val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))

//reduce聚合

val rdd2 = rdd1.reduce(_ + _)

rdd2.collect

 

練習7

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2),  ("shuke", 1)))

val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))

val rdd3 = rdd1.union(rdd2)

//按key進行聚合

val rdd4 = rdd3.reduceByKey(_ + _)

rdd4.collect

//按value的降序排序

val rdd5 = rdd4.map(t => (t._2, t._1)).sortByKey(false).map(t => (t._2, t._1))

rdd5.collect

 

//想要了解更多,訪問下面的地址

http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html

5. RDD的緩存

Spark速度很是快的緣由之一,就是在不一樣操做中能夠在內存中持久化或緩存整個數據集。當持久化某個RDD後,每個節點都將把計算的分片結果保存在內存中,並在對此RDD或衍生出的RDD進行的其餘動做中重用。這使得後續的動做變得更加迅速。RDD相關的持久化和緩存,是Spark最重要的特徵之一。能夠說,緩存是Spark構建迭代式算法和快速交互式查詢的關鍵。

5.1 RDD緩存方式

RDD經過persist方法或cache方法將前面的計算結果緩存,可是並非這兩個方法被調用時當即緩存,而是觸發後面的action時,該RDD將會被緩存在計算節點的內存中,並供後面重用。

經過查看源碼發現cache最終也是調用了persist方法,默認的存儲級別都是僅在內存存儲一份,Spark的存儲級別還有好多種,存儲級別在object StorageLevel中定義的。緩存有可能丟失,或者存儲於內存的數據因爲內存不足而被刪除,RDD的緩存容錯機制保證了即便緩存丟失也能保證計算的正確執行。經過基於RDD的一系列轉換,丟失的數據會被重算,因爲RDD的各個Partition是相對獨立的,所以只須要計算丟失的部分便可,並不須要重算所有的Partition。

6. RDD的依賴關係

RDD和它依賴的父RDD(s)的關係有兩種不一樣的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)。

 

6.1 窄依賴

窄依賴指的是每個父RDD的Partition最多被子RDD的一個Parition使用

總結:窄依賴,咱們形象的比喻爲獨生子女

6.2 寬依賴

寬依賴指的是多個子RDD的Parition會依賴同一個父RDD的Partition

總結:寬依賴,咱們形象的比喻爲超生

6.3 Lineage

RDD只支持粗粒度轉換,即在大量記錄上執行的單個操做。將建立RDD的一系列Lineage(即血統)記錄下來,以便恢復丟失的分區。RDD的Lineage會記錄RDD的元數據信息和轉換行,當該RDD的部分分區數據丟失時,它能夠根據這些信息來從新運算和恢復丟失的數據分區。

7. DAG的生成

DAG(Directed Acyclic Graph)叫作有向無環圖,原始的RDD經過一系列的轉換就就造成了DAG,根據RDD之間的依賴關係的不一樣將DAG劃分紅不一樣的Stage,對於窄依賴,partition的轉換處理在Stage中完成計算。對於寬依賴,因爲有Shuffle的存在,只能在parent RDD處理完成後,才能開始接下來的計算,所以寬依賴是劃分Stage的依據。

8. WordCount程序的執行流程及其Stage劃分

9. Spark任務提交流程

Spark向worker提交任務的流程,至關於Spark任務提交的總體流程中的driver內部操做和第④步操做

DAGScheduler和TaskScheduler是在Driver中造成的,TaskScheduler向Worker提交執行任務則至關於第④步。

Spark任務提交的總體流程:

 

相關文章
相關標籤/搜索