一:圖RDD性能優化
1.上圖groupBy,Join會產生shuffle,shuffle能夠作性能優化。
2.stage1和stage2的數據要計算完成才shuffle。函數
二:Spark Core圖oop
SparkContext:應用程序通往集羣的惟一通道,會構建DAGSchedler,TaskSchedler
廣播變量:在每個Executor中的全局變量,對全部的Executor只發送一次,Executor中的每個task能夠獲取
累加器:全局累加器只能增長和讀取源碼分析
三:和MapReduce的區別
性能
spark全在內存作計算,Hadoop會保存在磁盤
pipeline:算子沿着數據通道計算下去 ,全在內存作計算,優化執行效率優化
四:幾個重要的RDD
spa
五:源碼分析scala
一:RDD 必須 * - A list of partitions 一系列partitions集合 * - A function for computing each split 爲每一個分區提供一個computing的函數 * - A list of dependencies on other RDDs RDD會依賴其餘RDDs, 這種特性叫作:lineage(生命線);特例:第一個RDD不依賴其餘RDD 可選 * - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) * - Optionally, a list of preferred locations to compute each split on (e.g. block locations for * an HDFS file) 兩個Partitioner:range Partitioner, Hash Partitioner abstract class RDD[T: ClassTag]( @transient private var _sc: SparkContext, @transient private var deps: Seq[Dependency[_]] ) extends Serializable with Logging { protected def getPartitions: Array[Partition]: ===> 獲取當前RDD全部的分區 def compute(split: Partition, context: TaskContext): Iterator[T] ===> 對每一個分區上的數據進行計算操做 protected def getDependencies: Seq[Dependency[_]]: ===> 獲取依賴的RDD,依賴的RDD是一個集合 protected def getPreferredLocations(split: Partition): Seq[String] ===> 數據計算本地化專用 val partitioner: Option[Partitioner] ===> 獲取分區器 } 二:HadoopRDD 1:分區:每一個HDFS block 2:依賴:無依賴,由於直接從hdfs讀取數據 3:函數:讀取每個block 4:最佳位置:Hdfs block所在的位置 5:分區:無 @DeveloperApi class HadoopRDD[K, V]( sc: SparkContext, broadcastedConf: Broadcast[SerializableConfiguration], initLocalJobConfFuncOpt: Option[JobConf => Unit], inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], minPartitions: Int) extends RDD[(K, V)](sc, Nil) with Logging { //讀取數據getInputFormat數據有不一樣的Format方式 protected def getInputFormat(conf: JobConf): InputFormat[K, V] = { val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf) .asInstanceOf[InputFormat[K, V]] newInputFormat match { case c: Configurable => c.setConf(conf) case _ => } newInputFormat } 」