RDD轉換操做原理

本節介紹RDD的Transformations函數的原理和做用。還會介紹transformations函數的分類,和不一樣類型的轉換產生的效果。shell

Transformations簡介

在RDD中定義了兩類操做函數:action和transformations。transformations經過在一些RDD中執行一些數據操做來產生一個或多個新的RDD。這些transformations函數包括:map,filter,join,reduceByKey,cogroup,randomSplit等。apache

也就是說,transformations是一系列函數,它們的輸入是一個RDD,輸出是一個或多個RDD。但這些函數並不會改變輸入RDD的值(這是RDD不可改變的特性),但經過transformations函數的計算會產生一個或多個新的RDD。dom

Transformations的性質

RDD的transformations函數是懶評估(evaluated lazily)的。所謂懶評估是指:在調用transformations函數時不會當即執行,直到action函數被調用。也就是說,transformations函數的執行是由action函數的調用來觸發的。ide

經過使用transformations轉換函數,您可使用最終RDD的全部父RDD逐步構建RDD血緣(RDD Lineage)。函數

一個RDD經過transformations轉換函數處理後獲得的新的結果RDD一般與父RDD的值不一樣,該結果RDD的數據集可能變得更大(例如:flatMap,union等),也可能變得更小(例如:filter,distinct,count等),或則大小相同(例如:map等)。oop

注意:有些轉換函數也可能發起計算,例如:例如sortBy,zipWithIndex等lua

Transformations函數的使用

安裝好spark(能夠是單機的),並啓動spark-shell,在spark-shell中輸入一下命令:spa

 # 1. 加載一個文件,文件內容不重要
 scala> val file = sc.textFile("derby.log")
 fileorg.apache.spark.rdd.RDD[String= derby.logMapPartitionsRDD[1at textFile at <console>:24
 
 # 2. 查看file的結果
 scala> file.toDebugString
 res14String =
 (2derby.log MapPartitionsRDD[1at textFile at <console>:24 []
  |  derby.log HadoopRDD[0at textFile at <console>:24 []
 
 # 3. 把文件中的內容按\s+進行分割,也能夠執行其餘文本操做
 scala> val allWords = file.flatMap(_.split("\\s+"))
 allWordsorg.apache.spark.rdd.RDD[String= MapPartitionsRDD[2]at flatMap at <console>:26
 
 # 4. 查看一下allWords這個RDD的linage或邏輯執行計劃
 scala> allWords.toDebugString
 res0String =
 (2MapPartitionsRDD[2at flatMap at <console>:26 []
  |  derby.log MapPartitionsRDD[1at textFile at <console>:24 []
  |  derby.log HadoopRDD[0at textFile at <console>:24 []

從以上第2步的代碼和輸出能夠看到,執行sc.textFile函數後,會產生兩種RDD,一種是:HadoopRDD,一種是:MapPartitionsRDD,其中HadoopRDD是中間狀態的RDD,最後獲得的是MapPartitionsRDD。這個結果從第1步的系統輸出能夠看到。scala

而後再對文件中的內容進行處理,會產生新的RDD,產生的新的RDD也是MapPartitionsRDD類型的。從第4步的輸出能夠看到。orm

爲了更好的理解這些輸出,咱們能夠看一下這些函數的源代碼:

   def textFile(
       pathString,
       minPartitionsInt = defaultMinPartitions): RDD[String=withScope {
     assertNotStopped()
     hadoopFile(pathclassOf[TextInputFormat],classOf[LongWritable], classOf[Text],
       minPartitions).map(pair => pair._2.toString).setName(path)
  }

能夠看到textFile會調用hadoopFile函數建立一個HadoopRDD,而後再執行map操做,這樣就獲得了一個MapPartitionsRDD,而後再對該RDD設置一個名稱,該RDD的名稱被設置爲參數path的值。

 scala> file.name
 res18: String = derby.log

窄轉換(Narrow transformations)和寬轉換(Wide transformations)

窄轉換(Narrow transformations)

窄轉換是基於窄依賴(narrow dependencies)進行的RDD轉換。

所謂窄依賴是指:父RDD的每一個分區最多被兒子RDD的一個分區使用。

產生窄轉換的函數有:map,filter,distinct,union,基於分區的jion等。

Spark能夠將窄轉換進行分組,造成一個pipeline,以便提升計算效率。

下圖是2個或3個RDD進行各類操做時的依賴關係,他們都是窄依賴。由於,全部的父RDD(前面的RDD)的每一個分區都最多被兒子RDD(後面的RDD)一個分區使用。

image.png

基於窄依賴的窄轉換操做是高效的,由於窄轉換一般能夠在同一個節點中完成,不須要有集羣中各個計算節點之間的數據傳輸。也就是,窄依賴不會產生shuffle。

那麼,讓咱們再進一步思考一下,爲何窄依賴不會產生shuffle?

從《RDD的分區原理》一章,咱們能夠知曉,RDD的數據是以分區的形式保存在spark的各個worker節點上。這樣,由於窄依賴,因此輸出的結果RDD(兒子RDD)的分區數據,都是基於父RDD的分區獲得的,計算結果是父RDD分區數據的子集,或者和父RDD的分區數相等,這就意味着:計算兒子RDD的分區和須要的父RDD的分區會在同一個節點上,也就是說:轉換的過程能夠在同一個節點中完成,不會產生worker節點之間的數據傳輸,因此窄轉換不會產生shuffle。

另外,當須要從新計算子RDD某個分區時,因爲父RDD的分區只服務於子RDD的某一個分區,因此,複用率是100%,計算過程不會有任何浪費。

寬轉換(wide transformations)

寬轉換是基於寬依賴(wide dependencies)進行的RDD轉換。

所謂寬依賴是指:父RDD的每一個分區均可能被子RDD的多個分區使用。

也就是說,計算單個分區中的記錄所需的數據可能存在父RDD的多個分區中。因此,寬轉換會發生shuffle過程,有時候把寬轉換也稱爲:shuffle transformations。

具備相同key的全部元組必須最終位於同一分區中,由同一任務處理。爲了知足這些操做,Spark必須執行RDD shuffle動做,它在集羣之間傳輸數據,使用一組新的分區建立一個新階段。以下圖所示:

image.png

致使寬轉換的函數有:groupByKey,reduceByKey等。

進一步思考一下,當須要進行子RDD重算時,因爲須要從新計算父RDD的分區數據,但因爲父RDD的分區數據被多個子RDD分區依賴,而所有從新計算父RDD的某個分區,其實會形成計算資源的浪費。由於,計算出來的數據不會100%被子RDD所使用。

總結

本節分析了spark RDD的Transformations性質和原理,着重講解了,窄轉換和款轉換的特性。窄轉換和寬轉換是spark的核心概念,要着重進行理解。

相關文章
相關標籤/搜索