spark入門筆記

本文源碼基於spark 2.2.0html

基本概念

Application

用戶編寫的Spark程序,經過一個有main方法的類執行,完成一個計算任務的處理。它是由一個Driver程序和一組運行於Spark集羣上的Executor組成java

RDD

彈性分佈式數據集。RDD是Spark的核心數據結構,能夠經過一系列算子進行操做。當RDD遇到Action算子時,將以前的全部算子造成一個有向無環圖(DAG)。再在Spark中轉化爲Job,提交到集羣執行node

spark2.x後就使用DataFrame/DateSet了web

SparkContext

SparkContext是Spark的入口,負責鏈接Spark集羣,建立RDD,累積量和廣播量等。從本質上來講,SparkContext是Spark的對外接口,負責向調用者提供Spark的各類功能。 算法

SparkContext在Spark中的主要功能
driver program經過SparkContext鏈接到集羣管理器來實現對集羣中任務的控制。Spark配置參數的設置以及對SQLContext、HiveContext和StreamingContext的控制也要經過SparkContext進行

Only one SparkContext may be active per JVM. You must stop() the active SparkContext before creating a new one. This limitation may eventually be removed; see SPARK-2243 for more details.
每一個JVM只有一個SparkContext,一臺服務器能夠啓動多個JVMshell

SparkSession

The entry point to programming Spark with the Dataset and DataFrame API.
包含了SQLContext和HiveContextapache

Driver

運行main方法的Java虛擬機進程,負責監聽spark application的executor進程發來的通訊和鏈接,將工程jar發送到全部的executor進程中
Driver與Master、Worker協做完成application進程的啓動、DAG劃分、計算任務封裝、分配task到executor上、計算資源的分配等調度執行做業等
driver調度task給executor執行,因此driver最好和spark集羣在一片網絡內,便以通訊
driver進程一般在worker節點中,和Cluster Manager不在同一個節點上json

Cluster Manager做用對象是整個saprk集羣(集羣資源分配),全部應用,而Driver是做用於某一個應用(協調已經分配給application的資源),管理層面不同api

Worker

集羣中的工做節點,啓動並運行executor進程,運行做業代碼的節點
standalone模式下:Worker進程所在節點
yarn模式下: yarn的nodemanager進程所在的節點數組

Executor

運行在worker節點上,負責執行做業的任務,並將數據保存在內存或磁盤中
每一個spark application,都有屬於本身的executor進程,spark application不會共享一個executor進程

在啓動參數中有executor-cores,executor-memory,每一個executor都會佔用cpu core和內存,又spark application間不會複用executor,則很容易致使worker資源不足

executor在整個spark application運行的生命週期內,executor能夠動態增長/釋放,見動態資源分配一節
executor使用多線程運行SparkContext分配過來的task,來一批task就執行一批

Job

一個spark application可能會被分爲多個job,每次調用Action時,邏輯上會生成一個Job,一個Job包含了一個或多個Stage

Stage

每一個job都會劃分爲一個或多個stage(階段),每一個stage都會有對應的一批task(即一個taskset),分配到executor上去執行

Stage包括兩類:ShuffleMapStage和ResultStage,若是用戶程序中調用了須要進行Shuffle計算的Operator,如groupByKey等,就會以Shuffle爲邊界分紅ShuffleMapStage和ResultStage。
若是一次shuffle都沒執行,那就只有一個stage

TaskSet

一組關聯的,但相互之間沒有Shuffle依賴關係的Task集合;Stage能夠直接映射爲TaskSet,一個TaskSet封裝了一次須要運算的、具備相同處理邏輯的Task,這些Task能夠並行計算,粗粒度的調度是以TaskSet爲單位的。

一個stage對應一個taskset

Task

driver發送到executor上執行的計算單元,每一個task負責在一個階段(stage),處理一小片數據,計算出對應的結果
Task是在物理節點上運行的基本單位,Task包含兩類:ShuffleMapTask和ResultTask,分別對應於Stage中ShuffleMapStage和ResultStage中的一個執行基本單元。
InputSplit-task-partition有一一對應關係,Spark會爲每個partition運行一個task來進行處理(見本文知識點-Spark集羣中的節點個數、RDD分區個數、cpu內核個數三者與並行度的關係一節)
手動設置task數量spark.default.parallelism

Cluster Manager

集羣管理器,爲每一個spark application在集羣中調度和分配資源的組件,如Spark Standalone、YARN、Mesos等

Deploy Mode

不管是standalone/yarn,都分爲兩種模式,client和cluster,區別在於driver運行的位置
client模式下driver運行在提交spark做業的機器上,能夠實時看到詳細的日誌信息,方便追蹤和排查錯誤,用於測試
cluster模式下,spark application提交到cluster manager,cluster manager(好比master)負責在集羣中某個節點上,啓動driver進程,用於生產環境

一般狀況下driver和worker在同一個網絡中是最好的,而client極可能就是driver worker分開佈置,這樣網絡通訊很耗時,cluster沒有這樣的問題

standalone模式

master作集羣管理
Master進程和Worker進程組成的集羣, 能夠不須要yarn集羣,不須要HDFS

Master

standalone模式下,集羣管理器(Cluster Manager)的一種,爲每一個spark application在集羣中調度和分配資源的組件

注意和driver的區別,即Cluster Manager和driver的區別

yarn模式

yarn作集羣管理
ResourceManager進程和NodeManager進程組成的集羣

DAGScheduler

根據Job構建基於Stage的DAG,並提交Stage給TaskScheduler。

TaskScheduler

將Taskset提交給Worker node集羣運行並返回結果。

spark組件-百度腦圖

spark基本工做原理

Driver向Master申請資源;
Master讓Worker給程序分配具體的Executor
Driver把劃分好的Task傳送給Executor,Task就是咱們的Spark程序的業務邏輯代碼

job生成,stage劃分和task分配都是發生在driver端?是

Spark VS MapReduce

Spark和MapReduce最大不一樣:迭代式計算

  • MapReduce
    一個job分兩個階段,map和reduce,兩階段處理完就算結束了
  • Spark
    可分爲n個階段,爲內存迭代式

RDD

全稱爲Resillient Distributed Dataset,即彈性分佈式數據集。
提供了容錯性,能夠自動歷來源數據從新計算,從節點失敗中恢復過來
默認是在內存中,內存不足則寫入磁盤
一個RDD是分佈式的,數據分佈在一批節點上,每一個節點存儲了RDD部分partition

RDD內存不足會自動寫入磁盤,調用cache()和persist()會將RDD數據按storelevel存儲

RDD建立

  1. SparkContext.wholeTextFiles()能夠針對一個目錄中的大量小文件,返回<filename,fileContent>組成的個PairRDD
  2. SparkContext.sequenceFile[K,V]()能夠針對SequenceFile建立RDD,K和V泛型類型就是SequenceFile的key和value的類型。K和V要求必須是Hadoop的序列化類型,好比IntWritable、Text等。
  3. SparkContext.hadoopRDD()能夠針對Hadoop的自定義輸入類型建立RDD。該方法接收JobConf、InputFormatClass、Key和Value的Class。
  4. SparkContext.objectFile()方法,能夠針對以前調用RDD.saveAsObjectFile()建立的對象序列化的文件,反序列化文件中的數據,並建立一個RDD。

並行化建立RDD
調用parallelize()方法,能夠指定要將集合切分紅多少個partition(實際上應該是指定了InputSplit數量,InputSplit-task-partition),Spark會爲每個partition運行一個task來進行處理(見本文知識點-Spark集羣中的節點個數、RDD分區個數、cpu內核個數三者與並行度的關係一節)
Spark官方建議爲集羣中的每一個CPU建立2~4個partition,避免CPU空載

若是集羣中運行了多個任務,包括spark hadoop任務,是否也是以一個cpu core負載2-4個計算任務來配置?

Transformation和Action

Transformation

針對已有的RDD建立一個新的RDD
transformation具備lazy特性,只是記錄了對RDD所作的操做,可是不會自發地執行。只有Action操做後,全部的transformation纔會執行,能夠避免產生過多中間結果

操做 介紹
map 將RDD中的每一個元素傳入自定義函數,獲取一個新的元素,而後用新的元素組成新的RDD
filter 對RDD中每一個元素進行判斷,若是返回true則保留,返回false則剔除。
flatMap 與map相似,是先映射後扁平化
gropuByKey 根據key進行分組,每一個key對應一個Iterable
reduceByKey 對每一個key對應的value進行reduce操做。
sortByKey 對每一個key對應的value進行排序操做。
join 對兩個包含<key,value>對的RDD進行join操做,每一個key join上的pair,都會傳入自定義函數進行處理。
cogroup 同join,可是每一個key對應的Iterable都會傳入自定義函數進行處理。

map與flatMap的區別
map對rdd之中的元素逐一進行函數操做映射爲另一個rdd。
flatMap對集合中每一個元素進行操做而後再扁平化。一般用來切分單詞

實驗:flatMap是否會將多層嵌套的元素再拍扁
實驗結論:只往下一層作flatten操做,不會遞歸進去作flatten操做

val arr = sc.parallelize(Array(("A", 1), ("B", 2), ("C", 3)))
arr.flatMap(x => (x._1 + x._2)).foreach(print)  //A1B2C3

val arr2 = sc.parallelize(Array(
                              Array(
                                ("A", 1), ("B", 2), ("C", 3)),
                              Array(
                                ("C", 1), ("D", 2), ("E", 3)),
                              Array(
                                ("F", 1), ("G", 2), ("H", 3))))
arr2.flatMap(x => x).foreach(print)  //(A,1)(B,2)(C,3)(C,1)(D,2)(E,3)(F,1)(G,2)(H,3)

val arr3 = sc.parallelize(Array(
                              Array(
                                Array(("A", 1), ("B", 2), ("C", 3))),
                              Array(
                                Array(("C", 1), ("D", 2), ("E", 3))),
                              Array(
                                Array(("F", 1), ("G", 2), ("H", 3)))))
arr3.flatMap(x => x).foreach(print)  //[Lscala.Tuple2;@11074bf8 [Lscala.Tuple2;@c10a22d [Lscala.Tuple2;@40ef42cd
複製代碼

map和flatMap源碼

def map[B](f: A => B): Iterator[B] = new AbstractIterator[B] {
    def hasNext = self.hasNext
    //直接遍歷元素,對元素應用f方法
    def next() = f(self.next())
  }

  /** Creates a new iterator by applying a function to all values produced by this iterator * and concatenating the results. * * @return the iterator resulting from applying the given iterator-valued function * `f` to each value produced by this iterator and concatenating the results. */
  def flatMap[B](f: A => GenTraversableOnce[B]): Iterator[B] = new AbstractIterator[B] {
    private var cur: Iterator[B] = empty
    //這一步只是取當前元素的Iterator,沒有遞歸往下層取
    private def nextCur() { cur = f(self.next()).toIterator }
    def hasNext: Boolean = {
      while (!cur.hasNext) {
        if (!self.hasNext) return false
        nextCur()
      }
      true
    }
    //在調用next方法時,最終會調用到nextCur方法
    def next(): B = (if (hasNext) cur else empty).next()
  }
複製代碼

join VS cogroup VS fullOuterJoin VS leftOuterJoin VS rightOuterJoin

val studentList = Array(
  Tuple2(1, "leo"),
  Tuple2(2, "jack"),
  Tuple2(3, "tom"));
val scoreList = Array(
  Tuple2(1, 100),
  Tuple2(2, 90),
  Tuple2(2, 90),
  Tuple2(4, 60));
val students = sc.parallelize(studentList);
val scores = sc.parallelize(scoreList);
/* * (4,(CompactBuffer(),CompactBuffer(60))) * (1,(CompactBuffer(leo),CompactBuffer(100))) * (3,(CompactBuffer(tom),CompactBuffer())) * (2,(CompactBuffer(jack),CompactBuffer(90, 90))) */
val studentCogroup = students.cogroup(scores)   //union key數組延長
/* * (1,(leo,100)) * (2,(jack,90)) * (2,(jack,90)) */
val studentJoin = students.join(scores) //交集
/* * (4,(None,Some(60))) * (1,(Some(leo),Some(100))) * (3,(Some(tom),None)) * (2,(Some(jack),Some(90))) * (2,(Some(jack),Some(90))) */
val studentFullOuterJoin = students.fullOuterJoin(scores) //some可爲空 union
/* * (1,(leo,Some(100))) * (3,(tom,None)) * (2,(jack,Some(90))) * (2,(jack,Some(90))) */
val studentLeftOuterJoin = students.leftOuterJoin(scores) //左不爲空
/* * (4,(None,60)) * (1,(Some(leo),100)) * (2,(Some(jack),90)) * (2,(Some(jack),90)) */
val studentRightOuterJoin = students.rightOuterJoin(scores) //右不爲空
複製代碼

Action

對RDD進行最後的操做,如遍歷,reduce,save等,啓動計算操做,並向用戶程序返回值或向外部存儲寫數據
觸發一個spark job的運行,從而觸發這個action以前全部的transformation的執行 對於操做key-value對的Tuple2 RDD,如groupByKey,scala是經過隱式轉換爲PairRDDFunction,再提供對應groupByKey方法實現的,須要手動導入Spark的相關隱式轉換,import org.apache.spark.SparkContext._

對groupByKey,saprk2.2顯式使用HashPartitioner,沒有看到隱式轉換爲PairRDDFunction Action操做必定會將結果返回給driver?是的,見下文的runJob方法

Action操做特徵
Action操做在源碼上必調用runJob()方法,多是直接或間接調用

//直接調用了runJob
  def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }
  
  /** * Run a function on a given set of partitions in an RDD and pass the results to the given * handler function. This is the main entry point for all actions in Spark. * * @param resultHandler callback to pass each result to */
   //會把結果傳遞給handler function,handle function就是對返回結果進行處理的方法
   //如上文的collect方法的handler function就是 (iter: Iterator[T]) => iter.toArray
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    if (stopped.get()) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
  }
複製代碼
操做 介紹
reduce 將RDD中的全部元素進行聚合操做。第一個和第二個元素聚合,值與第三個元素聚合,值與第四個元素聚合,以此類推。
collect 將RDD中全部元素獲取到本地客戶端。注意數據傳輸問題,spark.driver.maxResultSize能夠限制action算子返回driver的結果集最大數量
count 獲取RDD元素總數。
take(n) 獲取RDD中前n個元素。
saveAsTextFile 將RDD元素保存到文件中,對每一個元素調用toString方法
countByKey 對每一個key對應的值進行count計數。
foreach 遍歷RDD中的每一個元素。
//從本地文件建立
val lines = spark.sparkContext.textFile("hello.txt")
//Transformation,返回(key,value)的RDD
val linePairs = lines.map(line => (line, 1))
//Transformation,隱式裝換爲PairRDDFunction,提供reduceByKey等方法
//源碼中是用HashPartitioner
val lineCounts = linePairs.reduceByKey(_ + _)
//Action,發送到driver端執行
lineCounts.foreach(lineCount => println(lineCount._1 + " appears " + lineCount._2 + " times."))
複製代碼

mapPartitions

map:一次處理一個partition中的一條數據
mapPartitions:一次處理一個partition中全部的數據
使用場景:
RDD的數據量不是特別大,建議採用mapPartitions算子替代map算子,能夠加快處理速度,若是RDD的數據量特別大,則不建議用mapPartitions,可能會內存溢出

val studentScoresRDD = studentNamesRDD.mapPartitions { it =>
    var studentScoreList = Array("a")
    while (it.hasNext) {
      ...
    }
    studentScoreList.iterator
}
複製代碼

mapPartitionsWithIndex:加上了partition的index

studentNamesRDD.mapPartitionsWithIndex{(index:Int,it:Iterator[String])=>
      ...
 }
複製代碼

其餘算子

  1. sample:按比例取樣本,transformation操做
  2. takeSample:按個數取樣本,action操做
  3. cartesian:笛卡爾積
  4. coalesce:將RDD的partition縮減,將數據壓縮到更少的partition中去.
    使用場景:若不少partition中的數據不均勻(如filter後),可使用coalesce壓縮rdd的partition數量,從而讓各個partition中的數據都更加的緊湊
    rdd.coalesce(3):壓縮成3個partition

coalesce和repartition區別
repartition是coalesce的簡化版

/** * 返回一個通過簡化到numPartitions個分區的新RDD。這會致使一個窄依賴 * 例如:你將1000個分區轉換成100個分區,這個過程不會發生shuffle,相反若是10個分區轉換成100個分區將會發生shuffle。 * 然而若是你想大幅度合併分區,例如合併成一個分區,這會致使你的計算在少數幾個集羣節點上計算(言外之意:並行度不夠) * 爲了不這種狀況,你能夠將第二個shuffle參數傳遞一個true,這樣會在從新分區過程當中多一步shuffle,這意味着上游的分區能夠並行運行。 */
def coalesce(numPartitions: Int, shuffle: Boolean = false,
           partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
          (implicit ord: Ordering[T] = null)
  : RDD[T] = withScope {...}
/** * 返回一個剛好有numPartitions個分區的RDD,能夠增長或者減小此RDD的並行度。 * 在內部,這將使用shuffle從新分佈數據,若是你減小分區數,考慮使用coalesce,這樣能夠避免執行shuffle */
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
}
複製代碼

設RDD分區數從N變動爲M

分區數關係 shuffle = true shuffle = false
N < M N個分區有數據分佈不均勻的情況,利用HashPartitioner函數將數據從新分區爲M個 coalesce爲無效的,不進行shuffle過程,父RDD和子RDD之間是窄依賴關係
N > M 將N個分區中的若干個分區合併成一個新的分區,最終合併爲M個分區
N >> M shuffle = true,在從新分區過程當中多一步shuffle,上游的分區能夠並行運行,使coalesce以前的操做有更好的並行度 父子RDD是窄依賴關係,在同一個Stage中,可能形成Spark程序的並行度不夠(計算在少數幾個集羣節點上進行),從而影響性能
  1. 返回一個減小到M個分區的新RDD,這會致使窄依賴,不會發生shuffle
  2. 返回一個增長到M個分區的新RDD,會發生shuffle
  3. 若是shuff爲false時,N<M,RDD的分區數是不變的,也就是說不通過shuffle,是沒法將RDD的partition數變多的

RDD持久化

cache()和persist()

/** * Persist this RDD with the default storage level (`MEMORY_ONLY`). */
  def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
  /** * Persist this RDD with the default storage level (`MEMORY_ONLY`). */
  def cache(): this.type = persist()
複製代碼

若是須要從內存中清除緩存,那麼可使用unpersist()方法。

class StorageLevel private( private var _useDisk: Boolean, //磁盤 private var _useMemory: Boolean,//內存 private var _useOffHeap: Boolean,//內存滿就存磁盤 private var _deserialized: Boolean,//序列化儲存 private var _replication: Int = 1)//冗餘備份,默認1,只本身儲存一份
  extends Externalizable {
複製代碼
持久化級別 含義
MEMORY_ONLY 以非序列化的Java對象的方式持久化在JVM內存中。若是內存沒法徹底存儲RDD全部的partition,那麼那些沒有持久化的partition就會在下一次須要使用它的時候,從新被計算
MEMORY_AND_DISK 同上,可是當某些partition沒法存儲在內存中時,會持久化到磁盤中。下次須要使用這些partition時,須要從磁盤上讀取。
MEMORY_ONLY_SER 同MEMORY_ONLY,可是會使用Java序列化方式,將Java對象序列化後進行持久化。能夠減小內存開銷,可是須要進行反序列化,所以會加大CPU開銷。
MEMORY_AND_DSK_SER 同MEMORY_AND_DSK。可是使用序列化方式持久化Java對象。
DISK_ONLY 使用非序列化Java對象的方式持久化,徹底存儲到磁盤上。
MEMORY_ONLY_2 MEMORY_AND_DISK_2 等等 若是是尾部加了2的持久化級別,表示會將持久化數據複用一份,保存到其餘節點,從而在數據丟失時,不須要再次計算,只須要使用備份數據便可。

優先級排序(內存優先)

  1. MEMORY_ONLY
  2. MEMORY_ONLY_SER,將數據進行序列化進行存儲
  3. DISK

共享變量

  1. 默認
    一個算子的函數中使用到了某個外部的變量,則拷貝變量到每一個task中,此時每一個task只能操做本身的那份變量副本
  2. Broadcast Variable(廣播變量)
    將使用到的變量,爲每一個節點拷貝一份(不是每一個task),減小網絡傳輸以及內存消耗
    只讀變量
  3. Accumulator(累加變量)
    讓多個task共同操做一份變量,主要能夠進行累加操做
val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
val factorBroadcast = sc.broadcast(3)
val sumAccumulator = new DoubleAccumulator()
//Accumulator must be registered before send to executor
sc.register(sumAccumulator)

val multipleRdd = rdd.map(num => num * factorBroadcast.value)
//不能獲取值,只能在driver端獲取
val accumulator = rdd.map(num2 => sumAccumulator.add(num2.toDouble))
//action:3,6,9,12,15
multipleRdd.foreach(num => println(num))
//要先執行action操做才能獲取值
accumulator.collect()  //15
println(sumAccumulator.value) 
accumulator.count()    //30,再次加15
println(sumAccumulator.value)
複製代碼

spark 內核架構

standalone模式下

Spark架構原理-standalone模式下

TaskScheduler把taskSet裏每個task提交到executor上執行

spark內部組件

寬依賴與窄依賴

窄依賴(narrow dependency):每一個parent RDD 的 partition 最多被 child RDD的一個partition使用
寬依賴(wide dependency):每一個parent RDD 的 partition 被多個 child RDD的partition使用

區別:

  1. 窄依賴容許在一個集羣節點上以流水線的方式(pipeline)計算全部父分區。例如,逐個元素地執行map、而後filter操做;
    寬依賴則須要首先計算好全部父分區數據,而後在節點之間進行Shuffle,這與MapReduce相似。
  2. 窄依賴可以更有效地進行失效節點的恢復,即只需從新計算丟失RDD分區的父分區,並且不一樣節點之間能夠並行計算;
    而對於一個寬依賴關係的Lineage圖,單個節點失效可能致使這個RDD的全部祖先丟失部分分區,於是須要總體從新計算。

寬依賴與窄依賴

spark提交模式

  1. spark內核模式/standalone 模式 : 基於spark的master-worker集羣
  2. 基於Yarn的yarn-cluster模式
  3. 基於Yarn的yarn-client模式

在spark提交腳本中設置
--master參數值爲yarn-cluster / yarn-client
默認是standalone 模式

spark提交腳本

/usr/local/spark/bin/spark-submit \
--class com.feng.spark.spark1.StructuredNetworkWordCount \
--master spark://spark1:7077 \ #standalone模式
--num-executors 3 \     // #分配3個executor
--driver-memory 500m \  
--executor-memory 500m \    # //每一個executor500m內存
--executor-cores 2 \    // # 每一個executor2個core
/usr/local/test_data/spark1-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
複製代碼

整個應用須要3*500=1500m內存,3*2=6個core
--master local[8]:進程中用8個線程來模擬集羣的執行
--total-executor-cores:指定全部executor的總cpu core數量
--supervise:指定了spark監控driver節點,若是driver掛掉,自動重啓driver

配置方式

按優先級從高到低排序

  1. SparkConf:經過程序設置,在編輯器中用local模式執行運行時,只能在SparkConf中設置屬性
  2. spark-submit腳本命令:當前應用有效,推薦
  3. spark-defaults.conf文件:全局配置
SparkConf.set("spark.default.parallelism", "100")
spark-submit: --conf spark.default.parallelism=50
spark-defaults.conf: spark.default.parallelism 10
複製代碼

在spark-submit腳本中,可使用--verbose,打印詳細的配置屬性的信息

能夠先在程序中建立一個空的SparkConf對象,如

val sc = new SparkContext(new SparkConf())
複製代碼

而後在spark-submit腳本中用--conf設置屬性值,如

--conf spark.eventLog.enabled=false
複製代碼

依賴管理

--jars:額外依賴的jar包會自動被髮送到集羣上去
指定關聯的jar:

  1. file: 由driver的http文件服務提供支持,全部的executor都會經過driver的HTTP服務來拉取文件
  2. hdfs:/http:/https:/ftp: 直接根據URI,從指定的地方拉取
  3. local: 這種格式的文件必須在每一個worker節點上都要存在,因此不須要經過網絡io去拉取文件,適用於特別大的文件或jar包,能夠提高做業的執行性能

文件和jar都會被拷貝到每一個executor的工做目錄中,這就會佔用很大一片磁盤空間,所以須要在以後清理掉這些文件
在yarn上運行spark做業時,依賴文件的清理都是自動進行的
使用standalone模式,須要配置spark.worker.cleanup.appDataTtl屬性,來開啓自動清理依賴文件和jar包

相關參數見conf/spark-evnsh參數一節

--packages:綁定maven的依賴包
--repositories:綁定額外的倉庫

yarn-cluster模式

用於生產模式,driver運行在nodeManager,沒有網卡流量激增問題,但查看log麻煩,調試不方便

基於YARN的提交模式-yarn-cluster

yarn-client模式

yarn-client用於測試,driver運行在本地客戶端,負責調度application,會與yarn集羣產生超大量的網絡通訊,從而致使網卡流量激增
yarn-client能夠在本地看到全部log,方便調試

基於YARN的提交模式-yarn-client

  1. yarn-client下,driver運行在spark-submit提交的機器上,ApplicationMaster只是至關於一個ExecutorLauncher,僅僅負責申請啓動executor;driver負責具體調度
  2. yarn-cluster下,ApplicationMaster是driver,ApplicationMaster負責具體調度

standalone核心組件交互流程

參見Spark架構簡明分析

基本要點:

  1. 一個Application會啓動一個Driver
  2. 一個Driver負責跟蹤管理該Application運行過程當中全部的資源狀態和任務狀態
  3. 一個Driver會管理一組Executor
  4. 一個Executor只執行屬於一個Driver的Task

standalone核心組件交互流程

  • 橙色:提交用戶Spark程序
    用戶提交一個Spark程序,主要的流程以下所示:

    1. 用戶spark-submit腳本提交一個Spark程序,會建立一個ClientEndpoint對象,該對象負責與Master通訊交互
    2. ClientEndpoint向Master發送一個RequestSubmitDriver消息,表示提交用戶程序
    3. Master收到RequestSubmitDriver消息,向ClientEndpoint回覆SubmitDriverResponse,表示用戶程序已經完成註冊

    結合4,5,應該是表示用戶程序已經在master註冊,但driver可能並未啓動

    1. ClientEndpoint向Master發送RequestDriverStatus消息,請求Driver狀態

    MasterEndPoint應該會向DriverClient返回一個相似DriverStatusResponse的應答?週期性應答,當獲知driver已啓動,則致使5

    1. 若是當前用戶程序對應的Driver已經啓動,則ClientEndpoint直接退出,完成提交用戶程序
  • 紫色:啓動Driver進程
    當用戶提交用戶Spark程序後,須要啓動Driver來處理用戶程序的計算邏輯,完成計算任務,這時Master須要啓動一個Driver:

    1. Maser內存中維護着用戶提交計算的任務Application,每次內存結構變動都會觸發調度,向Worker發送LaunchDriver請求
    2. Worker收到LaunchDriver消息,會啓動一個DriverRunner線程去執行LaunchDriver的任務
    3. DriverRunner線程在Worker上啓動一個新的JVM實例,該JVM實例內運行一個Driver進程,該Driver會建立SparkContext對象

    當前worker節點運行driver進程

  • 紅色:註冊Application
    Dirver啓動之後,它會建立SparkContext對象,初始化計算過程當中必需的基本組件,並向Master註冊Application,流程描述以下:

    1. 建立SparkEnv對象,建立並管理一些基本組件

    SparkEnv Holds all the runtime environment objects for a running Spark instance (either master or worker), including the serializer, RpcEnv, block manager, map output tracker, etc. Currently Spark code finds the SparkEnv through a global variable, so all the threads can access the same SparkEnv

    1. 建立TaskScheduler,負責Task調度
    2. 建立StandaloneSchedulerBackend,負責與ClusterManager進行資源協商
    3. 建立DriverEndpoint,其它組件能夠與Driver進行通訊

    只是建立,還未啓動

    1. 在StandaloneSchedulerBackend內部建立一個StandaloneAppClient,負責處理與Master的通訊交互
    2. StandaloneAppClient建立一個ClientEndpoint,實際負責與Master通訊
    3. ClientEndpoint向Master發送RegisterApplication消息,註冊Application
    4. Master收到RegisterApplication請求後,回覆ClientEndpoint一個RegisteredApplication消息,表示已經註冊成功
  • 藍色:啓動Executor進程

    1. Master向Worker發送LaunchExecutor消息,請求啓動Executor;同時Master會向Driver發送ExecutorAdded消息,表示Master已經新增了一個Executor(此時還未啓動)

    executor還未真實啓動,master只是發出一個啓動executor的消息給worker而已. 這一步代表master纔是負責啓動和分配executor,driver只是提交task到executor

    1. Worker收到LaunchExecutor消息,會啓動一個ExecutorRunner線程去執行LaunchExecutor的任務
    2. Worker向Master發送ExecutorStageChanged消息,通知Executor狀態已發生變化
    3. Master向Driver發送ExecutorUpdated消息,此時Executor已經啓動

    這裏master才真正告訴driver executor已經啓動

  • 粉色:啓動Task執行

    1. StandaloneSchedulerBackend啓動一個DriverEndpoint

    以前已經建立,但未啓動,以前和master的通訊都是StandaloneSchedulerBackend完成的

    1. DriverEndpoint啓動後,會週期性地檢查Driver維護的Executor的狀態,若是有空閒的Executor便會調度任務執行

    啓動一個driver-revive-thread後臺線程,週期性地發送ReviveOffers給本身,讓本身檢查executor狀態

    1. DriverEndpoint向TaskScheduler發送Resource Offer請求

    DriverEndpoint是CoarseGrainedSchedulerBackend內部的一個持有對象

    1. 若是有可用資源啓動Task,則DriverEndpoint向Executor發送LaunchTask請求
    2. Executor進程內部的CoarseGrainedExecutorBackend調用內部的Executor線程的launchTask方法啓動Task
    3. Executor線程內部維護一個線程池,建立一個TaskRunner線程並提交到線程池執行
  • 綠色:Task運行完成

    1. Executor進程內部的Executor線程通知CoarseGrainedExecutorBackend,Task運行完成
    2. CoarseGrainedExecutorBackend向DriverEndpoint發送StatusUpdated消息,通知Driver運行的Task狀態發生變動
    3. StandaloneSchedulerBackend調用TaskScheduler的updateStatus方法更新Task狀態

    StandaloneSchedulerBackend父類CoarseGrainedSchedulerBackend內部持有DriverEndpoint(內部類),DriverEndpoint收到StatusUpdate信息後,直接調用scheduler.statusUpdate(taskId, state, data.value)

    1. StandaloneSchedulerBackend繼續調用TaskScheduler的resourceOffers方法,調度其餘任務運行

Spark Standalone集羣單獨啓動master和worker

start-all.sh腳本能夠啓動master進程和全部worker進程,快速啓動整個spark standalone集羣

分別啓動master和worker進程

爲什麼要分別啓動

分別啓動能夠經過命令行參數,爲進程配置一些獨特的參數
如監聽端口號、web ui端口號、使用的cpu和內存
如同一臺機器上不只運行了saprk程序,還運行了storm程序,就能夠限制spark worker進程使用更少的資源(cpu core,memory),而非機器上全部資源

參數 含義 對象 使用頻率
-h HOST, --ip HOST 在哪臺機器上啓動,默認就是本機 master & worker 不經常使用
-p PORT, --port PORT 在機器上啓動後,使用哪一個端口對外提供服務,master默認是7077,worker默認是隨機的 master & worker 不經常使用
--webui-port PORT web ui的端口,master默認是8080,worker默認是8081 master & worker 不經常使用
-c CORES, --cores CORES 總共能讓spark做業使用多少個cpu core,默認是當前機器上全部的cpu core worker 經常使用
-m MEM, --memory MEM 總共能讓spark做業使用多少內存,是100M或者1G這樣的格式,默認是1g worker 經常使用
-d DIR, --work-dir DIR 工做目錄,默認是SPARK_HOME/work目錄 worker 經常使用
--properties-file FILE master和worker加載默認配置文件的地址,默認是conf/spark-defaults.conf master & worker 不經常使用

啓動順序

先啓動master,再啓動worker,由於worker啓動之後,須要向master註冊

關閉順序1.worker(./stop-slave.sh) ;2. master(./stop-master);3. 關閉集羣./stop-all.sh

啓動master

  1. 使用start-master.sh啓動
  2. 啓動日誌就會打印一行spark://HOST:PORT,這就是master的URL地址,worker進程就會經過這個URL地址來鏈接到master進程,並進行註冊

    可使用SparkSession.master()設置master地址

  3. 能夠經過http://MASTER_HOST:8080來訪問master集羣的監控web ui,web ui上, 會顯示master的URL地址

手動啓動worker進程

使用start-slave.sh <master-spark-URL>當前節點上啓動worker進程
http://MASTER_HOST:8080web ui上會顯示該節點的cpu和內存資源等信息
eg:./start-slave.sh spark://192.168.0.001:8080 --memory 500m

spark全部啓動和關閉腳本

參數 含義
sbin/start-all.sh 根據配置,在集羣中各個節點上,啓動一個master進程和多個worker進程
sbin/stop-all.sh 在集羣中中止全部master和worker進程
sbin/start-master.sh 在本地啓動一個master進程
sbin/stop-master.sh 關閉master進程
sbin/start-slaves.sh 根據conf/slaves文件中配置的worker節點,啓動全部的worker進程
sbin/stop-slaves.sh 關閉全部worker進程
sbin/start-slave.sh 在本地啓動一個worker進程

配置文件

worker節點配置

配置做爲worker節點的機器,如hostname/ip地址,一個機器是一行
配置後,全部的節點上,都拷貝這份文件
默認狀況下,沒有conf/slaves文件,只有一個空conf/slaves.template, 此時,就只是在當前主節點上啓動一個master進程和一個worker進程,此時就是master進程和worker進程在一個節點上,也就是僞分佈式部署
conf/slaves文件樣本

spark1  
spark2  
spark3  
複製代碼

conf/spark-evnsh參數

是對整個spark的集羣部署,配置各個master和worker

和啓動腳本--參數的效果同樣./start-slave.sh spark://192.168.0.001:8080 --memory 500m,臨時修改參數時這種腳本命令更適合
命令行參數優先級更高,會覆蓋spark-evnsh參數

參數 含義
SPARK_MASTER_IP 指定master進程所在的機器的ip地址
SPARK_MASTER_PORT 指定master監聽的端口號(默認是7077)
SPARK_MASTER_WEBUI_PORT 指定master web ui的端口號(默認是8080)
SPARK_MASTER_OPTS 設置master的額外參數,使用"-Dx=y"設置各個參數
SPARK_LOCAL_DIRS spark的工做目錄,包括了shuffle map輸出文件,以及持久化到磁盤的RDD等
SPARK_WORKER_PORT worker節點的端口號,默認是隨機的
SPARK_WORKER_WEBUI_PORT worker節點的web ui端口號,默認是8081
SPARK_WORKER_CORES worker節點上,容許spark做業使用的最大cpu數量,默認是機器上全部的cpu core
SPARK_WORKER_MEMORY worker節點上,容許spark做業使用的最大內存量,格式爲1000m,2g等,默認最小是1g內存
SPARK_WORKER_INSTANCES 當前機器上的worker進程數量,默認是1,能夠設置成多個,可是這時必定要設置SPARK_WORKER_CORES,限制每一個worker的cpu數量
SPARK_WORKER_DIR spark做業的工做目錄,包括了做業的日誌等,默認是spark_home/work
SPARK_WORKER_OPTS worker的額外參數,使用"-Dx=y"設置各個參數
SPARK_DAEMON_MEMORY 分配給master和worker進程本身自己的內存,默認是1g
SPARK_DAEMON_JAVA_OPTS 設置master和worker本身的jvm參數,使用"-Dx=y"設置各個參數
SPARK_PUBLISC_DNS master和worker的公共dns域名,默認是沒有的
  • SPARK_MASTER_OPTS
    設置master的額外參數,使用-Dx=y設置各個參數
    eg:export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=1"

    參數名 默認值 含義
    spark.deploy.retainedApplications 200 在spark web ui上最多顯示多少個application的信息
    spark.deploy.retainedDrivers 200 在spark web ui上最多顯示多少個driver的信息
    spark.deploy.spreadOut true 資源調度策略,spreadOut會盡可能將application的executor進程分佈在更多worker上,適合基於hdfs文件計算的狀況,提高數據本地化機率;非spreadOut會盡可能將executor分配到一個worker上,適合計算密集型的做業
    spark.deploy.defaultCores 無限大 每一個spark做業最多在standalone集羣中使用多少個cpu core,默認是無限大,有多少用多少
    spark.deploy.timeout 60 單位秒,一個worker多少時間沒有響應以後,master認爲worker掛掉了
  • SPARK_WORKER_OPTS
    worker的額外參數

    參數名 默認值 含義
    spark.worker.cleanup.enabled false 是否啓動自動清理worker工做目錄,默認是false
    spark.worker.cleanup.interval 1800 單位秒,自動清理的時間間隔,默認是30分鐘
    spark.worker.cleanup.appDataTtl 7 * 24 * 3600 默認將一個spark做業的文件在worker工做目錄保留多少時間,默認是7天

Spark Application運行

local 模式

主要用於本機測試

/usr/local/spark/bin/spark-submit \
--class cn.spark.study.core.xxx \
--num-executors 3 \
--driver-memory 100m \
--executor-memory 100m \
--executor-cores 2 \
/usr/local/test/xxx.jar \
複製代碼

standalone模式

參數設置

standalone模式與local區別,就是要將master設置成spark://master_ip:port,如spark://192.168.0.103:7077

  1. 代碼:val spark = SparkSession.builder().master("spark://IP:PORT")...
  2. spark-submit: --master spark://IP:PORT --deploy-mode client/cluster
    默認client模式
  3. spark-shell: --master spark://IP:PORT:用於實驗和測試
    /usr/local/spark/bin/spark-submit \
    --class cn.spark.study.core.xxx \
    --master spark://192.168.0.103:7077 \
    --deploy-mode client \
    --num-executors 1 \
    --driver-memory 100m \
    --executor-memory 100m \
    --executor-cores 1 \
    /usr/local/test/xxx.jar \
    複製代碼

--master:

  1. 不設置:local模式
  2. spark://xxx:standalone模式,會提交到指定的URL的Master進程上去
  3. yarn-xxx:yarn模式,會讀取hadoop配置文件,而後鏈接ResourceManager

standalone client模式做業進程

提交運行做業後,當即使用jps查看進程,能夠看到啓動了以下進程

  1. SparkSubmit: driver進程,在本機上啓動(spark-submit所在的機器)
  2. CoarseGrainedExecutorBackend(內部持有一個Executor對象,CoarseGrainedExecutorBackend即executor進程): 在執行spark做業的worker機器上,給做業分配和啓動一個executor進程
    SparkSubmit給CoarseGrainedExecutorBackend分配task

standalone cluster模式

standalone cluster模式支持監控driver進程,而且在driver掛掉的時候,自動重啓該進程,主要是用於spark streaming中的HA高可用性,spark-submit腳本中,使用--supervise標識便可

要殺掉反覆掛掉的driver進程bin/spark-class org.apache.spark.deploy.Client kill <master url> <driver ID>,經過http://<maser url>:8080可查看到driver id

yarn下殺掉applicationyarn application -kill applicationid

進程:

  1. SparkSubmit短暫執行,只是將driver註冊到master上,由master來啓動driver,立刻就中止;
  2. 在Worker上,會啓動DriverWrapper進程
  3. 若是可以申請到足夠的cpu資源,會在其餘worker上,啓動CoarseGrainedExecutorBackend進程
...
--deploy-mode cluster \
--num-executors 1 \
--executor-cores 1 \
...
複製代碼

cluster模式下

  1. worker啓動driver,佔用一個cpu core
  2. driver去跟master申請資源,在有空閒cpu資源的worker上啓動一個executor進程

cpu core太少,可能致使executor沒法啓動,一直waiting,好比只有一個worker,一個cpu core時

在 cluster 模式下,driver 是在集羣中的某個 Worker中的進程中啓動,而且 client進程將會在完成提交應用程序的任務以後退出,而不須要等待應用程序完成再退出

standalone多做業資源調度

默認提交的每個spark做業都會嘗試使用集羣中全部可用的cpu資源,此時只能支持做業串行起來運行,因此standalone集羣對於同時提交上來的多個做業,僅僅支持FIFO調度策略

  • 設置多做業同時運行
    能夠設置spark.cores.max參數,限制每一個做業可以使用的最大的cpu core數量,讓做業不會使用全部的cpu資源,後面提交上來的做業就能夠獲取到資源運行,默認狀況下,它將獲取集羣中的 all cores (核),這隻有在某一時刻只容許一個應用程序運行時纔有意義
  1. spark.conf.set("spark.cores.max", "num")
  2. 提交腳本命令spark-submit: --master spark://IP:PORT --conf spark.cores.max=num
  3. spark-env.sh全局配置:export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=num" 默認數量

standalone web ui

spark standalone模式默認在master機器上的8080端口提供web ui,能夠經過配置spark-env.sh文件等方式,來配置web ui的端口,地址如spark://192.168.0.103:8080

spark yarn模式下應該在YARN web ui上查看,如http://192.168.0.103:8088/

  • application web ui

application detail ui在做業的driver所在的機器的4040端口

  • 做業層面
    能夠用於具體定位問題,如
    1. task數據分佈不均勻:數據傾斜
    2. stage運行時間長:根據stage劃分算法,定位stage對應的代碼,去優化性能
    3. 每一個做業在每一個executor上的日誌
      stdout:System.out.println;
      stderr:System.err.println和系統級別log

      做業運行完,信息消失,須要啓動history server

yarn模式

前提:spark-env.sh文件中,配置HADOOP_CONF_DIR或者YARN_CONF_DIR屬性,值爲hadoop的配置文件目錄HADOOP_HOME/etc/hadoop,其中包含了hadoop和yarn全部的配置文件,好比hdfs-site、yarn-site等
用途:spark讀寫hdfs,鏈接到yarn resourcemanager上

兩種運行模式

  • yarn-client模式
    driver進程會運行在提交做業的機器上,ApplicationMaster僅僅只是負責爲做業向yarn申請資源(executor)而已,driver仍是會負責做業調度
  • yarn-cluster模式
    driver進程會運行在yarn集羣的某個工做節點上,做爲一個ApplicationMaster進程運行

查看yarn日誌

日誌散落在集羣中各個機器上,參數配置yarn-site.xml

  1. 聚合日誌方式(推薦)
    屬性設置 含義
    yarn.log-aggregation-enable=true container的日誌會拷貝到hdfs上去,並從機器中刪除
    yarn.nodemanager.remote-app-log-dir 當應用程序運行結束後,日誌被轉移到的HDFS目錄(啓用日誌彙集功能時有效)
    yarn.nodemanager.remote-app-log-dir-suffix 遠程日誌目錄子目錄名稱(啓用日誌彙集功能時有效)
    yarn.log-aggregation.retain-seconds 聚合後的日誌在HDFS上保存多長時間,單位爲s
    yarn logs -applicationId <app ID> 查看日誌,yarn web ui上能夠查看到applicationId(也能夠直接在hdfs上查看日誌文件)
    yarn.nodemanager.log.retain-second 不啓用日誌聚合此參數生效,日誌文件保存在本地的時間,單位爲s
    yarn.log-aggregation.retain-check-interval-seconds 隔多久刪除過時的日誌
  2. web ui查看
    須要啓動History Server,運行spark history server和mapreduce history server
    不作配置就只能查看到正在運行的日誌
    配置見Spark History Web UI一節
  3. 分散查看
    默認日誌在YARN_APP_LOGS_DIR目錄下,如/tmp/logs或者$HADOOP_HOME/logs/userlogs
    若是yarn集羣中沒有開啓History Server,想要查看system.out日誌,須要在yarn-site.xml文件中設置yarn.log.aggregation-enable值爲ture(將日誌拷貝到hdfs上),查看時經過yarn logs -applicationId xxx在機器上查看

提交腳本

/usr/local/spark/bin/spark-submit \
--class xxx \
# 自動從hadoop配置目錄中的配置文件中讀取cluster manager地址
--master yarn-cluster/yarn-client \ 
--num-executors 1 \
--driver-memory 100m \
--executor-memory 100m \
--executor-cores 1 \
--conf <key>=<value> \
# 指定不一樣的hadoop隊列,項目或部門之間隊列隔離
--queue hadoop隊列 \
/usr/local/test/xxx.jar \
${1}
複製代碼

--conf: 配置全部spark支持的配置屬性,使用key=value的格式;若是value中包含了空格,那麼須要將key=value包裹的雙引號中--conf "<key>=<value>"
application-jar: 打包好的spark工程jar包,在當前機器上的全路徑名
application-arguments: 傳遞給主類的main方法的參數; 在shell中用${1}佔位符接收傳遞給shell的參數;在java中能夠經過main方法的args[0]等參數獲取,提交spark應用程序時,用 ./腳本.sh 參數值

yarn模式運行spark做業屬性

能夠在提交腳本上--conf設置屬性

屬性名稱 默認值 含義
spark.yarn.am.memory 512m client模式下,YARN Application Master使用的內存總量
spark.yarn.am.cores 1 client模式下,Application Master使用的cpu數量
spark.driver.cores 1 cluster模式下,driver使用的cpu core數量,driver與Application Master運行在一個進程中,因此也控制了Application Master的cpu數量
spark.yarn.am.waitTime 100s cluster模式下,Application Master要等待SparkContext初始化的時長; client模式下,application master等待driver來鏈接它的時長
spark.yarn.submit.file.replication hdfs副本數 做業寫到hdfs上的文件的副本數量,好比工程jar,依賴jar,配置文件等,最小必定是1
spark.yarn.preserve.staging.files false 若是設置爲true,那麼在做業運行完以後,會避免工程jar等文件被刪除掉
spark.yarn.scheduler.heartbeat.interval-ms 3000 application master向resourcemanager發送心跳的間隔,單位ms
spark.yarn.scheduler.initial-allocation.interval 200ms application master在有pending住的container分配需求時,當即向resourcemanager發送心跳的間隔
spark.yarn.max.executor.failures executor數量*2,最小3 整個做業斷定爲失敗以前,executor最大的失敗次數
spark.yarn.historyServer.address spark history server的地址
spark.yarn.dist.archives 每一個executor都要獲取並放入工做目錄的archive
spark.yarn.dist.files 每一個executor都要放入的工做目錄的文件
spark.executor.instances 2 默認的executor數量
spark.yarn.executor.memoryOverhead executor內存10% 每一個executor的堆外內存大小,用來存放諸如常量字符串等東西
spark.yarn.driver.memoryOverhead driver內存7% 同上
spark.yarn.am.memoryOverhead AM內存7% 同上
spark.yarn.am.port 隨機 application master端口
spark.yarn.jar spark jar文件的位置
spark.yarn.access.namenodes spark做業能訪問的hdfs namenode地址
spark.yarn.containerLauncherMaxThreads 25 application master能用來啓動executor container的最大線程數量
spark.yarn.am.extraJavaOptions application master的jvm參數
spark.yarn.am.extraLibraryPath application master的額外庫路徑
spark.yarn.maxAppAttempts 提交spark做業最大的嘗試次數
spark.yarn.submit.waitAppCompletion true cluster模式下,client是否等到做業運行完再退出

關於master的高可用方案

standalone模式下調度器依託於master進程來作出調度決策,這可能會形成單點故障:若是master掛掉了,就無法提交新的應用程序了。
爲了解決這個問題,spark提供了兩種高可用性方案,分別是基於zookeeper的HA方案(推薦)以及基於文件系統的HA方案。

基於zookeeper的HA方案

概述

使用zookeeper來提供leader選舉以及一些狀態存儲,能夠在集羣中啓動多個master進程,讓它們鏈接到zookeeper實例。其中一個master進程會被選舉爲leader,其餘的master會被指定爲standby模式。
若是當前的leader master進程掛掉了,其餘的standby master會被選舉,從而恢復舊master的狀態。

配置

在啓動一個zookeeper集羣以後,在多個節點上啓動多個master進程,而且給它們相同的zookeeper 配置(zookeeper url和目錄)。master就能夠被動態加入master集羣,並能夠在任什麼時候間被移除掉

spark-env.sh文件中,設置SPARK_DAEMON_JAVA_OPTS選項:

  1. spark.deploy.recoveryMode:設置爲ZOOKEEPER來啓用standby master恢復模式(默認爲NONE)
  2. spark.deploy.zookeeper.url:zookeeper集羣url
  3. spark.deploy.zookeeper.dir:zookeeper中用來存儲恢復狀態的目錄(默認是/spark
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=192.168.0.103:2181,192.168.0.104:2181 -Dspark.deploy.zookeeper.dir=/spark"
複製代碼

若是在集羣中啓動了多個master節點,可是沒有正確配置master去使用zookeeper,master在掛掉進行恢復時是會失敗的,由於無法發現其餘master,而且都會認爲本身是leader。這會致使集羣的狀態不是健康的,由於全部master都會自顧自地去調度。

細節

爲了調度新的應用程序或者向集羣中添加worker節點,它們須要知道當前leader master的ip地址,這能夠經過傳遞一個master列表來完成。能夠將SparkSession 的master鏈接的地址指向spark://host1:port1,host2:port2。這就會致使SparkSession嘗試去註冊全部的master,若是host1掛掉了,那麼配置仍是正確的,由於會找到新的leader master

當一個應用程序啓動的時候,或者worker須要被找到而且註冊到當前的leader master的時候。一旦它成功註冊了,就被保存在zookeeper中了。若是故障發生了,new leader master會去聯繫全部的以前註冊過的應用程序和worker,而且通知它們master的改變。應用程序甚至在啓動的時候都不須要知道new master的存在。

故而,new master能夠在任什麼時候間被建立,只要新的應用程序和worker能夠找到而且註冊到master便可

在其餘節點啓動備用master:./start-master.sh

基於文件系統的HA方案

概述

FILESYSTEM模式:當應用程序和worker都註冊到master以後,master就會將它們的信息寫入指定的文件系統目錄中,以便於重啓時恢復註冊的應用程序和worker狀態;
須要手動重啓

配置

spark-env.sh中設置SPARK_DAEMON_JAVA_OPTS

  1. spark.deploy.recoveryMode:設置爲FILESYSTEM來啓用單點恢復(默認值爲NONE)
  2. spark.deploy.recoveryDirectory:spark存儲狀態信息的文件系統目錄,必須是master能夠訪問的目錄

eg:

export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM -Dspark.deploy.recoveryDirectory=/usr/local/spark_recovery"
複製代碼

細節

  1. 該模式更加適合於開發和測試環境
  2. stop-master.sh腳本殺掉一個master進程是不會清理它的恢復狀態,當重啓一個新的master進程時,它會進入恢復模式。須要等待以前全部已經註冊的worker等節點先timeout才能恢復。
  3. 可使用一個NFS目錄(相似HDFS)做爲恢復目錄。若是原先的master節點掛掉,能夠在其餘節點上啓動一個master進程,它會正確地恢復以前全部註冊的worker和應用程序。以後的應用程序能夠找到新的master,而後註冊。

saprk做業監控

做業監控方式:Spark Web UI,Spark History Web UI,RESTFUL API以及Metrics

Spark Web UI

每提交Spark做業並啓動SparkSession後,會啓動一個對應的Spark Web UI服務。默認狀況下Spark Web UI的訪問地址是driver進程所在節點的4040端口,如http://<driver-node>:4040

Spark Web UI包括瞭如下信息:

  1. stage和task列表
  2. RDD大小以及內存使用的概覽
  3. 環境信息
  4. 做業對應的executor的信息

若是多個driver在一個機器上運行,它們會自動綁定到不一樣的端口上。默認從4040端口開始,若是發現已經被綁定,那麼會選擇404一、4042等端口,以此類推。

這些信息默認狀況下在做業運行期間有效,一旦做業完畢,driver進程以及對應的web ui服務也會中止。若是要在做業完成以後,也能夠看到其Spark Web UI以及詳細信息,須要啓用Spark的History Server。

Spark History Web UI

  1. 建立日誌儲存目錄
    建立的目錄hdfs://ip:port/dirName
    命令hdfs dfs -mkidr /dirName
  2. 修改spark-defaults.conf
    spark.eventLog.enabled  true    #啓用
    spark.eventLog.dir      hdfs://ip:port/dirName
    spark.eventLog.compress true    #壓縮
    複製代碼
  3. 修改spark-env.sh
    export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.retainedApplications=50 -Dspark.history.fs.logDirectory=hdfs://ip:port/dirName"
    複製代碼

    spark.eventLog.dir指定做業事件記錄地址
    spark.history.fs.logDirectory指定從哪一個目錄中去讀取做業數據
    兩個目錄地址要相同

  4. 啓動HistoryServer
    ./sbin/start-history-server.sh 在啓動界面能夠看到history-server的訪問地址,經過訪問地址打開History Web UI

RESTFUL API

提供了RESTFUL API來返回關於日誌的json數據

API 含義
/applications 獲取做業列表
/applications/[app-id]/jobs 指定做業的job列表
/applications/[app-id]/jobs/[job-id] 指定job的信息
/applications/[app-id]/stages 指定做業的stage列表
/applications/[app-id]/stages/[stage-id] 指定stage的全部attempt列表
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id] 指定stage attempt的信息
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskSummary 指定stage attempt全部task的metrics統計信息
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskList 指定stage attempt的task列表
/applications/[app-id]/executors 指定做業的executor列表
/applications/[app-id]/storage/rdd 指定做業的持久化rdd列表
/applications/[app-id]/storage/rdd/[rdd-id] 指定持久化rdd的信息
/applications/[app-id]/logs 下載指定做業的全部日誌的壓縮包
/applications/[app-id]/[attempt-id]/logs 下載指定做業的某次attempt的全部日誌的壓縮包

eg:http://192.168.0.103:18080/api/v1/applications

做業資源調度

靜態資源分配

  1. application並行:每一個spark application都會運行本身獨立的一批executor進程,用於運行task和存儲數據,此時集羣管理器會提供同時調度多個application的功能
  2. job並行:在每一個spark application內部,多個job也能夠並行執行

同時提交多個spark application

默認的做業間資源分配策略爲靜態資源分配,在這種方式下,每一個做業都會被給予一個它能使用的 最大資源量的限額,而且能夠在運行期間持有這些資源。這是spark standalone集羣和YARN集羣使用的默認方式。

  • Standalone集羣
    默認狀況下,提交到standalone集羣上的多個做業,會經過FIFO的方式來運行,每一個做業都會嘗試獲取全部的資源。
    spark.cores.max:限制每一個做業可以使用的cpu core最大數量
    spark.deploy.defaultCores:設置每一個做業默認cpu core使用量
    spark.executor.memory:設置每一個做業最大內存。

  • YARN
    --num-executors:配置做業能夠在集羣中分配到多少個executor
    --executor-memory--executor-cores能夠控制每一個executor可以使用的資源。

沒有一種cluster manager能夠提供多個做業間的內存共享功能,須要共享內存,能夠單獨使用一個服務(例如:alluxio),這樣就能實現多應用訪問同一個RDD的數據。

動態資源分配

當資源被分配給了一個做業,但資源有空閒,能夠將資源還給cluster manager的資源池,被其餘做業使用。在spark中,動態資源分配在executor粒度上被實現,啓用時設置spark.dynamicAllocation.enabled爲true,在每一個節點上啓動external shuffle service,並將spark.shuffle.service.enabled設爲true。external shuffle service 的目的是在移除executor的時候,可以保留executor輸出的shuffle文件。

申請策略

spark application會在它有pending(等待執行)的task等待被調度時,申請額外的executor

task已提交但等待調度->executor數量不足

  1. driver輪詢式地申請executor
    當在必定時間內spark.dynamicAllocation.schedulerBacklogTimeout有pending的task時,就會觸發真正的executor申請
  2. 每隔必定時間後spark.dynamicAllocation.sustainedSchedulerBacklogTimeout,若是又有pending的task了,則再次觸發申請操做。
  3. 每一輪申請到的executor數量採用指數級增長(好比1,2,4,8,..):採用指數級增加策略的緣由有兩個:
    第一,對於任何一個Spark應用若是隻須要多申請少數幾個執行器的話,那麼必須很是謹慎的啓動資源申請,這和TCP慢啓動有些相似;
    第二,若是一旦Spark應用確實須要申請多個執行器的話,那麼能夠確保其所需的計算資源及時增加。

移除策略

一個spark做業會在它的executor出現了空閒超過必定時間後(spark.dynamicAllocation.executorIdleTimeout),被移除掉。

這意味着沒有task被pending住,executor有空閒,和申請條件互斥。

保存中間狀態

spark使用一個外部的shuffle服務來保存每一個executor的中間寫狀態,這個服務是一個長時間運行的進程,集羣的每一個節點上都會運行一個,若是服務被啓用,那麼spark executor會在shuffle write和read時,將數據寫入該服務,並從該服務獲取數據。這意味着全部executor寫的shuffle數據均可以在executor聲明週期以外繼續使用。

多了箇中間數據存儲角色,也改變了executor的讀寫方式

除了寫shuffle文件,executor也會在內存或磁盤中持久化數據。當一個executor被移除掉時,全部緩存的數據都會消失。

shuffle服務寫入的數據和executor持久化數據不是一個概念?executor移除後/掛掉後,其持久化的數據將消失,而shuffle服務保存的數據還將存在

standalone模式下動態資源分配

  1. 在worker啓動前設置spark.shuffle.service.enabled爲true
  2. application
    --conf spark.dynamicAllocation.enabled=true \
    複製代碼

Mesos模式下動態資源分配

  1. 在各個節點上運行$SPARK_HOME/sbin/start-mesos-shuffle-service.sh,並設置 spark.shuffle.service.enabled爲true
  2. application
    --conf spark.dynamicAllocation.enabled=true \
    複製代碼

yarn模式下動態資源分配

須要配置yarn的shuffle service(external shuffle service),用於保存executor的shuffle write文件,從而讓executor能夠被安全地移除.

  1. 添加jar包
    $SPARK_HOME/lib下的spark-<version>-yarn-shuffle.jar加入到全部NodeManager的classpath中,即hadoop/yarn/lib目錄中
  2. 修改yarn-site.xml
    <propert>
        <name>yarn.nodemanager.aux-services</name>
        <value>spark_shuffle</value>
        <!-- <value>mapreduce_shuffle</value> -->
    </property>
    <propert>
        <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
        <value>org.apache.spark.network.yarn.YarnShuffleService</value>
    </property>
    複製代碼
  3. 啓動spark application
    --conf spark.shuffle.service.enabled=true \
    --conf spark.shuffle.service.port=7337 \
    --conf spark.dynamicAllocation.enabled=true \
    複製代碼

參見Configuring the External Shuffle Service

多個job調度

job是一個spark action操做觸發的計算單元,在一個spark做業內部,多個並行的job是能夠同時運行的 。

FIFO調度

默認狀況下,spark的調度會使用FIFO的方式來調度多個job。每一個job都會被劃分爲多個stage,並且第一個job會對全部可用的資源獲取優先使用權,而且讓它的stage的task去運行,而後第二個job再獲取資源的使用權,以此類推

Fair調度

在公平的資源共享策略下,spark會將多個job的task使用一種輪詢的方式來分配資源和執行,因此全部的job都有一個基本公平的機會去使用集羣的資源

conf.set("spark.scheduler.mode", "FAIR")
複製代碼
--conf spark.scheduler.mode=FAIR
複製代碼

公平調度資源池

fair scheduler也支持將job分紅多個組並放入多個池中,以及爲每一個池設置不一樣的調度優先級。這個feature對於將重要的和不重要的job隔離運行的狀況很是有用,能夠爲重要的job分配一個池,並給予更高的優先級; 爲不重要的job分配另外一個池,並給予較低的優先級。

在代碼中設置sparkContext.setLocalProperty("spark.scheduler.pool", "poolName"),全部在這個線程中提交的job都會進入這個池中,設置是以線程爲單位保存的,很容易實現用同一線程來提交同一用戶的全部做業到同一個資源池中。設置爲null則清空池子。

默認狀況下,每一個池子都會對集羣資源有相同的優先使用權,可是在每一個池內,job會使用FIFO的模式來執行。

能夠經過配置文件來修改池的屬性

  1. schedulingMode: FIFO/FAIR,來控制池中的jobs是否要排隊,或者是共享池中的資源
  2. weight: 控制資源池相對其餘資源池,能夠分配到資源的比例。默認狀況下,全部池子的權重都是1.若是將某個資源池的 weight 設爲 2,那麼該資源池中的資源將是其餘池子的2倍,若是將 weight 設得很高,如 1000,能夠實現資源池之間的調度優先級 – weight=1000 的資源池總能當即啓動其對應的做業。
  3. minShare: 每一個資源池最小資源分配值(CPU 個數),公平調度器老是會嘗試優先知足全部活躍(active)資源池的最小資源分配值,而後再根據各個池子的 weight 來分配剩下的資源。所以,minShare 屬性可以確保每一個資源池都能至少得到必定量的集羣資源。minShare 的默認值是 0。

配置文件默認地址spark/conf/fairscheduler.xml,自定義文件conf.set("spark.scheduler.allocation.file", "/path/to/file")

<allocations>
  <pool name="production">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
  <pool name="test">
    <schedulingMode>FIFO</schedulingMode>
    <weight>2</weight>
    <minShare>3</minShare>
  </pool>
</allocations>
複製代碼

沒有在配置文件中配置的資源池都會使用默認配置(schedulingMode : FIFO,weight : 1,minShare : 0)。

Spark經常使用算子

union

union算子

  1. 新的rdd,會將舊的兩個rdd的partition,複製過去
  2. 新的rdd的partition的數量,就是舊的兩個rdd的partition的數量之和

groupByKey

groupByKey
在執行shuffle類的算子時,算子內部都會隱式地建立幾個RDD,主要是做爲這個操做的一些中間數據的表達,以及做爲stage劃分的邊界。

reduceByKey

reduceByKey
reduceByKey VS groupByKey

  • 不一樣之處
    reduceByKey,中間多了一個MapPartitionsRDD,是本地數據聚合後的rdd,能夠減小網絡數據傳輸。

  • 相同之處
    read和聚合的過程基本和groupByKey相似。都是ShuffledRDD作shuffle read再聚合,獲得最終的rdd

distinct

distinct

  1. 將每一個原始值轉換成tuple
  2. 會進行本地聚合(相似reduceByKey)
  3. 最後會將tuple轉換回單值

cogroup

cogroup算子是其餘算子的基礎,如join,intersection操做

cogroup

先按RDD分區聚合結果,(hello,[(1,1),(1,1)]):第1個(1,1)是第一個RDD 的helo聚合結果,第二個(1,1)是第2個RDD聚合結果 若第一個RDD的第一個partition沒有hello,則(1),不是(,1)

intersection

intersection

filter:過濾掉兩個集合中任意一個集合爲空的key

join

join

  1. cogroup,聚合兩個rdd的key
  2. flatMap,聚合後的每條數據,均可能返回多條數據 將每一個key對應的兩個集合的全部元素,作了一個笛卡爾積

sortByKey

sortByKey

  1. ShuffledRDD,作shuffle read,將相同的key拉到一個http://ozijnir4t.bkt.clouddn.com/spark/learning/sortByKey.pngpartition中來
  2. mapPartitions,對每一個partitions內的key進行全局的排序

cartesian

笛卡爾乘積

cartesian

coalesce

通常用於減小partition數量

coalesce

repartition

repartition算子=coalesce(true)

repartition

repartition操做在中間生成的隱式RDD中會給值計算出前綴做爲key,在最後作Shuffle操做時一個partition就放特定的一些key值對應的tuple,完成重分區操做

知識點

Spark集羣中的節點個數、RDD分區個數、cpu內核個數三者與並行度的關係

並行度數量關係

  1. 每一個file包含多個block
  2. Spark讀取輸入文件時,會根據具體數據格式對應的InputFormat進行解析,通常是將若干個Block合併成一個輸入分片,稱爲InputSplit,注意InputSplit不能跨越文件
  3. 一個InputSplit生成一個task
  4. 每一個Executor由若干core組成,每一個Executor的每一個core一次只能執行一個Task
  5. 每一個Task執行後生成了目標RDD的一個partiton

若是partition的數量多,能起實例的資源也多,那天然併發度就多
若是partition數量少,資源不少,則task數量不足,它也不會有不少併發
若是partition的數量不少,可是資源少(如core),那麼併發也不大,會算完一批再繼續起下一批

Task被執行的併發度 = Executor數目 * 每一個Executor核數
複製代碼

這裏的core是虛擬的core而不是機器的物理CPU核,能夠理解爲就是Executor的一個工做線程?
每一個executor的core數目經過spark.executor.cores參數設置。這裏的cores實際上是指的工做線程。cpu info裏看到的核數是物理核(或者通常機器開了超線程之後是的物理核數*2),和spark裏的core不是一個概念,可是通常來講spark做業配置的executor核數不該該超過機器的物理核數。

partition的數目

  1. 數據讀入階段,如sc.textFile,輸入文件被劃分爲多少InputSplit就會須要多少初始Task
  2. Map階段partition數目保持不變
  3. Reduce階段,RDD的聚合會觸發shuffle操做,聚合後的RDD的partition數目跟具體操做有關,例如repartition操做會聚合成指定分區數,還有一些算子是可配置的

參考文獻

  1. 詳細探究Spark的shuffle實現
  2. Spark性能優化:資源調優篇
  3. 在Spark集羣中,集羣的節點個數、RDD分區個數、cpu內核個數三者與並行度的關係
  4. Spark筆記-repartition和coalesce
  5. Spark 2.0系列之SparkSession詳解
  6. YARN日誌聚合相關參數配置
  7. 深刻理解Spark 2.1 Core (一):RDD的原理與源碼分析
  8. Spark 2.0從入門到精通
  9. spark 2.2.0文檔
相關文章
相關標籤/搜索