In Spark, an RDD can be created in four ways. The first is to build one from an in-memory collection:
// Create an RDD from an in-memory collection
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

def main(args: Array[String]): Unit = {
  val sc: SparkContext = new SparkContext(
    new SparkConf()
      .setMaster("local")
      .setAppName("Rdd-Mem")
  )
  // makeRDD and parallelize are equivalent ways to build an RDD from a collection
  val rdd1: RDD[Int] = sc.makeRDD(
    List(1, 2, 4, 5, 6)
  )
  val rdd2: RDD[Int] = sc.parallelize(
    Array(1, 2, 3, 4, 5, 6)
  )
  rdd1.collect().foreach(println)
  rdd2.collect().foreach(println)
  sc.stop()
}
// makeRDD source: it simply delegates to parallelize
def makeRDD[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
  parallelize(seq, numSlices)
}
The second way is to create an RDD from files in external storage, such as the local file system or HDFS:

def main(args: Array[String]): Unit = {
  val sc = new SparkContext(
    new SparkConf()
      .setMaster("local")
      .setAppName("Rdd-File")
  )
  // textFile produces one record per line
  val rdd1: RDD[String] = sc.textFile("data")
  // wholeTextFiles produces tuples: the first element is the full file path,
  // the second is the entire content of that file
  val rdd2: RDD[(String, String)] = sc.wholeTextFiles("data/word*.txt")
  rdd1.collect().foreach(println)
  rdd2.collect().foreach(println)
  sc.stop()
}
By default, Spark splits a job into multiple tasks and sends them to Executor nodes to be computed in parallel; the number of tasks that can run in parallel is called the parallelism. This number can be specified when the RDD is constructed. Keep in mind that the parallelism refers to how many tasks execute simultaneously, not to how many tasks the job is split into; don't confuse the two.
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sc = new SparkContext(sparkConf)
// Explicitly set the number of partitions when creating the RDDs
val dataRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 4)
val fileRDD: RDD[String] = sc.textFile("input", 2)
fileRDD.collect().foreach(println)
sc.stop()
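To see how the elements are actually laid out, glom() gathers each partition into an array. A minimal sketch reusing the dataRDD and sc from the example above (the printed layout in the comments is illustrative):

// Inspect how elements are distributed across partitions
println(dataRDD.getNumPartitions)            // 4
dataRDD.glom().collect().foreach(a => println(a.mkString("[", ",", "]")))
// With 4 slices, each element lands in its own partition: [1] [2] [3] [4]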
When an RDD is created from a collection, the data is split across partitions by ParallelCollectionRDD.slice:

def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
  if (numSlices < 1) {
    throw new IllegalArgumentException("Positive number of partitions required")
  }
  // Sequences need to be sliced at the same set of index positions for operations
  // like RDD.zip() to behave as expected
  // Computes the start and end index of each partition; for example,
  // [1,2,3,4,5] split into two partitions becomes [1,2] and [3,4,5]
  def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
    (0 until numSlices).iterator.map { i =>
      val start = ((i * length) / numSlices).toInt
      val end = (((i + 1) * length) / numSlices).toInt
      (start, end)
    }
  }
  // The actual splitting, depending on the type of the input sequence
  seq match {
    case r: Range =>
      positions(r.length, numSlices).zipWithIndex.map { case ((start, end), index) =>
        // If the range is inclusive, use inclusive range for the last slice
        if (r.isInclusive && index == numSlices - 1) {
          new Range.Inclusive(r.start + start * r.step, r.end, r.step)
        } else {
          new Range(r.start + start * r.step, r.start + end * r.step, r.step)
        }
      }.toSeq.asInstanceOf[Seq[Seq[T]]]
    case nr: NumericRange[_] =>
      // For ranges of Long, Double, BigInteger, etc
      val slices = new ArrayBuffer[Seq[T]](numSlices)
      var r = nr
      for ((start, end) <- positions(nr.length, numSlices)) {
        val sliceSize = end - start
        slices += r.take(sliceSize).asInstanceOf[Seq[T]]
        r = r.drop(sliceSize)
      }
      slices
    case _ =>
      val array = seq.toArray // To prevent O(n^2) operations for List etc
      positions(array.length, numSlices).map { case (start, end) =>
        array.slice(start, end).toSeq
      }.toSeq
  }
}
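As a sanity check on the positions arithmetic, here is a small standalone sketch (plain Scala, no Spark needed) that reproduces the start/end computation for a 5-element list split into 2 slices:

// Reproduces the positions() arithmetic from the source above, for illustration only
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] =
  (0 until numSlices).iterator.map { i =>
    (((i * length) / numSlices).toInt, (((i + 1) * length) / numSlices).toInt)
  }

val data = List(1, 2, 3, 4, 5)
positions(data.length, 2).foreach { case (start, end) =>
  println(data.slice(start, end))   // List(1, 2) then List(3, 4, 5)
}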
When an RDD is created from a file, partitioning is delegated to Hadoop. HadoopRDD.getPartitions:

override def getPartitions: Array[Partition] = {
  val jobConf = getJobConf()
  SparkHadoopUtil.get.addCredentials(jobConf)
  try {
    // The input splits computed here become the RDD's partitions
    val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
    ..........
// How the splits are actually computed: Hadoop's FileInputFormat.getSplits (abridged)
public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
  long totalSize = 0;
  for (FileStatus file: files) {
    if (file.isDirectory()) {
      throw new IOException("Not a file: " + file.getPath());
    }
    totalSize += file.getLen();
  }
  long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
  long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
    FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
  ...
  for (FileStatus file: files) {
    ...
    if (isSplitable(fs, path)) {
      long blockSize = file.getBlockSize();
      long splitSize = computeSplitSize(goalSize, minSize, blockSize);
      ...
}

protected long computeSplitSize(long goalSize, long minSize,
                                long blockSize) {
  return Math.max(minSize, Math.min(goalSize, blockSize));
}
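To make the formula concrete, here is a hedged back-of-the-envelope sketch. It assumes a single 7-byte text file read with minPartitions = 2 and a local block size far larger than the file; the numbers are hypothetical, and the 1.1 slop factor mentioned in the comment is FileInputFormat's SPLIT_SLOP:

// Hypothetical walk-through of the split-size math
val totalSize = 7L                      // total bytes of all input files (assumed)
val numSplits = 2                       // minPartitions passed to textFile
val goalSize  = totalSize / numSplits   // 7 / 2 = 3
val minSize   = 1L
val blockSize = 32 * 1024 * 1024L       // assumed local block size, 32 MB
val splitSize = Math.max(minSize, Math.min(goalSize, blockSize)) // = 3
// 7 / 3 = 2.33 > 1.1, so the file is cut into splits of 3, 3 and 1 bytes,
// i.e. the resulting RDD ends up with 3 partitions, not 2.
println(splitSize)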
From a computation standpoint, code outside an operator runs on the Driver, while code inside an operator runs on the Executors. In Scala's functional style, the code inside an operator often references data defined outside it, which forms a closure. If that captured data cannot be serialized, it cannot be shipped to the Executors and the job fails. Spark therefore checks, before executing the task, whether the objects captured by the closure are serializable; this is called the closure check. Note that the way Scala compiles closures changed from Scala 2.12 onward. The code below demonstrates this:
def main(args: Array[String]): Unit = {
  val sc = new SparkContext(
    new SparkConf().setMaster("local[*]").setAppName("測試序列化")
  )
  val dept1 = new Dept(1, "研發部")
  val dept0 = new Dept(0, "未知")
  val rdd = sc.makeRDD(List(
    ("a", 1), ("a", 2), ("b", 3), ("b", 1),
    ("b", 4), ("F", 5), ("K", 6)
  ))
  // dept1 and dept0 are created on the Driver but captured by the closure below,
  // so Dept must be serializable to be shipped to the Executors
  rdd.map(t => {
    t._2 match {
      case 1 => (t._1, dept1)
      case _ => (t._1, dept0)
    }
  }).collect().foreach(println)
}

class Dept(var id: Int, var name: String) extends Serializable {
  override def toString: String = id + "\t" + name
}
// The closure check in ClosureCleaner
private def clean(
    func: AnyRef,
    checkSerializable: Boolean,
    cleanTransitively: Boolean,
    accessedFields: Map[Class[_], Set[String]]): Unit = {
  ..............
  // verify that the closure can be serialized
  if (checkSerializable) {
    ensureSerializable(func)
  }
}

private def ensureSerializable(func: AnyRef): Unit = {
  try {
    if (SparkEnv.get != null) {
      SparkEnv.get.closureSerializer.newInstance().serialize(func)
    }
  } catch {
    case ex: Exception => throw new SparkException("Task not serializable", ex)
  }
}

// If the captured class does not implement Serializable, the following exception is thrown:
// Exception in thread "main" org.apache.spark.SparkException: Task not serializable
//   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
//   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
//   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
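For contrast, a minimal sketch of the failing case (PlainDept is a hypothetical class name; sc reuses the context from the example above). Because the class does not implement Serializable, the job fails as soon as the closure is cleaned:

// Hypothetical class with no "extends Serializable"
class PlainDept(var id: Int, var name: String)

val plain = new PlainDept(0, "未知")
val rdd = sc.makeRDD(List(("a", 1), ("b", 2)))
// map() calls sc.clean(), which runs ensureSerializable on the closure;
// the closure captures `plain`, so this throws
// org.apache.spark.SparkException: Task not serializable
rdd.map(t => (t._1, plain)).collect()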
Project address: github.com/EsotericSof…
Java serialization can handle any serializable class, but it is heavyweight: the serialized output carries many extra bytes, so the shipped objects are large. For performance reasons, Spark 2.0 added support for the Kryo serialization framework. Kryo is roughly 10x faster than Java serialization, and when RDDs shuffle data, simple value types, arrays, and strings are already serialized with Kryo internally.
Note: even when Kryo serialization is used, the class still needs to implement the Serializable interface.
def main(args: Array[String]): Unit = {
  val sc = new SparkContext(
    new SparkConf()
      .setMaster("local[*]")
      .setAppName("測試序列化")
      // use Kryo for data serialization and register the Dept class
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(classOf[Dept]))
  )
  val dept1 = new Dept(1, "研發部")
  val dept0 = new Dept(0, "未知")
  val rdd = sc.makeRDD(List(
    ("a", 1), ("a", 2), ("b", 3), ("b", 1),
    ("b", 4), ("F", 5), ("K", 6)
  ))
  rdd.map(t => {
    t._2 match {
      case 1 => (t._1, dept1)
      case _ => (t._1, dept0)
    }
  }).collect().foreach(println)
}

class Dept(var id: Int, var name: String) extends Serializable {
  override def toString: String = id + "\t" + name
}
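To get a rough feel for the size difference, Spark's serializer classes can also be invoked directly. A hedged sketch (the exact byte counts vary by Spark version and JVM and are not guaranteed):

import org.apache.spark.SparkConf
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}

val conf = new SparkConf()
val dept = new Dept(1, "研發部")

// Serialize the same object with both serializers and compare byte counts
val javaBytes = new JavaSerializer(conf).newInstance().serialize(dept).remaining()
val kryoBytes = new KryoSerializer(conf).newInstance().serialize(dept).remaining()
println(s"Java: $javaBytes bytes, Kryo: $kryoBytes bytes") // Kryo output is typically much smaller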