Spark RDD學習筆記

1、學習Spark RDD

RDD是Spark中的核心數據模型,一個RDD表明着一個被分區(partition)的只讀數據集。java

RDD的生成只有兩種途徑:數據庫

一種是來自於內存集合或外部存儲系統;apache

另外一種是經過轉換操做來自於其餘RDD;數組

通常須要了解RDD的如下五個接口:app

partition分區,一個RDD會有一個或者多個分區函數

dependencies()RDD的依賴關係oop

preferredLocations(p)對於每一個分區而言,返回數據本地化計算的節點學習

compute(p,context)對於分區而言,進行迭代計算大數據

partitioner()RDD的分區函數spa

 

1.1 RDD分區(partitions)

一個RDD包含一個或多個分區,每一個分區都有分區屬性,分區的多少決定了對RDD進行並行計算的並行度。

在生成RDD時候能夠指定分區數,若是不指定分區數,則採用默認值,系統默認的分區數,是這個程序所分配到的資源的CPU核數。

可使用RDD的成員變量partitions返回RDD對應的分區數組:

scala> var file = sc.textFile("/tmp/lxw1234/1.txt")

file: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at textFile at :21

 

scala> file.partitions

res14: Array[org.apache.spark.Partition] = Array(org.apache.spark.rdd.HadoopPartition@735, org.apache.spark.rdd.HadoopPartition@736)

 

scala> file.partitions.size

res15: Int = 2 //默認兩個分區

 

//能夠指定RDD的分區數

scala> var file = sc.textFile("/tmp/lxw1234/1.txt",4)

file: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at textFile at :21

 

scala> file.partitions

res16: Array[org.apache.spark.Partition] = Array(org.apache.spark.rdd.HadoopPartition@787, org.apache.spark.rdd.HadoopPartition@788, org.apache.spark.rdd.HadoopPartition@789, org.apache.spark.rdd.HadoopPartition@78a)

 

scala> file.partitions.size

res17: Int = 4

1.2 RDD依賴關係(dependencies)

因爲RDD便可以由外部存儲而來,也能夠從另外一個RDD轉換而來,所以,一個RDD會存在一個或多個父的RDD,這裏面也就存在依賴關係,

窄依賴:

每個父RDD的分區最多隻被子RDD的一個分區所使用,如圖所示:

 


 

寬依賴

多個子RDD的分區會依賴同一個父RDD的分區,如圖所示:

 


 

如下代碼能夠查看RDD的依賴信息:

scala> var file = sc.textFile("/tmp/lxw1234/1.txt")

file: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[9] at textFile at :21

 

scala> file.dependencies.size

res20: Int = 1 //返回RDD的依賴數量

scala> file.dependencies(0)

res19:

org.apache.spark.Dependency[_] = org.apache.spark.OneToOneDependency@33c5abd0

//返回RDD file的第一個依賴

scala> file.dependencies(1)

java.lang.IndexOutOfBoundsException: 1

//由於file只有一個依賴,想獲取第二個依賴時候,報了數組越界

須要大數據學習資料和交流學習的同窗能夠加大數據學習羣:724693112 有免費資料分享和一羣學習大數據的小夥伴一塊兒努力

再看一個存在多個父依賴的例子:

scala> var rdd1 = sc.textFile("/tmp/lxw1234/1.txt")

rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at textFile at :21

 

scala> var rdd2 = sc.textFile("/tmp/lxw1234/1.txt")

rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at textFile at :21

 

scala> var rdd3 = rdd1.union(rdd2)

rdd3: org.apache.spark.rdd.RDD[String] = UnionRDD[14] at union at :25

 

scala> rdd3.dependencies.size

res24: Int = 2 // rdd3依賴rdd1和rdd2兩個RDD

 

//分別打印出rdd3的兩個父rdd,即 rdd1和rdd2的內容

scala> rdd3.dependencies(0).rdd.collect

res29: Array[_] = Array(hello world, hello spark, hello hive, hi spark)

 

scala> rdd3.dependencies(1).rdd.collect

res30: Array[_] = Array(hello world, hello spark, hello hive, hi spark)

1.3 RDD優先位置(preferredLocations)

RDD的優先位置,返回的是此RDD的每一個partition所存儲的位置,這個位置和Spark的調度有關(任務本地化),Spark會根據這個位置信息,儘量的將任務分配到數據塊所存儲的位置,以從Hadoop中讀取數據生成RDD爲例,preferredLocations返回每個數據塊所在的機器名或者IP地址,若是每個數據塊是多份存儲的(HDFS副本數),那麼就會返回多個機器地址。

看如下代碼:

scala> var file = sc.textFile("/tmp/lxw1234/1.txt")

file: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at textFile at :21

//這裏的file爲MappedRDD

scala> var hadoopRDD = file.dependencies(0).rdd

hadoopRDD: org.apache.spark.rdd.RDD[_] = /tmp/lxw1234/1.txt HadoopRDD[15] at textFile at :21 //這裏獲取file的父RDD,即hdfs文件/tmp/lxw1234/1.txt對應的HadoopRDD

scala> hadoopRDD.partitions.size

res31: Int = 2 //hadoopRDD默認有兩個分區

 

//下面分別獲取兩個分區的位置信息

scala> hadoopRDD.preferredLocations(hadoopRDD.partitions(0))

res32: Seq[String] = WrappedArray(slave007.lxw1234.com, slave004.lxw1234.com)

 

scala> hadoopRDD.preferredLocations(hadoopRDD.partitions(1))

res33: Seq[String] = WrappedArray(slave007. lxw1234.com, slave004.lxw1234.com)

 

##

因爲HDFS副本數設置爲2,所以每一個分區的位置信息中包含了全部副本(2個)的位置信息,這樣Spark能夠調度時候,根據任何一個副本所處的位置進行本地化任務調度。

 

1.4 RDD分區計算(compute)

基於RDD的每個分區,執行compute操做。

對於HadoopRDD來講,compute中就是從HDFS讀取分區中數據塊信息。

對於JdbcRDD來講,就是鏈接數據庫,執行查詢,讀取每一條數據。

 

1.5 RDD分區函數(partitioner)

目前Spark中實現了兩種類型的分區函數,HashPartitioner(哈希分區)和RangePartitioner(區域分區)。

partitioner只存在於類型的RDD中,非類型的RDD的partitioner值爲None.

 

partitioner函數既決定了RDD自己的分區數量,也可做爲其父RDD Shuffle輸出中每一個分區進行數據切割的依據。

 

scala> var a = sc.textFile("/tmp/lxw1234/1.txt").flatMap(line => line.split("\\s+"))

a: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[19] at flatMap at :21

 

scala> a.partitioner

res15: Option[org.apache.spark.Partitioner] = None // RDD a爲非類型

 

scala> var b = a.map(l => (l,1)).reduceByKey((a,b) => a + b)

b: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[21] at reduceByKey at :30

 

scala> b.partitioner

res16: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@2)

//RDD b爲類型,採用的是默認的partitioner- HashPartitioner

相關文章
相關標籤/搜索