一.Spark上下文
1.做用:鏈接Spark集羣,用戶建立RDD、累加器和廣播。shell
2.RDD:Resilient Distributed Dataset,彈性式分佈式數據集,有4種類型,以下:
a.建立RDD:3類(parallelize:將Seq序列數據轉化爲RDD、textFile將外部文件轉化RDD、makeRDD將Seq序列數據轉化爲RDD並可指定分區)
b.轉換操做:將RDD轉換爲另外一個RDD,轉換操做不觸發運算
c.行動操做:觸發運算,執行Job
d.控制操做:對RDD的cache(緩存應用)apache
二.RDD的說明
1.RDD表明一個不可變的,可並行操做的元素分區集合。緩存
2.內部表現形式上,RDD可分爲如下5部分:
a.partition分區:獲取partition分區列表,實際上就是切分(inputSplit)對象
b.partitioner分區函數:針對每一個Split提供相對應的分區函數(partitioner)
c.dependencies依賴關係:描述RDD之間的依賴關係,獲取RDD的依賴關係的列表
d.hash分區定義:針對KV類型的RDD,默認採起Hashpartitioon
e.preferredlocations首選位置:用於計算每一個split的首選位置列表(例如用於HDFS文件塊的位置)分佈式
三.rdd建立partition說明
1.local版:spark-shell --Master local[n]
a.【parallelize】-->默認建立的分區數爲"n"
b.【textFile】 -->加載本地文件:默認建立的分區數爲最大爲2,若是n=1<2,則爲1
-->hdfs文件系統:加載的分區數爲block個數;注:block塊的大小應大於10%(inputsplit知識點);更改分區數必須大於等於block數
2.spark集羣 spark-shell --Master spark://master:7077
a.【parallelize】-->默認建立的分區數爲Cores的總數==Executor數
b.【textFile】 -->加載本地文件:默認建立的分區數爲最大爲2,若是n=1<2,則爲1
-->hdfs文件系統:加載的分區數爲block個數;注:block塊的大小應大於10%(inputsplit知識點);更改分區數必須大於等於block數
3.yarn集羣 spark-shell --Master yarn
a.【parallelize】-->默認建立的分區數爲yarn集羣的Executor數
b.【textFile】 -->加載本地文件:默認建立的分區數爲最大爲2,若是n=1<2,則爲1
-->hdfs文件系統:加載的分區數爲block個數;注:block塊的大小應大於10%(inputsplit知識點);更改分區數必須大於等於block數 函數
四.切分大小源碼分析
val rdd = sc.textFile("file:///D:/測試數據/spark_wordcount.txt")
-->【SparkContext.scala】def textFile(path: String,minPartitions: Int = defaultMinPartitions): RDD[String]
說明:def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
-->調用hadoopFile() 方法
-->new HadoopRDD()
-->經過getPartitions()方法獲取partitions分區數
-->調用【org.apache.hadoop.mapred.FileInputFormat】Hadoop老版本的API的getSplits()
-->計算規則以下:
1.首先計算goalSize(目標大小)
long goalSize = totalSize / (long)(numSplits == 0 ? 1 : numSplits);
2.獲取SplitSize(切分大小)
long splitSize = this.computeSplitSize(goalSize, minSize, blockSize);
computeSplitSize --> Math.max(minSize, Math.min(goalSize, blockSize));oop
五.preferredlocations首選位置
獲取RDD首選位置,首先經過RDD依賴獲得HadoopRDD,調用preferredLocations方法
scala> val rdd = sc.textFile("/worldcount/test1.txt")
rdd: org.apache.spark.rdd.RDD[String] = /worldcount/test1.txt MapPartitionsRDD[3] at textFile at <console>:24源碼分析
scala> rdd.dependencies
res17: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@bd9abda)測試
scala> rdd.dependencies(0)
res18: org.apache.spark.Dependency[_] = org.apache.spark.OneToOneDependency@bd9abdathis
scala> val rdd1 = rdd.dependencies(0).rdd
rdd1: org.apache.spark.rdd.RDD[_] = /worldcount/test1.txt HadoopRDD[2] at textFile at <console>:24spa
scala> rdd1.preferredLocations(rdd.partitions(0))
res19: Seq[String] = ArraySeq(master, slave1)
scala> rdd1.preferredLocations(rdd.partitions(1))
res20: Seq[String] = ArraySeq(master, slave1)