降本增效利器!趣頭條Spark Remote Shuffle Service最佳實踐

  • 王振華,趣頭條大數據總監,趣頭條大數據負責人
  • 曹佳清,趣頭條大數據離線團隊高級研發工程師,曾就任於餓了麼大數據INF團隊負責存儲層和計算層組件研發,目前負責趣頭條大數據計算層組件Spark的建設
  • 範振,花名辰繁,阿里雲計算平臺EMR高級技術專家,目前主要關注開源大數據技術以及雲原生技術。

1. 業務場景與現狀

趣頭條是一家依賴大數據的科技公司,在2018-2019年經歷了業務的高速發展,主App和其餘創新App的日活增長了10倍以上,相應的大數據系統也從最初的100臺機器增長到了1000臺以上規模。多個業務線依賴於大數據平臺展開業務,大數據系統的高效和穩定成了公司業務發展的基石,在大數據的架構上咱們使用了業界成熟的方案,存儲構建在HDFS上、計算資源調度依賴Yarn、表元數據使用Hive管理、用Spark進行計算,具體如圖1所示:
image.png
圖1 趣頭條離線大數據平臺架構圖apache

其中Yarn集羣使用了單一大集羣的方案,HDFS使用了聯邦的方案,同時基於成本因素,HDFS和Yarn服務在ECS上進行了DataNode和NodeManager的混部。
在趣頭條天天有6W+的Spark任務跑在Yarn集羣上,天天新增的Spark任務穩定在100左右,公司的迅速發展要求需求快速實現,積累了不少治理欠債,種種問題表現出來集羣穩定性須要提高,其中Shuffle的穩定性愈來愈成爲集羣的桎梏,亟需解決。後端

2. 當前大數據平臺的挑戰與思考

近半年大數據平臺主要的業務指標是降本增效,一方面業務方但願離線平臺天天可以承載更多的做業,另外一方面咱們自身有降本的需求,如何在降本的前提下支撐更多地業務量對於每一個技術人都是很是大地挑戰。熟悉Spark的同窗應該很是清楚,在大規模集羣場景下,Spark Shuffle在實現上有比較大的缺陷,體如今如下的幾個方面:緩存

  • Spark Shuffle Fetch過程存在大量的網絡小包,現有的External Shuffle Service設計並無很是細緻的處理這些RPC請求,大規模場景下會有不少connection reset發生,致使FetchFailed,從而致使stage重算。
  • Spark Shuffle Fetch過程存在大量的隨機讀,大規模高負載集羣條件下,磁盤IO負載高、CPU滿載時常發生,極容易發生FetchFailed,從而致使stage重算。
  • 重算過程會放大集羣的繁忙程度,搶佔機器資源,致使惡性循環嚴重,SLA完不成,須要運維人員手動將做業跑在空閒的Label集羣。
  • 計算和Shuffle過程架構不能拆開,不能把Shuffle限定在指定的集羣內,不能利用部分SSD機器。
  • M*N次的shuffle過程:對於10K mapper,5K reducer級別的做業,基本跑不完。
  • NodeManager和Spark Shuffle Service是同一進程,Shuffle過程過重,常常致使NodeManager重啓,從而影響Yarn調度穩定性。

以上的這些問題對於Spark研發同窗是很是痛苦的,好多做業天天運行時長方差會很是大,並且總有一些沒法完成的做業,要麼業務進行拆分,要麼跑到獨有的Yarn集羣中。除了現有面臨的挑戰以外,咱們也在積極構建下一代基礎架構設施,隨着雲原生Kubernetes概念愈來愈火,Spark社區也提供了Spark on Kubernetes版本,相比較於Yarn來講,Kubernetes可以更好的利用雲原生的彈性,提供更加豐富的運維、部署、隔離等特性。可是Spark on Kubernetes目前還存在不少問題沒有解決,包括容器內的Shuffle方式、動態資源調度、調度性能有限等等。咱們針對Kubernetes在趣頭條的落地,主要有如下幾個方面的需求:服務器

  • 實時集羣、OLAP集羣和Spark集羣以前都是相互獨立的,怎樣可以將這些資源造成統一大數據資源池。經過Kubernetes的天生隔離特性,更好的實現離線業務與實時業務混部,達到降本增效目的。
  • 公司的在線業務都運行在Kubernetes集羣中,如何利用在線業務和大數據業務的不一樣特色進行錯峯調度,達成ECS的總資源量最少。
  • 但願可以基於Kubernetes來包容在線服務、大數據、AI等基礎架構,作到運維體系統一化。

由於趣頭條的大數據業務目前全都部署在阿里雲上,阿里雲EMR團隊和趣頭條的大數據團隊進行了深刻技術共創,共同研發了Remote Shuffle Service(如下簡稱RSS),旨在解決Spark on Yarn層面提到的全部問題,併爲Spark跑在Kubernetes上提供Shuffle基礎組件。網絡

3. Remote Shuffle Service設計與實現

3.1 Remote Shuffle Service的背景

早在2019年初咱們就關注到了社區已經有相應的討論,如SPARK-25299。該Issue主要但願解決的問題是在雲原生環境下,Spark須要將Shuffle數據寫出到遠程的服務中。可是咱們通過調研後發現Spark 3.0(以前的master分支)只支持了部分的接口,而沒有對應的實現。該接口主要但願在現有的Shuffle代碼框架下,將數據寫到遠程服務中。若是基於這種方式實現,好比直接將Shuffle以流的方式寫入到HDFS或者Alluxio等高速內存系統,會有至關大的性能開銷,趣頭條也作了一些相應的工做,並進行了部分的Poc,性能與原版Spark Shuffle實現相差特別多,最差性能可降低3倍以上。同時咱們也調研了一部分其餘公司的實現方案,例如Facebook的Riffle方案以及LinkedIn開源的Magnet,這些實現方案是首先將Shuffle文件寫到本地,而後在進行Merge或者Upload到遠程的服務上,這和後續咱們的Kubernetes架構是不兼容的,由於Kubernetes場景下,本地磁盤Hostpath或者LocalPV並非一個必選項,並且也會存在隔離和權限的問題。
基於上述背景,咱們與阿里雲EMR團隊共同開發了Remote Shuffle Service。RSS能夠提供如下的能力,完美的解決了Spark Shuffle面臨的技術挑戰,爲咱們集羣的穩定性和容器化的落地提供了強有力的保證,主要體如今如下幾個方面:架構

  • 高性能服務器的設計思路,不一樣於Spark原有Shuffle Service,RPC更輕量、通用和穩定。
  • 兩副本機制,可以保證的Shuffle fetch極小機率(低於0.01%)失敗。
  • 合併shuffle文件,從M*N次shuffle變成N次shuffle,順序讀HDD磁盤會顯著提高shuffle heavy做業性能。
  • 減小Executor計算時內存壓力,避免map過程當中Shuffle Spill。
  • 計算與存儲分離架構,能夠將Shuffle Service部署到特殊硬件環境中,例如SSD機器,能夠保證SLA極高的做業。
  • 完美解決Spark on Kubernetes方案中對於本地磁盤的依賴。

3.2 Remote Shuffle Service的實現

3.2.1 總體設計

Spark RSS架構包含三個角色: Master, Worker, Client。Master和Worker構成服務端,Client以不侵入的方式集成到Spark ShuffleManager裏(RssShuffleManager實現了ShuffleManager接口)。app

  • Master的主要職責是資源分配與狀態管理。
  • Worker的主要職責是處理和存儲Shuffle數據。
  • Client的主要職責是緩存和推送Shuffle數據。

總體流程以下所示(其中ResourceManager和MetaService是Master的組件),如圖2。
image.png
圖2 RSS總體架構圖負載均衡

3.2.2 實現流程

下面重點來說一下實現的流程:框架

  • RSS採用Push Style的shuffle模式,每一個Mapper持有一個按Partition分界的緩存區,Shuffle數據首先寫入緩存區,每當某個Partition的緩存滿了即觸發PushData。
  • Driver先和Master發生StageStart的請求,Master接受到該RPC後,會分配對應的Worker Partition並返回給Driver,Shuffle Client獲得這些元信息後,進行後續的推送數據。
  • Client開始向主副本推送數據。主副本Worker收到請求後,把數據緩存到本地內存,同時把該請求以Pipeline的方式轉發給從副本,從而實現了2副本機制。
  • 爲了避免阻塞PushData的請求,Worker收到PushData請求後會以純異步的方式交由專有的線程池異步處理。根據該Data所屬的Partition拷貝到事先分配的buffer裏,若buffer滿了則觸發flush。RSS支持多種存儲後端,包括DFS和Local。若後端是DFS,則主從副本只有一方會flush,依靠DFS的雙副本保證容錯;若後端是Local,則主從雙方都會flush。
  • 在全部的Mapper都結束後,Driver會觸發StageEnd請求。Master接收到該RPC後,會向全部Worker發送CommitFiles請求,Worker收到後把屬於該Stage buffer裏的數據flush到存儲層,close文件,並釋放buffer。Master收到全部響應後,記錄每一個partition對應的文件列表。若CommitFiles請求失敗,則Master標記此Stage爲DataLost。
  • 在Reduce階段,reduce task首先向Master請求該Partition對應的文件列表,若返回碼是DataLost,則觸發Stage重算或直接abort做業。若返回正常,則直接讀取文件數據。

整體來說,RSS的設計要點總結爲3個層面:運維

  • 採用PushStyle的方式作shuffle,避免了本地存儲,從而適應了計算存儲分離架構。
  • 按照reduce作聚合,避免了小文件隨機讀寫和小數據量網絡請求。
  • 作了2副本,提升了系統穩定性。

3.2.3 容錯

對於RSS系統,容錯性是相當重要的,咱們分爲如下幾個維度來實現:

  • PushData失敗

    • 當PushData失敗次數(Worker掛了,網絡繁忙,CPU繁忙等)超過MaxRetry後,Client會給Master發消息請求新的Partition Location,此後本Client都會使用新的Location地址,該階段稱爲Revive。
    • 若Revive是由於Client端而非Worker的問題致使,則會產生同一個Partition數據分佈在不一樣Worker上的狀況,Master的Meta組件會正確處理這種情形。
    • 若發生WorkerLost,則會致使大量PushData同時失敗,此時會有大量同一Partition的Revive請求打到Master。爲了不給同一個Partition分配過多的Location,Master保證僅有一個Revive請求真正獲得處理,其他的請求塞到pending queue裏,待Revive處理結束後返回同一個Location。
  • Worker宕機

    • 當發生WorkerLost時,對於該Worker上的副本數據,Master向其peer發送CommitFile的請求,而後清理peer上的buffer。若Commit Files失敗,則記錄該Stage爲DataLost;若成功,則後續的PushData經過Revive機制從新申請Location。
  • 數據去重

    • Speculation task和task重算會致使數據重複。解決辦法是每一個PushData的數據片裏編碼了所屬的mapId,attemptId和batchId,而且Master爲每一個map task記錄成功commit的attemtpId。read端經過attemptId過濾不一樣的attempt數據,並經過batchId過濾同一個attempt的重複數據。
  • 多副本

    • RSS目前支持DFS和Local兩種存儲後端。
    • 在DFS模式下,ReadPartition失敗會直接致使Stage重算或abort job。在Local模式,ReadPartition失敗會觸發從peer location讀,若主從都失敗則觸發Stage重算或abort job。

3.2.4 高可用

你們能夠看到RSS的設計中Master是一個單點,雖然Master的負載很小,不會輕易地掛掉,可是這對於線上穩定性來講無疑是一個風險點。在項目的最初上線階段,咱們但願能夠經過SubCluster的方式進行workaround,即經過部署多套RSS來承載不一樣的業務,這樣即便RSS Master宕機,也只會影響有限的一部分業務。可是隨着系統的深刻使用,咱們決定直面問題,引進高可用Master。主要的實現以下:

  • 首先,Master目前的元數據比較多,咱們能夠將一部分與ApplD+ShuffleId自己相關的元數據下沉到Driver的ShuffleManager中,因爲元數據並不會不少,Driver增長的內存開銷很是有限。
  • 另外,關於全局負載均衡的元數據和調度相關的元數據,咱們利用Raft實現了Master組件的高可用,這樣咱們經過部署3或5臺Master,真正的實現了大規模可擴展的需求。

4. 實際效果與分析

4.1 性能與穩定性

團隊針對TeraSort,TPC-DS以及大量的內部做業進行了測試,在Reduce階段減小了隨機讀的開銷,任務的穩定性和性能都有了大幅度提高。
圖3是TeraSort的benchmark,以10T Terasort爲例,Shuffle量壓縮後大約5.6T。能夠看出該量級的做業在RSS場景下,因爲Shuffle read變爲順序讀,性能會有大幅提高。
image.png
圖3 TeraSort性能測試(RSS性能更好)

圖4是一個線上實際脫敏後的Shuffle heavy大做業,以前在混部集羣中很小几率能夠跑完,天天任務SLA不能按時達成,分析緣由主要是因爲大量的FetchFailed致使stage進行重算。使用RSS以後天天能夠穩定的跑完,2.1T的shuffle也不會出現任何FetchFailed的場景。在更大的數據集性能和SLA表現都更爲顯著。
image.png
圖4 實際業務的做業stage圖(使用RSS保障穩定性和性能)

4.2 業務效果

在大數據團隊和阿里雲EMR團隊的共同努力下,通過近半年的上線、運營RSS,以及和業務部門的長時間測試,業務價值主要體如今如下方面:

  • 降本增效效果明顯,在集羣規模小幅降低的基礎上,支撐了更多的計算任務,TCO成本降低20%。
  • SLA顯著提高,大規模Spark Shuffle任務從跑不完到能跑完,咱們可以將不一樣SLA級別做業合併到同一集羣,減少集羣節點數量,達到統一管理,縮小成本的目的。本來業務方有一部分SLA比較高的做業在一個獨有的Yarn集羣B中運行,因爲主Yarn集羣A的負載很是高,若是跑到集羣A中,會常常的掛掉。利用RSS以後能夠放心的將做業跑到主集羣A中,從而釋放掉獨有Yarn集羣B。
  • 做業執行效率顯著提高,跑的慢 -> 跑的快。咱們比較了幾個典型的Shuffle heavy做業,一個重要的業務線做業本來須要3小時,RSS版本須要1.6小時。抽取線上5~10個做業,大做業的性能提高至關明顯,不一樣做業平均下來有30%以上的性能提高,即便是shuffle量不大的做業,因爲比較穩定不須要stage重算,長期運行平均時間也會減小10%-20%。
  • 架構靈活性顯著提高,升級爲計算與存儲分離架構。Spark在容器中運行的過程當中,將RSS做爲基礎組件,可使得Spark容器化可以大規模的落地,爲離線在線統一資源、統一調度打下了基礎。

5. 將來展望

趣頭條大數據平臺和阿里雲EMR團隊後續會繼續保持深刻共創,將探索更多的方向。主要有如下的一些思路:

    • RSS存儲能力優化,包括將雲的對象存儲做爲存儲後端。
    • RSS多引擎支持,例如MapReduce、Tez等,提高歷史任務執行效率。
    • 加速大數據容器化落地,配合RSS能力,解決K8s調度器性能、調度策略等一系列挑戰。
    • 持續優化成本,配合EMR的彈性伸縮功能,一方面Spark可使用更多的阿里雲ECS/ECI搶佔式實例來進一步壓縮成本,另外一方面將已有機器包括阿里雲ACK,ECI等資源造成統一大池子,將大數據的計算組件和在線業務進行錯峯調度以及混部。


原文連接本文爲阿里雲原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索