Spark學習(四) -- Spark做業提交

標籤(空格分隔): Sparkgit


做業提交

先回顧一下WordCount的過程:web

sc.textFile("README.rd").flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_)
  • 步驟一:val rawFile = sc.textFile("README.rd")
  • texyFile先生成HadoopRDD --> MappedRDD
  • 步驟二:val splittedText = rawFile.flatMap(line => line.split(" "))
  • flatMap將原來的MappedRDD --> FlatMappedRDD;
  • 步驟三:val wordCount = splittedText.map(word => (word, 1))
  • 將詞語生成相應的鍵值對,FlatMappedRDD -- > MappedRDD;
  • 步驟四:val reduceJob = wordCount.reduceByKey(_+_)
  • 其中,reduceByKey不是MappedRDD的方法。
  • Scala將MappedRDD隱式轉換爲PairRDDFunctions
  • 步驟五:觸發執行reduceJob.foreach(println)
  • foreach會調用sc.runjob,從而生成Job並提交到Spark集羣中運行。

ClosureCleaner的主要功能

當Scala在建立一個閉包時,須要先斷定那些變量會被閉包所使用並將這些須要使用的變量存儲在閉包以內。可是有時會捕捉太多沒必要要的變量,形成帶寬浪費和資源浪費,ClosureCleaner則能夠移除這些沒必要要的外部變量。編程

常常會遇到Task Not Serializable錯誤,產生沒法序列化的緣由就是在RDD的操做中引用了沒法序列化的變量。設計模式

做業執行

做業的提交過程主要涉及Driver和Executor兩個節點。
在Driver中主要解決一下問題:api

  • RDD依賴性分析,以生成DAG;
  • 根據RDD DAG將Job分割爲多個Stage;
  • Stage一經確認,即生成相應的Task,將生成的Task分發到Executor執行。

此處輸入圖片的描述

(對於WordCount程序來講,一直到foreach()階段纔會被提交,分析,執行!!)瀏覽器

依賴性分析及Stage劃分

Spark中的RDD之間的依賴分爲窄依賴和寬依賴。緩存

  • 窄依賴是指父RDD的全部輸出都會被指定的子RDD使用,也就是輸出路徑是指定的;
  • 寬依賴是指父RDD的輸出由不一樣的子RDD使用,輸出路徑不固定。

此處輸入圖片的描述

將會致使窄依賴的Transformation有:閉包

  • map
  • flatmap
  • filter
  • sample

將會致使寬依賴的Transformation有:併發

  • sortByKey
  • reduceByKey
  • groupByKey
  • cogroupByKey
  • join
  • cartensian

Scheduler會計算RDD之間的依賴關係,將擁有持續窄依賴的RDD歸併到同一個Stage中,而寬依賴則做爲劃分不一樣Stage的判斷標準。其中,handleJobSubmittedsubmitStage主要負責依賴性分析,對其處理邏輯作進一步的分析。app

handleJobSubmitted -- 生成finalStage併產生ActiveJob

finalStage = new Stage(finalRDD, partitions.size, None, jobId, callSite); //生成finalStage
val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties) //根據finalStage產生ActiveJob

newStage -- 建立一個新的Stage

private def newStage(rdd:RDD[_], numTasks:Int, shuffleDep:Option[shuffleDependency[_,_,_]], jobId:Int, callSite:CallSite) : Stage = {
    val stage = new Stage(id,rdd, numTasks, shuffleDep, getParentStages(rdd, jobId), jobId, callSite)
}
//參數含義:id -- Stage的序號,數字越大,優先級越高
//rdd:Rdd[_] -- 歸屬本Stage的最後一個rdd
//numTasks -- 建立的Task數目,等於父rdd的輸出Partition的數目
//parents -- 父Stage列表

也就是說,在建立Stage的時候,已經清楚該Stage須要從多少不一樣的Partition讀入數據,並寫出到多少個不一樣的Partition中,即輸入與輸出的個數已經明確。

submitStage -- 遞歸完成所依賴的Stage而後提交

1) 所依賴的Stage是否都已經完成,若是沒有則先執行所依賴的Stage;
2) 若是所依賴的Stage已經完成,則提交自身所處的Stage。

private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if(jobId.isDefined) {
        ....
        //依次處理所依賴的沒有完成的Stage
    } else {
    abortStage(stage, "No active job for stage " + stage.id) //提交自身的Stage
    }
}

getMissingParentStage -- 經過圖的遍歷,找出依賴的全部父Stage

private def getMissingParentStage(stage: Stage) : List[Stage] = {
    val missing = new HashSet[Stage]
    val visited = new HashSet[RDD[_]]
}

Stage的劃分是如何肯定的呢? -- 重要的判斷依據是是否存在ShuffleDependency,若是有則建立一個新的Stage。
如何判斷是否存在ShuffleDependency呢? -- 取決於RDD的轉換。ShuffledRDD, CoGroupedRDD, SubtractedRDD都會返回ShuffleDependency

getDependencies -- 對於所建立的RDD,明確其Dependency類型

override def getDependencies: Seq[Dependency[_]] = {
    List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
}

Stage劃分完畢就會明確如下內容:
1) 產生的Stage須要從多少個Partition中讀取數據;
2) 產生的Stage會生成多少個Partition -- 決定須要產生多少不一樣的Task;
3) 產生的Stage是否屬於ShuffleMap類型 -- 決定生成的Task類型。

Spark中共分2種不一樣的Task:ShuffleMap和ResultTask。

Actor Model和Akka -- 消息交互機制

在做業提交及執行期間,Spark會產生大量的消息交互,那麼這些信息如何進行交互的呢?

Actor Model

  • Actor Model最適合用於解決併發編程問題。
  • 每一個Actor都是一個獨立的個體,它們之間沒有任何繼承關係,全部的交互經過消息傳遞完成;
  • 每一個Actor的行爲只有3種:消息接收;消息處理;消息發送;
  • 爲啥不適用共享內存的方式來進行信息交互呢?
  • 共享內存會致使併發問題,爲了解決狀態不一致,要引入鎖,對鎖的申請處理很差又容易造成死鎖,同時性能會降低!

HelloWorld in Akka:

import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Props

class HelloActor extends Actor {
    def receive = {
        case "hello" => println("hello back at you")
        case _       => println("huh?")
    }
}

object Main extends App {
    val system = ActorSystem("HelloSystem")
    //default Actor constructor
    val helloActor = System.actorOf(Props[HelloActor], name = "helloactor")
    helloActor ! "hello"
    helloActor ! "dias"
}

注意:

  1. 首先要建立一個Actor;
  2. 消息發送要使用!
  3. Actor中必須實現receive函數來處理接收到的消息。

任務建立和分發

  • Spark將由Executor執行的Task分爲ShuffleMapTask(Map)ResultTask(Reduce)兩種;
  • 每一個Stage生成Task的時候,根據Stage中的isShuffleMap標記肯定Task的類型,若是標記爲True則建立shuffleMapTask,不然建立ResultTask
  • submitMissingTasks負責建立新的Task(根據isShuffleMap標誌來肯定是哪一種Task,而後肯定Stage的輸出和輸出Partition);
  • 一旦任務任務類型及任務個數肯定後,由Executor啓動相應的線程來執行;

makeOffers -- 處理DriverActor接收到的消息信號

TaskschedulerImpl發送ReviveOffers消息給DriverActor,DriverActor接收到消息後,調用makeOffers處理消息;

def makeOffers() {
    launchTasks(scheduler.resourceOffers(
    executorHost.toArray.map{case(id, host) => new WorkerOffer(id, host, freeCores(id))}))
}

makeOffers的處理邏輯爲:

  1. 找到空閒的Executor,分發的策略是隨機分發,儘量的將任務平攤到每一個Executor;
  2. 若是有空閒額Executor,就將任務列表中的部分任務利用launchTasks發送給指定的Executor。

resourceOffers -- 任務分發

SchedulerBackend -- 將新建立的Task分發給Executor

LaunchTasks -- 發送指令

TaskDescription -- 完成序列化

任務執行

  • LaunchTask消息被Executor接收,Executor會使用launchTask對給消息進行處理;
  • 若是Executor沒有被註冊到Driver,即便接收到launchTask指令,也不會作任何處理。

launchTask

//CoarseGrainedSchedulerBackend.launchTasks
def launchTasks(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
    val tr = new TaskRunner(context, taskId, serializedTask)
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)
}

TaskRunner -- 反序列化

updateDependencies -- 解決依賴性問題

Shuffle Task

TaskRunner會啓動一個新的線程,如何在run中調用用戶本身定義的處理函數呢?做用於RDD上的Operation是如何真正起做用的呢?

TaskRunner.run
       |_Task.run
            |_Task.runTask
                    |_RDD.iterator
                            |_RDD.computeOrReadCheckpoint
                                    |_RDD.compute

Reduce Task

Task在執行的時候,會產生大量的數據交互,這些數據能夠分紅3種不一樣的類型:
1)狀態相關,如StatusUpdate;
2)中間結果;
3)計算相關的數據Metrics Data.

ShuffleMapTask和ResultTask返回的結果有什麼不一樣:

  • ShuffleMapTask須要返回MapStatus,而ResultTask只須要告知是否已經成功完成執行;
  • ScheduleBack接收到Executor發送過來的StatusUpdate;
  • ScheduleBackend接收到StatusUpdate以後:若是任務已經成功處理,則將其從監視列表中刪除。若是整個做業都完成,將佔用的資源釋放;
  • TaskSchedulerImpl將當前順利完成的任務放入完成隊列,同時取出下一個等待運行的Task;
  • DAGSchedule中的handleTaskCompletion,會針對ResultTask和ShuffleMapTask區別對待結果:
  • 若是ResultTask執行成功,DAGSchedule會發出TaskSucced來通知對整個做業執行狀況感興趣的監聽者

Checkpoint和Cache -- 存儲中間結果

出於容錯性及效率方面的考慮,有時須要將中間結果進行持久化保存,能夠方便後面再次利用到該RDD時不須要從新計算。

中間結果的存儲有兩種方式:Checkpoint 和 Cache

  • Checkpoint將計算結果寫入到HDFS文件系統中,但不會保存RDD Lineage;
  • Checkpoint有兩種類型:Data Checkepoint 和 Metadata Checkpoint;
  • Cache則將數據緩存到內存,若是內存不足時寫入到磁盤,同時將Lineage也保存下來。

WebUI和Metrics -- 可視化觀察工具

當用戶在使用Spark時,不管對Spark Cluster的運行狀況仍是Spark Application運行時的一些細節,但願可以可視化的觀察。

WebUI

瀏覽器輸入:http://localhost:8080

Http Server是如何啓動的,網頁中顯示的數據是從哪裏獲得的?

1) Spark用到的Http Server是Jetty,用Java編寫,可以嵌入到用戶程序中執行,不用想Tomcat或JBoss那樣須要本身獨立的JVM進程。
2) SparkUI在SparkContext初始化時建立。

//Initial the spark UI, registering all asociated listeners
private[spark] val ui = new SparkUI(this)
ui.bind() //bind()函數真正啓動JettyServer

3) SparkListener持續監聽Stage和Task相關事件的發生,並進行數據更新(典型的觀察者設計模式)。

Metrics

測量模塊是不可或缺的,經過測量數據來感知系統的運行狀況。在Spark中,由MetricsSystem來擔任這個任務。

  • Instance:表示誰在使用MetricSystem -- Master,Worker,Executor,Client Driver;
  • Source:表示數據源;
  • Sinks:數據目的地:
  • ConsoleSink -- 輸出到控制檯;
  • CSVSink -- 按期保存爲CSV文件;
  • JmxSink -- 註冊到Jmx;
  • MetricsServlet -- 在SparkUI中添加MetricsServlet,以查看Task運行時的測量數據;
  • GraphiteSink -- 發送給Grapgite以對整個系統進行監控。

存儲機制

在WordCount程序中,在JobTracker提交以後,被DAGScheduler分爲兩個Stage:ShuffleMapTask和ResultTask。ShuffleMapTask的輸出數據是ResultTask的輸入。

ShuffleMapTask.runTask ---|   |-->ShuffledRDD.compute ---|
                          |   |                          |
                          V-Store                        V-Store

那麼問題來了,ShuffleMapTask的計算結果是如何被ResultTask得到的呢?
1)ShuffleMapTask將計算的狀態(不是具體的計算數值)包裝爲MapStatus返回給DAGScheduler;
2)DAGScheduler將MapStatus保存到MapOutputTrackerMaster中;
3)ResultTask在調用ShuffledRDD時會利用BlockStoreShuffleFetcher中的fetch方法獲取數據:

a. 首先要諮詢MapOutputTrackerMaster所要獲取數據的location;

b. 根據返回的結果調用BlockManager.getMultiple獲取到真正的數據。

此處輸入圖片的描述

其中,MapStatus的結構如上圖所示,由blockmanager_id 和 byteSize構成,blockmanager_id表示計算的中間結果數據實際存儲在哪一個BlockManager,byteSize表示不一樣reduceid所要讀取的數據的大小。

Shuffle結果寫入

寫入過程:

ShuffleMapTask.runTask
    HashShuffleWriter.write
        BlockObjectWriter.write

HashShuffleWriter.write主要完成兩件事情:

  1. 判斷是否要進行聚合,好比<hello, 1><hello, 1>都要寫入的話,要先生成<hello, 2>,再進行後續的寫入工做;
  2. 利用Partitioner函數來決定<k, val>寫入哪個文件中。
  3. 每個臨時文件由三元組(shuffle_id, map_id, reduce_id)決定,;

shuffle結果讀取

ShuffledRDD的compute函數式讀取ShuffleMapTask計算結果的觸點。

ShuffleRDD.compute() -- 觸發讀取ShuffleMapTask的計算結果

override def compute(split:Partition, context:TaskContext) : Iterator[P] = {
    val dep = dependencies.head.asInstanceOf[ShuffleDependency[K,V,C]]
    SparkEnv.get.shuffleManager.getReader().**read()**.asInstanceOf[Iterator[P]] //getReader()返回HashShuffleReader
    ......
}

HashShuffleReader.read()

override def read() : Iterator[Product2[K,C]] = {
    val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, Serializer.getSerializer(dep.serializer))
    .....
}

BlockStoreShuffleFetcher.fetch()

BlockStoreShuffleFetcher須要解決的問題:

  • 所要獲取的mapid的MapStatus的內容是什麼;
  • 如何根據得到的MapStatus取相應的BlockManager獲取數據。

  • 一個ShuffleMapTask會產生一個MapStatus,MapStatus中含有當前ShuffleMapTask產生的數據落到各個Partition中的大小,若是爲0則表示該分區沒有數據產生;
  • 索引爲reduceId,若是array(0) == 0則表示上一個ShuffleMapTask中生成的數據中沒有任何內容能夠做爲reduceId爲0的ResultTask的輸入;
  • 若是所要獲取的文件落在本地,則調用getLocal讀取;不然發送請求到遠端BlockManager。

Spark內存的消耗。
Spark對內存的要求較高,在ShuffleMapTask和ResultTask中,因爲須要先將計算結果保存在內存,而後寫入磁盤,若是每一個數據分區的數據很大則會消耗大量的內存。

  • 每一個Writer開啓100KB的緩存;
  • Records會佔用大量內存;
  • 在ResultTask的combine階段,利用HashMap來緩存數據。若是讀取的數據量很大或則分區不少,都會致使內存不足。

Memory Store -- 獲取緩存的數據

在Spark運行過程當中,能夠將結果顯示地保存下來,那麼若是想獲取緩存中的數據該怎麼辦?
此處輸入圖片的描述

  • CacheManager:RDD在進行計算轉換的時候,經過CacheManager來獲取數據,並經過CacheManager來存儲計算結果;
  • BlockManager:CacheManager在讀取和存儲數據的時候主要依賴BlockManager來操做,它決定數據是從內存仍是磁盤讀取數據;
  • MemoryStore:負責將數據保存在或從內存中讀取數據;
  • DiskStore:複雜將數據保存在或從內存中讀取數據;
  • BlockManagerWorker:數據寫入本地的MemoryStore或DiskStore是一個同步操做,爲了保證容錯性還須要將數據複製到其餘節點,由BlockManagerWorker異步完成數據複製操做;
  • ConnectionManager:負責與其餘計算節點創建鏈接,並負責數據的發送和接收;
  • BlockManagerMaster:該模塊只運行在Driver Application所在的Executor,功能是負責記錄下全部BlockId存儲在哪一個SlaveWorker上。

存儲子模塊啓動過程分析

每一個存儲子模塊有SparkEnv來建立,建立過程在SparkEnv.create中完成。

數據寫入過程

此處輸入圖片的描述

① RDD.iterator是與Storage子系統交互的入口;
② CacheManager.getOrCompute調用BlockManager中的put接口來寫入數據;
③ 數據優先寫入到MemoryStore,若是內存已滿,則將最近使用次數較少的數據寫入磁盤;
④ 通知BlockManagerMaster有新的數據寫入,在BlockManagerMaster中保存元數據;
⑤ 若是數據備份數目大於1,則將寫入的數據與其餘Slave Worker同步。

數據讀取過程

  • 數據讀取的入口是BlockManager.get(),先嚐試從本地獲取,若是所要獲取的內容不在本地,則發起遠程獲取。
  • 遠程獲取的代碼調用路徑爲:getRemote -> doGetRemote;

TachyonStore

Spark優先將計算結果存儲到內存中,當內存不足的時候,寫到外部磁盤,究竟是怎樣作的呢?

  • Spark實際上將中間結果放在了當前JVM的內存中,也就是JVM既是計算引擎,又是存儲引擎。
  • 當計算引擎中的錯誤致使JVM進程退出時,會致使全部存儲的內存所有消失;
  • 大量的Cache又會使得JVM發生GC的機率增大,嚴重影響計算性能。
  • 所以,使用Tachyon代替JVM的存儲功能。

Tachyon以Master/Worker的方式組織集羣,由Master負責管理、維護文件系統,文件數據存儲在Worker節點中。

  • 底層支持Plugable的文件系統,如HDFS用於用戶指定文件的持久化;
  • 使用Journal機制持久化文件系統中的Metadata;
  • 利用ZooKeeper構件Master的HA;
  • 採用和Spark RDD相似的Lineage的思想用於災難恢復。

在最新的Spark中,Storage子系統引入了TachyonStore,在內存中實現了HDFS文件系統的接口,主要目的是儘量的利用內存來做爲數據持久層,避免過多的磁盤讀寫操做。

相關文章
相關標籤/搜索