以scala爲例,咱們經過IDE編寫Spark應用後,將其打包成jar包,而後使用spark-submit程序進行部署java
./bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]
複製代碼
優先級從高到低依次是:node
直接在代碼中經過SparkConf控制,好比指定cluster manager的master參數,能夠在代碼中配置python
val conf = new SparkConf().setAppName("WordCount").setMaster("local");sql
在命令中指定,好比:apache
./bin/spark-submit
--class org.apache.spark.examples.SparkPi
--master yarn
--deploy-mode cluster \ # can be client for client mode --executor-memory 20G
--num-executors 50
/path/to/examples.jar
1000緩存
在spark的安裝目錄下,經過spark-defaults.conf配置。性能優化
RDD是一個統一分佈式數據抽象數據集。其下對應實際的數據存儲介質,多是文件,也能夠是hadoop。經過RDD能夠進行tranformation和action操做,從而實現分佈式計算。網絡
一個RDD具備如下固定的數據結構數據結構
總結來講,一個RDD的關鍵信息無非是,定義了數據來源,數據分佈存儲的狀況,以及準備執行的計算邏輯。經過這些新,咱們能夠構建一個圖,圖的兩個vertex分別是RDD,edge爲computation架構
private[spark] def conf = sc.conf
// =======================================================================
// Methods that should be implemented by subclasses of RDD
// =======================================================================
/**
* :: DeveloperApi ::
* Implemented by subclasses to compute a given partition.
*/
@DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T]//當前RDD須要執行的計算
/**
* Implemented by subclasses to return the set of partitions in this RDD. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
*
* The partitions in this array must satisfy the following property:
* `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
*/
protected def getPartitions: Array[Partition]//當前RDD對應的分區
/**
* Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
*/
protected def getDependencies: Seq[Dependency[_]] = deps//當前RDD依賴的父親數據集
/**
* Optionally overridden by subclasses to specify placement preferences.
*/
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
/** Optionally overridden by subclasses to specify how they are partitioned. */
@transient val partitioner: Option[Partitioner] = None
// =======================================================================
// Methods and fields available on all RDDs
// =======================================================================
/** The SparkContext that created this RDD. */
def sparkContext: SparkContext = sc
/** A unique ID for this RDD (within its SparkContext). */
val id: Int = sc.newRddId()
/** A friendly name for this RDD */
@transient var name: String = null //當前RDD的名稱
複製代碼
####3.2 RDD 特色
###5、RDD Transformation 將RDD進行一系列變換,生成新的RDD的過程,叫作Transformation。全部那些能夠就地計算,而不須要數據遷移的transformation叫作Narrow Transformation。
####5.1 transformation大概源碼 以map操做爲例
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
//將傳進來的函數f進行clean,這裏先不深究,只須要知道clean後的函數,跟原函數功能相同
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
//這裏返回MapPartitionsRDD對象,其構造參數爲當前RDD和一個將f應用於迭代器的函數定義
}
複製代碼
map操做是將迭代RDD中的每一個元素,而後將其作必定加工,返回的的依然是一個元素。而flapMap接受的函數參數的入參是RDD中的每一個元素,但對該元素處理後,返回的是一個集合,而不是一個元素。flatMap源碼以下:
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
複製代碼
總結來講,map和flapMap的異同點以下: - map接受的函數參數簽名是:(f: T => U)而flatMap接受的函數參數簽名爲:(f: T => TraversableOnce[U]),能夠看到返回的是集合
Narrow Transformation操做有
有些計算,須要依賴其餘節點數據,這種計算會致使數據移動,成爲Wide Transformations。好比,基於某個key分類的操做GroupByKey,這個Key可能散落在不一樣的work node上,爲了進行GroupByKey計算,須要計算節點間進行數據移動,好比將某個Key對應的數據,統一移動到一個節點上。Wide Transformation操做有以下:
全部Tranformation操做,都不會真正執行,直到Action操做被調用,Action操做返回是具體值,而不是RDD。這種特性成爲Lazy Computing. Action操做觸發後,會將執行結果發給Driver 或者寫如到外部存儲。如下操做屬於Action操做: First(), take(), reduce(), collect(), count()
全部action操做,最終都會調用SparkContext的runJob方法。runJob有需多重載方法,以其中一個爲例
def runJob[T, U: ClassTag](
rdd: RDD[T],//須要處理的RDD數據
processPartition: Iterator[T] => U,//須要在每一個數據分區上進行的操做
resultHandler: (Int, U) => Unit)//如何將上述每一個分區處理後的結果進行處理
複製代碼
能夠看到runJob中體現了全部分佈式計算理論架構,即MapReduce。其中processPartition定義每一個分區要須要作的map操做,這一步將減小數據量,將map操做的結果作爲輸入,傳進reduce操做,進行彙總處理。
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
// Clone the zero value since we will also be serializing it as part of tasks
var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())//1
val cleanSeqOp = sc.clean(seqOp)
val cleanCombOp = sc.clean(combOp)
val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)//2
val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)//3
sc.runJob(this, aggregatePartition, mergeResult)//4
jobResult//5
}
複製代碼
舉例:
val inputrdd = sc.parallelize(List(("maths", 21),("english", 22),("science", 31)),2)
val result = inputrdd.aggregate(3)(
(acc, value) => {
println(acc+":"+value)
(acc + value._2)
},
(acc1, acc2) => (acc1 * acc2)
)
println(result)//輸出4032
複製代碼
解釋:
上述RDD,被切分紅兩個分區。第一個分區數據是("maths", 21) ,另外一個是:("english", 22),("science", 31)
(acc + value._2)是每一個分區要執行的操做,迭代器帶入zeroValue=3後,兩個分片的計算中間值以下
3+21=24//分區1 3+22+31=56//分區2
最後將每一個分區結果帶入(acc1 * acc2)函數,從aggregate源碼得知,結果計算也要運用zeroValue,在這裏也就是3.因而最終步執行的計算以下:
32456=4032
fold函數同aggregate相似,一樣是調用SparkContext的runJob函數,只不過fold只接受一個值參數,和一個函數參數,其內部在調用runJob時,分區計算和結果計算都使用一樣的函數。源碼以下:
def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
// Clone the zero value since we will also be serializing it as part of tasks
var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
val cleanOp = sc.clean(op)
val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
sc.runJob(this, foldPartition, mergeResult)
jobResult
}
複製代碼
舉例:
val inputrdd = sc.parallelize(List(("maths", 21),("english", 22),("science", 31)),2)//1
val result = inputrdd.fold(("test",3))(
(acc, ele) => {
println(acc+":"+ele)
("result",acc._2 + ele._2)
}
)
println(result)//輸出:(result,83)
複製代碼
假設註釋1中切分的2個分區爲("maths", 21)和("english", 22),("science", 31),那麼執行過程以下:
reduce一樣調用了SparkContext的runJob函數,但reduce接收的參數在fold上進一步簡化,少了zeroValue參數,只接收一個函數參數便可。一樣該參數,在調用runJob時,即做爲分區收斂的函數,記做爲分區彙總計算的函數
def reduce(f: (T, T) => T): T = withScope {
val cleanF = sc.clean(f)
val reducePartition: Iterator[T] => Option[T] = iter => {
if (iter.hasNext) {
Some(iter.reduceLeft(cleanF))
} else {
None
}
}
var jobResult: Option[T] = None
val mergeResult = (index: Int, taskResult: Option[T]) => {
if (taskResult.isDefined) {
jobResult = jobResult match {
case Some(value) => Some(f(value, taskResult.get))
case None => taskResult
}
}
}
sc.runJob(this, reducePartition, mergeResult)
// Get the final result out of our Option, or throw an exception if the RDD was empty
jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}
複製代碼
舉例:
val inputrdd = sc.parallelize(List(("maths", 21),("english", 22),("science", 31)),2)
val result = inputrdd.reduce(
(acc, ele) => {
println(acc+":"+ele)
("result",acc._2 + ele._2)
}
)
println(result)//結果爲:(result,74)
複製代碼
collect和top方法都會將數據收集到driver本地,前者是收集所有,後者是收集指定條數。因此最好知道收集的數據集較小時使用。不然會有很大的性能問題,好比大數量的傳輸,以及driver本地的內存壓力
前者是action操做,後者是transformation操做
###7、RDD cache優化 RDD的數據,來至於外部存儲介質,好比磁盤。而每一次用該RDD,都要去磁盤加載,這有時間和性能上的損耗。可使用rdd的cahce方法,將該RDD緩存到內存,這樣後續重複使用該RDD時,直接去內存拿。 cache的幾個級別
按數據是否在分區間遷移,來劃分stage。一個stage有多個task,他們會併發的在不一樣的分區上執行相同的計算代碼。好比緊鄰的map和filter就會被劃在同一個stage,由於他們能夠併發在各分區上執行,而不須要數據移動。而reduceByKey則會單獨成爲一個stage,由於其涉及到數據移動
RDD 從一個RDD轉化成另外一個RDD時,每一步都會記錄上一個RDD關係。因而這造成一個血統譜系。具體
val wordCount1 = sc.textFile("InputText").flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _)
println(wordCount1.toDebugString)
複製代碼
最終輸出:
(1) ShuffledRDD[4] at reduceByKey at SparkTest.scala:124 []
+-(1) MapPartitionsRDD[3] at map at SparkTest.scala:124 []
| MapPartitionsRDD[2] at flatMap at SparkTest.scala:124 []
| InputText MapPartitionsRDD[1] at textFile at SparkTest.scala:124 []
| InputText HadoopRDD[0] at textFile at SparkTest.scala:124 []
複製代碼
能夠看到結果以倒序的方式輸出,有點像java異常時,打出的依賴棧。從最近的依賴點,一直回溯
在RDD上進一步封裝的數據結構。這種數據結構可使用SparkSql去操做處理數據,這下降了對分佈式數據集的使用難度。由於你只要會sql,就能夠進行一些處理
###11、 如何調優 一個Spark應用最會對應多個JVM進程。分佈式driver,以及該應用在每一個worknode上起的JVM進程,因爲driver擔任的協調者角色,實際執行是worknode上的EXECUTOR,因此對於JVM的調優,主要指對Executor的調優。這些JVM進程彼此會通訊,好比數據shuffle。因此優化Spark應用的思路主要從如下個方面入手:
經過sparkConf conf.set(「spark.serializer」, 「org.apache.spark.serializer.KyroSerializer」)來配置,指定數據對象的序列化方式