Spark簡明筆記

1、Spark結構

  • 使用java、scala、python任意一種語言編寫的Spark應用叫Driver
  • Driver程序通常負責初始SparkContext,而後經過SparkContext與整個集羣通訊,進行分佈式計算,好比經過SparkContext建立RDD。鑑於Driver行駛的地位,其角色上有可叫central coordinator
  • SparkContext與集羣通訊的方式
    1. 第一步先經過Cluster Manager申請計算資源Executor
    2. 第二步,SparkContext與Executor直接通訊,將分佈式計算程序發送到每一個Executor
    3. 第三步,SparkContext發送當前要執行的計算Task給Executor執行
  • Worker Node是Spark集羣中的某個具體節點,也叫slave
  • Executor是在Worker Node上開的一個應用執行器,他會在worknode上起一個JVM, 他能夠執行多個Task, Executor是應用隔離的。也即一個Executor只能屬於某一個Spark應用,這樣Spark集羣才能同時服務多個Spark應用,互不干擾。
  • Executor有點像Java中的工做線程同樣,能夠執行SparkContext發來的多個任務。不一樣的是Executor是一個獨立的JVM進程
  • Cluster Manager是有多種類型,能夠是Spark自帶的Standalone 集羣,也能夠是YARN或Mesos集羣

2、如何部署Spark程序

以scala爲例,咱們經過IDE編寫Spark應用後,將其打包成jar包,而後使用spark-submit程序進行部署java

./bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]
複製代碼
  • --class: 應用的主方法入口
  • --master: cluster manager集羣地址,能夠是local,也能夠是yarn或mesos,或者spark自帶的standalone 地址 
  • --deploy-mode: Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client) (default: client)
  • --conf: Arbitrary Spark configuration property in key=value format. For values that contain spaces wrap 「key=value」 in quotes (as shown).
  • application-jar: Path to a bundled jar including your application and all dependencies. The URL must be globally visible inside of your cluster, for instance, an hdfs:// path or a file:// path that is present on all nodes.
  • application-arguments: Arguments passed to the main method of your main class, if any

2.1 上述master參數可配置的值以下

2.2 Spark配置優先級

優先級從高到低依次是:node

  • 直接在代碼中經過SparkConf控制,好比指定cluster manager的master參數,能夠在代碼中配置python

    val conf = new SparkConf().setAppName("WordCount").setMaster("local");sql

  • 在命令中指定,好比:apache

    ./bin/spark-submit
    --class org.apache.spark.examples.SparkPi
    --master yarn
    --deploy-mode cluster \ # can be client for client mode --executor-memory 20G
    --num-executors 50
    /path/to/examples.jar
    1000緩存

  • 在spark的安裝目錄下,經過spark-defaults.conf配置。性能優化

3、RDD

RDD是一個統一分佈式數據抽象數據集。其下對應實際的數據存儲介質,多是文件,也能夠是hadoop。經過RDD能夠進行tranformation和action操做,從而實現分佈式計算。網絡

3.1 關鍵數據結構

一個RDD具備如下固定的數據結構數據結構

  • 須要應用的計算操做,也即transformation
  • 當前RDD對應的分區列表。由於數據是分區存儲的
  • 當前RDD依賴的父數據集。每一個RDD都維護一個其依賴關係,這就構成了一個親緣圖譜叫作DAG(Directed Acyclic Graph),中文稱做有向無環圖。

總結來講,一個RDD的關鍵信息無非是,定義了數據來源,數據分佈存儲的狀況,以及準備執行的計算邏輯。經過這些新,咱們能夠構建一個圖,圖的兩個vertex分別是RDD,edge爲computation架構

private[spark] def conf = sc.conf
  // =======================================================================
  // Methods that should be implemented by subclasses of RDD
  // =======================================================================

  /**
   * :: DeveloperApi ::
   * Implemented by subclasses to compute a given partition.
   */
  @DeveloperApi
  def compute(split: Partition, context: TaskContext): Iterator[T]//當前RDD須要執行的計算

  /**
   * Implemented by subclasses to return the set of partitions in this RDD. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   *
   * The partitions in this array must satisfy the following property:
   *   `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
   */
  protected def getPartitions: Array[Partition]//當前RDD對應的分區

  /**
   * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   */
  protected def getDependencies: Seq[Dependency[_]] = deps//當前RDD依賴的父親數據集

  /**
   * Optionally overridden by subclasses to specify placement preferences.
   */
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil

  /** Optionally overridden by subclasses to specify how they are partitioned. */
  @transient val partitioner: Option[Partitioner] = None

  // =======================================================================
  // Methods and fields available on all RDDs
  // =======================================================================

  /** The SparkContext that created this RDD. */
  def sparkContext: SparkContext = sc

  /** A unique ID for this RDD (within its SparkContext). */
  val id: Int = sc.newRddId()

  /** A friendly name for this RDD */
  @transient var name: String = null&emsp;//當前RDD的名稱
複製代碼

####3.2 RDD 特色

  • 分佈式
  • 不可變性,一個RDD生成後,就不可變,全部Transformation操做,都是在原RDD基礎上生成新的RDD。
  • 自動容錯特性,Spark RDD記錄了數據譜系信息(Data lineage),也即check point。這樣在某步失敗後,能夠直接重試那一步,而不用全部計算過程重來。譜系信息記錄了,輸入的數據,以及處理函數。因爲RDD的不可變特性,以及處理函數的冪等性,使得整個重試不會有side effect。依然能保持計算的一致性
  • 沒有性能優化 DataFrame會根據用戶的sql,自動作性能優化。而RDD要求用戶本身組織transformation atcion代碼,可能用戶組織的不合理,會致使數據頻繁在集羣間移動
  • 沒有結構化信息 DataFrame有字段的名稱,類型等信息,但RDD沒有
  • Lazy Comuting.只有action時,前面全部的transformation動做纔會執行。這節約了空間和時間。試想,若是每一個transformation都單獨執行一次,那每一次的計算調度都有時間成本,以及中間結果的存儲成本

4、Spark的計算流程

  • driver建立RDD
  • RDD經過SparkContext的runJob方法,提交一次數據計算
  • SparkContext最終又交由DAGScheduler的runJob進行計算job執行
  • DAGScheduler使用handleJobSubmitted方法處理job,第一步是根據RDD中的DAG構建Stage列表。不涉及數據移動的transformation會被放到一個stage裏面,好比filter和map操做,他們能夠並行的在各分區中執行。第二步經過submitStage提交Stage到集羣
  • DAGScheduler submitStage再調用submitMissingTasks方法。submitMissingTasks會將stage轉化成task
  • task最後經過TaskScheduler提交到spark集羣的worknode,進行實際執行

###5、RDD Transformation 將RDD進行一系列變換,生成新的RDD的過程,叫作Transformation。全部那些能夠就地計算,而不須要數據遷移的transformation叫作Narrow Transformation。

####5.1 transformation大概源碼 以map操做爲例

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    //將傳進來的函數f進行clean,這裏先不深究,只須要知道clean後的函數,跟原函數功能相同
    
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
    //這裏返回MapPartitionsRDD對象,其構造參數爲當前RDD和一個將f應用於迭代器的函數定義
  }
複製代碼
  • 能夠看到map方法並無執行任何函數,而只是將全部計算過程和數據包裝成MapPartitionsRDD後返回。這也就是transformation操做,lazy Computing的特色所在。
  • 全部tranformation返回的都是RDD,好比filter。其他transfomation 函數源碼大體同map相似,再也不贅述

5.2 flatMap

map操做是將迭代RDD中的每一個元素,而後將其作必定加工,返回的的依然是一個元素。而flapMap接受的函數參數的入參是RDD中的每一個元素,但對該元素處理後,返回的是一個集合,而不是一個元素。flatMap源碼以下:

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
  }
複製代碼

總結來講,map和flapMap的異同點以下: - map接受的函數參數簽名是:(f: T => U)而flatMap接受的函數參數簽名爲:(f: T => TraversableOnce[U]),能夠看到返回的是集合

  • map和flatMap的返回值都是RDD[T]。也便是說,flatMap擁有將多個集合數據合併,抹平的功效,從該函數的命名也可看出這一點,flat是平的意思

5.2 Narrow Transformations

Narrow Transformation操做有

5.3 Wide Transformations

有些計算,須要依賴其餘節點數據,這種計算會致使數據移動,成爲Wide Transformations。好比,基於某個key分類的操做GroupByKey,這個Key可能散落在不一樣的work node上,爲了進行GroupByKey計算,須要計算節點間進行數據移動,好比將某個Key對應的數據,統一移動到一個節點上。Wide Transformation操做有以下:

6、RDD Action

全部Tranformation操做,都不會真正執行,直到Action操做被調用,Action操做返回是具體值,而不是RDD。這種特性成爲Lazy Computing. Action操做觸發後,會將執行結果發給Driver 或者寫如到外部存儲。如下操做屬於Action操做: First(), take(), reduce(), collect(), count()

6.1 關鍵action源碼

全部action操做,最終都會調用SparkContext的runJob方法。runJob有需多重載方法,以其中一個爲例

def runJob[T, U: ClassTag](
      rdd: RDD[T],//須要處理的RDD數據
      processPartition: Iterator[T] => U,//須要在每一個數據分區上進行的操做
      resultHandler: (Int, U) => Unit)//如何將上述每一個分區處理後的結果進行處理
複製代碼

能夠看到runJob中體現了全部分佈式計算理論架構,即MapReduce。其中processPartition定義每一個分區要須要作的map操做,這一步將減小數據量,將map操做的結果作爲輸入,傳進reduce操做,進行彙總處理。

6.2 aggregate

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
    // Clone the zero value since we will also be serializing it as part of tasks
    var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())//1
    val cleanSeqOp = sc.clean(seqOp)
    val cleanCombOp = sc.clean(combOp)
    val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)//2
    val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)//3
    sc.runJob(this, aggregatePartition, mergeResult)//4
    jobResult//5
  }
複製代碼
  1. 定義一個結果彙總變量,它將存儲aggregate方法最終的返回結果,初始值爲zeroValue
  2. 在每一個RDD數據分區上,使用迭代器,應用aggregate方法,初始值爲都爲zeroValue
  3. 對每一個分區的結果,使用combOp方法進行彙總計算。輸入index爲分區的編號,taskResult爲上一步每一個分區計算後的結果,同彙總變量jobResult再來進行combOp計算。從第一步可知,jobResult的初始值爲zeroValue
  4. 將上述兩個函數做爲入參,傳遞給sc.runJob方法,在spark集羣進行執行
  5. 返回結果

舉例:

val inputrdd = sc.parallelize(List(("maths", 21),("english", 22),("science", 31)),2)
    val result = inputrdd.aggregate(3)(
      (acc, value) => {
        println(acc+":"+value)
        (acc + value._2)
      },
      (acc1, acc2) => (acc1 * acc2)
    )
    println(result)//輸出4032
複製代碼

解釋:

  • 上述RDD,被切分紅兩個分區。第一個分區數據是("maths", 21) ,另外一個是:("english", 22),("science", 31)

  • (acc + value._2)是每一個分區要執行的操做,迭代器帶入zeroValue=3後,兩個分片的計算中間值以下

    3+21=24//分區1 3+22+31=56//分區2

  • 最後將每一個分區結果帶入(acc1 * acc2)函數,從aggregate源碼得知,結果計算也要運用zeroValue,在這裏也就是3.因而最終步執行的計算以下:

    32456=4032

6.3 fold

fold函數同aggregate相似,一樣是調用SparkContext的runJob函數,只不過fold只接受一個值參數,和一個函數參數,其內部在調用runJob時,分區計算和結果計算都使用一樣的函數。源碼以下:

def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
    // Clone the zero value since we will also be serializing it as part of tasks
    var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
    val cleanOp = sc.clean(op)
    val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
    val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
    sc.runJob(this, foldPartition, mergeResult)
    jobResult
  }
複製代碼

舉例:

val inputrdd = sc.parallelize(List(("maths", 21),("english", 22),("science", 31)),2)//1
val result = inputrdd.fold(("test",3))(
  (acc, ele) => {
    println(acc+":"+ele)
    ("result",acc._2 + ele._2)
  }
)
println(result)//輸出:(result,83)
複製代碼

假設註釋1中切分的2個分區爲("maths", 21)和("english", 22),("science", 31),那麼執行過程以下:

  1. 3+21=24
  2. 3+22+31=56
  3. 3+24+56=83

6.4 reduce

reduce一樣調用了SparkContext的runJob函數,但reduce接收的參數在fold上進一步簡化,少了zeroValue參數,只接收一個函數參數便可。一樣該參數,在調用runJob時,即做爲分區收斂的函數,記做爲分區彙總計算的函數

def reduce(f: (T, T) => T): T = withScope {
    val cleanF = sc.clean(f)
    val reducePartition: Iterator[T] => Option[T] = iter => {
      if (iter.hasNext) {
        Some(iter.reduceLeft(cleanF))
      } else {
        None
      }
    }
    var jobResult: Option[T] = None
    val mergeResult = (index: Int, taskResult: Option[T]) => {
      if (taskResult.isDefined) {
        jobResult = jobResult match {
          case Some(value) => Some(f(value, taskResult.get))
          case None => taskResult
        }
      }
    }
    sc.runJob(this, reducePartition, mergeResult)
    // Get the final result out of our Option, or throw an exception if the RDD was empty
    jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
  }
複製代碼

舉例:

val inputrdd = sc.parallelize(List(("maths", 21),("english", 22),("science", 31)),2)
    val result = inputrdd.reduce(
      (acc, ele) => {
        println(acc+":"+ele)
        ("result",acc._2 + ele._2)
      }
    )
    println(result)//結果爲:(result,74)
複製代碼

6.5 collect&top

collect和top方法都會將數據收集到driver本地,前者是收集所有,後者是收集指定條數。因此最好知道收集的數據集較小時使用。不然會有很大的性能問題,好比大數量的傳輸,以及driver本地的內存壓力

6.6 reduce和reduceByKey

前者是action操做,後者是transformation操做

###7、RDD cache優化 RDD的數據,來至於外部存儲介質,好比磁盤。而每一次用該RDD,都要去磁盤加載,這有時間和性能上的損耗。可使用rdd的cahce方法,將該RDD緩存到內存,這樣後續重複使用該RDD時,直接去內存拿。 cache的幾個級別

  • MEMORY_ONLY 只緩存到內存,內存裝不下的部分,下次用到時再從新計算
  • MEMORY_AND_DISK 緩存到內存,內存裝不下的,緩存到磁盤。這樣,下次須要時,不在內存部分的數據直接從磁盤獲取就行,不用從新計算
  • MEMORY_ONLY_SER 只緩存到內存,但爲了節約空間,將緩存對象序列化後存儲
  • MEMORY_AND_DISK_SER 緩存到內存,裝不下的數據緩存到磁盤,都是用序列化方式存儲
  • DISK_ONLY 只緩存到磁盤

7.1 stage

按數據是否在分區間遷移,來劃分stage。一個stage有多個task,他們會併發的在不一樣的分區上執行相同的計算代碼。好比緊鄰的map和filter就會被劃在同一個stage,由於他們能夠併發在各分區上執行,而不須要數據移動。而reduceByKey則會單獨成爲一個stage,由於其涉及到數據移動

8、RDD lineage & DAG

RDD 從一個RDD轉化成另外一個RDD時,每一步都會記錄上一個RDD關係。因而這造成一個血統譜系。具體

val wordCount1 = sc.textFile("InputText").flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _)
    println(wordCount1.toDebugString)
複製代碼

最終輸出:

(1) ShuffledRDD[4] at reduceByKey at SparkTest.scala:124 []
 +-(1) MapPartitionsRDD[3] at map at SparkTest.scala:124 []
    |  MapPartitionsRDD[2] at flatMap at SparkTest.scala:124 []
    |  InputText MapPartitionsRDD[1] at textFile at SparkTest.scala:124 []
    |  InputText HadoopRDD[0] at textFile at SparkTest.scala:124 []
複製代碼

能夠看到結果以倒序的方式輸出,有點像java異常時,打出的依賴棧。從最近的依賴點,一直回溯

9、DataFrame

在RDD上進一步封裝的數據結構。這種數據結構可使用SparkSql去操做處理數據,這下降了對分佈式數據集的使用難度。由於你只要會sql,就能夠進行一些處理

10、GraphX

###11、 如何調優 一個Spark應用最會對應多個JVM進程。分佈式driver,以及該應用在每一個worknode上起的JVM進程,因爲driver擔任的協調者角色,實際執行是worknode上的EXECUTOR,因此對於JVM的調優,主要指對Executor的調優。這些JVM進程彼此會通訊,好比數據shuffle。因此優化Spark應用的思路主要從如下個方面入手:

  • 作個一個JVM應用,須要關注JVM的垃圾回收狀況,各年齡帶的內存分配。這個須要基於具體應用具體分析
  • 因爲多個JVM進程之間設計跨網絡,跨機器的數據傳輸,那麼須要考慮如何減少傳輸數據量。好比將數據序列化
  • 對於Spark計算框架自己的特色,還有對數據量較大的輸入,採用提升併發度,來切分輸入大小。頻繁使用的RDD,進行緩存,減少集羣重複計算加載的開銷。將各分區都要用到的公共大變量,提早brodcast到各集羣等

11.1 序列化

經過sparkConf conf.set(「spark.serializer」, 「org.apache.spark.serializer.KyroSerializer」)來配置,指定數據對象的序列化方式

12、參考資料

data-flair.training/blogs/spark…

spark.apache.org/docs/1.6.1/

相關文章
相關標籤/搜索