標籤(空格分隔): Sparkgit
先回顧一下WordCount的過程:web
sc.textFile("README.rd").flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_)
val rawFile = sc.textFile("README.rd")
HadoopRDD
--> MappedRDD
;val splittedText = rawFile.flatMap(line => line.split(" "))
MappedRDD
--> FlatMappedRDD
;val wordCount = splittedText.map(word => (word, 1))
FlatMappedRDD
-- > MappedRDD
;val reduceJob = wordCount.reduceByKey(_+_)
reduceByKey
不是MappedRDD
的方法。reduceJob.foreach(println)
ClosureCleaner
的主要功能當Scala在建立一個閉包時,須要先斷定那些變量會被閉包所使用並將這些須要使用的變量存儲在閉包以內。可是有時會捕捉太多沒必要要的變量,形成帶寬浪費和資源浪費,ClosureCleaner
則能夠移除這些沒必要要的外部變量。編程
常常會遇到Task Not Serializable
錯誤,產生沒法序列化的緣由就是在RDD的操做中引用了沒法序列化的變量。設計模式
做業的提交過程主要涉及Driver和Executor兩個節點。
在Driver中主要解決一下問題:api
(對於WordCount程序來講,一直到foreach()階段纔會被提交,分析,執行!!)瀏覽器
Spark中的RDD之間的依賴分爲窄依賴和寬依賴。緩存
將會致使窄依賴的Transformation
有:閉包
將會致使寬依賴的Transformation
有:併發
Scheduler會計算RDD之間的依賴關係,將擁有持續窄依賴的RDD歸併到同一個Stage中,而寬依賴則做爲劃分不一樣Stage的判斷標準。其中,handleJobSubmitted
和submitStage
主要負責依賴性分析,對其處理邏輯作進一步的分析。app
handleJobSubmitted
-- 生成finalStage
併產生ActiveJob
finalStage = new Stage(finalRDD, partitions.size, None, jobId, callSite); //生成finalStage val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties) //根據finalStage產生ActiveJob
newStage
-- 建立一個新的Stageprivate def newStage(rdd:RDD[_], numTasks:Int, shuffleDep:Option[shuffleDependency[_,_,_]], jobId:Int, callSite:CallSite) : Stage = { val stage = new Stage(id,rdd, numTasks, shuffleDep, getParentStages(rdd, jobId), jobId, callSite) } //參數含義:id -- Stage的序號,數字越大,優先級越高 //rdd:Rdd[_] -- 歸屬本Stage的最後一個rdd //numTasks -- 建立的Task數目,等於父rdd的輸出Partition的數目 //parents -- 父Stage列表
也就是說,在建立Stage的時候,已經清楚該Stage須要從多少不一樣的Partition讀入數據,並寫出到多少個不一樣的Partition中,即輸入與輸出的個數已經明確。
submitStage
-- 遞歸完成所依賴的Stage而後提交1) 所依賴的Stage是否都已經完成,若是沒有則先執行所依賴的Stage;
2) 若是所依賴的Stage已經完成,則提交自身所處的Stage。
private def submitStage(stage: Stage) { val jobId = activeJobForStage(stage) if(jobId.isDefined) { .... //依次處理所依賴的沒有完成的Stage } else { abortStage(stage, "No active job for stage " + stage.id) //提交自身的Stage } }
getMissingParentStage
-- 經過圖的遍歷,找出依賴的全部父Stageprivate def getMissingParentStage(stage: Stage) : List[Stage] = { val missing = new HashSet[Stage] val visited = new HashSet[RDD[_]] }
Stage的劃分是如何肯定的呢? -- 重要的判斷依據是是否存在ShuffleDependency
,若是有則建立一個新的Stage。
如何判斷是否存在ShuffleDependency
呢? -- 取決於RDD的轉換。ShuffledRDD, CoGroupedRDD, SubtractedRDD
都會返回ShuffleDependency
。
getDependencies
-- 對於所建立的RDD,明確其Dependency類型override def getDependencies: Seq[Dependency[_]] = { List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine)) }
Stage劃分完畢就會明確如下內容:
1) 產生的Stage須要從多少個Partition中讀取數據;
2) 產生的Stage會生成多少個Partition -- 決定須要產生多少不一樣的Task;
3) 產生的Stage是否屬於ShuffleMap類型 -- 決定生成的Task類型。
Spark中共分2種不一樣的Task:ShuffleMap和ResultTask。
在做業提交及執行期間,Spark會產生大量的消息交互,那麼這些信息如何進行交互的呢?
HelloWorld in Akka:
import akka.actor.Actor import akka.actor.ActorSystem import akka.actor.Props class HelloActor extends Actor { def receive = { case "hello" => println("hello back at you") case _ => println("huh?") } } object Main extends App { val system = ActorSystem("HelloSystem") //default Actor constructor val helloActor = System.actorOf(Props[HelloActor], name = "helloactor") helloActor ! "hello" helloActor ! "dias" }
注意:
!
;receive
函數來處理接收到的消息。ShuffleMapTask(Map)
和ResultTask(Reduce)
兩種;isShuffleMap
標記肯定Task的類型,若是標記爲True則建立shuffleMapTask
,不然建立ResultTask
;submitMissingTasks
負責建立新的Task(根據isShuffleMap
標誌來肯定是哪一種Task,而後肯定Stage的輸出和輸出Partition);makeOffers
-- 處理DriverActor接收到的消息信號TaskschedulerImpl
發送ReviveOffers
消息給DriverActor
,DriverActor
接收到消息後,調用makeOffers
處理消息;
def makeOffers() { launchTasks(scheduler.resourceOffers( executorHost.toArray.map{case(id, host) => new WorkerOffer(id, host, freeCores(id))})) }
makeOffers的處理邏輯爲:
resourceOffers
-- 任務分發SchedulerBackend
-- 將新建立的Task分發給ExecutorLaunchTasks
-- 發送指令TaskDescription
-- 完成序列化launchTask
//CoarseGrainedSchedulerBackend.launchTasks def launchTasks(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) { val tr = new TaskRunner(context, taskId, serializedTask) runningTasks.put(taskId, tr) threadPool.execute(tr) }
TaskRunner
-- 反序列化updateDependencies
-- 解決依賴性問題TaskRunner會啓動一個新的線程,如何在run中調用用戶本身定義的處理函數呢?做用於RDD上的Operation是如何真正起做用的呢?
TaskRunner.run |_Task.run |_Task.runTask |_RDD.iterator |_RDD.computeOrReadCheckpoint |_RDD.compute
Task在執行的時候,會產生大量的數據交互,這些數據能夠分紅3種不一樣的類型:
1)狀態相關,如StatusUpdate;
2)中間結果;
3)計算相關的數據Metrics Data.
ShuffleMapTask和ResultTask返回的結果有什麼不一樣:
ShuffleMapTask
須要返回MapStatus,而ResultTask
只須要告知是否已經成功完成執行;ScheduleBack
接收到Executor發送過來的StatusUpdate;ScheduleBackend
接收到StatusUpdate以後:若是任務已經成功處理,則將其從監視列表中刪除。若是整個做業都完成,將佔用的資源釋放;TaskSchedulerImpl
將當前順利完成的任務放入完成隊列,同時取出下一個等待運行的Task;DAGSchedule
中的handleTaskCompletion
,會針對ResultTask和ShuffleMapTask區別對待結果:DAGSchedule
會發出TaskSucced來通知對整個做業執行狀況感興趣的監聽者出於容錯性及效率方面的考慮,有時須要將中間結果進行持久化保存,能夠方便後面再次利用到該RDD時不須要從新計算。
中間結果的存儲有兩種方式:Checkpoint 和 Cache
當用戶在使用Spark時,不管對Spark Cluster的運行狀況仍是Spark Application運行時的一些細節,但願可以可視化的觀察。
瀏覽器輸入:http://localhost:8080
Http Server是如何啓動的,網頁中顯示的數據是從哪裏獲得的?
1) Spark用到的Http Server是Jetty,用Java編寫,可以嵌入到用戶程序中執行,不用想Tomcat或JBoss那樣須要本身獨立的JVM進程。
2) SparkUI在SparkContext初始化時建立。
//Initial the spark UI, registering all asociated listeners private[spark] val ui = new SparkUI(this) ui.bind() //bind()函數真正啓動JettyServer
3) SparkListener持續監聽Stage和Task相關事件的發生,並進行數據更新(典型的觀察者設計模式)。
測量模塊是不可或缺的,經過測量數據來感知系統的運行狀況。在Spark中,由MetricsSystem來擔任這個任務。
Instance:
表示誰在使用MetricSystem -- Master,Worker,Executor,Client Driver;Source:
表示數據源;Sinks:
數據目的地:在WordCount程序中,在JobTracker提交以後,被DAGScheduler分爲兩個Stage:ShuffleMapTask和ResultTask。ShuffleMapTask的輸出數據是ResultTask的輸入。
ShuffleMapTask.runTask ---| |-->ShuffledRDD.compute ---| | | | V-Store V-Store
那麼問題來了,ShuffleMapTask的計算結果是如何被ResultTask得到的呢?
1)ShuffleMapTask將計算的狀態(不是具體的計算數值)包裝爲MapStatus返回給DAGScheduler;
2)DAGScheduler將MapStatus保存到MapOutputTrackerMaster中;
3)ResultTask在調用ShuffledRDD時會利用BlockStoreShuffleFetcher中的fetch方法獲取數據:
a. 首先要諮詢MapOutputTrackerMaster所要獲取數據的location;
b. 根據返回的結果調用BlockManager.getMultiple獲取到真正的數據。
其中,MapStatus的結構如上圖所示,由blockmanager_id 和 byteSize
構成,blockmanager_id
表示計算的中間結果數據實際存儲在哪一個BlockManager,byteSize
表示不一樣reduceid所要讀取的數據的大小。
寫入過程:
ShuffleMapTask.runTask HashShuffleWriter.write BlockObjectWriter.write
HashShuffleWriter.write
主要完成兩件事情:
<hello, 1>
和<hello, 1>
都要寫入的話,要先生成<hello, 2>
,再進行後續的寫入工做;<k, val>
寫入哪個文件中。(shuffle_id, map_id, reduce_id)
決定,;ShuffledRDD的compute函數式讀取ShuffleMapTask計算結果的觸點。
ShuffleRDD.compute()
-- 觸發讀取ShuffleMapTask的計算結果override def compute(split:Partition, context:TaskContext) : Iterator[P] = { val dep = dependencies.head.asInstanceOf[ShuffleDependency[K,V,C]] SparkEnv.get.shuffleManager.getReader().**read()**.asInstanceOf[Iterator[P]] //getReader()返回HashShuffleReader ...... }
HashShuffleReader.read()
override def read() : Iterator[Product2[K,C]] = { val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, Serializer.getSerializer(dep.serializer)) ..... }
BlockStoreShuffleFetcher.fetch()
BlockStoreShuffleFetcher
須要解決的問題:
如何根據得到的MapStatus取相應的BlockManager獲取數據。
若是所要獲取的文件落在本地,則調用getLocal讀取;不然發送請求到遠端BlockManager。
Spark內存的消耗。
Spark對內存的要求較高,在ShuffleMapTask和ResultTask中,因爲須要先將計算結果保存在內存,而後寫入磁盤,若是每一個數據分區的數據很大則會消耗大量的內存。
在Spark運行過程當中,能夠將結果顯示地保存下來,那麼若是想獲取緩存中的數據該怎麼辦?
CacheManager
:RDD在進行計算轉換的時候,經過CacheManager來獲取數據,並經過CacheManager來存儲計算結果;BlockManager
:CacheManager在讀取和存儲數據的時候主要依賴BlockManager來操做,它決定數據是從內存仍是磁盤讀取數據;MemoryStore
:負責將數據保存在或從內存中讀取數據;DiskStore
:複雜將數據保存在或從內存中讀取數據;BlockManagerWorker
:數據寫入本地的MemoryStore或DiskStore是一個同步操做,爲了保證容錯性還須要將數據複製到其餘節點,由BlockManagerWorker異步完成數據複製操做;ConnectionManager
:負責與其餘計算節點創建鏈接,並負責數據的發送和接收;BlockManagerMaster
:該模塊只運行在Driver Application所在的Executor,功能是負責記錄下全部BlockId存儲在哪一個SlaveWorker上。每一個存儲子模塊有SparkEnv來建立,建立過程在SparkEnv.create中完成。
① RDD.iterator是與Storage子系統交互的入口;
② CacheManager.getOrCompute調用BlockManager中的put接口來寫入數據;
③ 數據優先寫入到MemoryStore,若是內存已滿,則將最近使用次數較少的數據寫入磁盤;
④ 通知BlockManagerMaster有新的數據寫入,在BlockManagerMaster中保存元數據;
⑤ 若是數據備份數目大於1,則將寫入的數據與其餘Slave Worker同步。
BlockManager.get()
,先嚐試從本地獲取,若是所要獲取的內容不在本地,則發起遠程獲取。getRemote -> doGetRemote
;Spark優先將計算結果存儲到內存中,當內存不足的時候,寫到外部磁盤,究竟是怎樣作的呢?
Tachyon以Master/Worker的方式組織集羣,由Master負責管理、維護文件系統,文件數據存儲在Worker節點中。
在最新的Spark中,Storage子系統引入了TachyonStore,在內存中實現了HDFS文件系統的接口,主要目的是儘量的利用內存來做爲數據持久層,避免過多的磁盤讀寫操做。