A list of partitions數組
A function for computing each split函數
A list of dependencies on other RDDsoop
Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)測試
Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)大數據
這是RDD的源碼中註釋中寫到的,下面介紹這五種特徵屬性this
一組分片(Partition),即數據集的基本組成單位。對於RDD來講,每一個分片都會被一個計算任務處理,並決
定並行計算的粒度。用戶能夠在建立RDD時指定RDD的分片個數,若是沒有指定,那麼就會採用默認值人工智能
一個對分區數據進行計算的函數。Spark中RDD的計算是以分片爲單位的,每一個RDD都會實現 compute 函數以
達到該目的。compute函數會對迭代器進行組合,不須要保存每次計算的結果scala
RDD之間的存在依賴關係。RDD的每次轉換都會生成一個新的RDD,RDD之間造成相似於流水線同樣的先後依
賴關係(lineage)。在部分分區數據丟失時,Spark能夠經過這個依賴關係從新計算丟失的分區數據,而不是
對RDD的全部分區進行從新計算3d
對於 key-value
的RDD而言,可能存在分區器(Partitioner
)。Spark 實現了兩種類型的分片函數,一個是基於
哈希的HashPartitioner
,另一個是基於範圍的RangePartitioner。只有 key-value 的RDD,纔可能有
Partitioner
,非key-value的RDD的Parititioner
的值是None。Partitioner
函數決定了RDD
自己的分片數量,也
決定了parent RDD Shuffle
輸出時的分片數量code
一個列表,存儲存儲每一個Partition
的優先位置(preferred location)。對於一個HDFS
文件來講,這個列表保
存的就是每一個Partition
所在的塊的位置。按照「移動數據不移動計算」的理念,Spark
在任務調度的時候,會盡可
能地將計算任務分配到其所要處理數據塊的存儲位置
從前面的RDD
的基本特徵入手,在工做中常編寫的程序是,建立RDD
,RDD
的轉換,RDD
的算子的執行,建立對應着外部系統的數據流入Spark集羣的必選步驟,至於之間從集合建立的數據,通常在測試時候使用,因此不細述,RDD
的轉換對應一個專門的算子叫Transformation
其是惰性加載使用的, 而行動對應着觸發Transformation
執行的操做,通常是輸出到集合,或者打印出來,或者返回一個值,另外就是從集羣輸出到別的系統,這有一個專業詞叫Action
.
轉換算子,即從一個RDD到另一個RDD的轉換操做,對應一些內置的Compute函數,可是這些函數被有沒有shuffle來分爲寬依賴算子和窄依賴算子
通常網上文章有兩種,一種是搬運定義的,便是否一個父RDD
分區會被多個子分區依賴,另一種是看有沒有Shuffle
,有Shuffle
就是寬依賴,沒有則是窄依賴,第一種還靠譜點,第二種就是拿自己來講自己,因此沒有參考價值,2.1.3 如何區別寬依賴和窄依賴,能夠之間看這個
map(func)
:對數據集中的每一個元素都使用func,而後返回一個新的RDD
filter(func)
:對數據集中的每一個元素都使用func,而後返回一個包含使func爲true的元素構成的RDD
flatMap(func)
:與 map 相似,每一個輸入元素被映射爲0或多個輸出元素
mapPartitions(func)
:和map很像,可是map是將func做用在每一個元素上,而mapPartitions是func做用在整個分
區上。假設一個RDD有N個元素,M個分區(N >> M),那麼map的函數將被調用N次,而mapPartitions中的函數
僅被調用M次,一次處理一個分區中的全部元素
mapPartitionsWithIndex(func)
:與 mapPartitions 相似,多了分區的索引值的信息
glom()
:將每個分區造成一個數組,造成新的RDD類型 RDD[Array[T]]
sample(withReplacement, fraction, seed)
:採樣算子。以指定的隨機種子(seed)隨機抽樣出數量爲fraction的數
據,withReplacement表示是抽出的數據是否放回,true爲有放回的抽樣,false爲無放回的抽樣
coalesce(numPartitions,false)
:無shuffle,通常用來減小分區
union(otherRDD)
: 求兩個RDD的並集
cartesian(otherRDD)
:笛卡爾積
zip(otherRDD)
:將兩個RDD組合成 key-value 形式的RDD,默認兩個RDD的partition數量以及元素數量都相同,否
則會拋出異常。
map 與 mapPartitions 的區別
map
:每次處理一條數據
mapPartitions
:每次處理一個分區的數據,分區的數據處理完成後,數據才能釋放,資源不足時容易致使
OOM
最佳實踐:當內存資源充足時,建議使用mapPartitions
,以提升處理效率
groupBy(func)
:按照傳入函數的返回值進行分組。將key相同的值放入一個迭代器
distinct([numTasks]))
:對RDD元素去重後,返回一個新的RDD。可傳入numTasks參數改變RDD分區數
coalesce(numPartitions, true)
:有shuffle,不管增長分區仍是減小分區,通常用repartition來代替
repartition(numPartitions)
:增長或減小分區數,有shuffle
sortBy(func, [ascending], [numTasks])
:使用 func 對數據進行處理,對處理後的結果進行排序
intersection(otherRDD)
: 求兩個RDD的交集
subtract (otherRDD)
: 求兩個RDD的差集
這裏我建議理解不了的算子,直接從Spark
的history
的依賴圖來看,有沒有劃分Stage
,若是劃分了就是寬依賴,沒有劃分就是窄依賴,固然這是實戰派的作法,能夠在同事或者同窗說明問題的時候,show your code
給他,而後把依賴圖拿給他 ,固然做爲理論加實踐的並行者,我這裏再拿一種來判別,是從理解定義開始的,定義說是父RDD分區有沒有被多個子分區依賴,那能夠從這個角度想一下,父分區單個分區數據,有沒有可能流向不一樣的子RDD的分區,好比想想distinct算子,或者sortBy算子,全局去重和全局排序,假設剛開始1,2,3在一個分區,通過map(x => (x, null)).reduceByKey((x, y) => x).map(_._1)
去重後,雖然分區數量沒有變,可是每一個分區數據必然要看別的分區的數據,才能知道最後本身要不要保留,從輸入分區,到輸出分區,必然通過匯合重組,因此必然有shuffle
的。sortBy
同理。
Action觸發Job。一個Spark程序(Driver程序)包含了多少 Action 算子,那麼就有多少Job;
典型的Action算子: collect / count
collect() => sc.runJob() => ... => dagScheduler.runJob() => 觸發了Job
collect() / collectAsMap() stats / count / mean / stdev / max / min reduce(func) / fold(func) / aggregate(func)
first()
:Return the first element in this RDD
take(n)
:Take the first num elements of the RDD
top(n)
:按照默認(降序)或者指定的排序規則,返回前num個元素。
takeSample(withReplacement, num, [seed])
:返回採樣的數據
foreach(func) / foreachPartition(func)
:與map、mapPartitions相似,區別是 foreach 是 Action
saveAsTextFile(path) / saveAsSequenceFile(path) / saveAsObjectFile(path)
RDD總體上分爲 Value 類型和 Key-Value 類型。
前面介紹的是 Value 類型的RDD的操做,實際使用更多的是 key-value 類型的RDD,也稱爲 PairRDD。
Value 類型RDD的操做基本集中在 RDD.scala 中;
key-value 類型的RDD操做集中在 PairRDDFunctions.scala 中;
前面介紹的大多數算子對 Pair RDD 都是有效的,RDD的值爲key-value的時候便可隱式轉換爲PairRDD, Pair RDD還有屬於本身的 Transformation、Action 算子;
mapValues / flatMapValues / keys / values,這些操做均可以使用 map 操做實現,是簡化操做。
PariRDD(k, v)使用範圍廣,聚合
groupByKey / reduceByKey / foldByKey / aggregateByKey
combineByKey(OLD) / combineByKeyWithClassTag (NEW) => 底層實現
subtractByKey:相似於subtract,刪掉 RDD 中鍵與 other RDD 中的鍵相同的元素
結論:效率相等用最熟悉的方法;groupByKey在通常狀況下效率低,儘可能少用
sortByKey:sortByKey函數做用於PairRDD,對Key進行排序
cogroup / join / leftOuterJoin / rightOuterJoin / fullOuterJoin
val rdd1 = sc.makeRDD(Array((1,"Spark"), (2,"Hadoop"), (3,"Kylin"), (4,"Flink"))) val rdd2 = sc.makeRDD(Array((3,"李四"), (4,"王五"), (5,"趙六"), (6,"馮七"))) val rdd3 = rdd1.cogroup(rdd2) rdd3.collect.foreach(println) rdd3.filter{case (_, (v1, v2)) => v1.nonEmpty & v2.nonEmpty}.collect // 仿照源碼實現join操做 rdd3.flatMapValues( pair => for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w) ) val rdd1 = sc.makeRDD(Array(("1","Spark"),("2","Hadoop"),("3","Scala"),("4","Java"))) val rdd2 = sc.makeRDD(Array(("3","20K"),("4","18K"),("5","25K"),("6","10K"))) rdd1.join(rdd2).collect rdd1.leftOuterJoin(rdd2).collect rdd1.rightOuterJoin(rdd2).collect rdd1.fullOuterJoin(rdd2).collect
collectAsMap / countByKey / lookup(key)
lookup(key)
:高效的查找方法,只查找對應分區的數據(若是RDD有分區器的話
實戰出真知,想要某種實現的時候,假設剛好你想到某個算子,那麼去使用它,不懂的地方看源碼,大業可成!
吳邪,小三爺,混跡於後臺,大數據,人工智能領域的小菜鳥。
更多請關注