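TransportConf provides configuration information for Spark's RPC framework. It has two member fields: the configuration provider conf and the name of the configured module, module. The two fields are defined as follows: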
// Configuration provider
private final ConfigProvider conf;
// Module name
private final String module;
ConfigProvider is an abstract class. Its code is as follows:
/**
 * Provides a mechanism for constructing a {@link TransportConf} using some sort of configuration.
 */
public abstract class ConfigProvider {
  /** Obtains the value of the given config, throws NoSuchElementException if it doesn't exist. */
  public abstract String get(String name);

  public String get(String name, String defaultValue) {
    try {
      return get(name);
    } catch (NoSuchElementException e) {
      return defaultValue;
    }
  }

  public int getInt(String name, int defaultValue) {
    return Integer.parseInt(get(name, Integer.toString(defaultValue)));
  }

  public long getLong(String name, long defaultValue) {
    return Long.parseLong(get(name, Long.toString(defaultValue)));
  }

  public double getDouble(String name, double defaultValue) {
    return Double.parseDouble(get(name, Double.toString(defaultValue)));
  }

  public boolean getBoolean(String name, boolean defaultValue) {
    return Boolean.parseBoolean(get(name, Boolean.toString(defaultValue)));
  }
}
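ConfigProvider includes the methods get, getInt, getLong, getDouble, and getBoolean. The typed getters all obtain a string value through the abstract get method (falling back to the default when the key is missing) and then apply a single type conversion. The abstract get method itself must be implemented by subclasses.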
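To make the fallback behaviour concrete, here is a minimal sketch rather than Spark's own code. SketchConfigProvider is a cut-down stand-in for ConfigProvider so that the example compiles without Spark on the classpath. MapBackedProvider and the key names (which assume a module called rpc) are hypothetical and exist only for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

// Cut-down stand-in for Spark's ConfigProvider (assumption: only two typed getters kept).
abstract class SketchConfigProvider {
    // Subclasses supply the raw lookup and throw NoSuchElementException for unknown keys.
    abstract String get(String name);

    // Falls back to the default when the raw lookup throws.
    String get(String name, String defaultValue) {
        try {
            return get(name);
        } catch (NoSuchElementException e) {
            return defaultValue;
        }
    }

    // Typed getters: fetch the string (or the stringified default), then convert once.
    int getInt(String name, int defaultValue) {
        return Integer.parseInt(get(name, Integer.toString(defaultValue)));
    }

    boolean getBoolean(String name, boolean defaultValue) {
        return Boolean.parseBoolean(get(name, Boolean.toString(defaultValue)));
    }
}

// Hypothetical subclass backed by an in-memory map.
class MapBackedProvider extends SketchConfigProvider {
    private final Map<String, String> values;

    MapBackedProvider(Map<String, String> values) {
        this.values = new HashMap<>(values);
    }

    @Override
    String get(String name) {
        String value = values.get(name);
        if (value == null) {
            throw new NoSuchElementException(name);
        }
        return value;
    }
}

class ConfigProviderDemo {
    public static void main(String[] args) {
        Map<String, String> settings = new HashMap<>();
        settings.put("spark.rpc.io.serverThreads", "4");
        SketchConfigProvider provider = new MapBackedProvider(settings);

        // Key present: the stored string is parsed.
        System.out.println(provider.getInt("spark.rpc.io.serverThreads", 8)); // prints 4
        // Key missing: the default is stringified, returned, and parsed back.
        System.out.println(provider.getInt("spark.rpc.io.clientThreads", 8)); // prints 8
    }
}
```

A subclass only has to decide where the strings come from. The default handling and type conversion stay in the base class.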
In the actual code, the implementation of get simply delegates to the get method of SparkConf:
/** Get a parameter; throws a NoSuchElementException if it's not set */
def get(key: String): String = {
  getOption(key).getOrElse(throw new NoSuchElementException(key))
}
The TransportConf object is created by the Scala singleton object SparkTransportConf.
package org.apache.spark.network.netty

import org.apache.spark.SparkConf
import org.apache.spark.network.util.{ConfigProvider, TransportConf}

/**
 * Provides a utility for transforming from a SparkConf inside a Spark JVM (e.g., Executor,
 * Driver, or a standalone shuffle service) into a TransportConf with details on our environment
 * like the number of cores that are allocated to this JVM.
 */
object SparkTransportConf {
  /**
   * Specifies an upper bound on the number of Netty threads that Spark requires by default.
   * In practice, only 2-4 cores should be required to transfer roughly 10 Gb/s, and each core
   * that we use will have an initial overhead of roughly 32 MB of off-heap memory, which comes
   * at a premium.
   *
   * Thus, this value should still retain maximum throughput and reduce wasted off-heap memory
   * allocation. It can be overridden by setting the number of serverThreads and clientThreads
   * manually in Spark's configuration.
   */
  private val MAX_DEFAULT_NETTY_THREADS = 8

  /**
   * Utility for creating a [[TransportConf]] from a [[SparkConf]].
   * @param _conf the [[SparkConf]]
   * @param module the module name
   * @param numUsableCores if nonzero, this will restrict the server and client threads to only
   *                       use the given number of cores, rather than all of the machine's cores.
   *                       This restriction will only occur if these properties are not already set.
   */
  def fromSparkConf(_conf: SparkConf, module: String, numUsableCores: Int = 0): TransportConf = {
    val conf = _conf.clone

    // Specify thread configuration based on our JVM's allocation of cores (rather than necessarily
    // assuming we have all the machine's cores).
    // NB: Only set if serverThreads/clientThreads not already set.
    val numThreads = defaultNumThreads(numUsableCores)
    conf.setIfMissing(s"spark.$module.io.serverThreads", numThreads.toString)
    conf.setIfMissing(s"spark.$module.io.clientThreads", numThreads.toString)

    new TransportConf(module, new ConfigProvider {
      override def get(name: String): String = conf.get(name)
    })
  }

  /**
   * Returns the default number of threads for both the Netty client and server thread pools.
   * If numUsableCores is 0, we will use Runtime get an approximate number of available cores.
   */
  private def defaultNumThreads(numUsableCores: Int): Int = {
    val availableCores =
      if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
    math.min(availableCores, MAX_DEFAULT_NETTY_THREADS)
  }
}
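As the code shows, a TransportConf can be constructed with the fromSparkConf method of SparkTransportConf. It takes three arguments: the SparkConf, the module name module, and the number of usable cores numUsableCores. If numUsableCores is less than or equal to 0, the thread count is based on the number of processors available to the JVM; otherwise it is based on numUsableCores. Since not all of the system's cores can be devoted to network transfer, the number of cores allotted to network transfer is capped at 8 in either case.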
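The thread-count rule can be sketched in Java as follows. This is an illustrative port of the Scala logic, not code from Spark. The two-argument overload with an explicit machineCores parameter is a hypothetical addition that makes the result reproducible regardless of the machine running it.

```java
class NettyThreadDefaults {
    // Same upper bound as MAX_DEFAULT_NETTY_THREADS in SparkTransportConf.
    static final int MAX_DEFAULT_NETTY_THREADS = 8;

    // Hypothetical overload: machineCores stands in for Runtime.availableProcessors().
    static int defaultNumThreads(int numUsableCores, int machineCores) {
        // A positive numUsableCores wins; otherwise fall back to the machine's core count.
        int availableCores = numUsableCores > 0 ? numUsableCores : machineCores;
        // The cap applies in both cases.
        return Math.min(availableCores, MAX_DEFAULT_NETTY_THREADS);
    }

    // Mirrors the single-argument Scala method by querying the JVM.
    static int defaultNumThreads(int numUsableCores) {
        return defaultNumThreads(numUsableCores, Runtime.getRuntime().availableProcessors());
    }

    public static void main(String[] args) {
        System.out.println(defaultNumThreads(2, 16));  // prints 2: explicit core count below the cap
        System.out.println(defaultNumThreads(0, 16));  // prints 8: machine has 16 cores, capped
        System.out.println(defaultNumThreads(32, 16)); // prints 8: explicit core count, capped
    }
}
```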
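The resulting thread count is used to set the client transport thread property spark.$module.io.clientThreads and the server transport thread property spark.$module.io.serverThreads, where $module is the module name passed to fromSparkConf. Because setIfMissing is used, any values already present in the SparkConf are left unchanged.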
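The "only set if missing" behaviour on a cloned configuration can be illustrated with a plain map. This is a sketch under assumptions: Map.putIfAbsent stands in for SparkConf.setIfMissing, the copy constructor stands in for _conf.clone, and the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class ThreadSettingsSketch {
    // Returns a copy of the settings with per-module thread defaults filled in.
    static Map<String, String> withThreadDefaults(
            Map<String, String> original, String module, int numThreads) {
        // Work on a copy so the caller's settings are never modified.
        Map<String, String> conf = new HashMap<>(original);
        // putIfAbsent keeps values the user configured explicitly.
        conf.putIfAbsent("spark." + module + ".io.serverThreads", Integer.toString(numThreads));
        conf.putIfAbsent("spark." + module + ".io.clientThreads", Integer.toString(numThreads));
        return conf;
    }

    public static void main(String[] args) {
        Map<String, String> userConf = new HashMap<>();
        userConf.put("spark.shuffle.io.serverThreads", "16"); // explicit user setting

        Map<String, String> effective = withThreadDefaults(userConf, "shuffle", 8);
        System.out.println(effective.get("spark.shuffle.io.serverThreads")); // prints 16
        System.out.println(effective.get("spark.shuffle.io.clientThreads")); // prints 8
    }
}
```

Note that an explicit setting is not subject to the cap of 8. The cap only limits the default that is filled in when the property is absent.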
This post is based on the book 《Spark內核設計的藝術:架構設計與實現》 (The Art of Spark Kernel Design: Architecture Design and Implementation).