本文源碼基於spark 2.2.0html
用戶編寫的Spark程序,經過一個有main方法的類執行,完成一個計算任務的處理。它是由一個Driver程序和一組運行於Spark集羣上的Executor組成java
彈性分佈式數據集。RDD是Spark的核心數據結構,能夠經過一系列算子進行操做。當RDD遇到Action算子時,將以前的全部算子造成一個有向無環圖(DAG)。再在Spark中轉化爲Job,提交到集羣執行node
spark2.x後就使用DataFrame/DateSet了web
SparkContext是Spark的入口,負責鏈接Spark集羣,建立RDD,累積量和廣播量等。從本質上來講,SparkContext是Spark的對外接口,負責向調用者提供Spark的各類功能。 算法
driver program經過SparkContext鏈接到集羣管理器來實現對集羣中任務的控制。Spark配置參數的設置以及對SQLContext、HiveContext和StreamingContext的控制也要經過SparkContext進行Only one SparkContext may be active per JVM. You must
stop()
the active SparkContext before creating a new one. This limitation may eventually be removed; see SPARK-2243 for more details.
每一個JVM只有一個SparkContext,一臺服務器能夠啓動多個JVMshell
The entry point to programming Spark with the Dataset and DataFrame API.
包含了SQLContext和HiveContextapache
運行main方法的Java虛擬機進程,負責監聽spark application的executor進程發來的通訊和鏈接,將工程jar發送到全部的executor進程中
Driver與Master、Worker協做完成application進程的啓動、DAG劃分、計算任務封裝、分配task到executor上、計算資源的分配等調度執行做業等
driver調度task給executor執行,因此driver最好和spark集羣在一片網絡內,便以通訊
driver進程一般在worker節點中,和Cluster Manager不在同一個節點上json
Cluster Manager做用對象是整個saprk集羣(集羣資源分配),全部應用,而Driver是做用於某一個應用(協調已經分配給application的資源),管理層面不同api
集羣中的工做節點,啓動並運行executor進程,運行做業代碼的節點
standalone模式下:Worker進程所在節點
yarn模式下: yarn的nodemanager進程所在的節點數組
運行在worker節點上,負責執行做業的任務,並將數據保存在內存或磁盤中
每一個spark application,都有屬於本身的executor進程,spark application不會共享一個executor進程
在啓動參數中有
executor-cores
,executor-memory
,每一個executor都會佔用cpu core和內存,又spark application間不會複用executor,則很容易致使worker資源不足
executor在整個spark application運行的生命週期內,executor能夠動態增長/釋放,見動態資源分配一節
executor使用多線程運行SparkContext分配過來的task,來一批task就執行一批
一個spark application可能會被分爲多個job,每次調用Action時,邏輯上會生成一個Job,一個Job包含了一個或多個Stage。
每一個job都會劃分爲一個或多個stage(階段),每一個stage都會有對應的一批task(即一個taskset),分配到executor上去執行
Stage包括兩類:ShuffleMapStage和ResultStage,若是用戶程序中調用了須要進行Shuffle計算的Operator,如groupByKey等,就會以Shuffle爲邊界分紅ShuffleMapStage和ResultStage。
若是一次shuffle都沒執行,那就只有一個stage
一組關聯的,但相互之間沒有Shuffle依賴關係的Task集合;Stage能夠直接映射爲TaskSet,一個TaskSet封裝了一次須要運算的、具備相同處理邏輯的Task,這些Task能夠並行計算,粗粒度的調度是以TaskSet爲單位的。
一個stage對應一個taskset
driver發送到executor上執行的計算單元,每一個task負責在一個階段(stage),處理一小片數據,計算出對應的結果
Task是在物理節點上運行的基本單位,Task包含兩類:ShuffleMapTask和ResultTask,分別對應於Stage中ShuffleMapStage和ResultStage中的一個執行基本單元。
InputSplit-task-partition有一一對應關係,Spark會爲每個partition運行一個task來進行處理(見本文知識點-Spark集羣中的節點個數、RDD分區個數、cpu內核個數三者與並行度的關係一節)
手動設置task數量spark.default.parallelism
集羣管理器,爲每一個spark application在集羣中調度和分配資源的組件,如Spark Standalone、YARN、Mesos等
不管是standalone/yarn,都分爲兩種模式,client和cluster,區別在於driver運行的位置
client模式下driver運行在提交spark做業的機器上,能夠實時看到詳細的日誌信息,方便追蹤和排查錯誤,用於測試
cluster模式下,spark application提交到cluster manager,cluster manager(好比master)負責在集羣中某個節點上,啓動driver進程,用於生產環境
一般狀況下driver和worker在同一個網絡中是最好的,而client極可能就是driver worker分開佈置,這樣網絡通訊很耗時,cluster沒有這樣的問題
master作集羣管理
Master進程和Worker進程組成的集羣, 能夠不須要yarn集羣,不須要HDFS
standalone模式下,集羣管理器(Cluster Manager)的一種,爲每一個spark application在集羣中調度和分配資源的組件
注意和driver的區別,即Cluster Manager和driver的區別
yarn作集羣管理
ResourceManager進程和NodeManager進程組成的集羣
根據Job構建基於Stage的DAG,並提交Stage給TaskScheduler。
將Taskset提交給Worker node集羣運行並返回結果。
Driver向Master申請資源;
Master讓Worker給程序分配具體的Executor
Driver把劃分好的Task傳送給Executor,Task就是咱們的Spark程序的業務邏輯代碼
job生成,stage劃分和task分配都是發生在driver端?是
Spark和MapReduce最大不一樣:迭代式計算
全稱爲Resillient Distributed Dataset,即彈性分佈式數據集。
提供了容錯性,能夠自動歷來源數據從新計算,從節點失敗中恢復過來
默認是在內存中,內存不足則寫入磁盤
一個RDD是分佈式的,數據分佈在一批節點上,每一個節點存儲了RDD部分partition
RDD內存不足會自動寫入磁盤,調用cache()和persist()會將RDD數據按storelevel存儲
SparkContext.wholeTextFiles()
能夠針對一個目錄中的大量小文件,返回<filename,fileContent>
組成的個PairRDDSparkContext.sequenceFile[K,V]()
能夠針對SequenceFile建立RDD,K和V泛型類型就是SequenceFile的key和value的類型。K和V要求必須是Hadoop的序列化類型,好比IntWritable、Text等。SparkContext.hadoopRDD()
能夠針對Hadoop的自定義輸入類型建立RDD。該方法接收JobConf、InputFormatClass、Key和Value的Class。SparkContext.objectFile()
方法,能夠針對以前調用RDD.saveAsObjectFile()
建立的對象序列化的文件,反序列化文件中的數據,並建立一個RDD。並行化建立RDD
調用parallelize()
方法,能夠指定要將集合切分紅多少個partition(實際上應該是指定了InputSplit數量,InputSplit-task-partition),Spark會爲每個partition運行一個task來進行處理(見本文知識點-Spark集羣中的節點個數、RDD分區個數、cpu內核個數三者與並行度的關係一節)
Spark官方建議爲集羣中的每一個CPU建立2~4個partition,避免CPU空載
若是集羣中運行了多個任務,包括spark hadoop任務,是否也是以一個cpu core負載2-4個計算任務來配置?
針對已有的RDD建立一個新的RDD
transformation具備lazy特性,只是記錄了對RDD所作的操做,可是不會自發地執行。只有Action操做後,全部的transformation纔會執行,能夠避免產生過多中間結果
操做 | 介紹 |
---|---|
map | 將RDD中的每一個元素傳入自定義函數,獲取一個新的元素,而後用新的元素組成新的RDD |
filter | 對RDD中每一個元素進行判斷,若是返回true則保留,返回false則剔除。 |
flatMap | 與map相似,是先映射後扁平化 |
gropuByKey | 根據key進行分組,每一個key對應一個Iterable |
reduceByKey | 對每一個key對應的value進行reduce操做。 |
sortByKey | 對每一個key對應的value進行排序操做。 |
join | 對兩個包含<key,value>對的RDD進行join操做,每一個key join上的pair,都會傳入自定義函數進行處理。 |
cogroup | 同join,可是每一個key對應的Iterable都會傳入自定義函數進行處理。 |
map與flatMap的區別
map對rdd之中的元素逐一進行函數操做映射爲另一個rdd。
flatMap對集合中每一個元素進行操做而後再扁平化。一般用來切分單詞
實驗:flatMap是否會將多層嵌套的元素再拍扁
實驗結論:只往下一層作flatten操做,不會遞歸進去作flatten操做
val arr = sc.parallelize(Array(("A", 1), ("B", 2), ("C", 3)))
arr.flatMap(x => (x._1 + x._2)).foreach(print) //A1B2C3
val arr2 = sc.parallelize(Array(
Array(
("A", 1), ("B", 2), ("C", 3)),
Array(
("C", 1), ("D", 2), ("E", 3)),
Array(
("F", 1), ("G", 2), ("H", 3))))
arr2.flatMap(x => x).foreach(print) //(A,1)(B,2)(C,3)(C,1)(D,2)(E,3)(F,1)(G,2)(H,3)
val arr3 = sc.parallelize(Array(
Array(
Array(("A", 1), ("B", 2), ("C", 3))),
Array(
Array(("C", 1), ("D", 2), ("E", 3))),
Array(
Array(("F", 1), ("G", 2), ("H", 3)))))
arr3.flatMap(x => x).foreach(print) //[Lscala.Tuple2;@11074bf8 [Lscala.Tuple2;@c10a22d [Lscala.Tuple2;@40ef42cd
複製代碼
map和flatMap源碼
def map[B](f: A => B): Iterator[B] = new AbstractIterator[B] {
def hasNext = self.hasNext
//直接遍歷元素,對元素應用f方法
def next() = f(self.next())
}
/** Creates a new iterator by applying a function to all values produced by this iterator * and concatenating the results. * * @return the iterator resulting from applying the given iterator-valued function * `f` to each value produced by this iterator and concatenating the results. */
def flatMap[B](f: A => GenTraversableOnce[B]): Iterator[B] = new AbstractIterator[B] {
private var cur: Iterator[B] = empty
//這一步只是取當前元素的Iterator,沒有遞歸往下層取
private def nextCur() { cur = f(self.next()).toIterator }
def hasNext: Boolean = {
while (!cur.hasNext) {
if (!self.hasNext) return false
nextCur()
}
true
}
//在調用next方法時,最終會調用到nextCur方法
def next(): B = (if (hasNext) cur else empty).next()
}
複製代碼
join VS cogroup VS fullOuterJoin VS leftOuterJoin VS rightOuterJoin
val studentList = Array(
Tuple2(1, "leo"),
Tuple2(2, "jack"),
Tuple2(3, "tom"));
val scoreList = Array(
Tuple2(1, 100),
Tuple2(2, 90),
Tuple2(2, 90),
Tuple2(4, 60));
val students = sc.parallelize(studentList);
val scores = sc.parallelize(scoreList);
/* * (4,(CompactBuffer(),CompactBuffer(60))) * (1,(CompactBuffer(leo),CompactBuffer(100))) * (3,(CompactBuffer(tom),CompactBuffer())) * (2,(CompactBuffer(jack),CompactBuffer(90, 90))) */
val studentCogroup = students.cogroup(scores) //union key數組延長
/* * (1,(leo,100)) * (2,(jack,90)) * (2,(jack,90)) */
val studentJoin = students.join(scores) //交集
/* * (4,(None,Some(60))) * (1,(Some(leo),Some(100))) * (3,(Some(tom),None)) * (2,(Some(jack),Some(90))) * (2,(Some(jack),Some(90))) */
val studentFullOuterJoin = students.fullOuterJoin(scores) //some可爲空 union
/* * (1,(leo,Some(100))) * (3,(tom,None)) * (2,(jack,Some(90))) * (2,(jack,Some(90))) */
val studentLeftOuterJoin = students.leftOuterJoin(scores) //左不爲空
/* * (4,(None,60)) * (1,(Some(leo),100)) * (2,(Some(jack),90)) * (2,(Some(jack),90)) */
val studentRightOuterJoin = students.rightOuterJoin(scores) //右不爲空
複製代碼
對RDD進行最後的操做,如遍歷,reduce,save等,啓動計算操做,並向用戶程序返回值或向外部存儲寫數據
會觸發一個spark job的運行,從而觸發這個action以前全部的transformation的執行 對於操做key-value對的Tuple2 RDD,如groupByKey,scala是經過隱式轉換爲PairRDDFunction,再提供對應groupByKey方法實現的,須要手動導入Spark的相關隱式轉換,import org.apache.spark.SparkContext._
對groupByKey,saprk2.2顯式使用HashPartitioner,沒有看到隱式轉換爲PairRDDFunction Action操做必定會將結果返回給driver?是的,見下文的runJob方法
Action操做特徵
Action操做在源碼上必調用runJob()
方法,多是直接或間接調用
//直接調用了runJob
def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
/** * Run a function on a given set of partitions in an RDD and pass the results to the given * handler function. This is the main entry point for all actions in Spark. * * @param resultHandler callback to pass each result to */
//會把結果傳遞給handler function,handle function就是對返回結果進行處理的方法
//如上文的collect方法的handler function就是 (iter: Iterator[T]) => iter.toArray
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
if (stopped.get()) {
throw new IllegalStateException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
}
複製代碼
操做 | 介紹 |
---|---|
reduce | 將RDD中的全部元素進行聚合操做。第一個和第二個元素聚合,值與第三個元素聚合,值與第四個元素聚合,以此類推。 |
collect | 將RDD中全部元素獲取到本地客戶端。注意數據傳輸問題,spark.driver.maxResultSize 能夠限制action算子返回driver的結果集最大數量 |
count | 獲取RDD元素總數。 |
take(n) | 獲取RDD中前n個元素。 |
saveAsTextFile | 將RDD元素保存到文件中,對每一個元素調用toString方法 |
countByKey | 對每一個key對應的值進行count計數。 |
foreach | 遍歷RDD中的每一個元素。 |
//從本地文件建立
val lines = spark.sparkContext.textFile("hello.txt")
//Transformation,返回(key,value)的RDD
val linePairs = lines.map(line => (line, 1))
//Transformation,隱式裝換爲PairRDDFunction,提供reduceByKey等方法
//源碼中是用HashPartitioner
val lineCounts = linePairs.reduceByKey(_ + _)
//Action,發送到driver端執行
lineCounts.foreach(lineCount => println(lineCount._1 + " appears " + lineCount._2 + " times."))
複製代碼
map:一次處理一個partition中的一條數據
mapPartitions:一次處理一個partition中全部的數據
使用場景:
RDD的數據量不是特別大,建議採用mapPartitions算子替代map算子,能夠加快處理速度,若是RDD的數據量特別大,則不建議用mapPartitions,可能會內存溢出
val studentScoresRDD = studentNamesRDD.mapPartitions { it =>
var studentScoreList = Array("a")
while (it.hasNext) {
...
}
studentScoreList.iterator
}
複製代碼
mapPartitionsWithIndex:加上了partition的index
studentNamesRDD.mapPartitionsWithIndex{(index:Int,it:Iterator[String])=>
...
}
複製代碼
rdd.coalesce(3)
:壓縮成3個partitioncoalesce和repartition區別
repartition是coalesce的簡化版
/** * 返回一個通過簡化到numPartitions個分區的新RDD。這會致使一個窄依賴 * 例如:你將1000個分區轉換成100個分區,這個過程不會發生shuffle,相反若是10個分區轉換成100個分區將會發生shuffle。 * 然而若是你想大幅度合併分區,例如合併成一個分區,這會致使你的計算在少數幾個集羣節點上計算(言外之意:並行度不夠) * 爲了不這種狀況,你能夠將第二個shuffle參數傳遞一個true,這樣會在從新分區過程當中多一步shuffle,這意味着上游的分區能夠並行運行。 */
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T] = withScope {...}
/** * 返回一個剛好有numPartitions個分區的RDD,能夠增長或者減小此RDD的並行度。 * 在內部,這將使用shuffle從新分佈數據,若是你減小分區數,考慮使用coalesce,這樣能夠避免執行shuffle */
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
複製代碼
設RDD分區數從N變動爲M
分區數關係 | shuffle = true | shuffle = false |
---|---|---|
N < M | N個分區有數據分佈不均勻的情況,利用HashPartitioner函數將數據從新分區爲M個 | coalesce爲無效的,不進行shuffle過程,父RDD和子RDD之間是窄依賴關係 |
N > M | 將N個分區中的若干個分區合併成一個新的分區,最終合併爲M個分區 | |
N >> M | shuffle = true,在從新分區過程當中多一步shuffle,上游的分區能夠並行運行,使coalesce以前的操做有更好的並行度 | 父子RDD是窄依賴關係,在同一個Stage中,可能形成Spark程序的並行度不夠(計算在少數幾個集羣節點上進行),從而影響性能 |
cache()和persist()
/** * Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
/** * Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): this.type = persist()
複製代碼
若是須要從內存中清除緩存,那麼可使用unpersist()方法。
class StorageLevel private( private var _useDisk: Boolean, //磁盤 private var _useMemory: Boolean,//內存 private var _useOffHeap: Boolean,//內存滿就存磁盤 private var _deserialized: Boolean,//序列化儲存 private var _replication: Int = 1)//冗餘備份,默認1,只本身儲存一份
extends Externalizable {
複製代碼
持久化級別 | 含義 |
---|---|
MEMORY_ONLY | 以非序列化的Java對象的方式持久化在JVM內存中。若是內存沒法徹底存儲RDD全部的partition,那麼那些沒有持久化的partition就會在下一次須要使用它的時候,從新被計算。 |
MEMORY_AND_DISK | 同上,可是當某些partition沒法存儲在內存中時,會持久化到磁盤中。下次須要使用這些partition時,須要從磁盤上讀取。 |
MEMORY_ONLY_SER | 同MEMORY_ONLY,可是會使用Java序列化方式,將Java對象序列化後進行持久化。能夠減小內存開銷,可是須要進行反序列化,所以會加大CPU開銷。 |
MEMORY_AND_DSK_SER | 同MEMORY_AND_DSK。可是使用序列化方式持久化Java對象。 |
DISK_ONLY | 使用非序列化Java對象的方式持久化,徹底存儲到磁盤上。 |
MEMORY_ONLY_2 MEMORY_AND_DISK_2 等等 | 若是是尾部加了2的持久化級別,表示會將持久化數據複用一份,保存到其餘節點,從而在數據丟失時,不須要再次計算,只須要使用備份數據便可。 |
優先級排序(內存優先)
val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
val factorBroadcast = sc.broadcast(3)
val sumAccumulator = new DoubleAccumulator()
//Accumulator must be registered before send to executor
sc.register(sumAccumulator)
val multipleRdd = rdd.map(num => num * factorBroadcast.value)
//不能獲取值,只能在driver端獲取
val accumulator = rdd.map(num2 => sumAccumulator.add(num2.toDouble))
//action:3,6,9,12,15
multipleRdd.foreach(num => println(num))
//要先執行action操做才能獲取值
accumulator.collect() //15
println(sumAccumulator.value)
accumulator.count() //30,再次加15
println(sumAccumulator.value)
複製代碼
standalone模式下
TaskScheduler把taskSet裏每個task提交到executor上執行
窄依賴(narrow dependency):每一個parent RDD 的 partition 最多被 child RDD的一個partition使用
寬依賴(wide dependency):每一個parent RDD 的 partition 被多個 child RDD的partition使用
區別:
在spark提交腳本中設置
--master參數值爲yarn-cluster / yarn-client
默認是standalone 模式
/usr/local/spark/bin/spark-submit \
--class com.feng.spark.spark1.StructuredNetworkWordCount \
--master spark://spark1:7077 \ #standalone模式
--num-executors 3 \ // #分配3個executor
--driver-memory 500m \
--executor-memory 500m \ # //每一個executor500m內存
--executor-cores 2 \ // # 每一個executor2個core
/usr/local/test_data/spark1-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
複製代碼
整個應用須要3*500=1500m
內存,3*2=6
個core
--master local[8]
:進程中用8個線程來模擬集羣的執行
--total-executor-cores
:指定全部executor的總cpu core數量
--supervise
:指定了spark監控driver節點,若是driver掛掉,自動重啓driver
按優先級從高到低排序
SparkConf.set("spark.default.parallelism", "100")
spark-submit: --conf spark.default.parallelism=50
spark-defaults.conf: spark.default.parallelism 10
複製代碼
在spark-submit腳本中,可使用--verbose,打印詳細的配置屬性的信息
能夠先在程序中建立一個空的SparkConf對象,如
val sc = new SparkContext(new SparkConf())
複製代碼
而後在spark-submit腳本中用--conf
設置屬性值,如
--conf spark.eventLog.enabled=false
複製代碼
--jars
:額外依賴的jar包會自動被髮送到集羣上去
指定關聯的jar:
文件和jar都會被拷貝到每一個executor的工做目錄中,這就會佔用很大一片磁盤空間,所以須要在以後清理掉這些文件
在yarn上運行spark做業時,依賴文件的清理都是自動進行的
使用standalone模式,須要配置spark.worker.cleanup.appDataTtl
屬性,來開啓自動清理依賴文件和jar包
相關參數見conf/spark-evnsh參數一節
--packages
:綁定maven的依賴包
--repositories
:綁定額外的倉庫
用於生產模式,driver運行在nodeManager,沒有網卡流量激增問題,但查看log麻煩,調試不方便
yarn-client用於測試,driver運行在本地客戶端,負責調度application,會與yarn集羣產生超大量的網絡通訊,從而致使網卡流量激增
yarn-client能夠在本地看到全部log,方便調試
基本要點:
橙色:提交用戶Spark程序
用戶提交一個Spark程序,主要的流程以下所示:
結合4,5,應該是表示用戶程序已經在master註冊,但driver可能並未啓動
MasterEndPoint應該會向DriverClient返回一個相似DriverStatusResponse的應答?週期性應答,當獲知driver已啓動,則致使5
紫色:啓動Driver進程
當用戶提交用戶Spark程序後,須要啓動Driver來處理用戶程序的計算邏輯,完成計算任務,這時Master須要啓動一個Driver:
當前worker節點運行driver進程
紅色:註冊Application
Dirver啓動之後,它會建立SparkContext對象,初始化計算過程當中必需的基本組件,並向Master註冊Application,流程描述以下:
SparkEnv Holds all the runtime environment objects for a running Spark instance (either master or worker), including the serializer, RpcEnv, block manager, map output tracker, etc. Currently Spark code finds the SparkEnv through a global variable, so all the threads can access the same SparkEnv
只是建立,還未啓動
藍色:啓動Executor進程
executor還未真實啓動,master只是發出一個啓動executor的消息給worker而已. 這一步代表master纔是負責啓動和分配executor,driver只是提交task到executor
這裏master才真正告訴driver executor已經啓動
粉色:啓動Task執行
以前已經建立,但未啓動,以前和master的通訊都是StandaloneSchedulerBackend完成的
啓動一個driver-revive-thread後臺線程,週期性地發送ReviveOffers給本身,讓本身檢查executor狀態
DriverEndpoint是CoarseGrainedSchedulerBackend內部的一個持有對象
綠色:Task運行完成
StandaloneSchedulerBackend父類CoarseGrainedSchedulerBackend內部持有DriverEndpoint(內部類),DriverEndpoint收到StatusUpdate信息後,直接調用
scheduler.statusUpdate(taskId, state, data.value)
用start-all.sh
腳本能夠啓動master進程和全部worker進程,快速啓動整個spark standalone集羣
分別啓動能夠經過命令行參數,爲進程配置一些獨特的參數
如監聽端口號、web ui端口號、使用的cpu和內存
如同一臺機器上不只運行了saprk程序,還運行了storm程序,就能夠限制spark worker進程使用更少的資源(cpu core,memory),而非機器上全部資源
參數 | 含義 | 對象 | 使用頻率 |
---|---|---|---|
-h HOST, --ip HOST | 在哪臺機器上啓動,默認就是本機 | master & worker | 不經常使用 |
-p PORT, --port PORT | 在機器上啓動後,使用哪一個端口對外提供服務,master默認是7077,worker默認是隨機的 | master & worker | 不經常使用 |
--webui-port PORT | web ui的端口,master默認是8080,worker默認是8081 | master & worker | 不經常使用 |
-c CORES, --cores CORES | 總共能讓spark做業使用多少個cpu core,默認是當前機器上全部的cpu core | worker | 經常使用 |
-m MEM, --memory MEM | 總共能讓spark做業使用多少內存,是100M或者1G這樣的格式,默認是1g | worker | 經常使用 |
-d DIR, --work-dir DIR | 工做目錄,默認是SPARK_HOME/work目錄 | worker | 經常使用 |
--properties-file FILE | master和worker加載默認配置文件的地址,默認是conf/spark-defaults.conf | master & worker | 不經常使用 |
先啓動master,再啓動worker,由於worker啓動之後,須要向master註冊
關閉順序1.worker(
./stop-slave.sh
) ;2. master(./stop-master);3. 關閉集羣./stop-all.sh
start-master.sh
啓動spark://HOST:PORT
,這就是master的URL地址,worker進程就會經過這個URL地址來鏈接到master進程,並進行註冊
可使用
SparkSession.master()
設置master地址
http://MASTER_HOST:8080
來訪問master集羣的監控web ui,web ui上, 會顯示master的URL地址使用start-slave.sh <master-spark-URL>
在當前節點上啓動worker進程
http://MASTER_HOST:8080
web ui上會顯示該節點的cpu和內存資源等信息
eg:./start-slave.sh spark://192.168.0.001:8080 --memory 500m
參數 | 含義 |
---|---|
sbin/start-all.sh | 根據配置,在集羣中各個節點上,啓動一個master進程和多個worker進程 |
sbin/stop-all.sh | 在集羣中中止全部master和worker進程 |
sbin/start-master.sh | 在本地啓動一個master進程 |
sbin/stop-master.sh | 關閉master進程 |
sbin/start-slaves.sh | 根據conf/slaves文件中配置的worker節點,啓動全部的worker進程 |
sbin/stop-slaves.sh | 關閉全部worker進程 |
sbin/start-slave.sh | 在本地啓動一個worker進程 |
配置做爲worker節點的機器,如hostname/ip地址,一個機器是一行
配置後,全部的節點上,都拷貝這份文件
默認狀況下,沒有conf/slaves文件,只有一個空conf/slaves.template, 此時,就只是在當前主節點上啓動一個master進程和一個worker進程,此時就是master進程和worker進程在一個節點上,也就是僞分佈式部署
conf/slaves文件樣本
spark1
spark2
spark3
複製代碼
是對整個spark的集羣部署,配置各個master和worker
和啓動腳本--參數的效果同樣
./start-slave.sh spark://192.168.0.001:8080 --memory 500m
,臨時修改參數時這種腳本命令更適合
命令行參數優先級更高,會覆蓋spark-evnsh參數
參數 | 含義 |
---|---|
SPARK_MASTER_IP | 指定master進程所在的機器的ip地址 |
SPARK_MASTER_PORT | 指定master監聽的端口號(默認是7077) |
SPARK_MASTER_WEBUI_PORT | 指定master web ui的端口號(默認是8080) |
SPARK_MASTER_OPTS | 設置master的額外參數,使用"-Dx=y"設置各個參數 |
SPARK_LOCAL_DIRS | spark的工做目錄,包括了shuffle map輸出文件,以及持久化到磁盤的RDD等 |
SPARK_WORKER_PORT | worker節點的端口號,默認是隨機的 |
SPARK_WORKER_WEBUI_PORT | worker節點的web ui端口號,默認是8081 |
SPARK_WORKER_CORES | worker節點上,容許spark做業使用的最大cpu數量,默認是機器上全部的cpu core |
SPARK_WORKER_MEMORY | worker節點上,容許spark做業使用的最大內存量,格式爲1000m,2g等,默認最小是1g內存 |
SPARK_WORKER_INSTANCES | 當前機器上的worker進程數量,默認是1,能夠設置成多個,可是這時必定要設置SPARK_WORKER_CORES,限制每一個worker的cpu數量 |
SPARK_WORKER_DIR | spark做業的工做目錄,包括了做業的日誌等,默認是spark_home/work |
SPARK_WORKER_OPTS | worker的額外參數,使用"-Dx=y"設置各個參數 |
SPARK_DAEMON_MEMORY | 分配給master和worker進程本身自己的內存,默認是1g |
SPARK_DAEMON_JAVA_OPTS | 設置master和worker本身的jvm參數,使用"-Dx=y"設置各個參數 |
SPARK_PUBLISC_DNS | master和worker的公共dns域名,默認是沒有的 |
SPARK_MASTER_OPTS
設置master的額外參數,使用-Dx=y
設置各個參數
eg:export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=1"
參數名 | 默認值 | 含義 |
---|---|---|
spark.deploy.retainedApplications | 200 | 在spark web ui上最多顯示多少個application的信息 |
spark.deploy.retainedDrivers | 200 | 在spark web ui上最多顯示多少個driver的信息 |
spark.deploy.spreadOut | true | 資源調度策略,spreadOut會盡可能將application的executor進程分佈在更多worker上,適合基於hdfs文件計算的狀況,提高數據本地化機率;非spreadOut會盡可能將executor分配到一個worker上,適合計算密集型的做業 |
spark.deploy.defaultCores | 無限大 | 每一個spark做業最多在standalone集羣中使用多少個cpu core,默認是無限大,有多少用多少 |
spark.deploy.timeout | 60 | 單位秒,一個worker多少時間沒有響應以後,master認爲worker掛掉了 |
SPARK_WORKER_OPTS
worker的額外參數
參數名 | 默認值 | 含義 |
---|---|---|
spark.worker.cleanup.enabled | false | 是否啓動自動清理worker工做目錄,默認是false |
spark.worker.cleanup.interval | 1800 | 單位秒,自動清理的時間間隔,默認是30分鐘 |
spark.worker.cleanup.appDataTtl | 7 * 24 * 3600 | 默認將一個spark做業的文件在worker工做目錄保留多少時間,默認是7天 |
主要用於本機測試
/usr/local/spark/bin/spark-submit \
--class cn.spark.study.core.xxx \
--num-executors 3 \
--driver-memory 100m \
--executor-memory 100m \
--executor-cores 2 \
/usr/local/test/xxx.jar \
複製代碼
standalone模式與local區別,就是要將master設置成spark://master_ip:port
,如spark://192.168.0.103:7077
val spark = SparkSession.builder().master("spark://IP:PORT")...
spark-submit: --master spark://IP:PORT --deploy-mode client/cluster
spark-shell: --master spark://IP:PORT
:用於實驗和測試/usr/local/spark/bin/spark-submit \
--class cn.spark.study.core.xxx \
--master spark://192.168.0.103:7077 \
--deploy-mode client \
--num-executors 1 \
--driver-memory 100m \
--executor-memory 100m \
--executor-cores 1 \
/usr/local/test/xxx.jar \
複製代碼
--master
:
spark://xxx
:standalone模式,會提交到指定的URL的Master進程上去yarn-xxx
:yarn模式,會讀取hadoop配置文件,而後鏈接ResourceManager提交運行做業後,當即使用jps查看進程,能夠看到啓動了以下進程
standalone cluster模式支持監控driver進程,而且在driver掛掉的時候,自動重啓該進程,主要是用於spark streaming中的HA高可用性,spark-submit腳本中,使用--supervise
標識便可
要殺掉反覆掛掉的driver進程
bin/spark-class org.apache.spark.deploy.Client kill <master url> <driver ID>
,經過http://<maser url>:8080
可查看到driver id
yarn下殺掉application
yarn application -kill applicationid
進程:
...
--deploy-mode cluster \
--num-executors 1 \
--executor-cores 1 \
...
複製代碼
cluster模式下
cpu core太少,可能致使executor沒法啓動,一直waiting,好比只有一個worker,一個cpu core時
在 cluster 模式下,driver 是在集羣中的某個 Worker中的進程中啓動,而且 client進程將會在完成提交應用程序的任務以後退出,而不須要等待應用程序完成再退出
默認提交的每個spark做業都會嘗試使用集羣中全部可用的cpu資源,此時只能支持做業串行起來運行,因此standalone集羣對於同時提交上來的多個做業,僅僅支持FIFO調度策略
spark.cores.max
參數,限制每一個做業可以使用的最大的cpu core數量,讓做業不會使用全部的cpu資源,後面提交上來的做業就能夠獲取到資源運行,默認狀況下,它將獲取集羣中的 all cores (核),這隻有在某一時刻只容許一個應用程序運行時纔有意義spark.conf.set("spark.cores.max", "num")
spark-submit: --master spark://IP:PORT --conf spark.cores.max=num
spark-env.sh
全局配置:export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=num" 默認數量
spark standalone模式默認在master機器上的8080端口提供web ui,能夠經過配置spark-env.sh
文件等方式,來配置web ui的端口,地址如spark://192.168.0.103:8080
spark yarn模式下應該在YARN web ui上查看,如
http://192.168.0.103:8088/
application detail ui在做業的driver所在的機器的4040端口
System.out.println
;System.err.println
和系統級別log
做業運行完,信息消失,須要啓動history server
前提:spark-env.sh
文件中,配置HADOOP_CONF_DIR
或者YARN_CONF_DIR
屬性,值爲hadoop的配置文件目錄HADOOP_HOME/etc/hadoop
,其中包含了hadoop和yarn全部的配置文件,好比hdfs-site、yarn-site等
用途:spark讀寫hdfs,鏈接到yarn resourcemanager上
日誌散落在集羣中各個機器上,參數配置yarn-site.xml
屬性設置 | 含義 |
---|---|
yarn.log-aggregation-enable=true |
container的日誌會拷貝到hdfs上去,並從機器中刪除 |
yarn.nodemanager.remote-app-log-dir |
當應用程序運行結束後,日誌被轉移到的HDFS目錄(啓用日誌彙集功能時有效) |
yarn.nodemanager.remote-app-log-dir-suffix |
遠程日誌目錄子目錄名稱(啓用日誌彙集功能時有效) |
yarn.log-aggregation.retain-seconds |
聚合後的日誌在HDFS上保存多長時間,單位爲s |
yarn logs -applicationId <app ID> |
查看日誌,yarn web ui上能夠查看到applicationId(也能夠直接在hdfs上查看日誌文件) |
yarn.nodemanager.log.retain-second |
當不啓用日誌聚合此參數生效,日誌文件保存在本地的時間,單位爲s |
yarn.log-aggregation.retain-check-interval-seconds |
隔多久刪除過時的日誌 |
YARN_APP_LOGS_DIR
目錄下,如/tmp/logs
或者$HADOOP_HOME/logs/userlogs
system.out
日誌,須要在yarn-site.xml
文件中設置yarn.log.aggregation-enable
值爲ture(將日誌拷貝到hdfs上),查看時經過yarn logs -applicationId xxx
在機器上查看/usr/local/spark/bin/spark-submit \
--class xxx \
# 自動從hadoop配置目錄中的配置文件中讀取cluster manager地址
--master yarn-cluster/yarn-client \
--num-executors 1 \
--driver-memory 100m \
--executor-memory 100m \
--executor-cores 1 \
--conf <key>=<value> \
# 指定不一樣的hadoop隊列,項目或部門之間隊列隔離
--queue hadoop隊列 \
/usr/local/test/xxx.jar \
${1}
複製代碼
--conf
: 配置全部spark支持的配置屬性,使用key=value
的格式;若是value中包含了空格,那麼須要將key=value包裹的雙引號中--conf "<key>=<value>"
application-jar
: 打包好的spark工程jar包,在當前機器上的全路徑名
application-arguments
: 傳遞給主類的main方法的參數; 在shell中用${1}
佔位符接收傳遞給shell的參數;在java中能夠經過main方法的args[0]
等參數獲取,提交spark應用程序時,用 ./腳本.sh 參數值
能夠在提交腳本上--conf
設置屬性
屬性名稱 | 默認值 | 含義 |
---|---|---|
spark.yarn.am.memory | 512m | client模式下,YARN Application Master使用的內存總量 |
spark.yarn.am.cores | 1 | client模式下,Application Master使用的cpu數量 |
spark.driver.cores | 1 | cluster模式下,driver使用的cpu core數量,driver與Application Master運行在一個進程中,因此也控制了Application Master的cpu數量 |
spark.yarn.am.waitTime | 100s | cluster模式下,Application Master要等待SparkContext初始化的時長; client模式下,application master等待driver來鏈接它的時長 |
spark.yarn.submit.file.replication | hdfs副本數 | 做業寫到hdfs上的文件的副本數量,好比工程jar,依賴jar,配置文件等,最小必定是1 |
spark.yarn.preserve.staging.files | false | 若是設置爲true,那麼在做業運行完以後,會避免工程jar等文件被刪除掉 |
spark.yarn.scheduler.heartbeat.interval-ms | 3000 | application master向resourcemanager發送心跳的間隔,單位ms |
spark.yarn.scheduler.initial-allocation.interval | 200ms | application master在有pending住的container分配需求時,當即向resourcemanager發送心跳的間隔 |
spark.yarn.max.executor.failures | executor數量*2,最小3 | 整個做業斷定爲失敗以前,executor最大的失敗次數 |
spark.yarn.historyServer.address | 無 | spark history server的地址 |
spark.yarn.dist.archives | 無 | 每一個executor都要獲取並放入工做目錄的archive |
spark.yarn.dist.files | 無 | 每一個executor都要放入的工做目錄的文件 |
spark.executor.instances | 2 | 默認的executor數量 |
spark.yarn.executor.memoryOverhead | executor內存10% | 每一個executor的堆外內存大小,用來存放諸如常量字符串等東西 |
spark.yarn.driver.memoryOverhead | driver內存7% | 同上 |
spark.yarn.am.memoryOverhead | AM內存7% | 同上 |
spark.yarn.am.port | 隨機 | application master端口 |
spark.yarn.jar | 無 | spark jar文件的位置 |
spark.yarn.access.namenodes | 無 | spark做業能訪問的hdfs namenode地址 |
spark.yarn.containerLauncherMaxThreads | 25 | application master能用來啓動executor container的最大線程數量 |
spark.yarn.am.extraJavaOptions | 無 | application master的jvm參數 |
spark.yarn.am.extraLibraryPath | 無 | application master的額外庫路徑 |
spark.yarn.maxAppAttempts | 提交spark做業最大的嘗試次數 | |
spark.yarn.submit.waitAppCompletion | true | cluster模式下,client是否等到做業運行完再退出 |
standalone模式下調度器依託於master進程來作出調度決策,這可能會形成單點故障:若是master掛掉了,就無法提交新的應用程序了。
爲了解決這個問題,spark提供了兩種高可用性方案,分別是基於zookeeper的HA方案(推薦)以及基於文件系統的HA方案。
使用zookeeper來提供leader選舉以及一些狀態存儲,能夠在集羣中啓動多個master進程,讓它們鏈接到zookeeper實例。其中一個master進程會被選舉爲leader,其餘的master會被指定爲standby模式。
若是當前的leader master進程掛掉了,其餘的standby master會被選舉,從而恢復舊master的狀態。
在啓動一個zookeeper集羣以後,在多個節點上啓動多個master進程,而且給它們相同的zookeeper 配置(zookeeper url和目錄)。master就能夠被動態加入master集羣,並能夠在任什麼時候間被移除掉
在spark-env.sh
文件中,設置SPARK_DAEMON_JAVA_OPTS
選項:
spark.deploy.recoveryMode
:設置爲ZOOKEEPER來啓用standby master恢復模式(默認爲NONE)spark.deploy.zookeeper.url
:zookeeper集羣urlspark.deploy.zookeeper.dir
:zookeeper中用來存儲恢復狀態的目錄(默認是/spark
)export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=192.168.0.103:2181,192.168.0.104:2181 -Dspark.deploy.zookeeper.dir=/spark"
複製代碼
若是在集羣中啓動了多個master節點,可是沒有正確配置master去使用zookeeper,master在掛掉進行恢復時是會失敗的,由於無法發現其餘master,而且都會認爲本身是leader。這會致使集羣的狀態不是健康的,由於全部master都會自顧自地去調度。
爲了調度新的應用程序或者向集羣中添加worker節點,它們須要知道當前leader master的ip地址,這能夠經過傳遞一個master列表來完成。能夠將SparkSession 的master鏈接的地址指向spark://host1:port1,host2:port2
。這就會致使SparkSession嘗試去註冊全部的master,若是host1掛掉了,那麼配置仍是正確的,由於會找到新的leader master
當一個應用程序啓動的時候,或者worker須要被找到而且註冊到當前的leader master的時候。一旦它成功註冊了,就被保存在zookeeper中了。若是故障發生了,new leader master會去聯繫全部的以前註冊過的應用程序和worker,而且通知它們master的改變。應用程序甚至在啓動的時候都不須要知道new master的存在。
故而,new master能夠在任什麼時候間被建立,只要新的應用程序和worker能夠找到而且註冊到master便可
在其餘節點啓動備用master:./start-master.sh
FILESYSTEM模式:當應用程序和worker都註冊到master以後,master就會將它們的信息寫入指定的文件系統目錄中,以便於重啓時恢復註冊的應用程序和worker狀態;
須要手動重啓
在spark-env.sh
中設置SPARK_DAEMON_JAVA_OPTS
spark.deploy.recoveryMode
:設置爲FILESYSTEM來啓用單點恢復(默認值爲NONE)spark.deploy.recoveryDirectory
:spark存儲狀態信息的文件系統目錄,必須是master能夠訪問的目錄eg:
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM -Dspark.deploy.recoveryDirectory=/usr/local/spark_recovery"
複製代碼
stop-master.sh
腳本殺掉一個master進程是不會清理它的恢復狀態,當重啓一個新的master進程時,它會進入恢復模式。須要等待以前全部已經註冊的worker等節點先timeout才能恢復。做業監控方式:Spark Web UI,Spark History Web UI,RESTFUL API以及Metrics
每提交Spark做業並啓動SparkSession後,會啓動一個對應的Spark Web UI服務。默認狀況下Spark Web UI的訪問地址是driver進程所在節點的4040端口,如http://<driver-node>:4040
Spark Web UI包括瞭如下信息:
若是多個driver在一個機器上運行,它們會自動綁定到不一樣的端口上。默認從4040端口開始,若是發現已經被綁定,那麼會選擇404一、4042等端口,以此類推。
這些信息默認狀況下在做業運行期間有效,一旦做業完畢,driver進程以及對應的web ui服務也會中止。若是要在做業完成以後,也能夠看到其Spark Web UI以及詳細信息,須要啓用Spark的History Server。
hdfs://ip:port/dirName
hdfs dfs -mkidr /dirName
spark-defaults.conf
spark.eventLog.enabled true #啓用
spark.eventLog.dir hdfs://ip:port/dirName
spark.eventLog.compress true #壓縮
複製代碼
spark-env.sh
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.retainedApplications=50 -Dspark.history.fs.logDirectory=hdfs://ip:port/dirName"
複製代碼
spark.eventLog.dir
指定做業事件記錄地址
spark.history.fs.logDirectory
指定從哪一個目錄中去讀取做業數據
兩個目錄地址要相同
./sbin/start-history-server.sh
在啓動界面能夠看到history-server的訪問地址,經過訪問地址打開History Web UI提供了RESTFUL API來返回關於日誌的json數據
API | 含義 |
---|---|
/applications | 獲取做業列表 |
/applications/[app-id]/jobs | 指定做業的job列表 |
/applications/[app-id]/jobs/[job-id] | 指定job的信息 |
/applications/[app-id]/stages | 指定做業的stage列表 |
/applications/[app-id]/stages/[stage-id] | 指定stage的全部attempt列表 |
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id] | 指定stage attempt的信息 |
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskSummary | 指定stage attempt全部task的metrics統計信息 |
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskList | 指定stage attempt的task列表 |
/applications/[app-id]/executors | 指定做業的executor列表 |
/applications/[app-id]/storage/rdd | 指定做業的持久化rdd列表 |
/applications/[app-id]/storage/rdd/[rdd-id] | 指定持久化rdd的信息 |
/applications/[app-id]/logs | 下載指定做業的全部日誌的壓縮包 |
/applications/[app-id]/[attempt-id]/logs | 下載指定做業的某次attempt的全部日誌的壓縮包 |
eg:http://192.168.0.103:18080/api/v1/applications
默認的做業間資源分配策略爲靜態資源分配,在這種方式下,每一個做業都會被給予一個它能使用的 最大資源量的限額,而且能夠在運行期間持有這些資源。這是spark standalone集羣和YARN集羣使用的默認方式。
Standalone集羣
默認狀況下,提交到standalone集羣上的多個做業,會經過FIFO的方式來運行,每一個做業都會嘗試獲取全部的資源。
spark.cores.max
:限制每一個做業可以使用的cpu core最大數量
spark.deploy.defaultCores
:設置每一個做業默認cpu core使用量
spark.executor.memory
:設置每一個做業最大內存。
YARN
--num-executors
:配置做業能夠在集羣中分配到多少個executor
--executor-memory
和--executor-cores
能夠控制每一個executor可以使用的資源。
沒有一種cluster manager能夠提供多個做業間的內存共享功能,須要共享內存,能夠單獨使用一個服務(例如:alluxio),這樣就能實現多應用訪問同一個RDD的數據。
當資源被分配給了一個做業,但資源有空閒,能夠將資源還給cluster manager的資源池,被其餘做業使用。在spark中,動態資源分配在executor粒度上被實現,啓用時設置spark.dynamicAllocation.enabled
爲true,在每一個節點上啓動external shuffle service,並將spark.shuffle.service.enabled設爲true。external shuffle service 的目的是在移除executor的時候,可以保留executor輸出的shuffle文件。
spark application會在它有pending(等待執行)的task等待被調度時,申請額外的executor
task已提交但等待調度->executor數量不足
spark.dynamicAllocation.schedulerBacklogTimeout
有pending的task時,就會觸發真正的executor申請spark.dynamicAllocation.sustainedSchedulerBacklogTimeout
,若是又有pending的task了,則再次觸發申請操做。一個spark做業會在它的executor出現了空閒超過必定時間後(spark.dynamicAllocation.executorIdleTimeout
),被移除掉。
這意味着沒有task被pending住,executor有空閒,和申請條件互斥。
spark使用一個外部的shuffle服務來保存每一個executor的中間寫狀態,這個服務是一個長時間運行的進程,集羣的每一個節點上都會運行一個,若是服務被啓用,那麼spark executor會在shuffle write和read時,將數據寫入該服務,並從該服務獲取數據。這意味着全部executor寫的shuffle數據均可以在executor聲明週期以外繼續使用。
多了箇中間數據存儲角色,也改變了executor的讀寫方式
除了寫shuffle文件,executor也會在內存或磁盤中持久化數據。當一個executor被移除掉時,全部緩存的數據都會消失。
shuffle服務寫入的數據和executor持久化數據不是一個概念?executor移除後/掛掉後,其持久化的數據將消失,而shuffle服務保存的數據還將存在
--conf spark.dynamicAllocation.enabled=true \
複製代碼
$SPARK_HOME/sbin/start-mesos-shuffle-service.sh
,並設置 spark.shuffle.service.enabled
爲true--conf spark.dynamicAllocation.enabled=true \
複製代碼
須要配置yarn的shuffle service(external shuffle service),用於保存executor的shuffle write文件,從而讓executor能夠被安全地移除.
$SPARK_HOME/lib
下的spark-<version>-yarn-shuffle.jar
加入到全部NodeManager的classpath中,即hadoop/yarn/lib
目錄中yarn-site.xml
<propert>
<name>yarn.nodemanager.aux-services</name>
<value>spark_shuffle</value>
<!-- <value>mapreduce_shuffle</value> -->
</property>
<propert>
<name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
<value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
複製代碼
--conf spark.shuffle.service.enabled=true \
--conf spark.shuffle.service.port=7337 \
--conf spark.dynamicAllocation.enabled=true \
複製代碼
job是一個spark action操做觸發的計算單元,在一個spark做業內部,多個並行的job是能夠同時運行的 。
默認狀況下,spark的調度會使用FIFO的方式來調度多個job。每一個job都會被劃分爲多個stage,並且第一個job會對全部可用的資源獲取優先使用權,而且讓它的stage的task去運行,而後第二個job再獲取資源的使用權,以此類推
在公平的資源共享策略下,spark會將多個job的task使用一種輪詢的方式來分配資源和執行,因此全部的job都有一個基本公平的機會去使用集羣的資源
conf.set("spark.scheduler.mode", "FAIR")
複製代碼
--conf spark.scheduler.mode=FAIR
複製代碼
fair scheduler也支持將job分紅多個組並放入多個池中,以及爲每一個池設置不一樣的調度優先級。這個feature對於將重要的和不重要的job隔離運行的狀況很是有用,能夠爲重要的job分配一個池,並給予更高的優先級; 爲不重要的job分配另外一個池,並給予較低的優先級。
在代碼中設置sparkContext.setLocalProperty("spark.scheduler.pool", "poolName")
,全部在這個線程中提交的job都會進入這個池中,設置是以線程爲單位保存的,很容易實現用同一線程來提交同一用戶的全部做業到同一個資源池中。設置爲null則清空池子。
默認狀況下,每一個池子都會對集羣資源有相同的優先使用權,可是在每一個池內,job會使用FIFO的模式來執行。
能夠經過配置文件來修改池的屬性
配置文件默認地址spark/conf/fairscheduler.xml
,自定義文件conf.set("spark.scheduler.allocation.file", "/path/to/file")
<allocations>
<pool name="production">
<schedulingMode>FAIR</schedulingMode>
<weight>1</weight>
<minShare>2</minShare>
</pool>
<pool name="test">
<schedulingMode>FIFO</schedulingMode>
<weight>2</weight>
<minShare>3</minShare>
</pool>
</allocations>
複製代碼
沒有在配置文件中配置的資源池都會使用默認配置(schedulingMode : FIFO,weight : 1,minShare : 0)。
不一樣之處
reduceByKey,中間多了一個MapPartitionsRDD,是本地數據聚合後的rdd,能夠減小網絡數據傳輸。
相同之處
read和聚合的過程基本和groupByKey相似。都是ShuffledRDD作shuffle read再聚合,獲得最終的rdd
cogroup算子是其餘算子的基礎,如join,intersection操做
先按RDD分區聚合結果,(hello,[(1,1),(1,1)]):第1個(1,1)是第一個RDD 的helo聚合結果,第二個(1,1)是第2個RDD聚合結果 若第一個RDD的第一個partition沒有hello,則(1),不是(,1)
filter:過濾掉兩個集合中任意一個集合爲空的key
笛卡爾乘積
通常用於減小partition數量
repartition算子=coalesce(true)
repartition操做在中間生成的隱式RDD中會給值計算出前綴做爲key,在最後作Shuffle操做時一個partition就放特定的一些key值對應的tuple,完成重分區操做
若是partition的數量多,能起實例的資源也多,那天然併發度就多
若是partition數量少,資源不少,則task數量不足,它也不會有不少併發
若是partition的數量不少,可是資源少(如core),那麼併發也不大,會算完一批再繼續起下一批
Task被執行的併發度 = Executor數目 * 每一個Executor核數
複製代碼
這裏的core是虛擬的core而不是機器的物理CPU核,能夠理解爲就是Executor的一個工做線程?
每一個executor的core數目經過spark.executor.cores
參數設置。這裏的cores實際上是指的工做線程。cpu info裏看到的核數是物理核(或者通常機器開了超線程之後是的物理核數*2),和spark裏的core不是一個概念,可是通常來講spark做業配置的executor核數不該該超過機器的物理核數。
partition的數目
sc.textFile
,輸入文件被劃分爲多少InputSplit就會須要多少初始Task