大數據開發-Spark-一文理解常見RDD

1.五個基本Properties

  • A list of partitions數組

  • A function for computing each split函數

  • A list of dependencies on other RDDsoop

  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)測試

  • Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)大數據

這是RDD的源碼中註釋中寫到的,下面介紹這五種特徵屬性this

1.1 分區

一組分片(Partition),即數據集的基本組成單位。對於RDD來講,每一個分片都會被一個計算任務處理,並決
定並行計算的粒度。用戶能夠在建立RDD時指定RDD的分片個數,若是沒有指定,那麼就會採用默認值人工智能

1.2 計算的函數

一個對分區數據進行計算的函數。Spark中RDD的計算是以分片爲單位的,每一個RDD都會實現 compute 函數以
達到該目的。compute函數會對迭代器進行組合,不須要保存每次計算的結果scala

1.3 依賴關係

RDD之間的存在依賴關係。RDD的每次轉換都會生成一個新的RDD,RDD之間造成相似於流水線同樣的先後依
賴關係(lineage)。在部分分區數據丟失時,Spark能夠經過這個依賴關係從新計算丟失的分區數據,而不是
對RDD的全部分區進行從新計算3d

1.4 分區器

對於 key-value 的RDD而言,可能存在分區器(Partitioner)。Spark 實現了兩種類型的分片函數,一個是基於
哈希的HashPartitioner,另一個是基於範圍的RangePartitioner。只有 key-value 的RDD,纔可能有
Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函數決定了RDD自己的分片數量,也
決定了parent RDD Shuffle輸出時的分片數量code

1.5 優先存儲位置

一個列表,存儲存儲每一個Partition的優先位置(preferred location)。對於一個HDFS文件來講,這個列表保
存的就是每一個Partition所在的塊的位置。按照「移動數據不移動計算」的理念,Spark在任務調度的時候,會盡可
能地將計算任務分配到其所要處理數據塊的存儲位置

2. RDD轉換之間的常見算子

從前面的RDD的基本特徵入手,在工做中常編寫的程序是,建立RDDRDD的轉換,RDD的算子的執行,建立對應着外部系統的數據流入Spark集羣的必選步驟,至於之間從集合建立的數據,通常在測試時候使用,因此不細述,RDD的轉換對應一個專門的算子叫Transformation其是惰性加載使用的, 而行動對應着觸發Transformation執行的操做,通常是輸出到集合,或者打印出來,或者返回一個值,另外就是從集羣輸出到別的系統,這有一個專業詞叫Action.

2.1 常見轉換算子

轉換算子,即從一個RDD到另一個RDD的轉換操做,對應一些內置的Compute函數,可是這些函數被有沒有shuffle來分爲寬依賴算子和窄依賴算子

2.1.1 寬依賴和窄依賴的區別

通常網上文章有兩種,一種是搬運定義的,便是否一個父RDD分區會被多個子分區依賴,另一種是看有沒有Shuffle,有Shuffle就是寬依賴,沒有則是窄依賴,第一種還靠譜點,第二種就是拿自己來講自己,因此沒有參考價值,2.1.3 如何區別寬依賴和窄依賴,能夠之間看這個

2.1.2 寬依賴和窄依賴的常見算子

窄依賴常見算子

map(func):對數據集中的每一個元素都使用func,而後返回一個新的RDD
filter(func):對數據集中的每一個元素都使用func,而後返回一個包含使func爲true的元素構成的RDD
flatMap(func):與 map 相似,每一個輸入元素被映射爲0或多個輸出元素
mapPartitions(func):和map很像,可是map是將func做用在每一個元素上,而mapPartitions是func做用在整個分
區上。假設一個RDD有N個元素,M個分區(N >> M),那麼map的函數將被調用N次,而mapPartitions中的函數
僅被調用M次,一次處理一個分區中的全部元素
mapPartitionsWithIndex(func):與 mapPartitions 相似,多了分區的索引值的信息

glom():將每個分區造成一個數組,造成新的RDD類型 RDD[Array[T]]
sample(withReplacement, fraction, seed):採樣算子。以指定的隨機種子(seed)隨機抽樣出數量爲fraction的數
據,withReplacement表示是抽出的數據是否放回,true爲有放回的抽樣,false爲無放回的抽樣

coalesce(numPartitions,false):無shuffle,通常用來減小分區

union(otherRDD) : 求兩個RDD的並集

cartesian(otherRDD):笛卡爾積

zip(otherRDD):將兩個RDD組合成 key-value 形式的RDD,默認兩個RDD的partition數量以及元素數量都相同,否
則會拋出異常。

map 與 mapPartitions 的區別
map:每次處理一條數據
mapPartitions:每次處理一個分區的數據,分區的數據處理完成後,數據才能釋放,資源不足時容易致使
OOM
最佳實踐:當內存資源充足時,建議使用mapPartitions,以提升處理效率

寬依賴常見算子

groupBy(func):按照傳入函數的返回值進行分組。將key相同的值放入一個迭代器

distinct([numTasks])):對RDD元素去重後,返回一個新的RDD。可傳入numTasks參數改變RDD分區數

coalesce(numPartitions, true):有shuffle,不管增長分區仍是減小分區,通常用repartition來代替

repartition(numPartitions):增長或減小分區數,有shuffle

sortBy(func, [ascending], [numTasks]):使用 func 對數據進行處理,對處理後的結果進行排序

intersection(otherRDD) : 求兩個RDD的交集

subtract (otherRDD) : 求兩個RDD的差集

2.1.3 如何區別寬依賴和窄依賴

這裏我建議理解不了的算子,直接從Sparkhistory的依賴圖來看,有沒有劃分Stage,若是劃分了就是寬依賴,沒有劃分就是窄依賴,固然這是實戰派的作法,能夠在同事或者同窗說明問題的時候,show your code 給他,而後把依賴圖拿給他 ,固然做爲理論加實踐的並行者,我這裏再拿一種來判別,是從理解定義開始的,定義說是父RDD分區有沒有被多個子分區依賴,那能夠從這個角度想一下,父分區單個分區數據,有沒有可能流向不一樣的子RDD的分區,好比想想distinct算子,或者sortBy算子,全局去重和全局排序,假設剛開始1,2,3在一個分區,通過map(x => (x, null)).reduceByKey((x, y) => x).map(_._1) 去重後,雖然分區數量沒有變,可是每一個分區數據必然要看別的分區的數據,才能知道最後本身要不要保留,從輸入分區,到輸出分區,必然通過匯合重組,因此必然有shuffle的。sortBy同理。

2.2 常見行動算子

Action觸發Job。一個Spark程序(Driver程序)包含了多少 Action 算子,那麼就有多少Job;
典型的Action算子: collect / count
collect() => sc.runJob() => ... => dagScheduler.runJob() => 觸發了Job

collect() / collectAsMap() stats / count / mean / stdev / max / min reduce(func) / fold(func) / aggregate(func)

first():Return the first element in this RDD
take(n):Take the first num elements of the RDD
top(n):按照默認(降序)或者指定的排序規則,返回前num個元素。
takeSample(withReplacement, num, [seed]):返回採樣的數據
foreach(func) / foreachPartition(func):與map、mapPartitions相似,區別是 foreach 是 Action
saveAsTextFile(path) / saveAsSequenceFile(path) / saveAsObjectFile(path)

3. PairRDD常見操做

RDD總體上分爲 Value 類型和 Key-Value 類型。
前面介紹的是 Value 類型的RDD的操做,實際使用更多的是 key-value 類型的RDD,也稱爲 PairRDD。
Value 類型RDD的操做基本集中在 RDD.scala 中;
key-value 類型的RDD操做集中在 PairRDDFunctions.scala 中;

前面介紹的大多數算子對 Pair RDD 都是有效的,RDD的值爲key-value的時候便可隱式轉換爲PairRDD, Pair RDD還有屬於本身的 Transformation、Action 算子;

file

3.1 常見PairRDD的Transformation操做

3.1.1 相似 map 操做

mapValues / flatMapValues / keys / values,這些操做均可以使用 map 操做實現,是簡化操做。

3.1.2 聚合操做【重要、難點】

PariRDD(k, v)使用範圍廣,聚合
groupByKey / reduceByKey / foldByKey / aggregateByKey
combineByKey(OLD) / combineByKeyWithClassTag (NEW) => 底層實現
subtractByKey:相似於subtract,刪掉 RDD 中鍵與 other RDD 中的鍵相同的元素

結論:效率相等用最熟悉的方法;groupByKey在通常狀況下效率低,儘可能少用

3.1.3 排序操做

sortByKey:sortByKey函數做用於PairRDD,對Key進行排序

3.1.4 join操做

cogroup / join / leftOuterJoin / rightOuterJoin / fullOuterJoin

file

val rdd1 = sc.makeRDD(Array((1,"Spark"), (2,"Hadoop"), (3,"Kylin"), (4,"Flink")))
val rdd2 = sc.makeRDD(Array((3,"李四"), (4,"王五"), (5,"趙六"), (6,"馮七")))
val rdd3 = rdd1.cogroup(rdd2)
rdd3.collect.foreach(println)
rdd3.filter{case (_, (v1, v2)) => v1.nonEmpty & v2.nonEmpty}.collect
// 仿照源碼實現join操做
rdd3.flatMapValues( pair =>
for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
)
val rdd1 = sc.makeRDD(Array(("1","Spark"),("2","Hadoop"),("3","Scala"),("4","Java")))
val rdd2 = sc.makeRDD(Array(("3","20K"),("4","18K"),("5","25K"),("6","10K")))
rdd1.join(rdd2).collect
rdd1.leftOuterJoin(rdd2).collect
rdd1.rightOuterJoin(rdd2).collect
rdd1.fullOuterJoin(rdd2).collect

3.1.5 Action操做

collectAsMap / countByKey / lookup(key)

file

lookup(key):高效的查找方法,只查找對應分區的數據(若是RDD有分區器的話

4.寄語

實戰出真知,想要某種實現的時候,假設剛好你想到某個算子,那麼去使用它,不懂的地方看源碼,大業可成!
吳邪,小三爺,混跡於後臺,大數據,人工智能領域的小菜鳥。
更多請關注
file

相關文章
相關標籤/搜索