Apache Spark探祕:Spark Shuffle實現

Apache Spark探祕:Spark Shuffle實現html

三月 26, 2014 at 9:40 上午 by Donggit

Tags: Spark
github

做者:Dong | 新浪微博:西成懂 | 能夠轉載, 但必須以超連接形式標明文章原始出處和做者信息及版權聲明
網址:http://dongxicheng.org/framework-on-yarn/apache-spark-shuffle-details/
本博客的文章集合:http://dongxicheng.org/recommend/apache


本博客微信公共帳號:hadoop123(微信號爲:hadoop-123),分享hadoop技術內幕,hadoop最新技術進展,發佈hadoop相關職位和求職信息,hadoop技術交流聚會、講座以及會議等。二維碼以下:
微信


對於大數據計算框架而言,Shuffle階段的設計優劣是決定性能好壞的關鍵因素之一。本文將介紹目前Spark的shuffle實現,並將之與MapReduce進行簡單對比。本文的介紹順序是:shuffle基本概念,MapReduce Shuffle發展史以及Spark Shuffle發展史。架構

(1)  shuffle基本概念與常見實現方式併發

shuffle,是一個算子,表達的是多對多的依賴關係,在類MapReduce計算框架中,是鏈接Map階段和Reduce階段的紐帶,即每一個Reduce Task從每一個Map Task產生數的據中讀取一片數據,極限狀況下可能觸發M*R個數據拷貝通道(M是Map Task數目,R是Reduce Task數目)。一般shuffle分爲兩部分:Map階段的數據準備和Reduce階段的數據拷貝。首先,Map階段需根據Reduce階段的Task數量決定每一個Map Task輸出的數據分片數目,有多種方式存放這些數據分片:框架

1) 保存在內存中或者磁盤上(Spark和MapReduce都存放在磁盤上);oop

2) 每一個分片一個文件(如今Spark採用的方式,若干年前MapReduce採用的方式),或者全部分片放到一個數據文件中,外加一個索引文件記錄每一個分片在數據文件中的偏移量(如今MapReduce採用的方式)。源碼分析

在Map端,不一樣的數據存放方式各有優缺點和適用場景。通常而言,shuffle在Map端的數據要存儲到磁盤上,以防止容錯觸發重算帶來的龐大開銷(若是保存到Reduce端內存中,一旦Reduce Task掛掉了,全部Map Task須要重算)。但數據在磁盤上存放方式有多種可選方案,在MapReduce前期設計中,採用瞭如今Spark的方案(目前一直在改進),每一個Map Task爲每一個Reduce Task產生一個文件,該文件只保存特定Reduce Task需處理的數據,這樣會產生M*R個文件,若是M和R很是龐大,好比均爲1000,則會產生100w個文件,產生和讀取這些文件會產生大量的隨機IO,效率很是低下。解決這個問題的一種直觀方法是減小文件數目,經常使用的方法有:

        1) 將一個節點上全部Map產生的文件合併成一個大文件(MapReduce如今採用的方案),

        2) 每一個節點產生{(slot數目)*R}個文件(Spark優化後的方案)。對後面這種方案簡單解釋一下:不論是MapReduce 1.0仍是Spark,每一個節點的資源會被抽象成若干個slot,因爲一個Task佔用一個slot,所以slot數目可當作是最多同時運行的Task數目。若是一個Job的Task數目很是多,限於slot數目有限,可能須要運行若干輪。這樣,只須要由第一輪產生{(slot數目)*R}個文件,後續幾輪產生的數據追加到這些文件末尾便可。

        所以,後一種方案可減小大做業產生的文件數目。

        在Reduce端,各個Task會併發啓動多個線程同時從多個Map Task端拉取數據。因爲Reduce階段的主要任務是對數據進行按組規約。

        也就是說,須要將數據分紅若干組,以便以組爲單位進行處理。你們知道,分組的方式很是多,常見的有:Map/HashTable(key相同的,放到同一個value list中)和Sort(按key進行排序,key相同的一組,經排序後會挨在一塊兒),這兩種方式各有優缺點,第一種複雜度低,效率高,可是須要將數據所有放到內存中,第二種方案複雜度高,但可以藉助磁盤(外部排序)處理龐大的數據集。Spark前期採用了第一種方案,而在最新的版本中加入了第二種方案, MapReduce則從一開始就選用了基於sort的方案。

(2) MapReduce Shuffle發展史

【階段1】:MapReduce Shuffle的發展也並非一馬平川的,剛開始(0.10.0版本以前)採用了「每一個Map Task產生R個文件」的方案,前面提到,該方案會產生大量的隨機讀寫IO,對於大數據處理而言,很是不利。

【階段2】:爲了不Map Task產生大量文件,HADOOP-331嘗試對該方案進行優化,優化方法:爲每一個Map Task提供一個環形buffer,一旦buffer滿了後,則將內存數據spill到磁盤上(外加一個索引文件,保存每一個partition的偏移量),最終合併產生的這些spill文件,同時建立一個索引文件,保存每一個partition的偏移量。

(階段2):這個階段並無對shuffle架構作調成,只是對shuffle的環形buffer進行了優化。在Hadoop 2.0版本以前,對MapReduce做業進行參數調優時,Map階段的buffer調優很是複雜的,涉及到多個參數,這是因爲buffer被切分紅兩部分使用:一部分保存索引(好比parition、key和value偏移量和長度),一部分保存實際的數據,這兩段buffer均會影響spill文件數目,所以,須要根據數據特色對多個參數進行調優,很是繁瑣。而MAPREDUCE-64則解決了該問題,該方案讓索引和數據共享一個環形緩衝區,再也不將其分紅兩部分獨立使用,這樣只需設置一個參數控制spill頻率。

【階段3(進行中)】:目前shuffle被當作一個子階段被嵌到Reduce階段中的。因爲MapReduce模型中,Map Task和Reduce Task能夠同時運行,所以一個做業前期啓動的Reduce Task將一直處於shuffle階段,直到全部Map Task運行完成,而在這個過程當中,Reduce Task佔用着資源,但這部分資源利用率很是低,基本上只使用了IO資源。爲了提升資源利用率,一種很是好的方法是將shuffle從Reduce階段中獨立處理,變成一個獨立的階段/服務,由專門的shuffler service負責數據拷貝,目前百度已經實現了該功能(準備開源?),且收益明顯,具體參考:MAPREDUCE-2354

(3) Spark Shuffle發展史

目前看來,Spark Shuffle的發展史與MapReduce發展史很是相似。初期Spark在Map階段採用了「每一個Map Task產生R個文件」的方法,在Reduce階段採用了map分組方法,但隨Spark變得流行,用戶逐漸發現這種方案在處理大數據時存在嚴重瓶頸問題,所以嘗試對Spark進行優化和改進,相關連接有:External Sorting for Aggregator and CoGroupedRDDs,「Optimizing Shuffle Performance in Spark」,「Consolidating Shuffle Files in Spark」,優化動機和思路與MapReduce很是相似。

Spark在前期設計中過多依賴於內存,使得一些運行在MapReduce之上的大做業難以直接運行在Spark之上(可能遇到OOM問題)。目前Spark在處理大數據集方面尚不完善,用戶需根據做業特色選擇性的將一部分做業遷移到Spark上,而不是總體遷移。隨着Spark的完善,不少內部關鍵模塊的設計思路將變得與MapReduce升級版Tez很是相似。

【其餘參考資料】

Spark源碼分析 – Shuffle

詳細探究Spark的shuffle實現

原創文章,轉載請註明: 轉載自董的博客

本文連接地址: http://dongxicheng.org/framework-on-yarn/apache-spark-shuffle-details/

做者:Dong,做者介紹:http://dongxicheng.org/about/

本博客的文章集合:http://dongxicheng.org/recommend/

相關文章
相關標籤/搜索