本節介紹RDD的Transformations函數的原理和做用。還會介紹transformations函數的分類,和不一樣類型的轉換產生的效果。shell
在RDD中定義了兩類操做函數:action和transformations。transformations經過在一些RDD中執行一些數據操做來產生一個或多個新的RDD。這些transformations函數包括:map,filter,join,reduceByKey,cogroup,randomSplit等。apache
也就是說,transformations是一系列函數,它們的輸入是一個RDD,輸出是一個或多個RDD。但這些函數並不會改變輸入RDD的值(這是RDD不可改變的特性),但經過transformations函數的計算會產生一個或多個新的RDD。dom
RDD的transformations函數是懶評估(evaluated lazily)的。所謂懶評估是指:在調用transformations函數時不會當即執行,直到action函數被調用。也就是說,transformations函數的執行是由action函數的調用來觸發的。ide
經過使用transformations轉換函數,您可使用最終RDD的全部父RDD逐步構建RDD血緣(RDD Lineage)。函數
一個RDD經過transformations轉換函數處理後獲得的新的結果RDD一般與父RDD的值不一樣,該結果RDD的數據集可能變得更大(例如:flatMap,union等),也可能變得更小(例如:filter,distinct,count等),或則大小相同(例如:map等)。oop
注意:有些轉換函數也可能發起計算,例如:例如sortBy,zipWithIndex等lua
安裝好spark(能夠是單機的),並啓動spark-shell,在spark-shell中輸入一下命令:spa
# 1. 加載一個文件,文件內容不重要
scala> val file = sc.textFile("derby.log")
file: org.apache.spark.rdd.RDD[String] = derby.logMapPartitionsRDD[1] at textFile at <console>:24
# 2. 查看file的結果
scala> file.toDebugString
res14: String =
(2) derby.log MapPartitionsRDD[1] at textFile at <console>:24 []
| derby.log HadoopRDD[0] at textFile at <console>:24 []
# 3. 把文件中的內容按\s+進行分割,也能夠執行其餘文本操做
scala> val allWords = file.flatMap(_.split("\\s+"))
allWords: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2]at flatMap at <console>:26
# 4. 查看一下allWords這個RDD的linage或邏輯執行計劃
scala> allWords.toDebugString
res0: String =
(2) MapPartitionsRDD[2] at flatMap at <console>:26 []
| derby.log MapPartitionsRDD[1] at textFile at <console>:24 []
| derby.log HadoopRDD[0] at textFile at <console>:24 []
從以上第2步的代碼和輸出能夠看到,執行sc.textFile函數後,會產生兩種RDD,一種是:HadoopRDD,一種是:MapPartitionsRDD,其中HadoopRDD是中間狀態的RDD,最後獲得的是MapPartitionsRDD。這個結果從第1步的系統輸出能夠看到。scala
而後再對文件中的內容進行處理,會產生新的RDD,產生的新的RDD也是MapPartitionsRDD類型的。從第4步的輸出能夠看到。orm
爲了更好的理解這些輸出,咱們能夠看一下這些函數的源代碼:
def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String] =withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat],classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
}
能夠看到textFile會調用hadoopFile函數建立一個HadoopRDD,而後再執行map操做,這樣就獲得了一個MapPartitionsRDD,而後再對該RDD設置一個名稱,該RDD的名稱被設置爲參數path的值。
scala> file.name
res18: String = derby.log
窄轉換是基於窄依賴(narrow dependencies)進行的RDD轉換。
所謂窄依賴是指:父RDD的每一個分區最多被兒子RDD的一個分區使用。
產生窄轉換的函數有:map,filter,distinct,union,基於分區的jion等。
Spark能夠將窄轉換進行分組,造成一個pipeline,以便提升計算效率。
下圖是2個或3個RDD進行各類操做時的依賴關係,他們都是窄依賴。由於,全部的父RDD(前面的RDD)的每一個分區都最多被兒子RDD(後面的RDD)一個分區使用。
基於窄依賴的窄轉換操做是高效的,由於窄轉換一般能夠在同一個節點中完成,不須要有集羣中各個計算節點之間的數據傳輸。也就是,窄依賴不會產生shuffle。
那麼,讓咱們再進一步思考一下,爲何窄依賴不會產生shuffle?
從《RDD的分區原理》一章,咱們能夠知曉,RDD的數據是以分區的形式保存在spark的各個worker節點上。這樣,由於窄依賴,因此輸出的結果RDD(兒子RDD)的分區數據,都是基於父RDD的分區獲得的,計算結果是父RDD分區數據的子集,或者和父RDD的分區數相等,這就意味着:計算兒子RDD的分區和須要的父RDD的分區會在同一個節點上,也就是說:轉換的過程能夠在同一個節點中完成,不會產生worker節點之間的數據傳輸,因此窄轉換不會產生shuffle。
另外,當須要從新計算子RDD某個分區時,因爲父RDD的分區只服務於子RDD的某一個分區,因此,複用率是100%,計算過程不會有任何浪費。
寬轉換是基於寬依賴(wide dependencies)進行的RDD轉換。
所謂寬依賴是指:父RDD的每一個分區均可能被子RDD的多個分區使用。
也就是說,計算單個分區中的記錄所需的數據可能存在父RDD的多個分區中。因此,寬轉換會發生shuffle過程,有時候把寬轉換也稱爲:shuffle transformations。
具備相同key的全部元組必須最終位於同一分區中,由同一任務處理。爲了知足這些操做,Spark必須執行RDD shuffle動做,它在集羣之間傳輸數據,使用一組新的分區建立一個新階段。以下圖所示:
致使寬轉換的函數有:groupByKey,reduceByKey等。
進一步思考一下,當須要進行子RDD重算時,因爲須要從新計算父RDD的分區數據,但因爲父RDD的分區數據被多個子RDD分區依賴,而所有從新計算父RDD的某個分區,其實會形成計算資源的浪費。由於,計算出來的數據不會100%被子RDD所使用。
本節分析了spark RDD的Transformations性質和原理,着重講解了,窄轉換和款轉換的特性。窄轉換和寬轉換是spark的核心概念,要着重進行理解。