SparkContext和RDD的說明

一.Spark上下文

  1.做用:鏈接Spark集羣,用戶建立RDD、累加器和廣播。shell

  2.RDD:Resilient Distributed Dataset,彈性式分佈式數據集,有4種類型,以下:
    a.建立RDD:3類(parallelize:將Seq序列數據轉化爲RDD、textFile將外部文件轉化RDD、makeRDD將Seq序列數據轉化爲RDD並可指定分區)
    b.轉換操做:將RDD轉換爲另外一個RDD,轉換操做不觸發運算
    c.行動操做:觸發運算,執行Job
    d.控制操做:對RDD的cache(緩存應用)apache

二.RDD的說明

  1.RDD表明一個不可變的,可並行操做的元素分區集合。緩存

  2.內部表現形式上,RDD可分爲如下5部分:
    a.partition分區:獲取partition分區列表,實際上就是切分(inputSplit)對象
    b.partitioner分區函數:針對每一個Split提供相對應的分區函數(partitioner)
    c.dependencies依賴關係:描述RDD之間的依賴關係,獲取RDD的依賴關係的列表
    d.hash分區定義:針對KV類型的RDD,默認採起Hashpartitioon
    e.preferredlocations首選位置:用於計算每一個split的首選位置列表(例如用於HDFS文件塊的位置)分佈式

三.rdd建立partition說明

  1.local版:spark-shell --Master local[n]
    a.【parallelize】-->默認建立的分區數爲"n"
    b.【textFile】 -->加載本地文件:默認建立的分區數爲最大爲2,若是n=1<2,則爲1
          -->hdfs文件系統:加載的分區數爲block個數;注:block塊的大小應大於10%(inputsplit知識點);更改分區數必須大於等於block數
  2.spark集羣 spark-shell --Master spark://master:7077
    a.【parallelize】-->默認建立的分區數爲Cores的總數==Executor數
    b.【textFile】 -->加載本地文件:默認建立的分區數爲最大爲2,若是n=1<2,則爲1
          -->hdfs文件系統:加載的分區數爲block個數;注:block塊的大小應大於10%(inputsplit知識點);更改分區數必須大於等於block數
  3.yarn集羣 spark-shell --Master yarn
    a.【parallelize】-->默認建立的分區數爲yarn集羣的Executor數
    b.【textFile】 -->加載本地文件:默認建立的分區數爲最大爲2,若是n=1<2,則爲1
          -->hdfs文件系統:加載的分區數爲block個數;注:block塊的大小應大於10%(inputsplit知識點);更改分區數必須大於等於block數  函數

四.切分大小源碼分析

val rdd = sc.textFile("file:///D:/測試數據/spark_wordcount.txt")
-->【SparkContext.scala】def textFile(path: String,minPartitions: Int = defaultMinPartitions): RDD[String]
  說明:def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
-->調用hadoopFile() 方法
  -->new HadoopRDD()
    -->經過getPartitions()方法獲取partitions分區數
      -->調用【org.apache.hadoop.mapred.FileInputFormat】Hadoop老版本的API的getSplits()
        -->計算規則以下:
          1.首先計算goalSize(目標大小)
            long goalSize = totalSize / (long)(numSplits == 0 ? 1 : numSplits);
          2.獲取SplitSize(切分大小)
            long splitSize = this.computeSplitSize(goalSize, minSize, blockSize);
            computeSplitSize --> Math.max(minSize, Math.min(goalSize, blockSize));oop

五.preferredlocations首選位置

獲取RDD首選位置,首先經過RDD依賴獲得HadoopRDD,調用preferredLocations方法
  scala> val rdd = sc.textFile("/worldcount/test1.txt")
    rdd: org.apache.spark.rdd.RDD[String] = /worldcount/test1.txt MapPartitionsRDD[3] at textFile at <console>:24源碼分析

  scala> rdd.dependencies
    res17: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@bd9abda)測試

  scala> rdd.dependencies(0)
    res18: org.apache.spark.Dependency[_] = org.apache.spark.OneToOneDependency@bd9abdathis

  scala> val rdd1 = rdd.dependencies(0).rdd
    rdd1: org.apache.spark.rdd.RDD[_] = /worldcount/test1.txt HadoopRDD[2] at textFile at <console>:24spa

  scala> rdd1.preferredLocations(rdd.partitions(0))
    res19: Seq[String] = ArraySeq(master, slave1)

  scala> rdd1.preferredLocations(rdd.partitions(1))
    res20: Seq[String] = ArraySeq(master, slave1)

相關文章
相關標籤/搜索