----本節內容-------node
1.遺留問題解答web
2.Spark核心概念面試
2.1 RDD及RDD操做算法
2.2 Transformation和Actionsql
2.3 Spark程序架構shell
2.4 Spark on Yarn運行流程編程
2.5 WordCount執行原理數組
3.Spark計算引擎原理緩存
3.1 Spark內部原理網絡
3.2 生成邏輯執行圖
3.3 生成物理執行圖
4.Spark Shuffle解析
4.1 Shuffle 簡史
4.2 Spark Shuffle
·Shuffle Write
·Shuffle Read
·Shuffle Aggregate
5.參考資料
---------------------
有些東西是要常提的,由於過重要了那麼有哪些概念是必須作到成竹在胸的呢?我以爲有如下幾點:1)RDD概念:RDD是什麼,幹什麼的,基本原理是什麼,爲何要整一個RDD這樣的概念,解決了什麼問題,有什麼特色;2)基本算子:Transformation算子,Action算子,經常使用的幾10種算子,哪些是transformation,哪些是action,哪些算子會觸發shuffle等;3)Spark基本架構,Spark程序基本構成,Spark程序執行的幾種模式,程序提交到執行的基本過程。不須要精通全部的細節,但基本概念和原理至少作到八九不離十。若是初次接觸不是很好理解,就多看看視頻各類機構的視頻,多記一點筆記,實在基礎不行,哪怕是抄寫一遍,每一次感受都會不同。這是本身的一點理解,也許每一個人的學習方法都不同,對Spark的理解也不一樣。
另外提一點,寫程序真是個體力活,不消耗體力,可是消耗體能,因此運動很重要,鍛鍊結合興趣才能樂在其中,堅持不解。系列博客中出現不少戶外圖片,其實都是和車友們騎行拍的,各位將就着看吧。
補充:本章的內容很關鍵,反覆學習了好幾遍才動筆寫。
1.遺留問題解答
1.Spark如何處理不能序列化的對象
將不能序列化的對象封裝成object對象。
2.企業級生產平臺如何搭建
說實話,關於生產集羣如何搭建,方法多樣,但細節特別多,不少人講的或者寫的我也不是特別滿意,後面我本身結合實踐寫幾篇對生產平臺搭建的見解;有些細節問題一交流就知道是否是有參與過大型生產集羣運維和開發。
3.使用Intelij開發工具鏈接Spark生產集羣
實際開發過程當中不多這樣作,Intelj編寫程序在本地運行和調試,打成jar包到開發環境,編寫代碼,本地模式跑結果,不作分佈式開發和調試,代價高
4.使用maven開發,打包
在BAT這種級別的公司,通常開發程序打包都是用maven,他們程序那麼多,一個個的打包是不現實的,必須用maven這樣的程序,自動化打包。
5.Spark日誌問題
能夠經過spark web ui查看日誌,前提額是要開啓historyServer,如何開啓spark的history Server自行百度,Spark的historyServer依賴MR的historyServer。若是沒有開啓也能夠經過命令拉取日誌,前提也是要作一些配置,而且用命令拉回任務日誌,用戶名要和提交任務的用戶名一致。
經過Web UI查看程序運行日誌
Web UI 的executor監控界面,這個很是詳細了,能夠看到driver在哪裏運行,消耗的資源,task在哪裏運行消耗的資源,strerr那個連接能夠看到更詳細的日誌狀況,不要誤認爲那裏只是打印錯誤日誌,其實不是的,全部日誌都會網那裏輸出,是標準日誌輸出。
若是提交一個程序啥都不指定,程序默認分配給executor2個task,一個task佔用1G內存和1個CPU.固然,你也能夠指定task佔用的資源,可是原則上不要超過資源分配器設置的閥值,由於資源不夠的時候,你指定多個task,它也不會給你啓動。另外還有一個防止內存碎片的機制,程序申請2.5G,會給你3G,自動補全機制,防止產生內存碎片。
另外從Web UI還能夠看出不少東西,如task執行時間,gc時間,數據是否傾斜,RDD之間的依賴關係等等。總之,Spark Web UI很牛,程序有問題,平臺有問題,都能看出個大概。
6.有時候xxRDD.foreach(println)在shell窗口打印不出來內容
client在本機,是能夠打印出來的,若client不是在本機運行(實際上是在driver運行節點上打印出來了),這樣執行在shell所在界面是打印不出內容的,用這種方式:xxRDD.collect().foreach(println)
7.啓動手報netty 4040端口已經被使用錯誤
正常的錯誤,由於又shell開啓來了,被佔用,會自動使用4041端口,還被佔用,就4042端口
8.Spark shell啓動時會啓動derby
spark shell啓動會啓動spark sql,spark sql默認使用derby保存元數據,可是儘可能不要用derby,它是單實例,不利於開發。會在本地生成一個文件metastore_db,若是啓動報錯,就把那個文件給刪了
9.KeyValue格式數據如何獲取Key和value
屬於scala基礎了
key._1:拿到kv的key
key._2:拿到kv的value
2.Spark核心概念
2.1 RDD及RDD操做
· RDD是什麼數據集,他是一個描述數據在哪裏,對數據作什麼操做,以及操做之間的依賴關係的一個數據集
· 爲何是彈性,主要是說他的存儲,既能夠在內存,也能夠在磁盤
· 分佈式:分佈在集羣上
· 自動重構:失效夠能夠自動重構
2.2 Transformation和Action
rdd是數據,有數據,就有計算操做,基本操做分紅2類(爲何分紅2類),
transformation:一類算子的簡稱,完成轉換功能,函數和算子一個意思,當作一個大的數組,裏面有元素,被切分放到各個節點上。
action:把rdd變換成一個或者一組值,這些是單機的,前面transformation都是分佈式的值,
2.3 Spark程序架構
2個組件組成,application = driver(1個)+ executor(多個)
driver:main函數,2g內存1個cpu,運行指定將相應的jar包和文件傳給work node
application:driver+executor,spark應用程序,2個應用程序是沒有任何關聯,若是共享數據只能hdfs或者tacyon. executor運行是指定,能夠同時跑幾個task,一個application轉爲多個task(driver轉化),task扔給executor執行,
2.4 Spark on Yarn運行流程
1)client發送資源申請請求
2)RM發送通知NodeManger要調用資源,
3)NodeManger啓動AppAplicationMaster
4)AppAplicationMaster通知nodeManager啓動各個Executor
5)nodeManager啓動Executor
6)nodeManager向Driver回報實時執行狀況,也會告知AppAplicationMaster
2.5 WordCount執行原理
WordCount,分佈式計算HelloWord,讀取數據,切分數據,將數據轉爲KeyValue對,根據Key進行規約求和,輸出規約結果,這就是WordCout的過程。就像1千個讀者有一千個哈姆雷特同樣,不一樣的工具設計者,對WordCount的底層實現不同。
MapReduce對WordCount分爲Map和Reduce兩個階執行,這個是很明智的,我以爲MapReduce編程模型就是分佈式計算模型的彙編語言,寫起來很羅嗦,可是霸道啊。Spark再優化,在牛逼,還不是脫離不了Map和Reduce這個編程思想,只不過是人家封裝好了,不要你再寫那麼繁雜的MR代碼罷。WordCount在Spark中執行大概經歷了這麼幾個過程,
每個環節都會產生一個RDD,生成一個RDD的依賴關係,這種依賴關係圖就是邏輯查詢計劃,不涉及物理查詢,根據邏輯關係,算出數據在那裏,考慮數據特徵,知道處理什麼數據,怎麼處理,生成一個物理查詢計劃,根據物理查詢計劃,將rdd劃分不少stage,每一個stage之間會有依賴關係,每一個stage內部會劃分多個task,這些操做都是在driver生成的,driver將生成的可執行物理任務分發到各個executor執行。
問題1:spark中task的類型有幾種?
只有2中,分別是
shuffleMapTask:非最後一輪的task都叫這個shuffleMapTask
ResultTask:最後一輪的task
問題2:每一個stage task數目如何決定?
第一個stage,由hdfs中block的個數決定,可使用命令查看,若是源數據是hbase,region個數決定
其餘stage的task,能夠本身設置,若是沒有設置,和前面task個數同樣
總結:
1)spark中一個action生成一個job。
2)每一個job生成多個stage ,spark有個優化機制一些已經執行的stage會自動跳過
3)每一個stage會有多個task在跑
因此結合WordCount,能夠得出一個做業提交之後在Spark中執行的流程是: driver生成邏輯執行計劃->driver生成物理執行計劃->driver任務調度->executor任務執行;前三個在driver執行,後面一個在節點上分佈式執行。
3.Spark計算引擎原理
上圖再次描述一個程序執行的過程,driver生成邏輯執行計劃->driver生成物理執行計劃->driver任務調度->executor任務執行 。假如面試被問到請你簡要介紹spark計算引擎的原理?我會這樣回答,(個人我的經驗,被面過無數次,也面過別人N次,面試被問到不會答的不要着急,想辦法把你懂的知識點都帶出來,一樣能夠達到效果了)
1)四個階段
邏輯執行計劃-》成物理執行計劃-》任務調度-》任務執行
2)四個對象
driver-》DAGScheduler-》TaskScheduler-》Executor
3)兩種模式
任務解析、優化和提交單機模式-》任務執行分佈式模式
再上一個圖,好好理解,後面都是基於這個展開了。
3.1 生成邏輯執行圖
邏輯執行計劃 , 描述RDD之間的依賴關係,這個是邏輯查詢計劃,但不知道join怎麼算,也不知道groupby該怎麼group,rdd選擇什麼樣的類型,不清楚,說白了就是知道大方向,具體如何作,不知道,紙上談兵。應用提交後,造成RDD Graph,而且在後臺建立DAG對象(spark不只僅用DAG建模,並且還會執行它,而且裏面不是用對象表示,而是用RDD對象之間的關係)
舉例: map->mapedRDD->compute()
這裏還有個重要的知識點,就是RDD之間的關係:寬依賴和窄依賴
前面提到過 RDD 被分紅幾個分區,分散在多臺機器上。當咱們把一個 RDD A 轉化成下一個 RDD B 時,這裏有兩種狀況:
窄依賴:有時候只須要一個 A 裏面的一個分區,就能夠產生 B 裏的一個分區了,好比 map 的例子:A 和 B 之間每一個分區是一一對應的關係,這就是 narrow transofmration.【一對一】
寬依賴: 須要 A 裏面全部的分區,才能產生 B 裏的一個分區,好比 reduceByKey的例子,這就是 wide transformation【多對一】.
爲何要分寬依賴和窄依賴,理解這個很重要,很重要,很重要,董先生竟然沒有和各位同胞講解?
大膽設想一下,若是每一個分區裏的數據就待在那臺機器的內存裏,咱們逐一的調用 map, filter, map 函數到這些分區裏,Job 就很好的完成。
更重要的是,因爲數據沒有轉移到別的機器,咱們避免了 Network IO 或者 Disk IO. 惟一的任務就是把 map / filter 的運行環境搬到這些機器上運行,這對現代計算機來講,overhead 幾乎能夠忽略不計。
這種把多個操做合併到一塊兒,在數據上一口氣運行的方法在 Spark 裏叫 pipeline (其實 pipeline 被普遍應用的不少領域,好比 CPU)。這時候不一樣就出現了:只有 narrow transformation 才能夠進行 pipleline 操做。對於 wide transformation, RDD 轉換須要不少分區運算,包括數據在機器間搬動,因此失去了 pipeline 的前提。
總結起來一句話:數據和算是否在一塊兒,計算的性能是不同的,爲了區分,就有了寬依賴和窄依賴。
3.2 生成物理執行圖
具體的物理查詢計劃是在,選擇什麼樣的算法,根據rdd數據量大小。
Spark 會把這個 RDD邏輯計劃DAG 交給一個叫 DAG scheduler 的模塊,DAG scheduler 會優先使用 pipeline 方法,把 RDD 的 transformation 壓縮;當咱們遇到 wide transformation 時,因爲以前的 narrow transformation 沒法和 wide transformation pipeline, 那 DAG scheduler 會把前面的 transformation 定義成一個 stage.
重要的事情說三遍:DAG scheduler 會分析 Spark Job 全部的 transformation, 用 wide transformation 做爲邊界,把全部 transformation 分紅若干個stages. 一個 stage 裏的一個分區就被 Spark 叫作一個task. 因此一個 task 是一個分區的數據和數據上面的操做,這些操做可能包括一個 transformation,也多是多個,但必定是 narrow transformation.
DAG scheduler 工做的結果就是產生一組 stages. 這組 stages 被傳到 Spark 的另外一個組件 task scheduler, task scheduler 會使用集羣管理器依次執行 task, 當全部的 task 執行完畢,一個 stage 標記完成;再運行下一個 stage …… 直到整個 Spark job 完成。
3.3 調度並執行task
將DAG Scheduler產生的stages傳送給task scheduler,task scheduler使用集羣管理器依次執行task,task被分配到各個work下執行,當全部的task執行完畢,一個stage標記完成,再運行下一個stage,直到整個spark job完成。
做業調度
FIFO或Fair
優化機制:數據本地性和推測執行
任務執行
Task被序列化後,發送到executor上執行
ShuffleMapTask將中間數據寫到本地,ResultTask遠程讀取數據
數據用的時候再算,並且數據是流到要計算的位置的
4.Spark Shuffle解析
4.1 Shuffle簡史
在MapReduce框架中,shuffle是鏈接Map和Reduce之間的橋樑,Map的輸出要用到Reduce中必須通過shuffle這個環節,shuffle的性能高低直接影響了整個程序的性能和吞吐量。Spark做爲MapReduce框架的一種實現,天然也實現了shuffle的邏輯。shuffle是MapReduce框架中的一個特定的phase,介於Map phase和Reduce phase之間,當Map的輸出結果要被Reduce使用時,輸出結果須要按key哈希,而且分發到每個Reducer上去,這個過程就是shuffle。因爲shuffle涉及到了磁盤的讀寫和網絡的傳輸,所以shuffle性能的高低直接影響到了整個程序的運行效率。下面這幅圖清晰地描述了MapReduce算法的整個流程,其中shuffle phase是介於Map phase和Reduce phase之間。概念上shuffle就是一個溝通數據鏈接的橋樑。
4.1 Spark Shuffle
以圖爲例簡單描述一下Spark中shuffle的整一個流程:
Shuffle 過程本質上都是將 Map 端得到的數據使用分區器進行劃分,並將數據發送給對應的 Reducer 的過程。首先每個Mapper會根據Reducer的數量建立出相應的bucket,bucket的數量是M×RM×R,其中MM是Map的個數,RR是Reduce的個數。
其次Mapper產生的結果會根據設置的partition算法填充到每一個bucket中去。這裏的partition算法是能夠自定義的,固然默認的算法是根據key哈希到不一樣的bucket中去。
當Reducer啓動時,它會根據本身task的id和所依賴的Mapper的id從遠端或是本地的block manager中取得相應的bucket做爲Reducer的輸入進行處理。
4.1Shuffle Write
在Spark 0.6和0.7的版本中,對於shuffle數據的存儲是以文件的方式存儲在block manager中,與rdd.persist(StorageLevel.DISk_ONLY)採起相同的策略,能夠參看:
我已經將一些干擾代碼刪去。能夠看到Spark在每個Mapper中爲每一個Reducer建立一個bucket,並將RDD計算結果放進bucket中。須要注意的是每一個bucket是一個ArrayBuffer,也就是說Map的輸出結果是會先存儲在內存。
Map的輸出必須先所有存儲到內存中,而後寫入磁盤。這對內存是一個很是大的開銷,當內存不足以存儲全部的Map output時就會出現OOM。
每個Mapper都會產生Reducer number個shuffle文件,若是Mapper個數是1k,Reducer個數也是1k,那麼就會產生1M個shuffle文件,這對於文件系統是一個很是大的負擔。同時在shuffle數據量不大而shuffle文件又很是多的狀況下,隨機寫也會嚴重下降IO的性能。
在Spark 0.8版本中,shuffle write採用了與RDD block write不一樣的方式,同時也爲shuffle write單首創建了ShuffleBlockManager,部分解決了0.6和0.7版本中遇到的問題。首先咱們來看一下Spark 0.8的具體實現:
在這個版本中爲shuffle write添加了一個新的類ShuffleBlockManager,由ShuffleBlockManager來分配和管理bucket。同時ShuffleBlockManager爲每個bucket分配一個DiskObjectWriter,每一個write handler擁有默認100KB的緩存,使用這個write handler將Map output寫入文件中。能夠看到如今的寫入方式變爲buckets.writers(bucketId).write(pair),也就是說Map output的key-value pair是逐個寫入到磁盤而不是預先把全部數據存儲在內存中在總體flush到磁盤中去。
Spark 0.8顯著減小了shuffle的內存壓力,如今Map output不須要先所有存儲在內存中,再flush到硬盤,而是record-by-record寫入到磁盤中。同時對於shuffle文件的管理也獨立出新的ShuffleBlockManager進行管理,而不是與rdd cache文件在一塊兒了。
可是這一版Spark 0.8的shuffle write仍然有兩個大的問題沒有解決:
首先依舊是shuffle文件過多的問題,shuffle文件過多一是會形成文件系統的壓力過大,二是會下降IO的吞吐量。
其次雖然Map output數據再也不須要預先在內存中evaluate顯著減小了內存壓力,可是新引入的DiskObjectWriter所帶來的buffer開銷也是一個不容小視的內存開銷。假定咱們有1k個Mapper和1k個Reducer,那麼就會有1M個bucket,於此同時就會有1M個write handler,而每個write handler默認須要100KB內存,那麼總共須要100GB的內存。這樣的話僅僅是buffer就須要這麼多的內存,內存的開銷是驚人的。固然實際狀況下這1k個Mapper是分時運行的話,所需的內存就只有cores * reducer numbers * 100KB大小了。可是reducer數量不少的話,這個buffer的內存開銷也是蠻厲害的。
爲了解決shuffle文件過多的狀況,Spark 0.8.1引入了新的shuffle consolidation,以期顯著減小shuffle文件的數量。
首先咱們以圖例來介紹一下shuffle consolidation的原理
假定該job有4個Mapper和4個Reducer,有2個core,也就是能並行運行兩個task。咱們能夠算出Spark的shuffle write共須要16個bucket,也就有了16個write handler。在以前的Spark版本中,每個bucket對應的是一個文件,所以在這裏會產生16個shuffle文件。
而在shuffle consolidation中每個bucket並不是對應一個文件,而是對應文件中的一個segment,同時shuffle consolidation所產生的shuffle文件數量與Spark core的個數也有關係。在上面的圖例中,job的4個Mapper分爲兩批運行,在第一批2個Mapper運行時會申請8個bucket,產生8個shuffle文件;而在第二批Mapper運行時,申請的8個bucket並不會再產生8個新的文件,而是追加寫到以前的8個文件後面,這樣一共就只有8個shuffle文件,而在文件內部這有16個不一樣的segment。所以從理論上講shuffle consolidation所產生的shuffle文件數量爲C×R,其中C是Spark集羣的core number,R是Reducer的個數。
須要注意的是當 M=C時shuffle consolidation所產生的文件數和以前的實現是同樣的。
Shuffle consolidation顯著減小了shuffle文件的數量,解決了以前版本一個比較嚴重的問題,可是writer handler的buffer開銷過大依然沒有減小,若要減小writer handler的buffer開銷,咱們只能減小Reducer的數量,可是這又會引入新的問題,下文將會有詳細介紹。
4.2 Shuffle Read
Shuffle write寫出去的數據要被Reducer使用,就須要shuffle fetcher將所需的數據fetch過來,這裏的fetch包括本地和遠端,由於shuffle數據有可能一部分是存儲在本地的。Spark對shuffle fetcher實現了兩套不一樣的框架:NIO經過socket鏈接去fetch數據;OIO經過netty server去fetch數據。分別對應的類是BasicBlockFetcherIterator和NettyBlockFetcherIterator。
在Spark 0.7和更早的版本中,只支持BasicBlockFetcherIterator,而BasicBlockFetcherIterator在shuffle數據量比較大的狀況下performance始終不是很好,沒法充分利用網絡帶寬,爲了解決這個問題,添加了新的shuffle fetcher來試圖取得更好的性能。對於早期shuffle性能的評測能夠參看Spark usergroup。固然如今BasicBlockFetcherIterator的性能也已經好了不少,使用的時候能夠對這兩種實現都進行測試比較。
4.3 Shuffle Aggregate
咱們都知道在Hadoop MapReduce的shuffle過程當中,shuffle fetch過來的數據會進行merge sort,使得相同key下的不一樣value按序歸併到一塊兒供Reducer使用,這個過程能夠參看下圖:
全部的merge sort都是在磁盤上進行的,有效地控制了內存的使用,可是代價是更多的磁盤IO。
那麼Spark是否也有merge sort呢,仍是以別的方式實現,下面咱們就細細說明。
首先雖然Spark屬於MapReduce體系,可是對傳統的MapReduce算法進行了必定的改變。Spark假定在大多數用戶的case中,shuffle數據的sort不是必須的,好比word count,強制地進行排序只會使性能變差,所以Spark並不在Reducer端作merge sort。既然沒有merge sort那Spark是如何進行reduce的呢?這就要說到aggregator了。
aggregator本質上是一個hashmap,它是以map output的key爲key,以任意所要combine的類型爲value的hashmap。當咱們在作word count reduce計算count值的時候,它會將shuffle fetch到的每個key-value pair更新或是插入到hashmap中(若在hashmap中沒有查找到,則插入其中;若查找到則更新value值)。這樣就不須要預先把全部的key-value進行merge sort,而是來一個處理一個,省下了外部排序這一步驟。但同時須要注意的是reducer的內存必須足以存放這個partition的全部key和count值,所以對內存有必定的要求。
在上面word count的例子中,由於value會不斷地更新,而不須要將其所有記錄在內存中,所以內存的使用仍是比較少的。考慮一下若是是group by key這樣的操做,Reducer須要獲得key對應的全部value。在Hadoop MapReduce中,因爲有了merge sort,所以給予Reducer的數據已是group by key了,而Spark沒有這一步,所以須要將key和對應的value所有存放在hashmap中,並將value合併成一個array。能夠想象爲了可以存放全部數據,用戶必須確保每個partition足夠小到內存可以容納,這對於內存是一個很是嚴峻的考驗。所以Spark文檔中建議用戶涉及到這類操做的時候儘可能增長partition,也就是增長Mapper和Reducer的數量。
增長Mapper和Reducer的數量當然能夠減少partition的大小,使得內存能夠容納這個partition。可是咱們在shuffle write中提到,bucket和對應於bucket的write handler是由Mapper和Reducer的數量決定的,task越多,bucket就會增長的更多,由此帶來write handler所需的buffer也會更多。在一方面咱們爲了減小內存的使用採起了增長task數量的策略,另外一方面task數量增多又會帶來buffer開銷更大的問題,所以陷入了內存使用的兩難境地。
爲了減小內存的使用,只能將aggregator的操做從內存移到磁盤上進行,Spark社區也意識到了Spark在處理數據規模遠遠大於內存大小時所帶來的問題。所以PR303提供了外部排序的實現方案,相信在Spark 0.9 release的時候,這個patch應該能merge進去,到時候內存的使用量能夠顯著地減小。
5.參考資料
1).https://my.oschina.net/repine/blog/545695詳細探究Spark的shuffle實現
2)董先生ppt