Spark Programming--- Shuffle operations

 

Spark Programming--- Shuffle operations

http://spark.apache.org/docs/latest/rdd-programming-guide.html#shuffle-operationshtml

一些spark的特定操做將會觸發被稱爲shuffle的事件。Shuffle是Spark用於從新分佈數據的機制,這樣能夠在不一樣的分區來分組。這一般涉及到在executor和機器之間進行拷貝數據,因此shuffle是一個很複雜而且消耗高的操做。web

背景

爲了瞭解shuffle期間發生了什麼,咱們能夠考慮reduceByKey操做做爲例子。reduceByKey操做生成了一個新的RDD經過全部的單個鍵值組合爲一個元組-關鍵字和針對與該關鍵字相關的全部值執行reduce函數的結果。這裏的挑戰不是全部的值對一個單獨的鍵都在同一個分區上或者甚至說在一臺機器上,而是它們必須被從新分佈來計算結果。
在Spark,數據一般不會跨分區分佈到特定操做的必要位置。在計算中,一個單獨的任務將會在一個單獨的分區上操做-然而爲了組織全部的數據來被一個的單獨reduceByKey 的reduce任務來執行,Spark須要來執行一個all-to-all操做。它必須讀取全部分區來找到全部鍵的值,而後將它們帶到一塊兒跨分區來爲每個鍵計算最終的結果---這個被稱爲shuffle。
儘管在每個分區中的新的shuffled數據的元素集是很重要的,一樣分區本身的順序也很重要,而元素之間的順序就不是了。若是一個想要預測shuffle中的順序數據那麼可使用: 算法

  1. mapPartitions 來排序每個分區,好比,.sorted
  2. repartitionAndSortWithinPartitions 來有效分區同時同步從新分區。
  3. sortBy 創造一個全局的排序的RDD

能夠引發一個shuffle 的操做包括:repartition 和 coalesce,ByKey的操做,除了counting以外的好比:groupByKey 和reduceByKey,以及join操做好比cogroup 和 join。apache

性能影響

Shuffle是一個昂貴的操做由於它涉及到磁盤I/O,數據序列化和網絡I/O。爲了給shuffle組織數據,spark生成一系列任務-maps用於組織數據,以及一系列reduce任務來彙集它。這個命名系統來自於MapReduce並且並不直接和SparK的map,reduce操做有關。
在內部,單獨的map任務的結果會被保存在內存中直到它們不適用。而後這些結果會被根據目標分區排序而且寫向單一的文件。在reduce方面,任務讀取相關的排序塊。
必定的shuffle操做會消耗明顯的數量的堆內存由於它們使用的是在內存中的數據結構來組織記錄在傳輸以前或者以後。明顯的,reduceByKey和AggregateByKey創造了這些結構在map階段,以及 ‘Bykey的操做生成了它們在reduce階段。當數據不能放進內存中時,Spark將會將這些表散落到硬盤中,會引發而外的磁盤I/O和增長垃圾回收次數。
Shuffle一樣會生成大量的中間文件在磁盤中。從Spark1.3開始,這些文件被保存直到對應的RDDs再也不被使用以及已經被垃圾回收了。這樣作是爲了shuffle文件不須要被從新創造若是lineage被從新計算時。垃圾回收也許會發生只有在一段很長時間,若是這個應用保留了對RDD的引用或者若是GC沒有頻繁的發生。這意味着長期運行的spark任務也許會消耗大量的磁盤空間。這個零時的磁盤目錄會被spark.local.dir參數所指定。
Shuffle行爲能夠被調整經過一系列的參數。能夠參考 Spark Configuration Guide.‘Shuffle Behavior’章節。緩存

Shuffle Behavior

屬性名稱Property Name 默認值Default 含義Meaning
spark.reducer.maxSizeInFlight 48m 從每個reduce任務中同步獲取的map輸出的最大值。因爲每個輸出須要咱們創造一個緩存來接受它,這個表明了每一個任務的固定的內存開銷,因此儘可能保證它較小除非你有不少內存。
spark.reducer.maxReqsInFlight Int.MaxValue 這個配置限制了任意給定點遠程請求獲取塊數。當集羣中的主機數量增長的時候,它也許會致使一個很是大數量的內部鏈接到一到多個節點,引發worker在負載下失敗。經過容許它來限制獲取請求的數量,這個狀況也許會緩解
spark.reducer.maxBlocksInFlightPerAddress Int.MaxValue 這個配置限制了每個從給定端口裏的的reduce任務能夠獲取的遠程端口數量。當一個大量的block被一個給定的地址在一次單獨獲取或者同步獲取所請求時,可能會沖垮服務的executor或者Node Manager。這個配置對於減小Node Manager的負載尤其有用當外部的shuffle是被容許的。你能夠經過設定一個較低值來減輕這個狀況。
spark.maxRemoteBlockSizeFetchToMem Long.MaxValue 遠程的塊將會被獲取到磁盤中,當這個塊的大小超過了這個配置的值在byte單位上。這個用於避免一個巨大的請求佔據了太多的內存。咱們能夠將這個配置爲一個指定的值(好比,200M)。注意到這個配置將會影響到shuffle的獲取以及遠程塊獲取的塊管理。對於容許了外部shuffle服務的用戶,這個特性只會在外部shuffle服務版本高於Spark2。2時有效。
spark.shuffle.compress true 是否壓縮map的輸出文件,一般是一個好想法。壓縮將會使用spark.io.compression.codec.
spark.shuffle.file.buffer 32k 對每個shuffle文件輸出流的在內存中的緩存大小,單位是KiB除非有其餘的特別指定。這些緩存減小了硬盤查找和系統調用建立中間shuffle文件的過程。
spark.shuffle.io.maxRetries 3 (Netty only)最大自動重複嘗試的次數若是這個值沒有被設置爲0.這個重試邏輯有助於穩定大型的shuffle在長時間的GC暫停或者暫時的網絡鏈接問題上。
spark.shuffle.io.numConnectionsPerPeer 1 (Netty only) 節點之間的鏈接的重複使用爲了減小大型集羣中重複創建鏈接的狀況。對於有不少硬盤和不多主機的集羣,這個將會致使併發行不足以飽和全部硬盤,所以用戶可能會考慮增長這個值。
spark.shuffle.io.preferDirectBufs true (Netty only) 堆外緩衝區在shuffle和緩存塊轉移期間被用於減小垃圾回收。對於對外緩存內存數量有限的環境,用戶也許想要關掉這個來強迫全部的來自於Netty的分配都是在堆上。
spark.shuffle.io.retryWait 5s (Netty only) 在每一次重試直接須要等待多久。最大的延遲時間默認是15秒,maxRetries * retryWait.
spark.shuffle.service.enabled false 容許外部shuffle服務。這個服務保存了經過executor所寫的shuffle文件,這樣這個executor能夠安全的被移除。這個配置必須被容許若是spark.dynamicAllocation.enabled是「true」。這個外部的shuffle服務必須被啓動。查看dynamic allocation configuration and setup documentation 來得到更多信息。
spark.shuffle.service.port 7337 外部shuffle服務將會運行的端口。
spark.shuffle.service.index.cache.size 100m 緩存條目限制在指定的內存佔用空間中,以字節爲單位
spark.shuffle.maxChunksBeingTransferred Long.MAX_VALUE 在shuffle服務中同一時間最大容許傳輸的塊數量。注意到新來的鏈接將會被關閉若是達到了最大數量。這個客戶端將會嘗試從新鏈接根據shuffle的重試配置(see spark.shuffle.io.maxRetries and spark.shuffle.io.retryWait),若是這個限制也被達到了,那麼這個任務將會失敗。
spark.shuffle.sort.bypassMergeThreshold 200 (Advanced)在基於排序的shuffle管理中,避免合併排序數據若是這裏沒有map-side的聚合和這裏最多有配置的這麼多的reduce分區。
spark.shuffle.spill.compress true 是否壓縮溢出的數據在shuffle期間
spark.shuffle.accurateBlockThreshold 100 * 1024 * 1024 閥值是以bytes爲單位,高於此值將準確記錄HighlyCompressedMapStatus中的shuffle塊的大小。這個用於幫助阻止OOM經過避免錯誤估計了shuffle塊大小當獲取了shuffle塊時。
spark.shuffle.registration.timeout 5000 註冊外部shuffle服務的超時時間,單位是毫秒
spark.shuffle.registration.maxAttempts 3 當註冊外部shuffle服務失敗的時候,咱們會重複嘗試的最大次數
spark.io.encryption.enabled false 容許IO編碼。目前支持全部的模式除了Mesos。當使用這個特性的時候,咱們推薦RPC編碼。
spark.io.encryption.keySizeBits 128 IO編碼的值大小單位爲bit。支持的值有128,192和256.
spark.io.encryption.keygen.algorithm HmacSHA1 當生成一個IO編碼鍵值時使用的算法。被支持的算法在Java Cryptography Architecture Standard Algorithm Name 文檔的KeyGenerator章節中被描述。
相關文章
相關標籤/搜索