This is day 2 of my participation in the 更文挑戰 writing challenge. For event details, see: 更文挑戰數據庫.
The Three Major Data Structures of Spark Core Programming: Basic RDD Programming (Part 1)
RDDs only support coarse-grained transformations, i.e. single operations applied to large sets of records. Spark records the series of Lineage steps used to build an RDD so that lost partitions can be recovered: an RDD's Lineage stores its metadata and the transformations that produced it, and when part of the RDD's partition data is lost, Spark can use this information to recompute and restore the missing partitions.
import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  val sc = new SparkContext(
    new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
  )
  val rdd = sc.makeRDD(List(
    ("a", 1), ("a", 2), ("b", 3), ("b", 3), ("b", 3),
    ("b", 4), ("b", 5), ("a", 6)
  ), 2)
  // Print the lineage of each RDD in the chain.
  println(rdd.toDebugString)
  println("--------------------------------------")
  val rdd1 = rdd.map(t => (t._1, t._2 * 2))
  println(rdd1.toDebugString)
  println("--------------------------------------")
  val rdd2 = rdd1.mapValues(_ + 100)
  println(rdd2.toDebugString)
  println("--------------------------------------")
  val rdd3 = rdd2.reduceByKey(_ + _)
  println(rdd3.toDebugString)
  println("--------------------------------------")
  val res = rdd2.collect()
  println(res.mkString("\n"))
}
(2) ParallelCollectionRDD[0] at makeRDD at _1.scala:19 []
--------------------------------------
(2) MapPartitionsRDD[1] at map at _1.scala:27 []
| ParallelCollectionRDD[0] at makeRDD at _1.scala:19 []
--------------------------------------
(2) MapPartitionsRDD[2] at mapValues at _1.scala:32 []
| MapPartitionsRDD[1] at map at _1.scala:27 []
| ParallelCollectionRDD[0] at makeRDD at _1.scala:19 []
--------------------------------------
(2) ShuffledRDD[3] at reduceByKey at _1.scala:37 []
+-(2) MapPartitionsRDD[2] at mapValues at _1.scala:32 []
| MapPartitionsRDD[1] at map at _1.scala:27 []
| ParallelCollectionRDD[0] at makeRDD at _1.scala:19 []
--------------------------------------
The dependency relationship referred to here is simply the relationship between two adjacent RDDs.
import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  val sc = new SparkContext(
    new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
  )
  val rdd = sc.makeRDD(List(
    ("a", 1), ("a", 2), ("b", 3), ("b", 3), ("b", 3),
    ("b", 4), ("b", 5), ("a", 6)
  ), 2)
  // Print each RDD's dependency on its parent.
  println(rdd.dependencies)
  println("--------------------------------------")
  val rdd1 = rdd.map(t => (t._1, t._2 * 2))
  println(rdd1.dependencies)
  println("--------------------------------------")
  val rdd2 = rdd1.mapValues(_ + 100)
  println(rdd2.dependencies)
  println("--------------------------------------")
  val rdd3 = rdd2.reduceByKey(_ + _)
  println(rdd3.dependencies)
  println("--------------------------------------")
  val res = rdd2.collect()
  println(res.mkString("\n"))
}
List()
--------------------------------------
List(org.apache.spark.OneToOneDependency@38704ff0)
--------------------------------------
List(org.apache.spark.OneToOneDependency@44de94c3)
--------------------------------------
List(org.apache.spark.ShuffleDependency@2c58dcb1)
--------------------------------------
A narrow dependency means that each partition of the parent (upstream) RDD is used by at most one partition of the child (downstream) RDD.
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  // Child partition i depends only on parent partition i.
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
A wide dependency means that the same partition of the parent (upstream) RDD is depended on by multiple partitions of the child (downstream) RDD, which triggers a shuffle.
@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false,
    val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor)
  extends Dependency[Product2[K, V]] {

  if (mapSideCombine) {
    require(aggregator.isDefined, "Map-side combine without Aggregator specified!")
  }

  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
  private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
  private[spark] val combinerClassName: Option[String] =
    Option(reflect.classTag[C]).map(_.runtimeClass.getName)

  val shuffleId: Int = _rdd.context.newShuffleId()

  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, this)

  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
  _rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId)
}
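To tie the two cases together, here is a minimal sketch (the data and app name are illustrative only) that pattern-matches on the dependency types shown above, confirming that map produces a OneToOneDependency while reduceByKey produces a ShuffleDependency:

import org.apache.spark.{OneToOneDependency, ShuffleDependency, SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  val sc = new SparkContext(
    new SparkConf().setMaster("local[*]").setAppName("DependencyCheck")
  )
  val mapped  = sc.makeRDD(List(("a", 1), ("b", 2)), 2).map(t => (t._1, t._2 * 2))
  val reduced = mapped.reduceByKey(_ + _)

  // Narrow transformation -> OneToOneDependency; shuffle transformation -> ShuffleDependency.
  List(mapped, reduced).foreach { r =>
    r.dependencies.head match {
      case _: OneToOneDependency[_]      => println(s"RDD ${r.id}: narrow dependency")
      case _: ShuffleDependency[_, _, _] => println(s"RDD ${r.id}: wide (shuffle) dependency")
      case other                         => println(s"RDD ${r.id}: $other")
    }
  }
  sc.stop()
}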
A DAG (Directed Acyclic Graph) is a topological graph made up of vertices and edges; it is directed and contains no cycles. In Spark, the DAG records the RDD transformation chain and the stages of a job.
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties): Unit = {
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    // ...
  }

/** Create a ResultStage associated with the provided jobId. */
private def createResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  checkBarrierStageWithDynamicAllocation(rdd)
  checkBarrierStageWithNumSlots(rdd)
  checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}

/** Get or create the list of parent stages for a given RDD. The new stages will be
 *  created with the provided firstJobId. */
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  getShuffleDependencies(rdd).map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}

/** Gets a shuffle map stage if one exists in shuffleIdToMapStage. Otherwise, if the
 *  shuffle map stage doesn't already exist, this method will create the shuffle map
 *  stage as well as any missing ancestor shuffle map stages. */
private def getOrCreateShuffleMapStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
    case Some(stage) =>
      stage
    case None =>
      // Create stages for all missing ancestor shuffle dependencies.
      getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
        // Even though getMissingAncestorShuffleDependencies only returns shuffle
        // dependencies that were not already in shuffleIdToMapStage, it's possible that
        // by the time we get to a particular dependency in the foreach loop, it's been
        // added to shuffleIdToMapStage by the stage creation process for an earlier
        // dependency. See SPARK-13902 for more information.
        if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
          createShuffleMapStage(dep, firstJobId)
        }
      }
      // Finally, create a stage for the given shuffle dependency.
      createShuffleMapStage(shuffleDep, firstJobId)
  }
}
RDD task splitting involves four levels: Application, Job, Stage, and Task.
Note: each step in Application -> Job -> Stage -> Task is a one-to-many relationship, as illustrated by the sketch below.
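A minimal sketch of those one-to-many relationships, assuming local mode (the object name and data are illustrative; the stage/task counts follow the rules above and can be confirmed in the web UI on port 4040 while the application is running):

import org.apache.spark.{SparkConf, SparkContext}

object StageTaskDemo {
  def main(args: Array[String]): Unit = {
    // One SparkContext = one Application.
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("StageTaskDemo"))

    val rdd     = sc.makeRDD(List(("a", 1), ("b", 2), ("a", 3), ("b", 4)), 2)
    val reduced = rdd.reduceByKey(_ + _) // one shuffle => one extra stage

    // Each action submits one Job.
    reduced.collect() // Job 0: ShuffleMapStage + ResultStage, 2 tasks each (one per partition)
    reduced.count()   // Job 1: two stages again (the map stage may be skipped if its shuffle output is reused)

    sc.stop()
  }
}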
// Task creation (splitting) source
val tasks: Seq[Task[_]] = try {
  val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
  stage match {
    case stage: ShuffleMapStage =>
      stage.pendingPartitions.clear()
      partitionsToCompute.map { id =>
        val locs = taskIdToLocations(id)
        val part = partitions(id)
        stage.pendingPartitions += id
        new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
          taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
          Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
      }
    case stage: ResultStage =>
      partitionsToCompute.map { id =>
        val p: Int = stage.partitions(id)
        val part = partitions(p)
        val locs = taskIdToLocations(id)
        new ResultTask(stage.id, stage.latestInfo.attemptNumber,
          taskBinary, part, locs, id, properties, serializedTaskMetrics,
          Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
          stage.rdd.isBarrier())
      }
  }
} catch {
  case NonFatal(e) =>
    abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
    runningStages -= stage
    return
}

// Figure out the indexes of partition ids to compute.
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

// findMissingPartitions has two implementations:

// ShuffleMapStage implementation
override def findMissingPartitions(): Seq[Int] = {
  mapOutputTrackerMaster
    .findMissingPartitions(shuffleDep.shuffleId)
    .getOrElse(0 until numPartitions)
}

// ResultStage implementation
override def findMissingPartitions(): Seq[Int] = {
  val job = activeJob.get
  (0 until job.numPartitions).filter(id => !job.finished(id))
}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

def main(args: Array[String]): Unit = {
  val sc = new SparkContext(
    new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
  )
  val rdd = sc.makeRDD(List(
    1, 2, 5, 9
  ), 2)
  val mapRdd = rdd.map(i => {
    println("map--------")
    ("a", i)
  })
  mapRdd.cache() // cache() simply calls persist() with the default MEMORY_ONLY level
  // To specify a storage level explicitly:
  // mapRdd.persist(StorageLevel.DISK_ONLY)
  mapRdd.reduceByKey(_ + _).collect().foreach(println)
  mapRdd.groupByKey().collect().foreach(println)
  sc.stop()
}
複製代碼
All the storage levels defined on StorageLevel (the constructor arguments are useDisk, useMemory, useOffHeap, deserialized, and an optional replication count):
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
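As a small usage sketch (continuing the mapRdd example above), an explicit level can be passed to persist and the cached data released again with unpersist:

import org.apache.spark.storage.StorageLevel

// Keep serialized copies in memory and spill to disk when memory runs short.
mapRdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
mapRdd.reduceByKey(_ + _).collect().foreach(println)
mapRdd.unpersist() // drop the cached blocks once they are no longer needed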
import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  val sc = new SparkContext(
    new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
  )
  // Set the directory where checkpoint data is stored.
  sc.setCheckpointDir("./check")
  val rdd = sc.makeRDD(List(
    1, 2, 5, 9
  ), 2)
  val mapRdd = rdd.map(i => {
    println("map--------")
    ("a", i)
  })
  mapRdd.cache()
  // checkpoint() re-runs the preceding RDD chain a second time; combined with cache() it runs only once.
  mapRdd.checkpoint()
  mapRdd.reduceByKey(_ + _).collect().foreach(println)
  mapRdd.groupByKey().collect().foreach(println)
  sc.stop()
}
Spark currently supports Hash partitioning, Range partitioning, and user-defined partitioners; Hash partitioning is the default. The partitioner directly determines the number of partitions in an RDD and which partition each record lands in after a shuffle, and therefore also the number of reduce tasks.
HashPartitioner: you can look up the concrete implementation in the Spark source yourself.
RangePartitioner: likewise, see the Spark source for the concrete implementation.
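A minimal sketch of the HashPartitioner behaviour described above (the partition count of 3 is arbitrary): a key is placed according to key.hashCode modulo the number of partitions, with null keys always going to partition 0.

import org.apache.spark.HashPartitioner

val hp = new HashPartitioner(3)
println(hp.getPartition("a"))  // "a".hashCode modulo 3, made non-negative
println(hp.getPartition("b"))
println(hp.getPartition(null)) // null keys always map to partition 0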
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  val sc = new SparkContext(
    new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
  )
  val rdd = sc.makeRDD(List(
    ("a", 1), ("a", 2), ("b", 3), ("b", 3), ("b", 3),
    ("b", 4), ("b", 5), ("a", 6), ("c", 6), ("d", 6)
  ), 5)
  val parRDD = rdd.partitionBy(new MyPartitioner)
  val reduceRdd1 = parRDD.reduceByKey((i, j) => {
    println("reduceRdd1")
    i + j
  })
  val reduceRdd2 = reduceRdd1.reduceByKey((i, j) => {
    println("reduceRdd2")
    i + j
  })
  reduceRdd2.saveAsTextFile("out1")
  sc.stop()
}

/**
 * Sends key "a" to partition 0, key "b" to partition 1, and everything else to partition 2.
 */
class MyPartitioner extends Partitioner {
  /** numPartitions and getPartition must be overridden. */
  override def numPartitions: Int = 3

  override def getPartition(key: Any): Int = key match {
    case "a" => 0
    case "b" => 1
    case _   => 2
  }

  /** equals and hashCode are optional. */
  override def equals(other: Any): Boolean = other match {
    case p: MyPartitioner => p.numPartitions == numPartitions
    case _                => false
  }

  override def hashCode: Int = numPartitions
}
Spark's data reading and saving can be distinguished along two dimensions: file format and file system. File formats include text files, csv files, sequence files, and object files; file systems include the local file system, HDFS, HBase, and databases.
import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  val sc = new SparkContext(
    new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
  )
  val rdd = sc.makeRDD(List(
    ("a", 1), ("a", 2), ("b", 3), ("b", 3), ("b", 3)
  ), 1)
  /** Text file */
  rdd.saveAsTextFile("text")
  sc.textFile("text").collect().foreach(println)
  sc.stop()
}
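The overview above also mentions csv files; with the plain RDD API a csv is usually handled as a text file plus manual splitting. A minimal sketch (reusing the same rdd and sc as in the text-file example, before sc.stop() is called, and with no quoting or escaping handled):

/** csv file (treated as text plus a delimiter) */
rdd.map { case (k, v) => s"$k,$v" }.saveAsTextFile("csv")
sc.textFile("csv")
  .map(_.split(","))
  .map(arr => (arr(0), arr(1).toInt))
  .collect()
  .foreach(println)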
A SequenceFile is a flat file designed by Hadoop for storing key-value pairs in binary form. On a SparkContext you can call sequenceFile[keyClass, valueClass](path) to read one.
import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  val sc = new SparkContext(
    new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
  )
  val rdd = sc.makeRDD(List(
    ("a", 1), ("a", 2), ("b", 3), ("b", 3), ("b", 3)
  ), 1)
  /** Sequence file */
  rdd.saveAsSequenceFile("sequence")
  sc.sequenceFile[String, Int]("sequence").collect().foreach(println)
  sc.stop()
}
An object file stores serialized objects and uses Java serialization. You can read one by calling objectFile[T: ClassTag](path), which takes a path and returns the corresponding RDD, and write one by calling saveAsObjectFile(). Because the data is serialized, the element type must be specified when reading.
import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  val sc = new SparkContext(
    new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
  )
  val rdd = sc.makeRDD(List(
    ("a", 1), ("a", 2), ("b", 3), ("b", 3), ("b", 3)
  ), 1)
  /** Object file */
  rdd.saveAsObjectFile("object")
  sc.objectFile[(String, Int)]("object").collect().foreach(println)
  sc.stop()
}