Day1111
Spark任務調度
Spark幾個重要組件
Spark Core
RDD的概念和特性
生成RDD的兩種類型
RDD算子的兩種類型
算子練習
分區
RDD的依賴關係
DAG:有向無環圖
任務提交
緩存
checkPoint
自定義排序
自定義分區器
自定義累加器
廣播變量
Spark Shuffle過程
Spark優化過程
SparkSQL
集成Hive
一.Spark Core
1 Spark任務調度:
|->:standalone
|->:local
|->:Yarn
|->:Mesos
2 Spark幾個重要的組件
|->:Master:管理Worker,負責接收Driver發送的註冊信息(任務信息)
|->:Worker:負責本節點資源和任務的管理,啓動Exector進程
|->:Exector:負責計算任務
|->:Driver:用來提交任務(SparkSubmit進程)
3 Spark Core: RDD的概念和特性
數據的描述
1):一組分片(Partition),即數據集的基本組成單位。對於RDD來講,每一個分片都會被一個計算任務處理,並決定並行計算的粒度。用戶能夠在建立RDD時指定RDD的分片個數,若是沒有指定,那麼就會採用默認值。默認值就是程序所分配到的CPU Core的數目。
2):一個計算每一個分區的函數。Spark中RDD的計算是以分片爲單位的,每一個RDD都會實現compute函數以達到這個目的。compute函數會對迭代器進行復合,不須要保存每次計算的結果。
3):RDD之間的依賴關係。RDD的每次轉換都會生成一個新的RDD,因此RDD之間就會造成相似於流水線同樣的先後依賴關係。在部分分區數據丟失時,Spark能夠經過這個依賴關係從新計算丟失的分區數據,而不是對RDD的全部分區進行從新計算。
4):一個Partitioner,即RDD的分片函數。當前Spark中實現了兩種類型的分片函數,一個是基於哈希的HashPartitioner,另一個是基於範圍的RangePartitioner。只有對於key-value的RDD,纔會有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函數不但決定了RDD自己的分片數量,也決定了parent RDD Shuffle輸出時的分片數量。
5):一個列表,存儲存取每一個Partition的優先位置(preferred location)。對於一個HDFS文件來講,這個列表保存的就是每一個Partition所在的塊的位置。按照「移動數據不如移動計算」的理念,Spark在進行任務調度的時候,會盡量地將計算任務分配到其所要處理數據塊的存儲位置。
基本特性:可分區,函數,依賴,分區器,就近原則
RDD的彈性
1): 自動進行內存和磁盤數據存儲的切換
Spark優先把數據放到內存中,若是內存放不下,就會放到磁盤裏面,程序進行自動的存儲切換
2): 基於血統的高效容錯機制
在RDD進行轉換和動做的時候,會造成RDD的Lineage依賴鏈,當某一個RDD失效的時候,能夠經過從新計算上游的RDD來從新生成丟失的RDD數據。
3): Task若是失敗會自動進行特定次數的重試
RDD的計算任務若是運行失敗,會自動進行任務的從新計算,默認次數是4次。
4): Stage若是失敗會自動進行特定次數的重試
若是Job的某個Stage階段計算失敗,框架也會自動進行任務的從新計算,默認次數也是4次。
5): Checkpoint和Persist可主動或被動觸發
RDD能夠經過Persist持久化將RDD緩存到內存或者磁盤,當再次用到該RDD時直接讀取就行。也能夠將RDD進行檢查點,檢查點會將數據存儲在HDFS中,該RDD的全部父RDD依賴都會被移除。
6): 數據調度彈性
Spark把這個JOB執行模型抽象爲通用的有向無環圖DAG,能夠將多Stage的任務串聯或並行執行,調度引擎自動處理Stage的失敗以及Task的失敗。
7): 數據分片的高度彈性
能夠根據業務的特徵,動態調整數據分片的個數,提高總體的應用執行效率。
RDD全稱叫作彈性分佈式數據集(Resilient Distributed Datasets):它是一種分佈式的內存抽象,表示一個只讀的記錄分區的集合,它只能經過其餘RDD轉換而建立,爲此,RDD支持豐富的轉換操做(如map, join, filter, groupBy等),經過這種轉換操做,新的RDD則包含了如何從其餘RDDs衍生所必需的信息,因此說RDDs之間是有依賴關係的。基於RDDs之間的依賴,RDDs會造成一個有向無環圖DAG,該DAG描述了整個流式計算的流程,實際執行的時候,RDD是經過血緣關係(Lineage)一鼓作氣的,即便出現數據分區丟失,也能夠經過血緣關係重建分區,總結起來,基於RDD的流式計算任務可描述爲:從穩定的物理存儲(如分佈式文件系統)中加載記錄,記錄被傳入由一組肯定性操做構成的DAG,而後寫回穩定存儲。另外RDD還能夠將數據集緩存到內存中,使得在多個操做之間能夠重用數據集,基於這個特色能夠很方便地構建迭代型應用(圖計算、機器學習等)或者交互式數據分析應用。能夠說Spark最初也就是實現RDD的一個分佈式系統,後面經過不斷髮展壯大成爲如今較爲完善的大數據生態系統,簡單來說,Spark-RDD的關係相似於Hadoop-MapReduce關係。
4 生成RDD的兩種類型
1:從集合中建立RDD
val conf = new SparkConf().setAppName("Test").setMaster("local")
val sc = new SparkContext(conf)
//這兩個方法都有第二參數是一個默認值2 分片數量(partition的數量)
//scala集合經過makeRDD建立RDD,底層實現也是parallelize
val rdd1 = sc.makeRDD(Array(1,2,3,4,5,6))
//scala集合經過parallelize建立RDD
val rdd2 = sc.parallelize(Array(1,2,3,4,5,6))
2:從外部存儲建立RDD
al rdd3 = sc.textFile("hdfs://hadoop01:8020/word.txt")
5 RDD算子的兩種類型
|->:transformation算子:轉化成新RDD
|->:Action算子:轉化成非RDD
6 算子練習
|->迭代類型算子:map,flatMap,mapPartitions,foreach,foreachPartition...
|->shuffle類算子:
|->byKey:groupBy,reduceByKey(不必定),groupByKey,sortBy,SortByKey...
|->重分區算子:repartition(必然發生shuffle),colaesce(不必定,多分區變少分區不須要發生shuffle),partitionBy(發生shuffle),repartitionAndSortWithinPartitions
|->join類算子:join(不必定),fullOuterJoi,leftOuterJoin,rightOuterJoin
|->去重類算子:distinct,countApproxDistinct(返回去重的個數)
|->聚合類算子:reduce,reduceByKey,aggregate,aggregateByKey,fold,foldByKey,combineByKey,combineByKey,countByKey,countByValue
|->排序類算子:sortBy,sortByKey
優化:
1.map,mapPartition優化:必定要分數據量和對應的物力資源來肯定到底使用哪一個算子
數據量 | map | mapPartition
| 每一個元素 | 每一個分區
--------------------------------------
比較大 | | 優先選擇
海量數據 | 優先選擇 | 可能發生OOM
2.foreach,foreachPartition優化:須要考慮到持久化時可以承受的鏈接數
場景 | foreach | foreachPartition
| 每一個元素 | 每一個分區
---------------------------------------------------------
鏈接數據庫 | 每一個元素對應一個鏈接 | 優先選擇(一個分區對應一個鏈接)
海量數據 | 優先選擇 | 可能發生OOM
3.groupByKey,reduceByKey:若是能用reduceByKey解決的需求就用reduceByKey
場景 | groupByKey | reduceByKey(局部聚合)
---------------------------------------------------------
| | 優先選擇
4.join+filter(過濾):爲了不join過程產生很大的數據集的狀況,能夠先filter再join
filter:過濾後再計算可能發生嚴重的數據傾斜,能夠在過濾後先調整
5.序列化調優:
:RDD在計算過程當中,調用的算子和傳入算子的函數都是在Executor端執行,除此以外都是在Driver端執行的
class SearchFunction(val query: String) extends Serializable {
//第一個方法是判斷輸入的字符串是否存在query 存在返回true,不存在返回false
def isMatch(s: String): Boolean = {
s.contains(query)
}
// 問題:"isMatch"表示"this.isMatch",所以咱們要傳遞整個"this"
def getMatchFunctionReference(rdd: RDD[String]): RDD[String] = rdd.filter(x => this.isMatch(x))// 等價於:rdd.filter(isMatch)
// 問題:"query"表示"this.query",所以咱們要傳遞整個"this"
def getMatchesFieldReference(rdd: RDD[String]): RDD[String] = rdd.filter(x => x.contains(this.query))
// 安全:只把咱們須要的字段拿出來放入局部變量中
def getMatchesNoReference(rdd: RDD[String]): RDD[String] = {
val _query = this.query
rdd.filter(x => x.contains(_query))
}
}
object SearchFunctions {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName(SearchFunctions.getClass.getName).setMaster("local[2]")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(List("hello java", "hello scala hello", "hello hello"))
val sf = new SearchFunction("hello")
sf.getMatchFunctionReference(rdd).foreach(println)
sf.getMatchesFieldReference(rdd).foreach(println)
sf.getMatchesNoReference(rdd).foreach(println)
sc.stop()
}
}
class Rules extends Serializable {
val rulesMap = Map("xiaoli" -> 23, "xiaoming" -> 26)
}
object ObjectRules extends Serializable {
val rulesMap = Map("jack" -> 27, "lucy" -> 22)
}
object SerializeTest_1 {
def main(args: Array[String]): Unit = {
val conf = SparkUtil.getSparkConf
val sc = new SparkContext(conf)
val lines = sc.parallelize(Array("xiaolv", "xiaohong", "xiaoming"))
//map方法中的函數是在Executor的某個Task中執行的
val res = lines.map(x => {
val rules = new Rules
val hostname = InetAddress.getLocalHost.getHostName
val threadName = Thread.currentThread().getName
(hostname, threadName, rules.rulesMap.getOrElse(x, 0), rules.toString)
})
println(res.collect.toBuffer)
/*
ArrayBuffer(
(localhost,Executor task launch worker for task 0,0,cn.qf.streaming.day01.test.Rules@5c3d762c),
(localhost,Executor task launch worker for task 1,0,cn.qf.streaming.day01.test.Rules@736d5f3b),
(localhost,Executor task launch worker for task 1,26,cn.qf.streaming.day01.test.Rules@374cd5ba))
*/
sc.stop()
}
}
object SerializeTest_2 {
def main(args: Array[String]): Unit = {
val conf = SparkUtil.getSparkConf
val sc = new SparkContext(conf)
val lines = sc.parallelize(Array("xiaolv", "xiaohong", "xiaoming"))
//該對象在Driver中建立
val rules = new Rules
//map方法中的函數是在Executor的某個Task中執行的
val res = lines.map(x => {
val hostname = InetAddress.getLocalHost.getHostName
val threadName = Thread.currentThread().getName
(hostname, threadName, rules.rulesMap.getOrElse(x, 0), rules.toString)
})
println(res.collect.toBuffer)
/*
ArrayBuffer(
(localhost,Executor task launch worker for task 0,0,cn.qf.streaming.day01.test.Rules@48158406),
(localhost,Executor task launch worker for task 1,0,cn.qf.streaming.day01.test.Rules@a287af2),
(localhost,Executor task launch worker for task 1,26,cn.qf.streaming.day01.test.Rules@a287af2))
*/
sc.stop()
}
}
object SerializeTest_3 {
def main(args: Array[String]): Unit = {
val conf = SparkUtil.getSparkConf
val sc = new SparkContext(conf)
val lines = sc.parallelize(Array("xiaolv", "xiaohong", "xiaoming"))
//該對象在Driver中建立單例對象
val rules = ObjectRules
//map方法中的函數是在Executor的某個Task中執行的
val res = lines.map(x => {
val hostname = InetAddress.getLocalHost.getHostName
val threadName = Thread.currentThread().getName
(hostname, threadName, rules.rulesMap.getOrElse(x, 0), rules.toString)
})
println(res.collect.toBuffer)
/*
ArrayBuffer(
(localhost,Executor task launch worker for task 0,0,cn.qf.streaming.day01.test.ObjectRules$@543e593),
(localhost,Executor task launch worker for task 1,0,cn.qf.streaming.day01.test.ObjectRules$@543e593),
(localhost,Executor task launch worker for task 1,0,cn.qf.streaming.day01.test.ObjectRules$@543e593))
*/
sc.stop()
}
}
object SerializeTest_4 {
def main(args: Array[String]): Unit = {
val conf = SparkUtil.getSparkConf
val sc = new SparkContext(conf)
val lines = sc.parallelize(Array("xiaolv", "xiaohong", "xiaoming"))
//該對象在Driver中建立單例對象
//map方法中的函數是在Executor的某個Task中執行的
val res = lines.map(x => {
val hostname = InetAddress.getLocalHost.getHostName
val threadName = Thread.currentThread().getName
/*
不用在Driver端去建立對象,Rules不用實現序列化
*/
(hostname, threadName, ObjectRules.rulesMap.getOrElse(x, 0), ObjectRules.toString)
})
println(res.collect.toBuffer)
/*
ArrayBuffer(
(localhost,Executor task launch worker for task 0,0,cn.qf.streaming.day01.test.ObjectRules$@2539fca6),
(localhost,Executor task launch worker for task 1,0,cn.qf.streaming.day01.test.ObjectRules$@2539fca6),
(localhost,Executor task launch worker for task 1,0,cn.qf.streaming.day01.test.ObjectRules$@2539fca6))
*/
sc.stop()
}
}
7 分區
textFile分片過程:由指定的cpu核數+指定的分區數+block塊的大小+文件的個數,通過分片算法獲得最終的分區數
8 RDD的依賴關係
|->寬依賴:一對多 一個父RDD分區會被多個子RDD使用
|->窄依賴:一對一,多對一
|->爲何區分寬窄依賴:
|->1:有寬窄依賴就能夠進行相應的容錯
|->2:寬依賴決定了stage的劃分的依據
9 DAG
爲何劃分stage:主要是爲了生成task,stage劃分過程實際上就將rdd的依賴按照shuffle來分爲一個到多個的範圍,task執行過程根本不會跨stage
task數量 = stage數量 * 分區數(注:前提是沒有手動更改分區數)
若是手動更改分區數,該stage的task數據由最後的分區數決定的