上一章講了 Spark 提交做業的過程,這一章咱們要講 RDD。簡單的講,RDD 就是 Spark 的 input,知道 input 是啥吧,就是輸入的數據。sql
RDD 的全名是 Resilient Distributed Dataset,意思是容錯的分佈式數據集,每個 RDD 都會有 5 個特徵:shell
一、有一個分片列表。就是能被切分,和 hadoop 同樣的,可以切分的數據才能並行計算。數據庫
二、有一個函數計算每個分片,這裏指的是下面會提到的 compute 函數。api
三、對其餘的 RDD 的依賴列表,依賴還具體分爲寬依賴和窄依賴,但並非全部的 RDD 都有依賴。bash
四、可選:key-value 型的 RDD 是根據哈希來分區的,相似於 mapreduce 當中的 Paritioner 接口,控制 key 分到哪一個 reduce。app
五、可選:每個分片的優先計算位置(preferred locations),好比 HDFS 的 block 的所在位置應該是優先計算的位置。分佈式
對應着上面這幾點,咱們在 RDD 裏面能找到這 4 個方法和 1 個屬性,彆着急,下面咱們會慢慢展開說這 5 個東東。ide
//只計算一次
protected def getPartitions: Array[Partition]
//對一個分片進行計算,得出一個可遍歷的結果
def compute(split: Partition, context: TaskContext): Iterator[T]
//只計算一次,計算RDD對父RDD的依賴
protected def getDependencies: Seq[Dependency[_]] = deps
//可選的,分區的方法,針對第4點,相似於mapreduce當中的Paritioner接口,控制key分到哪一個reduce
@transient val partitioner: Option[Partitioner] = None
//可選的,指定優先位置,輸入參數是split分片,輸出結果是一組優先的節點位置
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
複製代碼
下面用一個實例講解一下吧,就拿咱們經常使用的一段代碼來說吧,而後會把咱們經常使用的 RDD 都會講到。函數
val hdfsFile = sc.textFile(args(1))
val flatMapRdd = hdfsFile.flatMap(s => s.split(" "))
val filterRdd = flatMapRdd.filter(_.length == 2)
val mapRdd = filterRdd.map(word => (word, 1))
val reduce = mapRdd.reduceByKey(_ + _)
複製代碼
這裏涉及到不少個 RDD,textFile 是一個 HadoopRDD 通過 map 後的 MappredRDD,通過 flatMap 是一個 FlatMappedRDD,通過 filter 方法以後生成了一個 FilteredRDD,通過 map 函數以後,變成一個 MappedRDD,經過隱式轉換成 PairRDD,最後通過 reduceByKey。oop
咱們首先看 textFile 的這個方法,進入 SparkContext 這個方法,找到它。
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions).map(pair => pair._2.toString)
}
複製代碼
看它的輸入參數,path,TextInputFormat,LongWritable,Text,同志們聯想到什麼?寫過 mapreduce 的童鞋都應該知道哈。
一、hdfs 的地址
二、InputFormat 的類型
三、Mapper 的第一個類型
四、Mapper 的第二類型
這就不難理解爲何立馬就對 hadoopFile 後面加了一個 map 方法,取 pair 的第二個參數了,最後在 shell 裏面咱們看到它是一個 MappredRDD 了。
那麼如今若是你們要用的不是 textFile,而是一個別的 hadoop 文件類型,你們會不會使用 hadoopFile 來獲得本身要獲得的類型呢,不要告訴我不會哈,不會的趕忙回去複習 mapreduce。
言歸正傳,默認的 defaultMinPartitions 的 2 過小了,咱們用的時候仍是設置大一點吧。
咱們繼續追殺下去,看看 hadoopFile 方法,裏面咱們看到它作了 3 個操做。
一、把 hadoop 的配置文件保存到廣播變量裏。
二、設置路徑的方法
三、new 了一個 HadoopRDD 返回
好,咱們接下去看看 HadoopRDD 這個類吧,咱們重點看看它的 getPartitions、compute、getPreferredLocations。
先看 getPartitions,它的核心代碼以下:
val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
val array = new Array[Partition](inputSplits.size)
for (i <- 0 until inputSplits.size) {
array(i) = new HadoopPartition(id, i, inputSplits(i))
}
複製代碼
它調用的是 inputFormat 自帶的 getSplits 方法來計算分片,而後把分片 HadoopPartition 包裝到到 array 裏面返回。
這裏順便順帶提一下,由於 1.0 又出來一個 NewHadoopRDD,它使用的是 mapreduce 新 api 的 inputformat,getSplits 就不要有 minPartitions 了,別的邏輯都是同樣的,只是使用的類有點區別。
咱們接下來看 compute 方法,它的輸入值是一個 Partition,返回是一個 Iterator[(K, V)] 類型的數據,這裏面咱們只須要關注 2 點便可。
一、把 Partition 轉成 HadoopPartition,而後經過 InputSplit 建立一個 RecordReader
二、重寫 Iterator 的 getNext 方法,經過建立的 reader 調用 next 方法讀取下一個值。
// 轉換成HadoopPartition
val split = theSplit.asInstanceOf[HadoopPartition]
logInfo("Input split: " + split.inputSplit)
var reader: RecordReader[K, V] = null
val jobConf = getJobConf()
val inputFormat = getInputFormat(jobConf)
context.stageId, theSplit.index, context.attemptId.toInt, jobConf)
// 經過Inputform的getRecordReader來建立這個InputSpit的Reader
reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
// 調用Reader的next方法
val key: K = reader.createKey()
val value: V = reader.createValue()
override def getNext() = {
try {
finished = !reader.next(key, value)
} catch {
case eof: EOFException =>
finished = true
}
(key, value)
}
複製代碼
從這裏咱們能夠看得出來 compute 方法是經過分片來得到 Iterator 接口,以遍歷分片的數據。
getPreferredLocations 方法就更簡單了,直接調用 InputSplit 的 getLocations 方法得到所在的位置。
下面咱們看 RDD 裏面的 map 方法
def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
複製代碼
直接 new 了一個 MappedRDD,還把匿名函數 f 處理了再傳進去,咱們繼續追殺到 MappedRDD。
private[spark]
class MappedRDD[U: ClassTag, T: ClassTag](prev: RDD[T], f: T => U)
extends RDD[U](prev) {
override def getPartitions: Array[Partition] = firstParent[T].partitions
override def compute(split: Partition, context: TaskContext) =
firstParent[T].iterator(split, context).map(f)
}
複製代碼
MappedRDD 把 getPartitions 和 compute 給重寫了,並且都用到了 firstParent[T],這個 firstParent 是何必人也?咱們能夠先點擊進入 RDDU 這個構造函數裏面去。
def this(@transient oneParent: RDD[_]) = this(oneParent.context , List(new OneToOneDependency(oneParent)))
複製代碼
就這樣你會發現它把 RDD 複製給了 deps,HadoopRDD 成了 MappedRDD 的父依賴了,這個 OneToOneDependency 是一個窄依賴,子 RDD 直接依賴於父 RDD,繼續看 firstParent。
protected[spark] def firstParent[U: ClassTag] = {
dependencies.head.rdd.asInstanceOf[RDD[U]]
}
複製代碼
由此咱們能夠得出兩個結論:
一、getPartitions 直接沿用了父 RDD 的分片信息
二、compute 函數是在父 RDD 遍歷每一行數據時套一個匿名函數 f 進行處理
好吧,如今咱們能夠理解 compute 函數真正是在幹嗎的了
它的兩個顯著做用:
一、在沒有依賴的條件下,根據分片的信息生成遍歷數據的 Iterable 接口
二、在有前置依賴的條件下,在父 RDD 的 Iterable 接口上給遍歷每一個元素的時候再套上一個方法
咱們看看點擊進入 map(f) 的方法進去看一下
def map[B](f: A => B): Iterator[B] = new AbstractIterator[B] {
def hasNext = self.hasNext
def next() = f(self.next())
}
複製代碼
看黃色的位置,看它的 next 函數,不得不說,寫得真的很妙!
咱們接着看 RDD 的 flatMap 方法,你會發現它和 map 函數幾乎沒什麼區別,只是 RDD 變成了 FlatMappedRDD,可是 flatMap 和 map 的效果仍是差異挺大的。
好比 ((1,2),(3,4)), 若是是調用了 flatMap 函數,咱們訪問到的就是(1,2,3,4)4 個元素;若是是 map 的話,咱們訪問到的就是(1,2),(3,4) 兩個元素。
有興趣的能夠去看看 FlatMappedRDD 和 FilteredRDD 這裏就不講了,和 MappedRDD 相似。
前面的 RDD 轉換都簡單,但是到了 reduceByKey 可就不簡單了哦,由於這裏有一個同相同 key 的內容聚合的一個過程,因此它是最複雜的那一類。
那 reduceByKey 這個方法在哪裏呢,它在 PairRDDFunctions 裏面,這是個隱式轉換,因此比較隱蔽哦,你在 RDD 裏面是找不到的。
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
combineByKey[V]((v: V) => v, func, func, partitioner)
}
複製代碼
它調用的是 combineByKey 方法,過程過程蠻複雜的,摺疊起來,喜歡看的人看看吧。
def combineByKey[C](createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null): RDD[(K, C)] = {
val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
if (self.partitioner == Some(partitioner)) {
// 通常的RDD的partitioner是None,這個條件不成立,即便成立只須要對這個數據作一次按key合併value的操做便可
self.mapPartitionsWithContext((context, iter) => {
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
} else if (mapSideCombine) {
// 默認是走的這個方法,須要map端的combinber.
val combined = self.mapPartitionsWithContext((context, iter) => {
aggregator.combineValuesByKey(iter, context)
}, preservesPartitioning = true)
val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
.setSerializer(serializer)
partitioned.mapPartitionsWithContext((context, iter) => {
new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context))
}, preservesPartitioning = true)
} else {
// 不須要map端的combine,直接就來shuffle
val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializer)
values.mapPartitionsWithContext((context, iter) => {
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
}
}
複製代碼
按照一個比較標準的流程來看的話,應該是走的中間的這條路徑,它幹了三件事:
一、給每一個分片的數據在外面套一個 combineValuesByKey 方法的 MapPartitionsRDD。
二、用 MapPartitionsRDD 來 new 了一個 ShuffledRDD 出來。
三、對 ShuffledRDD 作一次 combineCombinersByKey。
下面咱們先看 MapPartitionsRDD,我把和別的 RDD 有別的兩行給拿出來了,很明顯的區別,f 方法是套在 iterator 的外邊,這樣才能對 iterator 的全部數據作一個合併。
override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
override def compute(split: Partition, context: TaskContext) =
f(context, split.index, firstParent[T].iterator(split, context))
}
複製代碼
接下來咱們看 Aggregator 的 combineValuesByKey 的方法吧。
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]],
context: TaskContext): Iterator[(K, C)] = {
// 是否使用外部排序,是由參數spark.shuffle.spill,默認是true
if (!externalSorting) {
val combiners = new AppendOnlyMap[K,C]
var kv: Product2[K, V] = null
val update = (hadValue: Boolean, oldValue: C) => {
if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
}
// 用map來去重,用update方法來更新值,若是沒值的時候,返回值,若是有值的時候,經過mergeValue方法來合併
// mergeValue方法就是咱們在reduceByKey裏面寫的那個匿名函數,在這裏就是(_ + _)
while (iter.hasNext) {
kv = iter.next()
combiners.changeValue(kv._1, update)
}
combiners.iterator
} else {
// 用了一個外部排序的map來去重,就不停的往裏面插入值便可,基本原理和上面的差很少,區別在於須要外部排序
val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
while (iter.hasNext) {
val (k, v) = iter.next()
combiners.insert(k, v)
}
combiners.iterator
}
複製代碼
View Code
這個就是一個很典型的按照 key 來作合併的方法了,咱們繼續看 ShuffledRDD 吧。
ShuffledRDD 和以前的 RDD 很明顯的特徵是
一、它的依賴傳了一個 Nil(空列表)進去,表示它沒有依賴。
二、它的 compute 計算方式比較特別,這個在以後的文章說,過程比較複雜。
三、它的分片默認是採用 HashPartitioner,數量和前面的 RDD 的分片數量同樣,也能夠不同,咱們能夠在 reduceByKey 的時候多傳一個分片數量便可。
在 new 完 ShuffledRDD 以後又來了一遍 mapPartitionsWithContext,不過調用的匿名函數變成了 combineCombinersByKey。
combineCombinersByKey 和 combineValuesByKey 的邏輯基本相同,只是輸入輸出的類型有區別。combineCombinersByKey 只是作單純的合併,不會對輸入輸出的類型進行改變,combineValuesByKey 會把 iter[K, V] 的 V 值變成 iter[K, C]。
case class Aggregator[K, V, C] (   createCombiner: V => C,   mergeValue: (C, V) => C,
  mergeCombiners: (C, C) => C)
  ......
}
複製代碼
這個方法會根據咱們傳進去的匿名方法的參數的類型作一個自動轉換。
到這裏,做業都沒有真正執行,只是將 RDD 各類嵌套,咱們經過 RDD 的 id 和類型的變化觀測到這一點,RDD[1]->RDD[2]->RDD[3]......
日常咱們除了從 hdfs 上面取數據以後,咱們還可能從數據庫裏面取數據,那怎麼辦呢?不要緊,有個 JdbcRDD!
val rdd = new JdbcRDD(
sc,
() => { DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb") },
"SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
1, 100, 3,
(r: ResultSet) => { r.getInt(1) }
).cache()
複製代碼
前幾個參數你們都懂,咱們重點說一下後面 1, 100, 3 是咋回事?
在這個 JdbcRDD 裏面它默認咱們是會按照一個 long 類型的字段對數據進行切分,(1,100)分別是最小值和最大值,3 是分片的數量。
好比咱們要一次查 ID 爲 1-1000,000 的的用戶,分紅 10 個分片,咱們就填(1, 1000,000, 10)便可,在 sql 語句裏面還必須有 "? <= ID AND ID <= ?" 的句式,別嘗試着本身造句哦!
最後是怎麼處理 ResultSet 的方法,本身愛怎麼處理怎麼處理去吧。不過確實覺着用得不方便的能夠本身重寫一個 RDD。
小結:
這一章重點介紹了各類 RDD 那 5 個特徵,以及 RDD 之間的轉換,但願你們能夠對 RDD 有更深刻的瞭解,下一章咱們將要講做業的運行過程,敬請關注!