Spark Core知識點複習-1

Day1111

Spark任務調度
Spark幾個重要組件
Spark Core
    RDD的概念和特性
    生成RDD的兩種類型
    RDD算子的兩種類型
    算子練習
    分區
    RDD的依賴關係
    DAG:有向無環圖
    任務提交
    緩存
    checkPoint
    自定義排序
    自定義分區器
    自定義累加器
    廣播變量
    Spark Shuffle過程
    Spark優化過程
    SparkSQL
    集成Hive

一.Spark Core

1 Spark任務調度:

|->:standalone
    |->:local
    |->:Yarn
    |->:Mesos

2 Spark幾個重要的組件

|->:Master:管理Worker,負責接收Driver發送的註冊信息(任務信息)
        |->:Worker:負責本節點資源和任務的管理,啓動Exector進程
        |->:Exector:負責計算任務
        |->:Driver:用來提交任務(SparkSubmit進程)

3 Spark Core: RDD的概念和特性

數據的描述
    1):一組分片(Partition),即數據集的基本組成單位。對於RDD來講,每一個分片都會被一個計算任務處理,並決定並行計算的粒度。用戶能夠在建立RDD時指定RDD的分片個數,若是沒有指定,那麼就會採用默認值。默認值就是程序所分配到的CPU Core的數目。
    2):一個計算每一個分區的函數。Spark中RDD的計算是以分片爲單位的,每一個RDD都會實現compute函數以達到這個目的。compute函數會對迭代器進行復合,不須要保存每次計算的結果。
    3):RDD之間的依賴關係。RDD的每次轉換都會生成一個新的RDD,因此RDD之間就會造成相似於流水線同樣的先後依賴關係。在部分分區數據丟失時,Spark能夠經過這個依賴關係從新計算丟失的分區數據,而不是對RDD的全部分區進行從新計算。
    4):一個Partitioner,即RDD的分片函數。當前Spark中實現了兩種類型的分片函數,一個是基於哈希的HashPartitioner,另一個是基於範圍的RangePartitioner。只有對於key-value的RDD,纔會有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函數不但決定了RDD自己的分片數量,也決定了parent RDD Shuffle輸出時的分片數量。
    5):一個列表,存儲存取每一個Partition的優先位置(preferred location)。對於一個HDFS文件來講,這個列表保存的就是每一個Partition所在的塊的位置。按照「移動數據不如移動計算」的理念,Spark在進行任務調度的時候,會盡量地將計算任務分配到其所要處理數據塊的存儲位置。

        基本特性:可分區,函數,依賴,分區器,就近原則
RDD的彈性
    1): 自動進行內存和磁盤數據存儲的切換
        Spark優先把數據放到內存中,若是內存放不下,就會放到磁盤裏面,程序進行自動的存儲切換
    2): 基於血統的高效容錯機制
        在RDD進行轉換和動做的時候,會造成RDD的Lineage依賴鏈,當某一個RDD失效的時候,能夠經過從新計算上游的RDD來從新生成丟失的RDD數據。
    3): Task若是失敗會自動進行特定次數的重試
        RDD的計算任務若是運行失敗,會自動進行任務的從新計算,默認次數是4次。
    4): Stage若是失敗會自動進行特定次數的重試
        若是Job的某個Stage階段計算失敗,框架也會自動進行任務的從新計算,默認次數也是4次。
    5): Checkpoint和Persist可主動或被動觸發
        RDD能夠經過Persist持久化將RDD緩存到內存或者磁盤,當再次用到該RDD時直接讀取就行。也能夠將RDD進行檢查點,檢查點會將數據存儲在HDFS中,該RDD的全部父RDD依賴都會被移除。
    6): 數據調度彈性
        Spark把這個JOB執行模型抽象爲通用的有向無環圖DAG,能夠將多Stage的任務串聯或並行執行,調度引擎自動處理Stage的失敗以及Task的失敗。
    7): 數據分片的高度彈性
        能夠根據業務的特徵,動態調整數據分片的個數,提高總體的應用執行效率。
    RDD全稱叫作彈性分佈式數據集(Resilient Distributed Datasets):它是一種分佈式的內存抽象,表示一個只讀的記錄分區的集合,它只能經過其餘RDD轉換而建立,爲此,RDD支持豐富的轉換操做(如map, join, filter, groupBy等),經過這種轉換操做,新的RDD則包含了如何從其餘RDDs衍生所必需的信息,因此說RDDs之間是有依賴關係的。基於RDDs之間的依賴,RDDs會造成一個有向無環圖DAG,該DAG描述了整個流式計算的流程,實際執行的時候,RDD是經過血緣關係(Lineage)一鼓作氣的,即便出現數據分區丟失,也能夠經過血緣關係重建分區,總結起來,基於RDD的流式計算任務可描述爲:從穩定的物理存儲(如分佈式文件系統)中加載記錄,記錄被傳入由一組肯定性操做構成的DAG,而後寫回穩定存儲。另外RDD還能夠將數據集緩存到內存中,使得在多個操做之間能夠重用數據集,基於這個特色能夠很方便地構建迭代型應用(圖計算、機器學習等)或者交互式數據分析應用。能夠說Spark最初也就是實現RDD的一個分佈式系統,後面經過不斷髮展壯大成爲如今較爲完善的大數據生態系統,簡單來說,Spark-RDD的關係相似於Hadoop-MapReduce關係。

4 生成RDD的兩種類型

1:從集合中建立RDD
        val conf = new SparkConf().setAppName("Test").setMaster("local")
      val sc = new SparkContext(conf)
      //這兩個方法都有第二參數是一個默認值2  分片數量(partition的數量)
      //scala集合經過makeRDD建立RDD,底層實現也是parallelize
      val rdd1 = sc.makeRDD(Array(1,2,3,4,5,6))
     //scala集合經過parallelize建立RDD
      val rdd2 = sc.parallelize(Array(1,2,3,4,5,6))
    2:從外部存儲建立RDD
      al rdd3 = sc.textFile("hdfs://hadoop01:8020/word.txt")

5 RDD算子的兩種類型

|->:transformation算子:轉化成新RDD
    |->:Action算子:轉化成非RDD

6 算子練習

|->迭代類型算子:map,flatMap,mapPartitions,foreach,foreachPartition...
        |->shuffle類算子:
                            |->byKey:groupBy,reduceByKey(不必定),groupByKey,sortBy,SortByKey...
                            |->重分區算子:repartition(必然發生shuffle),colaesce(不必定,多分區變少分區不須要發生shuffle),partitionBy(發生shuffle),repartitionAndSortWithinPartitions
                            |->join類算子:join(不必定),fullOuterJoi,leftOuterJoin,rightOuterJoin
                            |->去重類算子:distinct,countApproxDistinct(返回去重的個數)
        |->聚合類算子:reduce,reduceByKey,aggregate,aggregateByKey,fold,foldByKey,combineByKey,combineByKey,countByKey,countByValue
        |->排序類算子:sortBy,sortByKey
優化: 
  1.map,mapPartition優化:必定要分數據量和對應的物力資源來肯定到底使用哪一個算子
        數據量 |       map   |   mapPartition
                |   每一個元素   |        每一個分區
      --------------------------------------
        比較大 |               |         優先選擇
       海量數據   |     優先選擇 |   可能發生OOM

  2.foreach,foreachPartition優化:須要考慮到持久化時可以承受的鏈接數
         場景 |     foreach       |    foreachPartition
                |       每一個元素      |     每一個分區
      ---------------------------------------------------------
      鏈接數據庫   |   每一個元素對應一個鏈接    |    優先選擇(一個分區對應一個鏈接)
       海量數據   |     優先選擇           |    可能發生OOM

  3.groupByKey,reduceByKey:若是能用reduceByKey解決的需求就用reduceByKey
         場景    |    groupByKey      |    reduceByKey(局部聚合)
      ---------------------------------------------------------
                 |                |       優先選擇
  4.join+filter(過濾):爲了不join過程產生很大的數據集的狀況,能夠先filter再join
    filter:過濾後再計算可能發生嚴重的數據傾斜,能夠在過濾後先調整
  5.序列化調優:
    :RDD在計算過程當中,調用的算子和傳入算子的函數都是在Executor端執行,除此以外都是在Driver端執行的
class SearchFunction(val query: String) extends Serializable {
  //第一個方法是判斷輸入的字符串是否存在query 存在返回true,不存在返回false
  def isMatch(s: String): Boolean = {
    s.contains(query)
  }
  // 問題:"isMatch"表示"this.isMatch",所以咱們要傳遞整個"this"
  def getMatchFunctionReference(rdd: RDD[String]): RDD[String] = rdd.filter(x => this.isMatch(x))// 等價於:rdd.filter(isMatch)
  // 問題:"query"表示"this.query",所以咱們要傳遞整個"this"
  def getMatchesFieldReference(rdd: RDD[String]): RDD[String] = rdd.filter(x => x.contains(this.query))
  // 安全:只把咱們須要的字段拿出來放入局部變量中
  def getMatchesNoReference(rdd: RDD[String]): RDD[String] = {
    val _query = this.query
    rdd.filter(x => x.contains(_query))
  }
}
object SearchFunctions {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(SearchFunctions.getClass.getName).setMaster("local[2]")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(List("hello java", "hello scala hello", "hello hello"))
    val sf = new SearchFunction("hello")
    sf.getMatchFunctionReference(rdd).foreach(println)
    sf.getMatchesFieldReference(rdd).foreach(println)
    sf.getMatchesNoReference(rdd).foreach(println)
    sc.stop()
  }
}
class Rules extends Serializable {
  val rulesMap = Map("xiaoli" -> 23, "xiaoming" -> 26)
}
object ObjectRules extends Serializable {
  val rulesMap = Map("jack" -> 27, "lucy" -> 22)
}
object SerializeTest_1 {
  def main(args: Array[String]): Unit = {
    val conf = SparkUtil.getSparkConf
    val sc = new SparkContext(conf)
    val lines = sc.parallelize(Array("xiaolv", "xiaohong", "xiaoming"))
    //map方法中的函數是在Executor的某個Task中執行的
    val res = lines.map(x => {
      val rules = new Rules
      val hostname = InetAddress.getLocalHost.getHostName
      val threadName = Thread.currentThread().getName
      (hostname, threadName, rules.rulesMap.getOrElse(x, 0), rules.toString)
    })
    println(res.collect.toBuffer)
    /*
    ArrayBuffer(
    (localhost,Executor task launch worker for task 0,0,cn.qf.streaming.day01.test.Rules@5c3d762c),
    (localhost,Executor task launch worker for task 1,0,cn.qf.streaming.day01.test.Rules@736d5f3b),
    (localhost,Executor task launch worker for task 1,26,cn.qf.streaming.day01.test.Rules@374cd5ba))
     */
    sc.stop()
  }
}
object SerializeTest_2 {
  def main(args: Array[String]): Unit = {
    val conf = SparkUtil.getSparkConf
    val sc = new SparkContext(conf)
    val lines = sc.parallelize(Array("xiaolv", "xiaohong", "xiaoming"))
    //該對象在Driver中建立
    val rules = new Rules
    //map方法中的函數是在Executor的某個Task中執行的
    val res = lines.map(x => {
      val hostname = InetAddress.getLocalHost.getHostName
      val threadName = Thread.currentThread().getName
      (hostname, threadName, rules.rulesMap.getOrElse(x, 0), rules.toString)
    })
    println(res.collect.toBuffer)
    /*
  ArrayBuffer(
  (localhost,Executor task launch worker for task 0,0,cn.qf.streaming.day01.test.Rules@48158406),
  (localhost,Executor task launch worker for task 1,0,cn.qf.streaming.day01.test.Rules@a287af2),
  (localhost,Executor task launch worker for task 1,26,cn.qf.streaming.day01.test.Rules@a287af2))
     */
    sc.stop()
  }
}
object SerializeTest_3 {
  def main(args: Array[String]): Unit = {
    val conf = SparkUtil.getSparkConf
    val sc = new SparkContext(conf)
    val lines = sc.parallelize(Array("xiaolv", "xiaohong", "xiaoming"))
    //該對象在Driver中建立單例對象
    val rules = ObjectRules
    //map方法中的函數是在Executor的某個Task中執行的
    val res = lines.map(x => {
      val hostname = InetAddress.getLocalHost.getHostName
      val threadName = Thread.currentThread().getName
      (hostname, threadName, rules.rulesMap.getOrElse(x, 0), rules.toString)
    })
    println(res.collect.toBuffer)
    /*
ArrayBuffer(
(localhost,Executor task launch worker for task 0,0,cn.qf.streaming.day01.test.ObjectRules$@543e593),
(localhost,Executor task launch worker for task 1,0,cn.qf.streaming.day01.test.ObjectRules$@543e593),
(localhost,Executor task launch worker for task 1,0,cn.qf.streaming.day01.test.ObjectRules$@543e593))
     */
    sc.stop()
  }
}
object SerializeTest_4 {
  def main(args: Array[String]): Unit = {
    val conf = SparkUtil.getSparkConf
    val sc = new SparkContext(conf)
    val lines = sc.parallelize(Array("xiaolv", "xiaohong", "xiaoming"))
    //該對象在Driver中建立單例對象
        //map方法中的函數是在Executor的某個Task中執行的
    val res = lines.map(x => {
      val hostname = InetAddress.getLocalHost.getHostName
      val threadName = Thread.currentThread().getName
      /*
      不用在Driver端去建立對象,Rules不用實現序列化
       */
      (hostname, threadName, ObjectRules.rulesMap.getOrElse(x, 0), ObjectRules.toString)
    })
    println(res.collect.toBuffer)
    /*
    ArrayBuffer(
    (localhost,Executor task launch worker for task 0,0,cn.qf.streaming.day01.test.ObjectRules$@2539fca6),
    (localhost,Executor task launch worker for task 1,0,cn.qf.streaming.day01.test.ObjectRules$@2539fca6),
    (localhost,Executor task launch worker for task 1,0,cn.qf.streaming.day01.test.ObjectRules$@2539fca6))
     */
    sc.stop()
  }
}

7 分區

textFile分片過程:由指定的cpu核數+指定的分區數+block塊的大小+文件的個數,通過分片算法獲得最終的分區數

8 RDD的依賴關係

|->寬依賴:一對多  一個父RDD分區會被多個子RDD使用
        |->窄依賴:一對一,多對一
        |->爲何區分寬窄依賴:
                    |->1:有寬窄依賴就能夠進行相應的容錯
                    |->2:寬依賴決定了stage的劃分的依據

9 DAG

爲何劃分stage:主要是爲了生成task,stage劃分過程實際上就將rdd的依賴按照shuffle來分爲一個到多個的範圍,task執行過程根本不會跨stage
        task數量 = stage數量 * 分區數(注:前提是沒有手動更改分區數)
        若是手動更改分區數,該stage的task數據由最後的分區數決定的
相關文章
相關標籤/搜索