RDD是Spark中的核心數據模型,一個RDD表明着一個被分區(partition)的只讀數據集。java
RDD的生成只有兩種途徑:數據庫
一種是來自於內存集合或外部存儲系統;apache
另外一種是經過轉換操做來自於其餘RDD;數組
通常須要了解RDD的如下五個接口:app
partition分區,一個RDD會有一個或者多個分區函數
dependencies()RDD的依賴關係oop
preferredLocations(p)對於每一個分區而言,返回數據本地化計算的節點學習
compute(p,context)對於分區而言,進行迭代計算大數據
partitioner()RDD的分區函數spa
一個RDD包含一個或多個分區,每一個分區都有分區屬性,分區的多少決定了對RDD進行並行計算的並行度。
在生成RDD時候能夠指定分區數,若是不指定分區數,則採用默認值,系統默認的分區數,是這個程序所分配到的資源的CPU核數。
可使用RDD的成員變量partitions返回RDD對應的分區數組:
scala> var file = sc.textFile("/tmp/lxw1234/1.txt")
file: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at textFile at :21
scala> file.partitions
res14: Array[org.apache.spark.Partition] = Array(org.apache.spark.rdd.HadoopPartition@735, org.apache.spark.rdd.HadoopPartition@736)
scala> file.partitions.size
res15: Int = 2 //默認兩個分區
//能夠指定RDD的分區數
scala> var file = sc.textFile("/tmp/lxw1234/1.txt",4)
file: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at textFile at :21
scala> file.partitions
res16: Array[org.apache.spark.Partition] = Array(org.apache.spark.rdd.HadoopPartition@787, org.apache.spark.rdd.HadoopPartition@788, org.apache.spark.rdd.HadoopPartition@789, org.apache.spark.rdd.HadoopPartition@78a)
scala> file.partitions.size
res17: Int = 4
因爲RDD便可以由外部存儲而來,也能夠從另外一個RDD轉換而來,所以,一個RDD會存在一個或多個父的RDD,這裏面也就存在依賴關係,
窄依賴:
每個父RDD的分區最多隻被子RDD的一個分區所使用,如圖所示:
寬依賴
多個子RDD的分區會依賴同一個父RDD的分區,如圖所示:
如下代碼能夠查看RDD的依賴信息:
scala> var file = sc.textFile("/tmp/lxw1234/1.txt")
file: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[9] at textFile at :21
scala> file.dependencies.size
res20: Int = 1 //返回RDD的依賴數量
scala> file.dependencies(0)
res19:
org.apache.spark.Dependency[_] = org.apache.spark.OneToOneDependency@33c5abd0
//返回RDD file的第一個依賴
scala> file.dependencies(1)
java.lang.IndexOutOfBoundsException: 1
//由於file只有一個依賴,想獲取第二個依賴時候,報了數組越界
須要大數據學習資料和交流學習的同窗能夠加大數據學習羣:724693112 有免費資料分享和一羣學習大數據的小夥伴一塊兒努力
再看一個存在多個父依賴的例子:
scala> var rdd1 = sc.textFile("/tmp/lxw1234/1.txt")
rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at textFile at :21
scala> var rdd2 = sc.textFile("/tmp/lxw1234/1.txt")
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at textFile at :21
scala> var rdd3 = rdd1.union(rdd2)
rdd3: org.apache.spark.rdd.RDD[String] = UnionRDD[14] at union at :25
scala> rdd3.dependencies.size
res24: Int = 2 // rdd3依賴rdd1和rdd2兩個RDD
//分別打印出rdd3的兩個父rdd,即 rdd1和rdd2的內容
scala> rdd3.dependencies(0).rdd.collect
res29: Array[_] = Array(hello world, hello spark, hello hive, hi spark)
scala> rdd3.dependencies(1).rdd.collect
res30: Array[_] = Array(hello world, hello spark, hello hive, hi spark)
1.3 RDD優先位置(preferredLocations)
RDD的優先位置,返回的是此RDD的每一個partition所存儲的位置,這個位置和Spark的調度有關(任務本地化),Spark會根據這個位置信息,儘量的將任務分配到數據塊所存儲的位置,以從Hadoop中讀取數據生成RDD爲例,preferredLocations返回每個數據塊所在的機器名或者IP地址,若是每個數據塊是多份存儲的(HDFS副本數),那麼就會返回多個機器地址。
看如下代碼:
scala> var file = sc.textFile("/tmp/lxw1234/1.txt")
file: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at textFile at :21
//這裏的file爲MappedRDD
scala> var hadoopRDD = file.dependencies(0).rdd
hadoopRDD: org.apache.spark.rdd.RDD[_] = /tmp/lxw1234/1.txt HadoopRDD[15] at textFile at :21 //這裏獲取file的父RDD,即hdfs文件/tmp/lxw1234/1.txt對應的HadoopRDD
scala> hadoopRDD.partitions.size
res31: Int = 2 //hadoopRDD默認有兩個分區
//下面分別獲取兩個分區的位置信息
scala> hadoopRDD.preferredLocations(hadoopRDD.partitions(0))
res32: Seq[String] = WrappedArray(slave007.lxw1234.com, slave004.lxw1234.com)
scala> hadoopRDD.preferredLocations(hadoopRDD.partitions(1))
res33: Seq[String] = WrappedArray(slave007. lxw1234.com, slave004.lxw1234.com)
##
因爲HDFS副本數設置爲2,所以每一個分區的位置信息中包含了全部副本(2個)的位置信息,這樣Spark能夠調度時候,根據任何一個副本所處的位置進行本地化任務調度。
基於RDD的每個分區,執行compute操做。
對於HadoopRDD來講,compute中就是從HDFS讀取分區中數據塊信息。
對於JdbcRDD來講,就是鏈接數據庫,執行查詢,讀取每一條數據。
目前Spark中實現了兩種類型的分區函數,HashPartitioner(哈希分區)和RangePartitioner(區域分區)。
partitioner只存在於類型的RDD中,非類型的RDD的partitioner值爲None.
partitioner函數既決定了RDD自己的分區數量,也可做爲其父RDD Shuffle輸出中每一個分區進行數據切割的依據。
scala> var a = sc.textFile("/tmp/lxw1234/1.txt").flatMap(line => line.split("\\s+"))
a: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[19] at flatMap at :21
scala> a.partitioner
res15: Option[org.apache.spark.Partitioner] = None // RDD a爲非類型
scala> var b = a.map(l => (l,1)).reduceByKey((a,b) => a + b)
b: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[21] at reduceByKey at :30
scala> b.partitioner
res16: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@2)
//RDD b爲類型,採用的是默認的partitioner- HashPartitioner