Spark內核以及源碼解析

一:圖RDD性能優化

1.上圖groupBy,Join會產生shuffle,shuffle能夠作性能優化。
2.stage1和stage2的數據要計算完成才shuffle。函數

 

二:Spark Core圖oop

SparkContext:應用程序通往集羣的惟一通道,會構建DAGSchedler,TaskSchedler
廣播變量:在每個Executor中的全局變量,對全部的Executor只發送一次,Executor中的每個task能夠獲取
累加器:全局累加器只能增長和讀取源碼分析

三:和MapReduce的區別
性能

spark全在內存作計算,Hadoop會保存在磁盤
pipeline:算子沿着數據通道計算下去 ,全在內存作計算,優化執行效率優化

 

四:幾個重要的RDD
spa

 

五:源碼分析scala

一:RDD
 必須 
 *  - A list of partitions  一系列partitions集合
 *  - A function for computing each split  爲每一個分區提供一個computing的函數
 *  - A list of dependencies on other RDDs  RDD會依賴其餘RDDs, 這種特性叫作:lineage(生命線);特例:第一個RDD不依賴其餘RDD
 可選
 *  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
 *  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
 *    an HDFS file)

兩個Partitioner:range Partitioner, Hash Partitioner
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {

  protected def getPartitions: Array[Partition]: ===> 獲取當前RDD全部的分區
  def compute(split: Partition, context: TaskContext): Iterator[T] ===> 對每一個分區上的數據進行計算操做
  protected def getDependencies: Seq[Dependency[_]]: ===> 獲取依賴的RDD,依賴的RDD是一個集合
  protected def getPreferredLocations(split: Partition): Seq[String] ===> 數據計算本地化專用
  val partitioner: Option[Partitioner] ===> 獲取分區器

}



二:HadoopRDD
1:分區:每一個HDFS block
2:依賴:無依賴,由於直接從hdfs讀取數據
3:函數:讀取每個block
4:最佳位置:Hdfs block所在的位置
5:分區:無

@DeveloperApi
class HadoopRDD[K, V](
    sc: SparkContext,
    broadcastedConf: Broadcast[SerializableConfiguration],
    initLocalJobConfFuncOpt: Option[JobConf => Unit],
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int)
  extends RDD[(K, V)](sc, Nil) with Logging {

//讀取數據getInputFormat數據有不一樣的Format方式
 protected def getInputFormat(conf: JobConf): InputFormat[K, V] = {
    val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
      .asInstanceOf[InputFormat[K, V]]
    newInputFormat match {
      case c: Configurable => c.setConf(conf)
      case _ =>
    }
    newInputFormat
  }

」
相關文章
相關標籤/搜索