Spark Shuffle 專業級核心參數調優源碼深刻剖析-Spark商業環境實戰

本套系列博客從真實商業環境抽取案例進行總結和分享,並給出Spark商業應用實戰指導,請持續關注本套博客。版權聲明:本套Spark商業應用實戰歸做者(秦凱新)全部,禁止轉載,歡迎學習。node

1 Spark運行資源優化配置

./bin/spark-submit \  
    --master yarn-cluster \  
    --num-executors 100 \  
    --executor-memory 6G \ 
    --executor-cores 4 \
    --driver-memory 1G \
    --conf spark.default.parallelism=1000 \
    --conf spark.storage.memoryFraction=0.5 \  
    --conf spark.shuffle.memoryFraction=0.3 \
複製代碼

2 Spark運行資源優化配置


  • spark.reducer.maxSizeInFlight算法

  • 默認值:48mapache

  • 參數說明:該參數用於設置shuffle read task的buffer緩衝大小,而這個buffer緩衝決定了每次可以拉取多少數據。緩存

  • 調優建議:若是做業可用的內存資源較爲充足的話,能夠適當增長這個參數的大小(好比96m),從而減小拉取數據的次數,也就能夠減小網絡傳輸的次數,進而提高性能。在實踐中發現,合理調節該參數,性能會有1%~5%的提高。網絡

    * Fetches and reads the partitions in range [startPartition, endPartition) from a shuffle by
       * requesting them from other nodes' block stores.
    
      private[spark] class BlockStoreShuffleReader[K, C](
          handle: BaseShuffleHandle[K, _, C],
          startPartition: Int,
          endPartition: Int,
          context: TaskContext,
          serializerManager: SerializerManager = SparkEnv.get.serializerManager,
          blockManager: BlockManager = SparkEnv.get.blockManager,
          mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker)
        extends ShuffleReader[K, C] with Logging {
      
        private val dep = handle.dependency
      
        /** Read the combined key-values for this reduce task */
        override def read(): Iterator[Product2[K, C]] = {
        
          val wrappedStreams = new ShuffleBlockFetcherIterator(
            context,
            blockManager.shuffleClient,
            blockManager,
            mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
            serializerManager.wrapStream,
            // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
            
            SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,       <=神來之筆
            
            SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue),
            SparkEnv.get.conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS),
            SparkEnv.get.conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM),
            SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true))
    複製代碼

  • spark.shuffle.io.maxRetries架構

  • 默認值:3app

  • 參數說明:shuffle read task從shuffle write task所在節點拉取屬於本身的數據時,若是由於網絡異常致使拉取失敗,是會自動進行重試的。該參數就表明了能夠重試的最大次數。若是在指定次數以內拉取仍是沒有成功,就可能會致使做業執行失敗。框架

  • 調優建議:對於那些包含了特別耗時的shuffle操做的做業,建議增長重試最大次數(好比60次),以免因爲JVM的full gc或者網絡不穩定等因素致使的數據拉取失敗。在實踐中發現,對於針對超大數據量(數十億~上百億)的shuffle過程,調節該參數能夠大幅度提高穩定性。less

    public TransportConf(String module, ConfigProvider conf) {
      this.module = module;
      this.conf = conf;
      SPARK_NETWORK_IO_MODE_KEY = getConfKey("io.mode");
      SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY = getConfKey("io.preferDirectBufs");
      SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY = getConfKey("io.connectionTimeout");
      SPARK_NETWORK_IO_BACKLOG_KEY = getConfKey("io.backLog");
      SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY =  getConfKey("io.numConnectionsPerPeer");
      SPARK_NETWORK_IO_SERVERTHREADS_KEY = getConfKey("io.serverThreads");
      SPARK_NETWORK_IO_CLIENTTHREADS_KEY = getConfKey("io.clientThreads");
      SPARK_NETWORK_IO_RECEIVEBUFFER_KEY = getConfKey("io.receiveBuffer");
      SPARK_NETWORK_IO_SENDBUFFER_KEY = getConfKey("io.sendBuffer");
      SPARK_NETWORK_SASL_TIMEOUT_KEY = getConfKey("sasl.timeout");
      SPARK_NETWORK_IO_MAXRETRIES_KEY = getConfKey("io.maxRetries");
      SPARK_NETWORK_IO_RETRYWAIT_KEY = getConfKey("io.retryWait");
      SPARK_NETWORK_IO_LAZYFD_KEY = getConfKey("io.lazyFD");
      SPARK_NETWORK_VERBOSE_METRICS = getConfKey("io.enableVerboseMetrics");
    }
    複製代碼

  • spark.shuffle.io.retryWait
  • 默認值:5s
  • 參數說明: shuffle read task從shuffle write task所在節點拉取屬於本身的數據時,若是由於網絡異常致使拉取失敗,是會自動進行重試的,該參數表明了每次重試拉取數據的等待間隔,默認是5s。
  • 調優建議:建議加大間隔時長(好比60s),以增長shuffle操做的穩定性。

  • spark.shuffle.memoryFraction
  • 默認值:0.2
  • 參數說明:該參數表明了Executor內存中,分配給shuffle read task進行聚合操做的內存比例,默認是20%。
  • 調優建議:在資源參數調優中講解過這個參數。若是內存充足,並且不多使用持久化操做,建議調高這個比例,給shuffle read的聚合操做更多內存,以免因爲內存不足致使聚合過程當中頻繁讀寫磁盤。在實踐中發現,合理調節該參數能夠將性能提高10%左右。

在這裏好好唱一齣戲:

(1) StaticMemoryManager 靜態內存分配

private def getMaxStorageMemory(conf: SparkConf): Long = {
  
    val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
    
    val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)          <=神來之筆
    val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)          <=神來之筆
    (systemMaxMemory * memoryFraction * safetyFraction).toLong                        <=神來之筆
  }


    private def getMaxExecutionMemory(conf: SparkConf): Long = {
    
    val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)

    if (systemMaxMemory < MIN_MEMORY_BYTES) {
      throw new IllegalArgumentException(s"System memory $systemMaxMemory must " +
        s"be at least $MIN_MEMORY_BYTES. Please increase heap size using the --driver-memory " +
        s"option or spark.driver.memory in Spark configuration.")
    }
    if (conf.contains("spark.executor.memory")) {
      val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
      if (executorMemory < MIN_MEMORY_BYTES) {
        throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
          s"$MIN_MEMORY_BYTES. Please increase executor memory using the " +
          s"--executor-memory option or spark.executor.memory in Spark configuration.")
      }
    }
    val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)              <=神來之筆
    val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)              <=神來之筆
    (systemMaxMemory * memoryFraction * safetyFraction).toLong                            <=神來之筆
  }



  private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024                            <=神來之筆

  def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager = {
    val maxMemory = getMaxMemory(conf)
    new UnifiedMemoryManager(
      conf,
      maxHeapMemory = maxMemory,
      onHeapStorageRegionSize =
        (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong,        <=神來之筆
      numCores = numCores)
    }
複製代碼

(2) UnifiedMemoryManager 統一內存分配

/**
       * Return the total amount of memory shared between execution and storage, in bytes.
       */
      private def getMaxMemory(conf: SparkConf): Long = {
      
        val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)    <=神來之筆
        val reservedMemory = conf.getLong("spark.testing.reservedMemory",                        <=神來之筆
        
          if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
        val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
        
        if (systemMemory < minSystemMemory) {
          throw new IllegalArgumentException(s"System memory $systemMemory must " +
            s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
            s"option or spark.driver.memory in Spark configuration.")
        }
        // SPARK-12759 Check executor memory to fail fast if memory is insufficient
        if (conf.contains("spark.executor.memory")) {
          val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
          if (executorMemory < minSystemMemory) {
            throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
              s"$minSystemMemory. Please increase executor memory using the " +
              s"--executor-memory option or spark.executor.memory in Spark configuration.")
          }
        }
        val usableMemory = systemMemory - reservedMemory
        
        val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)                       <=神來之筆
        (usableMemory * memoryFraction).toLong
      }


      def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager = {
        val maxMemory = getMaxMemory(conf)
        new UnifiedMemoryManager(
          conf,
          maxHeapMemory = maxMemory,
          onHeapStorageRegionSize =
            (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong,          <=神來之筆    
          numCores = numCores)
      }
複製代碼

  • spark.shuffle.manageride

  • 默認值:sort

  • 參數說明:該參數用於設置ShuffleManager的類型。Spark 1.5之後,有三個可選項:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2之前的默認選項,可是Spark 1.2以及以後的版本默認都是SortShuffleManager了。tungsten-sort與sort相似,可是使用了tungsten計劃中的堆外內存管理機制,內存使用效率更高。

  • 調優建議:因爲SortShuffleManager默認會對數據進行排序,所以若是你的業務邏輯中須要該排序機制的話,則使用默認的SortShuffleManager就能夠;而若是你的業務邏輯不須要對數據進行排序,那麼建議參考後面的幾個參數調優,經過bypass機制或優化的HashShuffleManager來避免排序操做,同時提供較好的磁盤讀寫性能。這裏要注意的是,tungsten-sort要慎用,由於以前發現了一些相應的bug。

  • 配置參數:spark.shuffle.manager,默認是sort。

    val shortShuffleMgrNames = Map(
        "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
        "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
      val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
    複製代碼

  • spark.shuffle.sort.bypassMergeThreshold
  • 默認值:200
  • 參數說明:當ShuffleManager爲SortShuffleManager時,若是shuffle read task的數量小於這個閾值(默認是200),則shuffle write過程當中不會進行排序操做,而是直接按照未經優化的HashShuffleManager的方式去寫數據,可是最後會將每一個task產生的全部臨時磁盤文件都合併成一個文件,並會建立單獨的索引文件。
  • 調優建議:當你使用SortShuffleManager時,若是的確不須要排序操做,那麼建議將這個參數調大一些,大於shuffle read task的數量。那麼此時就會自動啓用bypass機制,map-side就不會進行排序了,減小了排序的性能開銷。可是這種方式下,依然會產生大量的磁盤文件,所以shuffle write性能有待提升。

  • spark.shuffle.consolidateFiles
  • 默認值:false
  • 參數說明:若是使用HashShuffleManager,該參數有效。若是設置爲true,那麼就會開啓consolidate機制,會大幅度合併shuffle write的輸出文件,對於shuffle read task數量特別多的狀況下,這種方法能夠極大地減小磁盤IO開銷,提高性能。
  • 調優建議:若是的確不須要SortShuffleManager的排序機制,那麼除了使用bypass機制,還能夠嘗試將spark.shffle.manager參數手動指定爲hash,使用HashShuffleManager,同時開啓consolidate機制。在實踐中嘗試過,發現其性能比開啓了bypass機制的SortShuffleManager要高出10%~30%。

  • Spark.Shuffle.blockTransferService
  • 默認值:Netty
  • 實如今Executor之間傳遞Shuffle緩存塊,有Netty和Nio兩種可用的實現。

  • Spark.Shuffle.compress

  • 默認是true

  • 判斷是否對mapper端的聚合輸出進行壓縮,表示每個shuffle過程都會對mapper端的輸出進行壓縮。舉例以下:若是有幾千臺或者上萬臺的機器進行匯聚計算,數據量和網絡傳輸會很是大,這樣會形成大連好的內存消耗,磁盤I/O消耗,以及網絡I/O消耗。若是在Mapper端進行壓縮,就會減小shuffle過程當中下一個Stage向上一個Stage抓數據的網絡開銷。

    * Merge zero or more spill files together, choosing the fastest merging strategy based on the
        * number of spills and the IO compression codec.
        * @return the partition lengths in the merged file.
     
       private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOException {
    
         final boolean compressionEnabled = sparkConf.getBoolean("spark.shuffle.compress", true);   <=神來之筆
         
         final CompressionCodec compressionCodec = CompressionCodec$.MODULE$.createCodec(sparkConf);
         
         final boolean fastMergeEnabled =
           sparkConf.getBoolean("spark.shuffle.unsafe.fastMergeEnabled", true);                    <=神來之筆
           
         final boolean fastMergeIsSupported = !compressionEnabled ||
           CompressionCodec$.MODULE$.supportsConcatenationOfSerializedStreams(compressionCodec);    <=神來之筆
           
         final boolean encryptionEnabled = blockManager.serializerManager().encryptionEnabled();
         try {
           if (spills.length == 0) {
             new FileOutputStream(outputFile).close(); // Create an empty file
             return new long[partitioner.numPartitions()];
           } else if (spills.length == 1) {
             // Here, we don't need to perform any metrics updates because the bytes written to this
             // output file would have already been counted as shuffle bytes written.
             Files.move(spills[0].file, outputFile);
             return spills[0].partitionLengths;
           } else {
             final long[] partitionLengths;
             // There are multiple spills to merge, so none of these spill files' lengths were counted
             // towards our shuffle write count or shuffle write time. If we use the slow merge path,
             // then the final output file's size won't necessarily be equal to the sum of the spill
             // files' sizes. To guard against this case, we look at the output file's actual size when
             // computing shuffle bytes written.
             //
             // We allow the individual merge methods to report their own IO times since different merge
             // strategies use different IO techniques.  We count IO during merge towards the shuffle
             // shuffle write time, which appears to be consistent with the "not bypassing merge-sort"
             // branch in ExternalSorter.
             
             if (fastMergeEnabled && fastMergeIsSupported) {
             
               // Compression is disabled or we are using an IO compression codec that supports
               // decompression of concatenated compressed streams, so we can perform a fast spill merge
               // that doesn't need to interpret the spilled bytes.
               if (transferToEnabled && !encryptionEnabled) {
                 logger.debug("Using transferTo-based fast merge");
                 partitionLengths = mergeSpillsWithTransferTo(spills, outputFile);
               } else {
                 logger.debug("Using fileStream-based fast merge");
                 partitionLengths = mergeSpillsWithFileStream(spills, outputFile, null);
               }
             } else {
               logger.debug("Using slow merge");
               partitionLengths = mergeSpillsWithFileStream(spills, outputFile, compressionCodec);
             }
             // When closing an UnsafeShuffleExternalSorter that has already spilled once but also has
             // in-memory records, we write out the in-memory records to a file but do not count that
             // final write as bytes spilled (instead, it's accounted as shuffle write). The merge needs
             // to be counted as shuffle write, but this will lead to double-counting of the final
             // SpillInfo's bytes.
             writeMetrics.decBytesWritten(spills[spills.length - 1].file.length());
             writeMetrics.incBytesWritten(outputFile.length());
             return partitionLengths;
           }
         } catch (IOException e) {
           if (outputFile.exists() && !outputFile.delete()) {
             logger.error("Unable to delete output file {}", outputFile.getPath());
           }
           throw e;
         }
       }
    複製代碼

  • spark.io.compression.codec

  • 該參數用來壓縮內部數據,如:RDD分區,廣播變量和shuffle輸出的數據等,所採用的壓縮有LZ4,Lzf,Snappy等三種選擇,默認是Snappy,可是和Snappy相比較,Lzf的壓縮率較高。建議在大量Shuffle過程當中,能夠選擇Lzf4。

  • 默認是Snappy

    private[spark] object CompressionCodec {
        private val configKey = "spark.io.compression.codec"
        private[spark] def supportsConcatenationOfSerializedStreams(codec: CompressionCodec): Boolean = {
          (codec.isInstanceOf[SnappyCompressionCodec] || codec.isInstanceOf[LZFCompressionCodec]
            || codec.isInstanceOf[LZ4CompressionCodec] || codec.isInstanceOf[ZStdCompressionCodec])
        }
      
        private val shortCompressionCodecNames = Map(
          "lz4" -> classOf[LZ4CompressionCodec].getName,
          "lzf" -> classOf[LZFCompressionCodec].getName,
          "snappy" -> classOf[SnappyCompressionCodec].getName,
          "zstd" -> classOf[ZStdCompressionCodec].getName)
    複製代碼

  • spark.shuffle.file.buffer

  • 默認值:32k(考慮最小硬件下都能成功)

  • 參數說明:該參數用於設置shuffle write task的BufferedOutputStream的buffer緩衝大小。將數據寫到磁盤文件以前,會先寫入buffer緩衝中,待緩衝寫滿以後,纔會溢寫到磁盤。

  • 調優建議:若是做業可用的內存資源較爲充足的話,能夠適當增長這個參數的大小(好比64k),從而減小shuffle write過程當中溢寫磁盤文件的次數,也就能夠減小磁盤IO次數,進而提高性能。在實踐中發現,合理調節該參數,性能會有1%~5%的提高。

    private[spark] val SHUFFLE_FILE_BUFFER_SIZE =
          ConfigBuilder("spark.shuffle.file.buffer")
            .doc("Size of the in-memory buffer for each shuffle file output stream, in KiB unless " +
              "otherwise specified. These buffers reduce the number of disk seeks and system calls " +
              "made in creating intermediate shuffle files.")
            .bytesConf(ByteUnit.KiB)
            .checkValue(v => v > 0 && v <= Int.MaxValue / 1024,
              s"The file buffer size must be greater than 0 and less than ${Int.MaxValue / 1024}.")
            .createWithDefaultString("32k")
    
    
    
      final class ShuffleExternalSorter extends MemoryConsumer {
      
        private static final Logger logger = LoggerFactory.getLogger(ShuffleExternalSorter.class);
      
        @VisibleForTesting
        static final int DISK_WRITE_BUFFER_SIZE = 1024 * 1024;
      
        private final int numPartitions;
        private final TaskMemoryManager taskMemoryManager;
        private final BlockManager blockManager;
        private final TaskContext taskContext;
        private final ShuffleWriteMetrics writeMetrics;
      
        /**
         * Force this sorter to spill when there are this many elements in memory.
         */
        private final int numElementsForSpillThreshold;
      
        /** The buffer size to use when writing spills using DiskBlockObjectWriter */
        private final int fileBufferSizeBytes;
      
        /** The buffer size to use when writing the sorted records to an on-disk file */
        private final int diskWriteBufferSize;
      
        /**
         * Memory pages that hold the records being sorted. The pages in this list are freed when
         * spilling, although in principle we could recycle these pages across spills (on the other hand,
         * this might not be necessary if we maintained a pool of re-usable pages in the TaskMemoryManager
         * itself).
         */
        private final LinkedList<MemoryBlock> allocatedPages = new LinkedList<>();
      
        private final LinkedList<SpillInfo> spills = new LinkedList<>();
      
        /** Peak memory used by this sorter so far, in bytes. **/
        private long peakMemoryUsedBytes;
      
        // These variables are reset after spilling:
        @Nullable private ShuffleInMemorySorter inMemSorter;
        @Nullable private MemoryBlock currentPage = null;
        private long pageCursor = -1;
      
        ShuffleExternalSorter(
            TaskMemoryManager memoryManager,
            BlockManager blockManager,
            TaskContext taskContext,
            int initialSize,
            int numPartitions,
            SparkConf conf,
            ShuffleWriteMetrics writeMetrics) {
          super(memoryManager,
            (int) Math.min(PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES, memoryManager.pageSizeBytes()),
            memoryManager.getTungstenMemoryMode());
          this.taskMemoryManager = memoryManager;
          this.blockManager = blockManager;
          this.taskContext = taskContext;
          this.numPartitions = numPartitions;
          
          // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
          this.fileBufferSizeBytes =
              (int) (long) conf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024;            <=神來之筆
              
          this.numElementsForSpillThreshold =
              (int) conf.get(package$.MODULE$.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD());
              
          this.writeMetrics = writeMetrics;
          
          this.inMemSorter = new ShuffleInMemorySorter(
            this, initialSize, conf.getBoolean("spark.shuffle.sort.useRadixSort", true));
            
          this.peakMemoryUsedBytes = getMemoryUsage();
          this.diskWriteBufferSize =
              (int) (long) conf.get(package$.MODULE$.SHUFFLE_DISK_WRITE_BUFFER_SIZE());
        }
    複製代碼

  • spark.shuffle.io.numConnectionsPerPeer

  • 僅Netty使用,複用主機之間的鏈接,以減小大型集羣的鏈接創建,

  • 默認是1

    TransportConf :
        Number of concurrent connections between two nodes for fetching data.
    
        public int numConnectionsPerPeer() {
          return conf.getInt(SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY, 1);
        }
    複製代碼

  • Spark.Shuffle.io.preferDirectBufs

  • 僅限Netty使用,堆外緩存能夠有效減小垃圾回收和緩存複製。對於堆外內存緊張的用戶來講,能夠考慮禁用這個選項,從而迫使Netty全部的內存都分配到堆上,默認是true。

    TransportConf:

    /** If true, we will prefer allocating off-heap byte buffers within Netty. */
        public boolean preferDirectBufs() {
          return conf.getBoolean(SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY, true);
        }
    複製代碼

  • spark.shuffle.service.enabled

  • 默認爲false,若是配置成true,BlocakManager實例生成時,須要讀取Spark.Shuffle.service.port配置的端口,注意此時BlockManager的ShuffleClient再也不是默認的BlocakTransferSerice實例,而是ExternalShuffleClient。

  • 啓用外部的Shuffle Service , NodeManager中會長期運行一個輔助任務,用於提高Shuffle計算性能。

    private[spark] class BlockManager(
          executorId: String,
          rpcEnv: RpcEnv,
          val master: BlockManagerMaster,
          val serializerManager: SerializerManager,
          val conf: SparkConf,
          memoryManager: MemoryManager,
          mapOutputTracker: MapOutputTracker,
          shuffleManager: ShuffleManager,
          val blockTransferService: BlockTransferService,
          securityManager: SecurityManager,
          numUsableCores: Int)
        extends BlockDataManager with BlockEvictionHandler with Logging {
      
        private[spark] val externalShuffleServiceEnabled =
          conf.getBoolean("spark.shuffle.service.enabled", false)
    
      // Port used by the external shuffle service. In Yarn mode, this may be already be
        // set through the Hadoop configuration as the server is launched in the Yarn NM.
        private val externalShuffleServicePort = {
          val tmpPort = Utils.getSparkOrYarnConfig(conf, "spark.shuffle.service.port", "7337").toInt
          if (tmpPort == 0) {
            // for testing, we set "spark.shuffle.service.port" to 0 in the yarn config, so yarn finds
            // an open port.  But we still need to tell our spark apps the right port to use.  So
            // only if the yarn config has the port set to 0, we prefer the value in the spark config
            conf.get("spark.shuffle.service.port").toInt
          } else {
            tmpPort
          }
        }
    
        // Client to read other executors' shuffle files. This is either an external service, or just the
        // standard BlockTransferService to directly connect to other Executors.
        private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
          val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
          new ExternalShuffleClient(transConf, securityManager,
            securityManager.isAuthenticationEnabled(), conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT))
        } else {
          blockTransferService
        }
    複製代碼

基於Yarn的動態資源分配配置以下:

首先須要對YARN的NodeManager進行配置,使其支持Spark的Shuffle Service。
(1)修改每臺NodeManager上的yarn-site.xml:
    ##修改
    <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle,spark_shuffle</value>
    </property>
    ##增長
    <property>
    <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
    <value>org.apache.spark.network.yarn.YarnShuffleService</value>
    </property>
    <property>
    <name>spark.shuffle.service.port</name>
    <value>7337</value>
    </property>

(2)將$SPARK_HOME/lib/spark-1.5.0-yarn-shuffle.jar拷貝到每臺NodeManager的${HADOOP_HOME}/share/hadoop/yarn/lib/下。
(3)重啓全部NodeManager。
複製代碼

  • Spark.shuffle.Sort.bypassMergeThreshold

  • 默認值爲200

  • 場景以下:若是Shuffle Read Task 的數量小於這個閾值(默認是200),那麼Shuffle Write的過程當中不會進行排序操做,而是直接按照未經優化的HashShuffleManager方式去寫數據,最終仍是會將每個Task所產生的全部臨時磁盤文件合併成一個文件,並建立單獨索引。

    private[spark] object SortShuffleWriter {
        def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
          // We cannot bypass sorting if we need to do map-side aggregation.
          if (dep.mapSideCombine) {
            require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
            false
          } else {
            val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
            dep.partitioner.numPartitions <= bypassMergeThreshold
          }
        }
    複製代碼

  • Spark.Shuffle.spill

  • 默認是True

  • 即容許溢出到磁盤。

    private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
        if (!conf.getBoolean("spark.shuffle.spill", true)) {
          logWarning(
            "spark.shuffle.spill was set to false, but this configuration is ignored as of Spark 1.6+." +
              " Shuffle will continue to spill to disk when necessary.")
        }
    複製代碼

  • spark.shuffle.spill.compress

  • 設置爲True是合理的,由於網絡帶寬每每最容易成爲瓶頸

  • 建議綜合考慮cpu ,磁盤,網絡的實際能力。

    * Component which configures serialization, compression and encryption for various Spark
       * components, including automatic selection of which [[Serializer]] to use for shuffles.
       */
      private[spark] class SerializerManager(
          defaultSerializer: Serializer,
          conf: SparkConf,
          encryptionKey: Option[Array[Byte]]) {
    
       // Whether to compress broadcast variables that are stored
        private[this] val compressBroadcast = conf.getBoolean("spark.broadcast.compress", true)
        // Whether to compress shuffle output that are stored
        private[this] val compressShuffle = conf.getBoolean("spark.shuffle.compress", true)
        // Whether to compress RDD partitions that are stored serialized
        private[this] val compressRdds = conf.getBoolean("spark.rdd.compress", false)
        // Whether to compress shuffle output temporarily spilled to disk
        private[this] val compressShuffleSpill = conf.getBoolean("spark.shuffle.spill.compress", true)
    複製代碼

3 總結

本文綜合Spark的核心參數配置,花大量時間,閱讀源碼並找到參數調優的位置和條件,一份好文實屬不易,禁止轉載,歡迎學習

秦凱新 於深圳

相關文章
相關標籤/搜索