Spark核心編程的三大數據結構 之 RDD基礎編程 (一)

這是我參與更文挑戰的第1天,活動詳情查看:更文挑戰git

1 RDD建立

在Spark中建立RDD的建立方式能夠分爲四種:github

  • 從集合(內存)中建立RDD
    • 從集合中建立RDD,Spark主要提供了兩個方法:parallelize和makeRDD
    • makeRDD底層代碼調用的parallelize,因此兩個方法同樣
//內存建立RDD
    def main(args: Array[String]): Unit = {

        val sc: SparkContext = new SparkContext(
            new SparkConf()
                .setMaster("local")
                .setAppName("Rdd-Mem")
        )

        val rdd1: RDD[Int] = sc.makeRDD(
            List(1, 2, 4, 5, 6)
        )
        val rdd2: RDD[Int] = sc.parallelize(
            Array(1, 2, 3, 4, 5, 6)
        )

        rdd1.collect().foreach(println)
        rdd2.collect().foreach(println)
    }

複製代碼
//makeRDD源碼
  def makeRDD[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
      parallelize(seq, numSlices)
  }
複製代碼
  • 從外部存儲(文件)建立RDD
    • 由外部存儲系統的數據集建立RDD包括:本地的文件系統,全部Hadoop支持的數據集,好比HDFS、HBase等。
def main(args: Array[String]): Unit = {

        val sc = new SparkContext(
            new SparkConf()
                .setMaster("local")
                .setAppName("Rdd-File")
        )

        val rdd1: RDD[String] = sc.textFile("data")
        //wholeTextFiles Tuple第一個數據爲文件全路徑 Tuple第二個爲每行數據
        val rdd2: RDD[(String, String)] = sc.wholeTextFiles("data/word*.txt")

        rdd1.collect().foreach(println)
        rdd2.collect().foreach(println)

    }
複製代碼
  • 從其餘RDD建立
    • 主要是經過一個RDD運算完後,再產生新的RDD。詳情請參考後續章節
  • 直接建立RDD(new)
    • 使用new的方式直接構造RDD,通常由Spark框架自身使用。

2 RDD並行度與分區

默認狀況下,Spark能夠將一個做業切分多個任務後,發送給Executor節點並行計算,而可以並行計算的任務數量咱們稱之爲並行度。這個數量能夠在構建RDD時指定。記住,這裏的並行執行的任務數量,並非指的切分任務的數量,不要混淆了。apache

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sc = new SparkContext(sparkConf)
val dataRDD: RDD[Int] =
    sc.makeRDD(
        List(1,2,3,4),
        4)
val fileRDD: RDD[String] =
    sc.textFile(
        "input",
        2)
fileRDD.collect().foreach(println)
sparkContext.stop()
複製代碼
  • 讀取內存數據時,數據能夠按照並行度的設定進行數據的分區操做,數據分區規則的Spark核心源碼以下:
def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
    if (numSlices < 1) {
      throw new IllegalArgumentException("Positive number of partitions required")
    }
    // Sequences need to be sliced at the same set of index positions for operations
    // like RDD.zip() to behave as expected
    //計算每一個分區開始位置和結束位置
    //[1,2,3,4,5] 分紅兩個分區後會成爲 [1,2][3,4,5]
    def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
      (0 until numSlices).iterator.map { i =>
        val start = ((i * length) / numSlices).toInt
        val end = (((i + 1) * length) / numSlices).toInt
        (start, end)
      }
    }
    //下面爲具體的拆分代碼
    seq match {
      case r: Range =>
        positions(r.length, numSlices).zipWithIndex.map { case ((start, end), index) =>
          // If the range is inclusive, use inclusive range for the last slice
          if (r.isInclusive && index == numSlices - 1) {
            new Range.Inclusive(r.start + start * r.step, r.end, r.step)
          }
          else {
            new Range(r.start + start * r.step, r.start + end * r.step, r.step)
          }
        }.toSeq.asInstanceOf[Seq[Seq[T]]]
      case nr: NumericRange[_] =>
        // For ranges of Long, Double, BigInteger, etc
        val slices = new ArrayBuffer[Seq[T]](numSlices)
        var r = nr
        for ((start, end) <- positions(nr.length, numSlices)) {
          val sliceSize = end - start
          slices += r.take(sliceSize).asInstanceOf[Seq[T]]
          r = r.drop(sliceSize)
        }
        slices
      case _ =>
        val array = seq.toArray // To prevent O(n^2) operations for List etc
        positions(array.length, numSlices).map { case (start, end) =>
            array.slice(start, end).toSeq
        }.toSeq
    }
  }

複製代碼
  • 讀取文件數據時,數據是按照Hadoop文件讀取的規則進行切片分區,而切片規則和數據讀取的規則有些差別,具體Spark核心源碼以下
override def getPartitions: Array[Partition] = {
    val jobConf = getJobConf()
    SparkHadoopUtil.get.addCredentials(jobConf)
    try {
      // 分區
      val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
 ..........

複製代碼
// 具體如何分區
public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {

    long totalSize = 0;
    for (FileStatus file: files) {
      if (file.isDirectory()) {
        throw new IOException("Not a file: "+ file.getPath());
      }
      totalSize += file.getLen();
    }

    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
      
    ...
    
    for (FileStatus file: files) {
    
        ...
    
    if (isSplitable(fs, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(goalSize, minSize, blockSize);

          ...

  }
  protected long computeSplitSize(long goalSize, long minSize,
                                       long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
  }

複製代碼

3.RDD序列化

  • 閉包檢查

從計算的角度, 算子之外的代碼都是在Driver端執行, 算子裏面的代碼都是在Executor端執行。那麼在scala的函數式編程中,就會致使算子內常常會用到算子外的數據,這樣就造成了閉包的效果,若是使用的算子外的數據沒法序列化,就意味着沒法傳值給Executor端執行,就會發生錯誤,因此須要在執行任務計算前,檢測閉包內的對象是否能夠進行序列化,這個操做咱們稱之爲閉包檢測。Scala2.12版本後閉包編譯方式發生了改變編程

  • 序列化方法和屬性

從計算的角度, 算子之外的代碼都是在Driver端執行, 算子裏面的代碼都是在Executor端執行,代碼以下:數組

def main(args: Array[String]): Unit = {

        val sc = new SparkContext(
            new SparkConf().setMaster("local[*]").setAppName("測試序列化")
        )

        val dept1 = new Dept(1, "研發部")
        val dept0 = new Dept(0, "未知")

        val rdd = sc.makeRDD(List(
            ("a", 1), ("a", 2), ("b", 3), ("b", 1),
            ("b", 4), ("F", 5), ("K", 6)
        ))

        rdd.map(t => {
            t._2 match {
                case 1 => (t._1, dept1)
                case _ => (t._1, dept0)
            }
        }).collect() foreach println


    }
    
    class Dept(var id: Int, var name: String) extends Serializable {
        override def toString: String = id + "\t" + name
    }
複製代碼
//校驗的代碼
 private def clean(
      func: AnyRef,
      checkSerializable: Boolean,
      cleanTransitively: Boolean,
      accessedFields: Map[Class[_], Set[String]]): Unit = {
    ..............
    // 校驗序列化
    if (checkSerializable) {
      ensureSerializable(func)
    }
  }
  private def ensureSerializable(func: AnyRef): Unit = {
    try {
      if (SparkEnv.get != null) {
        SparkEnv.get.closureSerializer.newInstance().serialize(func)
      }
    } catch {
      case ex: Exception => throw new SparkException("Task not serializable", ex)
    }
  }

//不實現序列號接口會跑出以下異常
//Exception in thread "main" org.apache.spark.SparkException: Task not serializable
// at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
// at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
// at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)

複製代碼
  • Kryo序列化框架

項目地址: github.com/EsotericSof…
Java的序列化可以序列化任何的類。可是比較重(字節多),序列化後,對象的提交也比較大。Spark出於性能的考慮,Spark2.0開始支持另一種Kryo序列化機制。Kryo速度是Serializable的10倍。當RDD在Shuffle數據的時候,簡單數據類型、數組和字符串類型已經在Spark內部使用Kryo來序列化
注意:即便使用Kryo序列化,也要繼承Serializable接口。markdown

def main(args: Array[String]): Unit = {

        val sc = new SparkContext(
            new SparkConf()
                .setMaster("local[*]")
                .setAppName("測試序列化")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .registerKryoClasses(Array(classOf[Dept]))
        )

        val dept1 = new Dept(1, "研發部")
        val dept0 = new Dept(0, "未知")

        val rdd = sc.makeRDD(List(
            ("a", 1), ("a", 2), ("b", 3), ("b", 1),
            ("b", 4), ("F", 5), ("K", 6)
        ))

        rdd.map(t => {
            t._2 match {
                case 1 => (t._1, dept1)
                case _ => (t._1, dept0)
            }
        }).collect() foreach println


    }

    class Dept(var id: Int, var name: String) extends Serializable {
        override def toString: String = id + "\t" + name
    }
複製代碼
相關文章
相關標籤/搜索